Kafka에서 topic을 생성할 때, partition 별로 replica를 어떻게 할당하나 찾아보았다. 여러개의 broker를 가진 Kafka Cluster를 운영할 경우 각 broekr 마다 replica 불균형이 발생한다. 최고 topic 생성 이후에 broker shutdown 등 다른 이유에 의해 replica가 바뀔 수 있겠지만 topic 생성 최초에 partition 별 replica 할당이 어떻게 되나 궁금했기 때문이다.
그래서 topic이 생성하는 코드 안쪽에서 partition 분배 로직을 뒤졌다. partition 분배 로직에는 broker의 rack 설정이 있는 것과 없는 것 2개의 로직이 존재했다. 이 글에서는 rack이 없는 설정에 대해서 먼저 분석해 본다.
다음은 partition 분배 로직에 대한 부분을 발췌한 코드이다.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
private def assignReplicasToBrokersRackUnaware(nPartitions: Int, | |
replicationFactor: Int, | |
brokerList: Seq[Int], | |
fixedStartIndex: Int, | |
startPartitionId: Int): Map[Int, Seq[Int]] = { | |
val ret = mutable.Map[Int, Seq[Int]]() | |
val brokerArray = brokerList.toArray | |
val startIndex = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerArray.length) | |
var currentPartitionId = math.max(0, startPartitionId) | |
var nextReplicaShift = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerArray.length) | |
for (_ <- 0 until nPartitions) { | |
if (currentPartitionId > 0 && (currentPartitionId % brokerArray.length == 0)) | |
nextReplicaShift += 1 | |
val firstReplicaIndex = (currentPartitionId + startIndex) % brokerArray.length | |
val replicaBuffer = mutable.ArrayBuffer(brokerArray(firstReplicaIndex)) | |
for (j <- 0 until replicationFactor - 1) | |
replicaBuffer += brokerArray(replicaIndex(firstReplicaIndex, nextReplicaShift, j, brokerArray.length)) | |
ret.put(currentPartitionId, replicaBuffer) | |
currentPartitionId += 1 | |
} | |
ret | |
} | |
private def replicaIndex(firstReplicaIndex: Int, secondReplicaShift: Int, replicaIndex: Int, nBrokers: Int): Int = { | |
val shift = 1 + (secondReplicaShift + replicaIndex) % (nBrokers - 1) | |
(firstReplicaIndex + shift) % nBrokers | |
} |
분석을 위해 주의깊게 봐야할 변수들이다.
- brokerArray : broker meta data 들의 set.
- startIndex : 각 partition 별 첫번째 replica를 얻기 위해 사용되는 index (랜덤값).
- currentPartitionId : 설정할 partition 번호 (기본적으로 0부터 시작)
- nextReplicaShift : 다음 replica를 설정하기 위한 shift 값 (랜덤값).
- replicaBuffer : 각 partition의 replica들을 저장할 array.
그럼 이제 로직을 살펴보자.
처음에 기본적으로 필요한 변수들의 값들을 설정한다(Line: 7-10). brokerArray, startIndex, currentPartitionId, nextReplicaShift.
그리고 각 partition 별로 replica 설정이 필요하므로 topic의 partition 숫자에 맞춰 for 문을 돌린다 (Line: 11).
Line 14에서 첫 replica를 설정한다. 규칙은 (currentPartitionId + startIndex) % brokerArray.length currentPartitionId를 기준으로 값을 구했기 때문에 partition 별로 첫 번째 replica는 겹치지 않는다. 이렇게 한 이유는 최초 생성되는 topic의 partition들은 첫번째 replica를 leader로 삼기 때문이다. leader가 겹치지 않아야 하는 중요한 이유는 leader를 통해 producer, consumer의 write/read 작업이 다 이루어지기 때문에 leader가 겹치게 되면 분산효과가 떨어지게 된다.
첫 번째 replica를 찾았으면 topic의 replication-factor 갑 만큼 추가로 replica를 할당해 줘야 한다. 2번째 이후부터의 replica는 replicaIndex 메서드가 값을 구해준다. 메서드의 파라미터를 살펴보자.
첫번째 replica index, shift 값, 몇 번째 replica 인지 그리고 broker 수. 이렇게 4개이다.
replicaIndex(firstReplicaIndex: Int, secondReplicaShift: Int, replicaIndex: Int, nBrokers: Int)
이렇게 받은 값을 토대로 다음 규칙을 적용한다. 결국은 일정한 간격을 두고 broker 들을 순회할 수 있도록 shift 값을 구하고, 첫 번째 replica index를 토대로 replica들을 쌓아가는 형태이다.
val shift = 1 + (secondReplicaShift + replicaIndex) % (nBrokers - 1)
(firstReplicaIndex + shift) % nBrokers
코드를 보면 다음과 같은 주석이 있다.
For partitions assigned to a particular broker, their other replicas are spread over the other brokers.
주석에서 보듯이 partition의 replica를 할당해 주는 기본 전략은 broker들에게 고르게 분배해 주는 것이다.
여기서 주의할 점은.
Kafka를 운영하다보면 각 broker 별로 disk의 불균형이 일어나는 현상이 발생할 수 있는데. 그 원인이 replica를 분배하는데 broker들의 현재 disk 용량을 기준으로 분배를 하지 않기 때문이다. 각 broker가 가진 replica의 숫자들을 균등하게 유지하려고 하지만 data 용량이 많은 특정 topic이 몇 개의 broker에 집중될 수 있기 때문에 disk 불균형은 발생할 수 있다. 이를 해결하는 방법은 다음에 기술해보겠다.
관련문서
반응형
'Big Data > Kafka' 카테고리의 다른 글
[Kafka] Configurable SASL callback handler (0) | 2019.04.18 |
---|---|
[Kafka] 파티션 이동 (0) | 2019.04.12 |
[Kafka] 인증 - SASL/PLAIN (0) | 2019.03.26 |
[Kafka] consumer group offset change by python. (0) | 2018.12.31 |
[Kafka] consumer LAG 수집 및 elasticsearch 적재 (0) | 2018.12.28 |