Skip to content

Commit

Permalink
migration: introduce primitives for below-raft migrations...
Browse files Browse the repository at this point in the history
and onboard the truncated state migration. These include:

- The KV ranged `Migrate` command. This command forces all ranges
  overlapping with the request spans to execute the (below-raft)
  migrations corresponding to the specific, stated version. This has the
  effect of moving the range out of any legacy modes operation they may
  currently be in. KV waits for this command to durably apply on all the
  replicas before returning, guaranteeing to the caller that all
  pre-migration state has been completely purged from the system.

- The `SyncAllEngines` RPC. This is be used to instruct the target node
  to persist releveant in-memory state to disk. Like we mentioned above,
  KV currently waits for the `Migrate` command to have applied on all
  replicas before returning. With the applied state, there's no
  necessity to durably persist it (the representative version is already
  stored in the raft log). Out of an abundance of caution, and to really
  really ensure that no pre-migrated state is ever seen in the system,
  we provide the migration manager a mechanism to flush out all
  in-memory state to disk. This will let us guarantee that by the time a
  specific cluster version is bumped, all pre-migrated state from prior
  to a certain version will have been fully purged from the system.
  We'll also use it in conjunction with PurgeOutdatedReplicas below.

- The `PurgeOutdatedReplicas` RPC. This too comes up in the context of
  wanting the ensure that ranges where we've executed a ranged `Migrate`
  command over have no way of ever surfacing pre-migrated state. This can
  happen with older replicas in the replica GC queue and with applied
  state that is not yet persisted. Currently we wait for the `Migrate`
  to have applied on all replicas of a range before returning to the
  caller. This does not include earlier incarnations of the range,
  possibly sitting idle in the replica GC queue. These replicas can
  still request leases, and go through the request evaluation paths,
  possibly tripping up assertions that check to see no pre-migrated
  state is found. The `PurgeOutdatedReplicas` lets the migration manager
  do exactly as the name suggests, ensuring all "outdated" replicas are
  processed before declaring the specific cluster version bump complete.

- The concept of a "replica state version". This is what's used to
  construct the migration manager's view of what's "outdated", telling
  us which migrations can be assumed to have run against a particular
  replica.  When we introduce backwards incompatible changes to the
  replica state (for example using the unreplicated truncated state
  instead of the replicated variant), the version would inform us if,
  for a given replica, we should expect a state representation prior to,
  or after the migration (in our example this corresponds to whether or
  not we can assume an unreplicated truncated state).

As part of this commit, we also re-order the steps taken by the
migration manager so that it executes a given migration first before
bumping version gates cluster wide. This is because we want authors of
migrations to ascertain that their own migrations have run to
completion, instead of attaching that check to the next version.

---

This PR motivates all of the above by also onboarding the
TruncatedAndRangeAppliedState migration, lets us do the following:

  i. Use the RangeAppliedState on all ranges
  ii. Use the unreplicated TruncatedState on all ranges

In 21.2 we'll finally be able to delete holdover code that knows how to
handle the legacy replicated truncated state.

Release note (general change): Cluster version upgrades, as initiated by
SET CLUSTER SETTING version = <major>-<minor>, now perform internal
maintenance duties that will delay how long it takes for the command to
complete. The delay is proportional to the amount of data currently
stored in the cluster. The cluster will also experience a small amount
of additional load during this period while the upgrade is being
finalized.

---

The ideas here follow from our original prototype in cockroachdb#57445.
  • Loading branch information
