From 5a85950bf40a367018905a253f20600e0a4979ee Mon Sep 17 00:00:00 2001
From: Nathan VanBenschoten
Date: Fri, 19 Aug 2022 17:38:58 -0400
Subject: [PATCH 1/4] bulk: perform meta lookup on range cache miss during
 index backfill

Fixes #84290.

This commit addresses the sustained slowdown described in #84290 by
replacing the call to `RangeCache.GetCached` in `SSTBatcher.flushIfNeeded`
with a call to `RangeCache.Lookup`. The former method checks the cache but
returns no range descriptor if the cache currently has no overlapping key.
This is possible if the descriptor was recently evicted because it was
stale. The latter method performs a meta lookup if the cache currently has
no overlapping key, so it should always return _some_ range descriptor.

There's a question of whether we should be logging a warning but
proceeding if this meta lookup fails. For now, I've decided not to change
that behavior.

Release justification: None. Don't merge yet.
---
 pkg/kv/bulk/sst_batcher.go      |  9 ++++-----
 pkg/kv/bulk/sst_batcher_test.go | 27 ++++++++++++---------------
 2 files changed, 16 insertions(+), 20 deletions(-)

diff --git a/pkg/kv/bulk/sst_batcher.go b/pkg/kv/bulk/sst_batcher.go
index 242a63e232c9..7bce386210e0 100644
--- a/pkg/kv/bulk/sst_batcher.go
+++ b/pkg/kv/bulk/sst_batcher.go
@@ -361,15 +361,14 @@ func (b *SSTBatcher) flushIfNeeded(ctx context.Context, nextKey roachpb.Key) err
 	if !b.flushKeyChecked && b.rc != nil {
 		b.flushKeyChecked = true
 		if k, err := keys.Addr(nextKey); err != nil {
-			log.Warningf(ctx, "failed to get RKey for flush key lookup")
+			log.Warningf(ctx, "failed to get RKey for flush key lookup: %v", err)
 		} else {
-			r := b.rc.GetCached(ctx, k, false /* inverted */)
-			if r != nil {
+			if r, err := b.rc.Lookup(ctx, k); err != nil {
+				log.Warningf(ctx, "failed to lookup range cache entry for key %v: %v", k, err)
+			} else {
 				k := r.Desc().EndKey.AsRawKey()
 				b.flushKey = k
 				log.VEventf(ctx, 3, "%s building sstable that will flush before %v", b.name, k)
-			} else {
-				log.VEventf(ctx, 2, "%s no cached range desc available to determine sst flush key", b.name)
 			}
 		}
 	}
diff --git a/pkg/kv/bulk/sst_batcher_test.go b/pkg/kv/bulk/sst_batcher_test.go
index 588c8f5f9779..e29f5a40cd8a 100644
--- a/pkg/kv/bulk/sst_batcher_test.go
+++ b/pkg/kv/bulk/sst_batcher_test.go
@@ -277,27 +277,24 @@ func runTestImport(t *testing.T, batchSizeValue int64) {
 			return encoding.EncodeStringAscending(append([]byte{}, prefix...), fmt.Sprintf("k%d", i))
 		}
 
-		t.Logf("splitting at %s and %s", key(split1), key(split2))
+		t.Logf("splitting at %s", key(split1))
 		require.NoError(t, kvDB.AdminSplit(ctx, key(split1), hlc.MaxTimestamp /* expirationTime */))
-		require.NoError(t, kvDB.AdminSplit(ctx, key(split2), hlc.MaxTimestamp /* expirationTime */))
 
 		// We want to make sure our range-aware batching knows about one of our
-		// splits to exercise that codepath, but we also want to make sure we
+		// splits to exercise that code path, but we also want to make sure we
 		// still handle an unexpected split, so we make our own range cache and
-		// only populate it with one of our two splits.
-		mockCache := rangecache.NewRangeCache(s.ClusterSettings(), nil,
+		// populate it after the first split but before the second split.
+		ds := s.DistSenderI().(*kvcoord.DistSender)
+		mockCache := rangecache.NewRangeCache(s.ClusterSettings(), ds,
 			func() int64 { return 2 << 10 }, s.Stopper(), s.TracerI().(*tracing.Tracer))
-		addr, err := keys.Addr(key(0))
-		require.NoError(t, err)
-
-		tok, err := s.DistSenderI().(*kvcoord.DistSender).RangeDescriptorCache().LookupWithEvictionToken(
-			ctx, addr, rangecache.EvictionToken{}, false)
-		require.NoError(t, err)
-
-		r := roachpb.RangeInfo{
-			Desc: *tok.Desc(),
+		for _, k := range []int{0, split1} {
+			ent, err := ds.RangeDescriptorCache().Lookup(ctx, keys.MustAddr(key(k)))
+			require.NoError(t, err)
+			mockCache.Insert(ctx, roachpb.RangeInfo{Desc: *ent.Desc()})
 		}
-		mockCache.Insert(ctx, r)
+
+		t.Logf("splitting at %s", key(split2))
+		require.NoError(t, kvDB.AdminSplit(ctx, key(split2), hlc.MaxTimestamp /* expirationTime */))
 
 		ts := hlc.Timestamp{WallTime: 100}
 		mem := mon.NewUnlimitedMonitor(ctx, "lots", mon.MemoryResource, nil, nil, 0, nil)

From d89a7e4588eac007444cbe411380789ec2468663 Mon Sep 17 00:00:00 2001
From: Nathan VanBenschoten
Date: Mon, 12 Sep 2022 19:07:06 -0400
Subject: [PATCH 2/4] kv: remove broken attempt to reject lease acquisitions
 on draining nodes

Related to #83261.

This commit removes "protection" that avoided lease acquisitions on
draining nodes. This protection had already been effectively disabled by
acc1ad1, which allowed Raft leaders to bypass the check. As the comment
here (added in 5ffaa9e) explained, Raft followers are already unable to
acquire the lease. If leaders bypass the check and followers (and
candidates) don't need it, the check is useless, so we remove it.

The commit also removes `TestReplicaDrainLease`, which was supposed to
test this behavior. We remove the test not because it started failing
after the change, but because it did not. It must not have been testing
anything real anymore after acc1ad1.

Release justification: low risk change related to release blocker.

Release note: None
---
 pkg/kv/kvserver/replica_proposal.go    |  15 ++-
 pkg/kv/kvserver/replica_range_lease.go |  25 -----
 pkg/kv/kvserver/replica_test.go        | 121 -------------------------
 3 files changed, 7 insertions(+), 154 deletions(-)

diff --git a/pkg/kv/kvserver/replica_proposal.go b/pkg/kv/kvserver/replica_proposal.go
index 308deef3c54b..6026021933dd 100644
--- a/pkg/kv/kvserver/replica_proposal.go
+++ b/pkg/kv/kvserver/replica_proposal.go
@@ -263,10 +263,13 @@ func (r *Replica) leasePostApplyLocked(
 	leaseChangingHands := prevLease.Replica.StoreID != newLease.Replica.StoreID ||
 		prevLease.Sequence != newLease.Sequence
 
 	if iAmTheLeaseHolder {
-		// Log lease acquisition whenever an Epoch-based lease changes hands (or verbose
-		// logging is enabled).
-		if newLease.Type() == roachpb.LeaseEpoch && leaseChangingHands || log.V(1) {
-			log.VEventf(ctx, 1, "new range lease %s following %s", newLease, prevLease)
+		// Log lease acquisitions loudly when verbose logging is enabled or when the
+		// new leaseholder is draining, in which case it should be shedding leases.
+		// Otherwise, log a trace event.
+		if log.V(1) || r.store.IsDraining() {
+			log.Infof(ctx, "new range lease %s following %s", newLease, prevLease)
+		} else {
+			log.Eventf(ctx, "new range lease %s following %s", newLease, prevLease)
+		}
 	}
 
@@ -421,10 +424,6 @@
 				log.Errorf(ctx, "%v", err)
 			}
 		})
-		if leaseChangingHands && log.V(1) {
-			// This logging is useful to troubleshoot incomplete drains.
-			log.Info(ctx, "is now leaseholder")
-		}
 	}
 
 	// Inform the store of this lease.
diff --git a/pkg/kv/kvserver/replica_range_lease.go b/pkg/kv/kvserver/replica_range_lease.go
index fb318fdffe46..7643904c80c3 100644
--- a/pkg/kv/kvserver/replica_range_lease.go
+++ b/pkg/kv/kvserver/replica_range_lease.go
@@ -63,7 +63,6 @@ import (
 	"github.com/cockroachdb/errors"
 	"github.com/cockroachdb/logtags"
 	"github.com/cockroachdb/redact"
-	"go.etcd.io/etcd/raft/v3"
 )
 
 var leaseStatusLogLimiter = func() *log.EveryN {
@@ -800,30 +799,6 @@ func (r *Replica) requestLeaseLocked(
 		return r.mu.pendingLeaseRequest.newResolvedHandle(pErr)
 	}
 
-	// If we're draining, we'd rather not take any new leases (since we're also
-	// trying to move leases away elsewhere). But if we're the leader, we don't
-	// really have a choice and we take the lease - there might not be any other
-	// replica available to take this lease (perhaps they're all draining).
-	if r.store.IsDraining() {
-		// NB: Replicas that are not the Raft leader will not take leases anyway
-		// (see the check inside propBuf.FlushLockedWithRaftGroup()), so we don't
-		// really need any special behavior for draining nodes here. This check
-		// serves mostly as a means to get more granular logging and as a defensive
-		// precaution.
-		if r.raftBasicStatusRLocked().RaftState != raft.StateLeader {
-			log.VEventf(ctx, 2, "refusing to take the lease because we're draining")
-			return r.mu.pendingLeaseRequest.newResolvedHandle(
-				roachpb.NewError(
-					newNotLeaseHolderError(
-						roachpb.Lease{}, r.store.StoreID(), r.mu.state.Desc,
-						"refusing to take the lease; node is draining",
-					),
-				),
-			)
-		}
-		log.Info(ctx, "trying to take the lease while we're draining since we're the raft leader")
-	}
-
 	// Propose a Raft command to get a lease for this replica.
 	repDesc, err := r.getReplicaDescriptorRLocked()
 	if err != nil {
diff --git a/pkg/kv/kvserver/replica_test.go b/pkg/kv/kvserver/replica_test.go
index 6802942f2b08..c7da218fa72e 100644
--- a/pkg/kv/kvserver/replica_test.go
+++ b/pkg/kv/kvserver/replica_test.go
@@ -1351,127 +1351,6 @@ func TestReplicaLeaseRejectUnknownRaftNodeID(t *testing.T) {
 	}
 }
 
-// Test that draining nodes only take the lease if they're the leader.
-func TestReplicaDrainLease(t *testing.T) {
-	defer leaktest.AfterTest(t)()
-	defer log.Scope(t).Close(t)
-	ctx := context.Background()
-	clusterArgs := base.TestClusterArgs{
-		ReplicationMode: base.ReplicationManual,
-		ServerArgs: base.TestServerArgs{
-			Knobs: base.TestingKnobs{
-				NodeLiveness: NodeLivenessTestingKnobs{
-					// This test waits for an epoch-based lease to expire, so we're setting the
-					// liveness duration as low as possible while still keeping the test stable.
-					LivenessDuration: 2000 * time.Millisecond,
-					RenewalDuration:  1000 * time.Millisecond,
-				},
-				Store: &StoreTestingKnobs{
-					// We eliminate clock offsets in order to eliminate the stasis period of
-					// leases. Otherwise we'd need to make leases longer.
-					MaxOffset: time.Nanosecond,
-				},
-			},
-		},
-	}
-	tc := serverutils.StartNewTestCluster(t, 2, clusterArgs)
-	defer tc.Stopper().Stop(ctx)
-	rngKey := tc.ScratchRange(t)
-	tc.AddVotersOrFatal(t, rngKey, tc.Target(1))
-
-	s1 := tc.Server(0)
-	s2 := tc.Server(1)
-	store1, err := s1.GetStores().(*Stores).GetStore(s1.GetFirstStoreID())
-	require.NoError(t, err)
-	store2, err := s2.GetStores().(*Stores).GetStore(s2.GetFirstStoreID())
-	require.NoError(t, err)
-
-	rd := tc.LookupRangeOrFatal(t, rngKey)
-	r1, err := store1.GetReplica(rd.RangeID)
-	require.NoError(t, err)
-	status := r1.CurrentLeaseStatus(ctx)
-	require.True(t, status.Lease.OwnedBy(store1.StoreID()), "someone else got the lease: %s", status)
-	// We expect the lease to be valid, but don't check that because, under race, it might have
-	// expired already.
-
-	// Stop n1's heartbeats and wait for the lease to expire.
-	log.Infof(ctx, "test: suspending heartbeats for n1")
-	cleanup := s1.NodeLiveness().(*liveness.NodeLiveness).PauseAllHeartbeatsForTest()
-	defer cleanup()
-
-	testutils.SucceedsSoon(t, func() error {
-		status := r1.CurrentLeaseStatus(ctx)
-		require.True(t, status.Lease.OwnedBy(store1.StoreID()), "someone else got the lease: %s", status)
-		if status.State == kvserverpb.LeaseState_VALID {
-			return errors.New("lease still valid")
-		}
-		// We need to wait for the stasis state to pass too; during stasis other
-		// replicas can't take the lease.
-		if status.State == kvserverpb.LeaseState_UNUSABLE {
-			return errors.New("lease still in stasis")
-		}
-		return nil
-	})
-
-	require.Equal(t, r1.RaftStatus().Lead, uint64(r1.ReplicaID()),
-		"expected leadership to still be on the first replica")
-
-	// Wait until n1 has heartbeat its liveness record (epoch >= 1) and n2
-	// knows about it. Otherwise, the following could occur:
-	//
-	// - n1's heartbeats to epoch 1 and acquires lease
-	// - n2 doesn't receive this yet (gossip)
-	// - when n2 is asked to acquire the lease, it uses a lease with epoch 1
-	//   but the liveness record with epoch zero
-	// - lease status is ERROR, lease acquisition (and thus test) fails.
-	testutils.SucceedsSoon(t, func() error {
-		nl, ok := s2.NodeLiveness().(*liveness.NodeLiveness).GetLiveness(s1.NodeID())
-		if !ok {
-			return errors.New("no liveness record for n1")
-		}
-		if nl.Epoch < 1 {
-			return errors.New("epoch for n1 still zero")
-		}
-		return nil
-	})
-
-	// Mark the stores as draining. We'll then start checking how acquiring leases
-	// behaves while draining.
-	store1.draining.Store(true)
-	store2.draining.Store(true)
-
-	r2, err := store2.GetReplica(rd.RangeID)
-	require.NoError(t, err)
-	// Check that a draining replica that's not the leader does NOT take the
-	// lease.
-	_, pErr := r2.redirectOnOrAcquireLease(ctx)
-	require.NotNil(t, pErr)
-	require.IsType(t, &roachpb.NotLeaseHolderError{}, pErr.GetDetail())
-
-	// Now transfer the leadership from r1 to r2 and check that r1 can now acquire
-	// the lease.
-
-	// Initiate the leadership transfer.
-	r1.mu.Lock()
-	r1.mu.internalRaftGroup.TransferLeader(uint64(r2.ReplicaID()))
-	r1.mu.Unlock()
-	// Run the range through the Raft scheduler, otherwise the leadership messages
-	// doesn't get sent because the range is quiesced.
-	store1.EnqueueRaftUpdateCheck(r1.RangeID)
-
-	// Wait for the leadership transfer to happen.
-	testutils.SucceedsSoon(t, func() error {
-		if r2.RaftStatus().SoftState.RaftState != raft.StateLeader {
-			return errors.Newf("r1 not yet leader")
-		}
-		return nil
-	})
-
-	// Check that r2 can now acquire the lease.
-	_, pErr = r2.redirectOnOrAcquireLease(ctx)
-	require.NoError(t, pErr.GoError())
-}
-
 // TestReplicaGossipFirstRange verifies that the first range gossips its
 // location and the cluster ID.
 func TestReplicaGossipFirstRange(t *testing.T) {

From c0aa573ddccf15ad4b694b91c6198f5d7154d38b Mon Sep 17 00:00:00 2001
From: rimadeodhar
Date: Mon, 25 Jul 2022 13:58:35 -0700
Subject: [PATCH 3/4] pgwire: Add support for cursors enclosed in quotes

In CockroachDB and Postgres, it is possible to declare cursors with
special identifiers enclosed within double quotes, e.g. "1-2-3".
Currently, we store the name as an unescaped string, which causes
problems during the pgwire DESCRIBE step when looking up the cursor. We
should instead use the tree.Name datatype for the cursor name when
storing and looking up cursors. This PR updates the code to use
tree.Name instead of raw strings for handling cursor names. This fixes
the issue where the pgwire DESCRIBE step fails while attempting to look
up cursors declared with names containing special characters.

Resolves https://github.com/cockroachdb/cockroach/issues/84261

Release note (bug fix): The pgwire DESCRIBE step no longer fails with an
error while attempting to look up cursors declared with names containing
special characters.
---
 pkg/sql/conn_executor_prepare.go             | 10 +--
 pkg/sql/conn_io.go                           |  2 +-
 pkg/sql/logictest/testdata/logic_test/cursor | 42 ++++++++++++
 .../logictest/testdata/logic_test/pg_catalog | 28 ++++++++
 pkg/sql/pg_catalog.go                        | 12 ++--
 pkg/sql/pgwire/conn.go                       |  2 +-
 pkg/sql/pgwire/conn_test.go                  |  8 ++-
 pkg/sql/pgwire/testdata/pgtest/portals       | 68 +++++++++++++++++++
 pkg/sql/sql_cursor.go                        | 46 ++++++-------
 9 files changed, 181 insertions(+), 37 deletions(-)

diff --git a/pkg/sql/conn_executor_prepare.go b/pkg/sql/conn_executor_prepare.go
index cb2f279bb216..776a9b6a800b 100644
--- a/pkg/sql/conn_executor_prepare.go
+++ b/pkg/sql/conn_executor_prepare.go
@@ -337,7 +337,7 @@ func (ex *connExecutor) execBind(
 		return retErr(pgerror.Newf(
 			pgcode.DuplicateCursor, "portal %q already exists", portalName))
 	}
-	if cursor := ex.getCursorAccessor().getCursor(portalName); cursor != nil {
+	if cursor := ex.getCursorAccessor().getCursor(tree.Name(portalName)); cursor != nil {
 		return retErr(pgerror.Newf(
 			pgcode.DuplicateCursor, "portal %q already exists as cursor", portalName))
 	}
@@ -493,7 +493,7 @@ func (ex *connExecutor) addPortal(
 	if _, ok := ex.extraTxnState.prepStmtsNamespace.portals[portalName]; ok {
 		panic(errors.AssertionFailedf("portal already exists: %q", portalName))
 	}
-	if cursor := ex.getCursorAccessor().getCursor(portalName); cursor != nil {
+	if cursor := ex.getCursorAccessor().getCursor(tree.Name(portalName)); cursor != nil {
 		panic(errors.AssertionFailedf("portal already exists as cursor: %q", portalName))
 	}
@@ -572,7 +572,7 @@ func (ex *connExecutor) execDescribe(
 	switch descCmd.Type {
 	case pgwirebase.PrepareStatement:
-		ps, ok := ex.extraTxnState.prepStmtsNamespace.prepStmts[descCmd.Name]
+		ps, ok := ex.extraTxnState.prepStmtsNamespace.prepStmts[string(descCmd.Name)]
 		if !ok {
 			return retErr(pgerror.Newf(
 				pgcode.InvalidSQLStatementName,
@@ -602,7 +602,9 @@ func (ex *connExecutor) execDescribe(
 			res.SetPrepStmtOutput(ctx, ps.Columns)
 		}
 	case pgwirebase.PreparePortal:
-		portal, ok := ex.extraTxnState.prepStmtsNamespace.portals[descCmd.Name]
+		// TODO(rimadeodhar): prepStmtsNamespace should also be updated to use tree.Name instead of string
+		// for indexing internal maps.
+		portal, ok := ex.extraTxnState.prepStmtsNamespace.portals[string(descCmd.Name)]
 		if !ok {
 			// Check SQL-level cursors.
 			cursor := ex.getCursorAccessor().getCursor(descCmd.Name)
diff --git a/pkg/sql/conn_io.go b/pkg/sql/conn_io.go
index bc0fb4443e6f..f0861ed00df9 100644
--- a/pkg/sql/conn_io.go
+++ b/pkg/sql/conn_io.go
@@ -218,7 +218,7 @@ var _ Command = PrepareStmt{}
 // DescribeStmt is the Command for producing info about a prepared statement or
 // portal.
 type DescribeStmt struct {
-	Name string
+	Name tree.Name
 	Type pgwirebase.PrepareType
 }
 
diff --git a/pkg/sql/logictest/testdata/logic_test/cursor b/pkg/sql/logictest/testdata/logic_test/cursor
index 46c307b91b04..733e9a32fddb 100644
--- a/pkg/sql/logictest/testdata/logic_test/cursor
+++ b/pkg/sql/logictest/testdata/logic_test/cursor
@@ -557,3 +557,45 @@ CLOSE foo
 
 statement ok
 ALTER TABLE a ADD COLUMN c INT
+
+statement ok
+COMMIT;
+
+statement ok
+BEGIN;
+
+statement ok
+DECLARE "a"" b'c" CURSOR FOR SELECT 1;
+
+query I
+FETCH 1 "a"" b'c";
+----
+1
+
+statement ok
+CLOSE "a"" b'c";
+DECLARE "a b" CURSOR FOR SELECT 2;
+
+query I
+FETCH 1 "a b";
+----
+2
+
+statement ok
+CLOSE "a b";
+DECLARE "a\b" CURSOR FOR SELECT 3;
+
+query I
+FETCH 1 "a\b";
+----
+3
+
+statement ok
+CLOSE "a\b";
+
+query error pq: at or near "b": syntax error
+FETCH 1 a b;
+
+statement ok
+COMMIT;
+
diff --git a/pkg/sql/logictest/testdata/logic_test/pg_catalog b/pkg/sql/logictest/testdata/logic_test/pg_catalog
index 18f786594687..e4d40d3674d3 100644
--- a/pkg/sql/logictest/testdata/logic_test/pg_catalog
+++ b/pkg/sql/logictest/testdata/logic_test/pg_catalog
@@ -6034,3 +6034,31 @@ SELECT * FROM pg_sequences WHERE sequencename = 'serial'
 ----
 schemaname  sequencename  sequenceowner  data_type  start_value  min_value  max_value            increment_by  cycle  cache_size  last_value
 public      serial        root           20         101          1          9223372036854775807  5             false  1           NULL
+
+statement ok
+CREATE TABLE t (a INT PRIMARY KEY, b INT);
+INSERT INTO t VALUES (1, 2), (2, 3);
+
+statement ok
+BEGIN;
+
+statement ok
+DECLARE "a"" b'c" CURSOR FOR TABLE t;
+DECLARE "a b" CURSOR FOR TABLE t;
+DECLARE "a\b" CURSOR FOR TABLE t;
+
+## pg_catalog.pg_cursors
+
+query TTBBB colnames
+SELECT name, statement, is_holdable, is_binary, is_scrollable FROM pg_catalog.pg_cursors ORDER BY name;
+----
+name    statement  is_holdable  is_binary  is_scrollable
+a b     TABLE t    false        false      false
+a" b'c  TABLE t    false        false      false
+a\b     TABLE t    false        false      false
+
+statement ok
+COMMIT;
+
+statement ok
+DROP TABLE t;
diff --git a/pkg/sql/pg_catalog.go b/pkg/sql/pg_catalog.go
index 83bd1b7abcef..af8f5e5eb544 100644
--- a/pkg/sql/pg_catalog.go
+++ b/pkg/sql/pg_catalog.go
@@ -3776,12 +3776,12 @@ https://www.postgresql.org/docs/14/view-pg-cursors.html`,
 				return err
 			}
 			if err := addRow(
-				tree.NewDString(name),        /* name */
-				tree.NewDString(c.statement), /* statement */
-				tree.DBoolFalse,              /* is_holdable */
-				tree.DBoolFalse,              /* is_binary */
-				tree.DBoolFalse,              /* is_scrollable */
-				tz,                           /* creation_date */
+				tree.NewDString(string(name)), /* name */
+				tree.NewDString(c.statement),  /* statement */
+				tree.DBoolFalse,               /* is_holdable */
+				tree.DBoolFalse,               /* is_binary */
+				tree.DBoolFalse,               /* is_scrollable */
+				tz,                            /* creation_date */
 			); err != nil {
 				return err
 			}
diff --git a/pkg/sql/pgwire/conn.go b/pkg/sql/pgwire/conn.go
index cc7720eaa3c6..b6ba058e6c17 100644
--- a/pkg/sql/pgwire/conn.go
+++ b/pkg/sql/pgwire/conn.go
@@ -1011,7 +1011,7 @@ func (c *conn) handleDescribe(ctx context.Context, buf *pgwirebase.ReadBuffer) e
 	return c.stmtBuf.Push(
 		ctx,
 		sql.DescribeStmt{
-			Name: name,
+			Name: tree.Name(name),
 			Type: typ,
 		})
 }
diff --git a/pkg/sql/pgwire/conn_test.go b/pkg/sql/pgwire/conn_test.go
index b88ddf1712dc..eabe43a9c1d1 100644
--- a/pkg/sql/pgwire/conn_test.go
+++ b/pkg/sql/pgwire/conn_test.go
@@ -159,6 +159,9 @@ func TestConn(t *testing.T) {
 	if err := finishQuery(generateError, conn); err != nil {
 		t.Fatal(err)
 	}
+	expectExecStmt(ctx, t, "DECLARE \"a b\" CURSOR FOR SELECT 10", &rd, conn, queryStringComplete)
+	expectExecStmt(ctx, t, "FETCH 1 \"a b\"", &rd, conn, queryStringComplete)
+	expectExecStmt(ctx, t, "CLOSE \"a b\"", &rd, conn, queryStringComplete)
 	// We got to the COMMIT at the end of the batch.
 	expectExecStmt(ctx, t, "COMMIT TRANSACTION", &rd, conn, queryStringComplete)
 	expectSync(ctx, t, &rd)
@@ -505,6 +508,9 @@ func client(ctx context.Context, serverAddr net.Addr, wg *sync.WaitGroup) error
 	batch.Queue("BEGIN")
 	batch.Queue("select 7")
 	batch.Queue("select 8")
+	batch.Queue("declare \"a b\" cursor for select 10")
+	batch.Queue("fetch 1 \"a b\"")
+	batch.Queue("close \"a b\"")
 	batch.Queue("COMMIT")
 
 	batchResults := conn.SendBatch(ctx, batch)
@@ -676,7 +682,7 @@ func expectPrepareStmt(
 func expectDescribeStmt(
 	ctx context.Context,
 	t *testing.T,
-	expName string,
+	expName tree.Name,
 	expType pgwirebase.PrepareType,
 	rd *sql.StmtBufReader,
 	c *conn,
diff --git a/pkg/sql/pgwire/testdata/pgtest/portals b/pkg/sql/pgwire/testdata/pgtest/portals
index 55c0595da6df..3f0b9a472bc9 100644
--- a/pkg/sql/pgwire/testdata/pgtest/portals
+++ b/pkg/sql/pgwire/testdata/pgtest/portals
@@ -1486,3 +1486,71 @@ ReadyForQuery
 ----
 {"Type":"ParseComplete"}
 {"Type":"ErrorResponse","Code":"08P01","Message":"invalid DESCRIBE message subtype 0"}
 {"Type":"ReadyForQuery","TxStatus":"I"}
+
+# Check declaring cursor with a name enclosed in double quotes
+send
+Query {"String": "BEGIN"}
+----
+
+until
+ReadyForQuery
+----
+{"Type":"CommandComplete","CommandTag":"BEGIN"}
+{"Type":"ReadyForQuery","TxStatus":"T"}
+
+send
+Parse {"Query": "DECLARE \"a b\" CURSOR FOR SELECT generate_series(1, 10) AS bar"}
+Bind
+Describe {"ObjectType": "P", "Name": ""}
+Execute
+Sync
+----
+
+until
+ReadyForQuery
+----
+{"Type":"ParseComplete"}
+{"Type":"BindComplete"}
+{"Type":"NoData"}
+{"Type":"CommandComplete","CommandTag":"DECLARE CURSOR"}
+{"Type":"ReadyForQuery","TxStatus":"T"}
+
+send
+Describe {"ObjectType": "P", "Name": "a b"}
+Sync
+----
+
+until ignore_type_oids ignore_table_oids ignore_data_type_sizes
+ReadyForQuery
+----
+{"Type":"RowDescription","Fields":[{"Name":"bar","TableOID":0,"TableAttributeNumber":0,"DataTypeOID":0,"DataTypeSize":0,"TypeModifier":-1,"Format":0}]}
+{"Type":"ReadyForQuery","TxStatus":"T"}
+
+send
+Parse {"Query": "FETCH 2 \"a b\""}
+Bind
+Describe {"ObjectType": "P", "Name": ""}
+Execute
+Sync
+----
+
+until ignore_type_oids ignore_table_oids ignore_data_type_sizes
+ReadyForQuery
+----
+{"Type":"ParseComplete"}
+{"Type":"BindComplete"}
+{"Type":"RowDescription","Fields":[{"Name":"bar","TableOID":0,"TableAttributeNumber":0,"DataTypeOID":0,"DataTypeSize":0,"TypeModifier":-1,"Format":0}]}
+{"Type":"DataRow","Values":[{"text":"1"}]}
+{"Type":"DataRow","Values":[{"text":"2"}]}
+{"Type":"CommandComplete","CommandTag":"FETCH 2"}
+{"Type":"ReadyForQuery","TxStatus":"T"}
+
+send
+Query {"String": "ROLLBACK"}
+----
+
+until
+ReadyForQuery
+----
+{"Type":"CommandComplete","CommandTag":"ROLLBACK"}
+{"Type":"ReadyForQuery","TxStatus":"I"}
diff --git a/pkg/sql/sql_cursor.go b/pkg/sql/sql_cursor.go
index 26b9609e1646..2499898ce00a 100644
--- a/pkg/sql/sql_cursor.go
+++ b/pkg/sql/sql_cursor.go
@@ -49,13 +49,12 @@ func (p *planner) DeclareCursor(ctx context.Context, s *tree.DeclareCursor) (pla
 	}
 
 	ie := p.ExecCfg().InternalExecutorFactory.NewInternalExecutor(p.SessionData())
-	cursorName := s.Name.String()
-	if cursor := p.sqlCursors.getCursor(cursorName); cursor != nil {
-		return nil, pgerror.Newf(pgcode.DuplicateCursor, "cursor %q already exists", cursorName)
+	if cursor := p.sqlCursors.getCursor(s.Name); cursor != nil {
+		return nil, pgerror.Newf(pgcode.DuplicateCursor, "cursor %q already exists", s.Name)
 	}
-	if p.extendedEvalCtx.PreparedStatementState.HasPortal(cursorName) {
-		return nil, pgerror.Newf(pgcode.DuplicateCursor, "cursor %q already exists as portal", cursorName)
+	if p.extendedEvalCtx.PreparedStatementState.HasPortal(string(s.Name)) {
+		return nil, pgerror.Newf(pgcode.DuplicateCursor, "cursor %q already exists as portal", s.Name)
 	}
 
 	// Try to plan the cursor query to make sure that it's valid.
@@ -100,7 +99,7 @@ func (p *planner) DeclareCursor(ctx context.Context, s *tree.DeclareCursor) (pla
 		statement: statement,
 		created:   timeutil.Now(),
 	}
-	if err := p.sqlCursors.addCursor(cursorName, cursor); err != nil {
+	if err := p.sqlCursors.addCursor(s.Name, cursor); err != nil {
 		// This case shouldn't happen because cursor names are scoped to a session,
 		// and sessions can't have more than one statement running at once. But
 		// let's be diligent and clean up if it somehow does happen anyway.
@@ -119,11 +118,10 @@ var errBackwardScan = pgerror.Newf(pgcode.ObjectNotInPrerequisiteState, "cursor
 func (p *planner) FetchCursor(
 	_ context.Context, s *tree.CursorStmt, isMove bool,
 ) (planNode, error) {
-	cursorName := s.Name.String()
-	cursor := p.sqlCursors.getCursor(cursorName)
+	cursor := p.sqlCursors.getCursor(s.Name)
 	if cursor == nil {
 		return nil, pgerror.Newf(
-			pgcode.InvalidCursorName, "cursor %q does not exist", cursorName,
+			pgcode.InvalidCursorName, "cursor %q does not exist", s.Name,
 		)
 	}
 	if s.Count < 0 || s.FetchType == tree.FetchBackwardAll {
@@ -243,7 +241,7 @@ func (p *planner) CloseCursor(ctx context.Context, n *tree.CloseCursor) (planNod
 	return &delayedNode{
 		name: n.String(),
 		constructor: func(ctx context.Context, p *planner) (planNode, error) {
-			return newZeroNode(nil /* columns */), p.sqlCursors.closeCursor(n.Name.String())
+			return newZeroNode(nil /* columns */), p.sqlCursors.closeCursor(n.Name)
 		},
 	}, nil
 }
@@ -276,20 +274,20 @@ type sqlCursors interface {
 	closeAll()
 	// closeCursor closes the named cursor, returning an error if that cursor
 	// didn't exist in the set.
-	closeCursor(string) error
+	closeCursor(tree.Name) error
 	// getCursor returns the named cursor, returning nil if that cursor
 	// didn't exist in the set.
-	getCursor(string) *sqlCursor
+	getCursor(tree.Name) *sqlCursor
 	// addCursor adds a new cursor with the given name to the set, returning an
 	// error if the cursor already existed in the set.
-	addCursor(string, *sqlCursor) error
+	addCursor(tree.Name, *sqlCursor) error
 	// list returns all open cursors in the set.
-	list() map[string]*sqlCursor
+	list() map[tree.Name]*sqlCursor
 }
 
 // cursorMap is a sqlCursors that's backed by an actual map.
 type cursorMap struct {
-	cursors map[string]*sqlCursor
+	cursors map[tree.Name]*sqlCursor
 }
 
 func (c *cursorMap) closeAll() {
@@ -299,7 +297,7 @@ func (c *cursorMap) closeAll() {
 	c.cursors = nil
 }
 
-func (c *cursorMap) closeCursor(s string) error {
+func (c *cursorMap) closeCursor(s tree.Name) error {
 	cursor, ok := c.cursors[s]
 	if !ok {
 		return pgerror.Newf(pgcode.InvalidCursorName, "cursor %q does not exist", s)
@@ -309,13 +307,13 @@ func (c *cursorMap) closeCursor(s string) error {
 	return err
 }
 
-func (c *cursorMap) getCursor(s string) *sqlCursor {
+func (c *cursorMap) getCursor(s tree.Name) *sqlCursor {
 	return c.cursors[s]
 }
 
-func (c *cursorMap) addCursor(s string, cursor *sqlCursor) error {
+func (c *cursorMap) addCursor(s tree.Name, cursor *sqlCursor) error {
 	if c.cursors == nil {
-		c.cursors = make(map[string]*sqlCursor)
+		c.cursors = make(map[tree.Name]*sqlCursor)
 	}
 	if _, ok := c.cursors[s]; ok {
 		return pgerror.Newf(pgcode.DuplicateCursor, "cursor %q already exists", s)
@@ -324,7 +322,7 @@ func (c *cursorMap) addCursor(s string, cursor *sqlCursor) error {
 	return nil
 }
 
-func (c *cursorMap) list() map[string]*sqlCursor {
+func (c *cursorMap) list() map[tree.Name]*sqlCursor {
 	return c.cursors
 }
 
@@ -338,19 +336,19 @@ func (c connExCursorAccessor) closeAll() {
 	c.ex.extraTxnState.sqlCursors.closeAll()
 }
 
-func (c connExCursorAccessor) closeCursor(s string) error {
+func (c connExCursorAccessor) closeCursor(s tree.Name) error {
 	return c.ex.extraTxnState.sqlCursors.closeCursor(s)
 }
 
-func (c connExCursorAccessor) getCursor(s string) *sqlCursor {
+func (c connExCursorAccessor) getCursor(s tree.Name) *sqlCursor {
 	return c.ex.extraTxnState.sqlCursors.getCursor(s)
 }
 
-func (c connExCursorAccessor) addCursor(s string, cursor *sqlCursor) error {
+func (c connExCursorAccessor) addCursor(s tree.Name, cursor *sqlCursor) error {
 	return c.ex.extraTxnState.sqlCursors.addCursor(s, cursor)
 }
 
-func (c connExCursorAccessor) list() map[string]*sqlCursor {
+func (c connExCursorAccessor) list() map[tree.Name]*sqlCursor {
 	return c.ex.extraTxnState.sqlCursors.list()
 }

From 991b134fd41c965b0da3d8cbd12a9f2ee05c6b6a Mon Sep 17 00:00:00 2001
From: irfan sharif
Date: Mon, 19 Sep 2022 22:15:14 -0400
Subject: [PATCH 4/4] kvserver: (partially) deflake transfer-leases/drain-other-node

In #85629 we changed our lease transfer protocol to only ever transfer
expiration-based leases, and have recipients later upgrade them to the
more efficient epoch-based ones. This was done to limit the effects of
ill-advised lease transfers, since the incoming leaseholder would need to
recognize itself as such within a few seconds -- so we wanted this
upgrade to happen after the lease had been received.

In #83261 however we noticed that the upgrade was not immediate -- we
were waiting until the current lease's expiration was within its renewal
duration -- 4.5s. When the lease was eventually renewed the upgrade did
happen, but it was not immediate. We fix this here and remove the manual
clock advancement in the supporting test that had masked this issue. The
test now demonstrates that we're no longer relying on upgrades happening
as part of the (slow) renewal process.

Release note: None
---
 pkg/kv/kvserver/client_lease_test.go | 41 +++++++++++++++++++++++-----
 pkg/kv/kvserver/client_merge_test.go |  6 ++++
 pkg/kv/kvserver/replica_proposal.go  |  7 ++++-
 pkg/kv/kvserver/testing_knobs.go     |  4 +++
 4 files changed, 50 insertions(+), 8 deletions(-)

diff --git a/pkg/kv/kvserver/client_lease_test.go b/pkg/kv/kvserver/client_lease_test.go
index f63805890c77..bc435d368d59 100644
--- a/pkg/kv/kvserver/client_lease_test.go
+++ b/pkg/kv/kvserver/client_lease_test.go
@@ -41,6 +41,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
 	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
+	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
 	"github.com/cockroachdb/errors"
 	"github.com/stretchr/testify/require"
 )
@@ -701,8 +702,9 @@ func TestLeaseholderRelocate(t *testing.T) {
 	// We start with having the range under test on (1,2,3).
 	tc.AddVotersOrFatal(t, rhsDesc.StartKey.AsRawKey(), tc.Targets(1, 2)...)
 
-	// Make sure the lease is on 3
+	// Make sure the lease is on 3 and is fully upgraded.
 	tc.TransferRangeLeaseOrFatal(t, rhsDesc, tc.Target(2))
+	tc.WaitForLeaseUpgrade(ctx, t, rhsDesc)
 
 	// Check that the lease moved to 3.
 	leaseHolder, err := tc.FindRangeLeaseHolder(rhsDesc, nil)
@@ -730,7 +732,7 @@ func TestLeaseholderRelocate(t *testing.T) {
 		return nil
 	})
 
-	// Make sure lease moved to the preferred region, if .
+	// Make sure lease moved to the preferred region.
 	leaseHolder, err = tc.FindRangeLeaseHolder(rhsDesc, nil)
 	require.NoError(t, err)
 	require.Equal(t, tc.Target(3), leaseHolder)
@@ -739,10 +741,13 @@ func TestLeaseholderRelocate(t *testing.T) {
 	repl := tc.GetFirstStoreFromServer(t, 3).
 		LookupReplica(roachpb.RKey(rhsDesc.StartKey.AsRawKey()))
 	history := repl.GetLeaseHistory()
+
 	require.Equal(t, leaseHolder.NodeID,
 		history[len(history)-1].Replica.NodeID)
+	require.Equal(t, leaseHolder.NodeID,
+		history[len(history)-2].Replica.NodeID) // account for the lease upgrade
 	require.Equal(t, tc.Target(2).NodeID,
-		history[len(history)-2].Replica.NodeID)
+		history[len(history)-3].Replica.NodeID)
 }
 
 func gossipLiveness(t *testing.T, tc *testcluster.TestCluster) {
@@ -1303,10 +1308,17 @@ func TestAcquireLeaseTimeout(t *testing.T) {
 	}
 }
 
+// TestLeaseTransfersUseExpirationLeasesAndBumpToEpochBasedOnes does what it
+// says on the tin.
 func TestLeaseTransfersUseExpirationLeasesAndBumpToEpochBasedOnes(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
 
+	mu := struct {
+		syncutil.Mutex
+		lease *roachpb.Lease
+	}{}
+
 	ctx := context.Background()
 	manualClock := hlc.NewHybridManualClock()
@@ -1315,10 +1327,21 @@ func TestLeaseTransfersUseExpirationLeasesAndBumpToEpochBasedOnes(t *testing.T)
 		ServerArgs: base.TestServerArgs{
 			Knobs: base.TestingKnobs{
 				Server: &server.TestingKnobs{
+					// Never ticked -- demonstrating that we're not relying on
+					// internal timers to upgrade leases.
 					WallClock: manualClock,
 				},
 				Store: &kvserver.StoreTestingKnobs{
-					LeaseRenewalDurationOverride: 10 * time.Millisecond, // speed up the test
+					// Outlandishly high to disable proactive renewal of
+					// expiration based leases. Lease upgrades happen
+					// immediately after applying without needing active
+					// renewal.
+					LeaseRenewalDurationOverride: 100 * time.Hour,
+					LeaseUpgradeInterceptor: func(lease *roachpb.Lease) {
+						mu.Lock()
+						defer mu.Unlock()
+						mu.lease = lease
+					},
 				},
 			},
 		},
@@ -1333,8 +1356,7 @@ func TestLeaseTransfersUseExpirationLeasesAndBumpToEpochBasedOnes(t *testing.T)
 	n2 := tc.Server(1)
 	n2Target := tc.Target(1)
 
-	// Transfer the lease from n1 to n2. Expect it to be transferred as an
-	// expiration based lease.
+	// Transfer the lease from n1 to n2.
 	tc.TransferRangeLeaseOrFatal(t, desc, n2Target)
 	testutils.SucceedsSoon(t, func() error {
 		li, _, err := tc.FindRangeLeaseEx(ctx, desc, nil)
@@ -1346,6 +1368,11 @@ func TestLeaseTransfersUseExpirationLeasesAndBumpToEpochBasedOnes(t *testing.T)
 		return nil
 	})
 
-	tc.IncrClockForLeaseUpgrade(t, manualClock)
+	// Expect it to be upgraded to an epoch based lease.
 	tc.WaitForLeaseUpgrade(ctx, t, desc)
+
+	// Expect it to have been upgraded from an expiration based lease.
+	mu.Lock()
+	defer mu.Unlock()
+	require.Equal(t, roachpb.LeaseExpiration, mu.lease.Type())
 }
diff --git a/pkg/kv/kvserver/client_merge_test.go b/pkg/kv/kvserver/client_merge_test.go
index 0e42078a19de..0d2ee436a716 100644
--- a/pkg/kv/kvserver/client_merge_test.go
+++ b/pkg/kv/kvserver/client_merge_test.go
@@ -539,6 +539,9 @@ func mergeCheckingTimestampCaches(
 			if !rhsRepl.OwnsValidLease(ctx, tc.Servers[1].Clock().NowAsClockTimestamp()) {
 				return errors.New("rhs store does not own valid lease for rhs range")
 			}
+			if rhsRepl.CurrentLeaseStatus(ctx).Lease.Type() != roachpb.LeaseEpoch {
+				return errors.Errorf("lease still an expiration based lease")
+			}
 			return nil
 		})
 	}
@@ -1005,6 +1008,9 @@ func TestStoreRangeMergeTimestampCacheCausality(t *testing.T) {
 		if !lhsRepl1.OwnsValidLease(ctx, tc.Servers[1].Clock().NowAsClockTimestamp()) {
 			return errors.New("s2 does not own valid lease for lhs range")
 		}
+		if lhsRepl1.CurrentLeaseStatus(ctx).Lease.Type() != roachpb.LeaseEpoch {
+			return errors.Errorf("lease still an expiration based lease")
+		}
 		return nil
 	})
 
diff --git a/pkg/kv/kvserver/replica_proposal.go b/pkg/kv/kvserver/replica_proposal.go
index 308deef3c54b..92a161fd7daf 100644
--- a/pkg/kv/kvserver/replica_proposal.go
+++ b/pkg/kv/kvserver/replica_proposal.go
@@ -374,8 +374,13 @@ func (r *Replica) leasePostApplyLocked(
 		if log.V(1) {
 			log.VEventf(ctx, 1, "upgrading expiration lease %s to an epoch-based one", newLease)
 		}
+
+		if r.store.TestingKnobs().LeaseUpgradeInterceptor != nil {
+			r.store.TestingKnobs().LeaseUpgradeInterceptor(newLease)
+		}
 		st := r.leaseStatusForRequestRLocked(ctx, now, hlc.Timestamp{})
-		r.maybeExtendLeaseAsyncLocked(ctx, st)
+		// Ignore the returned handle as we won't block on it.
+		_ = r.requestLeaseLocked(ctx, st)
 	}
 }
diff --git a/pkg/kv/kvserver/testing_knobs.go b/pkg/kv/kvserver/testing_knobs.go
index 2763515b0774..8eb85129b526 100644
--- a/pkg/kv/kvserver/testing_knobs.go
+++ b/pkg/kv/kvserver/testing_knobs.go
@@ -434,6 +434,10 @@ type StoreTestingKnobs struct {
 	//   - rangefeed.TestingKnobs.IgnoreOnDeleteRangeError
 	//   - kvserverbase.BatchEvalTestingKnobs.DisableInitPutFailOnTombstones
 	GlobalMVCCRangeTombstone bool
+
+	// LeaseUpgradeInterceptor intercepts leases that get upgraded to
+	// epoch-based ones.
+	LeaseUpgradeInterceptor func(*roachpb.Lease)
 }
 
 // ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface.
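
A closing note for readers of the series: the cache behavior that patch 1
relies on (a pure cache read versus a read with a meta-lookup fallback) is
easy to miss inside the diff. The sketch below is illustrative only: the
helper name flushBoundary, the standalone packaging, and the exact
rangecache signatures are assumptions rather than code from the patches;
only the Lookup call shape and the Desc().EndKey.AsRawKey() extraction
mirror the diff in patch 1.

package bulkexample

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangecache"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/util/log"
)

// flushBoundary resolves the range containing k and returns its end key,
// i.e. the point before which a range-aware batcher would want to flush.
// Unlike RangeCache.GetCached, RangeCache.Lookup falls back to a meta
// lookup when the cache has no overlapping entry (for example, right
// after a stale descriptor was evicted), so it returns a descriptor
// unless the lookup itself fails.
func flushBoundary(
	ctx context.Context, rc *rangecache.RangeCache, k roachpb.RKey,
) (roachpb.Key, bool) {
	ent, err := rc.Lookup(ctx, k)
	if err != nil {
		// Mirroring the patch: a failed meta lookup is logged and
		// tolerated rather than treated as fatal.
		log.Warningf(ctx, "failed to lookup range cache entry for key %v: %v", k, err)
		return nil, false
	}
	return ent.Desc().EndKey.AsRawKey(), true
}

The design point, as the patch 1 commit message notes, is that a recently
evicted (stale) descriptor no longer silently disables range-aware
flushing: the batcher either learns a real boundary or warns and proceeds
without one.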