Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(handler, s3store): implement ServerDataStore for direct content serving #1208

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ module github.com/tus/tusd/v2
// Specify the Go version needed for the Heroku deployment
// See https://github.com/heroku/heroku-buildpack-go#go-module-specifics
// +heroku goVersion go1.22
go 1.21.0
go 1.22.1

toolchain go1.22.7

require (
Expand Down
6 changes: 6 additions & 0 deletions pkg/handler/composer.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ type StoreComposer struct {
Concater ConcaterDataStore
UsesLengthDeferrer bool
LengthDeferrer LengthDeferrerDataStore
ContentServer ContentServerDataStore
UsesContentServer bool
}

// NewStoreComposer creates a new and empty store composer.
Expand Down Expand Up @@ -85,3 +87,7 @@ func (store *StoreComposer) UseLengthDeferrer(ext LengthDeferrerDataStore) {
store.UsesLengthDeferrer = ext != nil
store.LengthDeferrer = ext
}
// UseContentServer equips the composer with a store capable of serving
// upload content directly to HTTP responses. Passing nil clears the
// capability.
func (store *StoreComposer) UseContentServer(ext ContentServerDataStore) {
	store.ContentServer = ext
	store.UsesContentServer = ext != nil
}
11 changes: 11 additions & 0 deletions pkg/handler/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package handler
import (
"context"
"io"
"net/http"
)

type MetaData map[string]string
Expand Down Expand Up @@ -121,6 +122,16 @@ type DataStore interface {
GetUpload(ctx context.Context, id string) (upload Upload, err error)
}

// ServableUpload is implemented by uploads whose content the data store can
// write directly to an HTTP response (for example by streaming from object
// storage), bypassing the handler's generic GetReader path.
type ServableUpload interface {
// ServeContent writes the upload's content to w for the request r.
// Implementations are expected to set response headers and the status
// code themselves (including handling a Range header, if supported).
ServeContent(ctx context.Context, w http.ResponseWriter, r *http.Request) error
}

// ContentServerDataStore is the interface for data stores that can serve
// upload content directly via ServableUpload.
type ContentServerDataStore interface {
// AsServableUpload converts an upload obtained from this store into a
// ServableUpload. The upload must originate from the same store.
AsServableUpload(upload Upload) ServableUpload
}

type TerminatableUpload interface {
// Terminate an upload so any further requests to the upload resource will
// return the ErrNotFound error.
Expand Down
11 changes: 11 additions & 0 deletions pkg/handler/unrouted_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -1013,6 +1013,17 @@ func (handler *UnroutedHandler) GetFile(w http.ResponseWriter, r *http.Request)
return
}

// If the data store implements ContentServerDataStore, use the ServableUpload interface
if handler.composer.UsesContentServer {
servableUpload := handler.composer.ContentServer.AsServableUpload(upload)
err = servableUpload.ServeContent(c, w, r)
if err != nil {
handler.sendError(c, err)
}
return
}

// Fall back to the existing GetReader implementation if ContentServerDataStore is not implemented
contentType, contentDisposition := filterContentType(info)
resp := HTTPResponse{
StatusCode: http.StatusOK,
Expand Down
138 changes: 138 additions & 0 deletions pkg/s3store/s3store.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ import (
"net/http"
"os"
"regexp"
"strconv"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -376,6 +377,81 @@ func (store S3Store) AsConcatableUpload(upload handler.Upload) handler.Concatabl
return upload.(*s3Upload)
}

// AsServableUpload implements handler.ContentServerDataStore.
// NOTE(review): the unchecked type assertion panics if upload was not
// produced by this store — this matches the style of AsConcatableUpload above.
func (store S3Store) AsServableUpload(upload handler.Upload) handler.ServableUpload {
return upload.(*s3Upload)
}

// ServeContent streams the upload's object from S3 directly to the HTTP
// response. A single-range Range header is honored via handleRangeRequest;
// otherwise the whole object is copied to w with Content-Length, Content-Type
// (taken from the "filetype" metadata, when present) and ETag headers.
//
// Errors from GetInfo or GetObject are returned to the caller before any
// response bytes are written; an error from the final io.Copy may occur after
// headers have been sent.
func (su *s3Upload) ServeContent(ctx context.Context, w http.ResponseWriter, r *http.Request) error {
	// Fetch upload metadata (size, client-supplied file type, ...).
	info, err := su.GetInfo(ctx)
	if err != nil {
		return err
	}

	// Prepare GetObject input; handleRangeRequest may add a Range to it.
	input := &s3.GetObjectInput{
		Bucket: aws.String(su.store.Bucket),
		Key:    su.store.keyWithPrefix(su.objectId),
	}

	// Advertise byte-range support so clients know partial requests work.
	w.Header().Set("Accept-Ranges", "bytes")

	// Handle range requests separately.
	if rangeHeader := r.Header.Get("Range"); rangeHeader != "" {
		return su.handleRangeRequest(ctx, w, r, info, input, rangeHeader)
	}

	// For non-range requests, serve the entire object.
	result, err := su.store.Service.GetObject(ctx, input)
	if err != nil {
		return err
	}
	defer result.Body.Close()

	w.Header().Set("Content-Length", strconv.FormatInt(info.Size, 10))
	// Only emit Content-Type when the upload actually carries one; setting
	// an empty header value would be misleading.
	if filetype := info.MetaData["filetype"]; filetype != "" {
		w.Header().Set("Content-Type", filetype)
	}
	// ETag is a *string and can be nil (e.g. with some non-AWS S3
	// implementations); dereferencing unconditionally would panic.
	if result.ETag != nil {
		w.Header().Set("ETag", *result.ETag)
	}

	// Stream the object body to the client.
	_, err = io.Copy(w, result.Body)
	return err
}

// handleRangeRequest serves a single byte range of the upload's S3 object as
// a 206 Partial Content response. Unsatisfiable or multi-part ranges are
// answered directly with 416 Requested Range Not Satisfiable.
func (su *s3Upload) handleRangeRequest(ctx context.Context, w http.ResponseWriter, _ *http.Request, info handler.FileInfo, input *s3.GetObjectInput, rangeHeader string) error {
	ranges, err := parseRange(rangeHeader, info.Size)
	if err != nil {
		// A 416 must carry "Content-Range: bytes */size" (RFC 7233 §4.2).
		// We write the full error response here and return nil so the
		// caller does not attempt to write a second error response on top
		// of this one.
		w.Header().Set("Content-Range", fmt.Sprintf("bytes */%d", info.Size))
		http.Error(w, err.Error(), http.StatusRequestedRangeNotSatisfiable)
		return nil
	}

	// Serving multiple ranges would require a multipart/byteranges body,
	// which this implementation does not support.
	if len(ranges) > 1 {
		w.Header().Set("Content-Range", fmt.Sprintf("bytes */%d", info.Size))
		http.Error(w, "multiple ranges are not supported", http.StatusRequestedRangeNotSatisfiable)
		return nil
	}

	// Ask S3 for exactly the requested byte window.
	input.Range = aws.String(fmt.Sprintf("bytes=%d-%d", ranges[0].start, ranges[0].end))

	result, err := su.store.Service.GetObject(ctx, input)
	if err != nil {
		return err
	}
	defer result.Body.Close()

	// Headers for the partial-content response.
	w.Header().Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", ranges[0].start, ranges[0].end, info.Size))
	w.Header().Set("Content-Length", strconv.FormatInt(ranges[0].end-ranges[0].start+1, 10))
	if filetype := info.MetaData["filetype"]; filetype != "" {
		w.Header().Set("Content-Type", filetype)
	}
	// ETag can be nil with non-AWS S3 implementations; guard the deref.
	if result.ETag != nil {
		w.Header().Set("ETag", *result.ETag)
	}
	w.WriteHeader(http.StatusPartialContent)

	// Stream the selected byte range to the client.
	_, err = io.Copy(w, result.Body)
	return err
}

func (upload *s3Upload) writeInfo(ctx context.Context, info handler.FileInfo) error {
store := upload.store

Expand Down Expand Up @@ -1249,3 +1325,65 @@ func (store S3Store) releaseUploadSemaphore() {
store.uploadSemaphore.Release()
store.uploadSemaphoreDemandMetric.Dec()
}

// parseRange parses an HTTP Range header value ("bytes=start-end[,...]")
// against a resource of the given size in bytes. It returns one entry per
// byte-range-spec with inclusive start and end offsets, or an error when the
// header is malformed or no spec is satisfiable.
//
// Per RFC 7233 §2.1, an explicit end beyond the last byte is clamped to
// size-1, and a suffix spec ("-n") selects the final n bytes. A suffix of
// zero bytes, or any range against an empty resource, is rejected.
func parseRange(rangeHeader string, size int64) ([]struct{ start, end int64 }, error) {
	if rangeHeader == "" {
		return nil, fmt.Errorf("empty range header")
	}

	const prefix = "bytes="
	if !strings.HasPrefix(rangeHeader, prefix) {
		return nil, fmt.Errorf("invalid range header format")
	}

	var ranges []struct{ start, end int64 }
	for _, spec := range strings.Split(rangeHeader[len(prefix):], ",") {
		spec = strings.TrimSpace(spec)
		if spec == "" {
			continue
		}
		startStr, endStr, found := strings.Cut(spec, "-")
		if !found {
			return nil, fmt.Errorf("invalid range format")
		}
		startStr, endStr = strings.TrimSpace(startStr), strings.TrimSpace(endStr)

		var r struct{ start, end int64 }
		if startStr == "" {
			// suffix-byte-range-spec, like "-100": the last n bytes.
			n, err := strconv.ParseInt(endStr, 10, 64)
			if err != nil || n <= 0 {
				// "-0" selects nothing and is invalid per RFC 7233.
				return nil, fmt.Errorf("invalid range format")
			}
			if n > size {
				n = size
			}
			if n == 0 {
				// size == 0: no byte can be served.
				return nil, fmt.Errorf("range out of bounds")
			}
			r.start = size - n
			r.end = size - 1
		} else {
			start, err := strconv.ParseInt(startStr, 10, 64)
			if err != nil {
				return nil, fmt.Errorf("invalid range format")
			}
			if start >= size {
				return nil, fmt.Errorf("range out of bounds")
			}
			r.start = start
			if endStr == "" {
				// byte-range-spec like "100-": through the last byte.
				r.end = size - 1
			} else {
				end, err := strconv.ParseInt(endStr, 10, 64)
				if err != nil || end < r.start {
					return nil, fmt.Errorf("invalid range format")
				}
				// Clamp an over-long end to the last byte (RFC 7233 §2.1).
				if end >= size {
					end = size - 1
				}
				r.end = end
			}
		}
		ranges = append(ranges, r)
	}
	if len(ranges) == 0 {
		return nil, fmt.Errorf("no valid ranges")
	}
	return ranges, nil
}
Loading