Skip to content

Commit

Permalink
changefeedccl: limit in-memory rowfetcher cache
Browse files Browse the repository at this point in the history
We generate a row.Fetcher for each new version of a watched
table, and cache it in-memory. This is technically an unbounded
memory usage and thus could cause an OOM, although in practice
you won't hit that unless you are Schema Changes Georg, the
hypothetical outlier who makes thousands of schema changes a second.
To support Schema Changes Georg, this PR bounds the size of the
cache and evicts the oldest record. There's smarter stuff we could
do (per an existing TODO) to eagerly evict records using resolved
timestamps, but that's an optimization only a small subset of
hypothetical Georgs would care about (those running single
changefeeds on many tables, all with many rapid schema changes)
so I doubt we'll want to bother.

Release note: None
  • Loading branch information
HonoreDB committed Dec 15, 2021
1 parent edc8711 commit 08542b2
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 9 deletions.
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ go_library(
"//pkg/util",
"//pkg/util/bitarray",
"//pkg/util/bufalloc",
"//pkg/util/cache",
"//pkg/util/ctxgroup",
"//pkg/util/duration",
"//pkg/util/encoding",
Expand Down
27 changes: 18 additions & 9 deletions pkg/ccl/changefeedccl/rowfetcher_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/cache"
"github.com/cockroachdb/cockroach/pkg/util/encoding"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
)
Expand All @@ -35,14 +36,22 @@ import (
type rowFetcherCache struct {
codec keys.SQLCodec
leaseMgr *lease.Manager
fetchers map[idVersion]*row.Fetcher
fetchers *cache.UnorderedCache

collection *descs.Collection
db *kv.DB

a rowenc.DatumAlloc
}

var rfCacheConfig = cache.Config{
Policy: cache.CacheFIFO,
// TODO: If we find ourselves thrashing here in changefeeds on many tables,
// we can improve performance by eagerly evicting versions using Resolved notifications.
// An old version with a timestamp entirely before a notification can be safely evicted.
ShouldEvict: func(size int, _ interface{}, _ interface{}) bool { return size > 1024 },
}

type idVersion struct {
id descpb.ID
version descpb.DescriptorVersion
Expand All @@ -60,7 +69,7 @@ func newRowFetcherCache(
leaseMgr: leaseMgr,
collection: cf.NewCollection(nil /* TemporarySchemaProvider */),
db: db,
fetchers: make(map[idVersion]*row.Fetcher),
fetchers: cache.NewUnorderedCache(rfCacheConfig),
}
}

Expand Down Expand Up @@ -138,10 +147,13 @@ func (c *rowFetcherCache) RowFetcherForTableDesc(
// UserDefinedTypeColsHaveSameVersion if we have a hit because we are
// guaranteed that the tables have the same version. Additionally, these
// fetchers are always initialized with a single tabledesc.Immutable.
if rf, ok := c.fetchers[idVer]; ok &&
catalog.UserDefinedTypeColsHaveSameVersion(tableDesc, rf.GetTable().(catalog.TableDescriptor)) {
return rf, nil
if v, ok := c.fetchers.Get(idVer); ok {
rf := v.(*row.Fetcher)
if catalog.UserDefinedTypeColsHaveSameVersion(tableDesc, rf.GetTable().(catalog.TableDescriptor)) {
return rf, nil
}
}

// TODO(dan): Allow for decoding a subset of the columns.
var colIdxMap catalog.TableColMap
var valNeededForCol util.FastIntSet
Expand Down Expand Up @@ -177,9 +189,6 @@ func (c *rowFetcherCache) RowFetcherForTableDesc(
// Necessary because virtual columns are not populated.
rf.IgnoreUnexpectedNulls = true

// TODO(dan): Bound the size of the cache. Resolved notifications will let
// us evict anything for timestamps entirely before the notification. Then
// probably an LRU just in case?
c.fetchers[idVer] = &rf
c.fetchers.Add(idVer, &rf)
return &rf, nil
}

0 comments on commit 08542b2

Please sign in to comment.