[Kafka] MirrorMaker2 마이그레이션
업무가 많아서 기술 정리를 하지 못했었는데 오랜만에 블로그 글을 작성해 본다. 이번글에서는 MirrorMaker2(이하 MM2) 관련 내용을 소개한다.
최근 MM2로 미러링을 하던 카프카 클러스터의 이전을 계획하면서 MM2가 미러링 처리한 시점을 확인해야 했고, 이전 시점에 MM2에 처리한 토픽의 오프셋 이후부터 다시 미러링하도록 작업해야 했다. 그래서 MM2에서 미러링 처리한 데이터의 토픽/오프셋을 어떻게 관리하고, MM2를 새로 시작하거나 이전할 때 앞서 처리한 오프셋의 위치를 확인해 다시 미러링을 하는지 분석하게 되었다.
이번 글에서는 MM2 이전을 하는 과정에 포커스를 맞추지만 MM2의 오프셋 관리방식도 같이 소개한다.
1. MM2 구동시 시작되는 Connector
MM2는 기본적으로 Kafka Connect 기반으로 개발되었으며, MM2를 시작하면 기본적으로 3개의 Connector가 실행된다.
MirrorMaker.java 코드를 살펴보면 시작할 때 아래 3개의 Connector가 지정되며 수행된다.
private static final List<Class<?>> CONNECTOR_CLASSES = Arrays.asList(
MirrorSourceConnector.class,
MirrorHeartbeatConnector.class,
MirrorCheckpointConnector.class);
3개 중에서 데이터 미러링을 직접 담당하는 Connect는 MirrorSourceConnector이다.
그래서 MirrorSourceConnector가 시작될 때 미러링 완료된 오프셋을 먼저 확인하고 이후 미러링을 진행한다.
처리된 offset을 가지고 있는 토픽은 mm2-offsets.{source cluster}.internal 이다.
mm2-offsets 토픽 이름은 offset.storage.topic 설정을 바꾸지 않으면 default로 사용하는 토픽이름이다.
MirrorMaker.java 코드를 보면 설정된 토픽을 확인하는 부분이 있다.
props.putIfAbsent(DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG, "mm2-offsets."
+ sourceAndTarget.source() + ".internal");
2. MM2가 미러링 완료 오프셋 가져오는 방식
미러링 완료 오프셋 가져오는 코드부분을 보기 전에 오프셋을 처리하는 부분을 담당하는 클래스들의 구조를 보자.
MM2의 미러링 완료 오프셋 처리를 하는 클래스는 KafkaOffsetBackingStore이다. KafkaOffsetBackingStore를 통해서 미러링 완료 오프셋을 가져오거나 처리가 끝난 미러링 완료 오프셋을 저장한다.
MirrorSourceTask는 task.assigned.partitions로 할당된 토픽을 대상으로 미러링 처리를 한다. 미러링할 대상 토픽은 MM2 설정에 존재한다. 미러링할 토픽에 대해서 미러링을 처리하기 전에 먼저 처리된 미러링 완료 오프셋을 가져온다. 미러링 완료 오프셋을 먼저 확인하고 그 이후 데이터부터 미러링을 하는 것이다. 미러링 완료 오프셋을 가져오는 메서는 KafkaOffsetBackingStore의 get 메서드를 사용한다. 해당 메서드는 오프셋 확인이 필요한 토픽들을 파라미터로 던진다. 그럼 파라미터로 전달한 토픽의 미러링 완료 오프셋들을 리턴받는다.
미러링 완료 오프셋들을 가져올 땐 실제로 어떤 과정을 통했을까?
KafkaBasedLog의 start() 메서드를 보면 mm2-offsets.{source cluster}.internal 토픽의 데이터를 처음부터 가져온다. 해당 토픽은 "compact"된 토픽이다. 그래서 미러링이 된 모든 토픽의 오프셋 정보가 사라지지 않고 계속 보관된다.
compact 토픽을 처음부터 가져오는 로직은 아래 코드로 볼 수 있다. seekToBeginning(partitions) 코드에 주석이 있는데 consumer group으로 커밋된 오프셋을 사용하지 않고 compact 토픽 처음부터 가져온다는 설명이 있다.
consumer.assign(partitions);
// Always consume from the beginning of all partitions. Necessary to ensure that we don't use committed offsets
// when a 'group.id' is specified (if offsets happen to have been committed unexpectedly).
consumer.seekToBeginning(partitions);
readToLogEnd();
그리고 처음부터 가져온 미러링 완료 오프셋 정보는 KafkaOffsetBackingStore.java의 data 변수에 값을 저장했다가 미러링 완료 오프셋 확인이 필요한 토픽을 요청받으면 전달해 준다.
3. MM2 마이그레이션 방법
MM2를 마이그레이션 하는 방법은 앞서 설명한 미러링 완료 오프셋을 활용하는 것이다.
미러링 완료 오프셋은 MM2를 띄울 때 처음부터 모든 데이터를 가져오기 때문에 새로운 MM2를 띄우기전에 mm2-offsets.{source cluster}.internal 토픽에 마이그레이션이 필요한 토픽의 마지막 미러링 오프셋을 넣어두는 것이다. 그럼 새로운 MM2는 미러링할 토픽의 미러링 완료 오프셋을 인지하게 되고 해당 시점 이후의 데이터를 미러링하게 된다.
그럼 mm2-offsets.{source cluster}.internal 토픽에 어떻게 데이터를 넣으면 될까?
아래와 같이 key, value 형태로 데이터를 넣어주면 된다.
# producer 실행
./bin/kafka-console-producer.sh --bootstrap-server localhost:9094 --topic mm2-offsets.{source cluster}.internal --property "parse.key=true" --property "key.separator=%"
# 전송 데이터 예
["MirrorSourceConnector",{"cluster":"{source cluster}","partition":0,"topic":"test"}]%{"offset":5}
예시로 든 데이터를 보면 test 토픽의 0번 파티션의 미러링 완료 오프셋을 5로 설정했다.
그럼 새로운 MM2가 띄워지면 test 토픽은 6번 오프셋부터 미러링을 하게 된다.
4. 결론
이번 글에서는 MM2의 운영 노하우 중 하나를 소개했다.
해당 아이디어는 아래 참고문서의 내용을 참조했다. 참고한 문서는 Confluent의 Replicator에서 MM2로의 마이그레이션 방법을 소개하고 있는데 기본적인 아이디어는 동일하다.
MM2는 기존 MirrorMaker의 단점을 보안한 좋은 프레임워크이다. 하지만 출시된지 얼마되지 않아서 운영 상 생기는 이슈들도 계속해서 정리하면서 노하우를 쌓을 필요가 있다. 이번 글에서 소개한 MM2의 마이그레이션 방법이 운영 중 도움이 되었으면 좋겠다.
참고문서