나누고 싶은 개발 이야기

Data Engineer로서 기록하고 공유하고 싶은 기술들. 책과 함께 이야기합니다.

Big Data/Spark

Spark SQL

devidea 2018. 11. 20. 10:59
팀에서 spark SQL을 사용해서 사용자가 요청한 쿼리를 stream 처리하는 작업을 하고 있다. 그래서 spark SQL에 대해서 정리해 보고자 한다. 해당 블로그의 내용은 책 '스파크 2 프로그래밍'에서 대부분 참고했다.

1. RDD와의 차이점
spark의 기본 데이터 모델은 RDD 이다. 분산환경에서 메모리 기반으로 빠르고 안정적으로 동작하는 프로그램을 작성할 수 있는 장점이 있지만 아쉬운 점도 있었다. 그것은 "스키마"에 대한 표현방법이 없다는 것이다. spark 2.0 부터 DataSet으로 통합되어 "스키마" 있는 데이터 처리가 가능하다.
"스키마"라는 단어를 처음 들었을 때, 제일 먼저 데이터베이스가 생각났다. 데이터베이스가 설정된 스키마에 따라 SQL문을 던지면 결과를 주듯이, DataSet도 동일하게 SQL문을 지원한다. SQL은 ANSI-SQL과 Hive-QL 문법을 따른다. Hive-QL 문법을 따른다는 앞서 설명 처럼 hive 조회도 가능하며 join도 된다.

2. 구성요소 설명

아래 표는 spark SQL을 이해하기 위한 필수 용어들이다.
개인적으로 가장 개념이 잡히지 않았던 부분은 DataSet, DataFrame의 구분이다. DataFrame이 없어지고 DataSet으로 통합이 됐다고 하는데 DataFrame 용어로 많이 기술이 되어서 헷갈렸다. 개인적으로 정리한 개념은 DataFrame == DataSet[Row] 이다.
새로운 데이터 모델인 DataSet을 만들면서 기존의 DataFrame을 DataSet으로 흡수한 것이다. DataSet[Row]로 볼 수 있듯이 Row를 가지는데 Rows는 여러 Column들을 가진 집합이다. 데이터베이스 테이블의 한 Row처럼 구조를 가진 한 데이터 집합으로 이해했다. 그러므로 구조를 가진 데이터모델로서 DataFrame, DataSet 같은 개념을 가진다.
용어
설명
SparkSession
데이터프레임을 생성하기 위해서는 SparkSession을 이용해야 한다.
인스턴스 생성을 위한 build() 메서드 제공. - 기존 인스턴스를 재사용하거나 새로운 인스턴스 생성.
spark shell이 자동으로 spark라는 이름으로 SparkSession 인스턴스를 생성하므로 별도의 생성과정을 거치지 않고 spark라는 변수를 통해 접근할 수 있다.
DataSet
spark SQL에서 사용하는 분산 데이터 모델.
DataFrame
org.apache.spark.sql.Row 타입의 데이터로 구성된 데이터셋.
DataFrameReader
SparkSession의 read() 메서드를 통해 접근.
"jdbc", "json", "parquet" 등 다양한 유형의 데이터소스로부터 데이터프레임을 생성하는 메서드 제공.
DataFrameWriter
DataSet의 write() 메서드를 통해 접근.
데이터셋에 저장된 데이터를 파일시스템, 데이터베이스 등 다양한 저장소에 저장하는 메서드 제공.
Row, Column
데이터프레임을 구성하는 요소인 row, column 표현.
functions
데이터를 처리할 때 사용할 수 있는 각종 함수를 제공하는 오브젝트. (ex - sum, stddev)
StructType, StructField
데이터에 대한 스키마 정보를 나타내는 API.
StructType은 데이터프레임의 레코드에 대한 구조를 나타내며, 내부적으로 여러 개의 StructField를 갖는 형태로 정의.
StructType에 또 다른 StrucType을 갖는 중첩구조도 가능.
GroupedData, GroupedDataSet
groupBy() 메서드 등에 의해 group 연산을 수행할 때 사용되며, 집계와 관련된 다양한 연산을 제공.

3. 연산
책을 보면 여러 연산들을 나열하는 식으로 기술되어서 분류별로 정리하기가 헷갈리는데 크게 3가지로 나누면 될 듯 하다.
  • 기본
    • 데이터 저장 옵션 조정 및 스키마 조회
  • 액션
    • 데이터 처리를 수행하고 결과를 생성. DataSet이 아닌 다른 타입으로 변환.
  • 트랜스포메이션
    • 데이터 처리 이후 새로운 DataSet 생성.

