Skip to content

Commit

Permalink
s3store: Fix data race in concatUsingMultipart
Browse files Browse the repository at this point in the history
  • Loading branch information
Acconut committed Oct 4, 2024
1 parent 94a2c11 commit ce3d0fc
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 16 deletions.
1 change: 1 addition & 0 deletions pkg/s3store/multi_error.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"errors"
)

// TODO: Replace with errors.Join
func newMultiError(errs []error) error {
message := "Multiple errors occurred:\n"
for _, err := range errs {
Expand Down
26 changes: 10 additions & 16 deletions pkg/s3store/s3store.go
Original file line number Diff line number Diff line change
Expand Up @@ -974,13 +974,10 @@ func (upload *s3Upload) concatUsingDownload(ctx context.Context, partialUploads
func (upload *s3Upload) concatUsingMultipart(ctx context.Context, partialUploads []handler.Upload) error {
store := upload.store

numPartialUploads := len(partialUploads)
errs := make([]error, 0, numPartialUploads)

// Copy partial uploads concurrently
var wg sync.WaitGroup
wg.Add(numPartialUploads)
var eg errgroup.Group
for i, partialUpload := range partialUploads {

// Part numbers must be in the range of 1 to 10000, inclusive. Since
// slice indexes start at 0, we add 1 to ensure that i >= 1.
partNumber := int32(i + 1)
Expand All @@ -992,29 +989,26 @@ func (upload *s3Upload) concatUsingMultipart(ctx context.Context, partialUploads
etag: "",
})

go func(partNumber int32, sourceObject string) {
defer wg.Done()

eg.Go(func() error {
res, err := store.Service.UploadPartCopy(ctx, &s3.UploadPartCopyInput{
Bucket: aws.String(store.Bucket),
Key: store.keyWithPrefix(upload.objectId),
UploadId: aws.String(upload.multipartId),
PartNumber: aws.Int32(partNumber),
CopySource: aws.String(store.Bucket + "/" + *store.keyWithPrefix(sourceObject)),
CopySource: aws.String(store.Bucket + "/" + *store.keyWithPrefix(partialS3Upload.objectId)),
})
if err != nil {
errs = append(errs, err)
return
return err
}

upload.parts[partNumber-1].etag = *res.CopyPartResult.ETag
}(partNumber, partialS3Upload.objectId)
return nil
})
}

wg.Wait()

if len(errs) > 0 {
return newMultiError(errs)
err := eg.Wait()
if err != nil {
return err
}

return upload.FinishUpload(ctx)
Expand Down

0 comments on commit ce3d0fc

Please sign in to comment.