Skip to content

Commit

Permalink
Adding clean up logic to delete sample files after the run
Browse files Browse the repository at this point in the history
  • Loading branch information
Masha Iureva authored and Masha Iureva committed Oct 11, 2024
1 parent 83481ac commit 890de88
Showing 1 changed file with 64 additions and 51 deletions.
115 changes: 64 additions & 51 deletions scripts/validate_mixer.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from tqdm import tqdm
import importlib
import traceback
import shutil

s3_client = boto3.client('s3')

Expand Down Expand Up @@ -816,69 +817,81 @@ def validate_stream_filters(config):

def validate_documents_and_attributes(config, num_samples):
    """Sample files from every stream and check documents against attributes.

    For each stream in ``config['streams']`` this downloads sample files via
    ``sample_and_download_files`` and then, per sampled document file:

    * counts its lines and validates its JSONL fields (either failure aborts
      the whole validation),
    * for every attribute type of the stream, checks that the matching
      attribute file has the same number of lines (a mismatch aborts) and
      validates its JSONL fields (a failure only prints a warning).

    If the stream defines a ``filter``, the filters are checked for typos and
    executed against the sampled attribute files.

    The ``temp_sample_files`` directory is removed when the function exits,
    whether it returns normally, returns early, or raises.

    Args:
        config: Mixer configuration mapping with a ``'streams'`` list.
        num_samples: Passed through to ``sample_and_download_files``.

    Returns:
        ``True`` if every stream passed, ``False`` on the first hard failure.
    """
    print("Sampling and validating document-attribute alignment, filters, and attribute names...")
    # NOTE(review): presumably the directory sample_and_download_files writes
    # into -- that helper is not visible here; confirm the name matches.
    temp_dir = "temp_sample_files"
    doc_expected_fields = {'id', 'text', 'source', 'created', 'added', 'version', 'metadata', 'attributes'}
    attr_expected_fields = {'id', 'attributes'}

    try:
        for stream in config['streams']:
            # NOTE(review): filter_attributes, base_doc_path and base_attr_path
            # are not used further down in this function. They are kept because
            # the helpers may validate their input or have side effects that
            # are not visible from here -- confirm before removing.
            filter_attributes = set()
            if 'filter' in stream:
                include_filters = stream['filter'].get('include', [])
                exclude_filters = stream['filter'].get('exclude', [])
                filter_attributes = extract_attribute_names_from_filters(include_filters + exclude_filters)

            base_doc_path = get_base_path(stream['documents'][0])
            base_attr_path = re.sub(r'/documents($|/)', r'/attributes\1', base_doc_path)

            doc_samples, attr_samples_dict = sample_and_download_files(stream, num_samples)

            # enumerate() gives the position directly; list.index() would be
            # O(n) per document and would pick the wrong attribute file if two
            # sampled paths happened to be equal.
            for sample_idx, doc_sample in enumerate(doc_samples):
                print(f"\nValidating file: {doc_sample}")

                doc_line_count = count_file_lines(doc_sample)
                if doc_line_count == -1:
                    print("Failed to count lines in document file. Check the file and try again.")
                    return False

                print(f"Document has {doc_line_count} lines")

                is_valid, error_messages = validate_jsonl(doc_sample, doc_expected_fields)
                if not is_valid:
                    print("Document validation failed:")
                    for error in error_messages:
                        print(f" {error}")
                    return False

                for attr_type in stream['attributes']:
                    attr_sample = attr_samples_dict[attr_type][sample_idx]
                    print(f"\nValidating attribute file: {attr_sample}")

                    attr_line_count = count_file_lines(attr_sample)
                    if attr_line_count == -1:
                        # An unreadable attribute file is skipped, not fatal.
                        print("Failed to count lines in attribute file. Skipping further validation for this attribute.")
                        continue

                    print(f"Attribute file has {attr_line_count} lines")

                    if doc_line_count != attr_line_count:
                        print(f"ERROR: Line count mismatch! Document has {doc_line_count} lines, but attribute file has {attr_line_count} lines.")
                        return False
                    print("Line count check passed: Document and attribute file have the same number of lines.")

                    is_valid, error_messages = validate_jsonl(attr_sample, attr_expected_fields)
                    if not is_valid:
                        # Attribute schema problems are reported but do not
                        # fail the run.
                        print("Warning: possible attribute validation mismatch:")
                        for error in error_messages:
                            print(f" {error}")
                    else:
                        print("Attribute validation passed")

            # NOTE(review): the nesting level of this section is not
            # recoverable from the flattened diff. It is placed at stream
            # level because it consumes all attribute samples of the stream at
            # once -- confirm against the repository.
            if 'filter' in stream:
                all_attr_samples = [
                    attr_sample
                    for attr_samples in attr_samples_dict.values()
                    for attr_sample in attr_samples
                ]
                validate_filters_and_check_typos(all_attr_samples, stream['filter'], stream['attributes'])

                filter_execution_results = execute_filter_commands(attr_samples_dict, stream['attributes'], stream['filter'], num_lines=100)
                print(filter_execution_results)

        return True

    finally:
        # Best-effort cleanup of the sample directory: a missing directory is
        # fine, any other filesystem error is reported but never masks the
        # validation result or an in-flight exception.
        try:
            shutil.rmtree(temp_dir)
        except FileNotFoundError:
            pass
        except OSError as e:
            print(f"Error while removing temporary directory '{temp_dir}': {str(e)}")

def load_and_validate_config(config_path):
print("Loading configuration file...")
Expand Down

0 comments on commit 890de88

Please sign in to comment.