Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor benchmarks #12

Closed
wants to merge 15 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 32 additions & 76 deletions src/nwb_benchmarks/benchmarks/streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,124 +2,80 @@

import warnings

import fsspec
import h5py
import pynwb
import remfile
from fsspec.asyn import reset_lock
from .streaming_base import (
ElectricalSeriesStreamingFsspecBase,
ElectricalSeriesStreamingRemfileBase,
ElectricalSeriesStreamingROS3Base,
FileReadStreamingBase,
)

# Useful if running in verbose mode
warnings.filterwarnings(action="ignore", message="No cached namespaces found in .*")
warnings.filterwarnings(action="ignore", message="Ignoring cached namespace .*")


class FileReadStreaming:
class FileReadStreaming(FileReadStreamingBase):
"""A basic benchmark for streaming an NWB file from the DANDI archive."""

repeat = 1

def setup(self):
# Random IBL raw data file; not that many groups
self.s3_url = "https://dandiarchive.s3.amazonaws.com/blobs/8c5/65f/8c565f28-e5fc-43fe-8fb7-318ad2081319"
s3_url = "https://dandiarchive.s3.amazonaws.com/blobs/8c5/65f/8c565f28-e5fc-43fe-8fb7-318ad2081319"

def time_fsspec_no_cache(self):
reset_lock()
fsspec.get_filesystem_class("https").clear_instance_cache()
filesystem = fsspec.filesystem("https")

with filesystem.open(path=self.s3_url, mode="rb") as byte_stream:
with h5py.File(name=byte_stream) as file:
with pynwb.NWBHDF5IO(file=file, load_namespaces=True) as io:
nwbfile = io.read()

# Must be done at this level since teardown occurs outside of repetitions
# reset_lock()
# fsspec.get_filesystem_class("https").clear_instance_cache()
self.fsspec_no_cache()

def time_ros3(self):
ros3_form = self.s3_url.replace("https://dandiarchive.s3.amazonaws.com", "s3://dandiarchive")
with pynwb.NWBHDF5IO(path=ros3_form, mode="r", load_namespaces=True, driver="ros3") as io:
nwbfile = io.read()
self.ros3()

def time_remfile(self):
byte_stream = remfile.File(url=self.s3_url)
with h5py.File(name=byte_stream) as file:
with pynwb.NWBHDF5IO(file=file, load_namespaces=True) as io:
nwbfile = io.read()

# def teardown(self):
# reset_lock()
# fsspec.get_filesystem_class("https").clear_instance_cache()
self.remfile()


class ElectricalSeriesStreamingROS3:
class ElectricalSeriesStreamingROS3(ElectricalSeriesStreamingROS3Base):
"""
A basic benchmark for streaming raw ecephys data.

Needs separate setup per class to only time slicing operation.
"""

repeat = 1
s3_url = "s3://dandiarchive/blobs/8c5/65f/8c565f28-e5fc-43fe-8fb7-318ad2081319"
acquisition_path = "ElectricalSeriesAp"
slice_range = (slice(0, 30_000), slice(0, 384)) # ~23 MB

def setup(self):
self.s3_url = "s3://dandiarchive/blobs/8c5/65f/8c565f28-e5fc-43fe-8fb7-318ad2081319"
self.acquisition_path = "ElectricalSeriesAp"
self.slice_range = (slice(0, 30_000), slice(0, 384)) # ~23 MB
self.io = pynwb.NWBHDF5IO(path=self.s3_url, mode="r", load_namespaces=True, driver="ros3")
self.nwbfile = self.io.read()

def time_ros3(self):
self.nwbfile.acquisition[self.acquisition_path].data[self.slice_range]
def time_slice_request(self):
"""Time for the slice_request test case"""
self.slice_request()


class ElectricalSeriesStreamingFsspec:
class ElectricalSeriesStreamingFsspec(ElectricalSeriesStreamingFsspecBase):
"""
A basic benchmark for streaming raw ecephys data.

Needs separate setup per class to only time slicing operation.
"""

repeat = 1
s3_url = "https://dandiarchive.s3.amazonaws.com/blobs/8c5/65f/8c565f28-e5fc-43fe-8fb7-318ad2081319"
acquisition_path = "ElectricalSeriesAp"
slice_range = (slice(0, 30_000), slice(0, 384)) # ~23 MB

