diff --git a/cmd/enduro-a3m-worker/main.go b/cmd/enduro-a3m-worker/main.go index 45a315237..6adbf62a5 100644 --- a/cmd/enduro-a3m-worker/main.go +++ b/cmd/enduro-a3m-worker/main.go @@ -127,7 +127,7 @@ func main() { MaxConcurrentSessionExecutionSize: 1000, MaxConcurrentActivityExecutionSize: 1, } - w := temporalsdk_worker.New(temporalClient, cfg.A3m.TaskQueue, workerOpts) + w := temporalsdk_worker.New(temporalClient, temporal.A3mWorkerTaskQueue, workerOpts) if err != nil { logger.Error(err, "Error creating Temporal worker.") os.Exit(1) diff --git a/enduro.toml b/enduro.toml index cb8398414..aeae0ee45 100644 --- a/enduro.toml +++ b/enduro.toml @@ -3,7 +3,6 @@ debug = true debugListen = "127.0.0.1:9001" verbosity = 2 -useArchivematica = false [temporal] namespace = "default" @@ -64,6 +63,10 @@ secret = "minio123" region = "us-west-1" bucket = "aips" +# Change the taskqueue setting to your prefered preservation system, by default it is a3m. +[preservation] +taskqueue = "a3m" + [a3m] address = "127.0.0.1:7000" shareDir = "/home/a3m/.local/share/a3m/share" diff --git a/hack/kube/overlays/dev-am/enduro-am.yaml b/hack/kube/overlays/dev-am/enduro-am.yaml index 4d690a281..305c4cadb 100644 --- a/hack/kube/overlays/dev-am/enduro-am.yaml +++ b/hack/kube/overlays/dev-am/enduro-am.yaml @@ -59,8 +59,8 @@ spec: value: $(MINIO_USER) - name: ENDURO_WATCHER_EMBEDDED_SECRET value: $(MINIO_PASSWORD) - - name: ENDURO_USEARCHIVEMATICA - value: "true" + - name: ENDURO_PRESERVATION_TASKQUEUE + value: "am" - name: ENDURO_AM_SFTP_HOST valueFrom: secretKeyRef: diff --git a/hack/kube/overlays/dev-am/enduro-internal-patch.yaml b/hack/kube/overlays/dev-am/enduro-internal-patch.yaml index 50f99f354..15d3d6f94 100644 --- a/hack/kube/overlays/dev-am/enduro-internal-patch.yaml +++ b/hack/kube/overlays/dev-am/enduro-internal-patch.yaml @@ -8,5 +8,5 @@ spec: containers: - name: enduro-internal env: - - name: ENDURO_USEARCHIVEMATICA - value: "true" + - name: ENDURO_PRESERVATION_TASKQUEUE + value: "am" diff --git a/hack/kube/overlays/dev-am/enduro-patch.yaml b/hack/kube/overlays/dev-am/enduro-patch.yaml index bcf7605bc..fb66eac15 100644 --- a/hack/kube/overlays/dev-am/enduro-patch.yaml +++ b/hack/kube/overlays/dev-am/enduro-patch.yaml @@ -8,5 +8,5 @@ spec: containers: - name: enduro env: - - name: ENDURO_USEARCHIVEMATICA - value: "true" + - name: ENDURO_PRESERVATION_TASKQUEUE + value: "am" diff --git a/internal/a3m/config.go b/internal/a3m/config.go index 2dec6f110..6fb5f2d42 100644 --- a/internal/a3m/config.go +++ b/internal/a3m/config.go @@ -3,10 +3,9 @@ package a3m import transferservice "buf.build/gen/go/artefactual/a3m/protocolbuffers/go/a3m/api/transferservice/v1beta1" type Config struct { - Name string - ShareDir string - TaskQueue string - Address string + Name string + ShareDir string + Address string Processing } diff --git a/internal/config/config.go b/internal/config/config.go index a337d6333..e321ea598 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -12,6 +12,7 @@ import ( "github.com/artefactual-sdps/enduro/internal/api" "github.com/artefactual-sdps/enduro/internal/db" "github.com/artefactual-sdps/enduro/internal/event" + "github.com/artefactual-sdps/enduro/internal/pres" "github.com/artefactual-sdps/enduro/internal/storage" "github.com/artefactual-sdps/enduro/internal/temporal" "github.com/artefactual-sdps/enduro/internal/upload" @@ -23,20 +24,20 @@ type ConfigurationValidator interface { } type Configuration struct { - Verbosity int - Debug bool - DebugListen string - UseArchivematica bool - - A3m a3m.Config - AM am.Config - API api.Config - Database db.Config - Event event.Config - Storage storage.Config - Temporal temporal.Config - Upload upload.Config - Watcher watcher.Config + Verbosity int + Debug bool + DebugListen string + + A3m a3m.Config + AM am.Config + API api.Config + Database db.Config + Event event.Config + Preservation pres.Config + Storage storage.Config + Temporal temporal.Config + Upload upload.Config + Watcher watcher.Config } func (c Configuration) Validate() error { @@ -64,7 +65,7 @@ func Read(config *Configuration, configFile string) (found bool, configFileUsed v.AddConfigPath("/etc") v.SetConfigName("enduro") v.SetDefault("a3m.processing", a3m.ProcessingDefault) - v.SetDefault("a3m.taskqueue", temporal.A3mWorkerTaskQueue) + v.SetDefault("preservation.taskqueue", temporal.A3mWorkerTaskQueue) v.SetDefault("storage.taskqueue", temporal.GlobalTaskQueue) v.SetDefault("temporal.taskqueue", temporal.GlobalTaskQueue) v.SetDefault("debugListen", "127.0.0.1:9001") diff --git a/internal/config/config_test.go b/internal/config/config_test.go index c488e676a..604b33bb6 100644 --- a/internal/config/config_test.go +++ b/internal/config/config_test.go @@ -50,14 +50,13 @@ func TestConfig(t *testing.T) { // Zero value defaults. assert.Equal(t, c.Verbosity, 0) assert.Equal(t, c.Debug, false) - assert.Equal(t, c.UseArchivematica, false) assert.Equal(t, c.Database.DSN, "") // Valued defaults. assert.Equal(t, c.DebugListen, "127.0.0.1:9001") assert.Equal(t, c.A3m.Processing, a3m.ProcessingDefault) - assert.Equal(t, c.A3m.TaskQueue, temporal.A3mWorkerTaskQueue) assert.Equal(t, c.API.Listen, "127.0.0.1:9000") + assert.Equal(t, c.Preservation.TaskQueue, temporal.A3mWorkerTaskQueue) assert.Equal(t, c.Storage.TaskQueue, temporal.GlobalTaskQueue) assert.Equal(t, c.Temporal.TaskQueue, temporal.GlobalTaskQueue) }) diff --git a/internal/pres/config.go b/internal/pres/config.go new file mode 100644 index 000000000..9180631db --- /dev/null +++ b/internal/pres/config.go @@ -0,0 +1,10 @@ +package pres + +type Config struct { + // TaskQueue sets the Temporal task queue to use for the processing workflow + // (e.g. ` temporal.A3mWorkerTaskQueue`, ` temporal.AMWorkerTaskQueue`). + // The task queue determines which processing worker will run the processing + // workflow - a3m or Archivematica, and is used to branch the processing + // workflow logic. + TaskQueue string +} diff --git a/internal/workflow/processing.go b/internal/workflow/processing.go index 42677ec8b..95c93e1ee 100644 --- a/internal/workflow/processing.go +++ b/internal/workflow/processing.go @@ -26,18 +26,18 @@ import ( ) type ProcessingWorkflow struct { - logger logr.Logger - pkgsvc package_.Service - wsvc watcher.Service - useArchivematica bool + logger logr.Logger + pkgsvc package_.Service + wsvc watcher.Service + preservationTaskQueue string } -func NewProcessingWorkflow(logger logr.Logger, pkgsvc package_.Service, wsvc watcher.Service, useAM bool) *ProcessingWorkflow { +func NewProcessingWorkflow(logger logr.Logger, pkgsvc package_.Service, wsvc watcher.Service, presTQ string) *ProcessingWorkflow { return &ProcessingWorkflow{ - logger: logger, - pkgsvc: pkgsvc, - wsvc: wsvc, - useArchivematica: useAM, + logger: logger, + pkgsvc: pkgsvc, + wsvc: wsvc, + preservationTaskQueue: presTQ, } } @@ -172,20 +172,13 @@ func (w *ProcessingWorkflow) Execute(ctx temporalsdk_workflow.Context, req *pack // Activities running within a session. { var sessErr error - var taskQueue string maxAttempts := 5 - if w.useArchivematica { - taskQueue = temporal.AmWorkerTaskQueue - } else { - taskQueue = tinfo.A3mTaskQueue - } - + activityOpts := temporalsdk_workflow.WithActivityOptions(ctx, temporalsdk_workflow.ActivityOptions{ + StartToCloseTimeout: time.Minute, + TaskQueue: w.preservationTaskQueue, + }) for attempt := 1; attempt <= maxAttempts; attempt++ { - activityOpts := temporalsdk_workflow.WithActivityOptions(ctx, temporalsdk_workflow.ActivityOptions{ - StartToCloseTimeout: time.Minute, - TaskQueue: taskQueue, - }) sessCtx, err := temporalsdk_workflow.CreateSession(activityOpts, &temporalsdk_workflow.SessionOptions{ CreationTimeout: forever, ExecutionTimeout: forever, @@ -319,7 +312,7 @@ func (w *ProcessingWorkflow) SessionHandler(sessCtx temporalsdk_workflow.Context // For the a3m workflow bundle the transfer to a shared directory also // mounted by the a3m container. var transferDir string - if !w.useArchivematica { + if w.preservationTaskQueue == temporal.A3mWorkerTaskQueue { transferDir = "/home/a3m/.local/share/a3m/share" } @@ -353,7 +346,7 @@ func (w *ProcessingWorkflow) SessionHandler(sessCtx temporalsdk_workflow.Context // Do preservation activities. { var err error - if w.useArchivematica { + if w.preservationTaskQueue == temporal.AmWorkerTaskQueue { err = w.transferAM(sessCtx, tinfo) } else { err = w.transferA3m(sessCtx, tinfo) @@ -377,7 +370,7 @@ func (w *ProcessingWorkflow) SessionHandler(sessCtx temporalsdk_workflow.Context // Stop here for the the Archivematica workflow. AIP creation, review, and // storage are handled entirely by Archivematica and the AM Storage Service. - if w.useArchivematica { + if w.preservationTaskQueue == temporal.AmWorkerTaskQueue { return nil } diff --git a/internal/workflow/processing_test.go b/internal/workflow/processing_test.go index a8083d44c..8b16422df 100644 --- a/internal/workflow/processing_test.go +++ b/internal/workflow/processing_test.go @@ -19,6 +19,7 @@ import ( "github.com/artefactual-sdps/enduro/internal/package_" packagefake "github.com/artefactual-sdps/enduro/internal/package_/fake" sftp_fake "github.com/artefactual-sdps/enduro/internal/sftp/fake" + "github.com/artefactual-sdps/enduro/internal/temporal" watcherfake "github.com/artefactual-sdps/enduro/internal/watcher/fake" "github.com/artefactual-sdps/enduro/internal/workflow/activities" ) @@ -41,7 +42,7 @@ func TestTransferInfo_Name(t *testing.T) { }) } -func (s *ProcessingWorkflowTestSuite) SetupWorkflowTest(useAM bool) { +func (s *ProcessingWorkflowTestSuite) SetupWorkflowTest(taskQueue string) { s.env = s.NewTestWorkflowEnvironment() s.env.SetWorkerOptions(temporalsdk_worker.Options{EnableSessionWorker: true}) @@ -71,7 +72,7 @@ func (s *ProcessingWorkflowTestSuite) SetupWorkflowTest(useAM bool) { temporalsdk_activity.RegisterOptions{Name: am.UploadTransferActivityName}, ) - s.workflow = NewProcessingWorkflow(logger, pkgsvc, wsvc, useAM) + s.workflow = NewProcessingWorkflow(logger, pkgsvc, wsvc, taskQueue) } func (s *ProcessingWorkflowTestSuite) AfterTest(suiteName, testName string) { @@ -83,7 +84,7 @@ func TestProcessingWorkflow(t *testing.T) { } func (s *ProcessingWorkflowTestSuite) TestPackageConfirmation() { - s.SetupWorkflowTest(false) + s.SetupWorkflowTest(temporal.A3mWorkerTaskQueue) pkgID := uint(1) ctx := mock.AnythingOfType("*context.valueCtx") locationID := uuid.MustParse("51328c02-2b63-47be-958e-e8088aa1a61f") @@ -139,7 +140,7 @@ func (s *ProcessingWorkflowTestSuite) TestPackageConfirmation() { } func (s *ProcessingWorkflowTestSuite) TestAutoApprovedAIP() { - s.SetupWorkflowTest(false) + s.SetupWorkflowTest(temporal.A3mWorkerTaskQueue) pkgID := uint(1) locationID := uuid.MustParse("51328c02-2b63-47be-958e-e8088aa1a61f") watcherName := "watcher" @@ -191,7 +192,7 @@ func (s *ProcessingWorkflowTestSuite) TestAutoApprovedAIP() { } func (s *ProcessingWorkflowTestSuite) TestAMWorkflow() { - s.SetupWorkflowTest(true) + s.SetupWorkflowTest(temporal.AmWorkerTaskQueue) pkgID := uint(1) locationID := uuid.MustParse("51328c02-2b63-47be-958e-e8088aa1a61f") watcherName := "watcher" @@ -245,7 +246,7 @@ func (s *ProcessingWorkflowTestSuite) TestAMWorkflow() { } func (s *ProcessingWorkflowTestSuite) TestPackageRejection() { - s.SetupWorkflowTest(false) + s.SetupWorkflowTest(temporal.A3mWorkerTaskQueue) pkgID := uint(1) watcherName := "watcher" retentionPeriod := 1 * time.Second diff --git a/main.go b/main.go index 4b00c0c36..edf0da9b2 100644 --- a/main.go +++ b/main.go @@ -92,7 +92,7 @@ func main() { logger.Info("Configuration file not found.") } - logger.V(1).Info("Preservation system", "UseArchivematica", cfg.UseArchivematica) + logger.V(1).Info("Preservation config", "TaskQueue", cfg.Preservation.TaskQueue) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -273,7 +273,7 @@ func main() { AutoApproveAIP: autoApproveAIP, DefaultPermanentLocationID: &defaultPermanentLocationID, TaskQueue: cfg.Temporal.TaskQueue, - A3mTaskQueue: cfg.A3m.TaskQueue, + A3mTaskQueue: cfg.Preservation.TaskQueue, } if err := package_.InitProcessingWorkflow(ctx, temporalClient, &req); err != nil { logger.Error(err, "Error initializing processing workflow.") @@ -301,7 +301,7 @@ func main() { os.Exit(1) } - w.RegisterWorkflowWithOptions(workflow.NewProcessingWorkflow(logger, pkgsvc, wsvc, cfg.UseArchivematica).Execute, temporalsdk_workflow.RegisterOptions{Name: package_.ProcessingWorkflowName}) + w.RegisterWorkflowWithOptions(workflow.NewProcessingWorkflow(logger, pkgsvc, wsvc, cfg.Preservation.TaskQueue).Execute, temporalsdk_workflow.RegisterOptions{Name: package_.ProcessingWorkflowName}) w.RegisterActivityWithOptions(activities.NewDeleteOriginalActivity(wsvc).Execute, temporalsdk_activity.RegisterOptions{Name: activities.DeleteOriginalActivityName}) w.RegisterActivityWithOptions(activities.NewDisposeOriginalActivity(wsvc).Execute, temporalsdk_activity.RegisterOptions{Name: activities.DisposeOriginalActivityName})