diff --git a/pkg/kv/bulk/sst_batcher.go b/pkg/kv/bulk/sst_batcher.go
index 242a63e232c9..7bce386210e0 100644
--- a/pkg/kv/bulk/sst_batcher.go
+++ b/pkg/kv/bulk/sst_batcher.go
@@ -361,15 +361,14 @@ func (b *SSTBatcher) flushIfNeeded(ctx context.Context, nextKey roachpb.Key) err
 	if !b.flushKeyChecked && b.rc != nil {
 		b.flushKeyChecked = true
 		if k, err := keys.Addr(nextKey); err != nil {
-			log.Warningf(ctx, "failed to get RKey for flush key lookup")
+			log.Warningf(ctx, "failed to get RKey for flush key lookup: %v", err)
 		} else {
-			r := b.rc.GetCached(ctx, k, false /* inverted */)
-			if r != nil {
+			if r, err := b.rc.Lookup(ctx, k); err != nil {
+				log.Warningf(ctx, "failed to lookup range cache entry for key %v: %v", k, err)
+			} else {
 				k := r.Desc().EndKey.AsRawKey()
 				b.flushKey = k
 				log.VEventf(ctx, 3, "%s building sstable that will flush before %v", b.name, k)
-			} else {
-				log.VEventf(ctx, 2, "%s no cached range desc available to determine sst flush key", b.name)
 			}
 		}
 	}
diff --git a/pkg/kv/bulk/sst_batcher_test.go b/pkg/kv/bulk/sst_batcher_test.go
index 588c8f5f9779..e29f5a40cd8a 100644
--- a/pkg/kv/bulk/sst_batcher_test.go
+++ b/pkg/kv/bulk/sst_batcher_test.go
@@ -277,27 +277,24 @@ func runTestImport(t *testing.T, batchSizeValue int64) {
 		return encoding.EncodeStringAscending(append([]byte{}, prefix...), fmt.Sprintf("k%d", i))
 	}
 
-	t.Logf("splitting at %s and %s", key(split1), key(split2))
+	t.Logf("splitting at %s", key(split1))
 	require.NoError(t, kvDB.AdminSplit(ctx, key(split1), hlc.MaxTimestamp /* expirationTime */))
-	require.NoError(t, kvDB.AdminSplit(ctx, key(split2), hlc.MaxTimestamp /* expirationTime */))
 
 	// We want to make sure our range-aware batching knows about one of our
-	// splits to exercise that codepath, but we also want to make sure we
+	// splits to exercise that code path, but we also want to make sure we
 	// still handle an unexpected split, so we make our own range cache and
-	// only populate it with one of our two splits.
-	mockCache := rangecache.NewRangeCache(s.ClusterSettings(), nil,
+	// populate it after the first split but before the second split.
+	ds := s.DistSenderI().(*kvcoord.DistSender)
+	mockCache := rangecache.NewRangeCache(s.ClusterSettings(), ds,
 		func() int64 { return 2 << 10 }, s.Stopper(), s.TracerI().(*tracing.Tracer))
-	addr, err := keys.Addr(key(0))
-	require.NoError(t, err)
-
-	tok, err := s.DistSenderI().(*kvcoord.DistSender).RangeDescriptorCache().LookupWithEvictionToken(
-		ctx, addr, rangecache.EvictionToken{}, false)
-	require.NoError(t, err)
-
-	r := roachpb.RangeInfo{
-		Desc: *tok.Desc(),
+	for _, k := range []int{0, split1} {
+		ent, err := ds.RangeDescriptorCache().Lookup(ctx, keys.MustAddr(key(k)))
+		require.NoError(t, err)
+		mockCache.Insert(ctx, roachpb.RangeInfo{Desc: *ent.Desc()})
 	}
-	mockCache.Insert(ctx, r)
+
+	t.Logf("splitting at %s", key(split2))
+	require.NoError(t, kvDB.AdminSplit(ctx, key(split2), hlc.MaxTimestamp /* expirationTime */))
 
 	ts := hlc.Timestamp{WallTime: 100}
 	mem := mon.NewUnlimitedMonitor(ctx, "lots", mon.MemoryResource, nil, nil, 0, nil)
diff --git a/pkg/kv/kvserver/client_lease_test.go b/pkg/kv/kvserver/client_lease_test.go
index f63805890c77..bc435d368d59 100644
--- a/pkg/kv/kvserver/client_lease_test.go
+++ b/pkg/kv/kvserver/client_lease_test.go
@@ -41,6 +41,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
 	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
+	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
 	"github.com/cockroachdb/errors"
 	"github.com/stretchr/testify/require"
 )
@@ -701,8 +702,9 @@ func TestLeaseholderRelocate(t *testing.T) {
 	// We start with having the range under test on (1,2,3).
 	tc.AddVotersOrFatal(t, rhsDesc.StartKey.AsRawKey(), tc.Targets(1, 2)...)
 
-	// Make sure the lease is on 3
+	// Make sure the lease is on 3 and is fully upgraded.
 	tc.TransferRangeLeaseOrFatal(t, rhsDesc, tc.Target(2))
+	tc.WaitForLeaseUpgrade(ctx, t, rhsDesc)
 
 	// Check that the lease moved to 3.
 	leaseHolder, err := tc.FindRangeLeaseHolder(rhsDesc, nil)
@@ -730,7 +732,7 @@ func TestLeaseholderRelocate(t *testing.T) {
 		return nil
 	})
 
-	// Make sure lease moved to the preferred region, if .
+	// Make sure lease moved to the preferred region.
 	leaseHolder, err = tc.FindRangeLeaseHolder(rhsDesc, nil)
 	require.NoError(t, err)
 	require.Equal(t, tc.Target(3), leaseHolder)
@@ -739,10 +741,13 @@ func TestLeaseholderRelocate(t *testing.T) {
 	repl := tc.GetFirstStoreFromServer(t, 3).
 		LookupReplica(roachpb.RKey(rhsDesc.StartKey.AsRawKey()))
 	history := repl.GetLeaseHistory()
+
 	require.Equal(t, leaseHolder.NodeID,
 		history[len(history)-1].Replica.NodeID)
+	require.Equal(t, leaseHolder.NodeID,
+		history[len(history)-2].Replica.NodeID) // account for the lease upgrade
 	require.Equal(t, tc.Target(2).NodeID,
-		history[len(history)-2].Replica.NodeID)
+		history[len(history)-3].Replica.NodeID)
 }
 
 func gossipLiveness(t *testing.T, tc *testcluster.TestCluster) {
@@ -1303,10 +1308,17 @@ func TestAcquireLeaseTimeout(t *testing.T) {
 	}
 }
 
+// TestLeaseTransfersUseExpirationLeasesAndBumpToEpochBasedOnes does what it
+// says on the tin.
 func TestLeaseTransfersUseExpirationLeasesAndBumpToEpochBasedOnes(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
 
+	mu := struct {
+		syncutil.Mutex
+		lease *roachpb.Lease
+	}{}
+
 	ctx := context.Background()
 	manualClock := hlc.NewHybridManualClock()
 
@@ -1315,10 +1327,21 @@ func TestLeaseTransfersUseExpirationLeasesAndBumpToEpochBasedOnes(t *testing.T)
 		ServerArgs: base.TestServerArgs{
 			Knobs: base.TestingKnobs{
 				Server: &server.TestingKnobs{
+					// Never ticked -- demonstrating that we're not relying on
+					// internal timers to upgrade leases.
 					WallClock: manualClock,
 				},
 				Store: &kvserver.StoreTestingKnobs{
-					LeaseRenewalDurationOverride: 10 * time.Millisecond, // speed up the test
+					// Outlandishly high to disable proactive renewal of
+					// expiration based leases. Lease upgrades happen
+					// immediately after applying without needing active
+					// renewal.
+					LeaseRenewalDurationOverride: 100 * time.Hour,
+					LeaseUpgradeInterceptor: func(lease *roachpb.Lease) {
+						mu.Lock()
+						defer mu.Unlock()
+						mu.lease = lease
+					},
 				},
 			},
 		},
@@ -1333,8 +1356,7 @@ func TestLeaseTransfersUseExpirationLeasesAndBumpToEpochBasedOnes(t *testing.T)
 	n2 := tc.Server(1)
 	n2Target := tc.Target(1)
 
-	// Transfer the lease from n1 to n2. Expect it to be transferred as an
-	// expiration based lease.
+	// Transfer the lease from n1 to n2.
 	tc.TransferRangeLeaseOrFatal(t, desc, n2Target)
 	testutils.SucceedsSoon(t, func() error {
 		li, _, err := tc.FindRangeLeaseEx(ctx, desc, nil)
@@ -1346,6 +1368,11 @@ func TestLeaseTransfersUseExpirationLeasesAndBumpToEpochBasedOnes(t *testing.T)
 		return nil
 	})
 
-	tc.IncrClockForLeaseUpgrade(t, manualClock)
+	// Expect it to be upgraded to an epoch based lease.
 	tc.WaitForLeaseUpgrade(ctx, t, desc)
+
+	// Expect it to have been upgraded from an expiration based lease.
+	mu.Lock()
+	defer mu.Unlock()
+	require.Equal(t, roachpb.LeaseExpiration, mu.lease.Type())
 }
diff --git a/pkg/kv/kvserver/client_merge_test.go b/pkg/kv/kvserver/client_merge_test.go
index 0e42078a19de..0d2ee436a716 100644
--- a/pkg/kv/kvserver/client_merge_test.go
+++ b/pkg/kv/kvserver/client_merge_test.go
@@ -539,6 +539,9 @@ func mergeCheckingTimestampCaches(
 			if !rhsRepl.OwnsValidLease(ctx, tc.Servers[1].Clock().NowAsClockTimestamp()) {
 				return errors.New("rhs store does not own valid lease for rhs range")
 			}
+			if rhsRepl.CurrentLeaseStatus(ctx).Lease.Type() != roachpb.LeaseEpoch {
+				return errors.Errorf("lease still an expiration based lease")
+			}
 			return nil
 		})
 	}
