
Threaded Python and CLI client library for AWS S3, Google Cloud Storage (GCS), in-memory, and the local filesystem.


seung-lab/cloud-files


CloudFiles: Fast access to cloud storage and local FS.

from cloudfiles import CloudFiles, CloudFile, dl

results = dl(["gs://bucket/file1", "gs://bucket2/file2", ... ]) # shorthand

cf = CloudFiles('gs://bucket', progress=True) # s3://, https://, and file:// also supported
results = cf.get(['file1', 'file2', 'file3', ..., 'fileN']) # threaded
results = cf.get(paths, parallel=2) # threaded and two processes
file1 = cf['file1']
part  = cf['file1', 0:30] # first 30 bytes of file1

cf.put('filename', content)
cf.put_json('filename', content)
cf.puts([{
    'path': 'filename',
    'content': content,
}, ... ]) # automatically threaded
cf.puts(content, parallel=2) # threaded + two processes
cf.puts(content, storage_class="NEARLINE") # apply vendor-specific storage class

cf.put_jsons(...) # same as puts
cf['filename'] = content

for fname in cf.list(prefix='abc123'):
    print(fname)
list(cf) # same as list(cf.list())

cf.delete('filename')
del cf['filename']
cf.delete([ 'filename_1', 'filename_2', ... ]) # threaded
cf.delete(paths, parallel=2) # threaded + two processes

boolean = cf.exists('filename')
results = cf.exists([ 'filename_1', ... ]) # threaded

# for single files
cf = CloudFile("gs://bucket/file1")
info = cf.head()
binary = cf.get()
cf.put(binary)
cf[:30] # get first 30 bytes of file

CloudFiles was developed to access files from object storage without ever touching disk. The goal was to reliably and rapidly access a petabyte of image data broken down into tens to hundreds of millions of files being accessed in parallel across thousands of cores. CloudFiles has been used to process dozens of images, many of which were in the hundreds-of-terabytes range. It has reliably read and written tens of billions of files to date.

Highlights

  1. Fast file access with transparent threading and optionally multi-process access w/ local file locking.
  2. Supports Google Cloud Storage, Amazon S3, local filesystems, and arbitrary web servers, making hybrid or multi-cloud workflows easy.
  3. Robust to flaky network connections. Uses exponential random window retries to avoid network collisions on a large cluster. Validates md5 for gcs and s3.
  4. gzip, brotli, bz2, zstd, and xz compression.
  5. Supports HTTP Range reads.
  6. Supports green threads, which are important for achieving maximum performance on virtualized servers.
  7. High efficiency transfers that avoid compression/decompression cycles.
  8. High speed gzip decompression using libdeflate (compared with zlib).
  9. Bundled CLI tool.
  10. Accepts iterator and generator input.
  11. Resumable bulk transfers.
  12. Supports composite parallel upload for GCS and multi-part upload for AWS S3.
  13. Supports s3 and GCS internal copies to avoid unnecessary data movement.

Installation

pip install cloud-files
pip install cloud-files[test] # to enable testing with pytest

If you run into trouble installing dependencies, make sure you're using at least Python 3.6 and that you have updated pip. On Linux, some dependencies require manylinux2010 or manylinux2014 binaries, which earlier versions of pip do not search for. MacOS, Linux, and Windows are supported platforms.

Credentials

You may wish to install credentials under ~/.cloudvolume/secrets. CloudFiles is descended from CloudVolume, and for now we'll leave the same configuration structure in place.

You need credentials only for the services you'll use. The local filesystem doesn't need any. Google Storage (setup instructions here) will attempt to use default account credentials if no service account is provided.

If neither of those two conditions applies, you need a service account credential. google-secret.json is a service account credential for Google Storage, aws-secret.json is a service account for S3, etc. You can support multiple projects at once by prefixing the credential filename with the name of the bucket you are planning to access. google-secret.json will be your default service account, but if you also want to access bucket ABC, you can provide ABC-google-secret.json and you'll have simultaneous access to your ordinary buckets and ABC. The secondary credentials are accessed on the basis of the bucket name, not the project name.

mkdir -p ~/.cloudvolume/secrets/
mv aws-secret.json ~/.cloudvolume/secrets/ # needed for Amazon
mv google-secret.json ~/.cloudvolume/secrets/ # needed for Google
mv matrix-secret.json ~/.cloudvolume/secrets/ # needed for Matrix

aws-secret.json and matrix-secret.json

Create an IAM user service account that can read, write, and delete objects from at least one bucket.

{
  "AWS_ACCESS_KEY_ID": "$MY_AWS_ACCESS_KEY_ID",
  "AWS_SECRET_ACCESS_KEY": "$MY_SECRET_ACCESS_TOKEN",
  "AWS_DEFAULT_REGION": "$MY_AWS_REGION" // defaults to us-east-1
}

google-secret.json

You can create the google-secret.json file here. You don't need to fill in the JSON by hand; the example below is provided to show you what the end result should look like. You should be able to read, write, and delete objects from at least one bucket.

{
  "type": "service_account",
  "project_id": "$YOUR_GOOGLE_PROJECT_ID",
  "private_key_id": "...",
  "private_key": "...",
  "client_email": "...",
  "client_id": "...",
  "auth_uri": "https://accounts.google.com/o/oauth2/auth",
  "token_uri": "https://accounts.google.com/o/oauth2/token",
  "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
  "client_x509_cert_url": ""
}

API Documentation

Note that the "Cloud Costs" mentioned below are current as of June 2020 and are subject to change. As of this writing, S3 and Google use identical cost structures for these operations.

Constructor

# import gevent.monkey # uncomment when using green threads
# gevent.monkey.patch_all(thread=False)
from cloudfiles import CloudFiles

cf = CloudFiles(
    cloudpath, progress=False, 
    green=None, secrets=None, num_threads=20,
    use_https=False, endpoint=None, request_payer=None,
    composite_upload_threshold = int(1e8)
)

# cloudpath examples:
cf = CloudFiles('gs://bucket/') # google cloud storage
cf = CloudFiles('s3://bucket/') # Amazon S3
cf = CloudFiles('s3://https://s3emulator.com/coolguy/') # alternate s3 endpoint
cf = CloudFiles('file:///home/coolguy/') # local filesystem
cf = CloudFiles('mem:///home/coolguy/') # in memory
cf = CloudFiles('https://website.com/coolguy/') # arbitrary web server
  • cloudpath: The path to the bucket you are accessing. The path is formatted as $PROTOCOL://BUCKET/PATH. Files will then be accessed relative to the path. The protocols supported are gs (GCS), s3 (AWS S3), file (local FS), mem (RAM), and http/https.
  • progress: Whether to display a progress bar when processing multiple items simultaneously.
  • green: Use green threads. For this to work properly, you must uncomment the top two lines. Green threads are used automatically upon monkey patching if green is None.
  • secrets: Provide secrets dynamically rather than fetching from the credentials directory $HOME/.cloudvolume/secrets.
  • num_threads: Number of simultaneous requests to make. Usually 20 per core is pretty close to optimal unless file sizes are extreme.
  • use_https: gs:// and s3:// require credentials to access their files. However, each has a read-only https endpoint that sometimes requires no credentials. If True, automatically convert gs:// to https://storage.googleapis.com/ and s3:// to https://s3.amazonaws.com/.
  • endpoint: (s3 only) provide an alternate endpoint to the official Amazon servers. This is useful for accessing the various S3 emulators offered by on-premises deployments of object storage.
  • request_payer: Specify the account that should be charged for requests towards the bucket, rather than the bucket owner.
    • gs://: request_payer can be any Google Cloud project id. Please refer to the documentation for more information.
    • s3://: request_payer must be "requester". The AWS account associated with the AWS_ACCESS_KEY_ID will be charged. Please refer to the documentation for more information.

Using mem:// instead of a plain dict has two advantages: your code uses the same interface as for every other backend, and compression is applied automatically.

get / get_json

# Let 'filename' be the file b'hello world'

binary = cf.get('filename')
binary = cf['filename']
>> b'hello world'

compressed_binary = cf.get('filename', raw=True) 

binaries = cf.get(['filename1', 'filename2'])
>> [ { 'path': 'filename1', 'content': b'...', 'byte_range': (None, None), 'error': None }, { 'path': 'filename2', 'content': b'...', 'byte_range': (None, None), 'error': None } ]

# total provides info for progress bar when using generators.
binaries = cf.get(generator(), total=N) 

binary = cf.get({ 'path': 'filename', 'start': 0, 'end': 5 }) # fetches 5 bytes
binary = cf['filename', 0:5] # only fetches 5 bytes
binary = cf['filename'][0:5] # same result, fetches 11 bytes
>> b'hello' # represents byte range 0-4 inclusive of filename

binaries = cf[:100] # download the first 100 files

get supports several different styles of input. The simplest takes a scalar filename and returns the contents of that file. However, you can also specify lists of filenames, a byte range request, and lists of byte range requests. You can provide a generator or iterator as input as well. Order is not guaranteed.
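The byte range arithmetic in the examples above can be sketched as follows. This is a minimal illustration, not CloudFiles' internal code: `range_header` is a hypothetical helper that maps the half-open `start`/`end` pair used in the examples onto an HTTP Range header, whose byte positions are inclusive on both ends.

```python
def range_header(start=None, end=None):
    """Convert a half-open [start, end) byte request into an HTTP Range value.

    Hypothetical helper for illustration only. HTTP ranges are inclusive,
    so start=0, end=5 becomes bytes 0 through 4.
    """
    if start is None and end is None:
        return None  # whole file requested, no Range header needed
    first = 0 if start is None else start
    if end is None:
        return f"bytes={first}-"  # open-ended: from `first` to end of file
    if end <= first:
        raise ValueError("end must be greater than start")
    return f"bytes={first}-{end - 1}"


content = b"hello world"
print(range_header(0, 5))  # bytes=0-4
print(content[0:5])        # b'hello'
```

This is why `cf['filename', 0:5]` is described as fetching 5 bytes that represent the inclusive byte range 0-4.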

When more than one file is provided at once, the download will be threaded using preemptive or cooperative (green) threads depending on the green setting. If progress is set to true, a progress bar will be displayed that counts down the number of files to download.

get_json is the same as get but it will parse the returned binary as JSON data encoded as utf8 and returns a dictionary. Order is guaranteed.

Cloud Cost: Usually about $0.40 per million requests.

put / puts / put_json / put_jsons

cf.put('filename', b'content')
cf['filename'] = b'content'
cf.put_json('digits', [1,2,3,4,5])

cf.puts([{ 
   'path': 'filename',
   'content': b'...',
   'content_type': 'application/octet-stream',
   'compress': 'gzip',
   'compression_level': 6, # parameter for gzip or brotli compressor
   'cache_control': 'no-cache',
}])

cf.puts([ (path, content), (path, content) ], compress='gzip')
cf.put_jsons(...)

# Definition of put, put_json is identical
def put(
    self, 
    path, content,     
    content_type=None, compress=None, 
    compression_level=None, cache_control=None,
    raw=False
)

# Definition of puts, put_jsons is identical
def puts(
    self, files, 
    content_type=None, compress=None, 
    compression_level=None, cache_control=None,
    total=None, raw=False
)

The PUT operation is the most complex operation because it's so configurable. Sometimes you want one file, sometimes many. Sometimes you want to configure each file individually, sometimes you want to standardize a bulk upload. Sometimes it's binary data, but oftentimes it's JSON. We provide a simpler interface for uploading a single file (put and put_json, singular) versus the interface for uploading possibly many files (puts and put_jsons, plural).

In order to upload many files at once (which is much faster due to threading), you need to minimally provide the path and content for each file. This can be done either as a dict containing those fields or as a tuple (path, content). If dicts are used, the fields (if present) specified in the dict take precedence over the parameters of the function. You can mix tuples with dicts. The input to puts can be a scalar (a single dict or tuple) or an iterable such as a list, iterator, or generator.
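The precedence rule described above can be sketched in a few lines. This is an illustration under stated assumptions, not the library's implementation: `normalize_puts` is a hypothetical helper, and it covers only three of the keyword parameters.

```python
def normalize_puts(files, content_type=None, compress=None, cache_control=None):
    """Yield one fully specified dict per file (hypothetical helper).

    Accepts a single dict, a single (path, content) tuple, or any iterable
    mixing the two, mirroring the input styles described in the text.
    """
    # A scalar dict or tuple is treated as a one-item batch.
    if isinstance(files, (dict, tuple)):
        files = [files]
    defaults = {
        "content_type": content_type,
        "compress": compress,
        "cache_control": cache_control,
    }
    for item in files:
        if isinstance(item, tuple):
            path, content = item
            item = {"path": path, "content": content}
        # Fields present in the per-file dict override the function defaults.
        yield {**defaults, **item}


batch = list(normalize_puts(
    [("a.bin", b"1"), {"path": "b.bin", "content": b"2", "compress": "br"}],
    compress="gzip",
))
print([f["compress"] for f in batch])  # ['gzip', 'br']
```

Because the helper is a generator, it consumes lists, iterators, and generators lazily, which matches the statement that puts accepts any iterable.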

Cloud Cost: Usually about $5 per million files.

delete

cf.delete('filename')
cf.delete([ 'file1', 'file2', ... ])
del cf['filename']

This will issue a delete request for each file specified in a threaded fashion.

Cloud Cost: Usually free.

exists

cf.exists('filename') 
>> True # or False

cf.exists([ 'file1', 'file2', ... ]) 
>> { 'file1': True, 'file2': False, ... }

Scalar input results in a simple boolean output while iterable input returns a dictionary of input paths mapped to whether they exist. In iterable mode, a progress bar may be displayed and threading is utilized to improve performance.

Cloud Cost: Usually about $0.40 per million requests.

size

cf.size('filename')
>>> 1337 # size in bytes

cf.size([ 'file1', 'nonexistent', 'empty_file', ... ])
>>> { 'file1': 392, 'nonexistent': None, 'empty_file': 0, ... }

The output is the size in bytes of each file as it is stored. If the file doesn't exist, None is returned. Scalar input results in a single integer (or None) while iterable input returns a dictionary of input paths mapped to their sizes. In iterable mode, a progress bar may be displayed and threading is utilized to improve performance.

Cloud Cost: Usually about $0.40 per million requests.

list

cf.list() # returns generator
list(cf) # same as list(cf.list())
cf.list(prefix="abc")
cf.list(prefix="abc", flat=True)

Recall that in object storage, directories do not really exist and file paths are really a key-value mapping. The list operator will list everything under the cloudpath given in the constructor. The prefix operator allows you to efficiently filter some of the results. If flat is specified, the results will be filtered to return only a single "level" of the "directory" even though directories are fake. The entire set of all subdirectories will still need to be fetched.
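To make the prefix and flat behavior concrete, here is a simplified sketch over an in-memory set of keys. `list_keys` is a hypothetical function, and it is an assumption that flat simply drops any key with a further "/" after the prefix; the real backends may also report the intermediate "directories".

```python
def list_keys(keys, prefix="", flat=False):
    """Yield keys under `prefix`; with flat=True, keep only a single level.

    Hypothetical sketch: object stores hold a flat key-value mapping, so a
    "directory level" is just the absence of another "/" after the prefix.
    """
    for key in sorted(keys):
        if not key.startswith(prefix):
            continue
        remainder = key[len(prefix):]
        if flat and "/" in remainder:
            continue  # belongs to a deeper "subdirectory"
        yield key


keys = {"abc/1.json", "abc/2.json", "abc/deep/3.json", "xyz/4.json"}
print(list(list_keys(keys, prefix="abc/")))
print(list(list_keys(keys, prefix="abc/", flat=True)))
```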

Cloud Cost: Usually about $5 per million requests, but each request might list 1000 files. The list operation will continuously issue list requests lazily as needed.

transfer_to / transfer_from

cff = CloudFiles('file:///source_location')
cfg = CloudFiles('gs://dest_location')

# Transfer all files from local filesys to google cloud storage
cfg.transfer_from(cff, block_size=64) # in blocks of 64 files
cff.transfer_to(cfg, block_size=64)
cff.transfer_to(cfg, block_size=64, reencode='br') # change encoding to brotli
cfg[:] = cff # default block size 64

Transfer semantics provide a simple way to perform bulk file transfers. Use the block_size parameter to adjust the number of files handled in a given pass. This can be important for preventing memory blow-up and reducing latency between batches.
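The effect of block_size can be illustrated with a small batching generator. This is a sketch of the general technique, not the library's transfer code; `blocks` is a hypothetical name. Only one block of paths is held in memory at a time.

```python
from itertools import islice


def blocks(paths, block_size=64):
    """Yield successive lists of at most `block_size` items from any iterable."""
    iterator = iter(paths)
    while True:
        block = list(islice(iterator, block_size))
        if not block:
            return
        yield block


# 150 hypothetical file names are processed in passes of 64, 64, and 22.
names = (f"file_{i}" for i in range(150))
print([len(block) for block in blocks(names, block_size=64)])  # [64, 64, 22]
```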

gs to gs and s3 to s3 transfers will occur within the cloud without looping through the executing client provided no reencoding is specified.

resumable transfer

from cloudfiles import ResumableTransfer

# .db b/c this is a sqlite database
# that will be automatically created
rt = ResumableTransfer("NAME_OF_JOB.db") 

# init should only be called once
rt.init("file://source_location", "gs://dest_location")
# This part can be interrupted and resumed
rt.execute("NAME_OF_JOB.db")
# If multiple transfer clients, the lease_msec
# parameter must be specified to prevent conflicts.
rt.execute("NAME_OF_JOB.db", lease_msec=30000)

rt.close() # deletes NAME_OF_JOB.db

This is essentially a more durable version of cp. The transfer works by first loading a sqlite database with filenames, a "done" flag, and a lease time. Then clients can attach to the database and execute the transfer in batches. When multiple clients are used, a lease time must be set so that the database does not return the same set of files to each client and the transfer remains robust.
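The lease mechanism can be sketched with the standard sqlite3 module. The table name, column names, and helper functions below are assumptions made for illustration and are not the schema CloudFiles actually uses.

```python
import sqlite3


def create_job(conn, filenames):
    """Load the job database with one row per file (hypothetical schema)."""
    conn.execute(
        "CREATE TABLE xfer ("
        "filename TEXT PRIMARY KEY, finished INTEGER DEFAULT 0, lease INTEGER DEFAULT 0)"
    )
    conn.executemany("INSERT INTO xfer (filename) VALUES (?)", [(f,) for f in filenames])
    conn.commit()


def lease_batch(conn, now_msec, lease_msec, block_size):
    """Claim up to block_size unfinished files whose lease has expired."""
    # `with conn` commits the lease updates on success. A real multi-process
    # version would also need an explicit write lock, e.g. BEGIN IMMEDIATE.
    with conn:
        rows = conn.execute(
            "SELECT filename FROM xfer WHERE finished = 0 AND lease <= ? "
            "ORDER BY filename LIMIT ?",
            (now_msec, block_size),
        ).fetchall()
        names = [row[0] for row in rows]
        conn.executemany(
            "UPDATE xfer SET lease = ? WHERE filename = ?",
            [(now_msec + lease_msec, name) for name in names],
        )
    return names


def mark_finished(conn, names):
    """Set the done flag so the files are never handed out again."""
    with conn:
        conn.executemany(
            "UPDATE xfer SET finished = 1 WHERE filename = ?", [(n,) for n in names]
        )


conn = sqlite3.connect(":memory:")
create_job(conn, ["a", "b", "c", "d"])
client_1 = lease_batch(conn, now_msec=0, lease_msec=30000, block_size=2)
client_2 = lease_batch(conn, now_msec=1, lease_msec=30000, block_size=2)
print(client_1, client_2)  # ['a', 'b'] ['c', 'd']
```

In this sketch a batch whose client dies before calling `mark_finished` becomes available again once its lease expires, which is one way a lease can make the transfer resumable.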

This transfer type can also be accessed via the CLI.

cloudfiles xfer init SOURCE DEST --db NAME_OF_JOB.db
cloudfiles xfer execute NAME_OF_JOB.db # deletes db when done

composite upload (Google Cloud Storage)

If a file is larger than 100MB (default), CloudFiles will split the file into 100MB parts and upload them as individual part files using the STANDARD storage class to minimize deletion costs. Once uploaded, the part files will be recursively merged in a tree 32 files at a time. After each merge, the part files will be deleted. The final file will have the default storage class for the bucket.

If an upload is interrupted, the part files will remain and must be cleaned up. You can provide a file handle opened for binary reading instead of a bytes object so that large files can be uploaded without overwhelming RAM. You can also adjust the composite threshold using CloudFiles(..., composite_upload_threshold=int(2e8)) to, for example, raise the threshold to 200MB.
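The arithmetic of the part split and the 32-way merge tree can be sketched as follows. `composite_plan` is a hypothetical helper, and it assumes that the part size equals the threshold, as the 100MB description above suggests.

```python
def composite_plan(file_size, part_size=int(1e8), fanout=32):
    """Return (number of parts, number of merge rounds) for a tree-style compose.

    Hypothetical sketch: each merge round combines up to `fanout` files into
    one, and rounds repeat until a single file remains.
    """
    if file_size <= part_size:
        return 1, 0  # at or below the threshold: plain single upload
    parts = -(-file_size // part_size)  # ceiling division without floats
    rounds = 0
    remaining = parts
    while remaining > 1:
        remaining = -(-remaining // fanout)
        rounds += 1
    return parts, rounds


print(composite_plan(int(2.5e9)))  # (25, 1)
print(composite_plan(int(1e11)))   # (1000, 2)
```

A 100GB file therefore yields 1000 parts, which are merged into 32 intermediate files and then into the final object.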

multi-part upload (S3)

If a file is larger than 100MB (default), the S3 service will use multi-part upload. You can provide a file handle opened for binary reading instead of a bytes object so that large files can be uploaded without overwhelming RAM. You can also adjust the composite threshold using CloudFiles(..., composite_upload_threshold=int(2e8)) to, for example, raise the threshold to 200MB.

Unfinished upload parts remain on S3 (and cost money) unless you use a bucket lifecycle rule to remove them automatically.

https://docs.aws.amazon.com/AmazonS3/latest/userguide/mpuoverview.html

transcode

from cloudfiles.compression import transcode

files = cf.get(...) 

for file in transcode(files, 'gzip'):
  file['content'] # gzipped file content regardless of source

transcode(files, 
  encoding='gzip', # any cf compatible compression scheme
  in_place=False, # modify the files in-place to save memory
  progress=True # progress bar
)

Sometimes we want to change the encoding type of a set of arbitrary files (often when moving them around to another storage system). transcode will take the output of get and transcode the resultant files into a new format. transcode respects the raw attribute which indicates that the contents are already compressed and will decompress them first before recompressing. If the input data are already compressed to the correct output encoding, it will simply pass it through without going through a decompression/recompression cycle.

transcode returns a generator so that the transcoding can be done in a streaming manner.
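The pass-through behavior can be sketched with the standard library. This is not the library's transcode function: `transcode_sketch` is a hypothetical name, only gzip and bz2 are handled, and the `compress` field naming each file's current encoding is an assumption about the shape of the input dicts.

```python
import bz2
import gzip

# (compress, decompress) pairs for the two codecs used in this sketch.
CODECS = {
    "gzip": (gzip.compress, gzip.decompress),
    "bz2": (bz2.compress, bz2.decompress),
}


def transcode_sketch(files, encoding):
    """Lazily re-encode dicts carrying 'content' and 'compress' fields."""
    for f in files:
        current = f.get("compress")
        if current == encoding:
            yield f  # already in the target encoding: pass through untouched
            continue
        content = f["content"]
        if current is not None:
            content = CODECS[current][1](content)  # decompress first
        if encoding is not None:
            content = CODECS[encoding][0](content)  # then recompress
        yield {**f, "content": content, "compress": encoding}


files = [
    {"path": "a", "content": gzip.compress(b"hi"), "compress": "gzip"},
    {"path": "b", "content": b"yo", "compress": None},
    {"path": "c", "content": bz2.compress(b"zz"), "compress": "bz2"},
]
for f in transcode_sketch(files, "gzip"):
    print(f["path"], gzip.decompress(f["content"]))
```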

Network Robustness

CloudFiles protects itself from network issues in several ways.

First, it uses a connection pool to avoid needing to reestablish connections or exhausting the number of available sockets.

Second, it uses an exponential random window backoff to retry failed connections and requests. The exponential backoff gives an overloaded server increasing breathing room, and the random window decorrelates independent attempts by a cluster. If the backoff did not grow, the retry attempts by a large cluster would be either too rapid-fire or inefficiently slow. If the attempts were not decorrelated, then regardless of the backoff, the machines in the cluster would often all try again at around the same time. We back off seven times, starting from 0.5 seconds up to a maximum of 60 seconds, doubling the random window each time.
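A minimal standard-library sketch of this retry strategy is shown below. The function names are hypothetical and the exact window bounds the library uses internally are an assumption; the sketch only demonstrates a window that doubles from 0.5 seconds (capped at 60) with a uniformly random delay drawn inside it.

```python
import random
import time


def backoff_windows(attempts=7, first=0.5, cap=60.0):
    """Upper bounds of the random delay window for each retry."""
    return [min(first * 2 ** i, cap) for i in range(attempts)]


def retry(fn, attempts=7, sleep=time.sleep, rng=random.random):
    """Call fn, sleeping a random fraction of a growing window after failures.

    `sleep` and `rng` are injectable so the behavior can be tested quickly.
    """
    for window in backoff_windows(attempts):
        try:
            return fn()
        except Exception:
            sleep(rng() * window)  # random point inside the current window
    return fn()  # final attempt: let any exception propagate


print(backoff_windows())  # [0.5, 1.0, 2.0, 4.0, 8.0, 16.0, 32.0]
```

Because each client draws its own random delay, a cluster that fails at the same moment spreads its retries across the window instead of retrying in lockstep.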

Third, for Google Cloud Storage (GCS) and S3 endpoints, we compute the md5 digest both sending and receiving to ensure data corruption did not occur in transit and that the server sent the full response. We cannot validate the digest for partial ("Range") reads. For composite objects (GCS) we can check the crc32c check-sum which catches transmission errors but not tampering (though MD5 isn't secure at all anymore). We are unable to perform validation for multi-part uploads (S3). Using custom encryption keys may also create validation problems.
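The digest comparison can be sketched with hashlib. `md5_matches` is a hypothetical helper; the sketch assumes the server reports either a base64-encoded MD5 (the form GCS uses) or a quoted hexadecimal MD5 (the form of an S3 ETag for a simple, non-multi-part upload).

```python
import base64
import hashlib


def md5_matches(content, reported_b64=None, reported_hex=None):
    """Compare the MD5 of `content` with a digest reported by the server."""
    digest = hashlib.md5(content).digest()
    if reported_b64 is not None:
        return base64.b64encode(digest).decode("ascii") == reported_b64
    if reported_hex is not None:
        # ETag-style values are often wrapped in double quotes.
        return digest.hex() == reported_hex.strip('"').lower()
    raise ValueError("no digest was reported, so nothing can be validated")


body = b"hello world"
etag = '"' + hashlib.md5(body).hexdigest() + '"'
print(md5_matches(body, reported_hex=etag))        # True
print(md5_matches(body[:-1], reported_hex=etag))   # False
```

A truncated response fails the check just like a corrupted one, which is how the digest also confirms that the server sent the full body.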

Local File Locking

When accessing local files (file://), CloudFiles can use the fasteners library to perform locking so multi-process IO can be performed safely. These options have no effect on remote file access such as gs:// or s3://.

Lock files are by default stored in $CLOUD_FILES_DIR/locks (usually ~/.cloudfiles/locks).

Locking is enabled if:

  1. CloudFiles(..., locking=True)
  2. CloudFiles(..., locking=None) and a locking directory is set either via CloudFiles(..., lock_dir=...) or via the environment variable CLOUD_FILES_LOCK_DIR.

Locking is always disabled if locking=False. This can be advantageous for performance reasons but may require careful design of access patterns to avoid reading a file that is being written to.
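The decision rules above can be condensed into one function. This is a sketch of the stated rules only; `locking_enabled` is a hypothetical name and not part of the library's API.

```python
import os


def locking_enabled(locking=None, lock_dir=None, environ=os.environ):
    """Apply the three-state locking rule described above.

    True and False are explicit; None defers to whether a lock directory
    was supplied directly or via the CLOUD_FILES_LOCK_DIR variable.
    """
    if locking is not None:
        return bool(locking)
    return bool(lock_dir or environ.get("CLOUD_FILES_LOCK_DIR"))


print(locking_enabled(True, environ={}))                         # True
print(locking_enabled(False, lock_dir="/tmp/locks", environ={})) # False
print(locking_enabled(None, environ={}))                         # False
```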

You can check the current assigned locks directory:

cf.lock_dir

You can clear the lock dir of all locks with:

cf.clear_locks()

CloudFiles CLI Tool

# list cloud and local directories
cloudfiles ls gs://bucket-folder/
# parallel file transfer, no decompression
cloudfiles -p 2 cp --progress -r s3://bkt/ gs://bkt2/
# change compression type to brotli
cloudfiles cp -c br s3://bkt/file.txt gs://bkt2/
# decompress
cloudfiles cp -c none s3://bkt/file.txt gs://bkt2/
# pass from stdin (use "-" for source argument)
find some_dir | cloudfiles cp - s3://bkt/
# resumable transfers
cloudfiles xfer init SRC DEST --db JOBNAME.db
cloudfiles xfer execute JOBNAME.db --progress # can quit and resume
# Get human readable file sizes from anywhere
cloudfiles du -shc ./tmp gs://bkt/dir s3://bkt/dir
# remove files
cloudfiles rm ./tmp gs://bkt/dir/file s3://bkt/dir/file
# cat across services, -r for range reads
cloudfiles cat ./tmp gs://bkt/dir/file s3://bkt/dir/file
# verify a transfer was successful by comparing bytes and hashes
cloudfiles verify ./my-data gs://bucket/my-data/ 

cp Pros and Cons

For the cp command, the bundled CLI tool has a number of advantages vs. gsutil when it comes to transfers.

  1. No decompression of file transfers (unless you want to).
  2. Can shift compression format.
  3. Easily control the number of parallel processes.
  4. Green threads make core utilization more efficient.
  5. Optionally uses libdeflate for faster gzip decompression.

It also has some disadvantages:

  1. Doesn't support all commands.
  2. File suffixes may be added to signify compression type on the local filesystem (e.g. .gz, .br, or .zstd). cloudfiles ls will list them without the extension and they will be converted into Content-Encoding on cloud storage.

ls Generative Expressions

For the ls command, we support (via the -e flag) simple generative expressions that enable querying multiple prefixes at once. A generative expression is denoted [chars] where c,h,a,r, & s will be inserted individually into the position where the expression appears. Multiple expressions are allowed and produce a cartesian product of resulting strings. This functionality is very limited at the moment but we intend to improve it.

cloudfiles ls -e "gs://bucket/prefix[ab]"
# equivalent to:
# cloudfiles ls gs://bucket/prefixa
# cloudfiles ls gs://bucket/prefixb
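The expansion rule can be sketched with itertools.product. `expand` is a hypothetical function written for illustration and is not the CLI's own parser.

```python
import itertools
import re


def expand(expr):
    """Expand every [chars] group into one string per character.

    Multiple groups produce the cartesian product of their characters.
    """
    # With one capture group, re.split alternates literal text and group contents.
    parts = re.split(r"\[([^\[\]]*)\]", expr)
    literals = parts[0::2]
    groups = parts[1::2]
    results = []
    for combo in itertools.product(*groups):
        pieces = [literals[0]]
        for char, literal in zip(combo, literals[1:]):
            pieces.append(char)
            pieces.append(literal)
        results.append("".join(pieces))
    return results


print(expand("gs://bucket/prefix[ab]"))
# ['gs://bucket/prefixa', 'gs://bucket/prefixb']
print(len(expand("gs://bucket/[ab]/[123]")))  # 6
```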

alias for Alternative S3 Endpoints

You can set your own protocols for S3 compatible endpoints by creating dynamic or persistent aliases. CloudFiles comes with two official s3 endpoints that are important for the Seung Lab, matrix:// and tigerdata:// which point to Princeton S3 endpoints. Official aliases can't be overridden.

To create a dynamic alias, you can use cloudfiles.paths.add_alias which will only affect the current process. To create a persistent alias that resides in ~/.cloudfiles/aliases.json, you can use the CLI.

cloudfiles alias add example s3://https://example.com/ # example://
cloudfiles alias ls # list all aliases
cloudfiles alias rm example # remove example://

The alias file is only accessed (and cached) if CloudFiles encounters an unknown protocol. If you stick to default protocols and use the syntax s3://https://example.com/ for alternative endpoints, you can still use CloudFiles in environments without filesystem access.
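Conceptually, an alias is a rewrite of the protocol portion of a path. The sketch below shows that idea with a plain dictionary; `resolve` is a hypothetical function, and the library's real normalization of the resulting path may differ.

```python
def resolve(path, aliases):
    """Rewrite `alias://rest` into the aliased base path followed by `rest`."""
    protocol, separator, rest = path.partition("://")
    if separator and protocol in aliases:
        return aliases[protocol] + rest
    return path  # unknown or built-in protocol: leave unchanged


aliases = {"example": "s3://https://example.com/"}
print(resolve("example://bucket/file.txt", aliases))
# s3://https://example.com/bucket/file.txt
```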

Credits

CloudFiles is derived from the CloudVolume.Storage system.

Storage was initially created by William Silversmith and Ignacio Tartavull. It was maintained and improved by William Silversmith and includes improvements by Nico Kemnitz (extremely fast exists), Ben Falk (brotli), and Ran Lu (local file locking). Manuel Castro added the ability to choose cloud storage class. Thanks to the anonymous author from https://teppen.io/ for their s3 etag validation code.