def setup(self):
self.s3_url = "https://dandiarchive.s3.amazonaws.com/blobs/8c5/65f/8c565f28-e5fc-43fe-8fb7-318ad2081319"
self.acquisition_path = "ElectricalSeriesAp"
self.slice_range = (slice(0, 30_000), slice(0, 384)) # ~23 MB

reset_lock()
fsspec.get_filesystem_class("https").clear_instance_cache()
def time_slice_request(self):
"""Time for the slice_request test case"""
self.slice_request()

self.filesystem = fsspec.filesystem("https")
self.byte_stream = self.filesystem.open(path=self.s3_url, mode="rb")
self.file = h5py.File(name=self.byte_stream)
self.io = pynwb.NWBHDF5IO(file=self.file, mode="r", load_namespaces=True)
self.nwbfile = self.io.read()

def time_fsspec_no_cache(self):
self.nwbfile.acquisition[self.acquisition_path].data[self.slice_range]

# def teardown(self):
# reset_lock()
# fsspec.get_filesystem_class("https").clear_instance_cache()


class ElectricalSeriesStreamingRemfile:
class ElectricalSeriesStreamingRemfile(ElectricalSeriesStreamingRemfileBase):
"""
A basic benchmark for streaming raw ecephys data.

Needs separate setup per class to only time slicing operation.
"""

repeat = 1
s3_url = "https://dandiarchive.s3.amazonaws.com/blobs/8c5/65f/8c565f28-e5fc-43fe-8fb7-318ad2081319"
acquisition_path = "ElectricalSeriesAp"
slice_range = (slice(0, 30_000), slice(0, 384)) # ~23 MB

def setup(self):
self.s3_url = "https://dandiarchive.s3.amazonaws.com/blobs/8c5/65f/8c565f28-e5fc-43fe-8fb7-318ad2081319"
self.acquisition_path = "ElectricalSeriesAp"
self.slice_range = (slice(0, 30_000), slice(0, 384)) # ~23 MB
self.byte_stream = remfile.File(url=self.s3_url)
self.file = h5py.File(name=self.byte_stream)
self.io = pynwb.NWBHDF5IO(file=self.file, mode="r", load_namespaces=True)
self.nwbfile = self.io.read()

def time_remfile(self):
self.nwbfile.acquisition[self.acquisition_path].data[self.slice_range]
def time_slice_request(self):
"""Time for the slice_request test case"""
self.slice_request()
142 changes: 142 additions & 0 deletions src/nwb_benchmarks/benchmarks/streaming_base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
"""Base template classes for basic benchmarks for NWB streaming read."""

import fsspec
import h5py
import pynwb
import remfile
from fsspec.asyn import reset_lock


class FileReadStreamingBase:
    """
    Template for benchmarks that open an NWB file on S3 for streaming read.

    Every helper opens the asset at ``s3_url`` through one streaming backend and reads
    the NWB file from it.

    Child classes must:
        - set the ``s3_url`` class variable to an S3 asset
        - choose the performance metrics by defining benchmark functions that call the helpers
    """

    s3_url: str = None  # S3 URL of the NWB asset

    def setup(self):
        """Verify that the child class configured ``s3_url``."""
        assert self.s3_url is not None, "Test must set s3_url class variable"

    def fsspec_no_cache(self):
        """Read the NWB file over HTTPS with fsspec after clearing the cached filesystem instances."""
        # Reset fsspec's async lock and drop cached "https" filesystem instances before opening
        reset_lock()
        fsspec.get_filesystem_class("https").clear_instance_cache()
        https_filesystem = fsspec.filesystem("https")

        with https_filesystem.open(path=self.s3_url, mode="rb") as stream, h5py.File(name=stream) as h5_file:
            with pynwb.NWBHDF5IO(file=h5_file, load_namespaces=True) as nwb_io:
                streamed_nwbfile = nwb_io.read()

    def ros3(self):
        """Read the NWB file with the HDF5 ROS3 driver, using the ``s3://`` form of ``s3_url``."""
        https_prefix = "https://dandiarchive.s3.amazonaws.com"
        s3_prefix = "s3://dandiarchive"
        s3_path = self.s3_url.replace(https_prefix, s3_prefix)
        with pynwb.NWBHDF5IO(path=s3_path, mode="r", load_namespaces=True, driver="ros3") as nwb_io:
            streamed_nwbfile = nwb_io.read()

    def remfile(self):
        """Read the NWB file through a ``remfile.File`` wrapped in an h5py file."""
        # Inside this method ``remfile`` resolves to the imported module, not to this method
        remote_stream = remfile.File(url=self.s3_url)
        with h5py.File(name=remote_stream) as h5_file:
            with pynwb.NWBHDF5IO(file=h5_file, load_namespaces=True) as nwb_io:
                streamed_nwbfile = nwb_io.read()


