File Source
File Source(s) provide a stream over files in a directory, or over a set of directories when configured with a glob path. As files are added to the configured path(s), the File Source creates micro-batches of new files for the underlying streaming query to execute. There are many types of File Source(s); they share common behavior and vary primarily in the underlying file format over which they provide a stream.
Below is a list of File Source implementations:
All file sources support rate limiting. Please see maxFilesPerTrigger for more information.
Some file sources support user-defined schemas. Please refer to a specific file source's documentation for more details.
All file sources share a common write-ahead-log implementation. Each micro-batch, a file source polls its configured path for new files. Files detected to be new, based on the configured options, are written into a log file named after the current micro-batch id. Entries in this file contain metadata about each file to process. When a file source must recover, these log files allow it to infer which files have already been processed. Typically, this write-ahead-log is stored underneath the streaming query's checkpoint location base directory in sources/$sourceId/$batchId. Below are a few examples of the contents of a file source write-ahead-log.
Chriss-MacBook-Pro-4:bar cbowden$ ls -lrt sources/0/
total 168
-rw-r--r-- 1 cbowden wheel 86 Jan 17 16:18 0
Chriss-MacBook-Pro-4:bar cbowden$ cat sources/0/0
v1
{"path":"file:///tmp/data/foo/0000.json","timestamp":1516227354000,"batchId":0}
In the above example, micro-batch 0 processed one file: /tmp/data/foo/0000.json. If the underlying streaming query were to fail and restart, the file source would be aware that /tmp/data/foo/0000.json was already processed, so it would not serve the contents of this file to the sink for processing again.
Over time, you may find that your write-ahead-log contains .compact files. These contain the compacted contents of all micro-batches up to and including the corresponding micro-batch id. The file source is self-compacting so that it does not create too many small files, as seen below:
Chriss-MacBook-Pro-4:bar cbowden$ ls -l sources/0/
total 168
-rw-r--r-- 1 cbowden wheel 86 Jan 17 16:18 0
-rw-r--r-- 1 cbowden wheel 86 Jan 17 16:18 1
-rw-r--r-- 1 cbowden wheel 86 Jan 17 16:19 2
-rw-r--r-- 1 cbowden wheel 86 Jan 17 16:19 3
-rw-r--r-- 1 cbowden wheel 86 Jan 17 16:19 4
-rw-r--r-- 1 cbowden wheel 86 Jan 17 16:19 5
-rw-r--r-- 1 cbowden wheel 86 Jan 17 16:19 6
-rw-r--r-- 1 cbowden wheel 86 Jan 17 16:19 7
-rw-r--r-- 1 cbowden wheel 86 Jan 17 16:19 8
-rw-r--r-- 1 cbowden wheel 842 Jan 17 16:19 9.compact
Chriss-MacBook-Pro-4:bar cbowden$ cat sources/0/9.compact
v1
{"path":"file:///tmp/data/foo/0000.json","timestamp":1516227354000,"batchId":0}
{"path":"file:///tmp/data/foo/0001.json","timestamp":1516234736000,"batchId":1}
{"path":"file:///tmp/data/foo/0002.json","timestamp":1516234757000,"batchId":2}
{"path":"file:///tmp/data/foo/0003.json","timestamp":1516234769000,"batchId":3}
{"path":"file:///tmp/data/foo/0004.json","timestamp":1516234773000,"batchId":4}
{"path":"file:///tmp/data/foo/0005.json","timestamp":1516234775000,"batchId":5}
{"path":"file:///tmp/data/foo/0006.json","timestamp":1516234783000,"batchId":6}
{"path":"file:///tmp/data/foo/0007.json","timestamp":1516234786000,"batchId":7}
{"path":"file:///tmp/data/foo/0008.json","timestamp":1516234790000,"batchId":8}
{"path":"file:///tmp/data/foo/0009.json","timestamp":1516234793000,"batchId":9}
As time passes, write-ahead-log entries older than the current compaction file(s) will be deleted. For more information, users should read about the following options:
- spark.sql.streaming.fileSource.log.cleanupDelay
- spark.sql.streaming.fileSource.log.compactInterval
- spark.sql.streaming.fileSource.log.deletion
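These three settings are Spark configuration properties rather than per-stream OPTIONS. As a minimal sketch, a spark.properties fragment might set them together as follows (the values are purely illustrative; each property is described in more detail further down this page):
spark.sql.streaming.fileSource.log.deletion=true
spark.sql.streaming.fileSource.log.compactInterval=20
spark.sql.streaming.fileSource.log.cleanupDelay=1h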
fileNameOnly
Specifies whether to check for new files based only on the filename, rather than on the full path. If set to true, the following files would be considered the same file, because their filenames, "dataset.txt", are identical:
- file:///dataset.txt
- s3://a/dataset.txt
- s3n://a/b/dataset.txt
- s3a://a/b/c/dataset.txt
Defaults to false.
CREATE STREAM foo
FROM TEXT
OPTIONS(
'fileNameOnly'='true'
);
latestFirst
Specifies whether to scan the latest files first. If set to true, when the source finds unprocessed files in a micro-batch, it will process the latest files first.
Defaults to false.
CREATE STREAM foo
FROM TEXT
OPTIONS(
'latestFirst'='true'
);
maxFileAge
Specifies the maximum age a file in the configured directory may reach before it is ignored. For the first micro-batch, all files are considered valid. If latestFirst is set to true and maxFilesPerTrigger is set, this parameter is ignored, because otherwise old files that are valid and should be processed might be skipped. Please refer to SPARK-19813 for details.
The max age is measured with respect to the timestamp of the latest file, not the current system time. For example, if the latest file has timestamp 1000, the current system time is 2000, and the max age is 200, the system will purge files older than 800 (rather than 1800) from its internal state.
Provide values in the format 50s, 100ms, or 250us, etc. Note there is no space between the number and the units. Valid units include:
- us: microseconds
- ms: milliseconds
- s: seconds
- m: minutes
- min: minutes
- h: hours
- d: days
Defaults to 7d.
CREATE STREAM foo
FROM TEXT
OPTIONS(
'maxFileAge'='1h'
);
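As noted above, combining latestFirst with maxFilesPerTrigger causes maxFileAge to be ignored (see SPARK-19813). A minimal sketch of that combination, assuming OPTIONS accepts multiple comma-separated key/value pairs:
CREATE STREAM foo
FROM TEXT
OPTIONS(
'latestFirst'='true',
'maxFilesPerTrigger'='10'
);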
maxFilesPerTrigger
Specifies the maximum number of files this source will provide to the underlying streaming query in each micro-batch. For example, a user may have domain context around a particular use case and prefer that each micro-batch process up to 10 files (perhaps to provide predictable end-to-end latency per micro-batch).
Defaults to none (i.e., process all available files).
CREATE STREAM foo
FROM TEXT
OPTIONS(
'maxFilesPerTrigger'='10'
);
path
Specifies the path to stream data from as new files show up. A wide variety of URI schemes as well as globs are supported. Below is a list of common URI scheme examples:
- file:///path/to/data
- hdfs:///path/to/data
- s3://path/to/data
- s3n://path/to/data
- s3a://path/to/data
If no scheme is provided (e.g., /path/to/data rather than file:///path/to/data), the scheme will default to Spark's underlying FileSystem (e.g., hdfs when deployed on a properly configured HDFS cluster).
The path option is required for all File Source(s) unless otherwise noted by a specific File Source implementation.
CREATE STREAM foo
FROM TEXT
OPTIONS(
'path'='/path/to/data'
);
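As mentioned in the overview, glob paths are also supported, allowing a single stream over a set of directories. A minimal sketch using a glob (the pattern below is purely illustrative):
CREATE STREAM foo
FROM TEXT
OPTIONS(
'path'='/path/to/data/*'
);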
spark.sql.streaming.fileSource.log.cleanupDelay
If spark.sql.streaming.fileSource.log.deletion is enabled, how long to keep old write-ahead-log files around. A write-ahead-log file must be at least spark.sql.streaming.fileSource.log.compactInterval micro-batches old and older than spark.sql.streaming.fileSource.log.cleanupDelay to be eligible for deletion.
Provide values in the format 50s, 100ms, or 250us, etc. Note there is no space between the number and the units. Valid units include:
- us: microseconds
- ms: milliseconds
- s: seconds
- m: minutes
- min: minutes
- h: hours
- d: days
Defaults to 10m.
-- spark.properties: spark.sql.streaming.fileSource.log.cleanupDelay=1h
CREATE STREAM foo
FROM TEXT
OPTIONS(...);
spark.sql.streaming.fileSource.log.compactInterval
Number of log files after which all previous write-ahead-log files are compacted. Internally, file source(s) use a write-ahead-log to keep track of which files have been processed. To prevent creating too many small files within the write-ahead-log, file source(s) periodically merge each batch's write-ahead-log state into a larger file. The compact interval specifies how frequently compaction should occur, in micro-batches, e.g., every 10th micro-batch.
Defaults to 10.
-- spark.properties: spark.sql.streaming.fileSource.log.compactInterval=20
CREATE STREAM foo
FROM TEXT
OPTIONS(...);
spark.sql.streaming.fileSource.log.deletion
Whether or not files in the write-ahead-log older than spark.sql.streaming.fileSource.log.compactInterval and spark.sql.streaming.fileSource.log.cleanupDelay should be deleted.
Defaults to true.
-- spark.properties: spark.sql.streaming.fileSource.log.deletion=false
CREATE STREAM foo
FROM TEXT
OPTIONS(...);