Skip to content

Commit

Permalink
migration: introduce primitives for below-raft migrations...
Browse files Browse the repository at this point in the history
and onboard the first long running migration (for the truncated state,
see below). The primitives here include:

- The KV ranged `Migrate` command. This command forces all ranges
  overlapping with the request spans to execute the (below-raft)
  migrations corresponding to the specific, stated version. This has the
  effect of moving the range out of any legacy modes operation they may
  currently be in. KV waits for this command to durably apply on all the
  replicas before returning, guaranteeing to the caller that all
  pre-migration state has been completely purged from the system.

- The `SyncAllEngines` RPC. This is be used to instruct the target node
  to persist releveant in-memory state to disk. Like we mentioned above,
  KV currently waits for the `Migrate` command to have applied on all
  replicas before returning. With the applied state, there's no
  necessity to durably persist it (the representative version is already
  stored in the raft log). Out of an abundance of caution, and to really
  really ensure that no pre-migrated state is ever seen in the system,
  we provide the migration manager a mechanism to flush out all
  in-memory state to disk. This will let us guarantee that by the time a
  specific cluster version is bumped, all pre-migrated state from prior
  to a certain version will have been fully purged from the system.
  We'll also use it in conjunction with PurgeOutdatedReplicas below.

- The `PurgeOutdatedReplicas` RPC. This too comes up in the context of
  wanting the ensure that ranges where we've executed a ranged `Migrate`
  command over have no way of ever surfacing pre-migrated state. This can
  happen with older replicas in the replica GC queue and with applied
  state that is not yet persisted. Currently we wait for the `Migrate`
  to have applied on all replicas of a range before returning to the
  caller. This does not include earlier incarnations of the range,
  possibly sitting idle in the replica GC queue. These replicas can
  still request leases, and go through the request evaluation paths,
  possibly tripping up assertions that check to see no pre-migrated
  state is found. The `PurgeOutdatedReplicas` lets the migration manager
  do exactly as the name suggests, ensuring all "outdated" replicas are
  processed before declaring the specific cluster version bump complete.

- The concept of a "replica state version". This is what's used to
  construct the migration manager's view of what's "outdated", telling
  us which migrations can be assumed to have run against a particular
  replica.  When we introduce backwards incompatible changes to the
  replica state (for example using the unreplicated truncated state
  instead of the replicated variant), the version would inform us if,
  for a given replica, we should expect a state representation prior to,
  or after the migration (in our example this corresponds to whether or
  not we can assume an unreplicated truncated state).

As part of this commit, we also re-order the steps taken by the
migration manager so that it executes a given migration first before
bumping version gates cluster wide. This is because we want authors of
migrations to ascertain that their own migrations have run to
completion, instead of attaching that check to the next version.

---

This PR motivates all of the above by also onboarding the
TruncatedAndRangeAppliedState migration, lets us do the following:

  i. Use the RangeAppliedState on all ranges
  ii. Use the unreplicated TruncatedState on all ranges

In 21.2 we'll finally be able to delete holdover code that knows how to
handle the legacy replicated truncated state.

Release note (general change): Cluster version upgrades, as initiated by
SET CLUSTER SETTING version = <major>-<minor>, now perform internal
maintenance duties that will delay how long it takes for the command to
complete. The delay is proportional to the amount of data currently
stored in the cluster. The cluster will also experience a small amount
of additional load during this period while the upgrade is being
finalized.

---

The ideas here follow from our original prototype in #57445.
  • Loading branch information
