Skip to content

Commit

Permalink
request car instead of raw from gateways
Browse files Browse the repository at this point in the history
Trustless gateways may not always return raw IPLD blocks when a path is
requested. They should however always return car.

Fixes #39
  • Loading branch information
d70-t committed Oct 14, 2024
1 parent 729e3ed commit 3020211
Show file tree
Hide file tree
Showing 4 changed files with 181 additions and 8 deletions.
35 changes: 27 additions & 8 deletions ipfsspec/async_ipfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

from multiformats import CID, multicodec
from . import unixfsv1
from .car import read_car

import logging

Expand Down Expand Up @@ -69,20 +70,30 @@ def __str__(self):
return f"GW({self.url})"

async def info(self, path, session):
res = await self.get(path, session, headers={"Accept": "application/vnd.ipld.raw"}, params={"format": "raw"})
res = await self.get(path, session, headers={"Accept": "application/vnd.ipld.car"}, params={"format": "car", "dag-scope": "block"})
self._raise_not_found_for_status(res, path)
cid = CID.decode(res.headers["X-Ipfs-Roots"].split(",")[-1])

roots = res.headers["X-Ipfs-Roots"].split(",")
if len(roots) != len(path.split("/")):
raise FileNotFoundError(path)

cid = CID.decode(roots[-1])
resdata = await res.read()

_, blocks = read_car(resdata) # roots should be ignored by https://specs.ipfs.tech/http-gateways/trustless-gateway/
blocks = {cid: data for cid, data, _ in blocks}
block = blocks[cid]

if cid.codec == RawCodec:
return {
"name": path,
"CID": str(cid),
"type": "file",
"size": len(resdata),
"size": len(block),
}
elif cid.codec == DagPbCodec:
node = unixfsv1.PBNode.loads(resdata)

node = unixfsv1.PBNode.loads(block)
data = unixfsv1.Data.loads(node.Data)
if data.Type == unixfsv1.DataType.Raw:
raise FileNotFoundError(path) # this is not a file, it's only a part of it
Expand Down Expand Up @@ -133,12 +144,20 @@ async def iter_chunked(self, path, session, chunk_size):
yield size, res.content.iter_chunked(chunk_size)

async def ls(self, path, session, detail=False):
res = await self.get(path, session, headers={"Accept": "application/vnd.ipld.raw"}, params={"format": "raw"})
res = await self.get(path, session, headers={"Accept": "application/vnd.ipld.car"}, params={"format": "car", "dag-scope": "block"})
self._raise_not_found_for_status(res, path)
resdata = await res.read()
cid = CID.decode(res.headers["X-Ipfs-Roots"].split(",")[-1])
roots = res.headers["X-Ipfs-Roots"].split(",")
if len(roots) != len(path.split("/")):
raise FileNotFoundError(path)

cid = CID.decode(roots[-1])
assert cid.codec == DagPbCodec, "this is not a directory"
node = unixfsv1.PBNode.loads(resdata)

resdata = await res.read()

_, blocks = read_car(resdata) # roots should be ignored by https://specs.ipfs.tech/http-gateways/trustless-gateway/
blocks = {cid: data for cid, data, _ in blocks}
node = unixfsv1.PBNode.loads(blocks[cid])
data = unixfsv1.Data.loads(node.Data)
if data.Type != unixfsv1.DataType.Directory:
# TODO: we might need support for HAMTShard here (for large directories)
Expand Down
116 changes: 116 additions & 0 deletions ipfsspec/car.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
"""
CAR handling functions.
"""

from typing import List, Optional, Tuple, Union, Iterator, BinaryIO
import dataclasses

import dag_cbor
from multiformats import CID, varint, multicodec, multihash

from .utils import is_cid_list, StreamLike, ensure_stream

DagPbCodec = multicodec.get("dag-pb")
Sha256Hash = multihash.get("sha2-256")

@dataclasses.dataclass
class CARBlockLocation:
varint_size: int
cid_size: int
payload_size: int
offset: int = 0

@property
def cid_offset(self) -> int:
return self.offset + self.varint_size

@property
def payload_offset(self) -> int:
return self.offset + self.varint_size + self.cid_size

@property
def size(self) -> int:
return self.varint_size + self.cid_size + self.payload_size


