Skip to content

Commit

Permalink
Merge pull request #28 from nvaatstra/s3-folders
Browse files Browse the repository at this point in the history
S3: GlobalPrefix and Recursive
  • Loading branch information
wojas authored Feb 22, 2023
2 parents 8b71881 + 77fb204 commit 75237ca
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 4 deletions.
40 changes: 38 additions & 2 deletions backends/s3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,14 @@ type Options struct {
// CreateBucket tells us to try to create the bucket
CreateBucket bool `yaml:"create_bucket"`

// GlobalPrefix is a prefix applied to all operations, allowing work within a prefix
// seamlessly
GlobalPrefix string `yaml:"global_prefix"`

// PrefixFolders can be enabled to make List operations show nested prefixes as folders
// instead of recursively listing all contents of nested prefixes
PrefixFolders bool `yaml:"prefix_folders"`

// EndpointURL can be set to something like "http://localhost:9000" when using Minio
// or "https://s3.amazonaws.com" for AWS S3.
EndpointURL string `yaml:"endpoint_url"`
Expand Down Expand Up @@ -110,6 +118,9 @@ type Backend struct {
}

func (b *Backend) List(ctx context.Context, prefix string) (simpleblob.BlobList, error) {
// Prepend global prefix
prefix = b.prependGlobalPrefix(prefix)

if !b.opt.UseUpdateMarker {
return b.doList(ctx, prefix)
}
Expand Down Expand Up @@ -150,9 +161,12 @@ func (b *Backend) List(ctx context.Context, prefix string) (simpleblob.BlobList,
func (b *Backend) doList(ctx context.Context, prefix string) (simpleblob.BlobList, error) {
var blobs simpleblob.BlobList

// Runes to strip from blob names for GlobalPrefix
gpEndRune := len(b.opt.GlobalPrefix)

objCh := b.client.ListObjects(ctx, b.opt.Bucket, minio.ListObjectsOptions{
Prefix: prefix,
Recursive: false,
Recursive: !b.opt.PrefixFolders,
})
for obj := range objCh {
// Handle error returned by MinIO client
Expand All @@ -166,7 +180,14 @@ func (b *Backend) doList(ctx context.Context, prefix string) (simpleblob.BlobLis
if obj.Key == UpdateMarkerFilename {
continue
}
blobs = append(blobs, simpleblob.Blob{Name: obj.Key, Size: obj.Size})

// Strip global prefix from blob
blobName := obj.Key
if gpEndRune > 0 {
blobName = blobName[gpEndRune:]
}

blobs = append(blobs, simpleblob.Blob{Name: blobName, Size: obj.Size})
}

// Minio appears to return them sorted, but maybe not all implementations
Expand All @@ -179,6 +200,9 @@ func (b *Backend) doList(ctx context.Context, prefix string) (simpleblob.BlobLis
// Load retrieves the content of the object identified by name from S3 Bucket
// configured in b.
func (b *Backend) Load(ctx context.Context, name string) ([]byte, error) {
// Prepend global prefix
name = b.prependGlobalPrefix(name)

metricCalls.WithLabelValues("load").Inc()
metricLastCallTimestamp.WithLabelValues("load").SetToCurrentTime()

Expand All @@ -199,6 +223,9 @@ func (b *Backend) Load(ctx context.Context, name string) ([]byte, error) {
// Store sets the content of the object identified by name to the content
// of data, in the S3 Bucket configured in b.
func (b *Backend) Store(ctx context.Context, name string, data []byte) error {
// Prepend global prefix
name = b.prependGlobalPrefix(name)

info, err := b.doStore(ctx, name, data)
if err != nil {
return err
Expand All @@ -222,6 +249,9 @@ func (b *Backend) doStore(ctx context.Context, name string, data []byte) (minio.
// Delete removes the object identified by name from the S3 Bucket
// configured in b.
func (b *Backend) Delete(ctx context.Context, name string) error {
// Prepend global prefix
name = b.prependGlobalPrefix(name)

if err := b.doDelete(ctx, name); err != nil {
return err
}
Expand Down Expand Up @@ -370,6 +400,12 @@ func convertMinioError(err error) error {
return nil
}

// prependGlobalPrefix prepends the GlobalPrefix to the name/prefix
// passed as input
func (b *Backend) prependGlobalPrefix(name string) string {
return b.opt.GlobalPrefix + name
}

func init() {
simpleblob.RegisterBackend("s3", func(ctx context.Context, p simpleblob.InitParams) (simpleblob.Interface, error) {
var opt Options
Expand Down
56 changes: 54 additions & 2 deletions backends/s3/s3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,13 @@ func getBackend(ctx context.Context, t *testing.T) (b *Backend) {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

blobs, err := b.doList(ctx, "")
blobs, err := b.List(ctx, "")
if err != nil {
t.Logf("Blobs list error: %s", err)
return
}
for _, blob := range blobs {
err := b.client.RemoveObject(ctx, b.opt.Bucket, blob.Name, minio.RemoveObjectOptions{})
err := b.Delete(ctx, blob.Name)
if err != nil {
t.Logf("Object delete error: %s", err)
}
Expand Down Expand Up @@ -101,3 +101,55 @@ func TestBackend_marker(t *testing.T) {
assert.NoError(t, err)
assert.EqualValues(t, b.lastMarker, markerFileContent)
}

func TestBackend_globalprefix(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

b := getBackend(ctx, t)
b.opt.GlobalPrefix = "v5/"

tester.DoBackendTests(t, b)
assert.Len(t, b.lastMarker, 0)
}

func TestBackend_recursive(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

b := getBackend(ctx, t)

// Starts empty
ls, err := b.List(ctx, "")
assert.NoError(t, err)
assert.Len(t, ls, 0)

// Add items
err = b.Store(ctx, "bar-1", []byte("bar1"))
assert.NoError(t, err)
err = b.Store(ctx, "bar-2", []byte("bar2"))
assert.NoError(t, err)
err = b.Store(ctx, "foo/bar-3", []byte("bar3"))
assert.NoError(t, err)

// List all - PrefixFolders disabled (default)
ls, err = b.List(ctx, "")
assert.NoError(t, err)
assert.Equal(t, ls.Names(), []string{"bar-1", "bar-2", "foo/bar-3"})

// List all - PrefixFolders enabled
b.opt.PrefixFolders = true

ls, err = b.List(ctx, "")
assert.NoError(t, err)
assert.Equal(t, ls.Names(), []string{"bar-1", "bar-2", "foo/"})

// List all - PrefixFolders disabled
b.opt.PrefixFolders = false

ls, err = b.List(ctx, "")
assert.NoError(t, err)
assert.Equal(t, ls.Names(), []string{"bar-1", "bar-2", "foo/bar-3"})

assert.Len(t, b.lastMarker, 0)
}

0 comments on commit 75237ca

Please sign in to comment.