From c17764bd533d5fda39b0a9728cb4c09634b0f013 Mon Sep 17 00:00:00 2001 From: Pieter Robberechts Date: Thu, 25 Apr 2024 15:09:07 +0200 Subject: [PATCH] fix(io): Allow compression for remote files --- kloppy/_providers/statsbomb.py | 6 - kloppy/io.py | 200 +++++++++++++++++++++++++------- kloppy/tests/test_io.py | 202 +++++++++++++++++++++++---------- setup.py | 4 + 4 files changed, 304 insertions(+), 108 deletions(-) diff --git a/kloppy/_providers/statsbomb.py b/kloppy/_providers/statsbomb.py index 1d750bc0..d26614ae 100644 --- a/kloppy/_providers/statsbomb.py +++ b/kloppy/_providers/statsbomb.py @@ -1,4 +1,3 @@ -import contextlib import warnings from typing import Union @@ -12,11 +11,6 @@ from kloppy.io import open_as_file, FileLike, Source -@contextlib.contextmanager -def dummy_context_mgr(): - yield None - - def load( event_data: FileLike, lineup_data: FileLike, diff --git a/kloppy/io.py b/kloppy/io.py index c213a90b..d2ca20e0 100644 --- a/kloppy/io.py +++ b/kloppy/io.py @@ -1,11 +1,20 @@ +"""I/O utilities for reading raw data.""" import contextlib import logging import os import urllib.parse from dataclasses import dataclass, replace -from io import BytesIO +from io import BytesIO, BufferedIOBase from pathlib import PurePath -from typing import IO, BinaryIO, Tuple, Union +from typing import ( + IO, + BinaryIO, + Tuple, + Union, + Optional, + Generator, + ContextManager, +) from kloppy.config import get_config from kloppy.exceptions import InputNotFoundError @@ -14,7 +23,7 @@ logger = logging.getLogger(__name__) -def _open(file: str, mode: str): +def _open(file: str, mode: str) -> IO: if file.endswith(".gz"): import gzip @@ -30,8 +39,41 @@ def _open(file: str, mode: str): return open(file, mode) +def _decompress(stream: BinaryIO) -> BinaryIO: + stream.seek(0) + if stream.read(3) == b"\x1f\x8b\x08": + import gzip + + stream.seek(0) + print("GZIP") + return gzip.GzipFile(fileobj=stream, mode="rb") + elif stream.read(6) == b"\xfd7zXZ\x00": + import lzma + + 
stream.seek(0) + return lzma.LZMAFile(stream) + elif stream.read(3) == b"BZh": + import bz2 + + stream.seek(0) + return bz2.BZ2File(stream) + stream.seek(0) + return stream + + @dataclass(frozen=True) class Source: + """A wrapper around a file-like object to enable optional inputs. + + Args: + data (FileLike): The file-like object. + optional (bool): Whether the file is optional. Defaults to False. + skip_if_missing (bool): Whether to skip the file if it is missing. Defaults to False. + + Example: + >>> open_as_file(Source.create("example.csv", optional=True)) + """ + data: "FileLike" optional: bool = False skip_if_missing: bool = False @@ -43,10 +85,33 @@ def create(cls, input_: "FileLike", **kwargs): return Source(data=input_, **kwargs) -FileLike = Union[str, PurePath, bytes, IO[bytes], Source] +FileLike = Union[str, PurePath, bytes, BinaryIO, Source] def get_file_extension(f: FileLike) -> str: + """Determine the file extension of the given file-like object. + + Args: + f (FileLike): The file-like object whose extension needs to be determined. + + Returns: + str: The file extension, including the dot ('.') if present. + + Raises: + Exception: If the extension cannot be determined. + + Note: + - If the file has compression extensions such as '.gz', '.xz', or + '.bz2', they will be stripped before determining the extension. + + Example: + >>> get_file_extension("example.xml.gz") + '.xml' + >>> get_file_extension(Path("example.txt")) + '.txt' + >>> get_file_extension(Source(data="example.csv")) + '.csv' + """ if isinstance(f, PurePath) or isinstance(f, str): f = str(f) for ext in [".gz", ".xz", ".bz2"]: @@ -59,7 +124,28 @@ def get_file_extension(f: FileLike) -> str: raise Exception("Could not determine extension") -def get_local_cache_stream(url: str, cache_dir: str) -> Tuple[BinaryIO, bool]: +def get_local_cache_stream( + url: str, cache_dir: str +) -> Tuple[BinaryIO, Union[bool, str]]: + """Get a stream to the local cache file for the given URL. 
@contextlib.contextmanager
def dummy_context_mgr() -> Generator[None, None, None]:
    """No-op context manager; stands in for a real stream when an optional
    input is absent, so callers can uniformly use ``with`` blocks."""
    yield None
def open_as_file(input_: FileLike) -> ContextManager[Optional[BinaryIO]]:
    """Open a byte stream to the given input object.

    Supported inputs are local file paths (``str`` or ``pathlib.Path``),
    locations handled by a registered adapter (e.g. 'http(s)://' or
    's3://' URLs), literal XML/JSON strings (detected by a '{' or '<'
    character), ``bytes`` objects, buffered binary streams inheriting from
    ``io.BufferedIOBase``, and ``Source`` wrappers around any of the above.

    Args:
        input_ (FileLike): The input object to be opened.

    Returns:
        A context manager yielding a binary stream, or ``None`` for an
        optional ``Source`` whose data is missing.

    Raises:
        InputNotFoundError: If the input file is not found.
        ValueError: If the input type is not supported.

    Note:
        If the given file path or URL ends with '.gz', '.xz', or '.bz2',
        the file will be decompressed before being read.
    """
    if isinstance(input_, Source):
        if input_.data is None and input_.optional:
            # Saves vendor-specific code from special-casing absent inputs.
            return dummy_context_mgr()
        # NOTE(review): try/except context partially elided in the patch;
        # reconstructed — confirm against upstream.
        try:
            return open_as_file(input_.data)
        except InputNotFoundError:
            if input_.skip_if_missing:
                logging.info(f"Input {input_.data} not found. Skipping")
                return dummy_context_mgr()
            raise

    if isinstance(input_, str) and ("{" in input_ or "<" in input_):
        # Treat the string as inline XML/JSON data rather than a path.
        return BytesIO(input_.encode("utf8"))

    if isinstance(input_, bytes):
        return BytesIO(input_)

    if isinstance(input_, (str, PurePath)):
        path = str(input_)
        adapter = get_adapter(path)
        if not adapter:
            # Plain local file: open directly, decompressing by extension.
            if not os.path.exists(path):
                raise InputNotFoundError(f"File {path} does not exist")
            return _open(path, "rb")

        cache_dir = get_config("cache")
        assert cache_dir is None or isinstance(cache_dir, str)
        if cache_dir:
            stream, cached_file = get_local_cache_stream(path, cache_dir)
        else:
            stream, cached_file = BytesIO(), None

        if cached_file:
            logger.info(f"Using local cached file {cached_file}")
        else:
            logger.info(f"Retrieving {path}")
            adapter.read_to_stream(path, stream)
            logger.info("Retrieval complete")
        # Remote content is decompressed by magic number, not extension.
        return _decompress(stream)

    if isinstance(input_, BufferedIOBase):
        return input_

    raise ValueError(f"Unsupported input type: {type(input_)}")
@pytest.fixture()
def filesystem_content(tmp_path: Path):
    """Create plain and compressed copies of a test file on the filesystem."""
    (tmp_path / "testfile.txt").write_bytes(b"Hello, world!")

    # gzip is already imported at module level; the nested re-import in the
    # original fixture was redundant and has been dropped.
    with gzip.open(tmp_path / "testfile.txt.gz", "wb") as f_out:
        f_out.write(b"Hello, world!")

    import lzma

    with lzma.open(tmp_path / "testfile.txt.xz", "wb") as f_out:
        f_out.write(b"Hello, world!")

    import bz2

    with bz2.open(tmp_path / "testfile.txt.bz2", "wb") as f_out:
        f_out.write(b"Hello, world!")

    return tmp_path


@pytest.fixture
def httpserver_content(httpserver):
    """Serve plain and gzip-compressed copies of the test file over HTTP."""
    httpserver.expect_request("/testfile.txt").respond_with_data(
        "Hello, world!"
    )
    httpserver.expect_request("/compressed_testfile.txt").respond_with_data(
        gzip.compress(b"Hello, world!"),
        headers={"Content-Encoding": "gzip", "Content-Type": "text/plain"},
    )
    httpserver.expect_request("/testfile.txt.gz").respond_with_data(
        gzip.compress(b"Hello, world!"),
        headers={"Content-Type": "application/x-gzip"},
    )


@pytest.fixture
def s3_content():
    """Create a mocked S3 bucket holding plain and compressed test files."""
    with mock_s3():
        s3_fs = s3fs.S3FileSystem(anon=True)
        s3_fs.mkdir("test-bucket")
        # f-string prefixes removed: the paths contain no placeholders.
        with s3_fs.open("test-bucket/testfile.txt", "wb") as f:
            f.write(b"Hello, world!")
        with s3_fs.open("test-bucket/testfile.txt.gz", "wb") as f:
            f.write(gzip.compress(b"Hello, world!"))
        yield s3_fs
        s3_fs.rm("test-bucket", recursive=True)


class TestOpenAsFile:
    """Tests for the open_as_file function."""

    def test_bytes(self):
        """It should be able to open a file from a bytes object."""
        with open_as_file(b"Hello, world!") as fp:
            assert fp is not None
            assert fp.read() == b"Hello, world!"

    def test_data_string(self):
        """It should be able to open a file from a string object."""
        with open_as_file('{"msg": "Hello, world!"}') as fp:
            assert fp is not None
            assert json.load(fp) == {"msg": "Hello, world!"}

    def test_stream(self):
        """It should be able to open a file from a byte stream object."""
        data = b"Hello, world!"
        with open_as_file(BytesIO(data)) as fp:
            assert fp is not None
            assert fp.read() == data

    def test_path_str(self, filesystem_content: Path):
        """It should be able to open a file from a string path."""
        path = str(filesystem_content / "testfile.txt")
        with open_as_file(path) as fp:
            assert fp is not None
            assert fp.read() == b"Hello, world!"

    def test_path_obj(self, filesystem_content: Path):
        """It should be able to open a file from a Path object."""
        path = filesystem_content / "testfile.txt"
        with open_as_file(path) as fp:
            assert fp is not None
            assert fp.read() == b"Hello, world!"

    @pytest.mark.parametrize("ext", [".gz", ".xz", ".bz2"])
    def test_path_compressed(self, filesystem_content: Path, ext: str):
        """It should be able to open a compressed local file."""
        path = filesystem_content / f"testfile.txt{ext}"
        with open_as_file(path) as fp:
            assert fp is not None
            assert fp.read() == b"Hello, world!"

    def test_path_missing(self, filesystem_content: Path):
        """It should raise an error if the file is not found."""
        path = filesystem_content / "missing.txt"
        with pytest.raises(InputNotFoundError):
            with open_as_file(path):
                pass

    def test_http(self, httpserver, httpserver_content):
        """It should be able to open a file from a URL."""
        url = httpserver.url_for("/testfile.txt")
        with open_as_file(url) as fp:
            assert fp is not None
            assert fp.read() == b"Hello, world!"

    def test_http_compressed(self, httpserver, httpserver_content):
        """It should be able to open a compressed file from a URL."""
        # If the server returns a content-encoding header, the file should
        # be decompressed by the request library
        url = httpserver.url_for("/compressed_testfile.txt")
        with open_as_file(url) as fp:
            assert fp is not None
            assert fp.read() == b"Hello, world!"

        # If the server does not set a content-type header, but the URL ends
        # with .gz, the file should be decompressed by kloppy
        url = httpserver.url_for("/testfile.txt.gz")
        with open_as_file(url) as fp:
            assert fp is not None
            assert fp.read() == b"Hello, world!"

    def test_s3(self, s3_content):
        """It should be able to open a file from an S3 bucket."""
        with open_as_file("s3://test-bucket/testfile.txt") as fp:
            assert fp is not None
            assert fp.read() == b"Hello, world!"

    def test_s3_compressed(self, s3_content):
        """It should be able to open a compressed file from an S3 bucket."""
        # Docstring fixed: the original was copy-pasted from test_s3.
        with open_as_file("s3://test-bucket/testfile.txt.gz") as fp:
            assert fp is not None
            assert fp.read() == b"Hello, world!"