diff --git a/ipfsspec/async_ipfs.py b/ipfsspec/async_ipfs.py index 726866d..571503e 100644 --- a/ipfsspec/async_ipfs.py +++ b/ipfsspec/async_ipfs.py @@ -18,6 +18,7 @@ from multiformats import CID, multicodec from . import unixfsv1 +from .car import read_car import logging @@ -69,20 +70,30 @@ def __str__(self): return f"GW({self.url})" async def info(self, path, session): - res = await self.get(path, session, headers={"Accept": "application/vnd.ipld.raw"}, params={"format": "raw"}) + res = await self.get(path, session, headers={"Accept": "application/vnd.ipld.car"}, params={"format": "car", "dag-scope": "block"}) self._raise_not_found_for_status(res, path) - cid = CID.decode(res.headers["X-Ipfs-Roots"].split(",")[-1]) + + roots = res.headers["X-Ipfs-Roots"].split(",") + if len(roots) != len(path.split("/")): + raise FileNotFoundError(path) + + cid = CID.decode(roots[-1]) resdata = await res.read() + _, blocks = read_car(resdata) # roots should be ignored by https://specs.ipfs.tech/http-gateways/trustless-gateway/ + blocks = {cid: data for cid, data, _ in blocks} + block = blocks[cid] + if cid.codec == RawCodec: return { "name": path, "CID": str(cid), "type": "file", - "size": len(resdata), + "size": len(block), } elif cid.codec == DagPbCodec: - node = unixfsv1.PBNode.loads(resdata) + + node = unixfsv1.PBNode.loads(block) data = unixfsv1.Data.loads(node.Data) if data.Type == unixfsv1.DataType.Raw: raise FileNotFoundError(path) # this is not a file, it's only a part of it @@ -133,12 +144,20 @@ async def iter_chunked(self, path, session, chunk_size): yield size, res.content.iter_chunked(chunk_size) async def ls(self, path, session, detail=False): - res = await self.get(path, session, headers={"Accept": "application/vnd.ipld.raw"}, params={"format": "raw"}) + res = await self.get(path, session, headers={"Accept": "application/vnd.ipld.car"}, params={"format": "car", "dag-scope": "block"}) self._raise_not_found_for_status(res, path) - resdata = await res.read() - cid = CID.decode(res.headers["X-Ipfs-Roots"].split(",")[-1]) + roots = res.headers["X-Ipfs-Roots"].split(",") + if len(roots) != len(path.split("/")): + raise FileNotFoundError(path) + + cid = CID.decode(roots[-1]) assert cid.codec == DagPbCodec, "this is not a directory" - node = unixfsv1.PBNode.loads(resdata) + + resdata = await res.read() + + _, blocks = read_car(resdata) # roots should be ignored by https://specs.ipfs.tech/http-gateways/trustless-gateway/ + blocks = {cid: data for cid, data, _ in blocks} + node = unixfsv1.PBNode.loads(blocks[cid]) data = unixfsv1.Data.loads(node.Data) if data.Type != unixfsv1.DataType.Directory: # TODO: we might need support for HAMTShard here (for large directories) diff --git a/ipfsspec/car.py b/ipfsspec/car.py new file mode 100644 index 0000000..028e41e --- /dev/null +++ b/ipfsspec/car.py @@ -0,0 +1,116 @@ +""" +CAR handling functions. +""" + +from typing import List, Optional, Tuple, Union, Iterator, BinaryIO +import dataclasses + +import dag_cbor +from multiformats import CID, varint, multicodec, multihash + +from .utils import is_cid_list, StreamLike, ensure_stream + +DagPbCodec = multicodec.get("dag-pb") +Sha256Hash = multihash.get("sha2-256") + +@dataclasses.dataclass +class CARBlockLocation: + varint_size: int + cid_size: int + payload_size: int + offset: int = 0 + + @property + def cid_offset(self) -> int: + return self.offset + self.varint_size + + @property + def payload_offset(self) -> int: + return self.offset + self.varint_size + self.cid_size + + @property + def size(self) -> int: + return self.varint_size + self.cid_size + self.payload_size + + +def decode_car_header(stream: BinaryIO) -> Tuple[List[CID], int]: + """ + Decodes a CAR header and returns the list of contained roots. + """ + header_size, visize, _ = varint.decode_raw(stream) # type: ignore [call-overload] # varint uses BufferedIOBase + header = dag_cbor.decode(stream.read(header_size)) + if not isinstance(header, dict): + raise ValueError("no valid CAR header found") + if header["version"] != 1: + raise ValueError("CAR is not version 1") + roots = header["roots"] + if not isinstance(roots, list): + raise ValueError("CAR header doesn't contain roots") + if not is_cid_list(roots): + raise ValueError("CAR roots do not only contain CIDs") + return roots, visize + header_size + + +def decode_raw_car_block(stream: BinaryIO) -> Optional[Tuple[CID, bytes, CARBlockLocation]]: + try: + block_size, visize, _ = varint.decode_raw(stream) # type: ignore [call-overload] # varint uses BufferedIOBase + except ValueError: + # stream has likely been consumed entirely + return None + + data = stream.read(block_size) + # as the size of the CID is variable but not explicitly given in + # the CAR format, we need to partially decode each CID to determine + # its size and the location of the payload data + if data[0] == 0x12 and data[1] == 0x20: + # this is CIDv0 + cid_version = 0 + default_base = "base58btc" + cid_codec: Union[int, multicodec.Multicodec] = DagPbCodec + hash_codec: Union[int, multihash.Multihash] = Sha256Hash + cid_digest = data[2:34] + data = data[34:] + else: + # this is CIDv1(+) + cid_version, _, data = varint.decode_raw(data) + if cid_version != 1: + raise ValueError(f"CIDv{cid_version} is currently not supported") + default_base = "base32" + cid_codec, _, data = multicodec.unwrap_raw(data) + hash_codec, _, data = varint.decode_raw(data) + digest_size, _, data = varint.decode_raw(data) + cid_digest = data[:digest_size] + data = data[digest_size:] + cid = CID(default_base, cid_version, cid_codec, (hash_codec, cid_digest)) + + if not cid.hashfun.digest(data) == cid.digest: + raise ValueError(f"CAR is corrupted. Entry '{cid}' could not be verified") + + return cid, bytes(data), CARBlockLocation(visize, block_size - len(data), len(data)) + + +def read_car(stream_or_bytes: StreamLike) -> Tuple[List[CID], Iterator[Tuple[CID, bytes, CARBlockLocation]]]: + """ + Reads a CAR. + + Parameters + ---------- + stream_or_bytes: StreamLike + Stream to read CAR from + + Returns + ------- + roots : List[CID] + Roots as given by the CAR header + blocks : Iterator[Tuple[cid, BytesLike, CARBlockLocation]] + Iterator over all blocks contained in the CAR + """ + stream = ensure_stream(stream_or_bytes) + roots, header_size = decode_car_header(stream) + def blocks() -> Iterator[Tuple[CID, bytes, CARBlockLocation]]: + offset = header_size + while (next_block := decode_raw_car_block(stream)) is not None: + cid, data, sizes = next_block + yield cid, data, dataclasses.replace(sizes, offset=offset) + offset += sizes.size + return roots, blocks() diff --git a/ipfsspec/utils.py b/ipfsspec/utils.py new file mode 100644 index 0000000..d1bb255 --- /dev/null +++ b/ipfsspec/utils.py @@ -0,0 +1,21 @@ +""" +Some utilities. +""" + +from io import BytesIO +from typing import List, Union, BinaryIO + +from multiformats import CID +from typing_extensions import TypeGuard + +StreamLike = Union[BinaryIO, bytes] + +def ensure_stream(stream_or_bytes: StreamLike) -> BinaryIO: + if isinstance(stream_or_bytes, bytes): + return BytesIO(stream_or_bytes) + else: + return stream_or_bytes + + +def is_cid_list(os: List[object]) -> TypeGuard[List[CID]]: + return all(isinstance(o, CID) for o in os) diff --git a/test/test_async.py b/test/test_async.py index afec6eb..4034d53 100644 --- a/test/test_async.py +++ b/test/test_async.py @@ -101,3 +101,20 @@ async def test_isfile(fs): assert res is True res = await fs._isfile(TEST_ROOT) assert res is False + +@pytest.mark.parametrize("detail", [False, True]) +@pytest.mark.parametrize("fs", ["http://127.0.0.1:8080", "https://ipfs.io"], indirect=True) +@pytest.mark.asyncio +async def test_ls_multi_gw(fs, detail): + """ + Test if ls works on different gateway implementations. + + See also: https://github.com/fsspec/ipfsspec/issues/39 + """ + res = await fs._ls("bafybeicn7i3soqdgr7dwnrwytgq4zxy7a5jpkizrvhm5mv6bgjd32wm3q4", detail=detail) + expected = "bafybeicn7i3soqdgr7dwnrwytgq4zxy7a5jpkizrvhm5mv6bgjd32wm3q4/welcome-to-IPFS.jpg" + if detail: + assert len(res) == 1 + assert res[0]["name"] == expected + else: + assert res == [expected]