@@ -1005,6 +1008,9 @@ func TestStoreRangeMergeTimestampCacheCausality(t *testing.T) {
 		if !lhsRepl1.OwnsValidLease(ctx, tc.Servers[1].Clock().NowAsClockTimestamp()) {
 			return errors.New("s2 does not own valid lease for lhs range")
 		}
+		if lhsRepl1.CurrentLeaseStatus(ctx).Lease.Type() != roachpb.LeaseEpoch {
+			return errors.Errorf("lease still an expiration based lease")
+		}
 		return nil
 	})
 
diff --git a/pkg/kv/kvserver/replica_proposal.go b/pkg/kv/kvserver/replica_proposal.go
index 308deef3c54b..70e47a690c17 100644
--- a/pkg/kv/kvserver/replica_proposal.go
+++ b/pkg/kv/kvserver/replica_proposal.go
@@ -263,10 +263,13 @@ func (r *Replica) leasePostApplyLocked(
 	leaseChangingHands := prevLease.Replica.StoreID != newLease.Replica.StoreID ||
 		prevLease.Sequence != newLease.Sequence
 	if iAmTheLeaseHolder {
-		// Log lease acquisition whenever an Epoch-based lease changes hands (or verbose
-		// logging is enabled).
-		if newLease.Type() == roachpb.LeaseEpoch && leaseChangingHands || log.V(1) {
-			log.VEventf(ctx, 1, "new range lease %s following %s", newLease, prevLease)
+		// Log lease acquisitions loudly when verbose logging is enabled or when the
+		// new leaseholder is draining, in which case it should be shedding leases.
+		// Otherwise, log a trace event.
+		if log.V(1) || r.store.IsDraining() {
+			log.Infof(ctx, "new range lease %s following %s", newLease, prevLease)
+		} else {
+			log.Eventf(ctx, "new range lease %s following %s", newLease, prevLease)
 		}
 	}
 
@@ -374,8 +377,13 @@ func (r *Replica) leasePostApplyLocked(
 			if log.V(1) {
 				log.VEventf(ctx, 1, "upgrading expiration lease %s to an epoch-based one", newLease)
 			}
+
+			if r.store.TestingKnobs().LeaseUpgradeInterceptor != nil {
+				r.store.TestingKnobs().LeaseUpgradeInterceptor(newLease)
+			}
 			st := r.leaseStatusForRequestRLocked(ctx, now, hlc.Timestamp{})
-			r.maybeExtendLeaseAsyncLocked(ctx, st)
+			// Ignore the returned handle as we won't block on it.
+			_ = r.requestLeaseLocked(ctx, st)
 		}
 	}
 
@@ -421,10 +429,6 @@ func (r *Replica) leasePostApplyLocked(
 				log.Errorf(ctx, "%v", err)
 			}
 		})
