diff --git a/src/vt/collective/collective_alg.cc b/src/vt/collective/collective_alg.cc index e0e314120d..024461a6c9 100644 --- a/src/vt/collective/collective_alg.cc +++ b/src/vt/collective/collective_alg.cc @@ -194,14 +194,18 @@ static void broadcastConsensus( "Planned collective does not exist within scope" ); auto action = iter_seq->second.action_; + auto epoch = iter_seq->second.epoch_; // Run the collective safely. // The action is expected to use MPI calls; not VT calls. { VT_ALLOW_MPI_CALLS; + theMsg()->pushEpoch(epoch); action(); + theMsg()->popEpoch(epoch); } + theTerm()->consume(epoch); // Erase the tag that was actually executed impl->planned_collective_.erase(iter_seq); diff --git a/src/vt/collective/collective_scope.cc b/src/vt/collective/collective_scope.cc index b92c633c1c..7905540105 100644 --- a/src/vt/collective/collective_scope.cc +++ b/src/vt/collective/collective_scope.cc @@ -55,9 +55,11 @@ namespace vt { namespace collective { TagType CollectiveScope::mpiCollectiveAsync(ActionType action) { auto impl = getScope(); auto tag = impl->next_seq_++; + auto epoch = theMsg()->getEpoch(); // Create a new collective action with the next tag - detail::ScopeImpl::CollectiveInfo info(tag, action); + detail::ScopeImpl::CollectiveInfo info(tag, action, epoch); + theTerm()->produce(epoch); impl->planned_collective_.emplace( std::piecewise_construct, diff --git a/src/vt/collective/collective_scope.h b/src/vt/collective/collective_scope.h index e9fc2ffe5c..2410a8510e 100644 --- a/src/vt/collective/collective_scope.h +++ b/src/vt/collective/collective_scope.h @@ -66,12 +66,13 @@ struct ScopeImpl { private: struct CollectiveInfo { - CollectiveInfo(TagType in_seq, ActionType in_action) - : seq_(in_seq), action_(in_action) + CollectiveInfo(TagType in_seq, ActionType in_action, EpochType in_epoch) + : seq_(in_seq), action_(in_action), epoch_(in_epoch) { } TagType seq_ = no_tag; ActionType action_ = no_action; + EpochType epoch_ = no_epoch; }; private: @@ -120,6 +121,9 @@ struct CollectiveScope { * are in use until \c isCollectiveDone returns \c true or \c * waitCollective returns on the returned \c TagType * + * The operation is counted as activity in the active termination + * detection epoch + * * \param[in] action the action containing a closed set of MPI operations * * \return tag representing the operation set diff --git a/src/vt/event/event.cc b/src/vt/event/event.cc index 3564d9d921..d7a384ccc8 100644 --- a/src/vt/event/event.cc +++ b/src/vt/event/event.cc @@ -72,24 +72,21 @@ void AsyncEvent::initialize() { EventType AsyncEvent::attachAction(EventType const& event, ActionType callable) { auto const& this_node = theContext()->getNode(); - auto const& event_id = createNormalEvent(this_node); - auto& holder = getEventHolder(event_id); auto trigger = [=]{ callable(); }; auto const& event_state = testEventComplete(event); - auto const& this_event_owning_node = getOwningNode(event_id); debug_print( event, node, - "theEvent: event={}, newevent={}, state={}, " - "newevent_owning_node={}, this_node={}\n", - event, event_id, static_cast(event_state), this_event_owning_node, - this_node + "event={}, state={}\n", + event, static_cast(event_state) ); + EventType ret_event = no_event; + switch (event_state) { case EventStateType::EventReady: trigger(); @@ -98,9 +95,11 @@ EventType AsyncEvent::attachAction(EventType const& event, ActionType callable) this->getEventHolder(event).attachAction( trigger ); - holder.makeReadyTrigger(); break; case EventStateType::EventRemote: { + auto const& event_id = createNormalEvent(this_node); + auto& holder = getEventHolder(event_id); + // attach event to new id holder.attachAction(trigger); @@ -118,13 +117,15 @@ EventType AsyncEvent::attachAction(EventType const& event, ActionType callable) theMsg()->sendMsg( owning_node, msg ); + + ret_event = event_id; } break; default: vtAssert(0, "This should be unreachable"); break; } - return event_id; + return ret_event; } /*static*/ void AsyncEvent::eventFinished(EventFinishedMsg* msg) { @@ -240,7 +241,11 @@ EventType AsyncEvent::createParentEvent(NodeType const& node) { } void AsyncEvent::removeEventID(EventType const& event) { - lookup_container_.erase(event); + auto iter = lookup_container_.find(event); + if (iter != lookup_container_.end()) { + event_container_.erase(iter->second); + lookup_container_.erase(event); + } } AsyncEvent::EventHolderType& AsyncEvent::getEventHolder(EventType const& event) { diff --git a/src/vt/messaging/active.cc b/src/vt/messaging/active.cc index f6f48f38de..7cce284871 100644 --- a/src/vt/messaging/active.cc +++ b/src/vt/messaging/active.cc @@ -122,8 +122,6 @@ EventType ActiveMessenger::sendMsgBytesWithPut( ); } - EventType new_event = theEvent()->createParentEvent(this_node_); - MsgSizeType new_msg_size = msg_size; if (is_put && !is_put_packed) { @@ -176,7 +174,7 @@ EventType ActiveMessenger::sendMsgBytesWithPut( sendMsgBytes(dest, base, new_msg_size, send_tag); - return new_event; + return no_event; } EventType ActiveMessenger::sendMsgBytes( diff --git a/src/vt/pmpi/generate_mpi_wrappers.pl b/src/vt/pmpi/generate_mpi_wrappers.pl index d6373b7ea2..8fb681152d 100644 --- a/src/vt/pmpi/generate_mpi_wrappers.pl +++ b/src/vt/pmpi/generate_mpi_wrappers.pl @@ -18,9 +18,9 @@ # It's a very rudimentary extractor that relies on definitions being on one line. sub extract_defs { my ($def_file) = @_; - use autodie; - open my $handle, '<', $def_file; + open my $handle, '<', $def_file + or die "$0: could not open definition file '$def_file': $!"; my @deflines = <$handle>; close $handle; @@ -78,7 +78,7 @@ sub should_guard_call { } open(my $out, '>', $output_file) - or die "Could not open file '$output_file': $!"; + or die "$0: could not open output file '$output_file': $!"; select $out; say < cond) { // as the parent context is "not idle". Likewise, no 'between scheduler' // event is started. - vtAssert( - action_depth_ == 0 or not is_idle, - "Nested schedulers never expected from idle context" - ); - triggerEvent(SchedulerEventType::BeginSchedulerLoop); - // When resuming a top-level scheduler, ensure to immediately enter - // an idle state if such applies. + // Ensure to immediately enter an idle state if such applies. + // The scheduler call ends idle as picking up work. if (not is_idle and work_queue_.empty()) { is_idle = true; triggerEvent(SchedulerEventType::BeginIdle); @@ -353,4 +348,36 @@ void runScheduler() { theSched()->scheduler(); } +void runSchedulerThrough(EpochType epoch) { + // WARNING: This is to prevent global termination from spuriously + // thinking that the work done in this loop over the scheduler + // represents the entire work of the program, and thus leading to + // stuff being torn down + theTerm()->produce(); + theSched()->runSchedulerWhile([=]{ return !theTerm()->isEpochTerminated(epoch); }); + theTerm()->consume(); +} + +void runInEpochRooted(ActionType&& fn) { + theSched()->triggerEvent(sched::SchedulerEvent::PendingSchedulerLoop); + + auto ep = theTerm()->makeEpochRooted(); + theMsg()->pushEpoch(ep); + fn(); + theMsg()->popEpoch(ep); + theTerm()->finishedEpoch(ep); + runSchedulerThrough(ep); +} + +void runInEpochCollective(ActionType&& fn) { + theSched()->triggerEvent(sched::SchedulerEvent::PendingSchedulerLoop); + + auto ep = theTerm()->makeEpochCollective(); + theMsg()->pushEpoch(ep); + fn(); + theMsg()->popEpoch(ep); + theTerm()->finishedEpoch(ep); + runSchedulerThrough(ep); +} + } //end namespace vt diff --git a/src/vt/vrt/collection/balance/elm_stats.cc b/src/vt/vrt/collection/balance/elm_stats.cc index b3fcfe21a6..2a8e2d6a3c 100644 --- a/src/vt/vrt/collection/balance/elm_stats.cc +++ b/src/vt/vrt/collection/balance/elm_stats.cc @@ -216,4 +216,10 @@ ElementStats::SubphaseType ElementStats::getFocusedSubPhase(VirtualProxyType col /*static*/ std::unordered_map ElementStats::focused_subphase_; +void ElementStats::clear() { + phase_timings_.clear(); + comm_.clear(); + subphase_timings_.clear(); +} + }}}} /* end namespace vt::vrt::collection::balance */ diff --git a/src/vt/vrt/collection/balance/elm_stats.h b/src/vt/vrt/collection/balance/elm_stats.h index 1923ee1203..8ee08d278a 100644 --- a/src/vt/vrt/collection/balance/elm_stats.h +++ b/src/vt/vrt/collection/balance/elm_stats.h @@ -102,6 +102,8 @@ struct ElementStats { template void serialize(Serializer& s); + void clear(); + public: template static void syncNextPhase(PhaseMsg* msg, ColT* col); diff --git a/src/vt/vrt/collection/balance/elm_stats.impl.h b/src/vt/vrt/collection/balance/elm_stats.impl.h index f3fd3973fb..50a63ffa30 100644 --- a/src/vt/vrt/collection/balance/elm_stats.impl.h +++ b/src/vt/vrt/collection/balance/elm_stats.impl.h @@ -97,6 +97,8 @@ template ProcStats::addProcStats(col, cur_phase, total_load, subphase_loads, comm); + col->getStats().clear(); + auto const before_ready = theCollection()->numReadyCollections(); theCollection()->makeCollectionReady(untyped_proxy); auto const after_ready = theCollection()->numReadyCollections(); diff --git a/src/vt/vrt/collection/balance/proc_stats.cc b/src/vt/vrt/collection/balance/proc_stats.cc index daca3907d5..6a8fa859e6 100644 --- a/src/vt/vrt/collection/balance/proc_stats.cc +++ b/src/vt/vrt/collection/balance/proc_stats.cc @@ -98,7 +98,7 @@ ProcStats::getProcSubphaseLoad(PhaseType phase) { ProcStats::proc_migrate_.clear(); ProcStats::proc_temp_to_perm_.clear(); ProcStats::proc_perm_to_temp_.clear(); - next_elm_ = 1; + ProcStats::proc_subphase_data_.clear(); } /*static*/ void ProcStats::startIterCleanup() { @@ -119,6 +119,7 @@ ProcStats::getProcSubphaseLoad(PhaseType phase) { ProcStats::proc_migrate_.clear(); ProcStats::proc_temp_to_perm_.clear(); ProcStats::proc_perm_to_temp_.clear(); + clearStats(); } /*static*/ ElementIDType ProcStats::getNextElm() { diff --git a/src/vt/vrt/collection/manager.h b/src/vt/vrt/collection/manager.h index 0f6d7a5bbd..4a96e6c6b6 100644 --- a/src/vt/vrt/collection/manager.h +++ b/src/vt/vrt/collection/manager.h @@ -224,6 +224,19 @@ struct CollectionManager { VirtualProxyType proxy, typename ColT::IndexType idx, Args&&... args ); + /** + * \internal \brief Insert into a collection on this node with a pointer to + * the collection element to insert + * + * \param[in] proxy the collection proxy + * \param[in] idx the index to insert + * \param[in] ptr unique ptr to insert for the collection + */ + template + void staticInsertColPtr( + VirtualProxyType proxy, typename ColT::IndexType idx, + std::unique_ptr ptr + ); public: template diff --git a/src/vt/vrt/collection/manager.impl.h b/src/vt/vrt/collection/manager.impl.h index c4226fe854..84fe1fef73 100644 --- a/src/vt/vrt/collection/manager.impl.h +++ b/src/vt/vrt/collection/manager.impl.h @@ -1837,20 +1837,46 @@ VirtualProxyType CollectionManager::makeDistProxy(TagType const& tag) { /* end SPMD distributed collection support */ +template +void CollectionManager::staticInsertColPtr( + VirtualProxyType proxy, typename ColT::IndexType idx, + std::unique_ptr ptr +) { + using IndexT = typename ColT::IndexType; + using BaseIdxType = vt::index::BaseIndex; + + auto map_han = UniversalIndexHolder<>::getMap(proxy); + auto holder = findColHolder(proxy); + auto range = holder->max_idx; + auto const num_elms = range.getSize(); + auto fn = auto_registry::getHandlerMap(map_han); + auto const num_nodes = theContext()->getNumNodes(); + auto const cur = static_cast(&idx); + auto const max = static_cast(&range); + auto const home_node = fn(cur, max, num_nodes); + + // Through the attorney, setup all the properties on the newly constructed + // collection element: index, proxy, number of elements. Note: because of + // how the constructor works, the index is not currently available through + // "getIndex" + CollectionTypeAttorney::setup(ptr.get(), num_elms, idx, proxy); + + VirtualPtrType col_ptr( + static_cast*>(ptr.release()) + ); + + // Insert the element into the managed holder for elements + insertCollectionElement( + std::move(col_ptr), idx, range, map_han, proxy, true, home_node + ); +} + template void CollectionManager::staticInsert( VirtualProxyType proxy, typename ColT::IndexType idx, Args&&... args ) { using IndexT = typename ColT::IndexType; using IdxContextHolder = InsertContextHolder; - using BaseIdxType = vt::index::BaseIndex; - - auto const& num_nodes = theContext()->getNumNodes(); - - auto map_han = UniversalIndexHolder<>::getMap(proxy); - - // Set the current context index to `idx` - IdxContextHolder::set(&idx,proxy); auto tuple = std::make_tuple(std::forward(args)...); @@ -1859,12 +1885,8 @@ void CollectionManager::staticInsert( auto range = holder->max_idx; auto const num_elms = range.getSize(); - // Get the handler function - auto fn = auto_registry::getHandlerMap(map_han); - - auto const cur = static_cast(&idx); - auto const max = static_cast(&range); - auto const& home_node = fn(cur, max, num_nodes); + // Set the current context index to `idx` + IdxContextHolder::set(&idx,proxy); #if backend_check_enabled(detector) && backend_check_enabled(cons_multi_idx) auto elm_ptr = DerefCons::derefTuple( @@ -1877,25 +1899,17 @@ void CollectionManager::staticInsert( ); #endif + // Clear the current index context + IdxContextHolder::clear(); + debug_print_verbose( vrt_coll, node, "construct (staticInsert): ptr={}\n", print_ptr(elm_ptr.get()) ); - // Through the attorney, setup all the properties on the newly constructed - // collection element: index, proxy, number of elements. Note: because of - // how the constructor works, the index is not currently available through - // "getIndex" - CollectionTypeAttorney::setup(elm_ptr.get(), num_elms, idx, proxy); - - // Insert the element into the managed holder for elements - insertCollectionElement( - std::move(elm_ptr), idx, range, map_han, proxy, true, home_node - ); - - // Clear the current index context - IdxContextHolder::clear(); + std::unique_ptr col_ptr(static_cast(elm_ptr.release())); + staticInsertColPtr(proxy, idx, std::move(col_ptr)); } template < @@ -1946,6 +1960,9 @@ InsertToken CollectionManager::constructInsertMap( // Insert the meta-data for this new collection insertMetaCollection(proxy, map_han, range, is_static); + // Insert action on cleanup for this collection + theCollection()->addCleanupFn(proxy); + return InsertToken{proxy}; } @@ -3326,7 +3343,7 @@ CollectionManager::restoreFromFile( // @todo: error check the file read with bytes in directory auto col_ptr = checkpoint::deserializeFromFile(file_name); - token[idx].insert(std::move(*col_ptr)); + token[idx].insertPtr(std::move(col_ptr)); } return finishedInsert(std::move(token)); diff --git a/src/vt/vrt/collection/staged_token/token.h b/src/vt/vrt/collection/staged_token/token.h index 384b1d17b4..328d01bb92 100644 --- a/src/vt/vrt/collection/staged_token/token.h +++ b/src/vt/vrt/collection/staged_token/token.h @@ -62,6 +62,8 @@ struct InsertTokenRval { template void insert(Args&&... args); + void insertPtr(std::unique_ptr ptr); + friend CollectionManager; private: diff --git a/src/vt/vrt/collection/staged_token/token.impl.h b/src/vt/vrt/collection/staged_token/token.impl.h index e847813862..b2a7b9bee8 100644 --- a/src/vt/vrt/collection/staged_token/token.impl.h +++ b/src/vt/vrt/collection/staged_token/token.impl.h @@ -59,6 +59,11 @@ void InsertTokenRval::insert(Args&&... args) { return theCollection()->staticInsert(proxy_,idx_,args...); } +template +void InsertTokenRval::insertPtr(std::unique_ptr ptr) { + return theCollection()->staticInsertColPtr(proxy_,idx_,std::move(ptr)); +} + // /*virtual*/ InsertToken::~InsertToken() { // theCollection()->finishedStaticInsert(proxy_); // } diff --git a/tests/unit/collection/test_checkpoint.cc b/tests/unit/collection/test_checkpoint.cc index 196a1d5d69..b7e5510916 100644 --- a/tests/unit/collection/test_checkpoint.cc +++ b/tests/unit/collection/test_checkpoint.cc @@ -54,9 +54,37 @@ namespace vt { namespace tests { namespace unit { static constexpr std::size_t data1_len = 1024; static constexpr std::size_t data2_len = 64; +static std::size_t counter = 0; + struct TestCol : vt::Collection { - TestCol() = default; + TestCol() { + // fmt::print("{} ctor\n", theContext()->getNode()); + counter++; + } + TestCol(TestCol&& other) + : iter(other.iter), + data1(std::move(other.data1)), + data2(std::move(other.data2)), + token(other.token) + { + // fmt::print("{} move ctor\n", theContext()->getNode()); + counter++; + } + TestCol(TestCol const& other) + : iter(other.iter), + data1(other.data1), + data2(other.data2), + token(other.token) + { + // fmt::print("{} copy ctor\n", theContext()->getNode()); + counter++; + } + + virtual ~TestCol() { + // fmt::print("{} destroying\n", theContext()->getNode()); + counter--; + } struct NullMsg : vt::CollectionMessage {}; @@ -194,11 +222,20 @@ TEST_F(TestCheckpoint, test_checkpoint_1) { // Restoration should be done now vt::theCollective()->barrier(); - runInEpoch([&]{ + runInEpochCollective([&]{ if (this_node == 0) { proxy.broadcast(); } }); + + runInEpochCollective([&]{ + if (this_node == 0) { + proxy.destroy(); + } + }); + + // Ensure that all elements were properly destroyed + EXPECT_EQ(counter, 0); } }