diff --git a/Tiltfile b/Tiltfile index ce3bcc96b..1ccb5f332 100644 --- a/Tiltfile +++ b/Tiltfile @@ -63,8 +63,35 @@ KUBE_OVERLAY = 'hack/kube/overlays/dev-a3m' if PRES_SYS == 'am': KUBE_OVERLAY = 'hack/kube/overlays/dev-am' +# Load Kustomize YAML +yaml = kustomize(KUBE_OVERLAY) + +# Preprocessing +PREPROCESSING_PATH = os.environ.get("PREPROCESSING_PATH", "") +if PREPROCESSING_PATH != "": + # Load preprocessing Tiltfile for Enduro + load_dynamic(PREPROCESSING_PATH + "/Tiltfile.enduro") + # Get Enduro a3m/am worker k8s manifest + if PRES_SYS == "a3m": + pres_yaml, yaml = filter_yaml(yaml, name="^enduro-a3m$", kind="StatefulSet") + else: + pres_yaml, yaml = filter_yaml(yaml, name="^enduro-am$", kind="Deployment") + # Append preprocessing volume and volume mount to worker container, + # this will only work in single node k8s cluster deployments + volume = {"name": "shared-dir", "persistentVolumeClaim": {"claimName": "preprocessing-pvc"}} + volume_mount = {"name": "shared-dir", "mountPath": "/home/enduro/preprocessing"} + pres_obj = decode_yaml(pres_yaml) + if "volumes" not in pres_obj["spec"]["template"]["spec"]: + pres_obj["spec"]["template"]["spec"]["volumes"] = [] + pres_obj["spec"]["template"]["spec"]["volumes"].append(volume) + for container in pres_obj["spec"]["template"]["spec"]["containers"]: + if container["name"] in ["enduro-a3m-worker", "enduro-am-worker"]: + container["volumeMounts"].append(volume_mount) + pres_yaml = encode_yaml(pres_obj) + yaml = [yaml, pres_yaml] + # Load Kubernetes resources -k8s_yaml(kustomize(KUBE_OVERLAY)) +k8s_yaml(yaml) # Configure trigger mode trigger_mode = TRIGGER_MODE_MANUAL diff --git a/docs/src/dev-manual/README.md b/docs/src/dev-manual/README.md index a9672d2e1..aff2c3cbe 100644 --- a/docs/src/dev-manual/README.md +++ b/docs/src/dev-manual/README.md @@ -6,6 +6,7 @@ This is the developer manual for Enduro SDPS. - [Dependency management](deps.md) - [Environment setup](devel.md) - [Working with Archivematica](archivematica.md) + - [Preprocessing child workflow](preprocessing.md) - [Logging](logging.md) - [Makefile](make.md) - [Testing](testing.md) diff --git a/docs/src/dev-manual/devel.md b/docs/src/dev-manual/devel.md index be08060c4..2ae2f40e5 100644 --- a/docs/src/dev-manual/devel.md +++ b/docs/src/dev-manual/devel.md @@ -192,6 +192,14 @@ are planning to use Archivematica as preservation system. Build and use a local version of a3m. Requires to have the `a3m` repository cloned as a sibling of this repository folder. +### PREPROCESSING_PATH + +Relative path to a preprocessing child workflow repository. It loads a Tiltfile +called `Tiltfile.enduro` from that repository and mounts a presistent volume +claim (PVC) in the preservation system pod. That PVC must be defined in the +preprocessing and be called `preprocessing-pvc`. Check the [Preprocessing child +workflow] docs to configure the child workflow execution. + ## Tilt UI helpers ### Upload to Minio @@ -259,3 +267,4 @@ is sometimes not setup properly. To solve it, from the Tilt UI, restart the [visual studio code]: https://code.visualstudio.com/ [working with archivematica]: archivematica.md [devbox]: https://www.jetpack.io/devbox/docs/quickstart/#install-devbox +[preprocessing child workflow]: preprocessing.md diff --git a/docs/src/dev-manual/preprocessing.md b/docs/src/dev-manual/preprocessing.md new file mode 100644 index 000000000..636c25058 --- /dev/null +++ b/docs/src/dev-manual/preprocessing.md @@ -0,0 +1,34 @@ +# Preprocessing child workflow + +The processing workflow can be extended with the execution of a preprocessing +child workflow. + +## Configuration + +### `.tilt.env` + +Check the [Tilt environment configuration]. + +### `enduro.toml` + +```toml +# Optional preprocessing child workflow configuration. +[preprocessing] +# enabled triggers the execution of the child workflow, when set to false all other +# options are ignored. +enabled = true +# extract determines if the package extraction happens on the child workflow. +extract = false +# sharedPath is the full path to the directory used to share the package between workflows, +# required when enabled is set to true. +sharedPath = "/home/enduro/preprocessing" + +# Temporal configuration to trigger the preprocessing child workflow, all fields are +# required when enabled is set to true. +[preprocessing.temporal] +namespace = "default" +taskQueue = "preprocessing" +workflowName = "preprocessing" +``` + +[tilt environment configuration]: devel.md#preprocessing_path diff --git a/enduro.toml b/enduro.toml index cd195163b..768a8cd64 100644 --- a/enduro.toml +++ b/enduro.toml @@ -134,3 +134,21 @@ bucket = "sips" enabled = false address = "" samplingRatio = 1.0 + +# Optional preprocessing child workflow configuration. +[preprocessing] +# enabled triggers the execution of the child workflow, when set to false all other +# options are ignored. +enabled = false +# extract determines if the package extraction happens on the child workflow. +extract = false +# sharedPath is the full path to the directory used to share the package between workflows, +# required when enabled is set to true. +sharedPath = "/home/enduro/preprocessing" + +# Temporal configuration to trigger the preprocessing child workflow, all fields are +# required when enabled is set to true. +[preprocessing.temporal] +namespace = "default" +taskQueue = "preprocessing" +workflowName = "preprocessing" diff --git a/internal/config/config.go b/internal/config/config.go index 79ff1522a..3037b32b2 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -1,6 +1,7 @@ package config import ( + "errors" "fmt" "os" "strings" @@ -13,6 +14,7 @@ import ( "github.com/artefactual-sdps/enduro/internal/api" "github.com/artefactual-sdps/enduro/internal/db" "github.com/artefactual-sdps/enduro/internal/event" + "github.com/artefactual-sdps/enduro/internal/preprocessing" "github.com/artefactual-sdps/enduro/internal/pres" "github.com/artefactual-sdps/enduro/internal/storage" "github.com/artefactual-sdps/enduro/internal/telemetry" @@ -26,38 +28,31 @@ type ConfigurationValidator interface { } type Configuration struct { - Verbosity int Debug bool DebugListen string + Verbosity int - A3m a3m.Config - AM am.Config - API api.Config - Database db.Config - Event event.Config - Preservation pres.Config - Storage storage.Config - Temporal temporal.Config - Upload upload.Config - Watcher watcher.Config - Telemetry telemetry.Config + A3m a3m.Config + AM am.Config + API api.Config + Database db.Config + Event event.Config + Preprocessing preprocessing.Config + Preservation pres.Config + Storage storage.Config + Temporal temporal.Config + Upload upload.Config + Watcher watcher.Config + Telemetry telemetry.Config } func (c Configuration) Validate() error { // TODO: should this validate all the fields in Configuration? - if config, ok := interface{}(c.Upload).(ConfigurationValidator); ok { - err := config.Validate() - if err != nil { - return err - } - } - if config, ok := interface{}(c.API.Auth).(ConfigurationValidator); ok { - err := config.Validate() - if err != nil { - return err - } - } - return nil + apiAuthErr := c.API.Auth.Validate() + preprocessingErr := c.Preprocessing.Validate() + uploadErr := c.Upload.Validate() + + return errors.Join(apiAuthErr, preprocessingErr, uploadErr) } func Read(config *Configuration, configFile string) (found bool, configFileUsed string, err error) { diff --git a/internal/preprocessing/preprocessing.go b/internal/preprocessing/preprocessing.go new file mode 100644 index 000000000..aacd2aa16 --- /dev/null +++ b/internal/preprocessing/preprocessing.go @@ -0,0 +1,46 @@ +package preprocessing + +import "errors" + +type Config struct { + // Enable preprocessing child workflow. + Enabled bool + // Extract package in preprocessing. + Extract bool + // Local path shared between workers. + SharedPath string + // Temporal configuration. + Temporal Temporal +} + +type Temporal struct { + Namespace string + TaskQueue string + WorkflowName string +} + +type WorkflowParams struct { + // Relative path to the shared path. + RelativePath string +} + +type WorkflowResult struct { + // Relative path to the shared path. + RelativePath string +} + +// Validate implements config.ConfigurationValidator. +func (c Config) Validate() error { + if !c.Enabled { + return nil + } + if c.SharedPath == "" { + return errors.New("sharedPath is required in the [preprocessing] configuration") + } + if c.Temporal.Namespace == "" || c.Temporal.TaskQueue == "" || c.Temporal.WorkflowName == "" { + return errors.New( + "namespace, taskQueue and workflowName are required in the [preprocessing.temporal] configuration", + ) + } + return nil +} diff --git a/internal/preprocessing/preprocessing_test.go b/internal/preprocessing/preprocessing_test.go new file mode 100644 index 000000000..7e79a4927 --- /dev/null +++ b/internal/preprocessing/preprocessing_test.go @@ -0,0 +1,65 @@ +package preprocessing_test + +import ( + "testing" + + "gotest.tools/v3/assert" + + "github.com/artefactual-sdps/enduro/internal/preprocessing" +) + +func TestPreprocessingConfig(t *testing.T) { + t.Parallel() + + type test struct { + name string + config preprocessing.Config + wantErr string + } + for _, tt := range []test{ + { + name: "Validates if not enabled", + config: preprocessing.Config{ + Enabled: false, + }, + }, + { + name: "Validates with all required fields", + config: preprocessing.Config{ + Enabled: true, + SharedPath: "/tmp", + Temporal: preprocessing.Temporal{ + Namespace: "default", + TaskQueue: "preprocessing", + WorkflowName: "preprocessing", + }, + }, + }, + { + name: "Returns error if shared path is missing", + config: preprocessing.Config{ + Enabled: true, + }, + wantErr: "sharedPath is required in the [preprocessing] configuration", + }, + { + name: "Returns error if temporal config is missing", + config: preprocessing.Config{ + Enabled: true, + SharedPath: "/tmp", + }, + wantErr: "namespace, taskQueue and workflowName are required in the [preprocessing.temporal] configuration", + }, + } { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + err := tt.config.Validate() + if tt.wantErr != "" { + assert.Error(t, err, tt.wantErr) + return + } + assert.NilError(t, err) + }) + } +} diff --git a/internal/workflow/activities/download.go b/internal/workflow/activities/download.go index 84aba5153..58ff91706 100644 --- a/internal/workflow/activities/download.go +++ b/internal/workflow/activities/download.go @@ -22,8 +22,9 @@ type DownloadActivity struct { } type DownloadActivityParams struct { - Key string - WatcherName string + Key string + WatcherName string + DestinationPath string } type DownloadActivityResult struct { @@ -47,7 +48,7 @@ func (a *DownloadActivity) Execute( "WatcherName", params.WatcherName, ) - destDir, err := os.MkdirTemp("", "enduro") + destDir, err := os.MkdirTemp(params.DestinationPath, "enduro") if err != nil { return &DownloadActivityResult{}, temporal_tools.NewNonRetryableError(fmt.Errorf("make temp dir: %v", err)) } diff --git a/internal/workflow/processing.go b/internal/workflow/processing.go index d59b2c519..7b4d5251b 100644 --- a/internal/workflow/processing.go +++ b/internal/workflow/processing.go @@ -8,11 +8,13 @@ package workflow import ( "errors" "fmt" + "path/filepath" "time" "github.com/go-logr/logr" "github.com/google/uuid" "go.artefactual.dev/tools/ref" + temporalapi_enums "go.temporal.io/api/enums/v1" temporalsdk_temporal "go.temporal.io/sdk/temporal" temporalsdk_workflow "go.temporal.io/sdk/workflow" @@ -22,6 +24,7 @@ import ( "github.com/artefactual-sdps/enduro/internal/enums" "github.com/artefactual-sdps/enduro/internal/fsutil" "github.com/artefactual-sdps/enduro/internal/package_" + "github.com/artefactual-sdps/enduro/internal/preprocessing" "github.com/artefactual-sdps/enduro/internal/temporal" "github.com/artefactual-sdps/enduro/internal/watcher" "github.com/artefactual-sdps/enduro/internal/workflow/activities" @@ -309,10 +312,14 @@ func (w *ProcessingWorkflow) SessionHandler( { var downloadResult activities.DownloadActivityResult activityOpts := withActivityOptsForLongLivedRequest(sessCtx) - err := temporalsdk_workflow.ExecuteActivity(activityOpts, activities.DownloadActivityName, &activities.DownloadActivityParams{ + params := &activities.DownloadActivityParams{ Key: tinfo.req.Key, WatcherName: tinfo.req.WatcherName, - }). + } + if w.cfg.Preprocessing.Enabled { + params.DestinationPath = w.cfg.Preprocessing.SharedPath + } + err := temporalsdk_workflow.ExecuteActivity(activityOpts, activities.DownloadActivityName, params). Get(activityOpts, &downloadResult) if err != nil { return err @@ -320,8 +327,8 @@ func (w *ProcessingWorkflow) SessionHandler( tinfo.TempPath = downloadResult.Path } - // Unarchive the transfer if it's not a directory. - if !tinfo.req.IsDir { + // Unarchive the transfer if it's not a directory and it's not part of the preprocessing child workflow. + if !tinfo.req.IsDir && (!w.cfg.Preprocessing.Enabled || !w.cfg.Preprocessing.Extract) { activityOpts := withActivityOptsForLocalAction(sessCtx) var result activities.UnarchiveActivityResult err := temporalsdk_workflow.ExecuteActivity( @@ -339,6 +346,11 @@ func (w *ProcessingWorkflow) SessionHandler( tinfo.req.IsDir = result.IsDir } + // Preprocessing child workflow. + if err := w.preprocessing(sessCtx, tinfo); err != nil { + return err + } + // Bundle. { // For the a3m workflow bundle the transfer to a directory shared with @@ -799,3 +811,36 @@ func (w *ProcessingWorkflow) transferAM(sessCtx temporalsdk_workflow.Context, ti return nil } + +func (w *ProcessingWorkflow) preprocessing(ctx temporalsdk_workflow.Context, tinfo *TransferInfo) error { + if !w.cfg.Preprocessing.Enabled { + return nil + } + + // TODO: move package if tinfo.TempPath is not inside w.cfg.Preprocessing.SharedPath. + relPath, err := filepath.Rel(w.cfg.Preprocessing.SharedPath, tinfo.TempPath) + if err != nil { + return err + } + + preCtx := temporalsdk_workflow.WithChildOptions(ctx, temporalsdk_workflow.ChildWorkflowOptions{ + Namespace: w.cfg.Preprocessing.Temporal.Namespace, + TaskQueue: w.cfg.Preprocessing.Temporal.TaskQueue, + WorkflowID: fmt.Sprintf("%s-%s", w.cfg.Preprocessing.Temporal.WorkflowName, uuid.New().String()), + ParentClosePolicy: temporalapi_enums.PARENT_CLOSE_POLICY_TERMINATE, + }) + var result preprocessing.WorkflowResult + err = temporalsdk_workflow.ExecuteChildWorkflow( + preCtx, + w.cfg.Preprocessing.Temporal.WorkflowName, + preprocessing.WorkflowParams{RelativePath: relPath}, + ).Get(preCtx, &result) + if err != nil { + return err + } + + tinfo.TempPath = filepath.Join(w.cfg.Preprocessing.SharedPath, filepath.Clean(result.RelativePath)) + tinfo.req.IsDir = true + + return nil +} diff --git a/internal/workflow/processing_test.go b/internal/workflow/processing_test.go index 31388e3fb..d01fc5bd5 100644 --- a/internal/workflow/processing_test.go +++ b/internal/workflow/processing_test.go @@ -1,6 +1,7 @@ package workflow import ( + "strings" "testing" "time" @@ -14,6 +15,7 @@ import ( temporalsdk_activity "go.temporal.io/sdk/activity" temporalsdk_testsuite "go.temporal.io/sdk/testsuite" temporalsdk_worker "go.temporal.io/sdk/worker" + temporalsdk_workflow "go.temporal.io/sdk/workflow" "go.uber.org/mock/gomock" "gotest.tools/v3/assert" @@ -24,6 +26,7 @@ import ( "github.com/artefactual-sdps/enduro/internal/enums" "github.com/artefactual-sdps/enduro/internal/package_" packagefake "github.com/artefactual-sdps/enduro/internal/package_/fake" + "github.com/artefactual-sdps/enduro/internal/preprocessing" "github.com/artefactual-sdps/enduro/internal/pres" sftp_fake "github.com/artefactual-sdps/enduro/internal/sftp/fake" "github.com/artefactual-sdps/enduro/internal/temporal" @@ -59,7 +62,14 @@ func TestTransferInfo_Name(t *testing.T) { }) } -func (s *ProcessingWorkflowTestSuite) SetupWorkflowTest(taskQueue string) { +func preprocessingChildWorkflow( + ctx temporalsdk_workflow.Context, + params *preprocessing.WorkflowParams, +) (*preprocessing.WorkflowResult, error) { + return nil, nil +} + +func (s *ProcessingWorkflowTestSuite) SetupWorkflowTest(taskQueue string, ppConfig preprocessing.Config) { s.env = s.NewTestWorkflowEnvironment() s.env.SetWorkerOptions(temporalsdk_worker.Options{EnableSessionWorker: true}) s.transferDir = s.T().TempDir() @@ -68,8 +78,9 @@ func (s *ProcessingWorkflowTestSuite) SetupWorkflowTest(taskQueue string) { ctrl := gomock.NewController(s.T()) logger := logr.Discard() cfg := config.Configuration{ - Preservation: pres.Config{TaskQueue: taskQueue}, - A3m: a3m.Config{ShareDir: s.transferDir}, + Preservation: pres.Config{TaskQueue: taskQueue}, + A3m: a3m.Config{ShareDir: s.transferDir}, + Preprocessing: ppConfig, } a3mTransferServiceClient := a3mfake.NewMockTransferServiceClient(ctrl) pkgsvc := packagefake.NewMockService(ctrl) @@ -159,6 +170,11 @@ func (s *ProcessingWorkflowTestSuite) SetupWorkflowTest(taskQueue string) { temporalsdk_activity.RegisterOptions{Name: am.PollIngestActivityName}, ) + s.env.RegisterWorkflowWithOptions( + preprocessingChildWorkflow, + temporalsdk_workflow.RegisterOptions{Name: "preprocessing"}, + ) + s.workflow = NewProcessingWorkflow(logger, cfg, pkgsvc, wsvc) } @@ -171,7 +187,7 @@ func TestProcessingWorkflow(t *testing.T) { } func (s *ProcessingWorkflowTestSuite) TestPackageConfirmation() { - s.SetupWorkflowTest(temporal.A3mWorkerTaskQueue) + s.SetupWorkflowTest(temporal.A3mWorkerTaskQueue, preprocessing.Config{}) pkgID := uint(1) ctx := mock.AnythingOfType("*context.valueCtx") sessionCtx := mock.AnythingOfType("*context.timerCtx") @@ -265,7 +281,7 @@ func (s *ProcessingWorkflowTestSuite) TestPackageConfirmation() { } func (s *ProcessingWorkflowTestSuite) TestAutoApprovedAIP() { - s.SetupWorkflowTest(temporal.A3mWorkerTaskQueue) + s.SetupWorkflowTest(temporal.A3mWorkerTaskQueue, preprocessing.Config{}) pkgID := uint(1) locationID := uuid.MustParse("51328c02-2b63-47be-958e-e8088aa1a61f") watcherName := "watcher" @@ -365,7 +381,7 @@ func (s *ProcessingWorkflowTestSuite) TestAutoApprovedAIP() { } func (s *ProcessingWorkflowTestSuite) TestAMWorkflow() { - s.SetupWorkflowTest(temporal.AmWorkerTaskQueue) + s.SetupWorkflowTest(temporal.AmWorkerTaskQueue, preprocessing.Config{}) pkgID := uint(1) locationID := uuid.MustParse("51328c02-2b63-47be-958e-e8088aa1a61f") @@ -485,7 +501,7 @@ func (s *ProcessingWorkflowTestSuite) TestAMWorkflow() { } func (s *ProcessingWorkflowTestSuite) TestPackageRejection() { - s.SetupWorkflowTest(temporal.A3mWorkerTaskQueue) + s.SetupWorkflowTest(temporal.A3mWorkerTaskQueue, preprocessing.Config{}) pkgID := uint(1) key := "transfer.zip" watcherName := "watcher" @@ -569,3 +585,119 @@ func (s *ProcessingWorkflowTestSuite) TestPackageRejection() { s.True(s.env.IsWorkflowCompleted()) s.NoError(s.env.GetWorkflowResult(nil)) } + +func (s *ProcessingWorkflowTestSuite) TestPreprocessingChildWorkflow() { + ppConfig := preprocessing.Config{ + Enabled: true, + Extract: true, + SharedPath: "/home/enduro/preprocessing/", + Temporal: preprocessing.Temporal{ + Namespace: "default", + TaskQueue: "preprocessing", + WorkflowName: "preprocessing", + }, + } + s.SetupWorkflowTest(temporal.A3mWorkerTaskQueue, ppConfig) + pkgID := uint(1) + locationID := uuid.MustParse("51328c02-2b63-47be-958e-e8088aa1a61f") + watcherName := "watcher" + key := "transfer.zip" + retentionPeriod := 1 * time.Second + ctx := mock.AnythingOfType("*context.valueCtx") + sessionCtx := mock.AnythingOfType("*context.timerCtx") + logger := s.workflow.logger + pkgsvc := s.workflow.pkgsvc + + // Activity mocks/assertions sequence + s.env.OnActivity( + createPackageLocalActivity, + ctx, + logger, + pkgsvc, + &createPackageLocalActivityParams{Key: key, Status: enums.PackageStatusQueued}, + ).Return(pkgID, nil).Once() + s.env.OnActivity(setStatusInProgressLocalActivity, ctx, pkgsvc, pkgID, mock.AnythingOfType("time.Time")). + Return(nil, nil). + Once() + s.env.OnActivity(createPreservationActionLocalActivity, ctx, pkgsvc, mock.AnythingOfType("*workflow.createPreservationActionLocalActivityParams")). + Return(uint(0), nil). + Once() + + downloadDest := strings.Replace(tempPath, "/tmp/", ppConfig.SharedPath, 1) + "/" + key + s.env.OnActivity(activities.DownloadActivityName, sessionCtx, + &activities.DownloadActivityParams{Key: key, WatcherName: watcherName, DestinationPath: ppConfig.SharedPath}, + ).Return( + &activities.DownloadActivityResult{Path: downloadDest}, nil, + ) + + prepDest := strings.Replace(extractPath, "/tmp/", ppConfig.SharedPath, 1) + s.env.OnWorkflow( + preprocessingChildWorkflow, + mock.Anything, + &preprocessing.WorkflowParams{RelativePath: strings.TrimPrefix(downloadDest, ppConfig.SharedPath)}, + ).Return( + &preprocessing.WorkflowResult{RelativePath: strings.TrimPrefix(prepDest, ppConfig.SharedPath)}, + nil, + ) + + s.env.OnActivity(activities.BundleActivityName, sessionCtx, + &activities.BundleActivityParams{ + SourcePath: prepDest, + TransferDir: s.transferDir, + IsDir: true, + }, + ).Return( + &activities.BundleActivityResult{FullPath: transferPath}, + nil, + ) + + s.env.OnActivity(a3m.CreateAIPActivityName, sessionCtx, mock.AnythingOfType("*a3m.CreateAIPActivityParams")). + Return(nil, nil). + Once() + s.env.OnActivity(updatePackageLocalActivity, ctx, logger, pkgsvc, mock.AnythingOfType("*workflow.updatePackageLocalActivityParams")). + Return(nil, nil). + Times(2) + s.env.OnActivity(createPreservationTaskLocalActivity, ctx, pkgsvc, mock.AnythingOfType("*workflow.createPreservationTaskLocalActivityParams")). + Return(uint(0), nil). + Once() + s.env.OnActivity(activities.UploadActivityName, sessionCtx, mock.AnythingOfType("*activities.UploadActivityParams")). + Return(nil, nil). + Once() + s.env.OnActivity(setStatusLocalActivity, mock.Anything, mock.Anything, mock.Anything, mock.Anything). + Return(nil, nil). + Never() + s.env.OnActivity(setPreservationActionStatusLocalActivity, mock.Anything, mock.Anything, mock.Anything, mock.Anything). + Return(nil, nil). + Never() + s.env.OnActivity(completePreservationTaskLocalActivity, ctx, pkgsvc, mock.AnythingOfType("*workflow.completePreservationTaskLocalActivityParams")). + Return(nil, nil). + Once() + s.env.OnActivity(activities.MoveToPermanentStorageActivityName, sessionCtx, mock.AnythingOfType("*activities.MoveToPermanentStorageActivityParams")). + Return(nil, nil). + Once() + s.env.OnActivity(activities.PollMoveToPermanentStorageActivityName, sessionCtx, mock.AnythingOfType("*activities.PollMoveToPermanentStorageActivityParams")). + Return(nil, nil). + Once() + s.env.OnActivity(setLocationIDLocalActivity, ctx, pkgsvc, pkgID, locationID).Return(nil, nil).Once() + s.env.OnActivity(completePreservationActionLocalActivity, ctx, pkgsvc, mock.AnythingOfType("*workflow.completePreservationActionLocalActivityParams")). + Return(nil, nil). + Once() + s.env.OnActivity(activities.CleanUpActivityName, sessionCtx, mock.AnythingOfType("*activities.CleanUpActivityParams")). + Return(nil, nil). + Once() + s.env.OnActivity(activities.DeleteOriginalActivityName, sessionCtx, watcherName, key).Return(nil, nil).Once() + + s.env.ExecuteWorkflow( + s.workflow.Execute, + &package_.ProcessingWorkflowRequest{ + Key: key, + WatcherName: watcherName, + RetentionPeriod: &retentionPeriod, + AutoApproveAIP: true, + DefaultPermanentLocationID: &locationID, + }, + ) + + s.True(s.env.IsWorkflowCompleted()) + s.NoError(s.env.GetWorkflowResult(nil)) +}