forked from markbarbet/MSI_theory
-
Notifications
You must be signed in to change notification settings - Fork 0
/
postprocessor.py
448 lines (405 loc) · 20.8 KB
/
postprocessor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
# -*- coding: utf-8 -*-
"""
Execute the nominal and perturbed PAPR-MESS calculations,
extract rate constants and fit them into Chebyshev expressions.
@author: Lei Lei
"""
import Mess_executor as ME
import os, io, sys
import numpy as np
from scipy.optimize import curve_fit
import pandas as pd
class PAPR_MESS:
def __init__(self, input_path, input_name, nom_dict, pert_dict, channels, abstraction=False):
self.input_path = input_path
self.nominal_file = input_name
self.pert_dict = pert_dict
self.nom_dict = nom_dict
self.channels = channels
self.abstraction = abstraction
def Run(self):
"""Generate the perturbed PAPR-MESS inputs, execute them, obtain the temperature-dependent rate constants
for the specified channels."""
model = ME.Mess_Executor(self.input_path, self.nominal_file, self.pert_dict, self.nom_dict, self.abstraction)
# Note: file structure for debugging
# -- nwd: nominal calculation directory, twd/nonimal/
# -- pwd: purturbed calculations directory, twd/perturbation/
# -- mwd: main parent directory, ./
# -- twd: trial directory that contains all new calculations, mwd/PAPR-MESS_calculation/system_name/calculation_#/
# create new directory for new calculation
self.twd = model.new_trial_directory()
# generate PAPR-MESS files and execute them
model.new_calculation(self.channels, self.twd, run_MESS=True)
# logistics
if model.pert_P != model.nom_P:
sys.exit('Error: perturbed and nominal systems have different pressure range.')
self.pertb_ls = model.pertb
self.nwd = model.nwd
self.pwd = model.pwd
self.mwd = model.mwd
self.input_name = model.input_name
self.Punit = model.Punit # Pressure unit
def fit_Cheb_rates(self, n_P, n_T, P_min=0.01, P_max=100, T_min=200, T_max=3000, target_dir=None, same_line_result=False):
"""Fit the rate constants into Chebyshev polynomials."""
# Execute the nominal and perturbed PAPR-MESS files
# self.Run()
self.n_P = n_P # number of perssure degree
self.n_T = n_T # number of temperature degree
self.Cheb_coef = {}
self.pert_ls = {}
self.P_min = P_min
self.P_max = P_max
self.T_min = T_min
self.T_max = T_max
# for debugging: fit rate constants in a give trail directory
if target_dir != None:
self.twd = target_dir
self.nwd = target_dir + "nominal/"
self.pwd = target_dir + "perturbation/"
# read in the temperature-dependent rate constants for each channel of perturbed files
for wd in [self.nwd, self.pwd]:
os.chdir(wd)
if wd == self.nwd:
wpert = self.nom_dict
else:
wpert = self.pert_dict
fhand = io.open('T_rate.csv', 'rb')
lines = fhand.readlines()
fhand.close()
file_flag = False
pres_flag = False
channel_flag = False
final_line = False
rate = {}
pert = {}
coef_dict = {}
# generate a rate constants dictionary for each perturbation
for key in wpert.keys():
rate[key] = {}
pert[key] = {}
coef_dict[key] = {}
T_ls = []
chan_rate = []
self.P_ls = []
for num, line in enumerate(lines):
# for the last line in the file
if num == len(lines) - 1:
final_line = True
T_ls.append(float(line.split(',')[0]))
chan_rate.append(float(line.split(',')[1]))
# for the other lines
line = line.strip()
if line.startswith('=') or final_line:
file_flag = True
# fit Chebyshev formula and write them in files
if len(T_ls) > 2:
T_ls = np.array(T_ls)
chan_rate = np.array(chan_rate)
print(rate)
print(self.key)
if self.channel in rate[self.key].keys():
rate[self.key][self.channel] = np.concatenate((rate[self.key][self.channel], chan_rate))
else:
rate[self.key][self.channel] = np.array(chan_rate)
if any(chan_rate < 0):
sys.exit("Error: Negative rate constants detected in run %s-%s..." %(self.key, self.channel))
self.T_ls = T_ls
T_ls = []
chan_rate = []
continue
if file_flag:
file_flag = False
pres_flag = True
line = line[:-4]
self.system = line.split('_')[0]
self.key = '_'.join(line.split('_')[1:-1])
self.pertb = float(line.split('_')[-1])
pert[self.key] = self.pertb
continue
if pres_flag:
pres_flag = False
channel_flag = True
# modified for abstraction reactions
try:
self.pressure = float(line.split(' ')[0])
Punit = line.split(' ')[1]
except ValueError:
self.pressure = line.split(' ')[0]
Punit = 'atm'
if self.pressure in self.P_ls:
continue
else:
self.P_ls.append(self.pressure)
continue
if channel_flag:
channel_flag = False
self.channel = line.split(',')[1]
continue
if not file_flag:
T_ls.append(float(line.split(',')[0]))
chan_rate.append(float(line.split(',')[1]))
if os.path.exists('Chebyshev_fit.txt'):
fhand = io.open('Chebyshev_fit.txt', 'ab')
else:
fhand = io.open('Chebyshev_fit.txt', 'wb')
for spc in rate.keys():
fhand.write('=' * 30 + '\n')
fhand.write(spc + '\n')
for key in rate[spc].keys():
k = rate[spc][key]
fhand.write(key + '\n')
coef = self.cheby_poly(n_T, n_P, k, self.T_ls, self.P_ls, P_min, P_max, T_min, T_max).reshape((n_P, n_T))
coef_dict[spc][key] = coef
#print(key,coef)
fhand.write('T_min:%s K T_max:%s K P_min:%s %s P_max:%s %s\n'%(T_min, T_max, P_min, Punit, P_max, Punit))
if same_line_result:
fhand.write(str(coef.reshape((1,-1)).tolist()) + '\n')
else:
for P in range(n_P):
fhand.write(str(coef[P,:].tolist()) + '\n')
fhand.close()
# store the fitted coefficients for Chebyshev polynomials
if wd == self.nwd:
self.Cheb_coef['nominal'] = coef_dict
self.pert_ls['nominal'] = pert
else:
self.Cheb_coef['perturbed'] = coef_dict
self.pert_ls['perturbed'] = pert
if target_dir == None:
print("Fitting channel-specific rate conctants into Chebyshev form for system %s ..." %self.input_name.split(".")[0])
def first_cheby_poly(self, x, n):
'''Generate n-th order Chebyshev ploynominals of first kind.'''
if n == 0: return 1
elif n == 1: return x
result = 2. * x * self.first_cheby_poly(x, 1) - self.first_cheby_poly(x, 0)
m = 0
while n - m > 2:
result = 2. * x * result - self.first_cheby_poly(x, m+1)
m += 1
return result
def reduced_T(self, T, T_min, T_max):
'''Calculate the reduced temperature.'''
T_tilde = 2. * T ** (-1) - T_min ** (-1) - T_max ** (-1)
T_tilde /= (T_max ** (-1) - T_min ** (-1))
return T_tilde
def reduced_P(self, P, P_min, P_max):
'''Calculate the reduced pressure.'''
P_tilde = 2. * np.log(P) - np.log(P_min) - np.log(P_max)
P_tilde /= (np.log(P_max) - np.log(P_min))
return P_tilde
def cheby_poly(self, n_T, n_P, k, T_ls, P_ls, P_min=0.01, P_max=100, T_min=200, T_max=3000):
'''Fit the Chebyshev polynominals to rate constants.
Input rate constants vector k should be arranged based on pressure.'''
# modified for abstraction reactions
if P_ls == ['--']:
P_ls = [1.0]
cheb_mat = np.zeros((len(k), n_T * n_P))
for n, P in enumerate(P_ls): # !! assume that at each presssure, we have the same temperateure range
for m, T in enumerate(T_ls):
for i in range(n_T):
T_tilde = self.reduced_T(T, T_min, T_max)
T_cheb = self.first_cheby_poly(T_tilde, i)
for j in range(n_P):
P_tilde = self.reduced_P(P, P_min, P_max)
P_cheb = self.first_cheby_poly(P_tilde, j)
cheb_mat[n*len(T_ls)+m, i*n_P+j] = P_cheb * T_cheb
coef = np.linalg.lstsq(cheb_mat, np.log10(np.array(k)))[0]
return coef
def Cheb_sens_coeff(self, same_line_result=False, aggregated_sens=True, debug=False):
"""Calculate the sensitivity coefficients for the Chebyshev rate constants."""
# initialization
if aggregated_sens:
self.aggregated_sens = pd.DataFrame()
if not debug:
self.aggregated_sens['Pressure (%s)' %self.Punit] = np.repeat(self.P_ls, len(self.T_ls))
self.aggregated_sens['Temperature (K)'] = np.tile(self.T_ls, len(self.P_ls))
self.Cheb_sens = {}
# decide sensitivity coefficients for each perturbation
for key in self.pert_dict.keys():
# write the sensitivity coefficients into file
os.chdir(self.twd)
if os.path.exists('Chebyshev_sens.txt'):
fhand = io.open('Chebyshev_sens.txt', 'ab')
else:
fhand = io.open('Chebyshev_sens.txt', 'wb')
fhand.write('='*30 + '\n')
fhand.write(key + '\n')
Cheb_sens = {}
# determine nominal calculation
nom_key = key[:-1] + '1'
if len(self.Cheb_coef['nominal'].keys()) == 1:
nom_key = self.Cheb_coef['nominal'].keys()[0]
# calculate the channel-specific sensitivity coefficients
for chan in self.Cheb_coef['perturbed'][key].keys():
rate_diff = self.Cheb_coef['perturbed'][key][chan] - self.Cheb_coef['nominal'][nom_key][chan]
if 'Energy' in key:
sens = rate_diff / (self.pert_ls['perturbed'][key] - self.pert_ls['nominal'][nom_key])
print((self.pert_ls['perturbed'][key] - self.pert_ls['nominal'][nom_key]),'DIFFERNENCE')
print(self.pert_ls['perturbed'][key],self.pert_ls['nominal'][nom_key])
if aggregated_sens:
self.aggregated_sens['%s_%s'%(key,chan)] = self.calculate_sensitivity(sens).reshape((1,-1))[0] * 349.758 * np.log(10)
elif 'Power' in key:
#print('LOOKING HERE')
#print(chan,'perturbebd')
#print(self.Cheb_coef['perturbed'][key][chan])
#print(chan,'nominal')
#print(self.Cheb_coef['nominal'][nom_key][chan])
#print(chan,'This is power rate diff')
#print(self.Cheb_coef['perturbed'][key][chan] - self.Cheb_coef['nominal'][nom_key][chan])
#print('This is power denominator')
#print(self.pert_ls['perturbed'][key], self.pert_ls['nominal'][nom_key], self.pert_ls['perturbed'][key] - self.pert_ls['nominal'][nom_key])
sens = rate_diff / (self.pert_ls['perturbed'][key] - self.pert_ls['nominal'][nom_key])
if aggregated_sens:
self.aggregated_sens['%s_%s'%(key,chan)] = self.calculate_sensitivity(sens).reshape((1,-1))[0] * np.log(10)
else:
sens = rate_diff / np.log((1. + self.pert_ls['perturbed'][key]) / (1. + self.pert_ls['nominal'][nom_key]))
if aggregated_sens:
self.aggregated_sens['%s_%s'%(key,chan)] = self.calculate_sensitivity(sens).reshape((1,-1))[0] * np.log(10)
Cheb_sens[chan] = sens
# write into output file
fhand.write(str(chan) + '\n')
if same_line_result:
fhand.write(str(sens.reshape((1,-1)).tolist()) + '\n')
else:
sens = sens.reshape(self.n_P, self.n_T)
for P in range(self.n_P):
fhand.write(str(sens[P,:].tolist()) + '\n')
self.Cheb_sens[key] = Cheb_sens
fhand.close()
if aggregated_sens:
# write aggregated sensitivity into file
self.aggregated_sens.to_csv("Aggregated_sens.csv", index=False)
if not debug:
os.chdir(self.mwd)
print("Calculating channel-specific sensitivity coefficients for Chebyshev polynomials for system %s ..." %self.input_name.split('.')[0])
def fit_Arr_perturbed_rates(self, target_dir=None):
"""Fit rate constants into Arrhenius formula."""
self.Arr_coef = {}
# read in the temperature-dependent rate constants for each channel of perturbed files
for wd in [self.nwd, self.pwd]:
os.chdir(wd)
fhand = io.open('T_rate.csv', 'rb')
lines = fhand.readlines()
fhand.close()
file_flag = False
pres_flag = False
channel_flag = False
final_line = False
temp = []
rate = []
for line in lines:
if line == lines[-1]:
final_line = True
temp.append(float(line.split(',')[0]))
rate.append(float(line.split(',')[1]))
line = line.strip()
if line.startswith('=') or final_line:
file_flag = True
# fit the three-parameter Arrhenius formula and write them in files
if len(temp) > 2:
temp = np.array(temp)
rate = np.array(rate)
if any(rate < 0):
sys.exit("Error: Negative rate constants detected in run %s..." %(self.channel))
fit = self.log_three_para_Arr_fit(temp, rate)
# determine nominal or perturbation
if wd == self.nwd:
self.Arr_coef[self.channel] = {}
self.Arr_coef[self.channel][self.pressure] = {}
self.Arr_coef[self.channel][self.pressure]['nominal_%s' %(self.pertb)] = fit
else:
self.Arr_coef[self.channel][self.pressure]['pert_%s' %(self.pertb)] = fit
# reset
temp = []
rate = []
if os.path.exists('Arrhenius_fit.txt'):
fhand = io.open('Arrhenius_fit.txt', 'ab')
else:
fhand = io.open('Arrhenius_fit.txt', 'wb')
fhand.write('=' * 30 + '\n')
fhand.write(self.system + '\n')
fhand.write(self.pertb + '\n')
fhand.write(self.pressure + '\n')
fhand.write(self.channel + ' (' + ', '.join([str(_) for _ in fit]) + ') \n')
fhand.close()
continue
if file_flag:
file_flag = False
pres_flag = True
line = line[:-4]
self.system = line.split('_')[0]
self.pertb = '_'.join(line.split('_')[1:])
continue
if pres_flag:
pres_flag = False
channel_flag = True
self.pressure = line
continue
if channel_flag:
channel_flag = False
self.channel = line.split(',')[1]
continue
if not file_flag:
temp.append(float(line.split(',')[0]))
rate.append(float(line.split(',')[1]))
if target_dir == None:
print("Fitting channel-specific rate conctants into Arrhenius form for system %s ..." %self.input_name.split(".")[0])
def log_three_para_Arr_fit(self, Temp, rate, ini_guess = (1,1,1), max_initeration = 1000000):
'''Fit three-parameter Arrhenius rate coefficient expression.'''
rate = np.log(rate)
func = lambda T, A, n, Ea: np.log(A) + n * np.log(T) - Ea / T
fit = curve_fit(func, Temp, rate, p0 = ini_guess, maxfev = max_initeration, ftol = 1E-11, xtol = 1E-11, gtol = 1E-11)
return fit[0]
def Arr_sens_coeff(self):
"""Calculate sensitivity coefficients for Arrhenius fittings."""
# print(self.Arr_coef)
# decide sensitivity coefficients for each perturbation
for key in self.Arr_coef.keys():
# write the sensitivity coefficients into file
os.chdir(self.twd)
if os.path.exists('Arrhenius_sens.txt'):
fhand = io.open('Arrhenius_sens.txt', 'ab')
else:
fhand = io.open('Arrhenius_sens.txt', 'wb')
fhand.write('='*30 + '\n')
fhand.write(key + '\n')
Arr_sens = {}
# determine nominal calculation
for channel in self.Arr_coef.keys():
for pressure in self.Arr_coef[channel].keys():
# get nominla fittings
for key in self.Arr_coef[channel][pressure].keys():
if 'nominal' in key:
nom_pert = float(key.split('_')[-1])
nom_coef = self.Arr_coef[channel][pressure][key]
break
for key in self.Arr_coef[channel][pressure].keys():
if 'nominal' in key:
continue
else:
pert = float(key.split('_')[-1])
rate_diff = self.Arr_coef[channel][pressure][key] - nom_coef
if 'Energy' in key:
sens = rate_diff / (pert - nom_pert)
else:
sens = rate_diff / np.log((1. + pert) / (1. + nom_pert))
# write into output file
fhand.write('Pressure: ' + str(pressure) + ', ' + str(key) + '\n')
fhand.write(str(sens) + '\n')
fhand.close()
os.chdir(self.mwd)
print("Calculating channel-specific sensitivity coefficients for Arrhenius fittings for system %s ..." %self.input_name.split('.')[0])
def calculate_sensitivity(self, sens_coef):
'''Calculate the aggregated sensiticity as a function of temperature and pressure.'''
cheb_mat = np.zeros((len(self.P_ls) * len(self.T_ls), self.n_T * self.n_P))
for n, P in enumerate(self.P_ls): # !! assume that at each presssure, we have the same temperateure range
for m, T in enumerate(self.T_ls):
for i in range(self.n_T):
T_tilde = self.reduced_T(T, self.T_min, self.T_max)
T_cheb = self.first_cheby_poly(T_tilde, i)
for j in range(self.n_P):
P_tilde = self.reduced_P(P, self.P_min, self.P_max)
P_cheb = self.first_cheby_poly(P_tilde, j)
cheb_mat[n*len(self.T_ls)+m, i*self.n_P+j] = P_cheb * T_cheb
sens = np.dot(cheb_mat, sens_coef.reshape((-1,1)))
return sens