Skip to content

Commit

Permalink
Modify example to support/enforce MPI streaming backend
Browse files Browse the repository at this point in the history
Will fail to build in non-MPI-enabled openPMD
Will fail to execute in ADIOS2 released versions, only works with master
branch
Will fail to execute on some systems

Add block for Series in writer

Same thing in reader

Run Streaming test under MPI
  • Loading branch information
franzpoeschel committed Aug 10, 2023
1 parent 15feda4 commit 053d429
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 8 deletions.
14 changes: 11 additions & 3 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -1281,9 +1281,17 @@ if(openPMD_BUILD_TESTING)
endif()
endif()
if(openPMD_HAVE_ADIOS2)
add_test(NAME Asynchronous.10_streaming
COMMAND sh -c "$<TARGET_FILE:10_streaming_write> & sleep 1; $<TARGET_FILE:10_streaming_read>"
WORKING_DIRECTORY ${openPMD_RUNTIME_OUTPUT_DIRECTORY})
if(openPMD_HAVE_MPI)
add_test(NAME Asynchronous.10_streaming
COMMAND sh -c "${MPIEXEC_EXECUTABLE} ${MPIEXEC_NUMPROC_FLAG} 1 $<TARGET_FILE:10_streaming_write> & \
sleep 1; ${MPIEXEC_EXECUTABLE} ${MPIEXEC_NUMPROC_FLAG} 1 $<TARGET_FILE:10_streaming_read>"
WORKING_DIRECTORY ${openPMD_RUNTIME_OUTPUT_DIRECTORY})
else()
add_test(NAME Asynchronous.10_streaming
COMMAND sh -c "$<TARGET_FILE:10_streaming_write> & \
sleep 1; $<TARGET_FILE:10_streaming_read>"
WORKING_DIRECTORY ${openPMD_RUNTIME_OUTPUT_DIRECTORY})
endif()
endif()
endif()

Expand Down
32 changes: 28 additions & 4 deletions examples/10_streaming_read.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
#include <iostream>
#include <memory>

#include <mpi.h>

using std::cout;
using namespace openPMD;

Expand All @@ -19,7 +21,30 @@ int main()
return 0;
}

Series series = Series("electrons.sst", Access::READ_LINEAR);
{
int provided{};
MPI_Init_thread(nullptr, nullptr, MPI_THREAD_MULTIPLE, &provided);
switch (provided)
{
case MPI_THREAD_SINGLE:
throw std::runtime_error("MPI_THREAD_SINGLE");
case MPI_THREAD_FUNNELED:
throw std::runtime_error("MPI_THREAD_FUNNELED");
case MPI_THREAD_SERIALIZED:
throw std::runtime_error("MPI_THREAD_SERIALIZED");
case MPI_THREAD_MULTIPLE:
std::cout << "MPI_THREAD_MULTIPLE" << std::endl;
break;
default:
throw std::runtime_error("???????????????");
}
}

Series series = Series(
"electrons.sst",
Access::READ_ONLY,
MPI_COMM_WORLD,
R"(adios2.engine.parameters.DataTransport = "MPI")");

// `Series::writeIterations()` and `Series::readIterations()` are
// intentionally restricted APIs that ensure a workflow which also works
Expand All @@ -44,9 +69,6 @@ int main()
extents[i] = rc.getExtent();
}

// The iteration can be closed in order to help free up resources.
// The iteration's content will be flushed automatically.
// An iteration once closed cannot (yet) be reopened.
iteration.close();

for (size_t i = 0; i < 3; ++i)
Expand All @@ -71,6 +93,8 @@ int main()
*/
series.close();

MPI_Finalize();

return 0;
#else
std::cout << "The streaming example requires that openPMD has been built "
Expand Down
28 changes: 27 additions & 1 deletion examples/10_streaming_write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
#include <memory>
#include <numeric> // std::iota

#include <mpi.h>

using std::cout;
using namespace openPMD;

Expand All @@ -19,8 +21,31 @@ int main()
return 0;
}

{
int provided{};
MPI_Init_thread(nullptr, nullptr, MPI_THREAD_MULTIPLE, &provided);
switch (provided)
{
case MPI_THREAD_SINGLE:
throw std::runtime_error("MPI_THREAD_SINGLE");
case MPI_THREAD_FUNNELED:
throw std::runtime_error("MPI_THREAD_FUNNELED");
case MPI_THREAD_SERIALIZED:
throw std::runtime_error("MPI_THREAD_SERIALIZED");
case MPI_THREAD_MULTIPLE:
std::cout << "MPI_THREAD_MULTIPLE" << std::endl;
break;
default:
throw std::runtime_error("???????????????");
}
}

// open file for writing
Series series = Series("electrons.sst", Access::CREATE);
Series series = Series(
"electrons.sst",
Access::CREATE,
MPI_COMM_WORLD,
R"(adios2.engine.parameters.DataTransport = "MPI")");

Datatype datatype = determineDatatype<position_t>();
constexpr unsigned long length = 10ul;
Expand Down Expand Up @@ -57,6 +82,7 @@ int main()
* calling the destructor, including the release of file handles.
*/
series.close();
MPI_Finalize();

return 0;
#else
Expand Down

0 comments on commit 053d429

Please sign in to comment.