class ElectricalSeriesStreamingSliceTestMixin:
    """
    Define test case for slicing an ElectricalSeries.

    Child classes must set:
        - self.nwbfile : NWBFile object to use. Usually set in the setup function
        - acquisition_path: class variable with name of object in nwbfile.acquisition
        - slice_range: class variable with data selection to apply

    Child classes must specify the performance metrics to use for the test cases by
    specifying the corresponding test. E.g.:

    .. code-block:: python

        def time_slice_request(self):
            self.slice_request()
    """

    # Both default to None (instead of being annotation-only) so that an "is not None"
    # configuration check on a child class sees a real attribute rather than raising AttributeError.
    acquisition_path: str = None  # name of object in nwbfile.acquisition
    slice_range: tuple[slice, ...] = None  # data selection to apply, e.g. (slice(0, 10), slice(0, 4))

    def slice_request(self):
        """
        Test case for slicing the ElectricalSeries.

        Indexes ``self.nwbfile.acquisition[self.acquisition_path].data`` with ``self.slice_range``.
        The selected data is intentionally discarded; only the cost of the request matters.
        """
        self.nwbfile.acquisition[self.acquisition_path].data[self.slice_range]


class ElectricalSeriesStreamingROS3Base(ElectricalSeriesStreamingSliceTestMixin):
    """
    Base class for basic benchmarks for streaming raw ecephys data with the ROS3 driver.

    Needs separate setup per class to only time slicing operation.

    Child classes must set the following class variables:
        - s3_url: URL of the S3 asset
        - See ElectricalSeriesStreamingSliceTestMixin for additional requirements
    """

    s3_url: str = None  # S3 URL of the NWB asset

    def setup(self):
        """
        Validate the class configuration, then open and read the NWB file with the ROS3 driver.

        Sets ``self.io`` and ``self.nwbfile``.

        Raises:
            AssertionError: if ``s3_url``, ``acquisition_path`` or ``slice_range`` is unset.
        """
        # getattr with a default: an attribute that was never defined must produce the
        # descriptive assertion message below instead of a bare AttributeError.
        assert getattr(self, "s3_url", None) is not None, "Test must set s3_url class variable"
        assert getattr(self, "acquisition_path", None) is not None, "Test must set the acquisition_path class variable."
        assert getattr(self, "slice_range", None) is not None, "Test must set the slice_range class variable."
        self.io = pynwb.NWBHDF5IO(path=self.s3_url, mode="r", load_namespaces=True, driver="ros3")
        self.nwbfile = self.io.read()

    def teardown(self):
        """
        Close the IO object opened by ``setup`` so the remote file handle is not leaked.

        Safe to call when ``setup`` did not complete (``self.io`` may be missing).
        NOTE(review): relies on the benchmark runner invoking ``teardown`` outside the timed
        region, as asv does for setup/teardown pairs - confirm for other runners.
        """
        io = getattr(self, "io", None)
        if io is not None:
            io.close()


