Skip to content

Commit

Permalink
#501: more fixes, add do_stealing to config validator
Browse files Browse the repository at this point in the history
  • Loading branch information
cwschilly committed Feb 23, 2024
1 parent df0446a commit d4219df
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 27 deletions.
2 changes: 1 addition & 1 deletion config/work-stealing.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ algorithm:
parameters:
discretion_interval: 0.010
steal_time: 0.2
num_experiments: 20
num_experiments: 10

# Specify output
output_dir: ../output
Expand Down
66 changes: 40 additions & 26 deletions src/lbaf/Execution/lbsWorkStealingAlgorithm.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,22 @@

class StealRequest:
def __init__(self, r_snd: Rank, r_rcv: Rank):
"""Creates a steal request where r_snd steals a cluster from r_rcv"""
"""Creates a steal request in which r_snd attempts to steal a cluster from r_rcv."""
self.__r_snd = r_snd
self.__r_rcv = r_rcv

def get_requesting_rank(self):
"""Returns the rank that requested a steal."""
return self.__r_snd

def get_target_rank(self):
"""Returns the rank targeted by the steal."""
return self.__r_rcv


class RankWorker:
def __init__(self, env, rank_id, algorithm, lgr: Logger):
"""Class that handles all transfers, steals, and executions of tasks on a rank."""
"""Class that handles all steals and executions of tasks on a rank."""
# Set up simpy environment
self.env = env
self.action = env.process(self.run())
Expand All @@ -50,7 +52,8 @@ def __init__(self, env, rank_id, algorithm, lgr: Logger):

def run(self):
"""Defines the process that will run within the simpy environment."""
while self.algorithm.any_ranks_have_work() if self.algorithm.do_stealing else self.__has_work():
# Continue if the rank has clusters left. If stealing is on, also continue if any other ranks have stealable clusters
while self.__continue_condition():

# Check if rank is currently executing a cluster
if self.current_cluster is not None:
Expand All @@ -60,13 +63,14 @@ def run(self):
self.env.process(self.__simulate_task(task))

# After each task, check if there is a steal request (to prevent hang ups)
self.__check_for_steal_requests()
if self.algorithm.do_stealing:
self.__check_for_steal_requests()

# Once all tasks are executed, reset the current cluster
self.current_cluster = None

# Otherwise, check if there is any other work
elif self.__has_work():
# Check if there is anything in queue
elif self.algorithm.has_work_in_deque(self.rank, including_steals=True):

# Pop next object in queue
item = self.algorithm.rank_queues[self.rank_id].popleft()
Expand All @@ -82,10 +86,10 @@ def run(self):

# Catch any errors
else:
self.__logger.info(f"Received some other datatype: {type(item)}")
self.__logger.error(f"Received some other datatype: {type(item)}")

# If no work is available, try to request a steal from a random rank
elif not self.pending_steal_request:
elif self.algorithm.do_stealing and not self.pending_steal_request:
target_rank_id = random.randrange(0, self.algorithm.num_ranks)
requesting_rank = self.rank
target_rank = self.algorithm.ranks[target_rank_id]
Expand All @@ -94,15 +98,21 @@ def run(self):
# Place steal request in target's queue
self.algorithm.rank_queues[target_rank_id].appendleft(steal_request)
self.pending_steal_request = True
print(f"Rank {self.rank_id} wants to steal from {target_rank_id}")
self.__logger.info(f" Rank {self.rank_id} wants to steal from Rank {target_rank_id}")
yield self.env.timeout(self.algorithm.steal_time) # double counting steal time here (line 81)

else:
# this rank is awaiting the fulfillment of a steal request
# and cannot proceed until it gets a response
pass

# print(self.rank_id)
def __continue_condition(self):
"""Continue if the rank has clusters in its queue. If stealing is on, also continue if any other ranks have stealable clusters."""
if self.algorithm.do_stealing:
condition = self.__has_work() or self.algorithm.any_ranks_have_stealable_work()
else:
condition = self.__has_work()
return condition

def __get_total_work(self):
"""Returns the total work on the rank."""
Expand All @@ -114,21 +124,20 @@ def __get_total_work(self):
return total_work

def __has_work(self):
"""Returns True if the rank has a cluster in its queue."""
return self.algorithm.has_work(self.rank)
"""Returns True if the rank has a cluster either in its queue or in its current_cluster member."""
return self.algorithm.has_work_in_deque(self.rank) or self.current_cluster is not None

