Skip to content

Commit

Permalink
Merge pull request #342 from DUNE-DAQ/mrigan/new_latency
Browse files Browse the repository at this point in the history
Trigger latencies + monitoring (v5)
  • Loading branch information
MRiganSUSX authored Oct 11, 2024
2 parents 016df91 + 45905a7 commit edf35cd
Show file tree
Hide file tree
Showing 17 changed files with 328 additions and 23 deletions.
100 changes: 100 additions & 0 deletions include/trigger/Latency.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/**
* @file Latency.hpp
*
* This is part of the DUNE DAQ Application Framework, copyright 2021.
* Licensing/copyright details are in the COPYING file that you should have
* received with this code.
*/

#ifndef TRIGGER_INCLUDE_TRIGGER_LATENCY_HPP_
#define TRIGGER_INCLUDE_TRIGGER_LATENCY_HPP_

#include "utilities/TimestampEstimator.hpp"
#include "utilities/TimestampEstimatorSystem.hpp"

#include <atomic>
#include <chrono>
#include <iostream> // Include for std::ostream

namespace dunedaq {
namespace trigger {

/**
 * @brief Records how far a data timestamp lags behind the system clock.
 *
 * Two independent values are kept, "in" and "out". Each update takes a
 * timestamp expressed in clock ticks, converts it to the selected time unit
 * (16 ns per tick) and stores `system_clock::now() - converted timestamp`,
 * clamped at zero when the timestamp lies ahead of the system clock.
 *
 * Both values are held in std::atomic, so each individual read or write is
 * atomic. The time unit is fixed at construction.
 */
class Latency {
  using latency = uint64_t;

public:
  // Enumeration for selecting the time unit of the stored latencies
  enum class TimeUnit { Microseconds = 1, Milliseconds = 2 };

  /**
   * @brief Construct with both latencies set to zero.
   * @param time_unit Unit of the values returned by the getters
   *                  (defaults to microseconds). Any value other than
   *                  TimeUnit::Microseconds is treated as milliseconds.
   *
   * Deliberately left non-explicit so existing uses keep compiling.
   */
  Latency(TimeUnit time_unit = TimeUnit::Microseconds)
    : m_latency_in(0)
    , m_latency_out(0)
    , m_time_unit(time_unit)
    , m_ns_per_unit(time_unit == TimeUnit::Microseconds ? s_ns_per_microsecond : s_ns_per_millisecond)
  {
  }

  /**
   * @brief Update the "in" latency.
   * @param latency Data timestamp in clock ticks (not an already-computed
   *                latency); the stored value is "now" minus this timestamp.
   */
  void update_latency_in(uint64_t latency) {
    update_single_latency(latency, m_latency_in);
  }

  /**
   * @brief Update the "out" latency.
   * @param latency Data timestamp in clock ticks (not an already-computed
   *                latency); the stored value is "now" minus this timestamp.
   */
  void update_latency_out(uint64_t latency) {
    update_single_latency(latency, m_latency_out);
  }

  /// @return Most recently stored "in" latency, in the selected time unit.
  latency get_latency_in() const {
    return m_latency_in.load();
  }

  /// @return Most recently stored "out" latency, in the selected time unit.
  latency get_latency_out() const {
    return m_latency_out.load();
  }

private:
  // Duration of one clock tick in nanoseconds (the original conversion
  // factors 16e-3 us/tick and 16e-6 ms/tick).
  static constexpr uint64_t s_ns_per_tick = 16;
  static constexpr uint64_t s_ns_per_microsecond = 1000;
  static constexpr uint64_t s_ns_per_millisecond = 1000000;

  /// @return Current system_clock time since its epoch, in the selected unit.
  uint64_t get_current_system_time() const {
    const auto since_epoch = std::chrono::system_clock::now().time_since_epoch();
    if (m_time_unit == TimeUnit::Microseconds) {
      return static_cast<uint64_t>(
        std::chrono::duration_cast<std::chrono::microseconds>(since_epoch).count());
    }
    return static_cast<uint64_t>(
      std::chrono::duration_cast<std::chrono::milliseconds>(since_epoch).count());
  }

  /**
   * @brief Convert clock ticks to the selected time unit, rounding down.
   *
   * Pure integer arithmetic: going through double loses precision once the
   * tick count exceeds 2^53. Quotient and remainder are scaled separately so
   * the intermediate products cannot overflow 64 bits.
   */
  uint64_t ticks_to_time_units(uint64_t ticks) const {
    const uint64_t whole_units = ticks / m_ns_per_unit;
    const uint64_t leftover_ticks = ticks % m_ns_per_unit;
    return whole_units * s_ns_per_tick + (leftover_ticks * s_ns_per_tick) / m_ns_per_unit;
  }

