Skip to content

Commit

Permalink
Upload transfer to Archivematica
Browse files Browse the repository at this point in the history
- Create an enduro-am-worker and add an "am" Temporal Taskqueue
- Branch the processing workflow logic for AM processing
- Add a zip package
- Add a ZipActivity to zip the bundled transfer before transmission
- Add an UploadTransferActivity to upload the zipped transfer to the
  AM Storage Service transfer source directory
- Add SFTP configuration for the upload
- Add an SSH username and a logger to the SFTP package
- Add the enduro-am-worker build to the Dockerfile
- Add an enduro-am-worker to the Tilt dev deployment
- Add a Tilt environment variable to select a kube config on startup
- Add the enduro-am-worker container and configuration to the kube
  manifests
- Add a "dev-am" kube overlay

Co-authored-by: Diogenesoftoronto <[email protected]>
  • Loading branch information
djjuhasz and Diogenesoftoronto committed Nov 20, 2023
1 parent e68a7c7 commit c9ee533
Show file tree
Hide file tree
Showing 38 changed files with 1,042 additions and 91 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@
/covreport
/dist
/.tilt.env
/*.secret
32 changes: 25 additions & 7 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ ARG VERSION_GIT_HASH
RUN --mount=type=cache,target=/go/pkg/mod \
--mount=type=cache,target=/root/.cache/go-build \
go build \
-trimpath \
-ldflags="-X '${VERSION_PATH}.Long=${VERSION_LONG}' -X '${VERSION_PATH}.Short=${VERSION_SHORT}' -X '${VERSION_PATH}.GitCommit=${VERSION_GIT_HASH}'" \
-o /out/enduro .
-trimpath \
-ldflags="-X '${VERSION_PATH}.Long=${VERSION_LONG}' -X '${VERSION_PATH}.Short=${VERSION_SHORT}' -X '${VERSION_PATH}.GitCommit=${VERSION_GIT_HASH}'" \
-o /out/enduro .

FROM build-go AS build-enduro-a3m-worker
ARG VERSION_PATH
Expand All @@ -30,10 +30,23 @@ ARG VERSION_GIT_HASH
RUN --mount=type=cache,target=/go/pkg/mod \
--mount=type=cache,target=/root/.cache/go-build \
go build \
-trimpath \
-ldflags="-X '${VERSION_PATH}.Long=${VERSION_LONG}' -X '${VERSION_PATH}.Short=${VERSION_SHORT}' -X '${VERSION_PATH}.GitCommit=${VERSION_GIT_HASH}'" \
-o /out/enduro-a3m-worker \
./cmd/enduro-a3m-worker
-trimpath \
-ldflags="-X '${VERSION_PATH}.Long=${VERSION_LONG}' -X '${VERSION_PATH}.Short=${VERSION_SHORT}' -X '${VERSION_PATH}.GitCommit=${VERSION_GIT_HASH}'" \
-o /out/enduro-a3m-worker \
./cmd/enduro-a3m-worker

FROM build-go AS build-enduro-am-worker
ARG VERSION_PATH
ARG VERSION_LONG
ARG VERSION_SHORT
ARG VERSION_GIT_HASH
RUN --mount=type=cache,target=/go/pkg/mod \
--mount=type=cache,target=/root/.cache/go-build \
go build \
-trimpath \
-ldflags="-X '${VERSION_PATH}.Long=${VERSION_LONG}' -X '${VERSION_PATH}.Short=${VERSION_SHORT}' -X '${VERSION_PATH}.GitCommit=${VERSION_GIT_HASH}'" \
-o /out/enduro-am-worker \
./cmd/enduro-am-worker

FROM alpine:3.18.2 AS base
ARG USER_ID=1000
Expand All @@ -52,4 +65,9 @@ COPY --from=build-enduro-a3m-worker --link /out/enduro-a3m-worker /home/enduro/b
COPY --from=build-enduro-a3m-worker --link /src/enduro.toml /home/enduro/.config/enduro.toml
CMD ["/home/enduro/bin/enduro-a3m-worker", "--config", "/home/enduro/.config/enduro.toml"]

FROM base AS enduro-am-worker
COPY --from=build-enduro-am-worker --link /out/enduro-am-worker /home/enduro/bin/enduro-am-worker
COPY --from=build-enduro-am-worker --link /src/enduro.toml /home/enduro/.config/enduro.toml
CMD ["/home/enduro/bin/enduro-am-worker", "--config", "/home/enduro/.config/enduro.toml"]

FROM ${TARGET}
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ gen-mock: $(MOCKGEN)
mockgen -typed -destination=./internal/api/auth/fake/mock_ticket_store.go -package=fake github.com/artefactual-sdps/enduro/internal/api/auth TicketStore
mockgen -typed -destination=./internal/package_/fake/mock_package_.go -package=fake github.com/artefactual-sdps/enduro/internal/package_ Service
mockgen -typed -destination=./internal/persistence/fake/mock_persistence.go -package=fake github.com/artefactual-sdps/enduro/internal/persistence Service
mockgen -typed -destination=./internal/sftp/fake/mock_sftp.go -package=fake github.com/artefactual-sdps/enduro/internal/sftp Service
mockgen -typed -destination=./internal/sftp/fake/mock_sftp.go -package=fake github.com/artefactual-sdps/enduro/internal/sftp Client
mockgen -typed -destination=./internal/storage/fake/mock_storage.go -package=fake github.com/artefactual-sdps/enduro/internal/storage Service
mockgen -typed -destination=./internal/storage/persistence/fake/mock_persistence.go -package=fake github.com/artefactual-sdps/enduro/internal/storage/persistence Storage
mockgen -typed -destination=./internal/upload/fake/mock_upload.go -package=fake github.com/artefactual-sdps/enduro/internal/upload Service
Expand Down
20 changes: 16 additions & 4 deletions Tiltfile
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@ custom_build(
command=["hack/build_docker.sh", "enduro-a3m-worker"],
deps=["."],
)
custom_build(
ref="enduro-am-worker:dev",
command=["hack/build_docker.sh", "enduro-am-worker"],
deps=["."],
)
docker_build(
"enduro-dashboard:dev",
context="dashboard",
Expand All @@ -34,20 +39,26 @@ docker_build(
]
)

# All Kubernetes resources
k8s_yaml(kustomize("hack/kube/overlays/dev"))

# Configure trigger mode
# Load tilt env file if it exists
dotenv_path = ".tilt.env"
if os.path.exists(dotenv_path):
dotenv(fn=dotenv_path)

# Set kube config directory
kube_config = os.environ.get('ENDURO_KUBE_CONFIG', 'hack/kube/overlays/dev')

# All Kubernetes resources
k8s_yaml(kustomize(kube_config))

# Configure trigger mode
trigger_mode = TRIGGER_MODE_MANUAL
if os.environ.get('TRIGGER_MODE_AUTO', ''):
trigger_mode = TRIGGER_MODE_AUTO

# Enduro resources
k8s_resource("enduro", labels=["Enduro"], trigger_mode=trigger_mode)
k8s_resource("enduro-a3m", labels=["Enduro"], trigger_mode=trigger_mode)
k8s_resource("enduro-am", labels=["Enduro"], trigger_mode=trigger_mode)
k8s_resource("enduro-internal", port_forwards="9000", labels=["Enduro"], trigger_mode=trigger_mode)
k8s_resource("enduro-dashboard", port_forwards="8080:80", labels=["Enduro"], trigger_mode=trigger_mode)

Expand Down Expand Up @@ -106,6 +117,7 @@ cmd_button(
kubectl rollout restart deployment temporal; \
kubectl rollout restart deployment enduro; \
kubectl rollout restart statefulset enduro-a3m; \
kubectl rollout restart statefulset enduro-am; \
kubectl rollout restart deployment dex; \
kubectl create -f hack/kube/base/mysql-create-locations-job.yaml;",
],
Expand Down
224 changes: 224 additions & 0 deletions cmd/enduro-am-worker/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,224 @@
package main

import (
"context"
"fmt"
"net/http"
"net/http/pprof"
"os"
"os/signal"
"syscall"
"time"

"github.com/oklog/run"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/spf13/pflag"
"go.artefactual.dev/tools/log"
temporalsdk_activity "go.temporal.io/sdk/activity"
temporalsdk_client "go.temporal.io/sdk/client"
temporalsdk_worker "go.temporal.io/sdk/worker"

"github.com/artefactual-sdps/enduro/internal/am"
"github.com/artefactual-sdps/enduro/internal/config"
"github.com/artefactual-sdps/enduro/internal/db"
"github.com/artefactual-sdps/enduro/internal/sftp"
"github.com/artefactual-sdps/enduro/internal/temporal"
"github.com/artefactual-sdps/enduro/internal/version"
"github.com/artefactual-sdps/enduro/internal/watcher"
"github.com/artefactual-sdps/enduro/internal/workflow/activities"
)

const (
appName = "enduro-am-worker"
)

func main() {
p := pflag.NewFlagSet(appName, pflag.ExitOnError)

p.String("config", "", "Configuration file")
p.Bool("version", false, "Show version information")
_ = p.Parse(os.Args[1:])

if v, _ := p.GetBool("version"); v {
fmt.Println(version.Info(appName))
os.Exit(0)
}

var cfg config.Configuration
configFile, _ := p.GetString("config")
configFileFound, configFileUsed, err := config.Read(&cfg, configFile)
if err != nil {
fmt.Printf("Failed to read configuration: %v\n", err)
os.Exit(1)
}

logger := log.New(os.Stderr,
log.WithName(appName),
log.WithDebug(cfg.Debug),
log.WithLevel(cfg.Verbosity),
)
defer log.Sync(logger)

logger.Info("Starting...", "version", version.Long, "pid", os.Getpid())

if configFileFound {
logger.Info("Configuration file loaded.", "path", configFileUsed)
} else {
logger.Info("Configuration file not found.")
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

enduroDatabase, err := db.Connect(cfg.Database.Driver, cfg.Database.DSN)
if err != nil {
logger.Error(err, "Enduro database configuration failed.")
os.Exit(1)
}
_ = enduroDatabase.Ping()

temporalClient, err := temporalsdk_client.Dial(temporalsdk_client.Options{
Namespace: cfg.Temporal.Namespace,
HostPort: cfg.Temporal.Address,
Logger: temporal.Logger(logger.WithName("temporal-client")),
})
if err != nil {
logger.Error(err, "Error creating Temporal client.")
os.Exit(1)
}

// Set up the watcher service.
var wsvc watcher.Service
{
wsvc, err = watcher.New(ctx, logger.WithName("watcher"), &cfg.Watcher)
if err != nil {
logger.Error(err, "Error setting up watchers.")
os.Exit(1)
}
}

var g run.Group

// Activity worker.
{
done := make(chan struct{})
workerOpts := temporalsdk_worker.Options{
DisableWorkflowWorker: true,
EnableSessionWorker: true,
MaxConcurrentSessionExecutionSize: 1000,
MaxConcurrentActivityExecutionSize: 1,
}
w := temporalsdk_worker.New(temporalClient, temporal.AmWorkerTaskQueue, workerOpts)
if err != nil {
logger.Error(err, "Error creating Temporal worker.")
os.Exit(1)
}

w.RegisterActivityWithOptions(
activities.NewDownloadActivity(wsvc).Execute,
temporalsdk_activity.RegisterOptions{Name: activities.DownloadActivityName},
)
w.RegisterActivityWithOptions(
activities.NewBundleActivity(wsvc).Execute,
temporalsdk_activity.RegisterOptions{Name: activities.BundleActivityName},
)
w.RegisterActivityWithOptions(
activities.NewZipActivity(logger).Execute, temporalsdk_activity.RegisterOptions{Name: activities.ZipActivityName},
)
w.RegisterActivityWithOptions(
am.NewUploadTransferActivity(logger, sftp.NewGoClient(logger, cfg.AM.SFTP)).Execute,
temporalsdk_activity.RegisterOptions{Name: am.UploadTransferActivityName},
)
w.RegisterActivityWithOptions(activities.NewCleanUpActivity().Execute, temporalsdk_activity.RegisterOptions{Name: activities.CleanUpActivityName})

g.Add(
func() error {
if err := w.Start(); err != nil {
return err
}
<-done
return nil
},
func(err error) {
w.Stop()
close(done)
},
)
}

// Observability server.
{
srv := &http.Server{
Addr: cfg.DebugListen,
ReadTimeout: time.Second * 1,
WriteTimeout: time.Second * 1,
IdleTimeout: time.Second * 30,
}

g.Add(func() error {
mux := http.NewServeMux()

// Health check.
mux.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
fmt.Fprintln(w, "OK")
})

// Prometheus metrics.
mux.Handle("/metrics", promhttp.Handler())

// Profiling data.
mux.HandleFunc("/debug/pprof/", pprof.Index)
mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
mux.HandleFunc("/debug/pprof/profile", pprof.Profile)
mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
mux.HandleFunc("/debug/pprof/trace", pprof.Trace)
mux.Handle("/debug/pprof/block", pprof.Handler("block"))
mux.Handle("/debug/pprof/goroutine", pprof.Handler("goroutine"))
mux.Handle("/debug/pprof/heap", pprof.Handler("heap"))
mux.Handle("/debug/pprof/threadcreate", pprof.Handler("threadcreate"))

srv.Handler = mux

return srv.ListenAndServe()
}, func(error) {
ctx, cancel := context.WithTimeout(ctx, time.Second*5)
defer cancel()
_ = srv.Shutdown(ctx)
})
}

// Signal handler.
{
var (
cancelInterrupt = make(chan struct{})
ch = make(chan os.Signal, 2)
)
defer close(ch)

g.Add(
func() error {
signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM)

select {
case <-ch:
case <-cancelInterrupt:
}

return nil
}, func(err error) {
logger.Info("Quitting...")
close(cancelInterrupt)
cancel()
signal.Stop(ch)
},
)
}

err = g.Run()
if err != nil {
logger.Error(err, "Application failure.")
os.Exit(1)
}
logger.Info("Bye!")
}
11 changes: 11 additions & 0 deletions enduro.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
debug = true
debugListen = "127.0.0.1:9001"
verbosity = 2
useArchivematica = false

[temporal]
namespace = "default"
Expand Down Expand Up @@ -84,6 +85,16 @@ PerformPolicyChecksOnPreservationDerivatives = true
AipCompressionLevel = 1
AipCompressionAlgorithm = 6

[am.sftp]
host = "" # The Archivematica Storage Service hostname.
user = ""
knownHostsFile = ""
remoteDir = "/transfer_source"

[am.sftp.privateKey]
path = ""
passphrase = "" # Secret: set (if required) with env var ENDURO_AM_SFTP_PRIVATEKEY_PASSPHRASE.

[upload]
endpoint = "http://minio.enduro-sdps:9000"
pathStyle = true
Expand Down
7 changes: 6 additions & 1 deletion hack/build_docker.sh
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,18 @@ case "$TARGET" in
TARGET="enduro-a3m-worker"
FOLDER="."
;;
"enduro-am-worker")
IMAGE_NAME="enduro-am-worker"
TARGET="enduro-am-worker"
FOLDER="."
;;
"enduro-dashboard")
IMAGE_NAME="enduro-dashboard"
TARGET="enduro-dashboard"
FOLDER="dashboard"
;;
*)
echo "Accepted values: enduro, enduro-a3m-worker, enduro-dashboard."
echo "Accepted values: enduro, enduro-a3m-worker, enduro-am-worker, enduro-dashboard."
exit 1
;;
esac
Expand Down
Loading

0 comments on commit c9ee533

Please sign in to comment.