add hook to assign tasks per node #97

Merged · 1 commit · Dec 13, 2023
2 changes: 2 additions & 0 deletions eessi/testsuite/constants.py
@@ -9,6 +9,7 @@
GPU = 'GPU'
GPU_VENDOR = 'GPU_VENDOR'
INTEL = 'INTEL'
NODE = 'NODE'
NVIDIA = 'NVIDIA'

DEVICE_TYPES = {
@@ -20,6 +21,7 @@
CPU: 'cpu',
CPU_SOCKET: 'cpu_socket',
GPU: 'gpu',
NODE: 'node',
}

TAGS = {
67 changes: 50 additions & 17 deletions eessi/testsuite/hooks.py
@@ -33,7 +33,7 @@ def assign_default_num_cpus_per_node(test: rfm.RegressionTest):
log(f'default_num_cpus_per_node set to {test.default_num_cpus_per_node}')


def assign_one_task_per_compute_unit(test: rfm.RegressionTest, compute_unit: str):
def assign_tasks_per_compute_unit(test: rfm.RegressionTest, compute_unit: str, num_per: int = 1):
"""
Assign one task per compute unit (COMPUTE_UNIT[CPU], COMPUTE_UNIT[CPU_SOCKET] or COMPUTE_UNIT[GPU]).
Automatically sets num_tasks, num_tasks_per_node, num_cpus_per_task, and num_gpus_per_node,
@@ -49,15 +49,19 @@ def assign_one_task_per_compute_unit(test: rfm.RegressionTest, compute_unit: str

Examples:
On a single node with 2 sockets, 64 cores and 128 hyperthreads:
- assign_one_task_per_compute_unit(test, COMPUTE_UNIT[CPU]) will launch 64 tasks with 1 thread
- assign_one_task_per_compute_unit(test, COMPUTE_UNIT[CPU_SOCKET]) will launch 2 tasks with 32 threads per task
- assign_tasks_per_compute_unit(test, COMPUTE_UNIT[CPU]) will launch 64 tasks with 1 thread
- assign_tasks_per_compute_unit(test, COMPUTE_UNIT[CPU_SOCKET]) will launch 2 tasks with 32 threads per task

Future work:
Currently, on a single node with 2 sockets, 64 cores and 128 hyperthreads, this
- assign_one_task_per_compute_unit(test, COMPUTE_UNIT[CPU], true) launches 128 tasks with 1 thread
- assign_one_task_per_compute_unit(test, COMPUTE_UNIT[CPU_SOCKET], true) launches 2 tasks with 64 threads per task
In the future, we'd like to add an argument that disables spawning tasks for hyperthreads.
"""
if num_per != 1 and compute_unit in [COMPUTE_UNIT[GPU], COMPUTE_UNIT[CPU], COMPUTE_UNIT[CPU_SOCKET]]:
raise NotImplementedError(
f'Non-default num_per {num_per} is not implemented for compute_unit {compute_unit}.')

check_proc_attribute_defined(test, 'num_cpus')
test.max_avail_cpus_per_node = test.current_partition.processor.num_cpus
log(f'max_avail_cpus_per_node set to {test.max_avail_cpus_per_node}')
@@ -74,17 +78,56 @@ def assign_one_task_per_compute_unit(test: rfm.RegressionTest, compute_unit: str

assign_default_num_cpus_per_node(test)


if compute_unit == COMPUTE_UNIT[GPU]:
_assign_one_task_per_gpu(test)
elif compute_unit == COMPUTE_UNIT[CPU]:
_assign_one_task_per_cpu(test)
elif compute_unit == COMPUTE_UNIT[CPU_SOCKET]:
_assign_one_task_per_cpu_socket(test)
elif compute_unit == COMPUTE_UNIT[NODE]:
_assign_num_tasks_per_node(test, num_per)
else:
raise ValueError(f'compute unit {compute_unit} is currently not supported')


def _assign_num_tasks_per_node(test: rfm.RegressionTest, num_per: int = 1):
"""
Sets num_tasks_per_node and num_cpus_per_task such that it will run
'num_per' tasks per node, unless specified with:
--setvar num_tasks_per_node=<x>
--setvar num_cpus_per_task=<y>.
In those cases, those take precedence, and the remaining variable, if any
(num_cpus_per_task or num_tasks_per_node respectively), is calculated based
on the equality test.num_tasks_per_node * test.num_cpus_per_task ==
test.default_num_cpus_per_node.

Default resources requested:
- num_tasks_per_node = num_per
- num_cpus_per_task = test.default_num_cpus_per_node / num_tasks_per_node
"""
# neither num_tasks_per_node nor num_cpus_per_task are set
if not test.num_tasks_per_node and not test.num_cpus_per_task:
test.num_tasks_per_node = num_per
test.num_cpus_per_task = int(test.default_num_cpus_per_node / test.num_tasks_per_node)

# num_tasks_per_node is not set, but num_cpus_per_task is
elif not test.num_tasks_per_node:
test.num_tasks_per_node = int(test.default_num_cpus_per_node / test.num_cpus_per_task)

# num_cpus_per_task is not set, but num_tasks_per_node is
elif not test.num_cpus_per_task:
test.num_cpus_per_task = int(test.default_num_cpus_per_node / test.num_tasks_per_node)

else:
pass # both num_tasks_per_node and num_cpus_per_task are already set

test.num_tasks = test.num_nodes * test.num_tasks_per_node

log(f'num_tasks_per_node set to {test.num_tasks_per_node}')
log(f'num_cpus_per_task set to {test.num_cpus_per_task}')
log(f'num_tasks set to {test.num_tasks}')


def _assign_one_task_per_cpu_socket(test: rfm.RegressionTest):
"""
Determines the number of tasks per node by dividing the default_num_cpus_per_node by
@@ -103,11 +146,6 @@ def _assign_one_task_per_cpu_socket(test: rfm.RegressionTest):
num_tasks_per_node respectively) is calculated based on the equality
test.num_tasks_per_node * test.num_cpus_per_task == test.default_num_cpus_per_node.

Variables:
- default_num_cpus_per_node: default number of CPUs per node as defined in the test
(e.g. by earlier hooks like set_tag_scale)


Default resources requested:
- num_tasks_per_node = default_num_cpus_per_node
- num_cpus_per_task = default_num_cpus_per_node / num_tasks_per_node
@@ -147,25 +185,20 @@ def _assign_one_task_per_cpu(test: rfm.RegressionTest):
--setvar num_tasks_per_node=<x> and/or
--setvar num_cpus_per_task=<y>.

Variables:
- default_num_cpus_per_node: default number of CPUs per node as defined in the test
(e.g. by earlier hooks like set_tag_scale)


Default resources requested:
- num_tasks_per_node = default_num_cpus_per_node
- num_cpus_per_task = default_num_cpus_per_node / num_tasks_per_node
"""
# neither num_tasks_per_node nor num_cpus_per_node are set
# neither num_tasks_per_node nor num_cpus_per_task are set
if not test.num_tasks_per_node and not test.num_cpus_per_task:
test.num_tasks_per_node = test.default_num_cpus_per_node
test.num_cpus_per_task = 1

# num_tasks_per_node is not set, but num_cpus_per_node is
# num_tasks_per_node is not set, but num_cpus_per_task is
elif not test.num_tasks_per_node:
test.num_tasks_per_node = int(test.default_num_cpus_per_node / test.num_cpus_per_task)

# num_cpus_per_node is not set, but num_tasks_per_node is
# num_cpus_per_task is not set, but num_tasks_per_node is
elif not test.num_cpus_per_task:
test.num_cpus_per_task = int(test.default_num_cpus_per_node / test.num_tasks_per_node)

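To illustrate how the renamed hook and the new NODE compute unit are meant to be used together, here is a minimal sketch of a hypothetical downstream test. The class name, executable and sanity check are placeholders and not part of this PR, and the sketch assumes the usual EESSI scale hooks (e.g. set_tag_scale, as referenced in the docstrings above) have already set num_nodes and default_num_cpus_per_node:

import reframe as rfm
import reframe.utility.sanity as sn

from eessi.testsuite import hooks
from eessi.testsuite.constants import *  # noqa


@rfm.simple_test
class EXAMPLE_NODE_LEVEL_TEST(rfm.RunOnlyRegressionTest):
    """Hypothetical test requesting one task per node (illustration only)."""
    valid_systems = ['*']
    valid_prog_environs = ['*']
    executable = 'hostname'
    time_limit = '5m'
    sanity_patterns = sn.assert_true(True)

    @run_after('setup')
    def set_resources(self):
        # Assumes earlier hooks (e.g. set_tag_scale) have set num_nodes and
        # default_num_cpus_per_node, as in the existing GROMACS/TensorFlow tests.
        # On a node with 128 available cores this resolves to
        # num_tasks_per_node = 1 and num_cpus_per_task = 128, unless overridden
        # with --setvar. A num_per other than 1 is currently only implemented
        # for COMPUTE_UNIT[NODE].
        hooks.assign_tasks_per_compute_unit(test=self, compute_unit=COMPUTE_UNIT[NODE])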
2 changes: 1 addition & 1 deletion eessi/testsuite/tests/apps/gromacs.py
@@ -89,7 +89,7 @@ def run_after_setup(self):
# Calculate default requested resources based on the scale:
# 1 task per CPU for CPU-only tests, 1 task per GPU for GPU tests.
# Also support setting the resources on the cmd line.
hooks.assign_one_task_per_compute_unit(test=self, compute_unit=self.nb_impl)
hooks.assign_tasks_per_compute_unit(test=self, compute_unit=self.nb_impl)

@run_after('setup')
def set_omp_num_threads(self):
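The comment above also mentions setting the resources on the command line; how such --setvar overrides interact with the derived defaults is documented in the _assign_num_tasks_per_node docstring. As a rough standalone sketch (a re-statement of that arithmetic for illustration, not the library function itself):

def derive_node_level_resources(default_num_cpus_per_node, num_per=1,
                                num_tasks_per_node=None, num_cpus_per_task=None):
    # Re-statement of the rules in _assign_num_tasks_per_node, for illustration only.
    if not num_tasks_per_node and not num_cpus_per_task:
        num_tasks_per_node = num_per
        num_cpus_per_task = int(default_num_cpus_per_node / num_tasks_per_node)
    elif not num_tasks_per_node:
        num_tasks_per_node = int(default_num_cpus_per_node / num_cpus_per_task)
    elif not num_cpus_per_task:
        num_cpus_per_task = int(default_num_cpus_per_node / num_tasks_per_node)
    return num_tasks_per_node, num_cpus_per_task

# Defaults: one task gets all cores of the node
assert derive_node_level_resources(128) == (1, 128)
# num_per=2: two tasks per node, each with half of the cores
assert derive_node_level_resources(128, num_per=2) == (2, 64)
# --setvar num_cpus_per_task=32 takes precedence; tasks per node is derived
assert derive_node_level_resources(128, num_cpus_per_task=32) == (4, 32)
# --setvar num_tasks_per_node=4 takes precedence; cpus per task is derived
assert derive_node_level_resources(128, num_tasks_per_node=4) == (4, 32)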
23 changes: 14 additions & 9 deletions eessi/testsuite/tests/apps/tensorflow/tensorflow.py
@@ -8,7 +8,7 @@
import reframe.utility.sanity as sn

from eessi.testsuite import hooks, utils
from eessi.testsuite.constants import *
from eessi.testsuite.constants import * # noqa

@rfm.simple_test
class TENSORFLOW_EESSI(rfm.RunOnlyRegressionTest):
@@ -23,7 +23,7 @@ class TENSORFLOW_EESSI(rfm.RunOnlyRegressionTest):

# Make CPU and GPU versions of this test
device_type = parameter(['cpu', 'gpu'])

executable = 'python tf_test.py'

time_limit = '30m'
@@ -34,22 +34,23 @@ class TENSORFLOW_EESSI(rfm.RunOnlyRegressionTest):
@deferrable
def assert_tf_config_ranks(self):
'''Assert that each rank sets a TF_CONFIG'''
n_ranks = sn.count(sn.extractall('^Rank [0-9]+: Set TF_CONFIG for rank (?P<rank>[0-9]+)', self.stdout, tag='rank'))
n_ranks = sn.count(sn.extractall(
'^Rank [0-9]+: Set TF_CONFIG for rank (?P<rank>[0-9]+)', self.stdout, tag='rank'))
return sn.assert_eq(n_ranks, self.num_tasks)

@deferrable
def assert_completion(self):
'''Assert that the test ran until completion'''
n_fit_completed = sn.count(sn.extractall('^Rank [0-9]+: Keras fit completed', self.stdout))

return sn.all([
sn.assert_eq(n_fit_completed, self.num_tasks),
])

@deferrable
def assert_convergence(self):
'''Assert that the network learned _something_ during training'''
accuracy=sn.extractsingle('^Final accuracy: (?P<accuracy>\S+)', self.stdout, 'accuracy', float)
accuracy = sn.extractsingle('^Final accuracy: (?P<accuracy>\S+)', self.stdout, 'accuracy', float)
# mnist is a 10-class classification problem, so if accuracy >> 0.2 the network 'learned' something
return sn.assert_gt(accuracy, 0.2)

@@ -91,12 +92,13 @@ def run_after_setup(self):
"""hooks to run after the setup phase"""
# TODO: implement
# It should bind to socket, but different MPIs may have different arguments to do that...
# We should at very least prevent that it binds to single core per process, as that results in many threads being scheduled to one core
# We should at very least prevent that it binds to single core per process,
# as that results in many threads being scheduled to one core.
# binding may also differ per launcher used. It'll be hard to support a wide range and still get proper binding
if self.device_type == 'cpu':
hooks.assign_one_task_per_compute_unit(test=self, compute_unit=COMPUTE_UNIT['CPU_SOCKET'])
hooks.assign_tasks_per_compute_unit(test=self, compute_unit=COMPUTE_UNIT['CPU_SOCKET'])
elif self.device_type == 'gpu':
hooks.assign_one_task_per_compute_unit(test=self, compute_unit=COMPUTE_UNIT['GPU'])
hooks.assign_tasks_per_compute_unit(test=self, compute_unit=COMPUTE_UNIT['GPU'])
else:
raise NotImplementedError(f'Failed to set number of tasks and cpus per task for device {self.device_type}')

@@ -110,5 +112,8 @@ def set_thread_count_args(self):

@run_after('setup')
def set_binding_policy(self):
"""Sets a binding policy for tasks. We don't bind threads because of https://github.com/tensorflow/tensorflow/issues/60843"""
"""
Sets a binding policy for tasks. We don't bind threads because of
https://github.com/tensorflow/tensorflow/issues/60843
"""
hooks.set_compact_process_binding(self)
Loading