Skip to content

Commit

Permalink
Workflows: Fix daprd memory leak (dapr#8047)
Browse files Browse the repository at this point in the history
* Workflows: Fix daprd memory leak

Fixes a memory leak in Workflows whereby the internal actor state held
by daprd would never be released- meaning daprd would grow in memory
indefinitely after each workflow execution. This memory leak occurs for
both scheduler and non-scheduler reminders.

Specifically, the actor table and internal actor workflow state maps
where never deleted after workflow orchestration and activity
completion. PR updates the workflow and activity actors to track whether
it has been completed. This is signalled to the actor runtime after a
workflow reminder execution, and the actor runtime will then delete the
actor state.

Adds integration tests to ensure daprd memory usage does not grow for
both scheduler and non-scheduler workflows.

Signed-off-by: joshvanl <[email protected]>

* Skip workflow process on windows due to sqlite limitations

Signed-off-by: joshvanl <[email protected]>

* Increase scheduler int test start up wait time 5s -> 15s

Signed-off-by: joshvanl <[email protected]>

* Increase TCP port ready time for int test daprd

Signed-off-by: joshvanl <[email protected]>

* Wait for scheduler client before wait for ready

Signed-off-by: joshvanl <[email protected]>

* Revert

Signed-off-by: joshvanl <[email protected]>

* Increase Eventually on TCP port check

Signed-off-by: joshvanl <[email protected]>

* Increase memory difference for memory leak

Signed-off-by: joshvanl <[email protected]>

* Increase Eventually timeout for re-schedule

Signed-off-by: joshvanl <[email protected]>

---------

Signed-off-by: joshvanl <[email protected]>
Co-authored-by: Yaron Schneider <[email protected]>
  • Loading branch information
JoshVanL and yaron2 authored Sep 5, 2024
1 parent 9489fcc commit 4db3c60
Show file tree
Hide file tree
Showing 17 changed files with 491 additions and 41 deletions.
22 changes: 21 additions & 1 deletion pkg/actors/actors.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,10 @@ func (a *actorsRuntime) haltActor(actorType, actorID string) error {
key := constructCompositeKey(actorType, actorID)
log.Debugf("Halting actor '%s'", key)

// Optimistically remove the actor from the internal actors table. No need to
// check whether it actually exists.
a.internalActors.Del(key)

// Remove the actor from the table
// This will forbit more state changes
actAny, ok := a.actorsTable.LoadAndDelete(key)
Expand Down Expand Up @@ -1070,6 +1074,15 @@ func (a *actorsRuntime) executeReminder(reminder *internal.Reminder) bool {
if errors.Is(err, ErrReminderCanceled) {
// The handler is explicitly canceling the timer
log.Debug("Reminder " + reminder.ActorKey() + " was canceled by the actor")

a.lock.Lock()
key := constructCompositeKey(reminder.ActorType, reminder.ActorID)
if act, ok := a.internalActors.Get(key); ok && act.Completed() {
a.internalActors.Del(key)
a.actorsTable.Delete(key)
}
a.lock.Unlock()

return false
}
log.Errorf("Error invoking reminder on actor %s: %s", reminder.ActorKey(), err)
Expand Down Expand Up @@ -1184,6 +1197,13 @@ func (a *actorsRuntime) ExecuteLocalOrRemoteActorReminder(ctx context.Context, r

// If the reminder was cancelled, delete it.
if errors.Is(err, ErrReminderCanceled) {
a.lock.Lock()
key := constructCompositeKey(reminder.ActorType, reminder.ActorID)
if act, ok := a.internalActors.Get(key); ok && act.Completed() {
a.internalActors.Del(key)
a.actorsTable.Delete(key)
}
a.lock.Unlock()
go func() {
log.Debugf("Deleting reminder which was cancelled: %s", reminder.Key())
reqCtx, cancel := context.WithTimeout(context.Background(), time.Second*15)
Expand All @@ -1196,7 +1216,7 @@ func (a *actorsRuntime) ExecuteLocalOrRemoteActorReminder(ctx context.Context, r
log.Errorf("Error deleting reminder %s: %s", reminder.Key(), derr)
}
a.lock.Lock()
delete(a.internalReminderInProgress, constructCompositeKey(reminder.ActorType, reminder.ActorID, reminder.Name))
delete(a.internalReminderInProgress, reminder.Key())
a.lock.Unlock()
}()
return ErrReminderCanceled
Expand Down
1 change: 1 addition & 0 deletions pkg/actors/internal_actor.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type InternalActor interface {
DeactivateActor(ctx context.Context) error
InvokeReminder(ctx context.Context, reminder InternalActorReminder, metadata map[string][]string) error
InvokeTimer(ctx context.Context, timer InternalActorReminder, metadata map[string][]string) error
Completed() bool
}

type InternalActorReminder struct {
Expand Down
4 changes: 4 additions & 0 deletions pkg/actors/internal_actor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ func (*mockInternalActor) InvokeTimer(ctx context.Context, timer InternalActorRe
panic("unimplemented")
}

func (ia *mockInternalActor) Completed() bool {
panic("unimplemented")
}

// newTestActorsRuntimeWithInternalActors creates and initializes an actors runtime with a specified set of internal actors
func newTestActorsRuntimeWithInternalActors(internalActors map[string]InternalActorFactory) (*actorsRuntime, error) {
spec := config.TracingSpec{SamplingRate: "1"}
Expand Down
33 changes: 21 additions & 12 deletions pkg/runtime/wfengine/backends/actors/activity_actor.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"errors"
"fmt"
"strings"
"sync/atomic"
"time"

"github.com/microsoft/durabletask-go/api"
Expand All @@ -46,6 +47,7 @@ type activityActor struct {
defaultTimeout time.Duration
reminderInterval time.Duration
config actorsBackendConfig
completed atomic.Bool
}

// ActivityRequest represents a request by a worklow to invoke an activity.
Expand Down Expand Up @@ -138,7 +140,10 @@ func (a *activityActor) InvokeReminder(ctx context.Context, reminder actors.Inte
timeoutCtx, cancelTimeout := context.WithTimeout(ctx, a.defaultTimeout)
defer cancelTimeout()

err := a.executeActivity(timeoutCtx, reminder.Name, state.EventPayload)
completed, err := a.executeActivity(timeoutCtx, reminder.Name, state.EventPayload)
if completed == runCompletedTrue {
a.completed.Store(true)
}

var recoverableErr *recoverableError
// Returning nil signals that we want the execution to be retried in the next period interval
Expand All @@ -162,21 +167,25 @@ func (a *activityActor) InvokeReminder(ctx context.Context, reminder actors.Inte
}
}

func (a *activityActor) executeActivity(ctx context.Context, name string, eventPayload []byte) error {
func (a *activityActor) Completed() bool {
return a.completed.Load()
}

func (a *activityActor) executeActivity(ctx context.Context, name string, eventPayload []byte) (runCompleted, error) {
taskEvent, err := backend.UnmarshalHistoryEvent(eventPayload)
if err != nil {
return err
return runCompletedTrue, err
}
activityName := ""
if ts := taskEvent.GetTaskScheduled(); ts != nil {
activityName = ts.GetName()
} else {
return fmt.Errorf("invalid activity task event: '%s'", taskEvent.String())
return runCompletedTrue, fmt.Errorf("invalid activity task event: '%s'", taskEvent.String())
}

endIndex := strings.Index(a.actorID, "::")
if endIndex < 0 {
return fmt.Errorf("invalid activity actor ID: '%s'", a.actorID)
return runCompletedTrue, fmt.Errorf("invalid activity actor ID: '%s'", a.actorID)
}
workflowID := a.actorID[0:endIndex]

Expand All @@ -197,9 +206,9 @@ func (a *activityActor) executeActivity(ctx context.Context, name string, eventP
wfLogger.Debugf("Activity actor '%s': scheduling activity '%s' for workflow with instanceId '%s'", a.actorID, name, wi.InstanceID)
err = a.scheduler(ctx, wi)
if errors.Is(err, context.DeadlineExceeded) {
return newRecoverableError(fmt.Errorf("timed-out trying to schedule an activity execution - this can happen if too many activities are running in parallel or if the workflow engine isn't running: %w", err))
return runCompletedFalse, newRecoverableError(fmt.Errorf("timed-out trying to schedule an activity execution - this can happen if too many activities are running in parallel or if the workflow engine isn't running: %w", err))
} else if err != nil {
return newRecoverableError(fmt.Errorf("failed to schedule an activity execution: %w", err))
return runCompletedFalse, newRecoverableError(fmt.Errorf("failed to schedule an activity execution: %w", err))
}
// Activity execution started
start := time.Now()
Expand All @@ -222,7 +231,7 @@ loop:
// Activity execution failed with recoverable error
elapsed = diag.ElapsedSince(start)
executionStatus = diag.StatusRecoverable
return ctx.Err() // will be retried
return runCompletedFalse, ctx.Err() // will be retried
case <-t.C:
if deadline, ok := ctx.Deadline(); ok {
wfLogger.Warnf("Activity actor '%s': '%s' is still running - will keep waiting until '%v'", a.actorID, name, deadline)
Expand All @@ -240,7 +249,7 @@ loop:
} else {
// Activity execution failed with recoverable error
executionStatus = diag.StatusRecoverable
return newRecoverableError(errExecutionAborted) // AbandonActivityWorkItem was called
return runCompletedFalse, newRecoverableError(errExecutionAborted) // AbandonActivityWorkItem was called
}
}
}
Expand All @@ -251,7 +260,7 @@ loop:
if err != nil {
// Returning non-recoverable error
executionStatus = diag.StatusFailed
return err
return runCompletedTrue, err
}
req := internalsv1pb.
NewInternalInvokeRequest(AddWorkflowEventMethod).
Expand All @@ -264,15 +273,15 @@ loop:
case err != nil:
// Returning recoverable error, record metrics
executionStatus = diag.StatusRecoverable
return newRecoverableError(fmt.Errorf("failed to invoke '%s' method on workflow actor: %w", AddWorkflowEventMethod, err))
return runCompletedFalse, newRecoverableError(fmt.Errorf("failed to invoke '%s' method on workflow actor: %w", AddWorkflowEventMethod, err))
case wi.Result.GetTaskCompleted() != nil:
// Activity execution completed successfully
executionStatus = diag.StatusSuccess
case wi.Result.GetTaskFailed() != nil:
// Activity execution failed
executionStatus = diag.StatusFailed
}
return nil
return runCompletedTrue, nil
}

// InvokeTimer implements actors.InternalActor
Expand Down
7 changes: 7 additions & 0 deletions pkg/runtime/wfengine/backends/actors/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,13 @@ const (
ActivityNameLabelKey = "activity"
)

type runCompleted bool

const (
runCompletedFalse runCompleted = false
runCompletedTrue runCompleted = true
)

// actorsBackendConfig is the configuration for the workflow engine's actors backend
type actorsBackendConfig struct {
AppID string
Expand Down
Loading

0 comments on commit 4db3c60

Please sign in to comment.