From c1241ecc7f3dad4114da8326155406675b29573a Mon Sep 17 00:00:00 2001 From: Zachary Stewart Date: Mon, 3 Sep 2018 14:02:10 -0400 Subject: [PATCH] Support passing the result of one task into other tasks (#1) * Support passing data between tasks * Add a new Results type for intermediate results, change interface to use varargs for tasks * Update the docs to support passing parameters --- README.md | 14 ++++++------ doc.go | 22 +++++++++---------- execution_ctx.go | 7 +++++- results.go | 19 ++++++++++++++++ workflow.go | 5 +++-- workflow_test.go | 57 +++++++++++++++++++++++++++++++----------------- 6 files changed, 83 insertions(+), 41 deletions(-) create mode 100644 results.go diff --git a/README.md b/README.md index 49842aa..429d648 100644 --- a/README.md +++ b/README.md @@ -18,18 +18,18 @@ import ( func doSomething(ctx context.Context) { - taskGraph, err := workflow.NewGraph([]workflow.Task{ - NewTask("taskName", []string{"someOtherTask"}, func(ctx context.Context) error { + taskGraph, err := workflow.NewGraph( + NewTask("taskOne", []string{"taskTwo"}, func(ctx context.Context, res Results) (interface{}, error) { // Do some useful work here... - return nil + return nil, nil }), - NewTask("someOtherTask", nil, func(ctx context.Context) error { + NewTask("taskTwo", nil, func(ctx context.Context, res Results) (interface{}, error) { // Do some useful work here... - return nil + return nil, nil }) - }) + ) - // Check for an error in case we forgot to add "someOtherTask" as a dependency to "taskName" + // Check for an error -- maybe we forgot to add "taskTwo" if err != nil { // Handle the error .... } diff --git a/doc.go b/doc.go index 8d8d9cd..619f594 100644 --- a/doc.go +++ b/doc.go @@ -19,19 +19,18 @@ // ) // // func doSomething(ctx context.Context) { -// -// taskGraph, err := workflow.NewGraph([]workflow.Task{ -// NewTask("taskName", []string{"someOtherTask"}, func(ctx context.Context) error { +// taskGraph, err := workflow.NewGraph( +// NewTask("taskName", []string{"someOtherTask"}, func(ctx context.Context, res Results) (interface{}, error) { // // Do some useful work here... -// return nil +// return nil, nil // }), -// NewTask("someOtherTask", nil, func(ctx context.Context) error { +// NewTask("someOtherTask", nil, func(ctx context.Context, res Results) (interface{}, error) { // // Do some useful work here... -// return nil +// return nil, nil // }) -// }) +// ) // -// // Check for an error in case we forgot to add "someOtherTask" as a dependency to "taskName" +// // Check for an error constructing the graph // if err != nil { // // Handle the error .... // } @@ -42,9 +41,10 @@ // } // // In this example, `taskName` will run before `someOtherTask` does, assuming -// that `taskName` does not return an error. -// If "taskName" returns an error, `taskGraph.Run()` will return an error and -// `someOtherTask` will not execute. +// that `taskName` does not return an error. `taskName` will also have the +// result of `someOtherTask` included in the `Results` parameter. +// If "someOtherTask" returns an error, `taskGraph.Run()` will return an error and +// `taskName` will not execute. // // Currently, 2^32 - 1 tasks are supported per workflow. package workflow diff --git a/execution_ctx.go b/execution_ctx.go index 456ae96..eb82bee 100644 --- a/execution_ctx.go +++ b/execution_ctx.go @@ -20,6 +20,7 @@ type executionCtx struct { ctx context.Context cancel context.CancelFunc errCounter *atomic.Int32 // There are no atomic errors, sadly + results Results } func newExecutionCtx(ctx context.Context, g Graph) *executionCtx { @@ -31,6 +32,7 @@ func newExecutionCtx(ctx context.Context, g Graph) *executionCtx { ctx: iCtx, cancel: cancel, errCounter: atomic.NewInt32(0), + results: Results{resMap: &sync.Map{}}, } } @@ -78,12 +80,15 @@ func (ec *executionCtx) runTask(t Task) { return } - if err := t.fn(ec.ctx); err != nil { + res, err := t.fn(ec.ctx, ec.results) + if err != nil { ec.markFailure(err) // Do not queue up additional tasks after encountering an error return } + ec.results.store(t.name, res) + for dep := range ec.g.taskToDependants[t.name] { if ec.taskToNumdeps[dep].Add(-1) == int32(0) { ec.enqueueTask(ec.g.tasks[dep]) diff --git a/results.go b/results.go new file mode 100644 index 0000000..a2775c3 --- /dev/null +++ b/results.go @@ -0,0 +1,19 @@ +package workflow + +import "sync" + +// Results holds a reference to the intermediate results of a workflow +// execution. It used to task the result of one function into its +// dependencies. +type Results struct { + resMap *sync.Map +} + +// Load retrieves the result for a particular task. +func (r Results) Load(taskName string) (val interface{}, ok bool) { + return r.resMap.Load(taskName) +} + +func (r Results) store(taskName string, result interface{}) { + r.resMap.Store(taskName, result) +} diff --git a/workflow.go b/workflow.go index c120dc4..6906ab3 100644 --- a/workflow.go +++ b/workflow.go @@ -9,7 +9,8 @@ var empty struct{} // A TaskFn is an individually executable unit of work. It is expected that // when the context is closed, such as via a timetout or cancellation, that // a TaskFun will cease execution and return immediately. -type TaskFn = func(ctx context.Context) error +// Results, which can be nil, contains a map of task name to return result. +type TaskFn = func(ctx context.Context, results Results) (interface{}, error) // A Task is a unit of work along with a name and set of dependencies. type Task struct { @@ -50,7 +51,7 @@ type Graph struct { // well-formed, such as when a dependency is not satisfied or a cycle is // detected. func NewGraph( - tasks []Task, + tasks ...Task, ) (Graph, error) { taskMap := make(map[string]Task, len(tasks)) taskToDependants := make(map[string]map[string]struct{}, len(tasks)) diff --git a/workflow_test.go b/workflow_test.go index 5745cb8..fc2b218 100644 --- a/workflow_test.go +++ b/workflow_test.go @@ -7,7 +7,7 @@ import ( ) func TestWorkflowNoJobs(t *testing.T) { - graph, err := NewGraph(nil) + graph, err := NewGraph() if err != nil { t.Fatal("failed to initialize the graph") @@ -21,28 +21,45 @@ func TestWorkflowNoJobs(t *testing.T) { } func TestWorkflowSimpleCase(t *testing.T) { - taskOneRan := false taskTwoRan := false - taskGraph, err := NewGraph([]Task{ - NewTask("taskName", []string{"someOtherTask"}, func(ctx context.Context) error { - // Do some useful work here... + taskThreeRan := false + taskOneRetVal := "someReturnValue" + taskGraph, err := NewGraph( + NewTask("taskOne", []string{"taskTwo"}, func(ctx context.Context, res Results) (interface{}, error) { taskOneRan = true - return nil + return taskOneRetVal, nil }), - NewTask("someOtherTask", nil, func(ctx context.Context) error { - // Do some useful work here... + NewTask("taskTwo", nil, func(ctx context.Context, res Results) (interface{}, error) { taskTwoRan = true - return nil + return nil, nil }), - }) + NewTask("taskThree", []string{"taskOne"}, func(ctx context.Context, res Results) (interface{}, error) { + taskThreeRan = true + + taskOneRes, ok := res.Load("taskOne") + if !ok { + t.Fatal("failed to stored value in result map") + } + + resAsString, ok := taskOneRes.(string) + if !ok { + t.Fatal("failed to cast result to a string") + } + + if resAsString != taskOneRetVal { + t.Fatal("incorrect value returned") + } + + return nil, nil + }), + ) if err != nil { t.Fatal("failed to initialize the graph") } err = taskGraph.Run(context.Background()) - if err != nil { t.Fatal("failed to run the graph") } @@ -55,25 +72,25 @@ func TestWorkflowSimpleCase(t *testing.T) { t.Fatal("failed to run taskTwo") } + if !taskThreeRan { + t.Fatal("failed to run taskThree") + } } func TestWorkflowReturnsError(t *testing.T) { - retErr := errors.New("bad error") taskOneRan := false taskTwoRan := false - taskGraph, err := NewGraph([]Task{ - NewTask("taskName", []string{"someOtherTask"}, func(ctx context.Context) error { - // Do some useful work here... + taskGraph, err := NewGraph( + NewTask("taskName", []string{"someOtherTask"}, func(ctx context.Context, res Results) (interface{}, error) { taskOneRan = true - return nil + return nil, nil }), - NewTask("someOtherTask", nil, func(ctx context.Context) error { - // Do some useful work here... + NewTask("someOtherTask", nil, func(ctx context.Context, res Results) (interface{}, error) { taskTwoRan = true - return retErr + return nil, retErr }), - }) + ) if err != nil { t.Fatal("failed to initialize the graph")