From c9ee533015cc7c5042a39c2f93b2585753a0841a Mon Sep 17 00:00:00 2001 From: David Juhasz Date: Tue, 14 Nov 2023 16:45:28 -0800 Subject: [PATCH] Upload transfer to Archivematica - Create an enduro-am-worker and add an "am" Temporal Taskqueue - Branch the processing workflow logic for AM processing - Add a zip package - Add a ZipActivity to zip the bundled transfer before transmission - Add an UploadTransferActivity to upload the zipped transfer to the AM Storage Service transfer source directory - Add SFTP configuration for the upload - Add an SSH username and a logger to the SFTP package - Add the enduro-am-worker build to the Dockerfile - Add an enduro-am-worker to the Tilt dev deployment - Add a Tilt environment variable to select a kube config on startup - Add the enduro-am-worker container and configuration to the kube manifests - Add a "dev-am" kube overlay Co-authored-by: Diogenesoftoronto --- .gitignore | 1 + Dockerfile | 32 ++- Makefile | 2 +- Tiltfile | 20 +- cmd/enduro-am-worker/main.go | 224 ++++++++++++++++++ enduro.toml | 11 + hack/build_docker.sh | 7 +- hack/kube/base/enduro-am.yaml | 92 +++++++ hack/kube/base/kustomization.yaml | 1 + .../dev/dex-secret.yaml | 0 .../kube/components/dev/enduro-am-secret.yaml | 12 + .../dev/enduro-secret.yaml | 0 hack/kube/components/dev/kustomization.yaml | 11 + .../{overlays => components}/dev/ldap.yaml | 0 .../dev/minio-secret.yaml | 0 .../dev/mysql-secret.yaml | 0 .../dev/temporal-ui-secret.yaml | 0 .../dev-am/enduro-internal-patch.yaml | 12 + hack/kube/overlays/dev-am/enduro-patch.yaml | 12 + hack/kube/overlays/dev-am/kustomization.yaml | 28 +++ hack/kube/overlays/dev/kustomization.yaml | 20 +- internal/am/config.go | 8 + internal/am/upload_transfer.go | 55 +++++ internal/am/upload_transfer_test.go | 114 +++++++++ internal/config/config.go | 9 +- internal/config/config_test.go | 1 + internal/sftp/client.go | 2 +- internal/sftp/fake/mock_sftp.go | 54 +++-- internal/sftp/goclient.go | 27 ++- 
internal/sftp/goclient_test.go | 34 ++- internal/sftp/ssh.go | 5 +- internal/temporal/temporal.go | 3 + internal/workflow/activities/bundle.go | 2 +- internal/workflow/activities/zip.go | 90 +++++++ internal/workflow/activities/zip_test.go | 52 ++++ internal/workflow/processing.go | 93 +++++++- internal/workflow/processing_test.go | 95 +++++++- main.go | 4 +- 38 files changed, 1042 insertions(+), 91 deletions(-) create mode 100644 cmd/enduro-am-worker/main.go create mode 100644 hack/kube/base/enduro-am.yaml rename hack/kube/{overlays => components}/dev/dex-secret.yaml (100%) create mode 100644 hack/kube/components/dev/enduro-am-secret.yaml rename hack/kube/{overlays => components}/dev/enduro-secret.yaml (100%) create mode 100644 hack/kube/components/dev/kustomization.yaml rename hack/kube/{overlays => components}/dev/ldap.yaml (100%) rename hack/kube/{overlays => components}/dev/minio-secret.yaml (100%) rename hack/kube/{overlays => components}/dev/mysql-secret.yaml (100%) rename hack/kube/{overlays => components}/dev/temporal-ui-secret.yaml (100%) create mode 100644 hack/kube/overlays/dev-am/enduro-internal-patch.yaml create mode 100644 hack/kube/overlays/dev-am/enduro-patch.yaml create mode 100644 hack/kube/overlays/dev-am/kustomization.yaml create mode 100644 internal/am/config.go create mode 100644 internal/am/upload_transfer.go create mode 100644 internal/am/upload_transfer_test.go create mode 100644 internal/workflow/activities/zip.go create mode 100644 internal/workflow/activities/zip_test.go diff --git a/.gitignore b/.gitignore index ac4102f66..c0903462f 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,4 @@ /covreport /dist /.tilt.env +/*.secret diff --git a/Dockerfile b/Dockerfile index 79c918441..f856ac331 100644 --- a/Dockerfile +++ b/Dockerfile @@ -18,9 +18,9 @@ ARG VERSION_GIT_HASH RUN --mount=type=cache,target=/go/pkg/mod \ --mount=type=cache,target=/root/.cache/go-build \ go build \ - -trimpath \ - -ldflags="-X 
'${VERSION_PATH}.Long=${VERSION_LONG}' -X '${VERSION_PATH}.Short=${VERSION_SHORT}' -X '${VERSION_PATH}.GitCommit=${VERSION_GIT_HASH}'" \ - -o /out/enduro . + -trimpath \ + -ldflags="-X '${VERSION_PATH}.Long=${VERSION_LONG}' -X '${VERSION_PATH}.Short=${VERSION_SHORT}' -X '${VERSION_PATH}.GitCommit=${VERSION_GIT_HASH}'" \ + -o /out/enduro . FROM build-go AS build-enduro-a3m-worker ARG VERSION_PATH @@ -30,10 +30,23 @@ ARG VERSION_GIT_HASH RUN --mount=type=cache,target=/go/pkg/mod \ --mount=type=cache,target=/root/.cache/go-build \ go build \ - -trimpath \ - -ldflags="-X '${VERSION_PATH}.Long=${VERSION_LONG}' -X '${VERSION_PATH}.Short=${VERSION_SHORT}' -X '${VERSION_PATH}.GitCommit=${VERSION_GIT_HASH}'" \ - -o /out/enduro-a3m-worker \ - ./cmd/enduro-a3m-worker + -trimpath \ + -ldflags="-X '${VERSION_PATH}.Long=${VERSION_LONG}' -X '${VERSION_PATH}.Short=${VERSION_SHORT}' -X '${VERSION_PATH}.GitCommit=${VERSION_GIT_HASH}'" \ + -o /out/enduro-a3m-worker \ + ./cmd/enduro-a3m-worker + +FROM build-go AS build-enduro-am-worker +ARG VERSION_PATH +ARG VERSION_LONG +ARG VERSION_SHORT +ARG VERSION_GIT_HASH +RUN --mount=type=cache,target=/go/pkg/mod \ + --mount=type=cache,target=/root/.cache/go-build \ + go build \ + -trimpath \ + -ldflags="-X '${VERSION_PATH}.Long=${VERSION_LONG}' -X '${VERSION_PATH}.Short=${VERSION_SHORT}' -X '${VERSION_PATH}.GitCommit=${VERSION_GIT_HASH}'" \ + -o /out/enduro-am-worker \ + ./cmd/enduro-am-worker FROM alpine:3.18.2 AS base ARG USER_ID=1000 @@ -52,4 +65,9 @@ COPY --from=build-enduro-a3m-worker --link /out/enduro-a3m-worker /home/enduro/b COPY --from=build-enduro-a3m-worker --link /src/enduro.toml /home/enduro/.config/enduro.toml CMD ["/home/enduro/bin/enduro-a3m-worker", "--config", "/home/enduro/.config/enduro.toml"] +FROM base AS enduro-am-worker +COPY --from=build-enduro-am-worker --link /out/enduro-am-worker /home/enduro/bin/enduro-am-worker +COPY --from=build-enduro-am-worker --link /src/enduro.toml /home/enduro/.config/enduro.toml +CMD 
["/home/enduro/bin/enduro-am-worker", "--config", "/home/enduro/.config/enduro.toml"] + FROM ${TARGET} diff --git a/Makefile b/Makefile index 462211f9c..137c0400a 100644 --- a/Makefile +++ b/Makefile @@ -111,7 +111,7 @@ gen-mock: $(MOCKGEN) mockgen -typed -destination=./internal/api/auth/fake/mock_ticket_store.go -package=fake github.com/artefactual-sdps/enduro/internal/api/auth TicketStore mockgen -typed -destination=./internal/package_/fake/mock_package_.go -package=fake github.com/artefactual-sdps/enduro/internal/package_ Service mockgen -typed -destination=./internal/persistence/fake/mock_persistence.go -package=fake github.com/artefactual-sdps/enduro/internal/persistence Service - mockgen -typed -destination=./internal/sftp/fake/mock_sftp.go -package=fake github.com/artefactual-sdps/enduro/internal/sftp Service + mockgen -typed -destination=./internal/sftp/fake/mock_sftp.go -package=fake github.com/artefactual-sdps/enduro/internal/sftp Client mockgen -typed -destination=./internal/storage/fake/mock_storage.go -package=fake github.com/artefactual-sdps/enduro/internal/storage Service mockgen -typed -destination=./internal/storage/persistence/fake/mock_persistence.go -package=fake github.com/artefactual-sdps/enduro/internal/storage/persistence Storage mockgen -typed -destination=./internal/upload/fake/mock_upload.go -package=fake github.com/artefactual-sdps/enduro/internal/upload Service diff --git a/Tiltfile b/Tiltfile index 82ff153e6..699d7b68c 100644 --- a/Tiltfile +++ b/Tiltfile @@ -14,6 +14,11 @@ custom_build( command=["hack/build_docker.sh", "enduro-a3m-worker"], deps=["."], ) +custom_build( + ref="enduro-am-worker:dev", + command=["hack/build_docker.sh", "enduro-am-worker"], + deps=["."], +) docker_build( "enduro-dashboard:dev", context="dashboard", @@ -34,13 +39,18 @@ docker_build( ] ) -# All Kubernetes resources -k8s_yaml(kustomize("hack/kube/overlays/dev")) - -# Configure trigger mode +# Load tilt env file if it exists dotenv_path = ".tilt.env" if 
os.path.exists(dotenv_path): dotenv(fn=dotenv_path) + +# Set kube config directory +kube_config = os.environ.get('ENDURO_KUBE_CONFIG', 'hack/kube/overlays/dev') + +# All Kubernetes resources +k8s_yaml(kustomize(kube_config)) + +# Configure trigger mode trigger_mode = TRIGGER_MODE_MANUAL if os.environ.get('TRIGGER_MODE_AUTO', ''): trigger_mode = TRIGGER_MODE_AUTO @@ -48,6 +58,7 @@ if os.environ.get('TRIGGER_MODE_AUTO', ''): # Enduro resources k8s_resource("enduro", labels=["Enduro"], trigger_mode=trigger_mode) k8s_resource("enduro-a3m", labels=["Enduro"], trigger_mode=trigger_mode) +k8s_resource("enduro-am", labels=["Enduro"], trigger_mode=trigger_mode) k8s_resource("enduro-internal", port_forwards="9000", labels=["Enduro"], trigger_mode=trigger_mode) k8s_resource("enduro-dashboard", port_forwards="8080:80", labels=["Enduro"], trigger_mode=trigger_mode) @@ -106,6 +117,7 @@ cmd_button( kubectl rollout restart deployment temporal; \ kubectl rollout restart deployment enduro; \ kubectl rollout restart statefulset enduro-a3m; \ + kubectl rollout restart statefulset enduro-am; \ kubectl rollout restart deployment dex; \ kubectl create -f hack/kube/base/mysql-create-locations-job.yaml;", ], diff --git a/cmd/enduro-am-worker/main.go b/cmd/enduro-am-worker/main.go new file mode 100644 index 000000000..26681e307 --- /dev/null +++ b/cmd/enduro-am-worker/main.go @@ -0,0 +1,224 @@ +package main + +import ( + "context" + "fmt" + "net/http" + "net/http/pprof" + "os" + "os/signal" + "syscall" + "time" + + "github.com/oklog/run" + "github.com/prometheus/client_golang/prometheus/promhttp" + "github.com/spf13/pflag" + "go.artefactual.dev/tools/log" + temporalsdk_activity "go.temporal.io/sdk/activity" + temporalsdk_client "go.temporal.io/sdk/client" + temporalsdk_worker "go.temporal.io/sdk/worker" + + "github.com/artefactual-sdps/enduro/internal/am" + "github.com/artefactual-sdps/enduro/internal/config" + "github.com/artefactual-sdps/enduro/internal/db" + 
"github.com/artefactual-sdps/enduro/internal/sftp" + "github.com/artefactual-sdps/enduro/internal/temporal" + "github.com/artefactual-sdps/enduro/internal/version" + "github.com/artefactual-sdps/enduro/internal/watcher" + "github.com/artefactual-sdps/enduro/internal/workflow/activities" +) + +const ( + appName = "enduro-am-worker" +) + +func main() { + p := pflag.NewFlagSet(appName, pflag.ExitOnError) + + p.String("config", "", "Configuration file") + p.Bool("version", false, "Show version information") + _ = p.Parse(os.Args[1:]) + + if v, _ := p.GetBool("version"); v { + fmt.Println(version.Info(appName)) + os.Exit(0) + } + + var cfg config.Configuration + configFile, _ := p.GetString("config") + configFileFound, configFileUsed, err := config.Read(&cfg, configFile) + if err != nil { + fmt.Printf("Failed to read configuration: %v\n", err) + os.Exit(1) + } + + logger := log.New(os.Stderr, + log.WithName(appName), + log.WithDebug(cfg.Debug), + log.WithLevel(cfg.Verbosity), + ) + defer log.Sync(logger) + + logger.Info("Starting...", "version", version.Long, "pid", os.Getpid()) + + if configFileFound { + logger.Info("Configuration file loaded.", "path", configFileUsed) + } else { + logger.Info("Configuration file not found.") + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + enduroDatabase, err := db.Connect(cfg.Database.Driver, cfg.Database.DSN) + if err != nil { + logger.Error(err, "Enduro database configuration failed.") + os.Exit(1) + } + _ = enduroDatabase.Ping() + + temporalClient, err := temporalsdk_client.Dial(temporalsdk_client.Options{ + Namespace: cfg.Temporal.Namespace, + HostPort: cfg.Temporal.Address, + Logger: temporal.Logger(logger.WithName("temporal-client")), + }) + if err != nil { + logger.Error(err, "Error creating Temporal client.") + os.Exit(1) + } + + // Set up the watcher service. 
+ var wsvc watcher.Service + { + wsvc, err = watcher.New(ctx, logger.WithName("watcher"), &cfg.Watcher) + if err != nil { + logger.Error(err, "Error setting up watchers.") + os.Exit(1) + } + } + + var g run.Group + + // Activity worker. + { + done := make(chan struct{}) + workerOpts := temporalsdk_worker.Options{ + DisableWorkflowWorker: true, + EnableSessionWorker: true, + MaxConcurrentSessionExecutionSize: 1000, + MaxConcurrentActivityExecutionSize: 1, + } + w := temporalsdk_worker.New(temporalClient, temporal.AmWorkerTaskQueue, workerOpts) + if err != nil { + logger.Error(err, "Error creating Temporal worker.") + os.Exit(1) + } + + w.RegisterActivityWithOptions( + activities.NewDownloadActivity(wsvc).Execute, + temporalsdk_activity.RegisterOptions{Name: activities.DownloadActivityName}, + ) + w.RegisterActivityWithOptions( + activities.NewBundleActivity(wsvc).Execute, + temporalsdk_activity.RegisterOptions{Name: activities.BundleActivityName}, + ) + w.RegisterActivityWithOptions( + activities.NewZipActivity(logger).Execute, temporalsdk_activity.RegisterOptions{Name: activities.ZipActivityName}, + ) + w.RegisterActivityWithOptions( + am.NewUploadTransferActivity(logger, sftp.NewGoClient(logger, cfg.AM.SFTP)).Execute, + temporalsdk_activity.RegisterOptions{Name: am.UploadTransferActivityName}, + ) + w.RegisterActivityWithOptions(activities.NewCleanUpActivity().Execute, temporalsdk_activity.RegisterOptions{Name: activities.CleanUpActivityName}) + + g.Add( + func() error { + if err := w.Start(); err != nil { + return err + } + <-done + return nil + }, + func(err error) { + w.Stop() + close(done) + }, + ) + } + + // Observability server. + { + srv := &http.Server{ + Addr: cfg.DebugListen, + ReadTimeout: time.Second * 1, + WriteTimeout: time.Second * 1, + IdleTimeout: time.Second * 30, + } + + g.Add(func() error { + mux := http.NewServeMux() + + // Health check. 
+ mux.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + fmt.Fprintln(w, "OK") + }) + + // Prometheus metrics. + mux.Handle("/metrics", promhttp.Handler()) + + // Profiling data. + mux.HandleFunc("/debug/pprof/", pprof.Index) + mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline) + mux.HandleFunc("/debug/pprof/profile", pprof.Profile) + mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol) + mux.HandleFunc("/debug/pprof/trace", pprof.Trace) + mux.Handle("/debug/pprof/block", pprof.Handler("block")) + mux.Handle("/debug/pprof/goroutine", pprof.Handler("goroutine")) + mux.Handle("/debug/pprof/heap", pprof.Handler("heap")) + mux.Handle("/debug/pprof/threadcreate", pprof.Handler("threadcreate")) + + srv.Handler = mux + + return srv.ListenAndServe() + }, func(error) { + ctx, cancel := context.WithTimeout(ctx, time.Second*5) + defer cancel() + _ = srv.Shutdown(ctx) + }) + } + + // Signal handler. + { + var ( + cancelInterrupt = make(chan struct{}) + ch = make(chan os.Signal, 2) + ) + defer close(ch) + + g.Add( + func() error { + signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM) + + select { + case <-ch: + case <-cancelInterrupt: + } + + return nil + }, func(err error) { + logger.Info("Quitting...") + close(cancelInterrupt) + cancel() + signal.Stop(ch) + }, + ) + } + + err = g.Run() + if err != nil { + logger.Error(err, "Application failure.") + os.Exit(1) + } + logger.Info("Bye!") +} diff --git a/enduro.toml b/enduro.toml index 2ffda0329..cb8398414 100644 --- a/enduro.toml +++ b/enduro.toml @@ -3,6 +3,7 @@ debug = true debugListen = "127.0.0.1:9001" verbosity = 2 +useArchivematica = false [temporal] namespace = "default" @@ -84,6 +85,16 @@ PerformPolicyChecksOnPreservationDerivatives = true AipCompressionLevel = 1 AipCompressionAlgorithm = 6 +[am.sftp] +host = "" # The Archivematica Storage Service hostname. 
+user = "" +knownHostsFile = "" +remoteDir = "/transfer_source" + +[am.sftp.privateKey] +path = "" +passphrase = "" # Secret: set (if required) with env var ENDURO_AM_SFTP_PRIVATEKEY_PASSPHRASE. + [upload] endpoint = "http://minio.enduro-sdps:9000" pathStyle = true diff --git a/hack/build_docker.sh b/hack/build_docker.sh index 56c5e45f5..449adb283 100755 --- a/hack/build_docker.sh +++ b/hack/build_docker.sh @@ -19,13 +19,18 @@ case "$TARGET" in TARGET="enduro-a3m-worker" FOLDER="." ;; + "enduro-am-worker") + IMAGE_NAME="enduro-am-worker" + TARGET="enduro-am-worker" + FOLDER="." + ;; "enduro-dashboard") IMAGE_NAME="enduro-dashboard" TARGET="enduro-dashboard" FOLDER="dashboard" ;; *) - echo "Accepted values: enduro, enduro-a3m-worker, enduro-dashboard." + echo "Accepted values: enduro, enduro-a3m-worker, enduro-am-worker, enduro-dashboard." exit 1 ;; esac diff --git a/hack/kube/base/enduro-am.yaml b/hack/kube/base/enduro-am.yaml new file mode 100644 index 000000000..4d690a281 --- /dev/null +++ b/hack/kube/base/enduro-am.yaml @@ -0,0 +1,92 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: enduro-am + labels: + app: enduro-am +spec: + serviceName: enduro-am + selector: + matchLabels: + app: enduro-am + template: + metadata: + labels: + app: enduro-am + spec: + serviceAccountName: sdps + securityContext: + fsGroup: 1000 + initContainers: + - name: check-temporal + image: busybox + imagePullPolicy: IfNotPresent + command: + [ + "sh", + "-c", + "until echo STATUS | nc -w 2 temporal.enduro-sdps 7233; do echo waiting for temporal to start; sleep 1; done;", + ] + containers: + - name: enduro-am-worker + image: ghcr.io/artefactual-sdps/enduro-am-worker:main + env: + - name: MYSQL_USER + valueFrom: + secretKeyRef: + name: mysql-secret + key: user + - name: MYSQL_PASSWORD + valueFrom: + secretKeyRef: + name: mysql-secret + key: password + - name: MINIO_USER + valueFrom: + secretKeyRef: + name: minio-secret + key: user + - name: MINIO_PASSWORD + valueFrom: + 
secretKeyRef: + name: minio-secret + key: password + - name: ENDURO_DATABASE_DSN + value: $(MYSQL_USER):$(MYSQL_PASSWORD)@tcp(mysql.enduro-sdps:3306)/enduro + - name: ENDURO_STORAGE_DATABASE_DSN + value: $(MYSQL_USER):$(MYSQL_PASSWORD)@tcp(mysql.enduro-sdps:3306)/enduro_storage + - name: ENDURO_WATCHER_EMBEDDED_KEY + value: $(MINIO_USER) + - name: ENDURO_WATCHER_EMBEDDED_SECRET + value: $(MINIO_PASSWORD) + - name: ENDURO_USEARCHIVEMATICA + value: "true" + - name: ENDURO_AM_SFTP_HOST + valueFrom: + secretKeyRef: + name: enduro-am-secret + key: sftp_host + - name: ENDURO_AM_SFTP_USER + valueFrom: + secretKeyRef: + name: enduro-am-secret + key: sftp_user + - name: ENDURO_AM_SFTP_KNOWNHOSTSFILE + value: "/etc/ssh/known_hosts" + - name: ENDURO_AM_SFTP_PRIVATEKEY_PATH + value: "/etc/ssh/id_ed25519" + volumeMounts: + - name: ssh-volume + mountPath: "/etc/ssh" + readOnly: true + volumes: + - name: ssh-volume + secret: + secretName: enduro-am-secret + items: + - key: id_ed25519 + defaultMode: 0600 + path: id_ed25519 + - key: known_hosts + defaultMode: 0644 + path: known_hosts diff --git a/hack/kube/base/kustomization.yaml b/hack/kube/base/kustomization.yaml index a2388c827..dc54326bb 100644 --- a/hack/kube/base/kustomization.yaml +++ b/hack/kube/base/kustomization.yaml @@ -4,6 +4,7 @@ namespace: enduro-sdps resources: - dex.yaml - enduro-a3m.yaml + - enduro-am.yaml - enduro-dashboard.yaml - enduro-internal.yaml - enduro.yaml diff --git a/hack/kube/overlays/dev/dex-secret.yaml b/hack/kube/components/dev/dex-secret.yaml similarity index 100% rename from hack/kube/overlays/dev/dex-secret.yaml rename to hack/kube/components/dev/dex-secret.yaml diff --git a/hack/kube/components/dev/enduro-am-secret.yaml b/hack/kube/components/dev/enduro-am-secret.yaml new file mode 100644 index 000000000..306dd2be5 --- /dev/null +++ b/hack/kube/components/dev/enduro-am-secret.yaml @@ -0,0 +1,12 @@ +apiVersion: v1 +kind: Secret +metadata: + name: enduro-am-secret +type: Opaque +stringData: + 
api_user: "artefactual" + api_key: "" + sftp_host: "localhost" + sftp_user: "artefactual" + known_hosts: "" + id_ed25519: "" diff --git a/hack/kube/overlays/dev/enduro-secret.yaml b/hack/kube/components/dev/enduro-secret.yaml similarity index 100% rename from hack/kube/overlays/dev/enduro-secret.yaml rename to hack/kube/components/dev/enduro-secret.yaml diff --git a/hack/kube/components/dev/kustomization.yaml b/hack/kube/components/dev/kustomization.yaml new file mode 100644 index 000000000..8654e8205 --- /dev/null +++ b/hack/kube/components/dev/kustomization.yaml @@ -0,0 +1,11 @@ +apiVersion: kustomize.config.k8s.io/v1beta1 +kind: Kustomization +namespace: enduro-sdps +resources: + - dex-secret.yaml + - enduro-am-secret.yaml + - enduro-secret.yaml + - ldap.yaml + - minio-secret.yaml + - mysql-secret.yaml + - temporal-ui-secret.yaml diff --git a/hack/kube/overlays/dev/ldap.yaml b/hack/kube/components/dev/ldap.yaml similarity index 100% rename from hack/kube/overlays/dev/ldap.yaml rename to hack/kube/components/dev/ldap.yaml diff --git a/hack/kube/overlays/dev/minio-secret.yaml b/hack/kube/components/dev/minio-secret.yaml similarity index 100% rename from hack/kube/overlays/dev/minio-secret.yaml rename to hack/kube/components/dev/minio-secret.yaml diff --git a/hack/kube/overlays/dev/mysql-secret.yaml b/hack/kube/components/dev/mysql-secret.yaml similarity index 100% rename from hack/kube/overlays/dev/mysql-secret.yaml rename to hack/kube/components/dev/mysql-secret.yaml diff --git a/hack/kube/overlays/dev/temporal-ui-secret.yaml b/hack/kube/components/dev/temporal-ui-secret.yaml similarity index 100% rename from hack/kube/overlays/dev/temporal-ui-secret.yaml rename to hack/kube/components/dev/temporal-ui-secret.yaml diff --git a/hack/kube/overlays/dev-am/enduro-internal-patch.yaml b/hack/kube/overlays/dev-am/enduro-internal-patch.yaml new file mode 100644 index 000000000..50f99f354 --- /dev/null +++ b/hack/kube/overlays/dev-am/enduro-internal-patch.yaml @@ -0,0 
+1,12 @@ +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: enduro-internal +spec: + template: + spec: + containers: + - name: enduro-internal + env: + - name: ENDURO_USEARCHIVEMATICA + value: "true" diff --git a/hack/kube/overlays/dev-am/enduro-patch.yaml b/hack/kube/overlays/dev-am/enduro-patch.yaml new file mode 100644 index 000000000..bcf7605bc --- /dev/null +++ b/hack/kube/overlays/dev-am/enduro-patch.yaml @@ -0,0 +1,12 @@ +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: enduro +spec: + template: + spec: + containers: + - name: enduro + env: + - name: ENDURO_USEARCHIVEMATICA + value: "true" diff --git a/hack/kube/overlays/dev-am/kustomization.yaml b/hack/kube/overlays/dev-am/kustomization.yaml new file mode 100644 index 000000000..0e3a19b3b --- /dev/null +++ b/hack/kube/overlays/dev-am/kustomization.yaml @@ -0,0 +1,28 @@ +apiVersion: kustomize.config.k8s.io/v1beta1 +kind: Kustomization +namespace: enduro-sdps +resources: + - ../../base + - ../../components/dev +images: + - name: ghcr.io/artefactual-sdps/enduro:main + newName: enduro + newTag: dev + - name: ghcr.io/artefactual-sdps/enduro-a3m-worker:main + newName: enduro-a3m-worker + newTag: dev + - name: ghcr.io/artefactual-sdps/enduro-am-worker:main + newName: enduro-am-worker + newTag: dev + - name: ghcr.io/artefactual-sdps/enduro-dashboard:main + newName: enduro-dashboard + newTag: dev +patches: + - target: + kind: Deployment + name: enduro-internal + path: enduro-internal-patch.yaml + - target: + kind: Deployment + name: enduro + path: enduro-patch.yaml diff --git a/hack/kube/overlays/dev/kustomization.yaml b/hack/kube/overlays/dev/kustomization.yaml index 0841054a3..14854442f 100644 --- a/hack/kube/overlays/dev/kustomization.yaml +++ b/hack/kube/overlays/dev/kustomization.yaml @@ -1,15 +1,9 @@ apiVersion: kustomize.config.k8s.io/v1beta1 kind: Kustomization namespace: enduro-sdps -bases: - - ../../base resources: - - dex-secret.yaml - - enduro-secret.yaml - - ldap.yaml - - 
minio-secret.yaml - - mysql-secret.yaml - - temporal-ui-secret.yaml + - ../../base + - ../../components/dev images: - name: ghcr.io/artefactual-sdps/enduro:main newName: enduro @@ -17,8 +11,14 @@ images: - name: ghcr.io/artefactual-sdps/enduro-a3m-worker:main newName: enduro-a3m-worker newTag: dev + - name: ghcr.io/artefactual-sdps/enduro-am-worker:main + newName: enduro-am-worker + newTag: dev - name: ghcr.io/artefactual-sdps/enduro-dashboard:main newName: enduro-dashboard newTag: dev -patchesStrategicMerge: - - enduro-a3m-patch.yaml +patches: + - target: + kind: StatefulSet + name: enduro-a3m + path: enduro-a3m-patch.yaml diff --git a/internal/am/config.go b/internal/am/config.go new file mode 100644 index 000000000..216fd4ff7 --- /dev/null +++ b/internal/am/config.go @@ -0,0 +1,8 @@ +package am + +import "github.com/artefactual-sdps/enduro/internal/sftp" + +type Config struct { + // SFTP configuration for uploading transfers to Archivematica. + SFTP sftp.Config +} diff --git a/internal/am/upload_transfer.go b/internal/am/upload_transfer.go new file mode 100644 index 000000000..ee4401b6b --- /dev/null +++ b/internal/am/upload_transfer.go @@ -0,0 +1,55 @@ +package am + +import ( + "context" + "fmt" + "os" + + "github.com/go-logr/logr" + + "github.com/artefactual-sdps/enduro/internal/sftp" +) + +const UploadTransferActivityName = "UploadTransferActivity" + +type UploadTransferActivityParams struct { + SourcePath string + Filename string +} + +type UploadTransferActivityResult struct { + BytesCopied int64 + RemotePath string +} + +type UploadTransferActivity struct { + client sftp.Client + logger logr.Logger +} + +func NewUploadTransferActivity(logger logr.Logger, client sftp.Client) *UploadTransferActivity { + return &UploadTransferActivity{client: client, logger: logger} +} + +func (a *UploadTransferActivity) Execute(ctx context.Context, params *UploadTransferActivityParams) (*UploadTransferActivityResult, error) { + a.logger.V(1).Info("Execute 
UploadTransferActivity", + "SourcePath", params.SourcePath, + "Filename", params.Filename, + ) + + src, err := os.Open(params.SourcePath) + if err != nil { + return nil, fmt.Errorf("upload transfer: %v", err) + } + defer src.Close() + + bytes, path, err := a.client.Upload(ctx, src, params.Filename) + if err != nil { + return nil, fmt.Errorf("upload transfer: %v", err) + } + + return &UploadTransferActivityResult{ + BytesCopied: bytes, + RemotePath: path, + }, nil +} diff --git a/internal/am/upload_transfer_test.go b/internal/am/upload_transfer_test.go new file mode 100644 index 000000000..fac58eb71 --- /dev/null +++ b/internal/am/upload_transfer_test.go @@ -0,0 +1,114 @@ +package am_test + +import ( + "errors" + "fmt" + "os" + "testing" + + "github.com/go-logr/logr" + "go.artefactual.dev/tools/mockutil" + temporalsdk_activity "go.temporal.io/sdk/activity" + temporalsdk_testsuite "go.temporal.io/sdk/testsuite" + "go.uber.org/mock/gomock" + "gotest.tools/v3/assert" + tfs "gotest.tools/v3/fs" + + "github.com/artefactual-sdps/enduro/internal/am" + sftp_fake "github.com/artefactual-sdps/enduro/internal/sftp/fake" +) + +func TestUploadTransferActivity(t *testing.T) { + filename := "fake_bag" + td := tfs.NewDir(t, "enduro-upload-transfer-test", + tfs.WithFile(filename, "Testing 1-2-3!"), + ) + + type test struct { + name string + params am.UploadTransferActivityParams + want am.UploadTransferActivityResult + recorder func(*sftp_fake.MockClientMockRecorder) + errMsg string + } + for _, tt := range []test{ + { + name: "Uploads transfer", + params: am.UploadTransferActivityParams{ + SourcePath: td.Join(filename), + Filename: filename, + }, + recorder: func(m *sftp_fake.MockClientMockRecorder) { + var t *os.File + m.Upload( + mockutil.Context(), + gomock.AssignableToTypeOf(t), + filename, + ).Return(int64(14), "/transfer_dir/"+filename, nil) + }, + want: am.UploadTransferActivityResult{ + BytesCopied: int64(14), + RemotePath: "/transfer_dir/" + filename, + }, + }, + { + name: 
"Errors when local file can't be read", + params: am.UploadTransferActivityParams{ + SourcePath: td.Join("missing"), + Filename: filename, + }, + errMsg: fmt.Sprintf("upload transfer: open %s: no such file or directory", td.Join("missing")), + }, + { + name: "Errors when upload fails", + params: am.UploadTransferActivityParams{ + SourcePath: td.Join(filename), + Filename: filename, + }, + recorder: func(m *sftp_fake.MockClientMockRecorder) { + var t *os.File + m.Upload( + mockutil.Context(), + gomock.AssignableToTypeOf(t), + filename, + ).Return( + 0, + "", + errors.New("SSH: failed to connect: dial tcp 127.0.0.1:2200: connect: connection refused"), + ) + }, + errMsg: "upload transfer: SSH: failed to connect: dial tcp 127.0.0.1:2200: connect: connection refused", + }, + } { + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + ts := &temporalsdk_testsuite.WorkflowTestSuite{} + env := ts.NewTestActivityEnvironment() + msvc := sftp_fake.NewMockClient(gomock.NewController(t)) + + if tt.recorder != nil { + tt.recorder(msvc.EXPECT()) + } + + env.RegisterActivityWithOptions( + am.NewUploadTransferActivity(logr.Discard(), msvc).Execute, + temporalsdk_activity.RegisterOptions{ + Name: am.UploadTransferActivityName, + }, + ) + + fut, err := env.ExecuteActivity(am.UploadTransferActivityName, tt.params) + if tt.errMsg != "" { + assert.ErrorContains(t, err, tt.errMsg) + return + } + + var res am.UploadTransferActivityResult + err = fut.Get(&res) + assert.NilError(t, err) + assert.DeepEqual(t, res, tt.want) + }) + } +} diff --git a/internal/config/config.go b/internal/config/config.go index 5557bae10..a337d6333 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -8,6 +8,7 @@ import ( "github.com/spf13/viper" "github.com/artefactual-sdps/enduro/internal/a3m" + "github.com/artefactual-sdps/enduro/internal/am" "github.com/artefactual-sdps/enduro/internal/api" "github.com/artefactual-sdps/enduro/internal/db" 
"github.com/artefactual-sdps/enduro/internal/event" @@ -22,11 +23,13 @@ type ConfigurationValidator interface { } type Configuration struct { - Verbosity int - Debug bool - DebugListen string + Verbosity int + Debug bool + DebugListen string + UseArchivematica bool A3m a3m.Config + AM am.Config API api.Config Database db.Config Event event.Config diff --git a/internal/config/config_test.go b/internal/config/config_test.go index 9608925a1..c488e676a 100644 --- a/internal/config/config_test.go +++ b/internal/config/config_test.go @@ -50,6 +50,7 @@ func TestConfig(t *testing.T) { // Zero value defaults. assert.Equal(t, c.Verbosity, 0) assert.Equal(t, c.Debug, false) + assert.Equal(t, c.UseArchivematica, false) assert.Equal(t, c.Database.DSN, "") // Valued defaults. diff --git a/internal/sftp/client.go b/internal/sftp/client.go index 5050eec75..f0c993ca9 100644 --- a/internal/sftp/client.go +++ b/internal/sftp/client.go @@ -13,5 +13,5 @@ import ( type Client interface { // Upload transfers data from the provided source reader to a specified // destination on the SFTP server. - Upload(ctx context.Context, src io.Reader, dest string) (bytes int64, err error) + Upload(ctx context.Context, src io.Reader, dest string) (bytes int64, remotePath string, err error) } diff --git a/internal/sftp/fake/mock_sftp.go b/internal/sftp/fake/mock_sftp.go index 82b2dfb80..59e0b0054 100644 --- a/internal/sftp/fake/mock_sftp.go +++ b/internal/sftp/fake/mock_sftp.go @@ -1,78 +1,80 @@ // Code generated by MockGen. DO NOT EDIT. 
-// Source: github.com/artefactual-sdps/enduro/internal/sftp (interfaces: Service) +// Source: github.com/artefactual-sdps/enduro/internal/sftp (interfaces: Client) // // Generated by this command: // -// mockgen -typed -destination=./internal/sftp/fake/mock_sftp.go -package=fake github.com/artefactual-sdps/enduro/internal/sftp Service +// mockgen -typed -destination=./internal/sftp/fake/mock_sftp.go -package=fake github.com/artefactual-sdps/enduro/internal/sftp Client // // Package fake is a generated GoMock package. package fake import ( + context "context" io "io" reflect "reflect" gomock "go.uber.org/mock/gomock" ) -// MockService is a mock of Service interface. -type MockService struct { +// MockClient is a mock of Client interface. +type MockClient struct { ctrl *gomock.Controller - recorder *MockServiceMockRecorder + recorder *MockClientMockRecorder } -// MockServiceMockRecorder is the mock recorder for MockService. -type MockServiceMockRecorder struct { - mock *MockService +// MockClientMockRecorder is the mock recorder for MockClient. +type MockClientMockRecorder struct { + mock *MockClient } -// NewMockService creates a new mock instance. -func NewMockService(ctrl *gomock.Controller) *MockService { - mock := &MockService{ctrl: ctrl} - mock.recorder = &MockServiceMockRecorder{mock} +// NewMockClient creates a new mock instance. +func NewMockClient(ctrl *gomock.Controller) *MockClient { + mock := &MockClient{ctrl: ctrl} + mock.recorder = &MockClientMockRecorder{mock} return mock } // EXPECT returns an object that allows the caller to indicate expected use. -func (m *MockService) EXPECT() *MockServiceMockRecorder { +func (m *MockClient) EXPECT() *MockClientMockRecorder { return m.recorder } // Upload mocks base method. 
-func (m *MockService) Upload(arg0 io.Reader, arg1 string) (int64, error) { +func (m *MockClient) Upload(arg0 context.Context, arg1 io.Reader, arg2 string) (int64, string, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Upload", arg0, arg1) + ret := m.ctrl.Call(m, "Upload", arg0, arg1, arg2) ret0, _ := ret[0].(int64) - ret1, _ := ret[1].(error) - return ret0, ret1 + ret1, _ := ret[1].(string) + ret2, _ := ret[2].(error) + return ret0, ret1, ret2 } // Upload indicates an expected call of Upload. -func (mr *MockServiceMockRecorder) Upload(arg0, arg1 any) *ServiceUploadCall { +func (mr *MockClientMockRecorder) Upload(arg0, arg1, arg2 any) *ClientUploadCall { mr.mock.ctrl.T.Helper() - call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Upload", reflect.TypeOf((*MockService)(nil).Upload), arg0, arg1) - return &ServiceUploadCall{Call: call} + call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Upload", reflect.TypeOf((*MockClient)(nil).Upload), arg0, arg1, arg2) + return &ClientUploadCall{Call: call} } -// ServiceUploadCall wrap *gomock.Call -type ServiceUploadCall struct { +// ClientUploadCall wrap *gomock.Call +type ClientUploadCall struct { *gomock.Call } // Return rewrite *gomock.Call.Return -func (c *ServiceUploadCall) Return(arg0 int64, arg1 error) *ServiceUploadCall { - c.Call = c.Call.Return(arg0, arg1) +func (c *ClientUploadCall) Return(arg0 int64, arg1 string, arg2 error) *ClientUploadCall { + c.Call = c.Call.Return(arg0, arg1, arg2) return c } // Do rewrite *gomock.Call.Do -func (c *ServiceUploadCall) Do(f func(io.Reader, string) (int64, error)) *ServiceUploadCall { +func (c *ClientUploadCall) Do(f func(context.Context, io.Reader, string) (int64, string, error)) *ClientUploadCall { c.Call = c.Call.Do(f) return c } // DoAndReturn rewrite *gomock.Call.DoAndReturn -func (c *ServiceUploadCall) DoAndReturn(f func(io.Reader, string) (int64, error)) *ServiceUploadCall { +func (c *ClientUploadCall) DoAndReturn(f func(context.Context, io.Reader, string) 
(int64, string, error)) *ClientUploadCall { c.Call = c.Call.DoAndReturn(f) return c } diff --git a/internal/sftp/goclient.go b/internal/sftp/goclient.go index 3c6772edd..101b1da4e 100644 --- a/internal/sftp/goclient.go +++ b/internal/sftp/goclient.go @@ -6,15 +6,18 @@ import ( "fmt" "io" "os" + "strings" "github.com/dolmen-go/contextio" + "github.com/go-logr/logr" "github.com/pkg/sftp" "golang.org/x/crypto/ssh" ) // GoClient implements the SFTP service using native Go SSH and SFTP packages. type GoClient struct { - cfg Config + cfg Config + logger logr.Logger ssh *ssh.Client sftp *sftp.Client @@ -23,10 +26,10 @@ type GoClient struct { var _ Client = (*GoClient)(nil) // NewGoClient returns a new GoSFTP client with the given configuration. -func NewGoClient(cfg Config) *GoClient { +func NewGoClient(logger logr.Logger, cfg Config) *GoClient { cfg.SetDefaults() - return &GoClient{cfg: cfg} + return &GoClient{cfg: cfg, logger: logger} } // Upload writes the data from src to the remote file at dest and returns the @@ -34,16 +37,20 @@ func NewGoClient(cfg Config) *GoClient { // closed when the upload is complete or cancelled. // // Upload is not thread safe. -func (c *GoClient) Upload(ctx context.Context, src io.Reader, dest string) (int64, error) { +func (c *GoClient) Upload(ctx context.Context, src io.Reader, dest string) (int64, string, error) { if err := c.dial(); err != nil { - return 0, err + return 0, "", err } defer c.close() + // SFTP assumes that "/" is used as the directory separator. See: + // https://datatracker.ietf.org/doc/html/draft-ietf-secsh-filexfer-02#section-6.2 + remotePath := strings.TrimSuffix(c.cfg.RemoteDir, "/") + "/" + dest + // Note: Some SFTP servers don't support O_RDWR mode. 
- w, err := c.sftp.OpenFile(dest, (os.O_WRONLY | os.O_CREATE | os.O_TRUNC)) + w, err := c.sftp.OpenFile(remotePath, (os.O_WRONLY | os.O_CREATE | os.O_TRUNC)) if err != nil { - return 0, fmt.Errorf("SFTP: open remote file %q: %v", dest, err) + return 0, "", fmt.Errorf("SFTP: open remote file %q: %v", dest, err) } defer w.Close() @@ -51,10 +58,10 @@ func (c *GoClient) Upload(ctx context.Context, src io.Reader, dest string) (int6 // received. bytes, err := io.Copy(contextio.NewWriter(ctx, w), contextio.NewReader(ctx, src)) if err != nil { - return 0, fmt.Errorf("SFTP: upload to %q: %v", dest, err) + return 0, "", fmt.Errorf("SFTP: upload to %q: %v", dest, err) } - return bytes, nil + return bytes, remotePath, nil } // Dial connects to an SSH host then creates an SFTP client on the connection. @@ -63,7 +70,7 @@ func (c *GoClient) Upload(ctx context.Context, src io.Reader, dest string) (int6 func (c *GoClient) dial() error { var err error - c.ssh, err = sshConnect(c.cfg) + c.ssh, err = sshConnect(c.logger, c.cfg) if err != nil { return fmt.Errorf("SSH: %v", err) } diff --git a/internal/sftp/goclient_test.go b/internal/sftp/goclient_test.go index 9872c89ab..ed3ec370f 100644 --- a/internal/sftp/goclient_test.go +++ b/internal/sftp/goclient_test.go @@ -13,6 +13,7 @@ import ( "time" "github.com/gliderlabs/ssh" + "github.com/go-logr/logr" gosftp "github.com/pkg/sftp" gossh "golang.org/x/crypto/ssh" "gotest.tools/v3/assert" @@ -159,6 +160,10 @@ func TestGoClient(t *testing.T) { defer listener.Close() badHost, badPort, _ := net.SplitHostPort(listener.Addr().String()) + type params struct { + src io.Reader + dest string + } type results struct { Bytes int64 Paths []tfs.PathOp @@ -167,6 +172,7 @@ func TestGoClient(t *testing.T) { type test struct { name string cfg sftp.Config + params params want results wantErr string } @@ -181,6 +187,10 @@ func TestGoClient(t *testing.T) { Path: "./testdata/clientkeys/test_ed25519", }, }, + params: params{ + src: strings.NewReader("Testing 
1-2-3"), + dest: "test.txt", + }, want: results{ Bytes: 13, Paths: []tfs.PathOp{tfs.WithFile("test.txt", "Testing 1-2-3")}, @@ -197,6 +207,10 @@ func TestGoClient(t *testing.T) { Passphrase: "Backpack-Spirits6-Bronzing", }, }, + params: params{ + src: strings.NewReader("Testing 1-2-3"), + dest: "test.txt", + }, want: results{ Bytes: 13, Paths: []tfs.PathOp{tfs.WithFile("test.txt", "Testing 1-2-3")}, @@ -213,6 +227,10 @@ func TestGoClient(t *testing.T) { Passphrase: "wrong", }, }, + params: params{ + src: strings.NewReader("Testing 1-2-3"), + dest: "test.txt", + }, wantErr: "SSH: parse private key with passphrase: x509: decryption password incorrect", }, { @@ -225,6 +243,10 @@ func TestGoClient(t *testing.T) { Path: "./testdata/clientkeys/test_ed25519", }, }, + params: params{ + src: strings.NewReader("Testing 1-2-3"), + dest: "test.txt", + }, wantErr: fmt.Sprintf( "SSH: connect: dial tcp %s:%s: connect: connection refused", badHost, badPort, @@ -271,11 +293,12 @@ func TestGoClient(t *testing.T) { t.Run(tc.name, func(t *testing.T) { t.Parallel() - sftpc := sftp.NewGoClient(tc.cfg) - src := strings.NewReader("Testing 1-2-3") - dest := tfs.NewDir(t, "sftp_test") + // Use a unique RemoteDir for each test. 
+ remoteDir := tfs.NewDir(t, "sftp_test_remote") + tc.cfg.RemoteDir = remoteDir.Path() - bytes, err := sftpc.Upload(context.Background(), src, dest.Join("test.txt")) + sftpc := sftp.NewGoClient(logr.Discard(), tc.cfg) + bytes, remotePath, err := sftpc.Upload(context.Background(), tc.params.src, tc.params.dest) if tc.wantErr != "" { assert.Error(t, err, tc.wantErr) @@ -284,7 +307,8 @@ func TestGoClient(t *testing.T) { assert.NilError(t, err) assert.Equal(t, bytes, tc.want.Bytes) - assert.Assert(t, tfs.Equal(dest.Path(), tfs.Expected(t, tc.want.Paths...))) + assert.Equal(t, remotePath, tc.cfg.RemoteDir+"/"+tc.params.dest) + assert.Assert(t, tfs.Equal(remoteDir.Path(), tfs.Expected(t, tc.want.Paths...))) }) } } diff --git a/internal/sftp/ssh.go b/internal/sftp/ssh.go index 6596bd62a..3e9e1695b 100644 --- a/internal/sftp/ssh.go +++ b/internal/sftp/ssh.go @@ -7,6 +7,7 @@ import ( "path/filepath" "time" + "github.com/go-logr/logr" "golang.org/x/crypto/ssh" "golang.org/x/crypto/ssh/knownhosts" ) @@ -16,7 +17,7 @@ import ( // // Only private key authentication is currently supported, with or without a // passphrase. -func sshConnect(cfg Config) (*ssh.Client, error) { +func sshConnect(logger logr.Logger, cfg Config) (*ssh.Client, error) { // Load private key for authentication. keyBytes, err := os.ReadFile(filepath.Clean(cfg.PrivateKey.Path)) // #nosec G304 -- File data is validated below if err != nil { @@ -50,12 +51,14 @@ func sshConnect(cfg Config) (*ssh.Client, error) { }, HostKeyCallback: hostcallback, Timeout: 5 * time.Second, + User: cfg.User, } // Connect to the server. 
address := net.JoinHostPort(cfg.Host, cfg.Port) conn, err := ssh.Dial("tcp", address, sshConfig) if err != nil { + logger.V(2).Info("SSH dial failed", "address", address, "user", cfg.User) return nil, fmt.Errorf("connect: %v", err) } diff --git a/internal/temporal/temporal.go b/internal/temporal/temporal.go index 30200e1d2..7e928fe39 100644 --- a/internal/temporal/temporal.go +++ b/internal/temporal/temporal.go @@ -7,8 +7,11 @@ import ( ) const ( + // These are the task queues used by our workflow and activity workers. It may + be convenient to make these configurable in the future. GlobalTaskQueue = "global" A3mWorkerTaskQueue = "a3m" + AmWorkerTaskQueue = "am" ) func NonRetryableError(err error) error { diff --git a/internal/workflow/activities/bundle.go b/internal/workflow/activities/bundle.go index b4678fb48..d6369370a 100644 --- a/internal/workflow/activities/bundle.go +++ b/internal/workflow/activities/bundle.go @@ -49,7 +49,7 @@ func (a *BundleActivity) Execute(ctx context.Context, params *BundleActivityPara ) if params.TransferDir == "" { - params.TransferDir, err = os.MkdirTemp("", "*-enduro-a3m-worker") + params.TransferDir, err = os.MkdirTemp("", "*-enduro-transfer") if err != nil { return nil, err } diff --git a/internal/workflow/activities/zip.go b/internal/workflow/activities/zip.go new file mode 100644 index 000000000..f809c5194 --- /dev/null +++ b/internal/workflow/activities/zip.go @@ -0,0 +1,90 @@ +package activities + +import ( + "archive/zip" + "context" + "errors" + "fmt" + "io" + "os" + "path/filepath" + + "github.com/go-logr/logr" +) + +const ZipActivityName = "ZipActivity" + +type ZipActivityParams struct { + SourceDir string + DestPath string +} + +type ZipActivityResult struct { + Path string +} + +type zipActivity struct { + logger logr.Logger +} + +func NewZipActivity(logger logr.Logger) *zipActivity { + return &zipActivity{logger: logger} +} + +// Execute creates a Zip archive at params.DestPath from the contents of +
params.SourceDir. If params.DestPath is not specified then params.SourceDir +// + ".zip" will be used. +func (a *zipActivity) Execute(ctx context.Context, params *ZipActivityParams) (*ZipActivityResult, error) { + a.logger.V(1).Info("Executing ZipActivity", "sourceDir", params.SourceDir, "DestPath", params.DestPath) + + if params.SourceDir == "" { + return nil, errors.New("zip: missing source directory") + } + + var dest string + if params.DestPath == "" { + dest = params.SourceDir + ".zip" + a.logger.V(1).Info("ZipActivity dest changed", "dest", dest) + } + + w, err := os.Create(dest) // #nosec G304 -- trusted path + if err != nil { + return nil, fmt.Errorf("zip: couldn't create file: %v", err) + } + defer w.Close() + + z := zip.NewWriter(w) + defer z.Close() + + err = filepath.WalkDir(params.SourceDir, func(path string, d os.DirEntry, err error) error { + if err != nil { + return err + } + + if d.IsDir() { + return nil + } + + f, err := z.Create(path) + if err != nil { + return err + } + + r, err := os.Open(path) // #nosec G304 -- trusted path + if err != nil { + return err + } + defer r.Close() + + if _, err := io.Copy(f, r); err != nil { + return err + } + + return nil + }) + if err != nil { + return nil, fmt.Errorf("zip: %v", err) + } + + return &ZipActivityResult{Path: dest}, nil +} diff --git a/internal/workflow/activities/zip_test.go b/internal/workflow/activities/zip_test.go new file mode 100644 index 000000000..66f2b6440 --- /dev/null +++ b/internal/workflow/activities/zip_test.go @@ -0,0 +1,52 @@ +package activities_test + +import ( + "os" + "testing" + + "github.com/go-logr/logr" + temporalsdk_activity "go.temporal.io/sdk/activity" + temporalsdk_testsuite "go.temporal.io/sdk/testsuite" + "gotest.tools/v3/assert" + tfs "gotest.tools/v3/fs" + + "github.com/artefactual-sdps/enduro/internal/workflow/activities" +) + +func TestZipActivity(t *testing.T) { + t.Run("Zips a directory", func(t *testing.T) { + t.Parallel() + + transferName := "my_transfer" + 
contents := tfs.WithDir(transferName, + tfs.WithDir("subdir", + tfs.WithFile("abc.txt", "Testing A-B-C"), + ), + tfs.WithFile("123.txt", "Testing 1-2-3"), + ) + td := tfs.NewDir(t, "enduro-zip-test", contents) + + ts := &temporalsdk_testsuite.WorkflowTestSuite{} + env := ts.NewTestActivityEnvironment() + env.RegisterActivityWithOptions( + activities.NewZipActivity(logr.Discard()).Execute, + temporalsdk_activity.RegisterOptions{ + Name: activities.ZipActivityName, + }, + ) + + fut, err := env.ExecuteActivity(activities.ZipActivityName, + activities.ZipActivityParams{SourceDir: td.Join(transferName)}, + ) + assert.NilError(t, err) + + var res activities.ZipActivityResult + _ = fut.Get(&res) + assert.DeepEqual(t, res, activities.ZipActivityResult{Path: td.Join(transferName + ".zip")}) + + // Confirm that a zip file was created with some contents. + i, err := os.Lstat(td.Join(transferName + ".zip")) + assert.NilError(t, err) + assert.Assert(t, i.Size() > int64(0)) + }) +} diff --git a/internal/workflow/processing.go b/internal/workflow/processing.go index f905a9929..42677ec8b 100644 --- a/internal/workflow/processing.go +++ b/internal/workflow/processing.go @@ -17,23 +17,27 @@ import ( temporalsdk_workflow "go.temporal.io/sdk/workflow" "github.com/artefactual-sdps/enduro/internal/a3m" + "github.com/artefactual-sdps/enduro/internal/am" "github.com/artefactual-sdps/enduro/internal/fsutil" "github.com/artefactual-sdps/enduro/internal/package_" + "github.com/artefactual-sdps/enduro/internal/temporal" "github.com/artefactual-sdps/enduro/internal/watcher" "github.com/artefactual-sdps/enduro/internal/workflow/activities" ) type ProcessingWorkflow struct { - logger logr.Logger - pkgsvc package_.Service - wsvc watcher.Service + logger logr.Logger + pkgsvc package_.Service + wsvc watcher.Service + useArchivematica bool } -func NewProcessingWorkflow(logger logr.Logger, pkgsvc package_.Service, wsvc watcher.Service) *ProcessingWorkflow { +func NewProcessingWorkflow(logger 
logr.Logger, pkgsvc package_.Service, wsvc watcher.Service, useAM bool) *ProcessingWorkflow { return &ProcessingWorkflow{ - logger: logger, - pkgsvc: pkgsvc, - wsvc: wsvc, + logger: logger, + pkgsvc: pkgsvc, + wsvc: wsvc, + useArchivematica: useAM, } } @@ -168,12 +172,19 @@ func (w *ProcessingWorkflow) Execute(ctx temporalsdk_workflow.Context, req *pack // Activities running within a session. { var sessErr error + var taskQueue string maxAttempts := 5 + if w.useArchivematica { + taskQueue = temporal.AmWorkerTaskQueue + } else { + taskQueue = tinfo.A3mTaskQueue + } + for attempt := 1; attempt <= maxAttempts; attempt++ { activityOpts := temporalsdk_workflow.WithActivityOptions(ctx, temporalsdk_workflow.ActivityOptions{ StartToCloseTimeout: time.Minute, - TaskQueue: tinfo.A3mTaskQueue, + TaskQueue: taskQueue, }) sessCtx, err := temporalsdk_workflow.CreateSession(activityOpts, &temporalsdk_workflow.SessionOptions{ CreationTimeout: forever, @@ -305,11 +316,18 @@ func (w *ProcessingWorkflow) SessionHandler(sessCtx temporalsdk_workflow.Context // Bundle. { + // For the a3m workflow bundle the transfer to a shared directory also + // mounted by the a3m container. + var transferDir string + if !w.useArchivematica { + transferDir = "/home/a3m/.local/share/a3m/share" + } + if tinfo.Bundle == (activities.BundleActivityResult{}) { activityOpts := withActivityOptsForLongLivedRequest(sessCtx) err := temporalsdk_workflow.ExecuteActivity(activityOpts, activities.BundleActivityName, &activities.BundleActivityParams{ WatcherName: tinfo.req.WatcherName, - TransferDir: "/home/a3m/.local/share/a3m/share", + TransferDir: transferDir, Key: tinfo.req.Key, IsDir: tinfo.req.IsDir, TempFile: tinfo.TempFile, @@ -332,8 +350,14 @@ func (w *ProcessingWorkflow) SessionHandler(sessCtx temporalsdk_workflow.Context } }() + // Do preservation activities. 
{ - err := w.transferA3m(sessCtx, tinfo) + var err error + if w.useArchivematica { + err = w.transferAM(sessCtx, tinfo) + } else { + err = w.transferA3m(sessCtx, tinfo) + } if err != nil { return err } @@ -351,6 +375,12 @@ func (w *ProcessingWorkflow) SessionHandler(sessCtx temporalsdk_workflow.Context }).Get(activityOpts, nil) } + // Stop here for the Archivematica workflow. AIP creation, review, and + storage are handled entirely by Archivematica and the AM Storage Service. + if w.useArchivematica { + return nil + } + // Identifier of the preservation task for upload to sips bucket. var uploadPreservationTaskID uint @@ -616,3 +646,46 @@ func (w *ProcessingWorkflow) transferA3m(sessCtx temporalsdk_workflow.Context, t return err } + +func (w *ProcessingWorkflow) transferAM(sessCtx temporalsdk_workflow.Context, tinfo *TransferInfo) error { + var err error + + activityOpts := withActivityOptsForLongLivedRequest(sessCtx) + var zipResult activities.ZipActivityResult + err = temporalsdk_workflow.ExecuteActivity( + activityOpts, + activities.ZipActivityName, + &activities.ZipActivityParams{SourceDir: tinfo.Bundle.FullPath}, + ).Get(activityOpts, &zipResult) + if err != nil { + return err + } + + activityOpts = temporalsdk_workflow.WithActivityOptions(sessCtx, + temporalsdk_workflow.ActivityOptions{ + StartToCloseTimeout: time.Hour * 2, + RetryPolicy: &temporalsdk_temporal.RetryPolicy{ + InitialInterval: time.Second * 5, + BackoffCoefficient: 2, + MaximumAttempts: 3, + NonRetryableErrorTypes: []string{ + "TemporalTimeout:StartToClose", + }, + }, + }, + ) + uploadResult := am.UploadTransferActivityResult{} + err = temporalsdk_workflow.ExecuteActivity( + activityOpts, + am.UploadTransferActivityName, + &am.UploadTransferActivityParams{ + SourcePath: zipResult.Path, + Filename: tinfo.req.Key, + }, + ).Get(activityOpts, &uploadResult) + if err != nil { + return err + } + + return nil +} diff --git a/internal/workflow/processing_test.go 
b/internal/workflow/processing_test.go index 07a8c89a3..a8083d44c 100644 --- a/internal/workflow/processing_test.go +++ b/internal/workflow/processing_test.go @@ -15,8 +15,10 @@ import ( "gotest.tools/v3/assert" "github.com/artefactual-sdps/enduro/internal/a3m" + "github.com/artefactual-sdps/enduro/internal/am" "github.com/artefactual-sdps/enduro/internal/package_" packagefake "github.com/artefactual-sdps/enduro/internal/package_/fake" + sftp_fake "github.com/artefactual-sdps/enduro/internal/sftp/fake" watcherfake "github.com/artefactual-sdps/enduro/internal/watcher/fake" "github.com/artefactual-sdps/enduro/internal/workflow/activities" ) @@ -39,7 +41,7 @@ func TestTransferInfo_Name(t *testing.T) { }) } -func (s *ProcessingWorkflowTestSuite) SetupTest() { +func (s *ProcessingWorkflowTestSuite) SetupWorkflowTest(useAM bool) { s.env = s.NewTestWorkflowEnvironment() s.env.SetWorkerOptions(temporalsdk_worker.Options{EnableSessionWorker: true}) @@ -47,6 +49,7 @@ func (s *ProcessingWorkflowTestSuite) SetupTest() { logger := logr.Discard() pkgsvc := packagefake.NewMockService(ctrl) wsvc := watcherfake.NewMockService(ctrl) + sftpc := sftp_fake.NewMockClient(ctrl) s.env.RegisterActivityWithOptions(activities.NewDownloadActivity(wsvc).Execute, temporalsdk_activity.RegisterOptions{Name: activities.DownloadActivityName}) s.env.RegisterActivityWithOptions(activities.NewBundleActivity(wsvc).Execute, temporalsdk_activity.RegisterOptions{Name: activities.BundleActivityName}) @@ -55,8 +58,20 @@ func (s *ProcessingWorkflowTestSuite) SetupTest() { s.env.RegisterActivityWithOptions(activities.NewMoveToPermanentStorageActivity(nil).Execute, temporalsdk_activity.RegisterOptions{Name: activities.MoveToPermanentStorageActivityName}) s.env.RegisterActivityWithOptions(activities.NewPollMoveToPermanentStorageActivity(nil).Execute, temporalsdk_activity.RegisterOptions{Name: activities.PollMoveToPermanentStorageActivityName}) 
s.env.RegisterActivityWithOptions(activities.NewRejectPackageActivity(nil).Execute, temporalsdk_activity.RegisterOptions{Name: activities.RejectPackageActivityName}) + s.env.RegisterActivityWithOptions(activities.NewCleanUpActivity().Execute, temporalsdk_activity.RegisterOptions{Name: activities.CleanUpActivityName}) + s.env.RegisterActivityWithOptions(activities.NewDeleteOriginalActivity(wsvc).Execute, temporalsdk_activity.RegisterOptions{Name: activities.DeleteOriginalActivityName}) - s.workflow = NewProcessingWorkflow(logger, pkgsvc, wsvc) + // Archivematica activities + s.env.RegisterActivityWithOptions( + activities.NewZipActivity(logger).Execute, + temporalsdk_activity.RegisterOptions{Name: activities.ZipActivityName}, + ) + s.env.RegisterActivityWithOptions( + am.NewUploadTransferActivity(logger, sftpc).Execute, + temporalsdk_activity.RegisterOptions{Name: am.UploadTransferActivityName}, + ) + + s.workflow = NewProcessingWorkflow(logger, pkgsvc, wsvc, useAM) } func (s *ProcessingWorkflowTestSuite) AfterTest(suiteName, testName string) { @@ -68,10 +83,14 @@ func TestProcessingWorkflow(t *testing.T) { } func (s *ProcessingWorkflowTestSuite) TestPackageConfirmation() { + s.SetupWorkflowTest(false) pkgID := uint(1) + ctx := mock.AnythingOfType("*context.valueCtx") locationID := uuid.MustParse("51328c02-2b63-47be-958e-e8088aa1a61f") watcherName := "watcher" retentionPeriod := 1 * time.Second + pkgsvc := s.workflow.pkgsvc + sessionCtx := mock.AnythingOfType("*context.timerCtx") // Signal handler that mimics package confirmation s.env.RegisterDelayedCallback( @@ -100,8 +119,9 @@ func (s *ProcessingWorkflowTestSuite) TestPackageConfirmation() { s.env.OnActivity(activities.MoveToPermanentStorageActivityName, mock.Anything, mock.Anything).Return(nil) s.env.OnActivity(activities.PollMoveToPermanentStorageActivityName, mock.Anything, mock.Anything).Return(nil) s.env.OnActivity(setLocationIDLocalActivity, mock.Anything, mock.Anything, mock.Anything, 
mock.Anything).Return(nil) - // TODO: CleanUpActivityName - // TODO: DeleteOriginalActivityName + s.env.OnActivity(completePreservationActionLocalActivity, ctx, pkgsvc, mock.AnythingOfType("*workflow.completePreservationActionLocalActivityParams")).Return(nil).Once() + s.env.OnActivity(activities.CleanUpActivityName, sessionCtx, mock.AnythingOfType("*activities.CleanUpActivityParams")).Return(nil).Once() + s.env.OnActivity(activities.DeleteOriginalActivityName, sessionCtx, "watcher", "").Return(nil).Once() s.env.OnActivity(updatePackageLocalActivity, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) s.env.OnActivity(completePreservationActionLocalActivity, mock.Anything, mock.Anything, mock.Anything).Return(nil) @@ -119,6 +139,7 @@ func (s *ProcessingWorkflowTestSuite) TestPackageConfirmation() { } func (s *ProcessingWorkflowTestSuite) TestAutoApprovedAIP() { + s.SetupWorkflowTest(false) pkgID := uint(1) locationID := uuid.MustParse("51328c02-2b63-47be-958e-e8088aa1a61f") watcherName := "watcher" @@ -152,8 +173,62 @@ func (s *ProcessingWorkflowTestSuite) TestAutoApprovedAIP() { s.env.OnActivity(activities.PollMoveToPermanentStorageActivityName, sessionCtx, mock.AnythingOfType("*activities.PollMoveToPermanentStorageActivityParams")).Return(nil).Once() s.env.OnActivity(setLocationIDLocalActivity, ctx, pkgsvc, pkgID, locationID).Return(nil).Once() s.env.OnActivity(completePreservationActionLocalActivity, ctx, pkgsvc, mock.AnythingOfType("*workflow.completePreservationActionLocalActivityParams")).Return(nil).Once() - // TODO: CleanUpActivityName - // TODO: DeleteOriginalActivityName + s.env.OnActivity(activities.CleanUpActivityName, sessionCtx, mock.AnythingOfType("*activities.CleanUpActivityParams")).Return(nil).Once() + s.env.OnActivity(activities.DeleteOriginalActivityName, sessionCtx, "watcher", "").Return(nil).Once() + + s.env.ExecuteWorkflow( + s.workflow.Execute, + &package_.ProcessingWorkflowRequest{ + WatcherName: watcherName, + 
RetentionPeriod: &retentionPeriod, + AutoApproveAIP: true, + DefaultPermanentLocationID: &locationID, + }, + ) + + s.True(s.env.IsWorkflowCompleted()) + s.NoError(s.env.GetWorkflowResult(nil)) +} + +func (s *ProcessingWorkflowTestSuite) TestAMWorkflow() { + s.SetupWorkflowTest(true) + pkgID := uint(1) + locationID := uuid.MustParse("51328c02-2b63-47be-958e-e8088aa1a61f") + watcherName := "watcher" + key := "" + retentionPeriod := 1 * time.Second + ctx := mock.AnythingOfType("*context.valueCtx") + sessionCtx := mock.AnythingOfType("*context.timerCtx") + logger := s.workflow.logger + pkgsvc := s.workflow.pkgsvc + + // Activity mocks/assertions sequence + s.env.OnActivity( + createPackageLocalActivity, + ctx, + logger, + pkgsvc, + &createPackageLocalActivityParams{Key: key, Status: package_.StatusQueued}, + ).Return(pkgID, nil).Once() + s.env.OnActivity(setStatusInProgressLocalActivity, ctx, pkgsvc, pkgID, mock.AnythingOfType("time.Time")).Return(nil).Once() + s.env.OnActivity(createPreservationActionLocalActivity, ctx, pkgsvc, mock.AnythingOfType("*workflow.createPreservationActionLocalActivityParams")).Return(uint(0), nil).Once() + s.env.OnActivity(activities.DownloadActivityName, sessionCtx, watcherName, key).Return("", nil).Once() + s.env.OnActivity(activities.BundleActivityName, mock.Anything, mock.Anything).Return(&activities.BundleActivityResult{FullPath: "/tmp/aip", FullPathBeforeStrip: "/tmp/aip"}, nil) + + // Archivematica specific activities. + s.env.OnActivity(activities.ZipActivityName, + sessionCtx, mock.AnythingOfType("*activities.ZipActivityParams"), + ).Return(&activities.ZipActivityResult{}, nil) + s.env.OnActivity(am.UploadTransferActivityName, + sessionCtx, mock.AnythingOfType("*am.UploadTransferActivityParams"), + ).Return(&am.UploadTransferActivityResult{}, nil).Once() + + // Post-preservation activities. 
+ s.env.OnActivity(updatePackageLocalActivity, ctx, logger, pkgsvc, mock.AnythingOfType("*workflow.updatePackageLocalActivityParams")).Return(nil).Times(2) + s.env.OnActivity(setStatusLocalActivity, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil).Never() + s.env.OnActivity(completePreservationActionLocalActivity, ctx, pkgsvc, mock.AnythingOfType("*workflow.completePreservationActionLocalActivityParams")).Return(nil).Once() + s.env.OnActivity(activities.CleanUpActivityName, sessionCtx, mock.AnythingOfType("*activities.CleanUpActivityParams")).Return(nil).Once() + s.env.OnActivity(activities.DeleteOriginalActivityName, sessionCtx, "watcher", "").Return(nil).Once() s.env.ExecuteWorkflow( s.workflow.Execute, @@ -162,8 +237,6 @@ func (s *ProcessingWorkflowTestSuite) TestAutoApprovedAIP() { RetentionPeriod: &retentionPeriod, AutoApproveAIP: true, DefaultPermanentLocationID: &locationID, - TaskQueue: "global", - A3mTaskQueue: "a3m", }, ) @@ -172,9 +245,11 @@ func (s *ProcessingWorkflowTestSuite) TestAutoApprovedAIP() { } func (s *ProcessingWorkflowTestSuite) TestPackageRejection() { + s.SetupWorkflowTest(false) pkgID := uint(1) watcherName := "watcher" retentionPeriod := 1 * time.Second + sessionCtx := mock.AnythingOfType("*context.timerCtx") // Signal handler that mimics package rejection s.env.RegisterDelayedCallback( @@ -201,8 +276,8 @@ func (s *ProcessingWorkflowTestSuite) TestPackageRejection() { s.env.OnActivity(setPreservationActionStatusLocalActivity, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) s.env.OnActivity(createPreservationTaskLocalActivity, mock.Anything, mock.Anything, mock.Anything).Return(uint(0), nil) s.env.OnActivity(activities.RejectPackageActivityName, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) - // TODO: CleanUpActivityName - // TODO: DeleteOriginalActivityName + s.env.OnActivity(activities.CleanUpActivityName, sessionCtx, 
mock.AnythingOfType("*activities.CleanUpActivityParams")).Return(nil).Once() + s.env.OnActivity(activities.DeleteOriginalActivityName, sessionCtx, "watcher", "").Return(nil).Once() s.env.OnActivity(updatePackageLocalActivity, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) s.env.OnActivity(completePreservationActionLocalActivity, mock.Anything, mock.Anything, mock.Anything).Return(nil) diff --git a/main.go b/main.go index 64f8474e4..4b00c0c36 100644 --- a/main.go +++ b/main.go @@ -92,6 +92,8 @@ func main() { logger.Info("Configuration file not found.") } + logger.V(1).Info("Preservation system", "UseArchivematica", cfg.UseArchivematica) + ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -299,7 +301,7 @@ func main() { os.Exit(1) } - w.RegisterWorkflowWithOptions(workflow.NewProcessingWorkflow(logger, pkgsvc, wsvc).Execute, temporalsdk_workflow.RegisterOptions{Name: package_.ProcessingWorkflowName}) + w.RegisterWorkflowWithOptions(workflow.NewProcessingWorkflow(logger, pkgsvc, wsvc, cfg.UseArchivematica).Execute, temporalsdk_workflow.RegisterOptions{Name: package_.ProcessingWorkflowName}) w.RegisterActivityWithOptions(activities.NewDeleteOriginalActivity(wsvc).Execute, temporalsdk_activity.RegisterOptions{Name: activities.DeleteOriginalActivityName}) w.RegisterActivityWithOptions(activities.NewDisposeOriginalActivity(wsvc).Execute, temporalsdk_activity.RegisterOptions{Name: activities.DisposeOriginalActivityName})