
Mixer validator #215

Merged: 24 commits from mixer-validator into main on Oct 21, 2024
Commits
b724d5f
Adding script that validates if mixer config is well formatted and has…
Sep 10, 2024
1954932
Add S3 path validation with boto3 existence check
Sep 13, 2024
9ebe5f1
Adding check of the files, trying to run jq expressions on them and s…
Sep 16, 2024
67adadf
Add S3 path validation, sampling, and doc-attribute alignment checks
Sep 25, 2024
4391805
adding logic to split jsonpath expressions into pieces and check them
Sep 25, 2024
2885e7e
Added JsonPath syntax evaluation, started working on sampling docs an…
Sep 25, 2024
82920f8
Adding logic to check if all doc and corresponding attributes files c…
Sep 27, 2024
8745c8d
Adding functionality to check if filters in config and attribute file…
Oct 3, 2024
564cee6
updating filter checking logic to focus on filters missing from the m…
Oct 3, 2024
5e7e3d4
adding logic to run jq and jsonpath filters on small set of docs to s…
Oct 7, 2024
445bfef
refactored to use smart open and added logic to download sample files…
Oct 9, 2024
83481ac
added logic to sample lines from doc and apply filters to it, refacto…
Oct 11, 2024
890de88
Adding clean up logic to delete sample files after the run
Oct 11, 2024
50763bd
Merge branch 'main' of https://github.com/allenai/dolma into mixer-va…
Oct 11, 2024
001fd04
adding test configs for mixer validator
Oct 14, 2024
15a7104
fixing bug in test configs
Oct 14, 2024
b740e45
addressing comments, splitting script into smaller files, moving test …
Oct 16, 2024
c1708e2
adding --verbose flag, support for .env variables
Oct 17, 2024
2aca6a3
supporting != operator
Oct 17, 2024
d10de44
updating types in function definitions, updating Readme
Oct 21, 2024
e59c64b
fixing a bug in readme
Oct 21, 2024
07c2367
adding more error handlers
Oct 21, 2024
6941ac6
deleting the initial version of the script
Oct 21, 2024
f019fef
Merge branch 'main' into mixer-validator
mariia-iureva Oct 21, 2024
2 changes: 2 additions & 0 deletions pyproject.toml
@@ -13,6 +13,8 @@ dependencies = [
# "fasttext==0.9.2", # broken with new version of setuptools; using fasttext-wheel instead
"fasttext-wheel==0.9.2",
"fsspec>=2023.6.0",
"jsonpath-ng",
"jq",
"msgspec>=0.14.2",
"nltk>=3.9.1",
"omegaconf>=2.3.0",
35 changes: 35 additions & 0 deletions scripts/validate_mixer/README.md
@@ -0,0 +1,35 @@
# Dolma Mixer Configuration Validator

This script validates the configuration for the Dolma Mixer, ensuring that all necessary components are correctly set up before running the main process.

## Features

The validator performs the following checks:

1. Verifies the presence and format of required fields in the configuration.
2. Validates the syntax of the configuration file (YAML or JSON).
3. Checks for duplicate keys in the configuration.
4. Validates JQ or JSONPath expressions for syntax and compilation.
5. Verifies S3 path syntax and accessibility.
6. Confirms write permissions for output paths.
7. Checks the existence and accessibility of attribute files.
8. Samples a subset of files for detailed validation.
9. Ensures alignment between document and attribute files.
10. Validates the format and content of sampled files.
11. Executes JQ or JSONPath commands on sampled files.
12. Validates nested key existence in filter expressions.

## Usage

Run the validator using the following command:

```
python scripts/validate_mixer/main.py <path_to_config_file> [<number_of_files_to_sample>]
```

- `<path_to_config_file>`: Path to your Dolma Mixer configuration file (YAML or JSON)
- `<number_of_files_to_sample>`: (Optional) Number of files to sample for detailed checks (default: 1)
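
For reference, here is a minimal configuration of the shape the validator checks. All names, paths, and the filter expression are illustrative placeholders, not defaults shipped with Dolma:

```yaml
streams:
  - name: example-stream
    documents:
      - s3://my-bucket/dataset/documents/*.jsonl.gz  # placeholder S3 path
    attributes:
      - quality_signals                              # placeholder attribute name
    output:
      path: s3://my-bucket/dataset/mixed/documents   # placeholder output path
      max_size_in_bytes: 1000000000                  # must be an integer
    filter:
      include:                                       # optional; must be a list
        - "$.attributes[?(@.quality_signals__score > 0.5)]"  # illustrative JSONPath
processes: 8
```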

## Output

The script provides detailed progress information and error messages for any validation failures, helping you troubleshoot configuration issues before running the main Dolma Mixer process.
6 changes: 6 additions & 0 deletions scripts/validate_mixer/__init__.py
@@ -0,0 +1,6 @@
from .config_handler import load_config, validate_config_structure, validate_stream, validate_output, validate_filter_config
from .validator import load_and_validate_config, validate_s3_paths_and_permissions, validate_stream_filters, validate_documents_and_attributes
from .file_operations import sample_files, download_file, sample_and_download_files, count_file_lines, check_attribute_name_typos, sample_file_lines, sample_documents_with_attributes, validate_jsonl, validate_filters_and_check_typos, sample_and_extract_attributes
from .filter_operations import validate_jq_expression, validate_jsonpath_expression, validate_filter_expressions, evaluate_comparison, evaluate_jsonpath_condition, split_complex_jsonpath, prepare_filter, execute_filter_commands, extract_attribute_names_from_filters, extract_filter_attributes
from .s3_utils import validate_s3_path, check_s3_path_exists, check_s3_path_writable, check_s3_parent_exists, list_s3_objects, get_base_path, get_corresponding_attribute_path
from .utils import keyboard_interrupt_handler
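
The driver, `main.py`, is not part of this hunk; as a rough sketch, the exports above suggest a flow along these lines (every call signature below is an assumption, not the actual API):

```python
# Hypothetical wiring of the exported validators; signatures are assumed.
from validator import (
    load_and_validate_config,
    validate_s3_paths_and_permissions,
    validate_stream_filters,
    validate_documents_and_attributes,
)

def run(config_path: str, num_samples: int = 1) -> None:
    config = load_and_validate_config(config_path)              # parse + structural checks
    for stream in config["streams"]:
        validate_s3_paths_and_permissions(stream)               # S3 syntax, existence, write access
        validate_stream_filters(stream)                         # JQ/JSONPath syntax and typo checks
        validate_documents_and_attributes(stream, num_samples)  # sample, download, align, execute
```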
105 changes: 105 additions & 0 deletions scripts/validate_mixer/config_handler.py
@@ -0,0 +1,105 @@
import yaml
import json
from typing import Dict, Any, List, Union, Type

def load_config(config_path: str) -> Dict[str, Any]:
    """Load the configuration file (YAML or JSON)."""
    try:
        with open(config_path, 'r') as file:
            if config_path.endswith('.yaml') or config_path.endswith('.yml'):
                return yaml.safe_load(file)
            elif config_path.endswith('.json'):
                return json.load(file)
            else:
                raise ValueError("Unsupported file format. Use .yaml, .yml, or .json")
    except Exception as e:
        raise ValueError(f"Error loading config file: {str(e)}")

def validate_config_structure(config: Dict[str, Any]) -> List[str]:
    """Validate the basic structure of the configuration."""
    required_fields = ['streams', 'processes']
    errors = []

    for field in required_fields:
        if field not in config:
            errors.append(f"Missing required field: {field}")
        elif field == 'streams':
            errors.extend(validate_streams(config[field]))
        elif field == 'processes':
            errors.extend(validate_processes(config[field]))

    return errors

def validate_streams(streams: Any) -> List[str]:
    errors = []
    if not isinstance(streams, list):
        errors.append("'streams' should be a list")
    else:
        for i, stream in enumerate(streams):
            stream_errors = validate_stream(stream, i)
            errors.extend(stream_errors)
    return errors

def validate_processes(processes: Any) -> List[str]:
    if not isinstance(processes, int):
        return ["'processes' should be an integer"]
    return []

def validate_stream(stream: Dict[str, Any], index: int) -> List[str]:
    """Validate an individual stream configuration."""
    required_fields = ['name', 'documents', 'attributes', 'output']
    expected_type = {
        'name': str,
        'documents': list,
        'attributes': list,
        'output': dict
    }
    errors = []

    for field in required_fields:
        errors.extend(check_type_if_present(stream, field, expected_type[field], index))

    if 'output' in stream:
        output_errors = validate_output(stream['output'], index)
        errors.extend(output_errors)

    if 'filter' in stream:
        filter_errors = validate_filter_config(stream['filter'], index)
        errors.extend(filter_errors)
    return errors

def check_type_if_present(stream: Dict[str, Any], field: str, expected_type: Union[Type, List[Type]], stream_index: int) -> List[str]:
"""Check if a field is present in the stream and has the expected type."""
errors = []
if field not in stream:
errors.append(f"Stream {stream_index}: Missing required field: {field}")
elif not isinstance(stream[field], expected_type):
type_name = expected_type.__name__ if isinstance(expected_type, type) else str(expected_type)
errors.append(f"Stream {stream_index}: '{field}' should be a {type_name}")
return errors

def validate_output(output: Dict[str, Any], stream_index: int) -> List[str]:
    """Validate the output configuration of a stream."""
    required_fields = ['path', 'max_size_in_bytes']
    errors = []

    for field in required_fields:
        if field not in output:
            errors.append(f"Stream {stream_index} output: Missing required field: {field}")

    if 'max_size_in_bytes' in output and not isinstance(output['max_size_in_bytes'], int):
        errors.append(f"Stream {stream_index} output: 'max_size_in_bytes' should be an integer")

    return errors

def validate_filter_config(filter_config: Dict[str, Any], stream_index: int) -> List[str]:
    """Validate the filter configuration of a stream."""
    errors = []

    if 'include' in filter_config and not isinstance(filter_config['include'], list):
        errors.append(f"Stream {stream_index} filter: 'include' should be a list")

    if 'exclude' in filter_config and not isinstance(filter_config['exclude'], list):
        errors.append(f"Stream {stream_index} filter: 'exclude' should be a list")

    return errors
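
As a quick usage sketch, running `validate_config_structure` on a deliberately broken config surfaces errors in the formats defined above (the config dict is made up for illustration):

```python
# 'max_size_in_bytes' is intentionally a string to trigger a type error.
config = {
    "streams": [
        {
            "name": "example",
            "documents": ["s3://bucket/dataset/documents/shard-0000.jsonl.gz"],
            "attributes": ["quality_signals"],
            "output": {"path": "s3://bucket/dataset/mixed", "max_size_in_bytes": "1GB"},
        }
    ],
    "processes": 4,
}

for error in validate_config_structure(config):
    print(error)
# -> Stream 0 output: 'max_size_in_bytes' should be an integer
```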
207 changes: 207 additions & 0 deletions scripts/validate_mixer/file_operations.py
@@ -0,0 +1,207 @@
import os
import random
import re
import json
import itertools
from typing import List, Dict, Any, Optional, Tuple
from tqdm import tqdm
import smart_open

from s3_utils import s3_client, list_s3_objects, get_base_path, get_corresponding_attribute_path

def sample_files(s3_path: str, num_samples: int) -> List[str]:
    """Sample a subset of files from an S3 path."""
    all_files = list(list_s3_objects(s3_path))
    # Filter out directories (paths ending with '/')
    all_files = [f for f in all_files if not f.endswith('/')]
    chosen_files = random.sample(all_files, min(int(num_samples), len(all_files)))
    print(f"Sampled {len(chosen_files)} files from {len(all_files)} matching files")
    return chosen_files

def download_file(s3_path: str, local_path: str) -> None:
    bucket, key = s3_path.replace("s3://", "").split("/", 1)
    s3_client.download_file(bucket, key, local_path)

def sample_and_download_files(stream: Dict[str, Any], num_samples: int) -> Tuple[List[str], Dict[str, List[str]]]:
    temp_dir = "temp_sample_files"

    # Create the temporary directory if it doesn't exist
    if not os.path.exists(temp_dir):
        os.makedirs(temp_dir)

    try:
        doc_samples = sample_files(stream['documents'][0], num_samples)

        base_doc_path = get_base_path(stream['documents'][0])
        base_attr_path = re.sub(r'/documents($|/)', r'/attributes\1', base_doc_path)

        total_files = len(doc_samples) * (len(stream['attributes']) + 1)  # +1 for the document itself

        with tqdm(total=total_files, desc="Downloading files") as pbar:
            local_doc_samples = []
            local_attr_samples_dict = {attr_type: [] for attr_type in stream['attributes']}

            for doc_sample in doc_samples:
                local_doc_path = os.path.join(temp_dir, os.path.basename(doc_sample))
                download_file(doc_sample, local_doc_path)
                local_doc_samples.append(local_doc_path)
                pbar.update(1)

                # Extract the base name and extension
                base_name, extension = os.path.splitext(os.path.basename(doc_sample))
                if extension == '.gz':
                    # Handle double extensions like .jsonl.gz
                    base_name, inner_extension = os.path.splitext(base_name)
                    extension = inner_extension + extension

                for attr_type in stream['attributes']:
                    attr_sample = get_corresponding_attribute_path(doc_sample, base_doc_path, base_attr_path, attr_type)
                    # Construct the new filename with the attribute type before the extension, using a hyphen
                    new_filename = f"{base_name}-{attr_type}{extension}"
                    local_attr_path = os.path.join(temp_dir, new_filename)
                    download_file(attr_sample, local_attr_path)
                    local_attr_samples_dict[attr_type].append(local_attr_path)
                    pbar.update(1)

        return local_doc_samples, local_attr_samples_dict

    except Exception as e:
        print(f"An error occurred: {str(e)}")
        raise
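
The `base_attr_path` rewrite above relies on Dolma's layout convention, where attribute files mirror the document tree under an `attributes/` prefix. A small illustration of the regex step (paths are made up; `get_corresponding_attribute_path` in `s3_utils` handles the per-file mapping):

```python
import re

base_doc_path = "s3://bucket/dataset/v1/documents"  # hypothetical base path
base_attr_path = re.sub(r'/documents($|/)', r'/attributes\1', base_doc_path)
print(base_attr_path)
# s3://bucket/dataset/v1/attributes
# A document shard then pairs with an attribute file such as (illustrative):
# s3://bucket/dataset/v1/attributes/quality_signals/shard-0000.jsonl.gz
```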

def count_file_lines(file_path: str) -> int:
    """
    Count the number of lines in a file (local or S3, compressed or not).

    :param file_path: Path to the file (can be S3 or local)
    :return: Number of lines in the file, or -1 if an error occurred
    """
    try:
        with smart_open.open(file_path, 'rb') as f:
            line_count = sum(1 for _ in f)
        return line_count
    except Exception as e:
        print(f"Error counting lines in file {file_path}: {str(e)}")
        return -1

def check_attribute_name_typos(config_attributes: set, sample_attributes: set) -> None:
    """Check for typos in attribute names by comparing config and sample data."""
    missing_in_sample = config_attributes - sample_attributes
    extra_in_sample = sample_attributes - config_attributes

    if missing_in_sample:
        print("Warning: The following attributes are in the config but not in the sample data:")
        for attr in missing_in_sample:
            print(f"  - {attr}")

    if extra_in_sample:
        print("Info: The following attributes are in the sample data but not used in the config:")
        for attr in extra_in_sample:
            print(f"  - {attr}")

def sample_file_lines(file_path: str, num_lines: int = 1) -> Optional[List[str]]:
    """
    Sample N lines from a file, handling both local and S3 paths, and compression.

    Args:
        file_path (str): Path to the file (local or S3)
        num_lines (int): Number of lines to sample (default: 1)

    Returns:
        list: List of sampled lines, or None if an error occurred
    """
    try:
        if not isinstance(file_path, str):
            raise ValueError(f"Expected string for file_path, got {type(file_path)}")
        with smart_open.open(file_path, 'r') as f:
            # Use itertools.islice to efficiently read N lines
            sampled_lines = list(itertools.islice(f, num_lines))

        if not sampled_lines:
            print(f"Warning: File is empty or could not be read: {file_path}")
            return None

        # Strip whitespace from each line
        sampled_lines = [line.strip() for line in sampled_lines]

        if len(sampled_lines) < num_lines:
            print(f"Warning: Requested {num_lines} lines, but file only contains {len(sampled_lines)} lines: {file_path}")

        return sampled_lines

    except ValueError as ve:
        print(f"Error in sample_file_lines: {str(ve)}")
        return None
    except Exception as e:
        print(f"Error in sample_file_lines when reading file {file_path}: {str(e)}")
        print(f"Error type: {type(e)}")
        print(f"File path type: {type(file_path)}")
        return None


def sample_documents_with_attributes(doc_file_paths: List[str], attr_file_paths: List[str], num_samples: int = 100) -> List[Dict[str, Any]]:
    sampled_docs = []
    for doc_path, attr_paths in zip(doc_file_paths, attr_file_paths):
        doc_lines = sample_file_lines(doc_path, num_samples)
        if not doc_lines:
            continue

        attr_samples = {}
        for attr_path in attr_paths:
            attr_lines = sample_file_lines(attr_path, num_samples)
            if attr_lines:
                attr_name = os.path.basename(attr_path).split('.')[0]  # Extract attribute name from file name
                attr_samples[attr_name] = attr_lines

        for i, doc_line in enumerate(doc_lines):
            doc = json.loads(doc_line)
            for attr_name, attr_lines in attr_samples.items():
                if i < len(attr_lines):
                    doc[attr_name] = json.loads(attr_lines[i])
            sampled_docs.append(doc)

    return sampled_docs


def validate_jsonl(file_path: str, expected_fields: set) -> Tuple[bool, List[str]]:
    """
    Validate that the file is a valid JSONL and contains expected fields.

    :param file_path: Path to the file (can be S3 or local)
    :param expected_fields: Set of field names expected in each JSON object
    :return: Tuple (is_valid, error_messages)
    """
    unexpected_fields = set()
    error_messages = []
    is_valid = True

    try:
        with smart_open.open(file_path, 'r') as f:
            for i, line in enumerate(f, 1):
                try:
                    data = json.loads(line)
                    missing_fields = expected_fields - set(data.keys())
                    new_fields = set(data.keys()) - expected_fields

                    if missing_fields:
                        error_messages.append(f"Line {i}: Missing expected fields: {missing_fields}")
                        is_valid = False

                    if new_fields:
                        unexpected_fields.update(new_fields)
                        is_valid = False

                except json.JSONDecodeError:
                    error_messages.append(f"Line {i}: Invalid JSON")
                    is_valid = False

    except Exception as e:
        error_messages.append(f"Error reading file {file_path}: {str(e)}")
        is_valid = False

    if unexpected_fields:
        error_messages.append(f"Additional fields found across the file: {unexpected_fields}")
    return is_valid, error_messages