def decode_car_header(stream: BinaryIO) -> Tuple[List[CID], int]:
"""
Decodes a CAR header and returns the list of contained roots.
"""
header_size, visize, _ = varint.decode_raw(stream) # type: ignore [call-overload] # varint uses BufferedIOBase
header = dag_cbor.decode(stream.read(header_size))
if not isinstance(header, dict):
raise ValueError("no valid CAR header found")
if header["version"] != 1:
raise ValueError("CAR is not version 1")
roots = header["roots"]
if not isinstance(roots, list):
raise ValueError("CAR header doesn't contain roots")
if not is_cid_list(roots):
raise ValueError("CAR roots do not only contain CIDs")
return roots, visize + header_size


def decode_raw_car_block(stream: BinaryIO) -> Optional[Tuple[CID, bytes, CARBlockLocation]]:
try:
block_size, visize, _ = varint.decode_raw(stream) # type: ignore [call-overload] # varint uses BufferedIOBase
except ValueError:
# stream has likely been consumed entirely
return None

data = stream.read(block_size)
# as the size of the CID is variable but not explicitly given in
# the CAR format, we need to partially decode each CID to determine
# its size and the location of the payload data
if data[0] == 0x12 and data[1] == 0x20:
# this is CIDv0
cid_version = 0
default_base = "base58btc"
cid_codec: Union[int, multicodec.Multicodec] = DagPbCodec
hash_codec: Union[int, multihash.Multihash] = Sha256Hash
cid_digest = data[2:34]
data = data[34:]
else:
# this is CIDv1(+)
cid_version, _, data = varint.decode_raw(data)
if cid_version != 1:
raise ValueError(f"CIDv{cid_version} is currently not supported")
default_base = "base32"
cid_codec, _, data = multicodec.unwrap_raw(data)
hash_codec, _, data = varint.decode_raw(data)
digest_size, _, data = varint.decode_raw(data)
cid_digest = data[:digest_size]
data = data[digest_size:]
cid = CID(default_base, cid_version, cid_codec, (hash_codec, cid_digest))

if not cid.hashfun.digest(data) == cid.digest:
raise ValueError(f"CAR is corrupted. Entry '{cid}' could not be verified")

return cid, bytes(data), CARBlockLocation(visize, block_size - len(data), len(data))


def read_car(stream_or_bytes: StreamLike) -> Tuple[List[CID], Iterator[Tuple[CID, bytes, CARBlockLocation]]]:
"""
Reads a CAR.
Parameters
----------
stream_or_bytes: StreamLike
Stream to read CAR from
Returns
-------
roots : List[CID]
Roots as given by the CAR header
blocks : Iterator[Tuple[cid, BytesLike, CARBlockLocation]]
Iterator over all blocks contained in the CAR
"""
stream = ensure_stream(stream_or_bytes)
roots, header_size = decode_car_header(stream)
def blocks() -> Iterator[Tuple[CID, bytes, CARBlockLocation]]:
offset = header_size
while (next_block := decode_raw_car_block(stream)) is not None:
cid, data, sizes = next_block
yield cid, data, dataclasses.replace(sizes, offset=offset)
offset += sizes.size
return roots, blocks()
21 changes: 21 additions & 0 deletions ipfsspec/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
"""
Some utilities.
"""

from io import BytesIO
from typing import List, Union, BinaryIO

from multiformats import CID
from typing_extensions import TypeGuard

StreamLike = Union[BinaryIO, bytes]

def ensure_stream(stream_or_bytes: StreamLike) -> BinaryIO:
if isinstance(stream_or_bytes, bytes):
return BytesIO(stream_or_bytes)
else:
return stream_or_bytes


def is_cid_list(os: List[object]) -> TypeGuard[List[CID]]:
return all(isinstance(o, CID) for o in os)
17 changes: 17 additions & 0 deletions test/test_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,3 +101,20 @@ async def test_isfile(fs):
assert res is True
res = await fs._isfile(TEST_ROOT)
assert res is False

@pytest.mark.parametrize("detail", [False, True])
@pytest.mark.parametrize("fs", ["http://127.0.0.1:8080", "https://ipfs.io"], indirect=True)
@pytest.mark.asyncio
async def test_ls_multi_gw(fs, detail):
"""
Test if ls works on different gateway implementations.
See also: https://github.com/fsspec/ipfsspec/issues/39
"""
res = await fs._ls("bafybeicn7i3soqdgr7dwnrwytgq4zxy7a5jpkizrvhm5mv6bgjd32wm3q4", detail=detail)
expected = "bafybeicn7i3soqdgr7dwnrwytgq4zxy7a5jpkizrvhm5mv6bgjd32wm3q4/welcome-to-IPFS.jpg"
if detail:
assert len(res) == 1
assert res[0]["name"] == expected
else:
assert res == [expected]

0 comments on commit 3020211

Please sign in to comment.