From 4b922d1c380e5a10d0d0512adfba56cac8727193 Mon Sep 17 00:00:00 2001 From: Christopher Schleiden Date: Thu, 16 May 2024 21:00:45 -0700 Subject: [PATCH] Pass workflow events as pointers --- backend/backend.go | 2 +- backend/history/grouping.go | 6 +++--- backend/history/grouping_test.go | 2 +- backend/mock_Backend.go | 4 ++-- backend/monoprocess/monoprocess.go | 2 +- backend/mysql/mysql.go | 2 +- backend/options.go | 4 ++++ backend/redis/workflow.go | 2 +- backend/sqlite/sqlite.go | 2 +- backend/test/backendtest.go | 12 ++++++------ internal/command/command.go | 2 +- internal/command/complete_workflow.go | 2 +- internal/command/continueasnew.go | 2 +- internal/command/schedule_subworkflow.go | 4 ++-- tester/tester.go | 4 ++-- workflow/executor/executor.go | 4 ++-- 16 files changed, 30 insertions(+), 26 deletions(-) diff --git a/backend/backend.go b/backend/backend.go index 61d5ba99..9f97f5e0 100644 --- a/backend/backend.go +++ b/backend/backend.go @@ -55,7 +55,7 @@ type Backend interface { // completed or other workflow instances. CompleteWorkflowTask( ctx context.Context, task *WorkflowTask, instance *workflow.Instance, state core.WorkflowInstanceState, - executedEvents, activityEvents, timerEvents []*history.Event, workflowEvents []history.WorkflowEvent) error + executedEvents, activityEvents, timerEvents []*history.Event, workflowEvents []*history.WorkflowEvent) error // GetActivityTask returns a pending activity task or nil if there are no pending activities GetActivityTask(ctx context.Context) (*ActivityTask, error) diff --git a/backend/history/grouping.go b/backend/history/grouping.go index 71efaaf7..f6d44ed3 100644 --- a/backend/history/grouping.go +++ b/backend/history/grouping.go @@ -2,14 +2,14 @@ package history import "github.com/cschleiden/go-workflows/core" -func EventsByWorkflowInstance(events []WorkflowEvent) map[core.WorkflowInstance][]WorkflowEvent { - groupedEvents := make(map[core.WorkflowInstance][]WorkflowEvent) +func EventsByWorkflowInstance(events []*WorkflowEvent) map[core.WorkflowInstance][]*WorkflowEvent { + groupedEvents := make(map[core.WorkflowInstance][]*WorkflowEvent) for _, m := range events { instance := *m.WorkflowInstance if _, ok := groupedEvents[instance]; !ok { - groupedEvents[instance] = []WorkflowEvent{} + groupedEvents[instance] = []*WorkflowEvent{} } groupedEvents[instance] = append(groupedEvents[instance], m) diff --git a/backend/history/grouping_test.go b/backend/history/grouping_test.go index 08ca437c..c25496b9 100644 --- a/backend/history/grouping_test.go +++ b/backend/history/grouping_test.go @@ -13,7 +13,7 @@ func TestGrouping_MultipleEventsSameInstance(t *testing.T) { id := uuid.NewString() instance := core.NewWorkflowInstance(id, "exid") - r := EventsByWorkflowInstance([]WorkflowEvent{ + r := EventsByWorkflowInstance([]*WorkflowEvent{ { WorkflowInstance: instance, HistoryEvent: NewPendingEvent(time.Now(), EventType_SubWorkflowScheduled, &SubWorkflowScheduledAttributes{}), diff --git a/backend/mock_Backend.go b/backend/mock_Backend.go index 013ce0aa..038d1ff0 100644 --- a/backend/mock_Backend.go +++ b/backend/mock_Backend.go @@ -69,11 +69,11 @@ func (_m *MockBackend) CompleteActivityTask(ctx context.Context, instance *core. } // CompleteWorkflowTask provides a mock function with given fields: ctx, task, instance, state, executedEvents, activityEvents, timerEvents, workflowEvents -func (_m *MockBackend) CompleteWorkflowTask(ctx context.Context, task *WorkflowTask, instance *core.WorkflowInstance, state core.WorkflowInstanceState, executedEvents []*history.Event, activityEvents []*history.Event, timerEvents []*history.Event, workflowEvents []history.WorkflowEvent) error { +func (_m *MockBackend) CompleteWorkflowTask(ctx context.Context, task *WorkflowTask, instance *core.WorkflowInstance, state core.WorkflowInstanceState, executedEvents []*history.Event, activityEvents []*history.Event, timerEvents []*history.Event, workflowEvents []*history.WorkflowEvent) error { ret := _m.Called(ctx, task, instance, state, executedEvents, activityEvents, timerEvents, workflowEvents) var r0 error - if rf, ok := ret.Get(0).(func(context.Context, *WorkflowTask, *core.WorkflowInstance, core.WorkflowInstanceState, []*history.Event, []*history.Event, []*history.Event, []history.WorkflowEvent) error); ok { + if rf, ok := ret.Get(0).(func(context.Context, *WorkflowTask, *core.WorkflowInstance, core.WorkflowInstanceState, []*history.Event, []*history.Event, []*history.Event, []*history.WorkflowEvent) error); ok { r0 = rf(ctx, task, instance, state, executedEvents, activityEvents, timerEvents, workflowEvents) } else { r0 = ret.Error(0) diff --git a/backend/monoprocess/monoprocess.go b/backend/monoprocess/monoprocess.go index ff410d92..6b2406be 100644 --- a/backend/monoprocess/monoprocess.go +++ b/backend/monoprocess/monoprocess.go @@ -85,7 +85,7 @@ func (b *monoprocessBackend) CompleteWorkflowTask( instance *workflow.Instance, state core.WorkflowInstanceState, executedEvents, activityEvents, timerEvents []*history.Event, - workflowEvents []history.WorkflowEvent, + workflowEvents []*history.WorkflowEvent, ) error { if err := b.Backend.CompleteWorkflowTask(ctx, task, instance, state, executedEvents, activityEvents, timerEvents, workflowEvents); err != nil { return err diff --git a/backend/mysql/mysql.go b/backend/mysql/mysql.go index 57e84370..6764ca37 100644 --- a/backend/mysql/mysql.go +++ b/backend/mysql/mysql.go @@ -535,7 +535,7 @@ func (b *mysqlBackend) CompleteWorkflowTask( instance *workflow.Instance, state core.WorkflowInstanceState, executedEvents, activityEvents, timerEvents []*history.Event, - workflowEvents []history.WorkflowEvent, + workflowEvents []*history.WorkflowEvent, ) error { tx, err := b.db.BeginTx(ctx, &sql.TxOptions{ Isolation: sql.LevelReadCommitted, diff --git a/backend/options.go b/backend/options.go index 70d8a291..164baf68 100644 --- a/backend/options.go +++ b/backend/options.go @@ -37,6 +37,10 @@ type Options struct { // ActivityLockTimeout determines how long an activity task can be locked for. If the activity task is not completed // by that timeframe, it's considered abandoned and another worker might pick it up ActivityLockTimeout time.Duration + + WorkflowNamespace string + + ActivityNamespace string } var DefaultOptions Options = Options{ diff --git a/backend/redis/workflow.go b/backend/redis/workflow.go index 256395c8..4087637d 100644 --- a/backend/redis/workflow.go +++ b/backend/redis/workflow.go @@ -97,7 +97,7 @@ func (rb *redisBackend) CompleteWorkflowTask( instance *core.WorkflowInstance, state core.WorkflowInstanceState, executedEvents, activityEvents, timerEvents []*history.Event, - workflowEvents []history.WorkflowEvent, + workflowEvents []*history.WorkflowEvent, ) error { keys := make([]string, 0) args := make([]interface{}, 0) diff --git a/backend/sqlite/sqlite.go b/backend/sqlite/sqlite.go index 35c06174..144f4d05 100644 --- a/backend/sqlite/sqlite.go +++ b/backend/sqlite/sqlite.go @@ -459,7 +459,7 @@ func (sb *sqliteBackend) CompleteWorkflowTask( instance *workflow.Instance, state core.WorkflowInstanceState, executedEvents, activityEvents, timerEvents []*history.Event, - workflowEvents []history.WorkflowEvent, + workflowEvents []*history.WorkflowEvent, ) error { tx, err := sb.db.BeginTx(ctx, nil) if err != nil { diff --git a/backend/test/backendtest.go b/backend/test/backendtest.go index c4d84cdb..8d8c4eeb 100644 --- a/backend/test/backendtest.go +++ b/backend/test/backendtest.go @@ -174,11 +174,11 @@ func BackendTest(t *testing.T, setup func(options ...backend.BackendOption) Test require.NotNil(t, tk) // Complete workflow task - err = b.CompleteWorkflowTask(ctx, tk, wfi, core.WorkflowInstanceStateActive, tk.NewEvents, []*history.Event{}, []*history.Event{}, []history.WorkflowEvent{}) + err = b.CompleteWorkflowTask(ctx, tk, wfi, core.WorkflowInstanceStateActive, tk.NewEvents, []*history.Event{}, []*history.Event{}, []*history.WorkflowEvent{}) require.NoError(t, err) // Task is already completed, this should error - err = b.CompleteWorkflowTask(ctx, tk, wfi, core.WorkflowInstanceStateActive, tk.NewEvents, []*history.Event{}, []*history.Event{}, []history.WorkflowEvent{}) + err = b.CompleteWorkflowTask(ctx, tk, wfi, core.WorkflowInstanceStateActive, tk.NewEvents, []*history.Event{}, []*history.Event{}, []*history.WorkflowEvent{}) require.Error(t, err) }, }, @@ -212,7 +212,7 @@ func BackendTest(t *testing.T, setup func(options ...backend.BackendOption) Test activityScheduledEvent, } - workflowEvents := []history.WorkflowEvent{} + workflowEvents := []*history.WorkflowEvent{} err = b.CompleteWorkflowTask(ctx, task, wfi, core.WorkflowInstanceStateActive, events, activityEvents, []*history.Event{}, workflowEvents) require.NoError(t, err) @@ -257,7 +257,7 @@ func BackendTest(t *testing.T, setup func(options ...backend.BackendOption) Test events[i].SequenceID = sequenceID } - err = b.CompleteWorkflowTask(ctx, task, wfi, core.WorkflowInstanceStateFinished, events, []*history.Event{}, []*history.Event{}, []history.WorkflowEvent{}) + err = b.CompleteWorkflowTask(ctx, task, wfi, core.WorkflowInstanceStateFinished, events, []*history.Event{}, []*history.Event{}, []*history.WorkflowEvent{}) require.NoError(t, err) time.Sleep(time.Second) @@ -285,7 +285,7 @@ func BackendTest(t *testing.T, setup func(options ...backend.BackendOption) Test // Simulate context and sub-workflow cancellation task, err := b.GetWorkflowTask(ctx) require.NoError(t, err) - err = b.CompleteWorkflowTask(ctx, task, instance, core.WorkflowInstanceStateActive, task.NewEvents, []*history.Event{}, []*history.Event{}, []history.WorkflowEvent{ + err = b.CompleteWorkflowTask(ctx, task, instance, core.WorkflowInstanceStateActive, task.NewEvents, []*history.Event{}, []*history.Event{}, []*history.WorkflowEvent{ { WorkflowInstance: subInstance1, HistoryEvent: history.NewHistoryEvent(1, time.Now(), history.EventType_WorkflowExecutionCanceled, &history.SubWorkflowCancellationRequestedAttributes{ @@ -374,6 +374,6 @@ func startWorkflow(t *testing.T, ctx context.Context, b backend.Backend, c *clie require.NoError(t, err) err = b.CompleteWorkflowTask( - ctx, task, instance, core.WorkflowInstanceStateActive, task.NewEvents, []*history.Event{}, []*history.Event{}, []history.WorkflowEvent{}) + ctx, task, instance, core.WorkflowInstanceStateActive, task.NewEvents, []*history.Event{}, []*history.Event{}, []*history.WorkflowEvent{}) require.NoError(t, err) } diff --git a/internal/command/command.go b/internal/command/command.go index aaebf962..dd886f45 100644 --- a/internal/command/command.go +++ b/internal/command/command.go @@ -34,7 +34,7 @@ type CommandResult struct { Events []*history.Event ActivityEvents []*history.Event TimerEvents []*history.Event - WorkflowEvents []history.WorkflowEvent + WorkflowEvents []*history.WorkflowEvent } type command struct { diff --git a/internal/command/complete_workflow.go b/internal/command/complete_workflow.go index f5be2e68..bd875963 100644 --- a/internal/command/complete_workflow.go +++ b/internal/command/complete_workflow.go @@ -88,7 +88,7 @@ func (c *CompleteWorkflowCommand) Execute(clock clock.Clock) *CommandResult { ) } - r.WorkflowEvents = []history.WorkflowEvent{ + r.WorkflowEvents = []*history.WorkflowEvent{ { WorkflowInstance: c.Instance.Parent, HistoryEvent: historyEvent, diff --git a/internal/command/continueasnew.go b/internal/command/continueasnew.go index 30ca7ae5..d5aaa918 100644 --- a/internal/command/continueasnew.go +++ b/internal/command/continueasnew.go @@ -65,7 +65,7 @@ func (c *ContinueAsNewCommand) Execute(clock clock.Clock) *CommandResult { }, ), }, - WorkflowEvents: []history.WorkflowEvent{ + WorkflowEvents: []*history.WorkflowEvent{ // Schedule a new workflow execution { WorkflowInstance: continuedInstance, diff --git a/internal/command/schedule_subworkflow.go b/internal/command/schedule_subworkflow.go index 7be2d512..2f0db055 100644 --- a/internal/command/schedule_subworkflow.go +++ b/internal/command/schedule_subworkflow.go @@ -65,7 +65,7 @@ func (c *ScheduleSubWorkflowCommand) Execute(clock clock.Clock) *CommandResult { ), }, // Send event to new workflow instance - WorkflowEvents: []history.WorkflowEvent{ + WorkflowEvents: []*history.WorkflowEvent{ { WorkflowInstance: c.Instance, HistoryEvent: history.NewPendingEvent( @@ -98,7 +98,7 @@ func (c *ScheduleSubWorkflowCommand) Execute(clock clock.Clock) *CommandResult { }, // Send cancellation event to sub-workflow - WorkflowEvents: []history.WorkflowEvent{ + WorkflowEvents: []*history.WorkflowEvent{ { WorkflowInstance: c.Instance, HistoryEvent: history.NewWorkflowCancellationEvent(clock.Now()), diff --git a/tester/tester.go b/tester/tester.go index c213a5ab..00de2696 100644 --- a/tester/tester.go +++ b/tester/tester.go @@ -762,7 +762,7 @@ func (wt *workflowTester[TResult]) addWorkflow(instance *core.WorkflowInstance, return tw } -func (wt *workflowTester[TResult]) scheduleSubWorkflow(event history.WorkflowEvent) { +func (wt *workflowTester[TResult]) scheduleSubWorkflow(event *history.WorkflowEvent) { a := event.HistoryEvent.Attributes.(*history.ExecutionStartedAttributes) // TODO: Right location to call handler? @@ -829,7 +829,7 @@ func (wt *workflowTester[TResult]) scheduleSubWorkflow(event history.WorkflowEve 0, event.WorkflowInstance, workflowResult, workflowerrors.FromError(workflowRawErr), ).Execute(wt.clock) - return &r.WorkflowEvents[0] + return r.WorkflowEvents[0] } } diff --git a/workflow/executor/executor.go b/workflow/executor/executor.go index 026febae..96ee7cd5 100644 --- a/workflow/executor/executor.go +++ b/workflow/executor/executor.go @@ -43,7 +43,7 @@ type ExecutionResult struct { TimerEvents []*history.Event // Events for other workflow instances - WorkflowEvents []history.WorkflowEvent + WorkflowEvents []*history.WorkflowEvent } type WorkflowHistoryProvider interface { @@ -205,7 +205,7 @@ func (e *executor) ExecuteTask(ctx context.Context, t *backend.WorkflowTask) (*E newCommandEvents := make([]*history.Event, 0) activityEvents := make([]*history.Event, 0) timerEvents := make([]*history.Event, 0) - workflowEvents := make([]history.WorkflowEvent, 0) + workflowEvents := make([]*history.WorkflowEvent, 0) for _, c := range e.workflowState.Commands() { if c.State() == command.CommandState_Done {