diff --git a/include/openPMD/ChunkInfo.hpp b/include/openPMD/ChunkInfo.hpp index 868478f8f6..daa4cc2c5c 100644 --- a/include/openPMD/ChunkInfo.hpp +++ b/include/openPMD/ChunkInfo.hpp @@ -251,6 +251,22 @@ namespace chunk_assignment [[nodiscard]] std::unique_ptr clone() const override; }; + struct BlocksOfSourceRanks : Strategy + { + private: + unsigned int mpi_size, mpi_rank; + + public: + BlocksOfSourceRanks(unsigned int mpi_rank, unsigned int mpi_size); + + Assignment assign( + PartialAssignment, + RankMeta const &in, + RankMeta const &out) override; + + [[nodiscard]] std::unique_ptr clone() const override; + }; + /** * @brief Strategy that assigns chunks to be read by processes within * the same host that produced the chunk. diff --git a/src/ChunkInfo.cpp b/src/ChunkInfo.cpp index ceb074b887..6162a49f3a 100644 --- a/src/ChunkInfo.cpp +++ b/src/ChunkInfo.cpp @@ -361,6 +361,47 @@ namespace chunk_assignment return std::unique_ptr(new Blocks(*this)); } + BlocksOfSourceRanks::BlocksOfSourceRanks( + unsigned int mpi_rank_in, unsigned int mpi_size_in) + : mpi_size(mpi_size_in), mpi_rank(mpi_rank_in) + {} + + Assignment BlocksOfSourceRanks::assign( + PartialAssignment pa, RankMeta const &, RankMeta const &) + { + auto [notAssigned, res] = std::move(pa); + std::map> + sortSourceChunksBySourceRank; + for (auto &chunk : notAssigned) + { + auto sourceID = chunk.sourceID; + sortSourceChunksBySourceRank[sourceID].push_back(std::move(chunk)); + } + notAssigned.clear(); + auto [myChunksFrom, myChunksTo] = + OneDimensionalBlockSlicer::n_th_block_inside( + sortSourceChunksBySourceRank.size(), mpi_rank, mpi_size); + auto it = sortSourceChunksBySourceRank.begin(); + for (size_t i = 0; i < myChunksFrom; ++i) + { + ++it; + } + for (size_t i = 0; i < myChunksTo; ++i, ++it) + { + std::transform( + it->second.begin(), + it->second.end(), + std::back_inserter(res[mpi_rank]), + [](WrittenChunkInfo &chunk) { return std::move(chunk); }); + } + return res; + } + + std::unique_ptr BlocksOfSourceRanks::clone() const + { + return std::unique_ptr(new BlocksOfSourceRanks(*this)); + } + ByHostname::ByHostname(std::unique_ptr withinNode) : m_withinNode(std::move(withinNode)) {} diff --git a/src/binding/python/ChunkInfo.cpp b/src/binding/python/ChunkInfo.cpp index df0d8be629..626aa6fc8c 100644 --- a/src/binding/python/ChunkInfo.cpp +++ b/src/binding/python/ChunkInfo.cpp @@ -305,6 +305,11 @@ void init_Chunk(py::module &m) py::init(), py::arg("mpi_rank"), py::arg("mpi_size")); + py::class_(m, "BlocksOfSourceRanks") + .def( + py::init(), + py::arg("mpi_rank"), + py::arg("mpi_size")); py::class_(m, "ByHostname") .def( diff --git a/test/ParallelIOTest.cpp b/test/ParallelIOTest.cpp index 99f64ab13b..d243ce78bb 100644 --- a/test/ParallelIOTest.cpp +++ b/test/ParallelIOTest.cpp @@ -2460,6 +2460,14 @@ void adios2_chunk_distribution() auto blocksAssignment = blocksStrategy.assign( chunkTable, rankMetaIn, readingRanksHostnames); printAssignment("BLOCKS", blocksAssignment, readingRanksHostnames); + + BlocksOfSourceRanks blocksOfSourceRanksStrategy(mpi_rank, mpi_size); + auto blocksOfSourceRanksAssignment = blocksOfSourceRanksStrategy.assign( + chunkTable, rankMetaIn, readingRanksHostnames); + printAssignment( + "BLOCKS OF SOURCE RANKS", + blocksOfSourceRanksAssignment, + readingRanksHostnames); } }