스파크 스트리밍에서 데이터 소스로 카프카를 많이 사용한다. 빅 데이터의 스트리밍 처리에서 카프카는 거의 필수로 사용되기 때문이며 스파크 스트리밍이 카프카와의 통합을 잘 지원하기 때문이다.
스파크 스트리밍에서 카프카의 데이터를 가져오는 여러 방법이 있는데 exactly-once를 지원하기 위해 개발된 Direct API for Kafka(이하 직접 모델)를 소개한다. 그리고 기존 리시버 모델과는 무슨 차이가 있는지 살펴보자.
1. 리시버 모델
직접 모델이 나오기 전에 사용되던 리시버 모델이다. 각 executor(이하 익스큐터)에 리시버가 존재하고 리시버에 의해서 카프카의 데이터를 가져온다. 리시버가 카프카 컨슈머가 된다. 내결험성을 위해 리시버는 WAL(Write Ahead Logs)를 기록한다. 데이터 처리 실패 시 WAL은 데이터 재처리에 활용된다.

그러나 데이터 처리가 완료된 지점을 카프카 컨슈머 오프셋 커밋을 하지 않았다면 데이터 중복은 발생할 수 밖에 없다.
2. 직접 모델
직접 모델는 계속해서 데이터를 수신하는 리시버를 제거했다. 그럼 어떻게 카프카의 데이터를 가져올까?
카프카 컨슈머가 가져올 오프셋 범위를 미리 정하고 해당 오프셋에 해당하는 데이터를 각 배치 단계에서 처리한다. 그리고 처리가 완료된 오프셋을 기록한다. 오프셋은 카프카 컨슈머 그룹에 커밋을 할 수도 있고 내부 DB에 별도로 저장해도 된다.

위 그림을 보면 드라이버 노드가 익스큐터 노드에게 배치에서 처리할 카프카 데이터의 오프셋 범위를 알려준다.
오프셋 범위만 알아도 카프카 컨슈머가 데이터를 가져올 수 있는지 의문스러울 수 있다.
컨슈머는 seek 함수를 가지고 있는데, 해당 함수는 컨슈머가 처리할 오프셋을 지정한다. 드라이버가 오프셋 범위를 전달하면 익스큐터는 컨슈머의 seek 함수로 가져올 범위의 첫 오프셋으로 맞추고 데이터를 컨슘하는 것이다.
3. 코드
그럼 마지막으로 코드를 한번 살펴보자.
KafkaUtils.createDirectStream 함수를 호출해서InputDStream을 리턴한다. 함수의 인자값으로 카프카 컨슈머의 설정을 추가할 수 있다.
val kafkaParams = Map("metadata.broker.list" -> "localhost:9092,anotherhost:9092") | |
// Define which topics to read from | |
val topics = Set("sometopic", "anothertopic") | |
// Create the direct stream with the Kafka parameters and topics | |
val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](streamingContext, kafkaParams, topics) |
그리고 직접 모델에서 오프셋을 내부 DB에 별도로 저장할 수 있다고 했다. createDirectStream 함수에 가져올 카프카 오프셋을 전달할 수 있다. 따라서 처리한 카프카 오프셋을 별도로 관리할 수 있는 것이다.
하지만 대부분 카프카에 컨슈머 오프셋을 커밋함으로서 관리를 한다. 그럼 어떻게 처리한 오프셋을 확인 후 커밋할까?
아래 코드처럼 RDD의 오프셋 범위를 확인 후 카프카에 커밋한다.
stream.foreachRDD { rdd => | |
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges | |
// some time later, after outputs have completed | |
stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges) | |
} |
4. 결론
스파크 스트리밍을 사용하는 분들이 대부분의 서적에 많이 소개하는 리시버 모델을 사용하는 경우가 많을 것이다. 그런데 보다 안정성이 높은 Direct API for Kafka (직접 모델)도 있으니 한번 검토하시면 좋을 것 같다.
참고 문서
'Big Data > Spark' 카테고리의 다른 글
[Spark] event log의 spark 옵션 Elasticsearch 적재 (0) | 2023.03.20 |
---|---|
hadoop 2.6.0 버전을 위한 spark 3.x 빌드 (0) | 2022.09.28 |
[spark] hadoop 3 & hive 3 환경 설정 (0) | 2020.05.25 |
[Spark] 2.4.0 - bucket pruning (0) | 2020.03.10 |
[Spark] Dataset (0) | 2020.01.14 |