Skip to content

Commit

Permalink
Extend IncreaseGranularity strategy
Browse files Browse the repository at this point in the history
Now supports different granularities at write and read sides
  • Loading branch information
franzpoeschel committed Aug 5, 2024
1 parent fc0d9b0 commit 0bf6493
Showing 1 changed file with 76 additions and 42 deletions.
118 changes: 76 additions & 42 deletions src/binding/python/openpmd_api/pipe/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,53 +150,87 @@ def assign(self, assignment, *_):
return assignment

class IncreaseGranularity(io.PartialStrategy):

def __init__(self,
granularity,
inner_distribution=io.ByHostname(io.RoundRobin())):
def __init__(
self,
granularity_in,
granularity_out,
inner_distribution=io.ByHostname(io.RoundRobin()),
):
super().__init__()
self.inner_distribution = inner_distribution
self.granularity = granularity
self.granularity_in = granularity_in
self.granularity_out = granularity_out

def assign(self, assignment, in_ranks, out_ranks):
if "in_ranks_inner" in dir(self):
return self.inner_distribution.assign(assignment,
self.in_ranks_inner,
self.out_ranks_inner)

hosts_in_order = []
already_seen = set()
for (_, hostname) in in_ranks.items():
if hostname not in already_seen:
already_seen.add(hostname)
hosts_in_order.append(hostname)
del already_seen
hostname_to_hostgroup = {} # real host -> host group
current_meta_host = 0
granularity_counter = 0
for host in hosts_in_order:
hostname_to_hostgroup[host] = str(current_meta_host)
granularity_counter += 1
if granularity_counter >= self.granularity:
granularity_counter = 0
current_meta_host += 1
in_ranks_inner = {}
for (rank, hostname) in in_ranks.items():
in_ranks_inner[rank] = hostname_to_hostgroup[hostname]
out_ranks_inner = {}
for (rank, hostname) in out_ranks.items():
try:
out_ranks_inner[rank] = hostname_to_hostgroup[hostname]
except KeyError:
out_ranks_inner[rank] = hostname

self.in_ranks_inner = in_ranks_inner
self.out_ranks_inner = out_ranks_inner

return self.inner_distribution.assign(assignment, in_ranks_inner,
out_ranks_inner)


return self.inner_distribution.assign(
assignment, self.in_ranks_inner, self.out_ranks_inner
)

def hosts_in_order(rank_assignment):
already_seen = set()
res = []
for (_, hostname) in rank_assignment.items():
if hostname not in already_seen:
already_seen.add(hostname)
res.append(hostname)
return res

in_hosts_in_order = hosts_in_order(in_ranks)
out_hosts_in_order = hosts_in_order(out_ranks)

def hostname_to_hostgroup(ordered_hosts, granularity):
res = {} # real host -> host group
current_meta_host = 0
granularity_counter = 0
for host in ordered_hosts:
res[host] = str(current_meta_host)
granularity_counter += 1
if granularity_counter >= granularity:
granularity_counter = 0
current_meta_host += 1
return res

in_hostname_to_hostgroup = hostname_to_hostgroup(
in_hosts_in_order, self.granularity_in
)
out_hostname_to_hostgroup = hostname_to_hostgroup(
out_hosts_in_order, self.granularity_out
)

def inner_rank_assignment(outer_assignment, hostname_to_hostgroup):
res = {}
for (rank, hostname) in outer_assignment.items():
res[rank] = hostname_to_hostgroup[hostname]
return res

self.in_ranks_inner = inner_rank_assignment(in_ranks, in_hostname_to_hostgroup)
self.out_ranks_inner = inner_rank_assignment(
out_ranks, out_hostname_to_hostgroup
)

return self.inner_distribution.assign(
assignment, self.in_ranks_inner, self.out_ranks_inner
)


# strategy = IncreaseGranularity(2, 1)
# assignment = [
# io.WrittenChunkInfo([0], [1], 0),
# io.WrittenChunkInfo([1], [1], 1),
# io.WrittenChunkInfo([2], [1], 2),
# io.WrittenChunkInfo([3], [1], 3),
# ]
# in_ranks = {0: "host0", 1: "host1", 2: "host3", 3: "host4"}
# out_ranks = {0: "host2", 1: "host5"}
# res = strategy.assign(assignment, in_ranks, out_ranks)
# print(f"NOT ASSIGNED: {len(res.not_assigned)} chunks")
# print("ASSIGNED:")
# for rank, chunks in res.assigned.items():
# print(f"\tRANK {rank}:", end='')
# for chunk in chunks:
# print(f" [{chunk.offset}-{chunk.extent}]", end='')
# print()

#Example how to implement a simple strategy in Python
class LoadAll(io.Strategy):
Expand Down

0 comments on commit 0bf6493

Please sign in to comment.