From c1252994271fa477f9fb7f5bb28030d96aca0c40 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Franz=20P=C3=B6schel?= Date: Tue, 6 Aug 2024 17:29:57 +0200 Subject: [PATCH] Add `flush_entire_series` parameter to Attributable::seriesFlush --- include/openPMD/Iteration.hpp | 1 + include/openPMD/RecordComponent.tpp | 2 +- include/openPMD/backend/Attributable.hpp | 10 ++++-- include/openPMD/backend/Writable.hpp | 5 +-- src/backend/Attributable.cpp | 10 +++--- src/backend/Writable.cpp | 39 +++++++++++++++++++++--- src/binding/python/Attributable.cpp | 5 +-- test/ParallelIOTest.cpp | 6 ++-- 8 files changed, 59 insertions(+), 19 deletions(-) diff --git a/include/openPMD/Iteration.hpp b/include/openPMD/Iteration.hpp index 52bf43293a..a35ceccd74 100644 --- a/include/openPMD/Iteration.hpp +++ b/include/openPMD/Iteration.hpp @@ -133,6 +133,7 @@ class Iteration : public Attributable friend class internal::AttributableData; template friend T &internal::makeOwning(T &self, Series); + friend class Writable; public: Iteration(Iteration const &) = default; diff --git a/include/openPMD/RecordComponent.tpp b/include/openPMD/RecordComponent.tpp index 0a4086e3d8..da23de5aca 100644 --- a/include/openPMD/RecordComponent.tpp +++ b/include/openPMD/RecordComponent.tpp @@ -294,7 +294,7 @@ RecordComponent::storeChunk(Offset o, Extent e, F &&createBuffer) * Flush the openPMD hierarchy to the backend without flushing any actual * data yet. */ - seriesFlush({FlushLevel::SkeletonOnly}); + seriesFlush({FlushLevel::SkeletonOnly}, /* flush_entire_series = */ false); size_t size = 1; for (auto ext : e) diff --git a/include/openPMD/backend/Attributable.hpp b/include/openPMD/backend/Attributable.hpp index 0f7b722ae5..3a4249a718 100644 --- a/include/openPMD/backend/Attributable.hpp +++ b/include/openPMD/backend/Attributable.hpp @@ -270,8 +270,14 @@ class Attributable * implement this flush call. * Must be provided in-line, configuration is not read * from files. + * @param flush_entire_series By default, this method is just a shortcut + * for Series::flush() when you don't currently have a Series handle + * at hand. If however the current object is an Iteration or + * contained within an Iteration, flushing can be restricted to that + * specific Iteration by setting this flag to false. */ - void seriesFlush(std::string backendConfig = "{}"); + void seriesFlush( + std::string backendConfig = "{}", bool flush_entire_series = true); /** String serialization to describe an Attributable * @@ -330,7 +336,7 @@ OPENPMD_protected internal::SeriesData *>; /** @} */ - void seriesFlush(internal::FlushParams const &); + void seriesFlush(internal::FlushParams const &, bool flush_entire_series); void flushAttributes(internal::FlushParams const &); diff --git a/include/openPMD/backend/Writable.hpp b/include/openPMD/backend/Writable.hpp index d0b8b4f3c7..47913eeedd 100644 --- a/include/openPMD/backend/Writable.hpp +++ b/include/openPMD/backend/Writable.hpp @@ -120,13 +120,14 @@ class Writable final * an object that has no parent, which is the Series object, and flush()-es * it. */ - void seriesFlush(std::string backendConfig = "{}"); + void seriesFlush( + std::string backendConfig = "{}", bool flush_entire_series = true); // clang-format off OPENPMD_private // clang-format on - void seriesFlush(internal::FlushParams const &); + void seriesFlush(internal::FlushParams const &, bool flush_entire_series); /* * These members need to be shared pointers since distinct instances of * Writable may share them. diff --git a/src/backend/Attributable.cpp b/src/backend/Attributable.cpp index d5ff005389..83b00119d4 100644 --- a/src/backend/Attributable.cpp +++ b/src/backend/Attributable.cpp @@ -118,9 +118,10 @@ Attributable &Attributable::setComment(std::string const &c) return *this; } -void Attributable::seriesFlush(std::string backendConfig) +void Attributable::seriesFlush( + std::string backendConfig, bool flush_entire_series) { - writable().seriesFlush(std::move(backendConfig)); + writable().seriesFlush(std::move(backendConfig), flush_entire_series); } Series Attributable::retrieveSeries() const @@ -240,9 +241,10 @@ auto Attributable::myPath() const -> MyPath return res; } -void Attributable::seriesFlush(internal::FlushParams const &flushParams) +void Attributable::seriesFlush( + internal::FlushParams const &flushParams, bool flush_entire_series) { - writable().seriesFlush(flushParams); + writable().seriesFlush(flushParams, flush_entire_series); } void Attributable::flushAttributes(internal::FlushParams const &flushParams) diff --git a/src/backend/Writable.cpp b/src/backend/Writable.cpp index 0e399a3a81..9308bd50ff 100644 --- a/src/backend/Writable.cpp +++ b/src/backend/Writable.cpp @@ -19,8 +19,10 @@ * If not, see . */ #include "openPMD/backend/Writable.hpp" +#include "openPMD/Error.hpp" #include "openPMD/Series.hpp" #include "openPMD/auxiliary/DerefDynamicCast.hpp" +#include namespace openPMD { @@ -42,12 +44,14 @@ Writable::~Writable() IOTask(this, Parameter(parent))); } -void Writable::seriesFlush(std::string backendConfig) +void Writable::seriesFlush(std::string backendConfig, bool flush_entire_series) { - seriesFlush({FlushLevel::UserFlush, std::move(backendConfig)}); + seriesFlush( + {FlushLevel::UserFlush, std::move(backendConfig)}, flush_entire_series); } -void Writable::seriesFlush(internal::FlushParams const &flushParams) +void Writable::seriesFlush( + internal::FlushParams const &flushParams, bool flush_entire_series) { Attributable impl; impl.setData({attributable, [](auto const *) {}}); @@ -59,8 +63,33 @@ void Writable::seriesFlush(internal::FlushParams const &flushParams) .setDirtyRecursive(true); } auto series = series_internal->asInternalCopyOf(); - series.flush_impl( - series.iterations.begin(), series.iterations.end(), flushParams); + auto [begin, end] = [&, &iteration_internal_lambda = iteration_internal]() + -> std::pair { + if (flush_entire_series && iteration_internal_lambda) + { + auto it = series.iterations.begin(); + auto end_lambda = series.iterations.end(); + for (; it != end_lambda; ++it) + { + if (&it->second.Iteration::get() == *iteration_internal_lambda) + { + auto next = it; + ++next; + return {it, next}; + } + } + throw std::runtime_error( + "[Writable::seriesFlush()] Found a containing Iteration that " + "seems to not be part of the containing Series?? You might try " + "running this with `flushing_entire_series=false` as a " + "workaround, but something is still wrong."); + } + else + { + return {series.iterations.begin(), series.iterations.end()}; + } + }(); + series.flush_impl(begin, end, flushParams); } } // namespace openPMD diff --git a/src/binding/python/Attributable.cpp b/src/binding/python/Attributable.cpp index 2383ceab50..a904d67ec4 100644 --- a/src/binding/python/Attributable.cpp +++ b/src/binding/python/Attributable.cpp @@ -515,8 +515,9 @@ void init_Attributable(py::module &m) }) .def( "series_flush", - py::overload_cast(&Attributable::seriesFlush), - py::arg("backend_config") = "{}") + py::overload_cast(&Attributable::seriesFlush), + py::arg("backend_config") = "{}", + py::arg("flush_entire_series") = true) .def_property_readonly( "attributes", diff --git a/test/ParallelIOTest.cpp b/test/ParallelIOTest.cpp index c6d90d773e..a6195006e4 100644 --- a/test/ParallelIOTest.cpp +++ b/test/ParallelIOTest.cpp @@ -1182,11 +1182,11 @@ TEST_CASE("independent_write_with_collective_flush", "[parallel]") * conflict with the default buffer target that will run in the destructor, * unless the flush in the next line really is collective. */ - std::cout << "ENTER" << std::endl; MPI_Barrier(MPI_COMM_WORLD); - iteration.seriesFlush("adios2.engine.preferred_flush_target = \"disk\""); + iteration.seriesFlush( + "adios2.engine.preferred_flush_target = \"disk\"", + /* flush_entire_series = */ false); MPI_Barrier(MPI_COMM_WORLD); - std::cout << "LEAVE" << std::endl; } #endif