나누고 싶은 개발 이야기

Data Engineer로서 기록하고 공유하고 싶은 기술들. 책과 함께 이야기합니다.

Big Data/Kafka

[Kafka] consumer LAG 수집 및 elasticsearch 적재

devidea 2018. 12. 28. 18:37
Kafka에 저장된 데이터를 consumer를 통해 처리를 하고 있을 때, 개발한 코드로 데이터가 잘 처리되고 있는 건지 궁금할 때가 있다. consumer의 데이터 처리 상황을 주기적으로 모니터링 한다면 처리하지 못하고 Kafka에 남아있는 데이터 수치를 알아야 한다. Kafka에 남아 있는 데이터 수의 증감을 통해 consumer의 throughput도 계산할 수 있다. Kafka 문서에 보면 consumer group의 현재 position을 얻는 command가 있다.

다음은 해당 command인 kafka-consumer-group.sh의 실행 예제이다.
> bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group

--bootstrap-server              REQUIRED The server to connect to.
--group <consumer group>        The consumer group we wish to act on.
--describe                      Describe consumer group and list offset lag related to given group.

다음 command를 실행하면 몇 가지 데이터를 볼 수 있다. 이 문서는 LAG을 설명하기 위함이니 관련 정보만 자세히 살펴보도록 하자.
  • CURRENT-OFFSET : consumer group이 Kafka에서 읽은 offset
  • LOG-END-OFFSET : 해당 topic, partition의 마지막 offset
  • LAG : LOG-END-OFFSET과 CURRENT-OFFSET의 차이.

LAG은 topic의 partition 단위로 마지막 offset과 consumer가 읽어간 offset의 차이를 나타내므로 consumer가 읽어가야 할 남은 데이터 수를 의미한다.
그렇다면 간단한 bash 프로그램으로 LAG를 구하는 코드를 짜고 cron으로 등록만 해주면 주기적으로 consumer의 처리 상태를 모니터링 할 수 있다.

필자는 모니터링 로그들을 elasticsearch를 통해서 수집하는 걸 선호하는데, kibana로 수집된 데이터를 확인하기에 좋기 때문이다.

아래 코드는 kafka-consumer-group.sh로 수집한 LAG 정보를 awk로 구분해서 추출하고 elasticsearch로 넣는다.
elasticsearch로 데이터를 넣을 때는 json으로 데이터를 만들어서 REST로 elasticsearch 서버로 호출하면 된다.
간단한 로직이니 코드로 나머지 설명을 대신하기로 한다.

반응형