From 07ea0ed12f936b493d4a87ff418491cc2ea952dd Mon Sep 17 00:00:00 2001 From: Jake Callahan Date: Tue, 14 Nov 2023 10:11:58 -0500 Subject: [PATCH] Add SharedResource class to robottelo.utils (#12341) This class will be used to manage shared resources between multitple processes or threads. The most common case will likely be a shared upgrade satellite between multiple xdist workers. I also handled an issues I encountered with the config_helpers script. (cherry picked from commit 2d3cd63d6b34b9d2449c25c5404e03f6d5d0447a) --- robottelo/utils/shared_resource.py | 199 ++++++++++++++++++++++++ scripts/config_helpers.py | 5 +- tests/robottelo/test_shared_resource.py | 61 ++++++++ 3 files changed, 263 insertions(+), 2 deletions(-) create mode 100644 robottelo/utils/shared_resource.py create mode 100644 tests/robottelo/test_shared_resource.py diff --git a/robottelo/utils/shared_resource.py b/robottelo/utils/shared_resource.py new file mode 100644 index 00000000000..0ad0bd92e46 --- /dev/null +++ b/robottelo/utils/shared_resource.py @@ -0,0 +1,199 @@ +"""Allow multiple processes to communicate status on a single shared resource. + +This is useful for cases where multiple processes need to wait for all other processes to be ready +before continuing with some common action. The most common use case in this framework will likely +be to wait for all pre-upgrade setups to be ready before performing the upgrade. + +The system works by creating a file in /tmp with the name of the resource. This is a common file +where each process can communicate its status. The first process to register will be the main +watcher. The main watcher will wait for all other processes to be ready, then perform the action. +If the main actor fails to complete the action, and the action is recoverable, another process +will take over as the main watcher and attempt to perform the action. If the action is not +recoverable, the main watcher will fail and release all other processes. + +It is recommended to use this class as a context manager, as it will automatically register and +report when the process is done. + +Example: + >>> with SharedResource("target_sat.hostname", upgrade_action, **upgrade_kwargs) as resource: + ... # Do pre-upgrade setup steps + ... resource.ready() # tell the other processes that we are ready + ... yield target_sat # give the upgraded satellite to the test + ... # Do post-upgrade cleanup steps if any +""" +import json +from pathlib import Path +import time +from uuid import uuid4 + +from broker.helpers import FileLock + + +class SharedResource: + """A class representing a shared resource. + + Attributes: + action (function): The function to be executed when the resource is ready. + action_args (tuple): The arguments to be passed to the action function. + action_kwargs (dict): The keyword arguments to be passed to the action function. + action_is_recoverable (bool): Whether the action is recoverable or not. + id (str): The unique identifier of the shared resource. + resource_file (Path): The path to the file representing the shared resource. + is_main (bool): Whether the current instance is the main watcher or not. + is_recovering (bool): Whether the current instance is recovering from an error or not. + """ + + def __init__(self, resource_name, action, *action_args, **action_kwargs): + """Initializes a new instance of the SharedResource class. + + Args: + resource_name (str): The name of the shared resource. + action (function): The function to be executed when the resource is ready. + action_args (tuple): The arguments to be passed to the action function. + action_kwargs (dict): The keyword arguments to be passed to the action function. + """ + self.resource_file = Path(f"/tmp/{resource_name}.shared") + self.lock_file = FileLock(self.resource_file) + self.id = str(uuid4().fields[-1]) + self.action = action + self.action_is_recoverable = action_kwargs.pop("action_is_recoverable", False) + self.action_args = action_args + self.action_kwargs = action_kwargs + self.is_recovering = False + + def _update_status(self, status): + """Updates the status of the shared resource. + + Args: + status (str): The new status of the shared resource. + """ + with self.lock_file: + curr_data = json.loads(self.resource_file.read_text()) + curr_data["statuses"][self.id] = status + self.resource_file.write_text(json.dumps(curr_data, indent=4)) + + def _update_main_status(self, status): + """Updates the main status of the shared resource. + + Args: + status (str): The new main status of the shared resource. + """ + with self.lock_file: + curr_data = json.loads(self.resource_file.read_text()) + curr_data["main_status"] = status + self.resource_file.write_text(json.dumps(curr_data, indent=4)) + + def _check_all_status(self, status): + """Checks if all watchers have the specified status. + + Args: + status (str): The status to check for. + + Returns: + bool: True if all watchers have the specified status, False otherwise. + """ + with self.lock_file: + curr_data = json.loads(self.resource_file.read_text()) + for watcher_id in curr_data["watchers"]: + if curr_data["statuses"].get(watcher_id) != status: + return False + return True + + def _wait_for_status(self, status): + """Waits until all watchers have the specified status. + + Args: + status (str): The status to wait for. + """ + while not self._check_all_status(status): + time.sleep(1) + + def _wait_for_main_watcher(self): + """Waits for the main watcher to finish.""" + while True: + curr_data = json.loads(self.resource_file.read_text()) + if curr_data["main_status"] != "done": + time.sleep(60) + elif curr_data["main_status"] == "action_error": + self._try_take_over() + elif curr_data["main_status"] == "error": + raise Exception(f"Error in main watcher: {curr_data['main_watcher']}") + else: + break + + def _try_take_over(self): + """Tries to take over as the main watcher.""" + with self.lock_file: + curr_data = json.loads(self.resource_file.read_text()) + if curr_data["main_status"] in ("action_error", "error"): + curr_data["main_status"] = "recovering" + curr_data["main_watcher"] = self.id + self.resource_file.write_text(json.dumps(curr_data, indent=4)) + self.is_main = True + self.is_recovering = True + self.wait() + + def register(self): + """Registers the current process as a watcher.""" + with self.lock_file: + if self.resource_file.exists(): + curr_data = json.loads(self.resource_file.read_text()) + self.is_main = False + else: # First watcher to register, becomes the main watcher, and creates the file + curr_data = { + "watchers": [], + "statuses": {}, + "main_watcher": self.id, + "main_status": "waiting", + } + self.is_main = True + curr_data["watchers"].append(self.id) + curr_data["statuses"][self.id] = "pending" + self.resource_file.write_text(json.dumps(curr_data, indent=4)) + + def ready(self): + """Marks the current process as ready to perform the action.""" + self._update_status("ready") + self.wait() + + def done(self): + """Marks the current process as done performing post actions.""" + self._update_status("done") + + def act(self): + """Attempt to perform the action.""" + try: + self.action(*self.action_args, **self.action_kwargs) + except Exception as err: + self._update_main_status("error") + raise err + + def wait(self): + """Top-level wait function, separating behavior between main and non-main watchers.""" + if self.is_main and not (self.is_recovering and not self.action_is_recoverable): + self._wait_for_status("ready") + self._update_main_status("acting") + self.act() + self._update_main_status("done") + else: + self._wait_for_main_watcher() + + def __enter__(self): + """Registers the current process as a watcher and returns the instance.""" + self.register() + return self + + def __exit__(self, exc_type, exc_value, traceback): + """Marks the current process as done and updates the main watcher if needed.""" + if exc_type is FileNotFoundError: + raise exc_value + if exc_type is None: + self.done() + if self.is_main: + self._wait_for_status("done") + self.resource_file.unlink() + else: + self._update_status("error") + if self.is_main: + self._update_main_status("error") + raise exc_value diff --git a/scripts/config_helpers.py b/scripts/config_helpers.py index 6176fa1cb5e..a2cb194fe80 100644 --- a/scripts/config_helpers.py +++ b/scripts/config_helpers.py @@ -14,8 +14,8 @@ def merge_nested_dictionaries(original, new, overwrite=False): # if the key is not in the original, add it if key not in original: original[key] = value - # if the key is in the original, and the value is a dictionary, recurse - elif isinstance(value, dict): + # if the key is in the original, and original[key] and value are dictionaries, recurse + elif isinstance(original[key], dict) and isinstance(value, dict): # use deepdiff to check if the dictionaries are the same if deepdiff.DeepDiff(original[key], value): original[key] = merge_nested_dictionaries(original[key], value, overwrite) @@ -24,6 +24,7 @@ def merge_nested_dictionaries(original, new, overwrite=False): # if the key is in the original, and the value is a list, ask the user elif overwrite == "ask": choice_prompt = ( + "-------------------------\n" f"The current value for {key} is {original[key]}.\n" "Please choose an option:\n" "1. Keep the current value\n" diff --git a/tests/robottelo/test_shared_resource.py b/tests/robottelo/test_shared_resource.py new file mode 100644 index 00000000000..ff2146cd80a --- /dev/null +++ b/tests/robottelo/test_shared_resource.py @@ -0,0 +1,61 @@ +import multiprocessing +from pathlib import Path +import random +from threading import Thread +import time + +from robottelo.utils.shared_resource import SharedResource + + +def upgrade_action(*args, **kwargs): + print(f"Upgrading satellite with {args=} and {kwargs=}") + time.sleep(1) + print("Satellite upgraded!") + + +def run_resource(resource_name): + time.sleep(random.random() * 5) # simulate random pre-setup + with SharedResource(resource_name, upgrade_action) as resource: + assert Path(f"/tmp/{resource_name}.shared").exists() + time.sleep(5) # simulate setup actions + resource.ready() + time.sleep(1) # simulate cleanup actions + + +def test_shared_resource(): + """Test the SharedResource class.""" + with SharedResource("test_resource", upgrade_action, 1, 2, 3, foo="bar") as resource: + assert Path("/tmp/test_resource.shared").exists() + assert resource.is_main + assert not resource.is_recovering + assert resource.action == upgrade_action + assert resource.action_args == (1, 2, 3) + assert resource.action_kwargs == {"foo": "bar"} + assert not resource.action_is_recoverable + + resource.ready() + assert resource._check_all_status("ready") + + assert not Path("/tmp/test_resource.shared").exists() + + +def test_shared_resource_multiprocessing(): + """Test the SharedResource class with multiprocessing.""" + with multiprocessing.Pool(2) as pool: + pool.map(run_resource, ["test_resource_mp", "test_resource_mp"]) + + assert not Path("/tmp/test_resource_mp.shared").exists() + + +def test_shared_resource_multithreading(): + """Test the SharedResource class with multithreading.""" + t1 = Thread(target=run_resource, args=("test_resource_th",)) + t2 = Thread(target=run_resource, args=("test_resource_th",)) + + t1.start() + t2.start() + + t1.join() + t2.join() + + assert not Path("/tmp/test_resource_th.shared").exists()