From 7ed32662bce0cb34d63f1a6e4ed489578cf2126d Mon Sep 17 00:00:00 2001 From: Evgeny Malygin Date: Mon, 4 Nov 2024 06:13:19 +0000 Subject: [PATCH] Perf[BMQ,MQB]: resource manager for per-thread resources Signed-off-by: Evgeny Malygin --- src/groups/bmq/bmqu/bmqu_resourcemanager.cpp | 25 +- src/groups/bmq/bmqu/bmqu_resourcemanager.h | 105 ++++-- .../bmq/bmqu/bmqu_resourcemanager.t.cpp | 302 +++++++++++++++++- src/groups/mqb/mqba/mqba_adminsession.cpp | 11 +- src/groups/mqb/mqba/mqba_adminsession.h | 2 - src/groups/mqb/mqba/mqba_application.cpp | 62 ++-- src/groups/mqb/mqba/mqba_application.h | 4 - src/groups/mqb/mqba/mqba_clientsession.cpp | 23 +- src/groups/mqb/mqba/mqba_clientsession.h | 4 - .../mqb/mqba/mqba_sessionnegotiator.cpp | 4 - src/groups/mqb/mqbblp/mqbblp_queuestate.cpp | 4 + src/groups/mqb/mqbblp/mqbblp_queuestate.h | 10 +- .../mqb/mqbblp/mqbblp_storagemanager.cpp | 4 +- src/groups/mqb/mqbc/mqbc_clusterdata.cpp | 22 +- src/groups/mqb/mqbc/mqbc_clusterdata.h | 22 -- src/groups/mqb/mqbc/mqbc_storagemanager.cpp | 19 +- src/groups/mqb/mqbc/mqbc_storageutil.cpp | 5 +- src/groups/mqb/mqbi/mqbi_cluster.h | 66 +--- src/groups/mqb/mqbmock/mqbmock_cluster.cpp | 2 +- .../mqb/mqbstat/mqbstat_statcontroller.cpp | 6 +- .../mqb/mqbstat/mqbstat_statcontroller.h | 1 - 21 files changed, 501 insertions(+), 202 deletions(-) diff --git a/src/groups/bmq/bmqu/bmqu_resourcemanager.cpp b/src/groups/bmq/bmqu/bmqu_resourcemanager.cpp index 7fe445011..3647de722 100644 --- a/src/groups/bmq/bmqu/bmqu_resourcemanager.cpp +++ b/src/groups/bmq/bmqu/bmqu_resourcemanager.cpp @@ -23,6 +23,22 @@ namespace bmqu { ResourceManager *ResourceManager::g_instance_p = 0; +ResourceManager::ThreadResources::~ThreadResources() +{ + for (size_t i = 0; i < d_resources.size(); i++) { + if (d_resources.at(i).numReferences() > 1) { + BALL_LOG_ERROR + << "Resource " << d_resources.at(i).numReferences() + << " references " + << "will not be freed, something holds onto this resource"; + } + } + + // Free resources in the reversed order of how they were registered + while (!d_resources.empty()) { + d_resources.pop_back(); + } +} ResourceManager::ResourceManager(bslma::Allocator* allocator) : d_allocator_p(allocator) @@ -40,11 +56,10 @@ ResourceManager::~ResourceManager() { << "will not be freed"; } } - for (size_t i = 0; i < d_resources.size(); i++) { - if (d_resources.at(i).numReferences() > 1) { - BALL_LOG_ERROR << "Resource " << d_resourceCreators.at(i).numReferences() << " references " - << "will not be freed, something holds onto this resource"; - } + + // Free resources in the reversed order of how they were registered + while (!d_resourceCreators.empty()) { + d_resourceCreators.pop_back(); } } diff --git a/src/groups/bmq/bmqu/bmqu_resourcemanager.h b/src/groups/bmq/bmqu/bmqu_resourcemanager.h index 6d0a0ec72..5022ed536 100644 --- a/src/groups/bmq/bmqu/bmqu_resourcemanager.h +++ b/src/groups/bmq/bmqu/bmqu_resourcemanager.h @@ -43,13 +43,14 @@ // TBD: // BDE -#include -#include -#include -#include -#include #include +#include +#include +#include +#include +#include #include +#include #include #include #include @@ -81,9 +82,26 @@ class ResourceManager { }; struct ThreadResources { - + // TRAITS + BSLMF_NESTED_TRAIT_DECLARATION(ThreadResources, + bslma::UsesBslmaAllocator) + + // DATA + bsl::vector > d_resources; + + explicit ThreadResources(bslma::Allocator* allocator) + : d_resources(allocator) + { + // NOTHING + } + + ~ThreadResources(); }; + typedef bsl::unordered_map > + ThreadIdToResourcesMap; + // CLASS DATA static ResourceManager *g_instance_p; @@ -93,22 +111,24 @@ class ResourceManager { bslmt::Mutex d_mutex; bsl::vector > d_resourceCreators; - bsl::vector > d_resources; + + ThreadIdToResourcesMap d_resources; private: // CREATORS explicit ResourceManager(bslma::Allocator* allocator = 0); - ~ResourceManager(); // PRIVATE CLASS METHODS static unsigned int nextTypeId(); public: - // TYPES + // TRAITS + BSLMF_NESTED_TRAIT_DECLARATION(ResourceManager, bslma::UsesBslmaAllocator) public: // CREATORS + ~ResourceManager(); // PUBLIC CLASS METHODS static void init(bslma::Allocator* allocator); @@ -124,16 +144,11 @@ class ResourceManager { size_t typeId = Traits::typeId(); - BALL_LOG_ERROR << typeId; - bslmt::LockGuard guard(&manager.d_mutex); if (manager.d_resourceCreators.size() <= typeId) { manager.d_resourceCreators.resize(typeId + 1); } - if (manager.d_resources.size() <= typeId) { - manager.d_resources.resize(typeId + 1); - } BSLS_ASSERT_OPT(!manager.d_resourceCreators.at(typeId)); typename Traits::factory_pointer_type creator_sp; @@ -150,21 +165,63 @@ class ResourceManager { typedef ResourceTraits Traits; - size_t typeId = Traits::typeId(); + const size_t typeId = Traits::typeId(); + bsls::Types::Uint64 threadId = bslmt::ThreadUtil::selfIdAsUint64(); + + ThreadResources* resources = NULL; + + { + bslmt::LockGuard guard(&manager.d_mutex); // LOCK + + ThreadIdToResourcesMap::iterator it = manager.d_resources.find( + threadId); + if (it == manager.d_resources.end()) { + bsl::pair res = + manager.d_resources.emplace( + threadId, + bsl::make_shared( + manager.d_allocator_p)); + resources = res.first->second.get(); + } + else { + resources = it->second.get(); + } + } + BSLS_ASSERT_SAFE(resources); + // Now we work with `resources` exclusively assigned to this thread. + // Can modify this object without synchronizations. - bslmt::LockGuard guard(&manager.d_mutex); - - if (manager.d_resources.at(typeId)) { - return bsl::reinterpret_pointer_cast(manager.d_resources.at(typeId)); + if (resources->d_resources.size() <= typeId) { + resources->d_resources.resize(typeId + 1); + } + + if (resources->d_resources.at(typeId)) { + return bsl::reinterpret_pointer_cast( + resources->d_resources.at(typeId)); // RETURN } - BSLS_ASSERT_OPT(manager.d_resourceCreators.at(typeId) && "Resource factory for the resource is not registered"); + typename Traits::factory_type* creator = NULL; + { + bslmt::LockGuard guard(&manager.d_mutex); // LOCK + + // Need to make this check under mutex, since `d_resourceCreators` + // might be modified at the same time. + BSLS_ASSERT_OPT( + manager.d_resourceCreators.at(typeId) && + "Resource factory for the resource is not registered"); + + creator = + bsl::reinterpret_pointer_cast( + manager.d_resourceCreators.at(typeId)) + .get(); + } + BSLS_ASSERT_SAFE(creator); - const typename Traits::factory_type &creator = - *bsl::reinterpret_pointer_cast(manager.d_resourceCreators.at(typeId)); + typename Traits::pointer_type resource = (*creator)( + manager.d_allocator_p); - typename Traits::pointer_type resource(creator(manager.d_allocator_p)); - manager.d_resources.at(typeId) = resource; + // Store as a `void` shared pointer in the common collection. + resources->d_resources.at(typeId) = resource; return resource; } diff --git a/src/groups/bmq/bmqu/bmqu_resourcemanager.t.cpp b/src/groups/bmq/bmqu/bmqu_resourcemanager.t.cpp index 9e5d347cf..bb5b2427b 100644 --- a/src/groups/bmq/bmqu/bmqu_resourcemanager.t.cpp +++ b/src/groups/bmq/bmqu/bmqu_resourcemanager.t.cpp @@ -16,8 +16,12 @@ // bmqu_resourcemanager.t.cpp -*-C++-*- #include +// BMQ +#include + // BDE #include +#include #include #include #include @@ -37,7 +41,60 @@ using namespace bsl; // ---------------------------------------------------------------------------- namespace { +static bool startsWith(bslstl::StringRef str, bslstl::StringRef substr) +{ + return str.find(substr, 0) == 0; +} + struct Tester { + static void compare(const bsl::vector& expected, + const bsl::vector& observed) + { + if (expected.size() != observed.size()) { + bmqu::MemOutStream reason(s_allocator_p); + reason << "expected and observed event vectors have different" + << " sizes: " << expected.size() + << " != " << observed.size(); + fail(reason.str(), expected, observed); + } + for (size_t i = 0; i < observed.size(); i++) { + if (!startsWith(observed.at(i), expected.at(i))) { + bmqu::MemOutStream reason(s_allocator_p); + reason << "observed event [" << i << ", '" << observed.at(i) + << "'] doesn't start " + << "with prefix '" << expected.at(i) << "'"; + fail(reason.str(), expected, observed); + } + } + } + + static void fail(const bsl::string& reason, + const bsl::vector& expected, + const bsl::vector& observed) + { + bmqu::MemOutStream errorMessage(s_allocator_p); + errorMessage << "Failed events check: " << reason << bsl::endl + << "\texpected: ["; + for (size_t i = 0; i < expected.size(); i++) { + if (i > 0) { + errorMessage << ", "; + } + errorMessage << "'" << expected.at(i) << "'"; + } + errorMessage << "]" << bsl::endl; + errorMessage << "\tobserved: ["; + for (size_t i = 0; i < observed.size(); i++) { + if (i > 0) { + errorMessage << ", "; + } + errorMessage << "'" << observed.at(i) << "'"; + } + errorMessage << "]" << bsl::endl; + + throw bsl::runtime_error(errorMessage.str()); + ASSERT_D(errorMessage.str(), false); + } + bslmt::Mutex d_mutex; bsl::vector d_events; @@ -67,6 +124,41 @@ struct Tester { bslmt::LockGuard guard(&tester->d_mutex); return bsl::vector(tester->d_events, s_allocator_p); } + + static void expectEmpty() + { + bsl::vector observedEvents = getEvents(); + bsl::vector expectedEvents(s_allocator_p); + compare(expectedEvents, observedEvents); + } + + static void expect(bslstl::StringRef expected1) + { + bsl::vector observedEvents = getEvents(); + bsl::vector expectedEvents(s_allocator_p); + expectedEvents.push_back(expected1); + compare(expectedEvents, observedEvents); + } + + static void expect(bslstl::StringRef expected1, + bslstl::StringRef expected2) + { + bsl::vector observedEvents = getEvents(); + bsl::vector expectedEvents(s_allocator_p); + expectedEvents.push_back(expected1); + expectedEvents.push_back(expected2); + compare(expectedEvents, observedEvents); + } + + static void clear() + { + Tester* tester = inst(); + + bslmt::LockGuard guard(&tester->d_mutex); + tester->d_events.clear(); + // allocator check happiness: + tester->d_events.shrink_to_fit(); + } }; struct ResourceA { @@ -74,11 +166,11 @@ struct ResourceA { ResourceA(bsls::Types::Uint64 index): d_index(index) { - Tester::logEvent("ResourceA" + bsl::to_string(d_index)); + Tester::logEvent("ResourceA: " + bsl::to_string(d_index)); } ~ResourceA() { - Tester::logEvent("~ResourceA" + bsl::to_string(d_index)); + Tester::logEvent("~ResourceA: " + bsl::to_string(d_index)); } }; @@ -90,30 +182,220 @@ static bsl::shared_ptr createResourceA(bslma::Allocator *allocator) { return res; } +struct ResourceB { + ResourceA& d_dependency; + + ResourceB(ResourceA& dependency) + : d_dependency(dependency) + { + Tester::logEvent("ResourceB: " + bsl::to_string(d_dependency.d_index)); + } + + ~ResourceB() + { + Tester::logEvent("~ResourceB: " + + bsl::to_string(d_dependency.d_index)); + } +}; + +static bsl::shared_ptr createResourceB(bslma::Allocator* allocator) +{ + bsl::shared_ptr dependency = + bmqu::ResourceManager::getResource(); + ASSERT(dependency); + + bsl::shared_ptr res; + res.createInplace(allocator, *dependency); + return res; +} + +void threadFunction_test3(bslmt::Barrier* barrier, + bsl::shared_ptr* res) +{ + barrier->wait(); + + static const size_t k_ITERS = 1000; + + for (size_t i = 0; i < k_ITERS; i++) { + bsl::shared_ptr sp = + bmqu::ResourceManager::getResource(); + if (i > 0) { + // Getting the same resource every time + ASSERT_EQ(sp.get(), res->get()); + } + *res = sp; + } +} + } // close unnamed namespace // ============================================================================ // TESTS // ---------------------------------------------------------------------------- -static void test1_breathingTest() +static void test1_oneResourceOneThread() { - bmqtst::TestHelper::printTestName("BREATHING TEST"); + bmqtst::TestHelper::printTestName("ONE RESOURCE ONE THREAD"); bmqu::ResourceManager::init(s_allocator_p); + // expect no resource creation events yet + Tester::expectEmpty(); bmqu::ResourceManager::registerResourceFactory(createResourceA); + // expect no resource creation events yet + Tester::expectEmpty(); + + bsl::shared_ptr res1 = + bmqu::ResourceManager::getResource(); + // expect only one resource creation event + Tester::expect("ResourceA"); + + bsl::shared_ptr res2 = + bmqu::ResourceManager::getResource(); + // expect the same resource + ASSERT_EQ(res1.get(), res2.get()); + // expect no events other than the initial resource creation + Tester::expect("ResourceA"); + + res1.reset(); + res2.reset(); + // expect no events other than the initial resource creation, and + // no destructor event yet despite all pointers gathered before reset + Tester::expect("ResourceA"); + + bsl::shared_ptr res3 = + bmqu::ResourceManager::getResource(); + // expect no events other than the initial resource creation, and + // no extra creation when we re-acquired the resource + Tester::expect("ResourceA"); + + res3.reset(); + bmqu::ResourceManager::deinit(); + // expect a new destructor event after we released all references to + // the resource and deinitialized the resource manager + Tester::expect("ResourceA", "~ResourceA"); + + // clean the events in Tester for simplicity + Tester::clear(); + // should be able to reinitialize the resource manager + bmqu::ResourceManager::init(s_allocator_p); + // expect no resource creation events yet + Tester::expectEmpty(); + + bmqu::ResourceManager::registerResourceFactory(createResourceA); + // expect no resource creation events yet + Tester::expectEmpty(); + + bsl::shared_ptr res4 = + bmqu::ResourceManager::getResource(); + // expect only one resource creation event + Tester::expect("ResourceA"); + + res4.clear(); + bmqu::ResourceManager::deinit(); + // expect a new destructor event after we released all references to + // the resource and deinitialized the resource manager + Tester::expect("ResourceA", "~ResourceA"); + + Tester::clear(); +} + +static void test2_manyResourcesOneThread() +{ + bmqtst::TestHelper::printTestName("MANY RESOURCES ONE THREAD"); + + bmqu::ResourceManager::init(s_allocator_p); + + bmqu::ResourceManager::registerResourceFactory(createResourceA); + bmqu::ResourceManager::registerResourceFactory(createResourceB); + // expect no resource creation events yet + Tester::expectEmpty(); + + bsl::shared_ptr res1 = + bmqu::ResourceManager::getResource(); + // expect ResourceA being built before ResourceB, since we called ResourceA + // from ResourceB factory just before building ResourceB + Tester::expect("ResourceA", "ResourceB"); + + // clean the events in Tester for simplicity + Tester::clear(); + + res1.clear(); + bmqu::ResourceManager::deinit(); + // expect destructors being called in the opposite order of how we register + // resource factories, so ResourceB will be released before ResourceA + Tester::expect("~ResourceB", "~ResourceA"); + + Tester::clear(); +} + +static void test3_oneResourceManyThreads() +{ + bmqtst::TestHelper::printTestName("ONE RESOURCE MANY THREADS"); + + static const size_t k_NUM_THREADS = 16; + + bmqu::ResourceManager::init(s_allocator_p); + + bmqu::ResourceManager::registerResourceFactory(createResourceA); + // expect no resource creation events yet + Tester::expectEmpty(); + + // Barrier to get each thread to start at the same time; `+1` for this + // (main) thread. + bslmt::Barrier barrier(k_NUM_THREADS + 1); + + bsl::vector > resources(k_NUM_THREADS, + s_allocator_p); { - bsl::shared_ptr res = bmqu::ResourceManager::getResource(); + bslmt::ThreadGroup threadGroup(s_allocator_p); + for (size_t i = 0; i < k_NUM_THREADS; ++i) { + int rc = threadGroup.addThread( + bdlf::BindUtil::bind(&threadFunction_test3, + &barrier, + &resources.at(i))); + ASSERT_EQ_D(i, rc, 0); + } + barrier.wait(); + threadGroup.joinAll(); + } + // threads are joined and threadGroup is gone + + for (size_t i = 0; i < resources.size(); i++) { + for (size_t j = i + 1; j < resources.size(); j++) { + // Each thread acquired resource exclusively built for it + ASSERT_NE(resources.at(i).get(), resources.at(j).get()); + } } + { + bsl::vector events = Tester::getEvents(); + ASSERT_EQ(events.size(), k_NUM_THREADS); + for (size_t i = 0; i < events.size(); i++) { + startsWith(events.at(i), "ResourceA"); + } + } + + // clean the events in Tester for simplicity + Tester::clear(); + + resources.clear(); + // expect no more events + Tester::expectEmpty(); + bmqu::ResourceManager::deinit(); - bsl::vector events = Tester::getEvents(); - for (const bsl::string &ev : events) { - bsl::cout << ev << bsl::endl; + // expect destructors for all resources allocated per each thread + { + bsl::vector events = Tester::getEvents(); + ASSERT_EQ(events.size(), k_NUM_THREADS); + for (size_t i = 0; i < events.size(); i++) { + startsWith(events.at(i), "~ResourceA"); + } } + + Tester::clear(); } // ============================================================================ @@ -126,7 +408,9 @@ int main(int argc, char* argv[]) switch (_testCase) { case 0: - case 1: test1_breathingTest(); break; + case 3: test3_oneResourceManyThreads(); break; + case 2: test2_manyResourcesOneThread(); break; + case 1: test1_oneResourceOneThread(); break; default: { cerr << "WARNING: CASE '" << _testCase << "' NOT FOUND." << endl; s_testStatus = -1; diff --git a/src/groups/mqb/mqba/mqba_adminsession.cpp b/src/groups/mqb/mqba/mqba_adminsession.cpp index 2bf5caea2..7c20fcc03 100644 --- a/src/groups/mqb/mqba/mqba_adminsession.cpp +++ b/src/groups/mqb/mqba/mqba_adminsession.cpp @@ -48,13 +48,13 @@ // // BMQ +#include #include #include #include - -#include #include #include +#include #include // BDE @@ -273,8 +273,6 @@ AdminSession::AdminSession( const bmqp_ctrlmsg::NegotiationMessage& negotiationMessage, const bsl::string& sessionDescription, mqbi::Dispatcher* dispatcher, - AdminSessionState::BlobSpPool* blobSpPool, - bdlbb::BlobBufferFactory* bufferFactory, bdlmt::EventScheduler* scheduler, const mqbnet::Session::AdminCommandEnqueueCb& adminCb, bslma::Allocator* allocator) @@ -284,8 +282,9 @@ AdminSession::AdminSession( , d_clientIdentity_p(extractClientIdentity(d_negotiationMessage)) , d_description(sessionDescription, allocator) , d_channel_sp(channel) -, d_state(blobSpPool, - bufferFactory, +, d_state(bmqu::ResourceManager::getResource() + .get(), + bmqu::ResourceManager::getResource().get(), bmqp::SchemaEventBuilderUtil::bestEncodingSupported( d_clientIdentity_p->features()), allocator) diff --git a/src/groups/mqb/mqba/mqba_adminsession.h b/src/groups/mqb/mqba/mqba_adminsession.h index 135ed3db0..f17409aac 100644 --- a/src/groups/mqb/mqba/mqba_adminsession.h +++ b/src/groups/mqb/mqba/mqba_adminsession.h @@ -242,8 +242,6 @@ class AdminSession : public mqbnet::Session, public mqbi::DispatcherClient { const bmqp_ctrlmsg::NegotiationMessage& negotiationMessage, const bsl::string& sessionDescription, mqbi::Dispatcher* dispatcher, - AdminSessionState::BlobSpPool* blobSpPool, - bdlbb::BlobBufferFactory* bufferFactory, bdlmt::EventScheduler* scheduler, const mqbnet::Session::AdminCommandEnqueueCb& adminEnqueueCb, bslma::Allocator* allocator); diff --git a/src/groups/mqb/mqba/mqba_application.cpp b/src/groups/mqb/mqba/mqba_application.cpp index dee58e76e..082835242 100644 --- a/src/groups/mqb/mqba/mqba_application.cpp +++ b/src/groups/mqb/mqba/mqba_application.cpp @@ -48,6 +48,7 @@ #include #include #include +#include // BDE #include @@ -88,6 +89,37 @@ void createBlob(bdlbb::BlobBufferFactory* bufferFactory, new (arena) bdlbb::Blob(bufferFactory, allocator); } +typedef bdlcc::SharedObjectPool< + bdlbb::Blob, + bdlcc::ObjectPoolFunctors::DefaultCreator, + bdlcc::ObjectPoolFunctors::RemoveAll > + BlobSpPool; + +bsl::shared_ptr +createBufferFactory(bslma::Allocator* allocator) +{ + return bsl::shared_ptr( + new bdlbb::PooledBlobBufferFactory(k_BLOBBUFFER_SIZE, + bsls::BlockGrowth::BSLS_CONSTANT, + allocator), + allocator); +} + +bsl::shared_ptr createBlobSpPool(bslma::Allocator* allocator) +{ + bsl::shared_ptr bufferFactory_sp = + bmqu::ResourceManager::getResource(); + return bsl::shared_ptr( + new BlobSpPool( + bdlf::BindUtil::bind(&createBlob, + bufferFactory_sp.get(), + bdlf::PlaceHolders::_1, // arena + bdlf::PlaceHolders::_2), // allocator + k_BLOB_POOL_GROWTH_STRATEGY, + allocator), + allocator); +} + } // close unnamed namespace // ----------- @@ -149,16 +181,6 @@ Application::Application(bdlmt::EventScheduler* scheduler, 1, bsls::TimeInterval(120).totalMilliseconds(), allocator) -, d_bufferFactory(k_BLOBBUFFER_SIZE, - bsls::BlockGrowth::BSLS_CONSTANT, - d_allocators.get("BufferFactory")) - -, d_blobSpPool(bdlf::BindUtil::bind(&createBlob, - &d_bufferFactory, - bdlf::PlaceHolders::_1, // arena - bdlf::PlaceHolders::_2), // allocator - k_BLOB_POOL_GROWTH_STRATEGY, - d_allocators.get("BlobSpPool")) , d_pushElementsPool(sizeof(mqbblp::PushStream::Element), d_allocators.get("PushElementsPool")) , d_allocatorsStatContext_p(allocatorsStatContext) @@ -257,6 +279,12 @@ int Application::start(bsl::ostream& errorDescription) int rc = rc_SUCCESS; + bmqu::ResourceManager::init(d_allocators.get("ResourceManager")); + bmqu::ResourceManager::registerResourceFactory( + createBufferFactory); + bmqu::ResourceManager::registerResourceFactory( + createBlobSpPool); + // Start the PluginManager { d_pluginManager_mp.load(new (*d_allocator_p) @@ -269,10 +297,7 @@ int Application::start(bsl::ostream& errorDescription) } } - mqbi::ClusterResources resources(d_scheduler_p, - &d_bufferFactory, - &d_blobSpPool, - &d_pushElementsPool); + mqbi::ClusterResources resources(d_scheduler_p, &d_pushElementsPool); // Start the StatController d_statController_mp.load( @@ -284,7 +309,6 @@ int Application::start(bsl::ostream& errorDescription) bdlf::PlaceHolders::_3, // os false), // fromReroute d_pluginManager_mp.get(), - &d_bufferFactory, d_allocatorsStatContext_p, d_scheduler_p, d_allocators.get("StatController")), @@ -316,10 +340,8 @@ int Application::start(bsl::ostream& errorDescription) // Start the transport manager SessionNegotiator* sessionNegotiator = new (*d_allocator_p) - SessionNegotiator(&d_bufferFactory, - d_dispatcher_mp.get(), + SessionNegotiator(d_dispatcher_mp.get(), d_statController_mp->clientsStatContext(), - &d_blobSpPool, d_scheduler_p, d_allocators.get("SessionNegotiator")); @@ -337,7 +359,6 @@ int Application::start(bsl::ostream& errorDescription) d_transportManager_mp.load(new (*d_allocator_p) mqbnet::TransportManager( d_scheduler_p, - &d_bufferFactory, negotiatorMp, d_statController_mp.get(), d_allocators.get("TransportManager")), @@ -390,7 +411,6 @@ int Application::start(bsl::ostream& errorDescription) // Start the DomainManager d_domainManager_mp.load(new (*d_allocator_p) DomainManager( d_configProvider_mp.get(), - &d_bufferFactory, d_clusterCatalog_mp.get(), d_dispatcher_mp.get(), d_statController_mp->domainsStatContext(), @@ -530,6 +550,8 @@ void Application::stop() DESTROY_OBJ(d_statController_mp, "StatController"); DESTROY_OBJ(d_pluginManager_mp, "PluginManager"); + bmqu::ResourceManager::deinit(); + BALL_LOG_INFO << "BMQbrkr stopped"; #undef DESTROY_OBJ diff --git a/src/groups/mqb/mqba/mqba_application.h b/src/groups/mqb/mqba/mqba_application.h index 2e490e015..5adf1ef0b 100644 --- a/src/groups/mqb/mqba/mqba_application.h +++ b/src/groups/mqb/mqba/mqba_application.h @@ -131,10 +131,6 @@ class Application { // the calling thread is blocked ("deadlock"). // Note that rerouted commands never route again. - bdlbb::PooledBlobBufferFactory d_bufferFactory; - - BlobSpPool d_blobSpPool; - bdlma::ConcurrentPool d_pushElementsPool; bmqst::StatContext* d_allocatorsStatContext_p; diff --git a/src/groups/mqb/mqba/mqba_clientsession.cpp b/src/groups/mqb/mqba/mqba_clientsession.cpp index 0d1d5674c..643a606f9 100644 --- a/src/groups/mqb/mqba/mqba_clientsession.cpp +++ b/src/groups/mqb/mqba/mqba_clientsession.cpp @@ -158,6 +158,7 @@ #include // BMQ +#include #include #include #include @@ -168,15 +169,14 @@ #include #include #include +#include #include #include #include - -#include -#include #include #include #include +#include #include // BDE @@ -295,8 +295,6 @@ void finalizeClosedHandle(bsl::string description, ClientSessionState::ClientSessionState( bslma::ManagedPtr& clientStatContext, - BlobSpPool* blobSpPool, - bdlbb::BlobBufferFactory* bufferFactory, bmqp::EncodingType::Enum encodingType, bslma::Allocator* allocator) : d_allocator_p(allocator) @@ -304,11 +302,12 @@ ClientSessionState::ClientSessionState( , d_unackedMessageInfos(d_allocator_p) , d_dispatcherClientData() , d_statContext_mp(clientStatContext) -, d_bufferFactory_p(bufferFactory) -, d_blobSpPool_p(blobSpPool) -, d_schemaEventBuilder(bufferFactory, allocator, encodingType) -, d_pushBuilder(bufferFactory, allocator) -, d_ackBuilder(bufferFactory, allocator) +, d_bufferFactory_p( + bmqu::ResourceManager::getResource().get()) +, d_blobSpPool_p(bmqu::ResourceManager::getResource().get()) +, d_schemaEventBuilder(d_bufferFactory_p, allocator, encodingType) +, d_pushBuilder(d_bufferFactory_p, allocator) +, d_ackBuilder(d_bufferFactory_p, allocator) , d_throttledFailedAckMessages() , d_throttledFailedPutMessages() { @@ -2593,8 +2592,6 @@ ClientSession::ClientSession( mqbblp::ClusterCatalog* clusterCatalog, mqbi::DomainFactory* domainFactory, bslma::ManagedPtr& clientStatContext, - ClientSessionState::BlobSpPool* blobSpPool, - bdlbb::BlobBufferFactory* bufferFactory, bdlmt::EventScheduler* scheduler, bslma::Allocator* allocator) : d_self(this) // use default allocator @@ -2606,8 +2603,6 @@ ClientSession::ClientSession( , d_description(sessionDescription, allocator) , d_channel_sp(channel) , d_state(clientStatContext, - blobSpPool, - bufferFactory, bmqp::SchemaEventBuilderUtil::bestEncodingSupported( d_clientIdentity_p->features()), allocator) diff --git a/src/groups/mqb/mqba/mqba_clientsession.h b/src/groups/mqb/mqba/mqba_clientsession.h index 47c9e7ada..f5dfdfba1 100644 --- a/src/groups/mqb/mqba/mqba_clientsession.h +++ b/src/groups/mqb/mqba/mqba_clientsession.h @@ -238,8 +238,6 @@ struct ClientSessionState { /// specified `allocator`. ClientSessionState( bslma::ManagedPtr& clientStatContext, - BlobSpPool* blobSpPool, - bdlbb::BlobBufferFactory* bufferFactory, bmqp::EncodingType::Enum encodingType, bslma::Allocator* allocator); }; @@ -655,8 +653,6 @@ class ClientSession : public mqbnet::Session, mqbblp::ClusterCatalog* clusterCatalog, mqbi::DomainFactory* domainFactory, bslma::ManagedPtr& clientStatContext, - ClientSessionState::BlobSpPool* blobSpPool, - bdlbb::BlobBufferFactory* bufferFactory, bdlmt::EventScheduler* scheduler, bslma::Allocator* allocator); diff --git a/src/groups/mqb/mqba/mqba_sessionnegotiator.cpp b/src/groups/mqb/mqba/mqba_sessionnegotiator.cpp index f6a54203c..72bb2ef0c 100644 --- a/src/groups/mqb/mqba/mqba_sessionnegotiator.cpp +++ b/src/groups/mqb/mqba/mqba_sessionnegotiator.cpp @@ -735,8 +735,6 @@ void SessionNegotiator::createSession(bsl::ostream& errorDescription, negoMsg, description, d_dispatcher_p, - d_blobSpPool_p, - d_bufferFactory_p, d_scheduler_p, d_adminCb, d_allocator_p); @@ -758,8 +756,6 @@ void SessionNegotiator::createSession(bsl::ostream& errorDescription, d_clusterCatalog_p, d_domainFactory_p, statContext, - d_blobSpPool_p, - d_bufferFactory_p, d_scheduler_p, d_allocator_p); diff --git a/src/groups/mqb/mqbblp/mqbblp_queuestate.cpp b/src/groups/mqb/mqbblp/mqbblp_queuestate.cpp index b37129837..808d2ea35 100644 --- a/src/groups/mqb/mqbblp/mqbblp_queuestate.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_queuestate.cpp @@ -31,6 +31,7 @@ #include #include +#include // BDE #include @@ -53,6 +54,9 @@ QueueState::QueueState(mqbi::Queue* queue, const mqbi::ClusterResources resources, bslma::Allocator* allocator) : d_queue_p(queue) +, d_bufferFactory_p( + bmqu::ResourceManager::getResource().get()) +, d_blobSpPool_p(bmqu::ResourceManager::getResource().get()) , d_uri(uri, allocator) , d_description(allocator) , d_id(id) diff --git a/src/groups/mqb/mqbblp/mqbblp_queuestate.h b/src/groups/mqb/mqbblp/mqbblp_queuestate.h index 1d2468d4c..c641b231c 100644 --- a/src/groups/mqb/mqbblp/mqbblp_queuestate.h +++ b/src/groups/mqb/mqbblp/mqbblp_queuestate.h @@ -111,6 +111,10 @@ class QueueState { mqbi::Queue* d_queue_p; // The queue associated to this state. + bdlbb::BlobBufferFactory* d_bufferFactory_p; + + BlobSpPool* d_blobSpPool_p; + bmqt::Uri d_uri; // The URI of the queue associated to // this state. @@ -289,9 +293,7 @@ class QueueState { bmqp_ctrlmsg::StreamParameters* value, unsigned int subQueueId = bmqp::QueueId::k_DEFAULT_SUBQUEUE_ID) const; - bdlbb::BlobBufferFactory* blobBufferFactory() const; bdlmt::EventScheduler* scheduler() const; - mqbi::ClusterResources::BlobSpPool* blobSpPool() const; const bsl::optional& pushElementsPool() const; bdlmt::FixedThreadPool* miscWorkThreadPool() const; const bsl::string& description() const; @@ -511,7 +513,7 @@ inline Routers::QueueRoutingContext& QueueState::routingContext() // ACCESSORS inline bdlbb::BlobBufferFactory* QueueState::blobBufferFactory() const { - return d_resources.bufferFactory(); + return d_bufferFactory_p; } inline bdlmt::EventScheduler* QueueState::scheduler() const @@ -521,7 +523,7 @@ inline bdlmt::EventScheduler* QueueState::scheduler() const inline mqbi::ClusterResources::BlobSpPool* QueueState::blobSpPool() const { - return d_resources.blobSpPool(); + return d_blobSpPool_p; } inline const bsl::optional& diff --git a/src/groups/mqb/mqbblp/mqbblp_storagemanager.cpp b/src/groups/mqb/mqbblp/mqbblp_storagemanager.cpp index e319f48f3..ebbd540bc 100644 --- a/src/groups/mqb/mqbblp/mqbblp_storagemanager.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_storagemanager.cpp @@ -37,6 +37,7 @@ #include #include #include +#include #include #include @@ -986,7 +987,8 @@ StorageManager::StorageManager( , d_lowDiskspaceWarning(false) , d_unrecognizedDomainsLock() , d_unrecognizedDomains(allocator) -, d_blobSpPool_p(&clusterData->blobSpPool()) +, d_blobSpPool_p( + bmqu::ResourceManager::getResource().get()) , d_domainFactory_p(domainFactory) , d_dispatcher_p(dispatcher) , d_clusterConfig(clusterConfig) diff --git a/src/groups/mqb/mqbc/mqbc_clusterdata.cpp b/src/groups/mqb/mqbc/mqbc_clusterdata.cpp index 0b8c36956..dcc5677f4 100644 --- a/src/groups/mqb/mqbc/mqbc_clusterdata.cpp +++ b/src/groups/mqb/mqbc/mqbc_clusterdata.cpp @@ -25,8 +25,8 @@ #include #include #include - #include +#include // BDE #include @@ -117,15 +117,17 @@ ClusterData::ClusterData( cluster->isRemote(), allocator)) , d_cluster_p(cluster) -, d_messageTransmitter(resources.bufferFactory(), - cluster, - transportManager, - allocator) -, d_requestManager(bmqp::EventType::e_CONTROL, - resources.bufferFactory(), - resources.scheduler(), - false, // lateResponseMode - allocator) +, d_messageTransmitter( + bmqu::ResourceManager::getResource().get(), + cluster, + transportManager, + allocator) +, d_requestManager( + bmqp::EventType::e_CONTROL, + bmqu::ResourceManager::getResource().get(), + resources.scheduler(), + false, // lateResponseMode + allocator) , d_multiRequestManager(&d_requestManager, allocator) , d_domainFactory_p(domainFactory) , d_transportManager_p(transportManager) diff --git a/src/groups/mqb/mqbc/mqbc_clusterdata.h b/src/groups/mqb/mqbc/mqbc_clusterdata.h index 6d7a07571..3ae93a4d7 100644 --- a/src/groups/mqb/mqbc/mqbc_clusterdata.h +++ b/src/groups/mqb/mqbc/mqbc_clusterdata.h @@ -118,12 +118,6 @@ class ClusterData { // TYPES /// Pool of shared pointers to Blobs - typedef bdlcc::SharedObjectPool< - bdlbb::Blob, - bdlcc::ObjectPoolFunctors::DefaultCreator, - bdlcc::ObjectPoolFunctors::RemoveAll > - BlobSpPool; - typedef bdlcc::SharedObjectPool< bmqu::AtomicState, bdlcc::ObjectPoolFunctors::DefaultCreator, @@ -237,12 +231,6 @@ class ClusterData { /// Get a modifiable reference to this object's event scheduler. bdlmt::EventScheduler& scheduler(); - /// Get a modifiable reference to this object's buffer factory. - bdlbb::BlobBufferFactory& bufferFactory(); - - /// Get a modifiable reference to this object's blobSpPool. - BlobSpPool& blobSpPool(); - /// Get a modifiable reference to this object's dispatcherClientData. mqbi::DispatcherClientData& dispatcherClientData(); @@ -349,16 +337,6 @@ inline bdlmt::EventScheduler& ClusterData::scheduler() return *d_resources.scheduler(); } -inline bdlbb::BlobBufferFactory& ClusterData::bufferFactory() -{ - return *d_resources.bufferFactory(); -} - -inline ClusterData::BlobSpPool& ClusterData::blobSpPool() -{ - return *d_resources.blobSpPool(); -} - inline mqbi::DispatcherClientData& ClusterData::dispatcherClientData() { return d_dispatcherClientData; diff --git a/src/groups/mqb/mqbc/mqbc_storagemanager.cpp b/src/groups/mqb/mqbc/mqbc_storagemanager.cpp index d472aa809..3bc221451 100644 --- a/src/groups/mqb/mqbc/mqbc_storagemanager.cpp +++ b/src/groups/mqb/mqbc/mqbc_storagemanager.cpp @@ -35,6 +35,7 @@ #include #include #include +#include // BDE #include @@ -3344,7 +3345,7 @@ StorageManager::StorageManager( , d_lowDiskspaceWarning(false) , d_unrecognizedDomainsLock() , d_unrecognizedDomains(allocator) -, d_blobSpPool_p(&clusterData->blobSpPool()) +, d_blobSpPool_p(bmqu::ResourceManager::getResource().get()) , d_domainFactory_p(domainFactory) , d_dispatcher_p(dispatcher) , d_cluster_p(cluster) @@ -3532,13 +3533,15 @@ int StorageManager::start(bsl::ostream& errorDescription) bslma::Allocator* recoveryManagerAllocator = d_allocators.get( "RecoveryManager"); - d_recoveryManager_mp.load(new (*recoveryManagerAllocator) RecoveryManager( - &d_clusterData_p->bufferFactory(), - d_clusterConfig, - *d_clusterData_p, - dsCfg, - recoveryManagerAllocator), - recoveryManagerAllocator); + d_recoveryManager_mp.load( + new (*recoveryManagerAllocator) RecoveryManager( + bmqu::ResourceManager::getResource() + .get(), + d_clusterConfig, + *d_clusterData_p, + dsCfg, + recoveryManagerAllocator), + recoveryManagerAllocator); rc = d_recoveryManager_mp->start(errorDescription); diff --git a/src/groups/mqb/mqbc/mqbc_storageutil.cpp b/src/groups/mqb/mqbc/mqbc_storageutil.cpp index 3bae89492..af6fbe6e2 100644 --- a/src/groups/mqb/mqbc/mqbc_storageutil.cpp +++ b/src/groups/mqb/mqbc/mqbc_storageutil.cpp @@ -36,6 +36,7 @@ #include #include #include +#include #include // BDE @@ -1171,7 +1172,9 @@ int StorageUtil::assignPartitionDispatcherThreads( int processorId = i % numProcessors; mqbs::DataStoreConfig dsCfg; dsCfg.setScheduler(&clusterData->scheduler()) - .setBufferFactory(&clusterData->bufferFactory()) + .setBufferFactory( + bmqu::ResourceManager::getResource() + .get()) .setPreallocate(config.preallocate()) .setPrefaultPages(config.prefaultPages()) .setLocation(config.location()) diff --git a/src/groups/mqb/mqbi/mqbi_cluster.h b/src/groups/mqb/mqbi/mqbi_cluster.h index fc0a44a85..b36e5cb82 100644 --- a/src/groups/mqb/mqbi/mqbi_cluster.h +++ b/src/groups/mqb/mqbi/mqbi_cluster.h @@ -462,15 +462,6 @@ class Cluster : public DispatcherClient { struct ClusterResources { // Resources to use for all queues in all clusters - public: - // TYPES - - /// Pool of shared pointers to Blobs - typedef bdlcc::SharedObjectPool< - bdlbb::Blob, - bdlcc::ObjectPoolFunctors::DefaultCreator, - bdlcc::ObjectPoolFunctors::RemoveAll > - BlobSpPool; private: // PRIVATE DATA @@ -478,27 +469,16 @@ struct ClusterResources { /// EventScheduler to use bdlmt::EventScheduler* d_scheduler_p; - /// Blob buffer factory to use - bdlbb::BlobBufferFactory* d_bufferFactory_p; - - /// Pool of shared pointers to blob to - /// use. - BlobSpPool* d_blobSpPool_p; - /// Pool of PushStream elements for Proxy/Replica QueueEngine. bsl::optional d_pushElementsPool; public: // CREATORS - explicit ClusterResources(bdlmt::EventScheduler* scheduler, - bdlbb::BlobBufferFactory* bufferFactory, - BlobSpPool* blobSpPool); + explicit ClusterResources(bdlmt::EventScheduler* scheduler); - explicit ClusterResources(bdlmt::EventScheduler* scheduler, - bdlbb::BlobBufferFactory* bufferFactory, - BlobSpPool* blobSpPool, - bdlma::ConcurrentPool* pushElementsPool); + explicit ClusterResources(bdlmt::EventScheduler* scheduler, + bdlma::ConcurrentPool* pushElementsPool); ClusterResources(const ClusterResources& copy); @@ -507,12 +487,6 @@ struct ClusterResources { /// Returns a pointer to the event scheduler bdlmt::EventScheduler* scheduler() const; - /// Returns a pointer to the blob buffer factory - bdlbb::BlobBufferFactory* bufferFactory() const; - - /// Returns a pointer to the shared blob objects pool - BlobSpPool* blobSpPool() const; - /// Returns a pointer to the concurrent pool for Push elements const bsl::optional& pushElementsPool() const; }; @@ -531,40 +505,25 @@ inline bool Cluster::isFSMWorkflow() const return false; } -inline ClusterResources::ClusterResources( - bdlmt::EventScheduler* scheduler, - bdlbb::BlobBufferFactory* bufferFactory, - BlobSpPool* blobSpPool) +inline ClusterResources::ClusterResources(bdlmt::EventScheduler* scheduler) : d_scheduler_p(scheduler) -, d_bufferFactory_p(bufferFactory) -, d_blobSpPool_p(blobSpPool) , d_pushElementsPool() { BSLS_ASSERT_SAFE(d_scheduler_p); - BSLS_ASSERT_SAFE(d_bufferFactory_p); - BSLS_ASSERT_SAFE(d_blobSpPool_p); } inline ClusterResources::ClusterResources( - bdlmt::EventScheduler* scheduler, - bdlbb::BlobBufferFactory* bufferFactory, - BlobSpPool* blobSpPool, - bdlma::ConcurrentPool* pushElementsPool) + bdlmt::EventScheduler* scheduler, + bdlma::ConcurrentPool* pushElementsPool) : d_scheduler_p(scheduler) -, d_bufferFactory_p(bufferFactory) -, d_blobSpPool_p(blobSpPool) , d_pushElementsPool(pushElementsPool) { BSLS_ASSERT_SAFE(d_scheduler_p); - BSLS_ASSERT_SAFE(d_bufferFactory_p); - BSLS_ASSERT_SAFE(d_blobSpPool_p); BSLS_ASSERT_SAFE(d_pushElementsPool); } inline ClusterResources::ClusterResources(const ClusterResources& copy) : d_scheduler_p(copy.d_scheduler_p) -, d_bufferFactory_p(copy.d_bufferFactory_p) -, d_blobSpPool_p(copy.d_blobSpPool_p) , d_pushElementsPool(copy.d_pushElementsPool) { // NOTHING @@ -576,19 +535,6 @@ inline bdlmt::EventScheduler* ClusterResources::scheduler() const } // EventScheduler to use -inline bdlbb::BlobBufferFactory* ClusterResources::bufferFactory() const -{ - return d_bufferFactory_p; -} -// Blob buffer factory to use - -inline ClusterResources::BlobSpPool* ClusterResources::blobSpPool() const -{ - return d_blobSpPool_p; -} -// Pool of shared pointers to blob to -// use. - inline const bsl::optional& ClusterResources::pushElementsPool() const { diff --git a/src/groups/mqb/mqbmock/mqbmock_cluster.cpp b/src/groups/mqb/mqbmock/mqbmock_cluster.cpp index 0d57dc8d7..4f4f2ff88 100644 --- a/src/groups/mqb/mqbmock/mqbmock_cluster.cpp +++ b/src/groups/mqb/mqbmock/mqbmock_cluster.cpp @@ -239,7 +239,7 @@ Cluster::Cluster(bdlbb::BlobBufferFactory* bufferFactory, , d_isLeader(isLeader) , d_isRestoringState(false) , d_processor() -, d_resources(&d_scheduler, bufferFactory, &d_blobSpPool) +, d_resources(&d_scheduler) { // PRECONDITIONS if (isClusterMember) { diff --git a/src/groups/mqb/mqbstat/mqbstat_statcontroller.cpp b/src/groups/mqb/mqbstat/mqbstat_statcontroller.cpp index 97677494b..ee56de6a1 100644 --- a/src/groups/mqb/mqbstat/mqbstat_statcontroller.cpp +++ b/src/groups/mqb/mqbstat/mqbstat_statcontroller.cpp @@ -31,6 +31,7 @@ #include #include +// BMQ #include #include #include @@ -39,6 +40,7 @@ #include #include #include +#include #include // BDE @@ -656,7 +658,6 @@ int StatController::validateConfig(bsl::ostream& errorDescription) const StatController::StatController(const CommandProcessorFn& commandProcessor, mqbplug::PluginManager* pluginManager, - bdlbb::BlobBufferFactory* bufferFactory, bmqst::StatContext* allocatorsStatContext, bdlmt::EventScheduler* eventScheduler, bslma::Allocator* allocator) @@ -670,7 +671,8 @@ StatController::StatController(const CommandProcessorFn& commandProcessor, , d_statContextChannelsRemote_mp(0) , d_systemStatMonitor_mp(0) , d_pluginManager_p(pluginManager) -, d_bufferFactory_p(bufferFactory) +, d_bufferFactory_p( + bmqu::ResourceManager::getResource().get()) , d_commandProcessorFn(bsl::allocator_arg, allocator, commandProcessor) , d_printer_mp(0) , d_jsonPrinter_mp(0) diff --git a/src/groups/mqb/mqbstat/mqbstat_statcontroller.h b/src/groups/mqb/mqbstat/mqbstat_statcontroller.h index ec0922b62..dd3b01e8a 100644 --- a/src/groups/mqb/mqbstat/mqbstat_statcontroller.h +++ b/src/groups/mqb/mqbstat/mqbstat_statcontroller.h @@ -293,7 +293,6 @@ class StatController { /// `allocator` for memory allocation. StatController(const CommandProcessorFn& commandProcessor, mqbplug::PluginManager* pluginManager, - bdlbb::BlobBufferFactory* bufferFactory, bmqst::StatContext* allocatorsStatContext, bdlmt::EventScheduler* eventScheduler, bslma::Allocator* allocator);