나누고 싶은 개발 이야기

Data Engineer로서 기록하고 공유하고 싶은 기술들. 책과 함께 이야기합니다.

Big Data/Spark

[Spark] Structured Streaming checkpoints

devidea 2025. 4. 21. 17:51

1. checkpoints가 필요한 이유

Structured Streaming에서 처리한 데이터의 신뢰성과 복구를 보장하기 위해 필요하다. Yarn Cluster의 장애로 인해 정상 동작 중이던 spark streaming이 비정상 종료되었다고 하자. 이 때, 어디부터 데이터 처리를 다시 해야하는지 판단해야 하는데 checkpoints에 기록된 메타 데이터를 확인한다.

 

checkpoints를 지정하는 방법은 아래와 같이 writeStream에서 option으로 지정한다.

df.writeStream
.option("checkpointLocation", "/Volumes/catalog/schema/volume/path")

microBatch 마다 처리한 결과를 해당 디렉토리(혹은 hdfs)에 저장한다.


checkpoints가 지정된 Structured Streaming의 코드 변경은 허용/ 비허용의 기준이 존재한다.

  • 입력 소스의 수 또는 종류 변경 불가
  • 입력 소스 파라미터 변경은 소스와 상황에 따라 다름
  • 트리거 간격(Trigger interval) 변경은 허용
  • 일부 싱크 간에는 출력 싱크의 종류 변경 허용
  • 출력 싱크 파라미터는 종류와 상황에 따라 변경 가능
  • projection, filter, map과 같은 연산은 일부 경우 변경 가능
  • stateful 연산(상태를 가지는 연산)에 대한 변경은 불가 (상태 데이터의 스키마가 반드시 동일해야 함)

 

2. checkpoints 파일 구조

checkpoints의 디렉토리에 저장되는 파일은 아래의 구조를 따른다.

파일의 번호는 microBatch의 순서에 따라 늘어난다.

checkpoint-location/
├── commits/ # 커밋된 오프셋 정보
│ ├── 0
│ ├── 1
│ └── ...
├── metadata # 쿼리 메타데이터
├── offsets/ # 처리된 오프셋 정보
│ ├── 0
│ ├── 1
│ └── ...
└── state/ # 집계 연산을 위한 상태 정보
├── 0
├── 1
└── ...

 

checkpoints를 기록하는 코드의 일부를 살펴봤다. 파일 시스템에 저장되는 로직은 HDFSMetadataLog 클래스를 통해 이루어진다.

markMicroBatchStart -> offsetLog.add()
markMicroBatchEnd -> commitLog.add()

 

아래 코드는 markMicroBatchStart() 메서드이다. micro batch가 시작할 때, 호출되는 메서드인데 offsetLog.add() 메서드를 통해 파일 시스템에 해당 micro batch가 처리하는 오프셋을 기록한다. Kafka 소스를 기준으로 하기에 여기에서의 오프셋은 카프카 토픽의 레코드이다.

protected def markMicroBatchStart(): Unit = {
assert(offsetLog.add(currentBatchId,
availableOffsets.toOffsetSeq(sources, offsetSeqMetadata)),
s"Concurrent update to the log. Multiple streaming jobs detected for $currentBatchId")
logInfo(s"Committed offsets for batch $currentBatchId. " +
s"Metadata ${offsetSeqMetadata.toString}")
}

 

offsetLog는 OffsetSeqLog 클래스의 객체인데, OffsetSeqLog 클래스에서 파일 시스템에 offset을 기록할 때, 파일의 내용을 시리얼라이즈, 디시리얼라이즈 하는 로직이 포함되어 있다. Offset 파일의 내용을 하나 가져왔다. 버전, batch의 시간 등의 메타데이터가 나오고 마지막에 토픽의 파티션과 오프셋 번호가 기록되어 있다. OffsetSeqLog 클래스의 deserialize() 메서드에서 해당 규칙으로 파일의 내용을 만든 것이다.

v1
{"batchWatermarkMs":0,"batchTimestampMs":1744950785339,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.join.stateFormatVersion":"2","spark.sql.streaming.stateStore.compression.codec":"lz4","spark.sql.streaming.stateStore.rocksdb.formatVersion":"5","spark.sql.streaming.statefulOperator.useStrictDistribution":"true","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"200"}}
{"test":{"0":588091}}

 

3. Checkpoints 파일의 삭제

그럼 checkpoints 파일은 임의로 삭제하지 않는다면 계속 존재하는 것일까?

아래 코드는MicroBatchExecution 클래스의 constructNextBatch 메서드이다. micro batch의 다음 배치를 구성하는 로직이 들어있는데 주의할 점은 purge(), purgeAsync() 메서드 호출 부분이다. 현재 배치Id 번호를 통해서 몇 개의 배치 파일을 유지할 지 판단한다. 기본 설정에서 100개의 파일을 유지하게 되어있다. purge(), purgeAsync()는 파일 삭제를 동기, 비동기로 처리할지에 대한 구분이다.

if (shouldConstructNextBatch) {
// Commit the next batch offset range to the offset log
updateStatusMessage("Writing offsets to log")
reportTimeTaken("walCommit") {
markMicroBatchStart()
// NOTE: The following code is correct because runStream() processes exactly one
// batch at a time. If we add pipeline parallelism (multiple batches in flight at
// the same time), this cleanup logic will need to change.
// Now that we've updated the scheduler's persistent checkpoint, it is safe for the
// sources to discard data from the previous batch.
cleanUpLastExecutedMicroBatch()
// It is now safe to discard the metadata beyond the minimum number to retain.
// Note that purge is exclusive, i.e. it purges everything before the target ID.
if (minLogEntriesToMaintain < currentBatchId) {
if (useAsyncPurge) {
purgeAsync()
} else {
purge(currentBatchId - minLogEntriesToMaintain)
}
}
}
noNewData = false
} else {
noNewData = true
awaitProgressLockCondition.signalAll()
}

 

결론적으로 최대 100개 * 디렉토리 수 만큼의 파일들이 유지된다. 파일의 수는 "spark.sql.streaming.minBatchesToRetain" 옵션으로 조정할 수 있다.

 

참고 문서

반응형