Skip to content

Commit

Permalink
Merge pull request #33 from fsspec/fix_ls
Browse files Browse the repository at this point in the history
Implement info() and ls() using trustless gateway spec
  • Loading branch information
d70-t authored Sep 19, 2024
2 parents f12843e + 37aa092 commit d238640
Show file tree
Hide file tree
Showing 5 changed files with 243 additions and 108 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/local_gateway.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ jobs:
max-parallel: 4
matrix:
python-version: ["3.8", "3.9", "3.10"]
ipfs-version: ["0.27.0"] # this is the latest IPFS version supporting /api/v0, see issue #28
ipfs-version: ["0.30.0"]
steps:
- uses: actions/checkout@v1
- name: Set up Python ${{ matrix.python-version }}
Expand Down
191 changes: 98 additions & 93 deletions ipfsspec/async_ipfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,104 +6,34 @@
from pathlib import Path
import warnings

import asyncio
import aiohttp

from fsspec.asyn import AsyncFileSystem, sync, sync_wrapper
from fsspec.exceptions import FSTimeoutError

from multiformats import CID, multicodec
from . import unixfsv1

import logging

logger = logging.getLogger("ipfsspec")

DagPbCodec = multicodec.get("dag-pb")
RawCodec = multicodec.get("raw")

class RequestsTooQuick(OSError):
    """OSError subclass that carries an optional retry hint.

    ``retry_after`` holds whatever value the raiser passed in, or ``None``
    when no hint was given.
    NOTE(review): presumably raised for HTTP 429 responses; the raising code
    is only partly visible here, so confirm against
    ``_raise_requests_too_quick``.
    """

    def __init__(self, retry_after=None):
        # The base-class initialiser is deliberately not called, exactly as
        # before, so the exception keeps empty ``args``.
        self.retry_after = retry_after


class AsyncIPFSGatewayBase:
async def stat(self, path, session):
res = await self.api_get("files/stat", session, arg=path)
self._raise_not_found_for_status(res, path)
return await res.json()

async def file_info(self, path, session):
info = {"name": path}

headers = {"Accept-Encoding": "identity"} # this ensures correct file size
res = await self.cid_head(path, session, headers=headers, allow_redirects=True)

async with res:
self._raise_not_found_for_status(res, path)
if res.status != 200:
# TODO: maybe handle 301 here
raise FileNotFoundError(path)
if "Content-Length" in res.headers:
info["size"] = int(res.headers["Content-Length"])
elif "Content-Range" in res.headers:
info["size"] = int(res.headers["Content-Range"].split("/")[1])

if "ETag" in res.headers:
etag = res.headers["ETag"].strip("\"")
info["ETag"] = etag
if etag.startswith("DirIndex"):
info["type"] = "directory"
info["CID"] = etag.split("-")[-1]
else:
info["type"] = "file"
info["CID"] = etag

return info

async def cat(self, path, session):
res = await self.cid_get(path, session)
async with res:
self._raise_not_found_for_status(res, path)
if res.status != 200:
raise FileNotFoundError(path)
return await res.read()

async def ls(self, path, session):
res = await self.api_get("ls", session, arg=path)
self._raise_not_found_for_status(res, path)
resdata = await res.json()
types = {1: "directory", 2: "file"}
return [{
"name": path + "/" + link["Name"],
"CID": link["Hash"],
"type": types[link["Type"]],
"size": link["Size"],
}
for link in resdata["Objects"][0]["Links"]]

def _raise_not_found_for_status(self, response, url):
"""
Raises FileNotFoundError for 404s, otherwise uses raise_for_status.
"""
if response.status == 404:
raise FileNotFoundError(url)
elif response.status == 400:
raise FileNotFoundError(url)
response.raise_for_status()


class AsyncIPFSGateway(AsyncIPFSGatewayBase):
class AsyncIPFSGateway:
resolution = "path"

def __init__(self, url, protocol="ipfs"):
self.url = url
self.protocol = protocol

async def api_get(self, endpoint, session, **kwargs):
res = await session.get(self.url + "/api/v0/" + endpoint, params=kwargs, trace_request_ctx={'gateway': self.url})
self._raise_requests_too_quick(res)
return res

