[Spark] Direct API for Kafka (직접 모델)
스파크 스트리밍에서 데이터 소스로 카프카를 많이 사용한다. 빅 데이터의 스트리밍 처리에서 카프카는 거의 필수로 사용되기 때문이며 스파크 스트리밍이 카프카와의 통합을 잘 지원하기 때문이다.
스파크 스트리밍에서 카프카의 데이터를 가져오는 여러 방법이 있는데 exactly-once를 지원하기 위해 개발된 Direct API for Kafka(이하 직접 모델)를 소개한다. 그리고 기존 리시버 모델과는 무슨 차이가 있는지 살펴보자.
1. 리시버 모델
직접 모델이 나오기 전에 사용되던 리시버 모델이다. 각 executor(이하 익스큐터)에 리시버가 존재하고 리시버에 의해서 카프카의 데이터를 가져온다. 리시버가 카프카 컨슈머가 된다. 내결험성을 위해 리시버는 WAL(Write Ahead Logs)를 기록한다. 데이터 처리 실패 시 WAL은 데이터 재처리에 활용된다.
그러나 데이터 처리가 완료된 지점을 카프카 컨슈머 오프셋 커밋을 하지 않았다면 데이터 중복은 발생할 수 밖에 없다.
2. 직접 모델
직접 모델는 계속해서 데이터를 수신하는 리시버를 제거했다. 그럼 어떻게 카프카의 데이터를 가져올까?
카프카 컨슈머가 가져올 오프셋 범위를 미리 정하고 해당 오프셋에 해당하는 데이터를 각 배치 단계에서 처리한다. 그리고 처리가 완료된 오프셋을 기록한다. 오프셋은 카프카 컨슈머 그룹에 커밋을 할 수도 있고 내부 DB에 별도로 저장해도 된다.
위 그림을 보면 드라이버 노드가 익스큐터 노드에게 배치에서 처리할 카프카 데이터의 오프셋 범위를 알려준다.
오프셋 범위만 알아도 카프카 컨슈머가 데이터를 가져올 수 있는지 의문스러울 수 있다.
컨슈머는 seek 함수를 가지고 있는데, 해당 함수는 컨슈머가 처리할 오프셋을 지정한다. 드라이버가 오프셋 범위를 전달하면 익스큐터는 컨슈머의 seek 함수로 가져올 범위의 첫 오프셋으로 맞추고 데이터를 컨슘하는 것이다.
3. 코드
그럼 마지막으로 코드를 한번 살펴보자.
KafkaUtils.createDirectStream 함수를 호출해서InputDStream을 리턴한다. 함수의 인자값으로 카프카 컨슈머의 설정을 추가할 수 있다.
그리고 직접 모델에서 오프셋을 내부 DB에 별도로 저장할 수 있다고 했다. createDirectStream 함수에 가져올 카프카 오프셋을 전달할 수 있다. 따라서 처리한 카프카 오프셋을 별도로 관리할 수 있는 것이다.
하지만 대부분 카프카에 컨슈머 오프셋을 커밋함으로서 관리를 한다. 그럼 어떻게 처리한 오프셋을 확인 후 커밋할까?
아래 코드처럼 RDD의 오프셋 범위를 확인 후 카프카에 커밋한다.
4. 결론
스파크 스트리밍을 사용하는 분들이 대부분의 서적에 많이 소개하는 리시버 모델을 사용하는 경우가 많을 것이다. 그런데 보다 안정성이 높은 Direct API for Kafka (직접 모델)도 있으니 한번 검토하시면 좋을 것 같다.
참고 문서