Allow resuming an exploration and running in substeps #118

Merged on Oct 9, 2023 (11 commits)
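
A minimal usage sketch of the two features added here, resuming a run and executing it in substeps (the import path follows the package layout in the diff; the generator, evaluator, and argument values are illustrative, not taken from this PR):

    from optimas.explorations import Exploration

    # First run: execute the exploration in substeps instead of all at once.
    exploration = Exploration(
        generator=gen,   # an optimas Generator (assumed to exist)
        evaluator=ev,    # an optimas Evaluator (assumed to exist)
        max_evals=100,
        sim_workers=4,
        exploration_dir_path='./exploration',
    )
    exploration.run(n_evals=50)   # run only the first 50 evaluations
    exploration.run()             # continue until max_evals is reached

    # Later, in a new script: resume from the history saved in the same directory.
    exploration = Exploration(
        generator=gen,
        evaluator=ev,
        max_evals=200,            # total budget, including the previous run's evaluations
        sim_workers=4,
        exploration_dir_path='./exploration',
        resume=True,              # picks up the most recent saved history file
    )
    exploration.run()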
143 changes: 109 additions & 34 deletions optimas/explorations/base.py
@@ -1,20 +1,22 @@
"""Contains the definition of the base Exploration class."""

import os
import glob
from typing import Optional, Union

import numpy as np

from libensemble.libE import libE
from libensemble.tools import save_libE_output, add_unique_random_streams
from libensemble.tools import add_unique_random_streams
from libensemble.alloc_funcs.start_only_persistent import only_persistent_gens
from libensemble.executors.mpi_executor import MPIExecutor
from libensemble.resources.resources import Resources
from libensemble.executors.executor import Executor
from libensemble.logger import LogConfig

from optimas.generators.base import Generator
from optimas.evaluators.base import Evaluator
from optimas.utils.logger import get_logger


logger = get_logger(__name__)


class Exploration():
@@ -43,6 +45,14 @@ class Exploration():
history file to disk. By default, equal to ``sim_workers``.
exploration_dir_path : str, optional.
Path to the exploration directory. By default, ``'./exploration'``.
resume : bool, optional
Whether the exploration should resume from a previous run in the same
`exploration_dir_path`. If `True`, the exploration will continue from
the last evaluation of the previous run until the total number of
evaluations (including those of the previous run) reaches `max_evals`.
There is no need to provide the `history` path (it will be ignored).
If `False` (default value), the exploration will raise an error if
the `exploration_dir_path` already exists.
libe_comms : {'local', 'mpi'}, optional.
The communication mode for libEnsemble. Determines whether to use
Python ``multiprocessing`` (local mode) or MPI for the communication
@@ -63,6 +73,7 @@ def __init__(
history: Optional[str] = None,
history_save_period: Optional[int] = None,
exploration_dir_path: Optional[str] = './exploration',
resume: Optional[bool] = False,
libe_comms: Optional[str] = 'local'
) -> None:
self.generator = generator
@@ -76,16 +87,41 @@ def __init__(
self.history_save_period = history_save_period
self.exploration_dir_path = exploration_dir_path
self.libe_comms = libe_comms
self._load_history(history)
self._n_evals = 0
self._resume = resume
self._history_file_name = 'exploration_history_after_evaluation_{}'
self._load_history(history, resume)
self._create_alloc_specs()
self._create_executor()
self._initialize_evaluator()
self._set_default_libe_specs()

def run(self) -> None:
"""Run the exploration."""
def run(
self,
n_evals: Optional[int] = None
) -> None:
"""Run the exploration.

Parameters
----------
n_evals : int, optional
Number of evaluations to run. If not given, the exploration will
run until the number of evaluations reaches `max_evals`.
"""
# Set exit criteria to maximum number of evaluations.
exit_criteria = {'sim_max': self.max_evals}
remaining_evals = self.max_evals - self._n_evals
if remaining_evals < 1:
raise ValueError(
'The maximum number of evaluations has been reached.'
)
if n_evals is None:
sim_max = remaining_evals
else:
sim_max = min(n_evals, remaining_evals)
exit_criteria = {'sim_max': sim_max}

# Get initial number of generator trials.
n_trials_initial = self.generator.n_trials

# Create persis_info.
persis_info = add_unique_random_streams({}, self.sim_workers + 2)
@@ -97,9 +133,13 @@ def run(self) -> None:
else:
self.libE_specs['zero_resource_workers'] = [1]

if self._n_evals > 0:
self.libE_specs['reuse_output_dir'] = True

# Get gen_specs and sim_specs.
run_params = self.evaluator.get_run_params()
gen_specs = self.generator.get_gen_specs(self.sim_workers, run_params)
gen_specs = self.generator.get_gen_specs(self.sim_workers, run_params,
sim_max)
sim_specs = self.evaluator.get_sim_specs(
self.generator.varying_parameters,
self.generator.objectives,
@@ -123,23 +163,20 @@ def run(self) -> None:
# Update generator with the one received from libE.
self.generator._update(persis_info[1]['generator'])

# Update the number of evaluations in this exploration.
n_trials_final = self.generator.n_trials
self._n_evals += n_trials_final - n_trials_initial

# Determine if current rank is master.
if self.libE_specs["comms"] == "local":
is_master = True
nworkers = self.sim_workers + 1
else:
from mpi4py import MPI
is_master = (MPI.COMM_WORLD.Get_rank() == 0)
nworkers = MPI.COMM_WORLD.Get_size() - 1

# Save history.
if is_master:
save_libE_output(
history, persis_info, __file__, nworkers,
dest_path=os.path.abspath(self.exploration_dir_path))

# Reset state of libEnsemble.
self._reset_libensemble()
self._save_history()

def _create_executor(self) -> None:
"""Create libEnsemble executor."""
@@ -151,9 +188,25 @@ def _initialize_evaluator(self) -> None:

def _load_history(
self,
history: Union[str, np.ndarray, None]
history: Union[str, np.ndarray, None],
resume: Optional[bool] = False,
) -> None:
"""Load history file."""
# To resume an exploration, get history file from previous run.
if resume:
if history is not None:
logger.info(
'The `history` argument is ignored when `resume=True`. '
'The exploration will resume using the most recent '
'history file.'
)
history = self._get_most_recent_history_file_path()
if history is None:
raise ValueError(
'Previous history file not found. '
'Cannot resume exploration.'
)
# Read file.
if isinstance(history, str):
if os.path.exists(history):
# Load array.
@@ -169,8 +222,43 @@ def _load_history(
# Incorporate history into generator.
if history is not None:
self.generator.incorporate_history(history)
# When resuming an exploration, update evaluations counter.
if resume:
self._n_evals = history.size
self.history = history

def _save_history(self):
"""Save history array to file."""
filename = self._history_file_name.format(self._n_evals)
exploration_dir_path = os.path.abspath(self.exploration_dir_path)
file_path = os.path.join(exploration_dir_path, filename)
if not os.path.isfile(filename):
old_files = os.path.join(
exploration_dir_path, self._history_file_name.format("*"))
for old_file in glob.glob(old_files):
os.remove(old_file)
np.save(file_path, self.history)

def _get_most_recent_history_file_path(self):
"""Get path of most recently saved history file."""
old_exploration_history_files = glob.glob(
os.path.join(
os.path.abspath(self.exploration_dir_path),
self._history_file_name.format("*")
)
)
old_libe_history_files = glob.glob(
os.path.join(
os.path.abspath(self.exploration_dir_path),
'libE_history_{}'.format("*")
)
)
old_files = old_exploration_history_files + old_libe_history_files
if old_files:
file_evals = [int(file.split('_')[-1][:-4]) for file in old_files]
i_max_evals = np.argmax(np.array(file_evals))
return old_files[i_max_evals]

def _set_default_libe_specs(self) -> None:
"""Set default exploration libe_specs."""
libE_specs = {}
@@ -205,6 +293,9 @@ def _set_default_libe_specs(self) -> None:
libE_specs['use_workflow_dir'] = True
libE_specs['workflow_dir_path'] = self.exploration_dir_path

# Ensure evaluations of last batch are sent back to the generator.
libE_specs["final_gen_send"] = True

# get specs from generator and evaluator
gen_libE_specs = self.generator.get_libe_specs()
ev_libE_specs = self.evaluator.get_libe_specs()
@@ -219,19 +310,3 @@ def _create_alloc_specs(self) -> None:
'async_return': self.run_async
}
}

def _reset_libensemble(self) -> None:
"""Reset the state of libEnsemble.

After calling `libE`, some libEnsemble attributes do not come back to
their original states. This leads to issues if another `Exploration`
run is launched within the same script. This method resets the
necessary libEnsemble attributes to their original state.
"""
if Resources.resources is not None:
del Resources.resources
Resources.resources = None
if Executor.executor is not None:
del Executor.executor
Executor.executor = None
LogConfig.config.logger_set = False
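
For reference, the history is written by `np.save` (which appends `.npy`), so a run that stopped after, say, 50 evaluations leaves a file named `exploration_history_after_evaluation_50.npy` in the exploration directory, and a resumed run reloads the most recent such file. A sketch of how one might inspect it (file name and path are illustrative):

    import numpy as np

    # The history is a NumPy structured array; the counter in the file name is
    # the number of evaluations completed when it was saved.
    hist = np.load('./exploration/exploration_history_after_evaluation_50.npy')
    print(hist.dtype.names)  # varying parameters, objectives, and sim/gen metadata fields
    print(hist.size)         # 50; on resume this becomes the exploration's evaluation counter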
9 changes: 7 additions & 2 deletions optimas/gen_functions.py
@@ -38,9 +38,13 @@ def persistent_generator(H, persis_info, gen_specs, libE_info):

ps = PersistentSupport(libE_info, EVAL_GEN_TAG)

# Maximum number of total evaluations to generate.
max_evals = gen_specs['user']['max_evals']

# Number of points to generate initially.
number_of_gen_points = gen_specs['user']['gen_batch_size']
number_of_gen_points = min(gen_specs['user']['gen_batch_size'], max_evals)

n_gens = 0
n_failed_gens = 0

# Receive information from the manager (or a STOP_TAG)
@@ -68,6 +72,7 @@ def persistent_generator(H, persis_info, gen_specs, libE_info):
H_o['num_procs'][i] = run_params["num_procs"]
H_o['num_gpus'][i] = run_params["num_gpus"]

n_gens += np.sum(H_o['num_procs'] != 0)
n_failed_gens = np.sum(H_o['num_procs'] == 0)
H_o = H_o[H_o['num_procs'] > 0]

@@ -88,7 +93,7 @@ def persistent_generator(H, persis_info, gen_specs, libE_info):
# Register trial with unknown SEM
generator.tell([trial])
# Set the number of points to generate to that number:
number_of_gen_points = n + n_failed_gens
number_of_gen_points = min(n + n_failed_gens, max_evals - n_gens)
n_failed_gens = 0
else:
number_of_gen_points = 0
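
The change above caps how many points the generator requests so that it never exceeds the remaining evaluation budget. A simplified, standalone illustration of that capping logic (not the actual generator function; names mirror the diff):

    def points_to_generate(batch_size, max_evals, n_gens, n_returned, n_failed_gens):
        """Number of new points to request without exceeding the remaining budget."""
        if n_gens == 0:
            # Initial batch is capped by the total budget.
            return min(batch_size, max_evals)
        # Steady state: replace what just came back (plus failed generations),
        # but never exceed the evaluations still allowed.
        return min(n_returned + n_failed_gens, max_evals - n_gens)

    # Example: batch size 4, budget 10, 8 already generated, 2 just returned.
    assert points_to_generate(4, 10, 0, 0, 0) == 4
    assert points_to_generate(4, 10, 8, 2, 0) == 2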
5 changes: 3 additions & 2 deletions optimas/generators/ax/developer/multitask.py
@@ -128,11 +128,12 @@ def __init__(
def get_gen_specs(
self,
sim_workers: int,
run_params: dict
run_params: Dict,
sim_max: int
) -> Dict:
"""Get the libEnsemble gen_specs."""
# Get base specs.
gen_specs = super().get_gen_specs(sim_workers, run_params)
gen_specs = super().get_gen_specs(sim_workers, run_params, sim_max)
# Add task to output parameters.
max_length = max([len(self.lofi_task.name), len(self.hifi_task.name)])
gen_specs['out'].append(('task', str, max_length))
16 changes: 14 additions & 2 deletions optimas/generators/base.py
@@ -116,6 +116,10 @@ def gpu_id(self):
def dedicated_resources(self):
return self._dedicated_resources

@property
def n_trials(self):
return len(self._trials)

def ask(
self,
n_trials: int
@@ -244,14 +248,20 @@ def save_model_to_file(self) -> None:
def get_gen_specs(
self,
sim_workers: int,
run_params: dict
run_params: Dict,
max_evals: int
) -> Dict:
"""Get the libEnsemble gen_specs.

Parameters
----------
sim_workers : int
Total number of parallel simulation workers.
run_params : dict
Dictionary containing the number of processes and gpus
required.
max_evals : int
Maximum number of evaluations to generate.
"""
self._prepare_to_send()
gen_specs = {
@@ -281,7 +291,9 @@ def get_gen_specs(
# GPU in which to run generator.
'gpu_id': self._gpu_id,
# num of procs and gpus required
'run_params': run_params
'run_params': run_params,
# Maximum number of evaluations to generate.
'max_evals': max_evals
}
}
return gen_specs
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -23,7 +23,7 @@ classifiers = [
'Programming Language :: Python :: 3.11',
]
dependencies = [
'libensemble >= 0.10.2',
'libensemble >= 1.0.0',
'jinja2',
'ax-platform >= 0.2.9',
'mpi4py',