From b9d5c51da51f492349ad44d280612979f787cc11 Mon Sep 17 00:00:00 2001 From: sumeerbhola Date: Thu, 27 Jan 2022 17:51:03 -0500 Subject: [PATCH] kvserver: introduce RangeAppliedState.RaftAppliedIndexTerm The same field is also introduced in ReplicaState, since ReplicaState is used in internal data-structures and when sending a state machine snapshot. The migration code uses a special unused term value in a ReplicatedEvalResult to signal to the state machine application machinery to start populating the term field. Fixes #75671 Release note: None --- .../settings/settings-for-tenants.txt | 2 +- docs/generated/settings/settings.html | 2 +- pkg/clusterversion/cockroach_versions.go | 14 +++ .../kvserver/batcheval/cmd_end_transaction.go | 16 ++- pkg/kv/kvserver/batcheval/cmd_migrate.go | 22 ++++ pkg/kv/kvserver/batcheval/result/result.go | 7 +- pkg/kv/kvserver/below_raft_protos_test.go | 2 +- pkg/kv/kvserver/kvserverpb/state.proto | 4 + .../replica_application_state_machine.go | 24 ++++- pkg/kv/kvserver/replica_raftstorage.go | 52 ++++++++- pkg/kv/kvserver/stateloader/initial.go | 19 +++- pkg/kv/kvserver/stateloader/stateloader.go | 20 ++-- pkg/kv/kvserver/store_snapshot.go | 7 ++ pkg/migration/migrations/BUILD.bazel | 1 + pkg/migration/migrations/migrations.go | 10 ++ .../migrations/raft_applied_index_term.go | 100 ++++++++++++++++++ pkg/storage/enginepb/mvcc3.proto | 9 ++ 17 files changed, 288 insertions(+), 23 deletions(-) create mode 100644 pkg/migration/migrations/raft_applied_index_term.go diff --git a/docs/generated/settings/settings-for-tenants.txt b/docs/generated/settings/settings-for-tenants.txt index bd9b53e9c3d9..c883aa6f14c7 100644 --- a/docs/generated/settings/settings-for-tenants.txt +++ b/docs/generated/settings/settings-for-tenants.txt @@ -178,4 +178,4 @@ trace.debug.enable boolean false if set, traces for recent requests can be seen trace.jaeger.agent string the address of a Jaeger agent to receive traces using the Jaeger UDP Thrift protocol, as :. If no port is specified, 6381 will be used. trace.opentelemetry.collector string address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as :. If no port is specified, 4317 will be used. trace.zipkin.collector string the address of a Zipkin instance to receive traces, as :. If no port is specified, 9411 will be used. -version version 21.2-50 set the active cluster version in the format '.' +version version 21.2-54 set the active cluster version in the format '.' diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html index 822c8cbd171f..bc25fd2bf193 100644 --- a/docs/generated/settings/settings.html +++ b/docs/generated/settings/settings.html @@ -186,6 +186,6 @@ trace.jaeger.agentstringthe address of a Jaeger agent to receive traces using the Jaeger UDP Thrift protocol, as :. If no port is specified, 6381 will be used. trace.opentelemetry.collectorstringaddress of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as :. If no port is specified, 4317 will be used. trace.zipkin.collectorstringthe address of a Zipkin instance to receive traces, as :. If no port is specified, 9411 will be used. -versionversion21.2-50set the active cluster version in the format '.' +versionversion21.2-54set the active cluster version in the format '.' 
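The signaling mechanism described in the commit message (a sentinel term value carried in a ReplicatedEvalResult, which tells the state machine apply loop to start persisting the entry's term) can be sketched with a small, self-contained Go program. The names below are illustrative only and are not the CockroachDB types; the actual implementation lives in cmd_migrate.go, stateloader/initial.go, and replica_application_state_machine.go in the diff below.

package main

import "fmt"

// sentinelTerm plays the role of
// stateloader.RaftLogTermSignalForAddRaftAppliedIndexTermMigration: a term
// value smaller than any term a real log entry can carry, so seeing it below
// raft is unambiguous.
const sentinelTerm = 3

// appliedState stands in for RangeAppliedState: appliedTerm stays 0 until the
// migration signal has been applied, and is maintained from then on.
type appliedState struct {
	appliedIndex uint64
	appliedTerm  uint64
}

// entry stands in for a raft log entry plus its decoded ReplicatedEvalResult;
// signalTerm is non-zero only for the migration command itself.
type entry struct {
	index, term uint64
	signalTerm  uint64
}

// apply mimics stageTrivialReplicatedEvalResult: once the term is already
// being populated, or this entry carries the sentinel, record the entry's
// (real) term alongside the applied index.
func apply(s *appliedState, e entry) {
	s.appliedIndex = e.index
	if s.appliedTerm > 0 || e.signalTerm == sentinelTerm {
		s.appliedTerm = e.term
	}
}

func main() {
	s := &appliedState{}
	apply(s, entry{index: 11, term: 6})                           // pre-migration: term not recorded
	apply(s, entry{index: 12, term: 6, signalTerm: sentinelTerm}) // the migration command
	apply(s, entry{index: 13, term: 7})                           // post-migration: term tracked
	fmt.Println(s.appliedIndex, s.appliedTerm)                    // prints: 13 7
}

The key property is that the sentinel can never be a real entry term, so the apply loop can special-case it; the comment on RaftLogTermSignalForAddRaftAppliedIndexTermMigration in stateloader/initial.go spells out the same reasoning.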
diff --git a/pkg/clusterversion/cockroach_versions.go b/pkg/clusterversion/cockroach_versions.go index 1cc066ae2a2f..bcc87f399fc3 100644 --- a/pkg/clusterversion/cockroach_versions.go +++ b/pkg/clusterversion/cockroach_versions.go @@ -251,6 +251,12 @@ const ( // EnableProtectedTimestampsForTenant enables the use of protected timestamps // in secondary tenants. EnableProtectedTimestampsForTenant + // AddRaftAppliedIndexTermMigration is a migration that causes each range + // replica to start populating RangeAppliedState.RaftAppliedIndexTerm field. + AddRaftAppliedIndexTermMigration + // PostAddRaftAppliedIndexTermMigration is used for asserting that + // RaftAppliedIndexTerm is populated. + PostAddRaftAppliedIndexTermMigration // ************************************************* // Step (1): Add new versions here. @@ -387,6 +393,14 @@ var versionsSingleton = keyedVersions{ Key: EnableProtectedTimestampsForTenant, Version: roachpb.Version{Major: 21, Minor: 2, Internal: 50}, }, + { + Key: AddRaftAppliedIndexTermMigration, + Version: roachpb.Version{Major: 21, Minor: 2, Internal: 52}, + }, + { + Key: PostAddRaftAppliedIndexTermMigration, + Version: roachpb.Version{Major: 21, Minor: 2, Internal: 54}, + }, // ************************************************* // Step (2): Add new versions here. diff --git a/pkg/kv/kvserver/batcheval/cmd_end_transaction.go b/pkg/kv/kvserver/batcheval/cmd_end_transaction.go index 83c8698f0f60..f287f81d5867 100644 --- a/pkg/kv/kvserver/batcheval/cmd_end_transaction.go +++ b/pkg/kv/kvserver/batcheval/cmd_end_transaction.go @@ -1011,9 +1011,23 @@ func splitTriggerHelper( if err != nil { return enginepb.MVCCStats{}, result.Result{}, errors.Wrap(err, "unable to load replica version") } + // The RHS should populate RaftAppliedIndexTerm if the LHS is doing so. + // Alternatively, we could be more aggressive and also look at the cluster + // version, but this is simpler -- if the keyspace occupied by the + // original unsplit range has not been migrated yet, by an ongoing + // migration, both LHS and RHS will be migrated later. + rangeAppliedState, err := sl.LoadRangeAppliedState(ctx, batch) + if err != nil { + return enginepb.MVCCStats{}, result.Result{}, + errors.Wrap(err, "unable to load range applied state") + } + writeRaftAppliedIndexTerm := false + if rangeAppliedState.RaftAppliedIndexTerm > 0 { + writeRaftAppliedIndexTerm = true + } *h.AbsPostSplitRight(), err = stateloader.WriteInitialReplicaState( ctx, batch, *h.AbsPostSplitRight(), split.RightDesc, rightLease, - *gcThreshold, replicaVersion, + *gcThreshold, replicaVersion, writeRaftAppliedIndexTerm, ) if err != nil { return enginepb.MVCCStats{}, result.Result{}, errors.Wrap(err, "unable to write initial Replica state") diff --git a/pkg/kv/kvserver/batcheval/cmd_migrate.go b/pkg/kv/kvserver/batcheval/cmd_migrate.go index 12de72f787f5..8bf16b05b1e4 100644 --- a/pkg/kv/kvserver/batcheval/cmd_migrate.go +++ b/pkg/kv/kvserver/batcheval/cmd_migrate.go @@ -18,6 +18,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/errors" @@ -49,6 +50,8 @@ type migration func(context.Context, storage.ReadWriter, CommandArgs) (result.Re func init() { _ = registerMigration // prevent unused warning. 
+ registerMigration( + clusterversion.AddRaftAppliedIndexTermMigration, addRaftAppliedIndexTermMigration) } func registerMigration(key clusterversion.Key, migration migration) { @@ -89,6 +92,25 @@ func Migrate( return pd, nil } +// addRaftAppliedIndexTermMigration migrates the system to start populating +// the RangeAppliedState.RaftAppliedIndexTerm field. +func addRaftAppliedIndexTermMigration( + ctx context.Context, readWriter storage.ReadWriter, cArgs CommandArgs, +) (result.Result, error) { + return result.Result{ + Replicated: kvserverpb.ReplicatedEvalResult{ + State: &kvserverpb.ReplicaState{ + // Signal the migration by sending a term on the new field that we + // want to migrate into. This term is chosen as one that would never + // be used in practice (since raftInitialLogTerm is 5), so we can + // special-case it below raft and start writing the (real) term to the + // AppliedState. + RaftAppliedIndexTerm: stateloader.RaftLogTermSignalForAddRaftAppliedIndexTermMigration, + }, + }, + }, nil +} + // TestingRegisterMigrationInterceptor is used in tests to register an // interceptor for a below-raft migration. // diff --git a/pkg/kv/kvserver/batcheval/result/result.go b/pkg/kv/kvserver/batcheval/result/result.go index 23a632f996cc..c82e31ca7d9c 100644 --- a/pkg/kv/kvserver/batcheval/result/result.go +++ b/pkg/kv/kvserver/batcheval/result/result.go @@ -193,10 +193,13 @@ func coalesceBool(lhs *bool, rhs *bool) { func (p *Result) MergeAndDestroy(q Result) error { if q.Replicated.State != nil { if q.Replicated.State.RaftAppliedIndex != 0 { - return errors.AssertionFailedf("must not specify RaftApplyIndex") + return errors.AssertionFailedf("must not specify RaftAppliedIndex") + } + if q.Replicated.State.RaftAppliedIndexTerm != 0 { + return errors.AssertionFailedf("must not specify RaftAppliedIndexTerm") } if q.Replicated.State.LeaseAppliedIndex != 0 { - return errors.AssertionFailedf("must not specify RaftApplyIndex") + return errors.AssertionFailedf("must not specify LeaseAppliedIndex") } if p.Replicated.State == nil { p.Replicated.State = &kvserverpb.ReplicaState{} diff --git a/pkg/kv/kvserver/below_raft_protos_test.go b/pkg/kv/kvserver/below_raft_protos_test.go index ab2f072d9c26..5be8e1f95b0b 100644 --- a/pkg/kv/kvserver/below_raft_protos_test.go +++ b/pkg/kv/kvserver/below_raft_protos_test.go @@ -78,7 +78,7 @@ var belowRaftGoldenProtos = map[reflect.Type]fixture{ return enginepb.NewPopulatedRangeAppliedState(r, false) }, emptySum: 615555020845646359, - populatedSum: 17354515720541950025, + populatedSum: 12125419916111069931, }, reflect.TypeOf(&raftpb.HardState{}): { populatedConstructor: func(r *rand.Rand) protoutil.Message { diff --git a/pkg/kv/kvserver/kvserverpb/state.proto b/pkg/kv/kvserver/kvserverpb/state.proto index 08034e04dfb3..ac6cdb577f70 100644 --- a/pkg/kv/kvserver/kvserverpb/state.proto +++ b/pkg/kv/kvserver/kvserverpb/state.proto @@ -134,6 +134,10 @@ message ReplicaState { // "follower reads" at or below this timestamp. util.hlc.Timestamp raft_closed_timestamp = 13 [(gogoproto.nullable) = false]; + // The term corresponding to RaftAppliedIndex. This is derived from + // RangeAppliedState.RaftAppliedIndexTerm.
+ uint64 raft_applied_index_term = 14; + reserved 8, 9, 10; } diff --git a/pkg/kv/kvserver/replica_application_state_machine.go b/pkg/kv/kvserver/replica_application_state_machine.go index bc3b77e19f6c..16e6e8a104a2 100644 --- a/pkg/kv/kvserver/replica_application_state_machine.go +++ b/pkg/kv/kvserver/replica_application_state_machine.go @@ -19,6 +19,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/ctpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" @@ -849,9 +850,23 @@ func (b *replicaAppBatch) runPreApplyTriggersAfterStagingWriteBatch( func (b *replicaAppBatch) stageTrivialReplicatedEvalResult( ctx context.Context, cmd *replicatedCmd, ) { - if raftAppliedIndex := cmd.ent.Index; raftAppliedIndex != 0 { - b.state.RaftAppliedIndex = raftAppliedIndex + raftAppliedIndex := cmd.ent.Index + if raftAppliedIndex == 0 { + log.Fatalf(ctx, "raft entry with index 0") + } + b.state.RaftAppliedIndex = raftAppliedIndex + rs := cmd.decodedRaftEntry.replicatedResult().State + // We are post migration or this replicatedCmd is doing the migration. + if b.state.RaftAppliedIndexTerm > 0 || (rs != nil && + rs.RaftAppliedIndexTerm == stateloader.RaftLogTermSignalForAddRaftAppliedIndexTermMigration) { + // Once we populate b.state.RaftAppliedIndexTerm it will flow into the + // persisted RangeAppliedState and into the in-memory representation in + // Replica.mu.state. The latter is used to initialize b.state, so future + // calls to this method will see that the migration has already happened + // and will continue to populate the term. + b.state.RaftAppliedIndexTerm = cmd.ent.Term } + if leaseAppliedIndex := cmd.leaseIndex; leaseAppliedIndex != 0 { b.state.LeaseAppliedIndex = leaseAppliedIndex } @@ -915,6 +930,9 @@ func (b *replicaAppBatch) ApplyToStateMachine(ctx context.Context) error { // Update the replica's applied indexes, mvcc stats and closed timestamp. r.mu.Lock() r.mu.state.RaftAppliedIndex = b.state.RaftAppliedIndex + // RaftAppliedIndexTerm will be non-zero only when the + // AddRaftAppliedIndexTermMigration has happened. + r.mu.state.RaftAppliedIndexTerm = b.state.RaftAppliedIndexTerm r.mu.state.LeaseAppliedIndex = b.state.LeaseAppliedIndex // Sanity check that the RaftClosedTimestamp doesn't go backwards. @@ -978,7 +996,7 @@ func (b *replicaAppBatch) addAppliedStateKeyToBatch(ctx context.Context) error { // lease index along with the mvcc stats, all in one key. loader := &b.r.raftMu.stateLoader return loader.SetRangeAppliedState( - ctx, b.batch, b.state.RaftAppliedIndex, b.state.LeaseAppliedIndex, + ctx, b.batch, b.state.RaftAppliedIndex, b.state.LeaseAppliedIndex, b.state.RaftAppliedIndexTerm, b.state.Stats, &b.state.RaftClosedTimestamp, ) } diff --git a/pkg/kv/kvserver/replica_raftstorage.go b/pkg/kv/kvserver/replica_raftstorage.go index ea5711c7bd14..9f8ced7318a4 100644 --- a/pkg/kv/kvserver/replica_raftstorage.go +++ b/pkg/kv/kvserver/replica_raftstorage.go @@ -386,6 +386,15 @@ func (r *Replica) GetLeaseAppliedIndex() uint64 { // r.mu is held. Note that the returned snapshot is a placeholder and // does not contain any of the replica data. The snapshot is actually generated // (and sent) by the Raft snapshot queue. 
+// +// More specifically, this method is called by etcd/raft in +// (*raftLog).snapshot. Raft expects that it generates the snapshot (by +// calling Snapshot) and that "sending" the result actually sends the +// snapshot. In CockroachDB, that message is intercepted (at the sender) and +// instead we add the replica (the raft leader) to the raft snapshot queue, +// and when its turn comes we look at the raft state for followers that want a +// snapshot, and then send one. That actual sending path does not call this +// Snapshot method. func (r *replicaRaftStorage) Snapshot() (raftpb.Snapshot, error) { r.mu.AssertHeld() appliedIndex := r.mu.state.RaftAppliedIndex @@ -418,11 +427,27 @@ func (r *Replica) GetSnapshot( // the corresponding Raft command not applied yet). r.raftMu.Lock() snap := r.store.engine.NewSnapshot() - r.mu.Lock() - appliedIndex := r.mu.state.RaftAppliedIndex - // Cleared when OutgoingSnapshot closes. - r.addSnapshotLogTruncationConstraintLocked(ctx, snapUUID, appliedIndex, recipientStore) - r.mu.Unlock() + { + r.mu.Lock() + // We will fetch the applied index again later, from snap. The + // appliedIndex fetched here is narrowly used for adding a log truncation + // constraint to prevent log entries > appliedIndex from being removed. + // Note that the appliedIndex maintained in Replica actually lags the one + // in the engine, since replicaAppBatch.ApplyToStateMachine commits the + // engine batch and then acquires Replica.mu to update + // Replica.mu.state.RaftAppliedIndex. The use of a possibly stale value + // here is harmless since using a lower index in this constraint than the + // actual snapshot index preserves more from a log truncation + // perspective. + // + // TODO(sumeer): despite the above justification, this is unnecessarily + // complicated. Consider loading the RaftAppliedIndex from the snap for + // this use case. + appliedIndex := r.mu.state.RaftAppliedIndex + // Cleared when OutgoingSnapshot closes. + r.addSnapshotLogTruncationConstraintLocked(ctx, snapUUID, appliedIndex, recipientStore) + r.mu.Unlock() + } r.raftMu.Unlock() release := func() { @@ -568,6 +593,12 @@ func snapshot( } term, err := term(ctx, rsl, snap, rangeID, eCache, state.RaftAppliedIndex) if err != nil { return OutgoingSnapshot{}, errors.Wrapf(err, "failed to fetch term of %d", state.RaftAppliedIndex) } + // If we've migrated to populating RaftAppliedIndexTerm, check that the terms + // from the two sources are equal. + if state.RaftAppliedIndexTerm != 0 && term != state.RaftAppliedIndexTerm { + return OutgoingSnapshot{}, + errors.AssertionFailedf("unequal terms %d != %d", term, state.RaftAppliedIndexTerm) + } @@ -920,6 +951,12 @@ func (r *Replica) applySnapshot( log.Fatalf(ctx, "snapshot RaftAppliedIndex %d doesn't match its metadata index %d", state.RaftAppliedIndex, nonemptySnap.Metadata.Index) } + // If we've migrated to populating RaftAppliedIndexTerm, check that the terms + // from the two sources are equal. + if state.RaftAppliedIndexTerm != 0 && state.RaftAppliedIndexTerm != nonemptySnap.Metadata.Term { + log.Fatalf(ctx, "snapshot RaftAppliedIndexTerm %d doesn't match its metadata term %d", + state.RaftAppliedIndexTerm, nonemptySnap.Metadata.Term) + } // The on-disk state is now committed, but the corresponding in-memory state // has not yet been updated. Any errors past this point must therefore be @@ -975,6 +1012,11 @@ func (r *Replica) applySnapshot( // feelings about this ever change, we can add a LastIndex field to // raftpb.SnapshotMetadata.
r.mu.lastIndex = state.RaftAppliedIndex + + // TODO(sumeer): We should be able to set this to + // nonemptySnap.Metadata.Term. See + // https://reviewable.io/reviews/cockroachdb/cockroach/75675#-MukTjIhAlmWs8rSi-kY:-MukTjIhAlmWs8rSi-kZ:bf8moge + // for a discussion regarding this. r.mu.lastTerm = invalidLastTerm r.mu.raftLogSize = 0 // Update the store stats for the data in the snapshot. diff --git a/pkg/kv/kvserver/stateloader/initial.go b/pkg/kv/kvserver/stateloader/initial.go index f1688147a310..9d4b58d74691 100644 --- a/pkg/kv/kvserver/stateloader/initial.go +++ b/pkg/kv/kvserver/stateloader/initial.go @@ -13,6 +13,7 @@ package stateloader import ( "context" + "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage" @@ -29,6 +30,15 @@ import ( const ( raftInitialLogIndex = 10 raftInitialLogTerm = 5 + + // RaftLogTermSignalForAddRaftAppliedIndexTermMigration is never persisted + // in the state machine or in HardState. It is only used in + // AddRaftAppliedIndexTermMigration to signal to the below raft code that + // the migration should happen when applying the raft log entry that + // contains ReplicatedEvalResult.State.RaftAppliedIndexTerm equal to this + // value. It is less than raftInitialLogTerm since that ensures it will + // never be used under normal operation. + RaftLogTermSignalForAddRaftAppliedIndexTermMigration = 3 ) // WriteInitialReplicaState sets up a new Range, but without writing an @@ -46,6 +56,7 @@ func WriteInitialReplicaState( lease roachpb.Lease, gcThreshold hlc.Timestamp, replicaVersion roachpb.Version, + writeRaftAppliedIndexTerm bool, ) (enginepb.MVCCStats, error) { rsl := Make(desc.RangeID) var s kvserverpb.ReplicaState @@ -54,6 +65,9 @@ func WriteInitialReplicaState( Index: raftInitialLogIndex, } s.RaftAppliedIndex = s.TruncatedState.Index + if writeRaftAppliedIndexTerm { + s.RaftAppliedIndexTerm = s.TruncatedState.Term + } s.Desc = &roachpb.RangeDescriptor{ RangeID: desc.RangeID, } @@ -102,9 +116,12 @@ func WriteInitialRangeState( initialGCThreshold := hlc.Timestamp{} initialMS := enginepb.MVCCStats{} + writeRaftAppliedIndexTerm := + clusterversion.ClusterVersion{Version: replicaVersion}.IsActiveVersion( + clusterversion.ByKey(clusterversion.AddRaftAppliedIndexTermMigration)) if _, err := WriteInitialReplicaState( ctx, readWriter, initialMS, desc, initialLease, initialGCThreshold, - replicaVersion, + replicaVersion, writeRaftAppliedIndexTerm, ); err != nil { return err } diff --git a/pkg/kv/kvserver/stateloader/stateloader.go b/pkg/kv/kvserver/stateloader/stateloader.go index 2ab3273f5320..088eab7c364b 100644 --- a/pkg/kv/kvserver/stateloader/stateloader.go +++ b/pkg/kv/kvserver/stateloader/stateloader.go @@ -78,6 +78,7 @@ func (rsl StateLoader) Load( return kvserverpb.ReplicaState{}, err } s.RaftAppliedIndex = as.RaftAppliedIndex + s.RaftAppliedIndexTerm = as.RaftAppliedIndexTerm s.LeaseAppliedIndex = as.LeaseAppliedIndex ms := as.RangeStats.ToStats() s.Stats = &ms @@ -133,8 +134,9 @@ func (rsl StateLoader) Save( return enginepb.MVCCStats{}, err } } - rai, lai, ct := state.RaftAppliedIndex, state.LeaseAppliedIndex, &state.RaftClosedTimestamp - if err := rsl.SetRangeAppliedState(ctx, readWriter, rai, lai, ms, ct); err != nil { + rai, lai, rait, ct := state.RaftAppliedIndex, state.LeaseAppliedIndex, state.RaftAppliedIndexTerm, + &state.RaftClosedTimestamp + if err := rsl.SetRangeAppliedState(ctx, readWriter, 
rai, lai, rait, ms, ct); err != nil { return enginepb.MVCCStats{}, err } return *ms, nil @@ -194,14 +196,15 @@ func (rsl StateLoader) LoadMVCCStats( func (rsl StateLoader) SetRangeAppliedState( ctx context.Context, readWriter storage.ReadWriter, - appliedIndex, leaseAppliedIndex uint64, + appliedIndex, leaseAppliedIndex, appliedIndexTerm uint64, newMS *enginepb.MVCCStats, raftClosedTimestamp *hlc.Timestamp, ) error { as := enginepb.RangeAppliedState{ - RaftAppliedIndex: appliedIndex, - LeaseAppliedIndex: leaseAppliedIndex, - RangeStats: newMS.ToPersistentStats(), + RaftAppliedIndex: appliedIndex, + LeaseAppliedIndex: leaseAppliedIndex, + RangeStats: newMS.ToPersistentStats(), + RaftAppliedIndexTerm: appliedIndexTerm, } if raftClosedTimestamp != nil && !raftClosedTimestamp.IsEmpty() { as.RaftClosedTimestamp = raftClosedTimestamp @@ -223,7 +226,8 @@ func (rsl StateLoader) SetMVCCStats( return err } return rsl.SetRangeAppliedState( - ctx, readWriter, as.RaftAppliedIndex, as.LeaseAppliedIndex, newMS, as.RaftClosedTimestamp) + ctx, readWriter, as.RaftAppliedIndex, as.LeaseAppliedIndex, as.RaftAppliedIndexTerm, newMS, + as.RaftClosedTimestamp) } // SetClosedTimestamp overwrites the closed timestamp. @@ -235,7 +239,7 @@ func (rsl StateLoader) SetClosedTimestamp( return err } return rsl.SetRangeAppliedState( - ctx, readWriter, as.RaftAppliedIndex, as.LeaseAppliedIndex, + ctx, readWriter, as.RaftAppliedIndex, as.LeaseAppliedIndex, as.RaftAppliedIndexTerm, as.RangeStats.ToStatsPtr(), closedTS) } diff --git a/pkg/kv/kvserver/store_snapshot.go b/pkg/kv/kvserver/store_snapshot.go index e5ed6dd91677..2aa67996bb29 100644 --- a/pkg/kv/kvserver/store_snapshot.go +++ b/pkg/kv/kvserver/store_snapshot.go @@ -903,6 +903,12 @@ func SendEmptySnapshot( return err } + // Since SendEmptySnapshot is only used by the new cockroach debug + // reset-quorum tool, it will be used only in new clusters, so we set + // writeAppliedIndexTerm to true. We could have also compared against the + // cluster version, but there is a concern that the cluster version + // information could be stale. 
+ writeAppliedIndexTerm := true ms, err = stateloader.WriteInitialReplicaState( ctx, eng, @@ -911,6 +917,7 @@ func SendEmptySnapshot( roachpb.Lease{}, hlc.Timestamp{}, // gcThreshold st.Version.ActiveVersionOrEmpty(ctx).Version, + writeAppliedIndexTerm, ) if err != nil { return err diff --git a/pkg/migration/migrations/BUILD.bazel b/pkg/migration/migrations/BUILD.bazel index 1ef5cc162434..47a297c74843 100644 --- a/pkg/migration/migrations/BUILD.bazel +++ b/pkg/migration/migrations/BUILD.bazel @@ -12,6 +12,7 @@ go_library( "migrate_span_configs.go", "migrations.go", "public_schema_migration.go", + "raft_applied_index_term.go", "schema_changes.go", "seed_tenant_span_configs.go", ], diff --git a/pkg/migration/migrations/migrations.go b/pkg/migration/migrations/migrations.go index 832180f7dd9e..7459fb420325 100644 --- a/pkg/migration/migrations/migrations.go +++ b/pkg/migration/migrations/migrations.go @@ -97,6 +97,16 @@ var migrations = []migration.Migration{ NoPrecondition, grantOptionMigration, ), + migration.NewSystemMigration( + "populate RangeAppliedState.RaftAppliedIndexTerm for all ranges", + toCV(clusterversion.AddRaftAppliedIndexTermMigration), + raftAppliedIndexTermMigration, + ), + migration.NewSystemMigration( + "purge all replicas not populating RangeAppliedState.RaftAppliedIndexTerm", + toCV(clusterversion.PostAddRaftAppliedIndexTermMigration), + postRaftAppliedIndexTermMigration, + ), } func init() { diff --git a/pkg/migration/migrations/raft_applied_index_term.go b/pkg/migration/migrations/raft_applied_index_term.go new file mode 100644 index 000000000000..7b6ea09134b1 --- /dev/null +++ b/pkg/migration/migrations/raft_applied_index_term.go @@ -0,0 +1,100 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package migrations + +import ( + "bytes" + "context" + "fmt" + + "github.com/cockroachdb/cockroach/pkg/clusterversion" + "github.com/cockroachdb/cockroach/pkg/jobs" + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/migration" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/server/serverpb" + "github.com/cockroachdb/cockroach/pkg/util/log" +) + +// defaultPageSize controls how many ranges are paged in by default when +// iterating through all ranges in a cluster during any given migration. We +// pulled this number out of thin air(-ish). Let's consider a cluster with 50k +// ranges, with each range taking ~200ms. We're being somewhat conservative with +// the duration, but in a wide-area cluster with large hops between the manager +// and the replicas, it could be true. 
Here's how long it'll take for various // page sizes: // // page size of 1 ~ 2h 46m // page size of 50 ~ 3m 20s // page size of 200 ~ 50s const defaultPageSize = 200 + +func raftAppliedIndexTermMigration( + ctx context.Context, cv clusterversion.ClusterVersion, deps migration.SystemDeps, _ *jobs.Job, +) error { + var batchIdx, numMigratedRanges int + init := func() { batchIdx, numMigratedRanges = 1, 0 } + if err := deps.Cluster.IterateRangeDescriptors(ctx, defaultPageSize, init, func(descriptors ...roachpb.RangeDescriptor) error { + for _, desc := range descriptors { + // NB: This is a bit of a wart. We want to reach the first range, + // but we can't address the (local) StartKey. However, keys.LocalMax + // is on r1, so we'll just use that instead to target r1. + start, end := desc.StartKey, desc.EndKey + if bytes.Compare(desc.StartKey, keys.LocalMax) < 0 { + start, _ = keys.Addr(keys.LocalMax) + } + if err := deps.DB.Migrate(ctx, start, end, cv.Version); err != nil { + return err + } + } + + // TODO(irfansharif): Instead of logging this to the debug log, we + // should insert these into a `system.migrations` table for external + // observability. + numMigratedRanges += len(descriptors) + log.Infof(ctx, "[batch %d/??] migrated %d ranges", batchIdx, numMigratedRanges) + batchIdx++ + + return nil + }); err != nil { + return err + } + + log.Infof(ctx, "[batch %d/%d] migrated %d ranges", batchIdx, batchIdx, numMigratedRanges) + + // Make sure that all stores have synced. Given we're a below-raft + // migration, this ensures that the applied state is flushed to disk. + req := &serverpb.SyncAllEnginesRequest{} + op := "flush-stores" + return deps.Cluster.ForEveryNode(ctx, op, func(ctx context.Context, client serverpb.MigrationClient) error { + _, err := client.SyncAllEngines(ctx, req) + return err + }) +} + +func postRaftAppliedIndexTermMigration( + ctx context.Context, cv clusterversion.ClusterVersion, deps migration.SystemDeps, _ *jobs.Job, +) error { + // TODO(sumeer): this is copied from postTruncatedStateMigration. In + // comparison, postSeparatedIntentsMigration iterates over ranges and issues + // a noop below-raft migration. I am not clear on why there is a difference. + // Get this clarified. + + // Purge all replicas that haven't been migrated to use the unreplicated + // truncated state and the range applied state. + truncStateVersion := clusterversion.ByKey(clusterversion.AddRaftAppliedIndexTermMigration) + req := &serverpb.PurgeOutdatedReplicasRequest{Version: &truncStateVersion} + op := fmt.Sprintf("purge-outdated-replicas=%s", req.Version) + return deps.Cluster.ForEveryNode(ctx, op, func(ctx context.Context, client serverpb.MigrationClient) error { + _, err := client.PurgeOutdatedReplicas(ctx, req) + return err + }) +} diff --git a/pkg/storage/enginepb/mvcc3.proto b/pkg/storage/enginepb/mvcc3.proto index 8cddd3dff4a6..a528febf764f 100644 --- a/pkg/storage/enginepb/mvcc3.proto +++ b/pkg/storage/enginepb/mvcc3.proto @@ -224,6 +224,15 @@ message RangeAppliedState { // migration). In 21.1 we cannot write empty timestamp to disk because that // looks like an inconsistency to the consistency-checker. util.hlc.Timestamp raft_closed_timestamp = 4; + + // raft_applied_index_term is the term corresponding to raft_applied_index. + // The serialized proto will not contain this field until code starts + // setting it to a value > 0. This is desirable since we don't want a mixed + // version cluster to have divergent replica state simply because we have + // introduced this field.
An explicit migration, + // AddRaftAppliedIndexTermMigration, will cause this field to start being + // populated. + uint64 raft_applied_index_term = 5; } // MVCCWriteValueOp corresponds to a value being written outside of a
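A footnote on the defaultPageSize comment in raft_applied_index_term.go: the quoted durations line up if the ~200ms figure is read as the cost of one page of Migrate requests against a cluster of roughly 50,000 ranges. That per-page reading is an assumption, not something the comment states explicitly; under it, a quick back-of-the-envelope check in Go reproduces the three quoted figures.

package main

import (
	"fmt"
	"time"
)

func main() {
	const numRanges = 50000                // assumed cluster size from the comment
	const perPage = 200 * time.Millisecond // assumed latency per page of Migrate calls
	for _, pageSize := range []int{1, 50, 200} {
		pages := (numRanges + pageSize - 1) / pageSize // ceiling division
		fmt.Printf("page size %3d -> %v\n", pageSize, time.Duration(pages)*perPage)
	}
	// Output:
	// page size   1 -> 2h46m40s
	// page size  50 -> 3m20s
	// page size 200 -> 50s
}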