Skip to content

Commit

Permalink
Refactoring App registration (#493)
Browse files Browse the repository at this point in the history
* Refactoring App registration

Signed-off-by: dorjesinpo <[email protected]>

* Cleaning

Signed-off-by: dorjesinpo <[email protected]>

---------

Signed-off-by: dorjesinpo <[email protected]>
  • Loading branch information
dorjesinpo authored Nov 11, 2024
1 parent 1e0bb7c commit 759da3c
Show file tree
Hide file tree
Showing 39 changed files with 532 additions and 899 deletions.
1 change: 1 addition & 0 deletions src/groups/mqb/mqbblp/mqbblp_cluster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2848,6 +2848,7 @@ void Cluster::onDomainReconfigured(const mqbi::Domain& domain,
oldCfgAppIds,
newCfgAppIds);

// TODO: This should be one call - one QueueUpdateAdvisory for all Apps
bsl::unordered_set<bsl::string>::const_iterator it = addedIds.cbegin();
for (; it != addedIds.cend(); ++it) {
dispatcher()->execute(
Expand Down
78 changes: 35 additions & 43 deletions src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,29 +139,27 @@ void createQueueUriKey(bmqt::Uri* out,
}

void afterAppIdRegisteredDispatched(
mqbi::Queue* queue,
const mqbc::ClusterStateQueueInfo::AppInfo& appIdInfo)
mqbi::Queue* queue,
const mqbc::ClusterStateQueueInfo::AppInfos& appInfos)
{
// executed by the *QUEUE DISPATCHER* thread

// PRECONDITIONS
BSLS_ASSERT_SAFE(queue->dispatcher()->inDispatcherThread(queue));

queue->queueEngine()->afterAppIdRegistered(
mqbi::Storage::AppInfo(appIdInfo.first, appIdInfo.second));
queue->queueEngine()->afterAppIdRegistered(appInfos);
}

void afterAppIdUnregisteredDispatched(
mqbi::Queue* queue,
const mqbc::ClusterStateQueueInfo::AppInfo& appIdInfo)
mqbi::Queue* queue,
const mqbc::ClusterStateQueueInfo::AppInfos& appInfos)
{
// executed by the *QUEUE DISPATCHER* thread

// PRECONDITIONS
BSLS_ASSERT_SAFE(queue->dispatcher()->inDispatcherThread(queue));

queue->queueEngine()->afterAppIdUnregistered(
mqbi::Storage::AppInfo(appIdInfo.first, appIdInfo.second));
queue->queueEngine()->afterAppIdUnregistered(appInfos);
}

void handleHolderDummy(const bsl::shared_ptr<mqbi::QueueHandle>& handle)
Expand Down Expand Up @@ -4375,10 +4373,6 @@ void ClusterQueueHelper::onQueueUpdated(const bmqt::Uri& uri,
d_cluster_p->dispatcher()->inDispatcherThread(d_cluster_p));
BSLS_ASSERT_SAFE(!d_cluster_p->isRemote());

if (!d_cluster_p->isCSLModeEnabled()) {
return; // RETURN
}

if (!uri.isValid()) {
// This is an appID update for the entire domain, instead of any
// individual queue. Nothing to do for the queue helper.
Expand All @@ -4394,53 +4388,51 @@ void ClusterQueueHelper::onQueueUpdated(const bmqt::Uri& uri,
const int partitionId = qiter->second->partitionId();
BSLS_ASSERT_SAFE(partitionId != mqbs::DataStore::k_INVALID_PARTITION_ID);

for (AppInfosCIter cit = addedAppIds.cbegin(); cit != addedAppIds.cend();
++cit) {
if (d_cluster_p->isCSLModeEnabled()) {
if (!d_clusterState_p->isSelfPrimary(partitionId) || queue == 0) {
// Note: In non-CSL mode, the queue creation callback is
// invoked at replica nodes when they receive a queue creation
// record from the primary in the partition stream.

mqbi::Storage::AppInfos one(1, d_allocator_p);
one.emplace(*cit);

d_storageManager_p->updateQueueReplica(
partitionId,
uri,
qiter->second->key(),
one,
addedAppIds,
d_clusterState_p->domainStates()
.at(uri.qualifiedDomain())
->domain());
}
if (queue) {
d_cluster_p->dispatcher()->execute(
bdlf::BindUtil::bind(afterAppIdRegisteredDispatched,
queue,
*cit),
queue);

for (AppInfosCIter cit = removedAppIds.cbegin();
cit != removedAppIds.cend();
++cit) {
if (!d_clusterState_p->isSelfPrimary(partitionId) || queue == 0) {
// Note: In non-CSL mode, the queue deletion callback is
// invoked at replica nodes when they receive a queue deletion
// record from the primary in the partition stream.
d_storageManager_p->unregisterQueueReplica(
partitionId,
uri,
qiter->second->key(),
cit->second);
}
}
}

for (AppInfosCIter cit = removedAppIds.cbegin();
cit != removedAppIds.cend();
++cit) {
if (!d_clusterState_p->isSelfPrimary(partitionId) || queue == 0) {
// Note: In non-CSL mode, the queue deletion callback is
// invoked at replica nodes when they receive a queue deletion
// record from the primary in the partition stream.
d_storageManager_p->unregisterQueueReplica(partitionId,
uri,
qiter->second->key(),
cit->second);
}
if (queue) {
d_cluster_p->dispatcher()->execute(
bdlf::BindUtil::bind(afterAppIdUnregisteredDispatched,
queue,
*cit),
queue);
}
if (queue) {
// TODO: replace with one call
d_cluster_p->dispatcher()->execute(
bdlf::BindUtil::bind(afterAppIdRegisteredDispatched,
queue,
addedAppIds),
queue);

d_cluster_p->dispatcher()->execute(
bdlf::BindUtil::bind(afterAppIdUnregisteredDispatched,
queue,
removedAppIds),
queue);
}

bmqu::Printer<AppInfos> printer1(&addedAppIds);
Expand Down
71 changes: 0 additions & 71 deletions src/groups/mqb/mqbblp/mqbblp_domain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,32 +68,6 @@ void queueHolderDummy(const bsl::shared_ptr<mqbi::Queue>& queue)
BALL_LOG_INFO << "Deleted queue '" << queue->uri().canonical() << "'";
}

void afterAppIdRegisteredDispatched(mqbi::Queue* queue,
const bsl::string& appId)
{
// executed by the *QUEUE DISPATCHER* thread

// PRECONDITIONS
BSLS_ASSERT_SAFE(queue->dispatcher()->inDispatcherThread(queue));

queue->queueEngine()->afterAppIdRegistered(
mqbi::Storage::AppInfo(appId, mqbu::StorageKey()));
}

void afterAppIdUnregisteredDispatched(mqbi::Queue* queue,
const bsl::string& appId)
{
// executed by the *QUEUE DISPATCHER* thread

// PRECONDITIONS
BSLS_ASSERT_SAFE(queue->dispatcher()->inDispatcherThread(queue));

// Note: Inputing nullKey here is okay since this routine will be removed
// when we switch to CSL workflow.
queue->queueEngine()->afterAppIdUnregistered(
mqbi::Storage::AppInfo(appId, mqbu::StorageKey()));
}

/// Validates an application subscription.
bool validdateSubscriptionExpression(bsl::ostream& errorDescription,
const mqbconfm::Expression& expression,
Expand Down Expand Up @@ -463,51 +437,6 @@ int Domain::configure(bsl::ostream& errorDescription,
BSLS_ASSERT_OPT(oldConfig.has_value());
BSLS_ASSERT_OPT(d_config.has_value());

// In non-CSL mode, manually dispatch AppId registration callbacks.
if (!d_cluster_sp->isCSLModeEnabled() &&
d_config.value().mode().isFanoutValue()) {
// Compute list of added and removed App IDs.
bsl::unordered_set<bsl::string> oldCfgAppIds(
oldConfig.value().mode().fanout().appIDs().cbegin(),
oldConfig.value().mode().fanout().appIDs().cend(),
d_allocator_p);
bsl::unordered_set<bsl::string> newCfgAppIds(
d_config.value().mode().fanout().appIDs().cbegin(),
d_config.value().mode().fanout().appIDs().cend(),
d_allocator_p);

bsl::unordered_set<bsl::string> addedIds, removedIds;
mqbc::StorageUtil::loadAddedAndRemovedEntries(&addedIds,
&removedIds,
oldCfgAppIds,
newCfgAppIds);

bslmt::LockGuard<bslmt::Mutex> guard(&d_mutex);

// Invoke callbacks for each added and removed ID on each queue.
bsl::unordered_set<bsl::string>::const_iterator it =
addedIds.cbegin();
QueueMap::const_iterator qIt;
for (; it != addedIds.cend(); it++) {
for (qIt = d_queues.cbegin(); qIt != d_queues.cend(); ++qIt) {
d_dispatcher_p->execute(
bdlf::BindUtil::bind(afterAppIdRegisteredDispatched,
qIt->second.get(),
*it),
qIt->second.get());
}
}
for (it = removedIds.cbegin(); it != removedIds.cend(); ++it) {
for (qIt = d_queues.cbegin(); qIt != d_queues.cend(); ++qIt) {
d_dispatcher_p->execute(
bdlf::BindUtil::bind(afterAppIdUnregisteredDispatched,
qIt->second.get(),
*it),
qIt->second.get());
}
}
}

// Notify the 'cluster' of the updated configuration, so it can write
// any needed update-advisories to the CSL.
d_cluster_sp->onDomainReconfigured(*this,
Expand Down
2 changes: 1 addition & 1 deletion src/groups/mqb/mqbblp/mqbblp_localqueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ int LocalQueue::configure(bsl::ostream& errorDescription, bool isReconfigure)
d_allocator_p);
}

rc = d_queueEngine_mp->configure(errorDescription);
rc = d_queueEngine_mp->configure(errorDescription, isReconfigure);
if (rc != 0) {
return 10 * rc + rc_QUEUE_ENGINE_CFG_FAILURE; // RETURN
}
Expand Down
3 changes: 1 addition & 2 deletions src/groups/mqb/mqbblp/mqbblp_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ void Queue::convertToLocalDispatched()

d_state.setId(bmqp::QueueId::k_PRIMARY_QUEUE_ID);
createLocal();
rc = d_localQueue_mp->configure(errorDescription, true);
rc = d_localQueue_mp->configure(errorDescription, false);
if (rc != 0) {
BALL_LOG_ERROR
<< "#QUEUE_CONVERTION_FAILURE " << d_state.uri()
Expand Down Expand Up @@ -483,7 +483,6 @@ Queue::Queue(const bmqt::Uri& uri,
// storage.

d_state.setStorageManager(storageManager)
.setAppKeyGenerator(storageManager)
.setMiscWorkThreadPool(threadPool)
.setRoutingConfig(routingCfg)
.setMessageThrottleConfig(messageThrottleConfig);
Expand Down
34 changes: 12 additions & 22 deletions src/groups/mqb/mqbblp/mqbblp_queueconsumptionmonitor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -170,30 +170,26 @@ QueueConsumptionMonitor::setMaxIdleTime(bsls::Types::Int64 value)
return *this;
}

void QueueConsumptionMonitor::registerSubStream(const mqbu::StorageKey& key)
void QueueConsumptionMonitor::registerSubStream(const bsl::string& id)
{
// Should always be called from the queue thread, but will be invoked from
// the cluster thread once upon queue creation.

// PRECONDITIONS
BSLS_ASSERT_SAFE(key != mqbu::StorageKey::k_NULL_KEY ||
d_subStreamInfos.empty());
BSLS_ASSERT_SAFE(d_subStreamInfos.find(mqbu::StorageKey::k_NULL_KEY) ==
d_subStreamInfos.end());
BSLS_ASSERT_SAFE(d_subStreamInfos.find(key) == d_subStreamInfos.end());
BSLS_ASSERT_SAFE(d_subStreamInfos.find(id) == d_subStreamInfos.end());

d_subStreamInfos.insert(bsl::make_pair(key, SubStreamInfo()));
d_subStreamInfos.insert(bsl::make_pair(id, SubStreamInfo()));
}

void QueueConsumptionMonitor::unregisterSubStream(const mqbu::StorageKey& key)
void QueueConsumptionMonitor::unregisterSubStream(const bsl::string& id)
{
// executed by the *QUEUE DISPATCHER* thread

// PRECONDITIONS
BSLS_ASSERT_SAFE(d_queueState_p->queue()->dispatcher()->inDispatcherThread(
d_queueState_p->queue()));

SubStreamInfoMapConstIter iter = d_subStreamInfos.find(key);
SubStreamInfoMapConstIter iter = d_subStreamInfos.find(id);
BSLS_ASSERT_SAFE(iter != d_subStreamInfos.end());
d_subStreamInfos.erase(iter);
}
Expand Down Expand Up @@ -231,7 +227,7 @@ void QueueConsumptionMonitor::onTimer(bsls::Types::Int64 currentTimer)
iter != last;
++iter) {
SubStreamInfo& info = iter->second;
const mqbu::StorageKey& appKey = iter->first;
const bsl::string& id = iter->first;
if (info.d_messageSent) {
// Queue is 'alive' because at least one message was sent
// since the last 'timer'.
Expand All @@ -241,7 +237,7 @@ void QueueConsumptionMonitor::onTimer(bsls::Types::Int64 currentTimer)

if (info.d_state == State::e_IDLE) {
// object was in idle state
onTransitionToAlive(&info, appKey);
onTransitionToAlive(&info, id);
continue; // CONTINUE
}

Expand All @@ -253,7 +249,7 @@ void QueueConsumptionMonitor::onTimer(bsls::Types::Int64 currentTimer)
// No delivered messages in the last 'maxIdleTime'.

// Call callback to log alarm if there are undelivered messages.
const bool haveUndelivered = d_loggingCb(appKey,
const bool haveUndelivered = d_loggingCb(id,
info.d_state ==
State::e_ALIVE);

Expand All @@ -269,16 +265,15 @@ void QueueConsumptionMonitor::onTimer(bsls::Types::Int64 currentTimer)
// so transition to alive.
if (info.d_state == State::e_IDLE) {
info.d_lastKnownGoodTimer = d_currentTimer;
onTransitionToAlive(&info, appKey);
onTransitionToAlive(&info, id);
}
}
}
}
}

void QueueConsumptionMonitor::onTransitionToAlive(
SubStreamInfo* subStreamInfo,
const mqbu::StorageKey& appKey)
void QueueConsumptionMonitor::onTransitionToAlive(SubStreamInfo* subStreamInfo,
const bsl::string& id)
{
// executed by the *QUEUE DISPATCHER* thread

Expand All @@ -291,12 +286,7 @@ void QueueConsumptionMonitor::onTransitionToAlive(
bdlma::LocalSequentialAllocator<2048> localAllocator(0);

bmqt::UriBuilder uriBuilder(d_queueState_p->uri(), &localAllocator);
bsl::string appId;

if (!appKey.isNull() &&
d_queueState_p->storage()->hasVirtualStorage(appKey, &appId)) {
uriBuilder.setId(appId);
}
uriBuilder.setId(id);

bmqt::Uri uri(&localAllocator);
uriBuilder.uri(&uri);
Expand Down
Loading

0 comments on commit 759da3c

Please sign in to comment.