Skip to content

Commit

Permalink
migration,*: onboard TruncatedAndRangeAppliedStateMigration
Browse files Browse the repository at this point in the history
This PR onboards the first real long-running migration using the
infrastructure we've been building up within pkg/migration. It was
originally prototyped in #57445.

TruncatedAndRangeAppliedStateMigration (which also happens to be a
below-Raft one) lets us do the following:

  i. Use the RangeAppliedState on all ranges
  ii. Use the unreplicated TruncatedState on all ranges

In 21.2 we'll finally be able to delete holdover code that knows how to
handle the legacy replicated truncated state.

Release note(general change): Cluster version upgrades, as initiated by
SET CLUSTER SETTING version = <major>-<minor>, now perform internal
maintenance duties that will delay how long it takes for the command to
complete. The delay is proportional to the amount of data currently
stored in the cluster. The cluster will also experience a small amount
of additional load during this period while the upgrade is being
finalized.

Release note: None
  • Loading branch information
irfansharif committed Dec 8, 2020
1 parent e0274c0 commit ef96beb
Show file tree
Hide file tree
Showing 25 changed files with 355 additions and 16 deletions.
2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,6 @@
<tr><td><code>trace.debug.enable</code></td><td>boolean</td><td><code>false</code></td><td>if set, traces for recent requests can be seen in the /debug page</td></tr>
<tr><td><code>trace.lightstep.token</code></td><td>string</td><td><code></code></td><td>if set, traces go to Lightstep using this token</td></tr>
<tr><td><code>trace.zipkin.collector</code></td><td>string</td><td><code></code></td><td>if set, traces go to the given Zipkin instance (example: '127.0.0.1:9411'); ignored if trace.lightstep.token is set</td></tr>
<tr><td><code>version</code></td><td>version</td><td><code>20.2-4</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
<tr><td><code>version</code></td><td>version</td><td><code>20.2-8</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
</tbody>
</table>
23 changes: 23 additions & 0 deletions pkg/clusterversion/cockroach_versions.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,21 @@ const (
// EmptyArraysInInvertedIndexes is when empty arrays are added to array
// inverted indexes.
EmptyArraysInInvertedIndexes
// TruncatedAndRangeAppliedStateMigration is part of the migration to stop
// using the legacy truncated state within KV. After the migration, we'll be
// using the unreplicated truncated state and the RangeAppliedState on all
// ranges. Callers that wish to assert on there no longer being any legacy
// In 21.2 we'll now be able to remove any holdover code handling the
// possibility of replicated truncated state.
//
// TODO(irfansharif): Do the above in 21.2.
TruncatedAndRangeAppliedStateMigration
// PostTruncatedAndRangeAppliedStateMigration is a placeholder version while
// we don't have a version immediately preceding
// TruncatedAndRangeAppliedStateMigration.
//
// TODO(irfansharif): Remove this once we've added some other version.
PostTruncatedAndRangeAppliedStateMigration

// Step (1): Add new versions here.
)
Expand Down Expand Up @@ -321,6 +336,14 @@ var versionsSingleton = keyedVersions([]keyedVersion{
Key: EmptyArraysInInvertedIndexes,
Version: roachpb.Version{Major: 20, Minor: 2, Internal: 4},
},
{
Key: TruncatedAndRangeAppliedStateMigration,
Version: roachpb.Version{Major: 20, Minor: 2, Internal: 6},
},
{
Key: PostTruncatedAndRangeAppliedStateMigration,
Version: roachpb.Version{Major: 20, Minor: 2, Internal: 8},
},

// Step (2): Add new versions here.
})
Expand Down
6 changes: 4 additions & 2 deletions pkg/clusterversion/key_string.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion pkg/kv/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -754,7 +754,6 @@ func (b *Batch) addSSTable(
}

