Skip to content

Commit

Permalink
Make preservation system configurable (#755)
Browse files Browse the repository at this point in the history
- Hardcoded the a3m taskqueue name for better efficiency and optimization.
- Added comments to enhance code readability and understanding.
- Removed unnecessary variable to improve code clarity.
- Modified names for better consistency and clarity.
- Removed a comment that was no longer relevant.
- Created a preservation config struct to make the system more configurable
and adaptable.
- Implemented configuration options for the preservation
system to allow customization and flexibility.
  • Loading branch information
Diogenesoftoronto authored Nov 8, 2023
1 parent 92cd26e commit 13d8e33
Show file tree
Hide file tree
Showing 9 changed files with 55 additions and 48 deletions.
2 changes: 1 addition & 1 deletion cmd/enduro-a3m-worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func main() {
MaxConcurrentSessionExecutionSize: 1000,
MaxConcurrentActivityExecutionSize: 1,
}
w := temporalsdk_worker.New(temporalClient, cfg.A3m.TaskQueue, workerOpts)
w := temporalsdk_worker.New(temporalClient, temporal.A3mWorkerTaskQueue, workerOpts)
if err != nil {
logger.Error(err, "Error creating Temporal worker.")
os.Exit(1)
Expand Down
5 changes: 4 additions & 1 deletion enduro.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
debug = true
debugListen = "127.0.0.1:9001"
verbosity = 2
useArchivematica = false

[temporal]
namespace = "default"
Expand Down Expand Up @@ -64,6 +63,10 @@ secret = "minio123"
region = "us-west-1"
bucket = "aips"

# Change the taskqueue setting to your prefered preservation system, by default it is a3m.
[preservation]
taskqueue = "a3m"

[a3m]
address = "127.0.0.1:7000"
shareDir = "/home/a3m/.local/share/a3m/share"
Expand Down
7 changes: 3 additions & 4 deletions internal/a3m/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,9 @@ package a3m
import transferservice "buf.build/gen/go/artefactual/a3m/protocolbuffers/go/a3m/api/transferservice/v1beta1"

type Config struct {
Name string
ShareDir string
TaskQueue string
Address string
Name string
ShareDir string
Address string
Processing
}

Expand Down
3 changes: 2 additions & 1 deletion internal/am/poll_transfer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@ import (
"fmt"
"time"

"github.com/artefactual-sdps/enduro/internal/temporal"
"github.com/go-logr/logr"
"go.artefactual.dev/amclient"
temporalsdk_activity "go.temporal.io/sdk/activity"

"github.com/artefactual-sdps/enduro/internal/temporal"
)

const PollTransferActivityName = "poll-transfer-activity"
Expand Down
29 changes: 15 additions & 14 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/artefactual-sdps/enduro/internal/api"
"github.com/artefactual-sdps/enduro/internal/db"
"github.com/artefactual-sdps/enduro/internal/event"
"github.com/artefactual-sdps/enduro/internal/pres"
"github.com/artefactual-sdps/enduro/internal/storage"
"github.com/artefactual-sdps/enduro/internal/temporal"
"github.com/artefactual-sdps/enduro/internal/upload"
Expand All @@ -23,19 +24,19 @@ type ConfigurationValidator interface {
}

type Configuration struct {
Verbosity int
Debug bool
DebugListen string
UseArchivematica bool
API api.Config
Event event.Config
Database db.Config
Temporal temporal.Config
Watcher watcher.Config
Storage storage.Config
Upload upload.Config
A3m a3m.Config
Am am.Config
Verbosity int
Debug bool
DebugListen string
API api.Config
Event event.Config
Database db.Config
Temporal temporal.Config
Watcher watcher.Config
Storage storage.Config
Upload upload.Config
A3m a3m.Config
Am am.Config
Preservation pres.Config
}

func (c Configuration) Validate() error {
Expand Down Expand Up @@ -63,7 +64,7 @@ func Read(config *Configuration, configFile string) (found bool, configFileUsed
v.AddConfigPath("/etc")
v.SetConfigName("enduro")
v.SetDefault("api.processing", a3m.ProcessingDefault)
v.SetDefault("a3m.taskqueue", temporal.A3mWorkerTaskQueue)
v.SetDefault("preservation.taskqueue", temporal.A3mWorkerTaskQueue)
v.SetDefault("temporal.taskqueue", temporal.GlobalTaskQueue)
v.SetDefault("debugListen", "127.0.0.1:9001")
v.SetDefault("api.listen", "127.0.0.1:9000")
Expand Down
10 changes: 10 additions & 0 deletions internal/pres/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package pres

type Config struct {
// TaskQueue sets the Temporal task queue to use for the processing workflow
// (e.g. ` temporal.A3mWorkerTaskQueue`, ` temporal.AMWorkerTaskQueue`).
// The task queue determines which processing worker will run the processing
// workflow - a3m or Archivematica, and is used to branch the processing
// workflow logic.
TaskQueue string
}
29 changes: 11 additions & 18 deletions internal/workflow/processing.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,18 @@ import (
)

type ProcessingWorkflow struct {
logger logr.Logger
pkgsvc package_.Service
wsvc watcher.Service
useArchivematica bool
logger logr.Logger
pkgsvc package_.Service
wsvc watcher.Service
preservationTaskQueue string
}

func NewProcessingWorkflow(logger logr.Logger, pkgsvc package_.Service, wsvc watcher.Service, useAm bool) *ProcessingWorkflow {
func NewProcessingWorkflow(logger logr.Logger, pkgsvc package_.Service, wsvc watcher.Service, preservationTaskQueue string) *ProcessingWorkflow {
return &ProcessingWorkflow{
logger: logger,
pkgsvc: pkgsvc,
wsvc: wsvc,
useArchivematica: useAm,
logger: logger,
pkgsvc: pkgsvc,
wsvc: wsvc,
preservationTaskQueue: preservationTaskQueue,
}
}

Expand Down Expand Up @@ -172,18 +172,11 @@ func (w *ProcessingWorkflow) Execute(ctx temporalsdk_workflow.Context, req *pack
// Activities running within a session.
{
var sessErr error
var taskQueue string
maxAttempts := 5

if w.useArchivematica {
taskQueue = temporal.AmWorkerTaskQueue
} else {
taskQueue = temporal.A3mWorkerTaskQueue
}

activityOpts := temporalsdk_workflow.WithActivityOptions(ctx, temporalsdk_workflow.ActivityOptions{
StartToCloseTimeout: time.Minute,
TaskQueue: taskQueue,
TaskQueue: w.preservationTaskQueue,
})
for attempt := 1; attempt <= maxAttempts; attempt++ {
sessCtx, err := temporalsdk_workflow.CreateSession(activityOpts, &temporalsdk_workflow.SessionOptions{
Expand Down Expand Up @@ -345,7 +338,7 @@ func (w *ProcessingWorkflow) SessionHandler(sessCtx temporalsdk_workflow.Context

{
var err error
if w.useArchivematica {
if w.preservationTaskQueue == temporal.AmWorkerTaskQueue {
err = w.transferAM(sessCtx, tinfo)
if err != nil {
return err
Expand Down
14 changes: 7 additions & 7 deletions internal/workflow/processing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/artefactual-sdps/enduro/internal/am"
"github.com/artefactual-sdps/enduro/internal/package_"
packagefake "github.com/artefactual-sdps/enduro/internal/package_/fake"
"github.com/artefactual-sdps/enduro/internal/temporal"
watcherfake "github.com/artefactual-sdps/enduro/internal/watcher/fake"
"github.com/artefactual-sdps/enduro/internal/workflow/activities"
)
Expand All @@ -41,7 +42,7 @@ func TestTransferInfo_Name(t *testing.T) {
})
}

func (s *ProcessingWorkflowTestSuite) SetupWorkflowTest(useAm bool) {
func (s *ProcessingWorkflowTestSuite) SetupWorkflowTest(taskQueue string) {
s.env = s.NewTestWorkflowEnvironment()
s.env.SetWorkerOptions(temporalsdk_worker.Options{EnableSessionWorker: true})

Expand Down Expand Up @@ -71,8 +72,7 @@ func (s *ProcessingWorkflowTestSuite) SetupWorkflowTest(useAm bool) {
s.env.RegisterActivityWithOptions(am.NewPollIngestActivity(
logger, &am.Config{}, amclienttest.NewMockIngestService(ctrl),
dur).Execute, temporalsdk_activity.RegisterOptions{Name: am.PollIngestActivityName})

s.workflow = NewProcessingWorkflow(logger, pkgsvc, wsvc, useAm)
s.workflow = NewProcessingWorkflow(logger, pkgsvc, wsvc, taskQueue)
}

func (s *ProcessingWorkflowTestSuite) AfterTest(suiteName, testName string) {
Expand All @@ -84,7 +84,7 @@ func TestProcessingWorkflow(t *testing.T) {
}

func (s *ProcessingWorkflowTestSuite) TestPackageConfirmation() {
s.SetupWorkflowTest(false)
s.SetupWorkflowTest(temporal.A3mWorkerTaskQueue)
pkgID := uint(1)
ctx := mock.AnythingOfType("*context.valueCtx")
locationID := uuid.MustParse("51328c02-2b63-47be-958e-e8088aa1a61f")
Expand Down Expand Up @@ -139,7 +139,7 @@ func (s *ProcessingWorkflowTestSuite) TestPackageConfirmation() {
}

func (s *ProcessingWorkflowTestSuite) TestAutoApprovedAIP() {
s.SetupWorkflowTest(false)
s.SetupWorkflowTest(temporal.A3mWorkerTaskQueue)
pkgID := uint(1)
locationID := uuid.MustParse("51328c02-2b63-47be-958e-e8088aa1a61f")
watcherName := "watcher"
Expand Down Expand Up @@ -191,7 +191,7 @@ func (s *ProcessingWorkflowTestSuite) TestAutoApprovedAIP() {
}

func (s *ProcessingWorkflowTestSuite) TestAutoApprovedAIP_AM() {
s.SetupWorkflowTest(true)
s.SetupWorkflowTest(temporal.AmWorkerTaskQueue)
pkgID := uint(1)
locationID := uuid.MustParse("51328c02-2b63-47be-958e-e8088aa1a61f")
watcherName := "watcher"
Expand Down Expand Up @@ -247,7 +247,7 @@ func (s *ProcessingWorkflowTestSuite) TestAutoApprovedAIP_AM() {
}

func (s *ProcessingWorkflowTestSuite) TestPackageRejection() {
s.SetupWorkflowTest(false)
s.SetupWorkflowTest(temporal.A3mWorkerTaskQueue)
pkgID := uint(1)
watcherName := "watcher"
retentionPeriod := 1 * time.Second
Expand Down
4 changes: 2 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ func main() {
AutoApproveAIP: autoApproveAIP,
DefaultPermanentLocationID: &defaultPermanentLocationID,
TaskQueue: cfg.Temporal.TaskQueue,
A3mTaskQueue: cfg.A3m.TaskQueue,
A3mTaskQueue: cfg.Preservation.TaskQueue,
}
if err := package_.InitProcessingWorkflow(ctx, temporalClient, &req); err != nil {
logger.Error(err, "Error initializing processing workflow.")
Expand Down Expand Up @@ -299,7 +299,7 @@ func main() {
os.Exit(1)
}

w.RegisterWorkflowWithOptions(workflow.NewProcessingWorkflow(logger, pkgsvc, wsvc, cfg.UseArchivematica).Execute, temporalsdk_workflow.RegisterOptions{Name: package_.ProcessingWorkflowName})
w.RegisterWorkflowWithOptions(workflow.NewProcessingWorkflow(logger, pkgsvc, wsvc, cfg.Preservation.TaskQueue).Execute, temporalsdk_workflow.RegisterOptions{Name: package_.ProcessingWorkflowName})
w.RegisterActivityWithOptions(activities.NewDeleteOriginalActivity(wsvc).Execute, temporalsdk_activity.RegisterOptions{Name: activities.DeleteOriginalActivityName})
w.RegisterActivityWithOptions(activities.NewDisposeOriginalActivity(wsvc).Execute, temporalsdk_activity.RegisterOptions{Name: activities.DisposeOriginalActivityName})

Expand Down

0 comments on commit 13d8e33

Please sign in to comment.