Skip to content

Commit

Permalink
[wip] IncreaseGranularity strategy
Browse files Browse the repository at this point in the history
  • Loading branch information
franzpoeschel committed Jul 16, 2024
1 parent 5e17686 commit 346c0d3
Showing 1 changed file with 51 additions and 2 deletions.
53 changes: 51 additions & 2 deletions src/binding/python/openpmd_api/pipe/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,56 @@ def assign(self, assignment, *_):
assignment.assigned[self.rank].append(element)
return assignment

class IncreaseGranularity(io.PartialStrategy):
    """Partial strategy that coarsens the hostname information.

    Consecutive distinct hostnames found in ``in_ranks`` (in order of first
    appearance) are merged into "host groups" of ``granularity`` hosts each.
    Every rank is then relabelled with the name of its host group and the
    actual chunk assignment is delegated to ``inner_distribution``, which
    therefore sees a whole group of hosts as if it were a single host.

    The relabelled rank tables are computed on the first call to
    :meth:`assign` and reused for all later calls.
    """

    def __init__(self, granularity, inner_distribution=None):
        """Set up the strategy.

        Args:
            granularity: Number of distinct hosts merged into one host group.
                Values below 1 behave like 1 (every host is its own group).
            inner_distribution: Strategy that performs the actual assignment
                on the relabelled ranks. Defaults to a fresh
                ``io.ByHostname(io.RoundRobin())`` per instance.
        """
        super().__init__()
        # Build the default here instead of in the signature so that the
        # default strategy object is not shared between instances.
        if inner_distribution is None:
            inner_distribution = io.ByHostname(io.RoundRobin())
        self.inner_distribution = inner_distribution
        self.granularity = granularity
        # Cache for the relabelled rank tables; ``None`` means "not computed
        # yet" (an empty dict is a valid cached result).
        self.in_ranks_inner = None
        self.out_ranks_inner = None

    def _group_ranks(self, in_ranks, out_ranks):
        """Return ``(in_ranks_inner, out_ranks_inner)`` with host groups.

        Both arguments are read through ``.items()`` as ``rank -> hostname``
        mappings and are not modified.
        """
        # Distinct hostnames in order of first appearance.
        hosts_in_order = list(
            dict.fromkeys(hostname for _, hostname in in_ranks.items()))

        hostname_to_hostgroup = {}  # real host -> host group
        current_group = 0
        hosts_in_group = 0
        for host in hosts_in_order:
            hostname_to_hostgroup[host] = str(current_group)
            hosts_in_group += 1
            if hosts_in_group >= self.granularity:
                hosts_in_group = 0
                current_group += 1

        in_ranks_inner = {
            rank: hostname_to_hostgroup[hostname]
            for rank, hostname in in_ranks.items()
        }
        # Hosts that only occur in out_ranks keep their real hostname.
        # NOTE(review): such a hostname could coincide with a generated group
        # name ("0", "1", ...) -- confirm that this cannot happen in practice.
        out_ranks_inner = {
            rank: hostname_to_hostgroup.get(hostname, hostname)
            for rank, hostname in out_ranks.items()
        }
        return in_ranks_inner, out_ranks_inner

    def assign(self, assignment, in_ranks, out_ranks):
        """Delegate to the inner strategy using host groups as hostnames.

        Args:
            assignment: Passed through unchanged to the inner strategy.
            in_ranks: ``rank -> hostname`` mapping; defines the host groups.
            out_ranks: ``rank -> hostname`` mapping; relabelled with the
                groups derived from ``in_ranks`` where the hostname matches.

        Returns:
            Whatever ``inner_distribution.assign`` returns.

        Note:
            Only the first call evaluates ``in_ranks`` and ``out_ranks``;
            subsequent calls reuse the cached relabelled tables.
        """
        if self.in_ranks_inner is None:
            self.in_ranks_inner, self.out_ranks_inner = self._group_ranks(
                in_ranks, out_ranks)
        return self.inner_distribution.assign(assignment,
                                              self.in_ranks_inner,
                                              self.out_ranks_inner)



# Example of how to implement a simple strategy in Python
class LoadAll(io.Strategy):

def __init__(self, rank):
Expand Down Expand Up @@ -168,7 +216,7 @@ def distribution_strategy(dataset_extent,
strategy_identifier=match.group(2))
return io.FromPartialStrategy(io.ByHostname(inside_node), second_phase)
elif strategy_identifier == 'all':
return io.FromPartialStrategy(LoadOne(mpi_rank), LoadAll(mpi_rank))
return io.FromPartialStrategy(IncreaseGranularity(5), LoadAll(mpi_rank))
elif strategy_identifier == 'roundrobin':
return io.RoundRobin()
elif strategy_identifier == 'binpacking':
Expand Down Expand Up @@ -319,6 +367,7 @@ def __copy(self, src, dest, current_path="/data/"):
dest.make_constant(src.get_attribute("value"))
else:
chunk_table = src.available_chunks()
# todo buffer the strategy
strategy = distribution_strategy(shape, self.comm.rank,
self.comm.size)
my_chunks = strategy.assign(chunk_table, self.inranks,
Expand Down

0 comments on commit 346c0d3

Please sign in to comment.