Final debugs and polish (#45)
* fixing filename in mapped logs

* fixes

* use blob instead of identifier

* fix tests

* use blob instead of identifier

* debugs and polish

* fix test

* fixes

---------

Co-authored-by: CodyCBakerPhD <[email protected]>
CodyCBakerPhD and CodyCBakerPhD authored Aug 16, 2024
1 parent a00b580 commit a2b19d2
Showing 18 changed files with 128 additions and 146 deletions.
8 changes: 4 additions & 4 deletions README.md
@@ -50,9 +50,9 @@ To iteratively parse all historical logs all at once (parallelization strongly r
reduce_all_dandi_raw_s3_logs \
--base_raw_s3_logs_folder_path < base log folder > \
--reduced_s3_logs_folder_path < output folder > \
--excluded_ips < comma-separated list of known IPs to exclude > \
--maximum_number_of_workers < number of CPUs to use > \
--maximum_buffer_size_in_mb < approximate amount of RAM to use >
--maximum_buffer_size_in_mb < approximate amount of RAM to use > \
--excluded_ips < comma-separated list of known IPs to exclude >
```

For example, on Drogon:
@@ -61,9 +61,9 @@ For example, on Drogon:
reduce_all_dandi_raw_s3_logs \
--base_raw_s3_logs_folder_path /mnt/backup/dandi/dandiarchive-logs \
--reduced_s3_logs_folder_path /mnt/backup/dandi/dandiarchive-logs-cody/parsed_8_15_2024/REST_GET_OBJECT_per_asset_id \
--excluded_ips < Drogons IP > \
--maximum_number_of_workers 6 \
--maximum_buffer_size_in_mb 5000
--maximum_buffer_size_in_mb 5000 \
--excluded_ips < Drogons IP >
```
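
For reference, a minimal sketch of driving the same reduction through the Python API instead of the CLI, based on the `reduce_all_dandi_raw_s3_logs` signature shown later in this diff; the paths are the illustrative Drogon ones from the README, and the byte count is only roughly equivalent to the CLI's MB flag.

```python
from dandi_s3_log_parser import reduce_all_dandi_raw_s3_logs

# Illustrative paths taken from the Drogon example above; adjust for your host.
reduce_all_dandi_raw_s3_logs(
    base_raw_s3_logs_folder_path="/mnt/backup/dandi/dandiarchive-logs",
    reduced_s3_logs_folder_path="/mnt/backup/dandi/dandiarchive-logs-cody/parsed_8_15_2024/REST_GET_OBJECT_per_asset_id",
    maximum_number_of_workers=6,
    maximum_buffer_size_in_bytes=5_000 * 10**6,  # roughly the CLI's --maximum_buffer_size_in_mb 5000
    excluded_ips=None,  # or a collections.defaultdict(bool) keyed by the IPs to skip
)
```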

### Reduce a single log file
6 changes: 3 additions & 3 deletions src/dandi_s3_log_parser/__init__.py
@@ -18,15 +18,15 @@
"""

from ._config import DANDI_S3_LOG_PARSER_BASE_FOLDER_PATH
from ._s3_log_file_parser import parse_raw_s3_log
from ._s3_log_file_reducer import reduce_raw_s3_log
from ._buffered_text_reader import BufferedTextReader
from ._dandi_s3_log_file_parser import reduce_dandi_raw_s3_log, reduce_all_dandi_raw_s3_logs
from ._dandi_s3_log_file_reducer import reduce_dandi_raw_s3_log, reduce_all_dandi_raw_s3_logs
from ._ip_utils import get_region_from_ip_address
from ._dandiset_mapper import map_reduced_logs_to_dandisets

__all__ = [
"DANDI_S3_LOG_PARSER_BASE_FOLDER_PATH",
"parse_raw_s3_log",
"reduce_raw_s3_log",
"BufferedTextReader",
"reduce_dandi_raw_s3_log",
"reduce_all_dandi_raw_s3_logs",
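
As a quick orientation, the renamed public entry points can be imported from the package root roughly as below; this is a sketch based on the imports shown above, since the remainder of `__all__` is collapsed in this view.

```python
from dandi_s3_log_parser import (
    DANDI_S3_LOG_PARSER_BASE_FOLDER_PATH,
    BufferedTextReader,
    reduce_raw_s3_log,  # previously exposed as parse_raw_s3_log
    reduce_dandi_raw_s3_log,
    reduce_all_dandi_raw_s3_logs,
    get_region_from_ip_address,
    map_reduced_logs_to_dandisets,
)
```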
36 changes: 18 additions & 18 deletions src/dandi_s3_log_parser/_command_line_interface.py
@@ -7,7 +7,7 @@
import click

from ._config import REQUEST_TYPES
from ._dandi_s3_log_file_parser import (
from ._dandi_s3_log_file_reducer import (
reduce_all_dandi_raw_s3_logs,
reduce_dandi_raw_s3_log,
)
@@ -28,13 +28,6 @@
required=True,
type=click.Path(writable=True),
)
@click.option(
"--excluded_ips",
help="A comma-separated list of IP addresses to exclude from parsing.",
required=False,
type=str,
default=None,
)
@click.option(
"--maximum_number_of_workers",
help="The maximum number of workers to distribute tasks across.",
@@ -55,12 +48,19 @@
type=click.IntRange(min=1), # Bare minimum of 1 MB
default=1_000, # 1 GB recommended
)
@click.option(
"--excluded_ips",
help="A comma-separated list of IP addresses to exclude from parsing.",
required=False,
type=str,
default=None,
)
def _reduce_all_dandi_raw_s3_logs_cli(
base_raw_s3_logs_folder_path: str,
reduced_s3_logs_folder_path: str,
excluded_ips: str | None,
maximum_number_of_workers: int,
maximum_buffer_size_in_mb: int,
excluded_ips: str | None,
) -> None:
split_excluded_ips = excluded_ips.split(",") if excluded_ips is not None else list()
handled_excluded_ips = collections.defaultdict(bool) if len(split_excluded_ips) != 0 else None
@@ -71,9 +71,9 @@ def _reduce_all_dandi_raw_s3_logs_cli(
reduce_all_dandi_raw_s3_logs(
base_raw_s3_logs_folder_path=base_raw_s3_logs_folder_path,
reduced_s3_logs_folder_path=reduced_s3_logs_folder_path,
excluded_ips=handled_excluded_ips,
maximum_number_of_workers=maximum_number_of_workers,
maximum_buffer_size_in_bytes=maximum_buffer_size_in_bytes,
excluded_ips=handled_excluded_ips,
)


@@ -90,13 +90,6 @@ def _reduce_all_dandi_raw_s3_logs_cli(
required=True,
type=click.Path(writable=True),
)
@click.option(
"--excluded_ips",
help="A comma-separated list of IP addresses to exclude from reduction.",
required=False,
type=str,
default=None,
)
@click.option(
"--maximum_buffer_size_in_mb",
help=(
@@ -110,6 +103,13 @@ def _reduce_all_dandi_raw_s3_logs_cli(
type=click.IntRange(min=1), # Bare minimum of 1 MB
default=1_000, # 1 GB recommended
)
@click.option(
"--excluded_ips",
help="A comma-separated list of IP addresses to exclude from reduction.",
required=False,
type=str,
default=None,
)
def _reduce_dandi_raw_s3_log_cli(
raw_s3_log_file_path: str,
reduced_s3_logs_folder_path: str,
@@ -125,8 +125,8 @@ def _reduce_dandi_raw_s3_log_cli(
reduce_dandi_raw_s3_log(
raw_s3_log_file_path=raw_s3_log_file_path,
reduced_s3_logs_folder_path=reduced_s3_logs_folder_path,
excluded_ips=handled_excluded_ips,
maximum_buffer_size_in_bytes=maximum_buffer_size_in_bytes,
excluded_ips=handled_excluded_ips,
)
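
Both CLI wrappers convert the comma-separated `--excluded_ips` string into the `collections.defaultdict(bool)` lookup table the reducers expect. A small sketch of that conversion follows, mirroring the visible lines; the loop that marks each IP as `True` is an assumption, since those lines are collapsed in this view, and the addresses are placeholders.

```python
import collections

excluded_ips = "192.0.2.10,198.51.100.7"  # placeholder value for --excluded_ips

split_excluded_ips = excluded_ips.split(",") if excluded_ips is not None else list()
handled_excluded_ips = collections.defaultdict(bool) if len(split_excluded_ips) != 0 else None

# Assumed population step (collapsed in this diff): mark each excluded IP as True so that
# lookups on any other address fall back to the defaultdict's False.
if handled_excluded_ips is not None:
    for excluded_ip in split_excluded_ips:
        handled_excluded_ips[excluded_ip] = True
```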


src/dandi_s3_log_parser/_dandi_s3_log_file_reducer.py
@@ -15,20 +15,20 @@

import pandas
import tqdm
from pydantic import DirectoryPath, Field, validate_call
from pydantic import DirectoryPath, Field, FilePath, validate_call

from ._config import DANDI_S3_LOG_PARSER_BASE_FOLDER_PATH
from ._s3_log_file_parser import parse_raw_s3_log
from ._s3_log_file_reducer import reduce_raw_s3_log


@validate_call
def reduce_all_dandi_raw_s3_logs(
*,
base_raw_s3_logs_folder_path: DirectoryPath,
reduced_s3_logs_folder_path: DirectoryPath,
excluded_ips: collections.defaultdict[str, bool] | None = None,
maximum_number_of_workers: int = Field(ge=1, default=1),
maximum_buffer_size_in_bytes: int = 4 * 10**9,
excluded_ips: collections.defaultdict[str, bool] | None = None,
) -> None:
"""
Batch parse all raw S3 log files in a folder and write the results to a folder of TSV files.
@@ -46,21 +46,21 @@ def reduce_all_dandi_raw_s3_logs(
base_raw_s3_logs_folder_path : file path
The Path to the folder containing the raw S3 log files to be reduced.
reduced_s3_logs_folder_path : file path
The Path to write each parsed S3 log file to.
The Path to write each reduced S3 log file to.
There will be one file per handled asset ID.
excluded_ips : collections.defaultdict(bool), optional
A lookup table whose keys are IP addresses to exclude from reduction.
maximum_number_of_workers : int, default: 1
The maximum number of workers to distribute tasks across.
maximum_buffer_size_in_bytes : int, default: 4 GB
The theoretical maximum amount of RAM (in bytes) to use on each buffer iteration when reading from the
source text files.
Actual total RAM usage will be higher due to overhead and caching.
Automatically splits this total amount over the maximum number of workers if `maximum_number_of_workers` is
greater than one.
excluded_ips : collections.defaultdict(bool), optional
A lookup table whose keys are IP addresses to exclude from reduction.
"""
base_raw_s3_logs_folder_path = pathlib.Path(base_raw_s3_logs_folder_path)
reduced_s3_logs_folder_path = pathlib.Path(reduced_s3_logs_folder_path)
excluded_ips = excluded_ips or collections.defaultdict(bool)

asset_id_handler = _get_default_dandi_asset_id_handler()
@@ -85,10 +85,10 @@ def reduce_all_dandi_raw_s3_logs(
raw_s3_log_file_path=raw_s3_log_file_path,
reduced_s3_logs_folder_path=reduced_s3_logs_folder_path,
mode="a",
maximum_buffer_size_in_bytes=maximum_buffer_size_in_bytes,
excluded_ips=excluded_ips,
asset_id_handler=asset_id_handler,
tqdm_kwargs=dict(position=1, leave=False),
maximum_buffer_size_in_bytes=maximum_buffer_size_in_bytes,
)
else:
# Create a fresh temporary directory in the home folder and then fresh subfolders for each worker
@@ -117,11 +117,11 @@ def reduce_all_dandi_raw_s3_logs(
futures.append(
executor.submit(
_multi_worker_reduce_dandi_raw_s3_log,
maximum_number_of_workers=maximum_number_of_workers,
raw_s3_log_file_path=raw_s3_log_file_path,
temporary_folder_path=temporary_folder_path,
excluded_ips=excluded_ips,
maximum_number_of_workers=maximum_number_of_workers,
maximum_buffer_size_in_bytes=maximum_buffer_size_in_bytes_per_worker,
excluded_ips=excluded_ips,
),
)

@@ -147,22 +147,22 @@ def reduce_all_dandi_raw_s3_logs(
leave=True,
mininterval=2.0,
):
per_worker_parsed_s3_log_file_paths = list(per_worker_temporary_folder_path.iterdir())
per_worker_reduced_s3_log_file_paths = list(per_worker_temporary_folder_path.iterdir())
assert (
len(per_worker_parsed_s3_log_file_paths) != 0
len(per_worker_reduced_s3_log_file_paths) != 0
), f"No files found in {per_worker_temporary_folder_path}!"

for per_worker_parsed_s3_log_file_path in tqdm.tqdm(
iterable=per_worker_parsed_s3_log_file_paths,
for per_worker_reduced_s3_log_file_path in tqdm.tqdm(
iterable=per_worker_reduced_s3_log_file_paths,
desc="Merging results per worker...",
total=len(per_worker_parsed_s3_log_file_paths),
total=len(per_worker_reduced_s3_log_file_paths),
position=1,
leave=False,
mininterval=2.0,
):
merged_temporary_file_path = reduced_s3_logs_folder_path / per_worker_parsed_s3_log_file_path.name
merged_temporary_file_path = reduced_s3_logs_folder_path / per_worker_reduced_s3_log_file_path.name

parsed_s3_log = pandas.read_table(filepath_or_buffer=per_worker_parsed_s3_log_file_path, header=0)
parsed_s3_log = pandas.read_table(filepath_or_buffer=per_worker_reduced_s3_log_file_path, header=0)

header = False if merged_temporary_file_path.exists() else True
parsed_s3_log.to_csv(
@@ -180,11 +180,11 @@
# pragma: no cover
def _multi_worker_reduce_dandi_raw_s3_log(
*,
maximum_number_of_workers: int,
raw_s3_log_file_path: pathlib.Path,
temporary_folder_path: pathlib.Path,
excluded_ips: collections.defaultdict[str, bool] | None,
maximum_number_of_workers: int,
maximum_buffer_size_in_bytes: int,
excluded_ips: collections.defaultdict[str, bool] | None,
) -> None:
"""
A mostly pass-through function to calculate the worker index on the worker and target the correct subfolder.
@@ -209,38 +209,39 @@ def _multi_worker_reduce_dandi_raw_s3_log(
date = datetime.datetime.now().strftime("%y%m%d")
parallel_errors_file_path = errors_folder_path / f"v{dandi_s3_log_parser_version}_{date}_parallel_errors.txt"
error_message += (
f"Worker index {worker_index}/{maximum_number_of_workers} parsing {raw_s3_log_file_path} failed due to\n\n"
f"Worker index {worker_index}/{maximum_number_of_workers} reducing {raw_s3_log_file_path} failed due to\n\n"
)

reduce_dandi_raw_s3_log(
raw_s3_log_file_path=raw_s3_log_file_path,
reduced_s3_logs_folder_path=per_worker_temporary_folder_path,
mode="a",
maximum_buffer_size_in_bytes=maximum_buffer_size_in_bytes,
excluded_ips=excluded_ips,
asset_id_handler=asset_id_handler,
tqdm_kwargs=dict(
position=worker_index + 1, leave=False, desc=f"Parsing line buffers on worker {worker_index+1}..."
),
maximum_buffer_size_in_bytes=maximum_buffer_size_in_bytes,
)
except Exception as exception:
with open(file=parallel_errors_file_path, mode="a") as io:
error_message += f"{type(exception)}: {exception!s}\n\n{traceback.format_exc()}\n\n"
io.write(error_message)


@validate_call
def reduce_dandi_raw_s3_log(
*,
raw_s3_log_file_path: str | pathlib.Path,
reduced_s3_logs_folder_path: str | pathlib.Path,
raw_s3_log_file_path: FilePath,
reduced_s3_logs_folder_path: DirectoryPath,
mode: Literal["w", "a"] = "a",
maximum_buffer_size_in_bytes: int = 4 * 10**9,
excluded_ips: collections.defaultdict[str, bool] | None = None,
asset_id_handler: Callable | None = None,
tqdm_kwargs: dict | None = None,
maximum_buffer_size_in_bytes: int = 4 * 10**9,
) -> None:
"""
Parse a raw S3 log file and write the results to a folder of TSV files, one for each unique asset ID.
Reduce a raw S3 log file and write the results to a folder of TSV files, one for each unique asset ID.
'Parsing' here means:
- limiting only to requests of the specified type (i.e., GET, PUT, etc.)
@@ -251,14 +252,19 @@ def reduce_dandi_raw_s3_log(
raw_s3_log_file_path : string or pathlib.Path
Path to the raw S3 log file to be reduced.
reduced_s3_logs_folder_path : string or pathlib.Path
The path to write each parsed S3 log file to.
The path to write each reduced S3 log file to.
There will be one file per handled asset ID.
mode : "w" or "a", default: "a"
How to resolve the case when files already exist in the folder containing parsed logs.
"w" will overwrite existing content, "a" will append or create if the file does not yet exist.
The intention of the default usage is to have one consolidated raw S3 log file per day and then to iterate
over each day, parsing and binning by asset, effectively 'updating' the parsed collection on each iteration.
maximum_buffer_size_in_bytes : int, default: 4 GB
The theoretical maximum amount of RAM (in bytes) to use on each buffer iteration when reading from the
source text file.
Actual RAM usage will be higher due to overhead and caching.
excluded_ips : collections.defaultdict(bool), optional
A lookup table whose keys are IP addresses to exclude from reduction.
asset_id_handler : callable, optional
@@ -270,10 +276,7 @@ def asset_id_handler(*, raw_asset_id: str) -> str:
split_by_slash = raw_asset_id.split("/")
return split_by_slash[0] + "_" + split_by_slash[-1]
tqdm_kwargs : dict, optional
Keyword arguments to pass to the tqdm progress bar.
maximum_buffer_size_in_bytes : int, default: 4 GB
The theoretical maximum amount of RAM (in bytes) to use on each buffer iteration when reading from the
source text file.
Keyword arguments to pass to the tqdm progress bar for line buffers.
"""
raw_s3_log_file_path = pathlib.Path(raw_s3_log_file_path)
reduced_s3_logs_folder_path = pathlib.Path(reduced_s3_logs_folder_path)
@@ -283,16 +286,16 @@ def asset_id_handler(*, raw_asset_id: str) -> str:
bucket = "dandiarchive"
operation_type = "REST.GET.OBJECT"

parse_raw_s3_log(
reduce_raw_s3_log(
raw_s3_log_file_path=raw_s3_log_file_path,
parsed_s3_log_folder_path=reduced_s3_logs_folder_path,
reduced_s3_logs_folder_path=reduced_s3_logs_folder_path,
mode=mode,
maximum_buffer_size_in_bytes=maximum_buffer_size_in_bytes,
bucket=bucket,
operation_type=operation_type,
excluded_ips=excluded_ips,
asset_id_handler=asset_id_handler,
tqdm_kwargs=tqdm_kwargs,
maximum_buffer_size_in_bytes=maximum_buffer_size_in_bytes,
)
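
Tying the pieces together, a hedged sketch of calling the renamed `reduce_dandi_raw_s3_log` with the `asset_id_handler` from the docstring above; the log and output paths are hypothetical and must already exist for the `FilePath`/`DirectoryPath` validation introduced here to pass.

```python
import pathlib

from dandi_s3_log_parser import reduce_dandi_raw_s3_log


def asset_id_handler(*, raw_asset_id: str) -> str:
    """Collapse a blob-style asset ID into '<prefix>_<final component>', as in the docstring example."""
    split_by_slash = raw_asset_id.split("/")
    return split_by_slash[0] + "_" + split_by_slash[-1]


# Hypothetical locations; one consolidated raw log per day, appended into the reduced folder.
reduce_dandi_raw_s3_log(
    raw_s3_log_file_path=pathlib.Path("/mnt/backup/dandi/dandiarchive-logs/2024/08/15.log"),
    reduced_s3_logs_folder_path=pathlib.Path("/mnt/backup/dandi/reduced-logs"),
    mode="a",
    maximum_buffer_size_in_bytes=4 * 10**9,
    excluded_ips=None,
    asset_id_handler=asset_id_handler,
)
```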

