Skip to content

Commit

Permalink
Fix[mqbblp::RemoteQueue]: 0 deduplication timeout causes expired PUTs (
Browse files Browse the repository at this point in the history
…#494)


Signed-off-by: Evgeny Malygin <[email protected]>
  • Loading branch information
678098 authored Oct 31, 2024
1 parent 527fa18 commit 198843c
Showing 1 changed file with 30 additions and 8 deletions.
38 changes: 30 additions & 8 deletions src/groups/mqb/mqbblp/mqbblp_remotequeue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,16 @@
namespace BloombergLP {
namespace mqbblp {

namespace {

/// The default timeout for scheduled PUT expiration clean-up event.
static const bsls::Types::Int64 k_DEFAULT_PUT_EXPIRATION_TIMEOUT_MINUTES = 5;
static const bsls::Types::Int64 k_DEFAULT_PUT_EXPIRATION_TIMEOUT_NS =
k_DEFAULT_PUT_EXPIRATION_TIMEOUT_MINUTES *
bdlt::TimeUnitRatio::k_NANOSECONDS_PER_MINUTE;

} // close unnamed namespace

// -----------------
// class RemoteQueue
// -----------------
Expand Down Expand Up @@ -366,7 +376,7 @@ void RemoteQueue::pushMessage(

if (result != mqbi::StorageResult::e_SUCCESS) {
if (d_throttledFailedPushMessages.requestPermission()) {
BALL_LOG_WARN << d_state_p->uri()
BALL_LOG_WARN << "[THROTTLED] " << d_state_p->uri()
<< " failed to store broadcast PUSH ["
<< msgGUID << "], result = " << result;
}
Expand Down Expand Up @@ -483,6 +493,18 @@ RemoteQueue::RemoteQueue(QueueState* state,
os << '@' << d_state_p->uri().asString();
d_state_p->setDescription(os.str());

if (deduplicationTimeMs <= 0) {
d_pendingPutsTimeoutNs = k_DEFAULT_PUT_EXPIRATION_TIMEOUT_NS;
BALL_LOG_WARN << "Remote queue [" << d_state_p->description()
<< "]: cannot schedule PUT expiration timer with a "
<< "non-positive timeout from config ["
<< deduplicationTimeMs << " ms], use a default PUT "
<< "expiration timeout for scheduler instead ["
<< bmqu::PrintUtil::prettyTimeInterval(
d_pendingPutsTimeoutNs)
<< "]";
}

BALL_LOG_INFO << "Remote queue: " << d_state_p->uri()
<< " [id: " << d_state_p->id() << "]";
}
Expand Down Expand Up @@ -676,10 +698,10 @@ void RemoteQueue::onHandleReleased(
it != d_pendingConfirms.end();) {
if (it->d_handle == handle.get()) {
if (d_throttledFailedConfirmMessages.requestPermission()) {
BALL_LOG_WARN << "Dropping CONFIRM because downstream ["
<< handle << "] is gone. [queue: '"
<< d_state_p->description() << "', GUID: '"
<< it->d_guid << "']";
BALL_LOG_WARN << "[THROTTLED] Dropping CONFIRM because "
<< "downstream [" << handle << "] is gone. "
<< "[queue: '" << d_state_p->description()
<< "', GUID: '" << it->d_guid << "']";
}
it = d_pendingConfirms.erase(it);
++numProcessed;
Expand Down Expand Up @@ -865,7 +887,7 @@ void RemoteQueue::postMessage(const bmqp::PutHeader& putHeaderIn,

if (d_throttledFailedPutMessages.requestPermission()) {
BALL_LOG_WARN
<< "#CLIENT_IMPROPER_BEHAVIOR "
<< "[THROTTLED] #CLIENT_IMPROPER_BEHAVIOR "
<< "Failed PUT message for queue [" << d_state_p->uri()
<< "] from client [" << source->client()->description()
<< "]. Queue not opened in WRITE mode by the client.";
Expand Down Expand Up @@ -1152,7 +1174,7 @@ void RemoteQueue::onAckMessageDispatched(const mqbi::DispatcherAckEvent& event)

if (d_throttledFailedAckMessages.requestPermission()) {
BALL_LOG_STREAM(severity)
<< "Received ACK message [" << ackResult
<< "[THROTTLED] Received ACK message [" << ackResult
<< ", queue: " << d_state_p->description()
<< "] for unknown guid: " << ackMessage.messageGUID();
}
Expand Down Expand Up @@ -1313,7 +1335,7 @@ void RemoteQueue::expirePendingMessagesDispatched()

if (numExpired) {
if (d_throttledFailedPutMessages.requestPermission()) {
BALL_LOG_INFO << "[THROTTLED] " << d_state_p->uri() << ": expired "
BALL_LOG_WARN << "[THROTTLED] " << d_state_p->uri() << ": expired "
<< bmqu::PrintUtil::prettyNumber(numExpired)
<< " pending PUT messages ("
<< bmqu::PrintUtil::prettyNumber(numMessages -
Expand Down

0 comments on commit 198843c

Please sign in to comment.