From 126ea38261fb12997b8b26d244ea02561aa2a805 Mon Sep 17 00:00:00 2001 From: Andrew Werner Date: Tue, 22 Dec 2020 16:00:00 -0500 Subject: [PATCH] migration: rework migration subsystem into multiple packages No tests were harmed in this commit. Some abstractions which felt like cruft were thinned. Release note: None --- pkg/migration/BUILD.bazel | 57 +--- pkg/migration/helper.go | 312 ----------------- pkg/migration/helper_test.go | 315 ------------------ pkg/migration/helpers.go | 44 +++ pkg/migration/migration.go | 225 +++++++++++++ pkg/migration/migrationcluster/BUILD.bazel | 53 +++ .../{ => migrationcluster}/client_test.go | 18 +- pkg/migration/migrationcluster/cluster.go | 172 ++++++++++ pkg/migration/migrationcluster/helper_test.go | 215 ++++++++++++ pkg/migration/migrationcluster/main_test.go | 29 ++ pkg/migration/migrationcluster/nodes.go | 118 +++++++ .../nodes_test.go} | 26 +- pkg/migration/migrationmanager/BUILD.bazel | 46 +++ pkg/migration/migrationmanager/main_test.go | 29 ++ .../{ => migrationmanager}/manager.go | 85 ++--- .../migrationmanager/manager_external_test.go | 117 +++++++ pkg/migration/migrations.go | 183 ---------- pkg/migration/migrations/BUILD.bazel | 41 +++ pkg/migration/{ => migrations}/main_test.go | 2 +- pkg/migration/migrations/migrations.go | 81 +++++ pkg/migration/migrations/truncated_state.go | 94 ++++++ .../truncated_state_external_test.go} | 94 +----- pkg/migration/nodelivenesstest/BUILD.bazel | 12 + .../nodelivenesstest/test_node_liveness.go | 96 ++++++ pkg/migration/util.go | 122 ------- pkg/server/BUILD.bazel | 2 +- pkg/server/server_sql.go | 5 +- 27 files changed, 1435 insertions(+), 1158 deletions(-) delete mode 100644 pkg/migration/helper.go delete mode 100644 pkg/migration/helper_test.go create mode 100644 pkg/migration/helpers.go create mode 100644 pkg/migration/migration.go create mode 100644 pkg/migration/migrationcluster/BUILD.bazel rename pkg/migration/{ => migrationcluster}/client_test.go (78%) create mode 100644 pkg/migration/migrationcluster/cluster.go create mode 100644 pkg/migration/migrationcluster/helper_test.go create mode 100644 pkg/migration/migrationcluster/main_test.go create mode 100644 pkg/migration/migrationcluster/nodes.go rename pkg/migration/{util_test.go => migrationcluster/nodes_test.go} (78%) create mode 100644 pkg/migration/migrationmanager/BUILD.bazel create mode 100644 pkg/migration/migrationmanager/main_test.go rename pkg/migration/{ => migrationmanager}/manager.go (72%) create mode 100644 pkg/migration/migrationmanager/manager_external_test.go delete mode 100644 pkg/migration/migrations.go create mode 100644 pkg/migration/migrations/BUILD.bazel rename pkg/migration/{ => migrations}/main_test.go (97%) create mode 100644 pkg/migration/migrations/migrations.go create mode 100644 pkg/migration/migrations/truncated_state.go rename pkg/migration/{migrations_test.go => migrations/truncated_state_external_test.go} (58%) create mode 100644 pkg/migration/nodelivenesstest/BUILD.bazel create mode 100644 pkg/migration/nodelivenesstest/test_node_liveness.go delete mode 100644 pkg/migration/util.go diff --git a/pkg/migration/BUILD.bazel b/pkg/migration/BUILD.bazel index ffe77f76d105..a0cfcb63f082 100644 --- a/pkg/migration/BUILD.bazel +++ b/pkg/migration/BUILD.bazel @@ -1,70 +1,19 @@ -load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") +load("@io_bazel_rules_go//go:def.bzl", "go_library") go_library( name = "migration", srcs = [ - "helper.go", - "manager.go", - "migrations.go", - "util.go", + "helpers.go", + 
"migration.go", ], importpath = "github.com/cockroachdb/cockroach/pkg/migration", visibility = ["//visibility:public"], deps = [ "//pkg/clusterversion", - "//pkg/keys", "//pkg/kv", - "//pkg/kv/kvserver/liveness/livenesspb", "//pkg/roachpb", - "//pkg/rpc", - "//pkg/rpc/nodedialer", "//pkg/server/serverpb", - "//pkg/sql", - "//pkg/sql/sqlutil", - "//pkg/util/ctxgroup", "//pkg/util/log", - "//pkg/util/quotapool", - "@com_github_cockroachdb_errors//:errors", "@com_github_cockroachdb_logtags//:logtags", - "@com_github_cockroachdb_redact//:redact", - "@org_golang_google_grpc//:go_default_library", - ], -) - -go_test( - name = "migration_test", - srcs = [ - "client_test.go", - "helper_test.go", - "main_test.go", - "migrations_test.go", - "util_test.go", - ], - embed = [":migration"], - deps = [ - "//pkg/base", - "//pkg/clusterversion", - "//pkg/kv", - "//pkg/kv/kvserver", - "//pkg/kv/kvserver/batcheval", - "//pkg/kv/kvserver/liveness", - "//pkg/kv/kvserver/liveness/livenesspb", - "//pkg/kv/kvserver/stateloader", - "//pkg/roachpb", - "//pkg/security", - "//pkg/security/securitytest", - "//pkg/server", - "//pkg/server/serverpb", - "//pkg/settings/cluster", - "//pkg/sql/tests", - "//pkg/testutils", - "//pkg/testutils/serverutils", - "//pkg/testutils/testcluster", - "//pkg/util/leaktest", - "//pkg/util/log", - "//pkg/util/syncutil", - "@com_github_cockroachdb_errors//:errors", - "@com_github_stretchr_testify//require", - "@org_golang_google_grpc//:go_default_library", ], ) diff --git a/pkg/migration/helper.go b/pkg/migration/helper.go deleted file mode 100644 index f877417d22c1..000000000000 --- a/pkg/migration/helper.go +++ /dev/null @@ -1,312 +0,0 @@ -// Copyright 2020 The Cockroach Authors. -// -// Use of this software is governed by the Business Source License -// included in the file licenses/BSL.txt. -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0, included in the file -// licenses/APL.txt. - -package migration - -import ( - "context" - - "github.com/cockroachdb/cockroach/pkg/clusterversion" - "github.com/cockroachdb/cockroach/pkg/keys" - "github.com/cockroachdb/cockroach/pkg/kv" - "github.com/cockroachdb/cockroach/pkg/roachpb" - "github.com/cockroachdb/cockroach/pkg/rpc" - "github.com/cockroachdb/cockroach/pkg/rpc/nodedialer" - "github.com/cockroachdb/cockroach/pkg/server/serverpb" - "github.com/cockroachdb/cockroach/pkg/sql" - "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" - "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" - "github.com/cockroachdb/cockroach/pkg/util/log" - "github.com/cockroachdb/cockroach/pkg/util/quotapool" - "github.com/cockroachdb/errors" - "github.com/cockroachdb/redact" - "google.golang.org/grpc" -) - -// Helper captures all the primitives required to fully specify a migration. -type Helper struct { - c cluster - cv clusterversion.ClusterVersion -} - -// cluster mediates access to the crdb cluster. -type cluster interface { - // nodes returns the IDs and epochs for all nodes that are currently part of - // the cluster (i.e. they haven't been decommissioned away). Migrations have - // the pre-requisite that all nodes are up and running so that we're able to - // execute all relevant node-level operations on them. If any of the nodes - // are found to be unavailable, an error is returned. - // - // It's important to note that this makes no guarantees about new nodes - // being added to the cluster. 
It's entirely possible for that to happen - // concurrently with the retrieval of the current set of nodes. Appropriate - // usage of this entails wrapping it under a stabilizing loop, like we do in - // EveryNode. - nodes(ctx context.Context) (nodes, error) - - // dial returns a grpc connection to the given node. - dial(context.Context, roachpb.NodeID) (*grpc.ClientConn, error) - - // db provides access the kv.DB instance backing the cluster. - // - // TODO(irfansharif): We could hide the kv.DB instance behind an interface - // to expose only relevant, vetted bits of kv.DB. It'll make our tests less - // "integration-ey". - db() *kv.DB - - // executor provides access to an internal executor instance to run - // arbitrary SQL statements. - executor() sqlutil.InternalExecutor -} - -func newHelper(c cluster, cv clusterversion.ClusterVersion) *Helper { - return &Helper{c: c, cv: cv} -} - -// ForEveryNode is a short hand to execute the given closure (named by the -// informational parameter op) against every node in the cluster at a given -// point in time. Given it's possible for nodes to join or leave the cluster -// during (we don't make any guarantees for the ordering of cluster membership -// events), we only expect this to be used in conjunction with -// UntilClusterStable (see the comment there for how these two primitives can be -// put together). -func (h *Helper) ForEveryNode( - ctx context.Context, op string, fn func(context.Context, serverpb.MigrationClient) error, -) error { - ns, err := h.c.nodes(ctx) - if err != nil { - return err - } - - // We'll want to rate limit outgoing RPCs (limit pulled out of thin air). - qp := quotapool.NewIntPool("every-node", 25) - log.Infof(ctx, "executing %s on nodes %s", redact.Safe(op), ns) - grp := ctxgroup.WithContext(ctx) - - for _, node := range ns { - id := node.id // copy out of the loop variable - alloc, err := qp.Acquire(ctx, 1) - if err != nil { - return err - } - - grp.GoCtx(func(ctx context.Context) error { - defer alloc.Release() - - conn, err := h.c.dial(ctx, id) - if err != nil { - return err - } - client := serverpb.NewMigrationClient(conn) - return fn(ctx, client) - }) - } - return grp.Wait() -} - -// UntilClusterStable invokes the given closure until the cluster membership is -// stable, i.e once the set of nodes in the cluster before and after the closure -// are identical, and no nodes have restarted in the interim, we can return to -// the caller[*]. -// -// The mechanism for doing so, while accounting for the possibility of new nodes -// being added to the cluster in the interim, is provided by the following -// structure: -// (a) We'll retrieve the list of node IDs for all nodes in the system -// (b) We'll invoke the closure -// (c) We'll retrieve the list of node IDs again to account for the -// possibility of a new node being added during (b), or a node -// restarting -// (d) If there any discrepancies between the list retrieved in (a) -// and (c), we'll invoke the closure again -// (e) We'll continue to loop around until the node ID list stabilizes -// -// [*]: We can be a bit more precise here. What UntilClusterStable gives us is a -// strict causal happened-before relation between running the given closure and -// the next node that joins the cluster. Put another way: using -// UntilClusterStable callers will have managed to run something without a new -// node joining half-way through (which could have allowed it to pick up some -// state off one of the existing nodes that hadn't heard from us yet). 
-// -// To consider an example of how this primitive is used, let's consider our use -// of it to bump the cluster version. We use in conjunction with ForEveryNode, -// where after we return, we can rely on the guarantee that all nodes in the -// cluster will have their cluster versions bumped. This then implies that -// future node additions will observe the latest version (through the join RPC). -// That in turn lets us author migrations that can assume that a certain version -// gate has been enabled on all nodes in the cluster, and will always be enabled -// for any new nodes in the system. -// -// Given that it'll always be possible for new nodes to join after an -// UntilClusterStable round, it means that some migrations may have to be split -// up into two version bumps: one that phases out the old version (i.e. stops -// creation of stale data or behavior) and a clean-up version, which removes any -// vestiges of the stale data/behavior, and which, when active, ensures that the -// old data has vanished from the system. This is similar in spirit to how -// schema changes are split up into multiple smaller steps that are carried out -// sequentially. -func (h *Helper) UntilClusterStable(ctx context.Context, fn func() error) error { - ns, err := h.c.nodes(ctx) - if err != nil { - return err - } - - for { - if err := fn(); err != nil { - return err - } - - curNodes, err := h.c.nodes(ctx) - if err != nil { - return err - } - - if ok, diffs := ns.identical(curNodes); !ok { - log.Infof(ctx, "%s, retrying", diffs) - ns = curNodes - continue - } - - break - } - - return nil -} - -// IterateRangeDescriptors provides a handle on every range descriptor in the -// system, which callers can then use to send out arbitrary KV requests to in -// order to run arbitrary KV-level migrations. These requests will typically -// just be the `Migrate` request, with code added within [1] to do the specific -// things intended for the specified version. -// -// It's important to note that the closure is being executed in the context of a -// distributed transaction that may be automatically retried. So something like -// the following is an anti-pattern: -// -// processed := 0 -// _ = h.IterateRangeDescriptors(..., -// func(descriptors ...roachpb.RangeDescriptor) error { -// processed += len(descriptors) // we'll over count if retried -// log.Infof(ctx, "processed %d ranges", processed) -// }, -// ) -// -// Instead we allow callers to pass in a callback to signal on every attempt -// (including the first). This lets us salvage the example above: -// -// var processed int -// init := func() { processed = 0 } -// _ = h.IterateRangeDescriptors(..., init, -// func(descriptors ...roachpb.RangeDescriptor) error { -// processed += len(descriptors) -// log.Infof(ctx, "processed %d ranges", processed) -// }, -// ) -// -// [1]: pkg/kv/kvserver/batch_eval/cmd_migrate.go -func (h *Helper) IterateRangeDescriptors( - ctx context.Context, blockSize int, init func(), fn func(...roachpb.RangeDescriptor) error, -) error { - if err := h.DB().Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { - // Inform the caller that we're starting a fresh attempt to page in - // range descriptors. - init() - - // Iterate through meta2 to pull out all the range descriptors. 
- return txn.Iterate(ctx, keys.Meta2Prefix, keys.MetaMax, blockSize, - func(rows []kv.KeyValue) error { - descriptors := make([]roachpb.RangeDescriptor, len(rows)) - for i, row := range rows { - if err := row.ValueProto(&descriptors[i]); err != nil { - return errors.Wrapf(err, - "unable to unmarshal range descriptor from %s", - row.Key, - ) - } - } - - // Invoke fn with the current chunk (of size ~blockSize) of - // range descriptors. - if err := fn(descriptors...); err != nil { - return err - } - - return nil - }) - }); err != nil { - return err - } - - return nil -} - -// DB provides exposes the underlying *kv.DB instance. -func (h *Helper) DB() *kv.DB { - return h.c.db() -} - -// ClusterVersion exposes the cluster version associated with the ongoing -// migration. -func (h *Helper) ClusterVersion() clusterversion.ClusterVersion { - return h.cv -} - -type clusterImpl struct { - nl nodeLiveness - exec sqlutil.InternalExecutor - dialer *nodedialer.Dialer - kvDB *kv.DB -} - -var _ cluster = &clusterImpl{} - -func newCluster( - nl nodeLiveness, dialer *nodedialer.Dialer, executor *sql.InternalExecutor, db *kv.DB, -) *clusterImpl { - return &clusterImpl{nl: nl, dialer: dialer, exec: executor, kvDB: db} -} - -// nodes implements the cluster interface. -func (c *clusterImpl) nodes(ctx context.Context) (nodes, error) { - var ns []node - ls, err := c.nl.GetLivenessesFromKV(ctx) - if err != nil { - return nil, err - } - for _, l := range ls { - if l.Membership.Decommissioned() { - continue - } - live, err := c.nl.IsLive(l.NodeID) - if err != nil { - return nil, err - } - if !live { - return nil, errors.Newf("n%d required, but unavailable", l.NodeID) - } - ns = append(ns, node{id: l.NodeID, epoch: l.Epoch}) - } - return ns, nil -} - -// dial implements the cluster interface. -func (c *clusterImpl) dial(ctx context.Context, id roachpb.NodeID) (*grpc.ClientConn, error) { - return c.dialer.Dial(ctx, id, rpc.DefaultClass) -} - -// db implements the cluster interface. -func (c *clusterImpl) db() *kv.DB { - return c.kvDB -} - -// executor implements the cluster interface. -func (c *clusterImpl) executor() sqlutil.InternalExecutor { - return c.exec -} diff --git a/pkg/migration/helper_test.go b/pkg/migration/helper_test.go deleted file mode 100644 index 80ffdf9aa822..000000000000 --- a/pkg/migration/helper_test.go +++ /dev/null @@ -1,315 +0,0 @@ -// Copyright 2020 The Cockroach Authors. -// -// Use of this software is governed by the Business Source License -// included in the file licenses/BSL.txt. -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0, included in the file -// licenses/APL.txt. 
- -package migration - -import ( - "context" - "fmt" - "testing" - - "github.com/cockroachdb/cockroach/pkg/clusterversion" - "github.com/cockroachdb/cockroach/pkg/kv" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb" - "github.com/cockroachdb/cockroach/pkg/roachpb" - "github.com/cockroachdb/cockroach/pkg/server/serverpb" - "github.com/cockroachdb/cockroach/pkg/testutils" - "github.com/cockroachdb/cockroach/pkg/util/leaktest" - "github.com/cockroachdb/cockroach/pkg/util/syncutil" - "google.golang.org/grpc" -) - -func TestHelperEveryNodeUntilClusterStable(t *testing.T) { - defer leaktest.AfterTest(t) - - cv := clusterversion.ClusterVersion{} - ctx := context.Background() - var mu syncutil.Mutex - const numNodes = 3 - - t.Run("with-node-addition", func(t *testing.T) { - // Add a node mid-way through execution. We expect EveryNode to start - // over from scratch and include the newly added node. - tc := TestingNewCluster(numNodes) - h := newHelper(tc, cv) - opCount := 0 - err := h.UntilClusterStable(ctx, func() error { - return h.ForEveryNode(ctx, "dummy-op", func(context.Context, serverpb.MigrationClient) error { - mu.Lock() - defer mu.Unlock() - - opCount++ - if opCount == numNodes { - tc.addNode() - } - - return nil - }) - }) - if err != nil { - t.Fatal(err) - } - - if exp := numNodes*2 + 1; exp != opCount { - t.Fatalf("expected closure to be invoked %d times, got %d", exp, opCount) - } - }) - - t.Run("with-node-restart", func(t *testing.T) { - // Restart a node mid-way through execution. We expect EveryNode to - // start over from scratch and include the restarted node. - tc := TestingNewCluster(numNodes) - h := newHelper(tc, cv) - opCount := 0 - err := h.UntilClusterStable(ctx, func() error { - return h.ForEveryNode(ctx, "dummy-op", func(context.Context, serverpb.MigrationClient) error { - mu.Lock() - defer mu.Unlock() - - opCount++ - if opCount == numNodes { - tc.restartNode(2) - } - - return nil - }) - }) - if err != nil { - t.Fatal(err) - } - - if exp := numNodes * 2; exp != opCount { - t.Fatalf("expected closure to be invoked %d times, got %d", exp, opCount) - } - }) - - t.Run("with-node-downNode", func(t *testing.T) { - // Down a node mid-way through execution. We expect EveryNode to error - // out. 
- const downedNode = 2 - tc := TestingNewCluster(numNodes) - expRe := fmt.Sprintf("n%d required, but unavailable", downedNode) - h := newHelper(tc, cv) - opCount := 0 - if err := h.UntilClusterStable(ctx, func() error { - return h.ForEveryNode(ctx, "dummy-op", func(context.Context, serverpb.MigrationClient) error { - mu.Lock() - defer mu.Unlock() - - opCount++ - if opCount == 1 { - tc.downNode(downedNode) - } - return nil - }) - }); !testutils.IsError(err, expRe) { - t.Fatalf("expected error %q, got %q", expRe, err) - } - - tc.restartNode(downedNode) - if err := h.UntilClusterStable(ctx, func() error { - return h.ForEveryNode(ctx, "dummy-op", func(context.Context, serverpb.MigrationClient) error { - return nil - }) - }); err != nil { - t.Fatal(err) - } - }) -} - -func TestClusterNodes(t *testing.T) { - defer leaktest.AfterTest(t) - - ctx := context.Background() - const numNodes = 3 - - t.Run("retrieves-all", func(t *testing.T) { - nl := newTestNodeLiveness(numNodes) - c := clusterImpl{nl: nl} - - ns, err := c.nodes(ctx) - if err != nil { - t.Fatal(err) - } - - if got := len(ns); got != numNodes { - t.Fatalf("expected %d nodes, got %d", numNodes, got) - } - - for i := range ns { - if exp := roachpb.NodeID(i + 1); exp != ns[i].id { - t.Fatalf("expected to find node ID %s, got %s", exp, ns[i].id) - } - if ns[i].epoch != 1 { - t.Fatalf("expected to find epoch=1, got %d", ns[i].epoch) - } - } - }) - - t.Run("ignores-decommissioned", func(t *testing.T) { - nl := newTestNodeLiveness(numNodes) - c := clusterImpl{nl: nl} - const decommissionedNode = 3 - nl.decommission(decommissionedNode) - - ns, err := c.nodes(ctx) - if err != nil { - t.Fatal(err) - } - - if got := len(ns); got != numNodes-1 { - t.Fatalf("expected %d nodes, got %d", numNodes-1, got) - } - - for i := range ns { - if exp := roachpb.NodeID(i + 1); exp != ns[i].id { - t.Fatalf("expected to find node ID %s, got %s", exp, ns[i].id) - } - if ns[i].epoch != 1 { - t.Fatalf("expected to find epoch=1, got %d", ns[i].epoch) - } - } - }) - - t.Run("errors-if-down", func(t *testing.T) { - nl := newTestNodeLiveness(numNodes) - c := clusterImpl{nl: nl} - const downedNode = 3 - nl.downNode(downedNode) - - _, err := c.nodes(ctx) - expRe := fmt.Sprintf("n%d required, but unavailable", downedNode) - if !testutils.IsError(err, expRe) { - t.Fatalf("expected error %q, got %q", expRe, err) - } - }) -} - -// mockClusterImpl is a testing only implementation of the cluster interface. It -// lets callers mock out adding, killing, and restarting nodes in the cluster. -type mockClusterImpl struct { - nl *mockNodeLivenessImpl - *clusterImpl -} - -var _ cluster = &mockClusterImpl{} - -// TestingNewCluster is an exported a constructor for a test-only implementation -// of the cluster interface. -func TestingNewCluster(numNodes int, options ...func(*mockClusterImpl)) *mockClusterImpl { - nl := newTestNodeLiveness(numNodes) - tc := &mockClusterImpl{ - nl: nl, - clusterImpl: newCluster(nl, nil, nil, nil), - } - for _, option := range options { - option(tc) - } - return tc -} - -// TestingWithKV facilitates the creation of a test cluster backed by the given -// KV instance. -func TestingWithKV(db *kv.DB) func(*mockClusterImpl) { - return func(impl *mockClusterImpl) { - impl.clusterImpl.kvDB = db - } -} - -// dial is part of the cluster interface. We override it here as tests don't -// expect to make any outbound requests. 
-func (t *mockClusterImpl) dial(context.Context, roachpb.NodeID) (*grpc.ClientConn, error) { - return nil, nil -} - -func (t *mockClusterImpl) addNode() { - t.nl.addNode(roachpb.NodeID(len(t.nl.ls) + 1)) -} - -func (t *mockClusterImpl) downNode(id roachpb.NodeID) { - t.nl.downNode(id) -} - -func (t *mockClusterImpl) restartNode(id roachpb.NodeID) { - t.nl.restartNode(id) -} - -// mockNodeLivenessImpl is a testing-only implementation of the nodeLiveness. It -// lets tests mock out restarting, killing, decommissioning and adding nodes to -// the cluster. -type mockNodeLivenessImpl struct { - ls []livenesspb.Liveness - dead map[roachpb.NodeID]struct{} -} - -var _ nodeLiveness = &mockNodeLivenessImpl{} - -func newTestNodeLiveness(numNodes int) *mockNodeLivenessImpl { - nl := &mockNodeLivenessImpl{ - ls: make([]livenesspb.Liveness, numNodes), - dead: make(map[roachpb.NodeID]struct{}), - } - for i := 0; i < numNodes; i++ { - nl.ls[i] = livenesspb.Liveness{ - NodeID: roachpb.NodeID(i + 1), Epoch: 1, - Membership: livenesspb.MembershipStatus_ACTIVE, - } - } - return nl -} - -// GetLivenessesFromKV implements the nodeLiveness interface. -func (t *mockNodeLivenessImpl) GetLivenessesFromKV(context.Context) ([]livenesspb.Liveness, error) { - return t.ls, nil -} - -// IsLive implements the nodeLiveness interface. -func (t *mockNodeLivenessImpl) IsLive(id roachpb.NodeID) (bool, error) { - _, dead := t.dead[id] - return !dead, nil -} - -func (t *mockNodeLivenessImpl) decommission(id roachpb.NodeID) { - for i := range t.ls { - if t.ls[i].NodeID == id { - t.ls[i].Membership = livenesspb.MembershipStatus_DECOMMISSIONED - break - } - } -} - -func (t *mockNodeLivenessImpl) addNode(id roachpb.NodeID) { - t.ls = append(t.ls, livenesspb.Liveness{ - NodeID: id, - Epoch: 1, - Membership: livenesspb.MembershipStatus_ACTIVE, - }) -} - -func (t *mockNodeLivenessImpl) downNode(id roachpb.NodeID) { - t.dead[id] = struct{}{} -} - -func (t *mockNodeLivenessImpl) restartNode(id roachpb.NodeID) { - for i := range t.ls { - if t.ls[i].NodeID == id { - t.ls[i].Epoch++ - break - } - } - - delete(t.dead, id) -} - -// TestingNewHelper is an exported a constructor for Helper for testing -// purposes. -func TestingNewHelper(c cluster, cv clusterversion.ClusterVersion) *Helper { - return &Helper{c: c, cv: cv} -} diff --git a/pkg/migration/helpers.go b/pkg/migration/helpers.go new file mode 100644 index 000000000000..b32c76c4eb0c --- /dev/null +++ b/pkg/migration/helpers.go @@ -0,0 +1,44 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package migration + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/clusterversion" + "github.com/cockroachdb/cockroach/pkg/util/log" +) + +// FenceVersionFor constructs the appropriate "fence version" for the given +// cluster version. Fence versions allow the migrations infrastructure to safely +// step through consecutive cluster versions in the presence of Nodes (running +// any binary version) being added to the cluster. See the migration manager +// above for intended usage. +// +// Fence versions (and the migrations infrastructure entirely) were introduced +// in the 21.1 release cycle. 
In the same release cycle, we introduced the +// invariant that new user-defined versions (users being crdb engineers) must +// always have even-numbered Internal versions, thus reserving the odd numbers +// to slot in fence versions for each cluster version. See top-level +// documentation in pkg/clusterversion for more details. +func FenceVersionFor( + ctx context.Context, cv clusterversion.ClusterVersion, +) clusterversion.ClusterVersion { + if (cv.Internal % 2) != 0 { + log.Fatalf(ctx, "only even numbered internal versions allowed, found %s", cv.Version) + } + + // We'll pick the odd internal version preceding the cluster version, + // slotting ourselves right before it. + fenceCV := cv + fenceCV.Internal-- + return fenceCV +} diff --git a/pkg/migration/migration.go b/pkg/migration/migration.go new file mode 100644 index 000000000000..329e4d8d24be --- /dev/null +++ b/pkg/migration/migration.go @@ -0,0 +1,225 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +// Package migration captures the facilities needed to define and execute +// migrations for a crdb cluster. These migrations can be arbitrarily long +// running, are free to send out arbitrary requests cluster wide, change +// internal DB state, and much more. They're typically reserved for crdb +// internal operations and state. Each migration is idempotent in nature, is +// associated with a specific cluster version, and executed when the cluster +// version is made activate on every node in the cluster. +// +// Examples of migrations that apply would be migrations to move all raft state +// from one storage engine to another, or purging all usage of the replicated +// truncated state in KV. A "sister" package of interest is pkg/sqlmigrations. +package migration + +import ( + "context" + "fmt" + + "github.com/cockroachdb/cockroach/pkg/clusterversion" + "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/server/serverpb" + "github.com/cockroachdb/logtags" +) + +// Manager coordinates long-running migrations. +type Manager interface { + Migrate(ctx context.Context, from, to clusterversion.ClusterVersion) error +} + +// Cluster abstracts a physical KV cluster and can be utilized by a long-runnng +// migration. +type Cluster interface { + + // DB returns access to the kv. + DB() *kv.DB + + // ForEveryNode is a short hand to execute the given closure (named by the + // informational parameter op) against every node in the cluster at a given + // point in time. Given it's possible for nodes to join or leave the cluster + // during (we don't make any guarantees for the ordering of cluster membership + // events), we only expect this to be used in conjunction with + // UntilClusterStable (see the comment there for how these two primitives can be + // put together). 
+ ForEveryNode( + ctx context.Context, + op string, + fn func(context.Context, serverpb.MigrationClient) error, + ) error + + // UntilClusterStable invokes the given closure until the cluster membership + // is stable, i.e once the set of nodes in the cluster before and after the + // closure are identical, and no nodes have restarted in the interim, we can + // return to the caller[*]. + // + // The mechanism for doing so, while accounting for the possibility of new + // nodes being added to the cluster in the interim, is provided by the + // following structure: + // (a) We'll retrieve the list of node IDs for all nodes in the system + // (b) We'll invoke the closure + // (c) We'll retrieve the list of node IDs again to account for the + // possibility of a new node being added during (b), or a node + // restarting + // (d) If there any discrepancies between the list retrieved in (a) + // and (c), we'll invoke the closure again + // (e) We'll continue to loop around until the node ID list stabilizes + // + // [*]: We can be a bit more precise here. What UntilClusterStable gives us is + // a strict causal happens-before relation between running the given closure + // and the next node that joins the cluster. Put another way: using + // UntilClusterStable callers will have managed to run something without a new + // node joining halfway through (which could have allowed it to pick up some + // state off one of the existing nodes that hadn't heard from us yet). + // + // To consider an example of how this primitive is used, let's consider our + // use of it to bump the cluster version. We use in conjunction with + // ForEveryNode, where after we return, we can rely on the guarantee that all + // nodes in the cluster will have their cluster versions bumped. This then + // implies that future node additions will observe the latest version (through + // the join RPC). That in turn lets us author migrations that can assume that + // a certain version gate has been enabled on all nodes in the cluster, and + // will always be enabled for any new nodes in the system. + // + // Given that it'll always be possible for new nodes to join after an + // UntilClusterStable round, it means that some migrations may have to be + // split up into two version bumps: one that phases out the old version (i.e. + // stops creation of stale data or behavior) and a cleanup version, which + // removes any vestiges of the stale data/behavior, and which, when active, + // ensures that the old data has vanished from the system. This is similar in + // spirit to how schema changes are split up into multiple smaller steps that + // are carried out sequentially. + UntilClusterStable(ctx context.Context, fn func() error) error + + // IterateRangeDescriptors provides a handle on every range descriptor in the + // system, which callers can then use to send out arbitrary KV requests to in + // order to run arbitrary KV-level migrations. These requests will typically + // just be the `Migrate` request, with code added within [1] to do the + // specific things intended for the specified version. + // + // It's important to note that the closure is being executed in the context of + // a distributed transaction that may be automatically retried. 
So something + // like the following is an anti-pattern: + // + // processed := 0 + // _ = h.IterateRangeDescriptors(..., + // func(descriptors ...roachpb.RangeDescriptor) error { + // processed += len(descriptors) // we'll over count if retried + // log.Infof(ctx, "processed %d ranges", processed) + // }, + // ) + // + // Instead we allow callers to pass in a callback to signal on every attempt + // (including the first). This lets us salvage the example above: + // + // var processed int + // init := func() { processed = 0 } + // _ = h.IterateRangeDescriptors(..., init, + // func(descriptors ...roachpb.RangeDescriptor) error { + // processed += len(descriptors) + // log.Infof(ctx, "processed %d ranges", processed) + // }, + // ) + // + // [1]: pkg/kv/kvserver/batch_eval/cmd_migrate.go + IterateRangeDescriptors( + ctx context.Context, + size int, + init func(), + f func(descriptors ...roachpb.RangeDescriptor) error, + ) error +} + +// Migration defines a program to be executed once every node in the cluster is +// (a) running a specific binary version, and (b) has completed all prior +// migrations. +// +// Each migration is associated with a specific internal cluster version and is +// idempotent in nature. When setting the cluster version (via `SET CLUSTER +// SETTING version`), the manager process determines the set of migrations +// needed to bridge the gap between the current active cluster version, and the +// target one. See [1] for where that happens. +// +// To introduce a migration, start by adding version key to pkg/clusterversion +// and introducing a corresponding internal cluster version for it. See [2] for +// more details. Following that, define a Migration in the migrations package +// and add it to the appropriate migrations slice to the registry. Be sure to +// key it in with the new cluster version we just added. During cluster +// upgrades, once the operator is able to set a cluster version setting that's +// past the version that was introduced (typically the major release version +// the migration was introduced in), the manager will execute the defined +// migration before letting the upgrade finalize. +// +// If the migration requires below-Raft level changes ([3] is one example), +// you'll need to add a version switch and the relevant KV-level migration in +// [4]. See IterateRangeDescriptors and the Migrate KV request for more details. +// +// [1]: `(*Manager).Migrate` +// [2]: pkg/clusterversion/cockroach_versions.go +// [3]: truncatedStateMigration +// [4]: pkg/kv/kvserver/batch_eval/cmd_migrate.go +// +type Migration interface { + ClusterVersion() clusterversion.ClusterVersion + + internal() // restrict implementations to this package +} + +type migration struct { + description string + cv clusterversion.ClusterVersion +} + +// ClusterVersion makes KVMigration a Migration. +func (m *migration) ClusterVersion() clusterversion.ClusterVersion { + return m.cv +} + +func (m *migration) internal() {} + +// KVMigration is an implementation of Migration for KV-level migrations. +type KVMigration struct { + migration + fn KVMigrationFn +} + +// NewKVMigration constructs a KVMigration. +func NewKVMigration( + description string, cv clusterversion.ClusterVersion, fn KVMigrationFn, +) *KVMigration { + return &KVMigration{ + migration: migration{ + description: description, + cv: cv, + }, + fn: fn, + } +} + +// KVMigrationFn contains the logic of a KVMigration. 
+type KVMigrationFn func(context.Context, clusterversion.ClusterVersion, Cluster) error + +// Run kickstarts the actual migration process. It's responsible for recording +// the ongoing status of the migration into a system table. +// +// TODO(irfansharif): Introduce a `system.migrations` table, and populate it here. +func (m *KVMigration) Run( + ctx context.Context, cv clusterversion.ClusterVersion, h Cluster, +) (err error) { + ctx = logtags.AddTag(ctx, fmt.Sprintf("migration=%s", cv), nil) + + if err := m.fn(ctx, cv, h); err != nil { + return err + } + + return nil +} diff --git a/pkg/migration/migrationcluster/BUILD.bazel b/pkg/migration/migrationcluster/BUILD.bazel new file mode 100644 index 000000000000..8c69648729bf --- /dev/null +++ b/pkg/migration/migrationcluster/BUILD.bazel @@ -0,0 +1,53 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "migrationcluster", + srcs = [ + "cluster.go", + "nodes.go", + ], + importpath = "github.com/cockroachdb/cockroach/pkg/migration/migrationcluster", + visibility = ["//visibility:public"], + deps = [ + "//pkg/keys", + "//pkg/kv", + "//pkg/kv/kvserver/liveness/livenesspb", + "//pkg/roachpb", + "//pkg/rpc", + "//pkg/server/serverpb", + "//pkg/util/ctxgroup", + "//pkg/util/log", + "//pkg/util/quotapool", + "@com_github_cockroachdb_errors//:errors", + "@com_github_cockroachdb_redact//:redact", + "@org_golang_google_grpc//:go_default_library", + ], +) + +go_test( + name = "migrationcluster_test", + srcs = [ + "client_test.go", + "helper_test.go", + "main_test.go", + "nodes_test.go", + ], + embed = [":migrationcluster"], + deps = [ + "//pkg/kv/kvserver", + "//pkg/migration/nodelivenesstest", + "//pkg/roachpb", + "//pkg/rpc", + "//pkg/security", + "//pkg/security/securitytest", + "//pkg/server", + "//pkg/server/serverpb", + "//pkg/sql/tests", + "//pkg/testutils", + "//pkg/testutils/serverutils", + "//pkg/testutils/testcluster", + "//pkg/util/leaktest", + "//pkg/util/syncutil", + "@org_golang_google_grpc//:go_default_library", + ], +) diff --git a/pkg/migration/client_test.go b/pkg/migration/migrationcluster/client_test.go similarity index 78% rename from pkg/migration/client_test.go rename to pkg/migration/migrationcluster/client_test.go index a56a1c490a18..e6d114c3fa75 100644 --- a/pkg/migration/client_test.go +++ b/pkg/migration/migrationcluster/client_test.go @@ -7,25 +7,25 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0, included in the file // licenses/APL.txt. 
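For context on how the pieces above fit together: the Migration doc comment earlier in migration.go describes defining a migration in the migrations package and keying it to a new cluster version, and the manager hunk later in this patch looks migrations up via migrations.GetMigration. The actual pkg/migration/migrations registry is not shown in this excerpt; the following is only a rough sketch, assuming a map-based registry, of what such registration and lookup could look like. The version key used in the example is illustrative, not taken from this patch.

// A sketch of a map-based registry keyed by cluster version. The real
// registry in pkg/migration/migrations may be shaped differently.
package migrations

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/clusterversion"
	"github.com/cockroachdb/cockroach/pkg/migration"
)

// registry maps each cluster version to the migration that must complete
// before that version can be finalized. (Hypothetical shape.)
var registry = map[clusterversion.ClusterVersion]migration.Migration{}

// register keys a KV-level migration in under the given cluster version key.
func register(key clusterversion.Key, desc string, fn migration.KVMigrationFn) {
	cv := clusterversion.ClusterVersion{Version: clusterversion.ByKey(key)}
	registry[cv] = migration.NewKVMigration(desc, cv, fn)
}

// GetMigration returns the migration associated with the given cluster
// version, if one exists.
func GetMigration(cv clusterversion.ClusterVersion) (migration.Migration, bool) {
	m, ok := registry[cv]
	return m, ok
}

func init() {
	// Illustrative registration; the version key and body are placeholders.
	register(clusterversion.TruncatedAndRangeAppliedStateMigration,
		"purge all replicas using the replicated truncated state",
		func(ctx context.Context, cv clusterversion.ClusterVersion, c migration.Cluster) error {
			// Body elided; see truncated_state.go elsewhere in this patch.
			return nil
		},
	)
}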
-package migration_test + +package migrationcluster_test import ( "context" "testing" - "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" - "github.com/cockroachdb/cockroach/pkg/migration" + "github.com/cockroachdb/cockroach/pkg/migration/migrationcluster" + "github.com/cockroachdb/cockroach/pkg/migration/nodelivenesstest" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql/tests" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/util/leaktest" ) -func TestHelperIterateRangeDescriptors(t *testing.T) { +func TestCluster_IterateRangeDescriptors(t *testing.T) { defer leaktest.AfterTest(t) - cv := clusterversion.ClusterVersion{} ctx := context.Background() const numNodes = 1 @@ -41,8 +41,12 @@ func TestHelperIterateRangeDescriptors(t *testing.T) { t.Fatal(err) } - c := migration.TestingNewCluster(numNodes, migration.TestingWithKV(kvDB)) - h := migration.TestingNewHelper(c, cv) + c := nodelivenesstest.New(numNodes) + h := migrationcluster.New(migrationcluster.ClusterConfig{ + NodeLiveness: c, + Dialer: migrationcluster.NoopDialer{}, + DB: kvDB, + }) for _, blockSize := range []int{1, 5, 10, 50} { var numDescs int diff --git a/pkg/migration/migrationcluster/cluster.go b/pkg/migration/migrationcluster/cluster.go new file mode 100644 index 000000000000..9f22204a0f8c --- /dev/null +++ b/pkg/migration/migrationcluster/cluster.go @@ -0,0 +1,172 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package migrationcluster + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/rpc" + "github.com/cockroachdb/cockroach/pkg/server/serverpb" + "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/quotapool" + "github.com/cockroachdb/errors" + "github.com/cockroachdb/redact" + "google.golang.org/grpc" +) + +// Cluster mediates interacting with a cockroach cluster. +type Cluster struct { + c ClusterConfig +} + +// ClusterConfig configures a Cluster. +type ClusterConfig struct { + + // NodeLiveness is used to determine the set of nodes in the cluster. + NodeLiveness NodeLiveness + + // Dialer constructs connections to other nodes. + Dialer NodeDialer + + // DB provides access the kv.DB instance backing the cluster. + // + // TODO(irfansharif): We could hide the kv.DB instance behind an interface + // to expose only relevant, vetted bits of kv.DB. It'll make our tests less + // "integration-ey". + DB *kv.DB +} + +// NodeDialer abstracts connecting to other nodes in the cluster. +type NodeDialer interface { + // Dial returns a grpc connection to the given node. + Dial(context.Context, roachpb.NodeID, rpc.ConnectionClass) (*grpc.ClientConn, error) +} + +// NodeLiveness is the subset of the interface satisfied by CRDB's node liveness +// component that the migration manager relies upon. 
+type NodeLiveness interface { + GetLivenessesFromKV(context.Context) ([]livenesspb.Liveness, error) + IsLive(roachpb.NodeID) (bool, error) +} + +// New constructs a new Cluster with the provided dependencies. +func New(cfg ClusterConfig) *Cluster { + return &Cluster{c: cfg} +} + +// UntilClusterStable is part of the migration.Cluster interface. +func (c *Cluster) UntilClusterStable(ctx context.Context, fn func() error) error { + ns, err := NodesFromNodeLiveness(ctx, c.c.NodeLiveness) + if err != nil { + return err + } + + for { + if err := fn(); err != nil { + return err + } + curNodes, err := NodesFromNodeLiveness(ctx, c.c.NodeLiveness) + if err != nil { + return err + } + + if ok, diffs := ns.Identical(curNodes); !ok { + log.Infof(ctx, "%s, retrying", diffs) + ns = curNodes + continue + } + break + } + return nil +} + +// ForEveryNode is part of the migration.Cluster interface. +func (c *Cluster) ForEveryNode( + ctx context.Context, op string, fn func(context.Context, serverpb.MigrationClient) error, +) error { + + ns, err := NodesFromNodeLiveness(ctx, c.c.NodeLiveness) + if err != nil { + return err + } + + // We'll want to rate limit outgoing RPCs (limit pulled out of thin air). + qp := quotapool.NewIntPool("every-node", 25) + log.Infof(ctx, "executing %s on nodes %s", redact.Safe(op), ns) + grp := ctxgroup.WithContext(ctx) + + for _, node := range ns { + id := node.ID // copy out of the loop variable + alloc, err := qp.Acquire(ctx, 1) + if err != nil { + return err + } + + grp.GoCtx(func(ctx context.Context) error { + defer alloc.Release() + + conn, err := c.c.Dialer.Dial(ctx, id, rpc.DefaultClass) + if err != nil { + return err + } + client := serverpb.NewMigrationClient(conn) + return fn(ctx, client) + }) + } + return grp.Wait() +} + +// IterateRangeDescriptors is part of the migration.Cluster interface. +func (c *Cluster) IterateRangeDescriptors( + ctx context.Context, blockSize int, init func(), fn func(...roachpb.RangeDescriptor) error, +) error { + if err := c.DB().Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + // Inform the caller that we're starting a fresh attempt to page in + // range descriptors. + init() + + // Iterate through meta2 to pull out all the range descriptors. + return txn.Iterate(ctx, keys.Meta2Prefix, keys.MetaMax, blockSize, + func(rows []kv.KeyValue) error { + descriptors := make([]roachpb.RangeDescriptor, len(rows)) + for i, row := range rows { + if err := row.ValueProto(&descriptors[i]); err != nil { + return errors.Wrapf(err, + "unable to unmarshal range descriptor from %s", + row.Key, + ) + } + } + + // Invoke fn with the current chunk (of size ~blockSize) of + // range descriptors. + if err := fn(descriptors...); err != nil { + return err + } + + return nil + }) + }); err != nil { + return err + } + + return nil +} + +// DB provides exposes the underlying *kv.DB instance. +func (c *Cluster) DB() *kv.DB { + return c.c.DB +} diff --git a/pkg/migration/migrationcluster/helper_test.go b/pkg/migration/migrationcluster/helper_test.go new file mode 100644 index 000000000000..7a3a405e1d4f --- /dev/null +++ b/pkg/migration/migrationcluster/helper_test.go @@ -0,0 +1,215 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
+ +package migrationcluster + +import ( + "context" + "fmt" + "testing" + + "github.com/cockroachdb/cockroach/pkg/migration/nodelivenesstest" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/rpc" + "github.com/cockroachdb/cockroach/pkg/server/serverpb" + "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "google.golang.org/grpc" +) + +type NoopDialer struct{} + +func (n NoopDialer) Dial( + ctx context.Context, id roachpb.NodeID, class rpc.ConnectionClass, +) (*grpc.ClientConn, error) { + return nil, nil +} + +var _ NodeDialer = NoopDialer{} + +func TestHelperEveryNode(t *testing.T) { + defer leaktest.AfterTest(t) + + ctx := context.Background() + var mu syncutil.Mutex + const numNodes = 3 + + t.Run("with-node-addition", func(t *testing.T) { + // Add a node mid-way through execution. We expect EveryNode to start + // over from scratch and include the newly added node. + tc := nodelivenesstest.New(numNodes) + h := New(ClusterConfig{ + NodeLiveness: tc, + Dialer: NoopDialer{}, + }) + opCount := 0 + err := h.UntilClusterStable(ctx, func() error { + return h.ForEveryNode(ctx, "dummy-op", func( + context.Context, serverpb.MigrationClient, + ) error { + mu.Lock() + defer mu.Unlock() + + opCount++ + if opCount == numNodes { + tc.AddNewNode() + } + + return nil + }) + }) + if err != nil { + t.Fatal(err) + } + + if exp := numNodes*2 + 1; exp != opCount { + t.Fatalf("expected closure to be invoked %d times, got %d", exp, opCount) + } + }) + + t.Run("with-node-restart", func(t *testing.T) { + // Restart a node mid-way through execution. We expect EveryNode to + // start over from scratch and include the restarted node. + tc := nodelivenesstest.New(numNodes) + h := New(ClusterConfig{ + NodeLiveness: tc, + Dialer: NoopDialer{}, + }) + opCount := 0 + err := h.UntilClusterStable(ctx, func() error { + return h.ForEveryNode(ctx, "dummy-op", func( + context.Context, serverpb.MigrationClient, + ) error { + mu.Lock() + defer mu.Unlock() + + opCount++ + if opCount == numNodes { + tc.RestartNode(2) + } + + return nil + }) + }) + if err != nil { + t.Fatal(err) + } + + if exp := numNodes * 2; exp != opCount { + t.Fatalf("expected closure to be invoked %d times, got %d", exp, opCount) + } + }) + + t.Run("with-node-downNode", func(t *testing.T) { + // Down a node mid-way through execution. We expect EveryNode to error + // out. 
+ const downedNode = 2 + tc := nodelivenesstest.New(numNodes) + h := New(ClusterConfig{ + NodeLiveness: tc, + Dialer: NoopDialer{}, + }) + expRe := fmt.Sprintf("n%d required, but unavailable", downedNode) + opCount := 0 + if err := h.UntilClusterStable(ctx, func() error { + return h.ForEveryNode(ctx, "dummy-op", func( + context.Context, serverpb.MigrationClient, + ) error { + mu.Lock() + defer mu.Unlock() + + opCount++ + if opCount == 1 { + tc.DownNode(downedNode) + } + return nil + }) + }); !testutils.IsError(err, expRe) { + t.Fatalf("expected error %q, got %q", expRe, err) + } + + tc.RestartNode(downedNode) + if err := h.UntilClusterStable(ctx, func() error { + return h.ForEveryNode(ctx, "dummy-op", func( + context.Context, serverpb.MigrationClient, + ) error { + return nil + }) + }); err != nil { + t.Fatal(err) + } + }) +} + +func TestClusterNodes(t *testing.T) { + defer leaktest.AfterTest(t) + + ctx := context.Background() + const numNodes = 3 + + t.Run("retrieves-all", func(t *testing.T) { + nl := nodelivenesstest.New(numNodes) + ns, err := NodesFromNodeLiveness(ctx, nl) + if err != nil { + t.Fatal(err) + } + + if got := len(ns); got != numNodes { + t.Fatalf("expected %d Nodes, got %d", numNodes, got) + } + + for i := range ns { + if exp := roachpb.NodeID(i + 1); exp != ns[i].ID { + t.Fatalf("expected to find node ID %s, got %s", exp, ns[i].ID) + } + if ns[i].Epoch != 1 { + t.Fatalf("expected to find Epoch=1, got %d", ns[i].Epoch) + } + } + }) + + t.Run("ignores-decommissioned", func(t *testing.T) { + nl := nodelivenesstest.New(numNodes) + + const decommissionedNode = 3 + nl.Decommission(decommissionedNode) + + ns, err := NodesFromNodeLiveness(ctx, nl) + if err != nil { + t.Fatal(err) + } + + if got := len(ns); got != numNodes-1 { + t.Fatalf("expected %d Nodes, got %d", numNodes-1, got) + } + + for i := range ns { + if exp := roachpb.NodeID(i + 1); exp != ns[i].ID { + t.Fatalf("expected to find node ID %s, got %s", exp, ns[i].ID) + } + if ns[i].Epoch != 1 { + t.Fatalf("expected to find Epoch=1, got %d", ns[i].Epoch) + } + } + }) + + t.Run("errors-if-down", func(t *testing.T) { + nl := nodelivenesstest.New(numNodes) + const downedNode = 3 + nl.DownNode(downedNode) + + _, err := NodesFromNodeLiveness(ctx, nl) + expRe := fmt.Sprintf("n%d required, but unavailable", downedNode) + if !testutils.IsError(err, expRe) { + t.Fatalf("expected error %q, got %q", expRe, err) + } + }) +} diff --git a/pkg/migration/migrationcluster/main_test.go b/pkg/migration/migrationcluster/main_test.go new file mode 100644 index 000000000000..0f87ea997cad --- /dev/null +++ b/pkg/migration/migrationcluster/main_test.go @@ -0,0 +1,29 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
+ +package migrationcluster_test + +import ( + "os" + "testing" + + "github.com/cockroachdb/cockroach/pkg/security" + "github.com/cockroachdb/cockroach/pkg/security/securitytest" + "github.com/cockroachdb/cockroach/pkg/server" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" +) + +func TestMain(m *testing.M) { + security.SetAssetLoader(securitytest.EmbeddedAssets) + serverutils.InitTestServerFactory(server.TestServerFactory) + serverutils.InitTestClusterFactory(testcluster.TestClusterFactory) + os.Exit(m.Run()) +} diff --git a/pkg/migration/migrationcluster/nodes.go b/pkg/migration/migrationcluster/nodes.go new file mode 100644 index 000000000000..15b789f90ec1 --- /dev/null +++ b/pkg/migration/migrationcluster/nodes.go @@ -0,0 +1,118 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package migrationcluster + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/errors" + "github.com/cockroachdb/redact" +) + +// Node captures the relevant bits of each node as it pertains to the migration +// infrastructure. +type Node struct { + ID roachpb.NodeID + Epoch int64 +} + +// Nodes is a collection of node objects. +type Nodes []Node + +// NodesFromNodeLiveness returns the IDs and epochs for all nodes that are +// currently part of the cluster (i.e. they haven't been decommissioned away). +// Migrations have the pre-requisite that all nodes are up and running so that +// we're able to execute all relevant node-level operations on them. If any of +// the nodes are found to be unavailable, an error is returned. +// +// It's important to note that this makes no guarantees about new nodes +// being added to the cluster. It's entirely possible for that to happen +// concurrently with the retrieval of the current set of nodes. Appropriate +// usage of this entails wrapping it under a stabilizing loop, like we do in +// EveryNode. +func NodesFromNodeLiveness(ctx context.Context, nl NodeLiveness) (Nodes, error) { + var ns []Node + ls, err := nl.GetLivenessesFromKV(ctx) + if err != nil { + return nil, err + } + for _, l := range ls { + if l.Membership.Decommissioned() { + continue + } + live, err := nl.IsLive(l.NodeID) + if err != nil { + return nil, err + } + if !live { + return nil, errors.Newf("n%d required, but unavailable", l.NodeID) + } + ns = append(ns, Node{ID: l.NodeID, Epoch: l.Epoch}) + } + return ns, nil +} + +// Identical returns whether or not two lists of Nodes are identical as sets, +// and if not, what changed (in terms of cluster membership operations and epoch +// changes). The textual diffs are only to be used for logging purposes. 
+func (ns Nodes) Identical(other Nodes) (ok bool, _ []redact.RedactableString) { + a, b := ns, other + + type ent struct { + node Node + count int + epochChanged bool + } + m := map[roachpb.NodeID]ent{} + for _, node := range a { + m[node.ID] = ent{count: 1, node: node, epochChanged: false} + } + for _, node := range b { + e, ok := m[node.ID] + e.count-- + if ok && e.node.Epoch != node.Epoch { + e.epochChanged = true + } + m[node.ID] = e + } + + var diffs []redact.RedactableString + for id, e := range m { + if e.epochChanged { + diffs = append(diffs, redact.Sprintf("n%d's Epoch changed", id)) + } + if e.count > 0 { + diffs = append(diffs, redact.Sprintf("n%d was decommissioned", id)) + } + if e.count < 0 { + diffs = append(diffs, redact.Sprintf("n%d joined the cluster", id)) + } + } + + return len(diffs) == 0, diffs +} + +func (ns Nodes) String() string { + return redact.StringWithoutMarkers(ns) +} + +// SafeFormat implements redact.SafeFormatter. +func (ns Nodes) SafeFormat(s redact.SafePrinter, _ rune) { + s.SafeString("n{") + if len(ns) > 0 { + s.Printf("%d", ns[0].ID) + for _, node := range ns[1:] { + s.Printf(",%d", node.ID) + } + } + s.SafeString("}") +} diff --git a/pkg/migration/util_test.go b/pkg/migration/migrationcluster/nodes_test.go similarity index 78% rename from pkg/migration/util_test.go rename to pkg/migration/migrationcluster/nodes_test.go index 98d201da3935..dbd2713b8796 100644 --- a/pkg/migration/util_test.go +++ b/pkg/migration/migrationcluster/nodes_test.go @@ -8,7 +8,7 @@ // by the Apache License, Version 2.0, included in the file // licenses/APL.txt. -package migration +package migrationcluster import ( "sort" @@ -23,16 +23,16 @@ import ( func TestNodesString(t *testing.T) { defer leaktest.AfterTest(t) - ns := func(ids ...int) nodes { - var nodes []node + ns := func(ids ...int) Nodes { + var nodes []Node for _, id := range ids { - nodes = append(nodes, node{id: roachpb.NodeID(id)}) + nodes = append(nodes, Node{ID: roachpb.NodeID(id)}) } return nodes } var tests = []struct { - ns nodes + ns Nodes exp string }{ {ns(), "n{}"}, @@ -51,8 +51,8 @@ func TestNodesString(t *testing.T) { func TestNodesIdentical(t *testing.T) { defer leaktest.AfterTest(t) - list := func(nodes ...string) nodes { // takes in strings of the form "id@epoch" - var ns []node + list := func(nodes ...string) Nodes { // takes in strings of the form "ID@Epoch" + var ns []Node for _, n := range nodes { parts := strings.Split(n, "@") id, err := strconv.Atoi(parts[0]) @@ -63,29 +63,29 @@ func TestNodesIdentical(t *testing.T) { if err != nil { t.Fatal(err) } - ns = append(ns, node{id: roachpb.NodeID(id), epoch: int64(epoch)}) + ns = append(ns, Node{ID: roachpb.NodeID(id), Epoch: int64(epoch)}) } return ns } var tests = []struct { - a, b nodes + a, b Nodes expOk bool expDiff string }{ {list(), list(), true, ""}, {list("1@2"), list("1@2"), true, ""}, {list("2@1", "1@2"), list("1@2", "2@1"), true, ""}, - {list("1@2"), list("1@3"), false, "n1's epoch changed"}, + {list("1@2"), list("1@3"), false, "n1's Epoch changed"}, {list("1@2"), list("1@2", "2@1"), false, "n2 joined the cluster"}, {list("1@1", "2@1"), list("1@1"), false, "n2 was decommissioned"}, - {list("3@2", "4@6"), list("4@8", "5@2"), false, "n3 was decommissioned, n4's epoch changed, n5 joined the cluster"}, + {list("3@2", "4@6"), list("4@8", "5@2"), false, "n3 was decommissioned, n4's Epoch changed, n5 joined the cluster"}, } for _, test := range tests { - ok, diffs := test.a.identical(test.b) + ok, diffs := test.a.Identical(test.b) if ok != 
test.expOk { - t.Fatalf("expected identical = %t, got %t", test.expOk, ok) + t.Fatalf("expected Identical = %t, got %t", test.expOk, ok) } strDiffs := make([]string, len(diffs)) diff --git a/pkg/migration/migrationmanager/BUILD.bazel b/pkg/migration/migrationmanager/BUILD.bazel new file mode 100644 index 000000000000..988ba0b1bc44 --- /dev/null +++ b/pkg/migration/migrationmanager/BUILD.bazel @@ -0,0 +1,46 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "migrationmanager", + srcs = ["manager.go"], + importpath = "github.com/cockroachdb/cockroach/pkg/migration/migrationmanager", + visibility = ["//visibility:public"], + deps = [ + "//pkg/clusterversion", + "//pkg/kv", + "//pkg/migration", + "//pkg/migration/migrationcluster", + "//pkg/migration/migrations", + "//pkg/server/serverpb", + "//pkg/util/log", + "@com_github_cockroachdb_logtags//:logtags", + ], +) + +go_test( + name = "migrationmanager_test", + srcs = [ + "main_test.go", + "manager_external_test.go", + ], + deps = [ + "//pkg/base", + "//pkg/clusterversion", + "//pkg/kv/kvserver/batcheval", + "//pkg/kv/kvserver/liveness", + "//pkg/migration", + "//pkg/migration/migrations", + "//pkg/roachpb", + "//pkg/security", + "//pkg/security/securitytest", + "//pkg/server", + "//pkg/settings/cluster", + "//pkg/testutils", + "//pkg/testutils/serverutils", + "//pkg/testutils/testcluster", + "//pkg/util/leaktest", + "//pkg/util/log", + "@com_github_cockroachdb_errors//:errors", + "@com_github_stretchr_testify//require", + ], +) diff --git a/pkg/migration/migrationmanager/main_test.go b/pkg/migration/migrationmanager/main_test.go new file mode 100644 index 000000000000..a99ad25333fb --- /dev/null +++ b/pkg/migration/migrationmanager/main_test.go @@ -0,0 +1,29 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package migrationmanager_test + +import ( + "os" + "testing" + + "github.com/cockroachdb/cockroach/pkg/security" + "github.com/cockroachdb/cockroach/pkg/security/securitytest" + "github.com/cockroachdb/cockroach/pkg/server" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" +) + +func TestMain(m *testing.M) { + security.SetAssetLoader(securitytest.EmbeddedAssets) + serverutils.InitTestServerFactory(server.TestServerFactory) + serverutils.InitTestClusterFactory(testcluster.TestClusterFactory) + os.Exit(m.Run()) +} diff --git a/pkg/migration/manager.go b/pkg/migration/migrationmanager/manager.go similarity index 72% rename from pkg/migration/manager.go rename to pkg/migration/migrationmanager/manager.go index adf989710c98..ce00e985d451 100644 --- a/pkg/migration/manager.go +++ b/pkg/migration/migrationmanager/manager.go @@ -8,18 +8,9 @@ // by the Apache License, Version 2.0, included in the file // licenses/APL.txt. -// Package migration captures the facilities needed to define and execute -// migrations for a crdb cluster. These migrations can be arbitrarily long -// running, are free to send out arbitrary requests cluster wide, change -// internal DB state, and much more. They're typically reserved for crdb -// internal operations and state. 
Each migration is idempotent in nature, is -// associated with a specific cluster version, and executed when the cluster -// version is made activate on every node in the cluster. -// -// Examples of migrations that apply would be migrations to move all raft state -// from one storage engine to another, or purging all usage of the replicated -// truncated state in KV. A "sister" package of interest is pkg/sqlmigrations. -package migration +// Package migrationmanager provides an implementation of migration.Manager +// for use on kv nodes. +package migrationmanager import ( "context" @@ -27,11 +18,10 @@ import ( "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/kv" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb" - "github.com/cockroachdb/cockroach/pkg/roachpb" - "github.com/cockroachdb/cockroach/pkg/rpc/nodedialer" + "github.com/cockroachdb/cockroach/pkg/migration" + "github.com/cockroachdb/cockroach/pkg/migration/migrationcluster" + "github.com/cockroachdb/cockroach/pkg/migration/migrations" "github.com/cockroachdb/cockroach/pkg/server/serverpb" - "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/logtags" ) @@ -39,30 +29,21 @@ import ( // Manager is the instance responsible for executing migrations across the // cluster. type Manager struct { - dialer *nodedialer.Dialer - nl nodeLiveness - executor *sql.InternalExecutor - db *kv.DB -} - -// nodeLiveness is the subset of the interface satisfied by CRDB's node liveness -// component that the migration manager relies upon. -type nodeLiveness interface { - GetLivenessesFromKV(context.Context) ([]livenesspb.Liveness, error) - IsLive(roachpb.NodeID) (bool, error) + c migration.Cluster } // NewManager constructs a new Manager. // // TODO(irfansharif): We'll need to eventually plumb in on a lease manager here. func NewManager( - dialer *nodedialer.Dialer, nl nodeLiveness, executor *sql.InternalExecutor, db *kv.DB, + dialer migrationcluster.NodeDialer, nl migrationcluster.NodeLiveness, db *kv.DB, ) *Manager { return &Manager{ - dialer: dialer, - executor: executor, - db: db, - nl: nl, + c: migrationcluster.New(migrationcluster.ClusterConfig{ + NodeLiveness: nl, + Dialer: dialer, + DB: db, + }), } } @@ -99,15 +80,9 @@ func (m *Manager) Migrate(ctx context.Context, from, to clusterversion.ClusterVe log.Infof(ctx, "migrating cluster from %s to %s (stepping through %s)", from, to, clusterVersions) for _, clusterVersion := range clusterVersions { - cluster := newCluster(m.nl, m.dialer, m.executor, m.db) - h := newHelper(cluster, clusterVersion) - - // First run the actual migration (if any). The cluster version bump - // will be rolled out afterwards. This lets us provide the invariant - // that if a version=V is active, all data is guaranteed to have - // migrated. - if migration, ok := registry[clusterVersion]; ok { - if err := migration.Run(ctx, h); err != nil { + // First, run the actual migration if any. + if mig, ok := migrations.GetMigration(clusterVersion); ok { + if err := mig.(*migration.KVMigration).Run(ctx, clusterVersion, m.c); err != nil { return err } } @@ -122,7 +97,7 @@ func (m *Manager) Migrate(ctx context.Context, from, to clusterversion.ClusterVe // // For each intermediate version, we'll need to first bump the fence // version before bumping the "real" one. 
Doing so allows us to provide - // the invariant that whenever a cluster version is active, all nodes in + // the invariant that whenever a cluster version is active, all Nodes in // the cluster (including ones added concurrently during version // upgrades) are running binaries that know about the version. @@ -156,17 +131,17 @@ func (m *Manager) Migrate(ctx context.Context, from, to clusterversion.ClusterVe // [1]: See ReplicaState.Version. // [2]: See Replica.executeWriteBatch, specifically how proposals with the // Migrate request are handled downstream of raft. - // [3]: See PurgeOutdatedReplicas from the Migration service. + // [3]: See PurgeOutdatedReplicas from the KVMigration service. { // The migrations infrastructure makes use of internal fence // versions when stepping through consecutive versions. It's // instructive to walk through how we expect a version migration // from v21.1 to v21.2 to take place, and how we behave in the - // presence of new v21.1 or v21.2 nodes being added to the cluster. + // presence of new v21.1 or v21.2 Nodes being added to the cluster. // - // - All nodes are running v21.1 - // - All nodes are rolled into v21.2 binaries, but with active + // - All Nodes are running v21.1 + // - All Nodes are rolled into v21.2 binaries, but with active // cluster version still as v21.1 // - The first version bump will be into v21.2-1(fence), see the // migration manager above for where that happens @@ -176,11 +151,11 @@ func (m *Manager) Migrate(ctx context.Context, from, to clusterversion.ClusterVe // - A new node is added to the cluster, but running binary v21.1 // - We try bumping the cluster gates to v21.2-1(fence) // - // If the v21.1 nodes manages to sneak in before the version bump, + // If the v21.1 Nodes manages to sneak in before the version bump, // it's fine as the version bump is a no-op one (all fence versions // are). Any subsequent bumps (including the "actual" one bumping to // v21.2) will fail during the validation step where we'll first - // check to see that all nodes are running v21.2 binaries. + // check to see that all Nodes are running v21.2 binaries. // // If the v21.1 node is only added after v21.2-1(fence) is active, // it won't be able to actually join the cluster (it'll be prevented @@ -191,11 +166,11 @@ func (m *Manager) Migrate(ctx context.Context, from, to clusterversion.ClusterVe // can join the cluster will run a release that support the fence // version, and by design also supports the actual version (which is // the direct successor of the fence). - fenceVersion := fenceVersionFor(ctx, clusterVersion) + fenceVersion := migration.FenceVersionFor(ctx, clusterVersion) req := &serverpb.BumpClusterVersionRequest{ClusterVersion: &fenceVersion} op := fmt.Sprintf("bump-cluster-version=%s", req.ClusterVersion.PrettyPrint()) - if err := h.UntilClusterStable(ctx, func() error { - return h.ForEveryNode(ctx, op, func(ctx context.Context, client serverpb.MigrationClient) error { + if err := m.c.UntilClusterStable(ctx, func() error { + return m.c.ForEveryNode(ctx, op, func(ctx context.Context, client serverpb.MigrationClient) error { _, err := client.BumpClusterVersion(ctx, req) return err }) @@ -208,8 +183,8 @@ func (m *Manager) Migrate(ctx context.Context, from, to clusterversion.ClusterVe // cluster version bump, cluster-wide. 
req := &serverpb.ValidateTargetClusterVersionRequest{ClusterVersion: &clusterVersion} op := fmt.Sprintf("validate-cluster-version=%s", req.ClusterVersion.PrettyPrint()) - if err := h.UntilClusterStable(ctx, func() error { - return h.ForEveryNode(ctx, op, func(ctx context.Context, client serverpb.MigrationClient) error { + if err := m.c.UntilClusterStable(ctx, func() error { + return m.c.ForEveryNode(ctx, op, func(ctx context.Context, client serverpb.MigrationClient) error { _, err := client.ValidateTargetClusterVersion(ctx, req) return err }) @@ -221,8 +196,8 @@ func (m *Manager) Migrate(ctx context.Context, from, to clusterversion.ClusterVe // Finally, bump the real version cluster-wide. req := &serverpb.BumpClusterVersionRequest{ClusterVersion: &clusterVersion} op := fmt.Sprintf("bump-cluster-version=%s", req.ClusterVersion.PrettyPrint()) - if err := h.UntilClusterStable(ctx, func() error { - return h.ForEveryNode(ctx, op, func(ctx context.Context, client serverpb.MigrationClient) error { + if err := m.c.UntilClusterStable(ctx, func() error { + return m.c.ForEveryNode(ctx, op, func(ctx context.Context, client serverpb.MigrationClient) error { _, err := client.BumpClusterVersion(ctx, req) return err }) diff --git a/pkg/migration/migrationmanager/manager_external_test.go b/pkg/migration/migrationmanager/manager_external_test.go new file mode 100644 index 000000000000..e1b3aa802414 --- /dev/null +++ b/pkg/migration/migrationmanager/manager_external_test.go @@ -0,0 +1,117 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package migrationmanager_test + +import ( + "context" + "testing" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/clusterversion" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness" + "github.com/cockroachdb/cockroach/pkg/migration" + "github.com/cockroachdb/cockroach/pkg/migration/migrations" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/server" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/errors" + "github.com/stretchr/testify/require" +) + +func TestMigrateUpdatesReplicaVersion(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + // We're going to be migrating from startCV to endCV. 
+ startCV := clusterversion.ClusterVersion{Version: roachpb.Version{Major: 41}} + endCV := clusterversion.ClusterVersion{Version: roachpb.Version{Major: 42}} + + ctx := context.Background() + tc := testcluster.StartTestCluster(t, 2, base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, + ServerArgs: base.TestServerArgs{ + Settings: cluster.MakeTestingClusterSettingsWithVersions(endCV.Version, startCV.Version, false), + Knobs: base.TestingKnobs{ + Server: &server.TestingKnobs{ + BinaryVersionOverride: startCV.Version, + DisableAutomaticVersionUpgrade: 1, + }, + }, + }, + }) + defer tc.Stopper().Stop(ctx) + + // We'll take a specific range, still running at startCV, generate an + // outgoing snapshot and then suspend it temporarily. We'll then bump the + // cluster version on all the stores, as part of the migration process, and + // then resume the snapshot process. Seeing as how the snapshot was + // generated pre-version bump, off of a version of the range that hadn't + // observed the migration corresponding to the latest cluster version, we + // expect the store to reject it. + + key := tc.ScratchRange(t) + require.NoError(t, tc.WaitForSplitAndInitialization(key)) + desc, err := tc.LookupRange(key) + require.NoError(t, err) + rangeID := desc.RangeID + + // Enqueue the replica in the raftsnapshot queue. We use SucceedsSoon + // because it may take a bit for raft to figure out that we need to be + // generating a snapshot. + store := tc.GetFirstStoreFromServer(t, 0) + repl, err := store.GetReplica(rangeID) + require.NoError(t, err) + + if got := repl.Version(); got != startCV.Version { + t.Fatalf("got replica version %s, expected %s", got, startCV.Version) + } + + // Register the below raft migration. + unregisterKVMigration := batcheval.TestingRegisterMigrationInterceptor(endCV.Version, func() {}) + defer unregisterKVMigration() + + // Register the top-level migration. + unregister := migrations.TestingRegisterMigrationInterceptor(endCV, func( + ctx context.Context, cv clusterversion.ClusterVersion, c migration.Cluster, + ) error { + return c.DB().Migrate(ctx, desc.StartKey, desc.EndKey, cv.Version) + }) + defer unregister() + + // Wait until all nodes have are considered live. + nl := tc.Server(0).NodeLiveness().(*liveness.NodeLiveness) + testutils.SucceedsSoon(t, func() error { + for _, s := range tc.Servers { + id := s.NodeID() + live, err := nl.IsLive(id) + if err != nil { + return err + } + if !live { + return errors.Newf("n%s not live yet", id) + } + } + return nil + }) + + // Kick off the migration process. + _, err = tc.Conns[0].ExecContext(ctx, `SET CLUSTER SETTING version = $1`, endCV.String()) + require.NoError(t, err) + + if got := repl.Version(); got != endCV.Version { + t.Fatalf("got replica version %s, expected %s", got, endCV.Version) + } +} diff --git a/pkg/migration/migrations.go b/pkg/migration/migrations.go deleted file mode 100644 index 25ea7af8c3a3..000000000000 --- a/pkg/migration/migrations.go +++ /dev/null @@ -1,183 +0,0 @@ -// Copyright 2020 The Cockroach Authors. -// -// Use of this software is governed by the Business Source License -// included in the file licenses/BSL.txt. -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0, included in the file -// licenses/APL.txt. 
- -package migration - -import ( - "bytes" - "context" - "fmt" - - "github.com/cockroachdb/cockroach/pkg/clusterversion" - "github.com/cockroachdb/cockroach/pkg/keys" - "github.com/cockroachdb/cockroach/pkg/roachpb" - "github.com/cockroachdb/cockroach/pkg/server/serverpb" - "github.com/cockroachdb/cockroach/pkg/util/log" - "github.com/cockroachdb/logtags" -) - -// registry defines the global mapping between a cluster version and the -// associated migration. The migration is only executed after a cluster-wide -// bump of the corresponding version gate. -var registry = make(map[clusterversion.ClusterVersion]Migration) - -func init() { - register(clusterversion.TruncatedAndRangeAppliedStateMigration, truncatedStateMigration, - "use unreplicated TruncatedState and RangeAppliedState for all ranges") - register(clusterversion.PostTruncatedAndRangeAppliedStateMigration, postTruncatedStateMigration, - "purge all replicas using the replicated TruncatedState") -} - -// Migration defines a program to be executed once every node in the cluster is -// (a) running a specific binary version, and (b) has completed all prior -// migrations. -// -// Each migration is associated with a specific internal cluster version and is -// idempotent in nature. When setting the cluster version (via `SET CLUSTER -// SETTING version`), the manager process determines the set of migrations -// needed to bridge the gap between the current active cluster version, and the -// target one. See [1] for where that happens. -// -// To introduce a migration, start by adding version key to pkg/clusterversion -// and introducing a corresponding internal cluster version for it. See [2] for -// more details. Following that, define a Migration in this package and add it -// to the registry. Be sure to key it in with the new cluster version we just -// added. During cluster upgrades, once the operator is able to set a cluster -// version setting that's past the version that was introduced (typically the -// major release version the migration was introduced in), the manager will -// execute the defined migration before letting the upgrade finalize. -// -// If the migration requires below-Raft level changes ([3] is one example), -// you'll need to add a version switch and the relevant KV-level migration in -// [4]. See IterateRangeDescriptors and the Migrate KV request for more details. -// -// [1]: `(*Manager).Migrate` -// [2]: pkg/clusterversion/cockroach_versions.go -// [3]: truncatedStateMigration -// [4]: pkg/kv/kvserver/batch_eval/cmd_migrate.go -// -// TODO(irfansharif): [3] and [4] are currently referring to what was prototyped -// in #57445. Once that makes its way into master, this TODO can be removed. -type Migration struct { - cv clusterversion.ClusterVersion - fn migrationFn - desc string -} - -type migrationFn func(context.Context, *Helper) error - -// Run kickstarts the actual migration process. It's responsible for recording -// the ongoing status of the migration into a system table. -// -// TODO(irfansharif): Introduce a `system.migrations` table, and populate it here. -func (m *Migration) Run(ctx context.Context, h *Helper) (err error) { - ctx = logtags.AddTag(ctx, "migration", h.ClusterVersion()) - - if err := m.fn(ctx, h); err != nil { - return err - } - - return nil -} - -// defaultPageSize controls how many range descriptors are paged in by default -// when iterating through all ranges in a cluster during any given migration. We -// pulled this number out of thin air(-ish). 
Let's consider a cluster with 50k -// ranges, with each range taking ~200ms. We're being somewhat conservative with -// the duration, but in a wide-area cluster with large hops between the manager -// and the replicas, it could be true. Here's how long it'll take for various -// block sizes: -// -// page size of 1 ~ 2h 46m -// page size of 50 ~ 3m 20s -// page size of 200 ~ 50s -const defaultPageSize = 200 - -func truncatedStateMigration(ctx context.Context, h *Helper) error { - return h.UntilClusterStable(ctx, func() error { - var batchIdx, numMigratedRanges int - init := func() { batchIdx, numMigratedRanges = 1, 0 } - if err := h.IterateRangeDescriptors(ctx, defaultPageSize, init, func(descriptors ...roachpb.RangeDescriptor) error { - for _, desc := range descriptors { - // NB: This is a bit of a wart. We want to reach the first range, - // but we can't address the (local) StartKey. However, keys.LocalMax - // is on r1, so we'll just use that instead to target r1. - start, end := desc.StartKey, desc.EndKey - if bytes.Compare(desc.StartKey, keys.LocalMax) < 0 { - start, _ = keys.Addr(keys.LocalMax) - } - if err := h.DB().Migrate(ctx, start, end, h.ClusterVersion().Version); err != nil { - return err - } - } - - // TODO(irfansharif): Instead of logging this to the debug log, we - // should be leveraging our jobs infrastructure for observability. - // See #58183. - numMigratedRanges += len(descriptors) - log.Infof(ctx, "[batch %d/??] migrated %d ranges", batchIdx, numMigratedRanges) - batchIdx++ - - return nil - }); err != nil { - return err - } - - log.Infof(ctx, "[batch %d/%d] migrated %d ranges", batchIdx, batchIdx, numMigratedRanges) - - // Make sure that all stores have synced. Given we're a below-raft - // migrations, this ensures that the applied state is flushed to disk. - req := &serverpb.SyncAllEnginesRequest{} - return h.ForEveryNode(ctx, "sync-engines", func(ctx context.Context, client serverpb.MigrationClient) error { - _, err := client.SyncAllEngines(ctx, req) - return err - }) - }) -} - -func postTruncatedStateMigration(ctx context.Context, h *Helper) error { - // Purge all replicas that haven't been migrated to use the unreplicated - // truncated state and the range applied state. We're sure to also durably - // persist any changes made in the same closure. Doing so in separate - // UntilClusterStable closure would run the (small) risk that a node might - // have GC-ed older replicas, restarted without syncing (thus unapplying the - // GC), and flushing all engines after. - truncStateVersion := clusterversion.ByKey(clusterversion.TruncatedAndRangeAppliedStateMigration) - op := fmt.Sprintf("purge-outdated-replicas-and-sync=%s", truncStateVersion) - err := h.UntilClusterStable(ctx, func() error { - err := h.ForEveryNode(ctx, op, func(ctx context.Context, client serverpb.MigrationClient) error { - preq := &serverpb.PurgeOutdatedReplicasRequest{Version: &truncStateVersion} - _, err := client.PurgeOutdatedReplicas(ctx, preq) - if err != nil { - return err - } - - freq := &serverpb.SyncAllEnginesRequest{} - _, err = client.SyncAllEngines(ctx, freq) - return err - }) - return err - }) - - return err -} - -// TestingRegisterMigrationInterceptor is used in tests to register an -// interceptor for a version migration. -// -// TODO(irfansharif): This is a gross anti-pattern, we're letting tests mutate -// global state. This should instead be a testing knob that the migration -// manager checks when search for attached migrations. 
-func TestingRegisterMigrationInterceptor( - cv clusterversion.ClusterVersion, fn migrationFn, -) (unregister func()) { - registry[cv] = Migration{cv: cv, fn: fn} - return func() { delete(registry, cv) } -} diff --git a/pkg/migration/migrations/BUILD.bazel b/pkg/migration/migrations/BUILD.bazel new file mode 100644 index 000000000000..13aac289d90e --- /dev/null +++ b/pkg/migration/migrations/BUILD.bazel @@ -0,0 +1,41 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "migrations", + srcs = [ + "migrations.go", + "truncated_state.go", + ], + importpath = "github.com/cockroachdb/cockroach/pkg/migration/migrations", + visibility = ["//visibility:public"], + deps = [ + "//pkg/clusterversion", + "//pkg/keys", + "//pkg/migration", + "//pkg/roachpb", + "//pkg/server/serverpb", + "//pkg/util/log", + ], +) + +go_test( + name = "migrations_test", + srcs = [ + "main_test.go", + "truncated_state_external_test.go", + ], + deps = [ + "//pkg/base", + "//pkg/clusterversion", + "//pkg/kv/kvserver", + "//pkg/kv/kvserver/stateloader", + "//pkg/security", + "//pkg/security/securitytest", + "//pkg/server", + "//pkg/testutils/serverutils", + "//pkg/testutils/testcluster", + "//pkg/util/leaktest", + "@com_github_cockroachdb_errors//:errors", + "@com_github_stretchr_testify//require", + ], +) diff --git a/pkg/migration/main_test.go b/pkg/migration/migrations/main_test.go similarity index 97% rename from pkg/migration/main_test.go rename to pkg/migration/migrations/main_test.go index cdc3f7742a5a..5a38c8b0af33 100644 --- a/pkg/migration/main_test.go +++ b/pkg/migration/migrations/main_test.go @@ -8,7 +8,7 @@ // by the Apache License, Version 2.0, included in the file // licenses/APL.txt. -package migration_test +package migrations_test import ( "os" diff --git a/pkg/migration/migrations/migrations.go b/pkg/migration/migrations/migrations.go new file mode 100644 index 000000000000..353b6e5726dd --- /dev/null +++ b/pkg/migration/migrations/migrations.go @@ -0,0 +1,81 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +// Package migrations contains the implementation of migrations. It is imported +// by the server library. +// +// This package registers the migrations with the migration package. +package migrations + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/clusterversion" + "github.com/cockroachdb/cockroach/pkg/migration" + "github.com/cockroachdb/cockroach/pkg/util/log" +) + +// registry defines the global mapping between a cluster version and the +// associated migration. The migration is only executed after a cluster-wide +// bump of the corresponding version gate. +var registry = make(map[clusterversion.ClusterVersion]migration.Migration) + +// register is a short hand to register a given migration within the global +// registry. 
+func register(key clusterversion.Key, fn migration.KVMigrationFn, desc string) { + cv := clusterversion.ClusterVersion{Version: clusterversion.ByKey(key)} + if _, ok := registry[cv]; ok { + log.Fatalf(context.Background(), "doubly registering migration for %s", cv) + } + registry[cv] = migration.NewKVMigration(desc, cv, fn) +} + +// GetMigration returns the migration corresponding to this version if +// one exists. +func GetMigration(key clusterversion.ClusterVersion) (migration.Migration, bool) { + m, ok := registry[key] + return m, ok +} + +// TestingRegisterMigrationInterceptor is used in tests to register an +// interceptor for a version migration. +// +// TODO(irfansharif): This is a gross anti-pattern, we're letting tests mutate +// global state. This should instead be a testing knob that the migration +// manager checks when search for attached migrations. +func TestingRegisterMigrationInterceptor( + cv clusterversion.ClusterVersion, fn migration.KVMigrationFn, +) (unregister func()) { + registry[cv] = migration.NewKVMigration("", cv, fn) + return func() { delete(registry, cv) } +} + +var kvMigrations = []struct { + cv clusterversion.Key + fn migration.KVMigrationFn + description string +}{ + { + clusterversion.TruncatedAndRangeAppliedStateMigration, + truncatedStateMigration, + "use unreplicated TruncatedState and RangeAppliedState for all ranges", + }, + { + clusterversion.PostTruncatedAndRangeAppliedStateMigration, + postTruncatedStateMigration, + "purge all replicas using the replicated TruncatedState", + }, +} + +func init() { + for _, m := range kvMigrations { + register(m.cv, m.fn, m.description) + } +} diff --git a/pkg/migration/migrations/truncated_state.go b/pkg/migration/migrations/truncated_state.go new file mode 100644 index 000000000000..3129643ea6b7 --- /dev/null +++ b/pkg/migration/migrations/truncated_state.go @@ -0,0 +1,94 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package migrations + +import ( + "bytes" + "context" + "fmt" + + "github.com/cockroachdb/cockroach/pkg/clusterversion" + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/migration" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/server/serverpb" + "github.com/cockroachdb/cockroach/pkg/util/log" +) + +// defaultPageSize controls how many ranges are paged in by default when +// iterating through all ranges in a cluster during any given migration. We +// pulled this number out of thin air(-ish). Let's consider a cluster with 50k +// ranges, with each range taking ~200ms. We're being somewhat conservative with +// the duration, but in a wide-area cluster with large hops between the manager +// and the replicas, it could be true. 
Here's how long it'll take for various +// block sizes: +// +// page size of 1 ~ 2h 46m +// page size of 50 ~ 3m 20s +// page size of 200 ~ 50s +const defaultPageSize = 200 + +func truncatedStateMigration( + ctx context.Context, cv clusterversion.ClusterVersion, h migration.Cluster, +) error { + var batchIdx, numMigratedRanges int + init := func() { batchIdx, numMigratedRanges = 1, 0 } + if err := h.IterateRangeDescriptors(ctx, defaultPageSize, init, func(descriptors ...roachpb.RangeDescriptor) error { + for _, desc := range descriptors { + // NB: This is a bit of a wart. We want to reach the first range, + // but we can't address the (local) StartKey. However, keys.LocalMax + // is on r1, so we'll just use that instead to target r1. + start, end := desc.StartKey, desc.EndKey + if bytes.Compare(desc.StartKey, keys.LocalMax) < 0 { + start, _ = keys.Addr(keys.LocalMax) + } + if err := h.DB().Migrate(ctx, start, end, cv.Version); err != nil { + return err + } + } + + // TODO(irfansharif): Instead of logging this to the debug log, we + // should insert these into a `system.migrations` table for external + // observability. + numMigratedRanges += len(descriptors) + log.Infof(ctx, "[batch %d/??] migrated %d ranges", batchIdx, numMigratedRanges) + batchIdx++ + + return nil + }); err != nil { + return err + } + + log.Infof(ctx, "[batch %d/%d] migrated %d ranges", batchIdx, batchIdx, numMigratedRanges) + + // Make sure that all stores have synced. Given we're a below-raft + // migrations, this ensures that the applied state is flushed to disk. + req := &serverpb.SyncAllEnginesRequest{} + op := "flush-stores" + return h.ForEveryNode(ctx, op, func(ctx context.Context, client serverpb.MigrationClient) error { + _, err := client.SyncAllEngines(ctx, req) + return err + }) +} + +func postTruncatedStateMigration( + ctx context.Context, cv clusterversion.ClusterVersion, h migration.Cluster, +) error { + // Purge all replicas that haven't been migrated to use the unreplicated + // truncated state and the range applied state. + truncStateVersion := clusterversion.ByKey(clusterversion.TruncatedAndRangeAppliedStateMigration) + req := &serverpb.PurgeOutdatedReplicasRequest{Version: &truncStateVersion} + op := fmt.Sprintf("purge-outdated-replicas=%s", req.Version) + return h.ForEveryNode(ctx, op, func(ctx context.Context, client serverpb.MigrationClient) error { + _, err := client.PurgeOutdatedReplicas(ctx, req) + return err + }) +} diff --git a/pkg/migration/migrations_test.go b/pkg/migration/migrations/truncated_state_external_test.go similarity index 58% rename from pkg/migration/migrations_test.go rename to pkg/migration/migrations/truncated_state_external_test.go index d099ed1376eb..d27943b86032 100644 --- a/pkg/migration/migrations_test.go +++ b/pkg/migration/migrations/truncated_state_external_test.go @@ -8,7 +8,7 @@ // by the Apache License, Version 2.0, included in the file // licenses/APL.txt. 
-package migration_test +package migrations_test import ( "context" @@ -18,17 +18,10 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader" - "github.com/cockroachdb/cockroach/pkg/migration" - "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/server" - "github.com/cockroachdb/cockroach/pkg/settings/cluster" - "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" "github.com/cockroachdb/cockroach/pkg/util/leaktest" - "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/errors" "github.com/stretchr/testify/require" ) @@ -67,7 +60,7 @@ func TestTruncatedStateMigration(t *testing.T) { for i := 0; i < tc.NumServers(); i++ { err := tc.Server(i).GetStores().(*kvserver.Stores).VisitStores(func(s *kvserver.Store) error { var err error - s.VisitReplicas(func(repl *kvserver.Replica) (wantMore bool) { + s.VisitReplicas(func(repl *kvserver.Replica) bool { err = f(repl) return err == nil }) @@ -140,86 +133,3 @@ func TestTruncatedStateMigration(t *testing.T) { }) } } - -func TestMigrateUpdatesReplicaVersion(t *testing.T) { - defer leaktest.AfterTest(t)() - defer log.Scope(t).Close(t) - - // We're going to be migrating from startCV to endCV. - startCV := clusterversion.ClusterVersion{Version: roachpb.Version{Major: 41}} - endCV := clusterversion.ClusterVersion{Version: roachpb.Version{Major: 42}} - - ctx := context.Background() - tc := testcluster.StartTestCluster(t, 2, base.TestClusterArgs{ - ReplicationMode: base.ReplicationManual, - ServerArgs: base.TestServerArgs{ - Settings: cluster.MakeTestingClusterSettingsWithVersions(endCV.Version, startCV.Version, false), - Knobs: base.TestingKnobs{ - Server: &server.TestingKnobs{ - BinaryVersionOverride: startCV.Version, - DisableAutomaticVersionUpgrade: 1, - }, - }, - }, - }) - defer tc.Stopper().Stop(ctx) - - // We'll take a specific range, still running at startCV, generate an - // outgoing snapshot and then suspend it temporarily. We'll then bump the - // cluster version on all the stores, as part of the migration process, and - // then resume the snapshot process. Seeing as how the snapshot was - // generated pre-version bump, off of a version of the range that hadn't - // observed the migration corresponding to the latest cluster version, we - // expect the store to reject it. - - key := tc.ScratchRange(t) - require.NoError(t, tc.WaitForSplitAndInitialization(key)) - desc, err := tc.LookupRange(key) - require.NoError(t, err) - rangeID := desc.RangeID - - // Enqueue the replica in the raftsnapshot queue. We use SucceedsSoon - // because it may take a bit for raft to figure out that we need to be - // generating a snapshot. - store := tc.GetFirstStoreFromServer(t, 0) - repl, err := store.GetReplica(rangeID) - require.NoError(t, err) - - if got := repl.Version(); got != startCV.Version { - t.Fatalf("got replica version %s, expected %s", got, startCV.Version) - } - - // Register the below raft migration. - unregisterKVMigration := batcheval.TestingRegisterMigrationInterceptor(endCV.Version, func() {}) - defer unregisterKVMigration() - - // Register the top-level migration. 
- unregister := migration.TestingRegisterMigrationInterceptor(endCV, func(ctx context.Context, h *migration.Helper) error { - return h.DB().Migrate(ctx, desc.StartKey, desc.EndKey, h.ClusterVersion().Version) - }) - defer unregister() - - // Wait until all nodes have are considered live. - nl := tc.Server(0).NodeLiveness().(*liveness.NodeLiveness) - testutils.SucceedsSoon(t, func() error { - for _, s := range tc.Servers { - id := s.NodeID() - live, err := nl.IsLive(id) - if err != nil { - return err - } - if !live { - return errors.Newf("n%s not live yet", id) - } - } - return nil - }) - - // Kick off the migration process. - _, err = tc.Conns[0].ExecContext(ctx, `SET CLUSTER SETTING version = $1`, endCV.String()) - require.NoError(t, err) - - if got := repl.Version(); got != endCV.Version { - t.Fatalf("got replica version %s, expected %s", got, endCV.Version) - } -} diff --git a/pkg/migration/nodelivenesstest/BUILD.bazel b/pkg/migration/nodelivenesstest/BUILD.bazel new file mode 100644 index 000000000000..027901493e8c --- /dev/null +++ b/pkg/migration/nodelivenesstest/BUILD.bazel @@ -0,0 +1,12 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "nodelivenesstest", + srcs = ["test_node_liveness.go"], + importpath = "github.com/cockroachdb/cockroach/pkg/migration/nodelivenesstest", + visibility = ["//visibility:public"], + deps = [ + "//pkg/kv/kvserver/liveness/livenesspb", + "//pkg/roachpb", + ], +) diff --git a/pkg/migration/nodelivenesstest/test_node_liveness.go b/pkg/migration/nodelivenesstest/test_node_liveness.go new file mode 100644 index 000000000000..e930ad1d83e1 --- /dev/null +++ b/pkg/migration/nodelivenesstest/test_node_liveness.go @@ -0,0 +1,96 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +// Package nodelivenesstest provides a mock implementation of NodeLiveness +// to facilitate testing of migration infrastructure. +package nodelivenesstest + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb" + "github.com/cockroachdb/cockroach/pkg/roachpb" +) + +// NodeLiveness is a testing-only implementation of the NodeLiveness. It +// lets tests mock out restarting, killing, decommissioning and adding Nodes to +// the cluster. +type NodeLiveness struct { + ls []livenesspb.Liveness + dead map[roachpb.NodeID]struct{} +} + +// New constructs a new NodeLiveness with the specified number of nodes. +func New(numNodes int) *NodeLiveness { + nl := &NodeLiveness{ + ls: make([]livenesspb.Liveness, numNodes), + dead: make(map[roachpb.NodeID]struct{}), + } + for i := 0; i < numNodes; i++ { + nl.ls[i] = livenesspb.Liveness{ + NodeID: roachpb.NodeID(i + 1), Epoch: 1, + Membership: livenesspb.MembershipStatus_ACTIVE, + } + } + return nl +} + +// GetLivenessesFromKV implements the NodeLiveness interface. +func (t *NodeLiveness) GetLivenessesFromKV(context.Context) ([]livenesspb.Liveness, error) { + return t.ls, nil +} + +// IsLive implements the NodeLiveness interface. +func (t *NodeLiveness) IsLive(id roachpb.NodeID) (bool, error) { + _, dead := t.dead[id] + return !dead, nil +} + +// Decommission marks the specified node as decommissioned. 
+func (t *NodeLiveness) Decommission(id roachpb.NodeID) { + for i := range t.ls { + if t.ls[i].NodeID == id { + t.ls[i].Membership = livenesspb.MembershipStatus_DECOMMISSIONED + break + } + } +} + +// AddNewNode adds a new node with an ID greater than all other nodes. +func (t *NodeLiveness) AddNewNode() { + t.AddNode(roachpb.NodeID(len(t.ls) + 1)) +} + +// AddNode adds a new node with the specified ID. +func (t *NodeLiveness) AddNode(id roachpb.NodeID) { + t.ls = append(t.ls, livenesspb.Liveness{ + NodeID: id, + Epoch: 1, + Membership: livenesspb.MembershipStatus_ACTIVE, + }) +} + +// DownNode marks a given node as down. +func (t *NodeLiveness) DownNode(id roachpb.NodeID) { + t.dead[id] = struct{}{} +} + +// RestartNode increments the epoch for a given node and marks it as +// alive. +func (t *NodeLiveness) RestartNode(id roachpb.NodeID) { + for i := range t.ls { + if t.ls[i].NodeID == id { + t.ls[i].Epoch++ + break + } + } + + delete(t.dead, id) +} diff --git a/pkg/migration/util.go b/pkg/migration/util.go deleted file mode 100644 index 0814456b8c1e..000000000000 --- a/pkg/migration/util.go +++ /dev/null @@ -1,122 +0,0 @@ -// Copyright 2020 The Cockroach Authors. -// -// Use of this software is governed by the Business Source License -// included in the file licenses/BSL.txt. -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0, included in the file -// licenses/APL.txt. - -package migration - -import ( - "context" - - "github.com/cockroachdb/cockroach/pkg/clusterversion" - "github.com/cockroachdb/cockroach/pkg/roachpb" - "github.com/cockroachdb/cockroach/pkg/util/log" - "github.com/cockroachdb/redact" -) - -// node captures the relevant bits of each node as it pertains to the migration -// infrastructure. -type node struct { - id roachpb.NodeID - epoch int64 -} - -// nodes is a collection of node objects. -type nodes []node - -// identical returns whether or not two lists of nodes are identical as sets, -// and if not, what changed (in terms of cluster membership operations and epoch -// changes). The textual diffs are only to be used for logging purposes. -func (ns nodes) identical(other nodes) (ok bool, _ []redact.RedactableString) { - a, b := ns, other - - type ent struct { - node node - count int - epochChanged bool - } - m := map[roachpb.NodeID]ent{} - for _, node := range a { - m[node.id] = ent{count: 1, node: node, epochChanged: false} - } - for _, node := range b { - e, ok := m[node.id] - e.count-- - if ok && e.node.epoch != node.epoch { - e.epochChanged = true - } - m[node.id] = e - } - - var diffs []redact.RedactableString - for id, e := range m { - if e.epochChanged { - diffs = append(diffs, redact.Sprintf("n%d's epoch changed", id)) - } - if e.count > 0 { - diffs = append(diffs, redact.Sprintf("n%d was decommissioned", id)) - } - if e.count < 0 { - diffs = append(diffs, redact.Sprintf("n%d joined the cluster", id)) - } - } - - return len(diffs) == 0, diffs -} - -func (ns nodes) String() string { - return redact.StringWithoutMarkers(ns) -} - -// SafeFormat implements redact.SafeFormatter. -func (ns nodes) SafeFormat(s redact.SafePrinter, _ rune) { - s.SafeString("n{") - if len(ns) > 0 { - s.Printf("%d", ns[0].id) - for _, node := range ns[1:] { - s.Printf(",%d", node.id) - } - } - s.SafeString("}") -} - -// fenceVersionFor constructs the appropriate "fence version" for the given -// cluster version. 
Fence versions allow the migrations infrastructure to safely -// step through consecutive cluster versions in the presence of nodes (running -// any binary version) being added to the cluster. See the migration manager -// above for intended usage. -// -// Fence versions (and the migrations infrastructure entirely) were introduced -// in the 21.1 release cycle. In the same release cycle, we introduced the -// invariant that new user-defined versions (users being crdb engineers) must -// always have even-numbered Internal versions, thus reserving the odd numbers -// to slot in fence versions for each cluster version. See top-level -// documentation in pkg/clusterversion for more details. -func fenceVersionFor( - ctx context.Context, cv clusterversion.ClusterVersion, -) clusterversion.ClusterVersion { - if (cv.Internal % 2) != 0 { - log.Fatalf(ctx, "only even numbered internal versions allowed, found %s", cv.Version) - } - - // We'll pick the odd internal version preceding the cluster version, - // slotting ourselves right before it. - fenceCV := cv - fenceCV.Internal-- - return fenceCV -} - -// register is a short hand to register a given migration within the global -// registry. -func register(key clusterversion.Key, fn migrationFn, desc string) { - cv := clusterversion.ClusterVersion{Version: clusterversion.ByKey(key)} - if _, ok := registry[cv]; ok { - log.Fatalf(context.Background(), "doubly registering migration for %s", cv) - } - registry[cv] = Migration{cv: cv, fn: fn, desc: desc} -} diff --git a/pkg/server/BUILD.bazel b/pkg/server/BUILD.bazel index 8a780fb7fe3f..c5a537a98b59 100644 --- a/pkg/server/BUILD.bazel +++ b/pkg/server/BUILD.bazel @@ -75,7 +75,7 @@ go_library( "//pkg/kv/kvserver/protectedts/ptprovider", "//pkg/kv/kvserver/protectedts/ptreconcile", "//pkg/kv/kvserver/reports", - "//pkg/migration", + "//pkg/migration/migrationmanager", "//pkg/roachpb", "//pkg/rpc", "//pkg/rpc/nodedialer", diff --git a/pkg/server/server_sql.go b/pkg/server/server_sql.go index c34b77b4889d..a7daa200810e 100644 --- a/pkg/server/server_sql.go +++ b/pkg/server/server_sql.go @@ -32,7 +32,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvtenant" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts" - "github.com/cockroachdb/cockroach/pkg/migration" + "github.com/cockroachdb/cockroach/pkg/migration/migrationmanager" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/rpc" "github.com/cockroachdb/cockroach/pkg/rpc/nodedialer" @@ -625,10 +625,9 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) { // We only need to attach a version upgrade hook if we're the system // tenant. Regular tenants are disallowed from changing cluster // versions. - migrationMgr := migration.NewManager( + migrationMgr := migrationmanager.NewManager( cfg.nodeDialer, nodeLiveness, - cfg.circularInternalExecutor, cfg.db, ) execCfg.VersionUpgradeHook = func(ctx context.Context, from, to clusterversion.ClusterVersion) error {
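
For orientation, a rough sketch follows of how a new KV-level migration would plug into the reworked package layout. It is illustrative only and not part of the patch: the clusterversion.SomeNewVersion key and the exampleMigration function are hypothetical, while the register helper, the migration.KVMigrationFn signature, and the migration.Cluster primitives (UntilClusterStable, ForEveryNode) follow the shapes introduced in pkg/migration/migrations/migrations.go and pkg/migration/migrationmanager/manager.go above.

// Hypothetical addition to pkg/migration/migrations (sketch only, assumes a
// clusterversion.SomeNewVersion key exists; the body is a stand-in).
package migrations

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/clusterversion"
	"github.com/cockroachdb/cockroach/pkg/migration"
	"github.com/cockroachdb/cockroach/pkg/server/serverpb"
)

// exampleMigration matches migration.KVMigrationFn: it receives the cluster
// version being rolled out and a migration.Cluster to fan work out over the
// nodes in the cluster.
func exampleMigration(
	ctx context.Context, cv clusterversion.ClusterVersion, c migration.Cluster,
) error {
	// Retry the per-node work until the set of nodes is stable, mirroring the
	// pattern used by the manager and truncatedStateMigration above.
	return c.UntilClusterStable(ctx, func() error {
		return c.ForEveryNode(ctx, "example-noop", func(
			ctx context.Context, client serverpb.MigrationClient,
		) error {
			// Any idempotent per-node RPC would go here; SyncAllEngines is
			// used purely as a placeholder.
			_, err := client.SyncAllEngines(ctx, &serverpb.SyncAllEnginesRequest{})
			return err
		})
	})
}

func init() {
	// Keyed to the (hypothetical) cluster version that gates this migration;
	// the manager runs it before bumping that version cluster-wide.
	register(
		clusterversion.SomeNewVersion,
		exampleMigration,
		"describe what the migration does",
	)
}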