Perf[BMQ,MQB]: stable GC history preventing allocations #498

Open · wants to merge 1 commit into main
8 changes: 8 additions & 0 deletions src/groups/bmq/bmqc/bmqc_orderedhashmapwithhistory.cpp
@@ -20,5 +20,13 @@
namespace BloombergLP {
namespace bmqc {

// -------------------------------------------
// struct OrderedHashMapWithHistory_ImpDetails
// -------------------------------------------

const int
OrderedHashMapWithHistory_ImpDetails::k_INSERT_GC_MESSAGES_BATCH_SIZE =
1000;

} // close package namespace
} // close enterprise namespace
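The new batch-size constant follows the pattern of a component-private struct: the header (shown below) declares the `static const int`, and this `.cpp` provides its single out-of-line definition. A minimal standalone sketch of the same pattern, with hypothetical names:

struct Example_ImpDetails {
    // Declaration only; the single definition lives in one translation unit.
    static const int k_BATCH_SIZE;
};

// In the matching .cpp file:
const int Example_ImpDetails::k_BATCH_SIZE = 1000;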
60 changes: 55 additions & 5 deletions src/groups/bmq/bmqc/bmqc_orderedhashmapwithhistory.h
@@ -53,6 +53,19 @@
namespace BloombergLP {
namespace bmqc {

// ===========================================
// struct OrderedHashMapWithHistory_ImpDetails
// ===========================================

/// PRIVATE CLASS.  For use only by the `bmqc::OrderedHashMapWithHistory`
/// implementation.
struct OrderedHashMapWithHistory_ImpDetails {
// PRIVATE CLASS DATA
/// How many messages to GC when GC is required in
/// `bmqc::OrderedHashMapWithHistory::insert`.
static const int k_INSERT_GC_MESSAGES_BATCH_SIZE;
};

// ========================================
// class OrderedHashMapWithHistory_Iterator
// ========================================
@@ -217,7 +230,27 @@ class OrderedHashMapWithHistory {

size_t d_historySize; // how many historical (!d_isLive) items

gc_iterator d_gcIt; // where to start 'gc'
/// Whether this container still has elements to `gc`.  This flag is set or
/// cleared on every `gc` call, depending on whether the batch limit was
/// reached before all expired elements were processed.
bool d_requireGC;

/// The `now` time of the last GC. We assume that the current actual time
/// is no less than this timestamp.
TimeType d_lastGCTime;

/// The iterator pointing to the element where garbage collection should
/// resume on the next `gc` call.  By contract, this iterator only moves
/// forward.  Every element it passes is either removed or marked for
/// removal, depending on which happens first:
/// - If the element was not yet erased by the user but has timed out in
/// this container, `gc` marks it for deletion and the iterator moves on.
/// It is then the user's responsibility to call `erase` on this element
/// to fully remove it.
/// - If the user erases the element before it times out, the element
/// becomes `not alive` but still lives in the history.  Eventually `gc`
/// reaches it and fully removes it.
gc_iterator d_gcIt;
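To make this contract concrete, here is a small standalone model (not the real container: a plain std::list ordered by insertion stands in for the hash map, and the Item fields are illustrative only). Expired items that are still live only get marked, while items the user already erased are dropped from the history when GC reaches them:

#include <iostream>
#include <list>

struct Item {
    int       d_key;
    long long d_time;    // expiration time; set to 0 once GC marks the item
    bool      d_isLive;  // false once the user has erased it
};

int main()
{
    std::list<Item> items = {{1, 10, true}, {2, 10, false}, {3, 50, true}};
    const long long now = 20;

    // One GC pass: the iterator only moves forward over expired items.
    for (std::list<Item>::iterator it = items.begin();
         it != items.end() && it->d_time <= now;) {
        if (it->d_isLive) {
            it->d_time = 0;        // mark; a later user 'erase' removes it
            ++it;
        }
        else {
            it = items.erase(it);  // already erased by the user; drop it
        }
    }

    for (const Item& i : items) {
        std::cout << i.d_key << (i.d_time == 0 ? ": marked" : ": untouched")
                  << '\n';
    }
    return 0;
}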

// PRIVATE CLASS METHODS
static const KEY& get_key(const bsl::pair<const KEY, VALUE>& value)
@@ -486,6 +519,8 @@ inline OrderedHashMapWithHistory<KEY, VALUE, HASH, VALUE_TYPE>::
, d_first(d_impl.end())
, d_last(d_impl.end())
, d_historySize(0)
, d_requireGC(false)
, d_lastGCTime(0)
, d_gcIt(endGc())
{
// NOTHING
@@ -515,6 +550,8 @@ inline void OrderedHashMapWithHistory<KEY, VALUE, HASH, VALUE_TYPE>::clear()

d_first = d_last = end();
d_gcIt = endGc();
d_requireGC = false;
d_lastGCTime = 0;
d_historySize = 0;
}

@@ -617,6 +654,12 @@ OrderedHashMapWithHistory<KEY, VALUE, HASH, VALUE_TYPE>::insert(
const SOURCE_TYPE& value,
TimeType timePoint)
{
if (d_requireGC) {
gc(bsl::max(timePoint, d_lastGCTime),
OrderedHashMapWithHistory_ImpDetails::
k_INSERT_GC_MESSAGES_BATCH_SIZE);
}
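Below is a simplified standalone model of this batched-GC-on-insert block (not the BlazingMQ container: a deque of expiration times stands in for the map, and the hard-coded 1000 mirrors k_INSERT_GC_MESSAGES_BATCH_SIZE). It also mirrors the bsl::max guard above, which keeps the `now` passed to `gc` from going backwards:

#include <algorithm>
#include <deque>
#include <iostream>

struct HistoryModel {
    std::deque<long long> d_expirations;   // expiration times, oldest first
    bool                  d_requireGC;
    long long             d_lastGCTime;

    HistoryModel() : d_requireGC(false), d_lastGCTime(0) {}

    // Remove at most 'batchSize' expired entries; remember whether more remain.
    bool gc(long long now, int batchSize)
    {
        d_lastGCTime = now;
        while (!d_expirations.empty() && d_expirations.front() <= now) {
            d_expirations.pop_front();
            if (--batchSize == 0) {
                d_requireGC = true;        // stopped early; more work remains
                return true;
            }
        }
        d_requireGC = false;
        return false;
    }

    void insert(long long timePoint, long long timeout)
    {
        if (d_requireGC) {
            // Never run GC with a 'now' older than the last GC time.
            gc(std::max(timePoint, d_lastGCTime), 1000);
        }
        d_expirations.push_back(timePoint + timeout);
    }
};

int main()
{
    HistoryModel m;
    for (long long t = 0; t < 5000; ++t) {
        m.insert(t, 10);                   // everything expires quickly
    }
    m.gc(10000, 1000);                     // first bounded pass: 1000 removed
    std::cout << "more to GC: " << m.d_requireGC << '\n';
    m.insert(10001, 10);                   // each insert now chips away more
    std::cout << "left: " << m.d_expirations.size() << '\n';
    return 0;
}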

bsl::pair<gc_iterator, bool> result = d_impl.insert(
Value(value, d_timeout ? timePoint + d_timeout : 0));
// No need to keep track of element's timePoint if the map is not
@@ -652,7 +695,9 @@ OrderedHashMapWithHistory<KEY, VALUE, HASH, VALUE_TYPE>::gc(TimeType now,
// 'erase' can set the iterator back to erase item if its expiration time
// is sooner than the current one.

if (d_gcIt == endGc()) {
d_lastGCTime = now;
if (BSLS_PERFORMANCEHINT_PREDICT_UNLIKELY(d_gcIt == endGc())) {
BSLS_PERFORMANCEHINT_UNLIKELY_HINT;
d_gcIt = beginGc();
}

@@ -664,23 +709,28 @@
}
gc_iterator it = d_gcIt++;
if (it->d_isLive) {
// This is an old item. It should be removed by 'erase'.
// No need to return to this item. No need to check time again.
// Indicate that it needs to be removed by setting its time to 0.
// This item has not been erased by the user yet, but it has timed
// out in this container. Mark it for deletion by setting `d_time`
// to 0, so the next time the user calls `erase` on it, it will be
// fully removed.
it->d_time = 0;
}
else {
// This item was erased by the user before, and we can fully remove
// it right here.
d_impl.erase(it);
--d_historySize;
}
// Meaning, there is no need for 'd_gcIt' to step back. Only forward.

if (--batchSize == 0) {
// remember where we have stopped and resume from there next time
d_requireGC = true;
return true; // RETURN
}
}

d_requireGC = false;
return false;
}
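The branch-prediction macros used in this hunk come from BDE's bsls_performancehint component. The usual pattern, sketched below with a hypothetical function, is to wrap the rarely taken condition in BSLS_PERFORMANCEHINT_PREDICT_UNLIKELY and place BSLS_PERFORMANCEHINT_UNLIKELY_HINT as the first statement of that branch:

#include <bsls_performancehint.h>

int resetIfNegative(int counter)
    // Hypothetical example: reset 'counter' on the rarely taken cold path.
{
    if (BSLS_PERFORMANCEHINT_PREDICT_UNLIKELY(counter < 0)) {
        BSLS_PERFORMANCEHINT_UNLIKELY_HINT;
        counter = 0;                                           // cold path
    }
    return counter + 1;
}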

2 changes: 1 addition & 1 deletion src/groups/mqb/mqbblp/mqbblp_storagemanager.cpp
@@ -72,7 +72,7 @@ namespace BloombergLP {
namespace mqbblp {

namespace {
const int k_GC_MESSAGES_INTERVAL_SECONDS = 30;
const int k_GC_MESSAGES_INTERVAL_SECONDS = 5;

bsl::ostream& printRecoveryBanner(bsl::ostream& out,
const bsl::string& lastLineSuffix)
2 changes: 1 addition & 1 deletion src/groups/mqb/mqbc/mqbc_storagemanager.cpp
@@ -49,7 +49,7 @@ namespace BloombergLP {
namespace mqbc {

namespace {
const int k_GC_MESSAGES_INTERVAL_SECONDS = 30;
const int k_GC_MESSAGES_INTERVAL_SECONDS = 5;

bool isPrimaryActive(const mqbi::StorageManager_PartitionInfo pinfo)
{
24 changes: 10 additions & 14 deletions src/groups/mqb/mqbs/mqbs_filestore.cpp
@@ -7212,22 +7212,18 @@ void FileStore::flush()
return; // RETURN
}

const bool haveMore = gcExpiredMessages(bdlt::CurrentTime::utc());
const bool haveMoreHistory = gcHistory();
BSLA_MAYBE_UNUSED const bool haveMore = gcExpiredMessages(
bdlt::CurrentTime::utc());
BSLA_MAYBE_UNUSED const bool haveMoreHistory = gcHistory();

// This is either Idle or k_GC_MESSAGES_INTERVAL_SECONDS timeout.
// 'gcHistory' attempts to iterate all old items. If there are more of them
// than the batchSize (1000), it returns 'true'. In this case, re-enable
// flush client to call it again next Idle time.
// If it returns 'false', there is no immediate work. Wait for the
// next k_GC_MESSAGES_INTERVAL_SECONDS.

if (haveMore || haveMoreHistory) {
// Explicitly schedule 'flush()' instead of relying on idleness
dispatcher()->execute(bdlf::BindUtil::bind(&FileStore::flush, this),
this,
mqbi::DispatcherEventType::e_CALLBACK);
}
// We try to remove at most k_GC_MESSAGES_BATCH_SIZE items from the history.
// If more items are ready to be removed, the container remembers this, so
// every subsequent `insert` into the container triggers additional GC until
// all old items are gone.
// If we don't balance insertion of new elements into the history against
// GC of the history, we can waste a lot of time allocating new history
// items and risk running out of memory as the history grows without bound.
}
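A back-of-the-envelope illustration of why this balance matters (the ingress rate below is a made-up number, not from this PR): with timer-only GC the expired backlog can grow to a full interval's worth of traffic, while GC amortized over inserts can remove up to a batch per insert and so keeps pace with ingress:

#include <iostream>

int main()
{
    const long long msgsPerSec   = 50000;  // hypothetical ingress rate
    const int       oldIntervalS = 30;     // previous GC timer period
    const int       insertBatch  = 1000;   // k_INSERT_GC_MESSAGES_BATCH_SIZE

    std::cout << "peak backlog, timer-only GC : "
              << msgsPerSec * oldIntervalS << " items\n";
    std::cout << "items GC'd per insert (max) : " << insertBatch << '\n';
    return 0;
}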

void FileStore::setReplicationFactor(int value)