irfansharif committed Dec 30, 2020
1 parent 6d49a32 commit 89781b1
Show file tree
Hide file tree
Showing 58 changed files with 3,872 additions and 1,169 deletions.
2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,6 @@
<tr><td><code>trace.debug.enable</code></td><td>boolean</td><td><code>false</code></td><td>if set, traces for recent requests can be seen at https://<ui>/debug/requests</td></tr>
<tr><td><code>trace.lightstep.token</code></td><td>string</td><td><code></code></td><td>if set, traces go to Lightstep using this token</td></tr>
<tr><td><code>trace.zipkin.collector</code></td><td>string</td><td><code></code></td><td>if set, traces go to the given Zipkin instance (example: '127.0.0.1:9411'); ignored if trace.lightstep.token is set</td></tr>
<tr><td><code>version</code></td><td>version</td><td><code>20.2-10</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
<tr><td><code>version</code></td><td>version</td><td><code>20.2-16</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
</tbody>
</table>
28 changes: 28 additions & 0 deletions pkg/clusterversion/cockroach_versions.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,22 @@ const (
VirtualComputedColumns
// CPutInline is conditional put support for inline values.
CPutInline
// ReplicaVersions enables the versioning of Replica state.
ReplicaVersions
// TruncatedAndRangeAppliedStateMigration is part of the migration to stop
// using the legacy truncated state within KV. After the migration, we'll be
// using the unreplicated truncated state and the RangeAppliedState on all
// ranges. Callers that wish to assert on there no longer being any legacy
// will be able to do so after PostTruncatedAndRangeAppliedStateMigration is
// active. This lets remove any holdover code handling the possibility of
// replicated truncated state in 21.2.
//
// TODO(irfansharif): Do the above in 21.2.
TruncatedAndRangeAppliedStateMigration
// PostTruncatedAndRangeAppliedStateMigration is used to purge all replicas
// using the replicated legacy TruncatedState. It's also used in asserting
// that no replicated truncated state representation is found.
PostTruncatedAndRangeAppliedStateMigration

// Step (1): Add new versions here.
)
Expand Down Expand Up @@ -340,6 +356,18 @@ var versionsSingleton = keyedVersions([]keyedVersion{
Key: CPutInline,
Version: roachpb.Version{Major: 20, Minor: 2, Internal: 10},
},
{
Key: ReplicaVersions,
Version: roachpb.Version{Major: 20, Minor: 2, Internal: 12},
},
{
Key: TruncatedAndRangeAppliedStateMigration,
Version: roachpb.Version{Major: 20, Minor: 2, Internal: 14},
},
{
Key: PostTruncatedAndRangeAppliedStateMigration,
Version: roachpb.Version{Major: 20, Minor: 2, Internal: 16},
},

// Step (2): Add new versions here.
})
Expand Down
7 changes: 5 additions & 2 deletions pkg/clusterversion/key_string.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions pkg/keys/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ var (
// LocalLeaseAppliedIndexLegacySuffix is the suffix for the applied lease
// index.
LocalLeaseAppliedIndexLegacySuffix = []byte("rlla")
// LocalRangeVersionSuffix is the suffix for the range version.
LocalRangeVersionSuffix = []byte("rver")
// LocalRangeStatsLegacySuffix is the suffix for range statistics.
LocalRangeStatsLegacySuffix = []byte("stat")
// localTxnSpanGCThresholdSuffix is DEPRECATED and remains to prevent reuse.
Expand Down
1 change: 1 addition & 0 deletions pkg/keys/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ var _ = [...]interface{}{
RaftTruncatedStateLegacyKey, // "rftt"
RangeLeaseKey, // "rll-"
LeaseAppliedIndexLegacyKey, // "rlla"
RangeVersionKey, // "rver"
RangeStatsLegacyKey, // "stat"

// 2. Unreplicated range-ID local keys: These contain metadata that
Expand Down
10 changes: 10 additions & 0 deletions pkg/keys/keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,11 @@ func RangeLastGCKey(rangeID roachpb.RangeID) roachpb.Key {
return MakeRangeIDPrefixBuf(rangeID).RangeLastGCKey()
}

// RangeVersionKey returns a system-local for the range version.
func RangeVersionKey(rangeID roachpb.RangeID) roachpb.Key {
return MakeRangeIDPrefixBuf(rangeID).RangeVersionKey()
}

// MakeRangeIDUnreplicatedPrefix creates a range-local key prefix from
// rangeID for all unreplicated data.
func MakeRangeIDUnreplicatedPrefix(rangeID roachpb.RangeID) roachpb.Key {
Expand Down Expand Up @@ -964,6 +969,11 @@ func (b RangeIDPrefixBuf) RangeLastGCKey() roachpb.Key {
return append(b.replicatedPrefix(), LocalRangeLastGCSuffix...)
}

// RangeVersionKey returns a system-local key for the range version.
func (b RangeIDPrefixBuf) RangeVersionKey() roachpb.Key {
return append(b.replicatedPrefix(), LocalRangeVersionSuffix...)
}

// RangeTombstoneKey returns a system-local key for a range tombstone.
func (b RangeIDPrefixBuf) RangeTombstoneKey() roachpb.Key {
return append(b.unreplicatedPrefix(), LocalRangeTombstoneSuffix...)
Expand Down
1 change: 1 addition & 0 deletions pkg/keys/printer.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ var (
{name: "RangeLease", suffix: LocalRangeLeaseSuffix},
{name: "RangeStats", suffix: LocalRangeStatsLegacySuffix},
{name: "RangeLastGC", suffix: LocalRangeLastGCSuffix},
{name: "RangeVersion", suffix: LocalRangeVersionSuffix},
}

rangeSuffixDict = []struct {
Expand Down
1 change: 1 addition & 0 deletions pkg/keys/printer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ func TestPrettyPrint(t *testing.T) {
{keys.RangeLeaseKey(roachpb.RangeID(1000001)), "/Local/RangeID/1000001/r/RangeLease", revertSupportUnknown},
{keys.RangeStatsLegacyKey(roachpb.RangeID(1000001)), "/Local/RangeID/1000001/r/RangeStats", revertSupportUnknown},
{keys.RangeLastGCKey(roachpb.RangeID(1000001)), "/Local/RangeID/1000001/r/RangeLastGC", revertSupportUnknown},
{keys.RangeVersionKey(roachpb.RangeID(1000001)), "/Local/RangeID/1000001/r/RangeVersion", revertSupportUnknown},

{keys.RaftHardStateKey(roachpb.RangeID(1000001)), "/Local/RangeID/1000001/u/RaftHardState", revertSupportUnknown},
{keys.RangeTombstoneKey(roachpb.RangeID(1000001)), "/Local/RangeID/1000001/u/RangeTombstone", revertSupportUnknown},
Expand Down
37 changes: 29 additions & 8 deletions pkg/kv/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,18 +228,10 @@ func (b *Batch) fillResults(ctx context.Context) {
case *roachpb.DeleteRequest:
row := &result.Rows[k]
row.Key = []byte(args.(*roachpb.DeleteRequest).Key)

case *roachpb.DeleteRangeRequest:
if result.Err == nil {
result.Keys = reply.(*roachpb.DeleteRangeResponse).Keys
}

default:
if result.Err == nil {
result.Err = errors.Errorf("unsupported reply: %T for %T",
reply, args)
}

// Nothing to do for all methods below as they do not generate
// any rows.
case *roachpb.EndTxnRequest:
Expand All @@ -265,6 +257,12 @@ func (b *Batch) fillResults(ctx context.Context) {
case *roachpb.ImportRequest:
case *roachpb.AdminScatterRequest:
case *roachpb.AddSSTableRequest:
case *roachpb.MigrateRequest:
default:
if result.Err == nil {
result.Err = errors.Errorf("unsupported reply: %T for %T",
reply, args)
}
}
// Fill up the resume span.
if result.Err == nil && reply != nil && reply.Header().ResumeSpan != nil {
Expand Down Expand Up @@ -786,3 +784,26 @@ func (b *Batch) addSSTable(
b.appendReqs(req)
b.initResult(1, 0, notRaw, nil)
}

// migrate is only exported on DB.
func (b *Batch) migrate(s, e interface{}, version roachpb.Version) {
begin, err := marshalKey(s)
if err != nil {
b.initResult(0, 0, notRaw, err)
return
}
end, err := marshalKey(e)
if err != nil {
b.initResult(0, 0, notRaw, err)
return
}
req := &roachpb.MigrateRequest{
RequestHeader: roachpb.RequestHeader{
Key: begin,
EndKey: end,
},
Version: version,
}
b.appendReqs(req)
b.initResult(1, 0, notRaw, nil)
}
10 changes: 10 additions & 0 deletions pkg/kv/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -681,6 +681,16 @@ func (db *DB) AddSSTable(
return getOneErr(db.Run(ctx, b), b)
}

// Migrate is used instruct all ranges overlapping with the provided keyspace to
// exercise any relevant (below-raft) migrations in order for its range state to
// conform to what's needed by the specified version. It's a core primitive used
// in our migrations infrastructure to phase out legacy code below raft.
func (db *DB) Migrate(ctx context.Context, begin, end interface{}, version roachpb.Version) error {
b := &Batch{}
b.migrate(begin, end, version)
return getOneErr(db.Run(ctx, b), b)
}

// sendAndFill is a helper which sends the given batch and fills its results,
// returning the appropriate error which is either from the first failing call,
// or an "internal" error.
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ go_test(
"client_lease_test.go",
"client_merge_test.go",
"client_metrics_test.go",
"client_migration_test.go",
"client_protectedts_test.go",
"client_raft_helpers_test.go",
"client_raft_log_queue_test.go",
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/batcheval/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ go_library(
"cmd_lease_request.go",
"cmd_lease_transfer.go",
"cmd_merge.go",
"cmd_migrate.go",
"cmd_push_txn.go",
"cmd_put.go",
"cmd_query_intent.go",
Expand Down
17 changes: 11 additions & 6 deletions pkg/kv/kvserver/batcheval/cmd_end_transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -901,7 +901,7 @@ func splitTriggerHelper(
// initial state. Additionally, since bothDeltaMS is tracking writes to
// both sides, we need to update it as well.
{
// Various pieces of code rely on a replica's lease never being unitialized,
// Various pieces of code rely on a replica's lease never being uninitialized,
// but it's more than that - it ensures that we properly initialize the
// timestamp cache, which is only populated on the lease holder, from that
// of the original Range. We found out about a regression here the hard way
Expand All @@ -918,8 +918,9 @@ func splitTriggerHelper(
// - node two can illegally propose a write to 'd' at a lower timestamp.
//
// TODO(tschottdorf): why would this use r.store.Engine() and not the
// batch?
leftLease, err := MakeStateLoader(rec).LoadLease(ctx, rec.Engine())
// batch? We do the same thing for other usages of the state loader.
sl := MakeStateLoader(rec)
leftLease, err := sl.LoadLease(ctx, rec.Engine())
if err != nil {
return enginepb.MVCCStats{}, result.Result{}, errors.Wrap(err, "unable to load lease")
}
Expand All @@ -936,8 +937,7 @@ func splitTriggerHelper(
}
rightLease := leftLease
rightLease.Replica = replica

gcThreshold, err := MakeStateLoader(rec).LoadGCThreshold(ctx, rec.Engine())
gcThreshold, err := sl.LoadGCThreshold(ctx, rec.Engine())
if err != nil {
return enginepb.MVCCStats{}, result.Result{}, errors.Wrap(err, "unable to load GCThreshold")
}
Expand Down Expand Up @@ -968,6 +968,11 @@ func splitTriggerHelper(
truncStateType = stateloader.TruncatedStateLegacyReplicated
}

replicaVersion, err := sl.LoadVersion(ctx, rec.Engine())
if err != nil {
return enginepb.MVCCStats{}, result.Result{}, errors.Wrap(err, "unable to load GCThreshold")
}

// Writing the initial state is subtle since this also seeds the Raft
// group. It becomes more subtle due to proposer-evaluated Raft.
//
Expand Down Expand Up @@ -1000,7 +1005,7 @@ func splitTriggerHelper(

*h.AbsPostSplitRight(), err = stateloader.WriteInitialReplicaState(
ctx, batch, *h.AbsPostSplitRight(), split.RightDesc, rightLease,
*gcThreshold, truncStateType,
*gcThreshold, truncStateType, replicaVersion,
)
if err != nil {
return enginepb.MVCCStats{}, result.Result{}, errors.Wrap(err, "unable to write initial Replica state")
Expand Down
Loading

0 comments on commit 89781b1

Please sign in to comment.