Caching a polars dataframe into parquet fails #1240

Open
poldpold opened this issue Nov 23, 2024 · 4 comments · May be fixed by #1241
Labels: triage (label for issues that need to be triaged)

Comments

@poldpold

Current behavior

When trying to cache a node whose output is a polars DataFrame, an exception is raised.

Stack Traces

********************************************************************************
>[post-node-execute] hello [test_module.hello()] encountered an error          <
> Node inputs:
{}
********************************************************************************
Traceback (most recent call last):
  File "/home/eisler/miniconda3/envs/test/lib/python3.11/site-packages/hamilton/execution/graph_functions.py", line 318, in execute_lifecycle_for_node
    __adapter.call_all_lifecycle_hooks_sync(
  File "/home/eisler/miniconda3/envs/test/lib/python3.11/site-packages/hamilton/lifecycle/base.py", line 915, in call_all_lifecycle_hooks_sync
    getattr(adapter, hook_name)(**kwargs)
  File "/home/eisler/miniconda3/envs/test/lib/python3.11/site-packages/hamilton/caching/adapter.py", line 1446, in post_node_execute
    self.result_store.set(
  File "/home/eisler/miniconda3/envs/test/lib/python3.11/site-packages/hamilton/caching/stores/file.py", line 71, in set
    saver = saver_cls(path=str(materialized_path.absolute()))
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
TypeError: PolarsParquetWriter.__init__() got an unexpected keyword argument 'path'
-------------------------------------------------------------------
Full traceback from the notebook:

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
Cell In[4], line 1
----> 1 dr.execute(final_vars=["hello"])

File ~/miniconda3/envs/test/lib/python3.11/site-packages/hamilton/driver.py:637, in Driver.execute(self, final_vars, overrides, display_graph, inputs)
    635     error_execution = e
    636     error_telemetry = telemetry.sanitize_error(*sys.exc_info())
--> 637     raise e
    638 finally:
    639     if self.adapter.does_hook("post_graph_execute", is_async=False):

File ~/miniconda3/envs/test/lib/python3.11/site-packages/hamilton/driver.py:623, in Driver.execute(self, final_vars, overrides, display_graph, inputs)
    614     self.adapter.call_all_lifecycle_hooks_sync(
    615         "pre_graph_execute",
    616         run_id=run_id,
   (...)
    620         overrides=overrides,
    621     )
    622 try:
--> 623     outputs = self.__raw_execute(
    624         _final_vars, overrides, display_graph, inputs=inputs, _run_id=run_id
    625     )
    626     if self.adapter.does_method("do_build_result", is_async=False):
    627         # Build the result if we have a result builder
    628         outputs = self.adapter.call_lifecycle_method_sync(
    629             "do_build_result", outputs=outputs
    630         )

File ~/miniconda3/envs/test/lib/python3.11/site-packages/hamilton/driver.py:834, in Driver.__raw_execute(self, final_vars, overrides, display_graph, inputs, _fn_graph, _run_id)
    832     return results
    833 except Exception as e:
--> 834     raise e

File ~/miniconda3/envs/test/lib/python3.11/site-packages/hamilton/driver.py:825, in Driver.__raw_execute(self, final_vars, overrides, display_graph, inputs, _fn_graph, _run_id)
    823 results = None
    824 try:
--> 825     results = self.graph_executor.execute(
    826         function_graph,
    827         final_vars,
    828         overrides if overrides is not None else {},
    829         inputs if inputs is not None else {},
    830         run_id,
    831     )
    832     return results
    833 except Exception as e:

File ~/miniconda3/envs/test/lib/python3.11/site-packages/hamilton/driver.py:175, in DefaultGraphExecutor.execute(self, fg, final_vars, overrides, inputs, run_id)
    173 memoized_computation = dict()  # memoized storage
    174 nodes = [fg.nodes[node_name] for node_name in final_vars if node_name in fg.nodes]
--> 175 fg.execute(nodes, memoized_computation, overrides, inputs, run_id=run_id)
    176 outputs = {
    177     # we do this here to enable inputs to also be used as outputs
    178     # putting inputs into memoized before execution doesn't work due to some graphadapter assumptions.
    179     final_var: memoized_computation.get(final_var, inputs.get(final_var))
    180     for final_var in final_vars
    181 }  # only want request variables in df.
    182 del memoized_computation  # trying to cleanup some memory

File ~/miniconda3/envs/test/lib/python3.11/site-packages/hamilton/graph.py:1099, in FunctionGraph.execute(self, nodes, computed, overrides, inputs, run_id)
   1097     run_id = str(uuid.uuid4())
   1098 inputs = graph_functions.combine_config_and_inputs(self.config, inputs)
-> 1099 return graph_functions.execute_subdag(
   1100     nodes=nodes,
   1101     inputs=inputs,
   1102     adapter=self.adapter,
   1103     computed=computed,
   1104     overrides=overrides,
   1105     run_id=run_id,
   1106 )

File ~/miniconda3/envs/test/lib/python3.11/site-packages/hamilton/execution/graph_functions.py:250, in execute_subdag(nodes, inputs, adapter, computed, overrides, run_id, task_id)
    247     if final_var_node.user_defined:
    248         # from the top level, we don't know if this UserInput is required. So mark as optional.
    249         dep_type = node.DependencyType.OPTIONAL
--> 250     dfs_traverse(final_var_node, dep_type)
    251 return computed

File ~/miniconda3/envs/test/lib/python3.11/site-packages/hamilton/execution/graph_functions.py:222, in execute_subdag.<locals>.dfs_traverse(node_, dependency_type)
    215         result = adapter.call_lifecycle_method_sync(
    216             "do_remote_execute",
    217             node=node_,
    218             execute_lifecycle_for_node=execute_lifecycle_for_node_partial,
    219             **kwargs,
    220         )
    221     else:
--> 222         result = execute_lifecycle_for_node_partial(**kwargs)
    224 computed[node_.name] = result
    225 # > pruning the graph
    226 # This doesn't narrow it down to the entire space of the graph
    227 # E.G. if something is not needed by this current execution due to
    228 # the selection of nodes to run it might not prune everything.
    229 # to do this we'd need to first determine all nodes on the path, then prune
    230 # We may also want to use a reference counter for slightly cleaner/more efficient memory management

File ~/miniconda3/envs/test/lib/python3.11/site-packages/hamilton/execution/graph_functions.py:318, in execute_lifecycle_for_node(__node_, __adapter, __run_id, __task_id, **__kwargs)
    314 if not pre_node_execute_errored and __adapter.does_hook(
    315     "post_node_execute", is_async=False
    316 ):
    317     try:
--> 318         __adapter.call_all_lifecycle_hooks_sync(
    319             "post_node_execute",
    320             run_id=__run_id,
    321             node_=__node_,
    322             kwargs=__kwargs,
    323             success=success,
    324             error=error,
    325             result=result,
    326             task_id=__task_id,
    327         )
    328     except Exception:
    329         message = create_error_message(__kwargs, __node_, "[post-node-execute]")

File ~/miniconda3/envs/test/lib/python3.11/site-packages/hamilton/lifecycle/base.py:915, in LifecycleAdapterSet.call_all_lifecycle_hooks_sync(self, hook_name, **kwargs)
    909 """Calls all the lifecycle hooks in this group, by hook name (stage)
    910 
    911 :param hook_name: Name of the hooks to call
    912 :param kwargs: Keyword arguments to pass into the hook
    913 """
    914 for adapter in self.sync_hooks.get(hook_name, []):
--> 915     getattr(adapter, hook_name)(**kwargs)

File ~/miniconda3/envs/test/lib/python3.11/site-packages/hamilton/caching/adapter.py:1446, in HamiltonCacheAdapter.post_node_execute(self, run_id, node_, result, success, error, task_id, **future_kwargs)
   1444 result_missing = not self.result_store.exists(data_version)
   1445 if result_missing or materialized_path_missing:
-> 1446     self.result_store.set(
   1447         data_version=data_version,
   1448         result=result,
   1449         saver_cls=saver_cls,
   1450         loader_cls=loader_cls,
   1451     )
   1452     self._log_event(
   1453         run_id=run_id,
   1454         node_name=node_name,
   (...)
   1458         value=data_version,
   1459     )

File ~/miniconda3/envs/test/lib/python3.11/site-packages/hamilton/caching/stores/file.py:71, in FileResultStore.set(self, data_version, result, saver_cls, loader_cls)
     68 if saver_cls is not None:
     69     # materialized_path
     70     materialized_path = self._materialized_path(data_version, saver_cls)
---> 71     saver = saver_cls(path=str(materialized_path.absolute()))
     72     loader = loader_cls(path=str(materialized_path.absolute()))
     73 else:

TypeError: PolarsParquetWriter.__init__() got an unexpected keyword argument 'path'
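The root cause is visible in the last frame: `FileResultStore.set` hard-codes a `path=` keyword when constructing the saver, but the polars-backed saver's `__init__` declares a differently named parameter. A minimal, self-contained sketch of the failure mode (the class and function below are hypothetical stand-ins, not Hamilton's actual code):

```python
# Hypothetical stand-in for a polars-style saver whose __init__ takes
# `file` rather than `path`; it only mimics the signature mismatch.
class PolarsParquetWriterLike:
    def __init__(self, file: str):
        self.file = file


def set_result(saver_cls, materialized_path: str):
    # Mirrors what FileResultStore.set does: assume every saver accepts `path=`.
    return saver_cls(path=materialized_path)


try:
    set_result(PolarsParquetWriterLike, "result.parquet")
except TypeError as e:
    print(e)  # ...got an unexpected keyword argument 'path'
```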

Steps to replicate behavior

Write and run a Jupyter notebook with the following cells:

```python
%load_ext hamilton.plugins.jupyter_magic
```

```python
%%cell_to_module -m test_module --display --rebuild-drivers

import polars as pl
from hamilton.function_modifiers import cache

@cache(format="parquet")
def hello() -> pl.DataFrame:
    return pl.DataFrame({"a": [1, 2]})
```

```python
from hamilton import driver
import test_module

dr = (
    driver
    .Builder()
    .with_config({})
    .with_modules(test_module)
    .with_cache(path=".")
    .build()
)
dr.execute(final_vars=["hello"])
```

Library & System Information

- python=3.11.8
- sf-hamilton=1.81.0 and 1.83.2
- polars=1.10.0

Expected behavior

I expected the node output to be persisted to disk in a parquet format.

@poldpold added the triage label Nov 23, 2024
@skrawcz (Collaborator) commented Nov 24, 2024

@poldpold thanks for the issue! Will take a look.

skrawcz added a commit that referenced this issue Nov 24, 2024:

> The cache store assumed that every persister took a `path` argument. That is
> not the case: the savers/loaders wrap external APIs, and we decided to mirror
> those APIs rather than build our own abstraction layer around them.
> E.g. polars takes `file`, while pandas takes `path`.
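One way to make the store signature-agnostic (a sketch of the idea, not the actual PR): inspect the saver's `__init__` and pass whichever keyword it actually declares. The saver classes below are hypothetical stand-ins for pandas- and polars-style savers.

```python
import inspect
from pathlib import Path


# Hypothetical stand-ins: pandas-style savers take `path`,
# polars-style savers take `file`, mirroring the wrapped APIs.
class PandasStyleSaver:
    def __init__(self, path: str):
        self.path = path


class PolarsStyleSaver:
    def __init__(self, file: str):
        self.file = file


def build_saver(saver_cls, materialized_path: Path):
    """Pass the keyword the saver actually accepts instead of hard-coding `path`."""
    params = inspect.signature(saver_cls.__init__).parameters
    for kwarg in ("path", "file"):
        if kwarg in params:
            return saver_cls(**{kwarg: str(materialized_path)})
    raise TypeError(f"{saver_cls.__name__} accepts neither 'path' nor 'file'")


print(build_saver(PolarsStyleSaver, Path("result.parquet")).file)  # result.parquet
```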
@skrawcz linked a pull request (#1241) Nov 24, 2024 that will close this issue
@skrawcz (Collaborator) commented Nov 24, 2024

@poldpold could you try installing my fix and giving it a go, please?
Assuming you use SSH: `pip install "git+ssh://[email protected]/dagworks-inc/hamilton.git@fix_1240"`

@poldpold (Author) commented

@skrawcz, this works great, thank you! Looking forward to seeing it in an upcoming release!

@skrawcz (Collaborator) commented Nov 25, 2024

> @skrawcz, this works great, thank you! Looking forward to seeing it in an upcoming release!

It will be out this week.
