Lots of things. Needs pulling apart before merging.
All together because this has been very exploratory.
alexdutton committed Oct 6, 2020
1 parent 49dd5de commit 1b06f92
Showing 13 changed files with 284 additions and 252 deletions.
46 changes: 25 additions & 21 deletions invenio_files_rest/models.py
@@ -60,10 +60,11 @@
MultipartInvalidChunkSize, MultipartInvalidPartNumber, \
MultipartInvalidSize, MultipartMissingParts, MultipartNotCompleted
from .proxies import current_files_rest
from .storage.base import StorageBackend
from .utils import ENCODING_MIMETYPES, PassthroughChecksum, guess_mimetype

if TYPE_CHECKING:
from .storage import FileStorage
from .storage import StorageBackend

slug_pattern = re.compile('^[a-z][a-z0-9-]+$')

@@ -750,7 +751,7 @@ def delete(self):
self.query.filter_by(id=self.id).delete()
return self

def storage(self, **kwargs) -> FileStorage:
def storage(self, **kwargs) -> StorageBackend:
"""Get storage interface for object.
Uses the application's storage factory to create a storage interface
@@ -810,30 +811,27 @@ def verify_checksum(self, progress_callback=None, chunk_size=None,
@ensure_writable()
def initialize(self, preferred_location: Location, size=0, **kwargs):
"""Initialize file."""
print("WRI1", self.writable)
if hasattr(current_files_rest.storage_factory, 'initialize'):
# New behaviour, with a new-style storage factory
result = current_files_rest.storage_factory.initialize(
fileinstance=self,
preferred_location=preferred_location,
size=size,
)
print("WRI2", self.writable)
else:
# Old behaviour, with an old-style storage factory
storage = self.storage(default_location=preferred_location.uri, **kwargs)
if isinstance(storage.initialize.__self__, type):
# New storage backend, but old storage factory
# New storage backends have `initialize` as a classmethod. The storage factory will have constructed
# a URI and passed it to the storage's __init__, so we can grab it from there.
result = storage.initialize(suggested_uri=storage.uri, size=size)
else:
result = storage.initialize(size=size)
print("WRI3", self.writable, storage)
self.update_file_metadata(
result,
readable=False,
writable=True,
storage_backend=type(storage).backend_name,
storage_backend=type(storage).backend_name if isinstance(storage, StorageBackend) else None,
)

print("WRI4", self.writable, result)

@ensure_writable()
def init_contents(self, size=0, default_location: str=None, **kwargs):
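The `storage.initialize.__self__` check above is how the old-style factory path detects a new-style backend at runtime. A minimal sketch of that binding behaviour, with hypothetical class names: accessing a classmethod through an instance binds `__self__` to the class, while a plain method binds it to the instance.

class OldStyleStorage:
    def initialize(self, size=0):
        return {'size': size}

class NewStyleStorage:
    @classmethod
    def initialize(cls, suggested_uri, size=0):
        return {'uri': suggested_uri, 'size': size}

old, new = OldStyleStorage(), NewStyleStorage()
assert not isinstance(old.initialize.__self__, type)  # bound to the instance
assert isinstance(new.initialize.__self__, type)      # bound to the class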
Expand Down Expand Up @@ -876,18 +874,11 @@ def set_contents(self, stream, chunk_size=None, size=None, size_limit=None,
progress_callback=progress_callback)

storage = self.storage(**kwargs)
storage.save(wrapped_stream, chunk_size=chunk_size, size=size,
size_limit=size_limit, progress_callback=progress_callback)
self.update_file_metadata(storage.save(wrapped_stream, chunk_size=chunk_size, size=size,
size_limit=size_limit, progress_callback=progress_callback))

self.storage_backend = type(storage).backend_name
self.size = wrapped_stream.bytes_read
self.checksum = wrapped_stream.checksum
self.uri = storage.filepath
self.storage_backend = type(storage).backend_name if isinstance(storage, StorageBackend) else None

# TODO: Should these be set here, or sent back from the storage backend?
self.readable = True
self.writable = False
self.storage_class = 'S'
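With this refactoring, `set_contents` stops mutating `readable`, `writable` and `storage_class` by hand and instead lets `update_file_metadata` apply whatever the backend's `save()` returns. A sketch of the metadata dict that `StorageBackend.save()` produces (see the `storage/base.py` hunks below; the URI and checksum values here are made up):

result = {
    'checksum': 'md5:0cc175b9c0f1b6a831c399e269772661',
    'size': 1,
    'uri': '/data/files/ab/cd/data',
    'readable': True,
    'writable': False,
    'storage_class': 'S',
}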

@ensure_writable()
def copy_contents(self, fileinstance, progress_callback=None,
@@ -937,6 +928,15 @@ def update_file_metadata(self, file_metadata: Union[Tuple,Dict] = None, **kwargs
file_metadata = {}

if isinstance(file_metadata, tuple):
assert len(file_metadata) >= 3
# Carry across defaults from **kwargs
if len(file_metadata) < 4:
file_metadata += (kwargs.get('readable', True),)
if len(file_metadata) < 5:
file_metadata += (kwargs.get('writable', False),)
if len(file_metadata) < 6:
file_metadata += (kwargs.get('storage_class', None),)
print(file_metadata)
self.set_uri(*file_metadata)
elif isinstance(file_metadata, dict):
file_metadata.update(kwargs)
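The tuple form mirrors `set_uri`'s positional parameters, so a short tuple is padded up to the full six values before being splatted. Assuming the existing `set_uri(uri, size, checksum, readable=True, writable=False, storage_class=None)` signature, the padding behaves like this (values are hypothetical):

file_metadata = ('/data/files/ab/cd/data', 1024, 'md5:abc123')
# After the padding above, with no overriding kwargs:
# ('/data/files/ab/cd/data', 1024, 'md5:abc123', True, False, None)
# which is equivalent to:
# set_uri('/data/files/ab/cd/data', 1024, 'md5:abc123',
#         readable=True, writable=False, storage_class=None)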
Expand Down Expand Up @@ -1622,6 +1622,7 @@ def create(cls, bucket, key, size, chunk_size):

with db.session.begin_nested():
file_ = FileInstance.create()
print("WR2", file_.writable)
file_.size = size
obj = cls(
upload_id=uuid.uuid4(),
@@ -1634,11 +1635,13 @@
)
bucket.size += size
db.session.add(obj)
print("WR3", file_.writable)
file_.init_contents(
size=size,
default_location=bucket.location.uri,
default_storage_class=bucket.default_storage_class,
)
print("WR4", file_.writable)
return obj

@classmethod
@@ -1779,6 +1782,7 @@ def set_contents(self, stream, progress_callback=None):
:param chunk_size: Desired chunk size to read stream in. It is up to
the storage interface whether it respects this value.
"""
print("writable", self.multipart.file.writable)
size, checksum = self.multipart.file.update_contents(
stream, seek=self.start_byte, size=self.part_size,
progress_callback=progress_callback,
6 changes: 4 additions & 2 deletions invenio_files_rest/storage/__init__.py
@@ -10,13 +10,15 @@

from __future__ import absolute_import, print_function

from .base import FileStorage
from .pyfs import PyFSFileStorage, pyfs_storage_factory
from .base import FileStorage, StorageBackend
from .pyfs import PyFSFileStorage, pyfs_storage_factory, PyFSStorageBackend
from .factory import StorageFactory

__all__ = (
'FileStorage',
'StorageBackend',
'pyfs_storage_factory',
'PyFSFileStorage',
'PyFSStorageBackend',
'StorageFactory',
)
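After this change both the legacy and the new interfaces are importable from the package root; a consumer might write (illustrative only):

from invenio_files_rest.storage import (
    FileStorage,          # legacy storage interface
    StorageBackend,       # new storage base class
    PyFSStorageBackend,   # new PyFS implementation
    StorageFactory,
)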
154 changes: 72 additions & 82 deletions invenio_files_rest/storage/base.py
@@ -8,58 +8,32 @@

"""File storage base module."""

from __future__ import absolute_import, print_function
from __future__ import absolute_import, annotations, print_function

import hashlib
import urllib.parse
import warnings
from calendar import timegm
from datetime import datetime
from functools import partial
from typing import Any, Callable, Dict, TYPE_CHECKING, Tuple

from flask import current_app

from ..errors import FileSizeError, StorageError, UnexpectedFileSizeError
from ..helpers import chunk_size_or_default, compute_checksum, make_path, send_stream
from ..models import FileInstance
from ..utils import check_size, check_sizelimit, PassthroughChecksum

from .legacy import FileStorage

if TYPE_CHECKING:
from ..models import FileInstance

__all__ = ['FileStorage', 'StorageBackend']

def check_sizelimit(size_limit, bytes_written, total_size):
"""Check if size limit was exceeded.

:param size_limit: The size limit.
:param bytes_written: The total number of bytes written.
:param total_size: The total file size.
:raises invenio_files_rest.errors.UnexpectedFileSizeError: If the bytes
written exceed the total size.
:raises invenio_files_rest.errors.FileSizeError: If the bytes
written are greater than the size limit.
"""
if size_limit is not None and bytes_written > size_limit:
desc = 'File size limit exceeded.' \
if isinstance(size_limit, int) else size_limit.reason
raise FileSizeError(description=desc)

# Never write more than advertised
if total_size is not None and bytes_written > total_size:
raise UnexpectedFileSizeError(
description='File is bigger than expected.')


def check_size(bytes_written, total_size):
"""Check if expected amounts of bytes have been written.
:param bytes_written: The total number of bytes written.
:param total_size: The total file size.
:raises invenio_files_rest.errors.UnexpectedFileSizeError: If the bytes
written exceed the total size.
"""
if total_size and bytes_written < total_size:
raise UnexpectedFileSizeError(
description='File is smaller than expected.')
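Both helpers now live in `..utils` (see the import hunk above) with unchanged behaviour. Assuming the signatures stay as shown, usage looks like:

from invenio_files_rest.utils import check_size, check_sizelimit

check_sizelimit(size_limit=100, bytes_written=50, total_size=50)  # passes
check_size(bytes_written=50, total_size=50)                       # passes
# check_size(bytes_written=10, total_size=50) raises
# UnexpectedFileSizeError: 'File is smaller than expected.'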


class FileStorageMeta(type):
class StorageBackendMeta(type):
@property
def backend_name(cls):
try:
@@ -74,9 +48,11 @@ def backend_name(cls):
return cls._backend_name


class FileStorage(metaclass=FileStorageMeta):
class StorageBackend(metaclass=StorageBackendMeta):
"""Base class for storage interface to a single file."""

checksum_hash_name = 'md5'

def __init__(self, uri: str=None, size: int=None, modified: datetime=None, *, filepath=None):
"""Initialize storage object."""
self.uri = uri or filepath
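The renamed metaclass keeps `backend_name` as a property of the class itself. The pattern in isolation (illustrative only; the library's actual lookup logic is elided above):

class Meta(type):
    @property
    def backend_name(cls):
        # Computed on the class, not on instances.
        return cls.__name__.lower()

class ExampleBackend(metaclass=Meta):
    pass

assert ExampleBackend.backend_name == 'examplebackend'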
@@ -94,20 +70,76 @@ def delete(self):
"""Delete the file."""
raise NotImplementedError

@classmethod
def initialize(cls, suggested_uri, size=0):
def initialize(self, size=0):
"""Initialize the file on the storage + truncate to the given size."""
return {
'readable': False,
'writable': True,
'uri': self.uri,
'size': size,
**self._initialize(size=size),
}
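Note the dict-merge pattern: the base class computes defaults, then unpacks the subclass hook's return value last, so backend-provided keys win on conflict. In isolation:

# Illustrative only: the hook's values override the defaults.
defaults = {'readable': False, 'writable': True, 'size': 0}
from_backend = {'size': 4096}
merged = {**defaults, **from_backend}
assert merged == {'readable': False, 'writable': True, 'size': 4096}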

def _initialize(self, size=0):
raise NotImplementedError


def save(self, incoming_stream, size_limit=None, size=None,
chunk_size=None, progress_callback=None):
chunk_size=None, progress_callback: Callable[[int, int], None] = None
):
"""Save incoming stream to file storage."""

incoming_stream = PassthroughChecksum(
incoming_stream,
hash_name=self.checksum_hash_name,
progress_callback=progress_callback,
size_limit=size_limit,
size=size,
)

result = self._save(
incoming_stream,
# Size enforcement is handled by the PassthroughChecksum wrapper above.
size=None,
chunk_size=chunk_size,
)

check_size(incoming_stream.bytes_read, size)
self._size = incoming_stream.total_size

return {
'checksum': incoming_stream.checksum,
'size': incoming_stream.total_size,
'uri': self.uri,
'readable': True,
'writable': False,
'storage_class': 'S',
**result,
}

def _save(self, incoming_stream, size_limit=None, size=None,
chunk_size=None) -> Dict[str, Any]:
"""Save incoming stream to file storage."""
raise NotImplementedError

def update(self, incoming_stream, seek=0, size=None, chunk_size=None,
progress_callback=None):
progress_callback=None) -> Tuple[int, str]:
"""Update part of file with incoming stream."""
incoming_stream = PassthroughChecksum(
incoming_stream,
hash_name=self.checksum_hash_name,
progress_callback=progress_callback,
size=size,
)

self._update(
incoming_stream,
seek=seek,
size=None,
chunk_size=chunk_size,
)

return incoming_stream.bytes_read, incoming_stream.checksum

def _update(self, incoming_stream, seek=0, size=None, chunk_size=None):
raise NotImplementedError
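Taken together, `initialize`/`save`/`update` now form a template-method API: the base class wraps the stream in `PassthroughChecksum`, enforces sizes and merges metadata, while subclasses only implement `_initialize`/`_save`/`_update`. A minimal in-memory backend sketch (hypothetical, for illustration only):

import io

from invenio_files_rest.storage import StorageBackend

class InMemoryBackend(StorageBackend):
    """Toy backend that keeps file contents in a BytesIO buffer."""

    checksum_hash_name = 'sha256'  # override the 'md5' default

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._buffer = io.BytesIO()

    def _initialize(self, size=0):
        # "Allocate" the requested size up front.
        self._buffer = io.BytesIO(b'\x00' * size)
        return {}

    def _save(self, incoming_stream, size_limit=None, size=None,
              chunk_size=None):
        self._buffer = io.BytesIO()
        self._buffer.write(incoming_stream.read())
        return {}

    def _update(self, incoming_stream, seek=0, size=None, chunk_size=None):
        self._buffer.seek(seek)
        self._buffer.write(incoming_stream.read())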

#
@@ -218,45 +250,3 @@ def _compute_checksum(self, stream, size=None, chunk_size=None,
except Exception as e:
raise StorageError(
'Could not compute checksum of file: {0}'.format(e))

def _write_stream(self, src, dst, size=None, size_limit=None,
chunk_size=None, progress_callback=None):
"""Get helper to save stream from src to dest + compute checksum.
:param src: Source stream.
:param dst: Destination stream.
:param size: If provided, this exact amount of bytes will be
written to the destination file.
:param size_limit: ``FileSizeLimit`` instance to limit number of bytes
to write.
"""
chunk_size = chunk_size_or_default(chunk_size)

algo, m = self._init_hash()
bytes_written = 0

while 1:
# Check that size limits aren't bypassed
check_sizelimit(size_limit, bytes_written, size)

chunk = src.read(chunk_size)

if not chunk:
if progress_callback:
progress_callback(bytes_written, bytes_written)
break

dst.write(chunk)

bytes_written += len(chunk)

if m:
m.update(chunk)

if progress_callback:
progress_callback(None, bytes_written)

check_size(bytes_written, size)

return bytes_written, '{0}:{1}'.format(
algo, m.hexdigest()) if m else None
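The deleted `_write_stream` helper is superseded by the `PassthroughChecksum` wrapper from `..utils`, which hashes and counts bytes transparently while the backend consumes the stream. Assuming it exposes the attributes used in the hunks above, usage looks roughly like:

import io
from invenio_files_rest.utils import PassthroughChecksum

wrapped = PassthroughChecksum(io.BytesIO(b'hello'), hash_name='md5')
data = wrapped.read()
assert wrapped.bytes_read == 5
assert wrapped.checksum == 'md5:5d41402abc4b2a76b9719d911017c592'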