  // Single update function shared by both latencies
  void update_single_latency(uint64_t timestamp_ticks, std::atomic<uint64_t>& latency_atomic) {
    const uint64_t current_time = get_current_system_time();
    const uint64_t data_time = ticks_to_time_units(timestamp_ticks);
    // Clamp at zero: a timestamp ahead of the system clock would otherwise
    // wrap around to a huge unsigned value.
    latency_atomic.store(current_time >= data_time ? current_time - data_time : 0);
  }

  std::atomic<latency> m_latency_in;  // Last computed "in" latency
  std::atomic<latency> m_latency_out; // Last computed "out" latency
  TimeUnit m_time_unit;               // Selected time unit (us or ms)
  uint64_t m_ns_per_unit;             // Nanoseconds in one selected time unit
};

} // namespace trigger
} // namespace dunedaq

#endif // TRIGGER_INCLUDE_TRIGGER_LATENCY_HPP_
15 changes: 15 additions & 0 deletions plugins/CustomTCMaker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ CustomTCMaker::init(std::shared_ptr<appfwk::ModuleConfiguration> mcfg)
// Currently precalculates events for the next 60 seconds
m_sorting_size_limit = 60 * m_conf->get_clock_frequency_hz();

m_latency_monitoring.store( m_conf->get_latency_monitoring() );
}

//void
Expand All @@ -106,6 +107,14 @@ CustomTCMaker::generate_opmon_data()
info.set_tc_failed_sent_count( m_tc_failed_sent_count.load() );

this->publish(std::move(info));

if ( m_latency_monitoring.load() && m_running_flag.load() ) {
opmon::TriggerLatencyStandalone lat_info;

lat_info.set_latency_out( m_latency_instance.get_latency_out() );

this->publish(std::move(lat_info));
}
}

void
Expand All @@ -131,6 +140,11 @@ CustomTCMaker::do_start(const nlohmann::json& obj)
{
m_running_flag.store(true);

// OpMon.
m_tc_made_count.store(0);
m_tc_sent_count.store(0);
m_tc_failed_sent_count.store(0);

auto start_params = obj.get<rcif::cmd::StartParams>();

std::string timestamp_method = m_conf->get_timestamp_method();
Expand Down Expand Up @@ -241,6 +255,7 @@ CustomTCMaker::send_trigger_candidates()
TLOG_DEBUG(1) << get_name() << " at timestamp " << m_timestamp_estimator->get_timestamp_estimate()
<< ", pushing a candidate with timestamp " << candidate.time_candidate;

if (m_latency_monitoring.load()) m_latency_instance.update_latency_out( candidate.time_candidate );
try {
m_trigger_candidate_sink->send(std::move(candidate), std::chrono::milliseconds(10));
m_tc_sent_count++;
Expand Down
7 changes: 7 additions & 0 deletions plugins/CustomTCMaker.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@
#include "iomanager/Sender.hpp"
#include "utilities/TimestampEstimator.hpp"
#include "triggeralgs/TriggerCandidate.hpp"
#include "trigger/Latency.hpp"
#include "trigger/opmon/customtcmaker_info.pb.h"
#include "trigger/opmon/latency_info.pb.h"

#include <memory>
#include <random>
Expand Down Expand Up @@ -115,6 +117,11 @@ class CustomTCMaker : public dunedaq::appfwk::DAQModule
std::atomic<metric_counter_type> m_tc_sent_count{ 0 };
std::atomic<metric_counter_type> m_tc_failed_sent_count{ 0 };
void print_opmon_stats();

// Create an instance of the Latency class
std::atomic<bool> m_latency_monitoring{ false };
dunedaq::trigger::Latency m_latency_instance;
std::atomic<metric_counter_type> m_latency_out{ 0 };
};
} // namespace trigger
} // namespace dunedaq
Expand Down
26 changes: 26 additions & 0 deletions plugins/MLTModule.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ MLTModule::init(std::shared_ptr<appfwk::ModuleConfiguration> mcfg)
m_decision_output = get_iom_sender<dfmessages::TriggerDecision>(con->UID());
}

// Latency related
m_latency_monitoring.store( mtrg->get_configuration()->get_latency_monitoring() );

// Now do the configuration: dummy for now
m_configured_flag.store(true);
}
Expand Down Expand Up @@ -106,6 +109,21 @@ MLTModule::generate_opmon_data()
td_info.set_inhibited(counts.inhibited.exchange(0));
this->publish( std::move(td_info), {{"type", name}} );
}

