Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make preservation system configurable (#755) #775

Merged
merged 1 commit into from
Nov 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/enduro-a3m-worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func main() {
MaxConcurrentSessionExecutionSize: 1000,
MaxConcurrentActivityExecutionSize: 1,
}
w := temporalsdk_worker.New(temporalClient, cfg.A3m.TaskQueue, workerOpts)
w := temporalsdk_worker.New(temporalClient, temporal.A3mWorkerTaskQueue, workerOpts)
if err != nil {
logger.Error(err, "Error creating Temporal worker.")
os.Exit(1)
Expand Down
5 changes: 4 additions & 1 deletion enduro.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
debug = true
debugListen = "127.0.0.1:9001"
verbosity = 2
useArchivematica = false

[temporal]
namespace = "default"
Expand Down Expand Up @@ -64,6 +63,10 @@ secret = "minio123"
region = "us-west-1"
bucket = "aips"

# Change the taskqueue setting to your prefered preservation system, by default it is a3m.
[preservation]
taskqueue = "a3m"

[a3m]
address = "127.0.0.1:7000"
shareDir = "/home/a3m/.local/share/a3m/share"
Expand Down
4 changes: 2 additions & 2 deletions hack/kube/overlays/dev-am/enduro-am.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ spec:
value: $(MINIO_USER)
- name: ENDURO_WATCHER_EMBEDDED_SECRET
value: $(MINIO_PASSWORD)
- name: ENDURO_USEARCHIVEMATICA
value: "true"
- name: ENDURO_PRESERVATION_TASKQUEUE
value: "am"
- name: ENDURO_AM_SFTP_HOST
valueFrom:
secretKeyRef:
Expand Down
4 changes: 2 additions & 2 deletions hack/kube/overlays/dev-am/enduro-internal-patch.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ spec:
containers:
- name: enduro-internal
env:
- name: ENDURO_USEARCHIVEMATICA
value: "true"
- name: ENDURO_PRESERVATION_TASKQUEUE
value: "am"
4 changes: 2 additions & 2 deletions hack/kube/overlays/dev-am/enduro-patch.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ spec:
containers:
- name: enduro
env:
- name: ENDURO_USEARCHIVEMATICA
value: "true"
- name: ENDURO_PRESERVATION_TASKQUEUE
value: "am"
7 changes: 3 additions & 4 deletions internal/a3m/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,9 @@ package a3m
import transferservice "buf.build/gen/go/artefactual/a3m/protocolbuffers/go/a3m/api/transferservice/v1beta1"

type Config struct {
Name string
ShareDir string
TaskQueue string
Address string
Name string
ShareDir string
Address string
Processing
}

Expand Down
31 changes: 16 additions & 15 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/artefactual-sdps/enduro/internal/api"
"github.com/artefactual-sdps/enduro/internal/db"
"github.com/artefactual-sdps/enduro/internal/event"
"github.com/artefactual-sdps/enduro/internal/pres"
"github.com/artefactual-sdps/enduro/internal/storage"
"github.com/artefactual-sdps/enduro/internal/temporal"
"github.com/artefactual-sdps/enduro/internal/upload"
Expand All @@ -23,20 +24,20 @@ type ConfigurationValidator interface {
}

type Configuration struct {
Verbosity int
Debug bool
DebugListen string
UseArchivematica bool

A3m a3m.Config
AM am.Config
API api.Config
Database db.Config
Event event.Config
Storage storage.Config
Temporal temporal.Config
Upload upload.Config
Watcher watcher.Config
Verbosity int
Debug bool
DebugListen string

A3m a3m.Config
AM am.Config
API api.Config
Database db.Config
Event event.Config
Preservation pres.Config
Storage storage.Config
Temporal temporal.Config
Upload upload.Config
Watcher watcher.Config
}

func (c Configuration) Validate() error {
Expand Down Expand Up @@ -64,7 +65,7 @@ func Read(config *Configuration, configFile string) (found bool, configFileUsed
v.AddConfigPath("/etc")
v.SetConfigName("enduro")
v.SetDefault("a3m.processing", a3m.ProcessingDefault)
v.SetDefault("a3m.taskqueue", temporal.A3mWorkerTaskQueue)
v.SetDefault("preservation.taskqueue", temporal.A3mWorkerTaskQueue)
v.SetDefault("storage.taskqueue", temporal.GlobalTaskQueue)
v.SetDefault("temporal.taskqueue", temporal.GlobalTaskQueue)
v.SetDefault("debugListen", "127.0.0.1:9001")
Expand Down
3 changes: 1 addition & 2 deletions internal/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,13 @@ func TestConfig(t *testing.T) {
// Zero value defaults.
assert.Equal(t, c.Verbosity, 0)
assert.Equal(t, c.Debug, false)
assert.Equal(t, c.UseArchivematica, false)
assert.Equal(t, c.Database.DSN, "")

// Valued defaults.
assert.Equal(t, c.DebugListen, "127.0.0.1:9001")
assert.Equal(t, c.A3m.Processing, a3m.ProcessingDefault)
assert.Equal(t, c.A3m.TaskQueue, temporal.A3mWorkerTaskQueue)
assert.Equal(t, c.API.Listen, "127.0.0.1:9000")
assert.Equal(t, c.Preservation.TaskQueue, temporal.A3mWorkerTaskQueue)
assert.Equal(t, c.Storage.TaskQueue, temporal.GlobalTaskQueue)
assert.Equal(t, c.Temporal.TaskQueue, temporal.GlobalTaskQueue)
})
Expand Down
10 changes: 10 additions & 0 deletions internal/pres/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package pres

type Config struct {
// TaskQueue sets the Temporal task queue to use for the processing workflow
// (e.g. ` temporal.A3mWorkerTaskQueue`, ` temporal.AMWorkerTaskQueue`).
// The task queue determines which processing worker will run the processing
// workflow - a3m or Archivematica, and is used to branch the processing
// workflow logic.
TaskQueue string
}
39 changes: 16 additions & 23 deletions internal/workflow/processing.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,18 @@ import (
)

type ProcessingWorkflow struct {
logger logr.Logger
pkgsvc package_.Service
wsvc watcher.Service
useArchivematica bool
logger logr.Logger
pkgsvc package_.Service
wsvc watcher.Service
preservationTaskQueue string
}

func NewProcessingWorkflow(logger logr.Logger, pkgsvc package_.Service, wsvc watcher.Service, useAM bool) *ProcessingWorkflow {
func NewProcessingWorkflow(logger logr.Logger, pkgsvc package_.Service, wsvc watcher.Service, presTQ string) *ProcessingWorkflow {
return &ProcessingWorkflow{
logger: logger,
pkgsvc: pkgsvc,
wsvc: wsvc,
useArchivematica: useAM,
logger: logger,
pkgsvc: pkgsvc,
wsvc: wsvc,
preservationTaskQueue: presTQ,
}
}

Expand Down Expand Up @@ -172,20 +172,13 @@ func (w *ProcessingWorkflow) Execute(ctx temporalsdk_workflow.Context, req *pack
// Activities running within a session.
{
var sessErr error
var taskQueue string
maxAttempts := 5

if w.useArchivematica {
taskQueue = temporal.AmWorkerTaskQueue
} else {
taskQueue = tinfo.A3mTaskQueue
}

activityOpts := temporalsdk_workflow.WithActivityOptions(ctx, temporalsdk_workflow.ActivityOptions{
StartToCloseTimeout: time.Minute,
TaskQueue: w.preservationTaskQueue,
})
for attempt := 1; attempt <= maxAttempts; attempt++ {
activityOpts := temporalsdk_workflow.WithActivityOptions(ctx, temporalsdk_workflow.ActivityOptions{
StartToCloseTimeout: time.Minute,
TaskQueue: taskQueue,
})
sessCtx, err := temporalsdk_workflow.CreateSession(activityOpts, &temporalsdk_workflow.SessionOptions{
CreationTimeout: forever,
ExecutionTimeout: forever,
Expand Down Expand Up @@ -319,7 +312,7 @@ func (w *ProcessingWorkflow) SessionHandler(sessCtx temporalsdk_workflow.Context
// For the a3m workflow bundle the transfer to a shared directory also
// mounted by the a3m container.
var transferDir string
if !w.useArchivematica {
if w.preservationTaskQueue == temporal.A3mWorkerTaskQueue {
transferDir = "/home/a3m/.local/share/a3m/share"
}

Expand Down Expand Up @@ -353,7 +346,7 @@ func (w *ProcessingWorkflow) SessionHandler(sessCtx temporalsdk_workflow.Context
// Do preservation activities.
{
var err error
if w.useArchivematica {
if w.preservationTaskQueue == temporal.AmWorkerTaskQueue {
err = w.transferAM(sessCtx, tinfo)
} else {
err = w.transferA3m(sessCtx, tinfo)
Expand All @@ -377,7 +370,7 @@ func (w *ProcessingWorkflow) SessionHandler(sessCtx temporalsdk_workflow.Context

// Stop here for the the Archivematica workflow. AIP creation, review, and
// storage are handled entirely by Archivematica and the AM Storage Service.
if w.useArchivematica {
if w.preservationTaskQueue == temporal.AmWorkerTaskQueue {
return nil
}

Expand Down
13 changes: 7 additions & 6 deletions internal/workflow/processing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/artefactual-sdps/enduro/internal/package_"
packagefake "github.com/artefactual-sdps/enduro/internal/package_/fake"
sftp_fake "github.com/artefactual-sdps/enduro/internal/sftp/fake"
"github.com/artefactual-sdps/enduro/internal/temporal"
watcherfake "github.com/artefactual-sdps/enduro/internal/watcher/fake"
"github.com/artefactual-sdps/enduro/internal/workflow/activities"
)
Expand All @@ -41,7 +42,7 @@ func TestTransferInfo_Name(t *testing.T) {
})
}

func (s *ProcessingWorkflowTestSuite) SetupWorkflowTest(useAM bool) {
func (s *ProcessingWorkflowTestSuite) SetupWorkflowTest(taskQueue string) {
s.env = s.NewTestWorkflowEnvironment()
s.env.SetWorkerOptions(temporalsdk_worker.Options{EnableSessionWorker: true})

Expand Down Expand Up @@ -71,7 +72,7 @@ func (s *ProcessingWorkflowTestSuite) SetupWorkflowTest(useAM bool) {
temporalsdk_activity.RegisterOptions{Name: am.UploadTransferActivityName},
)

s.workflow = NewProcessingWorkflow(logger, pkgsvc, wsvc, useAM)
s.workflow = NewProcessingWorkflow(logger, pkgsvc, wsvc, taskQueue)
}

func (s *ProcessingWorkflowTestSuite) AfterTest(suiteName, testName string) {
Expand All @@ -83,7 +84,7 @@ func TestProcessingWorkflow(t *testing.T) {
}

func (s *ProcessingWorkflowTestSuite) TestPackageConfirmation() {
s.SetupWorkflowTest(false)
s.SetupWorkflowTest(temporal.A3mWorkerTaskQueue)
pkgID := uint(1)
ctx := mock.AnythingOfType("*context.valueCtx")
locationID := uuid.MustParse("51328c02-2b63-47be-958e-e8088aa1a61f")
Expand Down Expand Up @@ -139,7 +140,7 @@ func (s *ProcessingWorkflowTestSuite) TestPackageConfirmation() {
}

func (s *ProcessingWorkflowTestSuite) TestAutoApprovedAIP() {
s.SetupWorkflowTest(false)
s.SetupWorkflowTest(temporal.A3mWorkerTaskQueue)
pkgID := uint(1)
locationID := uuid.MustParse("51328c02-2b63-47be-958e-e8088aa1a61f")
watcherName := "watcher"
Expand Down Expand Up @@ -191,7 +192,7 @@ func (s *ProcessingWorkflowTestSuite) TestAutoApprovedAIP() {
}

func (s *ProcessingWorkflowTestSuite) TestAMWorkflow() {
s.SetupWorkflowTest(true)
s.SetupWorkflowTest(temporal.AmWorkerTaskQueue)
pkgID := uint(1)
locationID := uuid.MustParse("51328c02-2b63-47be-958e-e8088aa1a61f")
watcherName := "watcher"
Expand Down Expand Up @@ -245,7 +246,7 @@ func (s *ProcessingWorkflowTestSuite) TestAMWorkflow() {
}

func (s *ProcessingWorkflowTestSuite) TestPackageRejection() {
s.SetupWorkflowTest(false)
s.SetupWorkflowTest(temporal.A3mWorkerTaskQueue)
pkgID := uint(1)
watcherName := "watcher"
retentionPeriod := 1 * time.Second
Expand Down
6 changes: 3 additions & 3 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func main() {
logger.Info("Configuration file not found.")
}

logger.V(1).Info("Preservation system", "UseArchivematica", cfg.UseArchivematica)
logger.V(1).Info("Preservation config", "TaskQueue", cfg.Preservation.TaskQueue)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down Expand Up @@ -273,7 +273,7 @@ func main() {
AutoApproveAIP: autoApproveAIP,
DefaultPermanentLocationID: &defaultPermanentLocationID,
TaskQueue: cfg.Temporal.TaskQueue,
A3mTaskQueue: cfg.A3m.TaskQueue,
A3mTaskQueue: cfg.Preservation.TaskQueue,
}
if err := package_.InitProcessingWorkflow(ctx, temporalClient, &req); err != nil {
logger.Error(err, "Error initializing processing workflow.")
Expand Down Expand Up @@ -301,7 +301,7 @@ func main() {
os.Exit(1)
}

w.RegisterWorkflowWithOptions(workflow.NewProcessingWorkflow(logger, pkgsvc, wsvc, cfg.UseArchivematica).Execute, temporalsdk_workflow.RegisterOptions{Name: package_.ProcessingWorkflowName})
w.RegisterWorkflowWithOptions(workflow.NewProcessingWorkflow(logger, pkgsvc, wsvc, cfg.Preservation.TaskQueue).Execute, temporalsdk_workflow.RegisterOptions{Name: package_.ProcessingWorkflowName})
w.RegisterActivityWithOptions(activities.NewDeleteOriginalActivity(wsvc).Execute, temporalsdk_activity.RegisterOptions{Name: activities.DeleteOriginalActivityName})
w.RegisterActivityWithOptions(activities.NewDisposeOriginalActivity(wsvc).Execute, temporalsdk_activity.RegisterOptions{Name: activities.DisposeOriginalActivityName})

Expand Down
Loading