async def api_post(self, endpoint, session, **kwargs):
res = await session.post(self.url + "/api/v0/" + endpoint, params=kwargs, trace_request_ctx={'gateway': self.url})
self._raise_requests_too_quick(res)
return res

async def _cid_req(self, method, path, headers=None, **kwargs):
headers = headers or {}
if self.resolution == "path":
Expand All @@ -116,17 +46,12 @@ async def _cid_req(self, method, path, headers=None, **kwargs):
self._raise_requests_too_quick(res)
return res

async def cid_head(self, path, session, headers=None, **kwargs):
async def head(self, path, session, headers=None, **kwargs):
return await self._cid_req(session.head, path, headers=headers, **kwargs)

async def cid_get(self, path, session, headers=None, **kwargs):
async def get(self, path, session, headers=None, **kwargs):
return await self._cid_req(session.get, path, headers=headers, **kwargs)

async def version(self, session):
res = await self.api_get("version", session)
res.raise_for_status()
return await res.json()

@staticmethod
def _raise_requests_too_quick(response):
if response.status == 429:
Expand All @@ -139,6 +64,90 @@ def _raise_requests_too_quick(response):
def __str__(self):
return f"GW({self.url})"

async def info(self, path, session):
    """Describe the object at ``path`` by inspecting its raw root block.

    The block is requested in raw form (``Accept: application/vnd.ipld.raw``,
    ``format=raw``). Its CID is taken from the last entry of the
    ``X-Ipfs-Roots`` response header.

    Returns a dict with ``name``, ``CID`` and ``type``. Files also carry
    ``size``, and dag-pb objects carry ``islink``.

    Raises:
        FileNotFoundError: for 404/400 responses, for a UnixFS ``Raw`` node
            (only a part of a file), for a codec that is neither raw nor
            dag-pb, or for an unhandled UnixFS type.
        NotImplementedError: for ``Metadata`` and ``HAMTShard`` nodes.
    """
    res = await self.get(path, session, headers={"Accept": "application/vnd.ipld.raw"}, params={"format": "raw"})
    # Release the response as soon as headers and body have been consumed,
    # the same way cat() does; previously it was never released.
    async with res:
        self._raise_not_found_for_status(res, path)
        cid = CID.decode(res.headers["X-Ipfs-Roots"].split(",")[-1])
        resdata = await res.read()

    if cid.codec == RawCodec:
        # A raw block is the file content itself, so its length is the size.
        return {
            "name": path,
            "CID": str(cid),
            "type": "file",
            "size": len(resdata),
        }
    if cid.codec != DagPbCodec:
        raise FileNotFoundError(path)  # it exists, but is not a UNIXFSv1 object, so it's not a file

    node = unixfsv1.PBNode.loads(resdata)
    data = unixfsv1.Data.loads(node.Data)
    if data.Type == unixfsv1.DataType.Raw:
        raise FileNotFoundError(path)  # this is not a file, it's only a part of it
    if data.Type == unixfsv1.DataType.Directory:
        return {
            "name": path,
            "CID": str(cid),
            "type": "directory",
            "islink": False,
        }
    if data.Type == unixfsv1.DataType.File:
        return {
            "name": path,
            "CID": str(cid),
            "type": "file",
            "size": data.filesize,
            "islink": False,
        }
    if data.Type == unixfsv1.DataType.Metadata:
        raise NotImplementedError(f"The path '{path}' contains a Metadata node, this is currently not implemented")
    if data.Type == unixfsv1.DataType.Symlink:
        return {
            "name": path,
            "CID": str(cid),
            "type": "other",  # TODO: maybe we should have directory or file as returning type, but that probably would require resolving at least another level of blocks
            "islink": True,
        }
    if data.Type == unixfsv1.DataType.HAMTShard:
        raise NotImplementedError(f"The path '{path}' contains a HAMTSharded directory, this is currently not implemented")
    # Every DataType member is handled above. Fail loudly rather than
    # silently returning None should an unexpected value ever get here.
    raise FileNotFoundError(path)

async def cat(self, path, session):
res = await self.get(path, session)
async with res:
self._raise_not_found_for_status(res, path)
return await res.read()