// latency
if ( m_latency_monitoring.load() && m_running_flag.load() ) {
// TC in, TD out
opmon::TriggerLatency lat_info;
lat_info.set_latency_in( m_latency_instance.get_latency_in() );
lat_info.set_latency_out( m_latency_instance.get_latency_out() );
this->publish(std::move(lat_info));

// vs readout window requests
opmon::ModuleLevelTriggerRequestLatency lat_request_info;
lat_request_info.set_latency_window_start( m_latency_requests_instance.get_latency_in() );
lat_request_info.set_latency_window_end( m_latency_requests_instance.get_latency_out() );
this->publish(std::move(lat_request_info));
}
}

void
Expand Down Expand Up @@ -202,6 +220,7 @@ void
MLTModule::trigger_decisions_callback(dfmessages::TriggerDecision& decision )
{
m_td_msg_received_count++;
if (m_latency_monitoring.load()) m_latency_instance.update_latency_in( decision.trigger_timestamp );

auto trigger_types = unpack_types(decision.trigger_type);
for ( const auto t : trigger_types ) {
Expand All @@ -221,6 +240,12 @@ MLTModule::trigger_decisions_callback(dfmessages::TriggerDecision& decision )
<< decision.trigger_timestamp << " start " << decision.components.front().window_begin << " end " << decision.components.front().window_end
<< " number of links " << decision.components.size();

// readout window latency update
if (m_latency_monitoring.load()) {
m_latency_requests_instance.update_latency_in( decision.components.front().window_begin );
m_latency_requests_instance.update_latency_out( decision.components.front().window_end );
}

try {
m_decision_output->send(std::move(decision), std::chrono::milliseconds(1));
m_td_sent_count++;
Expand Down Expand Up @@ -260,6 +285,7 @@ MLTModule::trigger_decisions_callback(dfmessages::TriggerDecision& decision )
}

}
if (m_latency_monitoring.load()) m_latency_instance.update_latency_out( decision.trigger_timestamp );
m_td_total_count++;
}

Expand Down
11 changes: 11 additions & 0 deletions plugins/MLTModule.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
#include "trigger/Issues.hpp"
#include "trigger/LivetimeCounter.hpp"
#include "trigger/TokenManager.hpp"
#include "trigger/Latency.hpp"
#include "trigger/opmon/moduleleveltrigger_info.pb.h"
#include "trigger/opmon/latency_info.pb.h"

#include "appfwk/DAQModule.hpp"

Expand Down Expand Up @@ -251,6 +253,15 @@ class MLTModule : public dunedaq::appfwk::DAQModule
return m_trigger_counters[type];
}

// Create an instance of the Latency class
std::atomic<bool> m_latency_monitoring{ false };
dunedaq::trigger::Latency m_latency_instance;
dunedaq::trigger::Latency m_latency_requests_instance;
std::atomic<metric_counter_type> m_latency_in{ 0 };
std::atomic<metric_counter_type> m_latency_out{ 0 };
std::atomic<metric_counter_type> m_latency_window_start{ 0 };
std::atomic<metric_counter_type> m_latency_window_end{ 0 };

void print_opmon_stats();
};
} // namespace trigger
Expand Down
16 changes: 16 additions & 0 deletions plugins/RandomTCMakerModule.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ RandomTCMakerModule::init(std::shared_ptr<appfwk::ModuleConfiguration> mcfg)
m_time_sync_source = get_iom_receiver<dfmessages::TimeSync>(con->UID());
}
m_conf = mtrg->get_configuration();
m_latency_monitoring.store( m_conf->get_latency_monitoring() );
}

void
Expand All @@ -74,6 +75,14 @@ RandomTCMakerModule::generate_opmon_data()
info.set_tc_failed_sent_count( m_tc_failed_sent_count.load() );

this->publish(std::move(info));

if ( m_latency_monitoring.load() && m_running_flag.load() ) {
opmon::TriggerLatencyStandalone lat_info;

lat_info.set_latency_out( m_latency_instance.get_latency_out() );

this->publish(std::move(lat_info));
}
}

void
Expand All @@ -89,6 +98,11 @@ RandomTCMakerModule::do_start(const nlohmann::json& obj)

m_running_flag.store(true);

// OpMon.
m_tc_made_count.store(0);
m_tc_sent_count.store(0);
m_tc_failed_sent_count.store(0);

std::string timestamp_method = m_conf->get_timestamp_method();
if (timestamp_method == "kTimeSync") {
TLOG_DEBUG(0) << "Creating TimestampEstimator";
Expand Down Expand Up @@ -208,11 +222,13 @@ RandomTCMakerModule::send_trigger_candidates()
}
next_trigger_timestamp = m_timestamp_estimator->get_timestamp_estimate();
triggeralgs::TriggerCandidate candidate = create_candidate(next_trigger_timestamp);

