Skip to content

Commit

Permalink
Instrument ArrayNode (flyteorg#550)
Browse files Browse the repository at this point in the history
* updated flyteidl to local to get ArrayNode

Signed-off-by: Daniel Rammer <[email protected]>

* added boilerplate to support ArrayNode

Signed-off-by: Daniel Rammer <[email protected]>

* pushing forward

Signed-off-by: Daniel Rammer <[email protected]>

* refactored node executor interfaces to fix dependency cycle

Signed-off-by: Daniel Rammer <[email protected]>

* refactoring almost complete

Signed-off-by: Daniel Rammer <[email protected]>

* refactor complete

Signed-off-by: Daniel Rammer <[email protected]>

* supporting environment variables

Signed-off-by: Daniel Rammer <[email protected]>

* minimum viable product

Signed-off-by: Daniel Rammer <[email protected]>

* update print statements for debugging

Signed-off-by: Daniel Rammer <[email protected]>

* massive refactor fixing NodeExecutionContext override for ArrayNode

Signed-off-by: Daniel Rammer <[email protected]>

* refactoring TODOs

Signed-off-by: Daniel Rammer <[email protected]>

* subnode retries working

Signed-off-by: Daniel Rammer <[email protected]>

* parallelism working

Signed-off-by: Daniel Rammer <[email protected]>

* cache and cache_serialize working - first new functionality in maptask

Signed-off-by: Daniel Rammer <[email protected]>

* adding implementation notes

Signed-off-by: Daniel Rammer <[email protected]>

* removed eventing from subtasks

Signed-off-by: Daniel Rammer <[email protected]>

* adding correct requirements

Signed-off-by: Daniel Rammer <[email protected]>

* working end-2-end with flytekit

Signed-off-by: Daniel Rammer <[email protected]>

* reporting output directory on success

Signed-off-by: Daniel Rammer <[email protected]>

* fixed output directory append

Signed-off-by: Daniel Rammer <[email protected]>

* mocking TaskTemplate interface to enable caching

Signed-off-by: Daniel Rammer <[email protected]>

* capture failure reasons

Signed-off-by: Daniel Rammer <[email protected]>

* wrapped up abort and finalize functionality

Signed-off-by: Daniel Rammer <[email protected]>

* mocking initialization events

Signed-off-by: Daniel Rammer <[email protected]>

* sending all events

Signed-off-by: Daniel Rammer <[email protected]>

* minor refactoring of debug prints and formatting

Signed-off-by: Daniel Rammer <[email protected]>

* intratask checkpointing working

Signed-off-by: Daniel Rammer <[email protected]>

* support for  and

Signed-off-by: Daniel Rammer <[email protected]>

* setting node log ids correctly

Signed-off-by: Daniel Rammer <[email protected]>

* reporting cache status

Signed-off-by: Daniel Rammer <[email protected]>

* correctly setting subnode abort phase

Signed-off-by: Daniel Rammer <[email protected]>

* removing dead code

Signed-off-by: Daniel Rammer <[email protected]>

* cleaned up most random TODO items

Signed-off-by: Daniel Rammer <[email protected]>

* refactored into new files

Signed-off-by: Daniel Rammer <[email protected]>

* refactoring for ArrayNode unit tests

Signed-off-by: Daniel Rammer <[email protected]>

* refactored for unit testing to allow creation of NodeExecutor in array package

Signed-off-by: Daniel Rammer <[email protected]>

* first unit test for handling ArrayNodePhaseNone

Signed-off-by: Daniel Rammer <[email protected]>

* most of executing unit tests completed

Signed-off-by: Daniel Rammer <[email protected]>

* finished executing unit tests

Signed-off-by: Daniel Rammer <[email protected]>

* finished succeeding unit tests

Signed-off-by: Daniel Rammer <[email protected]>

* wrote failing phase unit tests

Signed-off-by: Daniel Rammer <[email protected]>

* moving towards complete unit_test success

Signed-off-by: Daniel Rammer <[email protected]>

* unit tests passing

Signed-off-by: Daniel Rammer <[email protected]>

* fixed lint issues

Signed-off-by: Daniel Rammer <[email protected]>

* updated flyteidl dep

Signed-off-by: Daniel Rammer <[email protected]>

* added unit tests for Abort

Signed-off-by: Daniel Rammer <[email protected]>

* adding unit test for Finalize

Signed-off-by: Daniel Rammer <[email protected]>

* added utils unit tests

Signed-off-by: Daniel Rammer <[email protected]>

* moved state structs to handler package

Signed-off-by: Daniel Rammer <[email protected]>

* added docs

Signed-off-by: Daniel Rammer <[email protected]>

* cleaned up abort event reporting

Signed-off-by: Daniel Rammer <[email protected]>

* fixed RecordNodeEvent unit tests

Signed-off-by: Daniel Rammer <[email protected]>

* removed taskEventRecorder from nodes package

Signed-off-by: Daniel Rammer <[email protected]>

* adding interface checking for arraynode

Signed-off-by: Daniel Rammer <[email protected]>

* added transform unit test

Signed-off-by: Daniel Rammer <[email protected]>

* fixed input bindings issue

Signed-off-by: Daniel Rammer <[email protected]>

* fixed unit tests

Signed-off-by: Daniel Rammer <[email protected]>

* fixed unit tests

Signed-off-by: Daniel Rammer <[email protected]>

* go generate

Signed-off-by: Daniel Rammer <[email protected]>

* addressing random TODO

Signed-off-by: Daniel Rammer <[email protected]>

* fixed unit tests

Signed-off-by: Daniel Rammer <[email protected]>

* addressing pr comments

Signed-off-by: Daniel Rammer <[email protected]>

---------

Signed-off-by: Daniel Rammer <[email protected]>
  • Loading branch information
hamersaw authored Jul 31, 2023
1 parent 577b35e commit 924f110
Show file tree
Hide file tree
Showing 102 changed files with 6,577 additions and 2,153 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
tasks:
- container:
args:
- "pyflyte-fast-execute"
- "--additional-distribution"
- "s3://my-s3-bucket/flytesnacks/development/SMJBJX7BQJ6MCOABLKQT5VZXVY======/script_mode.tar.gz"
- "--dest-dir"
- "/root"
- "--"
- "pyflyte-map-execute"
- "--inputs"
- "{{.input}}"
- "--output-prefix"
- "{{.outputPrefix}}"
- "--raw-output-data-prefix"
- "{{.rawOutputDataPrefix}}"
- "--checkpoint-path"
- "{{.checkpointOutputPrefix}}"
- "--prev-checkpoint"
- "{{.prevCheckpointPrefix}}"
- "--resolver"
- "MapTaskResolver"
- "--"
- "vars"
- ""
- "resolver"
- "flytekit.core.python_auto_container.default_task_resolver"
- "task-module"
- "map-task"
- "task-name"
- "a_mappable_task"
image: "array-node:ee1ba227aa95447d04bb1761691b4d97749642dc"
resources:
limits:
- name: 1
value: "1"
- name: 3
value: "500Mi"
requests:
- name: 1
value: "1"
- name: 3
value: "300Mi"
id:
name: task-1
project: flytesnacks
domain: development
metadata:
discoverable: true
discovery_version: "1.0"
cache_serializable: true
interface:
inputs:
variables:
a:
type:
simple: INTEGER
outputs:
variables:
o0:
type:
simple: STRING
workflow:
id:
name: workflow-with-array-node
interface:
inputs:
variables:
x:
type:
collectionType:
simple: INTEGER
nodes:
- id: node-1
inputs:
- binding:
promise:
node_id: start-node
var: x
var: a
arrayNode:
parallelism: 0
node:
metadata:
retries:
retries: 3
taskNode:
referenceId:
name: task-1
project: flytesnacks
domain: development
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
tasks:
- container:
args:
- "pyflyte-fast-execute"
- "--additional-distribution"
- "s3://my-s3-bucket/flytesnacks/development/SMJBJX7BQJ6MCOABLKQT5VZXVY======/script_mode.tar.gz"
- "--dest-dir"
- "/root"
- "--"
- "pyflyte-map-execute"
- "--inputs"
- "{{.input}}"
- "--output-prefix"
- "{{.outputPrefix}}"
- "--raw-output-data-prefix"
- "{{.rawOutputDataPrefix}}"
- "--checkpoint-path"
- "{{.checkpointOutputPrefix}}"
- "--prev-checkpoint"
- "{{.prevCheckpointPrefix}}"
- "--resolver"
- "MapTaskResolver"
- "--"
- "vars"
- ""
- "resolver"
- "flytekit.core.python_auto_container.default_task_resolver"
- "task-module"
- "map-task"
- "task-name"
- "a_mappable_task"
image: "array-node:ee1ba227aa95447d04bb1761691b4d97749642dc"
resources:
limits:
- name: 1
value: "1"
- name: 3
value: "500Mi"
requests:
- name: 1
value: "1"
- name: 3
value: "300Mi"
id:
name: task-1
project: flytesnacks
domain: development
metadata:
discoverable: true
discovery_version: "1.0"
cache_serializable: false
interface:
inputs:
variables:
a:
type:
simple: INTEGER
outputs:
variables:
o0:
type:
simple: STRING
workflow:
id:
name: workflow-with-array-node
interface:
inputs:
variables:
x:
type:
collectionType:
simple: INTEGER
nodes:
- id: node-1
inputs:
- binding:
promise:
node_id: start-node
var: x
var: a
arrayNode:
parallelism: 1
node:
metadata:
retries:
retries: 3
taskNode:
referenceId:
name: task-1
project: flytesnacks
domain: development
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
literals:
"x":
collection:
literals:
- scalar:
primitive:
integer: "1"
- scalar:
primitive:
integer: "2"
- scalar:
primitive:
integer: "3"
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
tasks:
- container:
args:
- "pyflyte-fast-execute"
- "--additional-distribution"
- "s3://my-s3-bucket/flytesnacks/development/SMJBJX7BQJ6MCOABLKQT5VZXVY======/script_mode.tar.gz"
- "--dest-dir"
- "/root"
- "--"
- "pyflyte-map-execute"
- "--inputs"
- "{{.input}}"
- "--output-prefix"
- "{{.outputPrefix}}"
- "--raw-output-data-prefix"
- "{{.rawOutputDataPrefix}}"
- "--checkpoint-path"
- "{{.checkpointOutputPrefix}}"
- "--prev-checkpoint"
- "{{.prevCheckpointPrefix}}"
- "--resolver"
- "MapTaskResolver"
- "--"
- "vars"
- ""
- "resolver"
- "flytekit.core.python_auto_container.default_task_resolver"
- "task-module"
- "map-task"
- "task-name"
- "a_mappable_task"
image: "array-node:ee1ba227aa95447d04bb1761691b4d97749642dc"
resources:
limits:
- name: 1
value: "1"
- name: 3
value: "500Mi"
requests:
- name: 1
value: "1"
- name: 3
value: "300Mi"
id:
name: task-1
metadata:
discoverable: false
cache_serializable: false
interface:
inputs:
variables:
a:
type:
simple: INTEGER
outputs:
variables:
x:
type:
simple: STRING
workflow:
id:
name: workflow-with-array-node
interface:
inputs:
variables:
x:
type:
collectionType:
simple: INTEGER
nodes:
- id: node-1
inputs:
- binding:
promise:
node_id: start-node
var: x
var: a
arrayNode:
parallelism: 1
node:
metadata:
retries:
retries: 3
taskNode:
referenceId:
name: task-1
8 changes: 4 additions & 4 deletions flytepropeller/events/event_recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
"github.com/golang/protobuf/proto"
)

