Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: introduce the kv Migrate command, and GCReplicas/FlushAllEngines RPCs #57662

Closed
wants to merge 11 commits into from
Closed
38 changes: 30 additions & 8 deletions pkg/kv/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,18 +227,10 @@ func (b *Batch) fillResults(ctx context.Context) {
case *roachpb.DeleteRequest:
row := &result.Rows[k]
row.Key = []byte(args.(*roachpb.DeleteRequest).Key)

case *roachpb.DeleteRangeRequest:
if result.Err == nil {
result.Keys = reply.(*roachpb.DeleteRangeResponse).Keys
}

default:
if result.Err == nil {
result.Err = errors.Errorf("unsupported reply: %T for %T",
reply, args)
}

// Nothing to do for all methods below as they do not generate
// any rows.
case *roachpb.EndTxnRequest:
Expand All @@ -264,6 +256,12 @@ func (b *Batch) fillResults(ctx context.Context) {
case *roachpb.ImportRequest:
case *roachpb.AdminScatterRequest:
case *roachpb.AddSSTableRequest:
case *roachpb.MigrateRequest:
default:
if result.Err == nil {
result.Err = errors.Errorf("unsupported reply: %T for %T",
reply, args)
}
}
// Fill up the resume span.
if result.Err == nil && reply != nil && reply.Header().ResumeSpan != nil {
Expand Down Expand Up @@ -754,3 +752,27 @@ func (b *Batch) addSSTable(
b.appendReqs(req)
b.initResult(1, 0, notRaw, nil)
}

// migrate is only exported on DB.
//lint:ignore U1000 unused
func (b *Batch) migrate(s, e interface{}, version roachpb.Version) {
begin, err := marshalKey(s)
if err != nil {
b.initResult(0, 0, notRaw, err)
return
}
end, err := marshalKey(e)
if err != nil {
b.initResult(0, 0, notRaw, err)
return
}
req := &roachpb.MigrateRequest{
RequestHeader: roachpb.RequestHeader{
Key: begin,
EndKey: end,
},
Version: version,
}
b.appendReqs(req)
b.initResult(1, 0, notRaw, nil)
}
10 changes: 10 additions & 0 deletions pkg/kv/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -639,6 +639,16 @@ func (db *DB) AddSSTable(
return getOneErr(db.Run(ctx, b), b)
}

// Migrate proactively forces ranges overlapping with the provided keyspace to
// transition out of any legacy modes of operation (as defined by the target
// version).
//lint:ignore U1000 unused
func (db *DB) Migrate(ctx context.Context, begin, end interface{}, version roachpb.Version) error {
b := &Batch{}
b.migrate(begin, end, version)
return getOneErr(db.Run(ctx, b), b)
}

// sendAndFill is a helper which sends the given batch and fills its results,
// returning the appropriate error which is either from the first failing call,
// or an "internal" error.
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/batcheval/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ go_library(
"cmd_lease_request.go",
"cmd_lease_transfer.go",
"cmd_merge.go",
"cmd_migrate.go",
"cmd_push_txn.go",
"cmd_put.go",
"cmd_query_intent.go",
Expand Down
60 changes: 60 additions & 0 deletions pkg/kv/kvserver/batcheval/cmd_migrate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Copyright 2020 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package batcheval

import (
"context"

"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/errors"
)

func init() {
RegisterReadWriteCommand(roachpb.Migrate, declareKeysMigrate, Migrate)
}

func declareKeysMigrate(
_ *roachpb.RangeDescriptor, _ roachpb.Header, _ roachpb.Request, _, _ *spanset.SpanSet,
) {
}

// migrationRegistry is a global registry of all KV-level migrations. See
// pkg/migration for details around how the migrations defined here are
// wired up.
var migrationRegistry = make(map[roachpb.Version]migration)

type migration func(context.Context, storage.ReadWriter, CommandArgs) (result.Result, error)

func init() {
// registerMigration(clusterversion.WhateverMigration, whateverMigration)
_ = registerMigration
}

func registerMigration(key clusterversion.Key, migration migration) {
migrationRegistry[clusterversion.ByKey(key)] = migration
}

// Migrate executes the below-raft migration corresponding to the given version.
func Migrate(
ctx context.Context, readWriter storage.ReadWriter, cArgs CommandArgs, _ roachpb.Response,
) (result.Result, error) {
args := cArgs.Args.(*roachpb.MigrateRequest)

fn, ok := migrationRegistry[args.Version]
if !ok {
return result.Result{}, errors.Newf("migration for %s not found", args.Version)
}
return fn(ctx, readWriter, cArgs)
}
18 changes: 0 additions & 18 deletions pkg/kv/kvserver/queue_helpers_testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"context"

"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
)

// Code in this file is for testing usage only. It is exported only because it
Expand All @@ -26,23 +25,6 @@ func (bq *baseQueue) testingAdd(
return bq.addInternal(ctx, repl.Desc(), repl.ReplicaID(), priority)
}

func forceScanAndProcess(s *Store, q *baseQueue) error {
// Check that the system config is available. It is needed by many queues. If
// it's not available, some queues silently fail to process any replicas,
// which is undesirable for this method.
if cfg := s.Gossip().GetSystemConfig(); cfg == nil {
return errors.Errorf("system config not available in gossip")
}

newStoreReplicaVisitor(s).Visit(func(repl *Replica) bool {
q.maybeAdd(context.Background(), repl, s.cfg.Clock.Now())
return true
})

q.DrainQueue(s.stopper)
return nil
}

func mustForceScanAndProcess(ctx context.Context, s *Store, q *baseQueue) {
if err := forceScanAndProcess(s, q); err != nil {
log.Fatalf(ctx, "%v", err)
Expand Down
21 changes: 21 additions & 0 deletions pkg/kv/kvserver/replica_write.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,27 @@ func (r *Replica) executeWriteBatch(
log.Warningf(ctx, "%v", err)
}
}
if ba.Requests[0].GetMigrate() != nil && propResult.Err == nil {
// Migrate is special since it wants commands to be durably
// applied on all peers, which we achieve via waitForApplication.
//
// NB: We don't have to worry about extant snapshots creating
// replicas that start at an index before this Migrate request.
// Snapshots that don't include the recipient (as specified by
// replicaID and descriptor in the snap vs. the replicaID of the
// raft instance) are discarded by the recipient, and we're
// already checking against all replicas in the descriptor
// below. Snapshots are also discarded unless they move the LAI
// forward, so we're not worried about old snapshots (with
// indexes preceding the MLAI here) instantiating pre-migrated
// state in anyway.
desc := r.Desc()
// NB: waitForApplication already has a timeout.
applicationErr := waitForApplication(
ctx, r.store.cfg.NodeDialer, desc.RangeID, desc.Replicas().All(),
uint64(maxLeaseIndex))
propResult.Err = roachpb.NewError(applicationErr)
}
return propResult.Reply, nil, propResult.Err
case <-slowTimer.C:
slowTimer.Read = true
Expand Down
26 changes: 26 additions & 0 deletions pkg/kv/kvserver/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -2763,6 +2763,32 @@ func (s *Store) ManuallyEnqueue(
return collect(), processErr, nil
}

// GCReplicas iterates over all ranges and processes any that may need to be
// GC'd.
func (s *Store) GCReplicas() error {
if interceptor := s.TestingKnobs().GCReplicasInterceptor; interceptor != nil {
interceptor()
}
return forceScanAndProcess(s, s.replicaGCQueue.baseQueue)
}

func forceScanAndProcess(s *Store, q *baseQueue) error {
// Check that the system config is available. It is needed by many queues. If
// it's not available, some queues silently fail to process any replicas,
// which is undesirable for this method.
if cfg := s.Gossip().GetSystemConfig(); cfg == nil {
return errors.Errorf("system config not available in gossip")
}

newStoreReplicaVisitor(s).Visit(func(repl *Replica) bool {
q.maybeAdd(context.Background(), repl, s.cfg.Clock.Now())
return true
})

q.DrainQueue(s.stopper)
return nil
}

// WriteClusterVersion writes the given cluster version to the store-local
// cluster version key.
func WriteClusterVersion(
Expand Down
3 changes: 3 additions & 0 deletions pkg/kv/kvserver/testing_knobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,9 @@ type StoreTestingKnobs struct {
// in execChangeReplicasTxn that prevent moving
// to a configuration that cannot make progress.
AllowDangerousReplicationChanges bool
// GCReplicasInterceptor intercepts attempts to GC all replicas in the
// store.
GCReplicasInterceptor func()
}

// ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface.
Expand Down
39 changes: 38 additions & 1 deletion pkg/migration/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "migration",
srcs = [
"helper.go",
"manager.go",
"migrations.go",
"util.go",
Expand All @@ -11,15 +12,51 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/clusterversion",
"//pkg/keys",
"//pkg/kv",
"//pkg/kv/kvserver/liveness/livenesspb",
"//pkg/roachpb",
"//pkg/rpc",
"//pkg/rpc/nodedialer",
"//pkg/server/serverpb",
"//pkg/sql",
"//pkg/sql/sqlutil",
"//pkg/util/ctxgroup",
"//pkg/util/log",
"//vendor/github.com/cockroachdb/errors",
"//vendor/github.com/cockroachdb/logtags",
"//vendor/github.com/cockroachdb/redact",
"//vendor/google.golang.org/grpc",
],
)

go_test(
name = "migration_test",
srcs = [
"client_test.go",
"helper_test.go",
"main_test.go",
"util_test.go",
],
embed = [":migration"],
deps = [
"//pkg/clusterversion",
"//pkg/kv",
"//pkg/kv/kvserver",
"//pkg/kv/kvserver/liveness/livenesspb",
"//pkg/roachpb",
"//pkg/security",
"//pkg/security/securitytest",
"//pkg/server",
"//pkg/server/serverpb",
"//pkg/sql/sqlutil",
"//pkg/sql/tests",
"//pkg/testutils",
"//pkg/testutils/serverutils",
"//pkg/testutils/testcluster",
"//pkg/util/leaktest",
"//pkg/util/syncutil",
"//vendor/github.com/cockroachdb/errors",
"//vendor/google.golang.org/grpc",
],
)
64 changes: 64 additions & 0 deletions pkg/migration/client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Copyright 2020 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
package migration_test

import (
"context"
"testing"

"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/migration"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
"github.com/cockroachdb/cockroach/pkg/sql/tests"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
)

func TestHelperIterateRangeDescriptors(t *testing.T) {
defer leaktest.AfterTest(t)

cv := clusterversion.ClusterVersion{}
ctx := context.Background()
const numNodes = 1

params, _ := tests.CreateTestServerParams()
server, _, kvDB := serverutils.StartServer(t, params)
defer server.Stopper().Stop(context.Background())

var numRanges int
if err := server.GetStores().(*kvserver.Stores).VisitStores(func(s *kvserver.Store) error {
numRanges = s.ReplicaCount()
return nil
}); err != nil {
t.Fatal(err)
}

c := migration.TestingNewCluster(numNodes, kvDB, server.InternalExecutor().(sqlutil.InternalExecutor))
h := migration.TestingNewHelper(c, cv)

for _, blockSize := range []int{1, 5, 10, 50} {
var numDescs int
init := func() { numDescs = 0 }
if err := h.IterateRangeDescriptors(ctx, blockSize, init, func(descriptors ...roachpb.RangeDescriptor) error {
numDescs += len(descriptors)
return nil
}); err != nil {
t.Fatal(err)
}

// TODO(irfansharif): We always seem to include a second copy of r1's
// desc. Unsure why.
if numDescs != numRanges+1 {
t.Fatalf("expected to find %d ranges, found %d", numRanges+1, numDescs)
}
}
}
Loading