Commit

Perform minimal cleanup (make filepath optional) (#15)
deepyaman authored Mar 19, 2021
1 parent 3f5c74b commit 1144426
Showing 2 changed files with 6 additions and 9 deletions.
3 changes: 0 additions & 3 deletions conf/base/catalog.yml
@@ -8,7 +8,6 @@ example_creditcard_data:
 
 streaming_creditcard_data:
   type: kedro_streaming.io.SparkStreamingDataSet
-  filepath: ignore
   file_format: kafka
   load_args:
     kafka.bootstrap.servers: "localhost:19092"
@@ -23,11 +22,9 @@ example_model:
 
 streaming_predictions:
   type: kedro_streaming.io.SparkStreamingDataSet
-  filepath: ignore
   file_format: console
 # streaming_predictions:
 #   type: kedro_streaming.io.SparkStreamingDataSet
-#   filepath: ignore
 #   file_format: kafka
 #   save_args:
 #     output_mode: append
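The two streaming entries above are why filepath can now be omitted: the Kafka source and the console sink are configured entirely through format and options, so there is no filesystem path to point at. Below is a rough sketch of the equivalent raw Structured Streaming calls; the SparkSession setup and the topic name are assumptions for illustration, and only the bootstrap-servers option comes from the catalog.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Kafka source: addressed purely by options, no filepath involved.
events = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "localhost:19092")  # from load_args above
    .option("subscribe", "creditcard")  # hypothetical topic name
    .load()
)

# Console sink: writes each micro-batch to stdout, again without a path.
query = events.writeStream.start(format="console", outputMode="append")
query.awaitTermination()
```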
12 changes: 6 additions & 6 deletions src/kedro_streaming/io/spark_streaming_dataset.py
Expand Up @@ -188,20 +188,20 @@ class SparkStreamingDataSet(AbstractVersionedDataSet):

def __init__( # pylint: disable=too-many-arguments
self,
filepath: str,
filepath: str = "",
file_format: str = "parquet",
load_args: Dict[str, Any] = None,
save_args: Dict[str, Any] = None,
version: Version = None,
credentials: Dict[str, Any] = None,
) -> None:
"""Creates a new instance of ``SparkDataSet``.
"""Creates a new instance of ``SparkStreamingDataSet``.
Args:
filepath: Filepath in POSIX format to a Spark dataframe. When using Databricks
and working with data written to mount path points,
specify ``filepath``s for (versioned) ``SparkDataSet``s
starting with ``/dbfs/mnt``.
starting with ``/dbfs/mnt``. Required for file sources/sinks.
             file_format: File format used during load and save
                 operations. These are formats supported by the running
                 SparkContext and include parquet and csv. For a list of supported
@@ -310,9 +310,9 @@ def _load(self) -> DataFrame:

     def _save(self, data: DataFrame) -> None:
         # save_path = _strip_dbfs_prefix(self._fs_prefix + str(self._get_save_path()))
-        data.writeStream.format("console").outputMode(
-            "append"
-        ).start().awaitTermination()
+        data.writeStream.start(
+            format=self._file_format, **self._save_args
+        ).awaitTermination()
 
     def _exists(self) -> bool:
         # TODO(deepyaman): Check that the stream exists, done for files.
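Taken together, the constructor change and the rewritten _save mean a data set can now be declared without a filepath for option-driven formats, and whatever is passed in save_args is forwarded to DataStreamWriter.start() as options. A minimal usage sketch, assuming the class is importable as declared in the catalog; the parquet path is hypothetical:

```python
from kedro_streaming.io import SparkStreamingDataSet

# Option-driven source/sink (kafka, console): no filepath argument needed any more.
kafka_stream = SparkStreamingDataSet(
    file_format="kafka",
    load_args={"kafka.bootstrap.servers": "localhost:19092"},
)

# File-based source/sink: a filepath is still expected, per the updated docstring.
parquet_stream = SparkStreamingDataSet(
    filepath="data/01_raw/transactions",  # hypothetical path
    file_format="parquet",
)
```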
