Skip to content

Commit

Permalink
Merge pull request #1036 from DARMA-tasking/1.0.0-beta.10-proposed-up…
Browse files Browse the repository at this point in the history
…date

Merge 1.0.0 beta.10 proposed update into 1.0.0
  • Loading branch information
lifflander authored Sep 9, 2020
2 parents 5c5dec1 + ddb7bbf commit a3c4058
Show file tree
Hide file tree
Showing 16 changed files with 182 additions and 57 deletions.
4 changes: 4 additions & 0 deletions src/vt/collective/collective_alg.cc
Original file line number Diff line number Diff line change
Expand Up @@ -194,14 +194,18 @@ static void broadcastConsensus(
"Planned collective does not exist within scope"
);
auto action = iter_seq->second.action_;
auto epoch = iter_seq->second.epoch_;

// Run the collective safely.
// The action is expected to use MPI calls; not VT calls.
{
VT_ALLOW_MPI_CALLS;
theMsg()->pushEpoch(epoch);
action();
theMsg()->popEpoch(epoch);
}

theTerm()->consume(epoch);
// Erase the tag that was actually executed
impl->planned_collective_.erase(iter_seq);

Expand Down
4 changes: 3 additions & 1 deletion src/vt/collective/collective_scope.cc
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,11 @@ namespace vt { namespace collective {
TagType CollectiveScope::mpiCollectiveAsync(ActionType action) {
auto impl = getScope();
auto tag = impl->next_seq_++;
auto epoch = theMsg()->getEpoch();

// Create a new collective action with the next tag
detail::ScopeImpl::CollectiveInfo info(tag, action);
detail::ScopeImpl::CollectiveInfo info(tag, action, epoch);
theTerm()->produce(epoch);

impl->planned_collective_.emplace(
std::piecewise_construct,
Expand Down
8 changes: 6 additions & 2 deletions src/vt/collective/collective_scope.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,13 @@ struct ScopeImpl {

private:
struct CollectiveInfo {
CollectiveInfo(TagType in_seq, ActionType in_action)
: seq_(in_seq), action_(in_action)
CollectiveInfo(TagType in_seq, ActionType in_action, EpochType in_epoch)
: seq_(in_seq), action_(in_action), epoch_(in_epoch)
{ }

TagType seq_ = no_tag;
ActionType action_ = no_action;
EpochType epoch_ = no_epoch;
};

private:
Expand Down Expand Up @@ -120,6 +121,9 @@ struct CollectiveScope {
* are in use until \c isCollectiveDone returns \c true or \c
* waitCollective returns on the returned \c TagType
*
* The operation is counted as activity in the active termination
* detection epoch
*
* \param[in] action the action containing a closed set of MPI operations
*
* \return tag representing the operation set
Expand Down
25 changes: 15 additions & 10 deletions src/vt/event/event.cc
Original file line number Diff line number Diff line change
Expand Up @@ -72,24 +72,21 @@ void AsyncEvent::initialize() {

EventType AsyncEvent::attachAction(EventType const& event, ActionType callable) {
auto const& this_node = theContext()->getNode();
auto const& event_id = createNormalEvent(this_node);
auto& holder = getEventHolder(event_id);

auto trigger = [=]{
callable();
};

auto const& event_state = testEventComplete(event);
auto const& this_event_owning_node = getOwningNode(event_id);

debug_print(
event, node,
"theEvent: event={}, newevent={}, state={}, "
"newevent_owning_node={}, this_node={}\n",
event, event_id, static_cast<int>(event_state), this_event_owning_node,
this_node
"event={}, state={}\n",
event, static_cast<int>(event_state)
);

EventType ret_event = no_event;

switch (event_state) {
case EventStateType::EventReady:
trigger();
Expand All @@ -98,9 +95,11 @@ EventType AsyncEvent::attachAction(EventType const& event, ActionType callable)
this->getEventHolder(event).attachAction(
trigger
);
holder.makeReadyTrigger();
break;
case EventStateType::EventRemote: {
auto const& event_id = createNormalEvent(this_node);
auto& holder = getEventHolder(event_id);

// attach event to new id
holder.attachAction(trigger);

Expand All @@ -118,13 +117,15 @@ EventType AsyncEvent::attachAction(EventType const& event, ActionType callable)
theMsg()->sendMsg<EventCheckFinishedMsg, checkEventFinished>(
owning_node, msg
);

ret_event = event_id;
}
break;
default:
vtAssert(0, "This should be unreachable");
break;
}
return event_id;
return ret_event;
}

/*static*/ void AsyncEvent::eventFinished(EventFinishedMsg* msg) {
Expand Down Expand Up @@ -240,7 +241,11 @@ EventType AsyncEvent::createParentEvent(NodeType const& node) {
}

void AsyncEvent::removeEventID(EventType const& event) {
lookup_container_.erase(event);
auto iter = lookup_container_.find(event);
if (iter != lookup_container_.end()) {
event_container_.erase(iter->second);
lookup_container_.erase(event);
}
}

AsyncEvent::EventHolderType& AsyncEvent::getEventHolder(EventType const& event) {
Expand Down
4 changes: 1 addition & 3 deletions src/vt/messaging/active.cc
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,6 @@ EventType ActiveMessenger::sendMsgBytesWithPut(
);
}

EventType new_event = theEvent()->createParentEvent(this_node_);

MsgSizeType new_msg_size = msg_size;

if (is_put && !is_put_packed) {
Expand Down Expand Up @@ -176,7 +174,7 @@ EventType ActiveMessenger::sendMsgBytesWithPut(

sendMsgBytes(dest, base, new_msg_size, send_tag);

return new_event;
return no_event;
}

EventType ActiveMessenger::sendMsgBytes(
Expand Down
6 changes: 3 additions & 3 deletions src/vt/pmpi/generate_mpi_wrappers.pl
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
# It's a very rudimentary extractor that relies on definitions being on one line.
sub extract_defs {
my ($def_file) = @_;
use autodie;

open my $handle, '<', $def_file;
open my $handle, '<', $def_file
or die "$0: could not open definition file '$def_file': $!";
my @deflines = <$handle>;
close $handle;

Expand Down Expand Up @@ -78,7 +78,7 @@ sub should_guard_call {
}

open(my $out, '>', $output_file)
or die "Could not open file '$output_file': $!";
or die "$0: could not open output file '$output_file': $!";
select $out;

say <<PROLOGUE;
Expand Down
41 changes: 34 additions & 7 deletions src/vt/scheduler/scheduler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -283,15 +283,10 @@ void Scheduler::runSchedulerWhile(std::function<bool()> cond) {
// as the parent context is "not idle". Likewise, no 'between scheduler'
// event is started.

vtAssert(
action_depth_ == 0 or not is_idle,
"Nested schedulers never expected from idle context"
);

triggerEvent(SchedulerEventType::BeginSchedulerLoop);

// When resuming a top-level scheduler, ensure to immediately enter
// an idle state if such applies.
// Ensure to immediately enter an idle state if such applies.
// The scheduler call ends idle as picking up work.
if (not is_idle and work_queue_.empty()) {
is_idle = true;
triggerEvent(SchedulerEventType::BeginIdle);
Expand Down Expand Up @@ -353,4 +348,36 @@ void runScheduler() {
theSched()->scheduler();
}

void runSchedulerThrough(EpochType epoch) {
// WARNING: This is to prevent global termination from spuriously
// thinking that the work done in this loop over the scheduler
// represents the entire work of the program, and thus leading to
// stuff being torn down
theTerm()->produce();
theSched()->runSchedulerWhile([=]{ return !theTerm()->isEpochTerminated(epoch); });
theTerm()->consume();
}

void runInEpochRooted(ActionType&& fn) {
theSched()->triggerEvent(sched::SchedulerEvent::PendingSchedulerLoop);

auto ep = theTerm()->makeEpochRooted();
theMsg()->pushEpoch(ep);
fn();
theMsg()->popEpoch(ep);
theTerm()->finishedEpoch(ep);
runSchedulerThrough(ep);
}

void runInEpochCollective(ActionType&& fn) {
theSched()->triggerEvent(sched::SchedulerEvent::PendingSchedulerLoop);

auto ep = theTerm()->makeEpochCollective();
theMsg()->pushEpoch(ep);
fn();
theMsg()->popEpoch(ep);
theTerm()->finishedEpoch(ep);
runSchedulerThrough(ep);
}

} //end namespace vt
6 changes: 6 additions & 0 deletions src/vt/vrt/collection/balance/elm_stats.cc
Original file line number Diff line number Diff line change
Expand Up @@ -216,4 +216,10 @@ ElementStats::SubphaseType ElementStats::getFocusedSubPhase(VirtualProxyType col

/*static*/ std::unordered_map<VirtualProxyType,ElementStats::SubphaseType> ElementStats::focused_subphase_;

void ElementStats::clear() {
phase_timings_.clear();
comm_.clear();
subphase_timings_.clear();
}

}}}} /* end namespace vt::vrt::collection::balance */
2 changes: 2 additions & 0 deletions src/vt/vrt/collection/balance/elm_stats.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ struct ElementStats {
template <typename Serializer>
void serialize(Serializer& s);

void clear();

public:
template <typename ColT>
static void syncNextPhase(PhaseMsg<ColT>* msg, ColT* col);
Expand Down
2 changes: 2 additions & 0 deletions src/vt/vrt/collection/balance/elm_stats.impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ template <typename ColT>

ProcStats::addProcStats(col, cur_phase, total_load, subphase_loads, comm);

col->getStats().clear();

auto const before_ready = theCollection()->numReadyCollections();
theCollection()->makeCollectionReady(untyped_proxy);
auto const after_ready = theCollection()->numReadyCollections();
Expand Down
3 changes: 2 additions & 1 deletion src/vt/vrt/collection/balance/proc_stats.cc
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ ProcStats::getProcSubphaseLoad(PhaseType phase) {
ProcStats::proc_migrate_.clear();
ProcStats::proc_temp_to_perm_.clear();
ProcStats::proc_perm_to_temp_.clear();
next_elm_ = 1;
ProcStats::proc_subphase_data_.clear();
}

/*static*/ void ProcStats::startIterCleanup() {
Expand All @@ -119,6 +119,7 @@ ProcStats::getProcSubphaseLoad(PhaseType phase) {
ProcStats::proc_migrate_.clear();
ProcStats::proc_temp_to_perm_.clear();
ProcStats::proc_perm_to_temp_.clear();
clearStats();
}

/*static*/ ElementIDType ProcStats::getNextElm() {
Expand Down
13 changes: 13 additions & 0 deletions src/vt/vrt/collection/manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,19 @@ struct CollectionManager {
VirtualProxyType proxy, typename ColT::IndexType idx, Args&&... args
);

/**
* \internal \brief Insert into a collection on this node with a pointer to
* the collection element to insert
*
* \param[in] proxy the collection proxy
* \param[in] idx the index to insert
* \param[in] ptr unique ptr to insert for the collection
*/
template <typename ColT>
void staticInsertColPtr(
VirtualProxyType proxy, typename ColT::IndexType idx,
std::unique_ptr<ColT> ptr
);

public:
template <typename ColT>
Expand Down
Loading

0 comments on commit a3c4058

Please sign in to comment.