Skip to content

Commit

Permalink
Add an activity to poll AM transfer status
Browse files Browse the repository at this point in the history
- Add a PollTransferActivity to poll AM until transfer processing is
  complete or fails
- Add a PollInterval config option to control frequency of polls
- Send a heartbeat after each poll, and set a maximum time between
  heartbeats
- Add a TransferDeadline config option that limits the total duration
  of the PollTransferActivity in case Archivematica processing stalls
- Improve error handling for AM HTTP errors based on the legacy Enduro
  error handling
- Use go.artefactual.dev/tools/temporal for creating and checking
  non-retryable temporal errors

Co-authored-by: Diogenesoftoronto <[email protected]>
  • Loading branch information
djjuhasz and Diogenesoftoronto committed Nov 27, 2023
1 parent 5a6d045 commit 59e018c
Show file tree
Hide file tree
Showing 17 changed files with 452 additions and 43 deletions.
4 changes: 4 additions & 0 deletions cmd/enduro-am-worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,10 @@ func main() {
am.NewStartTransferActivity(logger, &cfg.AM, amc.Package).Execute,
temporalsdk_activity.RegisterOptions{Name: am.StartTransferActivityName},
)
w.RegisterActivityWithOptions(
am.NewPollTransferActivity(logger, &cfg.AM, amc.Transfer).Execute,
temporalsdk_activity.RegisterOptions{Name: am.PollTransferActivityName},
)
w.RegisterActivityWithOptions(
activities.NewCleanUpActivity().Execute,
temporalsdk_activity.RegisterOptions{Name: activities.CleanUpActivityName},
Expand Down
9 changes: 9 additions & 0 deletions enduro.toml
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,15 @@ user = "" # Secret: set with env var ENDURO_AM_USER.
apiKey = "" # Secret: set with env var ENDURO_AM_APIKEY.
processingConfig = "automated"

# pollInterval is the time to wait between AM polling requests in a string
# format compatible with https://pkg.go.dev/time#ParseDuration (Default: 10s).
pollInterval = "10s"

# transferDeadline is the maximum time to wait for a transfer to complete in a
# format compatible with https://pkg.go.dev/time#ParseDuration. Set to "0" for
# no time limit.
transferDeadline = "1h"

[am.sftp]
host = "" # The Archivematica Storage Service hostname.
user = ""
Expand Down
13 changes: 12 additions & 1 deletion internal/am/config.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package am

import "github.com/artefactual-sdps/enduro/internal/sftp"
import (
"time"

"github.com/artefactual-sdps/enduro/internal/sftp"
)

type Config struct {
// Archivematica server address.
Expand All @@ -17,4 +21,11 @@ type Config struct {

// SFTP configuration for uploading transfers to Archivematica.
SFTP sftp.Config

// PollInterval is the time to wait between poll requests to the AM API.
PollInterval time.Duration

// TransferDeadline is the maximum time to wait for a transfer to complete.
// Set to zero for no deadline.
TransferDeadline time.Duration
}
37 changes: 24 additions & 13 deletions internal/am/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,39 @@ package am

import (
"errors"
"fmt"
"net/http"

"go.artefactual.dev/amclient"

"github.com/artefactual-sdps/enduro/internal/temporal"
temporal_tools "go.artefactual.dev/tools/temporal"
)

// ConvertAMClientError converts an Archivematica API response to a
// non-retryable temporal ApplicationError if the response is guaranteed not to
// change in subsequent requests.
func convertAMClientError(resp *amclient.Response, err error) error {
if resp != nil {
switch resp.Response.StatusCode {
case http.StatusUnauthorized:
return temporal.NonRetryableError(errors.New("invalid Archivematica credentials"))
case http.StatusForbidden:
return temporal.NonRetryableError(errors.New("insufficient Archivematica permissions"))
case http.StatusNotFound:
return temporal.NonRetryableError(errors.New("Archivematica transfer not found"))
}
if resp == nil || resp.Response == nil {
return err
}

switch {
case resp.Response.StatusCode == http.StatusUnauthorized:
return temporal_tools.NewNonRetryableError(errors.New("invalid Archivematica credentials"))
case resp.Response.StatusCode == http.StatusForbidden:
return temporal_tools.NewNonRetryableError(errors.New("insufficient Archivematica permissions"))
case resp.Response.StatusCode == http.StatusNotFound:
return temporal_tools.NewNonRetryableError(errors.New("Archivematica resource not found"))
// Archivematica returns a "400 Bad request" code when transfer or ingest
// status are "in progress", so the request should be retried.
case resp.Response.StatusCode == http.StatusBadRequest:
return errors.New("Archivematica response: 400 Bad request")
// All status codes between 401 and 499 are non-retryable.
case resp.Response.StatusCode >= 401 && resp.Response.StatusCode < 500:
return temporal_tools.NewNonRetryableError(
fmt.Errorf("Archivematica error: %s", resp.Response.Status),
)
}

// Retry any client requests that don't return one of the above responses.
return err
// Retry any requests that don't return one of the above status codes.
return fmt.Errorf("Archivematica error: %s", resp.Response.Status)
}
144 changes: 144 additions & 0 deletions internal/am/poll_transfer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
package am

import (
context "context"
"errors"
"fmt"
"net/http"
"time"

"github.com/go-logr/logr"
"go.artefactual.dev/amclient"
temporal_tools "go.artefactual.dev/tools/temporal"
temporalsdk_activity "go.temporal.io/sdk/activity"
)

const PollTransferActivityName = "poll-transfer-activity"

var errWorkOngoing = errors.New("work ongoing")

type PollTransferActivityParams struct {
TransferID string
}

type PollTransferActivity struct {
logger logr.Logger
cfg *Config
amts amclient.TransferService
}

type PollTransferActivityResult struct {
SIPID string
Path string
}

func NewPollTransferActivity(logger logr.Logger, cfg *Config, amts amclient.TransferService) *PollTransferActivity {
return &PollTransferActivity{logger: logger, cfg: cfg, amts: amts}
}

// Execute polls Archivematica for the status of a transfer and returns when
// the transfer is complete or returns an error status. Execute sends an
// activity heartbeat after each poll.
//
// A transfer status of "REJECTED", "FAILED", "USER_INPUT", or "BACKLOG" returns
// a temporal.NonRetryableApplicationError to indicate that processing can not
// continue.
func (a *PollTransferActivity) Execute(ctx context.Context, params *PollTransferActivityParams) (*PollTransferActivityResult, error) {
a.logger.V(1).Info("Executing PollTransferActivity", "TransferID", params.TransferID)

ticker := time.NewTicker(a.cfg.PollInterval)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-ticker.C:
reqCtx, cancel := context.WithTimeout(ctx, a.cfg.PollInterval/2)
r, err := a.poll(reqCtx, params.TransferID, cancel)
if err == errWorkOngoing {
// Send a heartbeat then continue polling after the poll interval.
temporalsdk_activity.RecordHeartbeat(ctx, fmt.Sprintf("Last HTTP response: %v", errWorkOngoing))
continue
}
if err != nil {
return nil, err
}

return r, nil
}
}
}

// poll polls the Archivematica transfer status endpoint, returning a non-nil
// result if transfer processing is complete. An errWorkOngoing error indicates
// work is ongoing and polling should continue. All other errors should
// terminate polling.
func (a *PollTransferActivity) poll(ctx context.Context, transferID string, cancel context.CancelFunc) (*PollTransferActivityResult, error) {
// Cancel the context timer when we return so it doesn't wait for the
// timeout deadline to expire.
defer cancel()

resp, httpResp, err := a.amts.Status(ctx, transferID)
if ferr := transferFailedError(httpResp, err); ferr != nil {
a.logger.V(2).Info("Poll transfer error",
"StatusCode", httpResp.StatusCode,
"Status", httpResp.Status,
)
return nil, ferr
}

complete, err := isComplete(resp)
if err != nil {
return nil, err
}
if complete {
return &PollTransferActivityResult{SIPID: resp.SIPID, Path: resp.Path}, nil
}

return nil, errWorkOngoing
}

// transferFailedError checks an amclient error to determine if the transfer has
// failed, or if it is still processing (which returns a 400 status code). If an
// error is returned the activity should return the error, which may or may not
// be a non-retryable error.
func transferFailedError(r *amclient.Response, err error) error {
if err == nil {
return nil
}

// AM can return a "400 Bad request" HTTP status code while processing, in
// which case we should continue polling.
if r != nil && r.Response.StatusCode == http.StatusBadRequest {
return nil
}

return convertAMClientError(r, err)
}

// isComplete checks the AM transfer status response to determine if
// processing has completed successfully. A non-nil error indicates then AM has
// ended processing with a failure or requires user input, and Enduro processing
// should stop. If error is nil then a true result indicates the transfer has
// completed successfully, and a false result means the transfer is still
// processing.
func isComplete(resp *amclient.TransferStatusResponse) (bool, error) {
if resp == nil {
return false, nil
}

switch resp.Status {
case "COMPLETE":
if resp.SIPID == "BACKLOG" {
return false, temporal_tools.NewNonRetryableError(errors.New("Archivematica SIP sent to backlog"))
}
return true, nil
case "PROCESSING", "":
return false, nil
case "REJECTED", "FAILED", "USER_INPUT":
return false, temporal_tools.NewNonRetryableError(fmt.Errorf("Invalid Archivematica transfer status: %s", resp.Status))
default:
return false, temporal_tools.NewNonRetryableError(fmt.Errorf("Unknown Archivematica transfer status: %s", resp.Status))
}
}
Loading

0 comments on commit 59e018c

Please sign in to comment.