Spark 으로 ETL 작업을 처리하다가 처리한 데이터의 누적 양을 집계하고 싶었다. 예를 들면, Kafka의 데이터를 활용해 spark streamming 작업을 할 때, 각 단계(spark streaming은 짧은 간격의 배치)마다 처리된 데이터 건수를 집계한다고 하자. spark은 분산으로 데이터를 처리하기 때문에 각 executor의 처리된 결과를 조합한 공유 변수가 필요하다. executor가 처리한 데이터를 하나의 공유변수로 값을 기록하는 것이다. 이 때 공유변수는 결합 및 가환 연산을 지원해야 한다. '가환'이라는 단어가 어색한데 사전으로 찾아본 결과 '조작이나 연산의 순서를 바꾸어도 그 결과가 변하지 않는 일' 이다. 다시 간단히 정리하면 공유변수를 통한 연산의 순서, 연산의 조합의 결과가 동일함을 유지해야 한다는 것이다. 병렬프로그래밍에서 스레드 안전한 변수를 사용하는 것과 동일한 개념이다.
지금까지 장황하게 설명한 그 '공유변수'가 Accumulator이다.
이것으로 개념적으로 설명을 마무리 할 수 있으나 코드로 예시만 살펴보도록 하자.
spark 문서에 나와있는 코드를 그대로 가져왔다.
scala> val accum = sc.longAccumulator("My Accumulator")
accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(My Accumulator), value: 0)
scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s
scala> accum.value
res2: Long = 10
SparkContext에서 Accumulator를 생성하고 있다. 그리고 각 task에서 add 함수로 값을 저장하고 있다. 다만 값은 드라이버 프로그램에서만 value 함수로 읽을 수 있다. 참고 할 부분은 Accumulator의 값은 action 연산을 처리하는 과정에서만 갱신된다.
Accumulator를 생성하는 2번째 방법은 AccumulatorV2 클래스를 상속받아 long, double이 아닌 다름 type에 대해서도 처리할 수 있게 한다.
SparkContext에서는 long, double type의 Accumulator만 생성할 수 있다.
class VectorAccumulatorV2 extends AccumulatorV2[MyVector, MyVector] {
private val myVector: MyVector = MyVector.createZeroVector
def reset(): Unit = {
myVector.reset()
}
def add(v: MyVector): Unit = {
myVector.add(v)
}
...
}
// Then, create an Accumulator of this type:
val myVectorAcc = new VectorAccumulatorV2
// Then, register it into spark context:
sc.register(myVectorAcc, "MyVectorAcc1")
필자 역시 AccumulatorV2를 상속받은 클래스를 만들고 조합해서 필요한 데이터를 갖는 클래스를 디자인했다. 만들어진 객체로 ETL 작업이 끝난 이후 Accumulator에 저장된 값을 따로 수집하도록 했다.
관련문서
반응형
'Big Data > Spark' 카테고리의 다른 글
[Spark] Direct API for Kafka (직접 모델) (2) | 2021.08.12 |
---|---|
[spark] hadoop 3 & hive 3 환경 설정 (0) | 2020.05.25 |
[Spark] 2.4.0 - bucket pruning (0) | 2020.03.10 |
[Spark] Dataset (0) | 2020.01.14 |
Spark SQL (0) | 2018.11.20 |