// migrate is only exported on DB.
//lint:ignore U1001 unused
func (b *Batch) migrate(s, e interface{}, version roachpb.Version) {
begin, err := marshalKey(s)
if err != nil {
Expand Down
1 change: 0 additions & 1 deletion pkg/kv/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -642,7 +642,6 @@ func (db *DB) AddSSTable(
// Migrate proactively forces ranges overlapping with the provided keyspace to
// transition out of any legacy modes of operation (as defined by the target
// version).
//lint:ignore U1001 unused
func (db *DB) Migrate(ctx context.Context, begin, end interface{}, version roachpb.Version) error {
b := &Batch{}
b.migrate(begin, end, version)
Expand Down
53 changes: 50 additions & 3 deletions pkg/kv/kvserver/batcheval/cmd_migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,13 @@ import (
"context"

"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/errors"
)

Expand All @@ -26,8 +29,18 @@ func init() {
}

func declareKeysMigrate(
_ *roachpb.RangeDescriptor, _ roachpb.Header, _ roachpb.Request, _, _ *spanset.SpanSet,
_ *roachpb.RangeDescriptor,
header roachpb.Header,
_ roachpb.Request,
latchSpans, lockSpans *spanset.SpanSet,
) {
// TODO(irfansharif): This will eventually grow to capture the super set of
// all keys accessed by all migrations defined here. That could get
// cumbersome. We could spruce up the migration type and allow authors to
// define the allow authors for specific set of keys each migration needs to
// grab latches and locks over.
latchSpans.AddNonMVCC(spanset.SpanReadWrite, roachpb.Span{Key: keys.RaftTruncatedStateLegacyKey(header.RangeID)})
lockSpans.AddNonMVCC(spanset.SpanReadWrite, roachpb.Span{Key: keys.RaftTruncatedStateLegacyKey(header.RangeID)})
}

// migrationRegistry is a global registry of all KV-level migrations. See
Expand All @@ -38,8 +51,7 @@ var migrationRegistry = make(map[roachpb.Version]migration)
type migration func(context.Context, storage.ReadWriter, CommandArgs) (result.Result, error)

func init() {
// registerMigration(clusterversion.WhateverMigration, whateverMigration)
_ = registerMigration
registerMigration(clusterversion.TruncatedAndRangeAppliedStateMigration, truncatedAndAppliedStateMigration)
}

func registerMigration(key clusterversion.Key, migration migration) {
Expand All @@ -58,3 +70,38 @@ func Migrate(
}
return fn(ctx, readWriter, cArgs)
}

// truncatedAndRangeAppliedStateMigration lets us stop using the legacy
// replicated truncated state and start using the new RangeAppliedState for this
// specific range.
func truncatedAndAppliedStateMigration(
ctx context.Context, readWriter storage.ReadWriter, cArgs CommandArgs,
) (result.Result, error) {
var legacyTruncatedState roachpb.RaftTruncatedState
legacyKeyFound, err := storage.MVCCGetProto(
ctx, readWriter, keys.RaftTruncatedStateLegacyKey(cArgs.EvalCtx.GetRangeID()),
hlc.Timestamp{}, &legacyTruncatedState, storage.MVCCGetOptions{},
)
if err != nil {
return result.Result{}, err
}

var pd result.Result
if legacyKeyFound {
// Time to migrate by deleting the legacy key. The downstream-of-Raft
// code will atomically rewrite the truncated state (supplied via the
// side effect) into the new unreplicated key.
if err := storage.MVCCDelete(
ctx, readWriter, cArgs.Stats, keys.RaftTruncatedStateLegacyKey(cArgs.EvalCtx.GetRangeID()),
hlc.Timestamp{}, nil, /* txn */
); err != nil {
return result.Result{}, err
}
pd.Replicated.State = &kvserverpb.ReplicaState{
// We need to pass in a truncated state to enable the migration.
// Passing the same one is the easiest thing to do.
TruncatedState: &legacyTruncatedState,
}
}
return pd, nil
}
8 changes: 6 additions & 2 deletions pkg/kv/kvserver/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,9 @@ func createTestStoreWithOpts(
eng,
kvs, /* initialValues */
clusterversion.TestingBinaryVersion,
1 /* numStores */, splits, storeCfg.Clock.PhysicalNow())
1 /* numStores */, splits, storeCfg.Clock.PhysicalNow(),
storeCfg.TestingKnobs,
)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -958,7 +960,9 @@ func (m *multiTestContext) addStore(idx int) {
eng,
kvs, /* initialValues */
clusterversion.TestingBinaryVersion,
len(m.engines), splits, cfg.Clock.PhysicalNow())
len(m.engines), splits, cfg.Clock.PhysicalNow(),
cfg.TestingKnobs,
)
if err != nil {
m.t.Fatal(err)
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/kv/kvserver/replica_application_state_machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"fmt"
"time"

"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/apply"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
Expand Down Expand Up @@ -699,8 +700,12 @@ func (b *replicaAppBatch) runPreApplyTriggersAfterStagingWriteBatch(
}

if res.State != nil && res.State.TruncatedState != nil {
activeVersion := b.r.ClusterSettings().Version.ActiveVersion(ctx).Version
migrationVersion := clusterversion.ByKey(clusterversion.TruncatedAndRangeAppliedStateMigration)

if apply, err := handleTruncatedStateBelowRaft(
ctx, b.state.TruncatedState, res.State.TruncatedState, b.r.raftMu.stateLoader, b.batch,
migrationVersion.Less(activeVersion),
); err != nil {
return wrapWithNonDeterministicFailure(err, "unable to handle truncated state")
} else if !apply {
Expand Down
10 changes: 10 additions & 0 deletions pkg/kv/kvserver/replica_proposal.go
Original file line number Diff line number Diff line change
Expand Up @@ -773,6 +773,16 @@ func (r *Replica) evaluateProposal(
usingAppliedStateKey := r.mu.state.UsingAppliedStateKey
r.mu.RUnlock()
if !usingAppliedStateKey {
// The range applied state was introduced in v21.1. The cluster
// version transition into v21.1 ought to have migrated any holdover
// ranges still using the legacy keys, which is what we assert
// below. If we're not running 21.1 yet, migrate over as we've done
// since the introduction of the applied state key.
activeVersion := r.ClusterSettings().Version.ActiveVersion(ctx).Version
migrationVersion := clusterversion.ByKey(clusterversion.TruncatedAndRangeAppliedStateMigration)
if migrationVersion.Less(activeVersion) {
log.Fatalf(ctx, "not using applied state key in v21.1")
}
// The range applied state was introduced in v2.1. It's possible to
// still find ranges that haven't activated it. If so, activate it.
// We can remove this code if we introduce a boot-time check that
Expand Down
5 changes: 5 additions & 0 deletions pkg/kv/kvserver/replica_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -1672,6 +1672,7 @@ func handleTruncatedStateBelowRaft(
oldTruncatedState, newTruncatedState *roachpb.RaftTruncatedState,
loader stateloader.StateLoader,
readWriter storage.ReadWriter,
assertNoLegacy bool,
) (_apply bool, _ error) {
// If this is a log truncation, load the resulting unreplicated or legacy
// replicated truncated state (in that order). If the migration is happening
Expand All @@ -1686,6 +1687,10 @@ func handleTruncatedStateBelowRaft(
return false, errors.Wrap(err, "loading truncated state")
}

if assertNoLegacy && truncStateIsLegacy {
log.Fatalf(ctx, "found legacy truncated state which should no longer exist")
}

// Truncate the Raft log from the entry after the previous
// truncation index to the new truncation index. This is performed
// atomically with the raft command application so that the
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_raft_truncation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func TestHandleTruncatedStateBelowRaft(t *testing.T) {
Term: term,
}

apply, err := handleTruncatedStateBelowRaft(ctx, &prevTruncatedState, newTruncatedState, loader, eng)
apply, err := handleTruncatedStateBelowRaft(ctx, &prevTruncatedState, newTruncatedState, loader, eng, false)
if err != nil {
return err.Error()
}
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,7 @@ func (tc *testContext) StartWithStoreConfigAndVersion(
nil, /* initialValues */
bootstrapVersion,
1 /* numStores */, nil /* splits */, cfg.Clock.PhysicalNow(),
cfg.TestingKnobs,
); err != nil {
t.Fatal(err)
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/kv/kvserver/stateloader/initial.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,9 @@ func WriteInitialReplicaState(
s.Stats = &ms
s.Lease = &lease
s.GCThreshold = &gcThreshold
s.UsingAppliedStateKey = true
if truncStateType != TruncatedStateLegacyReplicatedAndNoAppliedKey {
s.UsingAppliedStateKey = true
}

if existingLease, err := rsl.LoadLease(ctx, readWriter); err != nil {
return enginepb.MVCCStats{}, errors.Wrap(err, "error reading lease")
Expand Down
5 changes: 4 additions & 1 deletion pkg/kv/kvserver/stateloader/stateloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,9 @@ type TruncatedStateType int
const (
// TruncatedStateLegacyReplicated means use the legacy (replicated) key.
TruncatedStateLegacyReplicated TruncatedStateType = iota
// TruncatedStateLegacyReplicatedAndNoAppliedKey means use the legacy key
// and also don't use the RangeAppliedKey. This is for testing use only.
TruncatedStateLegacyReplicatedAndNoAppliedKey
// TruncatedStateUnreplicated means use the new (unreplicated) key.
TruncatedStateUnreplicated
)
Expand Down Expand Up @@ -141,7 +144,7 @@ func (rsl StateLoader) Save(
if err := rsl.SetGCThreshold(ctx, readWriter, ms, state.GCThreshold); err != nil {
return enginepb.MVCCStats{}, err
}
if truncStateType == TruncatedStateLegacyReplicated {
if truncStateType != TruncatedStateUnreplicated {
if err := rsl.SetLegacyRaftTruncatedState(ctx, readWriter, ms, state.TruncatedState); err != nil {
return enginepb.MVCCStats{}, err
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/kv/kvserver/store_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ func WriteInitialClusterData(
numStores int,
splits []roachpb.RKey,
nowNanos int64,
knobs StoreTestingKnobs,
) error {
// Bootstrap version information. We'll add the "bootstrap version" to the
// list of initialValues, so that we don't have to handle it specially
Expand Down Expand Up @@ -234,6 +235,9 @@ func WriteInitialClusterData(
}

truncStateType := stateloader.TruncatedStateUnreplicated
if tt := knobs.TruncatedStateTypeOverride; tt != nil {
truncStateType = *tt
}
lease := roachpb.Lease{}
_, err := stateloader.WriteInitialState(
ctx, batch,
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ func createTestStoreWithoutStart(
if err := WriteInitialClusterData(
context.Background(), eng, kvs, /* initialValues */
clusterversion.TestingBinaryVersion,
1 /* numStores */, splits, cfg.Clock.PhysicalNow(),
1 /* numStores */, splits, cfg.Clock.PhysicalNow(), cfg.TestingKnobs,
); err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -472,7 +472,7 @@ func TestStoreInitAndBootstrap(t *testing.T) {

if err := WriteInitialClusterData(
ctx, eng, kvs /* initialValues */, clusterversion.TestingBinaryVersion,
1 /* numStores */, splits, cfg.Clock.PhysicalNow(),
1 /* numStores */, splits, cfg.Clock.PhysicalNow(), cfg.TestingKnobs,
); err != nil {
t.Errorf("failure to create first range: %+v", err)
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/kv/kvserver/testing_knobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/tenantrate"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/txnwait"
"github.com/cockroachdb/cockroach/pkg/roachpb"
Expand Down Expand Up @@ -262,6 +263,9 @@ type StoreTestingKnobs struct {
// GCReplicasInterceptor intercepts attempts to GC all replicas in the
// store.
GCReplicasInterceptor func()
// If set, use the given truncated state type when bootstrapping ranges.
// This is used for testing the truncated state migration.
TruncatedStateTypeOverride *stateloader.TruncatedStateType
}

// ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface.
Expand Down
4 changes: 4 additions & 0 deletions pkg/migration/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,17 @@ go_test(
"client_test.go",
"helper_test.go",
"main_test.go",
"migrations_test.go",
"util_test.go",
],
embed = [":migration"],
deps = [
"//pkg/base",
"//pkg/clusterversion",
"//pkg/kv",
"//pkg/kv/kvserver",
"//pkg/kv/kvserver/liveness/livenesspb",
"//pkg/kv/kvserver/stateloader",
"//pkg/roachpb",
"//pkg/security",
"//pkg/security/securitytest",
Expand All @@ -57,6 +60,7 @@ go_test(
"//pkg/util/leaktest",
"//pkg/util/syncutil",
"//vendor/github.com/cockroachdb/errors",
"//vendor/github.com/stretchr/testify/require",
"//vendor/google.golang.org/grpc",
],
)
6 changes: 6 additions & 0 deletions pkg/migration/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,12 @@ func (h *Helper) DB() *kv.DB {
return h.c.db()
}

// ClusterVersion exposes the cluster version associated with the ongoing
// migration.
func (h *Helper) ClusterVersion() clusterversion.ClusterVersion {
return h.cv
}

type clusterImpl struct {
nl nodeLiveness
exec sqlutil.InternalExecutor
Expand Down
Loading

0 comments on commit ef96beb

Please sign in to comment.