Skip to content

Commit

Permalink
Support passing the result of one task into other tasks (#1)
Browse files Browse the repository at this point in the history
* Support passing data between tasks
* Add a new Results type for intermediate results, change interface to use varargs for tasks
* Update the docs to support passing parameters
  • Loading branch information
ztstewart authored Sep 3, 2018
1 parent 1d8d7ae commit c1241ec
Show file tree
Hide file tree
Showing 6 changed files with 83 additions and 41 deletions.
14 changes: 7 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,18 @@ import (
func doSomething(ctx context.Context) {
taskGraph, err := workflow.NewGraph([]workflow.Task{
NewTask("taskName", []string{"someOtherTask"}, func(ctx context.Context) error {
taskGraph, err := workflow.NewGraph(
NewTask("taskOne", []string{"taskTwo"}, func(ctx context.Context, res Results) (interface{}, error) {
// Do some useful work here...
return nil
return nil, nil
}),
NewTask("someOtherTask", nil, func(ctx context.Context) error {
NewTask("taskTwo", nil, func(ctx context.Context, res Results) (interface{}, error) {
// Do some useful work here...
return nil
return nil, nil
})
})
)
// Check for an error in case we forgot to add "someOtherTask" as a dependency to "taskName"
// Check for an error -- maybe we forgot to add "taskTwo"
if err != nil {
// Handle the error ....
}
Expand Down
22 changes: 11 additions & 11 deletions doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,18 @@
// )
//
// func doSomething(ctx context.Context) {
//
// taskGraph, err := workflow.NewGraph([]workflow.Task{
// NewTask("taskName", []string{"someOtherTask"}, func(ctx context.Context) error {
// taskGraph, err := workflow.NewGraph(
// NewTask("taskName", []string{"someOtherTask"}, func(ctx context.Context, res Results) (interface{}, error) {
// // Do some useful work here...
// return nil
// return nil, nil
// }),
// NewTask("someOtherTask", nil, func(ctx context.Context) error {
// NewTask("someOtherTask", nil, func(ctx context.Context, res Results) (interface{}, error) {
// // Do some useful work here...
// return nil
// return nil, nil
// })
// })
// )
//
// // Check for an error in case we forgot to add "someOtherTask" as a dependency to "taskName"
// // Check for an error constructing the graph
// if err != nil {
// // Handle the error ....
// }
Expand All @@ -42,9 +41,10 @@
// }
//
// In this example, `taskName` will run before `someOtherTask` does, assuming
// that `taskName` does not return an error.
// If "taskName" returns an error, `taskGraph.Run()` will return an error and
// `someOtherTask` will not execute.
// that `taskName` does not return an error. `taskName` will also have the
// result of `someOtherTask` included in the `Results` parameter.
// If "someOtherTask" returns an error, `taskGraph.Run()` will return an error and
// `taskName` will not execute.
//
// Currently, 2^32 - 1 tasks are supported per workflow.
package workflow
7 changes: 6 additions & 1 deletion execution_ctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ type executionCtx struct {
ctx context.Context
cancel context.CancelFunc
errCounter *atomic.Int32 // There are no atomic errors, sadly
results Results
}

func newExecutionCtx(ctx context.Context, g Graph) *executionCtx {
Expand All @@ -31,6 +32,7 @@ func newExecutionCtx(ctx context.Context, g Graph) *executionCtx {
ctx: iCtx,
cancel: cancel,
errCounter: atomic.NewInt32(0),
results: Results{resMap: &sync.Map{}},
}
}

Expand Down Expand Up @@ -78,12 +80,15 @@ func (ec *executionCtx) runTask(t Task) {
return
}

if err := t.fn(ec.ctx); err != nil {
res, err := t.fn(ec.ctx, ec.results)
if err != nil {
ec.markFailure(err)
// Do not queue up additional tasks after encountering an error
return
}

ec.results.store(t.name, res)

for dep := range ec.g.taskToDependants[t.name] {
if ec.taskToNumdeps[dep].Add(-1) == int32(0) {
ec.enqueueTask(ec.g.tasks[dep])
Expand Down
19 changes: 19 additions & 0 deletions results.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package workflow

import "sync"

// Results holds a reference to the intermediate results of a workflow
// execution. It used to task the result of one function into its
// dependencies.
type Results struct {
resMap *sync.Map
}

// Load retrieves the result for a particular task.
func (r Results) Load(taskName string) (val interface{}, ok bool) {
return r.resMap.Load(taskName)
}

func (r Results) store(taskName string, result interface{}) {
r.resMap.Store(taskName, result)
}
5 changes: 3 additions & 2 deletions workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ var empty struct{}
// A TaskFn is an individually executable unit of work. It is expected that
// when the context is closed, such as via a timetout or cancellation, that
// a TaskFun will cease execution and return immediately.
type TaskFn = func(ctx context.Context) error
// Results, which can be nil, contains a map of task name to return result.
type TaskFn = func(ctx context.Context, results Results) (interface{}, error)

// A Task is a unit of work along with a name and set of dependencies.
type Task struct {
Expand Down Expand Up @@ -50,7 +51,7 @@ type Graph struct {
// well-formed, such as when a dependency is not satisfied or a cycle is
// detected.
func NewGraph(
tasks []Task,
tasks ...Task,
) (Graph, error) {
taskMap := make(map[string]Task, len(tasks))
taskToDependants := make(map[string]map[string]struct{}, len(tasks))
Expand Down
57 changes: 37 additions & 20 deletions workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
)

func TestWorkflowNoJobs(t *testing.T) {
graph, err := NewGraph(nil)
graph, err := NewGraph()

if err != nil {
t.Fatal("failed to initialize the graph")
Expand All @@ -21,28 +21,45 @@ func TestWorkflowNoJobs(t *testing.T) {
}

func TestWorkflowSimpleCase(t *testing.T) {

taskOneRan := false
taskTwoRan := false
taskGraph, err := NewGraph([]Task{
NewTask("taskName", []string{"someOtherTask"}, func(ctx context.Context) error {
// Do some useful work here...
taskThreeRan := false
taskOneRetVal := "someReturnValue"
taskGraph, err := NewGraph(
NewTask("taskOne", []string{"taskTwo"}, func(ctx context.Context, res Results) (interface{}, error) {
taskOneRan = true
return nil
return taskOneRetVal, nil
}),
NewTask("someOtherTask", nil, func(ctx context.Context) error {
// Do some useful work here...
NewTask("taskTwo", nil, func(ctx context.Context, res Results) (interface{}, error) {
taskTwoRan = true
return nil
return nil, nil
}),
})
NewTask("taskThree", []string{"taskOne"}, func(ctx context.Context, res Results) (interface{}, error) {
taskThreeRan = true

taskOneRes, ok := res.Load("taskOne")
if !ok {
t.Fatal("failed to stored value in result map")
}

resAsString, ok := taskOneRes.(string)
if !ok {
t.Fatal("failed to cast result to a string")
}

if resAsString != taskOneRetVal {
t.Fatal("incorrect value returned")
}

return nil, nil
}),
)

if err != nil {
t.Fatal("failed to initialize the graph")
}

err = taskGraph.Run(context.Background())

if err != nil {
t.Fatal("failed to run the graph")
}
Expand All @@ -55,25 +72,25 @@ func TestWorkflowSimpleCase(t *testing.T) {
t.Fatal("failed to run taskTwo")
}

if !taskThreeRan {
t.Fatal("failed to run taskThree")
}
}

func TestWorkflowReturnsError(t *testing.T) {

retErr := errors.New("bad error")
taskOneRan := false
taskTwoRan := false
taskGraph, err := NewGraph([]Task{
NewTask("taskName", []string{"someOtherTask"}, func(ctx context.Context) error {
// Do some useful work here...
taskGraph, err := NewGraph(
NewTask("taskName", []string{"someOtherTask"}, func(ctx context.Context, res Results) (interface{}, error) {
taskOneRan = true
return nil
return nil, nil
}),
NewTask("someOtherTask", nil, func(ctx context.Context) error {
// Do some useful work here...
NewTask("someOtherTask", nil, func(ctx context.Context, res Results) (interface{}, error) {
taskTwoRan = true
return retErr
return nil, retErr
}),
})
)

if err != nil {
t.Fatal("failed to initialize the graph")
Expand Down

0 comments on commit c1241ec

Please sign in to comment.