diff --git a/src/groups/mqb/mqbblp/mqbblp_remotequeue.cpp b/src/groups/mqb/mqbblp/mqbblp_remotequeue.cpp index bf411902a..30595ba0c 100644 --- a/src/groups/mqb/mqbblp/mqbblp_remotequeue.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_remotequeue.cpp @@ -64,6 +64,16 @@ namespace BloombergLP { namespace mqbblp { +namespace { + +/// The default timeout for scheduled PUT expiration clean-up event. +static const bsls::Types::Int64 k_DEFAULT_PUT_EXPIRATION_TIMEOUT_MINUTES = 5; +static const bsls::Types::Int64 k_DEFAULT_PUT_EXPIRATION_TIMEOUT_NS = + k_DEFAULT_PUT_EXPIRATION_TIMEOUT_MINUTES * + bdlt::TimeUnitRatio::k_NANOSECONDS_PER_MINUTE; + +} // close unnamed namespace + // ----------------- // class RemoteQueue // ----------------- @@ -366,7 +376,7 @@ void RemoteQueue::pushMessage( if (result != mqbi::StorageResult::e_SUCCESS) { if (d_throttledFailedPushMessages.requestPermission()) { - BALL_LOG_WARN << d_state_p->uri() + BALL_LOG_WARN << "[THROTTLED] " << d_state_p->uri() << " failed to store broadcast PUSH [" << msgGUID << "], result = " << result; } @@ -483,6 +493,18 @@ RemoteQueue::RemoteQueue(QueueState* state, os << '@' << d_state_p->uri().asString(); d_state_p->setDescription(os.str()); + if (deduplicationTimeMs <= 0) { + d_pendingPutsTimeoutNs = k_DEFAULT_PUT_EXPIRATION_TIMEOUT_NS; + BALL_LOG_WARN << "Remote queue [" << d_state_p->description() + << "]: cannot schedule PUT expiration timer with a " + << "non-positive timeout from config [" + << deduplicationTimeMs << " ms], use a default PUT " + << "expiration timeout for scheduler instead [" + << bmqu::PrintUtil::prettyTimeInterval( + d_pendingPutsTimeoutNs) + << "]"; + } + BALL_LOG_INFO << "Remote queue: " << d_state_p->uri() << " [id: " << d_state_p->id() << "]"; } @@ -676,10 +698,10 @@ void RemoteQueue::onHandleReleased( it != d_pendingConfirms.end();) { if (it->d_handle == handle.get()) { if (d_throttledFailedConfirmMessages.requestPermission()) { - BALL_LOG_WARN << "Dropping CONFIRM because downstream [" - << handle << "] is gone. [queue: '" - << d_state_p->description() << "', GUID: '" - << it->d_guid << "']"; + BALL_LOG_WARN << "[THROTTLED] Dropping CONFIRM because " + << "downstream [" << handle << "] is gone. " + << "[queue: '" << d_state_p->description() + << "', GUID: '" << it->d_guid << "']"; } it = d_pendingConfirms.erase(it); ++numProcessed; @@ -865,7 +887,7 @@ void RemoteQueue::postMessage(const bmqp::PutHeader& putHeaderIn, if (d_throttledFailedPutMessages.requestPermission()) { BALL_LOG_WARN - << "#CLIENT_IMPROPER_BEHAVIOR " + << "[THROTTLED] #CLIENT_IMPROPER_BEHAVIOR " << "Failed PUT message for queue [" << d_state_p->uri() << "] from client [" << source->client()->description() << "]. Queue not opened in WRITE mode by the client."; @@ -1152,7 +1174,7 @@ void RemoteQueue::onAckMessageDispatched(const mqbi::DispatcherAckEvent& event) if (d_throttledFailedAckMessages.requestPermission()) { BALL_LOG_STREAM(severity) - << "Received ACK message [" << ackResult + << "[THROTTLED] Received ACK message [" << ackResult << ", queue: " << d_state_p->description() << "] for unknown guid: " << ackMessage.messageGUID(); } @@ -1313,7 +1335,7 @@ void RemoteQueue::expirePendingMessagesDispatched() if (numExpired) { if (d_throttledFailedPutMessages.requestPermission()) { - BALL_LOG_INFO << "[THROTTLED] " << d_state_p->uri() << ": expired " + BALL_LOG_WARN << "[THROTTLED] " << d_state_p->uri() << ": expired " << bmqu::PrintUtil::prettyNumber(numExpired) << " pending PUT messages (" << bmqu::PrintUtil::prettyNumber(numMessages -