Big Data/Kafka

[Kafka] Producer Partitioner 변천사 (no key)

devidea 2023. 5. 16. 17:43

이번 글에서는 producer에서 데이터를 보낼 때, 데이터의 토픽 파티션 분배에 대해서 살펴봅니다. 특히 Key값이 정해지지 않았을 때 파티션 분배가 어떤 로직으로 이루어지는지 분석합니다. 카프카의 버전이 올라가면서 파티션 분배의 단점들을 개선하는 시도들이 이루어지는데 초기 버전에서부터 시작하여 현재의 분배 로직이 이루어진 과정에 대해 소개합니다.

 

1. 개요 (파티션)

카프카를 써 보신 분들이라면 파티션의 존재 이유에 대해서 아실 것입니다. 카프카는 분산 시스템으로 대규모의 데이터 처리를 위해 여러 파티션을 두고 데이터들을 분산해서 저장합니다. 파티션 분배 로직이 카프카 브로커에 존재하지 않나?라고 생각하실 수도 있는데, 파티션 분배 로직은 producer 내부에 존재합니다. 그리고 producer의 설정(partitioner.class)[각주:1]으로 별도의 파티션 분배 로직을 개발하여 적용할 수 있습니다. 다만 이번 글에서는 Producer의 default 파티션 분배 로직에 대해서 설명합니다.

 

앞서 언급했지만, Producer에 의해 보내지는 레코드의 Key 값의 존재 여부에 따라 파티션 분배 로직이 달라집니다. Key가 존재한다면, Key의 해시값으로 파티션을 분배합니다. 그 말은 같은 Key를 가진 레코드들은 모두 동일한 파티션으로 분배된다는 것을 의미합니다. Consumer 입장에서 특정 Key를 가진 레코드만 구독하고 싶다면, 해당 Key를 가진 파티션만 구독하면 됩니다. 이 글에서는 Key를 가자지 않는 레코드에 초점을 맞춥니다.

 

어떤 파티션으로 보내질 지는 Producer 코드의 Accumulator로 보낼 때 결정됩니다. 아래는 Producer Config를 정리하는 글[각주:2]에서 그렸던 그림을 다시 가져왔습니다. 파티션을 결정하는 로직은 Partitioner 인터페이스[각주:3]에서 담당합니다.

그림 1. Producer - Accumulator, Sender 정리

Partitioner 인터페이스는 별도의 로직을 포함하여 새롭게 개발할 수 있습니다. 그리고 Producer 설정(partitioner.class)에 추가하여 변경할 수 있습니다. Producer에서는 기본으로 내장된 Partitioner가 어떤 변천사에 의해 현재(3.4.0)에 이르렀는지 살펴보겠습니다.

 

2. 초기 파티션 분배 방식 (2.3 이하)

초기의 파티션 분배는 간단한 방식으로 이루어졌습니다. Round Robin 방식입니다. 레코드들을 파티션을 순회하면서 하나씩 넣어주는 방식입니다. 균등하게 분배되는 것처럼 보이지만 단점이 존재합니다. 작은 사이즈의 배치들로 나눠지게 되어서 브로커에 더 많은 요청을 보내게 됩니다. 결국 latency가 늘어나게 됩니다.

그림 2. Round Robin Partitioner - 출처(conduktor.io)

Round Robin 방식은 Producer 2.3 버전 이하에서 사용되었습니다. 그럼 2.4 이상에서는 어떤 변화가 있었을까요?

 

3. Sticky Partitioner (2.4 이상, 3.3 이하)

KIP-480[각주:4]에서 처음으로 Sticky Partitioner가 소개됩니다. KIP-480은 카프카 2.4.0 버전에 포함된 기능입니다. Stickiy Partitioner의 도입 배경은 앞서 설명한 Round Robin의 단점 작은 사이즈의 배치의 영향을 줄이기 위함입니다. Sticky(접착성)이라는 단어가 사용된 것은 기존에 할당되었던 파티션에 더해서 레코드를 더 접착하여 할당한다고 이해할 수 있습니다. 모든 파티션에 균등하게 할당하지 않고 일부 파티션에 보다 큰 배치로 할당을 하게 됩니다. 아래 그림을 보면 더 이해하는데 도움이 됩니다.

 