irfansharif committed Dec 23, 2020
1 parent 1697431 commit 1247fed
Show file tree
Hide file tree
Showing 57 changed files with 3,781 additions and 1,063 deletions.
2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,6 @@
<tr><td><code>trace.debug.enable</code></td><td>boolean</td><td><code>false</code></td><td>if set, traces for recent requests can be seen at https://<ui>/debug/requests</td></tr>
<tr><td><code>trace.lightstep.token</code></td><td>string</td><td><code></code></td><td>if set, traces go to Lightstep using this token</td></tr>
<tr><td><code>trace.zipkin.collector</code></td><td>string</td><td><code></code></td><td>if set, traces go to the given Zipkin instance (example: '127.0.0.1:9411'); ignored if trace.lightstep.token is set</td></tr>
<tr><td><code>version</code></td><td>version</td><td><code>20.2-10</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
<tr><td><code>version</code></td><td>version</td><td><code>20.2-16</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
</tbody>
</table>
28 changes: 28 additions & 0 deletions pkg/clusterversion/cockroach_versions.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,22 @@ const (
VirtualComputedColumns
// CPutInline is conditional put support for inline values.
CPutInline
// ReplicaVersions enables the versioning of Replica state.
ReplicaVersions
// TruncatedAndRangeAppliedStateMigration is part of the migration to stop
// using the legacy truncated state within KV. After the migration, we'll be
// using the unreplicated truncated state and the RangeAppliedState on all
// ranges. Callers that wish to assert on there no longer being any legacy
// will be able to do so after PostTruncatedAndRangeAppliedStateMigration is
// active. This lets remove any holdover code handling the possibility of
// replicated truncated state in 21.2.
//
// TODO(irfansharif): Do the above in 21.2.
TruncatedAndRangeAppliedStateMigration
// PostTruncatedAndRangeAppliedStateMigration is used to purge all replicas
// using the replicated legacy TruncatedState. It's also used in asserting
// that no replicated truncated state representation is found.
PostTruncatedAndRangeAppliedStateMigration

// Step (1): Add new versions here.
)
Expand Down Expand Up @@ -340,6 +356,18 @@ var versionsSingleton = keyedVersions([]keyedVersion{
Key: CPutInline,
Version: roachpb.Version{Major: 20, Minor: 2, Internal: 10},
},
{
Key: ReplicaVersions,
Version: roachpb.Version{Major: 20, Minor: 2, Internal: 12},
},
{
Key: TruncatedAndRangeAppliedStateMigration,
Version: roachpb.Version{Major: 20, Minor: 2, Internal: 14},
},
{
Key: PostTruncatedAndRangeAppliedStateMigration,
Version: roachpb.Version{Major: 20, Minor: 2, Internal: 16},
},

// Step (2): Add new versions here.
})
Expand Down
7 changes: 5 additions & 2 deletions pkg/clusterversion/key_string.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions pkg/keys/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ var (
// LocalLeaseAppliedIndexLegacySuffix is the suffix for the applied lease
// index.
LocalLeaseAppliedIndexLegacySuffix = []byte("rlla")
// LocalRangeVersionSuffix is the suffix for the range version.
LocalRangeVersionSuffix = []byte("rver")
// LocalRangeStatsLegacySuffix is the suffix for range statistics.
LocalRangeStatsLegacySuffix = []byte("stat")
// localTxnSpanGCThresholdSuffix is DEPRECATED and remains to prevent reuse.
Expand Down
1 change: 1 addition & 0 deletions pkg/keys/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ var _ = [...]interface{}{
RaftTruncatedStateLegacyKey, // "rftt"
RangeLeaseKey, // "rll-"
LeaseAppliedIndexLegacyKey, // "rlla"
RangeVersionKey, // "rver"
RangeStatsLegacyKey, // "stat"

// 2. Unreplicated range-ID local keys: These contain metadata that
Expand Down
10 changes: 10 additions & 0 deletions pkg/keys/keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,11 @@ func RangeLastGCKey(rangeID roachpb.RangeID) roachpb.Key {
return MakeRangeIDPrefixBuf(rangeID).RangeLastGCKey()
}

// RangeVersionKey returns a system-local for the range version.
func RangeVersionKey(rangeID roachpb.RangeID) roachpb.Key {
return MakeRangeIDPrefixBuf(rangeID).RangeVersionKey()
}

// MakeRangeIDUnreplicatedPrefix creates a range-local key prefix from
// rangeID for all unreplicated data.
func MakeRangeIDUnreplicatedPrefix(rangeID roachpb.RangeID) roachpb.Key {
Expand Down Expand Up @@ -964,6 +969,11 @@ func (b RangeIDPrefixBuf) RangeLastGCKey() roachpb.Key {
return append(b.replicatedPrefix(), LocalRangeLastGCSuffix...)
}

// RangeVersionKey returns a system-local key for the range version.
func (b RangeIDPrefixBuf) RangeVersionKey() roachpb.Key {
return append(b.replicatedPrefix(), LocalRangeVersionSuffix...)
}

// RangeTombstoneKey returns a system-local key for a range tombstone.
func (b RangeIDPrefixBuf) RangeTombstoneKey() roachpb.Key {
return append(b.unreplicatedPrefix(), LocalRangeTombstoneSuffix...)
Expand Down
1 change: 1 addition & 0 deletions pkg/keys/printer.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ var (
{name: "RangeLease", suffix: LocalRangeLeaseSuffix},
{name: "RangeStats", suffix: LocalRangeStatsLegacySuffix},
{name: "RangeLastGC", suffix: LocalRangeLastGCSuffix},
{name: "RangeVersion", suffix: LocalRangeVersionSuffix},
}

rangeSuffixDict = []struct {
Expand Down
1 change: 1 addition & 0 deletions pkg/keys/printer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ func TestPrettyPrint(t *testing.T) {
{keys.RangeLeaseKey(roachpb.RangeID(1000001)), "/Local/RangeID/1000001/r/RangeLease", revertSupportUnknown},
{keys.RangeStatsLegacyKey(roachpb.RangeID(1000001)), "/Local/RangeID/1000001/r/RangeStats", revertSupportUnknown},
{keys.RangeLastGCKey(roachpb.RangeID(1000001)), "/Local/RangeID/1000001/r/RangeLastGC", revertSupportUnknown},
{keys.RangeLastGCKey(roachpb.RangeID(1000001)), "/Local/RangeID/1000001/r/RangeVersion", revertSupportUnknown},

{keys.RaftHardStateKey(roachpb.RangeID(1000001)), "/Local/RangeID/1000001/u/RaftHardState", revertSupportUnknown},
{keys.RangeTombstoneKey(roachpb.RangeID(1000001)), "/Local/RangeID/1000001/u/RangeTombstone", revertSupportUnknown},
Expand Down
38 changes: 30 additions & 8 deletions pkg/kv/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,18 +228,10 @@ func (b *Batch) fillResults(ctx context.Context) {
case *roachpb.DeleteRequest:
row := &result.Rows[k]
row.Key = []byte(args.(*roachpb.DeleteRequest).Key)

case *roachpb.DeleteRangeRequest:
if result.Err == nil {
result.Keys = reply.(*roachpb.DeleteRangeResponse).Keys
}

default:
if result.Err == nil {
result.Err = errors.Errorf("unsupported reply: %T for %T",
reply, args)
}

// Nothing to do for all methods below as they do not generate
// any rows.
case *roachpb.EndTxnRequest:
Expand All @@ -265,6 +257,12 @@ func (b *Batch) fillResults(ctx context.Context) {
case *roachpb.ImportRequest:
case *roachpb.AdminScatterRequest:
case *roachpb.AddSSTableRequest:
case *roachpb.MigrateRequest:
default:
if result.Err == nil {
result.Err = errors.Errorf("unsupported reply: %T for %T",
reply, args)
}
}
// Fill up the resume span.
if result.Err == nil && reply != nil && reply.Header().ResumeSpan != nil {
Expand Down Expand Up @@ -786,3 +784,27 @@ func (b *Batch) addSSTable(
b.appendReqs(req)
b.initResult(1, 0, notRaw, nil)
}

// migrate is only exported on DB.
//lint:ignore U1000 unused
func (b *Batch) migrate(s, e interface{}, version roachpb.Version) {
begin, err := marshalKey(s)
if err != nil {
b.initResult(0, 0, notRaw, err)
return
}
end, err := marshalKey(e)
if err != nil {
b.initResult(0, 0, notRaw, err)
return
}
req := &roachpb.MigrateRequest{
RequestHeader: roachpb.RequestHeader{
Key: begin,
EndKey: end,
},
Version: version,
}
b.appendReqs(req)
b.initResult(1, 0, notRaw, nil)
}
9 changes: 9 additions & 0 deletions pkg/kv/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -681,6 +681,15 @@ func (db *DB) AddSSTable(
return getOneErr(db.Run(ctx, b), b)
}

// Migrate proactively forces ranges overlapping with the provided keyspace to
// transition out of any legacy modes of operation (as defined by the target
// version).
func (db *DB) Migrate(ctx context.Context, begin, end interface{}, version roachpb.Version) error {
b := &Batch{}
b.migrate(begin, end, version)
return getOneErr(db.Run(ctx, b), b)
}

// sendAndFill is a helper which sends the given batch and fills its results,
// returning the appropriate error which is either from the first failing call,
// or an "internal" error.
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ go_test(
"client_replica_test.go",
"client_split_test.go",
"client_status_test.go",
"client_store_test.go",
"client_tenant_test.go",
"client_test.go",
"closed_timestamp_test.go",
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/batcheval/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ go_library(
"cmd_lease_request.go",
"cmd_lease_transfer.go",
"cmd_merge.go",
"cmd_migrate.go",
"cmd_push_txn.go",
"cmd_put.go",
"cmd_query_intent.go",
Expand Down
8 changes: 6 additions & 2 deletions pkg/kv/kvserver/batcheval/cmd_end_transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -905,7 +905,7 @@ func splitTriggerHelper(
// initial state. Additionally, since bothDeltaMS is tracking writes to
// both sides, we need to update it as well.
{
// Various pieces of code rely on a replica's lease never being unitialized,
// Various pieces of code rely on a replica's lease never being uninitialized,
// but it's more than that - it ensures that we properly initialize the
// timestamp cache, which is only populated on the lease holder, from that
// of the original Range. We found out about a regression here the hard way
Expand Down Expand Up @@ -1002,9 +1002,13 @@ func splitTriggerHelper(
// writeInitialReplicaState which essentially writes a ReplicaState
// only.

var replicaVersion roachpb.Version
if rec.ClusterSettings().Version.IsActive(ctx, clusterversion.ReplicaVersions) {
replicaVersion = rec.ClusterSettings().Version.ActiveVersion(ctx).Version
}
*h.AbsPostSplitRight(), err = stateloader.WriteInitialReplicaState(
ctx, batch, *h.AbsPostSplitRight(), split.RightDesc, rightLease,
*gcThreshold, truncStateType,
*gcThreshold, truncStateType, replicaVersion,
)
if err != nil {
return enginepb.MVCCStats{}, result.Result{}, errors.Wrap(err, "unable to write initial Replica state")
Expand Down
140 changes: 140 additions & 0 deletions pkg/kv/kvserver/batcheval/cmd_migrate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
// Copyright 2020 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package batcheval

import (
"context"

"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/errors"
)

func init() {
RegisterReadWriteCommand(roachpb.Migrate, declareKeysMigrate, Migrate)
}

func declareKeysMigrate(
_ *roachpb.RangeDescriptor,
header roachpb.Header,
_ roachpb.Request,
latchSpans, lockSpans *spanset.SpanSet,
) {
// TODO(irfansharif): This will eventually grow to capture the super set of
// all keys accessed by all migrations defined here. That could get
// cumbersome. We could spruce up the migration type and allow authors to
// define the allow authors for specific set of keys each migration needs to
// grab latches and locks over.
latchSpans.AddNonMVCC(spanset.SpanReadWrite, roachpb.Span{Key: keys.RaftTruncatedStateLegacyKey(header.RangeID)})
lockSpans.AddNonMVCC(spanset.SpanReadWrite, roachpb.Span{Key: keys.RaftTruncatedStateLegacyKey(header.RangeID)})
}

// migrationRegistry is a global registry of all KV-level migrations. See
// pkg/migration for details around how the migrations defined here are
// wired up.
var migrationRegistry = make(map[roachpb.Version]migration)

type migration func(context.Context, storage.ReadWriter, CommandArgs) (result.Result, error)

func init() {
registerMigration(clusterversion.TruncatedAndRangeAppliedStateMigration, truncatedAndAppliedStateMigration)
}

func registerMigration(key clusterversion.Key, migration migration) {
migrationRegistry[clusterversion.ByKey(key)] = migration
}

// Migrate executes the below-raft migration corresponding to the given version.
func Migrate(
ctx context.Context, readWriter storage.ReadWriter, cArgs CommandArgs, _ roachpb.Response,
) (result.Result, error) {
args := cArgs.Args.(*roachpb.MigrateRequest)
migrationVersion := args.Version

fn, ok := migrationRegistry[migrationVersion]
if !ok {
return result.Result{}, errors.Newf("migration for %s not found", migrationVersion)
}
pd, err := fn(ctx, readWriter, cArgs)
if err != nil {
return result.Result{}, err
}

// Since we're a below raft migration, we'll need update our replica state
// version.
if err := MakeStateLoader(cArgs.EvalCtx).SetVersion(
ctx, readWriter, cArgs.Stats, &migrationVersion,
); err != nil {
return result.Result{}, err
}
if pd.Replicated.State == nil {
pd.Replicated.State = &kvserverpb.ReplicaState{}
}
// NB: We don't check for clusterversion.ReplicaVersions being active here
// as all below-raft migrations (the only users of Migrate) were introduced
// after it.
pd.Replicated.State.Version = &migrationVersion
return pd, nil
}

// truncatedAndRangeAppliedStateMigration lets us stop using the legacy
// replicated truncated state and start using the new RangeAppliedState for this
// specific range.
func truncatedAndAppliedStateMigration(
ctx context.Context, readWriter storage.ReadWriter, cArgs CommandArgs,
) (result.Result, error) {
var legacyTruncatedState roachpb.RaftTruncatedState
legacyKeyFound, err := storage.MVCCGetProto(
ctx, readWriter, keys.RaftTruncatedStateLegacyKey(cArgs.EvalCtx.GetRangeID()),
hlc.Timestamp{}, &legacyTruncatedState, storage.MVCCGetOptions{},
)
if err != nil {
return result.Result{}, err
}

var pd result.Result
if legacyKeyFound {
// Time to migrate by deleting the legacy key. The downstream-of-Raft
// code will atomically rewrite the truncated state (supplied via the
// side effect) into the new unreplicated key.
if err := storage.MVCCDelete(
ctx, readWriter, cArgs.Stats, keys.RaftTruncatedStateLegacyKey(cArgs.EvalCtx.GetRangeID()),
hlc.Timestamp{}, nil, /* txn */
); err != nil {
return result.Result{}, err
}
pd.Replicated.State = &kvserverpb.ReplicaState{
// We need to pass in a truncated state to enable the migration.
// Passing the same one is the easiest thing to do.
TruncatedState: &legacyTruncatedState,
}
}
return pd, nil
}

// TestingRegisterMigrationInterceptor is used in tests to register an
// interceptor for a below-raft migration.
func TestingRegisterMigrationInterceptor(version roachpb.Version, fn func()) (unregister func()) {
if _, ok := migrationRegistry[version]; ok {
panic("doubly registering migration")
}
migrationRegistry[version] = func(context.Context, storage.ReadWriter, CommandArgs) (result.Result, error) {
fn()
return result.Result{}, nil
}
return func() { delete(migrationRegistry, version) }
}
7 changes: 7 additions & 0 deletions pkg/kv/kvserver/batcheval/result/result.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,13 @@ func (p *Result) MergeAndDestroy(q Result) error {
q.Replicated.State.GCThreshold = nil
}

if p.Replicated.State.Version == nil {
p.Replicated.State.Version = q.Replicated.State.Version
} else if q.Replicated.State.Version != nil {
return errors.AssertionFailedf("conflicting Version")
}
q.Replicated.State.Version = nil

if q.Replicated.State.Stats != nil {
return errors.AssertionFailedf("must not specify Stats")
}
Expand Down
Loading

0 comments on commit 1247fed

Please sign in to comment.