PMDA with refactored _single_frame
#128
base: master
Changes to DensityAnalysis:
```diff
@@ -240,8 +240,8 @@ def __init__(self, atomgroup, delta=1.0, atomselection=None,
                  metadata=None, padding=2.0, updating=False,
                  parameters=None, gridcenter=None, xdim=None, ydim=None,
                  zdim=None):
-        u = atomgroup.universe
-        super(DensityAnalysis, self).__init__(u, (atomgroup, ))
+        universe = atomgroup.universe
+        super().__init__(universe)
         self._atomgroup = atomgroup
         self._delta = delta
         self._atomselection = atomselection
```
```diff
@@ -253,7 +253,7 @@ def __init__(self, atomgroup, delta=1.0, atomselection=None,
         self._xdim = xdim
         self._ydim = ydim
         self._zdim = zdim
-        self._trajectory = u.trajectory
+        self._trajectory = universe.trajectory
         if updating and atomselection is None:
             raise ValueError("updating=True requires a atomselection string")
         elif not updating and atomselection is not None:
```
```diff
@@ -289,20 +289,31 @@ def _prepare(self):
         grid, edges = np.histogramdd(np.zeros((1, 3)), bins=bins,
                                      range=arange, normed=False)
         grid *= 0.0
-        self._grid = grid
+
+        self._results = [grid] * self.n_frames
         self._edges = edges
         self._arange = arange
         self._bins = bins

-    def _single_frame(self, ts, atomgroups):
-        coord = self.current_coordinates(atomgroups[0], self._atomselection,
-                                         self._updating)
-        result = np.histogramdd(coord, bins=self._bins, range=self._arange,
-                                normed=False)
-        return result[0]
+    def _single_frame(self):
+        h, _ = np.histogramdd(self._atomgroup.positions,
+                              bins=self._bins, range=self._arange,
+                              normed=False)
+        # reduce (proposed change #2542 to match the parallel version in pmda.density)
+        # return self._reduce(self._grid, h)
+        #
+        # serial code can simply do
+        # the current timestep of the trajectory is self._ts
+        # self._results[self._frame_index][0] = self._ts.frame
+        # the actual trajectory is at self._trajectory
+        # self._results[self._frame_index][1] = self._trajectory.time
+        self._results[self._frame_index] = h

     def _conclude(self):
-        self._grid = self._results[:].sum(axis=0)
+        # sum both inside and among blocks.
+        self._grid = self._results[:].sum(axis=(0, 1))
         self._grid /= float(self.n_frames)
         metadata = self._metadata if self._metadata is not None else {}
         metadata['psf'] = self._atomgroup.universe.filename
```
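A quick way to see why `_conclude` changes from `sum(axis=0)` to `sum(axis=(0, 1))`: in the parallel layout, `_results` stacks per-block lists of per-frame histograms, so the array gains a leading (n_blocks, n_frames) pair of axes that both need to be summed away. A toy check with hypothetical shapes (not the PR's actual data):

```python
import numpy as np

# hypothetical: 2 blocks x 3 frames of 4x4x4 density histograms
results = np.ones((2, 3, 4, 4, 4))

per_frame = results.sum(axis=0)      # (3, 4, 4, 4): frames still separate
total = results.sum(axis=(0, 1))     # (4, 4, 4): summed over blocks AND frames
assert total[0, 0, 0] == 2 * 3       # every voxel saw 2 blocks * 3 frames
```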
```diff
@@ -322,14 +333,6 @@ def _conclude(self):
         density.make_density()
         self.density = density

-    @staticmethod
-    def _reduce(res, result_single_frame):
-        """ 'accumulate' action for a time series"""
-        if isinstance(res, list) and len(res) == 0:
-            res = result_single_frame
-        else:
-            res += result_single_frame
-        return res
-
     @staticmethod
     def current_coordinates(atomgroup, atomselection, updating):
```

Comment on lines -325 to -332:

I don't like the design that gets rid of reduce.
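For context, the removed `_reduce` is pmda's per-block accumulation hook: each worker folds one frame's histogram into a running value, so a block never holds more than one grid at a time. A minimal standalone sketch of that pattern (the `frame_histograms` generator is hypothetical, not pmda's driver code):

```python
import numpy as np

def reduce_histogram(res, result_single_frame):
    """'accumulate' action: fold one frame's histogram into the running sum."""
    if isinstance(res, list) and len(res) == 0:
        # first frame of the block: adopt the frame result as the accumulator
        res = result_single_frame
    else:
        res += result_single_frame
    return res

frame_histograms = (np.ones((4, 4, 4)) for _ in range(3))  # hypothetical frames
block_result = []                                          # empty accumulator
for h in frame_histograms:
    block_result = reduce_histogram(block_result, h)
assert block_result.sum() == 3 * 4 ** 3
```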
Changes to LeafletFinder:
```diff
@@ -76,7 +76,7 @@ def __init__(self, universe, atomgroups):

         super(LeafletFinder, self).__init__(universe, (atomgroups,))

-    def _find_connected_components(self, data, cutoff=15.0):
+    def _find_connected_components(self, data_list, cutoff=15.0):
         """Perform the Connected Components discovery for the atoms in data.

         Parameters
```
```diff
@@ -99,62 +99,66 @@ def _find_connected_components(self, data, cutoff=15.0):

         """
-        # pylint: disable=unsubscriptable-object
-        window, index = data[0]
-        num = window[0].shape[0]
-        i_index = index[0]
-        j_index = index[1]
-        graph = nx.Graph()
-
-        if i_index == j_index:
-            train = window[0]
-            test = window[1]
-        else:
-            train = np.vstack([window[0], window[1]])
-            test = np.vstack([window[0], window[1]])
-
-        tree = cKDTree(train, leafsize=40)
-        edges = tree.query_ball_point(test, cutoff)
-        edge_list = [list(zip(np.repeat(idx, len(dest_list)), dest_list))
-                     for idx, dest_list in enumerate(edges)]
-
-        edge_list_flat = np.array([list(item) for sublist in edge_list for
-                                   item in sublist])
-
-        if i_index == j_index:
-            res = edge_list_flat.transpose()
-            res[0] = res[0] + i_index - 1
-            res[1] = res[1] + j_index - 1
-        else:
-            removed_elements = list()
-            for i in range(edge_list_flat.shape[0]):
-                if (edge_list_flat[i, 0] >= 0 and
-                        edge_list_flat[i, 0] <= num - 1) and \
-                        (edge_list_flat[i, 1] >= 0 and
-                         edge_list_flat[i, 1] <= num - 1) or \
-                        (edge_list_flat[i, 0] >= num and
-                         edge_list_flat[i, 0] <= 2 * num - 1) and \
-                        (edge_list_flat[i, 1] >= num and
-                         edge_list_flat[i, 1] <= 2 * num - 1) or \
-                        (edge_list_flat[i, 0] >= num and
-                         edge_list_flat[i, 0] <= 2 * num - 1) and \
-                        (edge_list_flat[i, 1] >= 0 and
-                         edge_list_flat[i, 1] <= num - 1):
-                    removed_elements.append(i)
-            res = np.delete(edge_list_flat, removed_elements,
-                            axis=0).transpose()
-            res[0] = res[0] + i_index - 1
-            res[1] = res[1] - num + j_index - 1
-        if res.shape[1] == 0:
-            res = np.zeros((2, 1), dtype=np.int)
-
-        edges = [(res[0, k], res[1, k]) for k in range(0, res.shape[1])]
-        graph.add_edges_from(edges)
-
-        # partial connected components
-
-        subgraphs = nx.connected_components(graph)
-        comp = [g for g in subgraphs]
-        return comp
-        #raise TypeError(data)
+        comp_s = []
+        for data in data_list:
+            window, index = data
+            num = window[0].shape[0]
+            i_index = index[0]
+            j_index = index[1]
+            graph = nx.Graph()
+
+            if i_index == j_index:
+                train = window[0]
+                test = window[1]
+            else:
+                train = np.vstack([window[0], window[1]])
+                test = np.vstack([window[0], window[1]])
+
+            tree = cKDTree(train, leafsize=40)
+            edges = tree.query_ball_point(test, cutoff)
+            edge_list = [list(zip(np.repeat(idx, len(dest_list)), dest_list))
+                         for idx, dest_list in enumerate(edges)]
+
+            edge_list_flat = np.array([list(item) for sublist in edge_list for
+                                       item in sublist])
+
+            if i_index == j_index:
+                res = edge_list_flat.transpose()
+                res[0] = res[0] + i_index - 1
+                res[1] = res[1] + j_index - 1
+            else:
+                removed_elements = list()
+                for i in range(edge_list_flat.shape[0]):
+                    if (edge_list_flat[i, 0] >= 0 and
+                            edge_list_flat[i, 0] <= num - 1) and \
+                            (edge_list_flat[i, 1] >= 0 and
+                             edge_list_flat[i, 1] <= num - 1) or \
+                            (edge_list_flat[i, 0] >= num and
+                             edge_list_flat[i, 0] <= 2 * num - 1) and \
+                            (edge_list_flat[i, 1] >= num and
+                             edge_list_flat[i, 1] <= 2 * num - 1) or \
+                            (edge_list_flat[i, 0] >= num and
+                             edge_list_flat[i, 0] <= 2 * num - 1) and \
+                            (edge_list_flat[i, 1] >= 0 and
+                             edge_list_flat[i, 1] <= num - 1):
+                        removed_elements.append(i)
+                res = np.delete(edge_list_flat, removed_elements,
+                                axis=0).transpose()
+                res[0] = res[0] + i_index - 1
+                res[1] = res[1] - num + j_index - 1
+            if res.shape[1] == 0:
+                res = np.zeros((2, 1), dtype=np.int)
+
+            edges = [(res[0, k], res[1, k]) for k in range(0, res.shape[1])]
+            graph.add_edges_from(edges)
+
+            # partial connected components
+
+            subgraphs = nx.connected_components(graph)
+            comp = [g for g in subgraphs]
+            comp_s.append(comp)
+        return comp_s

     # pylint: disable=arguments-differ
     def _single_frame(self, ts, atomgroups, scheduler_kwargs, n_jobs,
```
Sorry, I think I packed too much into this PR. I intended to explore the possibility of parallelizing LeafletFinder both among frames and within a single frame, because for now it only starts to work on the next frame after all the jobs are done in the current one. So I made this more coarse-grained instead of passing hundreds of jobs per frame times hundreds of frames to the dask graph.

Problem is twofold (after …):
```python
import pickle

import MDAnalysis as mda
from MDAnalysisTests.datafiles import GRO_MEMPROT, XTC_MEMPROT

u = mda.Universe(GRO_MEMPROT, XTC_MEMPROT)
u.trajectory[4]  # last frame
pickle.loads(pickle.dumps(u.trajectory))
# EOFError: Trying to seek over max number of frames
```

The major problem is that `trajectory._xdr.current_frame == 5` (1-based). I might need to add an extra fix (and test?) to https://github.com/MDAnalysis/mdanalysis/pull/2723/files, or maybe in an individual PR, since the pickling is handled on its own.
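For reference, the behavior one would presumably want after such a fix is that a pickled reader round-trips at the same frame instead of raising. A hedged sketch of that expectation (assuming the serialization work in MDAnalysis PR #2723; the assertion states the intent, not a released API guarantee):

```python
import pickle

import MDAnalysis as mda
from MDAnalysisTests.datafiles import GRO_MEMPROT, XTC_MEMPROT

u = mda.Universe(GRO_MEMPROT, XTC_MEMPROT)
u.trajectory[4]                      # seek to the last frame
clone = pickle.loads(pickle.dumps(u.trajectory))

# a fixed reader should come back positioned on the same frame
assert clone.ts.frame == u.trajectory.ts.frame == 4
```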
The "last frame" thing is a real issue. Oops! Don't worry about LeafletFinder at the moment; it's not really your job to fix it, and it has lots of issues. (If you need it for your own research and have an interest in getting it working, then that's a bit different, but I'd still say: focus on the core serialization problem for now.)

Just pushed a fix for the "last frame" issue. Not … A solution I can think of is to let …

That's a good analysis of use cases, and it would be useful to write this down somewhere. With PMDA so far (except LeafletFinder) we have been focusing on the simple split-apply-combine because that can be put in a simple "framework". Beyond that it becomes difficult to do a "one size fits all", and it becomes a serious research project in CS. I would be happy if we had a library that allows users to easily write their own split/apply/combine type analysis, and where we provide a few additional parallelized analyses that might not fit into this scheme (such as LeafletFinder). An interesting idea that has been coming up repeatedly is to "stack" multiple analyses, i.e., run multiple … Finally, running one analysis on multiple universes seems to be a standard pleasingly parallel job that can make use of existing workflow management tools – I don't see what we can do directly to support it.
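The "stacking" idea could look roughly like the following sketch: several per-frame callbacks share one pass over the trajectory instead of each analysis iterating on its own. Purely illustrative (the `run_stacked` helper and the `analyses` callables are hypothetical, not an existing PMDA API):

```python
def run_stacked(universe, analyses):
    """Drive several per-frame callables over a single trajectory pass.

    `analyses` is a hypothetical list of callables taking the current
    timestep; a real design would presumably use analysis objects with
    _prepare/_single_frame/_conclude hooks instead.
    """
    results = [[] for _ in analyses]
    for ts in universe.trajectory:       # one pass over all frames
        for slot, analysis in zip(results, analyses):
            slot.append(analysis(ts))
    return results

# usage sketch:
# times, sizes = run_stacked(u, [lambda ts: ts.time,
#                                lambda ts: ts.n_atoms])
```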
```diff
@@ -200,12 +204,13 @@ def _single_frame(self, ts, atomgroups, scheduler_kwargs, n_jobs,
         # Distribute the data over the available cores, apply the map function
         # and execute.
         parAtoms = db.from_sequence(arranged_coord,
-                                    npartitions=len(arranged_coord))
+                                    npartitions=n_jobs)
         parAtomsMap = parAtoms.map_partitions(self._find_connected_components,
                                               cutoff=cutoff)
         Components = parAtomsMap.compute(**scheduler_kwargs)
+        Components = [item for sublist in Components for item in sublist]

         # Gather the results and start the reduction. TODO: think if it can go
         # to the private _reduce method of the based class.
         result = list(Components)
```

LeafletFinder is not parallelized over frames… I am not sure that choosing n_jobs is the correct choice here. Need to look at the original paper/algorithm.
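For reference on the partitioning question, `dask.bag.map_partitions` hands each worker a whole partition as a list, which is why the refactored `_find_connected_components` now consumes a `data_list`. A small self-contained sketch of those semantics (toy integers rather than the PR's pair-matrix windows):

```python
import dask.bag as db

def process_partition(items):
    # receives one partition as a list and returns a list of results,
    # mirroring how _find_connected_components consumes data_list
    return [x * x for x in items]

work = list(range(8))
bag = db.from_sequence(work, npartitions=2)   # 2 partitions -> 2 coarse tasks
squares = bag.map_partitions(process_partition).compute()
# partition outputs are concatenated back into one flat list
assert sorted(squares) == [x * x for x in work]
```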
Storing a full histogram for each frame is bad – you can easily run out of memory. I think it is important that the aggregation is done every step and not just in `_conclude`.

But isn't that what also happens with `_reduce`? It won't pass the full histogram back to the main process, but only the calculated frames in `_dask_helper`.

No, the density reduce:
pmda/pmda/density.py, lines 326 to 332 in 13fa3b5
pmda/pmda/density.py, lines 305 to 306 in 13fa3b5

Btw, the PMDA paper has a discussion on that topic.
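To make the memory argument concrete, a back-of-the-envelope sketch (hypothetical grid size and frame count, standalone code rather than pmda's classes): storing one float64 histogram per frame costs O(n_frames × grid) memory, while reducing every step keeps each worker at O(grid).

```python
import numpy as np

nx = ny = nz = 100                   # hypothetical grid resolution
n_frames = 10_000

per_frame_bytes = nx * ny * nz * 8   # one float64 histogram: ~8 MB
# keeping every frame: ~80 GB -- easy to run out of memory
print(n_frames * per_frame_bytes / 1e9, "GB if every frame is kept")

# reducing every step: memory stays at one grid per worker (~8 MB)
grid = np.zeros((nx, ny, nz))
for _ in range(n_frames):
    h = np.zeros((nx, ny, nz))       # stand-in for this frame's histogramdd
    grid += h                        # fold in immediately
```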