[REVIEW] Formalization of Computation #4923
Conversation
@jcrist @TomAugspurger if either of you have time (I suspect not) it would be good to get the perspective of someone who is familiar with Dask broadly, who also isn't familiar with the work that Mads and team have been doing. A sanity check here would be welcome.
- `PickledObject` - An object that is serialized using `protocol.pickle`. This object isn't a computation by itself; instead, users can build pickled computations that contain pickled objects. This object is automatically de-serialized by the Worker before execution.
Should we have PickledObjects? Or should we use the general serialization path for this?
You mean if we have something like a list of objects? That should also just use a single `PickledObject` to serialize everything in one go. We use `typeset_computation()` to look through a computation and wrap individual task functions in `PickledCallable`.
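A quick sketch of the "one go" point above, using only stdlib `pickle` (this is illustrative; it is not the PR's `PickledObject` wrapper):

```python
import pickle

# Pickling a container serializes every element in a single pass,
# so a list of objects needs only one payload rather than one
# payload per element.
objects = ["a", 42, {"nested": (1, 2)}]

payload = pickle.dumps(objects)    # one serialization pass
restored = pickle.loads(payload)   # one deserialization pass

assert restored == objects
```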
I mean "why would we ever want to pickle an object, rather than give the rest of our serialization machinery a chance to work?" For example, what if the object was a cupy array.
Or maybe this is used very infrequently, and not where user-data is likely to occur?
This is basically the fine vs. coarse grained serialization discussion. This PR continues the existing coarse grained approach where (nested) tasks are pickled. We used to use `dumps_function()` and `warn_dumps()` to do this. Now, we use `PickledComputation` as the outermost wrapper and `PickledObject`s for already pickled objects. This makes it possible for `HLG.unpack()`, which runs on the Scheduler, to build new tasks of already pickled objects.
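The coarse grained approach described above can be sketched with plain stdlib `pickle`; the names here are illustrative, not the PR's actual API:

```python
import pickle

def add(x, y):
    # A module-level task function, so pickle can reference it by name.
    return x + y

# Coarse grained: the whole task (function plus arguments) is pickled
# as one opaque blob; the receiving side unpickles the blob and only
# then calls the function.
blob = pickle.dumps((add, 1, 2))

func, *args = pickle.loads(blob)
result = func(*args)
assert result == 3
```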
> Or maybe this is used very infrequently, and not where user-data is likely to occur?

Yes, including large data in the task directly will raise a warning, just like it used to.
Can one of the admins verify this patch?
@jrbourbeau if you can find time to review this PR, it would be great.
This PR implements graph computations based on the specification in Dask:

In order to support efficient and flexible task serialization, this PR introduces classes for computations, tasks, data, functions, etc.

This PR continues the existing coarse grained approach where (nested) tasks are pickled. We used to use `dumps_function()` and `warn_dumps()` to do this. Now, we use `PickledComputation` as the outermost wrapper and `PickledObject`s for already pickled objects. This makes it possible for `HLG.unpack()`, which runs on the Scheduler, to build new tasks of already pickled objects.

Notable Classes

- `PickledObject` - An object that is serialized using `protocol.pickle`. This object isn't a computation by itself; instead, users can build computations containing them. It is automatically de-serialized by the Worker before execution.
- `Computation` - A computation that the Worker can execute. The Scheduler sees this as a black box. A computation cannot contain pickled objects, but it may contain `Serialize` and/or `Serialized` objects, which will be de-serialized automatically when arriving on the Worker.
- `PickledComputation` - A computation that is serialized using `protocol.pickle`. The class is derived from `Computation` but can contain pickled objects. It and any contained pickled objects will be de-serialized by the Worker before execution.

Notable Functions

- `typeset_dask_graph()` - Used to typeset a Dask graph, wrapping computations in either the `Data` or `Task` class. This should be done before communicating the graph. Note, this replaces the old `tlz.valmap(dumps_task, dsk)` operation.

Related issues:

- `Serialize` objects within tasks #4673
- `tuple`s & `list`s in MsgPack serialization #4575
- `dask.dataframe.read_csv('./filepath/*.csv')` returning tuple dask#7777
- `dumps_task` in `SimpleShuffleLayer` and `BroadcastJoinLayer` unpack dask#7650
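To make the `PickledObject` idea described above concrete, here is a minimal sketch using only stdlib `pickle`. The class name mirrors the description, but none of this is the PR's actual implementation:

```python
import pickle

class PickledObject:
    """Minimal sketch of the idea: hold an object in its already-pickled
    form, and only deserialize it when needed (as the Worker would just
    before execution). Illustrative only, not the PR's code."""

    def __init__(self, obj):
        # Stored pickled, so the Scheduler can forward it as opaque bytes.
        self._data = pickle.dumps(obj)

    def deserialize(self):
        # The Worker-side step: recover the original object.
        return pickle.loads(self._data)

wrapped = PickledObject({"x": [1, 2, 3]})
unwrapped = wrapped.deserialize()
assert unwrapped == {"x": [1, 2, 3]}
```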