Skip to content

Commit

Permalink
[SPARK-35105][SQL] Support multiple paths for ADD FILE/JAR/ARCHIVE co…
Browse files Browse the repository at this point in the history
…mmands

### What changes were proposed in this pull request?

This PR extends `ADD FILE/JAR/ARCHIVE` commands to be able to take multiple path arguments like Hive.

### Why are the changes needed?

To make those commands more useful.

### Does this PR introduce _any_ user-facing change?

Yes. In the current implementation, those commands can take a path which contains whitespaces without enclose it by neither `'` nor `"` but after this change, users need to enclose such paths.
I've note this incompatibility in the migration guide.

### How was this patch tested?

New tests.

Closes #32205 from sarutak/add-multiple-files.

Authored-by: Kousuke Saruta <[email protected]>
Signed-off-by: Kousuke Saruta <[email protected]>
  • Loading branch information
sarutak committed Apr 29, 2021
1 parent 529b875 commit 132cbf0
Show file tree
Hide file tree
Showing 10 changed files with 135 additions and 30 deletions.
2 changes: 2 additions & 0 deletions docs/sql-migration-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ license: |

## Upgrading from Spark SQL 3.1 to 3.2

- Since Spark 3.2, ADD FILE/JAR/ARCHIVE commands require each path to be enclosed by `"` or `'` if the path contains whitespaces.

- Since Spark 3.2, all the supported JDBC dialects use StringType for ROWID. In Spark 3.1 or earlier, Oracle dialect uses StringType and the other dialects use LongType.

- In Spark 3.2, PostgreSQL JDBC dialect uses StringType for MONEY and MONEY[] is not supported due to the JDBC driver for PostgreSQL can't handle those types properly. In Spark 3.1 or earlier, DoubleType and ArrayType of DoubleType are used respectively.
Expand Down
3 changes: 2 additions & 1 deletion docs/sql-ref-syntax-aux-resource-mgmt-add-archive.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ license: |
### Syntax

```sql
ADD ARCHIVE file_name
ADD { ARCHIVE | ARCHIVES } file_name [ ... ]
```

### Parameters
Expand All @@ -42,6 +42,7 @@ ADD ARCHIVE /tmp/test.tar.gz;
ADD ARCHIVE "/path/to/some.zip";
ADD ARCHIVE '/some/other.tgz';
ADD ARCHIVE "/path with space/abc.tar";
ADD ARCHIVES "/path with space/def.tgz" '/path with space/ghi.zip';
```

### Related Statements
Expand Down
3 changes: 2 additions & 1 deletion docs/sql-ref-syntax-aux-resource-mgmt-add-file.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ license: |
### Syntax

```sql
ADD FILE resource_name
ADD { FILE | FILES } resource_name [ ... ]
```

### Parameters
Expand All @@ -43,6 +43,7 @@ ADD FILE "/path/to/file/abc.txt";
ADD FILE '/another/test.txt';
ADD FILE "/path with space/abc.txt";
ADD FILE "/path/to/some/directory";
ADD FILES "/path with space/cde.txt" '/path with space/fgh.txt';
```

### Related Statements
Expand Down
3 changes: 2 additions & 1 deletion docs/sql-ref-syntax-aux-resource-mgmt-add-jar.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ license: |
### Syntax

```sql
ADD JAR file_name
ADD { JAR | JARS } file_name [ ... ]
```

### Parameters
Expand All @@ -52,6 +52,7 @@ ADD JAR /tmp/test.jar;
ADD JAR "/path/to/some.jar";
ADD JAR '/some/other.jar';
ADD JAR "/path with space/abc.jar";
ADD JARS "/path with space/def.jar" '/path with space/ghi.jar';
ADD JAR "ivy://group:module:version";
ADD JAR "ivy://group:module:version?transitive=false"
ADD JAR "ivy://group:module:version?transitive=true"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ class SparkSqlAstBuilder extends AstBuilder {
}