const maxErrorMessageLength = 104857600 //100KB
const MaxErrorMessageLength = 104857600 //100KB
const truncationIndicator = "... <Message Truncated> ..."

type recordingMetrics struct {
Expand Down Expand Up @@ -60,23 +60,23 @@ func (r *eventRecorder) sinkEvent(ctx context.Context, event proto.Message) erro

func (r *eventRecorder) RecordNodeEvent(ctx context.Context, e *event.NodeExecutionEvent) error {
if err, ok := e.GetOutputResult().(*event.NodeExecutionEvent_Error); ok {
truncateErrorMessage(err.Error, maxErrorMessageLength)
truncateErrorMessage(err.Error, MaxErrorMessageLength)
}

return r.sinkEvent(ctx, e)
}

func (r *eventRecorder) RecordTaskEvent(ctx context.Context, e *event.TaskExecutionEvent) error {
if err, ok := e.GetOutputResult().(*event.TaskExecutionEvent_Error); ok {
truncateErrorMessage(err.Error, maxErrorMessageLength)
truncateErrorMessage(err.Error, MaxErrorMessageLength)
}

return r.sinkEvent(ctx, e)
}

func (r *eventRecorder) RecordWorkflowEvent(ctx context.Context, e *event.WorkflowExecutionEvent) error {
if err, ok := e.GetOutputResult().(*event.WorkflowExecutionEvent_Error); ok {
truncateErrorMessage(err.Error, maxErrorMessageLength)
truncateErrorMessage(err.Error, MaxErrorMessageLength)
}

return r.sinkEvent(ctx, e)
Expand Down
2 changes: 2 additions & 0 deletions flytepropeller/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -146,3 +146,5 @@ require (
)

replace github.com/aws/amazon-sagemaker-operator-for-k8s => github.com/aws/amazon-sagemaker-operator-for-k8s v1.0.1-0.20210303003444-0fb33b1fd49d

replace github.com/flyteorg/flyteidl => github.com/flyteorg/flyteidl v1.5.11-0.20230614183933-d56d4d37bf34
4 changes: 2 additions & 2 deletions flytepropeller/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -242,8 +242,8 @@ github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5Kwzbycv
github.com/fatih/color v1.13.0 h1:8LOYc1KYPPmyKMuN8QV2DNRWNbLo6LZ0iLs8+mlH53w=
github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk=
github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/flyteorg/flyteidl v1.5.10 h1:SHeiaWRt8EAVuFsat+BJswtc07HTZ4DqhfTEYSm621k=
github.com/flyteorg/flyteidl v1.5.10/go.mod h1:EtE/muM2lHHgBabjYcxqe9TWeJSL0kXwbI0RgVwI4Og=
github.com/flyteorg/flyteidl v1.5.11-0.20230614183933-d56d4d37bf34 h1:Gj5UKqJU+ozeTeYAvDWHiF4HSVufHW1W1ecymFfbbis=
github.com/flyteorg/flyteidl v1.5.11-0.20230614183933-d56d4d37bf34/go.mod h1:EtE/muM2lHHgBabjYcxqe9TWeJSL0kXwbI0RgVwI4Og=
github.com/flyteorg/flyteplugins v1.1.8 h1:UVYdqDdcIqz2JIso+m3MsaPSsTZJZyZQ6Eg7nhX9r/Y=
github.com/flyteorg/flyteplugins v1.1.8/go.mod h1:sRxeatEOHq1b9bTxTRNcwoIkVTAVN9dTz8toXkfcz2E=
github.com/flyteorg/flytestdlib v1.0.20 h1:BrCQMlpdrFAPlADFJvCyn7gm+37df9WGYqLEB1mOlCQ=
Expand Down
24 changes: 24 additions & 0 deletions flytepropeller/pkg/apis/flyteworkflow/v1alpha1/array.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package v1alpha1

type ArrayNodeSpec struct {
SubNodeSpec *NodeSpec
Parallelism uint32
MinSuccesses *uint32
MinSuccessRatio *float32
}

func (a *ArrayNodeSpec) GetSubNodeSpec() *NodeSpec {
return a.SubNodeSpec
}

func (a *ArrayNodeSpec) GetParallelism() uint32 {
return a.Parallelism
}

func (a *ArrayNodeSpec) GetMinSuccesses() *uint32 {
return a.MinSuccesses
}

func (a *ArrayNodeSpec) GetMinSuccessRatio() *float32 {
return a.MinSuccessRatio
}
Loading

0 comments on commit 924f110

Please sign in to comment.