CBG-4323: fix for per shard memory based eviction #7174

Merged (6 commits), Oct 31, 2024
73 changes: 42 additions & 31 deletions db/revision_cache_lru.go
@@ -34,7 +34,7 @@ func NewShardedLRURevisionCache(revCacheOptions *RevisionCacheOptions, backingSt
revCacheOptions.MaxItemCount = uint32(perCacheCapacity)
var perCacheMemoryCapacity float32
if revCacheOptions.MaxBytes > 0 {
perCacheMemoryCapacity = 1.1 * float32(revCacheOptions.MaxBytes) / float32(revCacheOptions.ShardCount)
perCacheMemoryCapacity = float32(revCacheOptions.MaxBytes) / float32(revCacheOptions.ShardCount)
revCacheOptions.MaxBytes = int64(perCacheMemoryCapacity)
}

@@ -82,16 +82,17 @@ func (sc *ShardedLRURevisionCache) Remove(docID, revID string, collectionID uint

// An LRU cache of document revision bodies, together with their channel access.
type LRURevisionCache struct {
backingStores map[uint32]RevisionCacheBackingStore
cache map[IDAndRev]*list.Element
lruList *list.List
cacheHits *base.SgwIntStat
cacheMisses *base.SgwIntStat
cacheNumItems *base.SgwIntStat
lock sync.Mutex
capacity uint32
memoryCapacity int64
cacheMemoryBytes *base.SgwIntStat
backingStores map[uint32]RevisionCacheBackingStore
cache map[IDAndRev]*list.Element
lruList *list.List
cacheHits *base.SgwIntStat
cacheMisses *base.SgwIntStat
cacheNumItems *base.SgwIntStat
lock sync.Mutex
capacity uint32 // Maximum number of items the LRURevisionCache can hold
memoryCapacity int64 // Maximum memory capacity of the LRURevisionCache, in bytes
currMemoryUsage base.AtomicInt // Number of bytes currently in use by the LRURevisionCache
cacheMemoryBytesStat *base.SgwIntStat // Stat for overall revision cache memory usage in bytes; when the cache is sharded, this stat is shared by all shards
}

// The cache payload data. Stored as the Value of a list Element.
@@ -114,15 +115,15 @@ type revCacheValue struct {
func NewLRURevisionCache(revCacheOptions *RevisionCacheOptions, backingStores map[uint32]RevisionCacheBackingStore, cacheHitStat *base.SgwIntStat, cacheMissStat *base.SgwIntStat, cacheNumItemsStat *base.SgwIntStat, revCacheMemoryStat *base.SgwIntStat) *LRURevisionCache {

return &LRURevisionCache{
cache: map[IDAndRev]*list.Element{},
lruList: list.New(),
capacity: revCacheOptions.MaxItemCount,
backingStores: backingStores,
cacheHits: cacheHitStat,
cacheMisses: cacheMissStat,
cacheNumItems: cacheNumItemsStat,
cacheMemoryBytes: revCacheMemoryStat,
memoryCapacity: revCacheOptions.MaxBytes,
cache: map[IDAndRev]*list.Element{},
lruList: list.New(),
capacity: revCacheOptions.MaxItemCount,
backingStores: backingStores,
cacheHits: cacheHitStat,
cacheMisses: cacheMissStat,
cacheNumItems: cacheNumItemsStat,
cacheMemoryBytesStat: revCacheMemoryStat,
memoryCapacity: revCacheOptions.MaxBytes,
}
}

@@ -151,7 +152,7 @@ func (rc *LRURevisionCache) UpdateDelta(ctx context.Context, docID, revID string
if value != nil {
outGoingBytes := value.updateDelta(toDelta)
if outGoingBytes != 0 {
rc.cacheMemoryBytes.Add(outGoingBytes)
rc.updateRevCacheMemoryUsage(outGoingBytes)
}
// check for memory based eviction
rc.revCacheMemoryBasedEviction()
@@ -169,7 +170,7 @@ func (rc *LRURevisionCache) getFromCache(ctx context.Context, docID, revID strin

if !statEvent && err == nil {
// cache miss so we had to load doc, increment memory count
rc.cacheMemoryBytes.Add(value.getItemBytes())
rc.updateRevCacheMemoryUsage(value.getItemBytes())
// check for memory based eviction
rc.revCacheMemoryBasedEviction()
}
@@ -205,7 +206,7 @@ func (rc *LRURevisionCache) GetActive(ctx context.Context, docID string, collect

if !statEvent && err == nil {
// cache miss so we had to load doc, increment memory count
rc.cacheMemoryBytes.Add(value.getItemBytes())
rc.updateRevCacheMemoryUsage(value.getItemBytes())
// check for rev cache memory based eviction
rc.revCacheMemoryBasedEviction()
}
@@ -234,7 +235,7 @@ func (rc *LRURevisionCache) Put(ctx context.Context, docRev DocumentRevision, co
value := rc.getValue(docRev.DocID, docRev.RevID, collectionID, true)
// increment incoming bytes
docRev.CalculateBytes()
rc.cacheMemoryBytes.Add(docRev.MemoryBytes)
rc.updateRevCacheMemoryUsage(docRev.MemoryBytes)
value.store(docRev)
// check for rev cache memory based eviction
rc.revCacheMemoryBasedEviction()
@@ -250,7 +251,7 @@ func (rc *LRURevisionCache) Upsert(ctx context.Context, docRev DocumentRevision,
if elem := rc.cache[key]; elem != nil {
revItem := elem.Value.(*revCacheValue)
// decrement item bytes by the removed item
rc.cacheMemoryBytes.Add(-revItem.getItemBytes())
rc.updateRevCacheMemoryUsage(-revItem.getItemBytes())
rc.lruList.Remove(elem)
newItem = false
}
@@ -275,13 +276,13 @@ func (rc *LRURevisionCache) Upsert(ctx context.Context, docRev DocumentRevision,

docRev.CalculateBytes()
// add new item bytes to overall count
rc.cacheMemoryBytes.Add(docRev.MemoryBytes)
rc.updateRevCacheMemoryUsage(docRev.MemoryBytes)
value.store(docRev)

// check we aren't over memory capacity, if so perform eviction
numItemsRemoved = 0
if rc.memoryCapacity > 0 {
for rc.cacheMemoryBytes.Value() > rc.memoryCapacity {
for rc.currMemoryUsage.Value() > rc.memoryCapacity {
rc.purgeOldest_()
numItemsRemoved++
}
@@ -332,7 +333,7 @@ func (rc *LRURevisionCache) Remove(docID, revID string, collectionID uint32) {
rc.lruList.Remove(element)
// decrement the overall memory bytes count
revItem := element.Value.(*revCacheValue)
rc.cacheMemoryBytes.Add(-revItem.getItemBytes())
rc.updateRevCacheMemoryUsage(-revItem.getItemBytes())
delete(rc.cache, key)
rc.cacheNumItems.Add(-1)
}
@@ -352,7 +353,7 @@ func (rc *LRURevisionCache) purgeOldest_() {
value := rc.lruList.Remove(rc.lruList.Back()).(*revCacheValue)
delete(rc.cache, value.key)
// decrement memory overall size
rc.cacheMemoryBytes.Add(-value.getItemBytes())
rc.updateRevCacheMemoryUsage(-value.getItemBytes())
}

// Gets the body etc. out of a revCacheValue. If they aren't present already, the loader func
@@ -531,7 +532,7 @@ func (delta *RevisionDelta) CalculateDeltaBytes() {
// revCacheMemoryBasedEviction checks whether rev cache eviction is required; if so, it calls performEviction, which acquires the lock to evict
func (rc *LRURevisionCache) revCacheMemoryBasedEviction() {
// if memory capacity is not set, don't check for eviction this way
if rc.memoryCapacity > 0 && rc.cacheMemoryBytes.Value() > rc.memoryCapacity {
if rc.memoryCapacity > 0 && rc.currMemoryUsage.Value() > rc.memoryCapacity {
rc.performEviction()
}
}
@@ -541,9 +542,19 @@ func (rc *LRURevisionCache) performEviction() {
rc.lock.Lock()
defer rc.lock.Unlock()
var numItemsRemoved int64
for rc.cacheMemoryBytes.Value() > rc.memoryCapacity {
for rc.currMemoryUsage.Value() > rc.memoryCapacity {
rc.purgeOldest_()
numItemsRemoved++
}
rc.cacheNumItems.Add(-numItemsRemoved)
}

// updateRevCacheMemoryUsage atomically adjusts both this LRURevisionCache's own memory usage counter and the overall revision cache memory stat
func (rc *LRURevisionCache) updateRevCacheMemoryUsage(bytesCount int64) {
// We track both the current LRURevisionCache memory usage AND the overall usage of the cache. The overall usage
// feeds the stat that reports revision cache memory, while the per-LRURevisionCache counter is what allows a
// sharded rev cache to perform eviction on a per-shard basis, much like item count based eviction
rc.currMemoryUsage.Add(bytesCount)
rc.cacheMemoryBytesStat.Add(bytesCount)
Collaborator:
In other scenarios like this (for the channel cache in particular), we compute aggregate stats at stat collection time (see UpdateCalculatedStats). The above approach may be fine for 3.2.1 (based on performance results), but filing a tracking ticket to use this approach going forward would be a good idea.

Contributor Author:
This was the approach I wanted to take, but it came with complexities around fetching the current memory usage. It would require a new interface method on the rev cache, which I thought may not be wanted at this time (given the time constraints). I can file a ticket to explore this in a future release.
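For illustration, here is a minimal, self-contained sketch of the stat-collection-time alternative discussed above, in which each shard keeps only its own atomic byte counter and the aggregate stat is computed lazily when stats are collected. The type, field, and method names are illustrative assumptions, not the existing Sync Gateway rev cache interface.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// shard stands in for a single LRURevisionCache shard; it maintains only its own counter.
type shard struct {
	currMemoryUsage atomic.Int64 // bytes currently held by this shard
}

// shardedRevCache stands in for ShardedLRURevisionCache.
type shardedRevCache struct {
	shards []*shard
}

// updateCalculatedStats sums the per-shard counters into a single aggregate value at
// stat-collection time, instead of updating a shared stat on every Put/Upsert/Remove.
// In Sync Gateway this would presumably be wired into an UpdateCalculatedStats-style hook.
func (c *shardedRevCache) updateCalculatedStats() int64 {
	var total int64
	for _, s := range c.shards {
		total += s.currMemoryUsage.Load()
	}
	return total
}

func main() {
	cache := &shardedRevCache{shards: []*shard{{}, {}}}
	cache.shards[0].currMemoryUsage.Add(120)
	cache.shards[1].currMemoryUsage.Add(80)
	fmt.Println("aggregate rev cache bytes:", cache.updateCalculatedStats()) // 200
}
```

The trade-off is fewer shared-stat writes on the hot path, at the cost of exposing per-shard usage through a new accessor, which is the interface-method concern raised in the reply above.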

}
106 changes: 105 additions & 1 deletion db/revision_cache_test.go
@@ -642,7 +642,16 @@ func TestImmediateRevCacheMemoryBasedEviction(t *testing.T) {
assert.Equal(t, int64(0), memoryBytesCounted.Value())
assert.Equal(t, int64(0), cacheNumItems.Value())

docRev, err := cache.Get(ctx, "doc1", "1-abc", testCollectionID, RevCacheOmitDelta)
// assert we can still fetch this upserted doc
docRev, err := cache.Get(ctx, "doc2", "1-abc", testCollectionID, false)
require.NoError(t, err)
assert.Equal(t, "doc2", docRev.DocID)
assert.Equal(t, int64(102), docRev.MemoryBytes)
assert.NotNil(t, docRev.BodyBytes)
assert.Equal(t, int64(0), memoryBytesCounted.Value())
assert.Equal(t, int64(0), cacheNumItems.Value())

docRev, err = cache.Get(ctx, "doc1", "1-abc", testCollectionID, RevCacheOmitDelta)
require.NoError(t, err)
assert.NotNil(t, docRev.BodyBytes)

@@ -657,6 +666,101 @@
assert.Equal(t, int64(0), cacheNumItems.Value())
}

// TestShardedMemoryEviction:
// - Test adding a doc to each shard in the test
// - Assert that each shard has individual count for memory usage as expected
// - Add a new doc that will push the shard over its memory capacity, and assert that eviction takes place and
// all stats are as expected
func TestShardedMemoryEviction(t *testing.T) {
dbcOptions := DatabaseContextOptions{
RevisionCacheOptions: &RevisionCacheOptions{
MaxBytes: 160,
MaxItemCount: 10,
ShardCount: 2,
},
}
db, ctx := SetupTestDBWithOptions(t, dbcOptions)
defer db.Close(ctx)
collection, ctx := GetSingleDatabaseCollectionWithUser(ctx, t, db)
cacheStats := db.DbStats.Cache()

docBody := Body{
"channels": "_default",
}

// add doc that will be added to one shard
size, _ := createDocAndReturnSizeAndRev(t, ctx, "doc1", collection, docBody)
assert.Equal(t, int64(size), cacheStats.RevisionCacheTotalMemory.Value())
// grab this particular shard + assert that the shard memory usage is as expected
shardedCache := db.revisionCache.(*ShardedLRURevisionCache)
doc1Shard := shardedCache.getShard("doc1")
assert.Equal(t, int64(size), doc1Shard.currMemoryUsage.Value())

// add new doc in diff shard + assert that the shard memory usage is as expected
size, _ = createDocAndReturnSizeAndRev(t, ctx, "doc2", collection, docBody)
doc2Shard := shardedCache.getShard("doc2")
assert.Equal(t, int64(size), doc2Shard.currMemoryUsage.Value())
// overall memory usage should be the combination of the two added docs
assert.Equal(t, int64(size*2), cacheStats.RevisionCacheTotalMemory.Value())

// two docs should reside in cache at this time
assert.Equal(t, int64(2), cacheStats.RevisionCacheNumItems.Value())

docBody = Body{
"channels": "_default",
"some": "field",
}
// add new doc to trigger eviction and assert stats are as expected
newDocSize, _ := createDocAndReturnSizeAndRev(t, ctx, "doc3", collection, docBody)
doc3Shard := shardedCache.getShard("doc3")
assert.Equal(t, int64(newDocSize), doc3Shard.currMemoryUsage.Value())
assert.Equal(t, int64(2), cacheStats.RevisionCacheNumItems.Value())
assert.Equal(t, int64(size+newDocSize), cacheStats.RevisionCacheTotalMemory.Value())
}

// TestShardedMemoryEvictionWhenShardEmpty:
// - Test adding a doc to sharded revision cache that will immediately be evicted due to size
// - Assert that stats look as expected
func TestShardedMemoryEvictionWhenShardEmpty(t *testing.T) {
dbcOptions := DatabaseContextOptions{
RevisionCacheOptions: &RevisionCacheOptions{
MaxBytes: 100,
MaxItemCount: 10,
ShardCount: 2,
},
}
db, ctx := SetupTestDBWithOptions(t, dbcOptions)
defer db.Close(ctx)
collection, ctx := GetSingleDatabaseCollectionWithUser(ctx, t, db)
cacheStats := db.DbStats.Cache()

docBody := Body{
"channels": "_default",
}

// add doc that will be added to one shard
rev, _, err := collection.Put(ctx, "doc1", docBody)
require.NoError(t, err)
shardedCache := db.revisionCache.(*ShardedLRURevisionCache)

// assert that doc was not added to cache as it's too large
doc1Shard := shardedCache.getShard("doc1")
assert.Equal(t, int64(0), doc1Shard.currMemoryUsage.Value())
assert.Equal(t, int64(0), cacheStats.RevisionCacheNumItems.Value())
assert.Equal(t, int64(0), cacheStats.RevisionCacheTotalMemory.Value())

// test we can still fetch this doc
docRev, err := collection.GetRev(ctx, "doc1", rev, false, nil)
require.NoError(t, err)
assert.Equal(t, "doc1", docRev.DocID)
assert.NotNil(t, docRev.BodyBytes)

// assert rev cache is still empty
assert.Equal(t, int64(0), doc1Shard.currMemoryUsage.Value())
assert.Equal(t, int64(0), cacheStats.RevisionCacheNumItems.Value())
assert.Equal(t, int64(0), cacheStats.RevisionCacheTotalMemory.Value())
}

func TestImmediateRevCacheItemBasedEviction(t *testing.T) {
cacheHitCounter, cacheMissCounter, getDocumentCounter, getRevisionCounter, cacheNumItems, memoryBytesCounted := base.SgwIntStat{}, base.SgwIntStat{}, base.SgwIntStat{}, base.SgwIntStat{}, base.SgwIntStat{}, base.SgwIntStat{}
backingStoreMap := CreateTestSingleBackingStoreMap(&testBackingStore{nil, &getDocumentCounter, &getRevisionCounter}, testCollectionID)
1 change: 1 addition & 0 deletions docs/api/components/schemas.yaml
@@ -1307,6 +1307,7 @@ Database:
max_memory_count_mb:
description: |-
The maximum amount of memory the revision cache should take up, in MB. Setting this to 0 disables memory-based eviction in the revision cache.
There is a minimum value of 50 (50 MB) for this config option.
When set, this memory limit works hand in hand with the revision cache size parameter, so eviction from the revision cache can be triggered both by memory footprint and by the number of items in the cache.
**This is an Enterprise Edition feature only**
type: integer
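As a usage sketch, the snippet below shows how these options map onto the Go rev cache config structs exercised in the config_test.go changes later in this PR. The surrounding DbConfig setup is assumed, and the values are examples only.

```go
// Sketch of configuring the revision cache limits; mirrors the config used in config_test.go below.
// A MaxMemoryCountMB of 0 disables memory-based eviction entirely, while non-zero values below 50
// are rejected by the validation added in server_context.go.
dbConfig.CacheConfig = &CacheConfig{
	RevCacheConfig: &RevCacheConfig{
		MaxItemCount:     base.Uint32Ptr(100), // item-count eviction threshold
		MaxMemoryCountMB: base.Uint32Ptr(50),  // minimum allowed non-zero memory limit (EE only)
	},
}
```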
36 changes: 34 additions & 2 deletions rest/config_test.go
@@ -3070,6 +3070,7 @@ func TestNotFoundOnInvalidDatabase(t *testing.T) {
}

func TestRevCacheMemoryLimitConfig(t *testing.T) {
base.SetUpTestLogging(t, base.LevelInfo, base.KeyAll)
rt := NewRestTester(t, &RestTesterConfig{
CustomTestBucket: base.GetTestBucket(t),
PersistentConfig: true,
@@ -3088,7 +3089,7 @@
dbConfig.CacheConfig = &CacheConfig{}
dbConfig.CacheConfig.RevCacheConfig = &RevCacheConfig{
MaxItemCount: base.Uint32Ptr(100),
MaxMemoryCountMB: base.Uint32Ptr(4),
MaxMemoryCountMB: base.Uint32Ptr(51),
}
RequireStatus(t, rt.UpsertDbConfig("db1", dbConfig), http.StatusCreated)

@@ -3102,8 +3103,39 @@

assert.Equal(t, uint32(100), *dbConfig.CacheConfig.RevCacheConfig.MaxItemCount)
if base.IsEnterpriseEdition() {
assert.Equal(t, uint32(4), *dbConfig.CacheConfig.RevCacheConfig.MaxMemoryCountMB)
assert.Equal(t, uint32(51), *dbConfig.CacheConfig.RevCacheConfig.MaxMemoryCountMB)
} else {
assert.Nil(t, dbConfig.CacheConfig.RevCacheConfig.MaxMemoryCountMB)
}

dbConfig.CacheConfig = &CacheConfig{}
dbConfig.CacheConfig.RevCacheConfig = &RevCacheConfig{
MaxItemCount: base.Uint32Ptr(100),
MaxMemoryCountMB: base.Uint32Ptr(4),
}
resp = rt.UpsertDbConfig("db1", dbConfig)
if base.IsEnterpriseEdition() {
assertHTTPErrorReason(t, resp, http.StatusInternalServerError, "Internal error: maximum rev cache memory size cannot be lower than 50 MB")
} else {
// CE will roll back to no memory limit as it's an EE only feature
RequireStatus(t, resp, http.StatusCreated)
}

// test turning off the memory-based rev cache
dbConfig.CacheConfig = &CacheConfig{}
dbConfig.CacheConfig.RevCacheConfig = &RevCacheConfig{
MaxItemCount: base.Uint32Ptr(100),
MaxMemoryCountMB: base.Uint32Ptr(0),
}
RequireStatus(t, rt.UpsertDbConfig("db1", dbConfig), http.StatusCreated)

resp = rt.SendAdminRequest(http.MethodGet, "/db1/_config", "")
RequireStatus(t, resp, http.StatusOK)

// empty db config struct
dbConfig = DbConfig{}
require.NoError(t, json.Unmarshal(resp.BodyBytes(), &dbConfig))
assert.NotNil(t, dbConfig.CacheConfig)
assert.Equal(t, uint32(100), *dbConfig.CacheConfig.RevCacheConfig.MaxItemCount)
assert.Equal(t, uint32(0), *dbConfig.CacheConfig.RevCacheConfig.MaxMemoryCountMB)
}
4 changes: 4 additions & 0 deletions rest/server_context.go
@@ -1059,6 +1059,10 @@ func dbcOptionsFromConfig(ctx context.Context, sc *ServerContext, config *DbConf
revCacheOptions.MaxItemCount = *config.CacheConfig.RevCacheConfig.MaxItemCount
}
if config.CacheConfig.RevCacheConfig.MaxMemoryCountMB != nil {
maxMemoryConfigValue := *config.CacheConfig.RevCacheConfig.MaxMemoryCountMB
if maxMemoryConfigValue != uint32(0) && maxMemoryConfigValue < uint32(50) {
return db.DatabaseContextOptions{}, fmt.Errorf("maximum rev cache memory size cannot be lower than 50 MB")
}
revCacheOptions.MaxBytes = int64(*config.CacheConfig.RevCacheConfig.MaxMemoryCountMB) * 1024 * 1024 // Convert MB input to bytes (convert before multiplying to avoid uint32 overflow for large values)
}
if config.CacheConfig.RevCacheConfig.ShardCount != nil {