-		if leaseChangingHands && log.V(1) {
-			// This logging is useful to troubleshoot incomplete drains.
-			log.Info(ctx, "is now leaseholder")
-		}
 	}
 
 	// Inform the store of this lease.
diff --git a/pkg/kv/kvserver/replica_range_lease.go b/pkg/kv/kvserver/replica_range_lease.go
index fb318fdffe46..7643904c80c3 100644
--- a/pkg/kv/kvserver/replica_range_lease.go
+++ b/pkg/kv/kvserver/replica_range_lease.go
@@ -63,7 +63,6 @@ import (
 	"github.com/cockroachdb/errors"
 	"github.com/cockroachdb/logtags"
 	"github.com/cockroachdb/redact"
-	"go.etcd.io/etcd/raft/v3"
 )
 
 var leaseStatusLogLimiter = func() *log.EveryN {
@@ -800,30 +799,6 @@ func (r *Replica) requestLeaseLocked(
 		return r.mu.pendingLeaseRequest.newResolvedHandle(pErr)
 	}
 
-	// If we're draining, we'd rather not take any new leases (since we're also
-	// trying to move leases away elsewhere). But if we're the leader, we don't
-	// really have a choice and we take the lease - there might not be any other
-	// replica available to take this lease (perhaps they're all draining).
-	if r.store.IsDraining() {
-		// NB: Replicas that are not the Raft leader will not take leases anyway
-		// (see the check inside propBuf.FlushLockedWithRaftGroup()), so we don't
-		// really need any special behavior for draining nodes here. This check
-		// serves mostly as a means to get more granular logging and as a defensive
-		// precaution.
-		if r.raftBasicStatusRLocked().RaftState != raft.StateLeader {
-			log.VEventf(ctx, 2, "refusing to take the lease because we're draining")
-			return r.mu.pendingLeaseRequest.newResolvedHandle(
-				roachpb.NewError(
-					newNotLeaseHolderError(
-						roachpb.Lease{}, r.store.StoreID(), r.mu.state.Desc,
-						"refusing to take the lease; node is draining",
-					),
-				),
-			)
-		}
-		log.Info(ctx, "trying to take the lease while we're draining since we're the raft leader")
-	}
-
 	// Propose a Raft command to get a lease for this replica.
 	repDesc, err := r.getReplicaDescriptorRLocked()
 	if err != nil {
diff --git a/pkg/kv/kvserver/replica_test.go b/pkg/kv/kvserver/replica_test.go
index 563576f3a576..7c179eda7069 100644
--- a/pkg/kv/kvserver/replica_test.go
+++ b/pkg/kv/kvserver/replica_test.go
@@ -1351,127 +1351,6 @@ func TestReplicaLeaseRejectUnknownRaftNodeID(t *testing.T) {
 	}
 }
 
-// Test that draining nodes only take the lease if they're the leader.
-func TestReplicaDrainLease(t *testing.T) {
-	defer leaktest.AfterTest(t)()
-	defer log.Scope(t).Close(t)
-	ctx := context.Background()
-	clusterArgs := base.TestClusterArgs{
-		ReplicationMode: base.ReplicationManual,
-		ServerArgs: base.TestServerArgs{
-			Knobs: base.TestingKnobs{
-				NodeLiveness: NodeLivenessTestingKnobs{
-					// This test waits for an epoch-based lease to expire, so we're setting the
-					// liveness duration as low as possible while still keeping the test stable.
-					LivenessDuration: 2000 * time.Millisecond,
-					RenewalDuration:  1000 * time.Millisecond,
-				},
-				Store: &StoreTestingKnobs{
-					// We eliminate clock offsets in order to eliminate the stasis period of
-					// leases. Otherwise we'd need to make leases longer.
-					MaxOffset: time.Nanosecond,
-				},
-			},
-		},
-	}
-	tc := serverutils.StartNewTestCluster(t, 2, clusterArgs)
-	defer tc.Stopper().Stop(ctx)
-	rngKey := tc.ScratchRange(t)
-	tc.AddVotersOrFatal(t, rngKey, tc.Target(1))
-
-	s1 := tc.Server(0)
-	s2 := tc.Server(1)
-	store1, err := s1.GetStores().(*Stores).GetStore(s1.GetFirstStoreID())
-	require.NoError(t, err)
-	store2, err := s2.GetStores().(*Stores).GetStore(s2.GetFirstStoreID())
-	require.NoError(t, err)
-
-	rd := tc.LookupRangeOrFatal(t, rngKey)
-	r1, err := store1.GetReplica(rd.RangeID)
-	require.NoError(t, err)
-	status := r1.CurrentLeaseStatus(ctx)
-	require.True(t, status.Lease.OwnedBy(store1.StoreID()), "someone else got the lease: %s", status)
-	// We expect the lease to be valid, but don't check that because, under race, it might have
-	// expired already.
-
-	// Stop n1's heartbeats and wait for the lease to expire.
-	log.Infof(ctx, "test: suspending heartbeats for n1")
-	cleanup := s1.NodeLiveness().(*liveness.NodeLiveness).PauseAllHeartbeatsForTest()
-	defer cleanup()
-
-	testutils.SucceedsSoon(t, func() error {
-		status := r1.CurrentLeaseStatus(ctx)
-		require.True(t, status.Lease.OwnedBy(store1.StoreID()), "someone else got the lease: %s", status)
-		if status.State == kvserverpb.LeaseState_VALID {
-			return errors.New("lease still valid")
-		}
-		// We need to wait for the stasis state to pass too; during stasis other
-		// replicas can't take the lease.
-		if status.State == kvserverpb.LeaseState_UNUSABLE {
-			return errors.New("lease still in stasis")
-		}
-		return nil
-	})
-
-	require.Equal(t, r1.RaftStatus().Lead, uint64(r1.ReplicaID()),
-		"expected leadership to still be on the first replica")
-
-	// Wait until n1 has heartbeat its liveness record (epoch >= 1) and n2
-	// knows about it. Otherwise, the following could occur:
-	//
-	// - n1's heartbeats to epoch 1 and acquires lease
-	// - n2 doesn't receive this yet (gossip)
-	// - when n2 is asked to acquire the lease, it uses a lease with epoch 1
-	//   but the liveness record with epoch zero
-	// - lease status is ERROR, lease acquisition (and thus test) fails.
-	testutils.SucceedsSoon(t, func() error {
-		nl, ok := s2.NodeLiveness().(*liveness.NodeLiveness).GetLiveness(s1.NodeID())
-		if !ok {
-			return errors.New("no liveness record for n1")
-		}
-		if nl.Epoch < 1 {
-			return errors.New("epoch for n1 still zero")
-		}
-		return nil
-	})
-
-	// Mark the stores as draining. We'll then start checking how acquiring leases
-	// behaves while draining.
-	store1.draining.Store(true)
-	store2.draining.Store(true)
-
-	r2, err := store2.GetReplica(rd.RangeID)
-	require.NoError(t, err)
-	// Check that a draining replica that's not the leader does NOT take the
-	// lease.
-	_, pErr := r2.redirectOnOrAcquireLease(ctx)
-	require.NotNil(t, pErr)
-	require.IsType(t, &roachpb.NotLeaseHolderError{}, pErr.GetDetail())
-
-	// Now transfer the leadership from r1 to r2 and check that r1 can now acquire
-	// the lease.
-
-	// Initiate the leadership transfer.
-	r1.mu.Lock()
-	r1.mu.internalRaftGroup.TransferLeader(uint64(r2.ReplicaID()))
-	r1.mu.Unlock()
-	// Run the range through the Raft scheduler, otherwise the leadership messages
-	// doesn't get sent because the range is quiesced.
-	store1.EnqueueRaftUpdateCheck(r1.RangeID)
-
-	// Wait for the leadership transfer to happen.
-	testutils.SucceedsSoon(t, func() error {
-		if r2.RaftStatus().SoftState.RaftState != raft.StateLeader {
-			return errors.Newf("r1 not yet leader")
-		}
-		return nil
-	})
-
-	// Check that r2 can now acquire the lease.
-	_, pErr = r2.redirectOnOrAcquireLease(ctx)
-	require.NoError(t, pErr.GoError())
-}
-
 // TestReplicaGossipFirstRange verifies that the first range gossips its
 // location and the cluster ID.
 func TestReplicaGossipFirstRange(t *testing.T) {
diff --git a/pkg/kv/kvserver/testing_knobs.go b/pkg/kv/kvserver/testing_knobs.go
index 2763515b0774..8eb85129b526 100644
--- a/pkg/kv/kvserver/testing_knobs.go
+++ b/pkg/kv/kvserver/testing_knobs.go
@@ -434,6 +434,10 @@ type StoreTestingKnobs struct {
 	// - rangefeed.TestingKnobs.IgnoreOnDeleteRangeError
 	// - kvserverbase.BatchEvalTestingKnobs.DisableInitPutFailOnTombstones
 	GlobalMVCCRangeTombstone bool
+
+	// LeaseUpgradeInterceptor intercepts leases that get upgraded to
+	// epoch-based ones.
+	LeaseUpgradeInterceptor func(*roachpb.Lease)
 }
 
 // ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface.
diff --git a/pkg/sql/conn_executor_prepare.go b/pkg/sql/conn_executor_prepare.go
index 001556b24c80..3312e5f42b94 100644
--- a/pkg/sql/conn_executor_prepare.go
+++ b/pkg/sql/conn_executor_prepare.go
@@ -329,7 +329,7 @@ func (ex *connExecutor) execBind(
 		return retErr(pgerror.Newf(
 			pgcode.DuplicateCursor, "portal %q already exists", portalName))
 	}
-	if cursor := ex.getCursorAccessor().getCursor(portalName); cursor != nil {
+	if cursor := ex.getCursorAccessor().getCursor(tree.Name(portalName)); cursor != nil {
 		return retErr(pgerror.Newf(
 			pgcode.DuplicateCursor, "portal %q already exists as cursor", portalName))
 	}
@@ -485,7 +485,7 @@ func (ex *connExecutor) addPortal(
 	if _, ok := ex.extraTxnState.prepStmtsNamespace.portals[portalName]; ok {
 		panic(errors.AssertionFailedf("portal already exists: %q", portalName))
 	}
-	if cursor := ex.getCursorAccessor().getCursor(portalName); cursor != nil {
+	if cursor := ex.getCursorAccessor().getCursor(tree.Name(portalName)); cursor != nil {
 		panic(errors.AssertionFailedf("portal already exists as cursor: %q", portalName))
 	}
 
@@ -564,7 +564,7 @@ func (ex *connExecutor) execDescribe(
 
 	switch descCmd.Type {
 	case pgwirebase.PrepareStatement:
-		ps, ok := ex.extraTxnState.prepStmtsNamespace.prepStmts[descCmd.Name]
+		ps, ok := ex.extraTxnState.prepStmtsNamespace.prepStmts[string(descCmd.Name)]
 		if !ok {
 			return retErr(pgerror.Newf(
 				pgcode.InvalidSQLStatementName,
@@ -594,7 +594,9 @@ func (ex *connExecutor) execDescribe(
 			res.SetPrepStmtOutput(ctx, ps.Columns)
 		}
 	case pgwirebase.PreparePortal:
-		portal, ok := ex.extraTxnState.prepStmtsNamespace.portals[descCmd.Name]
+		// TODO(rimadeodhar): prepStmtsNamespace should also be updated to use tree.Name instead of string
+		// for indexing internal maps.
+		portal, ok := ex.extraTxnState.prepStmtsNamespace.portals[string(descCmd.Name)]
 		if !ok {
 			// Check SQL-level cursors.
 			cursor := ex.getCursorAccessor().getCursor(descCmd.Name)
diff --git a/pkg/sql/conn_io.go b/pkg/sql/conn_io.go
index bc0fb4443e6f..f0861ed00df9 100644
--- a/pkg/sql/conn_io.go
+++ b/pkg/sql/conn_io.go
@@ -218,7 +218,7 @@ var _ Command = PrepareStmt{}
 // DescribeStmt is the Command for producing info about a prepared statement or
 // portal.
 type DescribeStmt struct {
-	Name string
+	Name tree.Name
 	Type pgwirebase.PrepareType
 }
 
diff --git a/pkg/sql/logictest/testdata/logic_test/cursor b/pkg/sql/logictest/testdata/logic_test/cursor
index 46c307b91b04..733e9a32fddb 100644
--- a/pkg/sql/logictest/testdata/logic_test/cursor
+++ b/pkg/sql/logictest/testdata/logic_test/cursor
@@ -557,3 +557,45 @@ CLOSE foo
 
 statement ok
 ALTER TABLE a ADD COLUMN c INT
+
+statement ok
+COMMIT;
+
+statement ok
+BEGIN;
+
+statement ok
+DECLARE "a"" b'c" CURSOR FOR SELECT 1;
+
+query I
+FETCH 1 "a"" b'c";
+----
+1
+
+statement ok
+CLOSE "a"" b'c";
+DECLARE "a b" CURSOR FOR SELECT 2;
+
+query I
+FETCH 1 "a b";
+----
+2
+
+statement ok
+CLOSE "a b";
+DECLARE "a\b" CURSOR FOR SELECT 3;
+
+query I
+FETCH 1 "a\b";
+----
+3
+
+statement ok
+CLOSE "a\b";
+
+query error pq: at or near "b": syntax error
+FETCH 1 a b;
+
+statement ok
+COMMIT;
+
diff --git a/pkg/sql/logictest/testdata/logic_test/pg_catalog b/pkg/sql/logictest/testdata/logic_test/pg_catalog
index 7cb647dd10e7..5aa853af6fcf 100644
--- a/pkg/sql/logictest/testdata/logic_test/pg_catalog
+++ b/pkg/sql/logictest/testdata/logic_test/pg_catalog
@@ -6034,3 +6034,31 @@ SELECT * FROM pg_sequences WHERE sequencename = 'serial'
 ----
 schemaname  sequencename  sequenceowner  data_type  start_value  min_value  max_value            increment_by  cycle  cache_size  last_value
 public      serial        root           20         101          1          9223372036854775807  5             false  1           NULL
+
+statement ok
+CREATE TABLE t (a INT PRIMARY KEY, b INT);
+INSERT INTO t VALUES (1, 2), (2, 3);
+
+statement ok
+BEGIN;
+
+statement ok
+DECLARE "a"" b'c" CURSOR FOR TABLE t;
+DECLARE "a b" CURSOR FOR TABLE t;
+DECLARE "a\b" CURSOR FOR TABLE t;
+
+## pg_catalog.pg_cursors
+
+query TTBBB colnames
+SELECT name, statement, is_holdable, is_binary, is_scrollable FROM pg_catalog.pg_cursors ORDER BY name;
+----
+name    statement  is_holdable  is_binary  is_scrollable
+a b     TABLE t    false        false      false
+a" b'c  TABLE t    false        false      false
+a\b     TABLE t    false        false      false
+
+statement ok
+COMMIT;
+
+statement ok
+DROP TABLE t;
diff --git a/pkg/sql/pg_catalog.go b/pkg/sql/pg_catalog.go
index 44b1cd5b8c35..bc2d5472c2c5 100644
--- a/pkg/sql/pg_catalog.go
+++ b/pkg/sql/pg_catalog.go
@@ -3776,12 +3776,12 @@ https://www.postgresql.org/docs/14/view-pg-cursors.html`,
 				return err
 			}
 			if err := addRow(
-				tree.NewDString(name),        /* name */
-				tree.NewDString(c.statement), /* statement */
-				tree.DBoolFalse,              /* is_holdable */
-				tree.DBoolFalse,              /* is_binary */
-				tree.DBoolFalse,              /* is_scrollable */
-				tz,                           /* creation_date */
+				tree.NewDString(string(name)), /* name */
+				tree.NewDString(c.statement),  /* statement */
+				tree.DBoolFalse,               /* is_holdable */
+				tree.DBoolFalse,               /* is_binary */
+				tree.DBoolFalse,               /* is_scrollable */
+				tz,                            /* creation_date */
 			); err != nil {
 				return err
 			}
diff --git a/pkg/sql/pgwire/conn.go b/pkg/sql/pgwire/conn.go
index 843eb7cb7d60..4ab3f88306b5 100644
--- a/pkg/sql/pgwire/conn.go
+++ b/pkg/sql/pgwire/conn.go
@@ -1009,7 +1009,7 @@ func (c *conn) handleDescribe(ctx context.Context, buf *pgwirebase.ReadBuffer) e
 	return c.stmtBuf.Push(
 		ctx,
 		sql.DescribeStmt{
-			Name: name,
+			Name: tree.Name(name),
 			Type: typ,
 		})
 }
diff --git a/pkg/sql/pgwire/conn_test.go b/pkg/sql/pgwire/conn_test.go
index b88ddf1712dc..eabe43a9c1d1 100644
--- a/pkg/sql/pgwire/conn_test.go
+++ b/pkg/sql/pgwire/conn_test.go
@@ -159,6 +159,9 @@ func TestConn(t *testing.T) {
 	if err := finishQuery(generateError, conn); err != nil {
 		t.Fatal(err)
 	}
+	expectExecStmt(ctx, t, "DECLARE \"a b\" CURSOR FOR SELECT 10", &rd, conn, queryStringComplete)
+	expectExecStmt(ctx, t, "FETCH 1 \"a b\"", &rd, conn, queryStringComplete)
+	expectExecStmt(ctx, t, "CLOSE \"a b\"", &rd, conn, queryStringComplete)
 	// We got to the COMMIT at the end of the batch.
 	expectExecStmt(ctx, t, "COMMIT TRANSACTION", &rd, conn, queryStringComplete)
 	expectSync(ctx, t, &rd)
@@ -505,6 +508,9 @@ func client(ctx context.Context, serverAddr net.Addr, wg *sync.WaitGroup) error
 	batch.Queue("BEGIN")
 	batch.Queue("select 7")
 	batch.Queue("select 8")
+	batch.Queue("declare \"a b\" cursor for select 10")
+	batch.Queue("fetch 1 \"a b\"")
+	batch.Queue("close \"a b\"")
 	batch.Queue("COMMIT")
 	batchResults := conn.SendBatch(ctx, batch)
 
@@ -676,7 +682,7 @@ func expectPrepareStmt(
 func expectDescribeStmt(
 	ctx context.Context,
 	t *testing.T,
-	expName string,
+	expName tree.Name,
 	expType pgwirebase.PrepareType,
 	rd *sql.StmtBufReader,
 	c *conn,
diff --git a/pkg/sql/pgwire/testdata/pgtest/portals b/pkg/sql/pgwire/testdata/pgtest/portals
index 55c0595da6df..3f0b9a472bc9 100644
--- a/pkg/sql/pgwire/testdata/pgtest/portals
+++ b/pkg/sql/pgwire/testdata/pgtest/portals
@@ -1486,3 +1486,71 @@ ReadyForQuery
 {"Type":"ParseComplete"}
 {"Type":"ErrorResponse","Code":"08P01","Message":"invalid DESCRIBE message subtype 0"}
 {"Type":"ReadyForQuery","TxStatus":"I"}
+
+# Check declaring cursor with a name enclosed in double quotes
+send
+Query {"String": "BEGIN"}
+----
+
+until
+ReadyForQuery
+----
+{"Type":"CommandComplete","CommandTag":"BEGIN"}
+{"Type":"ReadyForQuery","TxStatus":"T"}
+
+send
+Parse {"Query": "DECLARE \"a b\" CURSOR FOR SELECT generate_series(1, 10) AS bar"}
+Bind
+Describe {"ObjectType": "P", "Name": ""}
+Execute
+Sync
+----
+
+until
+ReadyForQuery
+----
+{"Type":"ParseComplete"}
+{"Type":"BindComplete"}
+{"Type":"NoData"}
+{"Type":"CommandComplete","CommandTag":"DECLARE CURSOR"}
+{"Type":"ReadyForQuery","TxStatus":"T"}
+
+send
+Describe {"ObjectType": "P", "Name": "a b"}
+Sync
+----
+
+until ignore_type_oids ignore_table_oids ignore_data_type_sizes
+ReadyForQuery
+----
+{"Type":"RowDescription","Fields":[{"Name":"bar","TableOID":0,"TableAttributeNumber":0,"DataTypeOID":0,"DataTypeSize":0,"TypeModifier":-1,"Format":0}]}
+{"Type":"ReadyForQuery","TxStatus":"T"}
+
+send
+Parse {"Query": "FETCH 2 \"a b\""}
+Bind
+Describe {"ObjectType": "P", "Name": ""}
+Execute
+Sync
+----
+
+until ignore_type_oids ignore_table_oids ignore_data_type_sizes
+ReadyForQuery
+----
+{"Type":"ParseComplete"}
+{"Type":"BindComplete"}
+{"Type":"RowDescription","Fields":[{"Name":"bar","TableOID":0,"TableAttributeNumber":0,"DataTypeOID":0,"DataTypeSize":0,"TypeModifier":-1,"Format":0}]}
+{"Type":"DataRow","Values":[{"text":"1"}]}
+{"Type":"DataRow","Values":[{"text":"2"}]}
+{"Type":"CommandComplete","CommandTag":"FETCH 2"}
+{"Type":"ReadyForQuery","TxStatus":"T"}
+
+send
+Query {"String": "ROLLBACK"}
+----
+
+until
+ReadyForQuery
+----
+{"Type":"CommandComplete","CommandTag":"ROLLBACK"}
+{"Type":"ReadyForQuery","TxStatus":"I"}
diff --git a/pkg/sql/sql_cursor.go b/pkg/sql/sql_cursor.go
index 26b9609e1646..2499898ce00a 100644
--- a/pkg/sql/sql_cursor.go
+++ b/pkg/sql/sql_cursor.go
@@ -49,13 +49,12 @@ func (p *planner) DeclareCursor(ctx context.Context, s *tree.DeclareCursor) (pla
 	}
 
 	ie := p.ExecCfg().InternalExecutorFactory.NewInternalExecutor(p.SessionData())
-	cursorName := s.Name.String()
-	if cursor := p.sqlCursors.getCursor(cursorName); cursor != nil {
-		return nil, pgerror.Newf(pgcode.DuplicateCursor, "cursor %q already exists", cursorName)
+	if cursor := p.sqlCursors.getCursor(s.Name); cursor != nil {
+		return nil, pgerror.Newf(pgcode.DuplicateCursor, "cursor %q already exists", s.Name)
 	}
-	if p.extendedEvalCtx.PreparedStatementState.HasPortal(cursorName) {
-		return nil, pgerror.Newf(pgcode.DuplicateCursor, "cursor %q already exists as portal", cursorName)
+	if p.extendedEvalCtx.PreparedStatementState.HasPortal(string(s.Name)) {
+		return nil, pgerror.Newf(pgcode.DuplicateCursor, "cursor %q already exists as portal", s.Name)
 	}
 
 	// Try to plan the cursor query to make sure that it's valid.
@@ -100,7 +99,7 @@ func (p *planner) DeclareCursor(ctx context.Context, s *tree.DeclareCursor) (pla
 		statement: statement,
 		created:   timeutil.Now(),
 	}
-	if err := p.sqlCursors.addCursor(cursorName, cursor); err != nil {
+	if err := p.sqlCursors.addCursor(s.Name, cursor); err != nil {
 		// This case shouldn't happen because cursor names are scoped to a session,
 		// and sessions can't have more than one statement running at once. But
 		// let's be diligent and clean up if it somehow does happen anyway.
@@ -119,11 +118,10 @@ var errBackwardScan = pgerror.Newf(pgcode.ObjectNotInPrerequisiteState, "cursor
 func (p *planner) FetchCursor(
 	_ context.Context, s *tree.CursorStmt, isMove bool,
 ) (planNode, error) {
-	cursorName := s.Name.String()
-	cursor := p.sqlCursors.getCursor(cursorName)
+	cursor := p.sqlCursors.getCursor(s.Name)
 	if cursor == nil {
 		return nil, pgerror.Newf(
-			pgcode.InvalidCursorName, "cursor %q does not exist", cursorName,
+			pgcode.InvalidCursorName, "cursor %q does not exist", s.Name,
 		)
 	}
 	if s.Count < 0 || s.FetchType == tree.FetchBackwardAll {
@@ -243,7 +241,7 @@ func (p *planner) CloseCursor(ctx context.Context, n *tree.CloseCursor) (planNod
 	return &delayedNode{
 		name: n.String(),
 		constructor: func(ctx context.Context, p *planner) (planNode, error) {
-			return newZeroNode(nil /* columns */), p.sqlCursors.closeCursor(n.Name.String())
+			return newZeroNode(nil /* columns */), p.sqlCursors.closeCursor(n.Name)
 		},
 	}, nil
 }
@@ -276,20 +274,20 @@ type sqlCursors interface {
 	closeAll()
 	// closeCursor closes the named cursor, returning an error if that cursor
 	// didn't exist in the set.
-	closeCursor(string) error
+	closeCursor(tree.Name) error
 	// getCursor returns the named cursor, returning nil if that cursor
 	// didn't exist in the set.
-	getCursor(string) *sqlCursor
+	getCursor(tree.Name) *sqlCursor
 	// addCursor adds a new cursor with the given name to the set, returning an
 	// error if the cursor already existed in the set.
-	addCursor(string, *sqlCursor) error
+	addCursor(tree.Name, *sqlCursor) error
 	// list returns all open cursors in the set.
-	list() map[string]*sqlCursor
+	list() map[tree.Name]*sqlCursor
 }
 
 // cursorMap is a sqlCursors that's backed by an actual map.
 type cursorMap struct {
-	cursors map[string]*sqlCursor
+	cursors map[tree.Name]*sqlCursor
 }
 
 func (c *cursorMap) closeAll() {
@@ -299,7 +297,7 @@ func (c *cursorMap) closeAll() {
 	c.cursors = nil
 }
 
-func (c *cursorMap) closeCursor(s string) error {
+func (c *cursorMap) closeCursor(s tree.Name) error {
 	cursor, ok := c.cursors[s]
 	if !ok {
 		return pgerror.Newf(pgcode.InvalidCursorName, "cursor %q does not exist", s)
 	}
@@ -309,13 +307,13 @@ func (c *cursorMap) closeCursor(s string) error {
 	return err
 }
 
-func (c *cursorMap) getCursor(s string) *sqlCursor {
+func (c *cursorMap) getCursor(s tree.Name) *sqlCursor {
 	return c.cursors[s]
 }
 
-func (c *cursorMap) addCursor(s string, cursor *sqlCursor) error {
+func (c *cursorMap) addCursor(s tree.Name, cursor *sqlCursor) error {
 	if c.cursors == nil {
-		c.cursors = make(map[string]*sqlCursor)
+		c.cursors = make(map[tree.Name]*sqlCursor)
 	}
 	if _, ok := c.cursors[s]; ok {
 		return pgerror.Newf(pgcode.DuplicateCursor, "cursor %q already exists", s)
 	}
@@ -324,7 +322,7 @@ func (c *cursorMap) addCursor(s string, cursor *sqlCursor) error {
 	return nil
 }
 
-func (c *cursorMap) list() map[string]*sqlCursor {
+func (c *cursorMap) list() map[tree.Name]*sqlCursor {
 	return c.cursors
 }
 
@@ -338,19 +336,19 @@ func (c connExCursorAccessor) closeAll() {
 	c.ex.extraTxnState.sqlCursors.closeAll()
 }
 
-func (c connExCursorAccessor) closeCursor(s string) error {
+func (c connExCursorAccessor) closeCursor(s tree.Name) error {
 	return c.ex.extraTxnState.sqlCursors.closeCursor(s)
 }
 
-func (c connExCursorAccessor) getCursor(s string) *sqlCursor {
+func (c connExCursorAccessor) getCursor(s tree.Name) *sqlCursor {
 	return c.ex.extraTxnState.sqlCursors.getCursor(s)
 }
 
-func (c connExCursorAccessor) addCursor(s string, cursor *sqlCursor) error {
+func (c connExCursorAccessor) addCursor(s tree.Name, cursor *sqlCursor) error {
 	return c.ex.extraTxnState.sqlCursors.addCursor(s, cursor)
 }
 
-func (c connExCursorAccessor) list() map[string]*sqlCursor {
+func (c connExCursorAccessor) list() map[tree.Name]*sqlCursor {
 	return c.ex.extraTxnState.sqlCursors.list()
 }