85859: pgwire: Add support for cursors with special characters r=knz,rafiss a=rimadeodhar

In CockroachDB and Postgres, it is possible to declare
cursors with special characters enclosed within double
quotes, e.g. "1-2-3". Currently, we store the name as an
unescaped string, which causes problems during the pgwire
DESCRIBE step when looking up the cursor. We should use the
tree.Name datatype for the cursor name when storing and
looking up cursors. This PR updates the code to use
tree.Name instead of raw strings for handling cursor names,
fixing the issue where the pgwire DESCRIBE step fails while
attempting to look up cursors whose names contain special
characters.
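
As a rough illustration of why this matters (the Name type and cursor map
below are hypothetical stand-ins, not the actual CockroachDB types), keying
cursors by a parsed identifier rather than by a raw, re-quoted string is what
lets the later DESCRIBE lookup find a cursor named "1-2-3":

```go
package main

import (
	"fmt"
	"regexp"
)

// Name mimics an SQL identifier: stored in its parsed (unquoted) form and
// quoted only when rendered back into SQL text.
type Name string

var simpleIdent = regexp.MustCompile(`^[a-z_][a-z0-9_]*$`)

// String renders the identifier, adding double quotes when it contains
// characters that are not valid in a bare identifier (e.g. "1-2-3").
func (n Name) String() string {
	if simpleIdent.MatchString(string(n)) {
		return string(n)
	}
	return `"` + string(n) + `"`
}

func main() {
	cursors := map[Name]string{} // cursor name -> query text
	declared := Name("1-2-3")    // parsed from: DECLARE "1-2-3" CURSOR FOR ...
	cursors[declared] = "SELECT 1"

	// A later DESCRIBE looks the cursor up by the same parsed identifier,
	// so keying by the identifier type (not a re-quoted raw string) succeeds.
	fmt.Println(cursors[Name("1-2-3")]) // SELECT 1
	fmt.Println(declared)               // "1-2-3" (quoted when rendered)
}
```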

Resolves #84261

Release note (bug fix): The pgwire DESCRIBE step no longer
fails with an error while attempting to look up cursors
declared with names containing special characters.

86492: bulk: perform meta lookup on range cache miss during index backfill r=dt a=nvanbenschoten

Fixes #84290.

This commit addresses the sustained slowdown described in #84290 by replacing
the call in `SSTBatcher.flushIfNeeded` to `RangeCache.GetCached` with a call to
`RangeCache.Lookup`. The former method checks the cache but returns no range
descriptor if the cache currently has no overlapping key. This is possible if
the descriptor was recently evicted because it was stale. The latter method
performs a meta lookup if the cache currently has no overlapping key, so it
should always return _some_ range descriptor.
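
For illustration, a minimal sketch of the behavioral difference (the types
below are hypothetical and only stand in for the real RangeCache API): a
cache-only check can come up empty after an eviction, while a lookup that
falls back to a meta query always yields some descriptor.

```go
package main

import (
	"context"
	"fmt"
)

type RangeDescriptor struct{ EndKey string }

type rangeCache struct {
	cached map[string]RangeDescriptor // key -> cached descriptor
}

// getCached answers only from the cache and returns nil when there is no
// overlapping entry, e.g. after a stale descriptor was evicted.
func (rc *rangeCache) getCached(key string) *RangeDescriptor {
	if d, ok := rc.cached[key]; ok {
		return &d
	}
	return nil
}

// lookup consults the cache first and, on a miss, performs a (simulated)
// meta lookup, so it always returns some descriptor.
func (rc *rangeCache) lookup(ctx context.Context, key string) (RangeDescriptor, error) {
	if d := rc.getCached(key); d != nil {
		return *d, nil
	}
	d := RangeDescriptor{EndKey: key + "\xff"} // pretend this came from meta2
	rc.cached[key] = d
	return d, nil
}

func main() {
	rc := &rangeCache{cached: map[string]RangeDescriptor{}}
	fmt.Println(rc.getCached("a"))                    // <nil>: no flush key available
	fmt.Println(rc.lookup(context.Background(), "a")) // always returns a descriptor
}
```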

There's a question of whether we should be logging a warning but proceeding if
this meta lookup fails. For now, I've decided not to change that behavior.

Release justification: None. Don't merge yet.

87885: kv: remove broken attempt to reject lease acquisitions on draining nodes r=nvanbenschoten a=nvanbenschoten

Related to #83261.

This commit removes "protection" that avoided lease acquisitions on draining nodes. This protection had already been effectively disabled by acc1ad1, which allowed Raft leaders to bypass the check. As the comment here (added in 5ffaa9e) explained, Raft followers are already unable to acquire the lease. If leaders bypass the check and follower (and candidates) don't need it, the check is useless, so we remove it.

The commit also removes `TestReplicaDrainLease`, which was supposed to test this behavior. We remove the test not because it started failing after the change, but because it did not. It must not have been testing anything real anymore after acc1ad1.

Release justification: low risk change related to release blocker.

Release note: None

88205: kvserver: (partially) deflake transfer-leases/drain-other-node r=irfansharif a=irfansharif

In #85629 we changed our lease transfer protocol to only ever transfer expiration-based leases, and have recipients later upgrade them to the more efficient epoch-based ones. This was done to limit the effects of ill-advised lease transfers, since the incoming leaseholder would need to recognize itself as such within a few seconds -- so we wanted this upgrade to happen after the lease had been received.

In #83261 however we noticed that the upgrade was not immediate -- we were waiting until the current lease's expiration was within its renewal duration -- 4.5s. When the lease was eventually renewed the upgrade did happen, but it was not immediate. We fix this here and remove the manual clock advancement in the supporting test that masked this issue. The test now demonstrates that we no longer rely on upgrades happening as part of the (slow) renewal process.
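
As a hedged sketch of the pattern the updated test relies on (the helper below
is a hypothetical stand-in, not the real testcluster API such as
WaitForLeaseUpgrade), the upgrade is expected to show up shortly after the
transfer applies, without any clock advancement pushing the lease into its
renewal window:

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

type LeaseType int

const (
	LeaseExpiration LeaseType = iota
	LeaseEpoch
)

// waitForLeaseUpgrade polls until the observed lease is epoch-based, or
// gives up after the timeout.
func waitForLeaseUpgrade(current func() LeaseType, timeout time.Duration) error {
	deadline := time.Now().Add(timeout)
	for time.Now().Before(deadline) {
		if current() == LeaseEpoch {
			return nil
		}
		time.Sleep(10 * time.Millisecond)
	}
	return errors.New("lease was not upgraded to an epoch-based lease")
}

func main() {
	// Simulate a lease that the new leaseholder upgrades shortly after
	// applying the transfer, rather than waiting for the renewal interval.
	upgradedAt := time.Now().Add(50 * time.Millisecond)
	err := waitForLeaseUpgrade(func() LeaseType {
		if time.Now().After(upgradedAt) {
			return LeaseEpoch
		}
		return LeaseExpiration
	}, time.Second)
	fmt.Println(err) // <nil>
}
```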

Release note: None

Co-authored-by: rimadeodhar <[email protected]>
Co-authored-by: Nathan VanBenschoten <[email protected]>
Co-authored-by: irfan sharif <[email protected]>
4 people committed Sep 20, 2022
5 parents 77c412d + c0aa573 + 5a85950 + d89a7e4 + 991b134 commit a25a8b0
Showing 17 changed files with 254 additions and 219 deletions.
9 changes: 4 additions & 5 deletions pkg/kv/bulk/sst_batcher.go
@@ -361,15 +361,14 @@ func (b *SSTBatcher) flushIfNeeded(ctx context.Context, nextKey roachpb.Key) err
if !b.flushKeyChecked && b.rc != nil {
b.flushKeyChecked = true
if k, err := keys.Addr(nextKey); err != nil {
log.Warningf(ctx, "failed to get RKey for flush key lookup")
log.Warningf(ctx, "failed to get RKey for flush key lookup: %v", err)
} else {
r := b.rc.GetCached(ctx, k, false /* inverted */)
if r != nil {
if r, err := b.rc.Lookup(ctx, k); err != nil {
log.Warningf(ctx, "failed to lookup range cache entry for key %v: %v", k, err)
} else {
k := r.Desc().EndKey.AsRawKey()
b.flushKey = k
log.VEventf(ctx, 3, "%s building sstable that will flush before %v", b.name, k)
} else {
log.VEventf(ctx, 2, "%s no cached range desc available to determine sst flush key", b.name)
}
}
}
27 changes: 12 additions & 15 deletions pkg/kv/bulk/sst_batcher_test.go
@@ -277,27 +277,24 @@ func runTestImport(t *testing.T, batchSizeValue int64) {
return encoding.EncodeStringAscending(append([]byte{}, prefix...), fmt.Sprintf("k%d", i))
}

t.Logf("splitting at %s and %s", key(split1), key(split2))
t.Logf("splitting at %s", key(split1))
require.NoError(t, kvDB.AdminSplit(ctx, key(split1), hlc.MaxTimestamp /* expirationTime */))
require.NoError(t, kvDB.AdminSplit(ctx, key(split2), hlc.MaxTimestamp /* expirationTime */))

// We want to make sure our range-aware batching knows about one of our
// splits to exercise that codepath, but we also want to make sure we
// splits to exercise that code path, but we also want to make sure we
// still handle an unexpected split, so we make our own range cache and
// only populate it with one of our two splits.
mockCache := rangecache.NewRangeCache(s.ClusterSettings(), nil,
// populate it after the first split but before the second split.
ds := s.DistSenderI().(*kvcoord.DistSender)
mockCache := rangecache.NewRangeCache(s.ClusterSettings(), ds,
func() int64 { return 2 << 10 }, s.Stopper(), s.TracerI().(*tracing.Tracer))
addr, err := keys.Addr(key(0))
require.NoError(t, err)

tok, err := s.DistSenderI().(*kvcoord.DistSender).RangeDescriptorCache().LookupWithEvictionToken(
ctx, addr, rangecache.EvictionToken{}, false)
require.NoError(t, err)

r := roachpb.RangeInfo{
Desc: *tok.Desc(),
for _, k := range []int{0, split1} {
ent, err := ds.RangeDescriptorCache().Lookup(ctx, keys.MustAddr(key(k)))
require.NoError(t, err)
mockCache.Insert(ctx, roachpb.RangeInfo{Desc: *ent.Desc()})
}
mockCache.Insert(ctx, r)

t.Logf("splitting at %s", key(split2))
require.NoError(t, kvDB.AdminSplit(ctx, key(split2), hlc.MaxTimestamp /* expirationTime */))

ts := hlc.Timestamp{WallTime: 100}
mem := mon.NewUnlimitedMonitor(ctx, "lots", mon.MemoryResource, nil, nil, 0, nil)
41 changes: 34 additions & 7 deletions pkg/kv/kvserver/client_lease_test.go
@@ -41,6 +41,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)
@@ -701,8 +702,9 @@ func TestLeaseholderRelocate(t *testing.T) {
// We start with having the range under test on (1,2,3).
tc.AddVotersOrFatal(t, rhsDesc.StartKey.AsRawKey(), tc.Targets(1, 2)...)

// Make sure the lease is on 3
// Make sure the lease is on 3 and is fully upgraded.
tc.TransferRangeLeaseOrFatal(t, rhsDesc, tc.Target(2))
tc.WaitForLeaseUpgrade(ctx, t, rhsDesc)

// Check that the lease moved to 3.
leaseHolder, err := tc.FindRangeLeaseHolder(rhsDesc, nil)
@@ -730,7 +732,7 @@ func TestLeaseholderRelocate(t *testing.T) {
return nil
})

// Make sure lease moved to the preferred region, if .
// Make sure lease moved to the preferred region.
leaseHolder, err = tc.FindRangeLeaseHolder(rhsDesc, nil)
require.NoError(t, err)
require.Equal(t, tc.Target(3), leaseHolder)
@@ -739,10 +741,13 @@ func TestLeaseholderRelocate(t *testing.T) {
repl := tc.GetFirstStoreFromServer(t, 3).
LookupReplica(roachpb.RKey(rhsDesc.StartKey.AsRawKey()))
history := repl.GetLeaseHistory()

require.Equal(t, leaseHolder.NodeID,
history[len(history)-1].Replica.NodeID)
require.Equal(t, leaseHolder.NodeID,
history[len(history)-2].Replica.NodeID) // account for the lease upgrade
require.Equal(t, tc.Target(2).NodeID,
history[len(history)-2].Replica.NodeID)
history[len(history)-3].Replica.NodeID)
}

func gossipLiveness(t *testing.T, tc *testcluster.TestCluster) {
@@ -1303,10 +1308,17 @@ func TestAcquireLeaseTimeout(t *testing.T) {
}
}

// TestLeaseTransfersUseExpirationLeasesAndBumpToEpochBasedOnes does what it
// says on the tin.
func TestLeaseTransfersUseExpirationLeasesAndBumpToEpochBasedOnes(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

mu := struct {
syncutil.Mutex
lease *roachpb.Lease
}{}

ctx := context.Background()

manualClock := hlc.NewHybridManualClock()
@@ -1315,10 +1327,21 @@ func TestLeaseTransfersUseExpirationLeasesAndBumpToEpochBasedOnes(t *testing.T)
ServerArgs: base.TestServerArgs{
Knobs: base.TestingKnobs{
Server: &server.TestingKnobs{
// Never ticked -- demonstrating that we're not relying on
// internal timers to upgrade leases.
WallClock: manualClock,
},
Store: &kvserver.StoreTestingKnobs{
LeaseRenewalDurationOverride: 10 * time.Millisecond, // speed up the test
// Outlandishly high to disable proactive renewal of
// expiration based leases. Lease upgrades happen
// immediately after applying without needing active
// renewal.
LeaseRenewalDurationOverride: 100 * time.Hour,
LeaseUpgradeInterceptor: func(lease *roachpb.Lease) {
mu.Lock()
defer mu.Unlock()
mu.lease = lease
},
},
},
},
@@ -1333,8 +1356,7 @@ func TestLeaseTransfersUseExpirationLeasesAndBumpToEpochBasedOnes(t *testing.T)
n2 := tc.Server(1)
n2Target := tc.Target(1)

// Transfer the lease from n1 to n2. Expect it to be transferred as an
// expiration based lease.
// Transfer the lease from n1 to n2.
tc.TransferRangeLeaseOrFatal(t, desc, n2Target)
testutils.SucceedsSoon(t, func() error {
li, _, err := tc.FindRangeLeaseEx(ctx, desc, nil)
@@ -1346,6 +1368,11 @@ func TestLeaseTransfersUseExpirationLeasesAndBumpToEpochBasedOnes(t *testing.T)
return nil
})

tc.IncrClockForLeaseUpgrade(t, manualClock)
// Expect it to be upgraded to an epoch based lease.
tc.WaitForLeaseUpgrade(ctx, t, desc)

// Expect it to have been upgraded from an expiration based lease.
mu.Lock()
defer mu.Unlock()
require.Equal(t, roachpb.LeaseExpiration, mu.lease.Type())
}
6 changes: 6 additions & 0 deletions pkg/kv/kvserver/client_merge_test.go
@@ -539,6 +539,9 @@ func mergeCheckingTimestampCaches(
if !rhsRepl.OwnsValidLease(ctx, tc.Servers[1].Clock().NowAsClockTimestamp()) {
return errors.New("rhs store does not own valid lease for rhs range")
}
if rhsRepl.CurrentLeaseStatus(ctx).Lease.Type() != roachpb.LeaseEpoch {
return errors.Errorf("lease still an expiration based lease")
}
return nil
})
}
@@ -1005,6 +1008,9 @@ func TestStoreRangeMergeTimestampCacheCausality(t *testing.T) {
if !lhsRepl1.OwnsValidLease(ctx, tc.Servers[1].Clock().NowAsClockTimestamp()) {
return errors.New("s2 does not own valid lease for lhs range")
}
if lhsRepl1.CurrentLeaseStatus(ctx).Lease.Type() != roachpb.LeaseEpoch {
return errors.Errorf("lease still an expiration based lease")
}
return nil
})

22 changes: 13 additions & 9 deletions pkg/kv/kvserver/replica_proposal.go
@@ -263,10 +263,13 @@ func (r *Replica) leasePostApplyLocked(
leaseChangingHands := prevLease.Replica.StoreID != newLease.Replica.StoreID || prevLease.Sequence != newLease.Sequence

if iAmTheLeaseHolder {
// Log lease acquisition whenever an Epoch-based lease changes hands (or verbose
// logging is enabled).
if newLease.Type() == roachpb.LeaseEpoch && leaseChangingHands || log.V(1) {
log.VEventf(ctx, 1, "new range lease %s following %s", newLease, prevLease)
// Log lease acquisitions loudly when verbose logging is enabled or when the
// new leaseholder is draining, in which case it should be shedding leases.
// Otherwise, log a trace event.
if log.V(1) || r.store.IsDraining() {
log.Infof(ctx, "new range lease %s following %s", newLease, prevLease)
} else {
log.Eventf(ctx, "new range lease %s following %s", newLease, prevLease)
}
}

@@ -374,8 +377,13 @@ func (r *Replica) leasePostApplyLocked(
if log.V(1) {
log.VEventf(ctx, 1, "upgrading expiration lease %s to an epoch-based one", newLease)
}

if r.store.TestingKnobs().LeaseUpgradeInterceptor != nil {
r.store.TestingKnobs().LeaseUpgradeInterceptor(newLease)
}
st := r.leaseStatusForRequestRLocked(ctx, now, hlc.Timestamp{})
r.maybeExtendLeaseAsyncLocked(ctx, st)
// Ignore the returned handle as we won't block on it.
_ = r.requestLeaseLocked(ctx, st)
}
}

@@ -421,10 +429,6 @@ func (r *Replica) leasePostApplyLocked(
log.Errorf(ctx, "%v", err)
}
})
if leaseChangingHands && log.V(1) {
// This logging is useful to troubleshoot incomplete drains.
log.Info(ctx, "is now leaseholder")
}
}

// Inform the store of this lease.
25 changes: 0 additions & 25 deletions pkg/kv/kvserver/replica_range_lease.go
@@ -63,7 +63,6 @@ import (
"github.com/cockroachdb/errors"
"github.com/cockroachdb/logtags"
"github.com/cockroachdb/redact"
"go.etcd.io/etcd/raft/v3"
)

var leaseStatusLogLimiter = func() *log.EveryN {
Expand Down Expand Up @@ -800,30 +799,6 @@ func (r *Replica) requestLeaseLocked(
return r.mu.pendingLeaseRequest.newResolvedHandle(pErr)
}

// If we're draining, we'd rather not take any new leases (since we're also
// trying to move leases away elsewhere). But if we're the leader, we don't
// really have a choice and we take the lease - there might not be any other
// replica available to take this lease (perhaps they're all draining).
if r.store.IsDraining() {
// NB: Replicas that are not the Raft leader will not take leases anyway
// (see the check inside propBuf.FlushLockedWithRaftGroup()), so we don't
// really need any special behavior for draining nodes here. This check
// serves mostly as a means to get more granular logging and as a defensive
// precaution.
if r.raftBasicStatusRLocked().RaftState != raft.StateLeader {
log.VEventf(ctx, 2, "refusing to take the lease because we're draining")
return r.mu.pendingLeaseRequest.newResolvedHandle(
roachpb.NewError(
newNotLeaseHolderError(
roachpb.Lease{}, r.store.StoreID(), r.mu.state.Desc,
"refusing to take the lease; node is draining",
),
),
)
}
log.Info(ctx, "trying to take the lease while we're draining since we're the raft leader")
}

// Propose a Raft command to get a lease for this replica.
repDesc, err := r.getReplicaDescriptorRLocked()
if err != nil {
121 changes: 0 additions & 121 deletions pkg/kv/kvserver/replica_test.go
@@ -1351,127 +1351,6 @@ func TestReplicaLeaseRejectUnknownRaftNodeID(t *testing.T) {
}
}

// Test that draining nodes only take the lease if they're the leader.
func TestReplicaDrainLease(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
ctx := context.Background()
clusterArgs := base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
ServerArgs: base.TestServerArgs{
Knobs: base.TestingKnobs{
NodeLiveness: NodeLivenessTestingKnobs{
// This test waits for an epoch-based lease to expire, so we're setting the
// liveness duration as low as possible while still keeping the test stable.
LivenessDuration: 2000 * time.Millisecond,
RenewalDuration: 1000 * time.Millisecond,
},
Store: &StoreTestingKnobs{
// We eliminate clock offsets in order to eliminate the stasis period of
// leases. Otherwise we'd need to make leases longer.
MaxOffset: time.Nanosecond,
},
},
},
}
tc := serverutils.StartNewTestCluster(t, 2, clusterArgs)
defer tc.Stopper().Stop(ctx)
rngKey := tc.ScratchRange(t)
tc.AddVotersOrFatal(t, rngKey, tc.Target(1))

s1 := tc.Server(0)
s2 := tc.Server(1)
store1, err := s1.GetStores().(*Stores).GetStore(s1.GetFirstStoreID())
require.NoError(t, err)
store2, err := s2.GetStores().(*Stores).GetStore(s2.GetFirstStoreID())
require.NoError(t, err)

rd := tc.LookupRangeOrFatal(t, rngKey)
r1, err := store1.GetReplica(rd.RangeID)
require.NoError(t, err)
status := r1.CurrentLeaseStatus(ctx)
require.True(t, status.Lease.OwnedBy(store1.StoreID()), "someone else got the lease: %s", status)
// We expect the lease to be valid, but don't check that because, under race, it might have
// expired already.

// Stop n1's heartbeats and wait for the lease to expire.
log.Infof(ctx, "test: suspending heartbeats for n1")
cleanup := s1.NodeLiveness().(*liveness.NodeLiveness).PauseAllHeartbeatsForTest()
defer cleanup()

testutils.SucceedsSoon(t, func() error {
status := r1.CurrentLeaseStatus(ctx)
require.True(t, status.Lease.OwnedBy(store1.StoreID()), "someone else got the lease: %s", status)
if status.State == kvserverpb.LeaseState_VALID {
return errors.New("lease still valid")
}
// We need to wait for the stasis state to pass too; during stasis other
// replicas can't take the lease.
if status.State == kvserverpb.LeaseState_UNUSABLE {
return errors.New("lease still in stasis")
}
return nil
})

require.Equal(t, r1.RaftStatus().Lead, uint64(r1.ReplicaID()),
"expected leadership to still be on the first replica")

// Wait until n1 has heartbeat its liveness record (epoch >= 1) and n2
// knows about it. Otherwise, the following could occur:
//
// - n1's heartbeats to epoch 1 and acquires lease
// - n2 doesn't receive this yet (gossip)
// - when n2 is asked to acquire the lease, it uses a lease with epoch 1
// but the liveness record with epoch zero
// - lease status is ERROR, lease acquisition (and thus test) fails.
testutils.SucceedsSoon(t, func() error {
nl, ok := s2.NodeLiveness().(*liveness.NodeLiveness).GetLiveness(s1.NodeID())
if !ok {
return errors.New("no liveness record for n1")
}
if nl.Epoch < 1 {
return errors.New("epoch for n1 still zero")
}
return nil
})

// Mark the stores as draining. We'll then start checking how acquiring leases
// behaves while draining.
store1.draining.Store(true)
store2.draining.Store(true)

r2, err := store2.GetReplica(rd.RangeID)
require.NoError(t, err)
// Check that a draining replica that's not the leader does NOT take the
// lease.
_, pErr := r2.redirectOnOrAcquireLease(ctx)
require.NotNil(t, pErr)
require.IsType(t, &roachpb.NotLeaseHolderError{}, pErr.GetDetail())

// Now transfer the leadership from r1 to r2 and check that r1 can now acquire
// the lease.

// Initiate the leadership transfer.
r1.mu.Lock()
r1.mu.internalRaftGroup.TransferLeader(uint64(r2.ReplicaID()))
r1.mu.Unlock()
// Run the range through the Raft scheduler, otherwise the leadership messages
// doesn't get sent because the range is quiesced.
store1.EnqueueRaftUpdateCheck(r1.RangeID)

// Wait for the leadership transfer to happen.
testutils.SucceedsSoon(t, func() error {
if r2.RaftStatus().SoftState.RaftState != raft.StateLeader {
return errors.Newf("r1 not yet leader")
}
return nil
})

// Check that r2 can now acquire the lease.
_, pErr = r2.redirectOnOrAcquireLease(ctx)
require.NoError(t, pErr.GoError())
}

// TestReplicaGossipFirstRange verifies that the first range gossips its
// location and the cluster ID.
func TestReplicaGossipFirstRange(t *testing.T) {