Skip to content

Commit

Permalink
BRIN fix load/unload with locks, fixes #820
Browse files Browse the repository at this point in the history
  • Loading branch information
diegosalvi committed Feb 1, 2024
1 parent d4f7280 commit 805e051
Showing 1 changed file with 55 additions and 35 deletions.
90 changes: 55 additions & 35 deletions herddb-core/src/main/java/herddb/index/brin/BlockRangeIndex.java
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,7 @@ void addValue(Key key, Val value, PutState<Key, Val> state) {
/* Eventual new block from split. It must added to PageReplacementPolicy only after lock release */
Block<Key, Val> newblock = null;
lock.lock();
Page.Metadata unload = null;
try {

Block<Key, Val> currentNext = this.next;
Expand All @@ -331,7 +332,7 @@ void addValue(Key key, Val value, PutState<Key, Val> state) {
return;
}

ensureBlockLoaded();
unload = ensureBlockLoadedWithoutUnload();

mergeAddValue(key, value, values);
size += index.evaluateEntrySize(key, value);
Expand All @@ -342,12 +343,15 @@ void addValue(Key key, Val value, PutState<Key, Val> state) {
}
} finally {
lock.unlock();
if (unload != null) {
unload.owner.unload(unload.pageId);
}
}

if (newblock != null) {
final Metadata unload = index.pageReplacementPolicy.add(newblock.page);
if (unload != null) {
unload.owner.unload(unload.pageId);
final Page.Metadata newBlockUnload = index.pageReplacementPolicy.add(newblock.page);
if (newBlockUnload != null) {
newBlockUnload.owner.unload(newBlockUnload.pageId);
}
}

Expand All @@ -359,7 +363,7 @@ void addValue(Key key, Val value, PutState<Key, Val> state) {
void delete(Key key, Val value, DeleteState<Key, Val> state) {

lock.lock();

Page.Metadata unload = null;
try {

final Block<Key, Val> currentNext = this.next;
Expand All @@ -377,7 +381,7 @@ void delete(Key key, Val value, DeleteState<Key, Val> state) {
* (the key list could be split between both nodes)
*/
if (currentNext == null || nextMinKeyCompare >= 0) {
ensureBlockLoaded();
unload = ensureBlockLoadedWithoutUnload();
List<Val> valuesForKey = values.get(key);
if (valuesForKey != null) {
boolean removed = valuesForKey.remove(value);
Expand Down Expand Up @@ -411,13 +415,17 @@ void delete(Key key, Val value, DeleteState<Key, Val> state) {

} finally {
lock.unlock();
if (unload != null) {
unload.owner.unload(unload.pageId);
}
}

}

void lookUpRange(Key firstKey, Key lastKey, LookupState<Key, Val> state) {

lock.lock();
Page.Metadata unload = null;

/*
* If we got here means that at some point this block had a min key compatible
Expand All @@ -440,7 +448,7 @@ void lookUpRange(Key firstKey, Key lastKey, LookupState<Key, Val> state) {
*/
if (currentNext == null || currentNext.key.compareMinKey(firstKey) >= 0) {
// index seek case
ensureBlockLoaded();
unload = ensureBlockLoadedWithoutUnload();
if (firstKey.equals(lastKey)) {
List<Val> seek = values.get(firstKey);
if (seek != null && !seek.isEmpty()) {
Expand All @@ -465,13 +473,13 @@ void lookUpRange(Key firstKey, Key lastKey, LookupState<Key, Val> state) {
*/
if (currentNext == null || currentNext.key.compareMinKey(firstKey) >= 0) {
// index seek case
ensureBlockLoaded();
unload = ensureBlockLoadedWithoutUnload();
values.tailMap(firstKey, true).forEach((k, seg) -> {
state.found.addAll(seg);
});
}
} else {
ensureBlockLoaded();
unload = ensureBlockLoadedWithoutUnload();
values.headMap(lastKey, true).forEach((k, seg) -> {
state.found.addAll(seg);
});
Expand All @@ -490,23 +498,34 @@ void lookUpRange(Key firstKey, Key lastKey, LookupState<Key, Val> state) {

} finally {
lock.unlock();
if (unload != null) {
unload.owner.unload(unload.pageId);
}
}
}

void ensureBlockLoaded() {
final Page.Metadata unload = ensureBlockLoadedWithoutUnload();
final Page.Metadata unload = ensureBlockLoadedWithoutUnload(true);
if (unload != null) {
unload.owner.unload(unload.pageId);
}
}

void ensureBlockLoadedLocally() {
ensureBlockLoadedWithoutUnload(false);
}

Page.Metadata ensureBlockLoadedWithoutUnload() {
return ensureBlockLoadedWithoutUnload(true);
}

private Page.Metadata ensureBlockLoadedWithoutUnload(boolean addPageToPageReplacementPolicy) {
if (!loaded) {
try {
values = new TreeMap<>();

/*
* Skip load and add if we already known that there isn't data. Add is a really
* Skip load and add if we already know that there isn't data. Add is a really
* little overhead but load should read from disk so we just skip such unuseful
* roundtrip
*/
Expand All @@ -520,15 +539,22 @@ Page.Metadata ensureBlockLoadedWithoutUnload() {

loaded = true;

/* Deferred page unload */
final Page.Metadata unload = index.pageReplacementPolicy.add(page);
return unload;
if (addPageToPageReplacementPolicy) {
/* Deferred page unload */
final Page.Metadata unload = index.pageReplacementPolicy.add(page);
return unload;
}

return null;

} catch (IOException err) {
throw new RuntimeException(err);
}
} else {
index.pageReplacementPolicy.pageHit(page);
/* Signal the page hit to replacement policy only if we were willing to load the page on it */
if (addPageToPageReplacementPolicy) {
index.pageReplacementPolicy.pageHit(page);
}
}

return null;
Expand Down Expand Up @@ -767,14 +793,15 @@ private BlockRangeIndexMetadata.BlockMetadata<K> merge(Block<K, V> first, List<B
*/
final ListIterator<Block<K, V>> iterator = merging.listIterator(merging.size());

Page.Metadata firstBlockUnload = null;
Page.Metadata unload = null;

first.lock.lock();
BlockRangeIndexMetadata.BlockMetadata<K> metadata;

try {

/* We need block data to attempt merge */
first.ensureBlockLoaded();
unload = first.ensureBlockLoadedWithoutUnload();

while (iterator.hasPrevious()) {

Expand All @@ -786,15 +813,8 @@ private BlockRangeIndexMetadata.BlockMetadata<K> merge(Block<K, V> first, List<B
/* Real merging is needed only if there is some data to merge, otherwise we just "delete" it */
if (other.size != 0) {

final Page.Metadata unload = other.ensureBlockLoadedWithoutUnload();
if (unload != null) {
if (first.page.owner == unload.owner) {
/* We defer page unloading if requested page is first block! */
firstBlockUnload = unload;
} else {
unload.owner.unload(unload.pageId);
}
}
/* Loading other block data on local thread memory, it will be discarded at merge end */
other.ensureBlockLoadedLocally();

/* Recover potetially overwritten data before merge. */
List<V> potentialOverwrite = first.values.get(other.key.minKey);
Expand Down Expand Up @@ -838,8 +858,8 @@ private BlockRangeIndexMetadata.BlockMetadata<K> merge(Block<K, V> first, List<B
/* Update next reference */
first.next = other.next;

/* Data not needed anymore */
other.unloadNoLock();
/* Data not needed anymore, no checkpoint needed for discarded blocks */
other.forcedUnload();

/* Remove the block from knowledge */
blocks.remove(other.key);
Expand All @@ -850,21 +870,21 @@ private BlockRangeIndexMetadata.BlockMetadata<K> merge(Block<K, V> first, List<B

}

metadata = first.checkpointNoLock();

} finally {
first.lock.unlock();

/* Deferred unload */
if (unload != null) {
unload.owner.unload(unload.pageId);
}
}

if (fineEnabled) {
LOG.fine("merged block " + first.pageId + " (" + first.key + ") has " + first.size + " byte size at checkpoint");
}

BlockRangeIndexMetadata.BlockMetadata<K> metadata = first.checkpointNoLock();

/* Deferred unload of first block! */
if (firstBlockUnload != null) {
firstBlockUnload.owner.unload(firstBlockUnload.pageId);
}

return metadata;

}
Expand Down

0 comments on commit 805e051

Please sign in to comment.