import json
import argparse
import time
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from pyopenms import *
import numpy as np
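
# mzML2mzNextTwoFiles: takes a pyopenms MSExperiment loaded from an mzML file and
# writes its spectra and chromatograms to two separate Parquet files (one per data
# type), embedding the mzML metadata JSON in each file's schema metadata, then
# benchmarks random access against the spectra file.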
# TODO add meta data code
def writeSingleParquet(json_str):
    # Create lists to hold spectrum and chromatogram data separately
    spectra_data = []
    chromatogram_data = []
    # Process spectra (exp is expected to be a module-level pyopenms MSExperiment)
    for spectrum in exp.getSpectra():
        spectrum_id = spectrum.getNativeID()
        ms_level = spectrum.getMSLevel()
        rt = spectrum.getRT()
        ion_mobility = None
        if spectrum.getDriftTime() != 0:
            ion_mobility = spectrum.getDriftTime()
        # Get peaks as numpy arrays
        mz_array, intensity_array = spectrum.get_peaks()
        # Store complete spectrum data
        spectra_data.append({
            'id': spectrum_id,
            'data_type': 'spectrum',
            'ms_level': ms_level,
            'rt': rt,
            'ion_mobility': ion_mobility,
            'mz_array': mz_array,
            'intensity_array': intensity_array
        })
    # Process chromatograms
    for chromatogram in exp.getChromatograms():
        chrom_id = chromatogram.getNativeID()
        time_array, intensity_array = chromatogram.get_peaks()
        # Get precursor and product information
        precursor = chromatogram.getPrecursor()
        product = chromatogram.getProduct()
        chromatogram_data.append({
            'id': chrom_id,
            'data_type': 'chromatogram',
            'precursor_mz': precursor.getMZ(),
            'product_mz': product.getMZ(),
            'time_array': time_array,
            'intensity_array': intensity_array
        })
    # Convert to DataFrames with array columns
    spectra_df = pd.DataFrame(spectra_data)
    chromatogram_df = pd.DataFrame(chromatogram_data)
    # Optimize data types for non-array columns
    spectra_df['id'] = spectra_df['id'].astype('category')
    spectra_df['data_type'] = spectra_df['data_type'].astype('category')
    spectra_df['ms_level'] = spectra_df['ms_level'].astype('Int8')
    spectra_df['rt'] = spectra_df['rt'].astype('float32')
    spectra_df['ion_mobility'] = spectra_df['ion_mobility'].astype('float32')
    chromatogram_df['id'] = chromatogram_df['id'].astype('category')
    chromatogram_df['data_type'] = chromatogram_df['data_type'].astype('category')
    chromatogram_df['precursor_mz'] = chromatogram_df['precursor_mz'].astype('float32')
    chromatogram_df['product_mz'] = chromatogram_df['product_mz'].astype('float32')
    # Convert to PyArrow tables
    spectra_table = pa.Table.from_pandas(spectra_df)
    chromatogram_table = pa.Table.from_pandas(chromatogram_df)
    # Embed the mzML metadata JSON in the schema metadata, merging with the pandas
    # metadata that from_pandas() generated so it is not dropped
    mzml_meta = {b"mzML_metadata": json_str.encode()}
    spectra_table = spectra_table.replace_schema_metadata(
        {**(spectra_table.schema.metadata or {}), **mzml_meta})
    chromatogram_table = chromatogram_table.replace_schema_metadata(
        {**(chromatogram_table.schema.metadata or {}), **mzml_meta})
    # Write tables to Parquet files
    start_time = time.time()
    pq.write_table(spectra_table,
                   f"{args.output_file}_spectra.parquet",
                   compression='snappy',
                   use_dictionary=True,
                   write_statistics=True)
    pq.write_table(chromatogram_table,
                   f"{args.output_file}_chromatograms.parquet",
                   compression='snappy',
                   use_dictionary=True,
                   write_statistics=True)
    end_time = time.time()
    print(f"Writing parquet files: {end_time - start_time} seconds")
def benchmarkSingleParquet():
    import pyarrow.dataset as ds
    # Load datasets (args is the module-level parsed argparse namespace)
    spectra_dataset = ds.dataset(f"{args.output_file}_spectra.parquet", format="parquet")
    chromatogram_dataset = ds.dataset(f"{args.output_file}_chromatograms.parquet", format="parquet")
    # Function to collect unique IDs
    def get_unique_ids(filtered_dataset):
        unique_ids = set()
        for batch in filtered_dataset.to_batches(columns=['id']):
            ids_in_batch = batch.column('id').to_pylist()
            unique_ids.update(ids_in_batch)
        return list(unique_ids)
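    # Illustrative use of the helper above (assumption: it is not exercised anywhere
    # in the original script), e.g. collecting the distinct ids of all MS2 spectra:
    #   ms2_ids = get_unique_ids(spectra_dataset.filter(ds.field('ms_level') == 2))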
    # Get spectrum counts
    all_spectra = spectra_dataset.to_table()
    ms1_spectra = spectra_dataset.filter(ds.field('ms_level') == 1).to_table()
    ms2_spectra = spectra_dataset.filter(ds.field('ms_level') == 2).to_table()
    num_spectra = len(all_spectra)
    num_ms1_spectra = len(ms1_spectra)
    num_ms2_spectra = len(ms2_spectra)
    # First Task: Access 100 random spectra
    start_time = time.time()
    if num_spectra >= 100:
        random_indices = np.random.choice(num_spectra, 100, replace=False)
        selected_spectra = all_spectra.take(random_indices)
    else:
        print("Not enough spectra to perform the benchmark.")
    end_time = time.time()
    print(f"Accessing 100 random spectra: {end_time - start_time} seconds")
    # Second Task: Extract 100 random m/z ranges from MS1 spectra
    mz_range_size = 1.0  # width of each extracted m/z window (also used by the MS2 task below)
    start_time = time.time()
    if num_ms1_spectra >= 100:
        random_indices = np.random.choice(num_ms1_spectra, 100, replace=True)
        selected_spectra = ms1_spectra.take(random_indices)
        selected_mz_values = []
        # Iterate over the sampled rows as Python dicts; list columns come back as plain lists
        for spectrum in selected_spectra.to_pylist():
            mz_array = np.array(spectrum['mz_array'])
            intensity_array = np.array(spectrum['intensity_array'])
            if len(mz_array) == 0:
                continue
            min_mz = np.random.uniform(low=mz_array.min(), high=mz_array.max() - mz_range_size)
            max_mz = min_mz + mz_range_size
            mz_mask = (mz_array >= min_mz) & (mz_array <= max_mz)
            selected_mz_values.append({
                'mz': mz_array[mz_mask],
                'intensity': intensity_array[mz_mask]
            })
    else:
        print("Not enough MS1 spectra to perform the benchmark.")
    end_time = time.time()
    print(f"Extracting 100 random m/z ranges from MS1 spectra: {end_time - start_time} seconds")
    # Third Task: Extract 100 random m/z ranges from MS2 spectra
    start_time = time.time()
    if num_ms2_spectra >= 100:
        random_indices = np.random.choice(num_ms2_spectra, 100, replace=True)
        selected_spectra = ms2_spectra.take(random_indices)
        selected_mz_values = []
        for spectrum in selected_spectra.to_pylist():
            mz_array = np.array(spectrum['mz_array'])
            intensity_array = np.array(spectrum['intensity_array'])
            if len(mz_array) == 0:
                continue
            min_mz = np.random.uniform(low=mz_array.min(), high=mz_array.max() - mz_range_size)
            max_mz = min_mz + mz_range_size
            mz_mask = (mz_array >= min_mz) & (mz_array <= max_mz)
            selected_mz_values.append({
                'mz': mz_array[mz_mask],
                'intensity': intensity_array[mz_mask]
            })
    else:
        print("Not enough MS2 spectra to perform the benchmark.")
    end_time = time.time()
    print(f"Extracting 100 random m/z ranges from MS2 spectra: {end_time - start_time} seconds")