Skip to content

Commit

Permalink
Performance[mqbi::DispatcherEvent]: replace multiple inheritance with…
Browse files Browse the repository at this point in the history
… variant
  • Loading branch information
678098 committed Oct 8, 2024
1 parent 698d7be commit 2f56f75
Show file tree
Hide file tree
Showing 22 changed files with 1,593 additions and 1,117 deletions.
2 changes: 1 addition & 1 deletion src/groups/mqb/mqba/mqba_adminsession.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,7 @@ void AdminSession::onDispatcherEvent(const mqbi::DispatcherEvent& event)
switch (event.type()) {
case mqbi::DispatcherEventType::e_CALLBACK: {
const mqbi::DispatcherCallbackEvent* realEvent =
event.asCallbackEvent();
&event.getAs<mqbi::DispatcherCallbackEvent>();

BSLS_ASSERT_SAFE(realEvent->callback());
realEvent->callback()(dispatcherClientData().processorHandle());
Expand Down
47 changes: 26 additions & 21 deletions src/groups/mqb/mqba/mqba_clientsession.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2805,30 +2805,35 @@ void ClientSession::processEvent(
return; // RETURN
}

// Not a control or leader message, it's either a put or a confirm ..
mqbi::DispatcherEventType::Enum eventType;

if (event.isPutEvent()) {
eventType = mqbi::DispatcherEventType::e_PUT;
}
else if (event.isConfirmEvent()) {
eventType = mqbi::DispatcherEventType::e_CONFIRM;
}
else if (event.isRejectEvent()) {
eventType = mqbi::DispatcherEventType::e_REJECT;
}
else {
// Dispatch the event
switch (event.type()) {
case bmqp::EventType::e_PUT:
case bmqp::EventType::e_CONFIRM:
case bmqp::EventType::e_REJECT: {
break; // BREAK
}
default: {
BALL_LOG_ERROR << "#CLIENT_UNEXPECTED_EVENT " << description()
<< ": Unexpected event type: " << event;
return; // RETURN
}
}

// Dispatch the event
mqbi::DispatcherEvent* dispEvent = dispatcher()->getEvent(this);
bsl::shared_ptr<bdlbb::Blob> blobSp =
d_state.d_blobSpPool_p->getObject();
*blobSp = *(event.blob());
(*dispEvent).setType(eventType).setSource(this).setBlob(blobSp);

if (event.isPutEvent()) {
(*dispEvent).setSource(this).makePutEvent().setBlob(blobSp);
}
else if (event.isConfirmEvent()) {
(*dispEvent).setSource(this).makeConfirmEvent().setBlob(blobSp);
}
else if (event.isRejectEvent()) {
(*dispEvent).setSource(this).makeRejectEvent().setBlob(blobSp);
}

dispatcher()->dispatchEvent(dispEvent, this);
}
}
Expand Down Expand Up @@ -2996,23 +3001,23 @@ void ClientSession::onDispatcherEvent(const mqbi::DispatcherEvent& event)

switch (event.type()) {
case mqbi::DispatcherEventType::e_CONFIRM: {
onConfirmEvent(*(event.asConfirmEvent()));
onConfirmEvent(event.getAs<mqbi::DispatcherConfirmEvent>());
} break;
case mqbi::DispatcherEventType::e_REJECT: {
onRejectEvent(*(event.asRejectEvent()));
onRejectEvent(event.getAs<mqbi::DispatcherRejectEvent>());
} break;
case mqbi::DispatcherEventType::e_PUSH: {
onPushEvent(*(event.asPushEvent()));
onPushEvent(event.getAs<mqbi::DispatcherPushEvent>());
} break;
case mqbi::DispatcherEventType::e_PUT: {
onPutEvent(*(event.asPutEvent()));
onPutEvent(event.getAs<mqbi::DispatcherPutEvent>());
} break;
case mqbi::DispatcherEventType::e_ACK: {
onAckEvent(*(event.asAckEvent()));
onAckEvent(event.getAs<mqbi::DispatcherAckEvent>());
} break;
case mqbi::DispatcherEventType::e_CALLBACK: {
const mqbi::DispatcherCallbackEvent* realEvent =
event.asCallbackEvent();
&event.getAs<mqbi::DispatcherCallbackEvent>();

BSLS_ASSERT_SAFE(realEvent->callback());
flush(); // Flush any pending messages to guarantee ordering of events
Expand Down
29 changes: 15 additions & 14 deletions src/groups/mqb/mqba/mqba_clientsession.t.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -820,7 +820,7 @@ class TestBench {
queueId);

mqbi::DispatcherEvent event(d_allocator_p);
event.setType(mqbi::DispatcherEventType::e_ACK)
event.makeAckEvent()
.setAckMessage(ackMessage);

dispatch(event);
Expand Down Expand Up @@ -864,13 +864,13 @@ class TestBench {
cat);

mqbi::DispatcherEvent event(d_allocator_p);
event.setType(mqbi::DispatcherEventType::e_PUT)
.setIsRelay(true) // Relay message
event
.setSource(&d_cs) // DispatcherClient *value
.makePutEvent()
.setIsRelay(true) // Relay message
.setPutHeader(putHeader)
.setPartitionId(1) // d_state_p->partitionId()) // int value
.setBlob(eventBlob) // const bsl::shared_ptr<bdlbb::Blob>& value
.setCompressionAlgorithmType(cat);
.setPartitionId(1) // d_state_p->partitionId()) // int value
.setBlob(eventBlob); // const bsl::shared_ptr<bdlbb::Blob>& value

// Internal-ticket D167598037.
// Verify that PutMessageIterator does not change the input.
Expand Down Expand Up @@ -905,8 +905,8 @@ class TestBench {

mqbi::DispatcherEvent event(d_allocator_p);

event.setType(mqbi::DispatcherEventType::e_PUSH)
.setSource(&d_cs) // DispatcherClient *value
event.setSource(&d_cs) // DispatcherClient *value
.makePushEvent()
.setQueueId(queueId)
.setBlob(blob)
.setGuid(msgGUID)
Expand Down Expand Up @@ -1918,9 +1918,10 @@ static void test9_newStylePush()
blobSp.createInplace(s_allocator_p, &tb.d_bufferFactory, s_allocator_p);
*blobSp = peb.blob();

putEvent.setType(mqbi::DispatcherEventType::e_PUT)
.setIsRelay(true) // Relay message
putEvent
.setSource(&tb.d_cs) // DispatcherClient *value
.makePutEvent()
.setIsRelay(true) // Relay message
.setPutHeader(putIt.header())
.setBlob(blobSp); // const bsl::shared_ptr<bdlbb::Blob>& value

Expand Down Expand Up @@ -2026,12 +2027,12 @@ static void test10_newStyleCompressedPush()
blobSp.createInplace(s_allocator_p, &tb.d_bufferFactory, s_allocator_p);
*blobSp = peb.blob();

putEvent.setType(mqbi::DispatcherEventType::e_PUT)
.setIsRelay(true) // Relay message
putEvent
.setSource(&tb.d_cs) // DispatcherClient *value
.makePutEvent()
.setIsRelay(true) // Relay message
.setPutHeader(putIt.header())
.setBlob(blobSp) // const bsl::shared_ptr<bdlbb::Blob>& value
.setCompressionAlgorithmType(bmqt::CompressionAlgorithmType::e_ZLIB);
.setBlob(blobSp); // const bsl::shared_ptr<bdlbb::Blob>& value

tb.dispatch(putEvent);

Expand Down
31 changes: 14 additions & 17 deletions src/groups/mqb/mqba/mqba_dispatcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,8 @@ void Dispatcher_Executor::post(const bsl::function<void()>& f) const
mwcc::MultiQueueThreadPool<mqbi::DispatcherEvent>::Event* event =
d_processorPool_p->getUnmanagedEvent();

event->object()
.setType(mqbi::DispatcherEventType::e_DISPATCHER)
.setCallback(mqbi::Dispatcher::voidToProcessorFunctor(f));
event->object().makeDispatcherEvent().setCallback(
mqbi::Dispatcher::voidToProcessorFunctor(f));

// submit the event
int rc = d_processorPool_p->enqueueEvent(event, d_processorHandle);
Expand Down Expand Up @@ -183,9 +182,9 @@ void Dispatcher_ClientExecutor::post(const bsl::function<void()>& f) const
processorPool()->getUnmanagedEvent();

event->object()
.setType(mqbi::DispatcherEventType::e_CALLBACK)
.setCallback(mqbi::Dispatcher::voidToProcessorFunctor(f))
.setDestination(const_cast<mqbi::DispatcherClient*>(d_client_p));
.setDestination(const_cast<mqbi::DispatcherClient*>(d_client_p))
.makeDispatcherEvent()
.setCallback(mqbi::Dispatcher::voidToProcessorFunctor(f));

// submit the event
int rc = processorPool()->enqueueEvent(event, processorHandle());
Expand Down Expand Up @@ -363,7 +362,7 @@ void Dispatcher::queueEventCb(mqbi::DispatcherClientType::Enum type,
if (event->object().type() ==
mqbi::DispatcherEventType::e_DISPATCHER) {
const mqbi::DispatcherDispatcherEvent* realEvent =
event->object().asDispatcherEvent();
&event->object().getAs<mqbi::DispatcherDispatcherEvent>();

// We must flush now (and irrespective of a callback actually being
// set on the event) to ensure the flushList is empty before
Expand Down Expand Up @@ -403,7 +402,7 @@ void Dispatcher::queueEventCb(mqbi::DispatcherClientType::Enum type,
if (event->object().type() ==
mqbi::DispatcherEventType::e_DISPATCHER) {
const mqbi::DispatcherDispatcherEvent* realEvent =
event->object().asDispatcherEvent();
&event->object().getAs<mqbi::DispatcherDispatcherEvent>();

if (realEvent->finalizeCallback()) {
BALL_LOG_TRACE << "Calling finalizeCallback on queue "
Expand Down Expand Up @@ -595,13 +594,13 @@ Dispatcher::registerClient(mqbi::DispatcherClient* client,
mqbi::DispatcherEvent* event =
&context.d_processorPool_mp->getUnmanagedEvent()->object();
(*event)
.setType(mqbi::DispatcherEventType::e_DISPATCHER)
.setDestination(client) // not needed
.makeDispatcherEvent()
.setCallback(
bdlf::BindUtil::bind(&Dispatcher::onNewClient,
this,
type,
bdlf::PlaceHolders::_1)) // processor
.setDestination(client); // not needed
bdlf::PlaceHolders::_1)); // processor
context.d_processorPool_mp->enqueueEvent(event, processor);
return processor; // RETURN
} // break;
Expand Down Expand Up @@ -692,7 +691,7 @@ void Dispatcher::execute(const mqbi::Dispatcher::ProcessorFunctor& functor,
if (processorPool[i] != 0) {
mqbi::DispatcherEvent* qEvent =
&processorPool[i]->getUnmanagedEvent()->object();
qEvent->setType(mqbi::DispatcherEventType::e_DISPATCHER)
qEvent->makeDispatcherEvent()
.setCallback(functor)
.setFinalizeCallback(doneCallback);
processorPool[i]->enqueueEventOnAllQueues(qEvent);
Expand All @@ -718,11 +717,9 @@ void Dispatcher::synchronize(mqbi::DispatcherClientType::Enum type,

bslmt::Semaphore semaphore;
mqbi::DispatcherEvent* event = getEvent(type);
(*event)
.setType(mqbi::DispatcherEventType::e_DISPATCHER)
.setCallback(
bdlf::BindUtil::bind(static_cast<PostFn>(&bslmt::Semaphore::post),
&semaphore));
(*event).makeDispatcherEvent().setCallback(
bdlf::BindUtil::bind(static_cast<PostFn>(&bslmt::Semaphore::post),
&semaphore));
dispatchEvent(event, type, handle);
semaphore.wait();
}
Expand Down
16 changes: 10 additions & 6 deletions src/groups/mqb/mqba/mqba_dispatcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -569,9 +569,14 @@ inline void Dispatcher::execute(const mqbi::Dispatcher::VoidFunctor& functor,
BSLS_ASSERT_SAFE(functor);

mqbi::DispatcherEvent* event = getEvent(client);

(*event).setType(type).setCallback(
mqbi::Dispatcher::voidToProcessorFunctor(functor));
if (type == mqbi::DispatcherEventType::e_DISPATCHER) {
(*event).makeDispatcherEvent().setCallback(
mqbi::Dispatcher::voidToProcessorFunctor(functor));
}
else {
(*event).makeCallbackEvent().setCallback(
mqbi::Dispatcher::voidToProcessorFunctor(functor));
}

dispatchEvent(event, client);
}
Expand All @@ -584,9 +589,8 @@ inline void Dispatcher::execute(const mqbi::Dispatcher::VoidFunctor& functor,

mqbi::DispatcherEvent* event = getEvent(client.clientType());

(*event)
.setType(mqbi::DispatcherEventType::e_DISPATCHER)
.setCallback(mqbi::Dispatcher::voidToProcessorFunctor(functor));
(*event).makeDispatcherEvent().setCallback(
mqbi::Dispatcher::voidToProcessorFunctor(functor));

dispatchEvent(event, client.clientType(), client.processorHandle());
}
Expand Down
Loading

0 comments on commit 2f56f75

Please sign in to comment.