From 61d7e37840b1d0dbe75bce0a85c0fb573e7f9f44 Mon Sep 17 00:00:00 2001
From: Hongze Zhang
Date: Wed, 21 Apr 2021 20:59:49 +0800
Subject: [PATCH] [NSE-261] ArrowDataSource: Add S3 Support

---
 arrow-data-source/pom.xml                       | 58 ++++++++++++++++++-
 .../datasources/v2/arrow/ArrowUtils.scala       |  5 ++
 .../arrow/ArrowDataSourceTest.scala             | 15 +++--
 native-sql-engine/core/pom.xml                  |  2 +-
 4 files changed, 72 insertions(+), 8 deletions(-)

diff --git a/arrow-data-source/pom.xml b/arrow-data-source/pom.xml
index 3dc3c7747..fa6553045 100644
--- a/arrow-data-source/pom.xml
+++ b/arrow-data-source/pom.xml
@@ -48,6 +48,50 @@
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-aws</artifactId>
+      <version>2.7.3</version>
+      <exclusions>
+        <exclusion>
+          <groupId>com.fasterxml.jackson.core</groupId>
+          <artifactId>jackson-core</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.fasterxml.jackson.core</groupId>
+          <artifactId>jackson-annotations</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.fasterxml.jackson.core</groupId>
+          <artifactId>jackson-databind</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>javax.servlet</groupId>
+          <artifactId>servlet-api</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.sun.jersey</groupId>
+          <artifactId>jersey-core</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.sun.jersey</groupId>
+          <artifactId>jersey-json</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.sun.jersey</groupId>
+          <artifactId>jersey-server</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>commons-httpclient</groupId>
+          <artifactId>commons-httpcore</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.httpcomponents</groupId>
+      <artifactId>httpcore</artifactId>
+      <version>4.2</version>
+    </dependency>
     <dependency>
       <groupId>org.scala-lang</groupId>
       <artifactId>scala-library</artifactId>
@@ -61,7 +105,7 @@
     <dependency>
       <groupId>org.apache.arrow</groupId>
-      <artifactId>arrow-format</artifactId>
+      <artifactId>arrow-vector</artifactId>
     </dependency>
     <dependency>
       <groupId>org.slf4j</groupId>
@@ -91,6 +135,12 @@
     <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-catalyst_2.12</artifactId>
       <version>${spark.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.arrow</groupId>
+          <artifactId>arrow-vector</artifactId>
+        </exclusion>
+      </exclusions>
       <type>test-jar</type>
       <scope>test</scope>
@@ -98,6 +148,12 @@
     <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-sql_2.12</artifactId>
       <version>${spark.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.arrow</groupId>
+          <artifactId>arrow-vector</artifactId>
+        </exclusion>
+      </exclusions>
       <type>test-jar</type>
       <scope>test</scope>
diff --git a/arrow-data-source/standard/src/main/scala/com/intel/oap/spark/sql/execution/datasources/v2/arrow/ArrowUtils.scala b/arrow-data-source/standard/src/main/scala/com/intel/oap/spark/sql/execution/datasources/v2/arrow/ArrowUtils.scala
index 4ef604114..4af788d0e 100644
--- a/arrow-data-source/standard/src/main/scala/com/intel/oap/spark/sql/execution/datasources/v2/arrow/ArrowUtils.scala
+++ b/arrow-data-source/standard/src/main/scala/com/intel/oap/spark/sql/execution/datasources/v2/arrow/ArrowUtils.scala
@@ -156,6 +156,11 @@ object ArrowUtils {
 
   private def rewriteUri(uriStr: String): String = {
     val uri = URI.create(uriStr)
+    if (uri.getScheme == "s3" || uri.getScheme == "s3a") {
+      val s3Rewritten = new URI("s3", uri.getAuthority,
+        uri.getPath, uri.getQuery, uri.getFragment).toString
+      return s3Rewritten
+    }
     val sch = uri.getScheme match {
       case "hdfs" => "hdfs"
       case "file" => "file"
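
The rewriteUri change above normalizes both s3:// and s3a:// URIs to the plain "s3" scheme before the location is handed to the native Arrow filesystem layer, leaving authority, path, query and fragment untouched. A minimal standalone sketch of that behaviour; the object and method names are illustrative and not part of the patch:

    import java.net.URI

    object S3UriRewriteSketch {
      // Mirrors the new branch in ArrowUtils.rewriteUri: any s3/s3a URI is rebuilt
      // with the "s3" scheme while the rest of the URI is preserved.
      def rewriteS3(uriStr: String): String = {
        val uri = URI.create(uriStr)
        if (uri.getScheme == "s3" || uri.getScheme == "s3a") {
          new URI("s3", uri.getAuthority, uri.getPath, uri.getQuery, uri.getFragment).toString
        } else {
          uriStr // non-S3 URIs keep the existing hdfs/file handling (not reproduced here)
        }
      }

      def main(args: Array[String]): Unit = {
        // Both spellings of the scheme print "s3://bucket/dataset".
        println(rewriteS3("s3a://bucket/dataset"))
        println(rewriteS3("s3://bucket/dataset"))
      }
    }
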
diff --git a/arrow-data-source/standard/src/test/scala/com/intel/oap/spark/sql/execution/datasources/arrow/ArrowDataSourceTest.scala b/arrow-data-source/standard/src/test/scala/com/intel/oap/spark/sql/execution/datasources/arrow/ArrowDataSourceTest.scala
index f88e085fa..161d285c7 100644
--- a/arrow-data-source/standard/src/test/scala/com/intel/oap/spark/sql/execution/datasources/arrow/ArrowDataSourceTest.scala
+++ b/arrow-data-source/standard/src/test/scala/com/intel/oap/spark/sql/execution/datasources/arrow/ArrowDataSourceTest.scala
@@ -106,10 +106,18 @@ class ArrowDataSourceTest extends QueryTest with SharedSparkSession {
     verifyParquet(
       spark.read
         .option(ArrowOptions.KEY_ORIGINAL_FORMAT, "parquet")
-        .option(ArrowOptions.KEY_FILESYSTEM, "hdfs")
         .arrow(path))
   }
 
+  test("simple sql query on s3") {
+    val path = "s3a://mlp-spark-dataset-bucket/test_arrowds_s3_small"
+    val frame = spark.read
+      .option(ArrowOptions.KEY_ORIGINAL_FORMAT, "parquet")
+      .arrow(path)
+    frame.createOrReplaceTempView("stab")
+    assert(spark.sql("select id from stab").count() === 1000)
+  }
+
   test("create catalog table") {
     val path = ArrowDataSourceTest.locateResourcePath(parquetFile1)
     spark.catalog.createTable("ptab", path, "arrow")
@@ -130,7 +138,6 @@ class ArrowDataSourceTest extends QueryTest with SharedSparkSession {
     val path = ArrowDataSourceTest.locateResourcePath(parquetFile1)
     val frame = spark.read
       .option(ArrowOptions.KEY_ORIGINAL_FORMAT, "parquet")
-      .option(ArrowOptions.KEY_FILESYSTEM, "hdfs")
       .arrow(path)
     frame.createOrReplaceTempView("ptab")
     verifyParquet(spark.sql("select * from ptab"))
@@ -142,7 +149,6 @@ class ArrowDataSourceTest extends QueryTest with SharedSparkSession {
     val path = ArrowDataSourceTest.locateResourcePath(parquetFile3)
     val frame = spark.read
       .option(ArrowOptions.KEY_ORIGINAL_FORMAT, "parquet")
-      .option(ArrowOptions.KEY_FILESYSTEM, "hdfs")
       .arrow(path)
     frame.createOrReplaceTempView("ptab")
     val sqlFrame = spark.sql("select * from ptab")
@@ -163,7 +169,6 @@ class ArrowDataSourceTest extends QueryTest with SharedSparkSession {
     val path = ArrowDataSourceTest.locateResourcePath(parquetFile1)
     val frame = spark.read
       .option(ArrowOptions.KEY_ORIGINAL_FORMAT, "parquet")
-      .option(ArrowOptions.KEY_FILESYSTEM, "hdfs")
       .arrow(path)
     frame.createOrReplaceTempView("ptab")
     spark.sql("select col from ptab where col = 1").explain(true)
@@ -178,7 +183,6 @@ class ArrowDataSourceTest extends QueryTest with SharedSparkSession {
     val path = ArrowDataSourceTest.locateResourcePath(parquetFile2)
     val frame = spark.read
       .option(ArrowOptions.KEY_ORIGINAL_FORMAT, "parquet")
-      .option(ArrowOptions.KEY_FILESYSTEM, "hdfs")
       .arrow(path)
     frame.createOrReplaceTempView("ptab")
     val rows = spark.sql("select * from ptab where col = 'b'").collect()
@@ -215,7 +219,6 @@ class ArrowDataSourceTest extends QueryTest with SharedSparkSession {
     val path = ArrowDataSourceTest.locateResourcePath(parquetFile1)
     val frame = spark.read
       .option(ArrowOptions.KEY_ORIGINAL_FORMAT, "parquet")
-      .option(ArrowOptions.KEY_FILESYSTEM, "hdfs")
       .arrow(path)
     frame.createOrReplaceTempView("ptab")
 
diff --git a/native-sql-engine/core/pom.xml b/native-sql-engine/core/pom.xml
index cfd87baa0..fba2ef075 100644
--- a/native-sql-engine/core/pom.xml
+++ b/native-sql-engine/core/pom.xml
@@ -33,7 +33,7 @@
     3.0.0
     3.0.0
     2.12
-    2.12.8
+    2.12.10
     none
     package
     provided
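
With hadoop-aws on the classpath, the new S3 code path can be exercised from a regular Spark session. A hedged sketch under the following assumptions: the "arrow" format name is taken from the createTable("ptab", path, "arrow") test above, the standard S3A configuration keys from hadoop-aws are used, the literal option key "originalFormat" stands in for ArrowOptions.KEY_ORIGINAL_FORMAT, and the bucket path is a placeholder:

    import org.apache.spark.sql.SparkSession

    object ArrowS3ReadSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("arrow-datasource-s3-sketch")
          // Standard hadoop-aws (S3A) settings; credentials are read from the environment here.
          .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
          .config("spark.hadoop.fs.s3a.access.key", sys.env.getOrElse("AWS_ACCESS_KEY_ID", ""))
          .config("spark.hadoop.fs.s3a.secret.key", sys.env.getOrElse("AWS_SECRET_ACCESS_KEY", ""))
          .getOrCreate()

        val df = spark.read
          .format("arrow")                       // short name registered by the Arrow data source
          .option("originalFormat", "parquet")   // assumed literal for ArrowOptions.KEY_ORIGINAL_FORMAT
          .load("s3a://example-bucket/example-dataset")  // placeholder location, not from the patch

        df.createOrReplaceTempView("stab")
        spark.sql("select * from stab limit 10").show()

        spark.stop()
      }
    }
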