Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
98741: ci: update bazel builder image r=rickystewart a=cockroach-teamcity

Release note: None
Epic: None


98878: backupccl: fix occassional TestRestoreErrorPropagates flake r=stevendanna a=adityamaru

Very rarely under stress race another automatic job would race with the restore and increment the error count. This would result in the count being greater than our expected value of 1. This disables all the automatic jobs eliminating the chance of this race.

Fixes: #98037

Release note: None

99099: kvserver: deflake TestReplicaTombstone r=andrewbaptist a=tbg

Like many other tests, this test could flake because we'd sometimes
catch a "cannot remove learner while snapshot is in flight" error.

I think the root cause is that sometimes there are errant Raft snapshots
in the system[^1] and these get mistaken for LEARNERs that are still
being caught up by the replicate queue. I tried to address this general
class of issues by making the check for in-flight learner snapshots not
care about *raft* snapshots.

I was able to stress TestReplicaTombstone for 30+ minutes without a
failure using that approach, whereas previously it usually failed within
a few minutes.

```
./dev test --stress pkg/kv/kvserver/ --filter TestReplicaTombstone 2>&1 | tee stress.log
[...]
2461 runs so far, 0 failures, over 35m45s
```

[^1]: #87553

Fixes #98883.

Epic: none
Release note: None


99126: kv: return error on locking request in LeafTxn r=nvanbenschoten a=miraradeva

Previously, as noted in #94290, it was possible for a LeafTxn to issue locking requests as part of SELECT FOR UPDATE. This behavior was unexpected and the RootTxn wasn't properly cleaning up the locks, resulting in others waiting for those locks to be released. The issue was resolved, in #94399, by ensuring non-default locking strength transactions don't use the streamer API and always run as RootTxn.

This patch adds an assertion on the kv side to prevent other existing or future attempts of LeafTxn issuing locking requests. We don't expect that there are such existing cases, so we don't expect this assertion to fail, but will keep an eye on the nightly tests to make sure.

Fixes: #97817
Release note: None

99150: backupccl: stop logging unsanitized backup stmt in schedule executor r=stevendanna a=msbutler

Informs #99145

Release note: None

Co-authored-by: cockroach-teamcity <[email protected]>
Co-authored-by: adityamaru <[email protected]>
Co-authored-by: Tobias Grieger <[email protected]>
Co-authored-by: Mira Radeva <[email protected]>
Co-authored-by: Michael Butler <[email protected]>
  • Loading branch information
6 people committed Mar 22, 2023
6 parents 1e2ea17 + d449fa8 + f8557fc + 6444df5 + 4985599 + 97243f6 commit c2460f1
Show file tree
Hide file tree
Showing 11 changed files with 115 additions and 43 deletions.
2 changes: 1 addition & 1 deletion build/.bazelbuilderversion
Original file line number Diff line number Diff line change
@@ -1 +1 @@
cockroachdb/bazel:20230303-060247
cockroachdb/bazel:20230316-060229
6 changes: 6 additions & 0 deletions pkg/ccl/backupccl/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/jobs/jobstest"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/keyvisualizer"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
Expand Down Expand Up @@ -6204,11 +6205,16 @@ func TestRestoreErrorPropagates(t *testing.T) {
return nil
},
}
params.ServerArgs.Knobs.JobsTestingKnobs = jobs.NewTestingKnobsWithShortIntervals()
params.ServerArgs.Knobs.KeyVisualizer = &keyvisualizer.TestingKnobs{SkipJobBootstrap: true}
params.ServerArgs.DisableSpanConfigs = true
tc := testcluster.StartTestCluster(t, 3, params)
defer tc.Stopper().Stop(ctx)
db := tc.ServerConn(0)
runner := sqlutils.MakeSQLRunner(db)
runner.Exec(t, `SET CLUSTER SETTING jobs.metrics.interval.poll = '30s'`)
runner.Exec(t, `SET CLUSTER SETTING sql.stats.automatic_collection.enabled = false`)
runner.Exec(t, `SET CLUSTER SETTING sql.stats.system_tables_autostats.enabled = false`)
runner.Exec(t, "CREATE TABLE foo ()")
runner.Exec(t, "CREATE DATABASE into_db")
url := `nodelocal://0/foo`
Expand Down
7 changes: 3 additions & 4 deletions pkg/ccl/backupccl/schedule_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,7 @@ func (e *scheduledBackupExecutor) executeBackup(
}
backupStmt.AsOf = tree.AsOfClause{Expr: endTime}