class ElectricalSeriesStreamingFsspecBase(ElectricalSeriesStreamingSliceTestMixin):
    """
    Base class for basic benchmarks for streaming raw ecephys data with fsspec.

    Needs separate setup per class to only time slicing operation.

    Child classes must set the following class variables:
        - s3_url: URL of the S3 asset
        - See ElectricalSeriesStreamingSliceTestMixin for additional requirements
    """

    s3_url: str = None  # S3 URL of the NWB asset

    def setup(self):
        """
        Validate the class configuration, then open and read the NWB file over HTTPS with fsspec.

        Sets ``self.filesystem``, ``self.byte_stream``, ``self.file``, ``self.io`` and ``self.nwbfile``.

        Raises:
            AssertionError: if ``s3_url``, ``acquisition_path`` or ``slice_range`` is unset.
        """
        # getattr with a default: an attribute that was never defined must produce the
        # descriptive assertion message below instead of a bare AttributeError.
        assert getattr(self, "s3_url", None) is not None, "Test must set s3_url class variable"
        assert getattr(self, "acquisition_path", None) is not None, "Test must set the acquisition_path class variable."
        assert getattr(self, "slice_range", None) is not None, "Test must set the slice_range class variable."

        # Start from a fresh fsspec state: reset the async lock and drop cached "https" filesystems
        reset_lock()
        fsspec.get_filesystem_class("https").clear_instance_cache()

        self.filesystem = fsspec.filesystem("https")
        self.byte_stream = self.filesystem.open(path=self.s3_url, mode="rb")
        self.file = h5py.File(name=self.byte_stream)
        self.io = pynwb.NWBHDF5IO(file=self.file, mode="r", load_namespaces=True)
        self.nwbfile = self.io.read()

    def teardown(self):
        """
        Close the resources opened by ``setup`` (IO, HDF5 file, byte stream) so they are not leaked.

        Resources are closed in reverse order of creation; attributes that were never
        assigned (``setup`` failed part-way) are skipped.
        NOTE(review): relies on the benchmark runner invoking ``teardown`` outside the timed
        region, as asv does for setup/teardown pairs - confirm for other runners.
        """
        for attribute_name in ("io", "file", "byte_stream"):
            resource = getattr(self, attribute_name, None)
            if resource is not None:
                resource.close()


class ElectricalSeriesStreamingRemfileBase(ElectricalSeriesStreamingSliceTestMixin):
    """
    Base class for basic benchmarks for streaming raw ecephys data with remfile.

    Needs separate setup per class to only time slicing operation.

    Child classes must set the following class variables:
        - s3_url: URL of the S3 asset
        - See ElectricalSeriesStreamingSliceTestMixin for additional requirements
    """

    s3_url: str = None  # S3 URL of the NWB asset

    def setup(self):
        """
        Validate the class configuration, then open and read the NWB file through remfile.

        Sets ``self.byte_stream``, ``self.file``, ``self.io`` and ``self.nwbfile``.

        Raises:
            AssertionError: if ``s3_url``, ``acquisition_path`` or ``slice_range`` is unset.
        """
        # getattr with a default: an attribute that was never defined must produce the
        # descriptive assertion message below instead of a bare AttributeError.
        assert getattr(self, "s3_url", None) is not None, "Test must set s3_url class variable"
        assert getattr(self, "acquisition_path", None) is not None, "Test must set the acquisition_path class variable."
        assert getattr(self, "slice_range", None) is not None, "Test must set the slice_range class variable."

        self.byte_stream = remfile.File(url=self.s3_url)
        self.file = h5py.File(name=self.byte_stream)
        self.io = pynwb.NWBHDF5IO(file=self.file, mode="r", load_namespaces=True)
        self.nwbfile = self.io.read()

    def teardown(self):
        """
        Close the resources opened by ``setup`` (IO, HDF5 file, byte stream) so they are not leaked.

        Resources are closed in reverse order of creation; attributes that were never
        assigned (``setup`` failed part-way) are skipped.
        NOTE(review): relies on the benchmark runner invoking ``teardown`` outside the timed
        region, as asv does for setup/teardown pairs - confirm for other runners.
        """
        for attribute_name in ("io", "file", "byte_stream"):
            resource = getattr(self, attribute_name, None)
            # Only call close() where the object provides one (not verified here for remfile.File)
            close = getattr(resource, "close", None)
            if callable(close):
                close()
2 changes: 1 addition & 1 deletion src/nwb_benchmarks/command_line_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def main():
commit_hash,
]
if debug_mode:
cmd.extend(["--verbose", "--show-std-err"])
cmd.extend(["--verbose", "--show-stderr"])
if bench_mode:
cmd.extend(["--bench", specific_benchmark_pattern])

Expand Down