- 스파크에서 기본적으로 지원하는 데이터 소스들에 대하여 소개한다.
- 스파크의 핵심 데이터 소스로는 다음과 같다.
- CSV, JSON, Parquet, ORC, JDBC/ODBC, Text
- 스파크 커뮤니티에서 만든 수많은 데이터 소스가 존재하는데 그 중 일부는 다음과 같다.
- Cassandra, HBase, MongoDB, AWS Redshift, XML 등
- 기본적으로 데이터 소스를 이용해 데이터를 읽고 쓰는 방법을 터득한다.
- Third party 데이터 소스와 스파크를 연동할 때 무엇을 고려할지 배운다.
- 실습 환경은 다음과 같다.
- DataBricks Community Edition
- 스파크에서 기본적으로 데이터를 읽을 때는 DataFrameReader 객체를 사용한다.
- SparkSession의 read 메서드를 호출하면 DataFrameReader 객체를 생성한다.
spark.read .format("csv") .option("mode", "FAILFAST") .option("inferSchema", "true") .option("path", "path/to/file") .schema(someSchema) .load()
- DataFrameReader 객체를 생성하게 되면 다음과 같은 값을 지정해야 한다.
- 포맷 (optional)
- 스키마 (optional)
- 읽기 모드
- 옵션 (optional)
- 포맷은 기본적으로 Parquet 포맷을 사용한다.
- option 메서드를 통해 데이터를 읽는 방법에 대하여 파라미터를 키-값 쌍으로 설정할 수 있다.
- 데이터 소스를 읽을 때 스파크에 정의된 형식에 맞지 않는 데이터를 만났을 때 동작 방식을 지정하는 읽기 모드는 다음과 같다.
- permissive (default) : 오류 레코드의 모든 필드를 null로 설정하고 모든 오류 레코드를 _corrupt_record라는 문자열 컬럼에 기록한다.
- dropMalformed : 형식에 맞지 않는 레코드가 포함된 로우를 제거한다.
- failFast : 형식에 맞지 않는 레코드를 만나면 즉시 종료한다.
- 예시
id,name,doj,salary 1,a,2020/08/12,1000 2,b,2020/08/13,1000 3,c,2020/08/13,1000 4,d,2020/08/15,1000 5,e,2020/08/16,1k
spark.read .schema("id Integer, name String, dog Date, salary Integer, _corrupt_record String") .format("csv") .option("mode", "PERMISSIVE") .option("inferSchema", "true") .option("header", "true") .load("dbfs:/FileStore/data/sample.txt") .show()
+---+----+----+------+-------------------+ | id|name| dog|salary| _corrupt_record| +---+----+----+------+-------------------+ | 1| a|null| 1000|1,a,2020/08/12,1000| | 2| b|null| 1000|2,b,2020/08/13,1000| | 3| c|null| 1000|3,c,2020/08/13,1000| | 4| d|null| 1000|4,d,2020/08/15,1000| | 5| e|null| null| 5,e,2020/08/16,1k| +---+----+----+------+-------------------+
- 스파크에서 기본적으로 데이터를 읽을 때는 DataFrameWriter 객체를 사용한다.
- SparkSession의 write 메서드를 호출하면 DataFrameWriter 객체를 생성한다.
spark.write .format("csv") .option("mode", "OVERWRITE") .option("dataFromat", "yyyy-MM-dd") .option("path", "path/to/file") .save()
- DataFrameWriter 객체를 생성하게 되면 다음과 같은 값을 지정해야 한다.
- 포맷 (optional)
- 옵션 (optional)
- 저장 모드
- 포맷은 기본적으로 Parquet 포맷을 사용한다.
- option 메서드를 통해 데이터를 쓰는 방법에 대하여 파라미터를 키-값 쌍으로 설정할 수 있다.
- 데이터 소스를 쓸 때 동일한 파일이 발견되었을 때의 동작 방식을 지정하는 쓰기 모드는 다음과 같다.
- append : 해당 경로에 이미 존재하는 파일 목록에 결과 파일을 추가한다.
- overwrite : 이미 존재하는 모든 데이터를 완전히 덮어쓴다.
- errorIfExists (default) : 해당 경로에 데이터나 파일이 존재하는 경우 오류를 발생시키면서 쓰기 작업이 실패한다.
- ignore : 해당 경로에 데이터나 파일이 존재하는 경우 아무런 처리도 하지 않는다.
- CSV(commna-separated values)는 콤마(,)로 구분된 값을 의미한다.
- 각 줄이 단일 레코드가 되며 각 필드를 콤마로 구분하는 일반적인 텍스트 파일 포맷이다.
- 까다로운 파일 포맷으로 이는 어떤 내용이 들어있는지, 어떤 구조로 되어있는지 등 다양한 전제를 만들어낼 수 없기 때문이다.
- 파일 내용에 콤마(,)가 들어있다거나 비표준적인 방식으로 null이 기록되는 경우
- 이러한 문제를 해결하기 위해 다양한 옵션들을 소개한다.
read/write | Key | Value | Default | 설명 |
---|---|---|---|---|
all | sep | 단일 문자 | , | 각 필드와 값을 구분하는데 사용되는 단일 문자 |
all | header | true/false | false | 첫 번째 줄이 컬럼명인지 나타내는 불리언값 |
read | escape | 모든 문자열 | \ | 스파크에서 이스케이프 처리할 문자 |
read | inferSchema | true/false | false | 스파크가 파일을 읽을 때 컬럼의 데이터 타입을 추론할 지 정의 |
read | ignoreLeadingWhiteSpace | true/false | false | 값을 읽을 때 값의 선행 공백을 무시할지 정의 |
read | ignoreTrailingWhiteSpace | true/false | false | 값을 읽을 때 값의 후행 공백을 무시할지 정의 |
all | nullValue | 모든 문자열 | "" | 파일에서 null 값을 나타내는 문자 |
all | nanValue | 모든 문자열 | NaN | CSV 파일에서 NaN이 값없음을 나타내는 문자를 선언 |
all | positiveInf | 모든 문자열 또는 문자 | Inf | 양의 무한 값을 나타내는 문자(열)을 선언 |
all | negativeInf | 모든 문자열 또는 문자 | -Inf | 음의 무한 값을 나타내는 문자(열)을 선언 |
all | compression/codec | none, uncompressed, bzip2, defalte, gzip, lz4, snappy | none | 스파크가 파일을 읽고 쓸 때 사용하는 압축 코덱을 정의 |
all | dateFormat | 자바의 SimpleDataFormat을 따르는 모든 문자열 또는 문자 | yyyy-MM-dd | 날짜 데이터 타입인 모든 필드에서 사용할 날짜 형식 |
all | timestampFormat | 자바의 SimpleDataFormat을 따르는 모든 문자열 또는 문자 | yyyy-MM-dd'T'HH:mm:ss.SSSZZ | 타임스탬프 데이터 타입인 모든 필드에서 사용할 날짜 형식 |
read | maxColumns | 모든 정수 | 20480 | 파일을 구성하는 최대 컬럼 수를 선언 |
read | maxCharsPerColumn | 모든 정수 | 1000000 | 컬럼의 문자 최대 길이를 선언 |
read | escapeQuotes | true/false | true | 스파크가 파일의 라인에 포함된 인용부호를 이스케이프 할 지 선언 |
read | maxMalformedLogPerPartition | 모든 정수 | 10 | 스파크가 각 파티션 별로 비정상적인 레코드를 발견했을 때 기록할 최대 수. 해당 숫자를 초과하는 비 정상적인 레코드는 모두 무시된다. |
write | quoteAll | true/false | false | 인용부호 문자가 있는 값을 이스케이프 처리하지 않고 전체 값을 인용 부호로 묶을지 여부 |
read | multiLine | true/false | false | 하나의 논리적 레코드가 여러 줄로 이어진 CSV 파일 읽기를 허용할지 여부 |
- 읽기 예시
import org.apache.spark.sql.types.{StructField, StructType, StringType, LongType} val myManualSchema = new StructType(Array( new StructField("DEST_COUNTRY_NAME", StringType, true), new StructField("ORIGIN_COUNTRY_NAME", StringType, true), new StructField("count", LongType, true))) val csv_df = spark.read.format("csv") .option("header", "true") .option("mode", "FAILFAST") .schema(myManualSchema) .load("dbfs:/FileStore/data/2010_summary.csv") csv_df.show()
- 쓰기 예시
csv_df.write.format("csv").mode("overwrite").option("sep", "\t").save("/tmp/2010_summary.csv")
- 스파크는 지연 연산 특성이 있으므로 데이터 프레임을 정의하는 시점이 아닌 잡 실행 시점에만 오류가 발생한다.
- JSON(JavaScript Object Notion)은 기본적으로 줄로 구분된(line-delimidated) JSON을 기본적으로 사용한다.
- 또한, multiLine 옵션을 사용하여 줄로 구분된 방식과 여러 줄로 구분된 방식을 선택적으로 사용할 수 있다.
- 줄로 구분된 JSON의 포맷이 안정적인 포맷이므로 이 방식을 사용하는 것을 추천한다.
- JSON 객체를 다룰 때 사용하는 옵션은 다음과 같다.
read/write | Key | Value | Default | 설명 |
---|---|---|---|---|
all | compression/codec | none, uncompressed, bzip2, defalte, gzip, lz4, snappy | none | 스파크가 파일을 읽고 쓸 때 사용하는 압축 코덱을 정의 |
all | dateFormat | 자바의 SimpleDataFormat을 따르는 모든 문자열 또는 문자 | yyyy-MM-dd | 날짜 데이터 타입인 모든 필드에서 사용할 날짜 형식 |
all | timestampFormat | 자바의 SimpleDataFormat을 따르는 모든 문자열 또는 문자 | yyyy-MM-dd'T'HH:mm:ss.SSSZZ | 타임스탬프 데이터 타입인 모든 필드에서 사용할 날짜 형식 |
read | primitiveAsString | ture/false | false | 모든 primitive 값을 문자열로 추정할지 정의 |
read | allowComments | true/false | false | JSON 레코드에서 자바나 C++ 스타일로 된 코멘트를 무시할지 정의 |
read | alloUnquotedFieldNames | true/false | false | 인용부호로 감싸여 있지 않은 JSON 필드명을 허용할지 정의 |
read | allowSingleQuotes | true/false | true | 인용부호로 큰따옴표(") 대신 작은따옴표(')를 허용할지 정의 |
read | allowNumericLeadingZeros | true/false | false | 숫자 앞에 0을 허용할지 정의 |
read | alloBackslashEscapingAnyCharacter | true/false | false | 백슬리시 인용부호 메커니즘을 사용한 인용부호를 허용할지 정의 |
read | ColumnNameOfCorruptRecord | 모든 문자열 | spark.sql.columnNameOfCorruptRecord 속성의 설정값 | permissive 모드에서 생성된 비정상 문자열을 가진 새로운 필드명을 변경할 수 있다. 해당 값을 설정하면 spark.sql.columnNameOfCorruptRecord 설정값 대신 적용 |
read | multiLine | true/false | false | 줄로 구분되지 않은 JSON 파일의 읽기를 허용할지 정의 |
- 읽기 예시
spark.read.format("json") .option("mode", "FAILFAST") .schema(myManualSchema) .load("dbfs:/FileStore/data/flight-data/json/2010_summary.json") .show(5)
+-----------------+-------------------+-----+ |DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count| +-----------------+-------------------+-----+ | United States| Romania| 1| | United States| Ireland| 264| | United States| India| 69| | Egypt| United States| 24| |Equatorial Guinea| United States| 1| +-----------------+-------------------+-----+
- 쓰기 예시
csv_df.write.format("json").mode("overwrite").save("/tmp/my-json-file.json")
%fs ls /tmp/my-json-file.json/
path,name,size,modificationTime dbfs:/tmp/my-json-file.json/_SUCCESS,_SUCCESS,0,1690830377000 dbfs:/tmp/my-json-file.json/_committed_5178524152520899269,_committed_5178524152520899269,112,1690830377000 dbfs:/tmp/my-json-file.json/_started_5178524152520899269,_started_5178524152520899269,0,1690830376000 dbfs:/tmp/my-json-file.json/part-00000-tid-5178524152520899269-2cb0b929-046b-45a8-9157-077740e18ab0-4-1-c000.json,part-00000-tid-5178524152520899269-2cb0b929-046b-45a8-9157-077740e18ab0-4-1-c000.json,21353,1690830377000
%fs head /tmp/my-json-file.json/part-00000-tid-5178524152520899269-2cb0b929-046b-45a8-9157-077740e18ab0-4-1-c000.json
{"DEST_COUNTRY_NAME":"United States","ORIGIN_COUNTRY_NAME":"Romania","count":1} {"DEST_COUNTRY_NAME":"United States","ORIGIN_COUNTRY_NAME":"Ireland","count":264} {"DEST_COUNTRY_NAME":"United States","ORIGIN_COUNTRY_NAME":"India","count":69} {"DEST_COUNTRY_NAME":"Egypt","ORIGIN_COUNTRY_NAME":"United States","count":24} ... (skip)
- Parquet는 다양한 스토리지 최적화 기술을 제공하는 오픈소스로 만들어진 컬럼 기반의 데이터 저장 방식이다.
- 저장소 공간을 절약할 수 있고 전체 파일을 읽는 대신 개별 컬럼을 읽을 수 있으며 컬럼 기반의 압축 기능을 제공한다.
- 특히, 아파치 스파크와 호환이 잘되기 때문에 스파크의 기본 파일 포맷으로 사용된다.
- Parquet는 JSON, CSV보다 훨씬 효율적으로 동작하므로 장기 저장용 데이터는 Parquet 포맷으로 저장하는 것을 추천한다.
- 또한, 복합 데이터 타입(struct, array, map)을 지원한다.
- Parquet 데이터를 다룰 때 사용하는 옵션은 다음과 같다.
read/write | Key | Value | Default | 설명 |
---|---|---|---|---|
all | compression/codec | none, uncompressed, bzip2, defalte, gzip, lz4, snappy | none | 스파크가 파일을 읽고 쓸 때 사용하는 압축 코덱을 정의 |
read | mergeSchema | true/false | spark.sql.parquet.mergeSchema 속성의 설정값 | 동일한 테이블이나 폴더에 신규 추가된 Parquet 파일에 컬럼을 점진적으로 추가할 수 있다. 이러한 기능을 활성화하거나 비활성화하기 위해 해당 옵션을 사용한다. |
- Parquet 경우에는 데이터를 저장할 때 자체 스키마를 사용해 데이터를 저장하기 때문에 옵션이 거의 없다.
- DataFrame을 표현하기 위해 정확한 스키마가 필요한 경우에만 스키마를 설정한다. (schema-on-read 방식을 지원하기 때문에 사실 거의 필요는 없음)
- Parquet 파일은 스키마가 파일 자체에 내장되어 있으므로 스키마 추정이 필요 없다.
- 읽기 예시
spark.read.format("parquet") .load("dbfs:/FileStore/data/flight-data/parquet/2010-summary.parquet") .show(5)
+-----------------+-------------------+-----+ |DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count| +-----------------+-------------------+-----+ | United States| Romania| 1| | United States| Ireland| 264| | United States| India| 69| | Egypt| United States| 24| |Equatorial Guinea| United States| 1| +-----------------+-------------------+-----+ only showing top 5 rows
- 쓰기 예시
csv_df.write.format("parquet").mode("overwrite").save("/tmp/my-parquet-file.parquet")
%fs ls FileStore/data/flight-data/parquet/2010-summary.parquet/
path,name,size,modificationTime dbfs:/tmp/my-parquet-file.parquet/_SUCCESS,_SUCCESS,0,1690831196000 dbfs:/tmp/my-parquet-file.parquet/_committed_7876934312199867966,_committed_7876934312199867966,122,1690831196000 dbfs:/tmp/my-parquet-file.parquet/_started_7876934312199867966,_started_7876934312199867966,0,1690831193000 dbfs:/tmp/my-parquet-file.parquet/part-00000-tid-7876934312199867966-8f4c074c-aacf-4f26-906f-1e24c267c467-5-1-c000.snappy.parquet,part-00000-tid-7876934312199867966-8f4c074c-aacf-4f26-906f-1e24c267c467-5-1-c000.snappy.parquet,5488,1690831196000
- ORC는 하둡 워크로드를 위해 설계된 자기 기술적(self-describing)이며 데이터 타입을 인식할 수 있는 컬럼 기반의 파일 포맷이다.
- 대규모 스트리밍 읽기에 최적화되어 있을 뿐만 아니라 필요한 로우를 신속하게 찾아낼 수 있는 기능이 통합되어 있다.
- 스파크에서는 ORC 파일 포맷을 효율적으로 사용할 수 있으므로 별도의 옵션 지정 없이 데이터를 읽을 수 있다.
- Parquet 파일은 스파크에 최적화 된 반면 ORC는 Hive에 최적화되어 있다.
- 읽기 예시
spark.read.format("orc") .load("dbfs:/FileStore/data/flight-data/orc/2010-summary.orc") .show(5)
+-----------------+-------------------+-----+ |DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count| +-----------------+-------------------+-----+ | United States| Romania| 1| | United States| Ireland| 264| | United States| India| 69| | Egypt| United States| 24| |Equatorial Guinea| United States| 1| +-----------------+-------------------+-----+
- 쓰기 예시
csv_df.write.format("orc").mode("overwrite").save("/tmp/my-orc-file.orc")
%fs ls /tmp/my-orc-file.orc/
path,name,size,modificationTime dbfs:/tmp/my-orc-file.orc/_SUCCESS,_SUCCESS,0,1690832777000 dbfs:/tmp/my-orc-file.orc/_committed_2739091522261783320,_committed_2739091522261783320,118,1690832777000 dbfs:/tmp/my-orc-file.orc/_started_2739091522261783320,_started_2739091522261783320,0,1690832776000 dbfs:/tmp/my-orc-file.orc/part-00000-tid-2739091522261783320-500ff9ea-c56d-4303-af08-696279992748-9-1-c000.snappy.orc,part-00000-tid-2739091522261783320-500ff9ea-c56d-4303-af08-696279992748-9-1-c000.snappy.orc,3929,1690832776000
- 사용자는 SQL를 지원하는 다양한 시스템에 SQL 데이터 소스를 연결할 수 있다.
- 예를 들어 MySQL, PostgreSQL, Oracle 데이터베이스에 접속할 수 있다.
- 데이터베이스는 인증 정보나 접속과 관련된 옵션이 추가적으로 필요하고 실제로 데이터베이스 시스템에 접속이 가능한지 네트워크 상태를 확인해야 한다.
- 해당 단원에서는 SQLite 실행을 위한 참고용 샘플을 대상으로 실습을 진행한다.
- 데이터베이스의 데이터를 읽고 쓰기 위해서는 스파크 classpath에 데이터베이스의 JDBC 드라이버를 추가하고 적절한 JDBC Jar 파일을 제공해야 한다.
- 예시 : PostgreSQL
%sh ./bin/spark-shell \ --driver-class-path postgresql-9.4.1207.jar \ --jars postgresql-9.4.1207.jar
- JDBC 데이터 소스를 다룰 때 옵션은 다음과 같다.
속성명 | 의미 |
---|---|
url | 접속을 위한 JDBC URL. 소스 시스템에 특화된 설정은 URL에 지정할 수 있다. ex) jdbc:postgresql://localhost/test?user=fred&password=secret |
dbtable | 읽을 JDBC 테이블을 설정한다. SQL 쿼리의 FROM 절에 유효한 모든 것을 사용할 수 있다. 예를 들어 전체 테이블 대신 괄호 안에 서브쿼리를 사용할 수도 있다. |
driver | 지정한 URL에 접속할 때 사용할 JDBC 드라이버 클래스 명을 지정한다. |
partitionColumn,lowerBound, upperBound | 세 가지 옵션은 항상 같이 지정해야 하며, numPartitions 또한 반드시 지정해야 한다. 이러한 속성은 다수의 워커에서 병렬로 테이블을 나눠 읽는 방법을 정의한다. partitionColumn은 반드시 해당 테이블의 수치형 컬럼이어야 한다. lowerBound와 upperBound는 테이블의 로우를 필터링하는데 사용되는 것이 아니라 각 파티션의 범위를 결정하는데 사용된다. 따라서 테이블의 모든 로우는 분할되어 반환된다. 해당 옵션은 읽기에만 적용된다. |
numPartitions | 테이블의 데이터를 병렬로 읽거나 쓰기 작업에 사용할 수 있는 최대 파티션 수를 결정한다. 해당 속성은 최대 동시 JDBC 연결 수를 결정한다. 쓰기에 사용되는 파티션 수가 이 값을 초과하는 경우 쓰기 연산 전에 coalesce(numPartitions)를 실행하여 파티션 수를 해당 값에 맞게 줄이게 된다. |
fetchSize | 한 번에 얼마나 많은 로우를 가져올지 결정하는 JDBC의 fetch size를 설정한다. 해당 옵션은 기본적으로 fetch size가 작게 설정된 JDBC 드라이버의 성능을 올리는데 도움이 된다. (Oracle default:10) 해당 옵션은 읽기에만 적용된다. |
batchSize | 한 번에 얼마나 많은 로우를 저장할지 결정하는 JDBC의 batch size를 설정한다. 해당 옵션은 JDBC 드라이버의 성능을 향상시킬 수 있다. 해당 옵션은 쓰기에만 적용되며 기본값은 1000이다. |
isolationLevel | 현재 연결에 적용되는 트랜잭션 격리 수준을 정의한다. 해당 옵션은 JDBC Connection 객체에서 정의하는 표준 트랜잭션 격리 수준에 해당하는 None, READ_COMMITTED, READ_UNCOMMITTED, REPETABLE_READ, SERIALIZABLE 중에 하나가 될 수 있다. (자세한 내용은 java.sql.Connection 문서를 참고) |
truncate | JDBC writer 관련 옵션이다. SaveMode.Overwrite가 활성화 되면 스파크는 기존 테이블을 삭제하거나 재생성하는 대신 데이터베이스의 truncate 명령을 실행한다. 이런 동작 방식이 더 효율적일 수 있으며 인덱스 같은 테이블 메타데이터가 제거되는 현상을 방지할 수 있다. 하지만 신규 데이터가 현재 스키마와 다른 경우와 같이 일부 경우에는 정상적으로 동작하지 않을 수 있다. 해당 옵션의 기본값은 false이며 쓰기에만 적용된다. |
createTableOptions | JDBC write 관련 옵션이다. 해당 옵션을 지정하면 테이블 생성 시 특정 테이블의 데이터베이스와 파티션 옵션을 설정할 수 있다. ex) CREATE TABLE t (name string) ENGINE=InnoDB 해당 옵션은 쓰기에만 적용된다. |
createTableColumnTypes | 테이블을 생성할 때 기본값 대신 데이터베이스 컬럼 데이터 타입을 정의한다. 데이터 타입 정보는 반드시 CREATE TABLE 구문에서 사용하는 컬럼 정의 구문과 동일한 형식으로 지정해야 한다. 지정된 타입은 유효한 스파크 SQL 타입이어야 한다. 해당 옵션은 쓰기에만 적용된다. |
- 읽기 예시
val driver = "org.sqlite.JDBC" val path = "tmp/my_sqlite.db" val url = s"jdbc:sqlite:/${path}" val tablename = "flight_info"
import java.sql.DriverManager val connection = DriverManager.getConnection(url) connection.isClosed() connection.close()
val dbDataFrame = spark.read.format("jdbc") .option("url", url) .option("dbtable", tablename) .option("driver", driver) .load() dbDataFrame.show(5)
+-----------------+-------------------+-----+ |DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count| +-----------------+-------------------+-----+ | United States| Romania| 1| | United States| Ireland| 264| | United States| India| 69| | Egypt| United States| 24| |Equatorial Guinea| United States| 1| +-----------------+-------------------+-----+
- 스파크에는 데이터베이스 테이블에서 스키마 정보를 읽어 테이블에 존재하는 컬럼의 데이터 타입을 스파크의 데이터 타입으로 변환한다.
- 따라서 생성된 데이터 프레임은 스파크의 데이터 프레임 처럼 자유롭게 사용할 수 있다.
- 쓰기 예시
val newPath = "jdbc:sqlite://tmp/my-sqlite.db" csv_df.write.mode("overwrite").jdbc(newPath, tablename, props)
spark.read.jdbc(newPath, tablename, props).count() // 255
res131: Long = 255
- 스파크는 DataFrame을 만들기 전에 데이터베이스 자체에서 데이터를 필터링하도록 만들 수 있다.
- 예를 들어, 아래 예제에서 사용한 쿼리의 실행 계획을 한 번 보면 테이블의 여러 컬럼 중 관련이 있는 컬럼만 선택한다는 것을 알 수 있다.
dbDataFrame.select("DEST_COUNTRY_NAME").distinct().explain
== Physical Plan == AdaptiveSparkPlan isFinalPlan=false +- HashAggregate(keys=[DEST_COUNTRY_NAME#733], functions=[]) +- Exchange hashpartitioning(DEST_COUNTRY_NAME#733, 200), ENSURE_REQUIREMENTS, [plan_id=404] +- HashAggregate(keys=[DEST_COUNTRY_NAME#733], functions=[]) +- Scan JDBCRelation(flight_info) [numPartitions=1] [DEST_COUNTRY_NAME#733] PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string>
- 스파크는 특정 유형의 쿼리를 더 나은 방식으로 처리할 수 있다.
- Dataframe에 filter를 명시하면 스파크는 해당 필터에 대한 처리를 데이터베이스로 위임(push down) 한다.
dbDataFrame.filter("DEST_COUNTRY_NAME in ('Anguilla', 'Sweden')").explain
== Physical Plan == *(1) Scan JDBCRelation(flight_info) [numPartitions=1] [DEST_COUNTRY_NAME#733,ORIGIN_COUNTRY_NAME#734,count#735] PushedFilters: [*In(DEST_COUNTRY_NAME, [Anguilla,Sweden])], ReadSchema: struct<DEST_COUNTRY_NAME:string,ORIGIN_COUNTRY_NAME:string,count:decimal(20,0)>
- 스파크의 모든 함수는 사용하는 SQL 데이터베이스에 맞게 변환하지 못하는 경우도 있다.
- 이러한 경우에는 전체 SQL 쿼리를 데이터베이스에 전달하여 DataFrame으로 전달받을 수 있다.
val pushdownQuery = """(SELECT DISTINCT(DEST_COUNTRY_NAME) FROM flight_info) AS flight_info""" val dbDataFrame = spark.read.format("jdbc") .option("url", url).option("dbtable", pushdownQuery).option("driver", driver) .load() dbDataFrame.show(5)
+-----------------+ |DEST_COUNTRY_NAME| +-----------------+ | United States| | Egypt| |Equatorial Guinea| | Costa Rica| | Senegal| +-----------------+
dbDataFrame.explain
== Physical Plan == *(1) Scan JDBCRelation((SELECT DISTINCT(DEST_COUNTRY_NAME) FROM flight_info) AS flight_info) [numPartitions=1] [DEST_COUNTRY_NAME#779] PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string>
- 스파크는 파일 크기, 파일 유형 그리고 압축 방식에 따라 여러 파일을 읽어 하나의 파티션으로 만들거나 여러 파티션을 하나의 파일로 만드는 알고리즘을 가지고 잇다.
- 파일이 가진 유연성은 SQL 데이터베이스에도 존재하지만 몇 가지 수동적인 설정이 필요하다.
- 이전 옵션 목록 중 numPartitions 옵션을 사용하여 읽기 및 쓰기용 동시 작업 수를 제한할 수 있는 최대 파티션 수를 설정할 수 있다.
val dbDataFrame = spark.read.format("jdbc") .option("url", url) .option("dbtable", tablename) .option("driver", driver) .option("numPartitions", 10) .load()
- numPartitions 값이 너무 높으면 동시에 많은 쿼리가 발생하기 때문에 성능이 저하될 수 있다. 50보다 높게 설정하지 않도록 주의해야 한다. (https://docs.databricks.com/external-data/jdbc.html#language-scala)
- partitionColumn 옵션을 사용 시 원본 데이터베이스에서 인덱스가 존재하는 열을 선택하면 쿼리 속도를 더 높일 수 있다.
val employees_table = spark.read .format("jdbc") .option("url", "<jdbc-url>") .option("dbtable", "<table-name>") .option("user", "<username>") .option("password", "<password>") // a column that can be used that has a uniformly distributed range of values that can be used for parallelization .option("partitionColumn", "<partition-key>") // lowest value to pull data for with the partitionColumn .option("lowerBound", "<min-value>") // max value to pull data for with the partitionColumn .option("upperBound", "<max-value>") // number of partitions to distribute the data into. Do not set this very large (~hundreds) .option("numPartitions", 8) .load()
- 위와 같은 설정을 활용하여 데이터베이스에서 일어날 수 있는 과도한 쓰기나 읽기를 막을 수 있다.
- 명시적으로 조건절을 SQL 데이터베이스에 위임할 수 있다.
- 이러한 최적화 방법은 조건절을 명시함으로써 특정 파티션에 특정 데이터의 물리적 위치를 제어할 수 있다.
- 전체 데이터 중 Anguilla와 Sweden 두 국가의 데이터만 필요하다고 가정해보자.
- 두 국가에 대한 필터를 데이터베이스에 위임(push down)하여 처리할 수 있지만 스파크 자체 파티션에 결과 데이터를 저장함으로써 더 많은 처리를 할 수도 있다.
- 데이터 소스 생성 시 조건절 목록을 정의하여 스파크 자체 파티션에 결과 데이터를 저장할 수 있다.
val props = new java.util.Properties props.setProperty("driver", "org.sqlite.JDBC") val predicates = Array( "DEST_COUNTRY_NAME = 'Sweden' OR ORIGIN_COUNTRY_NAME = 'Sweden'", "DEST_COUNTRY_NAME = 'Anguilla' OR ORIGIN_COUNTRY_NAME = 'Anguilla'") spark.read.jdbc(url, tablename, predicates, props).show() spark.read.jdbc(url, tablename, predicates, props).rdd.getNumPartitions // 2
+-----------------+-------------------+-----+ |DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count| +-----------------+-------------------+-----+ | Sweden| United States| 65| | United States| Sweden| 73| | Anguilla| United States| 21| | United States| Anguilla| 20| +-----------------+-------------------+-----+
- 연관성 없는 조건절을 정의하면 중복 로우가 많이 발생할 수 있다.
val props = new java.util.Properties props.setProperty("driver", "org.sqlite.JDBC") val predicates = Array( "DEST_COUNTRY_NAME != 'Sweden' OR ORIGIN_COUNTRY_NAME != 'Sweden'", "DEST_COUNTRY_NAME != 'Anguilla' OR ORIGIN_COUNTRY_NAME != 'Anguilla'") spark.read.jdbc(url, tablename, predicates, props).show() spark.read.jdbc(url, tablename, predicates, props).rdd.getNumPartitions // 2 spark.read.jdbc(url, tablename, predicates, props).count() // 510
+--------------------+-------------------+-----+ | DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count| +--------------------+-------------------+-----+ | United States| Romania| 1| | United States| Ireland| 264| | United States| India| 69| | Egypt| United States| 24| | Equatorial Guinea| United States| 1| | United States| Singapore| 25| | United States| Grenada| 54| | Costa Rica| United States| 477| | Senegal| United States| 29| | United States| Marshall Islands| 44| | Guyana| United States| 17| | United States| Sint Maarten| 53| | Malta| United States| 1| | Bolivia| United States| 46| | Anguilla| United States| 21| |Turks and Caicos ...| United States| 136| | United States| Afghanistan| 2| |Saint Vincent and...| United States| 1| | Italy| United States| 390| | United States| Russia| 156| +--------------------+-------------------+-----+ only showing top 20 rows
- 조건절을 기반으로 파티션을 분할할 수 있다.
- 수치형 컬럼을 기준으로 분할하며 lowerBound, uppderBound를 활용하여 파티션 내에 최소값과 최대값을 정의한다.
- 해당 범위 밖에 모든 값은 첫 번째 또는 마지막 파티션에 속한다. (서로 다른 파티션에 속한다?)
- 전체 파티션 수를 설정하게 되면 설정한 값만큼 병렬처리를 하게 된다.
- 즉, 스파크는 데이터베이스에 numPartitions 만큼 task를 생성하여 병렬로 쿼리를 실행하며 이 때, 각 쿼리에는 상한값(lowerBound)와 하한값(upperBound)로 정해진 상태에서 실행된다.
val colName = "count" val lowerBound = 0L val upperBound = 348113L // this is the max count in our database val numPartitions = 10
spark.read.jdbc(url,tablename,colName,lowerBound,upperBound,numPartitions,props).count() // 255
select * from tablename where colName between 0 and 348113; select * from tablename where colName between 348114 and 696227; ...
- 일반 텍스트(plain-text) 파일도 읽을 수 있으며 파일의 각 라인은 DataFrame의 레코드에 해당된다.
- 아파치 로그 파일을 구조화 된 포맷으로 파싱하거나 자연어 처리를 위해 일반 텍스트를 파싱하는 경우가 있다.
- 텍스트 파일은 기본 데이터 타입의 유연성을 활용할 수 있으므로 Dataset API에서 사용하기에 아주 적절한 포맷이다.
- 읽기 예시
spark.read.textFile("dbfs:/FileStore/data/2010_summary.csv") .selectExpr("split(value, ',') as rows").show()
+----------------------------------------------------+ |rows | +----------------------------------------------------+ |[DEST_COUNTRY_NAME, ORIGIN_COUNTRY_NAME, count] | |[United States, Romania, 1] | |[United States, Ireland, 264] | |[United States, India, 69] | |[Egypt, United States, 24] | |[Equatorial Guinea, United States, 1] | |[United States, Singapore, 25] | |[United States, Grenada, 54] | |[Costa Rica, United States, 477] | |[Senegal, United States, 29] | |[United States, Marshall Islands, 44] | |[Guyana, United States, 17] | |[United States, Sint Maarten, 53] | |[Malta, United States, 1] | |[Bolivia, United States, 46] | |[Anguilla, United States, 21] | |[Turks and Caicos Islands, United States, 136] | |[United States, Afghanistan, 2] | |[Saint Vincent and the Grenadines, United States, 1]| |[Italy, United States, 390] | +----------------------------------------------------+ only showing top 20 rows
- 쓰기 예시
csv_df.select("DEST_COUNTRY_NAME").write.text("file:/tmp/simple-text-file.txt")
%sh ls -ltr /tmp/simple-text-file.txt
total 8 -rw-r--r-- 1 root root 0 Jul 31 22:58 _started_7371801202632364130 -rw-r--r-- 1 root root 3124 Jul 31 22:58 part-00000-tid-7371801202632364130-3a365c58-4f2a-4648-bb7d-9ac9f304457f-304-1-c000.txt -rw-r--r-- 1 root root 113 Jul 31 22:58 _committed_7371801202632364130 -rw-r--r-- 1 root root 0 Jul 31 22:58 _SUCCESS
- 텍스트 파일을 쓸 때는 문자열 컬럼 하나만 존재해야 한다. 그렇지 않으면 에러가 발생한다.
csv_df.select("DEST_COUNTRY_NAME", "ORIGIN_COUNTRY_NAME").write.text("file:/tmp/simple-text-file2.txt")
AnalysisException: Text data source supports only a single column, and you have 2 columns.
- 파티셔닝 작업을 수행하면 더 많은 컬럼을 저장할 수 있다. 하지만 모든 파일에 컬럼을 추가하는 것이 아니라 텍스트 파일이 저장되는 디렉터리에 폴더별로 컬럼을 저장한다.
csv_df.limit(10).select("DEST_COUNTRY_NAME", "count").write.partitionBy("count").text("file:/tmp/five-csv-files2.csv")
%sh ls -ltr /tmp/five-csv-files2.csv
total 36 drwxr-xr-x 2 root root 4096 Jul 31 22:57 count=29 drwxr-xr-x 2 root root 4096 Jul 31 22:57 count=44 drwxr-xr-x 2 root root 4096 Jul 31 22:57 count=1 drwxr-xr-x 2 root root 4096 Jul 31 22:57 count=477 drwxr-xr-x 2 root root 4096 Jul 31 22:57 count=264 drwxr-xr-x 2 root root 4096 Jul 31 22:57 count=25 drwxr-xr-x 2 root root 4096 Jul 31 22:57 count=69 drwxr-xr-x 2 root root 4096 Jul 31 22:57 count=54 drwxr-xr-x 2 root root 4096 Jul 31 22:57 count=24 -rw-r--r-- 1 root root 0 Jul 31 22:57 _SUCCESS
- 쓰기 작업 전에 파티션 수를 조절함으로써 병렬로 처리할 파일 수를 제어할 수 있다.
- 또한 버켓팅과 파티셔닝을 조절함으로써 데이터의 저장 구조를 제어할 수 있다.
- 파티셔닝은 데이터를 디렉토리 별로 나누어 저장하는 방식이다.
- 버켓팅은 데이터를 파일 별로 나누어 저장하는 방식이다.
- 모두 데이터를 읽을 때 스캔해야 하는 데이터의 양을 줄이는 최적화 방법으로 두 가지 도구를 함께 써도 되고, 따로 써도 된다.
- 특정 파일 포맷은 기본적으로 분할을 지원한다.
- 따라서 스파크에서 전체 파일이 아닌 쿼리에 필요한 부분만 읽을 수 있으므로 성능 향상에 도움이 된다.
- 모든 압축 방식이 분할 압축을 지원하지 않기 때문에 데이터 저장 방식에 따라 스파크 잡의 동작 성능에 영향을 끼칠 수 있다.
- 데이터를 저장할 때는 parquet 포맷과 gzip 압축 방식을 추천한다.
- 여러 executor가 같은 파일을 동시에 읽을 수는 없지만 여러 파일을 동시에 읽을 수는 있다.
- 다수의 파일이 존재하는 폴더를 읽을 때 폴더의 개별 파일은 DataFrame의 파티션에 해당된다.
- 따라서 사용 가능한 익스큐터를 이용해 병렬로 파일을 읽는다.
- 파일이나 데이터 수는 데이터를 쓰는 시점에 DataFrame이 가진 파티션 수에 따라 달라질 수 있다.
- 기본적으로 데이터 파티션 하나 당 하나의 파일이 작성된다. 따라서 옵션에 지정한 파일명은 실제로 다수의 파일을 가진 디렉토리가 된다.
- 예시 (5개의 파티션으로 분할되어 파일을 생성)
csv_df.limit(10).write .mode("overwrite") .partitionBy("DEST_COUNTRY_NAME") .save("file:/tmp/partitioned-files.parquet")
%sh ls -ltr /tmp/partitioned-files.parquet
total 24 drwxr-xr-x 2 root root 4096 Jul 31 23:10 DEST_COUNTRY_NAME=Costa Rica drwxr-xr-x 2 root root 4096 Jul 31 23:10 DEST_COUNTRY_NAME=Egypt drwxr-xr-x 2 root root 4096 Jul 31 23:10 DEST_COUNTRY_NAME=Equatorial Guinea drwxr-xr-x 2 root root 4096 Jul 31 23:10 DEST_COUNTRY_NAME=Senegal drwxr-xr-x 2 root root 4096 Jul 31 23:10 DEST_COUNTRY_NAME=United States drwxr-xr-x 2 root root 4096 Jul 31 23:10 _delta_log
- 파티셔닝은 어떤 데이터를 어디에 저장할 것인지 제어할 수 있는 기능이다.
- 파티셔닝된 디렉터리 또는 테이블에 파일을 쓸 때 디렉토리별로 컬럼 데이터를 인코딩하여 저장한다.
- 따라서 데이터를 읽을 때 전체 데이터셋을 스캔하지 않고 필요한 컬럼의 데이터만 읽을 수 있다.
- 위의 데이터에서 각 폴더는 조건절을 폴더명으로 사용하며 조건적을 만족하는 데이터가 저장된 parquet 파일을 가지고 있다.
%sh ls -ltr /tmp/partitioned-files.parquet/DEST_COUNTRY_NAME=Senegal
total 4 -rw-r--r-- 1 root root 951 Jul 31 23:10 part-00000-086ea84d-b0e1-43e3-a024-4835aa46d2cb.c000.snappy.parquet
- 파티셔닝은 필터링을 자주 사용하는 테이블을 가진 경우에 사용할 수 있는 가장 손쉬운 최적화 방식이다.
- 예를 들어, 전체 데이터를 스캔하지 않고 지난주 데이터만 보려면 날짜를 기준으로 파티션을 만들 수 있다.
- 버켓팅은 각 파일에 저장된 데이터를 제어할 수 있는 또 다른 파일 조직화 기법이다.
- 동일한 버킷 ID를 가진 데이터가 하나의 물리적 파티션에 모두 모여있기 때문에 데이터를 읽을 때 셔플을 피할 수 있다.
- 즉, 데이터가 이후의 사용 방식에 맞춰 사전에 파티셔닝 되므로 조인이나 집계 시 발생하는 고비용의 셔플을 피할 수 있다.
val numberBuckets = 10 val columnToBucketBy = "count" csv_df.write .format("parquet") .mode("overwrite") .bucketBy(numberBuckets, columnToBucketBy) .saveAsTable("bucketedFiles")
path,name,size,modificationTime dbfs:/user/hive/warehouse/bucketedfiles/_SUCCESS,_SUCCESS,0,1690846049000 dbfs:/user/hive/warehouse/bucketedfiles/_committed_2481057225640818158,_committed_2481057225640818158,1085,1690846049000 dbfs:/user/hive/warehouse/bucketedfiles/_started_2481057225640818158,_started_2481057225640818158,0,1690846047000 dbfs:/user/hive/warehouse/bucketedfiles/part-00000-tid-2481057225640818158-033c357b-fb1b-4984-93e2-5096b26ca14b-321-10_00009.c000.snappy.parquet,part-00000-tid-2481057225640818158-033c357b-fb1b-4984-93e2-5096b26ca14b-321-10_00009.c000.snappy.parquet,2002,1690846048000 dbfs:/user/hive/warehouse/bucketedfiles/part-00000-tid-2481057225640818158-033c357b-fb1b-4984-93e2-5096b26ca14b-321-1_00000.c000.snappy.parquet,part-00000-tid-2481057225640818158-033c357b-fb1b-4984-93e2-5096b26ca14b-321-1_00000.c000.snappy.parquet,1817,1690846047000 dbfs:/user/hive/warehouse/bucketedfiles/part-00000-tid-2481057225640818158-033c357b-fb1b-4984-93e2-5096b26ca14b-321-2_00001.c000.snappy.parquet,part-00000-tid-2481057225640818158-033c357b-fb1b-4984-93e2-5096b26ca14b-321-2_00001.c000.snappy.parquet,1651,1690846047000 dbfs:/user/hive/warehouse/bucketedfiles/part-00000-tid-2481057225640818158-033c357b-fb1b-4984-93e2-5096b26ca14b-321-3_00002.c000.snappy.parquet,part-00000-tid-2481057225640818158-033c357b-fb1b-4984-93e2-5096b26ca14b-321-3_00002.c000.snappy.parquet,1747,1690846047000 dbfs:/user/hive/warehouse/bucketedfiles/part-00000-tid-2481057225640818158-033c357b-fb1b-4984-93e2-5096b26ca14b-321-4_00003.c000.snappy.parquet,part-00000-tid-2481057225640818158-033c357b-fb1b-4984-93e2-5096b26ca14b-321-4_00003.c000.snappy.parquet,1675,1690846048000 dbfs:/user/hive/warehouse/bucketedfiles/part-00000-tid-2481057225640818158-033c357b-fb1b-4984-93e2-5096b26ca14b-321-5_00004.c000.snappy.parquet,part-00000-tid-2481057225640818158-033c357b-fb1b-4984-93e2-5096b26ca14b-321-5_00004.c000.snappy.parquet,1830,1690846048000 dbfs:/user/hive/warehouse/bucketedfiles/part-00000-tid-2481057225640818158-033c357b-fb1b-4984-93e2-5096b26ca14b-321-6_00005.c000.snappy.parquet,part-00000-tid-2481057225640818158-033c357b-fb1b-4984-93e2-5096b26ca14b-321-6_00005.c000.snappy.parquet,1882,1690846048000 dbfs:/user/hive/warehouse/bucketedfiles/part-00000-tid-2481057225640818158-033c357b-fb1b-4984-93e2-5096b26ca14b-321-7_00006.c000.snappy.parquet,part-00000-tid-2481057225640818158-033c357b-fb1b-4984-93e2-5096b26ca14b-321-7_00006.c000.snappy.parquet,1672,1690846048000 dbfs:/user/hive/warehouse/bucketedfiles/part-00000-tid-2481057225640818158-033c357b-fb1b-4984-93e2-5096b26ca14b-321-8_00007.c000.snappy.parquet,part-00000-tid-2481057225640818158-033c357b-fb1b-4984-93e2-5096b26ca14b-321-8_00007.c000.snappy.parquet,1533,1690846048000 dbfs:/user/hive/warehouse/bucketedfiles/part-00000-tid-2481057225640818158-033c357b-fb1b-4984-93e2-5096b26ca14b-321-9_00008.c000.snappy.parquet,part-00000-tid-2481057225640818158-033c357b-fb1b-4984-93e2-5096b26ca14b-321-9_00008.c000.snappy.parquet,1758,1690846048000
- 스파크는 다양한 복합 데이터 타입들을 제공하고 있다.
- 하지만 모든 데이터 파일 포맷에 적용하는 것은 어렵다.
- CSV 파일은 복합 데이터 타입을 지원하진 않지만 parquet나 orc는 복합 데이터 타입을 지원한다.
- 파일 크기를 관리하는 것은 데이터를 저장할 때보다는 읽을 때 중요하다.
- 작은 파일을 많이 생성하면 메타데이터에 엄청난 관리 부하가 발생하게 된다. (이러한 문제를 "small file problem" 이라고 한다.)
- 반대로 엄청나게 큰 파일을 하나 생성하면 몇 개의 데이터만 읽더라도 전체 데이터를 읽어야 하기 때문에 비효율적이다.
- 스파크 2.2 버전 이후로는 자동으로 파일 크기를 제어할 수 잇는 새로운 방법이 도입되었다.
- 이전의 예제에서는 파일의 수를 파티션의 수로 고정 할당하여 제어를 하였는데 아래와 같이 maxRecordsPerFile을 사용하면 파일을 최적의 크기로 제한할 수 있다.
df.write.option("maxRecordsPerFile", 5000)
- 위의 예시에서는 파일당 최대 5,000개의 로우를 포함할 수 있도록 보장한다.
- 스파크에서 데이터를 읽고 쓸 때 사용할 수 있는 다양한 옵션에 대하여 소개하였다.
- 사용자 정의 데이터 소스 구현하는 방법이 존재하는데 이는 관련 API가 구조적 스트리밍과의 호환성을 위해 개선되고 있으므로 생략하였다.
- 이에 관한 자세한 내용은 카산드라 커넥터를 참조한다. (https://github.com/datastax/spark-cassandra-connector)