Run pipeline without reading from intermediate datasets #420
Comments
Hi @deepyaman Thank you for sharing your hooks! Regarding the contribution, Kedro only provides the hooks spec and doesn't have any concrete hook implementations. Hook implementations are something users would implement in their projects, and we would encourage people to publish their own hooks outside of Kedro. We are more than happy to list your hook plugins in our docs (see the examples for kedro-wing and steel-toe).
Hi @deepyaman, have you tried CachedDataSet?
https://kedro.readthedocs.io/en/stable/kedro.io.CachedDataSet.html
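For reference, the core idea behind CachedDataSet can be sketched outside Kedro in a few lines: wrap a slow dataset and keep an in-memory copy so repeated loads within a run skip the slow backend. This is an illustrative sketch, not Kedro's actual implementation, and the class and attribute names here are hypothetical.

```python
class CachedDataset:
    """Minimal sketch of the caching idea: writes go through to the
    wrapped (slow) dataset, but an in-memory copy is kept so later
    loads in the same run never touch the backend again."""

    _EMPTY = object()  # sentinel distinguishing "no cache" from cached None

    def __init__(self, wrapped):
        self._wrapped = wrapped  # the slow underlying dataset
        self._cache = self._EMPTY

    def save(self, data):
        self._wrapped.save(data)  # still writes through synchronously
        self._cache = data        # but keeps the in-memory copy

    def load(self):
        if self._cache is self._EMPTY:
            self._cache = self._wrapped.load()  # first load hits the backend
        return self._cache  # subsequent loads are free
```

Note that this only removes the read-back cost; the synchronous save still blocks the pipeline, which is the gap the rest of this thread is about.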
Hi @Minyus! I think
I'd say
Related to #346. Also:
If I remember correctly, I was letting the
Interesting. If so, you might be interested in
You're right, sorry. I didn't read the code properly. Edit: @tsanikgr @Minyus @921kiyo I feel like you all have more familiarity with
I want to write, but I don't want to be blocked on the write when I already have the data in memory. More specifically, I want to write so that I have my versioned dataset associated with this run. I don't care when it gets written out, since I have an in-memory copy that I can pass along. (This does assume the write doesn't hit a snag; it's possible your pipeline runs ahead on in-memory data while your write fails for whatever reason, so future datasets also don't get written out unless some handling is added. I'm honestly not too worried about this myself.)
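The non-blocking write described above can be sketched with a background thread: the save is submitted to an executor, and the in-memory copy is handed straight to downstream consumers. This is a hypothetical illustration (not the hookshot implementation); note how, as the comment concedes, a failed save only surfaces when the future is eventually checked.

```python
from concurrent.futures import ThreadPoolExecutor


class AsyncSavingDataset:
    """Illustrative sketch: saves run in a background thread while
    loads return the in-memory copy, so downstream nodes never block
    on the slow write."""

    def __init__(self, wrapped, executor):
        self._wrapped = wrapped      # the slow underlying dataset
        self._executor = executor
        self._data = None
        self._pending = None

    def save(self, data):
        self._data = data  # keep the in-memory copy immediately
        self._pending = self._executor.submit(self._wrapped.save, data)

    def load(self):
        return self._data  # no blocking read-back from the backend

    def wait(self):
        """Block until the write lands; re-raises any save error here."""
        if self._pending is not None:
            self._pending.result()
```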
Yes, to some extent. My implementation (https://github.com/deepyaman/hookshot/blob/develop/src/hookshot/hooks.py) is based on the code that handles the async functionality, but extended across the pipeline rather than on a per-node basis (hence the "unrolled"
The feeling I'm getting is that there are existing methods in the direction of what I want, but my feeling is that they don't push it far enough. I will try to find some time to benchmark these different approaches under parametrizable conditions (
Great, I look forward to your benchmark results.
Sorry for the delay! I've put together something in my spare time; it's not feature complete, but I figured I'd share. Let's assume a slow filesystem with a load and save delay of 10 seconds for intermediate datasets. I haven't added delays in nodes (to simulate nontrivial data processing) yet; an example of where this makes a better case for
Here are executions under each strategy:
(Note that the times include the initial minute of delays before the pipeline begins, because the way I added delays somehow triggers on initialization.) The code to run these examples is in https://github.com/deepyaman/hookshot/. You can also change the load/save delays in
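A dataset with artificial I/O latency, like the SlowDataSet appearing in the logs below, can be sketched roughly as follows. This is a hypothetical reconstruction mirroring the idea in the hookshot repo, not its actual code; the delay values are configurable stand-ins for a slow filesystem or S3.

```python
import time


class SlowDataSet:
    """Hypothetical sketch of a dataset whose load/save sleep for a
    configurable delay, simulating slow backend I/O for benchmarking."""

    def __init__(self, load_delay=10.0, save_delay=10.0):
        self._load_delay = load_delay
        self._save_delay = save_delay
        self._data = None

    def save(self, data):
        time.sleep(self._save_delay)  # simulate a slow write
        self._data = data

    def load(self):
        time.sleep(self._load_delay)  # simulate a slow read
        return self._data
```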
Baseline (i.e. no caching/plugins)
(hookshot) BOS-178551-C02X31K9JHD4:hookshot deepyaman$ kedro run
2020-07-14 13:40:10,444 - root - INFO - ** Kedro project hookshot
/anaconda3/envs/hookshot/lib/python3.8/site-packages/fsspec/implementations/local.py:29: FutureWarning: The default value of auto_mkdir=True has been deprecated and will be changed to auto_mkdir=False by default in a future release.
warnings.warn(
2020-07-14 13:40:11,067 - kedro.io.core - DEBUG - Saving SlowDataSet() at 0x1167b39d0
2020-07-14 13:40:21,071 - kedro.io.core - DEBUG - Saved SlowDataSet(data=<object>) at 0x1167b39d0
2020-07-14 13:40:21,072 - kedro.io.core - DEBUG - Saving SlowDataSet() at 0x11816c310
2020-07-14 13:40:31,076 - kedro.io.core - DEBUG - Saved SlowDataSet(data=<object>) at 0x11816c310
2020-07-14 13:40:31,077 - kedro.io.core - DEBUG - Saving SlowDataSet() at 0x11816c8b0
2020-07-14 13:40:41,080 - kedro.io.core - DEBUG - Saved SlowDataSet(data=<object>) at 0x11816c8b0
2020-07-14 13:40:41,080 - kedro.io.core - DEBUG - Saving SlowDataSet() at 0x11816ca00
2020-07-14 13:40:51,082 - kedro.io.core - DEBUG - Saved SlowDataSet(data=<object>) at 0x11816ca00
2020-07-14 13:40:51,083 - kedro.io.core - DEBUG - Saving SlowDataSet() at 0x11816cb50
2020-07-14 13:41:01,083 - kedro.io.core - DEBUG - Saved SlowDataSet(data=<object>) at 0x11816cb50
2020-07-14 13:41:01,084 - kedro.io.core - DEBUG - Saving SlowDataSet() at 0x11816cca0
2020-07-14 13:41:11,086 - kedro.io.core - DEBUG - Saved SlowDataSet(data=<object>) at 0x11816cca0
2020-07-14 13:41:11,096 - kedro.io.data_catalog - INFO - Loading data from `example_iris_data` (CSVDataSet)...
2020-07-14 13:41:11,096 - kedro.io.core - DEBUG - Loading CSVDataSet(filepath=/Users/deepyaman/hookshot/data/01_raw/iris.csv, protocol=file, save_args={'index': False})
2020-07-14 13:41:11,113 - kedro.io.data_catalog - INFO - Loading data from `params:example_test_data_ratio` (MemoryDataSet)...
2020-07-14 13:41:11,113 - kedro.io.core - DEBUG - Loading MemoryDataSet(data=<float>)
2020-07-14 13:41:11,114 - kedro.pipeline.node - INFO - Running node: split_data([example_iris_data,params:example_test_data_ratio]) -> [example_test_x,example_test_y,example_train_x,example_train_y]
2020-07-14 13:41:11,143 - kedro.io.data_catalog - INFO - Saving data to `example_train_x` (SlowDataSet)...
2020-07-14 13:41:11,144 - kedro.io.core - DEBUG - Saving SlowDataSet(data=<object>) at 0x1167b39d0
2020-07-14 13:41:11,144 - kedro.io.core - DEBUG - Saving SlowDataSet(data=<object>) at 0x1167b39d0
2020-07-14 13:41:21,144 - kedro.io.core - DEBUG - Saved SlowDataSet(data=<DataFrame>) at 0x1167b39d0
2020-07-14 13:41:21,144 - kedro.io.data_catalog - INFO - Saving data to `example_train_y` (SlowDataSet)...
2020-07-14 13:41:21,145 - kedro.io.core - DEBUG - Saving SlowDataSet(data=<object>) at 0x11816c310
2020-07-14 13:41:21,145 - kedro.io.core - DEBUG - Saving SlowDataSet(data=<object>) at 0x11816c310
2020-07-14 13:41:31,153 - kedro.io.core - DEBUG - Saved SlowDataSet(data=<DataFrame>) at 0x11816c310
2020-07-14 13:41:31,153 - kedro.io.data_catalog - INFO - Saving data to `example_test_x` (SlowDataSet)...
2020-07-14 13:41:31,153 - kedro.io.core - DEBUG - Saving SlowDataSet(data=<object>) at 0x11816c8b0
2020-07-14 13:41:31,153 - kedro.io.core - DEBUG - Saving SlowDataSet(data=<object>) at 0x11816c8b0
2020-07-14 13:41:41,162 - kedro.io.core - DEBUG - Saved SlowDataSet(data=<DataFrame>) at 0x11816c8b0
2020-07-14 13:41:41,162 - kedro.io.data_catalog - INFO - Saving data to `example_test_y` (SlowDataSet)...
2020-07-14 13:41:41,162 - kedro.io.core - DEBUG - Saving SlowDataSet(data=<object>) at 0x11816ca00
2020-07-14 13:41:41,162 - kedro.io.core - DEBUG - Saving SlowDataSet(data=<object>) at 0x11816ca00
2020-07-14 13:41:51,168 - kedro.io.core - DEBUG - Saved SlowDataSet(data=<DataFrame>) at 0x11816ca00
2020-07-14 13:41:51,169 - kedro.runner.sequential_runner - INFO - Completed 1 out of 4 tasks
2020-07-14 13:41:51,169 - kedro.io.data_catalog - INFO - Loading data from `example_train_x` (SlowDataSet)...
2020-07-14 13:41:51,170 - kedro.io.core - DEBUG - Loading SlowDataSet(data=<DataFrame>) at 0x1167b39d0
2020-07-14 13:41:51,170 - kedro.io.core - DEBUG - Loading SlowDataSet(data=<DataFrame>) at 0x1167b39d0
2020-07-14 13:42:01,176 - kedro.io.core - DEBUG - Loaded SlowDataSet(data=<DataFrame>) at 0x1167b39d0
2020-07-14 13:42:01,177 - kedro.io.data_catalog - INFO - Loading data from `example_train_y` (SlowDataSet)...
2020-07-14 13:42:01,177 - kedro.io.core - DEBUG - Loading SlowDataSet(data=<DataFrame>) at 0x11816c310
2020-07-14 13:42:01,177 - kedro.io.core - DEBUG - Loading SlowDataSet(data=<DataFrame>) at 0x11816c310
2020-07-14 13:42:11,181 - kedro.io.core - DEBUG - Loaded SlowDataSet(data=<DataFrame>) at 0x11816c310
2020-07-14 13:42:11,181 - kedro.io.data_catalog - INFO - Loading data from `parameters` (MemoryDataSet)...
2020-07-14 13:42:11,181 - kedro.io.core - DEBUG - Loading MemoryDataSet(data=<dict>)
2020-07-14 13:42:11,181 - kedro.pipeline.node - INFO - Running node: train_model([example_train_x,example_train_y,parameters]) -> [example_model]
2020-07-14 13:42:11,516 - kedro.io.data_catalog - INFO - Saving data to `example_model` (SlowDataSet)...
2020-07-14 13:42:11,516 - kedro.io.core - DEBUG - Saving SlowDataSet(data=<object>) at 0x11816cb50
2020-07-14 13:42:11,517 - kedro.io.core - DEBUG - Saving SlowDataSet(data=<object>) at 0x11816cb50
2020-07-14 13:42:21,518 - kedro.io.core - DEBUG - Saved SlowDataSet(data=<ndarray>) at 0x11816cb50
2020-07-14 13:42:21,518 - kedro.io.core - DEBUG - Releasing SlowDataSet(data=<DataFrame>) at 0x1167b39d0
2020-07-14 13:42:21,518 - kedro.io.core - DEBUG - Releasing SlowDataSet(data=<DataFrame>) at 0x11816c310
2020-07-14 13:42:21,519 - kedro.runner.sequential_runner - INFO - Completed 2 out of 4 tasks
2020-07-14 13:42:21,519 - kedro.io.data_catalog - INFO - Loading data from `example_model` (SlowDataSet)...
2020-07-14 13:42:21,519 - kedro.io.core - DEBUG - Loading SlowDataSet(data=<ndarray>) at 0x11816cb50
2020-07-14 13:42:21,519 - kedro.io.core - DEBUG - Loading SlowDataSet(data=<ndarray>) at 0x11816cb50
2020-07-14 13:42:31,521 - kedro.io.core - DEBUG - Loaded SlowDataSet(data=<ndarray>) at 0x11816cb50
2020-07-14 13:42:31,521 - kedro.io.data_catalog - INFO - Loading data from `example_test_x` (SlowDataSet)...
2020-07-14 13:42:31,521 - kedro.io.core - DEBUG - Loading SlowDataSet(data=<DataFrame>) at 0x11816c8b0
2020-07-14 13:42:31,521 - kedro.io.core - DEBUG - Loading SlowDataSet(data=<DataFrame>) at 0x11816c8b0
2020-07-14 13:42:41,525 - kedro.io.core - DEBUG - Loaded SlowDataSet(data=<DataFrame>) at 0x11816c8b0
2020-07-14 13:42:41,525 - kedro.pipeline.node - INFO - Running node: predict([example_model,example_test_x]) -> [example_predictions]
2020-07-14 13:42:41,527 - kedro.io.data_catalog - INFO - Saving data to `example_predictions` (SlowDataSet)...
2020-07-14 13:42:41,527 - kedro.io.core - DEBUG - Saving SlowDataSet(data=<object>) at 0x11816cca0
2020-07-14 13:42:41,527 - kedro.io.core - DEBUG - Saving SlowDataSet(data=<object>) at 0x11816cca0
2020-07-14 13:42:51,532 - kedro.io.core - DEBUG - Saved SlowDataSet(data=<ndarray>) at 0x11816cca0
2020-07-14 13:42:51,532 - kedro.io.core - DEBUG - Releasing SlowDataSet(data=<ndarray>) at 0x11816cb50
2020-07-14 13:42:51,532 - kedro.io.core - DEBUG - Releasing SlowDataSet(data=<DataFrame>) at 0x11816c8b0
2020-07-14 13:42:51,532 - kedro.runner.sequential_runner - INFO - Completed 3 out of 4 tasks
2020-07-14 13:42:51,532 - kedro.io.data_catalog - INFO - Loading data from `example_predictions` (SlowDataSet)...
2020-07-14 13:42:51,533 - kedro.io.core - DEBUG - Loading SlowDataSet(data=<ndarray>) at 0x11816cca0
2020-07-14 13:42:51,533 - kedro.io.core - DEBUG - Loading SlowDataSet(data=<ndarray>) at 0x11816cca0
2020-07-14 13:43:01,537 - kedro.io.core - DEBUG - Loaded SlowDataSet(data=<ndarray>) at 0x11816cca0
2020-07-14 13:43:01,537 - kedro.io.data_catalog - INFO - Loading data from `example_test_y` (SlowDataSet)...
2020-07-14 13:43:01,537 - kedro.io.core - DEBUG - Loading SlowDataSet(data=<DataFrame>) at 0x11816ca00
2020-07-14 13:43:01,538 - kedro.io.core - DEBUG - Loading SlowDataSet(data=<DataFrame>) at 0x11816ca00
2020-07-14 13:43:11,539 - kedro.io.core - DEBUG - Loaded SlowDataSet(data=<DataFrame>) at 0x11816ca00
2020-07-14 13:43:11,539 - kedro.pipeline.node - INFO - Running node: report_accuracy([example_predictions,example_test_y]) -> None
2020-07-14 13:43:11,540 - hookshot.pipelines.data_science.nodes - INFO - Model accuracy on test set: 100.00%
2020-07-14 13:43:11,540 - kedro.io.core - DEBUG - Releasing SlowDataSet(data=<ndarray>) at 0x11816cca0
2020-07-14 13:43:11,540 - kedro.io.core - DEBUG - Releasing SlowDataSet(data=<DataFrame>) at 0x11816ca00
2020-07-14 13:43:11,540 - kedro.runner.sequential_runner - INFO - Completed 4 out of 4 tasks
2020-07-14 13:43:11,540 - kedro.runner.sequential_runner - INFO - Pipeline execution completed successfully.
Description
I'm always frustrated when I/O dominates compute.
For example, my pipeline takes 10 minutes to run, of which 9 minutes are spent writing to and reading back from S3.
Context
At QuantumBlack, it's most common to write intermediate datasets to disk. In fact, the Kedro data catalog very much facilitates this workflow. This also presents numerous advantages:
However, it's also extremely inefficient, especially when writing large datasets using slow mechanisms. On top of that, we most often expect reloaded data to be exactly equal to what was saved, except in the cases of transcoding and some terminal output formats (e.g. Excel, CSV).
Possible Implementation
https://github.com/deepyaman/hookshot/blob/develop/src/hookshot/hooks.py
Feel free to clone the repo and run the example. :)
At a high level, the plugin aims to provide Unix tee-like behavior to runners.
Goals of this implementation:
Limitations:
SharedMemoryDataSet for ParallelRunner. I would be happy to get some input from the experts here. :)
I'm most interested in understanding what's the best way to contribute this. I think it makes sense as part of a new kedro.extras.hooks subpackage. As part of Kedro, this functionality would continue to be supported through backend redesigns.