Skip to content

Commit

Permalink
Add network benchmark (#14)
Browse files Browse the repository at this point in the history
* Added network profiling utility module

* Added example benchmark for network metrics

* Fix a few bugs with network scripts

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Convert metrics to python numerics instead of numpy to make them JSON serializable for asv

* Add pyshark as a dependency

* Refactor and expand network performance tests

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Add transfer time and http packet stats

* Capture total in network benchmark

* Add benchmarks with cache filesystems for fsspec and remfile

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Add docstrings

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: Cody Baker <[email protected]>
  • Loading branch information
3 people authored Feb 16, 2024
1 parent b07bfbb commit dfe40fd
Show file tree
Hide file tree
Showing 6 changed files with 644 additions and 1 deletion.
1 change: 1 addition & 0 deletions environments/nwb_benchmarks.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,6 @@ dependencies:
- requests
- aiohttp
- remfile
- pyshark
- hdmf @ git+https://github.com/hdmf-dev/hdmf.git@expose_aws_region # required until region fix is released
- -e ..
246 changes: 246 additions & 0 deletions src/nwb_benchmarks/benchmarks/netbench_streaming.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,246 @@
"""Basic benchmarks for network performance metrics for streaming read of NWB files."""

import tempfile
import warnings

import fsspec
import h5py
import pynwb
import remfile
from fsspec.asyn import reset_lock
from fsspec.implementations.cached import CachingFileSystem

from .netperf.benchmarks import NetworkBenchmarkBase

# Suppress known pynwb namespace warnings; useful if running in verbose mode
warnings.filterwarnings(action="ignore", message="No cached namespaces found in .*")
warnings.filterwarnings(action="ignore", message="Ignoring cached namespace .*")

# Remote NWB file streamed by every benchmark in this module.
# Swap in one of the commented URLs below to benchmark a larger file.
S3_NWB = "https://dandiarchive.s3.amazonaws.com/ros3test.nwb"  # Small test NWB file
# S3_NWB = "https://dandiarchive.s3.amazonaws.com/blobs/c49/311/c493119b-4b99-4b14-bc03-65bb28cfbd29" # ECephy+Behavior
# S3_NWB = "https://dandiarchive.s3.amazonaws.com/blobs/38c/c24/38cc240b-77c5-418a-9040-a7f582ff6541" # Ophys
# S3_NWB = "https://dandiarchive.s3.amazonaws.com/blobs/c98/3a4/c983a4e1-097a-402c-bda8-e6a41cb7e24a" # ICEphys


class NetworkBenchmarkRos3Read(NetworkBenchmarkBase):
    """Benchmark network metrics for an NWB file read through the HDF5 ros3 driver."""

    s3_url = S3_NWB
    repeat = 1
    unit = "Bytes"
    timeout = 1200

    def setup_cache(self):
        # Run the profiled read once; all track_* methods report from this cache.
        return self.compute_test_case_metrics()

    def test_case(self):
        # Stream the remote file directly via the ros3 virtual file driver.
        with pynwb.NWBHDF5IO(self.s3_url, mode="r", driver="ros3") as reader:
            _nwbfile = reader.read()
            # _data = _nwbfile.acquisition["ts_name"].data[:]

    def track_bytes_downloaded(self, cache):
        """Report the bytes_downloaded value from the cached metrics."""
        return cache["bytes_downloaded"]

    def track_bytes_uploaded(self, cache):
        """Report the bytes_uploaded value from the cached metrics."""
        return cache["bytes_uploaded"]

    def track_bytes_total(self, cache):
        """Report the bytes_total value from the cached metrics."""
        return cache["bytes_total"]

    def track_num_packets(self, cache):
        """Report the num_packets value from the cached metrics."""
        return cache["num_packets"]

    def track_num_packets_downloaded(self, cache):
        """Report the num_packets_downloaded value from the cached metrics."""
        return cache["num_packets_downloaded"]

    def track_num_packets_uploaded(self, cache):
        """Report the num_packets_uploaded value from the cached metrics."""
        return cache["num_packets_uploaded"]

    def track_total_transfer_time(self, cache):
        """Report the total_transfer_time value from the cached metrics."""
        return cache["total_transfer_time"]

    def track_total_time(self, cache):
        """Report the total_time value from the cached metrics."""
        return cache["total_time"]


class NetworkBenchmarkRemFileRead(NetworkBenchmarkBase):
    """Benchmark network metrics for an NWB file read through RemFile."""

    s3_url = S3_NWB
    repeat = 1
    unit = "Bytes"
    timeout = 1200

    def setup_cache(self):
        # Run the profiled read once; all track_* methods report from this cache.
        return self.compute_test_case_metrics()

    def test_case(self):
        # Wrap the remote object in a RemFile byte stream and hand it to h5py.
        remote_stream = remfile.File(url=self.s3_url)
        with h5py.File(name=remote_stream) as h5file:
            with pynwb.NWBHDF5IO(file=h5file, load_namespaces=True) as reader:
                _nwbfile = reader.read()
                # _data = _nwbfile.acquisition["ts_name"].data[:]

    def track_bytes_downloaded(self, cache):
        """Report the bytes_downloaded value from the cached metrics."""
        return cache["bytes_downloaded"]

    def track_bytes_uploaded(self, cache):
        """Report the bytes_uploaded value from the cached metrics."""
        return cache["bytes_uploaded"]

    def track_bytes_total(self, cache):
        """Report the bytes_total value from the cached metrics."""
        return cache["bytes_total"]

    def track_num_packets(self, cache):
        """Report the num_packets value from the cached metrics."""
        return cache["num_packets"]

    def track_num_packets_downloaded(self, cache):
        """Report the num_packets_downloaded value from the cached metrics."""
        return cache["num_packets_downloaded"]

    def track_num_packets_uploaded(self, cache):
        """Report the num_packets_uploaded value from the cached metrics."""
        return cache["num_packets_uploaded"]

    def track_total_transfer_time(self, cache):
        """Report the total_transfer_time value from the cached metrics."""
        return cache["total_transfer_time"]

    def track_total_time(self, cache):
        """Report the total_time value from the cached metrics."""
        return cache["total_time"]


class NetworkBenchmarkRemFileWithCacheRead(NetworkBenchmarkBase):
    """Benchmark NWB file read with RemFile using a remfile.DiskCache as a temporary cache."""

    s3_url = S3_NWB
    repeat = 1
    unit = "Bytes"
    timeout = 1200

    def setup_cache(self):
        # Run the profiled read once; all track_* methods report from this cache.
        return self.compute_test_case_metrics()

    def test_case(self):
        with tempfile.TemporaryDirectory() as tmpdirname:
            disk_cache = remfile.DiskCache(tmpdirname)
            # BUG FIX: ``disk_cache`` is a remfile.File option, not an h5py.File one;
            # h5py.File raises TypeError on the unknown keyword, so the cache must be
            # attached to the RemFile byte stream instead.
            byte_stream = remfile.File(url=self.s3_url, disk_cache=disk_cache)
            with h5py.File(name=byte_stream) as file:
                with pynwb.NWBHDF5IO(file=file, load_namespaces=True) as io:
                    nwbfile = io.read()
                    # test_data = nwbfile.acquisition["ts_name"].data[:]

    def track_bytes_downloaded(self, cache):
        """Report the bytes_downloaded value from the cached metrics."""
        return cache["bytes_downloaded"]

    def track_bytes_uploaded(self, cache):
        """Report the bytes_uploaded value from the cached metrics."""
        return cache["bytes_uploaded"]

    def track_bytes_total(self, cache):
        """Report the bytes_total value from the cached metrics."""
        return cache["bytes_total"]

    def track_num_packets(self, cache):
        """Report the num_packets value from the cached metrics."""
        return cache["num_packets"]

    def track_num_packets_downloaded(self, cache):
        """Report the num_packets_downloaded value from the cached metrics."""
        return cache["num_packets_downloaded"]

    def track_num_packets_uploaded(self, cache):
        """Report the num_packets_uploaded value from the cached metrics."""
        return cache["num_packets_uploaded"]

    def track_total_transfer_time(self, cache):
        """Report the total_transfer_time value from the cached metrics."""
        return cache["total_transfer_time"]

    def track_total_time(self, cache):
        """Report the total_time value from the cached metrics."""
        return cache["total_time"]


class NetworkBenchmarkFsspecWithCacheFileRead(NetworkBenchmarkBase):
    """Benchmark network metrics for an NWB file read through fsspec with a CachingFileSystem."""

    s3_url = S3_NWB
    repeat = 1
    unit = "Bytes"
    timeout = 1200

    def setup_cache(self):
        # Run the profiled read once; all track_* methods report from this cache.
        return self.compute_test_case_metrics()

    def test_case(self):
        # fsspec keeps global state; clear it so each run starts from a clean slate.
        reset_lock()
        fsspec.get_filesystem_class("https").clear_instance_cache()
        https_filesystem = fsspec.filesystem("https")
        # Layer a disk cache over the HTTPS filesystem (optional); the default
        # temporary storage is cleaned up automatically.
        caching_filesystem = CachingFileSystem(fs=https_filesystem)

        with caching_filesystem.open(path=self.s3_url, mode="rb") as byte_stream:
            with h5py.File(name=byte_stream) as h5file:
                with pynwb.NWBHDF5IO(file=h5file, load_namespaces=True) as reader:
                    _nwbfile = reader.read()
                    # _data = _nwbfile.acquisition["ts_name"].data[:]

    def track_bytes_downloaded(self, cache):
        """Report the bytes_downloaded value from the cached metrics."""
        return cache["bytes_downloaded"]

    def track_bytes_uploaded(self, cache):
        """Report the bytes_uploaded value from the cached metrics."""
        return cache["bytes_uploaded"]

    def track_bytes_total(self, cache):
        """Report the bytes_total value from the cached metrics."""
        return cache["bytes_total"]

    def track_num_packets(self, cache):
        """Report the num_packets value from the cached metrics."""
        return cache["num_packets"]

    def track_num_packets_downloaded(self, cache):
        """Report the num_packets_downloaded value from the cached metrics."""
        return cache["num_packets_downloaded"]

    def track_num_packets_uploaded(self, cache):
        """Report the num_packets_uploaded value from the cached metrics."""
        return cache["num_packets_uploaded"]

    def track_total_transfer_time(self, cache):
        """Report the total_transfer_time value from the cached metrics."""
        return cache["total_transfer_time"]

    def track_total_time(self, cache):
        """Report the total_time value from the cached metrics."""
        return cache["total_time"]


class NetworkBenchmarkFsspecFileRead(NetworkBenchmarkBase):
    """Benchmark network metrics for an NWB file read through plain fsspec (no extra cache)."""

    s3_url = S3_NWB
    repeat = 1
    unit = "Bytes"
    timeout = 1200

    def setup_cache(self):
        # Run the profiled read once; all track_* methods report from this cache.
        return self.compute_test_case_metrics()

    def test_case(self):
        # fsspec keeps global state; clear it so each run starts from a clean slate.
        reset_lock()
        fsspec.get_filesystem_class("https").clear_instance_cache()
        https_filesystem = fsspec.filesystem("https")
        with https_filesystem.open(path=self.s3_url, mode="rb") as byte_stream:
            with h5py.File(name=byte_stream) as h5file:
                with pynwb.NWBHDF5IO(file=h5file, load_namespaces=True) as reader:
                    _nwbfile = reader.read()
                    # _data = _nwbfile.acquisition["ts_name"].data[:]

    def track_bytes_downloaded(self, cache):
        """Report the bytes_downloaded value from the cached metrics."""
        return cache["bytes_downloaded"]

    def track_bytes_uploaded(self, cache):
        """Report the bytes_uploaded value from the cached metrics."""
        return cache["bytes_uploaded"]

    def track_bytes_total(self, cache):
        """Report the bytes_total value from the cached metrics."""
        return cache["bytes_total"]

    def track_num_packets(self, cache):
        """Report the num_packets value from the cached metrics."""
        return cache["num_packets"]

    def track_num_packets_downloaded(self, cache):
        """Report the num_packets_downloaded value from the cached metrics."""
        return cache["num_packets_downloaded"]

    def track_num_packets_uploaded(self, cache):
        """Report the num_packets_uploaded value from the cached metrics."""
        return cache["num_packets_uploaded"]

    def track_total_transfer_time(self, cache):
        """Report the total_transfer_time value from the cached metrics."""
        return cache["total_transfer_time"]

    def track_total_time(self, cache):
        """Report the total_time value from the cached metrics."""
        return cache["total_time"]
Empty file.
84 changes: 84 additions & 0 deletions src/nwb_benchmarks/benchmarks/netperf/benchmarks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
"""
Base classes for implementing benchmarks for evaluating
network performance metrics for streaming read of NWB files.
"""

import os
import time

from asv_runner.benchmarks.mark import SkipNotImplemented

from .profile import CaptureConnections, NetProfiler, NetStats


class NetworkBenchmarkBase:
    """
    Base class for network performance metrics for streaming data access.

    Child classes must implement:

    - ``test_case`` : the workload to be profiled.
    - ``setup_cache`` : if it were implemented here in the base class, asv would run it
      only once for all child classes. As such, each child class must implement its
      own ``setup_cache``, which should typically just consist of a call to
      ``return self.compute_test_case_metrics()``.
    - ``track_...`` : methods defining the metrics to be tracked. These functions
      usually just retrieve the corresponding metric from the cache and return the
      result. For example:

      .. code-block:: python

          def track_bytes_downloaded(self, cache):  # unit is Bytes
              return cache["bytes_downloaded"]

          def track_num_packets(self, cache):  # unit is Count
              return cache["num_packets"]
    """

    # URL of the remote NWB file to stream; child classes must override this.
    s3_url: str = None

    def test_case(self):
        """Run the workload to be profiled; child classes must override this method."""
        raise SkipNotImplemented()

    def compute_test_case_metrics(self):
        """Run ``test_case`` under network capture and return the collected statistics."""
        self.start_net_capture()
        # perf_counter is a monotonic clock, immune to wall-clock adjustments
        # (NTP, DST), making it the correct choice for measuring elapsed durations.
        t0 = time.perf_counter()
        try:
            self.test_case()
        finally:
            total_time = time.perf_counter() - t0
            # Always stop the capture thread and the tshark subprocess, even when the
            # test case raises, so they do not leak into subsequent benchmarks.
            self.stop_netcapture()
        self.net_stats["total_time"] = total_time
        return self.net_stats

    def start_net_capture(self):
        """Start recording this machine's connections and raw network packets."""
        # Start the capture_connections() function to update the current connections
        # of this machine from a background thread.
        self.connections_thread = CaptureConnections()
        self.connections_thread.start()
        time.sleep(0.2)  # give the thread a moment to observe the initial connections

        # Start capturing the raw packets by running the tshark commandline tool
        # in a subprocess.
        self.netprofiler = NetProfiler()
        self.netprofiler.start_capture()

    def stop_netcapture(self):
        """Stop capturing and compute ``self.net_stats`` for this process's traffic."""
        # Stop capturing packets and connections.
        self.netprofiler.stop_capture()
        self.connections_thread.stop()

        # Get the connections for the PID of this process.
        self.pid_connections = self.connections_thread.get_connections_for_pid(os.getpid())
        # Parse packets and filter out all the packets for this process pid by
        # matching with the pid_connections.
        self.pid_packets = self.netprofiler.get_packets_for_connections(self.pid_connections)
        # Compute all the network statistics.
        self.net_stats = NetStats.get_stats(packets=self.pid_packets)
Loading

0 comments on commit dfe40fd

Please sign in to comment.