Skip to content

Commit

Permalink
complexity: improve complexity estimation for scatter-gathered CWL workflows
Browse files Browse the repository at this point in the history

closes reanahub#375
  • Loading branch information
audrium authored and tiborsimko committed Aug 16, 2021
1 parent 6c66eb7 commit 5c718b8
Showing 1 changed file with 37 additions and 17 deletions.
54 changes: 37 additions & 17 deletions reana_server/complexity.py
Original file line number Diff line number Diff line change
Expand Up @@ -261,19 +261,23 @@ def _parse_steps(self, workflow):
"""Parse CWL workflow specification tree."""
tree = {}
steps = workflow.get("steps", [])
wid = workflow.get("id")
for step in steps:
name = step.get("id")
run = step.get("run")
hints = step.get("hints", [{}]).pop()
# Parse scatter params
scatter = step.get("scatter")
scatter_params = None
if scatter:
scatter_params = next(
filter(lambda p: p["id"] == scatter, step.get("in", [])), {},
).get("source")
scatter_params = (
next(filter(lambda p: p["id"] == scatter, step.get("in", [])), {},)
.get("source")
.split("/")
.pop()
)
# Parse nested workflows
workflow_file = step.get("run")
nested_workflow = workflow_file if isinstance(workflow_file, str) else None
nested_workflow = self._parse_steps(run) if isinstance(run, dict) else None
# Parse initial complexity
jobs = self._get_number_of_jobs(hints)
memory_limit = self._get_memory_limit(hints)
Expand All @@ -284,7 +288,7 @@ def _parse_steps(self, workflow):
for param in params:
# Extract dependencies from param (e.g '#main/gendata/data')
if param:
dependencies.update(param.split("/")[1:-1])
dependencies.update(param.replace(wid + "/", "").split("/")[0:-1])

tree[name] = {
"complexity": complexity,
Expand All @@ -309,25 +313,41 @@ def _populate_dependencies(self, steps):
"""Populate dependencies to parsed CWL workflow tree steps."""
for step, value in steps.items():
nested_workflow = value.get("workflow")
if nested_workflow:
if nested_workflow and isinstance(nested_workflow, str):
for nested_step, nested_value in steps.items():
if nested_workflow in nested_step:
nested_value["dependencies"] += value["dependencies"]
return steps

def _populate_complexity(self, steps):
"""Populate complexity to parsed CWL workflow tree steps."""
for step, value in steps.items():
scatter_params = value.get("scatter_params")
if scatter_params:
param_len = len(self.input_params.get(scatter_params, []))
if not param_len:
continue

def _parse_steps(steps):
steps = steps.copy()
for step, value in steps.items():
scatter_params = value.get("scatter_params")
nested_workflow = value.get("workflow")
complexity = value.get("complexity")
value["complexity"] = [
(item[0] * param_len, item[1]) for item in complexity
]
return steps

# Handle nested stages
if nested_workflow and isinstance(nested_workflow, dict):
parsed_steps = _parse_steps(nested_workflow)
value["workflow"] = parsed_steps
if parsed_steps:
parsed_steps = self._filter_initial_steps(parsed_steps, "init")
complexity = self._calculate_complexity(parsed_steps)

# Handle scatter parameters
if scatter_params:
param_len = len(self.input_params.get(scatter_params, []))
if not param_len:
continue
complexity = [(item[0] * param_len, item[1]) for item in complexity]

value["complexity"] = complexity
return steps

return _parse_steps(steps)

def _filter_initial_steps(self, steps, initial_step):
"""Filter out initial CWL workflow tree steps."""
Expand Down

0 comments on commit 5c718b8

Please sign in to comment.