Write to parquet #110

Merged (7 commits) on May 15, 2024
25 changes: 17 additions & 8 deletions docs/usage.md
@@ -321,27 +321,36 @@ Once we've combined references to all the chunks of all our legacy files into one…

The [kerchunk library](https://github.com/fsspec/kerchunk) has its own [specification](https://fsspec.github.io/kerchunk/spec.html) for how byte range references should be serialized (either as a JSON or parquet file).

To write out all the references in the virtual dataset as a single kerchunk-compliant JSON file, you can use the {py:meth}`ds.virtualize.to_kerchunk <virtualizarr.xarray.VirtualiZarrDatasetAccessor.to_kerchunk>` accessor method.
To write out all the references in the virtual dataset as a single kerchunk-compliant JSON or parquet file, you can use the {py:meth}`ds.virtualize.to_kerchunk <virtualizarr.xarray.VirtualiZarrDatasetAccessor.to_kerchunk>` accessor method.

```python
combined_vds.virtualize.to_kerchunk('combined.json', format='json')
```

These references can now be interpreted like they were a Zarr store by [fsspec](https://github.com/fsspec/filesystem_spec), using kerchunk's built-in xarray backend (so you need kerchunk to be installed to use `engine='kerchunk'`).
These references can now be interpreted like they were a Zarr store by [fsspec](https://github.com/fsspec/filesystem_spec), using kerchunk's built-in xarray backend (kerchunk must be installed to use `engine='kerchunk'`).

```python
import fsspec

fs = fsspec.filesystem("reference", fo=f"combined.json")
mapper = fs.get_mapper("")

combined_ds = xr.open_dataset(mapper, engine="kerchunk")
combined_ds = xr.open_dataset('combined.json', engine="kerchunk")
```

```{note}
Currently you can only serialize virtual variables backed by `ManifestArray` objects to kerchunk reference files, not real in-memory numpy-backed variables.
```

When you have many chunks, the reference file can get large enough to be unwieldy as JSON. In that case the references can instead be stored as parquet. Again this uses kerchunk internally.

```python
combined_vds.virtualize.to_kerchunk('combined.parq', format='parquet')
```

And again we can read these references using the "kerchunk" backend as if it were a regular Zarr store:

```python
combined_ds = xr.open_dataset('combined.parq', engine="kerchunk")
```

By default, references are split across separate parquet files when the total number of references exceeds `record_size`. If there are fewer than `categorical_threshold` unique URLs referenced by a particular variable, the URLs for that variable will be stored as a categorical variable.
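As an illustration, both options can be passed straight through the accessor when writing parquet references (the values below are arbitrary examples, not recommendations):

```python
# Start a new parquet reference file every 50,000 references, and encode the
# URL column categorically for variables with few unique URLs.
# (illustrative values only)
combined_vds.virtualize.to_kerchunk(
    'combined.parq',
    format='parquet',
    record_size=50_000,
    categorical_threshold=10,
)
```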

### Writing as Zarr

Alternatively, we can write these references out as an actual Zarr store, at least one that is compliant with the [proposed "Chunk Manifest" ZEP](https://github.com/zarr-developers/zarr-specs/issues/287). To do this we simply use the {py:meth}`ds.virtualize.to_zarr <virtualizarr.xarray.VirtualiZarrDatasetAccessor.to_zarr>` accessor method.
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -42,8 +42,8 @@ test = [
"scipy",
"pooch",
"ruff",
"fastparquet",
"s3fs"

]


23 changes: 23 additions & 0 deletions virtualizarr/tests/test_integration.py
@@ -1,4 +1,6 @@
import pytest
import xarray as xr
import xarray.testing as xrt

from virtualizarr import open_virtual_dataset

@@ -11,3 +13,24 @@ def test_open_scalar_variable(tmpdir):

    vds = open_virtual_dataset(f"{tmpdir}/scalar.nc")
    assert vds["a"].shape == ()


@pytest.mark.parametrize("format", ["json", "parquet"])
def test_kerchunk_roundtrip(tmpdir, format):
    # set up example xarray dataset
    ds = xr.tutorial.open_dataset("air_temperature", decode_times=False)

    # save it to disk as netCDF (in temporary directory)
    ds.to_netcdf(f"{tmpdir}/air.nc")

    # use open_virtual_dataset to read it as references
    vds = open_virtual_dataset(f"{tmpdir}/air.nc", indexes={})

    # write those references to disk as kerchunk json
    vds.virtualize.to_kerchunk(f"{tmpdir}/refs.{format}", format=format)

    # read the dataset from disk via the zarr store
    roundtrip = xr.open_dataset(f"{tmpdir}/refs.{format}", engine="kerchunk")

    # assert equal to original dataset
    xrt.assert_equal(roundtrip, ds)
42 changes: 42 additions & 0 deletions virtualizarr/tests/test_kerchunk.py
@@ -1,4 +1,5 @@
import numpy as np
import pandas as pd
import pytest
import ujson # type: ignore
import xarray as xr
@@ -127,6 +128,47 @@ def test_accessor_to_kerchunk_json(self, tmp_path):
        }
        assert loaded_refs == expected_ds_refs

    def test_accessor_to_kerchunk_parquet(self, tmp_path):
        chunks_dict = {
            "0.0": {"path": "foo.nc", "offset": 100, "length": 100},
            "0.1": {"path": "foo.nc", "offset": 200, "length": 100},
        }
        manifest = ChunkManifest(entries=chunks_dict)
        arr = ManifestArray(
            chunkmanifest=manifest,
            zarray=dict(
                shape=(2, 4),
                dtype=np.dtype("<i8"),
                chunks=(2, 2),
                compressor=None,
                filters=None,
                fill_value=None,
                order="C",
            ),
        )
        ds = xr.Dataset({"a": (["x", "y"], arr)})

        filepath = tmp_path / "refs"

        ds.virtualize.to_kerchunk(filepath, format="parquet", record_size=2)

        with open(tmp_path / "refs" / ".zmetadata") as f:
            meta = ujson.load(f)
            assert list(meta) == ["metadata", "record_size"]
            assert meta["record_size"] == 2

        df0 = pd.read_parquet(filepath / "a" / "refs.0.parq")

        assert df0.to_dict() == {
            "offset": {0: 100, 1: 200},
            "path": {
                0: "foo.nc",
                1: "foo.nc",
            },
            "size": {0: 100, 1: 100},
            "raw": {0: None, 1: None},
Review comment (Member): What does "raw" mean in this specification?

Reply (Contributor, author): I think it means "inlined". In the kerchunk library tests, the ref `"a/6": b"data"` results in a non-null "raw":
https://github.com/fsspec/kerchunk/blob/a5eae4d45f7f601346d0fdc58c0864911330c5d6/kerchunk/tests/test_df.py#L62

        }


def test_kerchunk_roundtrip_in_memory_no_concat():
    # Set up example xarray dataset
38 changes: 34 additions & 4 deletions virtualizarr/xarray.py
@@ -366,15 +366,23 @@ def to_kerchunk(
    ) -> KerchunkStoreRefs: ...

    @overload
    def to_kerchunk(self, filepath: str, format: Literal["json"]) -> None: ...
    def to_kerchunk(self, filepath: str | Path, format: Literal["json"]) -> None: ...

    @overload
    def to_kerchunk(self, filepath: str, format: Literal["parquet"]) -> None: ...
    def to_kerchunk(
        self,
        filepath: str | Path,
        format: Literal["parquet"],
        record_size: int = 100_000,
        categorical_threshold: int = 10,
    ) -> None: ...

    def to_kerchunk(
        self,
        filepath: str | None = None,
        filepath: str | Path | None = None,
        format: Literal["dict", "json", "parquet"] = "dict",
        record_size: int = 100_000,
        categorical_threshold: int = 10,
    ) -> KerchunkStoreRefs | None:
        """
        Serialize all virtualized arrays in this xarray dataset into the kerchunk references format.
@@ -386,6 +394,13 @@ def to_kerchunk(
        format : 'dict', 'json', or 'parquet'
            Format to serialize the kerchunk references as.
            If 'json' or 'parquet' then the 'filepath' argument is required.
        record_size (parquet only): int
            Number of references to store in each reference file (default 100,000). Bigger values
            mean fewer read requests but larger memory footprint.
        categorical_threshold (parquet only) : int
            Encode urls as pandas.Categorical to reduce memory footprint if the ratio
            of the number of unique urls to total number of refs for each variable
            is greater than or equal to this number. (default 10)

        References
        ----------
@@ -404,6 +419,21 @@
            return None
        elif format == "parquet":
            raise NotImplementedError()
            from kerchunk.df import refs_to_dataframe

            if isinstance(filepath, Path):
                url = str(filepath)
            elif isinstance(filepath, str):
                url = filepath

            # refs_to_dataframe is responsible for writing to parquet.
            # at no point does it create a full in-memory dataframe.
            refs_to_dataframe(
                refs,
                url=url,
                record_size=record_size,
                categorical_threshold=categorical_threshold,
            )
            return None
        else:
            raise ValueError(f"Unrecognized output format: {format}")