이번 글에서는 Kafka Connect 관련된 내용을 소개하고자 한다. Connect에 대한 전반적인 개요 글은 아니고 Sink Connector에서 offset 처리에 대한 내용이다.
Sink Connector?
Offset 관련 설명을 하기 전에 Sink Connector에 대해서는 기본적인 소개가 필요하다. 아래 그림으로 Connect의 전체적인 개념을 쉽게 이해해 보자.
Connect는 크게 Source/ Sink Connector로 구성되어 있다.
Sink Connector는 그림에서 표시한 부분으로서 Kafka의 데이터를 다른 저장소에 넣는데 사용한다.
Sink의 사전적 의미에 '밀어넣다'가 포함되어 있는데 다른 저장소에 데이터를 밀어넣는다고 이해하면 된다.
Sink Connector 내부적으로 Kafka Consumer가 포함되어 있는데, Consumer가 가져가서 처리한 데이터의 위치 즉, consumer offset을 주기적으로 commit 해야 한다.
이번 글에서는 Sink Connector를 개발하기 위해 구현할 인터페이스를 살펴보며 offset을 commit하는 순간을 분석하고자 했다.
Sink Connector Interface
Sink Connector를 개발하기 위해서는 SinkTask를 상속받아 아래와 같은 메서드들을 구현해야 한다.
offset과 관련해서 주의깊게 봐야할 메서드들은 put과 flush 이다.
put 메서드부터 살펴보자.
put 메서드는 SinkRecord를 포함하는 Collection을 전달 받는다. SinkRecord가 Kafka로부터 받은 데이터이다.
SinkRecord는 Kafka의 데이터에 추가적으로 offset 정보를 포함한 객체이다. offset 정보가 포함되어 있기에 데이터를 처리하는 과정에서 어디까지 했는지 알수 있다는 점이 중요하다.
다음은 flush 메서드이다. 0.10.2 버전 이후로 flush 메서드에 변화가 생기는데 일단 flush 메서드의 기본적인 동작부터 살펴보자.
flush는 전달받은 파티션의 offset들을 commit한다. 여기서 offset들은 put 메서드로부터 받은 SinkRecord의 전체에 해당한다.
Connect에서는 Consumer가 Kafka로부터 다시 poll 하기 전에 이전에 받았던 데이터들이 모두 처리되다고 간주하고 commit을 한다.
Consumer가 받아온 모든 데이터들이 처리되었다고 간주하는 부분에서 데이터 누락현상이 발생할 수 있다.
decouple flush and offset commit
Kafka 0.10.2 버전에서 변경된 내용을 살펴보자. KIP-89의 제목과 같이 flush 메서드에서 offset commit을 분리한다.
기능 분리와 함께 메서드 하나가 추가되었다. preCommit 메서드를 살펴보자.
public Map<TopicPartition, OffsetAndMetadata> preCommit(Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
flush(currentOffsets);
return currentOffsets;
}
preCommit 메서드도 파라미터로 currentOffsets을 받는다. javadoc 문서를 보면 현재의 offset을 편의를 위해 제공하지만 처리된 SinkRecord를 추적하여 offset을 결정한다고 씌여있다.
리턴되는 값이 파라미터로 전달되는 타입과 똑같음을 주의깊게 보자.
Sink Connector로 데이터를 저장하는 처리를 하는 중에 처리가 완료되었다고 판단되는 SinkRecord의 offset을 기록해뒀다가 그 값을 리턴하면 해당 offset으로 commit이 된다는 것이다.
결론적으로 commit이 되는 offset을 자유롭게 조정할 수 있다.
최종 저장소로 저장이 완료되는 기준이 시간, 사이즈 등 다양하게 설정하여 처리가 끝난 데이터에 대해서만 offset을 commit 하도록 한다.
관련 문서
반응형
'Big Data > Kafka' 카테고리의 다른 글
[Kafka] Kerberos 인증 #1 (0) | 2020.02.03 |
---|---|
[Kafka] Introducing ksqlDB (0) | 2019.11.21 |
[Kafka] 컨트롤러 분석 (0) | 2019.10.07 |
[Kafka] mirrorMaker v1 단점. v2는? (0) | 2019.07.18 |
[Kafka] Configurable SASL callback handler (0) | 2019.04.18 |