-
Notifications
You must be signed in to change notification settings - Fork 59
Conversation
Signed-off-by: Katrina Rogan <[email protected]>
Signed-off-by: Katrina Rogan <[email protected]>
Signed-off-by: Katrina Rogan <[email protected]>
Signed-off-by: Katrina Rogan <[email protected]>
} | ||
return dynamicWorkflowContext{ | ||
isDynamic: true, | ||
subWorkflow: compiledWf, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't we set subWorkflowClosure
here as well?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we don't cache that though, right? (if i'm understanding correctly)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we send the dynamic workflow spec only the first time? or on every event (that will be too much)?
If it's only needed the first time, then I guess that's alright...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah we only send the spec the first time (when it's uncached) and don't need to bog down subsequent messages
@@ -329,4 +333,88 @@ func Test_dynamicNodeHandler_buildContextualDynamicWorkflow_withLaunchPlans(t *t | |||
assert.Error(t, err) | |||
assert.True(t, callsAdmin) | |||
}) | |||
t.Run("dynamic wf cached", func(t *testing.T) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we write a test for the failure case? Where the cache read fails, or the cache write fails
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
I am assuming you do not have the event sending in this PR right? |
trns.WithInfo(trns.Info().WithInfo(&handler.ExecutionInfo{TaskNodeInfo: &handler.TaskNodeInfo{TaskNodeMetadata: &event.TaskNodeMetadata{CacheStatus: status.GetCacheStatus(), CatalogKey: status.GetMetadata()}}})) | ||
taskNodeInfoMetadata := &event.TaskNodeMetadata{CacheStatus: status.GetCacheStatus(), CatalogKey: status.GetMetadata()} | ||
if dCtx.subWorkflowClosure != nil && dCtx.subWorkflowClosure.Primary != nil && dCtx.subWorkflowClosure.Primary.Template != nil { | ||
taskNodeInfoMetadata.DynamicWorkflow = &event.DynamicWorkflowNodeMetadata{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is pretty static (it gets generated once and never changes for the duration of that node)... should we keep sending it to Admin on every event? isn't that too much?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would prefer we send it only once - does this need a state?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes we only send it once. for cached reads to the dynamic workflow we don't send it
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think we should rely on caching to not send it... can we make it based on the phase maybe? we only send it on queued or something like that... ? if there isn't a good existing node phase to control this, we can introduce one here...
Signed-off-by: Katrina Rogan <[email protected]>
Signed-off-by: Katrina Rogan <[email protected]>
Signed-off-by: Katrina Rogan <[email protected]>
Signed-off-by: Katrina Rogan <[email protected]>
Signed-off-by: Katrina Rogan <[email protected]>
Signed-off-by: Katrina Rogan <[email protected]>
Signed-off-by: Katrina Rogan <[email protected]>
Signed-off-by: Katrina Rogan <[email protected]>
Signed-off-by: Katrina Rogan <[email protected]>
Signed-off-by: Katrina Rogan <[email protected]>
Signed-off-by: Katrina Rogan <[email protected]>
Signed-off-by: Katrina Rogan <[email protected]>
Signed-off-by: Katrina Rogan <[email protected]>
Signed-off-by: Katrina Rogan <[email protected]>
Signed-off-by: Katrina Rogan <[email protected]>
@@ -116,6 +120,19 @@ func (d dynamicNodeTaskNodeHandler) buildDynamicWorkflowTemplate(ctx context.Con | |||
}, nil | |||
} | |||
|
|||
// Updates a dynamically generated subworkflow version to be deterministically generated from the computed closure. | |||
func setDeterministicVersion(ctx context.Context, workflowClosure *core.CompiledWorkflowClosure) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this does not need to be done any more right? Why is this needed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
reverted
PTAL @EngHabu |
Signed-off-by: Katrina Rogan <[email protected]>
Signed-off-by: Katrina Rogan <[email protected]>
Signed-off-by: Katrina Rogan <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for the verification runs!
Signed-off-by: Ketan Umare <[email protected]>
TL;DR
Cache dynamic workflow. Verified the cache is read during execution.
Also sends workflow spec to admin
Type
Are all requirements met?
Complete description
How did you fix the bug, make the feature etc. Link to any design docs etc
Tracking Issue
flyteorg/flyte#899
flyteorg/flyteconsole#295
flyteorg/flyte#928
Follow-up issue
NA