Skip to content

Commit

Permalink
Add poststorage child workflows
Browse files Browse the repository at this point in the history
Allow to configure a set of poststorage child workflows that will be
started after AIP storage. These workflows will receive the AIPUUID
as a parameter and the parent workflow will only wait for them to be
started by Temporal. They are started with a disconnected context and
using the abandon parent close policy, so they can continue running
after the parent workflow finishes, therefore their results are ignored.
  • Loading branch information
jraddaoui committed Nov 13, 2024
1 parent 92feb35 commit 12ec6a6
Show file tree
Hide file tree
Showing 5 changed files with 111 additions and 7 deletions.
6 changes: 6 additions & 0 deletions enduro.toml
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,12 @@ namespace = "default"
taskQueue = "preprocessing"
workflowName = "preprocessing"

# Temporal configurations to trigger poststorage child workflows, allows multiple sections.
# [[poststorage]]
# namespace = "default"
# taskQueue = "poststorage"
# workflowName = "poststorage"

[failedSips]
endpoint = "http://minio.enduro-sdps:9000"
pathStyle = true
Expand Down
2 changes: 2 additions & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/artefactual-sdps/enduro/internal/db"
"github.com/artefactual-sdps/enduro/internal/event"
"github.com/artefactual-sdps/enduro/internal/package_"
"github.com/artefactual-sdps/enduro/internal/poststorage"
"github.com/artefactual-sdps/enduro/internal/preprocessing"
"github.com/artefactual-sdps/enduro/internal/pres"
"github.com/artefactual-sdps/enduro/internal/storage"
Expand All @@ -47,6 +48,7 @@ type Configuration struct {
Database db.Config
Event event.Config
ExtractActivity archiveextract.Config
Poststorage []poststorage.Config
Preprocessing preprocessing.Config
Preservation pres.Config
Storage storage.Config
Expand Down
11 changes: 11 additions & 0 deletions internal/poststorage/poststorage.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package poststorage

type Config struct {
Namespace string
TaskQueue string
WorkflowName string
}

type WorkflowParams struct {
AIPUUID string
}
39 changes: 39 additions & 0 deletions internal/workflow/processing.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/artefactual-sdps/enduro/internal/enums"
"github.com/artefactual-sdps/enduro/internal/fsutil"
"github.com/artefactual-sdps/enduro/internal/package_"
"github.com/artefactual-sdps/enduro/internal/poststorage"
"github.com/artefactual-sdps/enduro/internal/preprocessing"
"github.com/artefactual-sdps/enduro/internal/temporal"
"github.com/artefactual-sdps/enduro/internal/watcher"
Expand Down Expand Up @@ -760,6 +761,10 @@ func (w *ProcessingWorkflow) SessionHandler(
return err
}
}

if err := w.poststorage(sessCtx, tinfo.SIPID); err != nil {
return err
}

Check warning on line 767 in internal/workflow/processing.go

View check run for this annotation

Codecov / codecov/patch

internal/workflow/processing.go#L766-L767