그림3. Sticky partitioning strategy - 출처(confluent blog)

 

Sticky Partitioning의 규칙을 간략히 정리하면 다음과 같습니다.

  • 배치가 가득차거나 linger.ms가 경과할 때까지 파티션을 "고정"합니다.
  • 배치를 전송한 후 고정된 파티션을 변경합니다.

 

그림 3에서 알 수 있듯이 레코드가 할당된 파티션 수가 적어지며, 배치에 할당된 레코드의 수가 많아집니다. 그래서 latency가 줄어들게 됩니다. 지금 시점에서는 파티션이 불균형한 것으로 보이지만 시간이 지날수록 파티션의 균형에는 영향을 미치지 않게 됩니다.

 

KIP-480에는 Sticky로 변경했을 때, Round Robin과의 성능 차이를 비교했습니다. 한 가지만 소개하면(자세한 사항은 KIP-480을 확인하세요). 그림 4는 3개의 Producer, 16개의 파티션에서 1000msg/sec의 레코드를 보냈을 때의 latency를 비교합니다. 명확하게 Sticky의 latency가 줄어드는 것이 보입니다. KIP-480 및 Sticky Partitioner를 소개하는 confluent 블로그 글을[각주:5] 보면 추가적인 테스트 결과를 볼 수 있습니다. 파티션을 늘리거나 linger.ms의 값을 0보다 크게 했을 때의 성능 차이도 소개합니다.

 

그림4. 3개의 Producer, 16개의 파티션에서 Partitioner 성능 비교 - 출처 (KIP-480)

 

4. Strictly Uniform Sticky Partitioner (3.4 이상)

Sticky Partitioner를 사용한 Producer에서도 예상치 못한 단점이 이슈로 등록됩니다. Sticky Partitoner로 인해서 파티션 레코드 불균형이 심하게 일어난다는 이슈입니다. 앞서 설명할 때는 Sticky Partitoner도 시간이 지나면서는 균형을 맞출 거라고 했는데요. 무슨 이유로 불균형이 발생했는지 살펴봅시다. 그리고 어떻게 해결했는지 분석할 예정인데요. 관련 내용은 3.4에 포함된 KIP-794[각주:6]에 포함된 기능입니다.

 

브로커의 성능이 모두 동일하면 이슈가 발생하지 않습니다. 그러나 특정 브로커의 latency가 느리면 상황이 달라집니다. "느린 브로커"가 담당하는 파티션은 브로커의 느린 응답을 기다리면서 배치에 레코드가 쌓이는 시간이 길어집니다. 그러므로 느린 브로커가 담당하는 파티션에는 더 많은 레코드가 쌓이며 더 느린 latency로 계속 이어집니다.

 

아래 그림1은 느린 브로커가 보유한 파티션을 0일 때의 테스트 결과를 보여줍니다. (KIP-794 포함 테스트)

그림 1. 느린 브로커에 더 많이 쌓인 레코드

오히려 더 적은 배치 수로 더 많은 레코드가 쌓였습니다. 느린 브로커의 응답을 기다리면서 배치 수는 줄지만 그 시간 사이에 레코드가 계속 쌓였습니다. 그런 어떤 방법으로 해결했을까요?

 

2+1가지의 규칙을 추가한 Partitioner를 기본 로직으로 변경했습니다.

  • batch.size에 도달할 때까지 파티션을 유지한다.
  • 대기 중인 큐의 크기를 고려하여 파티션을 전환한다.
  • (+1) Adaptive Partition Switching : 특정 조건에 부합하지 않은 느린 브로커에게는 파티션 할당을 제외한다.

 

4.1. batch.size에 의한 파티션 전환

batch.size에 의해서 파티션을 전환하면 아래와 같은 장점을 얻을 수 있습니다.

  • 낮은 생산 속도에서는 지연시간을 추가하지 않고, 높은 생산 속도에서는 빠르게 배치를 전환할 수 있습니다.
  • 지연시간이 긴 브로커에 적응하여 데이터를 쌓고 처리량과 데이터의 분배를 균일하게 유지합니다.

 

4.2. 대기 중인 큐의 크기를 고려한 파티션 전환

