diff --git a/build/.bazelbuilderversion b/build/.bazelbuilderversion
index b84f6513bcc5..d5f64ccb9ee9 100644
--- a/build/.bazelbuilderversion
+++ b/build/.bazelbuilderversion
@@ -1 +1 @@
-cockroachdb/bazel:20230303-060247
\ No newline at end of file
+cockroachdb/bazel:20230316-060229
\ No newline at end of file
diff --git a/pkg/ccl/backupccl/backup_test.go b/pkg/ccl/backupccl/backup_test.go
index ec93e7986ab0..859c25bf05ef 100644
--- a/pkg/ccl/backupccl/backup_test.go
+++ b/pkg/ccl/backupccl/backup_test.go
@@ -58,6 +58,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
 	"github.com/cockroachdb/cockroach/pkg/jobs/jobstest"
 	"github.com/cockroachdb/cockroach/pkg/keys"
+	"github.com/cockroachdb/cockroach/pkg/keyvisualizer"
 	"github.com/cockroachdb/cockroach/pkg/kv"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
@@ -6204,11 +6205,16 @@ func TestRestoreErrorPropagates(t *testing.T) {
 			return nil
 		},
 	}
+	params.ServerArgs.Knobs.JobsTestingKnobs = jobs.NewTestingKnobsWithShortIntervals()
+	params.ServerArgs.Knobs.KeyVisualizer = &keyvisualizer.TestingKnobs{SkipJobBootstrap: true}
+	params.ServerArgs.DisableSpanConfigs = true
 	tc := testcluster.StartTestCluster(t, 3, params)
 	defer tc.Stopper().Stop(ctx)
 	db := tc.ServerConn(0)
 	runner := sqlutils.MakeSQLRunner(db)
+	runner.Exec(t, `SET CLUSTER SETTING jobs.metrics.interval.poll = '30s'`)
 	runner.Exec(t, `SET CLUSTER SETTING sql.stats.automatic_collection.enabled = false`)
+	runner.Exec(t, `SET CLUSTER SETTING sql.stats.system_tables_autostats.enabled = false`)
 	runner.Exec(t, "CREATE TABLE foo ()")
 	runner.Exec(t, "CREATE DATABASE into_db")
 	url := `nodelocal://0/foo`
diff --git a/pkg/ccl/backupccl/schedule_exec.go b/pkg/ccl/backupccl/schedule_exec.go
index d572019bd952..63e35ad255ca 100644
--- a/pkg/ccl/backupccl/schedule_exec.go
+++ b/pkg/ccl/backupccl/schedule_exec.go
@@ -97,8 +97,7 @@ func (e *scheduledBackupExecutor) executeBackup(
 	}
 	backupStmt.AsOf = tree.AsOfClause{Expr: endTime}
 
-	log.Infof(ctx, "Starting scheduled backup %d: %s",
-		sj.ScheduleID(), tree.AsString(backupStmt))
+	log.Infof(ctx, "Starting scheduled backup %d", sj.ScheduleID())
 
 	// Invoke backup plan hook.
 	hook, cleanup := cfg.PlanHookMaker("exec-backup", txn.KV(), sj.Owner())
@@ -148,10 +147,10 @@ func planBackup(
 ) (sql.PlanHookRowFn, error) {
 	fn, cols, _, _, err := backupPlanHook(ctx, backupStmt, p)
 	if err != nil {
-		return nil, errors.Wrapf(err, "backup eval: %q", tree.AsString(backupStmt))
+		return nil, errors.Wrapf(err, "failed to evaluate backup stmt")
 	}
 	if fn == nil {
-		return nil, errors.Newf("backup eval: %q", tree.AsString(backupStmt))
+		return nil, errors.Newf("failed to evaluate backup stmt")
 	}
 	if len(cols) != len(jobs.DetachedJobExecutionResultHeader) {
 		return nil, errors.Newf("unexpected result columns")
diff --git a/pkg/kv/kvclient/kvcoord/txn_coord_sender.go b/pkg/kv/kvclient/kvcoord/txn_coord_sender.go
index 834c0f91574e..5ef128f4e7b1 100644
--- a/pkg/kv/kvclient/kvcoord/txn_coord_sender.go
+++ b/pkg/kv/kvclient/kvcoord/txn_coord_sender.go
@@ -475,13 +475,17 @@ func (tc *TxnCoordSender) Send(
 ) (*kvpb.BatchResponse, *kvpb.Error) {
 	// NOTE: The locking here is unusual. Although it might look like it, we are
 	// NOT holding the lock continuously for the duration of the Send. We lock
-	// here, and unlock at the botton of the interceptor stack, in the
-	// txnLockGatekeeper. The we lock again in that interceptor when the response
+	// here, and unlock at the bottom of the interceptor stack, in the
+	// txnLockGatekeeper. Then we lock again in that interceptor when the response
 	// comes, and unlock again in the defer below.
 	tc.mu.Lock()
 	defer tc.mu.Unlock()
 	tc.mu.active = true
 
+	if pErr := tc.maybeRejectIncompatibleRequest(ctx, ba); pErr != nil {
+		return nil, pErr
+	}
+
 	if pErr := tc.maybeRejectClientLocked(ctx, ba); pErr != nil {
 		return nil, pErr
 	}
@@ -667,6 +671,26 @@ func (tc *TxnCoordSender) maybeCommitWait(ctx context.Context, deferred bool) er
 	return nil
 }
 
+// maybeRejectIncompatibleRequest checks if the TxnCoordSender is compatible with
+// a given BatchRequest.
+// Specifically, a Leaf TxnCoordSender is not compatible with locking requests.
+func (tc *TxnCoordSender) maybeRejectIncompatibleRequest(
+	ctx context.Context, ba *kvpb.BatchRequest,
+) *kvpb.Error {
+	switch tc.typ {
+	case kv.RootTxn:
+		return nil
+	case kv.LeafTxn:
+		if ba.IsLocking() {
+			return kvpb.NewError(errors.WithContextTags(errors.AssertionFailedf(
+				"LeafTxn %s incompatible with locking request %s", tc.mu.txn, ba.Summary()), ctx))
+		}
+		return nil
+	default:
+		panic("unexpected TxnType")
+	}
+}
+
 // maybeRejectClientLocked checks whether the transaction is in a state that
 // prevents it from continuing, such as the heartbeat having detected the
 // transaction to have been aborted.
diff --git a/pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go b/pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go
index 7738d566f772..f8c1671507c2 100644
--- a/pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go
+++ b/pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go
@@ -3019,3 +3019,36 @@ func TestTxnCoordSenderSetFixedTimestamp(t *testing.T) {
 		})
 	}
 }
+
+// TestTxnTypeCompatibleWithBatchRequest tests if a transaction type and a batch
+// request are compatible.
+func TestTxnTypeCompatibleWithBatchRequest(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+	ctx := context.Background()
+	s := createTestDB(t)
+	defer s.Stop()
+
+	rootTxn := kv.NewTxn(ctx, s.DB, 0 /* gatewayNodeID */)
+	leafInputState := rootTxn.GetLeafTxnInputState(ctx)
+	leafTxn := kv.NewLeafTxn(ctx, s.DB, 0 /* gatewayNodeID */, leafInputState)
+
+	// a LeafTxn is not compatible with a locking request
+	_, err := leafTxn.GetForUpdate(ctx, roachpb.Key("a"))
+	require.Error(t, err)
+	require.Regexp(t, "LeafTxn .* incompatible with locking request .*", err)
+	err = leafTxn.Put(ctx, roachpb.Key("a"), []byte("b"))
+	require.Error(t, err)
+	require.Regexp(t, "LeafTxn .* incompatible with locking request .*", err)
+	// a LeafTxn is compatible with a non-locking request
+	_, err = leafTxn.Get(ctx, roachpb.Key("a"))
+	require.NoError(t, err)
+
+	// a RootTxn is compatible with all requests
+	_, err = rootTxn.GetForUpdate(ctx, roachpb.Key("a"))
+	require.NoError(t, err)
+	_, err = rootTxn.Get(ctx, roachpb.Key("a"))
+	require.NoError(t, err)
+	err = rootTxn.Put(ctx, roachpb.Key("a"), []byte("b"))
+	require.NoError(t, err)
+}
diff --git a/pkg/kv/kvserver/raft_log_queue.go b/pkg/kv/kvserver/raft_log_queue.go
index 8365a15923f7..e27095d3e991 100644
--- a/pkg/kv/kvserver/raft_log_queue.go
+++ b/pkg/kv/kvserver/raft_log_queue.go
@@ -268,7 +268,7 @@ func newTruncateDecision(ctx context.Context, r *Replica) (truncateDecision, err
 	raftStatus := r.raftStatusRLocked()
 
 	const anyRecipientStore roachpb.StoreID = 0
-	pendingSnapshotIndex := r.getSnapshotLogTruncationConstraintsRLocked(anyRecipientStore)
+	_, pendingSnapshotIndex := r.getSnapshotLogTruncationConstraintsRLocked(anyRecipientStore, false /* initialOnly */)
 	lastIndex := r.mu.lastIndexNotDurable
 	// NB: raftLogSize above adjusts for pending truncations that have already
 	// been successfully replicated via raft, but logSizeTrusted does not see if
diff --git a/pkg/kv/kvserver/raft_log_queue_test.go b/pkg/kv/kvserver/raft_log_queue_test.go
index cb3737cb2031..a80d5dd1e948 100644
--- a/pkg/kv/kvserver/raft_log_queue_test.go
+++ b/pkg/kv/kvserver/raft_log_queue_test.go
@@ -651,7 +651,7 @@ func TestSnapshotLogTruncationConstraints(t *testing.T) {
 	r.mu.state.RaftAppliedIndex = index1
 	// Add first constraint.
-	_, cleanup1 := r.addSnapshotLogTruncationConstraint(ctx, id1, storeID)
+	_, cleanup1 := r.addSnapshotLogTruncationConstraint(ctx, id1, false /* initial */, storeID)
 	exp1 := map[uuid.UUID]snapTruncationInfo{id1: {index: index1}}
 
 	// Make sure it registered.
@@ -661,7 +661,7 @@ func TestSnapshotLogTruncationConstraints(t *testing.T) {
 	// Add another constraint with the same id. Extremely unlikely in practice
 	// but we want to make sure it doesn't blow anything up. Collisions are
 	// handled by ignoring the colliding update.
-	_, cleanup2 := r.addSnapshotLogTruncationConstraint(ctx, id1, storeID)
+	_, cleanup2 := r.addSnapshotLogTruncationConstraint(ctx, id1, false /* initial */, storeID)
 	assert.Equal(t, r.mu.snapshotLogTruncationConstraints, exp1)
 
 	// Helper that grabs the min constraint index (which can trigger GC as a
@@ -669,7 +669,7 @@ func TestSnapshotLogTruncationConstraints(t *testing.T) {
 	assertMin := func(exp uint64, now time.Time) {
 		t.Helper()
 		const anyRecipientStore roachpb.StoreID = 0
-		if maxIndex := r.getSnapshotLogTruncationConstraintsRLocked(anyRecipientStore); maxIndex != exp {
+		if _, maxIndex := r.getSnapshotLogTruncationConstraintsRLocked(anyRecipientStore, false /* initialOnly */); maxIndex != exp {
 			t.Fatalf("unexpected max index %d, wanted %d", maxIndex, exp)
 		}
 	}
@@ -681,7 +681,7 @@ func TestSnapshotLogTruncationConstraints(t *testing.T) {
 	r.mu.state.RaftAppliedIndex = index2
 	// Add another, higher, index. We're not going to notice it's around
 	// until the lower one disappears.
-	_, cleanup3 := r.addSnapshotLogTruncationConstraint(ctx, id2, storeID)
+	_, cleanup3 := r.addSnapshotLogTruncationConstraint(ctx, id2, false /* initial */, storeID)
 
 	now := timeutil.Now()
 	// The colliding snapshot comes back. Or the original, we can't tell.
diff --git a/pkg/kv/kvserver/raft_snapshot_queue.go b/pkg/kv/kvserver/raft_snapshot_queue.go
index 2c7b1952272d..e473c3a74e15 100644
--- a/pkg/kv/kvserver/raft_snapshot_queue.go
+++ b/pkg/kv/kvserver/raft_snapshot_queue.go
@@ -116,9 +116,10 @@ func (rq *raftSnapshotQueue) processRaftSnapshot(
 		if fn := repl.store.cfg.TestingKnobs.RaftSnapshotQueueSkipReplica; fn != nil && fn() {
 			return false, nil
 		}
-		if repl.hasOutstandingSnapshotInFlightToStore(repDesc.StoreID) {
-			// There is a snapshot being transferred. It's probably an INITIAL snap,
-			// so bail for now and try again later.
+		// NB: we could pass `false` for initialOnly as well, but we are the "other"
+		// possible sender.
+		if _, ok := repl.hasOutstandingSnapshotInFlightToStore(repDesc.StoreID, true /* initialOnly */); ok {
+			// There is an INITIAL snapshot being transferred, so bail for now and try again later.
 			err := errors.Errorf(
 				"skipping snapshot; replica is likely a %s in the process of being added: %s",
 				typ,
diff --git a/pkg/kv/kvserver/replica.go b/pkg/kv/kvserver/replica.go
index 2325233a82a2..145135dd25dc 100644
--- a/pkg/kv/kvserver/replica.go
+++ b/pkg/kv/kvserver/replica.go
@@ -2314,7 +2314,7 @@ func (r *Replica) GetLoadStatsForTesting() *load.ReplicaLoad {
 // HasOutstandingLearnerSnapshotInFlightForTesting is for use only by tests to
 // gather whether there are in-flight snapshots to learner replcas.
 func (r *Replica) HasOutstandingLearnerSnapshotInFlightForTesting() bool {
-	return r.hasOutstandingLearnerSnapshotInFlight()
+	return r.errOnOutstandingLearnerSnapshotInflight() != nil
 }
 
 // ReadProtectedTimestampsForTesting is for use only by tests to read and update
diff --git a/pkg/kv/kvserver/replica_command.go b/pkg/kv/kvserver/replica_command.go
index ccdd699a403a..3b40763f7eab 100644
--- a/pkg/kv/kvserver/replica_command.go
+++ b/pkg/kv/kvserver/replica_command.go
@@ -1334,9 +1334,9 @@ func (r *Replica) maybeLeaveAtomicChangeReplicasAndRemoveLearners(
 	// periods of time on a single range without making progress, which can stall
 	// other operations that they are expected to perform (see
 	// https://github.com/cockroachdb/cockroach/issues/79249 for example).
-	if r.hasOutstandingLearnerSnapshotInFlight() {
+	if err := r.errOnOutstandingLearnerSnapshotInflight(); err != nil {
 		return nil /* desc */, 0, /* learnersRemoved */
-			errCannotRemoveLearnerWhileSnapshotInFlight
+			errors.WithSecondaryError(errCannotRemoveLearnerWhileSnapshotInFlight, err)
 	}
 
 	if fn := r.store.TestingKnobs().BeforeRemovingDemotedLearner; fn != nil {
@@ -1839,7 +1839,7 @@ func (r *Replica) lockLearnerSnapshot(
 	var cleanups []func()
 	for _, addition := range additions {
 		lockUUID := uuid.MakeV4()
-		_, cleanup := r.addSnapshotLogTruncationConstraint(ctx, lockUUID, addition.StoreID)
+		_, cleanup := r.addSnapshotLogTruncationConstraint(ctx, lockUUID, true /* initial */, addition.StoreID)
 		cleanups = append(cleanups, cleanup)
 	}
 	return func() {
@@ -2793,7 +2793,7 @@ func (r *Replica) sendSnapshotUsingDelegate(
 		senderQueuePriority = 0
 	}
 	snapUUID := uuid.MakeV4()
-	appliedIndex, cleanup := r.addSnapshotLogTruncationConstraint(ctx, snapUUID, recipient.StoreID)
+	appliedIndex, cleanup := r.addSnapshotLogTruncationConstraint(ctx, snapUUID, snapType == kvserverpb.SnapshotRequest_INITIAL, recipient.StoreID)
 	// The cleanup function needs to be called regardless of success or failure of
 	// sending to release the log truncation constraint.
 	defer cleanup()
diff --git a/pkg/kv/kvserver/replica_raft.go b/pkg/kv/kvserver/replica_raft.go
index dca55053b685..898892163df1 100644
--- a/pkg/kv/kvserver/replica_raft.go
+++ b/pkg/kv/kvserver/replica_raft.go
@@ -1785,6 +1785,7 @@ func (r *Replica) reportSnapshotStatus(ctx context.Context, to roachpb.ReplicaID
 type snapTruncationInfo struct {
 	index          uint64
 	recipientStore roachpb.StoreID
+	initial        bool
 }
 
 // addSnapshotLogTruncation creates a log truncation record which will prevent
@@ -1800,8 +1801,10 @@ type snapTruncationInfo struct {
 // a possibly stale value here is harmless since the values increases
 // monotonically. The actual snapshot index, may preserve more from a log
 // truncation perspective.
+// If initial is true, the snapshot is marked as being sent by the replicate
+// queue to a new replica; some callers only care about these snapshots.
 func (r *Replica) addSnapshotLogTruncationConstraint(
-	ctx context.Context, snapUUID uuid.UUID, recipientStore roachpb.StoreID,
+	ctx context.Context, snapUUID uuid.UUID, initial bool, recipientStore roachpb.StoreID,
 ) (uint64, func()) {
 	r.mu.Lock()
 	defer r.mu.Unlock()
@@ -1823,6 +1826,7 @@ func (r *Replica) addSnapshotLogTruncationConstraint(
 	r.mu.snapshotLogTruncationConstraints[snapUUID] = snapTruncationInfo{
 		index:          appliedIndex,
 		recipientStore: recipientStore,
+		initial:        initial,
 	}
 
 	return appliedIndex, func() {
@@ -1843,48 +1847,53 @@ func (r *Replica) addSnapshotLogTruncationConstraint(
 	}
 }
 
-// getSnapshotLogTruncationConstraints returns the minimum index of any
+// getSnapshotLogTruncationConstraintsRLocked returns the minimum index of any
 // currently outstanding snapshot being sent from this replica to the specified
 // recipient or 0 if there isn't one. Passing 0 for recipientStore means any
-// recipient.
-func (r *Replica) getSnapshotLogTruncationConstraints(
-	recipientStore roachpb.StoreID,
-) (minSnapIndex uint64) {
-	r.mu.RLock()
-	defer r.mu.RUnlock()
-	return r.getSnapshotLogTruncationConstraintsRLocked(recipientStore)
-}
-
+// recipient. If initialOnly is set, only snapshots sent by the replicate queue
+// to new replicas are considered.
 func (r *Replica) getSnapshotLogTruncationConstraintsRLocked(
-	recipientStore roachpb.StoreID,
-) (minSnapIndex uint64) {
+	recipientStore roachpb.StoreID, initialOnly bool,
+) (_ []snapTruncationInfo, minSnapIndex uint64) {
+	var sl []snapTruncationInfo
 	for _, item := range r.mu.snapshotLogTruncationConstraints {
+		if initialOnly && !item.initial {
+			continue
+		}
 		if recipientStore != 0 && item.recipientStore != recipientStore {
 			continue
 		}
 		if minSnapIndex == 0 || minSnapIndex > item.index {
 			minSnapIndex = item.index
 		}
+		sl = append(sl, item)
 	}
-	return minSnapIndex
+	return sl, minSnapIndex
 }
 
-// hasOutstandingLearnerSnapshotInFlight returns true if there is a snapshot in
-// progress from this replica to a learner replica for this range.
-func (r *Replica) hasOutstandingLearnerSnapshotInFlight() bool {
+// errOnOutstandingLearnerSnapshotInflight returns an error if there is a
+// snapshot in progress from this replica to a learner replica for this range.
+func (r *Replica) errOnOutstandingLearnerSnapshotInflight() error {
 	learners := r.Desc().Replicas().LearnerDescriptors()
 	for _, repl := range learners {
-		if r.hasOutstandingSnapshotInFlightToStore(repl.StoreID) {
-			return true
+		sl, _ := r.hasOutstandingSnapshotInFlightToStore(repl.StoreID, true /* initialOnly */)
+		if len(sl) > 0 {
+			return errors.Errorf("INITIAL snapshots in flight to s%d: %v", repl.StoreID, sl)
 		}
 	}
-	return false
+	return nil
 }
 
 // hasOutstandingSnapshotInFlightToStore returns true if there is a snapshot in
-// flight from this replica to the store with the given ID.
-func (r *Replica) hasOutstandingSnapshotInFlightToStore(storeID roachpb.StoreID) bool {
-	return r.getSnapshotLogTruncationConstraints(storeID) > 0
+// flight from this replica to the store with the given ID. If initialOnly is
+// true, only snapshots sent by the replicate queue to new replicas are considered.
+func (r *Replica) hasOutstandingSnapshotInFlightToStore(
+	storeID roachpb.StoreID, initialOnly bool,
+) ([]snapTruncationInfo, bool) {
+	r.mu.RLock()
+	defer r.mu.RUnlock()
+	sl, idx := r.getSnapshotLogTruncationConstraintsRLocked(storeID, initialOnly)
+	return sl, idx > 0
 }
 
 // HasRaftLeader returns true if the raft group has a raft leader currently.
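
The sketches below are illustrative notes, not part of the diff. First, a minimal standalone sketch of the Leaf/Root dispatch added in `maybeRejectIncompatibleRequest`; `txnType`, `batch`, and `rejectIfIncompatible` are hypothetical stand-ins (not real CockroachDB identifiers) used to show the invariant in isolation: a root transaction coordinator accepts any batch, while a leaf coordinator must reject locking requests.

```go
package main

import (
	"errors"
	"fmt"
)

// txnType and batch are simplified stand-ins for kv.TxnType and
// *kvpb.BatchRequest; they exist only for this illustration.
type txnType int

const (
	rootTxn txnType = iota
	leafTxn
)

type batch struct {
	locking bool   // plays the role of kvpb.BatchRequest.IsLocking()
	summary string // plays the role of ba.Summary()
}

// rejectIfIncompatible mirrors the shape of the new check: roots accept
// everything, leaves reject locking batches, any other type is a bug.
func rejectIfIncompatible(typ txnType, ba batch) error {
	switch typ {
	case rootTxn:
		return nil
	case leafTxn:
		if ba.locking {
			return fmt.Errorf("LeafTxn incompatible with locking request %s", ba.summary)
		}
		return nil
	default:
		return errors.New("unexpected TxnType")
	}
}

func main() {
	fmt.Println(rejectIfIncompatible(leafTxn, batch{locking: true, summary: "GetForUpdate"})) // rejected
	fmt.Println(rejectIfIncompatible(leafTxn, batch{locking: false, summary: "Get"}))         // <nil>
	fmt.Println(rejectIfIncompatible(rootTxn, batch{locking: true, summary: "Put"}))          // <nil>
}
```
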
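Similarly, a minimal sketch of the `initialOnly` filtering that `getSnapshotLogTruncationConstraintsRLocked` now applies to in-flight snapshot constraints. `constraint` and `minConstraintIndex` are hypothetical stand-ins mirroring `snapTruncationInfo` and the real method, which additionally runs under the replica mutex:

```go
package main

import "fmt"

// constraint mirrors the shape of snapTruncationInfo after this patch; it is a
// stand-in type for illustration only.
type constraint struct {
	index          uint64
	recipientStore int32
	initial        bool // true for INITIAL snapshots sent by the replicate queue
}

// minConstraintIndex is a hypothetical, lock-free analogue of
// getSnapshotLogTruncationConstraintsRLocked: it returns the matching
// constraints and the minimum index, optionally restricted to a recipient
// store (0 means any) and to INITIAL snapshots.
func minConstraintIndex(all []constraint, recipientStore int32, initialOnly bool) ([]constraint, uint64) {
	var matched []constraint
	var minIndex uint64
	for _, c := range all {
		if initialOnly && !c.initial {
			continue
		}
		if recipientStore != 0 && c.recipientStore != recipientStore {
			continue
		}
		if minIndex == 0 || minIndex > c.index {
			minIndex = c.index
		}
		matched = append(matched, c)
	}
	return matched, minIndex
}

func main() {
	all := []constraint{
		{index: 50, recipientStore: 2, initial: true},  // INITIAL snapshot to s2
		{index: 40, recipientStore: 3, initial: false}, // raft snapshot queue snapshot to s3
	}
	// Log truncation still considers every in-flight snapshot.
	_, minAll := minConstraintIndex(all, 0, false /* initialOnly */)
	// Learner removal and the raft snapshot queue only care about INITIAL ones.
	initialSnaps, _ := minConstraintIndex(all, 0, true /* initialOnly */)
	fmt.Println(minAll, len(initialSnaps)) // 40 1
}
```
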
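One design note on the learner-removal change: `errors.WithSecondaryError` (from `github.com/cockroachdb/errors`) attaches the in-flight-snapshot detail without altering the primary error's message or identity, so existing `errors.Is` checks against `errCannotRemoveLearnerWhileSnapshotInFlight` should keep matching. A small sketch under that assumption; `errCannotRemove` is a stand-in for the real sentinel:

```go
package main

import (
	"fmt"

	"github.com/cockroachdb/errors"
)

// errCannotRemove stands in for errCannotRemoveLearnerWhileSnapshotInFlight.
var errCannotRemove = errors.New("cannot remove learner while snapshot is in flight")

func main() {
	detail := errors.Errorf("INITIAL snapshots in flight to s%d", 3)
	err := errors.WithSecondaryError(errCannotRemove, detail)

	// The primary message is unchanged and the sentinel is still recognized.
	fmt.Println(err)                             // cannot remove learner while snapshot is in flight
	fmt.Println(errors.Is(err, errCannotRemove)) // true

	// The secondary error is carried along for verbose/debug output.
	fmt.Printf("%+v\n", err)
}
```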