diff --git a/pkg/actors/actors.go b/pkg/actors/actors.go
index 5a1f2eb01f9..b4488932acf 100644
--- a/pkg/actors/actors.go
+++ b/pkg/actors/actors.go
@@ -387,6 +387,10 @@ func (a *actorsRuntime) haltActor(actorType, actorID string) error {
 	key := constructCompositeKey(actorType, actorID)
 	log.Debugf("Halting actor '%s'", key)
 
+	// Optimistically remove the actor from the internal actors table. No need to
+	// check whether it actually exists.
+	a.internalActors.Del(key)
+
 	// Remove the actor from the table
 	// This will forbid more state changes
 	actAny, ok := a.actorsTable.LoadAndDelete(key)
@@ -1070,6 +1074,15 @@ func (a *actorsRuntime) executeReminder(reminder *internal.Reminder) bool {
 		if errors.Is(err, ErrReminderCanceled) {
 			// The handler is explicitly canceling the timer
 			log.Debug("Reminder " + reminder.ActorKey() + " was canceled by the actor")
+
+			a.lock.Lock()
+			key := constructCompositeKey(reminder.ActorType, reminder.ActorID)
+			if act, ok := a.internalActors.Get(key); ok && act.Completed() {
+				a.internalActors.Del(key)
+				a.actorsTable.Delete(key)
+			}
+			a.lock.Unlock()
+
 			return false
 		}
 		log.Errorf("Error invoking reminder on actor %s: %s", reminder.ActorKey(), err)
@@ -1184,6 +1197,13 @@ func (a *actorsRuntime) ExecuteLocalOrRemoteActorReminder(ctx context.Context, r
 	// If the reminder was cancelled, delete it.
 	if errors.Is(err, ErrReminderCanceled) {
+		a.lock.Lock()
+		key := constructCompositeKey(reminder.ActorType, reminder.ActorID)
+		if act, ok := a.internalActors.Get(key); ok && act.Completed() {
+			a.internalActors.Del(key)
+			a.actorsTable.Delete(key)
+		}
+		a.lock.Unlock()
 		go func() {
 			log.Debugf("Deleting reminder which was cancelled: %s", reminder.Key())
 			reqCtx, cancel := context.WithTimeout(context.Background(), time.Second*15)
@@ -1196,7 +1216,7 @@ func (a *actorsRuntime) ExecuteLocalOrRemoteActorReminder(ctx context.Context, r
 				log.Errorf("Error deleting reminder %s: %s", reminder.Key(), derr)
 			}
 			a.lock.Lock()
-			delete(a.internalReminderInProgress, constructCompositeKey(reminder.ActorType, reminder.ActorID, reminder.Name))
+			delete(a.internalReminderInProgress, reminder.Key())
 			a.lock.Unlock()
 		}()
 		return ErrReminderCanceled
diff --git a/pkg/actors/internal_actor.go b/pkg/actors/internal_actor.go
index b72a9f5b92f..eb3adb358e1 100644
--- a/pkg/actors/internal_actor.go
+++ b/pkg/actors/internal_actor.go
@@ -36,6 +36,7 @@ type InternalActor interface {
 	DeactivateActor(ctx context.Context) error
 	InvokeReminder(ctx context.Context, reminder InternalActorReminder, metadata map[string][]string) error
 	InvokeTimer(ctx context.Context, timer InternalActorReminder, metadata map[string][]string) error
+	Completed() bool
 }
 
 type InternalActorReminder struct {
diff --git a/pkg/actors/internal_actor_test.go b/pkg/actors/internal_actor_test.go
index b472709ca62..ad7cd606373 100644
--- a/pkg/actors/internal_actor_test.go
+++ b/pkg/actors/internal_actor_test.go
@@ -83,6 +83,10 @@ func (*mockInternalActor) InvokeTimer(ctx context.Context, timer InternalActorRe
 	panic("unimplemented")
}
 
+func (ia *mockInternalActor) Completed() bool {
+	panic("unimplemented")
+}
+
 // newTestActorsRuntimeWithInternalActors creates and initializes an actors runtime with a specified set of internal actors
 func newTestActorsRuntimeWithInternalActors(internalActors map[string]InternalActorFactory) (*actorsRuntime, error) {
 	spec := config.TracingSpec{SamplingRate: "1"}
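The interface change above is the heart of this patch: `InternalActor` gains a `Completed()` method so the actors runtime can ask an internal actor whether its final run has finished before evicting it from the actors tables. Below is a minimal, hypothetical sketch of how an implementer might back that method with a lock-free flag; only the `Completed() bool` signature comes from the diff, the other names are illustrative.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// completionTracker sketches the pattern the workflow and activity actors use
// in this change: an atomic "done" flag set once by the run loop and polled by
// the runtime when deciding whether the actor can be evicted.
type completionTracker struct {
	completed atomic.Bool
}

// markCompleted is called by the actor once its final run has finished.
func (c *completionTracker) markCompleted() { c.completed.Store(true) }

// Completed satisfies the new InternalActor method; it must be safe to call
// concurrently with the goroutine that sets the flag.
func (c *completionTracker) Completed() bool { return c.completed.Load() }

func main() {
	var c completionTracker
	fmt.Println(c.Completed()) // false: still running
	c.markCompleted()
	fmt.Println(c.Completed()) // true: safe to evict
}
```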
diff --git a/pkg/runtime/wfengine/backends/actors/activity_actor.go b/pkg/runtime/wfengine/backends/actors/activity_actor.go
index 5ab2146e39a..fe33d1bd3cc 100644
--- a/pkg/runtime/wfengine/backends/actors/activity_actor.go
+++ b/pkg/runtime/wfengine/backends/actors/activity_actor.go
@@ -22,6 +22,7 @@ import (
 	"errors"
 	"fmt"
 	"strings"
+	"sync/atomic"
 	"time"
 
 	"github.com/microsoft/durabletask-go/api"
@@ -46,6 +47,7 @@ type activityActor struct {
 	defaultTimeout   time.Duration
 	reminderInterval time.Duration
 	config           actorsBackendConfig
+	completed        atomic.Bool
 }
 
 // ActivityRequest represents a request by a workflow to invoke an activity.
@@ -138,7 +140,10 @@ func (a *activityActor) InvokeReminder(ctx context.Context, reminder actors.Inte
 	timeoutCtx, cancelTimeout := context.WithTimeout(ctx, a.defaultTimeout)
 	defer cancelTimeout()
 
-	err := a.executeActivity(timeoutCtx, reminder.Name, state.EventPayload)
+	completed, err := a.executeActivity(timeoutCtx, reminder.Name, state.EventPayload)
+	if completed == runCompletedTrue {
+		a.completed.Store(true)
+	}
 
 	var recoverableErr *recoverableError
 	// Returning nil signals that we want the execution to be retried in the next period interval
@@ -162,21 +167,25 @@ func (a *activityActor) InvokeReminder(ctx context.Context, reminder actors.Inte
 	}
 }
 
-func (a *activityActor) executeActivity(ctx context.Context, name string, eventPayload []byte) error {
+func (a *activityActor) Completed() bool {
+	return a.completed.Load()
+}
+
+func (a *activityActor) executeActivity(ctx context.Context, name string, eventPayload []byte) (runCompleted, error) {
 	taskEvent, err := backend.UnmarshalHistoryEvent(eventPayload)
 	if err != nil {
-		return err
+		return runCompletedTrue, err
 	}
 	activityName := ""
 	if ts := taskEvent.GetTaskScheduled(); ts != nil {
 		activityName = ts.GetName()
 	} else {
-		return fmt.Errorf("invalid activity task event: '%s'", taskEvent.String())
+		return runCompletedTrue, fmt.Errorf("invalid activity task event: '%s'", taskEvent.String())
 	}
 
 	endIndex := strings.Index(a.actorID, "::")
 	if endIndex < 0 {
-		return fmt.Errorf("invalid activity actor ID: '%s'", a.actorID)
+		return runCompletedTrue, fmt.Errorf("invalid activity actor ID: '%s'", a.actorID)
 	}
 	workflowID := a.actorID[0:endIndex]
@@ -197,9 +206,9 @@
 	wfLogger.Debugf("Activity actor '%s': scheduling activity '%s' for workflow with instanceId '%s'", a.actorID, name, wi.InstanceID)
 	err = a.scheduler(ctx, wi)
 	if errors.Is(err, context.DeadlineExceeded) {
-		return newRecoverableError(fmt.Errorf("timed-out trying to schedule an activity execution - this can happen if too many activities are running in parallel or if the workflow engine isn't running: %w", err))
+		return runCompletedFalse, newRecoverableError(fmt.Errorf("timed-out trying to schedule an activity execution - this can happen if too many activities are running in parallel or if the workflow engine isn't running: %w", err))
 	} else if err != nil {
-		return newRecoverableError(fmt.Errorf("failed to schedule an activity execution: %w", err))
+		return runCompletedFalse, newRecoverableError(fmt.Errorf("failed to schedule an activity execution: %w", err))
 	}
 	// Activity execution started
 	start := time.Now()
@@ -222,7 +231,7 @@ loop:
 			// Activity execution failed with recoverable error
 			elapsed = diag.ElapsedSince(start)
 			executionStatus = diag.StatusRecoverable
-			return ctx.Err() // will be retried
+			return runCompletedFalse, ctx.Err() // will be retried
 		case <-t.C:
 			if deadline, ok := ctx.Deadline(); ok {
 				wfLogger.Warnf("Activity actor '%s': '%s' is still running - will keep waiting until '%v'", a.actorID, name, deadline)
 			}
@@ -240,7 +249,7 @@ loop:
 			} else {
 				// Activity execution failed with recoverable error
 				executionStatus = diag.StatusRecoverable
-				return newRecoverableError(errExecutionAborted) // AbandonActivityWorkItem was called
+				return runCompletedFalse, newRecoverableError(errExecutionAborted) // AbandonActivityWorkItem was called
 			}
 		}
 	}
@@ -251,7 +260,7 @@ loop:
 	if err != nil {
 		// Returning non-recoverable error
 		executionStatus = diag.StatusFailed
-		return err
+		return runCompletedTrue, err
 	}
 	req := internalsv1pb.
 		NewInternalInvokeRequest(AddWorkflowEventMethod).
@@ -264,7 +273,7 @@ loop:
 	case err != nil:
 		// Returning recoverable error, record metrics
 		executionStatus = diag.StatusRecoverable
-		return newRecoverableError(fmt.Errorf("failed to invoke '%s' method on workflow actor: %w", AddWorkflowEventMethod, err))
+		return runCompletedFalse, newRecoverableError(fmt.Errorf("failed to invoke '%s' method on workflow actor: %w", AddWorkflowEventMethod, err))
 	case wi.Result.GetTaskCompleted() != nil:
 		// Activity execution completed successfully
 		executionStatus = diag.StatusSuccess
@@ -272,7 +281,7 @@
 		// Activity execution failed
 		executionStatus = diag.StatusFailed
 	}
-	return nil
+	return runCompletedTrue, nil
 }
 
 // InvokeTimer implements actors.InternalActor
diff --git a/pkg/runtime/wfengine/backends/actors/backend.go b/pkg/runtime/wfengine/backends/actors/backend.go
index 75223481a44..4953d8759b2 100644
--- a/pkg/runtime/wfengine/backends/actors/backend.go
+++ b/pkg/runtime/wfengine/backends/actors/backend.go
@@ -47,6 +47,13 @@ const (
 	ActivityNameLabelKey = "activity"
 )
 
+type runCompleted bool
+
+const (
+	runCompletedFalse runCompleted = false
+	runCompletedTrue  runCompleted = true
+)
+
 // actorsBackendConfig is the configuration for the workflow engine's actors backend
 type actorsBackendConfig struct {
 	AppID string
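`backend.go` introduces `runCompleted`, a named boolean that `executeActivity` and `runWorkflow` now return alongside the error, so every return site states explicitly whether the run reached a terminal state. A self-contained illustration of the pattern follows; the `execute` function is a hypothetical stand-in for those two methods, and only the type and constants mirror the diff.

```go
package main

import (
	"errors"
	"fmt"
)

// runCompleted mirrors the named-bool pattern from backend.go: call sites
// read `return runCompletedFalse, err` instead of an anonymous `false`,
// which makes the retry semantics explicit at every return.
type runCompleted bool

const (
	runCompletedFalse runCompleted = false
	runCompletedTrue  runCompleted = true
)

// execute reports both whether the run reached a terminal state and any
// error; a recoverable failure leaves the run incomplete so it is retried.
func execute(fail bool) (runCompleted, error) {
	if fail {
		return runCompletedFalse, errors.New("recoverable: scheduling timed out")
	}
	return runCompletedTrue, nil
}

func main() {
	completed, err := execute(false)
	fmt.Println(completed == runCompletedTrue, err) // true <nil>
}
```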
diff --git a/pkg/runtime/wfengine/backends/actors/workflow_actor.go b/pkg/runtime/wfengine/backends/actors/workflow_actor.go
index 8780826f1ff..aac295940f9 100644
--- a/pkg/runtime/wfengine/backends/actors/workflow_actor.go
+++ b/pkg/runtime/wfengine/backends/actors/workflow_actor.go
@@ -57,6 +57,7 @@ type workflowActor struct {
 	reminderInterval      time.Duration
 	config                actorsBackendConfig
 	activityResultAwaited atomic.Bool
+	completed             atomic.Bool
 }
 
 type durableTimer struct {
@@ -154,7 +155,11 @@ func (wf *workflowActor) InvokeReminder(ctx context.Context, reminder actors.Int
 	// Workflow executions should never take longer than a few seconds at the most
 	timeoutCtx, cancelTimeout := context.WithTimeout(ctx, wf.defaultTimeout)
 	defer cancelTimeout()
-	err := wf.runWorkflow(timeoutCtx, reminder)
+	completed, err := wf.runWorkflow(timeoutCtx, reminder)
+
+	if completed == runCompletedTrue {
+		wf.completed.Store(true)
+	}
 
 	// We delete the reminder on success and on non-recoverable errors.
 	// Returning nil signals that we want the execution to be retried in the next period interval
@@ -265,6 +270,10 @@ func isStatusMatch(statuses []api.OrchestrationStatus, runtimeStatus api.Orchest
 	return false
 }
 
+func (wf *workflowActor) Completed() bool {
+	return wf.completed.Load()
+}
+
 func (wf *workflowActor) createIfCompleted(ctx context.Context, runtimeState *backend.OrchestrationRuntimeState, state *workflowState, startEvent *backend.HistoryEvent) error {
 	// We block (re)creation of existing workflows unless they are in a completed state
 	// Or if they still have any pending activity result awaited.
@@ -372,6 +381,7 @@ func (wf *workflowActor) purgeWorkflowState(ctx context.Context) error {
 		return api.ErrInstanceNotFound
 	}
 	runtimeState := getRuntimeState(wf.actorID, state)
+	wf.completed.Store(true)
 	return wf.cleanupWorkflowStateInternal(ctx, state, !runtimeState.IsCompleted())
 }
@@ -414,30 +424,30 @@ func (wf *workflowActor) getWorkflowName(oldEvents, newEvents []*backend.History
 	return ""
 }
 
-func (wf *workflowActor) runWorkflow(ctx context.Context, reminder actors.InternalActorReminder) error {
+func (wf *workflowActor) runWorkflow(ctx context.Context, reminder actors.InternalActorReminder) (runCompleted, error) {
 	state, err := wf.loadInternalState(ctx)
 	if err != nil {
-		return fmt.Errorf("error loading internal state: %w", err)
+		return runCompletedTrue, fmt.Errorf("error loading internal state: %w", err)
 	}
 	if state == nil {
 		// The assumption is that someone manually deleted the workflow state. This is non-recoverable.
-		return errors.New("no workflow state found")
+		return runCompletedTrue, errors.New("no workflow state found")
 	}
 	if strings.HasPrefix(reminder.Name, "timer-") {
 		var timerData durableTimer
 		if err = reminder.DecodeData(&timerData); err != nil {
 			// Likely the result of an incompatible durable task timer format change. This is non-recoverable.
-			return err
+			return runCompletedTrue, err
 		}
 		if timerData.Generation < state.Generation {
 			wfLogger.Infof("Workflow actor '%s': ignoring durable timer from previous generation '%v'", wf.actorID, timerData.Generation)
-			return nil
+			return runCompletedFalse, nil
 		} else {
 			e, eventErr := backend.UnmarshalHistoryEvent(timerData.Bytes)
 			if eventErr != nil {
 				// Likely the result of an incompatible durable task timer format change. This is non-recoverable.
-				return fmt.Errorf("failed to unmarshal timer data %w", eventErr)
+				return runCompletedTrue, fmt.Errorf("failed to unmarshal timer data %w", eventErr)
 			}
 			state.Inbox = append(state.Inbox, e)
 		}
@@ -447,7 +457,7 @@
 		// This can happen after multiple events are processed in batches; there may still be reminders around
 		// for some of those already processed events.
wfLogger.Debugf("Workflow actor '%s': ignoring run request for reminder '%s' because the workflow inbox is empty", wf.actorID, reminder.Name) - return nil + return runCompletedFalse, nil } // The logic/for loop below purges/removes any leftover state from a completed or failed activity @@ -487,7 +497,7 @@ func (wf *workflowActor) runWorkflow(ctx context.Context, reminder actors.Intern Operations: operations, }) if err != nil { - return fmt.Errorf("failed to delete activity state for activity actor '%s' with error: %w", activityActorID, err) + return runCompletedFalse, fmt.Errorf("failed to delete activity state for activity actor '%s' with error: %w", activityActorID, err) } } @@ -518,9 +528,9 @@ func (wf *workflowActor) runWorkflow(ctx context.Context, reminder actors.Intern err = wf.scheduler(ctx, wi) if err != nil { if errors.Is(err, context.DeadlineExceeded) { - return newRecoverableError(fmt.Errorf("timed-out trying to schedule a workflow execution - this can happen if there are too many in-flight workflows or if the workflow engine isn't running: %w", err)) + return runCompletedFalse, newRecoverableError(fmt.Errorf("timed-out trying to schedule a workflow execution - this can happen if there are too many in-flight workflows or if the workflow engine isn't running: %w", err)) } - return newRecoverableError(fmt.Errorf("failed to schedule a workflow execution: %w", err)) + return runCompletedFalse, newRecoverableError(fmt.Errorf("failed to schedule a workflow execution: %w", err)) } wf.recordWorkflowSchedulingLatency(ctx, esHistoryEvent, workflowName) @@ -537,12 +547,12 @@ func (wf *workflowActor) runWorkflow(ctx context.Context, reminder actors.Intern case <-ctx.Done(): // caller is responsible for timeout management // Workflow execution failed with recoverable error executionStatus = diag.StatusRecoverable - return ctx.Err() + return runCompletedFalse, ctx.Err() case completed := <-callback: if !completed { // Workflow execution failed with recoverable error executionStatus = diag.StatusRecoverable - return newRecoverableError(errExecutionAborted) + return runCompletedFalse, newRecoverableError(errExecutionAborted) } } wfLogger.Debugf("Workflow actor '%s': workflow execution returned with status '%s' instanceId '%s'", wf.actorID, runtimeState.RuntimeStatus().String(), wi.InstanceID) @@ -559,11 +569,11 @@ func (wf *workflowActor) runWorkflow(ctx context.Context, reminder actors.Intern for _, t := range runtimeState.PendingTimers() { tf := t.GetTimerFired() if tf == nil { - return errors.New("invalid event in the PendingTimers list") + return runCompletedTrue, errors.New("invalid event in the PendingTimers list") } timerBytes, errMarshal := backend.MarshalHistoryEvent(t) if errMarshal != nil { - return fmt.Errorf("failed to marshal pending timer data: %w", errMarshal) + return runCompletedTrue, fmt.Errorf("failed to marshal pending timer data: %w", errMarshal) } delay := time.Until(tf.GetFireAt().AsTime()) if delay < 0 { @@ -574,7 +584,7 @@ func (wf *workflowActor) runWorkflow(ctx context.Context, reminder actors.Intern wfLogger.Debugf("Workflow actor '%s': creating reminder '%s' for the durable timer", wf.actorID, reminderPrefix) if _, err = wf.createReliableReminder(ctx, reminderPrefix, data, delay); err != nil { executionStatus = diag.StatusRecoverable - return newRecoverableError(fmt.Errorf("actor '%s' failed to create reminder for timer: %w", wf.actorID, err)) + return runCompletedFalse, newRecoverableError(fmt.Errorf("actor '%s' failed to create reminder for timer: %w", 
 		}
 	}
 }
@@ -602,13 +612,13 @@
 
 		eventData, errMarshal := backend.MarshalHistoryEvent(e)
 		if errMarshal != nil {
-			return errMarshal
+			return runCompletedTrue, errMarshal
 		}
 		activityRequestBytes, errInternal := actors.EncodeInternalActorData(ActivityRequest{
 			HistoryEvent: eventData,
 		})
 		if errInternal != nil {
-			return errInternal
+			return runCompletedTrue, errInternal
 		}
 		targetActorID := getActivityActorID(wf.actorID, e.GetEventId(), state.Generation)
@@ -628,7 +638,7 @@
 			continue
 		} else if err != nil {
 			executionStatus = diag.StatusRecoverable
-			return newRecoverableError(fmt.Errorf("failed to invoke activity actor '%s' to execute '%s': %w", targetActorID, ts.GetName(), err))
+			return runCompletedFalse, newRecoverableError(fmt.Errorf("failed to invoke activity actor '%s' to execute '%s': %w", targetActorID, ts.GetName(), err))
 		}
 	}
@@ -637,7 +647,7 @@
 	for _, msg := range msgList {
 		eventData, errMarshal := backend.MarshalHistoryEvent(msg.HistoryEvent)
 		if errMarshal != nil {
-			return errMarshal
+			return runCompletedTrue, errMarshal
 		}
 		requestBytes := eventData
@@ -647,7 +657,7 @@
 				StartEventBytes: eventData,
 			})
 			if err != nil {
-				return fmt.Errorf("failed to marshal createWorkflowInstanceRequest: %w", err)
+				return runCompletedTrue, fmt.Errorf("failed to marshal createWorkflowInstanceRequest: %w", err)
 			}
 		}
@@ -662,7 +672,7 @@
 		if err != nil {
 			executionStatus = diag.StatusRecoverable
 			// workflow-related actor methods are never expected to return errors
-			return newRecoverableError(fmt.Errorf("method %s on actor '%s' returned an error: %w", method, msg.TargetInstanceID, err))
+			return runCompletedFalse, newRecoverableError(fmt.Errorf("method %s on actor '%s' returned an error: %w", method, msg.TargetInstanceID, err))
 		}
 	}
 }
@@ -672,7 +682,7 @@
 
 	err = wf.saveInternalState(ctx, state)
 	if err != nil {
-		return err
+		return runCompletedTrue, err
 	}
 	if executionStatus != "" {
 		// If workflow is not completed, set executionStatus to empty string
@@ -690,8 +700,9 @@
 	}
 	if runtimeState.IsCompleted() {
 		wfLogger.Infof("Workflow Actor '%s': workflow completed with status '%s' workflowName '%s'", wf.actorID, runtimeState.RuntimeStatus().String(), workflowName)
+		return runCompletedTrue, nil
 	}
-	return nil
+	return runCompletedFalse, nil
 }
 
 func (*workflowActor) calculateWorkflowExecutionLatency(state *workflowState) (wfExecutionElapsedTime float64) {
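These `runCompleted` results feed the atomic `completed` flags exposed through `Completed()`, which in turn gate the eviction added to `executeReminder` and `ExecuteLocalOrRemoteActorReminder` in `actors.go` above: on `ErrReminderCanceled`, the actor is removed from the runtime tables only if it reports itself completed, so cancelling a single reminder does not tear down a still-running actor. A small sketch of that guard, using hypothetical `actor` and `runtimeTables` types rather than the real Dapr ones:

```go
package main

import (
	"fmt"
	"sync"
)

// actor is a hypothetical stand-in for an internal actor that can report
// whether its final run has finished (the new Completed() contract).
type actor struct{ done bool }

func (a *actor) Completed() bool { return a.done }

// runtimeTables sketches the eviction logic added to executeReminder: on a
// cancelled reminder, the actor is dropped from the table only if completed.
type runtimeTables struct {
	lock   sync.Mutex
	actors map[string]*actor
}

func (r *runtimeTables) evictIfCompleted(key string) bool {
	r.lock.Lock()
	defer r.lock.Unlock()
	if act, ok := r.actors[key]; ok && act.Completed() {
		delete(r.actors, key)
		return true
	}
	return false
}

func main() {
	rt := &runtimeTables{actors: map[string]*actor{
		"wf||a": {done: false},
		"wf||b": {done: true},
	}}
	fmt.Println(rt.evictIfCompleted("wf||a")) // false: still running, kept
	fmt.Println(rt.evictIfCompleted("wf||b")) // true: completed, evicted
}
```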
diff --git a/tests/integration/framework/iowriter/logger/logger.go b/tests/integration/framework/iowriter/logger/logger.go
new file mode 100644
index 00000000000..a5abbac882b
--- /dev/null
+++ b/tests/integration/framework/iowriter/logger/logger.go
@@ -0,0 +1,28 @@
+/*
+Copyright 2024 The Dapr Authors
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+    http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package logger
+
+import (
+	"testing"
+
+	"github.com/dapr/dapr/tests/integration/framework/iowriter"
+	"github.com/dapr/kit/logger"
+)
+
+func New(t *testing.T) logger.Logger {
+	log := logger.NewLogger(t.Name())
+	log.SetOutput(iowriter.New(t, t.Name()))
+	log.SetOutputLevel(logger.DebugLevel)
+	return log
+}
diff --git a/tests/integration/framework/process/daprd/daprd.go b/tests/integration/framework/process/daprd/daprd.go
index ff1413e3f35..b3cf7ea810d 100644
--- a/tests/integration/framework/process/daprd/daprd.go
+++ b/tests/integration/framework/process/daprd/daprd.go
@@ -193,7 +193,7 @@ func (d *Daprd) WaitUntilTCPReady(t *testing.T, ctx context.Context) {
 		}
 		net.Close()
 		return true
-	}, 10*time.Second, 10*time.Millisecond)
+	}, 15*time.Second, 10*time.Millisecond)
 }
 
 func (d *Daprd) WaitUntilRunning(t *testing.T, ctx context.Context) {
diff --git a/tests/integration/framework/process/daprd/options.go b/tests/integration/framework/process/daprd/options.go
index 190db17d0b0..e75111bc918 100644
--- a/tests/integration/framework/process/daprd/options.go
+++ b/tests/integration/framework/process/daprd/options.go
@@ -24,6 +24,7 @@ import (
 	"github.com/dapr/dapr/tests/integration/framework/process/exec"
 	"github.com/dapr/dapr/tests/integration/framework/process/logline"
+	"github.com/dapr/dapr/tests/integration/framework/process/scheduler"
 	"github.com/dapr/dapr/tests/integration/framework/process/sentry"
 	"github.com/dapr/dapr/tests/integration/framework/socket"
 )
@@ -311,3 +312,9 @@ func WithSentry(t *testing.T, sentry *sentry.Sentry) Option {
 		WithEnableMTLS(true)(o)
 	}
 }
+
+func WithScheduler(scheduler *scheduler.Scheduler) Option {
+	return func(o *options) {
+		WithSchedulerAddresses(scheduler.Address())(o)
+	}
+}
diff --git a/tests/integration/framework/process/scheduler/scheduler.go b/tests/integration/framework/process/scheduler/scheduler.go
index dde6d647b13..5ac08538fea 100644
--- a/tests/integration/framework/process/scheduler/scheduler.go
+++ b/tests/integration/framework/process/scheduler/scheduler.go
@@ -180,7 +180,7 @@ func (s *Scheduler) WaitUntilRunning(t *testing.T, ctx context.Context) {
 		}
 		defer resp.Body.Close()
 		return http.StatusOK == resp.StatusCode
-	}, time.Second*5, 10*time.Millisecond)
+	}, time.Second*15, 10*time.Millisecond)
 }
 
 func (s *Scheduler) ID() string {
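The new `iowriter/logger` package above routes log output through the test's own writer, so each test's logs are captured with the test and surfaced only on failure. A hypothetical test showing the helper in use; it assumes only the `New(t)` constructor added above and the kit logger's `Debugf`:

```go
// A sketch of a test file using the new helper (the package and test names
// are illustrative only).
package mytest

import (
	"testing"

	"github.com/dapr/dapr/tests/integration/framework/iowriter/logger"
)

func TestUsesFrameworkLogger(t *testing.T) {
	// Debug-level logger named after the test, writing to t via iowriter.
	log := logger.New(t)
	log.Debugf("running %s", t.Name())
}
```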
diff --git a/tests/integration/framework/process/workflow/options.go b/tests/integration/framework/process/workflow/options.go
new file mode 100644
index 00000000000..cd251d66354
--- /dev/null
+++ b/tests/integration/framework/process/workflow/options.go
@@ -0,0 +1,51 @@
+/*
+Copyright 2024 The Dapr Authors
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+    http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package workflow
+
+import (
+	"testing"
+
+	"github.com/microsoft/durabletask-go/task"
+	"github.com/stretchr/testify/require"
+)
+
+type Option func(*options)
+
+type options struct {
+	registry *task.TaskRegistry
+
+	enableScheduler bool
+}
+
+func WithScheduler(enable bool) Option {
+	return func(o *options) {
+		o.enableScheduler = enable
+	}
+}
+
+func WithAddOrchestratorN(t *testing.T, name string, or func(*task.OrchestrationContext) (any, error)) Option {
+	t.Helper()
+
+	return func(o *options) {
+		require.NoError(t, o.registry.AddOrchestratorN(name, or))
+	}
+}
+
+func WithAddActivityN(t *testing.T, name string, a func(task.ActivityContext) (any, error)) Option {
+	t.Helper()
+
+	return func(o *options) {
+		require.NoError(t, o.registry.AddActivityN(name, a))
+	}
+}
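The options above follow the framework's functional-options convention: orchestrators and activities are registered against the durabletask registry at construction time, and registration failures fail the test through `require`. A hypothetical caller might look like the following; the `echo` and `upper` names are illustrative only:

```go
// Sketch of a test constructing the new workflow process with the options
// added above (assumes the framework packages from this diff).
package mytest

import (
	"testing"

	"github.com/microsoft/durabletask-go/task"

	"github.com/dapr/dapr/tests/integration/framework/process/workflow"
)

func newEchoWorkflow(t *testing.T) *workflow.Workflow {
	return workflow.New(t,
		workflow.WithScheduler(true),
		workflow.WithAddOrchestratorN(t, "echo", func(ctx *task.OrchestrationContext) (any, error) {
			var out string
			// Call the "upper" activity and await its result.
			if err := ctx.CallActivity("upper", task.WithActivityInput("hi")).Await(&out); err != nil {
				return nil, err
			}
			return out, nil
		}),
		workflow.WithAddActivityN(t, "upper", func(ctx task.ActivityContext) (any, error) {
			var in string
			if err := ctx.GetInput(&in); err != nil {
				return nil, err
			}
			return in, nil
		}),
	)
}
```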
diff --git a/tests/integration/framework/process/workflow/workflow.go b/tests/integration/framework/process/workflow/workflow.go
new file mode 100644
index 00000000000..65e1e690929
--- /dev/null
+++ b/tests/integration/framework/process/workflow/workflow.go
@@ -0,0 +1,140 @@
+/*
+Copyright 2024 The Dapr Authors
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+    http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package workflow
+
+import (
+	"context"
+	"runtime"
+	"testing"
+
+	"github.com/microsoft/durabletask-go/client"
+	"github.com/microsoft/durabletask-go/task"
+	"github.com/stretchr/testify/require"
+
+	rtv1 "github.com/dapr/dapr/pkg/proto/runtime/v1"
+	"github.com/dapr/dapr/tests/integration/framework/iowriter/logger"
+	"github.com/dapr/dapr/tests/integration/framework/process/daprd"
+	"github.com/dapr/dapr/tests/integration/framework/process/http/app"
+	"github.com/dapr/dapr/tests/integration/framework/process/placement"
+	"github.com/dapr/dapr/tests/integration/framework/process/scheduler"
+	"github.com/dapr/dapr/tests/integration/framework/process/sqlite"
+)
+
+type Workflow struct {
+	registry *task.TaskRegistry
+	app      *app.App
+	db       *sqlite.SQLite
+	place    *placement.Placement
+	sched    *scheduler.Scheduler
+	daprd    *daprd.Daprd
+}
+
+func New(t *testing.T, fopts ...Option) *Workflow {
+	t.Helper()
+
+	if runtime.GOOS == "windows" {
+		t.Skip("Skipping test on Windows due to SQLite limitations")
+	}
+
+	opts := options{
+		registry: task.NewTaskRegistry(),
+	}
+	for _, fopt := range fopts {
+		fopt(&opts)
+	}
+
+	app := app.New(t)
+	db := sqlite.New(t,
+		sqlite.WithActorStateStore(true),
+		sqlite.WithCreateStateTables(),
+	)
+	place := placement.New(t)
+
+	dopts := []daprd.Option{
+		daprd.WithAppPort(app.Port()),
+		daprd.WithPlacementAddresses(place.Address()),
+		daprd.WithResourceFiles(db.GetComponent(t)),
+	}
+
+	var sched *scheduler.Scheduler
+	if opts.enableScheduler {
+		sched = scheduler.New(t)
+		dopts = append(dopts,
+			daprd.WithScheduler(sched),
+			daprd.WithConfigManifests(t, `
+apiVersion: dapr.io/v1alpha1
+kind: Configuration
+metadata:
+  name: appconfig
+spec:
+  features:
+  - name: SchedulerReminders
+    enabled: true
+`))
+	}
+
+	return &Workflow{
+		registry: opts.registry,
+		app:      app,
+		db:       db,
+		place:    place,
+		sched:    sched,
+		daprd:    daprd.New(t, dopts...),
+	}
+}
+
+func (w *Workflow) Run(t *testing.T, ctx context.Context) {
+	w.app.Run(t, ctx)
+	w.db.Run(t, ctx)
+	w.place.Run(t, ctx)
+	if w.sched != nil {
+		w.sched.Run(t, ctx)
+	}
+	w.daprd.Run(t, ctx)
+}
+
+func (w *Workflow) Cleanup(t *testing.T) {
+	w.daprd.Cleanup(t)
+	if w.sched != nil {
+		w.sched.Cleanup(t)
+	}
+	w.place.Cleanup(t)
+	w.db.Cleanup(t)
+	w.app.Cleanup(t)
+}
+
+func (w *Workflow) WaitUntilRunning(t *testing.T, ctx context.Context) {
+	w.place.WaitUntilRunning(t, ctx)
+	if w.sched != nil {
+		w.sched.WaitUntilRunning(t, ctx)
+	}
+	w.daprd.WaitUntilRunning(t, ctx)
+}
+
+func (w *Workflow) BackendClient(t *testing.T, ctx context.Context) *client.TaskHubGrpcClient {
+	t.Helper()
+	backendClient := client.NewTaskHubGrpcClient(w.daprd.GRPCConn(t, ctx), logger.New(t))
+	require.NoError(t, backendClient.StartWorkItemListener(ctx, w.registry))
+	return backendClient
+}
+
+func (w *Workflow) GRPCClient(t *testing.T, ctx context.Context) rtv1.DaprClient {
+	t.Helper()
+	return w.daprd.GRPCClient(t, ctx)
+}
+
+func (w *Workflow) Metrics(t *testing.T, ctx context.Context) map[string]float64 {
+	t.Helper()
+	return w.daprd.Metrics(t, ctx)
+}
diff --git a/tests/integration/suite/actors/reminders/scheduler/basic.go b/tests/integration/suite/actors/reminders/scheduler/basic.go
index dd0361fcaa0..b14d1caa234 100644
--- a/tests/integration/suite/actors/reminders/scheduler/basic.go
+++ b/tests/integration/suite/actors/reminders/scheduler/basic.go
@@ -135,5 +135,5 @@ func (b *basic) Run(t *testing.T, ctx context.Context) {
 
 	assert.Eventually(t, func() bool {
 		return b.methodcalled.Load() == 2
-	}, time.Second*3, time.Millisecond*10)
+	}, time.Second*10, time.Millisecond*10)
 }
diff --git a/tests/integration/suite/daprd/workflow/memory/scheduler.go b/tests/integration/suite/daprd/workflow/memory/scheduler.go
new file mode 100644
index 00000000000..ebfc53d6f8c
--- /dev/null
+++ b/tests/integration/suite/daprd/workflow/memory/scheduler.go
@@ -0,0 +1,86 @@
+/*
+Copyright 2024 The Dapr Authors
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+    http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package memory
+
+import (
+	"bytes"
+	"context"
+	"testing"
+
+	"github.com/microsoft/durabletask-go/api"
+	"github.com/microsoft/durabletask-go/task"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+
+	rtv1 "github.com/dapr/dapr/pkg/proto/runtime/v1"
+	"github.com/dapr/dapr/tests/integration/framework"
+	"github.com/dapr/dapr/tests/integration/framework/process/workflow"
+	"github.com/dapr/dapr/tests/integration/suite"
+)
+
+func init() {
+	suite.Register(new(scheduler))
+}
+
+type scheduler struct {
+	workflow *workflow.Workflow
+}
+
+func (s *scheduler) Setup(t *testing.T) []framework.Option {
+	// 2MB payload. Enough memory to be larger than the background variant memory
+	// so we can verify that (actor) workflow history memory does not leak.
+	input := bytes.Repeat([]byte("0"), 2*1024*1024)
+
+	s.workflow = workflow.New(t,
+		workflow.WithScheduler(true),
+		workflow.WithAddOrchestratorN(t, "foo", func(ctx *task.OrchestrationContext) (any, error) {
+			require.NoError(t, ctx.CallActivity("bar", task.WithActivityInput(input)).Await(new([]byte)))
+			return "", nil
+		}),
+		workflow.WithAddActivityN(t, "bar", func(ctx task.ActivityContext) (any, error) { return "", nil }),
+	)
+
+	return []framework.Option{
+		framework.WithProcesses(s.workflow),
+	}
+}
+
+func (s *scheduler) Run(t *testing.T, ctx context.Context) {
+	s.workflow.WaitUntilRunning(t, ctx)
+	client := s.workflow.BackendClient(t, ctx)
+	gclient := s.workflow.GRPCClient(t, ctx)
+
+	var actorMemBaseline float64
+
+	for i := 0; i < 10; i++ {
+		resp, err := gclient.StartWorkflowBeta1(ctx, &rtv1.StartWorkflowRequest{
+			WorkflowComponent: "dapr",
+			WorkflowName:      "foo",
+		})
+		require.NoError(t, err)
+		_, err = client.WaitForOrchestrationCompletion(ctx, api.InstanceID(resp.GetInstanceId()))
+		require.NoError(t, err)
+
+		if i == 0 {
+			actorMemBaseline = s.workflow.Metrics(t, ctx)["process_resident_memory_bytes"] * 1e-6
+		}
+	}
+
+	assert.InDelta(t,
+		s.workflow.Metrics(t, ctx)["process_resident_memory_bytes"]*1e-6,
+		actorMemBaseline,
+		35,
+		"workflow memory leak",
+	)
+}
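The state-store variant below mirrors the scheduler test above; the two differ only in whether `WithScheduler(true)` (and thus the `SchedulerReminders` feature) is enabled. Both share the same leak-check shape: run one workflow to establish a resident-memory baseline, run several more with a 2MB payload each, then assert the final reading stays within a fixed tolerance of the baseline. A stripped-down sketch of that shape, where `residentMemoryMB` is a hypothetical probe standing in for reading the `process_resident_memory_bytes` metric:

```go
// Sketch of the leak-check technique (names and the probe are illustrative).
package mytest

import (
	"testing"

	"github.com/stretchr/testify/assert"
)

// residentMemoryMB is a hypothetical stand-in for
// workflow.Metrics(t, ctx)["process_resident_memory_bytes"] * 1e-6.
func residentMemoryMB() float64 { return 42.0 }

func TestNoLeak(t *testing.T) {
	baseline := residentMemoryMB() // recorded after a warm-up run
	for i := 0; i < 9; i++ {
		// ... run one workflow to completion ...
	}
	// If history state leaked per run, resident memory would drift well
	// beyond the tolerance after nine more 2MB workflows.
	assert.InDelta(t, residentMemoryMB(), baseline, 35, "workflow memory leak")
}
```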
diff --git a/tests/integration/suite/daprd/workflow/memory/state.go b/tests/integration/suite/daprd/workflow/memory/state.go
new file mode 100644
index 00000000000..9ddc27eada4
--- /dev/null
+++ b/tests/integration/suite/daprd/workflow/memory/state.go
@@ -0,0 +1,85 @@
+/*
+Copyright 2024 The Dapr Authors
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+    http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package memory
+
+import (
+	"bytes"
+	"context"
+	"testing"
+
+	"github.com/microsoft/durabletask-go/api"
+	"github.com/microsoft/durabletask-go/task"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+
+	rtv1 "github.com/dapr/dapr/pkg/proto/runtime/v1"
+	"github.com/dapr/dapr/tests/integration/framework"
+	"github.com/dapr/dapr/tests/integration/framework/process/workflow"
+	"github.com/dapr/dapr/tests/integration/suite"
+)
+
+func init() {
+	suite.Register(new(state))
+}
+
+type state struct {
+	workflow *workflow.Workflow
+}
+
+func (s *state) Setup(t *testing.T) []framework.Option {
+	// 2MB payload. Enough memory to be larger than the background variant memory
+	// so we can verify that (actor) workflow history memory does not leak.
+	input := bytes.Repeat([]byte("0"), 2*1024*1024)
+
+	s.workflow = workflow.New(t,
+		workflow.WithAddOrchestratorN(t, "foo", func(ctx *task.OrchestrationContext) (any, error) {
+			require.NoError(t, ctx.CallActivity("bar", task.WithActivityInput(input)).Await(new([]byte)))
+			return "", nil
+		}),
+		workflow.WithAddActivityN(t, "bar", func(ctx task.ActivityContext) (any, error) { return "", nil }),
+	)
+
+	return []framework.Option{
+		framework.WithProcesses(s.workflow),
+	}
+}
+
+func (s *state) Run(t *testing.T, ctx context.Context) {
+	s.workflow.WaitUntilRunning(t, ctx)
+	client := s.workflow.BackendClient(t, ctx)
+	gclient := s.workflow.GRPCClient(t, ctx)
+
+	var actorMemBaseline float64
+
+	for i := 0; i < 10; i++ {
+		resp, err := gclient.StartWorkflowBeta1(ctx, &rtv1.StartWorkflowRequest{
+			WorkflowComponent: "dapr",
+			WorkflowName:      "foo",
+		})
+		require.NoError(t, err)
+		_, err = client.WaitForOrchestrationCompletion(ctx, api.InstanceID(resp.GetInstanceId()))
+		require.NoError(t, err)
+
+		if i == 0 {
+			actorMemBaseline = s.workflow.Metrics(t, ctx)["process_resident_memory_bytes"] * 1e-6
+		}
+	}
+
+	assert.InDelta(t,
+		s.workflow.Metrics(t, ctx)["process_resident_memory_bytes"]*1e-6,
+		actorMemBaseline,
+		35,
+		"workflow memory leak",
+	)
+}
diff --git a/tests/integration/suite/daprd/workflow/workflow.go b/tests/integration/suite/daprd/workflow/workflow.go
index db80294e1c7..afe76d460f3 100644
--- a/tests/integration/suite/daprd/workflow/workflow.go
+++ b/tests/integration/suite/daprd/workflow/workflow.go
@@ -15,5 +15,6 @@ package workflow
 
 import (
 	_ "github.com/dapr/dapr/tests/integration/suite/daprd/workflow/backend"
+	_ "github.com/dapr/dapr/tests/integration/suite/daprd/workflow/memory"
 	_ "github.com/dapr/dapr/tests/integration/suite/daprd/workflow/scheduler"
 )
diff --git a/tests/integration/suite/ports/daprd.go b/tests/integration/suite/ports/daprd.go
index 79328d98d14..e658c59d809 100644
--- a/tests/integration/suite/ports/daprd.go
+++ b/tests/integration/suite/ports/daprd.go
@@ -63,6 +63,6 @@ func (d *daprd) Run(t *testing.T, ctx context.Context) {
 			}
 			require.NoError(t, conn.Close())
 			return true
-		}, time.Second*5, 10*time.Millisecond, "port %s (:%d) was not available in time", name, port)
+		}, time.Second*15, 10*time.Millisecond, "port %s (:%d) was not available in time", name, port)
 	}
 }