Skip to content

Commit

Permalink
Add overwrite option in mpi-adaptor and file remove utility function
Browse files Browse the repository at this point in the history
  • Loading branch information
KIwabuchi committed Oct 5, 2023
1 parent 401b0db commit e0db6e4
Show file tree
Hide file tree
Showing 4 changed files with 118 additions and 50 deletions.
17 changes: 11 additions & 6 deletions example/mpi_create.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,22 @@
// SPDX-License-Identifier: (Apache-2.0 OR MIT)

#include <metall/utility/metall_mpi_adaptor.hpp>
#include <metall/utility/filesystem.hpp>

int main(int argc, char **argv) {
::MPI_Init(&argc, &argv);
{
// mpi_adaptor with the create mode fails if the directory exists.
// This remove function fails if directory exists and created by a different
// number of MPI ranks.
metall::utility::metall_mpi_adaptor::remove("/tmp/metall_mpi");
// This overwrite mode fails if the existing file/directory is not a Metall
// datastore or if the existing datastore was created by a different number
// of MPI ranks.
// To forcibly remove the existing datastore, one can use the following
// code.
// metall::utility::filesystem::remove("/tmp/metall_mpi");
// ::MPI_Barrier(MPI_COMM_WORLD);
bool overwrite = true;

metall::utility::metall_mpi_adaptor mpi_adaptor(metall::create_only,
"/tmp/metall_mpi");
metall::utility::metall_mpi_adaptor mpi_adaptor(
metall::create_only, "/tmp/metall_mpi", MPI_COMM_WORLD, overwrite);
auto &metall_manager = mpi_adaptor.get_local_manager();

auto rank = metall_manager.construct<int>("my-rank")();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -330,12 +330,13 @@ class mmap_segment_storage {
if (is_open())
return false; // Cannot open multiple segments simultaneously.

std::string s("Open a segment under: " + base_path);
logger::out(logger::level::info, __FILE__, __LINE__, s.c_str());
{
std::string s("Open a segment under: " + base_path);
logger::out(logger::level::info, __FILE__, __LINE__, s.c_str());
}

m_base_path = base_path;
m_vm_region_size = mdtl::round_down(vm_region_size, page_size());
;
m_segment = reinterpret_cast<char *>(
mdtl::round_up(reinterpret_cast<uintptr_t>(vm_region), page_size()));
m_read_only = read_only;
Expand Down
22 changes: 22 additions & 0 deletions include/metall/utility/filesystem.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// Copyright 2023 Lawrence Livermore National Security, LLC and other Metall
// Project Developers. See the top-level COPYRIGHT file for details.
//
// SPDX-License-Identifier: (Apache-2.0 OR MIT)

#ifndef METALL_UTILITY_FILESYSTEM_HPP
#define METALL_UTILITY_FILESYSTEM_HPP

#include <string>
#include <string_view>

#include <metall/detail/file.hpp>

namespace metall::utility::filesystem {

/// \brief Remove a file or directory.
/// \param path Path of the file or directory to remove. The view does not
/// have to be null-terminated.
/// \return Upon successful completion, returns true; otherwise, false is
/// returned. If the file or directory does not exist, true is returned.
inline bool remove(std::string_view path) {
  // std::string_view::data() is not guaranteed to point at a null-terminated
  // buffer, so handing the raw pointer to remove_file could read past the end
  // of the view (or remove a longer path than the caller intended).
  // Materialize an owning string that covers exactly the viewed characters.
  return metall::mtlldetail::remove_file(std::string(path));
}

}  // namespace metall::utility::filesystem

#endif // METALL_UTILITY_FILESYSTEM_HPP
122 changes: 81 additions & 41 deletions include/metall/utility/metall_mpi_adaptor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@ class metall_mpi_adaptor {
: m_mpi_comm(comm),
m_root_dir_prefix(root_dir_prefix),
m_local_metall_manager(nullptr) {
priv_verify_num_partitions(root_dir_prefix, comm);
if (!priv_verify_num_partitions(root_dir_prefix, comm)) {
::MPI_Abort(comm, -1);
}
m_local_metall_manager = std::make_unique<manager_type>(
metall::open_only,
ds::make_local_dir_path(m_root_dir_prefix, priv_mpi_comm_rank(comm))
Expand All @@ -56,7 +58,9 @@ class metall_mpi_adaptor {
: m_mpi_comm(comm),
m_root_dir_prefix(root_dir_prefix),
m_local_metall_manager(nullptr) {
priv_verify_num_partitions(root_dir_prefix, comm);
if (!priv_verify_num_partitions(root_dir_prefix, comm)) {
::MPI_Abort(comm, -1);
}
m_local_metall_manager = std::make_unique<manager_type>(
metall::open_read_only,
ds::make_local_dir_path(m_root_dir_prefix, priv_mpi_comm_rank(comm))
Expand All @@ -67,12 +71,16 @@ class metall_mpi_adaptor {
/// \param root_dir_prefix A root directory path of a Metall datastore.
/// The same name of file or directory must not exist.
/// \param comm A MPI communicator.
/// \param overwrite If true, overwrite an existing datastore.
/// This mode does not overwrite an existing datastore if it is not a Metall
/// datastore created by the same number of MPI processes.
metall_mpi_adaptor(metall::create_only_t, const std::string &root_dir_prefix,
const MPI_Comm &comm = MPI_COMM_WORLD)
const MPI_Comm &comm = MPI_COMM_WORLD,
bool overwrite = false)
: m_mpi_comm(comm),
m_root_dir_prefix(root_dir_prefix),
m_local_metall_manager(nullptr) {
priv_setup_root_dir(root_dir_prefix, comm);
priv_setup_root_dir(root_dir_prefix, overwrite, comm);
m_local_metall_manager = std::make_unique<manager_type>(
metall::create_only,
ds::make_local_dir_path(m_root_dir_prefix, priv_mpi_comm_rank(comm))
Expand All @@ -84,13 +92,17 @@ class metall_mpi_adaptor {
/// The same name of file or directory must not exist.
/// \param capacity The max capacity of the datastore.
/// \param comm A MPI communicator.
/// \param overwrite If true, overwrite an existing datastore.
/// This mode does not overwrite an existing datastore if it is not a Metall
/// datastore created by the same number of MPI processes.
metall_mpi_adaptor(metall::create_only_t, const std::string &root_dir_prefix,
const std::size_t capacity,
const MPI_Comm &comm = MPI_COMM_WORLD)
const MPI_Comm &comm = MPI_COMM_WORLD,
bool overwrite = false)
: m_mpi_comm(comm),
m_root_dir_prefix(root_dir_prefix),
m_local_metall_manager(nullptr) {
priv_setup_root_dir(root_dir_prefix, comm);
priv_setup_root_dir(root_dir_prefix, overwrite, comm);
m_local_metall_manager = std::make_unique<manager_type>(
metall::create_only,
ds::make_local_dir_path(m_root_dir_prefix, priv_mpi_comm_rank(comm))
Expand Down Expand Up @@ -146,11 +158,15 @@ class metall_mpi_adaptor {
/// mode is undefined. \param source_dir_path A path to a source datastore.
/// \param destination_dir_path A path to a destination datastore.
/// \param comm A MPI communicator.
/// \param overwrite If true, overwrite an existing datastore.
/// This mode does not overwrite an existing datastore if it is not a Metall
/// datastore created by the same number of MPI processes.
/// \return Returns true if all processes succeed;
/// otherwise, returns false.
static bool copy(const char *source_dir_path,
const char *destination_dir_path,
const MPI_Comm &comm = MPI_COMM_WORLD) {
static bool copy(const std::string &source_dir_path,
const std::string &destination_dir_path,
const MPI_Comm &comm = MPI_COMM_WORLD,
bool overwrite = false) {
if (!consistent(source_dir_path, comm)) {
if (priv_mpi_comm_rank(comm) == 0) {
std::stringstream ss;
Expand All @@ -161,7 +177,7 @@ class metall_mpi_adaptor {
}
return false;
}
priv_setup_root_dir(destination_dir_path, comm);
priv_setup_root_dir(destination_dir_path, overwrite, comm);
const int rank = priv_mpi_comm_rank(comm);
return priv_global_and(
manager_type::copy(
Expand All @@ -172,10 +188,14 @@ class metall_mpi_adaptor {

/// \brief Take a snapshot of the current Metall datastore to another
/// location. \param destination_dir_path A path to a destination datastore.
/// \param overwrite If true, overwrite an existing datastore.
/// This mode does not overwrite an existing datastore if it is not a Metall
/// datastore created by the same number of MPI processes.
/// \return Returns true if all processes succeed;
/// otherwise, returns false.
bool snapshot(const char *destination_dir_path) {
priv_setup_root_dir(destination_dir_path, m_mpi_comm);
bool snapshot(const std::string &destination_dir_path,
bool overwrite = false) {
priv_setup_root_dir(destination_dir_path, overwrite, m_mpi_comm);
const int rank = priv_mpi_comm_rank(m_mpi_comm);
return priv_global_and(
m_local_metall_manager->snapshot(
Expand All @@ -188,35 +208,35 @@ class metall_mpi_adaptor {
/// \param comm A MPI communicator.
/// \return Returns true if all processes succeed;
/// otherwise, returns false.
static bool remove(const char *root_dir_prefix,
/// If there is no directory with the given name, returns true.
static bool remove(const std::string &root_dir_prefix,
const MPI_Comm &comm = MPI_COMM_WORLD) {
const int rank = priv_mpi_comm_rank(comm);
const int size = priv_mpi_comm_size(comm);

if (!metall::mtlldetail::file_exist(
ds::make_root_dir_path(root_dir_prefix))) {
// As long as the root directory does not exist, we consider it as a
// success.
return true;
}

// ----- Check if this is a Metall datastore ----- //
bool corrent_dir = true;
bool metall_dir = true;
if (!metall::mtlldetail::file_exist(
ds::make_root_dir_path(root_dir_prefix) + "/" +
k_datastore_mark_file_name)) {
corrent_dir = false;
}
if (!priv_global_and(corrent_dir, comm)) {
return false;
metall_dir = false;
}

// ----- Check if #of MPI processes matches ----- //
bool correct_mpi_size = true;
if (rank == 0) {
const int read_size = priv_read_num_partitions(root_dir_prefix, comm);
if (read_size != size) {
correct_mpi_size = false;
std::stringstream ss;
ss << " Invalid number of MPI processes (provided " << size << ", "
<< "expected " << correct_mpi_size << ")";
logger::out(logger::level::error, __FILE__, __LINE__, ss.str().c_str());
if (!priv_global_and(metall_dir, comm)) {
if (rank == 0) {
std::string s("This is not a Metall datastore: " +
ds::make_root_dir_path(root_dir_prefix));
logger::out(logger::level::error, __FILE__, __LINE__, s.c_str());
}
return false;
}
if (!priv_global_and(correct_mpi_size, comm)) {
if (!priv_verify_num_partitions(root_dir_prefix, comm)) {
return false;
}

Expand Down Expand Up @@ -244,17 +264,17 @@ class metall_mpi_adaptor {
/// \param root_dir_prefix A root directory path of datastore.
/// \param comm A MPI communicator.
/// \return The number of partitions of a Metall datastore.
static int partitions(const char *root_dir_prefix,
static int partitions(const std::string &root_dir_prefix,
const MPI_Comm &comm = MPI_COMM_WORLD) {
return priv_read_num_partitions(root_dir_prefix, comm);
return priv_read_partition_size(root_dir_prefix, comm);
}

/// \brief Checks if all local datastores are consistent.
/// \param root_dir_prefix A root directory path of datastore.
/// \param comm A MPI communicator.
/// \return Returns true if all datastores are consistent;
/// otherwise, returns false.
static bool consistent(const char *root_dir_prefix,
static bool consistent(const std::string &root_dir_prefix,
const MPI_Comm &comm = MPI_COMM_WORLD) {
const int rank = priv_mpi_comm_rank(comm);
const auto local_path = ds::make_local_dir_path(root_dir_prefix, rank);
Expand All @@ -271,12 +291,28 @@ class metall_mpi_adaptor {
// -------------------- //
// Private methods
// -------------------- //
static void priv_remove_for_overwrite(const std::string &root_dir_prefix,
const MPI_Comm &comm) {
if (!remove(root_dir_prefix, comm)) {
if (priv_mpi_comm_rank(comm) == 0) {
std::stringstream ss;
ss << "Failed to overwrite " << root_dir_prefix;
logger::out(logger::level::error, __FILE__, __LINE__, ss.str().c_str());
::MPI_Abort(comm, -1);
}
}
}

static void priv_setup_root_dir(const std::string &root_dir_prefix,
const MPI_Comm &comm) {
bool overwrite, const MPI_Comm &comm) {
const int rank = priv_mpi_comm_rank(comm);
const int size = priv_mpi_comm_size(comm);
const std::string root_dir_path = ds::make_root_dir_path(root_dir_prefix);

if (overwrite) {
priv_remove_for_overwrite(root_dir_prefix, comm);
}

// Make sure the root directory and a file with the same name do not exist
const auto local_ret = metall::mtlldetail::file_exist(root_dir_path);
if (priv_global_or(local_ret, comm)) {
Expand Down Expand Up @@ -333,7 +369,7 @@ class metall_mpi_adaptor {
ofs.close();
}

static int priv_read_num_partitions(const std::string &root_dir_prefix,
static int priv_read_partition_size(const std::string &root_dir_prefix,
const MPI_Comm &comm) {
const std::string path = ds::make_root_dir_path(root_dir_prefix) + "/" +
k_partition_size_file_name;
Expand All @@ -353,19 +389,23 @@ class metall_mpi_adaptor {
return read_size;
}

static void priv_verify_num_partitions(const std::string &root_dir_prefix,
static bool priv_verify_num_partitions(const std::string &root_dir_prefix,
const MPI_Comm &comm) {
const int rank = priv_mpi_comm_rank(comm);
const int size = priv_mpi_comm_size(comm);

bool correct_mpi_size = true;
if (rank == 0) {
if (priv_read_num_partitions(root_dir_prefix, comm) != size) {
logger::out(logger::level::error, __FILE__, __LINE__,
"Invalid number of MPI processes");
::MPI_Abort(comm, -1);
const int read_size = priv_read_partition_size(root_dir_prefix, comm);
if (read_size != size) {
correct_mpi_size = false;
std::stringstream ss;
ss << "Invalid number of MPI processes (provided " << size << ", "
<< "expected " << read_size << ")";
logger::out(logger::level::error, __FILE__, __LINE__, ss.str().c_str());
}
}
priv_mpi_barrier(comm);
return priv_global_and(correct_mpi_size, comm);
}

static int priv_mpi_comm_rank(const MPI_Comm &comm) {
Expand Down

0 comments on commit e0db6e4

Please sign in to comment.