Skip to content

Commit

Permalink
CBG-4323: fix for per shard memory based eviction (#7174)
Browse files Browse the repository at this point in the history
* CBG-4323: fix for per shard memory based eviction

* add min size, some test assertions and remove 10% buffer on shards

* fix test for CE

* update docs

* fix failing test on default collection

* address comments
  • Loading branch information
gregns1 authored and bbrks committed Oct 31, 2024
1 parent c1d99bd commit 25ab9d0
Show file tree
Hide file tree
Showing 5 changed files with 186 additions and 34 deletions.
73 changes: 42 additions & 31 deletions db/revision_cache_lru.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func NewShardedLRURevisionCache(revCacheOptions *RevisionCacheOptions, backingSt
revCacheOptions.MaxItemCount = uint32(perCacheCapacity)
var perCacheMemoryCapacity float32
if revCacheOptions.MaxBytes > 0 {
perCacheMemoryCapacity = 1.1 * float32(revCacheOptions.MaxBytes) / float32(revCacheOptions.ShardCount)
perCacheMemoryCapacity = float32(revCacheOptions.MaxBytes) / float32(revCacheOptions.ShardCount)
revCacheOptions.MaxBytes = int64(perCacheMemoryCapacity)
}

Expand Down Expand Up @@ -82,16 +82,17 @@ func (sc *ShardedLRURevisionCache) Remove(docID, revID string, collectionID uint

// An LRU cache of document revision bodies, together with their channel access.
type LRURevisionCache struct {
backingStores map[uint32]RevisionCacheBackingStore
cache map[IDAndRev]*list.Element
lruList *list.List
cacheHits *base.SgwIntStat
cacheMisses *base.SgwIntStat
cacheNumItems *base.SgwIntStat
lock sync.Mutex
capacity uint32
memoryCapacity int64
cacheMemoryBytes *base.SgwIntStat
backingStores map[uint32]RevisionCacheBackingStore
cache map[IDAndRev]*list.Element
lruList *list.List
cacheHits *base.SgwIntStat
cacheMisses *base.SgwIntStat
cacheNumItems *base.SgwIntStat
lock sync.Mutex
capacity uint32 // Max number of items capacity of LRURevisionCache
memoryCapacity int64 // Max memory capacity of LRURevisionCache
currMemoryUsage base.AtomicInt // count of number of bytes used currently in the LRURevisionCache
cacheMemoryBytesStat *base.SgwIntStat // stat for overall revision cache memory usage in bytes. When using sharded cache, will be shared by all shards.
}

// The cache payload data. Stored as the Value of a list Element.
Expand All @@ -114,15 +115,15 @@ type revCacheValue struct {
func NewLRURevisionCache(revCacheOptions *RevisionCacheOptions, backingStores map[uint32]RevisionCacheBackingStore, cacheHitStat *base.SgwIntStat, cacheMissStat *base.SgwIntStat, cacheNumItemsStat *base.SgwIntStat, revCacheMemoryStat *base.SgwIntStat) *LRURevisionCache {

return &LRURevisionCache{
cache: map[IDAndRev]*list.Element{},
lruList: list.New(),
capacity: revCacheOptions.MaxItemCount,
backingStores: backingStores,
cacheHits: cacheHitStat,
cacheMisses: cacheMissStat,
cacheNumItems: cacheNumItemsStat,
cacheMemoryBytes: revCacheMemoryStat,
memoryCapacity: revCacheOptions.MaxBytes,
cache: map[IDAndRev]*list.Element{},
lruList: list.New(),
capacity: revCacheOptions.MaxItemCount,
backingStores: backingStores,
cacheHits: cacheHitStat,
cacheMisses: cacheMissStat,
cacheNumItems: cacheNumItemsStat,
cacheMemoryBytesStat: revCacheMemoryStat,
memoryCapacity: revCacheOptions.MaxBytes,
}
}

Expand Down Expand Up @@ -151,7 +152,7 @@ func (rc *LRURevisionCache) UpdateDelta(ctx context.Context, docID, revID string
if value != nil {
outGoingBytes := value.updateDelta(toDelta)
if outGoingBytes != 0 {
rc.cacheMemoryBytes.Add(outGoingBytes)
rc.updateRevCacheMemoryUsage(outGoingBytes)
}
// check for memory based eviction
rc.revCacheMemoryBasedEviction()
Expand All @@ -169,7 +170,7 @@ func (rc *LRURevisionCache) getFromCache(ctx context.Context, docID, revID strin

if !statEvent && err == nil {
// cache miss so we had to load doc, increment memory count
rc.cacheMemoryBytes.Add(value.getItemBytes())
rc.updateRevCacheMemoryUsage(value.getItemBytes())
// check for memory based eviction
rc.revCacheMemoryBasedEviction()
}
Expand Down Expand Up @@ -205,7 +206,7 @@ func (rc *LRURevisionCache) GetActive(ctx context.Context, docID string, collect

if !statEvent && err == nil {
// cache miss so we had to load doc, increment memory count
rc.cacheMemoryBytes.Add(value.getItemBytes())
rc.updateRevCacheMemoryUsage(value.getItemBytes())
// check for rev cache memory based eviction
rc.revCacheMemoryBasedEviction()
}
Expand Down Expand Up @@ -234,7 +235,7 @@ func (rc *LRURevisionCache) Put(ctx context.Context, docRev DocumentRevision, co
value := rc.getValue(docRev.DocID, docRev.RevID, collectionID, true)
// increment incoming bytes
docRev.CalculateBytes()
rc.cacheMemoryBytes.Add(docRev.MemoryBytes)
rc.updateRevCacheMemoryUsage(docRev.MemoryBytes)
value.store(docRev)
// check for rev cache memory based eviction
rc.revCacheMemoryBasedEviction()
Expand All @@ -250,7 +251,7 @@ func (rc *LRURevisionCache) Upsert(ctx context.Context, docRev DocumentRevision,
if elem := rc.cache[key]; elem != nil {
revItem := elem.Value.(*revCacheValue)
// decrement item bytes by the removed item
rc.cacheMemoryBytes.Add(-revItem.getItemBytes())
rc.updateRevCacheMemoryUsage(-revItem.getItemBytes())
rc.lruList.Remove(elem)
newItem = false
}
Expand All @@ -275,13 +276,13 @@ func (rc *LRURevisionCache) Upsert(ctx context.Context, docRev DocumentRevision,

docRev.CalculateBytes()
// add new item bytes to overall count
rc.cacheMemoryBytes.Add(docRev.MemoryBytes)
rc.updateRevCacheMemoryUsage(docRev.MemoryBytes)
value.store(docRev)

// check we aren't over memory capacity, if so perform eviction
numItemsRemoved = 0
if rc.memoryCapacity > 0 {
for rc.cacheMemoryBytes.Value() > rc.memoryCapacity {
for rc.currMemoryUsage.Value() > rc.memoryCapacity {
rc.purgeOldest_()
numItemsRemoved++
}
Expand Down Expand Up @@ -332,7 +333,7 @@ func (rc *LRURevisionCache) Remove(docID, revID string, collectionID uint32) {
rc.lruList.Remove(element)
// decrement the overall memory bytes count
revItem := element.Value.(*revCacheValue)
rc.cacheMemoryBytes.Add(-revItem.getItemBytes())
rc.updateRevCacheMemoryUsage(-revItem.getItemBytes())
delete(rc.cache, key)
rc.cacheNumItems.Add(-1)
}
Expand All @@ -352,7 +353,7 @@ func (rc *LRURevisionCache) purgeOldest_() {
value := rc.lruList.Remove(rc.lruList.Back()).(*revCacheValue)
delete(rc.cache, value.key)
// decrement memory overall size
rc.cacheMemoryBytes.Add(-value.getItemBytes())
rc.updateRevCacheMemoryUsage(-value.getItemBytes())
}

// Gets the body etc. out of a revCacheValue. If they aren't present already, the loader func
Expand Down Expand Up @@ -531,7 +532,7 @@ func (delta *RevisionDelta) CalculateDeltaBytes() {
// revCacheMemoryBasedEviction checks for rev cache eviction, if required calls performEviction which will acquire lock to evict
func (rc *LRURevisionCache) revCacheMemoryBasedEviction() {
// if memory capacity is not set, don't check for eviction this way
if rc.memoryCapacity > 0 && rc.cacheMemoryBytes.Value() > rc.memoryCapacity {
if rc.memoryCapacity > 0 && rc.currMemoryUsage.Value() > rc.memoryCapacity {
rc.performEviction()
}
}
Expand All @@ -541,9 +542,19 @@ func (rc *LRURevisionCache) performEviction() {
rc.lock.Lock()
defer rc.lock.Unlock()
var numItemsRemoved int64
for rc.cacheMemoryBytes.Value() > rc.memoryCapacity {
for rc.currMemoryUsage.Value() > rc.memoryCapacity {
rc.purgeOldest_()
numItemsRemoved++
}
rc.cacheNumItems.Add(-numItemsRemoved)
}

// updateRevCacheMemoryUsage atomically increases overall memory usage for cache and the actual rev cache objects usage
func (rc *LRURevisionCache) updateRevCacheMemoryUsage(bytesCount int64) {
// We need to keep track of the current LRURevisionCache memory usage AND the overall usage of the cache. We need
// overall memory usage for the stat added to show rev cache usage plus we need the current rev cache capacity of the
// LRURevisionCache object for sharding the rev cache. This way we can perform eviction on per shard basis much like
// we do with the number of items capacity eviction
rc.currMemoryUsage.Add(bytesCount)
rc.cacheMemoryBytesStat.Add(bytesCount)
}
106 changes: 105 additions & 1 deletion db/revision_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -642,7 +642,16 @@ func TestImmediateRevCacheMemoryBasedEviction(t *testing.T) {
assert.Equal(t, int64(0), memoryBytesCounted.Value())
assert.Equal(t, int64(0), cacheNumItems.Value())

docRev, err := cache.Get(ctx, "doc1", "1-abc", testCollectionID, RevCacheOmitDelta)
// assert we can still fetch this upsert doc
docRev, err := cache.Get(ctx, "doc2", "1-abc", testCollectionID, false)
require.NoError(t, err)
assert.Equal(t, "doc2", docRev.DocID)
assert.Equal(t, int64(102), docRev.MemoryBytes)
assert.NotNil(t, docRev.BodyBytes)
assert.Equal(t, int64(0), memoryBytesCounted.Value())
assert.Equal(t, int64(0), cacheNumItems.Value())

docRev, err = cache.Get(ctx, "doc1", "1-abc", testCollectionID, RevCacheOmitDelta)
require.NoError(t, err)
assert.NotNil(t, docRev.BodyBytes)

Expand All @@ -657,6 +666,101 @@ func TestImmediateRevCacheMemoryBasedEviction(t *testing.T) {
assert.Equal(t, int64(0), cacheNumItems.Value())
}

// TestShardedMemoryEviction:
// - Test adding a doc to each shard in the test
// - Assert that each shard has individual count for memory usage as expected
// - Add new doc that will take over the shard memory capacity and assert that that eviction takes place and
// all stats are as expected
func TestShardedMemoryEviction(t *testing.T) {
dbcOptions := DatabaseContextOptions{
RevisionCacheOptions: &RevisionCacheOptions{
MaxBytes: 160,
MaxItemCount: 10,
ShardCount: 2,
},
}
db, ctx := SetupTestDBWithOptions(t, dbcOptions)
defer db.Close(ctx)
collection, ctx := GetSingleDatabaseCollectionWithUser(ctx, t, db)
cacheStats := db.DbStats.Cache()

docBody := Body{
"channels": "_default",
}

// add doc that will be added to one shard
size, _ := createDocAndReturnSizeAndRev(t, ctx, "doc1", collection, docBody)
assert.Equal(t, int64(size), cacheStats.RevisionCacheTotalMemory.Value())
// grab this particular shard + assert that the shard memory usage is as expected
shardedCache := db.revisionCache.(*ShardedLRURevisionCache)
doc1Shard := shardedCache.getShard("doc1")
assert.Equal(t, int64(size), doc1Shard.currMemoryUsage.Value())

// add new doc in diff shard + assert that the shard memory usage is as expected
size, _ = createDocAndReturnSizeAndRev(t, ctx, "doc2", collection, docBody)
doc2Shard := shardedCache.getShard("doc2")
assert.Equal(t, int64(size), doc2Shard.currMemoryUsage.Value())
// overall mem usage should be combination oif the two added docs
assert.Equal(t, int64(size*2), cacheStats.RevisionCacheTotalMemory.Value())

// two docs should reside in cache at this time
assert.Equal(t, int64(2), cacheStats.RevisionCacheNumItems.Value())

docBody = Body{
"channels": "_default",
"some": "field",
}
// add new doc to trigger eviction and assert stats are as expected
newDocSize, _ := createDocAndReturnSizeAndRev(t, ctx, "doc3", collection, docBody)
doc3Shard := shardedCache.getShard("doc3")
assert.Equal(t, int64(newDocSize), doc3Shard.currMemoryUsage.Value())
assert.Equal(t, int64(2), cacheStats.RevisionCacheNumItems.Value())
assert.Equal(t, int64(size+newDocSize), cacheStats.RevisionCacheTotalMemory.Value())
}

// TestShardedMemoryEvictionWhenShardEmpty:
// - Test adding a doc to sharded revision cache that will immediately be evicted due to size
// - Assert that stats look as expected
func TestShardedMemoryEvictionWhenShardEmpty(t *testing.T) {
dbcOptions := DatabaseContextOptions{
RevisionCacheOptions: &RevisionCacheOptions{
MaxBytes: 100,
MaxItemCount: 10,
ShardCount: 2,
},
}
db, ctx := SetupTestDBWithOptions(t, dbcOptions)
defer db.Close(ctx)
collection, ctx := GetSingleDatabaseCollectionWithUser(ctx, t, db)
cacheStats := db.DbStats.Cache()

docBody := Body{
"channels": "_default",
}

// add doc that will be added to one shard
rev, _, err := collection.Put(ctx, "doc1", docBody)
require.NoError(t, err)
shardedCache := db.revisionCache.(*ShardedLRURevisionCache)

// assert that doc was not added to cache as it's too large
doc1Shard := shardedCache.getShard("doc1")
assert.Equal(t, int64(0), doc1Shard.currMemoryUsage.Value())
assert.Equal(t, int64(0), cacheStats.RevisionCacheNumItems.Value())
assert.Equal(t, int64(0), cacheStats.RevisionCacheTotalMemory.Value())

// test we can still fetch this doc
docRev, err := collection.GetRev(ctx, "doc1", rev, false, nil)
require.NoError(t, err)
assert.Equal(t, "doc1", docRev.DocID)
assert.NotNil(t, docRev.BodyBytes)

// assert rev cache is still empty
assert.Equal(t, int64(0), doc1Shard.currMemoryUsage.Value())
assert.Equal(t, int64(0), cacheStats.RevisionCacheNumItems.Value())
assert.Equal(t, int64(0), cacheStats.RevisionCacheTotalMemory.Value())
}

func TestImmediateRevCacheItemBasedEviction(t *testing.T) {
cacheHitCounter, cacheMissCounter, getDocumentCounter, getRevisionCounter, cacheNumItems, memoryBytesCounted := base.SgwIntStat{}, base.SgwIntStat{}, base.SgwIntStat{}, base.SgwIntStat{}, base.SgwIntStat{}, base.SgwIntStat{}
backingStoreMap := CreateTestSingleBackingStoreMap(&testBackingStore{nil, &getDocumentCounter, &getRevisionCounter}, testCollectionID)
Expand Down
1 change: 1 addition & 0 deletions docs/api/components/schemas.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1307,6 +1307,7 @@ Database:
max_memory_count_mb:
description: |-
The maximum amount of memory the revision cache should take up in MB, setting to 0 will disable any eviction based on memory at rev cache.
There is a minimum value of 50 (50MB) for this config option.
When set this memory limit will work in in hand with revision cache size parameter. So you will potentially get eviction at revision cache both based off memory footprint and number of items in the cache.
**This is an Enterprise Edition feature only**
type: integer
Expand Down
36 changes: 34 additions & 2 deletions rest/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3070,6 +3070,7 @@ func TestNotFoundOnInvalidDatabase(t *testing.T) {
}

func TestRevCacheMemoryLimitConfig(t *testing.T) {
base.SetUpTestLogging(t, base.LevelInfo, base.KeyAll)
rt := NewRestTester(t, &RestTesterConfig{
CustomTestBucket: base.GetTestBucket(t),
PersistentConfig: true,
Expand All @@ -3088,7 +3089,7 @@ func TestRevCacheMemoryLimitConfig(t *testing.T) {
dbConfig.CacheConfig = &CacheConfig{}
dbConfig.CacheConfig.RevCacheConfig = &RevCacheConfig{
MaxItemCount: base.Uint32Ptr(100),
MaxMemoryCountMB: base.Uint32Ptr(4),
MaxMemoryCountMB: base.Uint32Ptr(51),
}
RequireStatus(t, rt.UpsertDbConfig("db1", dbConfig), http.StatusCreated)

Expand All @@ -3102,8 +3103,39 @@ func TestRevCacheMemoryLimitConfig(t *testing.T) {

assert.Equal(t, uint32(100), *dbConfig.CacheConfig.RevCacheConfig.MaxItemCount)
if base.IsEnterpriseEdition() {
assert.Equal(t, uint32(4), *dbConfig.CacheConfig.RevCacheConfig.MaxMemoryCountMB)
assert.Equal(t, uint32(51), *dbConfig.CacheConfig.RevCacheConfig.MaxMemoryCountMB)
} else {
assert.Nil(t, dbConfig.CacheConfig.RevCacheConfig.MaxMemoryCountMB)
}

dbConfig.CacheConfig = &CacheConfig{}
dbConfig.CacheConfig.RevCacheConfig = &RevCacheConfig{
MaxItemCount: base.Uint32Ptr(100),
MaxMemoryCountMB: base.Uint32Ptr(4),
}
resp = rt.UpsertDbConfig("db1", dbConfig)
if base.IsEnterpriseEdition() {
assertHTTPErrorReason(t, resp, http.StatusInternalServerError, "Internal error: maximum rev cache memory size cannot be lower than 50 MB")
} else {
// CE will roll back to no memory limit as it's an EE ony feature
RequireStatus(t, resp, http.StatusCreated)
}

// test turing off the memory based rev cache
dbConfig.CacheConfig = &CacheConfig{}
dbConfig.CacheConfig.RevCacheConfig = &RevCacheConfig{
MaxItemCount: base.Uint32Ptr(100),
MaxMemoryCountMB: base.Uint32Ptr(0),
}
RequireStatus(t, rt.UpsertDbConfig("db1", dbConfig), http.StatusCreated)

resp = rt.SendAdminRequest(http.MethodGet, "/db1/_config", "")
RequireStatus(t, resp, http.StatusOK)

// empty db config struct
dbConfig = DbConfig{}
require.NoError(t, json.Unmarshal(resp.BodyBytes(), &dbConfig))
assert.NotNil(t, dbConfig.CacheConfig)
assert.Equal(t, uint32(100), *dbConfig.CacheConfig.RevCacheConfig.MaxItemCount)
assert.Equal(t, uint32(0), *dbConfig.CacheConfig.RevCacheConfig.MaxMemoryCountMB)
}
4 changes: 4 additions & 0 deletions rest/server_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -1059,6 +1059,10 @@ func dbcOptionsFromConfig(ctx context.Context, sc *ServerContext, config *DbConf
revCacheOptions.MaxItemCount = *config.CacheConfig.RevCacheConfig.MaxItemCount
}
if config.CacheConfig.RevCacheConfig.MaxMemoryCountMB != nil {
maxMemoryConfigValue := *config.CacheConfig.RevCacheConfig.MaxMemoryCountMB
if maxMemoryConfigValue != uint32(0) && maxMemoryConfigValue < uint32(50) {
return db.DatabaseContextOptions{}, fmt.Errorf("maximum rev cache memory size cannot be lower than 50 MB")
}
revCacheOptions.MaxBytes = int64(*config.CacheConfig.RevCacheConfig.MaxMemoryCountMB * 1024 * 1024) // Convert MB input to bytes
}
if config.CacheConfig.RevCacheConfig.ShardCount != nil {
Expand Down

0 comments on commit 25ab9d0

Please sign in to comment.