Skip to content

Commit

Permalink
Use a3m shareDir config value in processing
Browse files Browse the repository at this point in the history
Fixes #864

- Pass the full enduro config to processing workflow, and remove the
  separate taskQueue parameter
- Use config value for A3m shareDir in processing.go
- Use a temporary directory as a shareDir in processing_test.go
  • Loading branch information
djjuhasz committed Feb 21, 2024
1 parent b0e5362 commit dc21ed3
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 21 deletions.
34 changes: 20 additions & 14 deletions internal/workflow/processing.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (

"github.com/artefactual-sdps/enduro/internal/a3m"
"github.com/artefactual-sdps/enduro/internal/am"
"github.com/artefactual-sdps/enduro/internal/config"
"github.com/artefactual-sdps/enduro/internal/fsutil"
"github.com/artefactual-sdps/enduro/internal/package_"
"github.com/artefactual-sdps/enduro/internal/temporal"
Expand All @@ -26,18 +27,23 @@ import (
)

type ProcessingWorkflow struct {
logger logr.Logger
pkgsvc package_.Service
wsvc watcher.Service
taskQueue string
logger logr.Logger
cfg config.Configuration
pkgsvc package_.Service
wsvc watcher.Service
}

func NewProcessingWorkflow(logger logr.Logger, pkgsvc package_.Service, wsvc watcher.Service, taskQueue string) *ProcessingWorkflow {
func NewProcessingWorkflow(
logger logr.Logger,
cfg config.Configuration,
pkgsvc package_.Service,
wsvc watcher.Service,
) *ProcessingWorkflow {
return &ProcessingWorkflow{
logger: logger,
pkgsvc: pkgsvc,
wsvc: wsvc,
taskQueue: taskQueue,
logger: logger,
cfg: cfg,
pkgsvc: pkgsvc,
wsvc: wsvc,
}
}

Expand Down Expand Up @@ -176,7 +182,7 @@ func (w *ProcessingWorkflow) Execute(ctx temporalsdk_workflow.Context, req *pack

activityOpts := temporalsdk_workflow.WithActivityOptions(ctx, temporalsdk_workflow.ActivityOptions{
StartToCloseTimeout: time.Minute,
TaskQueue: w.taskQueue,
TaskQueue: w.cfg.Preservation.TaskQueue,
})
for attempt := 1; attempt <= maxAttempts; attempt++ {
sessCtx, err := temporalsdk_workflow.CreateSession(activityOpts, &temporalsdk_workflow.SessionOptions{
Expand Down Expand Up @@ -317,8 +323,8 @@ func (w *ProcessingWorkflow) SessionHandler(sessCtx temporalsdk_workflow.Context
// For the a3m workflow bundle the transfer to a directory shared with
// the a3m container.
var transferDir string
if w.taskQueue == temporal.A3mWorkerTaskQueue {
transferDir = "/home/a3m/.local/share/a3m/share"
if w.cfg.Preservation.TaskQueue == temporal.A3mWorkerTaskQueue {
transferDir = w.cfg.A3m.ShareDir
}

activityOpts := withActivityOptsForLongLivedRequest(sessCtx)
Expand Down Expand Up @@ -356,7 +362,7 @@ func (w *ProcessingWorkflow) SessionHandler(sessCtx temporalsdk_workflow.Context
// Do preservation activities.
{
var err error
if w.taskQueue == temporal.AmWorkerTaskQueue {
if w.cfg.Preservation.TaskQueue == temporal.AmWorkerTaskQueue {
err = w.transferAM(sessCtx, tinfo)
} else {
err = w.transferA3m(sessCtx, tinfo)
Expand All @@ -379,7 +385,7 @@ func (w *ProcessingWorkflow) SessionHandler(sessCtx temporalsdk_workflow.Context
}

// Stop here for the Archivematica workflow.
if w.taskQueue == temporal.AmWorkerTaskQueue {
if w.cfg.Preservation.TaskQueue == temporal.AmWorkerTaskQueue {
return nil
}

Expand Down
22 changes: 16 additions & 6 deletions internal/workflow/processing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ import (

"github.com/artefactual-sdps/enduro/internal/a3m"
"github.com/artefactual-sdps/enduro/internal/am"
"github.com/artefactual-sdps/enduro/internal/config"
"github.com/artefactual-sdps/enduro/internal/package_"
packagefake "github.com/artefactual-sdps/enduro/internal/package_/fake"
"github.com/artefactual-sdps/enduro/internal/pres"
sftp_fake "github.com/artefactual-sdps/enduro/internal/sftp/fake"
"github.com/artefactual-sdps/enduro/internal/temporal"
watcherfake "github.com/artefactual-sdps/enduro/internal/watcher/fake"
Expand All @@ -32,6 +34,9 @@ type ProcessingWorkflowTestSuite struct {

env *temporalsdk_testsuite.TestWorkflowEnvironment

// Each test creates it's own temporary transfer directory.
transferDir string

// Each test registers the workflow with a different name to avoid dups.
workflow *ProcessingWorkflow
}
Expand All @@ -47,10 +52,15 @@ func TestTransferInfo_Name(t *testing.T) {
func (s *ProcessingWorkflowTestSuite) SetupWorkflowTest(taskQueue string) {
s.env = s.NewTestWorkflowEnvironment()
s.env.SetWorkerOptions(temporalsdk_worker.Options{EnableSessionWorker: true})
s.transferDir = s.T().TempDir()

clock := clockwork.NewFakeClock()
ctrl := gomock.NewController(s.T())
logger := logr.Discard()
cfg := config.Configuration{
Preservation: pres.Config{TaskQueue: taskQueue},
A3m: a3m.Config{ShareDir: s.transferDir},
}
pkgsvc := packagefake.NewMockService(ctrl)
wsvc := watcherfake.NewMockService(ctrl)
sftpc := sftp_fake.NewMockClient(ctrl)
Expand Down Expand Up @@ -87,7 +97,7 @@ func (s *ProcessingWorkflowTestSuite) SetupWorkflowTest(taskQueue string) {
s.env.RegisterActivityWithOptions(
am.NewPollTransferActivity(
logger,
&am.Config{},
&cfg.AM,
clock,
amclienttest.NewMockTransferService(ctrl),
amclienttest.NewMockJobsService(ctrl),
Expand All @@ -98,7 +108,7 @@ func (s *ProcessingWorkflowTestSuite) SetupWorkflowTest(taskQueue string) {
s.env.RegisterActivityWithOptions(
am.NewPollIngestActivity(
logger,
&am.Config{},
&cfg.AM,
clock,
amclienttest.NewMockIngestService(ctrl),
amclienttest.NewMockJobsService(ctrl),
Expand All @@ -107,7 +117,7 @@ func (s *ProcessingWorkflowTestSuite) SetupWorkflowTest(taskQueue string) {
temporalsdk_activity.RegisterOptions{Name: am.PollIngestActivityName},
)

s.workflow = NewProcessingWorkflow(logger, pkgsvc, wsvc, taskQueue)
s.workflow = NewProcessingWorkflow(logger, cfg, pkgsvc, wsvc)
}

func (s *ProcessingWorkflowTestSuite) AfterTest(suiteName, testName string) {
Expand Down Expand Up @@ -154,7 +164,7 @@ func (s *ProcessingWorkflowTestSuite) TestPackageConfirmation() {
s.env.OnActivity(activities.BundleActivityName, sessionCtx,
&activities.BundleActivityParams{
WatcherName: watcherName,
TransferDir: "/home/a3m/.local/share/a3m/share",
TransferDir: s.transferDir,
Key: key,
TempFile: "/tmp/enduro123456/" + key,
},
Expand Down Expand Up @@ -229,7 +239,7 @@ func (s *ProcessingWorkflowTestSuite) TestAutoApprovedAIP() {
s.env.OnActivity(activities.BundleActivityName, sessionCtx,
&activities.BundleActivityParams{
WatcherName: watcherName,
TransferDir: "/home/a3m/.local/share/a3m/share",
TransferDir: s.transferDir,
Key: key,
TempFile: "/tmp/enduro123456/" + key,
},
Expand Down Expand Up @@ -412,7 +422,7 @@ func (s *ProcessingWorkflowTestSuite) TestPackageRejection() {
s.env.OnActivity(activities.BundleActivityName, sessionCtx,
&activities.BundleActivityParams{
WatcherName: watcherName,
TransferDir: "/home/a3m/.local/share/a3m/share",
TransferDir: s.transferDir,
Key: key,
TempFile: "/tmp/enduro123456/" + key,
},
Expand Down
2 changes: 1 addition & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ func main() {
os.Exit(1)
}

w.RegisterWorkflowWithOptions(workflow.NewProcessingWorkflow(logger, pkgsvc, wsvc, cfg.Preservation.TaskQueue).Execute, temporalsdk_workflow.RegisterOptions{Name: package_.ProcessingWorkflowName})
w.RegisterWorkflowWithOptions(workflow.NewProcessingWorkflow(logger, cfg, pkgsvc, wsvc).Execute, temporalsdk_workflow.RegisterOptions{Name: package_.ProcessingWorkflowName})

Check warning on line 336 in main.go

View check run for this annotation

Codecov / codecov/patch

main.go#L336

Added line #L336 was not covered by tests
w.RegisterActivityWithOptions(activities.NewDeleteOriginalActivity(wsvc).Execute, temporalsdk_activity.RegisterOptions{Name: activities.DeleteOriginalActivityName})
w.RegisterActivityWithOptions(activities.NewDisposeOriginalActivity(wsvc).Execute, temporalsdk_activity.RegisterOptions{Name: activities.DisposeOriginalActivityName})

Expand Down

0 comments on commit dc21ed3

Please sign in to comment.