나누고 싶은 개발 이야기

Data Engineer로서 기록하고 공유하고 싶은 기술들. 책과 함께 이야기합니다.

Big Data/Kafka

[Kafka] MirrorMaker2 in Connect cluster

devidea 2021. 11. 19. 10:54

MirrorMaker2(이하 MM2)는 띄우는데는 여러 방법이 있다. 분산 환경으로 한정하면 2가지 방법이 있다. 그것은 MM2 전용 클러스를 띄우거나 기존 Connect 클러스터에 MM2를 추가하는 것이다. 기존 Connect 클러스터에는 MM2를 추가한다는 표현을 썼는데, MM2가 Source Connector를 구현한 방식으로 개발되었기 때문이다.

 

이번 글에서는 기존 Connect 클러스터에 MM2 관련 Source Connector를 추가하는 방법에 대해서 설명한다.

1. Connect 클러스터를 활용하는 이유
본격적인 활용 방법에 대해서 설명을 하기 전에 기존 Connect 클러스터를 활용하는 이유에 대해서 설명하고자 한다.

사실 가장 큰 이유는 기존에 운영 중인 Conenct 클러스터가 있다면 별도의 서버가 필요하지 않기 때문에 활용성이 높기 때문이다. 하지만 MM2의 기능만 쓴다면 MM2 전용 서버를 띄우는 것이 더 간편하다. 필자 역시 MM2만을 위한 전용 클러스터 구축을 진행했다. 하지만 아직 MM2 전용 클러스터를 쓰기에 부족한 점이 있었다.

부족한점을 KIP-710의 카프카 기능 제안 문서를 보면 알 수 있다. KIP-710에서는 MM2 전용 클러스터를 띄울 때, Connect 클러스터의 REST API 기능이 전부 포함되어 있지 않아서 구현이 필요하다고 제안했다. REST API 기능이 없어도 분산으로 MM2 클러스터가 잘 동작한다면 무슨 문제가 있냐라고 질문할 수 있다. 필자는 테스트 결과 분산으로 처리하는데 문제가 있었다.

Connect 클러스터의 rest.advertised.host.name, rest.advertised.host.port 설정이 필요했기 때문이다. 해당 설정은 Connect 서버 간에 통신할 때도 활용되는 설정이다. 그런데 MM2 전용 클러스터에는 REST API 기능이 다 구현되지 않아 설정할 수 없다. rest.advertised.host.name, rest.advertised.host.port 설정에 관련한 문제에 대해서는 다음 블로그 글에 나와 있으니 참고하시면 좋다(Common mistakes made when configuring multiple Kafka Connect workers).

 

이런 원인에 따라서 Connect 클러스터에 MM2 Source Connector를 추가하는 방법을 테스트했다.

2. MM2 관련 Source Connector 추가
MM2 전용 클러스터를 띄울 때는 내부적으로 3개의 Connector를 자동으로 실행해 준다. 3개의 Connector는 다음과 같다. 

  • MirrorHeartbeatConnector
    • heartbeat 데이터를 기록하며 remote 카프카가 정상 동작하여 연결이 되는지 주기적으로 모니터링 한다.
  • MirrorCheckpointConnector
    • 컨슈머 오프셋을 복제한다.
  • MirrorSourceConnector
    • 카프카 레코드 복제(source -> target)를 수행한다.

 

Connect 클러스터에 MM2 기능을 수행하려면 3개의 Connector를 등록하면 된다.
등록하는 방법은 Connect 클러스터의 API를 아래 예와 같이 구성하여 호출하면 된다.

새로운 Connector를 생성하는 API는 POST /connectors 이다.
생성한 Connector의 Config 정보, 삭제와 관련한 API들은 카프카 공식 문서를 통해서 확인해 보자.

 

3. MM2 in Connect Cluster 확인
MM2 기능을 수행하는 Connector들을 띄웠다면 정상적으로 처리하고 있는지 확인을 해보자.
미러링을 하는 과정에서 internal 토픽들이 생성되며 미러링과 관련된 데이터를 적재하고 있다.

3개의 토픽을 살펴볼 예정이다.

  • heartbeats (target)
    • heartbeat 수행 내역 (source, target 카프카 alias와 timestamp가 기록된다)
  • {source}.checkpoints.internal
    • - source consumer group,  partition, upstream/downstream offset
  • mm2-offset-syncs.{target}.internal
    • source와 target 카프카의 오프셋 매칭 정보를 보여준다.

 

내부 토픽들은 별도의 Formatter를 통해 데이터를 확인할 수 있다.
Formatter는 다음 KIP-597를 참고하자.

 

heartbeats
heartbeats는 미러링이 잘 되고 있는지 계속 검증하고 로그성으로 이력을 남긴다.
heartbeats를 기록하는 주기는 emit.heartbeats.interval.seconds 설정으로 관리한다.

heartbeats

checkpoint
checkpoint는 source 카프카의 consumer group의 offset 동기화 이력을 기록한다.
upstreamOffset이 source 카프카의 consumer group offset이고, downstramOffset은 target 카프카의 consumer group offset이다.

checkpoint

추가적으로 consumer group을 관리하는 __consumer_offsets에도 반영이 되는지 확인해보자.
__consumer_offsets의 동기화는 2.7.0 버전 이후에 가능하다.

__consumer_offsets

앞서 checkpoint 토픽에서 남겨진 consumer group offset과 일치하는지 확인해 보면 좋다.

offset-syncs
마지막으로 offset-syncs 정보이다.
이 값은 source 카프카의 오프셋이 target 카프카의 어떤 오프셋 번호와 매칭되는지 알려준다.

source, target의 오프셋은 토픽 파티션이 생성된 시점에 따라 다를 것이기 때문에 매칭 정보를 관리함으로써 어디까지 미러링이 됐는지 확인할 수 있다. 그리고 sync가 된 오프셋을 토대로 consumer group도 동일하게 관리된다.

offset-syncs

4. 결론
이번글에서는 MM2를 Connector 클러스터에 올리는 방법에 대해서 설명했다. 분산처리 목적으로 확인하게 된 이유가 있었지만 MM2가 사용하는 Connector들의 특징들도 같이 분석해 볼 수 있는 좋은 기회였다. MM2 전용 클러스터가 REST API를 포함한 버전으로 개선이 되겠지만, MM2를 위한 구현된 Connector들의 구현 방식을 알면 운영하는데 더 도움이 될 것 같다.
MM2는 여러 카프카 클러스터 간의 active/standby, active/active 구성 등 운영 환경에서 더 활용할 가능성이 높다. 카프카 오픈소스 진영에서도 MirrorMaker1은 deprecated되고 MM2가 미러링의 중요한 역할을 수행할 예정이니 관심있게 지켜보는 것이 좋다.


참고 문서

 

반응형

'Big Data > Kafka' 카테고리의 다른 글

MirrorMaker2 소개 발표  (0) 2022.04.15
[Kafka] Avro Consumer의 GenericRecord schema  (0) 2021.12.30
[Kafka] Connect dead letter queue  (0) 2021.10.06
[Kafka] MirrorMaker2 마이그레이션  (0) 2021.05.21
[kafka] kafka-producer-perf-test  (0) 2020.12.18