나누고 싶은 개발 이야기

Data Engineer로서 기록하고 공유하고 싶은 기술들. 책과 함께 이야기합니다.

Big Data/Spark

[Spark] event log의 spark 옵션 Elasticsearch 적재

devidea 2023. 3. 20. 14:48

이번 글에서는 spark 관리자 역할에서 사용하고 있는 spark 버전 및 설정 정보를 수집하여 dashboard 형태로 모니터링하는 방법에 대해서 공유한다. 이런 아이디어를 생각하게 된 이유는 다른 사용자에 의해서 사용되는 spark 버전들을 관리하고 최신의 버전으로 가이드하기 위해서는 먼저 현황 파악이 우선되어야 했기 때문이다. 사실 spark는 event log를 적재하여 history server를 통해 spark job의 실행 결과를 분석할 수 있다. 문제는 history server의 UI에서는 전체적인 현황을 파악하기 보다는 한 개의 job에 대한 분석을 하는데 더 용이하다는데 있다. 그래서 일부 spark 옵션에 대해서만 수집해서 spark job들의 옵션들을 통합하고 통계화하면 spark를 관리하는데 도움이 될 것 같았다.

 

1. event log

먼저 event log에 대해서 간략히 살펴보자. spark를 실행하면 실행할 때의 옵션들. 예를 들면 java 버전, driver/ executor node의 memory/ instacne 정보들을 특정 hdfs 경로에 저장한다. 옵션들 뿐만 아니라 task의 실행 내역의 로드들도 전부 기록하게 된다. event log는 spark 아래 설정에서 저장될 위치를 지정한다.

spark.eventLog.enabled true
spark.eventLog.dir hdfs://namenode/shared/spark-logs
 

history server는 spark.eventLog.dir의 위치의 event log들을 파싱하여 UI 형태로 분석하는 툴을 제공한다. 그래서 history server를 활용하면 task의 수행 시간, 실행했을 때의 task 상태, SQL의 Logical/ Physical Plan 들도 전부 살펴볼 수 있다.

그런데 필자는 event log의 전체 항목을 모두 통계화하려는 것은 아니고 spark deployMode, executor instance 수, driver/ executor memory 등 실행할 때의 runtime 옵션과 spark 옵션들만 통계화하고 싶었다. 그럼 전체 event log에서 일부 내용만 어떻게 가져올 수 있을까?

 

2. event log 내용 확인 및 분석

event log를 저장한 hdfs의 경로에 가서 event log을 읽어보면 전부 json 형태인 걸 확인할 수 있다. 그럼 json의 key에 따라 어떤 event를 기록할지 정의가 되어 있다는 것인데, event log에 대한 분석을 잘 해놓은 블로그를 찾을 수 있었다. 해당 블로그[각주:1]의 저자를 확인해 보니 엔디비아의 spark 엔지이어였다. event log의 분석 내용을 보고 싶은 분들을 추천하는 글이다. 해당 글에서 필자가 가져오고 싶은 내용을 포함한 json 키를 구분할 수 있었다. spark job의 이름과 실행 시간을 포함한 "SparkListenerApplicationStart". 그리고 java 버전 및 spark 옵션들을 포함한  "SparkListenerEnvironmentUpdate" 였다. event log 예를 들면 다음과 같다.

 

[SparkListenerApplicationStart sample]

{
  "Event": "SparkListenerApplicationStart",
  "App Name": "Spark Pi",
  "App ID": "app-20210413122423-0000",
  "Timestamp": 1618341862473,
  "User": "xxxx"
}

[SparkListenerEnvironmentUpdate sample]

{
  "Event": "SparkListenerEnvironmentUpdate",
  "JVM Information": {
    "Java Home": "/xxx/xxx/xxx/envs/xxx",
    "Java Version": "11.0.9.1-internal (Oracle Corporation)",
    "Scala Version": "version 2.12.10"
  },
  "Spark Properties": {
    "spark.rapids.sql.exec.CollectLimitExec": "true",
    "spark.executor.resource.gpu.amount": "1",
    "spark.rapids.sql.concurrentGpuTasks": "1",
    ...
    }
  "Hadoop Properties": {
    "yarn.resourcemanager.amlauncher.thread-count": "50",
    "dfs.namenode.resource.check.interval": "5000",
    ...
    }
  "System Properties": {
    "java.io.tmpdir": "/tmp",
    "line.separator": "n",   
    ... 
    }
  "Classpath Entries": {
    "/home/xxx/spark/jars/curator-framework-2.7.1.jar": "System Classpath",
    "/home/xxx/spark/jars/parquet-encoding-1.10.1.jar": "System Classpath",
    "/home/xxx/spark/jars/commons-dbcp-1.4.jar": "System Classpath",
    ...
 }
}

필자는 SparkListenerEnvironmentUpdate에서는 "JVM Information"과 "Spark Properties"에서 "spark."으로 시작하는 값만 파싱하기로 결정했다.

 

3. event log 파싱

