Skip to content

Commit

Permalink
[SPARK-21996][SQL] read files with space in name for streaming
Browse files Browse the repository at this point in the history
## What changes were proposed in this pull request?

Structured Streaming is now able to read files with spaces in their names (previously it would skip such files and output a warning).

## How was this patch tested?

Added new unit test.

Author: Xiayun Sun <[email protected]>

Closes #19247 from xysun/SPARK-21996.

(cherry picked from commit 0219470)
Signed-off-by: Shixiong Zhu <[email protected]>
  • Loading branch information
xysun authored and zsxwing committed Jan 18, 2018
1 parent 050c1e2 commit f2688ef
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ class FileStreamSource(
val newDataSource =
DataSource(
sparkSession,
paths = files.map(_.path),
paths = files.map(f => new Path(new URI(f.path)).toString),
userSpecifiedSchema = Some(schema),
partitionColumns = partitionColumns,
className = fileFormatClassName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,11 @@ abstract class FileStreamSourceTest
protected def addData(source: FileStreamSource): Unit
}

case class AddTextFileData(content: String, src: File, tmp: File)
case class AddTextFileData(content: String, src: File, tmp: File, tmpFilePrefix: String = "text")
extends AddFileData {

override def addData(source: FileStreamSource): Unit = {
val tempFile = Utils.tempFileWith(new File(tmp, "text"))
val tempFile = Utils.tempFileWith(new File(tmp, tmpFilePrefix))
val finalFile = new File(src, tempFile.getName)
src.mkdirs()
require(stringToFile(tempFile, content).renameTo(finalFile))
Expand Down Expand Up @@ -408,6 +408,52 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
}
}

test("SPARK-21996 read from text files -- file name has space") {
  withTempDirs { case (src, tmp) =>
    // Regression test for SPARK-21996: a temp-file prefix containing a space
    // produces a source file whose name has a space; the stream must still
    // pick it up instead of skipping it with a warning.
    val stream = createFileStream("text", src.getCanonicalPath)
    val keptOnly = stream.filter($"value".contains("keep"))

    testStream(keptOnly)(
      // "text text" becomes the prefix of the generated file name.
      AddTextFileData("drop1\nkeep2\nkeep3", src, tmp, "text text"),
      CheckAnswer("keep2", "keep3")
    )
  }
}

// Regression test for SPARK-21996: round-trip through the file sink into a
// directory whose name contains a space, then read it back with the file
// source. Exercises the fix that re-parses FileEntry paths via URI so
// percent-encoded spaces resolve correctly.
test("SPARK-21996 read from text files generated by file sink -- file name has space") {
val testTableName = "FileStreamSourceTest"
withTable(testTableName) {
withTempDirs { case (src, checkpoint) =>
// Output directory name deliberately contains a space.
val output = new File(src, "text text")
val inputData = MemoryStream[String]
val ds = inputData.toDS()

// First query: write the memory stream out as text files under `output`.
val query = ds.writeStream
.option("checkpointLocation", checkpoint.getCanonicalPath)
.format("text")
.start(output.getCanonicalPath)

try {
inputData.addData("foo")
// Bound the wait so a hang fails the test rather than stalling the suite.
failAfter(streamingTimeout) {
query.processAllAvailable()
}
} finally {
// Always stop the query, even if the wait above times out.
query.stop()
}

// Second query: read the sink's output back through the file source and
// materialize it into an in-memory table for verification.
val df2 = spark.readStream.format("text").load(output.getCanonicalPath)
val query2 = df2.writeStream.format("memory").queryName(testTableName).start()
try {
query2.processAllAvailable()
// The single row written above must survive the round trip.
checkDatasetUnorderly(spark.table(testTableName).as[String], "foo")
} finally {
query2.stop()
}
}
}
}

test("read from textfile") {
withTempDirs { case (src, tmp) =>
val textStream = spark.readStream.textFile(src.getCanonicalPath)
Expand Down

0 comments on commit f2688ef

Please sign in to comment.