diff --git a/README.md b/README.md
index 6da30b3..c110c94 100644
--- a/README.md
+++ b/README.md
@@ -50,9 +50,9 @@ To iteratively parse all historical logs all at once (parallelization strongly r
 reduce_all_dandi_raw_s3_logs \
   --base_raw_s3_logs_folder_path < base log folder > \
   --reduced_s3_logs_folder_path < output folder > \
-  --excluded_ips < comma-separated list of known IPs to exclude > \
   --maximum_number_of_workers < number of CPUs to use > \
-  --maximum_buffer_size_in_mb < approximate amount of RAM to use >
+  --maximum_buffer_size_in_mb < approximate amount of RAM to use > \
+  --excluded_ips < comma-separated list of known IPs to exclude >
 ```

 For example, on Drogon:
@@ -61,9 +61,9 @@ For example, on Drogon:
 reduce_all_dandi_raw_s3_logs \
   --base_raw_s3_logs_folder_path /mnt/backup/dandi/dandiarchive-logs \
   --reduced_s3_logs_folder_path /mnt/backup/dandi/dandiarchive-logs-cody/parsed_8_15_2024/REST_GET_OBJECT_per_asset_id \
-  --excluded_ips < Drogons IP > \
   --maximum_number_of_workers 6 \
-  --maximum_buffer_size_in_mb 5000
+  --maximum_buffer_size_in_mb 5000 \
+  --excluded_ips < Drogons IP >
 ```

 ### Reduce a single log file
diff --git a/src/dandi_s3_log_parser/__init__.py b/src/dandi_s3_log_parser/__init__.py
index cca6acc..1167436 100644
--- a/src/dandi_s3_log_parser/__init__.py
+++ b/src/dandi_s3_log_parser/__init__.py
@@ -18,15 +18,15 @@
 """

 from ._config import DANDI_S3_LOG_PARSER_BASE_FOLDER_PATH
-from ._s3_log_file_parser import parse_raw_s3_log
+from ._s3_log_file_reducer import reduce_raw_s3_log
 from ._buffered_text_reader import BufferedTextReader
-from ._dandi_s3_log_file_parser import reduce_dandi_raw_s3_log, reduce_all_dandi_raw_s3_logs
+from ._dandi_s3_log_file_reducer import reduce_dandi_raw_s3_log, reduce_all_dandi_raw_s3_logs
 from ._ip_utils import get_region_from_ip_address
 from ._dandiset_mapper import map_reduced_logs_to_dandisets

 __all__ = [
     "DANDI_S3_LOG_PARSER_BASE_FOLDER_PATH",
-    "parse_raw_s3_log",
+    "reduce_raw_s3_log",
     "BufferedTextReader",
     "reduce_dandi_raw_s3_log",
     "reduce_all_dandi_raw_s3_logs",
diff --git a/src/dandi_s3_log_parser/_command_line_interface.py b/src/dandi_s3_log_parser/_command_line_interface.py
index 3f08b99..6b69fe7 100644
--- a/src/dandi_s3_log_parser/_command_line_interface.py
+++ b/src/dandi_s3_log_parser/_command_line_interface.py
@@ -7,7 +7,7 @@
 import click

 from ._config import REQUEST_TYPES
-from ._dandi_s3_log_file_parser import (
+from ._dandi_s3_log_file_reducer import (
     reduce_all_dandi_raw_s3_logs,
     reduce_dandi_raw_s3_log,
 )
@@ -28,13 +28,6 @@
     required=True,
     type=click.Path(writable=True),
 )
-@click.option(
-    "--excluded_ips",
-    help="A comma-separated list of IP addresses to exclude from parsing.",
-    required=False,
-    type=str,
-    default=None,
-)
 @click.option(
     "--maximum_number_of_workers",
     help="The maximum number of workers to distribute tasks across.",
@@ -55,12 +48,19 @@
     type=click.IntRange(min=1),  # Bare minimum of 1 MB
     default=1_000,  # 1 GB recommended
 )
+@click.option(
+    "--excluded_ips",
+    help="A comma-separated list of IP addresses to exclude from parsing.",
+    required=False,
+    type=str,
+    default=None,
+)
 def _reduce_all_dandi_raw_s3_logs_cli(
     base_raw_s3_logs_folder_path: str,
     reduced_s3_logs_folder_path: str,
-    excluded_ips: str | None,
     maximum_number_of_workers: int,
     maximum_buffer_size_in_mb: int,
+    excluded_ips: str | None,
 ) -> None:
     split_excluded_ips = excluded_ips.split(",") if excluded_ips is not None else list()
     handled_excluded_ips = collections.defaultdict(bool) if len(split_excluded_ips) != 0 else None
@@ -71,9 +71,9 @@ def _reduce_all_dandi_raw_s3_logs_cli(
     reduce_all_dandi_raw_s3_logs(
         base_raw_s3_logs_folder_path=base_raw_s3_logs_folder_path,
         reduced_s3_logs_folder_path=reduced_s3_logs_folder_path,
-        excluded_ips=handled_excluded_ips,
         maximum_number_of_workers=maximum_number_of_workers,
         maximum_buffer_size_in_bytes=maximum_buffer_size_in_bytes,
+        excluded_ips=handled_excluded_ips,
     )


@@ -90,13 +90,6 @@ def _reduce_all_dandi_raw_s3_logs_cli(
     required=True,
     type=click.Path(writable=True),
 )
-@click.option(
-    "--excluded_ips",
-    help="A comma-separated list of IP addresses to exclude from reduction.",
-    required=False,
-    type=str,
-    default=None,
-)
 @click.option(
     "--maximum_buffer_size_in_mb",
     help=(
@@ -110,6 +103,13 @@ def _reduce_all_dandi_raw_s3_logs_cli(
     type=click.IntRange(min=1),  # Bare minimum of 1 MB
     default=1_000,  # 1 GB recommended
 )
+@click.option(
+    "--excluded_ips",
+    help="A comma-separated list of IP addresses to exclude from reduction.",
+    required=False,
+    type=str,
+    default=None,
+)
 def _reduce_dandi_raw_s3_log_cli(
     raw_s3_log_file_path: str,
     reduced_s3_logs_folder_path: str,
@@ -125,8 +125,8 @@ def _reduce_dandi_raw_s3_log_cli(
     reduce_dandi_raw_s3_log(
         raw_s3_log_file_path=raw_s3_log_file_path,
         reduced_s3_logs_folder_path=reduced_s3_logs_folder_path,
-        excluded_ips=handled_excluded_ips,
         maximum_buffer_size_in_bytes=maximum_buffer_size_in_bytes,
+        excluded_ips=handled_excluded_ips,
     )


diff --git a/src/dandi_s3_log_parser/_dandi_s3_log_file_parser.py b/src/dandi_s3_log_parser/_dandi_s3_log_file_reducer.py
similarity index 90%
rename from src/dandi_s3_log_parser/_dandi_s3_log_file_parser.py
rename to src/dandi_s3_log_parser/_dandi_s3_log_file_reducer.py
index ddb2e11..d8dc64d 100644
--- a/src/dandi_s3_log_parser/_dandi_s3_log_file_parser.py
+++ b/src/dandi_s3_log_parser/_dandi_s3_log_file_reducer.py
@@ -15,10 +15,10 @@
 import pandas
 import tqdm
-from pydantic import DirectoryPath, Field, validate_call
+from pydantic import DirectoryPath, Field, FilePath, validate_call

 from ._config import DANDI_S3_LOG_PARSER_BASE_FOLDER_PATH
-from ._s3_log_file_parser import parse_raw_s3_log
+from ._s3_log_file_reducer import reduce_raw_s3_log


 @validate_call
@@ -26,9 +26,9 @@ def reduce_all_dandi_raw_s3_logs(
     *,
     base_raw_s3_logs_folder_path: DirectoryPath,
     reduced_s3_logs_folder_path: DirectoryPath,
-    excluded_ips: collections.defaultdict[str, bool] | None = None,
     maximum_number_of_workers: int = Field(ge=1, default=1),
     maximum_buffer_size_in_bytes: int = 4 * 10**9,
+    excluded_ips: collections.defaultdict[str, bool] | None = None,
 ) -> None:
     """
     Batch parse all raw S3 log files in a folder and write the results to a folder of TSV files.
@@ -46,21 +46,21 @@ def reduce_all_dandi_raw_s3_logs(
     base_raw_s3_logs_folder_path : file path
         The Path to the folder containing the raw S3 log files to be reduced.
     reduced_s3_logs_folder_path : file path
-        The Path to write each parsed S3 log file to.
+        The Path to write each reduced S3 log file to.
         There will be one file per handled asset ID.
-    excluded_ips : collections.defaultdict(bool), optional
-        A lookup table whose keys are IP addresses to exclude from reduction.
     maximum_number_of_workers : int, default: 1
         The maximum number of workers to distribute tasks across.
     maximum_buffer_size_in_bytes : int, default: 4 GB
         The theoretical maximum amount of RAM (in bytes) to use on each buffer iteration when reading from the
         source text files.
+        Actual total RAM usage will be higher due to overhead and caching.
+        Automatically splits this total amount over the maximum number of workers if `maximum_number_of_workers` is greater than one.
+    excluded_ips : collections.defaultdict(bool), optional
+        A lookup table whose keys are IP addresses to exclude from reduction.
     """
-    base_raw_s3_logs_folder_path = pathlib.Path(base_raw_s3_logs_folder_path)
-    reduced_s3_logs_folder_path = pathlib.Path(reduced_s3_logs_folder_path)
     excluded_ips = excluded_ips or collections.defaultdict(bool)
     asset_id_handler = _get_default_dandi_asset_id_handler()

@@ -85,10 +85,10 @@ def reduce_all_dandi_raw_s3_logs(
             raw_s3_log_file_path=raw_s3_log_file_path,
             reduced_s3_logs_folder_path=reduced_s3_logs_folder_path,
             mode="a",
+            maximum_buffer_size_in_bytes=maximum_buffer_size_in_bytes,
             excluded_ips=excluded_ips,
             asset_id_handler=asset_id_handler,
             tqdm_kwargs=dict(position=1, leave=False),
-            maximum_buffer_size_in_bytes=maximum_buffer_size_in_bytes,
         )
     else:
         # Create a fresh temporary directory in the home folder and then fresh subfolders for each worker
@@ -117,11 +117,11 @@ def reduce_all_dandi_raw_s3_logs(
             futures.append(
                 executor.submit(
                     _multi_worker_reduce_dandi_raw_s3_log,
-                    maximum_number_of_workers=maximum_number_of_workers,
                     raw_s3_log_file_path=raw_s3_log_file_path,
                     temporary_folder_path=temporary_folder_path,
-                    excluded_ips=excluded_ips,
+                    maximum_number_of_workers=maximum_number_of_workers,
                     maximum_buffer_size_in_bytes=maximum_buffer_size_in_bytes_per_worker,
+                    excluded_ips=excluded_ips,
                 ),
             )

@@ -147,22 +147,22 @@ def reduce_all_dandi_raw_s3_logs(
             leave=True,
             mininterval=2.0,
         ):
-            per_worker_parsed_s3_log_file_paths = list(per_worker_temporary_folder_path.iterdir())
+            per_worker_reduced_s3_log_file_paths = list(per_worker_temporary_folder_path.iterdir())
             assert (
-                len(per_worker_parsed_s3_log_file_paths) != 0
+                len(per_worker_reduced_s3_log_file_paths) != 0
             ), f"No files found in {per_worker_temporary_folder_path}!"

-            for per_worker_parsed_s3_log_file_path in tqdm.tqdm(
-                iterable=per_worker_parsed_s3_log_file_paths,
+            for per_worker_reduced_s3_log_file_path in tqdm.tqdm(
+                iterable=per_worker_reduced_s3_log_file_paths,
                 desc="Merging results per worker...",
-                total=len(per_worker_parsed_s3_log_file_paths),
+                total=len(per_worker_reduced_s3_log_file_paths),
                 position=1,
                 leave=False,
                 mininterval=2.0,
             ):
-                merged_temporary_file_path = reduced_s3_logs_folder_path / per_worker_parsed_s3_log_file_path.name
+                merged_temporary_file_path = reduced_s3_logs_folder_path / per_worker_reduced_s3_log_file_path.name

-                parsed_s3_log = pandas.read_table(filepath_or_buffer=per_worker_parsed_s3_log_file_path, header=0)
+                parsed_s3_log = pandas.read_table(filepath_or_buffer=per_worker_reduced_s3_log_file_path, header=0)

                 header = False if merged_temporary_file_path.exists() else True
                 parsed_s3_log.to_csv(
@@ -180,11 +180,11 @@ def reduce_all_dandi_raw_s3_logs(
 # pragma: no cover
 def _multi_worker_reduce_dandi_raw_s3_log(
     *,
-    maximum_number_of_workers: int,
     raw_s3_log_file_path: pathlib.Path,
     temporary_folder_path: pathlib.Path,
-    excluded_ips: collections.defaultdict[str, bool] | None,
+    maximum_number_of_workers: int,
     maximum_buffer_size_in_bytes: int,
+    excluded_ips: collections.defaultdict[str, bool] | None,
 ) -> None:
     """
     A mostly pass-through function to calculate the worker index on the worker and target the correct subfolder.
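For orientation between these hunks, here is a minimal sketch (not part of the patch) of how the reordered keyword-only signature of `reduce_all_dandi_raw_s3_logs` might be called directly from Python. The folder paths and the IP address are hypothetical placeholders — both folders must already exist, since the arguments are validated as pydantic `DirectoryPath` — and the `defaultdict(bool)` construction mirrors what the CLI layer builds from `--excluded_ips`.

```python
import collections

from dandi_s3_log_parser import reduce_all_dandi_raw_s3_logs

# Hypothetical folder paths; both must already exist because the function
# validates them as pydantic DirectoryPath.
base_raw_s3_logs_folder_path = "/mnt/data/raw-s3-logs"
reduced_s3_logs_folder_path = "/mnt/data/reduced-s3-logs"

# Lookup table of IPs to exclude, mirroring what the CLI derives from --excluded_ips:
# a defaultdict(bool) whose keys map to True when that IP should be skipped.
excluded_ips = collections.defaultdict(bool)
excluded_ips["192.0.2.0"] = True  # placeholder IP address

reduce_all_dandi_raw_s3_logs(
    base_raw_s3_logs_folder_path=base_raw_s3_logs_folder_path,
    reduced_s3_logs_folder_path=reduced_s3_logs_folder_path,
    maximum_number_of_workers=6,
    maximum_buffer_size_in_bytes=4 * 10**9,
    excluded_ips=excluded_ips,
)
```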
@@ -209,19 +209,19 @@ def _multi_worker_reduce_dandi_raw_s3_log(
         date = datetime.datetime.now().strftime("%y%m%d")
         parallel_errors_file_path = errors_folder_path / f"v{dandi_s3_log_parser_version}_{date}_parallel_errors.txt"
         error_message += (
-            f"Worker index {worker_index}/{maximum_number_of_workers} parsing {raw_s3_log_file_path} failed due to\n\n"
+            f"Worker index {worker_index}/{maximum_number_of_workers} reducing {raw_s3_log_file_path} failed due to\n\n"
         )

         reduce_dandi_raw_s3_log(
             raw_s3_log_file_path=raw_s3_log_file_path,
             reduced_s3_logs_folder_path=per_worker_temporary_folder_path,
             mode="a",
+            maximum_buffer_size_in_bytes=maximum_buffer_size_in_bytes,
             excluded_ips=excluded_ips,
             asset_id_handler=asset_id_handler,
             tqdm_kwargs=dict(
                 position=worker_index + 1, leave=False, desc=f"Parsing line buffers on worker {worker_index+1}..."
             ),
-            maximum_buffer_size_in_bytes=maximum_buffer_size_in_bytes,
         )
     except Exception as exception:
         with open(file=parallel_errors_file_path, mode="a") as io:
@@ -229,18 +229,19 @@ def _multi_worker_reduce_dandi_raw_s3_log(
             io.write(error_message)


+@validate_call
 def reduce_dandi_raw_s3_log(
     *,
-    raw_s3_log_file_path: str | pathlib.Path,
-    reduced_s3_logs_folder_path: str | pathlib.Path,
+    raw_s3_log_file_path: FilePath,
+    reduced_s3_logs_folder_path: DirectoryPath,
     mode: Literal["w", "a"] = "a",
+    maximum_buffer_size_in_bytes: int = 4 * 10**9,
     excluded_ips: collections.defaultdict[str, bool] | None = None,
     asset_id_handler: Callable | None = None,
     tqdm_kwargs: dict | None = None,
-    maximum_buffer_size_in_bytes: int = 4 * 10**9,
 ) -> None:
     """
-    Parse a raw S3 log file and write the results to a folder of TSV files, one for each unique asset ID.
+    Reduce a raw S3 log file and write the results to a folder of TSV files, one for each unique asset ID.

     'Parsing' here means:
     - limiting only to requests of the specified type (i.e., GET, PUT, etc.)
@@ -251,7 +252,7 @@
     raw_s3_log_file_path : string or pathlib.Path
         Path to the raw S3 log file to be reduced.
     reduced_s3_logs_folder_path : string or pathlib.Path
-        The path to write each parsed S3 log file to.
+        The path to write each reduced S3 log file to.
         There will be one file per handled asset ID.
     mode : "w" or "a", default: "a"
         How to resolve the case when files already exist in the folder containing parsed logs.
@@ -259,6 +260,11 @@
         The intention of the default usage is to have one consolidated raw S3 log file per day and then to iterate
         over each day, parsing and binning by asset, effectively 'updating' the parsed collection on each iteration.
+    maximum_buffer_size_in_bytes : int, default: 4 GB
+        The theoretical maximum amount of RAM (in bytes) to use on each buffer iteration when reading from the
+        source text file.
+
+        Actual RAM usage will be higher due to overhead and caching.
     excluded_ips : collections.defaultdict(bool), optional
         A lookup table whose keys are IP addresses to exclude from reduction.
     asset_id_handler : callable, optional
@@ -270,10 +276,7 @@ def asset_id_handler(*, raw_asset_id: str) -> str:
             split_by_slash = raw_asset_id.split("/")
             return split_by_slash[0] + "_" + split_by_slash[-1]
     tqdm_kwargs : dict, optional
-        Keyword arguments to pass to the tqdm progress bar.
-    maximum_buffer_size_in_bytes : int, default: 4 GB
-        The theoretical maximum amount of RAM (in bytes) to use on each buffer iteration when reading from the
-        source text file.
+        Keyword arguments to pass to the tqdm progress bar for line buffers.
""" raw_s3_log_file_path = pathlib.Path(raw_s3_log_file_path) reduced_s3_logs_folder_path = pathlib.Path(reduced_s3_logs_folder_path) @@ -283,16 +286,16 @@ def asset_id_handler(*, raw_asset_id: str) -> str: bucket = "dandiarchive" operation_type = "REST.GET.OBJECT" - parse_raw_s3_log( + reduce_raw_s3_log( raw_s3_log_file_path=raw_s3_log_file_path, - parsed_s3_log_folder_path=reduced_s3_logs_folder_path, + reduced_s3_logs_folder_path=reduced_s3_logs_folder_path, mode=mode, + maximum_buffer_size_in_bytes=maximum_buffer_size_in_bytes, bucket=bucket, operation_type=operation_type, excluded_ips=excluded_ips, asset_id_handler=asset_id_handler, tqdm_kwargs=tqdm_kwargs, - maximum_buffer_size_in_bytes=maximum_buffer_size_in_bytes, ) diff --git a/src/dandi_s3_log_parser/_dandiset_mapper.py b/src/dandi_s3_log_parser/_dandiset_mapper.py index 9f71d07..737a04f 100644 --- a/src/dandi_s3_log_parser/_dandiset_mapper.py +++ b/src/dandi_s3_log_parser/_dandiset_mapper.py @@ -67,8 +67,6 @@ def _map_reduced_logs_to_dandiset( ) -> None: dandiset_id = dandiset.identifier - dandiset_log_folder_path = dandiset_logs_folder_path / dandiset_id - for version in dandiset.get_versions(): version_id = version.identifier @@ -76,24 +74,25 @@ def _map_reduced_logs_to_dandiset( all_reduced_logs = [] for asset in dandiset_version.get_assets(): - asset_id = asset.identifier asset_suffixes = pathlib.Path(asset.path).suffixes + is_asset_zarr = ".zarr" in asset_suffixes - blob_or_zarr = "blobs" if ".zarr" not in asset_suffixes else "zarr" + blob_id = asset.blob if not is_asset_zarr else asset.zarr + blobs_or_zarr = "blobs" if not is_asset_zarr else "zarr" - reduced_log_file_path = reduced_s3_logs_folder_path / f"{blob_or_zarr}_{asset_id}.tsv" + reduced_log_file_path = reduced_s3_logs_folder_path / f"{blobs_or_zarr}_{blob_id}.tsv" if not reduced_log_file_path.exists(): continue # No reduced logs found (possible asset was never accessed); skip to next asset reduced_log = pandas.read_table(filepath_or_buffer=reduced_log_file_path, header=0) - reduced_log["asset_id"] = [asset_id] * len(reduced_log) + reduced_log["filename"] = [asset.path] * len(reduced_log) reduced_log["region"] = [ get_region_from_ip_address(ip_address=ip_address, ip_hash_to_region=ip_hash_to_region) for ip_address in reduced_log["ip_address"] ] - reordered_reduced_log = reduced_log.reindex(columns=("asset_id", "timestamp", "bytes_sent", "region")) + reordered_reduced_log = reduced_log.reindex(columns=("filename", "timestamp", "bytes_sent", "region")) all_reduced_logs.append(reordered_reduced_log) if len(all_reduced_logs) == 0: @@ -103,6 +102,7 @@ def _map_reduced_logs_to_dandiset( mapped_log.sort_values(by="timestamp") mapped_log.index = range(len(mapped_log)) + dandiset_log_folder_path = dandiset_logs_folder_path / dandiset_id dandiset_log_folder_path.mkdir(exist_ok=True) version_file_path = dandiset_log_folder_path / f"{version_id}.tsv" - mapped_log.to_csv(version_file_path, mode="w", sep="\t", header=True, index=True) + mapped_log.to_csv(path_or_buf=version_file_path, mode="w", sep="\t", header=True, index=True) diff --git a/src/dandi_s3_log_parser/_s3_log_file_parser.py b/src/dandi_s3_log_parser/_s3_log_file_reducer.py similarity index 70% rename from src/dandi_s3_log_parser/_s3_log_file_parser.py rename to src/dandi_s3_log_parser/_s3_log_file_reducer.py index a3b0365..4ee98a1 100644 --- a/src/dandi_s3_log_parser/_s3_log_file_parser.py +++ b/src/dandi_s3_log_parser/_s3_log_file_reducer.py @@ -10,7 +10,7 @@ import pandas import tqdm -from pydantic import 
validate_call +from pydantic import DirectoryPath, FilePath, validate_call from ._buffered_text_reader import BufferedTextReader from ._config import DANDI_S3_LOG_PARSER_BASE_FOLDER_PATH @@ -18,31 +18,34 @@ @validate_call -def parse_raw_s3_log( +def reduce_raw_s3_log( *, - raw_s3_log_file_path: str | pathlib.Path, - parsed_s3_log_folder_path: str | pathlib.Path, + raw_s3_log_file_path: FilePath, + reduced_s3_logs_folder_path: DirectoryPath, mode: Literal["w", "a"] = "a", + maximum_buffer_size_in_bytes: int = 4 * 10**9, bucket: str | None = None, operation_type: Literal[_KNOWN_OPERATION_TYPES] = "REST.GET.OBJECT", excluded_ips: collections.defaultdict[str, bool] | None = None, asset_id_handler: Callable | None = None, tqdm_kwargs: dict | None = None, - maximum_buffer_size_in_bytes: int = 4 * 10**9, ) -> None: """ - Parse a raw S3 log file and write the results to a folder of TSV files, one for each unique asset ID. + Reduce a raw S3 log file and write the results to a folder of TSV files, one for each unique asset ID. - 'Parsing' here means: - - limiting only to requests of the specified type (i.e., GET, PUT, etc.) - - reducing the information to the asset ID, request time, request size, and geographic IP of the requester + 'Reduce' here means: + - Filtering all lines only by the bucket specified. + - Filtering all lines only by the type of operation specified (i.e., REST.GET.OBJECT, REST.PUT.OBJECT, etc.). + - Filtering out any non-success status codes. + - Filtering out any excluded IP addresses. + - Extracting only the asset ID, request timestamp, request size, and IP address that sent the request. Parameters ---------- raw_s3_log_file_path : str or pathlib.Path - Path to the raw S3 log file. - parsed_s3_log_folder_path : str or pathlib.Path - Path to write each parsed S3 log file to. + The path to the raw S3 log file. + reduced_s3_log_folder_path : str or pathlib.Path + The path to write each reduced S3 log file to. There will be one file per handled asset ID. mode : "w" or "a", default: "a" How to resolve the case when files already exist in the folder containing parsed logs. @@ -50,6 +53,11 @@ def parse_raw_s3_log( The intention of the default usage is to have one consolidated raw S3 log file per day and then to iterate over each day, parsing and binning by asset, effectively 'updating' the parsed collection on each iteration. + maximum_buffer_size_in_bytes : int, default: 4 GB + The theoretical maximum amount of RAM (in bytes) to use on each buffer iteration when reading from the + source text file. + + Actual RAM usage will be higher due to overhead and caching. bucket : str Only parse and return lines that match this bucket. operation_type : str, default: "REST.GET" @@ -65,14 +73,9 @@ def asset_id_handler(*, raw_asset_id: str) -> str: split_by_slash = raw_asset_id.split("/") return split_by_slash[0] + "_" + split_by_slash[-1] tqdm_kwargs : dict, optional - Keyword arguments to pass to the tqdm progress bar. - maximum_buffer_size_in_bytes : int, default: 4 GB - The theoretical maximum amount of RAM (in bytes) to use on each buffer iteration when reading from the - source text file. + Keyword arguments to pass to the tqdm progress bar for line buffers. 
""" - raw_s3_log_file_path = pathlib.Path(raw_s3_log_file_path) - parsed_s3_log_folder_path = pathlib.Path(parsed_s3_log_folder_path) - parsed_s3_log_folder_path.mkdir(exist_ok=True) + reduced_s3_logs_folder_path.mkdir(exist_ok=True) bucket = bucket or "" excluded_ips = excluded_ips or collections.defaultdict(bool) asset_id_handler = asset_id_handler or (lambda asset_id: asset_id) @@ -82,16 +85,16 @@ def asset_id_handler(*, raw_asset_id: str) -> str: reduced_and_binned_logs = _get_reduced_and_binned_log_lines( raw_s3_log_file_path=raw_s3_log_file_path, - asset_id_handler=asset_id_handler, + maximum_buffer_size_in_bytes=maximum_buffer_size_in_bytes, bucket=bucket, operation_type=operation_type, excluded_ips=excluded_ips, + asset_id_handler=asset_id_handler, tqdm_kwargs=tqdm_kwargs, - maximum_buffer_size_in_bytes=maximum_buffer_size_in_bytes, ) for handled_asset_id, reduced_logs_per_handled_asset_id in reduced_and_binned_logs.items(): - parsed_s3_log_file_path = parsed_s3_log_folder_path / f"{handled_asset_id}.tsv" + parsed_s3_log_file_path = reduced_s3_logs_folder_path / f"{handled_asset_id}.tsv" data_frame = pandas.DataFrame(data=reduced_logs_per_handled_asset_id) @@ -102,45 +105,14 @@ def asset_id_handler(*, raw_asset_id: str) -> str: def _get_reduced_and_binned_log_lines( *, raw_s3_log_file_path: pathlib.Path, - asset_id_handler: Callable, + maximum_buffer_size_in_bytes: int, bucket: str, operation_type: Literal[_KNOWN_OPERATION_TYPES], excluded_ips: collections.defaultdict[str, bool], + asset_id_handler: Callable, tqdm_kwargs: dict, - maximum_buffer_size_in_bytes: int, ) -> collections.defaultdict[str, dict[str, list[str | int]]]: - """ - Reduce the full S3 log file to minimal content and bin by asset ID. - - Parameters - ---------- - raw_s3_log_file_path : str or pathlib.Path - Path to the raw S3 log file. - asset_id_handler : callable, optional - If your asset IDs in the raw log require custom handling (i.e., they contain slashes that you do not wish to - translate into nested directory paths) then define a function of the following form: - - # For example - def asset_id_handler(*, raw_asset_id: str) -> str: - split_by_slash = raw_asset_id.split("/") - return split_by_slash[0] + "_" + split_by_slash[-1] - bucket : str - Only parse and return lines that match this bucket. - operation_type : str - The type of operation to filter for. - excluded_ips : collections.defaultdict of strings to booleans - A lookup table / hash map whose keys are IP addresses and values are True to exclude from parsing. - tqdm_kwargs : dict, optional - Keyword arguments to pass to the tqdm progress bar. - maximum_buffer_size_in_bytes : int, default: 4 GB - The theoretical maximum amount of RAM (in bytes) to use on each buffer iteration when reading from the - source text file. - - Returns - ------- - reduced_and_binned_logs : collections.defaultdict - A map of all reduced log line content binned by handled asset ID. 
- """ + """Reduce the full S3 log file to minimal content and bin by asset ID.""" tqdm_kwargs = tqdm_kwargs or dict() default_tqdm_kwargs = dict(desc="Parsing line buffers...", leave=False) resolved_tqdm_kwargs = dict(default_tqdm_kwargs) @@ -172,10 +144,10 @@ def asset_id_handler(*, raw_asset_id: str) -> str: _append_reduced_log_line( raw_line=raw_line, reduced_and_binned_logs=reduced_and_binned_logs, - asset_id_handler=asset_id_handler, bucket=bucket, operation_type=operation_type, excluded_ips=excluded_ips, + asset_id_handler=asset_id_handler, lines_errors_file_path=lines_errors_file_path, log_file_path=raw_s3_log_file_path, line_index=line_index, diff --git a/src/dandi_s3_log_parser/_s3_log_line_parser.py b/src/dandi_s3_log_parser/_s3_log_line_parser.py index bed788e..399a6d6 100644 --- a/src/dandi_s3_log_parser/_s3_log_line_parser.py +++ b/src/dandi_s3_log_parser/_s3_log_line_parser.py @@ -31,10 +31,10 @@ def _append_reduced_log_line( *, raw_line: str, reduced_and_binned_logs: collections.defaultdict[str, dict[str, list[str | int]]], - asset_id_handler: Callable, bucket: str, operation_type: Literal[_KNOWN_OPERATION_TYPES], excluded_ips: collections.defaultdict[str, bool], + asset_id_handler: Callable, lines_errors_file_path: pathlib.Path, line_index: int, log_file_path: pathlib.Path, @@ -63,6 +63,8 @@ def asset_id_handler(*, raw_asset_id: str) -> str: The type of operation to filter for. excluded_ips : collections.defaultdict of strings to booleans A lookup table / hash map whose keys are IP addresses and values are True to exclude from parsing. + lines_errors_file_path: pathlib.Path + The path to the file where line errors are being collected. line_index: int The index of the line in the raw log file. log_file_path: pathlib.Path diff --git a/tests/examples/mapped_to_dandiset_example_0/expected_output/000003/0.210812.1448.tsv b/tests/examples/mapped_to_dandiset_example_0/expected_output/000003/0.210812.1448.tsv index c045fef..953e052 100644 --- a/tests/examples/mapped_to_dandiset_example_0/expected_output/000003/0.210812.1448.tsv +++ b/tests/examples/mapped_to_dandiset_example_0/expected_output/000003/0.210812.1448.tsv @@ -1,3 +1,3 @@ - asset_id timestamp bytes_sent region -0 5e9e92e1-f044-4aa0-ab47-1cfcb8899348 2022-03-16 02:21:12 512 unknown -1 5e9e92e1-f044-4aa0-ab47-1cfcb8899348 2022-05-04 05:06:35 512 unknown + filename timestamp bytes_sent region +0 sub-YutaMouse20/sub-YutaMouse20_ses-YutaMouse20-140327_behavior+ecephys.nwb 2022-03-16 02:21:12 512 unknown +1 sub-YutaMouse20/sub-YutaMouse20_ses-YutaMouse20-140327_behavior+ecephys.nwb 2022-05-04 05:06:35 512 unknown diff --git a/tests/examples/mapped_to_dandiset_example_0/expected_output/000003/0.230629.1955.tsv b/tests/examples/mapped_to_dandiset_example_0/expected_output/000003/0.230629.1955.tsv index c045fef..953e052 100644 --- a/tests/examples/mapped_to_dandiset_example_0/expected_output/000003/0.230629.1955.tsv +++ b/tests/examples/mapped_to_dandiset_example_0/expected_output/000003/0.230629.1955.tsv @@ -1,3 +1,3 @@ - asset_id timestamp bytes_sent region -0 5e9e92e1-f044-4aa0-ab47-1cfcb8899348 2022-03-16 02:21:12 512 unknown -1 5e9e92e1-f044-4aa0-ab47-1cfcb8899348 2022-05-04 05:06:35 512 unknown + filename timestamp bytes_sent region +0 sub-YutaMouse20/sub-YutaMouse20_ses-YutaMouse20-140327_behavior+ecephys.nwb 2022-03-16 02:21:12 512 unknown +1 sub-YutaMouse20/sub-YutaMouse20_ses-YutaMouse20-140327_behavior+ecephys.nwb 2022-05-04 05:06:35 512 unknown diff --git 
a/tests/examples/mapped_to_dandiset_example_0/expected_output/000003/draft.tsv b/tests/examples/mapped_to_dandiset_example_0/expected_output/000003/draft.tsv index c045fef..953e052 100644 --- a/tests/examples/mapped_to_dandiset_example_0/expected_output/000003/draft.tsv +++ b/tests/examples/mapped_to_dandiset_example_0/expected_output/000003/draft.tsv @@ -1,3 +1,3 @@ - asset_id timestamp bytes_sent region -0 5e9e92e1-f044-4aa0-ab47-1cfcb8899348 2022-03-16 02:21:12 512 unknown -1 5e9e92e1-f044-4aa0-ab47-1cfcb8899348 2022-05-04 05:06:35 512 unknown + filename timestamp bytes_sent region +0 sub-YutaMouse20/sub-YutaMouse20_ses-YutaMouse20-140327_behavior+ecephys.nwb 2022-03-16 02:21:12 512 unknown +1 sub-YutaMouse20/sub-YutaMouse20_ses-YutaMouse20-140327_behavior+ecephys.nwb 2022-05-04 05:06:35 512 unknown diff --git a/tests/examples/mapped_to_dandiset_example_0/expected_output/000013/0.220126.2143.tsv b/tests/examples/mapped_to_dandiset_example_0/expected_output/000013/0.220126.2143.tsv index 041db08..919df08 100644 --- a/tests/examples/mapped_to_dandiset_example_0/expected_output/000013/0.220126.2143.tsv +++ b/tests/examples/mapped_to_dandiset_example_0/expected_output/000013/0.220126.2143.tsv @@ -1,3 +1,3 @@ - asset_id timestamp bytes_sent region -0 cbcf1d6d-7f64-4d1f-8692-75e09e177ca6 2021-04-24 12:03:05 1443 unknown -1 cbcf1d6d-7f64-4d1f-8692-75e09e177ca6 2021-12-31 23:06:42 1443 unknown + filename timestamp bytes_sent region +0 sub-anm215592/sub-anm215592_ses-20131015_obj-odx8px_behavior+icephys+ogen.nwb 2021-04-24 12:03:05 1443 unknown +1 sub-anm215592/sub-anm215592_ses-20131015_obj-odx8px_behavior+icephys+ogen.nwb 2021-12-31 23:06:42 1443 unknown diff --git a/tests/examples/mapped_to_dandiset_example_0/expected_output/000013/draft.tsv b/tests/examples/mapped_to_dandiset_example_0/expected_output/000013/draft.tsv index 041db08..919df08 100644 --- a/tests/examples/mapped_to_dandiset_example_0/expected_output/000013/draft.tsv +++ b/tests/examples/mapped_to_dandiset_example_0/expected_output/000013/draft.tsv @@ -1,3 +1,3 @@ - asset_id timestamp bytes_sent region -0 cbcf1d6d-7f64-4d1f-8692-75e09e177ca6 2021-04-24 12:03:05 1443 unknown -1 cbcf1d6d-7f64-4d1f-8692-75e09e177ca6 2021-12-31 23:06:42 1443 unknown + filename timestamp bytes_sent region +0 sub-anm215592/sub-anm215592_ses-20131015_obj-odx8px_behavior+icephys+ogen.nwb 2021-04-24 12:03:05 1443 unknown +1 sub-anm215592/sub-anm215592_ses-20131015_obj-odx8px_behavior+icephys+ogen.nwb 2021-12-31 23:06:42 1443 unknown diff --git a/tests/examples/mapped_to_dandiset_example_0/reduced_logs/blobs_5e9e92e1-f044-4aa0-ab47-1cfcb8899348.tsv b/tests/examples/mapped_to_dandiset_example_0/reduced_logs/blobs_58c53789-eec4-4080-ad3b-207cf2a1cac9.tsv similarity index 100% rename from tests/examples/mapped_to_dandiset_example_0/reduced_logs/blobs_5e9e92e1-f044-4aa0-ab47-1cfcb8899348.tsv rename to tests/examples/mapped_to_dandiset_example_0/reduced_logs/blobs_58c53789-eec4-4080-ad3b-207cf2a1cac9.tsv diff --git a/tests/examples/mapped_to_dandiset_example_0/reduced_logs/blobs_a7b032b8-1e31-429f-975f-52a28cec6629.tsv b/tests/examples/mapped_to_dandiset_example_0/reduced_logs/blobs_a7b032b8-1e31-429f-975f-52a28cec6629.tsv deleted file mode 100644 index c045fef..0000000 --- a/tests/examples/mapped_to_dandiset_example_0/reduced_logs/blobs_a7b032b8-1e31-429f-975f-52a28cec6629.tsv +++ /dev/null @@ -1,3 +0,0 @@ - asset_id timestamp bytes_sent region -0 5e9e92e1-f044-4aa0-ab47-1cfcb8899348 2022-03-16 02:21:12 512 unknown -1 5e9e92e1-f044-4aa0-ab47-1cfcb8899348 
2022-05-04 05:06:35 512 unknown diff --git a/tests/examples/mapped_to_dandiset_example_0/reduced_logs/blobs_cbcf1d6d-7f64-4d1f-8692-75e09e177ca6.tsv b/tests/examples/mapped_to_dandiset_example_0/reduced_logs/blobs_cad9e87d-9154-464c-91d6-2f08f2a9c354.tsv similarity index 100% rename from tests/examples/mapped_to_dandiset_example_0/reduced_logs/blobs_cbcf1d6d-7f64-4d1f-8692-75e09e177ca6.tsv rename to tests/examples/mapped_to_dandiset_example_0/reduced_logs/blobs_cad9e87d-9154-464c-91d6-2f08f2a9c354.tsv diff --git a/tests/examples/mapped_to_dandiset_example_0/reduced_logs/blobs_not_a_real_id.tsv b/tests/examples/mapped_to_dandiset_example_0/reduced_logs/blobs_not_a_real_id.tsv new file mode 100644 index 0000000..570c480 --- /dev/null +++ b/tests/examples/mapped_to_dandiset_example_0/reduced_logs/blobs_not_a_real_id.tsv @@ -0,0 +1,3 @@ +timestamp bytes_sent ip_address line_index +2022-03-16 02:21:12 512 192.0.2.0 1 +2022-05-04 05:06:35 512 192.0.2.0 1 diff --git a/tests/test_map_reduced_logs_to_all_dandisets.py b/tests/test_map_reduced_logs_to_all_dandisets.py index f7ab5a0..5bbc7a2 100644 --- a/tests/test_map_reduced_logs_to_all_dandisets.py +++ b/tests/test_map_reduced_logs_to_all_dandisets.py @@ -43,6 +43,9 @@ def test_map_reduced_logs_to_dandisets(tmpdir: py.path.local): assert set(test_dandiset_version_id_file_paths.keys()) == set(expected_dandiset_version_id_file_paths.keys()) for expected_version_id_file_path in expected_dandiset_version_id_file_paths.values(): + # Pandas assertion makes no reference to the file being tested when it fails + print(expected_version_id_file_path) + test_version_id_file_path = ( dandiset_logs_folder_path / expected_version_id_file_path.parent.name / expected_version_id_file_path.name ) diff --git a/tests/test_reduce_dandi_raw_s3_log_bad_lines.py b/tests/test_reduce_dandi_raw_s3_log_bad_lines.py index 8e1aeab..92a866d 100644 --- a/tests/test_reduce_dandi_raw_s3_log_bad_lines.py +++ b/tests/test_reduce_dandi_raw_s3_log_bad_lines.py @@ -22,12 +22,14 @@ def test_reduce_dandi_raw_s3_log_bad_lines(tmpdir: py.path.local) -> None: example_raw_s3_log_file_path = examples_folder_path / "0.log" expected_reduced_s3_logs_folder_path = examples_folder_path / "expected_output" - test_reduced_s3_log_folder_path = tmpdir / "reduced_example_2" + test_reduced_s3_logs_folder_path = tmpdir / "reduced_example_2" + test_reduced_s3_logs_folder_path.mkdir(exist_ok=True) + dandi_s3_log_parser.reduce_dandi_raw_s3_log( raw_s3_log_file_path=example_raw_s3_log_file_path, - reduced_s3_logs_folder_path=test_reduced_s3_log_folder_path, + reduced_s3_logs_folder_path=test_reduced_s3_logs_folder_path, ) - test_output_file_paths = list(test_reduced_s3_log_folder_path.iterdir()) + test_output_file_paths = list(test_reduced_s3_logs_folder_path.iterdir()) number_of_output_files = len(test_output_file_paths) expected_number_of_output_files = 3
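The renamed generic entry point can be exercised the same way. Below is a minimal sketch (not part of the patch) of calling `reduce_raw_s3_log` with the bucket, operation type, and asset ID handler documented in the docstrings above; the log file and output folder paths are hypothetical placeholders and must already exist, since the new annotations are pydantic `FilePath` and `DirectoryPath`.

```python
import collections
import pathlib

from dandi_s3_log_parser import reduce_raw_s3_log


def asset_id_handler(*, raw_asset_id: str) -> str:
    # Collapse slashes in the raw asset ID so it can serve as a flat TSV file name,
    # following the handler shape shown in the docstring example.
    split_by_slash = raw_asset_id.split("/")
    return split_by_slash[0] + "_" + split_by_slash[-1]


reduce_raw_s3_log(
    raw_s3_log_file_path=pathlib.Path("/mnt/data/raw-s3-logs/2024-08-15.log"),  # hypothetical file
    reduced_s3_logs_folder_path=pathlib.Path("/mnt/data/reduced-s3-logs"),  # hypothetical folder
    mode="a",
    maximum_buffer_size_in_bytes=4 * 10**9,
    bucket="dandiarchive",
    operation_type="REST.GET.OBJECT",
    excluded_ips=collections.defaultdict(bool),
    asset_id_handler=asset_id_handler,
)
```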