log.Infof(ctx, "Starting scheduled backup %d: %s",
sj.ScheduleID(), tree.AsString(backupStmt))
log.Infof(ctx, "Starting scheduled backup %d", sj.ScheduleID())

// Invoke backup plan hook.
hook, cleanup := cfg.PlanHookMaker("exec-backup", txn.KV(), sj.Owner())
Expand Down Expand Up @@ -148,10 +147,10 @@ func planBackup(
) (sql.PlanHookRowFn, error) {
fn, cols, _, _, err := backupPlanHook(ctx, backupStmt, p)
if err != nil {
return nil, errors.Wrapf(err, "backup eval: %q", tree.AsString(backupStmt))
return nil, errors.Wrapf(err, "failed to evaluate backup stmt")
}
if fn == nil {
return nil, errors.Newf("backup eval: %q", tree.AsString(backupStmt))
return nil, errors.Newf("failed to evaluate backup stmt")
}
if len(cols) != len(jobs.DetachedJobExecutionResultHeader) {
return nil, errors.Newf("unexpected result columns")
Expand Down
28 changes: 26 additions & 2 deletions pkg/kv/kvclient/kvcoord/txn_coord_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -475,13 +475,17 @@ func (tc *TxnCoordSender) Send(
) (*kvpb.BatchResponse, *kvpb.Error) {
// NOTE: The locking here is unusual. Although it might look like it, we are
// NOT holding the lock continuously for the duration of the Send. We lock
// here, and unlock at the botton of the interceptor stack, in the
// txnLockGatekeeper. The we lock again in that interceptor when the response
// here, and unlock at the bottom of the interceptor stack, in the
// txnLockGatekeeper. Then we lock again in that interceptor when the response
// comes, and unlock again in the defer below.
tc.mu.Lock()
defer tc.mu.Unlock()
tc.mu.active = true

if pErr := tc.maybeRejectIncompatibleRequest(ctx, ba); pErr != nil {
return nil, pErr
}

if pErr := tc.maybeRejectClientLocked(ctx, ba); pErr != nil {
return nil, pErr
}
Expand Down Expand Up @@ -667,6 +671,26 @@ func (tc *TxnCoordSender) maybeCommitWait(ctx context.Context, deferred bool) er
return nil
}

// maybeRejectIncompatibleRequest checks if the TxnCoordSender is compatible with
// a given BatchRequest.
// Specifically, a Leaf TxnCoordSender is not compatible with locking requests.
func (tc *TxnCoordSender) maybeRejectIncompatibleRequest(
ctx context.Context, ba *kvpb.BatchRequest,
) *kvpb.Error {
switch tc.typ {
case kv.RootTxn:
return nil
case kv.LeafTxn:
if ba.IsLocking() {
return kvpb.NewError(errors.WithContextTags(errors.AssertionFailedf(
"LeafTxn %s incompatible with locking request %s", tc.mu.txn, ba.Summary()), ctx))
}
return nil
default:
panic("unexpected TxnType")
}
}

// maybeRejectClientLocked checks whether the transaction is in a state that
// prevents it from continuing, such as the heartbeat having detected the
// transaction to have been aborted.
Expand Down
33 changes: 33 additions & 0 deletions pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3019,3 +3019,36 @@ func TestTxnCoordSenderSetFixedTimestamp(t *testing.T) {
})
}
}

