diff --git a/pkg/server/config.go b/pkg/server/config.go
index ffa0324cc379..1a1e9e33b896 100644
--- a/pkg/server/config.go
+++ b/pkg/server/config.go
@@ -171,7 +171,7 @@ type Config struct {
 	SQLTableStatCacheSize int
 
 	// SQLQueryCacheSize is the memory size (in bytes) of the query plan cache.
-	SQLQueryCacheSize int
+	SQLQueryCacheSize int64
 
 	// HeapProfileDirName is the directory name for heap profiles using
 	// heapprofiler.
diff --git a/pkg/sql/querycache/query_cache.go b/pkg/sql/querycache/query_cache.go
index 8bcdb42e9cb2..b3b3fa5d5f31 100644
--- a/pkg/sql/querycache/query_cache.go
+++ b/pkg/sql/querycache/query_cache.go
@@ -15,6 +15,8 @@
 package querycache
 
 import (
+	"fmt"
+
 	"github.com/cockroachdb/cockroach/pkg/sql/opt/memo"
 	"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
 	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
@@ -23,24 +25,30 @@ import (
 // C is a query cache, keyed on SQL statement strings (which can contain
 // placeholders).
 type C struct {
+	totalMem int64
+
 	mu struct {
 		syncutil.Mutex
 
-		// Sentinel list entry. Cache entries form a circular linked list, with the
-		// most recently used entry at the front (i.e. sentinel.next). All available
-		// entries are always part of the list; unused entries have SQL="".
-		sentinel entry
+		availableMem int64
+
+		// Sentinel list entries. All entries are part of either the used or the
+		// free circular list. Any entry in the used list has a corresponding entry
+		// in the map. The used list is in MRU order.
+		used, free entry
 
 		// Map with an entry for each used entry.
 		m map[string]*entry
 	}
 }
 
-// Currently we use a very crude memory management scheme: we never put in the
-// cache plans that use more memory than maxCachedSize and we only allocate
-// (totalCacheSize / maxCachedSize) cache entries.
-// TODO(radu): improve this.
-const maxCachedSize = 8192
+// avgCachedSize is used to preallocate the number of "slots" in the cache.
+// Specifically, the cache will be able to store at most
+// (<size> / avgCachedSize) queries, even if their memory usage is small.
+const avgCachedSize = 1024
+
+// We disallow very large queries from being added to the cache.
+const maxCachedSize = 128 * 1024
 
 // CachedData is the data associated with a cache entry.
 type CachedData struct {
@@ -72,7 +80,7 @@ func (e *entry) clear() {
 	e.CachedData = CachedData{}
 }
 
-// remove removes the entry from the linked list.
+// remove removes the entry from the linked list it is part of.
 func (e *entry) remove() {
 	e.prev.next = e.next
 	e.next.prev = e.prev
@@ -91,27 +99,31 @@ func (e *entry) insertAfter(a *entry) {
 }
 
 // New creates a query cache of the given size.
-func New(memorySize int) *C {
-	numEntries := memorySize / maxCachedSize
-	if numEntries == 0 {
-		numEntries = 1
+func New(memorySize int64) *C {
+	if memorySize < avgCachedSize {
+		memorySize = avgCachedSize
 	}
-	c := &C{}
+	numEntries := memorySize / avgCachedSize
+	c := &C{totalMem: memorySize}
+	c.mu.availableMem = memorySize
 	c.mu.m = make(map[string]*entry, numEntries)
 	entries := make([]entry, numEntries)
+	// The used list is empty.
+	c.mu.used.next = &c.mu.used
+	c.mu.used.prev = &c.mu.used
 	// Make a linked list of entries, starting with the sentinel.
-	c.mu.sentinel.next = &entries[0]
-	c.mu.sentinel.prev = &entries[numEntries-1]
+	c.mu.free.next = &entries[0]
+	c.mu.free.prev = &entries[numEntries-1]
 	for i := range entries {
 		if i > 0 {
 			entries[i].prev = &entries[i-1]
 		} else {
-			entries[i].prev = &c.mu.sentinel
+			entries[i].prev = &c.mu.free
 		}
 		if i+1 < len(entries) {
 			entries[i].next = &entries[i+1]
 		} else {
-			entries[i].next = &c.mu.sentinel
+			entries[i].next = &c.mu.free
 		}
 	}
 	return c
@@ -125,46 +137,81 @@ func (c *C) Find(sql string) (_ CachedData, ok bool) {
 	c.mu.Lock()
 	defer c.mu.Unlock()
 
-	res := c.mu.m[sql]
-	if res == nil {
+	e := c.mu.m[sql]
+	if e == nil {
 		return CachedData{}, false
 	}
-	c.moveToFrontLocked(res)
-	return res.CachedData, true
+	// Move the entry to the front of the used list.
+	e.remove()
+	e.insertAfter(&c.mu.used)
+	return e.CachedData, true
 }
 
 // Add adds an entry to the cache (possibly evicting some other entry). If the
 // cache already has a corresponding entry for d.SQL, it is updated.
 // Note: d.PrepareMetadata cannot be modified once this method is called.
 func (c *C) Add(d *CachedData) {
-	if d.memoryEstimate() > maxCachedSize {
+	mem := d.memoryEstimate()
+	if d.SQL == "" || mem > maxCachedSize || mem > c.totalMem {
 		return
 	}
 	c.mu.Lock()
 	defer c.mu.Unlock()
 
-	if e, ok := c.mu.m[d.SQL]; ok {
-		c.moveToFrontLocked(e)
-		e.CachedData = *d
-		return
+	e, ok := c.mu.m[d.SQL]
+	if ok {
+		// The query already exists in the cache.
+		e.remove()
+		c.mu.availableMem += e.memoryEstimate()
+	} else {
+		// Get an entry to use for this query.
+		e = c.getEntry()
+		c.mu.m[d.SQL] = e
 	}
-
-	// Evict the least recently used entry.
-	e := c.mu.sentinel.prev
-	if e.SQL != "" {
-		delete(c.mu.m, e.SQL)
-	}
-	c.moveToFrontLocked(e)
 	e.CachedData = *d
-	c.mu.m[d.SQL] = e
+
+	// Evict more entries if necessary.
+	c.makeSpace(mem)
+	c.mu.availableMem -= mem
+
+	// Insert the entry at the front of the used list.
+	e.insertAfter(&c.mu.used)
 }
 
-// moveToFrontLocked moves the given entry to the front of the list (right after
-// the sentinel). Assumes c.mu is locked.
-func (c *C) moveToFrontLocked(e *entry) {
+// makeSpace evicts entries from the used list until we have enough free space.
+func (c *C) makeSpace(needed int64) {
+	for c.mu.availableMem < needed {
+		// Evict entries as necessary, putting them in the free list.
+		c.evict().insertAfter(&c.mu.free)
+	}
+}
+
+// Evicts the last item in the used list.
+func (c *C) evict() *entry {
+	e := c.mu.used.prev
+	if e == &c.mu.used {
+		panic("no more used entries")
+	}
 	e.remove()
-	e.insertAfter(&c.mu.sentinel)
+	c.mu.availableMem += e.memoryEstimate()
+	delete(c.mu.m, e.SQL)
+	e.clear()
+
+	return e
+}
+
+// getEntry returns an entry that can be used for adding a new query to the
+// cache. If there are free entries, one is returned; otherwise, a used entry is
+// evicted.
+func (c *C) getEntry() *entry {
+	if e := c.mu.free.next; e != &c.mu.free {
+		e.remove()
+		return e
+	}
+	// No free entries, we must evict an entry.
+	return c.evict()
 }
 
 // Clear removes all the entries from the cache.
@@ -174,8 +221,12 @@ func (c *C) Clear() {
 	c.mu.Lock()
 	defer c.mu.Unlock()
 
 	// Clear the map.
 	for sql, e := range c.mu.m {
-		e.clear()
+		c.mu.availableMem += e.memoryEstimate()
 		delete(c.mu.m, sql)
+		e.remove()
+		e.clear()
+		e.insertAfter(&c.mu.free)
 	}
 }
@@ -184,13 +235,46 @@ func (c *C) Purge(sql string) {
 	c.mu.Lock()
 	defer c.mu.Unlock()
 
-	e := c.mu.m[sql]
-	if e == nil {
-		return
+	if e := c.mu.m[sql]; e != nil {
+		c.mu.availableMem += e.memoryEstimate()
+		delete(c.mu.m, sql)
+		e.clear()
+		e.remove()
+		e.insertAfter(&c.mu.free)
+	}
+}
+
+// check performs various assertions on the internal consistency of the cache
+// structures. Used by testing code.
+func (c *C) check() {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	// Verify that all entries in the used list have a corresponding entry in the
+	// map, and that the memory accounting adds up.
+	numUsed := 0
+	memUsed := int64(0)
+	for e := c.mu.used.next; e != &c.mu.used; e = e.next {
+		numUsed++
+		memUsed += e.memoryEstimate()
+		if e.SQL == "" {
+			panic(fmt.Sprintf("used entry with empty SQL"))
+		}
+		if me, ok := c.mu.m[e.SQL]; !ok {
+			panic(fmt.Sprintf("used entry %s not in map", e.SQL))
+		} else if e != me {
+			panic(fmt.Sprintf("map entry for %s doesn't match used entry", e.SQL))
+		}
+	}
+
+	if numUsed != len(c.mu.m) {
+		panic(fmt.Sprintf("map length %d doesn't match used list size %d", len(c.mu.m), numUsed))
+	}
+
+	if memUsed+c.mu.availableMem != c.totalMem {
+		panic(fmt.Sprintf(
+			"memory usage doesn't add up: used=%d available=%d total=%d",
+			memUsed, c.mu.availableMem, c.totalMem,
+		))
 	}
-	delete(c.mu.m, sql)
-	e.clear()
-	e.remove()
-	// Insert at the back of the list.
-	e.insertAfter(c.mu.sentinel.prev)
 }
diff --git a/pkg/sql/querycache/query_cache_test.go b/pkg/sql/querycache/query_cache_test.go
index 60172955316d..2f4a5e2b4d22 100644
--- a/pkg/sql/querycache/query_cache_test.go
+++ b/pkg/sql/querycache/query_cache_test.go
@@ -21,18 +21,18 @@ import (
 	"testing"
 
 	"github.com/cockroachdb/cockroach/pkg/sql/opt/memo"
+	"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
 	"github.com/cockroachdb/cockroach/pkg/util/randutil"
 )
 
 func toStr(c *C) string {
+	c.check()
+
 	c.mu.Lock()
 	defer c.mu.Unlock()
 
 	var b strings.Builder
-	for e := c.mu.sentinel.next; e != &c.mu.sentinel; e = e.next {
-		if e.SQL == "" {
-			continue
-		}
+	for e := c.mu.used.next; e != &c.mu.used; e = e.next {
 		if b.Len() != 0 {
 			b.WriteString(",")
 		}
@@ -48,6 +48,24 @@ func expect(t *testing.T, c *C, exp string) {
 	}
 }
 
+func data(sql string, mem *memo.Memo, memEstimate int64) *CachedData {
+	cd := &CachedData{SQL: sql, Memo: mem, PrepareMetadata: &sqlbase.PrepareMetadata{}}
+	n := memEstimate - cd.memoryEstimate()
+	if n < 0 {
+		panic(fmt.Sprintf("size %d too small", memEstimate))
+	}
+	// Add characters to AnonymizedStr which should increase the estimate.
+	s := make([]byte, n)
+	for i := range s {
+		s[i] = 'x'
+	}
+	cd.PrepareMetadata.AnonymizedStr = string(s)
+	if cd.memoryEstimate() != memEstimate {
+		panic(fmt.Sprintf("failed to create CachedData of size %d", memEstimate))
+	}
+	return cd
+}
+
 // TestCache tests the main operations of the cache.
 func TestCache(t *testing.T) {
 	sa := &memo.Memo{}
 	sb := &memo.Memo{}
 	sc := &memo.Memo{}
 	sd := &memo.Memo{}
 
-	c := New(3 * maxCachedSize)
+	// In this test, all entries have the same size: avgCachedSize.
+	c := New(3 * avgCachedSize)
 	expect(t, c, "")
 
-	c.Add(&CachedData{SQL: "a", Memo: sa})
+	c.Add(data("a", sa, avgCachedSize))
 	expect(t, c, "a")
-	c.Add(&CachedData{SQL: "b", Memo: sb})
+	c.Add(data("b", sb, avgCachedSize))
 	expect(t, c, "b,a")
-	c.Add(&CachedData{SQL: "c", Memo: sc})
+	c.Add(data("c", sc, avgCachedSize))
 	expect(t, c, "c,b,a")
-	c.Add(&CachedData{SQL: "d", Memo: sd})
+	c.Add(data("d", sd, avgCachedSize))
 	expect(t, c, "d,c,b")
 	if _, ok := c.Find("a"); ok {
 		t.Errorf("a shouldn't be in the cache")
 	}
@@ -83,7 +102,7 @@ func TestCache(t *testing.T) {
 	}
 	expect(t, c, "b,c,d")
 
-	c.Add(&CachedData{SQL: "a", Memo: sa})
+	c.Add(data("a", sa, avgCachedSize))
 	expect(t, c, "a,b,c")
 
 	c.Purge("b")
@@ -95,7 +114,7 @@ func TestCache(t *testing.T) {
 	c.Purge("c")
 	expect(t, c, "a")
 
-	c.Add(&CachedData{SQL: "b", Memo: sb})
+	c.Add(data("b", sb, avgCachedSize))
 	expect(t, c, "b,a")
 
 	c.Clear()
@@ -105,11 +124,67 @@ func TestCache(t *testing.T) {
 	}
 }
 
+func TestCacheMemory(t *testing.T) {
+	m := &memo.Memo{}
+
+	c := New(10 * avgCachedSize)
+	expect(t, c, "")
+	for i := 0; i < 10; i++ {
+		c.Add(data(fmt.Sprintf("%d", i), m, avgCachedSize/2))
+	}
+	expect(t, c, "9,8,7,6,5,4,3,2,1,0")
+
+	// Verify handling when we have no more entries.
+	c.Add(data("10", m, avgCachedSize/2))
+	expect(t, c, "10,9,8,7,6,5,4,3,2,1")
+
+	// Verify handling when we have larger entries.
+	c.Add(data("large", m, avgCachedSize*8))
+	expect(t, c, "large,10,9,8,7")
+	c.Add(data("verylarge", m, avgCachedSize*10))
+	expect(t, c, "verylarge")
+
+	for i := 0; i < 10; i++ {
+		c.Add(data(fmt.Sprintf("%d", i), m, avgCachedSize))
+	}
+	expect(t, c, "9,8,7,6,5,4,3,2,1,0")
+
+	// Verify that we don't try to add an entry that's larger than the cache size.
+	c.Add(data("large", m, avgCachedSize*11))
+	expect(t, c, "9,8,7,6,5,4,3,2,1,0")
+
+	// Verify handling when we update an existing entry with one that uses more
+	// memory.
+	c.Add(data("5", m, avgCachedSize*5))
+	expect(t, c, "5,9,8,7,6,4")
+
+	c.Add(data("0", m, avgCachedSize))
+	expect(t, c, "0,5,9,8,7,6")
+
+	// Verify handling when we update an existing entry with one that uses less
+	// memory.
+	c.Add(data("5", m, avgCachedSize))
+	expect(t, c, "5,0,9,8,7,6")
+	c.Add(data("1", m, avgCachedSize))
+	c.Add(data("2", m, avgCachedSize))
+	c.Add(data("3", m, avgCachedSize))
+	c.Add(data("4", m, avgCachedSize))
+	expect(t, c, "4,3,2,1,5,0,9,8,7,6")
+
+	// Verify Purge updates the available memory.
+	c.Purge("3")
+	expect(t, c, "4,2,1,5,0,9,8,7,6")
+	c.Add(data("x", m, avgCachedSize))
+	expect(t, c, "x,4,2,1,5,0,9,8,7,6")
+	c.Add(data("y", m, avgCachedSize))
+	expect(t, c, "y,x,4,2,1,5,0,9,8,7")
+}
+
 // TestSynchronization verifies that the cache doesn't crash (or cause a race
 // detector error) when multiple goroutines are using it in parallel.
 func TestSynchronization(t *testing.T) {
 	const size = 100
-	c := New(size * maxCachedSize)
+	c := New(size * avgCachedSize)
 	var wg sync.WaitGroup
 	const goroutines = 20
@@ -128,11 +203,12 @@ func TestSynchronization(t *testing.T) {
 				c.Purge(sql)
 			case r <= 35:
 				// 25% of the time, add an entry.
-				c.Add(&CachedData{SQL: sql, Memo: &memo.Memo{}})
+				c.Add(data(sql, &memo.Memo{}, int64(256+rng.Intn(10*avgCachedSize))))
 			default:
 				// The rest of the time, find an entry.
 				_, _ = c.Find(sql)
 			}
+			c.check()
 		}
 		wg.Done()
 	}()
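The structural change in this diff is the move from one sentinel list with coarse slot accounting to two circular intrusive lists plus a byte budget: `used` holds live entries in MRU order, `free` holds idle slots, and `totalMem`/`availableMem` let `Add` evict however many LRU entries it needs. The standalone Go sketch below restates that pattern under simplified assumptions; the names (`cache`, `newCache`, `add`, `evict`, `key`/`mem`) are illustrative only, and it deliberately omits the real code's locking, `maxCachedSize` guard, and query-specific payload.

```go
package main

import "fmt"

// entry is an intrusive node: every slot lives on exactly one of two circular
// lists (used, in MRU order, or free), mirroring the patch's used/free sentinels.
type entry struct {
	key        string
	mem        int64
	prev, next *entry
}

func (e *entry) remove() {
	e.prev.next = e.next
	e.next.prev = e.prev
}

func (e *entry) insertAfter(a *entry) {
	e.prev, e.next = a, a.next
	a.next.prev = e
	a.next = e
}

// cache is a tiny LRU with a byte budget (totalMem/availableMem), analogous to
// the accounting added in the patch. Names here are hypothetical.
type cache struct {
	totalMem, availableMem int64
	used, free             entry // sentinels
	m                      map[string]*entry
}

// newCache preallocates `slots` entries on the free list; slots must be >= 1.
func newCache(totalMem int64, slots int) *cache {
	c := &cache{totalMem: totalMem, availableMem: totalMem, m: make(map[string]*entry, slots)}
	c.used.prev, c.used.next = &c.used, &c.used
	c.free.prev, c.free.next = &c.free, &c.free
	for i := 0; i < slots; i++ {
		(&entry{}).insertAfter(&c.free)
	}
	return c
}

// evict drops the least recently used entry (used.prev) onto the free list and
// returns its memory to the budget. Callers ensure the used list is non-empty.
func (c *cache) evict() {
	e := c.used.prev
	delete(c.m, e.key)
	c.availableMem += e.mem
	e.remove()
	e.key, e.mem = "", 0
	e.insertAfter(&c.free)
}

func (c *cache) add(key string, mem int64) {
	if mem > c.totalMem {
		return // never cache something larger than the whole budget
	}
	if e, ok := c.m[key]; ok {
		// Updating an existing key: release its old memory and slot first.
		delete(c.m, key)
		c.availableMem += e.mem
		e.remove()
		e.insertAfter(&c.free)
	}
	// Evict LRU entries until there are both enough bytes and a free slot.
	for c.availableMem < mem || c.free.next == &c.free {
		c.evict()
	}
	e := c.free.next
	e.remove()
	e.key, e.mem = key, mem
	c.availableMem -= mem
	c.m[key] = e
	e.insertAfter(&c.used) // most recently used sits right after the sentinel
}

func main() {
	c := newCache(4096, 4)
	c.add("a", 1024)
	c.add("b", 1024)
	c.add("c", 3072) // needs 3072 free bytes, so the LRU entry "a" is evicted
	for e := c.used.next; e != &c.used; e = e.next {
		fmt.Println(e.key, e.mem) // prints: c 3072, then b 1024
	}
}
```

Keeping idle slots on a separate free list, rather than leaving them in the LRU list with SQL == "" as the old code did, is what lets getEntry distinguish "grab an unused slot" from "evict the least recently used query" without scanning, while the availableMem counter is what check() cross-verifies against the sum of the used entries' estimates.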