async def ls(self, path, session, detail=False):
    """List the entries of the UnixFS directory at ``path``.

    The directory's raw root block is fetched and decoded as a dag-pb node.
    Each link in it becomes ``path + "/" + link.Name``.

    With ``detail=False`` (the default) the entry names are returned as a
    list of strings. With ``detail=True`` ``info()`` is awaited concurrently
    for every entry and the resulting dicts are returned.

    Raises:
        FileNotFoundError: for 404/400 responses.
        NotADirectoryError: if the block is not dag-pb or its UnixFS type is
            not ``Directory``.
    """
    res = await self.get(path, session, headers={"Accept": "application/vnd.ipld.raw"}, params={"format": "raw"})
    # Release the response once it has been read, the same way cat() does.
    async with res:
        self._raise_not_found_for_status(res, path)
        resdata = await res.read()
        cid = CID.decode(res.headers["X-Ipfs-Roots"].split(",")[-1])

    # An explicit raise replaces the former assert: asserts vanish under
    # ``python -O``, and a non-dag-pb block is simply not a directory.
    if cid.codec != DagPbCodec:
        raise NotADirectoryError(path)

    node = unixfsv1.PBNode.loads(resdata)
    data = unixfsv1.Data.loads(node.Data)
    if data.Type != unixfsv1.DataType.Directory:
        # TODO: we might need support for HAMTShard here (for large directories)
        raise NotADirectoryError(path)

    names = [path + "/" + link.Name for link in node.Links]
    if detail:
        return await asyncio.gather(*(self.info(name, session) for name in names))
    return names

def _raise_not_found_for_status(self, response, url):
"""
Raises FileNotFoundError for 404s, otherwise uses raise_for_status.
"""
if response.status == 404: # returned for known missing files
raise FileNotFoundError(url)
elif response.status == 400: # return for invalid requests, so it's also certainly not there
raise FileNotFoundError(url)
response.raise_for_status()




async def get_client(**kwargs):
timeout = aiohttp.ClientTimeout(sock_connect=1, sock_read=5)
Expand Down Expand Up @@ -167,14 +176,14 @@ def get_gateway(protocol="ipfs"):
ipfs_gateway = os.environ.get("IPFS_GATEWAY", "")
if ipfs_gateway:
logger.debug("using IPFS gateway from IPFS_GATEWAY environment variable: %s", ipfs_gateway)
return AsyncGateway(ipfs_gateway, protocol)
return AsyncIPFSGateway(ipfs_gateway, protocol)

# internal configuration: accept IPFSSPEC_GATEWAYS for backwards compatibility
if ipfsspec_gateways := os.environ.get("IPFSSPEC_GATEWAYS", ""):
ipfs_gateway = ipfsspec_gateways.split()[0]
logger.debug("using IPFS gateway from IPFSSPEC_GATEWAYS environment variable: %s", ipfs_gateway)
warnings.warn("The IPFSSPEC_GATEWAYS environment variable is deprecated, please configure your IPFS Gateway according to IPIP-280, e.g. by using the IPFS_GATEWAY environment variable or using the ~/.ipfs/gateway file.", DeprecationWarning)
return AsyncGateway(ipfs_gateway, protocol)
return AsyncIPFSGateway(ipfs_gateway, protocol)

# check various well-known files for possible gateway configurations
if ipfs_path := os.environ.get("IPFS_PATH", ""):
Expand Down Expand Up @@ -274,11 +283,7 @@ async def set_session(self):
async def _ls(self, path, detail=True, **kwargs):
path = self._strip_protocol(path)
session = await self.set_session()
res = await self.gateway.ls(path, session)
if detail:
return res
else:
return [r["name"] for r in res]
return await self.gateway.ls(path, session, detail=detail)

ls = sync_wrapper(_ls)

Expand All @@ -290,11 +295,11 @@ async def _cat_file(self, path, start=None, end=None, **kwargs):
async def _info(self, path, **kwargs):
path = self._strip_protocol(path)
session = await self.set_session()
return await self.gateway.file_info(path, session)
return await self.gateway.info(path, session)

def open(self, path, mode="rb", block_size=None, cache_options=None, **kwargs):
if mode != "rb":
raise NotImplementedError
raise NotImplementedError("opening modes other than read binary are not implemented")
data = self.cat_file(path) # load whole chunk into memory
return io.BytesIO(data)

