Generic Spark Integration #101
Conversation
issue please.
from flytekit.sdk.workflow import workflow_class, Input
...
scala_spark = spark_task(spark_type="SCALA",
I think we should do one of two things:
Make spark_type an enum, which it seems it already is in the proto definition,
OR
make special task wrappers like scala_spark etc. that fix the type.
Made an enum.
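A minimal sketch of what such an enum could look like on the flytekit side (member names and values here are illustrative; the authoritative definition lives in the flyteidl proto):

```python
import enum


class SparkType(enum.Enum):
    # Illustrative members only; the real values mirror the flyteidl
    # Spark application-type enum.
    PYTHON = 1
    JAVA = 2
    SCALA = 3
    R = 4


# Callers would then pass SparkType.SCALA instead of a free-form string
# such as "SCALA", so an unsupported type fails at definition time.
```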
flytekit/sdk/tasks.py (Outdated)
@@ -406,12 +406,16 @@ def spark_task(
    cache_version='',
    retries=0,
    interruptible=None,
    inputs=None,
Is this handled differently compared to the @inputs decorator? If so, we should document that correctly, as this is very confusing.
ALSO, at the moment I think only a few input types are supported, like primitives and blobs, right?
If so, can we verify this statically and raise an exception?
Moved to use the inputs decorator. Added the check for primitives for now.
I wouldn't mix the decorator with the generic usage. The decorator patterns are part of the SDK's basic Python programming model, so the assumption is that they will always be Python. It's easy enough to expose a helper function or class for Scala Spark jobs elsewhere.
(If you want to make it available through this file, that's fine too. I just think mixing a decorator and common object definitions will eventually cause a problem.)
And continuing Ketan's thought, I would also just avoid adding the inputs arg that way altogether. Especially since, as your test below shows, you use the inputs decorator as a helper function, which is a bit confusing. Better to use a new interface and just a normal {'a': Types.Integer} style annotation.
I originally wasn't using the decorator but changed after Ketan's comment to be in sync with how we do this for Presto as well: https://github.com/lyft/flytekit/blob/master/tests/flytekit/common/workflows/presto.py#L11
Updated to stop using the existing spark_task decorator. I do think we should make the generic_spark_task available from this file as well. Added it separately as a helper function.
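A rough usage sketch of the resulting helper, using a plain type mapping rather than the @inputs decorator. The import paths, argument names, and values below are illustrative assumptions, not the final merged API:

```python
# Illustrative only: module paths, argument names, and values are assumptions.
from flytekit.sdk.tasks import generic_spark_task
from flytekit.sdk.types import Types
from flytekit.sdk.spark_types import SparkType  # assumed final home of the enum

scala_spark = generic_spark_task(
    spark_type=SparkType.SCALA,
    inputs={'partitions': Types.Integer},  # plain mapping, no @inputs decorator
    main_class="org.example.WordCount",    # hypothetical Scala entry point
    main_application_file="local:///opt/spark/jars/wordcount.jar",
    spark_conf={'spark.executor.instances': '2'},
)
```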
)
)

def _get_container_definition(
I think this is where you will have to check the types and raise an alert. You can make this a common method for now; when we add support for all types, we can remove this check.
Done, added it in validate_inputs above.
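For illustration, a check along these lines could sit next to the container-definition code; the helper name and the exact set of allowed types are assumptions, not the merged implementation:

```python
# Illustrative sketch, not the merged implementation.
from flytekit.sdk.types import Types

# Only primitive input types are assumed to be supported for now.
_SUPPORTED_INPUT_TYPES = (
    Types.Integer,
    Types.Float,
    Types.String,
    Types.Boolean,
    Types.Datetime,
    Types.Timedelta,
)


def _validate_inputs(inputs):
    """Raise early if any declared input is not a supported primitive type."""
    for name, sdk_type in inputs.items():
        if sdk_type not in _SUPPORTED_INPUT_TYPES:
            raise ValueError(
                "Input '{}' has type {}; generic Spark tasks currently "
                "support only primitive input types.".format(name, sdk_type)
            )
```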
Codecov Report

@@            Coverage Diff             @@
##             master      #101   +/-   ##
==========================================
  Coverage          ?    82.24%
==========================================
  Files             ?       208
  Lines             ?     13266
  Branches          ?      1107
==========================================
  Hits              ?     10910
  Misses            ?      2092
  Partials          ?       264

Continue to review the full report at Codecov.
flytekit/sdk/types.py (Outdated)

class SparkType(enum.Enum):
Can this be defined in flyteidl?
Looks like it is and you map it above. This file is really for I/O typing so I wouldn't introduce them here.
Will move to a different file.
Looks good.
@@ -488,6 +490,45 @@ def wrapper(fn):
    return wrapper

def generic_spark_task(
I don't think we need this method, right?
We can directly use SdkGenericSparkTask?
Unless you are defaulting some things (I see timeout).
Yeah, we are defaulting. In addition, I also want users to be able to look up all supported task types from a single place.
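For illustration, the helper could stay a thin wrapper that just applies defaults before constructing SdkGenericSparkTask. The import path, constructor arguments, and default values below are assumptions about the lower-level API, not the merged code:

```python
# Illustrative sketch only: the SdkGenericSparkTask import path and
# constructor arguments are assumptions about the lower-level API.
import datetime as _datetime

from flytekit.common.tasks import generic_spark_task as _generic_spark_task


def generic_spark_task(
        spark_type,
        main_class,
        main_application_file,
        spark_conf=None,
        inputs=None,
        timeout=None,
        **kwargs):
    """Single place to discover the task type, with sensible defaults applied."""
    return _generic_spark_task.SdkGenericSparkTask(
        spark_type=spark_type,
        main_class=main_class,
        main_application_file=main_application_file,
        spark_conf=spark_conf or {},
        task_inputs=inputs,
        timeout=timeout or _datetime.timedelta(seconds=0),
        **kwargs)
```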
I'm a new Flyte (and FlyteKit) user. Is this PR going to be merged, and is it going to be the canonical way to use Scala Spark with Flyte?
class SparkTasksWorkflow(object):
    triggered_date = Input(Types.Datetime)
    partitions = Input(Types.Integer)
    sparkTask = scala_spark(partitions=partitions)
nit: no camel case
done
You're gonna need to update
Looks good, but I will have to approve after you update the version or else my review will get cleared.
Bumped version.
* Generic Spark Integration
* Image
* PR comments
* PR comments
* Separate task-type for generic-spark
* PR comments
Makes it easier to integrate a non-Python Spark job as a task in Flyte. Issue: flyteorg/flyte#266
Tracking Issue
https://github.com/lyft/flyte/issues/
Follow-up issue
NA