나누고 싶은 개발 이야기

Data Engineer로서 기록하고 공유하고 싶은 기술들. 책과 함께 이야기합니다.

Big Data/Spark

[Spark] Direct API for Kafka (직접 모델)

devidea 2021. 8. 12. 22:43

스파크 스트리밍에서 데이터 소스로 카프카를 많이 사용한다. 빅 데이터의 스트리밍 처리에서 카프카는 거의 필수로 사용되기 때문이며 스파크 스트리밍이 카프카와의 통합을 잘 지원하기 때문이다.

 

스파크 스트리밍에서 카프카의 데이터를 가져오는 여러 방법이 있는데 exactly-once를 지원하기 위해 개발된 Direct API for Kafka(이하 직접 모델)를 소개한다. 그리고 기존 리시버 모델과는 무슨 차이가 있는지 살펴보자.

 

 

1. 리시버 모델

직접 모델이 나오기 전에 사용되던 리시버 모델이다. 각 executor(이하 익스큐터)에 리시버가 존재하고 리시버에 의해서 카프카의 데이터를 가져온다. 리시버가 카프카 컨슈머가 된다. 내결험성을 위해 리시버는 WAL(Write Ahead Logs)를 기록한다. 데이터 처리 실패 시 WAL은 데이터 재처리에 활용된다.

그림 1. 리시버 모델

그러나 데이터 처리가 완료된 지점을 카프카 컨슈머 오프셋 커밋을 하지 않았다면 데이터 중복은 발생할 수 밖에 없다.

 

2. 직접 모델

직접 모델는 계속해서 데이터를 수신하는 리시버를 제거했다. 그럼 어떻게 카프카의 데이터를 가져올까?

카프카 컨슈머가 가져올 오프셋 범위를 미리 정하고 해당 오프셋에 해당하는 데이터를 각 배치 단계에서 처리한다. 그리고 처리가 완료된 오프셋을 기록한다. 오프셋은 카프카 컨슈머 그룹에 커밋을 할 수도 있고 내부 DB에 별도로 저장해도 된다.

그림 2. 다이렉트 모델

위 그림을 보면 드라이버 노드가 익스큐터 노드에게 배치에서 처리할 카프카 데이터의 오프셋 범위를 알려준다.

 

오프셋 범위만 알아도 카프카 컨슈머가 데이터를 가져올 수 있는지 의문스러울 수 있다.

컨슈머는 seek 함수를 가지고 있는데, 해당 함수는 컨슈머가 처리할 오프셋을 지정한다. 드라이버가 오프셋 범위를 전달하면 익스큐터는 컨슈머의 seek 함수로 가져올 범위의 첫 오프셋으로 맞추고 데이터를 컨슘하는 것이다.

 

3. 코드

그럼 마지막으로 코드를 한번 살펴보자.

KafkaUtils.createDirectStream 함수를 호출해서InputDStream을 리턴한다. 함수의 인자값으로 카프카 컨슈머의 설정을 추가할 수 있다.

val kafkaParams = Map("metadata.broker.list" -> "localhost:9092,anotherhost:9092")
// Define which topics to read from
val topics = Set("sometopic", "anothertopic")
// Create the direct stream with the Kafka parameters and topics
val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](streamingContext, kafkaParams, topics)

그리고 직접 모델에서 오프셋을 내부 DB에 별도로 저장할 수 있다고 했다. createDirectStream 함수에 가져올 카프카 오프셋을 전달할 수 있다. 따라서 처리한 카프카 오프셋을 별도로 관리할 수 있는 것이다.

 

하지만 대부분 카프카에 컨슈머 오프셋을 커밋함으로서 관리를 한다. 그럼 어떻게 처리한 오프셋을 확인 후 커밋할까?

아래 코드처럼 RDD의 오프셋 범위를 확인 후 카프카에 커밋한다.

stream.foreachRDD { rdd =>
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
// some time later, after outputs have completed
stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}

 

4. 결론

스파크 스트리밍을 사용하는 분들이 대부분의 서적에 많이 소개하는 리시버 모델을 사용하는 경우가 많을 것이다. 그런데 보다 안정성이 높은 Direct API for Kafka (직접 모델)도 있으니 한번 검토하시면 좋을 것 같다.

 

 

참고 문서

 

반응형