From 9ec776018b007840a139f6bceb56250fc6a3f406 Mon Sep 17 00:00:00 2001 From: Bilal Akhtar Date: Mon, 13 Mar 2023 15:27:26 -0400 Subject: [PATCH 1/6] storage: More CheckSSTConflicts fixes A few additional fixes around CheckSSTConflicts, stats calculations, and Next()ing logic, caught by kvnemesis. Hopefully the last of its kind. Also re-enable kvnemesis testing for range keys in AddSSTable, reverting #98475. Fixes #94141. Fixes #98473. Informs #94876. Epic: none Release note: None --- pkg/kv/kvnemesis/generator.go | 7 +----- .../batcheval/cmd_add_sstable_test.go | 16 ++++++++++++- pkg/storage/sst.go | 23 +++++++++++++++---- 3 files changed, 35 insertions(+), 11 deletions(-) diff --git a/pkg/kv/kvnemesis/generator.go b/pkg/kv/kvnemesis/generator.go index 29f373ca103f..691cca96bcc0 100644 --- a/pkg/kv/kvnemesis/generator.go +++ b/pkg/kv/kvnemesis/generator.go @@ -525,12 +525,7 @@ func randAddSSTable(g *generator, rng *rand.Rand) Operation { probTombstone := 0.2 // probability to write a tombstone asWrites := rng.Float64() < 0.2 // IngestAsWrites - if true { - // TODO(erikgrinaker): Disable range keys for now since CheckSSTConflicts - // computes incorrect MVCC stats. See: - // https://github.com/cockroachdb/cockroach/issues/98473 - numRangeKeys = 0 - } else if r := rng.Float64(); r < 0.8 { + if r := rng.Float64(); r < 0.8 { // 80% probability of only point keys. numRangeKeys = 0 } else if r < 0.9 { diff --git a/pkg/kv/kvserver/batcheval/cmd_add_sstable_test.go b/pkg/kv/kvserver/batcheval/cmd_add_sstable_test.go index 65b687ca19e6..00fc114701d5 100644 --- a/pkg/kv/kvserver/batcheval/cmd_add_sstable_test.go +++ b/pkg/kv/kvserver/batcheval/cmd_add_sstable_test.go @@ -851,6 +851,20 @@ func TestEvalAddSSTable(t *testing.T) { sst: kvs{rangeKV("a", "l", 8, "")}, expect: kvs{rangeKV("a", "c", 8, ""), rangeKV("c", "d", 8, ""), rangeKV("c", "d", 6, ""), rangeKV("d", "j", 8, ""), rangeKV("j", "k", 8, ""), rangeKV("j", "k", 5, ""), rangeKV("k", "l", 8, "")}, }, + "DisallowConflicts correctly accounts for complex fragment cases 5": { + noConflict: true, + reqTS: 10, + data: kvs{pointKV("cc", 7, ""), pointKV("cc", 6, ""), pointKV("cc", 5, "foo"), pointKV("cc", 4, ""), pointKV("cc", 3, "bar"), pointKV("cc", 2, "barfoo"), rangeKV("ab", "g", 1, "")}, + sst: kvs{pointKV("aa", 8, "foo"), pointKV("aaa", 8, ""), pointKV("ac", 8, "foo"), rangeKV("b", "c", 8, ""), pointKV("ca", 8, "foo"), pointKV("cb", 8, "foo"), pointKV("cc", 8, "foo"), rangeKV("d", "e", 8, ""), pointKV("e", 8, "foobar")}, + expect: kvs{pointKV("aa", 8, "foo"), pointKV("aaa", 8, ""), rangeKV("ab", "b", 1, ""), pointKV("ac", 8, "foo"), rangeKV("b", "c", 8, ""), rangeKV("b", "c", 1, ""), rangeKV("c", "d", 1, ""), pointKV("ca", 8, "foo"), pointKV("cb", 8, "foo"), pointKV("cc", 8, "foo"), pointKV("cc", 7, ""), pointKV("cc", 6, ""), pointKV("cc", 5, "foo"), pointKV("cc", 4, ""), pointKV("cc", 3, "bar"), pointKV("cc", 2, "barfoo"), rangeKV("d", "e", 8, ""), rangeKV("d", "e", 1, ""), rangeKV("e", "g", 1, ""), pointKV("e", 8, "foobar")}, + }, + "DisallowConflicts handles existing point key above existing range tombstone": { + noConflict: true, + reqTS: 10, + data: kvs{pointKV("c", 7, ""), rangeKV("a", "g", 6, ""), pointKV("h", 7, "")}, + sst: kvs{rangeKV("b", "d", 8, ""), rangeKV("f", "j", 8, "")}, + expect: kvs{rangeKV("a", "b", 6, ""), rangeKV("b", "d", 8, ""), rangeKV("b", "d", 6, ""), pointKV("c", 7, ""), rangeKV("d", "f", 6, ""), rangeKV("f", "g", 8, ""), rangeKV("f", "g", 6, ""), rangeKV("g", "j", 8, ""), pointKV("h", 7, "")}, + }, "DisallowConflicts accounts for point key already deleted in engine": { noConflict: true, reqTS: 10, @@ -1024,7 +1038,7 @@ func TestEvalAddSSTable(t *testing.T) { noConflict: true, data: kvs{rangeKV("a", "b", 7, "")}, sst: kvs{rangeKV("a", "b", 7, "")}, - expectErr: "ingested range key collides with an existing one", + expectErr: &kvpb.WriteTooOldError{}, }, } testutils.RunTrueAndFalse(t, "IngestAsWrites", func(t *testing.T, ingestAsWrites bool) { diff --git a/pkg/storage/sst.go b/pkg/storage/sst.go index e09f87939e98..f9d106063035 100644 --- a/pkg/storage/sst.go +++ b/pkg/storage/sst.go @@ -473,8 +473,8 @@ func CheckSSTConflicts( isIdempotent := extTombstones.Equal(sstRangeKeys.Versions) if ok := allowIdempotentHelper(extRangeKeys.Versions[0].Timestamp); !ok || !isIdempotent { // Idempotence is either not allowed or there's a conflict. - return enginepb.MVCCStats{}, errors.Errorf( - "ingested range key collides with an existing one: %s", sstTombstone) + return enginepb.MVCCStats{}, kvpb.NewWriteTooOldError( + sstTombstone.Timestamp, extRangeKeys.Versions[0].Timestamp.Next(), sstRangeKeys.Bounds.Key) } } } @@ -813,6 +813,21 @@ func CheckSSTConflicts( // We exclude !sstHasPoint above in case we were at a range key pause // point that matches extKey. In that case, the below SeekGE would make // no forward progress. + // + // However, seeking is not safe if we're at an ext range key; we could + // miss conflicts and overlaps with sst range keys in between + // [current SST Key, extKey.Key). Do a next and go back to the start of + // the loop. If we had a dedicated sst range key iterator, we could have + // optimized away this unconditional-next. + if extHasRange { + sstIter.NextKey() + sstOK, sstErr = sstIter.Valid() + if sstOK { + extIter.SeekGE(MVCCKey{Key: sstIter.UnsafeKey().Key}) + extOK, extErr = extIter.Valid() + } + continue + } sstIter.SeekGE(MVCCKey{Key: extKey.Key}) sstOK, sstErr = sstIter.Valid() if sstOK { @@ -1040,10 +1055,10 @@ func CheckSSTConflicts( // ext key. extIter.NextKey() extOK, extErr = extIter.Valid() - if extOK { + if extOK && extIter.UnsafeKey().Key.Compare(sstIter.UnsafeKey().Key) < 0 { sstIter.SeekGE(MVCCKey{Key: extIter.UnsafeKey().Key}) + sstOK, sstErr = sstIter.Valid() } - sstOK, sstErr = sstIter.Valid() if sstOK { extIter.SeekGE(MVCCKey{Key: sstIter.UnsafeKey().Key}) extOK, extErr = extIter.Valid() From 4f8a911363b57468ca43e77ce0a7590d71bec0d1 Mon Sep 17 00:00:00 2001 From: ajwerner Date: Thu, 9 Mar 2023 19:24:17 -0500 Subject: [PATCH 2/6] sqlliveness: re-arrange APIs to clarify when the API was blocking By "default" the implementation of Reader was blocking and there was a method to get a handle to a non-blocking CachedReader(). This asymmetry does not aid understandability. The non-blocking reader came later, but it is generally the more desirable interface. Release note: None --- pkg/server/server_sql.go | 12 ++++--- pkg/sql/exec_util.go | 2 +- pkg/sql/planner.go | 2 +- pkg/sql/sqlliveness/slstorage/slstorage.go | 35 ++++++++++++------- .../sqlliveness/slstorage/slstorage_test.go | 34 ++++++++++-------- pkg/sql/sqlliveness/sqlliveness.go | 18 ++++++---- 6 files changed, 61 insertions(+), 42 deletions(-) diff --git a/pkg/server/server_sql.go b/pkg/server/server_sql.go index 09afa6ed638b..652da7e7ab56 100644 --- a/pkg/server/server_sql.go +++ b/pkg/server/server_sql.go @@ -558,11 +558,13 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) { ) cfg.sqlInstanceStorage = instancestorage.NewStorage( - cfg.db, codec, cfg.sqlLivenessProvider.CachedReader(), cfg.Settings, cfg.clock, cfg.rangeFeedFactory) + cfg.db, codec, cfg.sqlLivenessProvider.CachedReader(), cfg.Settings, + cfg.clock, cfg.rangeFeedFactory, + ) cfg.sqlInstanceReader = instancestorage.NewReader( - cfg.sqlInstanceStorage, - cfg.sqlLivenessProvider, - cfg.stopper) + cfg.sqlInstanceStorage, cfg.sqlLivenessProvider.BlockingReader(), + cfg.stopper, + ) // We can't use the nodeDailer as the podNodeDailer unless we // are serving the system tenant despite the fact that we've @@ -787,7 +789,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) { RowMetrics: &rowMetrics, InternalRowMetrics: &internalRowMetrics, - SQLLivenessReader: cfg.sqlLivenessProvider, + SQLLivenessReader: cfg.sqlLivenessProvider.BlockingReader(), JobRegistry: jobRegistry, Gossip: cfg.gossip, PodNodeDialer: cfg.podNodeDialer, diff --git a/pkg/sql/exec_util.go b/pkg/sql/exec_util.go index 6336c770fa66..27f01d43f9ff 100644 --- a/pkg/sql/exec_util.go +++ b/pkg/sql/exec_util.go @@ -1244,7 +1244,7 @@ type ExecutorConfig struct { MetricsRecorder nodeStatusGenerator SessionRegistry *SessionRegistry ClosedSessionCache *ClosedSessionCache - SQLLiveness sqlliveness.Liveness + SQLLiveness sqlliveness.Provider JobRegistry *jobs.Registry VirtualSchemas *VirtualSchemaHolder DistSQLPlanner *DistSQLPlanner diff --git a/pkg/sql/planner.go b/pkg/sql/planner.go index cb6bd9544b3a..f061ce630dc2 100644 --- a/pkg/sql/planner.go +++ b/pkg/sql/planner.go @@ -114,7 +114,7 @@ func (evalCtx *extendedEvalContext) copyFromExecCfg(execCfg *ExecutorConfig) { evalCtx.Settings = execCfg.Settings evalCtx.Codec = execCfg.Codec evalCtx.Tracer = execCfg.AmbientCtx.Tracer - evalCtx.SQLLivenessReader = execCfg.SQLLiveness + evalCtx.SQLLivenessReader = execCfg.SQLLiveness.BlockingReader() evalCtx.CompactEngineSpan = execCfg.CompactEngineSpanFunc evalCtx.SetCompactionConcurrency = execCfg.CompactionConcurrencyFunc evalCtx.TestingKnobs = execCfg.EvalContextTestingKnobs diff --git a/pkg/sql/sqlliveness/slstorage/slstorage.go b/pkg/sql/sqlliveness/slstorage/slstorage.go index 2d148c7cbfa1..a3fe04b38a6b 100644 --- a/pkg/sql/sqlliveness/slstorage/slstorage.go +++ b/pkg/sql/sqlliveness/slstorage/slstorage.go @@ -79,7 +79,7 @@ var CacheSize = settings.RegisterIntSetting( 1024) // Storage deals with reading and writing session records. It implements the -// sqlliveness.Reader interface, and the slinstace.Writer interface. +// sqlliveness.Storage interface, and the slinstace.Writer interface. type Storage struct { settings *cluster.Settings settingsWatcher *settingswatcher.SettingsWatcher @@ -110,7 +110,7 @@ type Storage struct { } } -var _ sqlliveness.Reader = &Storage{} +var _ sqlliveness.StorageReader = &Storage{} // NewTestingStorage constructs a new storage with control for the database // in which the `sqlliveness` table should exist. @@ -186,13 +186,6 @@ func (s *Storage) Start(ctx context.Context) { s.mu.started = true } -// IsAlive determines whether a given session is alive. If this method returns -// true, the session may no longer be alive, but if it returns false, the -// session definitely is not alive. -func (s *Storage) IsAlive(ctx context.Context, sid sqlliveness.SessionID) (alive bool, err error) { - return s.isAlive(ctx, sid, sync) -} - type readType byte const ( @@ -580,15 +573,31 @@ func (s *Storage) Update( // currently known state of the session, but will trigger an asynchronous // refresh of the state of the session if it is not known. func (s *Storage) CachedReader() sqlliveness.Reader { - return (*cachedStorage)(s) + return (*cachedReader)(s) +} + +// BlockingReader reader returns an implementation of sqlliveness.Reader which +// will cache results of previous reads but will synchronously block to +// determine the status of a session which it does not know about or thinks +// might be expired. +func (s *Storage) BlockingReader() sqlliveness.Reader { + return (*blockingReader)(s) +} + +type blockingReader Storage + +func (s *blockingReader) IsAlive( + ctx context.Context, sid sqlliveness.SessionID, +) (alive bool, err error) { + return (*Storage)(s).isAlive(ctx, sid, sync) } -// cachedStorage implements the sqlliveness.Reader interface, and the +// cachedReader implements the sqlliveness.Reader interface, and the // slinstace.Writer interface, but does not read from the underlying store // synchronously during IsAlive. -type cachedStorage Storage +type cachedReader Storage -func (s *cachedStorage) IsAlive( +func (s *cachedReader) IsAlive( ctx context.Context, sid sqlliveness.SessionID, ) (alive bool, err error) { return (*Storage)(s).isAlive(ctx, sid, async) diff --git a/pkg/sql/sqlliveness/slstorage/slstorage_test.go b/pkg/sql/sqlliveness/slstorage/slstorage_test.go index 2404316ce7f0..2c2b1b4a899e 100644 --- a/pkg/sql/sqlliveness/slstorage/slstorage_test.go +++ b/pkg/sql/sqlliveness/slstorage/slstorage_test.go @@ -89,6 +89,7 @@ func testStorage(t *testing.T) { clock, _, _, stopper, storage := setup(t) storage.Start(ctx) defer stopper.Stop(ctx) + reader := storage.BlockingReader() exp := clock.Now().Add(time.Second.Nanoseconds(), 0) id, err := slstorage.MakeSessionID(enum.One, uuid.MakeV4()) @@ -100,14 +101,14 @@ func testStorage(t *testing.T) { require.Equal(t, int64(1), metrics.WriteSuccesses.Count()) } { - isAlive, err := storage.IsAlive(ctx, id) + isAlive, err := reader.IsAlive(ctx, id) require.NoError(t, err) require.True(t, isAlive) require.Equal(t, int64(1), metrics.IsAliveCacheMisses.Count()) require.Equal(t, int64(0), metrics.IsAliveCacheHits.Count()) } { - isAlive, err := storage.IsAlive(ctx, id) + isAlive, err := reader.IsAlive(ctx, id) require.NoError(t, err) require.True(t, isAlive) require.Equal(t, int64(1), metrics.IsAliveCacheMisses.Count()) @@ -119,6 +120,7 @@ func testStorage(t *testing.T) { defer stopper.Stop(ctx) slstorage.GCJitter.Override(ctx, &settings.SV, 0) storage.Start(ctx) + reader := storage.BlockingReader() metrics := storage.Metrics() // GC will run some time after startup. @@ -148,10 +150,10 @@ func testStorage(t *testing.T) { // Verify they are alive. { - isAlive1, err := storage.IsAlive(ctx, id1) + isAlive1, err := reader.IsAlive(ctx, id1) require.NoError(t, err) require.True(t, isAlive1) - isAlive2, err := storage.IsAlive(ctx, id2) + isAlive2, err := reader.IsAlive(ctx, id2) require.NoError(t, err) require.True(t, isAlive2) require.Equal(t, int64(2), metrics.IsAliveCacheMisses.Count()) @@ -168,7 +170,7 @@ func testStorage(t *testing.T) { // Ensure that the cached value is still in use for id2. { - isAlive, err := storage.IsAlive(ctx, id2) + isAlive, err := reader.IsAlive(ctx, id2) require.NoError(t, err) require.True(t, isAlive) require.Equal(t, int64(2), metrics.IsAliveCacheMisses.Count()) @@ -194,7 +196,7 @@ func testStorage(t *testing.T) { // Ensure that we now see the id1 as dead. That fact will be cached. { - isAlive, err := storage.IsAlive(ctx, id1) + isAlive, err := reader.IsAlive(ctx, id1) require.NoError(t, err) require.False(t, isAlive) require.Equal(t, int64(2), metrics.IsAliveCacheMisses.Count()) @@ -202,7 +204,7 @@ func testStorage(t *testing.T) { } // Ensure that the fact that it's dead is cached. { - isAlive, err := storage.IsAlive(ctx, id1) + isAlive, err := reader.IsAlive(ctx, id1) require.NoError(t, err) require.False(t, isAlive) require.Equal(t, int64(2), metrics.IsAliveCacheMisses.Count()) @@ -218,7 +220,7 @@ func testStorage(t *testing.T) { // Ensure that we now see the id2 as alive. { - isAlive, err := storage.IsAlive(ctx, id2) + isAlive, err := reader.IsAlive(ctx, id2) require.NoError(t, err) require.True(t, isAlive) require.Equal(t, int64(3), metrics.IsAliveCacheMisses.Count()) @@ -226,7 +228,7 @@ func testStorage(t *testing.T) { } // Ensure that the fact that it's still alive is cached. { - isAlive, err := storage.IsAlive(ctx, id1) + isAlive, err := reader.IsAlive(ctx, id1) require.NoError(t, err) require.False(t, isAlive) require.Equal(t, int64(3), metrics.IsAliveCacheMisses.Count()) @@ -237,6 +239,7 @@ func testStorage(t *testing.T) { clock, timeSource, _, stopper, storage := setup(t) defer stopper.Stop(ctx) storage.Start(ctx) + reader := storage.BlockingReader() exp := clock.Now().Add(time.Second.Nanoseconds(), 0) id, err := slstorage.MakeSessionID(enum.One, uuid.MakeV4()) @@ -248,7 +251,7 @@ func testStorage(t *testing.T) { require.Equal(t, int64(1), metrics.WriteSuccesses.Count()) } { - isAlive, err := storage.IsAlive(ctx, id) + isAlive, err := reader.IsAlive(ctx, id) require.NoError(t, err) require.True(t, isAlive) require.Equal(t, int64(1), metrics.IsAliveCacheMisses.Count()) @@ -259,7 +262,7 @@ func testStorage(t *testing.T) { timeSource.Advance(time.Second + time.Nanosecond) // Ensure that we discover it is no longer alive. { - isAlive, err := storage.IsAlive(ctx, id) + isAlive, err := reader.IsAlive(ctx, id) require.NoError(t, err) require.False(t, isAlive) require.Equal(t, int64(2), metrics.IsAliveCacheMisses.Count()) @@ -268,7 +271,7 @@ func testStorage(t *testing.T) { } // Ensure that the fact that it is no longer alive is cached. { - isAlive, err := storage.IsAlive(ctx, id) + isAlive, err := reader.IsAlive(ctx, id) require.NoError(t, err) require.False(t, isAlive) require.Equal(t, int64(2), metrics.IsAliveCacheMisses.Count()) @@ -348,6 +351,7 @@ func testConcurrentAccessesAndEvictions(t *testing.T) { storage := slstorage.NewTestingStorage(ambientCtx, stopper, clock, kvDB, keys.SystemSQLCodec, settings, s.SettingsWatcher().(*settingswatcher.SettingsWatcher), table, timeSource.NewTimer) storage.Start(ctx) + reader := storage.BlockingReader() const ( runsPerWorker = 100 @@ -446,7 +450,7 @@ func testConcurrentAccessesAndEvictions(t *testing.T) { for i := 0; i < runsPerWorker; i++ { time.Sleep(time.Microsecond) i, id := pickSession() - isAlive, err := storage.IsAlive(ctx, id) + isAlive, err := reader.IsAlive(ctx, id) assert.NoError(t, err) checkIsAlive(t, i, isAlive) } @@ -603,7 +607,7 @@ func testConcurrentAccessSynchronization(t *testing.T) { // Now launch another, synchronous reader, which will join // the single-flight. g.Go(func() (err error) { - alive, err = storage.IsAlive(ctx, sid) + alive, err = storage.BlockingReader().IsAlive(ctx, sid) return err }) // Sleep some tiny amount of time to hopefully allow the other @@ -648,7 +652,7 @@ func testConcurrentAccessSynchronization(t *testing.T) { // Now launch another, synchronous reader, which will join // the single-flight. g.Go(func() (err error) { - alive, err = storage.IsAlive(toCancel, sid) + alive, err = storage.BlockingReader().IsAlive(toCancel, sid) return err }) diff --git a/pkg/sql/sqlliveness/sqlliveness.go b/pkg/sql/sqlliveness/sqlliveness.go index 6510277f5dc1..6c70e02aebf4 100644 --- a/pkg/sql/sqlliveness/sqlliveness.go +++ b/pkg/sql/sqlliveness/sqlliveness.go @@ -34,7 +34,8 @@ type SessionID string // Provider is a wrapper around the sqlliveness subsystem for external // consumption. type Provider interface { - Liveness + Instance + StorageReader // Start starts the sqlliveness subsystem. regionPhysicalRep should // represent the physical representation of the current process region @@ -43,18 +44,21 @@ type Provider interface { // Metrics returns a metric.Struct which holds metrics for the provider. Metrics() metric.Struct +} + +// StorageReader provides access to Readers which either block or do not +// block. +type StorageReader interface { + // BlockingReader returns a Reader which only synchronously + // checks whether a session is alive if it does not have any + // cached information which implies that it currently is. + BlockingReader() Reader // CachedReader returns a reader which only consults its local cache and // does not perform any RPCs in the IsAlive call. CachedReader() Reader } -// Liveness exposes Reader and Instance interfaces. -type Liveness interface { - Reader - Instance -} - // String returns a hex-encoded version of the SessionID. func (s SessionID) String() string { return hex.EncodeToString(encoding.UnsafeConvertStringToBytes(string(s))) From f4c6eba68754df8a47cad5378b6a208bab6625a2 Mon Sep 17 00:00:00 2001 From: ajwerner Date: Thu, 9 Mar 2023 19:26:45 -0500 Subject: [PATCH 3/6] sql: use CachedReader for uses with sqlinstance and the sql builtin The CachedReader won't block, which in multi-region clusters is good. It will mean that in some cases, it'll state that a sessions is alive when it most certainly is not. Currently, nobody needs synchronous semantics. This is a major part of fixing the TestColdStartLatency as sometimes distsql planning would block. That's not acceptable -- the idea that query physical planning can need to wait for a cross-region RPC is unacceptable. Release note: None --- pkg/server/server_sql.go | 4 ++-- pkg/sql/planner.go | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/server/server_sql.go b/pkg/server/server_sql.go index 652da7e7ab56..1f853486baf7 100644 --- a/pkg/server/server_sql.go +++ b/pkg/server/server_sql.go @@ -562,7 +562,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) { cfg.clock, cfg.rangeFeedFactory, ) cfg.sqlInstanceReader = instancestorage.NewReader( - cfg.sqlInstanceStorage, cfg.sqlLivenessProvider.BlockingReader(), + cfg.sqlInstanceStorage, cfg.sqlLivenessProvider.CachedReader(), cfg.stopper, ) @@ -789,7 +789,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) { RowMetrics: &rowMetrics, InternalRowMetrics: &internalRowMetrics, - SQLLivenessReader: cfg.sqlLivenessProvider.BlockingReader(), + SQLLivenessReader: cfg.sqlLivenessProvider.CachedReader(), JobRegistry: jobRegistry, Gossip: cfg.gossip, PodNodeDialer: cfg.podNodeDialer, diff --git a/pkg/sql/planner.go b/pkg/sql/planner.go index f061ce630dc2..89f5a63c1379 100644 --- a/pkg/sql/planner.go +++ b/pkg/sql/planner.go @@ -114,7 +114,7 @@ func (evalCtx *extendedEvalContext) copyFromExecCfg(execCfg *ExecutorConfig) { evalCtx.Settings = execCfg.Settings evalCtx.Codec = execCfg.Codec evalCtx.Tracer = execCfg.AmbientCtx.Tracer - evalCtx.SQLLivenessReader = execCfg.SQLLiveness.BlockingReader() + evalCtx.SQLLivenessReader = execCfg.SQLLiveness.CachedReader() evalCtx.CompactEngineSpan = execCfg.CompactEngineSpanFunc evalCtx.SetCompactionConcurrency = execCfg.CompactionConcurrencyFunc evalCtx.TestingKnobs = execCfg.EvalContextTestingKnobs From a902d6a92295afd3a142bf15ad06da4ab076df5d Mon Sep 17 00:00:00 2001 From: ajwerner Date: Fri, 10 Mar 2023 07:07:08 -0500 Subject: [PATCH 4/6] multitenantccl: deflake ColdStartLatencyTest This test was flakey due to the closed timestamp sometimes not leading far for global tables due to overload, and due to a cached liveness reader not being used in distsql. The former was fixed in previous commits. The latter is fixed here. Fixes: #96334 Release note: None --- pkg/ccl/multiregionccl/cold_start_latency_test.go | 11 ++++++++++- pkg/sql/planner.go | 4 +++- 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/pkg/ccl/multiregionccl/cold_start_latency_test.go b/pkg/ccl/multiregionccl/cold_start_latency_test.go index be881a410d23..42097e5b009a 100644 --- a/pkg/ccl/multiregionccl/cold_start_latency_test.go +++ b/pkg/ccl/multiregionccl/cold_start_latency_test.go @@ -47,7 +47,6 @@ import ( func TestColdStartLatency(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - skip.WithIssue(t, 96334) skip.UnderRace(t, "too slow") skip.UnderStress(t, "too slow") defer envutil.TestSetEnv(t, "COCKROACH_MR_SYSTEM_DATABASE", "1")() @@ -146,7 +145,17 @@ func TestColdStartLatency(t *testing.T) { } tdb := sqlutils.MakeSQLRunner(tc.ServerConn(1)) + // Shorten the closed timestamp target duration so that span configs + // propagate more rapidly. tdb.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.target_duration = '200ms'`) + // Lengthen the lead time for the global tables to prevent overload from + // resulting in delays in propagating closed timestamps and, ultimately + // forcing requests from being redirected to the leaseholder. Without this + // change, the test sometimes is flakey because the latency budget allocated + // to closed timestamp propagation proves to be insufficient. This value is + // very cautious, and makes this already slow test even slower. + tdb.Exec(t, "SET CLUSTER SETTING kv.closed_timestamp.side_transport_interval = '50 ms'") + tdb.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.lead_for_global_reads_override = '1500ms'`) tdb.Exec(t, `ALTER TENANT ALL SET CLUSTER SETTING spanconfig.reconciliation_job.checkpoint_interval = '500ms'`) applyGlobalTables := func(t *testing.T, db *gosql.DB, isTenant bool) { diff --git a/pkg/sql/planner.go b/pkg/sql/planner.go index 89f5a63c1379..e74f0e06cb63 100644 --- a/pkg/sql/planner.go +++ b/pkg/sql/planner.go @@ -114,7 +114,9 @@ func (evalCtx *extendedEvalContext) copyFromExecCfg(execCfg *ExecutorConfig) { evalCtx.Settings = execCfg.Settings evalCtx.Codec = execCfg.Codec evalCtx.Tracer = execCfg.AmbientCtx.Tracer - evalCtx.SQLLivenessReader = execCfg.SQLLiveness.CachedReader() + if execCfg.SQLLiveness != nil { // nil in some tests + evalCtx.SQLLivenessReader = execCfg.SQLLiveness.CachedReader() + } evalCtx.CompactEngineSpan = execCfg.CompactEngineSpanFunc evalCtx.SetCompactionConcurrency = execCfg.CompactionConcurrencyFunc evalCtx.TestingKnobs = execCfg.EvalContextTestingKnobs From 358f86dcd334599bebe965d5640fc3fc72dfa276 Mon Sep 17 00:00:00 2001 From: ajwerner Date: Tue, 14 Mar 2023 00:15:04 -0400 Subject: [PATCH 5/6] instancestorage: ensure that the cache goroutine shuts down gracefully `(*stop.Stopper).AddCloser` registers a closer to be called after all async tasks have exited. The cache was running, waiting to be closed. We instead need to hook up its context to exit when the stopper is quiescing. Epic: none Release note: None --- pkg/sql/sqlinstance/instancestorage/instancereader.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pkg/sql/sqlinstance/instancestorage/instancereader.go b/pkg/sql/sqlinstance/instancestorage/instancereader.go index 6c1cc6c6ff97..eb9ff89b93ef 100644 --- a/pkg/sql/sqlinstance/instancestorage/instancereader.go +++ b/pkg/sql/sqlinstance/instancestorage/instancereader.go @@ -78,17 +78,19 @@ func (r *Reader) Start(ctx context.Context, self sqlinstance.InstanceInfo) { timestamp: hlc.Timestamp{}, // intentionally zero }, }) + // Make sure that the reader shuts down gracefully. + ctx, cancel := r.stopper.WithCancelOnQuiesce(ctx) err := r.stopper.RunAsyncTask(ctx, "start-instance-reader", func(ctx context.Context) { cache, err := r.storage.newInstanceCache(ctx) if err != nil { r.setInitialScanDone(err) return } - r.stopper.AddCloser(cache) r.setCache(cache) r.setInitialScanDone(nil) }) if err != nil { + cancel() r.setInitialScanDone(err) } } From 46e55ac95ec32d401278e6c8d41297a42e7a085b Mon Sep 17 00:00:00 2001 From: Michael Butler Date: Tue, 14 Mar 2023 08:20:34 -0400 Subject: [PATCH 6/6] backupccl: use correct version gate for restore checkpointing PR #97862 introduced a subtle bug which allowed the new restore checkpointing policy to take effect before the 23_1 migrations occured. This patch ensures the new policy only takes effect after all migrations occur. Release note: None Epic: None --- pkg/ccl/backupccl/backup_test.go | 2 +- pkg/ccl/backupccl/restore_job.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/ccl/backupccl/backup_test.go b/pkg/ccl/backupccl/backup_test.go index f427ca11e25b..196f4b5d40f4 100644 --- a/pkg/ccl/backupccl/backup_test.go +++ b/pkg/ccl/backupccl/backup_test.go @@ -1811,7 +1811,7 @@ func TestBackupRestoreResume(t *testing.T) { }, }, // Required because restore checkpointing is version gated. - clusterversion.ByKey(clusterversion.V23_1Start), + clusterversion.ByKey(clusterversion.V23_1), ) // If the restore properly took the (incorrect) low-water mark into account, // the first half of the table will be missing. diff --git a/pkg/ccl/backupccl/restore_job.go b/pkg/ccl/backupccl/restore_job.go index ad9e5593ad3d..96867aeafdf0 100644 --- a/pkg/ccl/backupccl/restore_job.go +++ b/pkg/ccl/backupccl/restore_job.go @@ -287,7 +287,7 @@ func restore( return emptyRowCount, err } - on231 := clusterversion.ByKey(clusterversion.V23_1Start).LessEq(job.Payload().CreationClusterVersion) + on231 := clusterversion.ByKey(clusterversion.V23_1).LessEq(job.Payload().CreationClusterVersion) progressTracker, err := makeProgressTracker( dataToRestore.getSpans(), job.Progress().Details.(*jobspb.Progress_Restore).Restore.Checkpoint,