Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

migration,*: onboard TruncatedAndRangeAppliedStateMigration #57694

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,6 @@
<tr><td><code>trace.debug.enable</code></td><td>boolean</td><td><code>false</code></td><td>if set, traces for recent requests can be seen in the /debug page</td></tr>
<tr><td><code>trace.lightstep.token</code></td><td>string</td><td><code></code></td><td>if set, traces go to Lightstep using this token</td></tr>
<tr><td><code>trace.zipkin.collector</code></td><td>string</td><td><code></code></td><td>if set, traces go to the given Zipkin instance (example: '127.0.0.1:9411'); ignored if trace.lightstep.token is set</td></tr>
<tr><td><code>version</code></td><td>version</td><td><code>20.2-4</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
<tr><td><code>version</code></td><td>version</td><td><code>20.2-8</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
</tbody>
</table>
23 changes: 23 additions & 0 deletions pkg/clusterversion/cockroach_versions.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,21 @@ const (
// EmptyArraysInInvertedIndexes is when empty arrays are added to array
// inverted indexes.
EmptyArraysInInvertedIndexes
// TruncatedAndRangeAppliedStateMigration is part of the migration to stop
// using the legacy truncated state within KV. After the migration, we'll be
// using the unreplicated truncated state and the RangeAppliedState on all
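// ranges. Callers that wish to assert on there no longer being any legacy
// truncated state will be able to do so once a version past this one
// (PostTruncatedAndRangeAppliedStateMigration) is active. In 21.2 we'll be
// able to remove any holdover code handling the possibility of replicated
// truncated state.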
//
// TODO(irfansharif): Do the above in 21.2.
TruncatedAndRangeAppliedStateMigration
// PostTruncatedAndRangeAppliedStateMigration is a placeholder version while
// we don't have a version immediately following
// TruncatedAndRangeAppliedStateMigration.
//
// TODO(irfansharif): Remove this once we've added some other version.
PostTruncatedAndRangeAppliedStateMigration

// Step (1): Add new versions here.
)
Expand Down Expand Up @@ -321,6 +336,14 @@ var versionsSingleton = keyedVersions([]keyedVersion{
Key: EmptyArraysInInvertedIndexes,
Version: roachpb.Version{Major: 20, Minor: 2, Internal: 4},
},
{
Key: TruncatedAndRangeAppliedStateMigration,
Version: roachpb.Version{Major: 20, Minor: 2, Internal: 6},
},
{
Key: PostTruncatedAndRangeAppliedStateMigration,
Version: roachpb.Version{Major: 20, Minor: 2, Internal: 8},
},

// Step (2): Add new versions here.
})
Expand Down
6 changes: 4 additions & 2 deletions pkg/clusterversion/key_string.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

37 changes: 29 additions & 8 deletions pkg/kv/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,18 +227,10 @@ func (b *Batch) fillResults(ctx context.Context) {
case *roachpb.DeleteRequest:
row := &result.Rows[k]
row.Key = []byte(args.(*roachpb.DeleteRequest).Key)

case *roachpb.DeleteRangeRequest:
if result.Err == nil {
result.Keys = reply.(*roachpb.DeleteRangeResponse).Keys
}

default:
if result.Err == nil {
result.Err = errors.Errorf("unsupported reply: %T for %T",
reply, args)
}

// Nothing to do for all methods below as they do not generate
// any rows.
case *roachpb.EndTxnRequest:
Expand All @@ -264,6 +256,12 @@ func (b *Batch) fillResults(ctx context.Context) {
case *roachpb.ImportRequest:
case *roachpb.AdminScatterRequest:
case *roachpb.AddSSTableRequest:
case *roachpb.MigrateRequest:
default:
if result.Err == nil {
result.Err = errors.Errorf("unsupported reply: %T for %T",
reply, args)
}
}
// Fill up the resume span.
if result.Err == nil && reply != nil && reply.Header().ResumeSpan != nil {
Expand Down Expand Up @@ -754,3 +752,26 @@ func (b *Batch) addSSTable(
b.appendReqs(req)
b.initResult(1, 0, notRaw, nil)
}

// migrate is only exported on DB.
//
// It marshals s and e into the start and end keys of a MigrateRequest that
// carries the given version, and appends that request to the batch. If either
// key fails to marshal, the error is recorded through initResult and no
// request is appended.
func (b *Batch) migrate(s, e interface{}, version roachpb.Version) {
	startKey, startErr := marshalKey(s)
	if startErr != nil {
		b.initResult(0, 0, notRaw, startErr)
		return
	}
	endKey, endErr := marshalKey(e)
	if endErr != nil {
		b.initResult(0, 0, notRaw, endErr)
		return
	}

	header := roachpb.RequestHeader{Key: startKey, EndKey: endKey}
	b.appendReqs(&roachpb.MigrateRequest{RequestHeader: header, Version: version})
	b.initResult(1, 0, notRaw, nil)
}
9 changes: 9 additions & 0 deletions pkg/kv/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -639,6 +639,15 @@ func (db *DB) AddSSTable(
return getOneErr(db.Run(ctx, b), b)
}

// Migrate proactively forces ranges overlapping with the provided keyspace to
// transition out of any legacy modes of operation (as defined by the target
// version).
//
// It builds a single-request batch via Batch.migrate, runs it, and returns
// whatever error getOneErr extracts from the run and the batch.
func (db *DB) Migrate(ctx context.Context, begin, end interface{}, version roachpb.Version) error {
	var batch Batch
	batch.migrate(begin, end, version)
	return getOneErr(db.Run(ctx, &batch), &batch)
}

// sendAndFill is a helper which sends the given batch and fills its results,
// returning the appropriate error which is either from the first failing call,
// or an "internal" error.
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/batcheval/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ go_library(
"cmd_lease_request.go",
"cmd_lease_transfer.go",
"cmd_merge.go",
"cmd_migrate.go",
"cmd_push_txn.go",
"cmd_put.go",
"cmd_query_intent.go",
Expand Down
107 changes: 107 additions & 0 deletions pkg/kv/kvserver/batcheval/cmd_migrate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
// Copyright 2020 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package batcheval

import (
"context"

"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/errors"
)

// init registers Migrate as the read-write command for roachpb.Migrate, with
// declareKeysMigrate as its key-declaration function.
func init() {
	RegisterReadWriteCommand(roachpb.Migrate, declareKeysMigrate, Migrate)
}

// declareKeysMigrate declares the spans a Migrate command touches. Today that
// is only the range's legacy raft truncated state key, declared read-write in
// both the latch and the lock span sets.
func declareKeysMigrate(
	_ *roachpb.RangeDescriptor,
	header roachpb.Header,
	_ roachpb.Request,
	latchSpans, lockSpans *spanset.SpanSet,
) {
	// TODO(irfansharif): This will eventually grow to capture the superset of
	// all keys accessed by all migrations defined here. That could get
	// cumbersome. We could spruce up the migration type and allow authors to
	// define the specific set of keys each migration needs to grab latches and
	// locks over.

	// Build a fresh span per declaration so the two span sets never share a
	// key slice.
	legacyTruncatedStateSpan := func() roachpb.Span {
		return roachpb.Span{Key: keys.RaftTruncatedStateLegacyKey(header.RangeID)}
	}
	latchSpans.AddNonMVCC(spanset.SpanReadWrite, legacyTruncatedStateSpan())
	lockSpans.AddNonMVCC(spanset.SpanReadWrite, legacyTruncatedStateSpan())
}

// migration is the function signature every KV-level migration in this file
// implements. It receives the context, the storage.ReadWriter to operate on,
// and the command arguments, and returns an evaluation result or an error.
type migration func(context.Context, storage.ReadWriter, CommandArgs) (result.Result, error)

// migrationRegistry is a global registry of all KV-level migrations, keyed by
// the roachpb.Version each migration is attached to. See pkg/migration for
// details around how the migrations defined here are wired up.
var migrationRegistry = map[roachpb.Version]migration{}

// init populates migrationRegistry with the migrations defined in this file.
func init() {
	registerMigration(
		clusterversion.TruncatedAndRangeAppliedStateMigration,
		truncatedAndAppliedStateMigration,
	)
}

// registerMigration stores the given migration in migrationRegistry under the
// version that clusterversion.ByKey resolves for key. Registering the same key
// twice overwrites the earlier entry.
func registerMigration(key clusterversion.Key, migration migration) {
	version := clusterversion.ByKey(key)
	migrationRegistry[version] = migration
}

// Migrate executes the below-raft migration corresponding to the given version.
//
// The version is read from the MigrateRequest in cArgs and looked up in
// migrationRegistry. If no migration is registered for it, an error is
// returned along with an empty result. The response argument is unused.
func Migrate(
	ctx context.Context, readWriter storage.ReadWriter, cArgs CommandArgs, _ roachpb.Response,
) (result.Result, error) {
	req := cArgs.Args.(*roachpb.MigrateRequest)

	if run, registered := migrationRegistry[req.Version]; registered {
		return run(ctx, readWriter, cArgs)
	}
	return result.Result{}, errors.Newf("migration for %s not found", req.Version)
}

// truncatedAndAppliedStateMigration lets us stop using the legacy replicated
// truncated state and start using the new RangeAppliedState for this specific
// range.
//
// It reads the range's legacy truncated state key. If the key is absent, an
// empty result is returned. If it is present, the key is deleted and the
// truncated state that was read is attached to the replicated result.
//
// NOTE(review): nothing in this function touches the RangeAppliedState
// itself; presumably that half of the migration is handled elsewhere on the
// proposal path. Confirm.
func truncatedAndAppliedStateMigration(
	ctx context.Context, readWriter storage.ReadWriter, cArgs CommandArgs,
) (result.Result, error) {
	legacyKey := keys.RaftTruncatedStateLegacyKey(cArgs.EvalCtx.GetRangeID())

	var legacyTruncatedState roachpb.RaftTruncatedState
	legacyKeyFound, err := storage.MVCCGetProto(
		ctx, readWriter, legacyKey, hlc.Timestamp{}, &legacyTruncatedState, storage.MVCCGetOptions{},
	)
	if err != nil {
		return result.Result{}, err
	}
	if !legacyKeyFound {
		// Nothing to migrate for this range.
		return result.Result{}, nil
	}

	// Time to migrate by deleting the legacy key. The downstream-of-Raft code
	// will atomically rewrite the truncated state (supplied via the side
	// effect) into the new unreplicated key.
	err = storage.MVCCDelete(
		ctx, readWriter, cArgs.Stats, legacyKey, hlc.Timestamp{}, nil, /* txn */
	)
	if err != nil {
		return result.Result{}, err
	}

	var pd result.Result
	// We need to pass in a truncated state to enable the migration. Passing
	// the same one is the easiest thing to do.
	pd.Replicated.State = &kvserverpb.ReplicaState{TruncatedState: &legacyTruncatedState}
	return pd, nil
}
8 changes: 6 additions & 2 deletions pkg/kv/kvserver/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,9 @@ func createTestStoreWithOpts(
eng,
kvs, /* initialValues */
clusterversion.TestingBinaryVersion,
1 /* numStores */, splits, storeCfg.Clock.PhysicalNow())
1 /* numStores */, splits, storeCfg.Clock.PhysicalNow(),
storeCfg.TestingKnobs,
)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -958,7 +960,9 @@ func (m *multiTestContext) addStore(idx int) {
eng,
kvs, /* initialValues */
clusterversion.TestingBinaryVersion,
len(m.engines), splits, cfg.Clock.PhysicalNow())
len(m.engines), splits, cfg.Clock.PhysicalNow(),
cfg.TestingKnobs,
)
if err != nil {
m.t.Fatal(err)
}
Expand Down
18 changes: 0 additions & 18 deletions pkg/kv/kvserver/queue_helpers_testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"context"

"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
)

// Code in this file is for testing usage only. It is exported only because it
Expand All @@ -26,23 +25,6 @@ func (bq *baseQueue) testingAdd(
return bq.addInternal(ctx, repl.Desc(), repl.ReplicaID(), priority)
}

// forceScanAndProcess offers every replica visited on the store to the given
// queue via maybeAdd, then drains the queue. It returns an error, without
// visiting anything, if the system config is not yet available in gossip.
func forceScanAndProcess(s *Store, q *baseQueue) error {
	// Many queues need the system config. Without it some of them silently
	// fail to process any replicas, which is undesirable for this method, so
	// bail out early instead.
	if s.Gossip().GetSystemConfig() == nil {
		return errors.Errorf("system config not available in gossip")
	}

	visitor := newStoreReplicaVisitor(s)
	visitor.Visit(func(repl *Replica) bool {
		q.maybeAdd(context.Background(), repl, s.cfg.Clock.Now())
		return true // keep visiting the remaining replicas
	})

	q.DrainQueue(s.stopper)
	return nil
}

func mustForceScanAndProcess(ctx context.Context, s *Store, q *baseQueue) {
if err := forceScanAndProcess(s, q); err != nil {
log.Fatalf(ctx, "%v", err)
Expand Down
5 changes: 5 additions & 0 deletions pkg/kv/kvserver/replica_application_state_machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"fmt"
"time"

"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/apply"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
Expand Down Expand Up @@ -699,8 +700,12 @@ func (b *replicaAppBatch) runPreApplyTriggersAfterStagingWriteBatch(
}

if res.State != nil && res.State.TruncatedState != nil {
activeVersion := b.r.ClusterSettings().Version.ActiveVersion(ctx).Version
migrationVersion := clusterversion.ByKey(clusterversion.TruncatedAndRangeAppliedStateMigration)

if apply, err := handleTruncatedStateBelowRaft(
ctx, b.state.TruncatedState, res.State.TruncatedState, b.r.raftMu.stateLoader, b.batch,
migrationVersion.Less(activeVersion),
); err != nil {
return wrapWithNonDeterministicFailure(err, "unable to handle truncated state")
} else if !apply {
Expand Down
10 changes: 10 additions & 0 deletions pkg/kv/kvserver/replica_proposal.go
Original file line number Diff line number Diff line change
Expand Up @@ -773,6 +773,16 @@ func (r *Replica) evaluateProposal(
usingAppliedStateKey := r.mu.state.UsingAppliedStateKey
r.mu.RUnlock()
if !usingAppliedStateKey {
// The TruncatedAndRangeAppliedStateMigration cluster version was
// introduced in v21.1. The cluster version transition into v21.1 ought
// to have migrated any holdover ranges still using the legacy keys,
// which is what we assert below. If we're not running 21.1 yet, migrate
// over as we've done since the introduction of the applied state key.
activeVersion := r.ClusterSettings().Version.ActiveVersion(ctx).Version
migrationVersion := clusterversion.ByKey(clusterversion.TruncatedAndRangeAppliedStateMigration)
if migrationVersion.Less(activeVersion) {
log.Fatalf(ctx, "not using applied state key in v21.1")
}
// The range applied state was introduced in v2.1. It's possible to
// still find ranges that haven't activated it. If so, activate it.
// We can remove this code if we introduce a boot-time check that
Expand Down
5 changes: 5 additions & 0 deletions pkg/kv/kvserver/replica_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -1672,6 +1672,7 @@ func handleTruncatedStateBelowRaft(
oldTruncatedState, newTruncatedState *roachpb.RaftTruncatedState,
loader stateloader.StateLoader,
readWriter storage.ReadWriter,
assertNoLegacy bool,
) (_apply bool, _ error) {
// If this is a log truncation, load the resulting unreplicated or legacy
// replicated truncated state (in that order). If the migration is happening
Expand All @@ -1686,6 +1687,10 @@ func handleTruncatedStateBelowRaft(
return false, errors.Wrap(err, "loading truncated state")
}

if assertNoLegacy && truncStateIsLegacy {
log.Fatalf(ctx, "found legacy truncated state which should no longer exist")
}

// Truncate the Raft log from the entry after the previous
// truncation index to the new truncation index. This is performed
// atomically with the raft command application so that the
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_raft_truncation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func TestHandleTruncatedStateBelowRaft(t *testing.T) {
Term: term,
}

apply, err := handleTruncatedStateBelowRaft(ctx, &prevTruncatedState, newTruncatedState, loader, eng)
apply, err := handleTruncatedStateBelowRaft(ctx, &prevTruncatedState, newTruncatedState, loader, eng, false)
if err != nil {
return err.Error()
}
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,7 @@ func (tc *testContext) StartWithStoreConfigAndVersion(
nil, /* initialValues */
bootstrapVersion,
1 /* numStores */, nil /* splits */, cfg.Clock.PhysicalNow(),
cfg.TestingKnobs,
); err != nil {
t.Fatal(err)
}
Expand Down
Loading