Big Data/Kafka

[Kafka] consumer group offset change by python.

devidea 2018. 12. 31. 16:30
Kafka consumer 관련 2번째 글이다. 지난 글에서 LAG에 대해서 설명했는데, consumer group과 offset에 대한 설명을 빼 놓고 지나가니 이해하는데 부족할 수 있어 consumer group에 대한 개념 설명을 하고자 한다. 그리고 추가로 consumer group의 offset을 변경하고자 할 때가 발생할 수 있는데 간단한 python 코드로 offset을 변경하는 방법을 알아보자. offset을 왜 변경하지? 라는 질문이 있을 수 있는데, consumer group에 대해서 이해하면 질문에 대한 답이 저절로 될 듯 하다.

1. consumer group
아래 그림이 consumer group을 이해할 수 있는 가장 좋은 그림이다. Kafka의 Topic은 여러 partition으로 나뉘어서 데이터를 저장할 수 있다. 나누어진 여러 파티션에 하나의 consumer가 아닌 여러 consumer가 데이터를 가져올 수 있다. 여러 consumer를 사용하는 이유는 분산처리로 인한 성능 향상 때문이다. 이 때, consumer group이라는 값을 통해서 여러 consumer들을 묶어서 관리한다(consumer group이란 용어에 모든 의미가 담겨있는 듯 하다).


그럼 consumer group의 offset은 왜 필요할까? 각 파티션별로 consumer group이 가져간 데이터 개수가 다를텐데, 얼마나 데이터를 가져갔는지 체크하기 위해서다. 아래 Figure 2를 살펴보자. 주요하게 볼 부분은 Last Committed Offset, Log End Offset이다. Last Committed Offset은 consumer group이 commit한 즉, 현재까지 읽어간 데이터의 위치이다. Log End Offset은 Kafka Broker에 저장된 마지막 데이터 위치이다.
offset은 파티션별로 구분하게 계속 증가만 되는 숫자값으로 유지된다. 저장할 수 있는 한계를 넘으면 어떻하냐는 질문에 대한 답을 confluence(Jun Rao)가 한 적이 있다. offset은 long 타입으로 되어 있어서 하루에 1TB씩 데이터를 쌓는다고 해도 4백만일 동안 계속 데이터를 넣을 수 있다고 했다. 
아무튼 consumer group offset이 중요한 이유는 consumer 입장에서 내가 어디까지 데이터를 읽었지? 라는 물음의 기록이기 때문이다. consumer가 정지되고 다시 시작할 때 consumer group offset부터 읽으면 된다.



2. consumer group offset 변경하기.
consumer group과 offset에 대해서 설명을 하고 나니 자연스럽게 offset 변경에 대해서도 관심이 생기게 된다. 만약 100개 중 50개의 데이터를 처리했는데 알고보니 10번부터 다시 받아야 하는 상황이 발생한다면 어떻게 할까? 처음부터 다시 읽어야 한다면 다른 consumer group으로 consumer를 시작하면 되겠지만 특정 위치부터 다시 받으려면 offset을 변경해야 한다. python 코드로 간단히 작성해서 특정 topic의 consumer group을 바꿔보도록 하자.

먼저 사용한 python 모듈은 kafka-python 이다. pip로 설치하면 간단하다.
pip install kafka-python

다음은 consumer group offset을 변경하는 코드이다.


위 코드는 3개의 argument를 받도록 해놨다. file, topic, group이다. topic과 group은 이해가 되는데 file은 무엇인가 궁금해 할 수 있다.
file에는 {partition},{offset} 으로 씌여진 파일을 받로고 했다. partition은 일반적으로 여러개로 구성되고 각 partition마다 offset을 달리 설정해야 할 수 있기에 파일로 미리 작성하고 실행하면 편할 것 같았다.
[Line 39]까지는 입력받은 argument를 parsing하고 정리하는 부분이다. 그리고 [Line 44]에서 consumer group을 지정한다.

offset를 바꾸는 코드는 [Line 47~53]를 주의해서 보면 된다. 
앞서 설명했듯이 partition 단위로 offset를 변경해야해서 partition 개수만큼 for 루프가 돈다. 그리고 topic과 partition 정보를 가진 TopicPartition을 만든다. 만들어진 TopicPartition를 consumer할당하고 consumer.seek으로 지정한 offset으로 이동한다. 그리고 [Line 53]에서 해당 offset으로 commit을 하면 consumer group offset 정보가 바뀌게 된다.


관련 문서


반응형