m_tc_made_count++;

TLOG_DEBUG(1) << get_name() << " at timestamp " << m_timestamp_estimator->get_timestamp_estimate()
<< ", pushing a candidate with timestamp " << candidate.time_candidate;

if (m_latency_monitoring.load()) m_latency_instance.update_latency_out( candidate.time_candidate );
try{
m_trigger_candidate_sink->send(std::move(candidate), std::chrono::milliseconds(10));
m_tc_sent_count++;
Expand Down
7 changes: 7 additions & 0 deletions plugins/RandomTCMakerModule.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@
#include "iomanager/Sender.hpp"
#include "utilities/TimestampEstimator.hpp"
#include "triggeralgs/TriggerCandidate.hpp"
#include "trigger/Latency.hpp"
#include "trigger/opmon/randomtcmaker_info.pb.h"
#include "trigger/opmon/latency_info.pb.h"

#include <memory>
#include <random>
Expand Down Expand Up @@ -101,6 +103,11 @@ class RandomTCMakerModule : public dunedaq::appfwk::DAQModule
std::atomic<metric_counter_type> m_tc_sent_count{ 0 };
std::atomic<metric_counter_type> m_tc_failed_sent_count{ 0 };
void print_opmon_stats();

// Create an instance of the Latency class
std::atomic<bool> m_latency_monitoring{ false };
dunedaq::trigger::Latency m_latency_instance;
std::atomic<metric_counter_type> m_latency_out{ 0 };
};
} // namespace trigger
} // namespace dunedaq
Expand Down
4 changes: 0 additions & 4 deletions plugins/TriggerDataHandlerModule.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,6 @@ TriggerDataHandlerModule::create_readout(const appmodel::DataHandlerModule* modc
std::string raw_dt = modconf->get_module_configuration()->get_input_data_type();
TLOG() << "Choosing specializations for DataHandlingModel with data_type:" << raw_dt << ']';

TLOG() << "modconf: " << modconf;
TLOG() << modconf->class_name();
TLOG() << modconf->get_module_configuration();

// IF TriggerPrimitive (TP)
if (raw_dt.find("TriggerPrimitive") != std::string::npos) {
TLOG(TLVL_WORK_STEPS) << "Creating readout for TriggerPrimitive";
Expand Down
20 changes: 20 additions & 0 deletions schema/trigger/opmon/latency_info.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
syntax = "proto3";

package dunedaq.trigger.opmon;

// Message for latency variables
// Latency represents the difference between current system (clock) time and the data time of a particular (TX) data object
// Units are us
// Used by many trigger modules
// NOTE(review): the C++ Latency getters that feed these fields return uint64_t, so values
// above 2^32 - 1 would not fit in these uint32 fields - confirm the expected range.
message TriggerLatency {
// Latency recorded on input (in MLTModule: updated at the start of the trigger-decision callback)
uint32 latency_in = 1;
// Latency recorded on output (in MLTModule: updated near the end of the trigger-decision callback)
uint32 latency_out = 2;
}

// Message for latency variables
// Latency represents the difference between current system (clock) time and the data time of a particular (TX) data object
// Units are us
// Special case for Standalone makers
// (published by CustomTCMaker and RandomTCMakerModule, which only produce output)
// NOTE(review): the C++ Latency getter that feeds this field returns uint64_t, so values
// above 2^32 - 1 would not fit in this uint32 field - confirm the expected range.
message TriggerLatencyStandalone {
// Latency recorded just before a trigger candidate is sent
uint32 latency_out = 1;
}
8 changes: 8 additions & 0 deletions schema/trigger/opmon/moduleleveltrigger_info.proto
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,11 @@ message TriggerDecisionInfo {
uint32 paused = 4; // Number of paused (triggers are paused)
uint32 inhibited = 5; // Number of inhibited (DFO is busy)
}

// Message for MLT TD requests latency vars
// Latency represents the difference between current system (clock) time and the requested TD readout window (start/end)
// Units are currently us (but use an enum and can be changed)
// NOTE(review): the C++ Latency getters that feed these fields return uint64_t, so values
// above 2^32 - 1 would not fit in these uint32 fields - confirm the expected range.
message ModuleLevelTriggerRequestLatency {
// Latency relative to window_begin of the first component of the trigger decision
uint32 latency_window_start = 1;
// Latency relative to window_end of the first component of the trigger decision
uint32 latency_window_end = 2;
}
Loading

0 comments on commit edf35cd

Please sign in to comment.