Expand Down
109 changes: 109 additions & 0 deletions ipfsspec/unixfsv1.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
"""
from UNIXFS spec (https://github.com/ipfs/specs/blob/master/UNIXFS.md):
message Data {
enum DataType {
Raw = 0;
Directory = 1;
File = 2;
Metadata = 3;
Symlink = 4;
HAMTShard = 5;
}
required DataType Type = 1;
optional bytes Data = 2;
optional uint64 filesize = 3;
repeated uint64 blocksizes = 4;
optional uint64 hashType = 5;
optional uint64 fanout = 6;
optional uint32 mode = 7;
optional UnixTime mtime = 8;
}
message Metadata {
optional string MimeType = 1;
}
message UnixTime {
required int64 Seconds = 1;
optional fixed32 FractionalNanoseconds = 2;
}
from DAG-PB spec (https://ipld.io/specs/codecs/dag-pb/spec/):
message PBLink {
// binary CID (with no multibase prefix) of the target object
optional bytes Hash = 1;
// UTF-8 string name
optional string Name = 2;
// cumulative size of target object
optional uint64 Tsize = 3;
}
message PBNode {
// refs to other objects
repeated PBLink Links = 2;
// opaque user data
optional bytes Data = 1;
}
"""

from dataclasses import dataclass
from enum import IntEnum
from typing import List, Optional

from pure_protobuf.dataclasses_ import field, message # type: ignore
from pure_protobuf.types import uint32, uint64, int64, fixed32 # type: ignore

class DataType(IntEnum):
    """Node kinds of the UnixFS ``Data`` message, numbered as in the UnixFS spec."""

    Raw = 0
    Directory = 1
    File = 2
    Metadata = 3
    Symlink = 4
    HAMTShard = 5

@message
@dataclass
class UnixTime:
    """UnixFS ``UnixTime`` message: whole seconds plus optional nanoseconds.

    ``FractionalNanoseconds`` is optional in the UnixFS spec, so it defaults
    to ``None`` like every other optional field in this module. Without the
    default it was a required constructor argument.
    """

    Seconds: int64 = field(1)
    FractionalNanoseconds: Optional[fixed32] = field(2, default=None)

@message
@dataclass
class Data:
    """UnixFS ``Data`` message, carried in the ``Data`` bytes of a dag-pb node.

    Field numbers follow the UnixFS spec. Only ``Type`` is required; every
    other field defaults to ``None`` (or an empty list for ``blocksizes``).
    """

    # pylint: disable=too-many-instance-attributes
    Type: DataType = field(1)
    Data: Optional[bytes] = field(2, default=None)
    filesize: Optional[uint64] = field(3, default=None)
    # Declared as a non-packed repeated field.
    blocksizes: List[uint64] = field(4, default_factory=list, packed=False)
    hashType: Optional[uint64] = field(5, default=None)
    fanout: Optional[uint64] = field(6, default=None)
    mode: Optional[uint32] = field(7, default=None)
    mtime: Optional[UnixTime] = field(8, default=None)

@message
@dataclass
class Metadata:
    """UnixFS ``Metadata`` message holding an optional MIME type string."""

    MimeType: Optional[str] = field(1, default=None)


@message
@dataclass
class PBLink:
    """DAG-PB ``PBLink`` message: one reference from a node to another object."""

    # binary CID (with no multibase prefix) of the target object
    Hash: Optional[bytes] = field(1, default=None)
    # UTF-8 string name
    Name: Optional[str] = field(2, default=None)
    # cumulative size of target object
    Tsize: Optional[uint64] = field(3, default=None)

Data_ = Data
@message
@dataclass
class PBNode:
    """DAG-PB ``PBNode`` message: links to other objects plus opaque user data.

    ``Links`` (field 2) is declared before ``Data`` (field 1), in the same
    order as the DAG-PB schema quoted in the module docstring.
    """

    # refs to other objects
    Links: List[PBLink] = field(2, default_factory=list)
    # opaque user data
    Data: Optional[bytes] = field(1, default=None)
3 changes: 3 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@
"fsspec>=0.9.0",
"requests",
"aiohttp",
"multiformats",
"dag-cbor >= 0.2.2",
"pure-protobuf >= 2.1.0, <3",
],
entry_points={
'fsspec.specs': [
Expand Down
Loading

0 comments on commit d238640

Please sign in to comment.