그럼 가져오려는 event log의 키를 확인했는데 어떤 방식으로 가져올지 결정해야했다. 필자는 spark job의 수행 다음날 전날의 event log를 배치형태로 가져오는 방식으로 했다. 통계 데이터 확인 용도이므로 실시간으로 가져올 필요가 없었고, 수행 이후 통계 수치만 확인하면 됐기 때문이다. 그래서 간단한 bash 스크립트로 hdfs event log 저장 디렉토리에 전날 날짜에 생성된 로그들만 대상으로 하고, 스크립트로 바로 파싱을 했다. json 형식으로 되어 있으니 파싱은 jq를 사용했다.

 

아래 코드는 hdfs 디렉토리를 확인하는 내용은 제외하고, event log 파일을 파싱하는 로직만 넣었다.

#!/bin/bash

function get_event_body() {
  event_file=$1

  spark_properties=$(grep "SparkListenerEnvironmentUpdate" $event_file | head -n 1 | jq '."Spark Properties" | with_entries(select(.key | contains("spark.")))')

  jvm_home=$(grep "SparkListenerEnvironmentUpdate" $event_file | head -n 1 | jq '."JVM Information"."Java Home"')
  jvm_version=$(grep "SparkListenerEnvironmentUpdate" $event_file | head -n 1 | jq '."JVM Information"."Java Version"')
  scala_version=$(grep "SparkListenerEnvironmentUpdate" $event_file | head -n 1 | jq '."JVM Information"."Scala Version"')

  app_name=$(grep "SparkListenerApplicationStart" $event_file | head -n 1 | jq '."App Name"')
  app_timestamp=$(grep "SparkListenerApplicationStart" $event_file | head -n 1 | jq '."Timestamp"')
  unix_timestamp=${app_timestamp:0:10}

  log_time=$(date -r $unix_timestamp +'%Y-%m-%dT%H:%M:%S')

  spark_version=$(grep "SparkListenerLogStart" $event_file | head -n 1 | jq '."Spark Version"')

  result=$(echo "$spark_properties" | jq --argjson jvm_home "$jvm_home" --argjson jvm_version "$jvm_version" \
   --argjson app_name "$app_name" --arg log_time $log_time --argjson spark_version $spark_version \
   --argjson scala_version "$scala_version" '. += {"java_home": $jvm_home, "java_version": $jvm_version, "scala_version":$scala_version, "app_name":$app_name, "log_time":$log_time, "spark_version":$spark_version} ')

  echo "$result"
}

body=$(get_event_body "application_log")

curl -XPOST "http://localhost:9200/spark_event/_doc/" -H'Content-type: application/json' -d "$body"
 
 

elasticsearch로 넣는 데이터도 json 형식으로 만들어서 api의 body 값으로 넣으면 된다. 그래서 spark event log의 데이터를 파싱하고 하나의 json으로 통합했다. 간략히 만든 스크립트라서 참고 용도로만 사용하면 된다. 파싱하는 부분 중 일부를 설명하기 위해 "Spark Properties" 하위에서 "spark."으로 시작하는 키만 선택해서 가져오는 코드를 살펴보자.

  • grep "SparkListenerEnvironmentUpdate" $event_file : event log 파일에서 "SparkListenerEnvironmentUpdate" 키를 찾는다.
  • jq '."Spark Properties"' : json 키 "Spark Properties"를 찾는다.
  • with_entries(select(.key | contains("spark.")))[각주:2] : 모든 키를 돌면서 "spark."를 포함한 키만 선택한다.
  spark_properties=$(grep "SparkListenerEnvironmentUpdate" $event_file | head -n 1 | jq '."Spark Properties" | with_entries(select(.key | contains("spark.")))')

 

파싱한 json 값을 하나의 json으로 합치는 부분은 jq의 "--argjson", "--arg" 옵션으로 변수의 값을 파라미터로 받아서 "+=" 할당자를 활용하여 하나의 json으로 만들었다. 마지막으로 Elasticsearch에 데이터를 넣은 부분은 간단하다. Elasticsearch index를 url에 포함하고 POST API로 body에 앞서 만든 json을 던져서 저장하게 했다. 필자는 "SparkListenerApplicationStart"의 Timestamp 키를 UTC 시간으로 변환해서 Elasticsearch에 저장했다. 저장한 데이터는 kibana or grafana에서 UTC 시간 순으로 조회할 수 있게 하기 위함이다.

 

4. 결론

spark event log는 history server에서 job 분석으로 활용하는데 도움을 많이 준다. 하지만 실행환경의 전체적인 파악을 위해서는 일부 값을 파싱해야했고, 파싱을 하기 위해 event log의 형태를 알 수 있게 되어서 좋았다. 이 글에서는 spark 옵션을 파싱하는 정도만 설명했지만, task의 세부 사항도 모두 event log에 기록되므로 보다 자세한 분석 형태가 있다면 다른 키들의 내용도 파악하여 Elasticsearch에 넣어둔다면 활용도가 더 높을 것으로 생각된다.

 

 

 
 
반응형

'Big Data > Spark' 카테고리의 다른 글

hadoop 2.6.0 버전을 위한 spark 3.x 빌드  (0) 2022.09.28
[Spark] Direct API for Kafka (직접 모델)  (2) 2021.08.12
[spark] hadoop 3 & hive 3 환경 설정  (0) 2020.05.25
[Spark] 2.4.0 - bucket pruning  (0) 2020.03.10
[Spark] Dataset  (0) 2020.01.14