이번글에서는 Connect의 에러처리와 관련하여 정리한다. 에러처리와 관련하여 주요한 업데이트가 2.0과 2.6 버전에서 있었다. 2.6 버전에서의 추가 기능은 2.0의 기능 확장이라고 할 수 있는데, 자체적으로 개발한 plugin의 에러처리에 활용된다.
먼저 2.0 버전에서 처음 나온 개념인 dead queue letter에 대해 알아보자.
1. dead letter queue
dead letter queue는 이름에서 유추할 수 있듯이 실패한 레코드를 보관하는 별도의 큐이다. 카프카는 dead letter queue로 원천 데이터가 보관된 토픽이 아닌 별도의 토픽으로 설정한다. 그리고 실패한 레코드의 메타정보도 포함시켜 저장한다.
아래 [그림 1]을 보면 쉽게 이해할 수 있다. 원천 토픽이 레코드 중에서 유효하지 않은 레코드가 dead letter queue로 이동된다.
설정하는 방법은 간단하다. Connect 설정에 아래와 같이 추가하면 된다.
유효하지 않은 레코드의 메타정보를 header에 포함하여 저장할 수 있는데 transform 로직 개선하는데 도움이 된다. 메타정보에 포함된 내용으로는 토픽, 파티션, 오프셋, Exception 내용 등이 있다.
그런데 주의할 점이 있다. 유효하지 않은 데이터를 판단하는 곳이 convert, transform 단계라는 점이다. 왜 중요하냐면 connect plugin을 개발할 때, 데이터가 유입되는 곳이 put 메서드인데, put 메서드로 들어오기 이전에 발생한 에러에 대해서만 dead letter queue로 보내기 때문이다. 왜냐하면 convert와 transform는 put 메서드로 레코드를 보내기 이전 단계여서다.
convert와 transform의 역할을 요약하면 다음과 같다.
단계 | 역할 |
convert | 카프카 데이터를 읽고 쓸때의 [de]serialize. 예를 들면, 카프카 레코드를 json/ avro로 변환. |
transform | 각 레코드의 변환로직. Transformation 인터페이스를 통해 구현 가능한다. |
그래서 plugin을 개발하면서 발생하는 에러처리에 적용하지 못하는 한계가 있었다.
여기까지가 2.0 버전에서 추가된 내용이다. 보다 자세한 내용은 confluent 기술 블로그 Kafka Connect Deep Dive – Error Handling and Dead Letter Queues 글을 통해 확인 할 수 있다.
2. Error Reporting in Sink Connectors
앞서 설명한 dead letter queue의 한계를 극복하기 위해 KIP-610에서 개선된 기능이 개발되었다. 해당 기능은 2.6 버전에 포함되어 릴리즈됐다.
KIP-610의 기능을 한마디로 표현한 문장이다.
Write records that fail in the put() step of a sink connector to the dead letter queue.
plugin에서의 데이터 유입 지점인 put() 메서드 단계에서 발생하는 에러를 dead letter queue로 보낼 수 있게 하는 기능이다.
방법은 새롭게 추가된 ErrantRecordReporter 인터페이스로 에러가 발생한 레코드를 보내는 것이다. 그러면 ErrantRecordReporter는 비동기적으로 dead letter queue로 레코드를 저장한다.
사용하는 방법은 KIP-610의 예제 코드를 그대로 가져왔다.
DLQ 관련 설정을 하게 되면 ErrantRecordReporter 객체를 가져올 수 있다. 해당 객체를 가지고 있다가 레코드 처리를 하는 로직이 포함된 put() 메서드에서 Exception이 발생하면 해당 Exception을 일으키는 레코드를 ErrantRecordReporter 객체의 report 메서드로 전달하면 된다.
Future로 받게되는 이유는 카프카에 데이터를 전송하는 Producer의 send 메서드가 비동기도 데이터를 전송 후 Future로 리턴하기 때문이다. Producer send 메서드가 궁금하면 다음 API 문서를 확인하면 된다.
convert, transform 에러처리와는 다르게 코딩할 부분이 있지만 ErrantRecordReporter를 활용하면 plugin 내에서도 dead letter queue로 유효하지 않은 레코드를 전송할 수 있다.
3. 결론
필자도 2.6 버전 이전의 Connect를 사용하고 있었다. 그래서 에러처리를 위해서 추가 개발을 하여 에러가 발생한 레코드를 별도의 토픽으로 쌓았었다. 하지만 2.6 버전에서 추가 지원하는 ErrantRecordReporter를 사용하면 plugin 개발 시 에러처리를 패턴을 균일하게 적용할 수 있고 또한, DLQ의 설정도 그대로 사용할 수 있어 장점이 많다.
관련 문서
- Transformation
- Kafka Connect Deep Dive – Error Handling and Dead Letter Queues
- KIP-610: Error Reporting in Sink Connectors
'Big Data > Kafka' 카테고리의 다른 글
[Kafka] Avro Consumer의 GenericRecord schema (0) | 2021.12.30 |
---|---|
[Kafka] MirrorMaker2 in Connect cluster (0) | 2021.11.19 |
[Kafka] MirrorMaker2 마이그레이션 (0) | 2021.05.21 |
[kafka] kafka-producer-perf-test (0) | 2020.12.18 |
[Kafka] Burrow 인증 설정 (0) | 2020.10.16 |