나누고 싶은 개발 이야기

Data Engineer로서 기록하고 공유하고 싶은 기술들. 책과 함께 이야기합니다.

Big Data/Spark

[Spark] Accumulators

devidea 2019. 5. 3. 19:00
Spark 으로 ETL 작업을 처리하다가 처리한 데이터의 누적 양을 집계하고 싶었다. 예를 들면, Kafka의 데이터를 활용해 spark streamming 작업을 할 때, 각 단계(spark streaming은 짧은 간격의 배치)마다 처리된 데이터 건수를 집계한다고 하자. spark은 분산으로 데이터를 처리하기 때문에 각 executor의 처리된 결과를 조합한 공유 변수가 필요하다. executor가 처리한 데이터를 하나의 공유변수로 값을 기록하는 것이다. 이 때 공유변수는 결합 및 가환 연산을 지원해야 한다. '가환'이라는 단어가 어색한데 사전으로 찾아본 결과 '조작이나 연산의 순서를 바꾸어도 그 결과가 변하지 않는 일' 이다. 다시 간단히 정리하면 공유변수를 통한 연산의 순서, 연산의 조합의 결과가 동일함을 유지해야 한다는 것이다. 병렬프로그래밍에서 스레드 안전한 변수를 사용하는 것과 동일한 개념이다.

지금까지 장황하게 설명한 그 '공유변수'Accumulator이다.

이것으로 개념적으로 설명을 마무리 할 수 있으나 코드로 예시만 살펴보도록 하자.

spark 문서에 나와있는 코드를 그대로 가져왔다.
scala> val accum = sc.longAccumulator("My Accumulator")
accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(My Accumulator), value: 0)

scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s
scala> accum.value
res2: Long = 10

SparkContext에서 Accumulator를 생성하고 있다. 그리고 각 task에서 add 함수로 값을 저장하고 있다. 다만 값은 드라이버 프로그램에서만 value 함수로 읽을 수 있다. 참고 할 부분은 Accumulator의 값은 action 연산을 처리하는 과정에서만 갱신된다.

Accumulator를 생성하는 2번째 방법은 AccumulatorV2 클래스를 상속받아 long, double이 아닌 다름 type에 대해서도 처리할 수 있게 한다.
SparkContext에서는 long, double type의 Accumulator만 생성할 수 있다.
class VectorAccumulatorV2 extends AccumulatorV2[MyVector, MyVector] {
  private val myVector: MyVector = MyVector.createZeroVector

  def reset(): Unit = {
    myVector.reset()
  }
  def add(v: MyVector): Unit = {
    myVector.add(v)
  }
  ...
}
// Then, create an Accumulator of this type:
val myVectorAcc = new VectorAccumulatorV2
// Then, register it into spark context:
sc.register(myVectorAcc, "MyVectorAcc1")


필자 역시 AccumulatorV2를 상속받은 클래스를 만들고 조합해서 필요한 데이터를 갖는 클래스를 디자인했다. 만들어진 객체로 ETL 작업이 끝난 이후 Accumulator에 저장된 값을 따로 수집하도록 했다.


관련문서


반응형

'Big Data > Spark' 카테고리의 다른 글

[Spark] Direct API for Kafka (직접 모델)  (2) 2021.08.12
[spark] hadoop 3 & hive 3 환경 설정  (0) 2020.05.25
[Spark] 2.4.0 - bucket pruning  (0) 2020.03.10
[Spark] Dataset  (0) 2020.01.14
Spark SQL  (0) 2018.11.20