연산들의 종류는 대표적인 몇 가지만 기술하기로 하고 자세한 사항은 spark document 중 org.apache.spark.sql.DataSet을 살펴보면 된다.


3.1. 기본 연산
  • createOrReplaceTempView(viewName: String) : Unit
    • viewName 이름으로 로컬 임시 view를 만든다.
  • explain() : Unit
    • 디버깅 목적으로 실행계획을 출력한다.

3.2. 액션 연산
  • show()
    • DataSet의 정보를 표 형태로 표시한다.
    • 아래 그림 처럼 컬럼정보와 데이터를 표 형태로 보여준다.
  • head()
    • 첫 번째 row를 가져온다.
  • take(n: Int): Array[T]
    • DataSet에서 n개의 row를 가져온다.

3.3. 트랜스포메이션 연산
  • select(col: String, cols: String*): DataFrame
    • 지정된 컬럼들을 가져온다. (SQL select와 같은 개념)
  • filter
    • SQL 표현식 또는 row 조건으로 데이터 필터링 한다.
  • agg
    • 특정 컬럼에 대해 sum(), max()와 같은 집합 연산을 수행한다.
  • intersect
    • 두 개의 DataSet에 모두 속하는 row 로만 구성된 DataSet를 생성한다.
    • SQL의 INTERSECT와 동일한 기능.

4. DataSet
  • 데이터프레임 Dataset[Row]
  • 필요한 이유 : 데이터프레임과 RDD 간의 데이터 타입을 다루는 방식의 차이 때문이다.
    • RDD는 내부 데이터의 타입을 명확하게 정의해서 사용하도록 강제돼 있는 데 반해
    • 데이터프레임의 경우 내부 데이터가 Row의 집합이라는 것만 보장되어 있을 뿐 실제 데이터 타입에 대한 정보는 외부에 노출돼 있지 않기 때문이다.
  • 데이터셋 생성
    • 파일로부터 생성
      • val df1 - spark.read.textFile("<spark_home_dir>/examples/src/main/resources/people.txt")
    • 자바 객체를 이용해 생성
      • 데이터셋을 생성할 때는 인코더(org.apache.spark.sql.Encoder) 정보를 반드시 설정해야 한다는 점에서 차이가 있다.
      • 다른 직렬화 방식과 다르게 스파크SQL 내부에서 사용하는 바이너리 포맷을 사용함으로써 연산 성능과 메모리 사용 효율을 높이는 장점이 있다.
      • Encoders 인코더 생성을 위한 팩토리 클래스처럼 사용할 수 있다.
    • RDD 및 데이터프레임을 이용해 생성
      • RDD
        • val rdd = sc.parallelize(List(1,2,3))
        • val ds3 = spark.createDataset(rdd)
        • val ds4 = rdd.toDS()
      • 데이터프레임
        • val ds = List(1,2,3).toDF.as[Int]
    • range()로 생성
      • spark.range(0, 10, 3)

5. DataFrameWriter가 제공하는 주요 기능.
  • 데이터 저장 경로 설정 및 저장
    • df.write.save(<path_to_save>)
  • 데이터 포맷의 지정
    • df.write.format("json").save(<path_to_save>)
    • df.write.json(<path>)
  • 파티션의 설정
    • 특정 컬럼값을 기준으로 파티션을 설정할 수 있다.
    • df.write.format("parquet").partitionBy("date").save(<save_to_path>)
  • 저장 옵션의 설정
    • df.write.options(Map("url" -> "jdbc:...", driver="...")).save(<save_to_path>)
  • 저장 모드 설정
    • df.write.mode(SaveMode.Append)
    • Append(추가), Overwrite(덮어쓰기), ErrorIfExists(이미 다른 파일이 존재할 경우 오류 발생)
  • 테이블 형태로 저장 및 하이브 연동
    • saveAsTable : 하이브 테이블 형식으로 저장할 수 있다.


참고문서

반응형

'Big Data > Spark' 카테고리의 다른 글

[Spark] Direct API for Kafka (직접 모델)  (2) 2021.08.12
[spark] hadoop 3 & hive 3 환경 설정  (0) 2020.05.25
[Spark] 2.4.0 - bucket pruning  (0) 2020.03.10
[Spark] Dataset  (0) 2020.01.14
[Spark] Accumulators  (0) 2019.05.03