diff --git a/enduro.toml b/enduro.toml index fe07ccc2..fc642c06 100644 --- a/enduro.toml +++ b/enduro.toml @@ -232,6 +232,12 @@ namespace = "default" taskQueue = "preprocessing" workflowName = "preprocessing" +# Temporal configurations to trigger poststorage child workflows, allows multiple sections. +# [[poststorage]] +# namespace = "default" +# taskQueue = "poststorage" +# workflowName = "poststorage" + [failedSips] endpoint = "http://minio.enduro-sdps:9000" pathStyle = true diff --git a/internal/config/config.go b/internal/config/config.go index 51df5cba..6e4a0b8d 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -22,6 +22,7 @@ import ( "github.com/artefactual-sdps/enduro/internal/db" "github.com/artefactual-sdps/enduro/internal/event" "github.com/artefactual-sdps/enduro/internal/package_" + "github.com/artefactual-sdps/enduro/internal/poststorage" "github.com/artefactual-sdps/enduro/internal/preprocessing" "github.com/artefactual-sdps/enduro/internal/pres" "github.com/artefactual-sdps/enduro/internal/storage" @@ -47,6 +48,7 @@ type Configuration struct { Database db.Config Event event.Config ExtractActivity archiveextract.Config + Poststorage []poststorage.Config Preprocessing preprocessing.Config Preservation pres.Config Storage storage.Config diff --git a/internal/poststorage/poststorage.go b/internal/poststorage/poststorage.go new file mode 100644 index 00000000..f41b657d --- /dev/null +++ b/internal/poststorage/poststorage.go @@ -0,0 +1,11 @@ +package poststorage + +type Config struct { + Namespace string + TaskQueue string + WorkflowName string +} + +type WorkflowParams struct { + AIPUUID string +} diff --git a/internal/workflow/processing.go b/internal/workflow/processing.go index c3d9dd74..76fd00fb 100644 --- a/internal/workflow/processing.go +++ b/internal/workflow/processing.go @@ -35,6 +35,7 @@ import ( "github.com/artefactual-sdps/enduro/internal/enums" "github.com/artefactual-sdps/enduro/internal/fsutil" "github.com/artefactual-sdps/enduro/internal/package_" + "github.com/artefactual-sdps/enduro/internal/poststorage" "github.com/artefactual-sdps/enduro/internal/preprocessing" "github.com/artefactual-sdps/enduro/internal/temporal" "github.com/artefactual-sdps/enduro/internal/watcher" @@ -760,6 +761,10 @@ func (w *ProcessingWorkflow) SessionHandler( return err } } + + if err := w.poststorage(sessCtx, tinfo.SIPID); err != nil { + return err + } } else if !tinfo.req.AutoApproveAIP { // Record package rejection in review preservation task { @@ -1020,6 +1025,10 @@ func (w *ProcessingWorkflow) transferAM(ctx temporalsdk_workflow.Context, tinfo } } + if err := w.poststorage(ctx, tinfo.SIPID); err != nil { + return err + } + // Delete transfer. activityOpts = withActivityOptsForRequest(ctx) err = temporalsdk_workflow.ExecuteActivity(activityOpts, am.DeleteTransferActivityName, am.DeleteTransferActivityParams{ @@ -1094,6 +1103,36 @@ func (w *ProcessingWorkflow) preprocessing(ctx temporalsdk_workflow.Context, tin } } +// poststorage executes the configured poststorage child workflows. It uses +// a disconnected context, abandon as parent close policy and only waits +// until the workflows are started, ignoring their results. +func (w *ProcessingWorkflow) poststorage(ctx temporalsdk_workflow.Context, aipUUID string) error { + var err error + disconnectedCtx, _ := temporalsdk_workflow.NewDisconnectedContext(ctx) + + for _, cfg := range w.cfg.Poststorage { + psCtx := temporalsdk_workflow.WithChildOptions( + disconnectedCtx, + temporalsdk_workflow.ChildWorkflowOptions{ + Namespace: cfg.Namespace, + TaskQueue: cfg.TaskQueue, + WorkflowID: fmt.Sprintf("%s-%s", cfg.WorkflowName, aipUUID), + ParentClosePolicy: temporalapi_enums.PARENT_CLOSE_POLICY_ABANDON, + }, + ) + err = errors.Join( + err, + temporalsdk_workflow.ExecuteChildWorkflow( + psCtx, + cfg.WorkflowName, + poststorage.WorkflowParams{AIPUUID: aipUUID}, + ).GetChildWorkflowExecution().Get(psCtx, nil), + ) + } + + return err +} + func (w *ProcessingWorkflow) createPreservationTask( ctx temporalsdk_workflow.Context, pt datatypes.PreservationTask, diff --git a/internal/workflow/processing_test.go b/internal/workflow/processing_test.go index 9c61fac3..81cbfb7d 100644 --- a/internal/workflow/processing_test.go +++ b/internal/workflow/processing_test.go @@ -37,6 +37,7 @@ import ( "github.com/artefactual-sdps/enduro/internal/enums" "github.com/artefactual-sdps/enduro/internal/package_" packagefake "github.com/artefactual-sdps/enduro/internal/package_/fake" + "github.com/artefactual-sdps/enduro/internal/poststorage" "github.com/artefactual-sdps/enduro/internal/preprocessing" "github.com/artefactual-sdps/enduro/internal/pres" sftp_fake "github.com/artefactual-sdps/enduro/internal/sftp/fake" @@ -91,6 +92,13 @@ func preprocessingChildWorkflow( return nil, nil } +func poststorageChildWorkflow( + ctx temporalsdk_workflow.Context, + params *poststorage.WorkflowParams, +) (*interface{}, error) { + return nil, nil +} + func (s *ProcessingWorkflowTestSuite) CreateTransferDir() string { s.transferDir = s.T().TempDir() @@ -131,10 +139,6 @@ func (s *ProcessingWorkflowTestSuite) SetupWorkflowTest(cfg config.Configuration s.setupA3mWorkflowTest(ctrl, pkgsvc) } - s.env.RegisterWorkflowWithOptions( - preprocessingChildWorkflow, - temporalsdk_workflow.RegisterOptions{Name: "preprocessing"}, - ) s.env.RegisterActivityWithOptions( removepaths.New().Execute, temporalsdk_activity.RegisterOptions{Name: removepaths.Name}, @@ -156,6 +160,19 @@ func (s *ProcessingWorkflowTestSuite) SetupWorkflowTest(cfg config.Configuration temporalsdk_activity.RegisterOptions{Name: activities.SendToFailedPIPsName}, ) + s.env.RegisterWorkflowWithOptions( + preprocessingChildWorkflow, + temporalsdk_workflow.RegisterOptions{Name: "preprocessing"}, + ) + s.env.RegisterWorkflowWithOptions( + poststorageChildWorkflow, + temporalsdk_workflow.RegisterOptions{Name: "poststorage_1"}, + ) + s.env.RegisterWorkflowWithOptions( + poststorageChildWorkflow, + temporalsdk_workflow.RegisterOptions{Name: "poststorage_2"}, + ) + s.workflow = NewProcessingWorkflow(cfg, rng, pkgsvc, wsvc) } @@ -815,7 +832,7 @@ func (s *ProcessingWorkflowTestSuite) TestPackageRejection() { s.NoError(s.env.GetWorkflowResult(nil)) } -func (s *ProcessingWorkflowTestSuite) TestPreprocessingChildWorkflow() { +func (s *ProcessingWorkflowTestSuite) TestChildWorkflows() { cfg := config.Configuration{ A3m: a3m.Config{ShareDir: s.CreateTransferDir()}, Preservation: pres.Config{TaskQueue: temporal.A3mWorkerTaskQueue}, @@ -829,6 +846,18 @@ func (s *ProcessingWorkflowTestSuite) TestPreprocessingChildWorkflow() { WorkflowName: "preprocessing", }, }, + Poststorage: []poststorage.Config{ + { + Namespace: "default", + TaskQueue: "poststorage", + WorkflowName: "poststorage_1", + }, + { + Namespace: "default", + TaskQueue: "poststorage", + WorkflowName: "poststorage_2", + }, + }, Storage: storage.Config{ DefaultPermanentLocationID: locationID, }, @@ -842,6 +871,7 @@ func (s *ProcessingWorkflowTestSuite) TestPreprocessingChildWorkflow() { ctx := mock.AnythingOfType("*context.valueCtx") sessionCtx := mock.AnythingOfType("*context.timerCtx") pkgsvc := s.workflow.pkgsvc + aipUUID := "56eebd45-5600-4768-a8c2-ec0114555a3d" downloadDir := strings.Replace(tempPath, "/tmp/", cfg.Preprocessing.SharedPath, 1) prepDest := strings.Replace(extractPath, "/tmp/", cfg.Preprocessing.SharedPath, 1) @@ -989,8 +1019,8 @@ func (s *ProcessingWorkflowTestSuite) TestPreprocessingChildWorkflow() { ) s.env.OnActivity(a3m.CreateAIPActivityName, sessionCtx, mock.AnythingOfType("*a3m.CreateAIPActivityParams")). - Return(nil, nil). - Once() + Return(&a3m.CreateAIPActivityResult{UUID: aipUUID}, nil) + s.env.OnActivity(updatePackageLocalActivity, ctx, pkgsvc, mock.AnythingOfType("*workflow.updatePackageLocalActivityParams")). Return(nil, nil). Times(2) @@ -1050,6 +1080,22 @@ func (s *ProcessingWorkflowTestSuite) TestPreprocessingChildWorkflow() { Return(nil, nil). Once() + s.env.OnWorkflow( + "poststorage_1", + mock.AnythingOfType("*internal.valueCtx"), + &poststorage.WorkflowParams{ + AIPUUID: aipUUID, + }, + ).Return(nil, nil) + + s.env.OnWorkflow( + "poststorage_2", + mock.AnythingOfType("*internal.valueCtx"), + &poststorage.WorkflowParams{ + AIPUUID: aipUUID, + }, + ).Return(nil, nil) + s.env.OnActivity( removepaths.Name, sessionCtx,