From d4219dfeecb44eed2214e1320867ada696f5ec90 Mon Sep 17 00:00:00 2001 From: Caleb Schilly Date: Fri, 23 Feb 2024 16:17:37 -0500 Subject: [PATCH] #501: more fixes, add do_stealing to config validator --- config/work-stealing.yaml | 2 +- .../Execution/lbsWorkStealingAlgorithm.py | 66 +++++++++++-------- src/lbaf/IO/lbsConfigurationValidator.py | 3 + 3 files changed, 44 insertions(+), 27 deletions(-) diff --git a/config/work-stealing.yaml b/config/work-stealing.yaml index 5f0c757f5..b1c5d10d2 100644 --- a/config/work-stealing.yaml +++ b/config/work-stealing.yaml @@ -22,7 +22,7 @@ algorithm: parameters: discretion_interval: 0.010 steal_time: 0.2 - num_experiments: 20 + num_experiments: 10 # Specify output output_dir: ../output diff --git a/src/lbaf/Execution/lbsWorkStealingAlgorithm.py b/src/lbaf/Execution/lbsWorkStealingAlgorithm.py index 55d572e45..e51f9e959 100644 --- a/src/lbaf/Execution/lbsWorkStealingAlgorithm.py +++ b/src/lbaf/Execution/lbsWorkStealingAlgorithm.py @@ -15,20 +15,22 @@ class StealRequest: def __init__(self, r_snd: Rank, r_rcv: Rank): - """Creates a steal request where r_snd steals a cluster from r_rcv""" + """Creates a steal request in which r_snd attempts to steals a cluster from r_rcv.""" self.__r_snd = r_snd self.__r_rcv = r_rcv def get_requesting_rank(self): + """Returns the rank that requested a steal.""" return self.__r_snd def get_target_rank(self): + """Returns the rank targeted by the steal.""" return self.__r_rcv class RankWorker: def __init__(self, env, rank_id, algorithm, lgr: Logger): - """Class that handles all transfers, steals, and executions of tasks on a rank.""" + """Class that handles all steals and executions of tasks on a rank.""" # Set up simpy environment self.env = env self.action = env.process(self.run()) @@ -50,7 +52,8 @@ def __init__(self, env, rank_id, algorithm, lgr: Logger): def run(self): """Defines the process that will run within the simpy environment.""" - while self.algorithm.any_ranks_have_work() if self.algorithm.do_stealing else self.__has_work(): + # Continue if the rank has clusters left. If stealing is on, also continue if any other ranks have stealable clusters + while self.__continue_condition(): # Check if rank is currently executing a cluster if self.current_cluster is not None: @@ -60,13 +63,14 @@ def run(self): self.env.process(self.__simulate_task(task)) # After each task, check if there is a steal request (to prevent hang ups) - self.__check_for_steal_requests() + if self.algorithm.do_stealing: + self.__check_for_steal_requests() # Once all tasks are executed, reset the current cluster self.current_cluster = None - # Otherwise, check if there is any other work - elif self.__has_work(): + # Check if there is anything in queue + elif self.algorithm.has_work_in_deque(self.rank, including_steals=True): # Pop next object in queue item = self.algorithm.rank_queues[self.rank_id].popleft() @@ -82,10 +86,10 @@ def run(self): # Catch any errors else: - self.__logger.info(f"Received some other datatype: {type(item)}") + self.__logger.error(f"Received some other datatype: {type(item)}") # If no work is available, try to request a steal from a random rank - elif not self.pending_steal_request: + elif self.algorithm.do_stealing and not self.pending_steal_request: target_rank_id = random.randrange(0, self.algorithm.num_ranks) requesting_rank = self.rank target_rank = self.algorithm.ranks[target_rank_id] @@ -94,7 +98,7 @@ def run(self): # Place steal request in target's queue self.algorithm.rank_queues[target_rank_id].appendleft(steal_request) self.pending_steal_request = True - print(f"Rank {self.rank_id} wants to steal from {target_rank_id}") + self.__logger.info(f" Rank {self.rank_id} wants to steal from Rank {target_rank_id}") yield self.env.timeout(self.algorithm.steal_time) # double counting steal time here (line 81) else: @@ -102,7 +106,13 @@ def run(self): # and can not proceed until it gets a response pass - # print(self.rank_id) + def __continue_condition(self): + """Continue if the rank has clusters in its queue. If stealing is on, also continue if any other ranks have stealable clusters.""" + if self.algorithm.do_stealing: + condition = self.__has_work() or self.algorithm.any_ranks_have_stealable_work() + else: + condition = self.__has_work() + return condition def __get_total_work(self): """Returns the total work on the rank.""" @@ -114,21 +124,20 @@ def __get_total_work(self): return total_work def __has_work(self): - """Returns True if the rank has a cluster in its queue.""" - return self.algorithm.has_work(self.rank) + """Returns True if the rank has a cluster either in its queue or in its current_cluster member.""" + return self.algorithm.has_work_in_deque(self.rank) or self.current_cluster is not None def __check_for_steal_requests(self): """Checks next item in queue; if it's a steal request, responds accordingly.""" rank_queue = self.algorithm.rank_queues[self.rank_id] if len(rank_queue) > 0 and isinstance(rank_queue[0], StealRequest): + print(f"Rank {self.rank_id} found a steal request while working through cluster") steal_request = rank_queue.popleft() self.algorithm.respond_to_steal_request(steal_request) yield self.env.timeout((self.algorithm.steal_time)) - else: - pass def __simulate_task(self, task: Object): - """Simulates the execution of a task""" + """Simulates the execution of a task.""" self.algorithm.increment_task_count() num_steal_reqs = [] queue_sizes = [] @@ -145,7 +154,10 @@ def __simulate_task(self, task: Object): class WorkStealingAlgorithm(AlgorithmBase): - """A class for simulating work stealing for memory-constrained problems.""" + """ + A class for simulating work stealing for memory-constrained problems. + Inherits from AlgorithmBase in order to fit in with LBAF's runtime, but many parameters go unused. + """ def __init__( self, work_model, @@ -166,6 +178,7 @@ def __init__( # Initialize the discretion interval self.__discretion_interval = parameters.get("discretion_interval") self.steal_time = parameters.get("steal_time", 0.1) + self.do_stealing = parameters.get("do_stealing", True) # Initialize logger self.__logger = lgr @@ -186,8 +199,6 @@ def __init__( self.__num_experiments = parameters.get("num_experiments", 10) self.__experiment_times = [] - # Initialize do_stealing - self.do_stealing = parameters.get("do_stealing", True) def __build_rank_clusters(self, rank: Rank, with_nullset) -> dict: """Cluster migratable objects by shared block ID when available.""" @@ -233,8 +244,8 @@ def has_stealable_cluster(self, rank): stealable = False rank_queue = self.rank_queues[rank.get_id()] - # Make sure rank has at least two clusters in its queue (requiring two clusters prevents passing the last cluster around forever) - if self.has_work(rank) and isinstance(rank_queue[-1], list) and sum(isinstance(elm, list) for elm in rank_queue) > 1: + # Make sure rank has at least two clusters in its queue (this prevents passing the last cluster around forever) + if self.has_work_in_deque(rank) and isinstance(rank_queue[-1], list) and sum(isinstance(elm, list) for elm in rank_queue) > 1: stealable = True return stealable @@ -273,13 +284,16 @@ def get_total_task_count(self): """Returns the total number of tasks that need to be simualted.""" return self.__total_task_count - def has_work(self, rank): - """Determines if a given rank has an object or cluster in its deque.""" - return any(isinstance(item, list) for item in self.rank_queues[rank.get_id()]) + def has_work_in_deque(self, rank, including_steals=False): + """Determines if a given rank's deque has a cluster (or StealRequest, if including_steals is True).""" + if including_steals: + return any(isinstance(item, (list, StealRequest)) for item in self.rank_queues[rank.get_id()]) + else: + return any(isinstance(item, list) for item in self.rank_queues[rank.get_id()]) - def any_ranks_have_work(self): - """Determines if any rank has an object, cluster, or StealRequest in its deque.""" - return any(self.has_work(r) for r in self.ranks.values()) + def any_ranks_have_stealable_work(self): + """Determines if any rank has a cluster in its deque that can be stolen.""" + return any(self.has_stealable_cluster(r) for r in self.ranks.values()) def execute(self, p_id: int, phases: list, distributions: dict, statistics: dict, a_min_max): """Performs the simulation and returns the average time to complete all tasks.""" diff --git a/src/lbaf/IO/lbsConfigurationValidator.py b/src/lbaf/IO/lbsConfigurationValidator.py index f998b09d6..376120f6e 100644 --- a/src/lbaf/IO/lbsConfigurationValidator.py +++ b/src/lbaf/IO/lbsConfigurationValidator.py @@ -184,6 +184,9 @@ def __init__(self, config_to_validate: dict, logger: Logger): {"name": "WorkStealing", "parameters": { "discretion_interval": float, + Optional("do_stealing"): And( + bool, + error="Should be of type 'bool'"), Optional("steal_time"): And( float, lambda x: x>=0.0,