Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable other cache implementations #336

Merged
merged 3 commits into from
Mar 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions backend/test/e2e.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ import (
"github.com/cschleiden/go-workflows/client"
"github.com/cschleiden/go-workflows/core"
"github.com/cschleiden/go-workflows/internal/sync"
internalwf "github.com/cschleiden/go-workflows/internal/workflow"
"github.com/cschleiden/go-workflows/worker"
"github.com/cschleiden/go-workflows/workflow"
"github.com/cschleiden/go-workflows/workflow/executor"
"github.com/google/uuid"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -834,10 +834,10 @@ func EndToEndBackendTest(t *testing.T, setup func(options ...backend.BackendOpti
type noopWorkflowExecutorCache struct {
}

var _ internalwf.ExecutorCache = (*noopWorkflowExecutorCache)(nil)
var _ executor.Cache = (*noopWorkflowExecutorCache)(nil)

// Get implements workflow.ExecutorCache
func (*noopWorkflowExecutorCache) Get(ctx context.Context, instance *core.WorkflowInstance) (internalwf.WorkflowExecutor, bool, error) {
func (*noopWorkflowExecutorCache) Get(ctx context.Context, instance *core.WorkflowInstance) (executor.WorkflowExecutor, bool, error) {
return nil, false, nil
}

Expand All @@ -851,7 +851,7 @@ func (*noopWorkflowExecutorCache) StartEviction(ctx context.Context) {
}

// Store implements workflow.ExecutorCache
func (*noopWorkflowExecutorCache) Store(ctx context.Context, instance *core.WorkflowInstance, workflow internalwf.WorkflowExecutor) error {
func (*noopWorkflowExecutorCache) Store(ctx context.Context, instance *core.WorkflowInstance, workflow executor.WorkflowExecutor) error {
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ require (
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.16.0
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.16.0
go.opentelemetry.io/otel/trace v1.16.0
go.uber.org/goleak v1.3.0
golang.org/x/tools v0.12.0
modernc.org/sqlite v1.27.0
)
Expand Down Expand Up @@ -57,7 +58,6 @@ require (
go.opentelemetry.io/proto/otlp v0.19.0 // indirect
go.tmz.dev/musttag v0.7.2 // indirect
go.uber.org/atomic v1.7.0 // indirect
go.uber.org/goleak v1.3.0 // indirect
go.uber.org/multierr v1.6.0 // indirect
go.uber.org/zap v1.24.0 // indirect
golang.org/x/exp v0.0.0-20230510235704-dd950f8aeaea // indirect
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -672,8 +672,6 @@ go.tmz.dev/musttag v0.7.2 h1:1J6S9ipDbalBSODNT5jCep8dhZyMr4ttnjQagmGYR5s=
go.tmz.dev/musttag v0.7.2/go.mod h1:m6q5NiiSKMnQYokefa2xGoyoXnrswCbJ0AWYzf4Zs28=
go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI=
go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
go.uber.org/multierr v1.6.0 h1:y6IPFStTAIT5Ytl7/XYmHvzXQ7S3g/IeZW9hyZ5thw4=
Expand Down
26 changes: 13 additions & 13 deletions internal/worker/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@ import (
"github.com/cschleiden/go-workflows/internal/log"
"github.com/cschleiden/go-workflows/internal/metrickeys"
im "github.com/cschleiden/go-workflows/internal/metrics"
"github.com/cschleiden/go-workflows/internal/workflow"
"github.com/cschleiden/go-workflows/internal/workflow/cache"
"github.com/cschleiden/go-workflows/registry"
"github.com/cschleiden/go-workflows/workflow/executor"
"github.com/cschleiden/go-workflows/workflow/executor/cache"
)

type WorkflowWorkerOptions struct {
WorkerOptions

WorkflowExecutorCache workflow.ExecutorCache
WorkflowExecutorCache executor.Cache
WorkflowExecutorCacheSize int
WorkflowExecutorCacheTTL time.Duration
}
Expand All @@ -32,7 +32,7 @@ func NewWorkflowWorker(
b backend.Backend,
registry *registry.Registry,
options WorkflowWorkerOptions,
) *Worker[backend.WorkflowTask, workflow.ExecutionResult] {
) *Worker[backend.WorkflowTask, executor.ExecutionResult] {
if options.WorkflowExecutorCache == nil {
options.WorkflowExecutorCache = cache.NewWorkflowExecutorLRUCache(b.Metrics(), options.WorkflowExecutorCacheSize, options.WorkflowExecutorCacheTTL)
}
Expand All @@ -44,13 +44,13 @@ func NewWorkflowWorker(
logger: b.Logger(),
}

return NewWorker[backend.WorkflowTask, workflow.ExecutionResult](b, tw, &options.WorkerOptions)
return NewWorker[backend.WorkflowTask, executor.ExecutionResult](b, tw, &options.WorkerOptions)
}

type WorkflowTaskWorker struct {
backend backend.Backend
registry *registry.Registry
cache workflow.ExecutorCache
cache executor.Cache
logger *slog.Logger
}

Expand All @@ -63,7 +63,7 @@ func (wtw *WorkflowTaskWorker) Start(ctx context.Context) error {
}

// Complete implements TaskWorker.
func (wtw *WorkflowTaskWorker) Complete(ctx context.Context, result *workflow.ExecutionResult, t *backend.WorkflowTask) error {
func (wtw *WorkflowTaskWorker) Complete(ctx context.Context, result *executor.ExecutionResult, t *backend.WorkflowTask) error {
logger := wtw.logger.With(
slog.String(log.TaskIDKey, t.ID),
slog.String(log.InstanceIDKey, t.WorkflowInstance.InstanceID),
Expand Down Expand Up @@ -99,7 +99,7 @@ func (wtw *WorkflowTaskWorker) Complete(ctx context.Context, result *workflow.Ex
return nil
}

func (wtw *WorkflowTaskWorker) Execute(ctx context.Context, t *backend.WorkflowTask) (*workflow.ExecutionResult, error) {
func (wtw *WorkflowTaskWorker) Execute(ctx context.Context, t *backend.WorkflowTask) (*executor.ExecutionResult, error) {
// Record how long this task was in the queue
firstEvent := t.NewEvents[0]
var scheduledAt time.Time
Expand Down Expand Up @@ -154,15 +154,15 @@ func (wtw *WorkflowTaskWorker) Get(ctx context.Context) (*backend.WorkflowTask,
return t, nil
}

func (wtw *WorkflowTaskWorker) getExecutor(ctx context.Context, t *backend.WorkflowTask) (workflow.WorkflowExecutor, error) {
func (wtw *WorkflowTaskWorker) getExecutor(ctx context.Context, t *backend.WorkflowTask) (executor.WorkflowExecutor, error) {
// Try to get a cached executor
executor, ok, err := wtw.cache.Get(ctx, t.WorkflowInstance)
e, ok, err := wtw.cache.Get(ctx, t.WorkflowInstance)
if err != nil {
wtw.logger.ErrorContext(ctx, "could not get cached workflow task executor", "error", err)
}

if !ok {
executor, err = workflow.NewExecutor(
e, err = executor.NewExecutor(
wtw.logger.With(
slog.String(log.InstanceIDKey, t.WorkflowInstance.InstanceID),
slog.String(log.ExecutionIDKey, t.WorkflowInstance.ExecutionID),
Expand All @@ -182,9 +182,9 @@ func (wtw *WorkflowTaskWorker) getExecutor(ctx context.Context, t *backend.Workf
}

// Cache executor instance for future continuation tasks, or refresh last access time
if err := wtw.cache.Store(ctx, t.WorkflowInstance, executor); err != nil {
if err := wtw.cache.Store(ctx, t.WorkflowInstance, e); err != nil {
wtw.logger.ErrorContext(ctx, "error while caching workflow task executor:", "error", err)
}

return executor, nil
return e, nil
}
4 changes: 2 additions & 2 deletions tester/tester.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@ import (
"github.com/cschleiden/go-workflows/internal/fn"
"github.com/cschleiden/go-workflows/internal/log"
"github.com/cschleiden/go-workflows/internal/signals"
wf "github.com/cschleiden/go-workflows/internal/workflow"
"github.com/cschleiden/go-workflows/internal/workflowerrors"
"github.com/cschleiden/go-workflows/registry"
"github.com/cschleiden/go-workflows/workflow"
"github.com/cschleiden/go-workflows/workflow/executor"
"github.com/google/uuid"
"github.com/stretchr/testify/mock"
"go.opentelemetry.io/otel/trace"
Expand Down Expand Up @@ -346,7 +346,7 @@ func (wt *workflowTester[TResult]) Execute(ctx context.Context, args ...any) {
tw.pendingEvents = tw.pendingEvents[:0]

// Execute task
e, err := wf.NewExecutor(wt.logger, wt.tracer, wt.registry, wt.converter, wt.propagators, &testHistoryProvider{tw.history}, tw.instance, tw.metadata, wt.clock)
e, err := executor.NewExecutor(wt.logger, wt.tracer, wt.registry, wt.converter, wt.propagators, &testHistoryProvider{tw.history}, tw.instance, tw.metadata, wt.clock)
if err != nil {
panic(fmt.Errorf("could not create workflow executor: %v", err))
}
Expand Down
4 changes: 2 additions & 2 deletions worker/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package worker
import (
"time"

"github.com/cschleiden/go-workflows/internal/workflow"
"github.com/cschleiden/go-workflows/workflow/executor"
)

type Options struct {
Expand Down Expand Up @@ -47,7 +47,7 @@ type Options struct {

// WorkflowExecutorCache is the cache to use for workflow executors. If nil, a default cache implementation
// will be used.
WorkflowExecutorCache workflow.ExecutorCache
WorkflowExecutorCache executor.Cache
}

var DefaultOptions = Options{
Expand Down
4 changes: 2 additions & 2 deletions worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ import (
"github.com/cschleiden/go-workflows/client"
"github.com/cschleiden/go-workflows/internal/signals"
internal "github.com/cschleiden/go-workflows/internal/worker"
workflowinternal "github.com/cschleiden/go-workflows/internal/workflow"
"github.com/cschleiden/go-workflows/registry"
"github.com/cschleiden/go-workflows/workflow"
"github.com/cschleiden/go-workflows/workflow/executor"
)

type Worker struct {
Expand All @@ -24,7 +24,7 @@ type Worker struct {

registry *registry.Registry

workflowWorker *internal.Worker[backend.WorkflowTask, workflowinternal.ExecutionResult]
workflowWorker *internal.Worker[backend.WorkflowTask, executor.ExecutionResult]
activityWorker *internal.Worker[backend.ActivityTask, history.Event]
}

Expand Down
4 changes: 2 additions & 2 deletions internal/workflow/cache.go → workflow/executor/cache.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package workflow
package executor

import (
"context"

"github.com/cschleiden/go-workflows/core"
)

type ExecutorCache interface {
type Cache interface {
Store(ctx context.Context, instance *core.WorkflowInstance, workflow WorkflowExecutor) error
Evict(ctx context.Context, instance *core.WorkflowInstance) error
Get(ctx context.Context, instance *core.WorkflowInstance) (WorkflowExecutor, bool, error)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,22 +7,22 @@ import (
"github.com/cschleiden/go-workflows/backend/metrics"
"github.com/cschleiden/go-workflows/core"
"github.com/cschleiden/go-workflows/internal/metrickeys"
"github.com/cschleiden/go-workflows/internal/workflow"
"github.com/cschleiden/go-workflows/workflow/executor"
"github.com/jellydator/ttlcache/v3"
)

type lruCache struct {
mc metrics.Client
c *ttlcache.Cache[string, workflow.WorkflowExecutor]
c *ttlcache.Cache[string, executor.WorkflowExecutor]
}

func NewWorkflowExecutorLRUCache(mc metrics.Client, size int, expiration time.Duration) *lruCache {
c := ttlcache.New(
ttlcache.WithCapacity[string, workflow.WorkflowExecutor](uint64(size)),
ttlcache.WithTTL[string, workflow.WorkflowExecutor](expiration),
ttlcache.WithCapacity[string, executor.WorkflowExecutor](uint64(size)),
ttlcache.WithTTL[string, executor.WorkflowExecutor](expiration),
)

c.OnEviction(func(ctx context.Context, er ttlcache.EvictionReason, i *ttlcache.Item[string, workflow.WorkflowExecutor]) {
c.OnEviction(func(ctx context.Context, er ttlcache.EvictionReason, i *ttlcache.Item[string, executor.WorkflowExecutor]) {
// Close the executor to allow it to clean up resources.
i.Value().Close()

Expand All @@ -43,7 +43,7 @@ func NewWorkflowExecutorLRUCache(mc metrics.Client, size int, expiration time.Du
}
}

func (lc *lruCache) Get(ctx context.Context, instance *core.WorkflowInstance) (workflow.WorkflowExecutor, bool, error) {
func (lc *lruCache) Get(ctx context.Context, instance *core.WorkflowInstance) (executor.WorkflowExecutor, bool, error) {
e := lc.c.Get(getKey(instance))
if e != nil {
return e.Value(), true, nil
Expand All @@ -52,7 +52,7 @@ func (lc *lruCache) Get(ctx context.Context, instance *core.WorkflowInstance) (w
return nil, false, nil
}

func (lc *lruCache) Store(ctx context.Context, instance *core.WorkflowInstance, executor workflow.WorkflowExecutor) error {
func (lc *lruCache) Store(ctx context.Context, instance *core.WorkflowInstance, executor executor.WorkflowExecutor) error {
lc.c.Set(getKey(instance), executor, ttlcache.DefaultTTL)

lc.mc.Gauge(metrickeys.WorkflowInstanceCacheSize, metrics.Tags{}, int64(lc.c.Len()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ import (
"github.com/cschleiden/go-workflows/backend/metadata"
"github.com/cschleiden/go-workflows/core"
"github.com/cschleiden/go-workflows/internal/metrics"
wf "github.com/cschleiden/go-workflows/internal/workflow"
"github.com/cschleiden/go-workflows/registry"
"github.com/cschleiden/go-workflows/workflow"
"github.com/cschleiden/go-workflows/workflow/executor"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/trace"
)
Expand All @@ -28,14 +28,14 @@ func Test_Cache_StoreAndGet(t *testing.T) {
require.NoError(t, r.RegisterWorkflow(workflowWithActivity))

i := core.NewWorkflowInstance("instanceID", "executionID")
e, err := wf.NewExecutor(
e, err := executor.NewExecutor(
slog.Default(), trace.NewNoopTracerProvider().Tracer(backend.TracerName), r, converter.DefaultConverter,
[]workflow.ContextPropagator{}, &testHistoryProvider{}, i, &metadata.WorkflowMetadata{}, clock.New(),
)
require.NoError(t, err)

i2 := core.NewWorkflowInstance("instanceID2", "executionID2")
e2, err := wf.NewExecutor(
e2, err := executor.NewExecutor(
slog.Default(), trace.NewNoopTracerProvider().Tracer(backend.TracerName), r, converter.DefaultConverter,
[]workflow.ContextPropagator{}, &testHistoryProvider{}, i, &metadata.WorkflowMetadata{}, clock.New(),
)
Expand Down Expand Up @@ -68,7 +68,7 @@ func Test_Cache_AutoEviction(t *testing.T) {
i := core.NewWorkflowInstance("instanceID", "executionID")
r := registry.New()
require.NoError(t, r.RegisterWorkflow(workflowWithActivity))
e, err := wf.NewExecutor(
e, err := executor.NewExecutor(
slog.Default(), trace.NewNoopTracerProvider().Tracer(backend.TracerName), r,
converter.DefaultConverter, []workflow.ContextPropagator{}, &testHistoryProvider{}, i,
&metadata.WorkflowMetadata{}, clock.New(),
Expand Down Expand Up @@ -98,7 +98,7 @@ func Test_Cache_Evict(t *testing.T) {
i := core.NewWorkflowInstance("instanceID", "executionID")
r := registry.New()
require.NoError(t, r.RegisterWorkflow(workflowWithActivity))
e, err := wf.NewExecutor(
e, err := executor.NewExecutor(
slog.Default(), trace.NewNoopTracerProvider().Tracer(backend.TracerName), r,
converter.DefaultConverter, []workflow.ContextPropagator{}, &testHistoryProvider{}, i,
&metadata.WorkflowMetadata{}, clock.New(),
Expand Down
File renamed without changes.
19 changes: 14 additions & 5 deletions internal/workflow/executor.go → workflow/executor/executor.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package workflow
package executor

import (
"context"
Expand Down Expand Up @@ -30,10 +30,19 @@ import (
)

type ExecutionResult struct {
State core.WorkflowInstanceState
Executed []*history.Event
// New state of the workflow instance
State core.WorkflowInstanceState

// Events executed during the tastk execution
Executed []*history.Event

// Activities that were scheduled
ActivityEvents []*history.Event
TimerEvents []*history.Event

// Timers that were scheduled
TimerEvents []*history.Event

// Events for other workflow instances
WorkflowEvents []history.WorkflowEvent
}

Expand Down Expand Up @@ -382,7 +391,7 @@ func (e *executor) handleWorkflowExecutionStarted(a *history.ExecutionStartedAtt
return fmt.Errorf("workflow %s not found", a.Name)
}

e.workflow = NewWorkflow(reflect.ValueOf(wfFn))
e.workflow = newWorkflow(reflect.ValueOf(wfFn))

return e.workflow.Execute(e.workflowCtx, a.Inputs)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package workflow
package executor

import (
"context"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package workflow
package executor

import (
"errors"
Expand All @@ -19,7 +19,7 @@ type workflow struct {
err error
}

func NewWorkflow(workflowFn reflect.Value) *workflow {
func newWorkflow(workflowFn reflect.Value) *workflow {
s := sync.NewScheduler()

return &workflow{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package workflow
package executor

import (
"reflect"
Expand All @@ -25,7 +25,7 @@ func Test_Workflow_WrapsPanic(t *testing.T) {
ctx := sync.Background()
ctx = contextvalue.WithConverter(ctx, converter.DefaultConverter)

wf := NewWorkflow(reflect.ValueOf(w))
wf := newWorkflow(reflect.ValueOf(w))
err := wf.Execute(ctx, nil)
require.NoError(t, err)

Expand Down
Loading