/**
* Create a [[AddFileCommand]], [[AddJarCommand]], [[AddArchiveCommand]],
* Create a [[AddFilesCommand]], [[AddJarsCommand]], [[AddArchivesCommand]],
* [[ListFilesCommand]], [[ListJarsCommand]] or [[ListArchivesCommand]]
* command depending on the requested operation on resources.
* Expected format:
Expand All @@ -356,15 +356,12 @@ class SparkSqlAstBuilder extends AstBuilder {
case p => p
}

// The implementation of pathForAdd is to keep the compatibility with before SPARK-34977.
val pathForAdd = strLiteralDef.findFirstIn(rawArg)
.find(p => p.startsWith("\"") || p.startsWith("'")).map(unescapeSQLString).getOrElse(rawArg)
ctx.op.getType match {
case SqlBaseParser.ADD =>
ctx.identifier.getText.toLowerCase(Locale.ROOT) match {
case "file" => AddFileCommand(pathForAdd)
case "jar" => AddJarCommand(pathForAdd)
case "archive" => AddArchiveCommand(pathForAdd)
case "files" | "file" => AddFilesCommand(maybePaths)
case "jars" | "jar" => AddJarsCommand(maybePaths)
case "archives" | "archive" => AddArchivesCommand(maybePaths)
case other => operationNotAllowed(s"ADD with resource type '$other'", ctx)
}
case SqlBaseParser.LIST =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,30 +27,30 @@ import org.apache.spark.util.Utils
/**
* Adds a jar to the current session so it can be used (for UDFs or serdes).
*/
case class AddJarCommand(path: String) extends LeafRunnableCommand {
case class AddJarsCommand(paths: Seq[String]) extends LeafRunnableCommand {
override def run(sparkSession: SparkSession): Seq[Row] = {
sparkSession.sessionState.resourceLoader.addJar(path)
paths.foreach(sparkSession.sessionState.resourceLoader.addJar(_))
Seq.empty[Row]
}
}

/**
* Adds a file to the current session so it can be used.
*/
case class AddFileCommand(path: String) extends LeafRunnableCommand {
case class AddFilesCommand(paths: Seq[String]) extends LeafRunnableCommand {
override def run(sparkSession: SparkSession): Seq[Row] = {
val recursive = !sparkSession.sessionState.conf.addSingleFileInAddFile
sparkSession.sparkContext.addFile(path, recursive)
paths.foreach(sparkSession.sparkContext.addFile(_, recursive))
Seq.empty[Row]
}
}

/**
* Adds an archive to the current session so it can be used.
*/
case class AddArchiveCommand(path: String) extends LeafRunnableCommand {
case class AddArchivesCommand(paths: Seq[String]) extends LeafRunnableCommand {
override def run(sparkSession: SparkSession): Seq[Row] = {
sparkSession.sparkContext.addArchive(path)
paths.foreach(sparkSession.sparkContext.addArchive(_))
Seq.empty[Row]
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,21 +245,24 @@ class SparkSqlParserSuite extends AnalysisTest {
}

test("manage resources") {
assertEqual("ADD FILE abc.txt", AddFileCommand("abc.txt"))
assertEqual("ADD FILE 'abc.txt'", AddFileCommand("abc.txt"))
assertEqual("ADD FILE \"/path/to/abc.txt\"", AddFileCommand("/path/to/abc.txt"))
assertEqual("ADD FILE abc.txt", AddFilesCommand(Seq("abc.txt")))
assertEqual("ADD FILE 'abc.txt'", AddFilesCommand(Seq("abc.txt")))
assertEqual("ADD FILE \"/path/to/abc.txt\"", AddFilesCommand("/path/to/abc.txt"::Nil))
assertEqual("LIST FILE abc.txt", ListFilesCommand(Array("abc.txt")))
assertEqual("LIST FILE '/path//abc.txt'", ListFilesCommand(Array("/path//abc.txt")))
assertEqual("LIST FILE \"/path2/abc.txt\"", ListFilesCommand(Array("/path2/abc.txt")))
assertEqual("ADD JAR /path2/_2/abc.jar", AddJarCommand("/path2/_2/abc.jar"))
assertEqual("ADD JAR '/test/path_2/jar/abc.jar'", AddJarCommand("/test/path_2/jar/abc.jar"))
assertEqual("ADD JAR \"abc.jar\"", AddJarCommand("abc.jar"))
assertEqual("ADD JAR /path2/_2/abc.jar", AddJarsCommand(Seq("/path2/_2/abc.jar")))
assertEqual("ADD JAR '/test/path_2/jar/abc.jar'",
AddJarsCommand(Seq("/test/path_2/jar/abc.jar")))
assertEqual("ADD JAR \"abc.jar\"", AddJarsCommand(Seq("abc.jar")))
assertEqual("LIST JAR /path-with-dash/abc.jar",
ListJarsCommand(Array("/path-with-dash/abc.jar")))
assertEqual("LIST JAR 'abc.jar'", ListJarsCommand(Array("abc.jar")))
assertEqual("LIST JAR \"abc.jar\"", ListJarsCommand(Array("abc.jar")))
assertEqual("ADD FILE /path with space/abc.txt", AddFileCommand("/path with space/abc.txt"))
assertEqual("ADD JAR /path with space/abc.jar", AddJarCommand("/path with space/abc.jar"))
assertEqual("ADD FILE '/path with space/abc.txt'",
AddFilesCommand(Seq("/path with space/abc.txt")))
assertEqual("ADD JAR '/path with space/abc.jar'",
AddJarsCommand(Seq("/path with space/abc.jar")))
}

test("SPARK-32608: script transform with row format delimit") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,10 +169,10 @@ private[hive] object SparkSQLCLIDriver extends Logging {
}

// The class loader of CliSessionState's conf is current main thread's class loader
// used to load jars passed by --jars. One class loader used by AddJarCommand is
// used to load jars passed by --jars. One class loader used by AddJarsCommand is
// sharedState.jarClassLoader which contain jar path passed by --jars in main thread.
// We set CliSessionState's conf class loader to sharedState.jarClassLoader.
// Thus we can load all jars passed by --jars and AddJarCommand.
// Thus we can load all jars passed by --jars and AddJarsCommand.
sessionState.getConf.setClassLoader(SparkSQLEnv.sqlContext.sharedState.jarClassLoader)

// TODO work around for set the log output to console, because the HiveContext
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,8 +186,8 @@ private[hive] class HiveClientImpl(
Hive.set(clientLoader.cachedHive.asInstanceOf[Hive])
}
// Hive 2.3 will set UDFClassLoader to hiveConf when initializing SessionState
// since HIVE-11878, and ADDJarCommand will add jars to clientLoader.classLoader.
// For this reason we cannot load the jars added by ADDJarCommand because of class loader
// since HIVE-11878, and ADDJarsCommand will add jars to clientLoader.classLoader.
// For this reason we cannot load the jars added by ADDJarsCommand because of class loader
// got changed. We reset it to clientLoader.ClassLoader here.
state.getConf.setClassLoader(clientLoader.classLoader)
SessionState.start(state)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -836,7 +836,7 @@ class HiveQuerySuite extends HiveComparisonTest with SQLTestUtils with BeforeAnd
Files.write(file.toPath, "test_file1".getBytes)
val jarFile = new File(dir, "test file.jar")
TestUtils.createJar(Seq(file), jarFile)
sql(s"ADD JAR ${jarFile.getAbsolutePath}")
sql(s"ADD JAR '${jarFile.getAbsolutePath}'")
assert(sql("LIST JARS").
filter(_.getString(0).contains(s"${jarFile.getName}".replace(" ", "%20"))).count() > 0)
}
Expand Down Expand Up @@ -964,6 +964,104 @@ class HiveQuerySuite extends HiveComparisonTest with SQLTestUtils with BeforeAnd
}
}

test("SPARK-35105: ADD FILES command with multiple files") {
withTempDir { dir =>
val file1 = File.createTempFile("someprefix1", "somesuffix1", dir)
val file2 = File.createTempFile("someprefix2", "somesuffix 2", dir)
val file3 = File.createTempFile("someprefix3", "somesuffix 3", dir)
val file4 = File.createTempFile("someprefix4", "somesuffix4", dir)

Files.write(file1.toPath, "file1".getBytes)
Files.write(file2.toPath, "file2".getBytes)
Files.write(file3.toPath, "file3".getBytes)
Files.write(file4.toPath, "file3".getBytes)

sql(s"ADD FILE ${file1.getAbsolutePath} '${file2.getAbsoluteFile}'")
sql(s"""ADD FILES "${file3.getAbsolutePath}" ${file4.getAbsoluteFile}""")
val listFiles = sql(s"LIST FILES ${file1.getAbsolutePath} " +
s"'${file2.getAbsolutePath}' '${file3.getAbsolutePath}' ${file4.getAbsolutePath}")
assert(listFiles.count === 4)
assert(listFiles.filter(_.getString(0).contains(file1.getName)).count() === 1)
assert(listFiles.filter(
_.getString(0).contains(file2.getName.replace(" ", "%20"))).count() === 1)
assert(listFiles.filter(
_.getString(0).contains(file3.getName.replace(" ", "%20"))).count() === 1)
assert(listFiles.filter(_.getString(0).contains(file4.getName)).count() === 1)
}
}

test("SPARK-35105: ADD JARS command with multiple files") {
withTempDir { dir =>
val file1 = new File(dir, "test1.txt")
val file2 = new File(dir, "test2.txt")
val file3 = new File(dir, "test3.txt")
val file4 = new File(dir, "test4.txt")

Files.write(file1.toPath, "file1".getBytes)
Files.write(file2.toPath, "file2".getBytes)
Files.write(file3.toPath, "file3".getBytes)
Files.write(file4.toPath, "file4".getBytes)

val jarFile1 = File.createTempFile("someprefix1", "somesuffix 1", dir)
val jarFile2 = File.createTempFile("someprefix2", "somesuffix2", dir)
val jarFile3 = File.createTempFile("someprefix3", "somesuffix3", dir)
val jarFile4 = File.createTempFile("someprefix4", "somesuffix 4", dir)

TestUtils.createJar(Seq(file1), jarFile1)
TestUtils.createJar(Seq(file2), jarFile2)
TestUtils.createJar(Seq(file3), jarFile3)
TestUtils.createJar(Seq(file4), jarFile4)

sql(s"""ADD JAR "${jarFile1.getAbsolutePath}" ${jarFile2.getAbsoluteFile}""")
sql(s"ADD JARS ${jarFile3.getAbsolutePath} '${jarFile4.getAbsoluteFile}'")
val listFiles = sql(s"LIST JARS '${jarFile1.getAbsolutePath}' " +
s"${jarFile2.getAbsolutePath} ${jarFile3.getAbsolutePath} '${jarFile4.getAbsoluteFile}'")
assert(listFiles.count === 4)
assert(listFiles.filter(
_.getString(0).contains(jarFile1.getName.replace(" ", "%20"))).count() === 1)
assert(listFiles.filter(_.getString(0).contains(jarFile2.getName)).count() === 1)
assert(listFiles.filter(_.getString(0).contains(jarFile3.getName)).count() === 1)
assert(listFiles.filter(
_.getString(0).contains(jarFile4.getName.replace(" ", "%20"))).count() === 1)
}
}

test("SPARK-35105: ADD ARCHIVES command with multiple files") {
withTempDir { dir =>
val file1 = new File(dir, "test1.txt")
val file2 = new File(dir, "test2.txt")
val file3 = new File(dir, "test3.txt")
val file4 = new File(dir, "test4.txt")

Files.write(file1.toPath, "file1".getBytes)
Files.write(file2.toPath, "file2".getBytes)
Files.write(file3.toPath, "file3".getBytes)
Files.write(file4.toPath, "file4".getBytes)

val jarFile1 = File.createTempFile("someprefix1", "somesuffix1", dir)
val jarFile2 = File.createTempFile("someprefix2", "somesuffix 2", dir)
val jarFile3 = File.createTempFile("someprefix3", "somesuffix3", dir)
val jarFile4 = File.createTempFile("someprefix4", "somesuffix 4", dir)

TestUtils.createJar(Seq(file1), jarFile1)
TestUtils.createJar(Seq(file2), jarFile2)
TestUtils.createJar(Seq(file3), jarFile3)
TestUtils.createJar(Seq(file4), jarFile4)

sql(s"""ADD ARCHIVE ${jarFile1.getAbsolutePath} "${jarFile2.getAbsoluteFile}"""")
sql(s"ADD ARCHIVES ${jarFile3.getAbsolutePath} '${jarFile4.getAbsoluteFile}'")
val listFiles = sql(s"LIST ARCHIVES ${jarFile1.getAbsolutePath} " +
s"'${jarFile2.getAbsolutePath}' ${jarFile3.getAbsolutePath} '${jarFile4.getAbsolutePath}'")
assert(listFiles.count === 4)
assert(listFiles.filter(_.getString(0).contains(jarFile1.getName)).count() === 1)
assert(listFiles.filter(
_.getString(0).contains(jarFile2.getName.replace(" ", "%20"))).count() === 1)
assert(listFiles.filter(_.getString(0).contains(jarFile3.getName)).count() === 1)
assert(listFiles.filter(
_.getString(0).contains(jarFile4.getName.replace(" ", "%20"))).count() === 1)
}
}

test("SPARK-34977: LIST FILES/JARS/ARCHIVES should handle multiple quoted path arguments") {
withTempDir { dir =>
val file1 = File.createTempFile("someprefix1", "somesuffix1", dir)
Expand All @@ -979,6 +1077,7 @@ class HiveQuerySuite extends HiveComparisonTest with SQLTestUtils with BeforeAnd
sql(s"ADD FILE '${file3.getAbsolutePath}'")
val listFiles = sql("LIST FILES " +
s"""'${file1.getAbsolutePath}' ${file2.getAbsolutePath} "${file3.getAbsolutePath}"""")

assert(listFiles.count === 3)
assert(listFiles.filter(_.getString(0).contains(file1.getName)).count() === 1)
assert(listFiles.filter(_.getString(0).contains(file2.getName)).count() === 1)
Expand All @@ -1004,6 +1103,7 @@ class HiveQuerySuite extends HiveComparisonTest with SQLTestUtils with BeforeAnd
sql(s"ADD ARCHIVE '${jarFile3.getAbsolutePath}'")
val listArchives = sql(s"LIST ARCHIVES '${jarFile1.getAbsolutePath}' " +
s"""${jarFile2.getAbsolutePath} "${jarFile3.getAbsolutePath}"""")

assert(listArchives.count === 3)
assert(listArchives.filter(_.getString(0).contains(jarFile1.getName)).count() === 1)
assert(listArchives.filter(_.getString(0).contains(jarFile2.getName)).count() === 1)
Expand All @@ -1026,7 +1126,7 @@ class HiveQuerySuite extends HiveComparisonTest with SQLTestUtils with BeforeAnd

sql(s"ADD JAR ${jarFile4.getAbsolutePath}")
sql(s"ADD JAR ${jarFile5.getAbsolutePath}")
sql(s"ADD JAR ${jarFile6.getAbsolutePath}")
sql(s"ADD JAR '${jarFile6.getAbsolutePath}'")
val listJars = sql(s"LIST JARS '${jarFile4.getAbsolutePath}' " +
s"""${jarFile5.getAbsolutePath} "${jarFile6.getAbsolutePath}"""")
assert(listJars.count === 3)
Expand Down

0 comments on commit 132cbf0

Please sign in to comment.