def __check_for_steal_requests(self):
"""Checks next item in queue; if it's a steal request, responds accordingly."""
rank_queue = self.algorithm.rank_queues[self.rank_id]
if len(rank_queue) > 0 and isinstance(rank_queue[0], StealRequest):
print(f"Rank {self.rank_id} found a steal request while working through cluster")
steal_request = rank_queue.popleft()
self.algorithm.respond_to_steal_request(steal_request)
yield self.env.timeout((self.algorithm.steal_time))
else:
pass

def __simulate_task(self, task: Object):
"""Simulates the execution of a task"""
"""Simulates the execution of a task."""
self.algorithm.increment_task_count()
num_steal_reqs = []
queue_sizes = []
Expand All @@ -145,7 +154,10 @@ def __simulate_task(self, task: Object):


class WorkStealingAlgorithm(AlgorithmBase):
"""A class for simulating work stealing for memory-constrained problems."""
"""
A class for simulating work stealing for memory-constrained problems.
Inherits from AlgorithmBase in order to fit in with LBAF's runtime, but many parameters go unused.
"""
def __init__(
self,
work_model,
Expand All @@ -166,6 +178,7 @@ def __init__(
# Initialize the discretion interval
self.__discretion_interval = parameters.get("discretion_interval")

Check warning on line 179 in src/lbaf/Execution/lbsWorkStealingAlgorithm.py

View workflow job for this annotation

GitHub Actions / code-quality (ubuntu-latest, 3.8)

Unused private member `WorkStealingAlgorithm.__discretion_interval` (unused-private-member)
self.steal_time = parameters.get("steal_time", 0.1)
self.do_stealing = parameters.get("do_stealing", True)

# Initialize logger
self.__logger = lgr
Expand All @@ -186,8 +199,6 @@ def __init__(
self.__num_experiments = parameters.get("num_experiments", 10)
self.__experiment_times = []

Check warning on line 200 in src/lbaf/Execution/lbsWorkStealingAlgorithm.py

View workflow job for this annotation

GitHub Actions / code-quality (ubuntu-latest, 3.8)

Unused private member `WorkStealingAlgorithm.__experiment_times` (unused-private-member)

# Initialize do_stealing
self.do_stealing = parameters.get("do_stealing", True)

def __build_rank_clusters(self, rank: Rank, with_nullset) -> dict:
"""Cluster migratable objects by shared block ID when available."""
Expand Down Expand Up @@ -233,8 +244,8 @@ def has_stealable_cluster(self, rank):
stealable = False
rank_queue = self.rank_queues[rank.get_id()]

# Make sure rank has at least two clusters in its queue (requiring two clusters prevents passing the last cluster around forever)
if self.has_work(rank) and isinstance(rank_queue[-1], list) and sum(isinstance(elm, list) for elm in rank_queue) > 1:
# Make sure rank has at least two clusters in its queue (this prevents passing the last cluster around forever)
if self.has_work_in_deque(rank) and isinstance(rank_queue[-1], list) and sum(isinstance(elm, list) for elm in rank_queue) > 1:
stealable = True

return stealable
Expand Down Expand Up @@ -273,13 +284,16 @@ def get_total_task_count(self):
"""Returns the total number of tasks that need to be simulated."""
return self.__total_task_count

def has_work(self, rank):
"""Determines if a given rank has an object or cluster in its deque."""
return any(isinstance(item, list) for item in self.rank_queues[rank.get_id()])
def has_work_in_deque(self, rank, including_steals=False):
"""Determines if a given rank's deque has a cluster (or StealRequest, if including_steals is True)."""
if including_steals:
return any(isinstance(item, (list, StealRequest)) for item in self.rank_queues[rank.get_id()])
else:
return any(isinstance(item, list) for item in self.rank_queues[rank.get_id()])

def any_ranks_have_work(self):
"""Determines if any rank has an object, cluster, or StealRequest in its deque."""
return any(self.has_work(r) for r in self.ranks.values())
def any_ranks_have_stealable_work(self):
"""Determines if any rank has a cluster in its deque that can be stolen."""
return any(self.has_stealable_cluster(r) for r in self.ranks.values())

def execute(self, p_id: int, phases: list, distributions: dict, statistics: dict, a_min_max):
"""Performs the simulation and returns the average time to complete all tasks."""
Expand Down
3 changes: 3 additions & 0 deletions src/lbaf/IO/lbsConfigurationValidator.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,9 @@ def __init__(self, config_to_validate: dict, logger: Logger):
{"name": "WorkStealing",
"parameters": {
"discretion_interval": float,
Optional("do_stealing"): And(
bool,
error="Should be of type 'bool'"),
Optional("steal_time"): And(
float,
lambda x: x>=0.0,
Expand Down

0 comments on commit d4219df

Please sign in to comment.