Added lines #L766 - L767 were not covered by tests
} else if !tinfo.req.AutoApproveAIP {
// Record package rejection in review preservation task
{
Expand Down Expand Up @@ -1020,6 +1025,10 @@ func (w *ProcessingWorkflow) transferAM(ctx temporalsdk_workflow.Context, tinfo
}
}

if err := w.poststorage(ctx, tinfo.SIPID); err != nil {
return err
}

Check warning on line 1030 in internal/workflow/processing.go

View check run for this annotation

Codecov / codecov/patch

internal/workflow/processing.go#L1029-L1030

Added lines #L1029 - L1030 were not covered by tests

// Delete transfer.
activityOpts = withActivityOptsForRequest(ctx)
err = temporalsdk_workflow.ExecuteActivity(activityOpts, am.DeleteTransferActivityName, am.DeleteTransferActivityParams{
Expand Down Expand Up @@ -1094,6 +1103,36 @@ func (w *ProcessingWorkflow) preprocessing(ctx temporalsdk_workflow.Context, tin
}
}

// poststorage executes the configured poststorage child workflows. It uses
// a disconnected context, abandon as parent close policy and only waits
// until the workflows are started, ignoring their results.
func (w *ProcessingWorkflow) poststorage(ctx temporalsdk_workflow.Context, aipUUID string) error {
var err error
disconnectedCtx, _ := temporalsdk_workflow.NewDisconnectedContext(ctx)

for _, cfg := range w.cfg.Poststorage {
psCtx := temporalsdk_workflow.WithChildOptions(
disconnectedCtx,
temporalsdk_workflow.ChildWorkflowOptions{
Namespace: cfg.Namespace,
TaskQueue: cfg.TaskQueue,
WorkflowID: fmt.Sprintf("%s-%s", cfg.WorkflowName, aipUUID),
ParentClosePolicy: temporalapi_enums.PARENT_CLOSE_POLICY_ABANDON,
},
)
err = errors.Join(
err,
temporalsdk_workflow.ExecuteChildWorkflow(
psCtx,
cfg.WorkflowName,
poststorage.WorkflowParams{AIPUUID: aipUUID},
).GetChildWorkflowExecution().Get(psCtx, nil),
)
}

return err
}

func (w *ProcessingWorkflow) createPreservationTask(
ctx temporalsdk_workflow.Context,
pt datatypes.PreservationTask,
Expand Down
60 changes: 53 additions & 7 deletions internal/workflow/processing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/artefactual-sdps/enduro/internal/enums"
"github.com/artefactual-sdps/enduro/internal/package_"
packagefake "github.com/artefactual-sdps/enduro/internal/package_/fake"
"github.com/artefactual-sdps/enduro/internal/poststorage"
"github.com/artefactual-sdps/enduro/internal/preprocessing"
"github.com/artefactual-sdps/enduro/internal/pres"
sftp_fake "github.com/artefactual-sdps/enduro/internal/sftp/fake"
Expand Down Expand Up @@ -91,6 +92,13 @@ func preprocessingChildWorkflow(
return nil, nil
}

func poststorageChildWorkflow(
ctx temporalsdk_workflow.Context,
params *poststorage.WorkflowParams,
) (*interface{}, error) {
return nil, nil
}

func (s *ProcessingWorkflowTestSuite) CreateTransferDir() string {
s.transferDir = s.T().TempDir()

Expand Down Expand Up @@ -131,10 +139,6 @@ func (s *ProcessingWorkflowTestSuite) SetupWorkflowTest(cfg config.Configuration
s.setupA3mWorkflowTest(ctrl, pkgsvc)
}

s.env.RegisterWorkflowWithOptions(
preprocessingChildWorkflow,
temporalsdk_workflow.RegisterOptions{Name: "preprocessing"},
)
s.env.RegisterActivityWithOptions(
removepaths.New().Execute,
temporalsdk_activity.RegisterOptions{Name: removepaths.Name},
Expand All @@ -156,6 +160,19 @@ func (s *ProcessingWorkflowTestSuite) SetupWorkflowTest(cfg config.Configuration
temporalsdk_activity.RegisterOptions{Name: activities.SendToFailedPIPsName},
)

s.env.RegisterWorkflowWithOptions(
preprocessingChildWorkflow,
temporalsdk_workflow.RegisterOptions{Name: "preprocessing"},
)
s.env.RegisterWorkflowWithOptions(
poststorageChildWorkflow,
temporalsdk_workflow.RegisterOptions{Name: "poststorage_1"},
)
s.env.RegisterWorkflowWithOptions(
poststorageChildWorkflow,
temporalsdk_workflow.RegisterOptions{Name: "poststorage_2"},
)

s.workflow = NewProcessingWorkflow(cfg, rng, pkgsvc, wsvc)
}

Expand Down Expand Up @@ -815,7 +832,7 @@ func (s *ProcessingWorkflowTestSuite) TestPackageRejection() {
s.NoError(s.env.GetWorkflowResult(nil))
}

func (s *ProcessingWorkflowTestSuite) TestPreprocessingChildWorkflow() {
func (s *ProcessingWorkflowTestSuite) TestChildWorkflows() {
cfg := config.Configuration{
A3m: a3m.Config{ShareDir: s.CreateTransferDir()},
Preservation: pres.Config{TaskQueue: temporal.A3mWorkerTaskQueue},
Expand All @@ -829,6 +846,18 @@ func (s *ProcessingWorkflowTestSuite) TestPreprocessingChildWorkflow() {
WorkflowName: "preprocessing",
},
},
Poststorage: []poststorage.Config{
{
Namespace: "default",
TaskQueue: "poststorage",
WorkflowName: "poststorage_1",
},
{
Namespace: "default",
TaskQueue: "poststorage",
WorkflowName: "poststorage_2",
},
},
Storage: storage.Config{
DefaultPermanentLocationID: locationID,
},
Expand All @@ -842,6 +871,7 @@ func (s *ProcessingWorkflowTestSuite) TestPreprocessingChildWorkflow() {
ctx := mock.AnythingOfType("*context.valueCtx")
sessionCtx := mock.AnythingOfType("*context.timerCtx")
pkgsvc := s.workflow.pkgsvc
aipUUID := "56eebd45-5600-4768-a8c2-ec0114555a3d"

downloadDir := strings.Replace(tempPath, "/tmp/", cfg.Preprocessing.SharedPath, 1)
prepDest := strings.Replace(extractPath, "/tmp/", cfg.Preprocessing.SharedPath, 1)
Expand Down Expand Up @@ -989,8 +1019,8 @@ func (s *ProcessingWorkflowTestSuite) TestPreprocessingChildWorkflow() {
)

s.env.OnActivity(a3m.CreateAIPActivityName, sessionCtx, mock.AnythingOfType("*a3m.CreateAIPActivityParams")).
Return(nil, nil).
Once()
Return(&a3m.CreateAIPActivityResult{UUID: aipUUID}, nil)

s.env.OnActivity(updatePackageLocalActivity, ctx, pkgsvc, mock.AnythingOfType("*workflow.updatePackageLocalActivityParams")).
Return(nil, nil).
Times(2)
Expand Down Expand Up @@ -1050,6 +1080,22 @@ func (s *ProcessingWorkflowTestSuite) TestPreprocessingChildWorkflow() {
Return(nil, nil).
Once()

s.env.OnWorkflow(
"poststorage_1",
mock.AnythingOfType("*internal.valueCtx"),
&poststorage.WorkflowParams{
AIPUUID: aipUUID,
},
).Return(nil, nil)

s.env.OnWorkflow(
"poststorage_2",
mock.AnythingOfType("*internal.valueCtx"),
&poststorage.WorkflowParams{
AIPUUID: aipUUID,
},
).Return(nil, nil)

s.env.OnActivity(
removepaths.Name,
sessionCtx,
Expand Down

0 comments on commit 12ec6a6

Please sign in to comment.