Kafka에서 topic을 생성할 때, partition 별로 replica를 어떻게 할당하나 찾아보았다. 여러개의 broker를 가진 Kafka Cluster를 운영할 경우 각 broekr 마다 replica 불균형이 발생한다. 최고 topic 생성 이후에 broker shutdown 등 다른 이유에 의해 replica가 바뀔 수 있겠지만 topic 생성 최초에 partition 별 replica 할당이 어떻게 되나 궁금했기 때문이다.
그래서 topic이 생성하는 코드 안쪽에서 partition 분배 로직을 뒤졌다. partition 분배 로직에는 broker의 rack 설정이 있는 것과 없는 것 2개의 로직이 존재했다. 이 글에서는 rack이 없는 설정에 대해서 먼저 분석해 본다.
다음은 partition 분배 로직에 대한 부분을 발췌한 코드이다.
분석을 위해 주의깊게 봐야할 변수들이다.
- brokerArray : broker meta data 들의 set.
- startIndex : 각 partition 별 첫번째 replica를 얻기 위해 사용되는 index (랜덤값).
- currentPartitionId : 설정할 partition 번호 (기본적으로 0부터 시작)
- nextReplicaShift : 다음 replica를 설정하기 위한 shift 값 (랜덤값).
- replicaBuffer : 각 partition의 replica들을 저장할 array.
그럼 이제 로직을 살펴보자.
처음에 기본적으로 필요한 변수들의 값들을 설정한다(Line: 7-10). brokerArray, startIndex, currentPartitionId, nextReplicaShift.
그리고 각 partition 별로 replica 설정이 필요하므로 topic의 partition 숫자에 맞춰 for 문을 돌린다 (Line: 11).
Line 14에서 첫 replica를 설정한다. 규칙은 (currentPartitionId + startIndex) % brokerArray.length currentPartitionId를 기준으로 값을 구했기 때문에 partition 별로 첫 번째 replica는 겹치지 않는다. 이렇게 한 이유는 최초 생성되는 topic의 partition들은 첫번째 replica를 leader로 삼기 때문이다. leader가 겹치지 않아야 하는 중요한 이유는 leader를 통해 producer, consumer의 write/read 작업이 다 이루어지기 때문에 leader가 겹치게 되면 분산효과가 떨어지게 된다.
첫 번째 replica를 찾았으면 topic의 replication-factor 갑 만큼 추가로 replica를 할당해 줘야 한다. 2번째 이후부터의 replica는 replicaIndex 메서드가 값을 구해준다. 메서드의 파라미터를 살펴보자.
첫번째 replica index, shift 값, 몇 번째 replica 인지 그리고 broker 수. 이렇게 4개이다.
replicaIndex(firstReplicaIndex: Int, secondReplicaShift: Int, replicaIndex: Int, nBrokers: Int)
이렇게 받은 값을 토대로 다음 규칙을 적용한다. 결국은 일정한 간격을 두고 broker 들을 순회할 수 있도록 shift 값을 구하고, 첫 번째 replica index를 토대로 replica들을 쌓아가는 형태이다.
val shift = 1 + (secondReplicaShift + replicaIndex) % (nBrokers - 1)
(firstReplicaIndex + shift) % nBrokers
코드를 보면 다음과 같은 주석이 있다.
For partitions assigned to a particular broker, their other replicas are spread over the other brokers.
주석에서 보듯이 partition의 replica를 할당해 주는 기본 전략은 broker들에게 고르게 분배해 주는 것이다.
여기서 주의할 점은.
Kafka를 운영하다보면 각 broker 별로 disk의 불균형이 일어나는 현상이 발생할 수 있는데. 그 원인이 replica를 분배하는데 broker들의 현재 disk 용량을 기준으로 분배를 하지 않기 때문이다. 각 broker가 가진 replica의 숫자들을 균등하게 유지하려고 하지만 data 용량이 많은 특정 topic이 몇 개의 broker에 집중될 수 있기 때문에 disk 불균형은 발생할 수 있다. 이를 해결하는 방법은 다음에 기술해보겠다.
관련문서
반응형
'Big Data > Kafka' 카테고리의 다른 글
[Kafka] Configurable SASL callback handler (0) | 2019.04.18 |
---|---|
[Kafka] 파티션 이동 (0) | 2019.04.12 |
[Kafka] 인증 - SASL/PLAIN (0) | 2019.03.26 |
[Kafka] consumer group offset change by python. (0) | 2018.12.31 |
[Kafka] consumer LAG 수집 및 elasticsearch 적재 (0) | 2018.12.28 |