// TestTxnTypeCompatibleWithBatchRequest tests if a transaction type and a batch
// request are compatible.
func TestTxnTypeCompatibleWithBatchRequest(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
ctx := context.Background()
s := createTestDB(t)
defer s.Stop()

rootTxn := kv.NewTxn(ctx, s.DB, 0 /* gatewayNodeID */)
leafInputState := rootTxn.GetLeafTxnInputState(ctx)
leafTxn := kv.NewLeafTxn(ctx, s.DB, 0 /* gatewayNodeID */, leafInputState)

// a LeafTxn is not compatible with a locking request
_, err := leafTxn.GetForUpdate(ctx, roachpb.Key("a"))
require.Error(t, err)
require.Regexp(t, "LeafTxn .* incompatible with locking request .*", err)
err = leafTxn.Put(ctx, roachpb.Key("a"), []byte("b"))
require.Error(t, err)
require.Regexp(t, "LeafTxn .* incompatible with locking request .*", err)
// a LeafTxn is compatible with a non-locking request
_, err = leafTxn.Get(ctx, roachpb.Key("a"))
require.NoError(t, err)

// a RootTxn is compatible with all requests
_, err = rootTxn.GetForUpdate(ctx, roachpb.Key("a"))
require.NoError(t, err)
_, err = rootTxn.Get(ctx, roachpb.Key("a"))
require.NoError(t, err)
err = rootTxn.Put(ctx, roachpb.Key("a"), []byte("b"))
require.NoError(t, err)
}
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/raft_log_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ func newTruncateDecision(ctx context.Context, r *Replica) (truncateDecision, err
raftStatus := r.raftStatusRLocked()

const anyRecipientStore roachpb.StoreID = 0
pendingSnapshotIndex := r.getSnapshotLogTruncationConstraintsRLocked(anyRecipientStore)
_, pendingSnapshotIndex := r.getSnapshotLogTruncationConstraintsRLocked(anyRecipientStore, false /* initialOnly */)
lastIndex := r.mu.lastIndexNotDurable
// NB: raftLogSize above adjusts for pending truncations that have already
// been successfully replicated via raft, but logSizeTrusted does not see if
Expand Down
8 changes: 4 additions & 4 deletions pkg/kv/kvserver/raft_log_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -651,7 +651,7 @@ func TestSnapshotLogTruncationConstraints(t *testing.T) {

r.mu.state.RaftAppliedIndex = index1
// Add first constraint.
_, cleanup1 := r.addSnapshotLogTruncationConstraint(ctx, id1, storeID)
_, cleanup1 := r.addSnapshotLogTruncationConstraint(ctx, id1, false /* initial */, storeID)
exp1 := map[uuid.UUID]snapTruncationInfo{id1: {index: index1}}

// Make sure it registered.
Expand All @@ -661,15 +661,15 @@ func TestSnapshotLogTruncationConstraints(t *testing.T) {
// Add another constraint with the same id. Extremely unlikely in practice
// but we want to make sure it doesn't blow anything up. Collisions are
// handled by ignoring the colliding update.
_, cleanup2 := r.addSnapshotLogTruncationConstraint(ctx, id1, storeID)
_, cleanup2 := r.addSnapshotLogTruncationConstraint(ctx, id1, false /* initial */, storeID)
assert.Equal(t, r.mu.snapshotLogTruncationConstraints, exp1)

// Helper that grabs the min constraint index (which can trigger GC as a
// byproduct) and asserts.
assertMin := func(exp uint64, now time.Time) {
t.Helper()
const anyRecipientStore roachpb.StoreID = 0
if maxIndex := r.getSnapshotLogTruncationConstraintsRLocked(anyRecipientStore); maxIndex != exp {
if _, maxIndex := r.getSnapshotLogTruncationConstraintsRLocked(anyRecipientStore, false /* initialOnly */); maxIndex != exp {
t.Fatalf("unexpected max index %d, wanted %d", maxIndex, exp)
}
}
Expand All @@ -681,7 +681,7 @@ func TestSnapshotLogTruncationConstraints(t *testing.T) {
r.mu.state.RaftAppliedIndex = index2
// Add another, higher, index. We're not going to notice it's around
// until the lower one disappears.
_, cleanup3 := r.addSnapshotLogTruncationConstraint(ctx, id2, storeID)
_, cleanup3 := r.addSnapshotLogTruncationConstraint(ctx, id2, false /* initial */, storeID)

now := timeutil.Now()
// The colliding snapshot comes back. Or the original, we can't tell.
Expand Down
7 changes: 4 additions & 3 deletions pkg/kv/kvserver/raft_snapshot_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,10 @@ func (rq *raftSnapshotQueue) processRaftSnapshot(
if fn := repl.store.cfg.TestingKnobs.RaftSnapshotQueueSkipReplica; fn != nil && fn() {
return false, nil
}
if repl.hasOutstandingSnapshotInFlightToStore(repDesc.StoreID) {
// There is a snapshot being transferred. It's probably an INITIAL snap,
// so bail for now and try again later.
// NB: we could pass `false` for initialOnly as well, but we are the "other"
// possible sender.
if _, ok := repl.hasOutstandingSnapshotInFlightToStore(repDesc.StoreID, true /* initialOnly */); ok {
// There is an INITIAL snapshot being transferred, so bail for now and try again later.
err := errors.Errorf(
"skipping snapshot; replica is likely a %s in the process of being added: %s",
typ,
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -2314,7 +2314,7 @@ func (r *Replica) GetLoadStatsForTesting() *load.ReplicaLoad {
// HasOutstandingLearnerSnapshotInFlightForTesting is for use only by tests to
// gather whether there are in-flight snapshots to learner replcas.
func (r *Replica) HasOutstandingLearnerSnapshotInFlightForTesting() bool {
return r.hasOutstandingLearnerSnapshotInFlight()
return r.errOnOutstandingLearnerSnapshotInflight() != nil
}

// ReadProtectedTimestampsForTesting is for use only by tests to read and update
Expand Down
8 changes: 4 additions & 4 deletions pkg/kv/kvserver/replica_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -1334,9 +1334,9 @@ func (r *Replica) maybeLeaveAtomicChangeReplicasAndRemoveLearners(
// periods of time on a single range without making progress, which can stall
// other operations that they are expected to perform (see
// https://github.com/cockroachdb/cockroach/issues/79249 for example).
if r.hasOutstandingLearnerSnapshotInFlight() {
if err := r.errOnOutstandingLearnerSnapshotInflight(); err != nil {
return nil /* desc */, 0, /* learnersRemoved */
errCannotRemoveLearnerWhileSnapshotInFlight
errors.WithSecondaryError(errCannotRemoveLearnerWhileSnapshotInFlight, err)
}

if fn := r.store.TestingKnobs().BeforeRemovingDemotedLearner; fn != nil {
Expand Down Expand Up @@ -1839,7 +1839,7 @@ func (r *Replica) lockLearnerSnapshot(
var cleanups []func()
for _, addition := range additions {
lockUUID := uuid.MakeV4()
_, cleanup := r.addSnapshotLogTruncationConstraint(ctx, lockUUID, addition.StoreID)
_, cleanup := r.addSnapshotLogTruncationConstraint(ctx, lockUUID, true /* initial */, addition.StoreID)
cleanups = append(cleanups, cleanup)
}
return func() {
Expand Down Expand Up @@ -2793,7 +2793,7 @@ func (r *Replica) sendSnapshotUsingDelegate(
senderQueuePriority = 0
}
snapUUID := uuid.MakeV4()
appliedIndex, cleanup := r.addSnapshotLogTruncationConstraint(ctx, snapUUID, recipient.StoreID)
appliedIndex, cleanup := r.addSnapshotLogTruncationConstraint(ctx, snapUUID, snapType == kvserverpb.SnapshotRequest_INITIAL, recipient.StoreID)
// The cleanup function needs to be called regardless of success or failure of
// sending to release the log truncation constraint.
defer cleanup()
Expand Down
55 changes: 32 additions & 23 deletions pkg/kv/kvserver/replica_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -1785,6 +1785,7 @@ func (r *Replica) reportSnapshotStatus(ctx context.Context, to roachpb.ReplicaID
type snapTruncationInfo struct {
index uint64
recipientStore roachpb.StoreID
initial bool
}

// addSnapshotLogTruncation creates a log truncation record which will prevent
Expand All @@ -1800,8 +1801,10 @@ type snapTruncationInfo struct {
// a possibly stale value here is harmless since the values increases
// monotonically. The actual snapshot index, may preserve more from a log
// truncation perspective.
// If initial is true, the snapshot is marked as being sent by the replicate
// queue to a new replica; some callers only care about these snapshots.
func (r *Replica) addSnapshotLogTruncationConstraint(
ctx context.Context, snapUUID uuid.UUID, recipientStore roachpb.StoreID,
ctx context.Context, snapUUID uuid.UUID, initial bool, recipientStore roachpb.StoreID,
) (uint64, func()) {
r.mu.Lock()
defer r.mu.Unlock()
Expand All @@ -1823,6 +1826,7 @@ func (r *Replica) addSnapshotLogTruncationConstraint(
r.mu.snapshotLogTruncationConstraints[snapUUID] = snapTruncationInfo{
index: appliedIndex,
recipientStore: recipientStore,
initial: initial,
}

return appliedIndex, func() {
Expand All @@ -1843,48 +1847,53 @@ func (r *Replica) addSnapshotLogTruncationConstraint(
}
}

// getSnapshotLogTruncationConstraints returns the minimum index of any
// getSnapshotLogTruncationConstraintsRLocked returns the minimum index of any
// currently outstanding snapshot being sent from this replica to the specified
// recipient or 0 if there isn't one. Passing 0 for recipientStore means any
// recipient.
func (r *Replica) getSnapshotLogTruncationConstraints(
recipientStore roachpb.StoreID,
) (minSnapIndex uint64) {
r.mu.RLock()
defer r.mu.RUnlock()
return r.getSnapshotLogTruncationConstraintsRLocked(recipientStore)
}

// recipient. If initialOnly is set, only snapshots sent by the replicate queue
// to new replicas are considered.
func (r *Replica) getSnapshotLogTruncationConstraintsRLocked(
recipientStore roachpb.StoreID,
) (minSnapIndex uint64) {
recipientStore roachpb.StoreID, initialOnly bool,
) (_ []snapTruncationInfo, minSnapIndex uint64) {
var sl []snapTruncationInfo
for _, item := range r.mu.snapshotLogTruncationConstraints {
if initialOnly && !item.initial {
continue
}
if recipientStore != 0 && item.recipientStore != recipientStore {
continue
}
if minSnapIndex == 0 || minSnapIndex > item.index {
minSnapIndex = item.index
}
sl = append(sl, item)
}
return minSnapIndex
return sl, minSnapIndex
}

// hasOutstandingLearnerSnapshotInFlight returns true if there is a snapshot in
// progress from this replica to a learner replica for this range.
func (r *Replica) hasOutstandingLearnerSnapshotInFlight() bool {
// errOnOutstandingLearnerSnapshotInflight returns an error if there is a
// snapshot in progress from this replica to a learner replica for this range.
func (r *Replica) errOnOutstandingLearnerSnapshotInflight() error {
learners := r.Desc().Replicas().LearnerDescriptors()
for _, repl := range learners {
if r.hasOutstandingSnapshotInFlightToStore(repl.StoreID) {
return true
sl, _ := r.hasOutstandingSnapshotInFlightToStore(repl.StoreID, true /* initialOnly */)
if len(sl) > 0 {
return errors.Errorf("INITIAL snapshots in flight to s%d: %v", repl.StoreID, sl)
}
}
return false
return nil
}

// hasOutstandingSnapshotInFlightToStore returns true if there is a snapshot in
// flight from this replica to the store with the given ID.
func (r *Replica) hasOutstandingSnapshotInFlightToStore(storeID roachpb.StoreID) bool {
return r.getSnapshotLogTruncationConstraints(storeID) > 0
// flight from this replica to the store with the given ID. If initialOnly is
// true, only snapshots sent by the replicate queue to new replicas are considered.
func (r *Replica) hasOutstandingSnapshotInFlightToStore(
storeID roachpb.StoreID, initialOnly bool,
) ([]snapTruncationInfo, bool) {
r.mu.RLock()
defer r.mu.RUnlock()
sl, idx := r.getSnapshotLogTruncationConstraintsRLocked(storeID, initialOnly)
return sl, idx > 0
}

// HasRaftLeader returns true if the raft group has a raft leader currently.
Expand Down

0 comments on commit c2460f1

Please sign in to comment.