Skip to content

Commit

Permalink
#501: execute stealrequests in FIFO order
Browse files Browse the repository at this point in the history
  • Loading branch information
cwschilly committed Feb 26, 2024
1 parent 1c211a6 commit 32c404b
Showing 1 changed file with 15 additions and 3 deletions.
18 changes: 15 additions & 3 deletions src/lbaf/Execution/lbsWorkStealingAlgorithm.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ def __init__(self, env, rank_id, algorithm, lgr: Logger):
self.rank_id = rank_id
self.rank = self.algorithm.ranks[self.rank_id]
self.pending_steal_request = False
self.other_ranks = [r_id for r_id in self.algorithm.ranks.keys() if r_id != self.rank_id]


# Initialize current cluster (if a rank is currently working through a cluster)
self.current_cluster = None
Expand Down Expand Up @@ -90,14 +92,16 @@ def run(self):

# If no work is available, try to request a steal from a random rank
elif self.algorithm.do_stealing and not self.pending_steal_request:
target_rank_id = random.randrange(0, self.algorithm.num_ranks)
target_rank_id = random.choice(self.other_ranks)
requesting_rank = self.rank
target_rank = self.algorithm.ranks[target_rank_id]

if self.algorithm.has_stealable_cluster(target_rank):
self.algorithm.iterate_attempted_steals()
steal_request = StealRequest(requesting_rank, target_rank)
# Place steal request in target's queue
self.algorithm.rank_queues[target_rank_id].appendleft(steal_request)
idx = self.algorithm.get_index_of_first_non_steal_request(target_rank)
self.algorithm.rank_queues[target_rank_id].insert(idx, steal_request)
self.pending_steal_request = True
yield self.env.timeout(self.algorithm.steal_time) # TODO: are we double counting steal time here (see line 81)

Check warning on line 106 in src/lbaf/Execution/lbsWorkStealingAlgorithm.py

View workflow job for this annotation

GitHub Actions / code-quality (ubuntu-latest, 3.8)

TODO: are we double counting steal time here (see line 81) (fixme)

Expand Down Expand Up @@ -233,6 +237,14 @@ def __reset(self):
self.num_ranks = len(self.ranks)
self.__initialize_rank_queues()

def get_index_of_first_non_steal_request(self, rank):
rank_queue = self.rank_queues[rank.get_id()]
for i in range(len(rank_queue)):
if isinstance(rank_queue[i], StealRequest):
continue
else:
return i

def iterate_attempted_steals(self):
"""Increases number of attempted steals by one."""
self.__attempted_steal_count += 1
Expand All @@ -242,7 +254,7 @@ def has_stealable_cluster(self, rank):
stealable = False
rank_queue = self.rank_queues[rank.get_id()]

# Make sure rank has at least two clusters in its queue
# Make sure rank has at a cluster at the back of its queue
if self.has_work_in_deque(rank) and isinstance(rank_queue[-1], list):
stealable = True

Expand Down

0 comments on commit 32c404b

Please sign in to comment.