필자도 대기 중인 큐의 크기를 어떻게 고려하여 파티션을 전환하는지 바로 이해되지는 않았습니다. KIP-794에 의해 추가된 코드 중의 주석을 보면서 이해도를 높일 수 있었습니다. 아래는 큐 사이즈를 계산하는 로직 중 일부입니다.

 

BuiltInPartitioner.class updatePartitionLoadStats 함수에 존재하는 주석의 설명을 토대로 균일하게 파티션을 분표하는 과정을 살펴봅니다.

 

아래와 같이 큐에 레코드가 적재된 파티션 3개가 있다고 가정합니다.

0 3 1

최대 큐 크기 + 1 = 4에서 큐 크기를 빼서 반전합니다.

4 1 3

그리고 누계 합계로 변환합니다.

4 5 8

 

이제 [0..8) 범위의 난수를 구하고 난수 값에서 가장 가까운 큰 값을 누계 합계에서 찾습니다. 그 갑의 인덱스가 파티션 값이 됩니다.

그럼 난수에 따른 파티션 할당은 아래와 같습니다.

  • 0, 1, 2, 3 - 파티션 0
  • 4 - 파티션 1
  • 5, 6, 7 - 파티션 1

위의 설명과 같이 큐에 레코드를 쌓고 주기적으로 누적 통계값을 유지합니다. 다음 파티션을 선택할 때, 큐의 크기를 고려하여 파티션을 선택하게 됩니다.

 

 

4.3. Adaptive Partition Switching

Adaptive Partition Switching은 "느린 브로커"를 발견했을 때, 해당 브로커가 담당하는 파티션은 할당 대상에서 제외합니다. 얼마나 느린 파티션을 제외할 건지의 기준은partitioner.availability.timeout.ms 설정의 시간을 따릅니다.

 

RecordAccumulator에서 partitioner.availability.timeout.ms 시간을 확인하는 부분을 살펴봅시다.

if (partitionAvailabilityTimeoutMs > 0) {
    // Check if we want to exclude the partition from the list of available partitions
    // if the broker hasn't responded for some time.
    NodeLatencyStats nodeLatencyStats = nodeStats.get(leader.id());
    if (nodeLatencyStats != null) {
        // NOTE: there is no synchronization between reading metrics,
        // so we read ready time first to avoid accidentally marking partition
        // unavailable if we read while the metrics are being updated.
        long readyTimeMs = nodeLatencyStats.readyTimeMs;
        if (readyTimeMs - nodeLatencyStats.drainTimeMs > partitionAvailabilityTimeoutMs)
            --queueSizesIndex;
    }
}

partitionAvailabilityTimeoutMs 변수가 partitioner.availability.timeout.ms 시간을 의미합니다. 0보다 크게 값이 설정되어 있다면 브로커의 지연시간을 계산하여 파티션을 선택하는 대상에서 제외합니다. 지연시간은 drainTimeMs - readyTimeMs의 차이로 계산합니다. readyTimeMs은 batch가 send를 준비한 마지막 시간이며, drainTimeMs은 drain(브로커에 보낸) 시간을 의미합니다. Producer는 Sender에 의해 주기적으로 RecordAccumulator에 모아진 배치를 보내기 때문에 브로커별 지연시간을 계산할 수 있습니다.

 

5. 결론

이번 글에서는 Producer에서 key 없는 레코드의 파티션 분배에 대한 변천사를 알아보았습니다. 분산 아키텍처를 가진 카프카에서 레코드의 분산은 중요한 부분 중 하나입니다. 가장 단순한 형태의 분배로직에서 최근의 발전된 분배 형태로 변환된 이유와 장점들을 확인할 수 있엇습니다. 여러 이유들(네트워크, 서버 사양 등) 브로커 별로 성능을 동일하게 유지할 수 없을 텐데, Producer가 카프카 클러스터 환경에 상관없이 동일 성능을 유지할 수 있게한 장치들이 흥미로웠습니다. 이 글을 읽어보시는 분들도 Producer에 의해 적재된 레코드가 파티션에 어떻게 할당되어 있는지 한번 확인해 보시고, 사용하고 계신 Producer의 버전을 달리하며 상황을 분석해보면 더 재밌지 않을까 싶습니다.

 

 

반응형