From 91fa3b7466b995166104c13bfb18bbc4d020262f Mon Sep 17 00:00:00 2001
From: Samuel Moors
Date: Sun, 10 Dec 2023 17:00:32 +0100
Subject: [PATCH] add hook to assign tasks per node

---
 eessi/testsuite/constants.py                  |  2 +
 eessi/testsuite/hooks.py                      | 67 ++++++++++++++-----
 eessi/testsuite/tests/apps/gromacs.py         |  2 +-
 .../tests/apps/tensorflow/tensorflow.py       | 23 ++++---
 4 files changed, 67 insertions(+), 27 deletions(-)

diff --git a/eessi/testsuite/constants.py b/eessi/testsuite/constants.py
index 02000cba..598af2cf 100644
--- a/eessi/testsuite/constants.py
+++ b/eessi/testsuite/constants.py
@@ -9,6 +9,7 @@
 GPU = 'GPU'
 GPU_VENDOR = 'GPU_VENDOR'
 INTEL = 'INTEL'
+NODE = 'NODE'
 NVIDIA = 'NVIDIA'
 
 DEVICE_TYPES = {
@@ -20,6 +21,7 @@
     CPU: 'cpu',
     CPU_SOCKET: 'cpu_socket',
     GPU: 'gpu',
+    NODE: 'node',
 }
 
 TAGS = {
diff --git a/eessi/testsuite/hooks.py b/eessi/testsuite/hooks.py
index 105c6a95..751b3ffb 100644
--- a/eessi/testsuite/hooks.py
+++ b/eessi/testsuite/hooks.py
@@ -33,7 +33,7 @@ def assign_default_num_cpus_per_node(test: rfm.RegressionTest):
     log(f'default_num_cpus_per_node set to {test.default_num_cpus_per_node}')
 
 
-def assign_one_task_per_compute_unit(test: rfm.RegressionTest, compute_unit: str):
+def assign_tasks_per_compute_unit(test: rfm.RegressionTest, compute_unit: str, num_per: int = 1):
     """
     Assign one task per compute unit (COMPUTE_UNIT[CPU], COMPUTE_UNIT[CPU_SOCKET] or COMPUTE_UNIT[GPU]).
     Automatically sets num_tasks, num_tasks_per_node, num_cpus_per_task, and num_gpus_per_node,
@@ -49,8 +49,8 @@ def assign_one_task_per_compute_unit(test: rfm.RegressionTest, compute_unit: str
 
     Examples:
     On a single node with 2 sockets, 64 cores and 128 hyperthreads:
-    - assign_one_task_per_compute_unit(test, COMPUTE_UNIT[CPU]) will launch 64 tasks with 1 thread
-    - assign_one_task_per_compute_unit(test, COMPUTE_UNIT[CPU_SOCKET]) will launch 2 tasks with 32 threads per task
+    - assign_tasks_per_compute_unit(test, COMPUTE_UNIT[CPU]) will launch 64 tasks with 1 thread
+    - assign_tasks_per_compute_unit(test, COMPUTE_UNIT[CPU_SOCKET]) will launch 2 tasks with 32 threads per task
 
     Future work:
     Currently, on a single node with 2 sockets, 64 cores and 128 hyperthreads, this
@@ -58,6 +58,10 @@ def assign_one_task_per_compute_unit(test: rfm.RegressionTest, compute_unit: str
     - assign_one_task_per_compute_unit(test, COMPUTE_UNIT[CPU_SOCKET], true) launches 2 tasks with 64 threads per task
     In the future, we'd like to add an argument that disables spawning tasks for hyperthreads.
     """
+    if num_per != 1 and compute_unit in [COMPUTE_UNIT[GPU], COMPUTE_UNIT[CPU], COMPUTE_UNIT[CPU_SOCKET]]:
+        raise NotImplementedError(
+            f'Non-default num_per {num_per} is not implemented for compute_unit {compute_unit}.')
+
     check_proc_attribute_defined(test, 'num_cpus')
     test.max_avail_cpus_per_node = test.current_partition.processor.num_cpus
     log(f'max_avail_cpus_per_node set to {test.max_avail_cpus_per_node}')
@@ -74,17 +78,56 @@ def assign_one_task_per_compute_unit(test: rfm.RegressionTest, compute_unit: str
 
     assign_default_num_cpus_per_node(test)
 
-
     if compute_unit == COMPUTE_UNIT[GPU]:
         _assign_one_task_per_gpu(test)
     elif compute_unit == COMPUTE_UNIT[CPU]:
         _assign_one_task_per_cpu(test)
     elif compute_unit == COMPUTE_UNIT[CPU_SOCKET]:
         _assign_one_task_per_cpu_socket(test)
+    elif compute_unit == COMPUTE_UNIT[NODE]:
+        _assign_num_tasks_per_node(test, num_per)
     else:
         raise ValueError(f'compute unit {compute_unit} is currently not supported')
 
 
+def _assign_num_tasks_per_node(test: rfm.RegressionTest, num_per: int = 1):
+    """
+    Sets num_tasks_per_node and num_cpus_per_task such that it will run
+    'num_per' tasks per node, unless specified with:
+    --setvar num_tasks_per_node=<x>
+    --setvar num_cpus_per_task=<y>.
+    In those cases, those take precedence, and the remaining variable, if any
+    (num_cpus_per_task or num_tasks_per_node, respectively), is calculated based
+    on the equality test.num_tasks_per_node * test.num_cpus_per_task ==
+    test.default_num_cpus_per_node.
+
+    Default resources requested:
+    - num_tasks_per_node = num_per
+    - num_cpus_per_task = test.default_num_cpus_per_node / num_tasks_per_node
+    """
+    # neither num_tasks_per_node nor num_cpus_per_task are set
+    if not test.num_tasks_per_node and not test.num_cpus_per_task:
+        test.num_tasks_per_node = num_per
+        test.num_cpus_per_task = int(test.default_num_cpus_per_node / test.num_tasks_per_node)
+
+    # num_tasks_per_node is not set, but num_cpus_per_task is
+    elif not test.num_tasks_per_node:
+        test.num_tasks_per_node = int(test.default_num_cpus_per_node / test.num_cpus_per_task)
+
+    # num_cpus_per_task is not set, but num_tasks_per_node is
+    elif not test.num_cpus_per_task:
+        test.num_cpus_per_task = int(test.default_num_cpus_per_node / test.num_tasks_per_node)
+
+    else:
+        pass  # both num_tasks_per_node and num_cpus_per_task are already set
+
+    test.num_tasks = test.num_nodes * test.num_tasks_per_node
+
+    log(f'num_tasks_per_node set to {test.num_tasks_per_node}')
+    log(f'num_cpus_per_task set to {test.num_cpus_per_task}')
+    log(f'num_tasks set to {test.num_tasks}')
+
+
 def _assign_one_task_per_cpu_socket(test: rfm.RegressionTest):
     """
     Determines the number of tasks per node by dividing the default_num_cpus_per_node by
@@ -103,11 +146,6 @@ def _assign_one_task_per_cpu_socket(test: rfm.RegressionTest):
     num_tasks_per_node respectively) is calculated based on the equality
     test.num_tasks_per_node * test.num_cpus_per_task == test.default_num_cpus_per_node.
 
-    Variables:
-    - default_num_cpus_per_node: default number of CPUs per node as defined in the test
-    (e.g. by earlier hooks like set_tag_scale)
-
-
     Default resources requested:
     - num_tasks_per_node = default_num_cpus_per_node
     - num_cpus_per_task = default_num_cpus_per_node / num_tasks_per_node
@@ -147,25 +185,20 @@ def _assign_one_task_per_cpu(test: rfm.RegressionTest):
     --setvar num_tasks_per_node=<x> and/or
     --setvar num_cpus_per_task=<y>.
 
-    Variables:
-    - default_num_cpus_per_node: default number of CPUs per node as defined in the test
-    (e.g. by earlier hooks like set_tag_scale)
-
     Default resources requested:
     - num_tasks_per_node = default_num_cpus_per_node
     - num_cpus_per_task = default_num_cpus_per_node / num_tasks_per_node
     """
-    # neither num_tasks_per_node nor num_cpus_per_node are set
+    # neither num_tasks_per_node nor num_cpus_per_task are set
     if not test.num_tasks_per_node and not test.num_cpus_per_task:
         test.num_tasks_per_node = test.default_num_cpus_per_node
         test.num_cpus_per_task = 1
 
-    # num_tasks_per_node is not set, but num_cpus_per_node is
+    # num_tasks_per_node is not set, but num_cpus_per_task is
     elif not test.num_tasks_per_node:
         test.num_tasks_per_node = int(test.default_num_cpus_per_node / test.num_cpus_per_task)
 
-    # num_cpus_per_node is not set, but num_tasks_per_node is
+    # num_cpus_per_task is not set, but num_tasks_per_node is
     elif not test.num_cpus_per_task:
         test.num_cpus_per_task = int(test.default_num_cpus_per_node / test.num_tasks_per_node)
 
diff --git a/eessi/testsuite/tests/apps/gromacs.py b/eessi/testsuite/tests/apps/gromacs.py
index 123cce18..7567cf85 100644
--- a/eessi/testsuite/tests/apps/gromacs.py
+++ b/eessi/testsuite/tests/apps/gromacs.py
@@ -89,7 +89,7 @@ def run_after_setup(self):
         # Calculate default requested resources based on the scale:
         # 1 task per CPU for CPU-only tests, 1 task per GPU for GPU tests.
         # Also support setting the resources on the cmd line.
-        hooks.assign_one_task_per_compute_unit(test=self, compute_unit=self.nb_impl)
+        hooks.assign_tasks_per_compute_unit(test=self, compute_unit=self.nb_impl)
 
     @run_after('setup')
     def set_omp_num_threads(self):
diff --git a/eessi/testsuite/tests/apps/tensorflow/tensorflow.py b/eessi/testsuite/tests/apps/tensorflow/tensorflow.py
index 96327cab..ee49d021 100644
--- a/eessi/testsuite/tests/apps/tensorflow/tensorflow.py
+++ b/eessi/testsuite/tests/apps/tensorflow/tensorflow.py
@@ -8,7 +8,7 @@
 import reframe.utility.sanity as sn
 
 from eessi.testsuite import hooks, utils
-from eessi.testsuite.constants import *
+from eessi.testsuite.constants import *  # noqa
 
 @rfm.simple_test
 class TENSORFLOW_EESSI(rfm.RunOnlyRegressionTest):
@@ -23,7 +23,7 @@ class TENSORFLOW_EESSI(rfm.RunOnlyRegressionTest):
 
     # Make CPU and GPU versions of this test
     device_type = parameter(['cpu', 'gpu'])
-    
+
     executable = 'python tf_test.py'
 
     time_limit = '30m'
@@ -34,14 +34,15 @@ class TENSORFLOW_EESSI(rfm.RunOnlyRegressionTest):
     @deferrable
     def assert_tf_config_ranks(self):
         '''Assert that each rank sets a TF_CONFIG'''
-        n_ranks = sn.count(sn.extractall('^Rank [0-9]+: Set TF_CONFIG for rank (?P<rank>[0-9]+)', self.stdout, tag='rank'))
+        n_ranks = sn.count(sn.extractall(
+            '^Rank [0-9]+: Set TF_CONFIG for rank (?P<rank>[0-9]+)', self.stdout, tag='rank'))
         return sn.assert_eq(n_ranks, self.num_tasks)
 
     @deferrable
     def assert_completion(self):
         '''Assert that the test ran until completion'''
         n_fit_completed = sn.count(sn.extractall('^Rank [0-9]+: Keras fit completed', self.stdout))
-        
+
         return sn.all([
             sn.assert_eq(n_fit_completed, self.num_tasks),
         ])
@@ -49,7 +50,7 @@ def assert_completion(self):
     @deferrable
     def assert_convergence(self):
         '''Assert that the network learned _something_ during training'''
-        accuracy=sn.extractsingle('^Final accuracy: (?P<accuracy>\S+)', self.stdout, 'accuracy', float)
+        accuracy = sn.extractsingle('^Final accuracy: (?P<accuracy>\S+)', self.stdout, 'accuracy', float)
         # mnist is a 10-class classification problem, so if accuracy >> 0.2 the network 'learned' something
         return sn.assert_gt(accuracy, 0.2)
 
@@ -91,12 +92,13 @@ def run_after_setup(self):
         """hooks to run after the setup phase"""
        # TODO: implement
        # It should bind to socket, but different MPIs may have different arguments to do that...
-        # We should at very least prevent that it binds to single core per process, as that results in many threads being scheduled to one core
+        # We should at very least prevent that it binds to single core per process,
+        # as that results in many threads being scheduled to one core.
        # binding may also differ per launcher used. It'll be hard to support a wide range and still get proper binding
        if self.device_type == 'cpu':
-            hooks.assign_one_task_per_compute_unit(test=self, compute_unit=COMPUTE_UNIT['CPU_SOCKET'])
+            hooks.assign_tasks_per_compute_unit(test=self, compute_unit=COMPUTE_UNIT['CPU_SOCKET'])
        elif self.device_type == 'gpu':
-            hooks.assign_one_task_per_compute_unit(test=self, compute_unit=COMPUTE_UNIT['GPU'])
+            hooks.assign_tasks_per_compute_unit(test=self, compute_unit=COMPUTE_UNIT['GPU'])
        else:
            raise NotImplementedError(f'Failed to set number of tasks and cpus per task for device {self.device_type}')
 
@@ -110,5 +112,8 @@ def set_thread_count_args(self):
 
     @run_after('setup')
     def set_binding_policy(self):
-        """Sets a binding policy for tasks. We don't bind threads because of https://github.com/tensorflow/tensorflow/issues/60843"""
+        """
+        Sets a binding policy for tasks. We don't bind threads because of
+        https://github.com/tensorflow/tensorflow/issues/60843
+        """
         hooks.set_compact_process_binding(self)
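
For illustration only, below is a minimal sketch (not part of the patch above) of how an EESSI-style ReFrame test could use the new COMPUTE_UNIT[NODE] path. The class name, executable, valid_systems/valid_prog_environs values and the use of the SCALES constant are hypothetical placeholders; the sketch assumes the usual EESSI pattern in which set_tag_scale (referenced in the hook docstrings) sets num_nodes from the scale parameter before the new hook runs.

# Illustrative sketch only -- this test is NOT part of the patch above.
import reframe as rfm
import reframe.utility.sanity as sn

from eessi.testsuite import hooks
from eessi.testsuite.constants import *  # noqa


@rfm.simple_test
class EXAMPLE_NODE_EESSI(rfm.RunOnlyRegressionTest):
    # Placeholder values for the sketch
    scale = parameter(SCALES.keys())
    valid_systems = ['*']
    valid_prog_environs = ['default']
    executable = 'hostname'
    time_limit = '10m'

    @run_after('init')
    def run_after_init(self):
        # Assumed EESSI pattern: sets num_nodes and scale-related tags from the selected scale
        hooks.set_tag_scale(self)

    @run_after('setup')
    def assign_task_resources(self):
        # Request 2 tasks per node; per the new hook, num_cpus_per_task is derived as
        # default_num_cpus_per_node / num_tasks_per_node and
        # num_tasks = num_nodes * num_tasks_per_node.
        # Command-line overrides via --setvar num_tasks_per_node=<x> or
        # --setvar num_cpus_per_task=<y> take precedence.
        hooks.assign_tasks_per_compute_unit(test=self, compute_unit=COMPUTE_UNIT[NODE], num_per=2)

    @sanity_function
    def assert_output(self):
        return sn.assert_found(r'\S+', self.stdout)

For example, on a 128-core node at a single-node scale this would request num_tasks_per_node=2, num_cpus_per_task=64 and num_tasks=2, matching the defaults documented in _assign_num_tasks_per_node.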