diff --git a/pkg/ccl/changefeedccl/BUILD.bazel b/pkg/ccl/changefeedccl/BUILD.bazel index 126c615ad653..f745184a7e72 100644 --- a/pkg/ccl/changefeedccl/BUILD.bazel +++ b/pkg/ccl/changefeedccl/BUILD.bazel @@ -80,6 +80,7 @@ go_library( "//pkg/util", "//pkg/util/bitarray", "//pkg/util/bufalloc", + "//pkg/util/cache", "//pkg/util/ctxgroup", "//pkg/util/duration", "//pkg/util/encoding", diff --git a/pkg/ccl/changefeedccl/rowfetcher_cache.go b/pkg/ccl/changefeedccl/rowfetcher_cache.go index e0407c683212..57a3cde2f889 100644 --- a/pkg/ccl/changefeedccl/rowfetcher_cache.go +++ b/pkg/ccl/changefeedccl/rowfetcher_cache.go @@ -23,6 +23,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/rowenc" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/util" + "github.com/cockroachdb/cockroach/pkg/util/cache" "github.com/cockroachdb/cockroach/pkg/util/encoding" "github.com/cockroachdb/cockroach/pkg/util/hlc" ) @@ -35,7 +36,7 @@ import ( type rowFetcherCache struct { codec keys.SQLCodec leaseMgr *lease.Manager - fetchers map[idVersion]*row.Fetcher + fetchers *cache.UnorderedCache collection *descs.Collection db *kv.DB @@ -43,6 +44,14 @@ type rowFetcherCache struct { a rowenc.DatumAlloc } +var rfCacheConfig = cache.Config{ + Policy: cache.CacheFIFO, + // TODO: If we find ourselves thrashing here in changefeeds on many tables, + // we can improve performance by eagerly evicting versions using Resolved notifications. + // A old version with a timestamp entirely before a notification can be safely evicted. + ShouldEvict: func(size int, _ interface{}, _ interface{}) bool { return size > 1024 }, +} + type idVersion struct { id descpb.ID version descpb.DescriptorVersion @@ -60,7 +69,7 @@ func newRowFetcherCache( leaseMgr: leaseMgr, collection: cf.NewCollection(nil /* TemporarySchemaProvider */), db: db, - fetchers: make(map[idVersion]*row.Fetcher), + fetchers: cache.NewUnorderedCache(rfCacheConfig), } } @@ -138,10 +147,13 @@ func (c *rowFetcherCache) RowFetcherForTableDesc( // UserDefinedTypeColsHaveSameVersion if we have a hit because we are // guaranteed that the tables have the same version. Additionally, these // fetchers are always initialized with a single tabledesc.Immutable. - if rf, ok := c.fetchers[idVer]; ok && - catalog.UserDefinedTypeColsHaveSameVersion(tableDesc, rf.GetTable().(catalog.TableDescriptor)) { - return rf, nil + if v, ok := c.fetchers.Get(idVer); ok { + rf := v.(*row.Fetcher) + if catalog.UserDefinedTypeColsHaveSameVersion(tableDesc, rf.GetTable().(catalog.TableDescriptor)) { + return rf, nil + } } + // TODO(dan): Allow for decoding a subset of the columns. var colIdxMap catalog.TableColMap var valNeededForCol util.FastIntSet @@ -177,9 +189,6 @@ func (c *rowFetcherCache) RowFetcherForTableDesc( // Necessary because virtual columns are not populated. rf.IgnoreUnexpectedNulls = true - // TODO(dan): Bound the size of the cache. Resolved notifications will let - // us evict anything for timestamps entirely before the notification. Then - // probably an LRU just in case? - c.fetchers[idVer] = &rf + c.fetchers.Add(idVer, &rf) return &rf, nil }