From 9e5a731cc9c60b891b512189e72a2ff92440ba02 Mon Sep 17 00:00:00 2001 From: irfan sharif Date: Mon, 7 Dec 2020 09:59:08 -0500 Subject: [PATCH 01/11] server: improve a miscellaneous comment Release note: None --- pkg/server/init.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/server/init.go b/pkg/server/init.go index ccb08e278862..e1796b8d2616 100644 --- a/pkg/server/init.go +++ b/pkg/server/init.go @@ -541,7 +541,7 @@ func (s *initServer) initializeFirstStoreAfterJoin( type initServerCfg struct { advertiseAddr string binaryMinSupportedVersion roachpb.Version - binaryVersion roachpb.Version // This is what's used for bootstrap. + binaryVersion roachpb.Version // the version used during bootstrap defaultSystemZoneConfig zonepb.ZoneConfig defaultZoneConfig zonepb.ZoneConfig From 5a53b003f9a019f8cab918678ed2dde3a2ec844a Mon Sep 17 00:00:00 2001 From: irfan sharif Date: Mon, 7 Dec 2020 10:00:11 -0500 Subject: [PATCH 02/11] server: annotate migration RPC ctx with rpc name Makes for better logging. Release note: None --- pkg/server/migration.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/pkg/server/migration.go b/pkg/server/migration.go index d5f2252dab5b..76765bac3db6 100644 --- a/pkg/server/migration.go +++ b/pkg/server/migration.go @@ -19,6 +19,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/errors" + "github.com/cockroachdb/logtags" "github.com/cockroachdb/redact" ) @@ -39,6 +40,10 @@ var _ serverpb.MigrationServer = &migrationServer{} func (m *migrationServer) ValidateTargetClusterVersion( ctx context.Context, req *serverpb.ValidateTargetClusterVersionRequest, ) (*serverpb.ValidateTargetClusterVersionResponse, error) { + ctx, span := m.server.AnnotateCtxWithSpan(ctx, "validate-cv") + defer span.Finish() + ctx = logtags.AddTag(ctx, "validate-cv", nil) + targetCV := req.ClusterVersion versionSetting := m.server.ClusterSettings().Version @@ -81,6 +86,10 @@ func (m *migrationServer) ValidateTargetClusterVersion( func (m *migrationServer) BumpClusterVersion( ctx context.Context, req *serverpb.BumpClusterVersionRequest, ) (*serverpb.BumpClusterVersionResponse, error) { + ctx, span := m.server.AnnotateCtxWithSpan(ctx, "bump-cv") + defer span.Finish() + ctx = logtags.AddTag(ctx, "bump-cv", nil) + m.Lock() defer m.Unlock() From fd6bd4f785fd8e8add4cd7d3da7c7a18833661b6 Mon Sep 17 00:00:00 2001 From: irfan sharif Date: Mon, 7 Dec 2020 10:02:21 -0500 Subject: [PATCH 03/11] migration: break down pkg across a few files We can separate out the `Helper`, `Migration`, and various utilities into their own files. We'll add tests for individual components in future commits; the physical separation here sets the foundation for doing so (prototyped in #57445). This commit is purely code movement. 
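For reference, here's a sketch of how the separated pieces fit
together (this restates definitions from the files below; nothing here
is new code):

    // migrations.go: a migration, keyed in the registry by the
    // cluster version it's associated with.
    type Migration func(context.Context, *Helper) error
    var Registry = make(map[clusterversion.ClusterVersion]Migration)

    // helper.go: the primitives migrations are authored against.
    type Helper struct {
        *Manager
    }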
Release note: None --- pkg/migration/BUILD.bazel | 1 + pkg/migration/helper.go | 121 +++++++++++++++++++++++++++++ pkg/migration/manager.go | 150 ------------------------------------ pkg/migration/migrations.go | 28 ++++++- pkg/migration/util.go | 29 +++++++ 5 files changed, 178 insertions(+), 151 deletions(-) create mode 100644 pkg/migration/helper.go diff --git a/pkg/migration/BUILD.bazel b/pkg/migration/BUILD.bazel index c02fd0142384..24976336f400 100644 --- a/pkg/migration/BUILD.bazel +++ b/pkg/migration/BUILD.bazel @@ -3,6 +3,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library") go_library( name = "migration", srcs = [ + "helper.go", "manager.go", "migrations.go", "util.go", diff --git a/pkg/migration/helper.go b/pkg/migration/helper.go new file mode 100644 index 000000000000..8b929cab3a1b --- /dev/null +++ b/pkg/migration/helper.go @@ -0,0 +1,121 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package migration + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/rpc" + "github.com/cockroachdb/cockroach/pkg/server/serverpb" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/errors" +) + +// Helper captures all the primitives required to fully specify a migration. +type Helper struct { + *Manager +} + +// RequiredNodeIDs returns the node IDs for all nodes that are currently part of +// the cluster (i.e. they haven't been decommissioned away). Migrations have the +// pre-requisite that all required nodes are up and running so that we're able +// to execute all relevant node-level operations on them. If any of the nodes +// are found to be unavailable, an error is returned. +func (h *Helper) RequiredNodeIDs(ctx context.Context) ([]roachpb.NodeID, error) { + var nodeIDs []roachpb.NodeID + ls, err := h.nl.GetLivenessesFromKV(ctx) + if err != nil { + return nil, err + } + for _, l := range ls { + if l.Membership.Decommissioned() { + continue + } + live, err := h.nl.IsLive(l.NodeID) + if err != nil { + return nil, err + } + if !live { + return nil, errors.Newf("n%d required, but unavailable", l.NodeID) + } + nodeIDs = append(nodeIDs, l.NodeID) + } + return nodeIDs, nil +} + +// EveryNode invokes the given closure (named by the informational parameter op) +// across every node in the cluster. The mechanism for ensuring that we've done +// so, while accounting for the possibility of new nodes being added to the +// cluster in the interim, is provided by the following structure: +// (a) We'll retrieve the list of node IDs for all nodes in the system +// (b) For each node, we'll invoke the closure +// (c) We'll retrieve the list of node IDs again to account for the +// possibility of a new node being added during (b) +// (d) If there any discrepancies between the list retrieved in (a) +// and (c), we'll invoke the closure each node again +// (e) We'll continue to loop around until the node ID list stabilizes +// +// By the time EveryNode returns, we'll have thus invoked the closure against +// every node in the cluster. +// +// To consider one example of how this primitive is used, let's consider our use +// of it to bump the cluster version. 
After we return, given all nodes in the +// cluster will have their cluster versions bumped, and future node additions +// will observe the latest version (through the join RPC). This lets us author +// migrations that can assume that a certain version gate has been enabled on +// all nodes in the cluster, and will always be enabled for any new nodes in the +// system. +// +// It may be possible however that right after we return, a new node may join. +// This means that some migrations may have to be split up into two version +// bumps: one that phases out the old version (i.e. stops creation of stale data +// or behavior) and a clean-up version, which removes any vestiges of the stale +// data/behavior, and which, when active, ensures that the old data has vanished +// from the system. This is similar in spirit to how schema changes are split up +// into multiple smaller steps that are carried out sequentially. +func (h *Helper) EveryNode( + ctx context.Context, op string, fn func(context.Context, serverpb.MigrationClient) error, +) error { + nodeIDs, err := h.RequiredNodeIDs(ctx) + if err != nil { + return err + } + + for { + // TODO(irfansharif): We can/should send out these RPCs in parallel. + log.Infof(ctx, "executing op=%s on nodes=%s", op, nodeIDs) + for _, nodeID := range nodeIDs { + conn, err := h.dialer.Dial(ctx, nodeID, rpc.DefaultClass) + if err != nil { + return err + } + client := serverpb.NewMigrationClient(conn) + if err := fn(ctx, client); err != nil { + return err + } + } + + curNodeIDs, err := h.RequiredNodeIDs(ctx) + if err != nil { + return err + } + + if !identical(nodeIDs, curNodeIDs) { + nodeIDs = curNodeIDs + continue + } + + break + } + + return nil +} diff --git a/pkg/migration/manager.go b/pkg/migration/manager.go index 25488e7ec2a1..57f357d9aaaa 100644 --- a/pkg/migration/manager.go +++ b/pkg/migration/manager.go @@ -29,37 +29,13 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb" "github.com/cockroachdb/cockroach/pkg/roachpb" - "github.com/cockroachdb/cockroach/pkg/rpc" "github.com/cockroachdb/cockroach/pkg/rpc/nodedialer" "github.com/cockroachdb/cockroach/pkg/server/serverpb" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/util/log" - "github.com/cockroachdb/errors" "github.com/cockroachdb/logtags" ) -// Migration defines a program to be executed once every node in the cluster is -// (a) running a specific binary version, and (b) has completed all prior -// migrations. -// -// Each migration is associated with a specific internal cluster version and is -// idempotent in nature. When setting the cluster version (via `SET CLUSTER -// SETTING version`), a manager process determines the set of migrations needed -// to bridge the gap between the current active cluster version, and the target -// one. -// -// To introduce a migration, introduce a version key in pkg/clusterversion, and -// introduce a corresponding internal cluster version for it. See [1] for -// details. Following that, define a Migration in this package and add it to the -// Registry. Be sure to key it in with the new cluster version we just added. -// During cluster upgrades, once the operator is able to set a cluster version -// setting that's past the version that was introduced (typically the major -// release version the migration was introduced in), the manager will execute -// the defined migration before letting the upgrade finalize. 
-// -// [1]: pkg/clusterversion/cockroach_versions.go -type Migration func(context.Context, *Helper) error - // Manager is the instance responsible for executing migrations across the // cluster. type Manager struct { @@ -69,106 +45,6 @@ type Manager struct { db *kv.DB } -// Helper captures all the primitives required to fully specify a migration. -type Helper struct { - *Manager -} - -// RequiredNodeIDs returns the node IDs for all nodes that are currently part of -// the cluster (i.e. they haven't been decommissioned away). Migrations have the -// pre-requisite that all required nodes are up and running so that we're able -// to execute all relevant node-level operations on them. If any of the nodes -// are found to be unavailable, an error is returned. -func (h *Helper) RequiredNodeIDs(ctx context.Context) ([]roachpb.NodeID, error) { - var nodeIDs []roachpb.NodeID - ls, err := h.nl.GetLivenessesFromKV(ctx) - if err != nil { - return nil, err - } - for _, l := range ls { - if l.Membership.Decommissioned() { - continue - } - live, err := h.nl.IsLive(l.NodeID) - if err != nil { - return nil, err - } - if !live { - return nil, errors.Newf("n%d required, but unavailable", l.NodeID) - } - nodeIDs = append(nodeIDs, l.NodeID) - } - return nodeIDs, nil -} - -// EveryNode invokes the given closure (named by the informational parameter op) -// across every node in the cluster. The mechanism for ensuring that we've done -// so, while accounting for the possibility of new nodes being added to the -// cluster in the interim, is provided by the following structure: -// (a) We'll retrieve the list of node IDs for all nodes in the system -// (b) For each node, we'll invoke the closure -// (c) We'll retrieve the list of node IDs again to account for the -// possibility of a new node being added during (b) -// (d) If there any discrepancies between the list retrieved in (a) -// and (c), we'll invoke the closure each node again -// (e) We'll continue to loop around until the node ID list stabilizes -// -// By the time EveryNode returns, we'll have thus invoked the closure against -// every node in the cluster. -// -// To consider one example of how this primitive is used, let's consider our use -// of it to bump the cluster version. After we return, given all nodes in the -// cluster will have their cluster versions bumped, and future node additions -// will observe the latest version (through the join RPC). This lets us author -// migrations that can assume that a certain version gate has been enabled on -// all nodes in the cluster, and will always be enabled for any new nodes in the -// system. -// -// It may be possible however that right after we return, a new node may join. -// This means that some migrations may have to be split up into two version -// bumps: one that phases out the old version (i.e. stops creation of stale data -// or behavior) and a clean-up version, which removes any vestiges of the stale -// data/behavior, and which, when active, ensures that the old data has vanished -// from the system. This is similar in spirit to how schema changes are split up -// into multiple smaller steps that are carried out sequentially. -func (h *Helper) EveryNode( - ctx context.Context, op string, fn func(context.Context, serverpb.MigrationClient) error, -) error { - nodeIDs, err := h.RequiredNodeIDs(ctx) - if err != nil { - return err - } - - for { - // TODO(irfansharif): We can/should send out these RPCs in parallel. 
- log.Infof(ctx, "executing op=%s on nodes=%s", op, nodeIDs) - for _, nodeID := range nodeIDs { - conn, err := h.dialer.Dial(ctx, nodeID, rpc.DefaultClass) - if err != nil { - return err - } - client := serverpb.NewMigrationClient(conn) - if err := fn(ctx, client); err != nil { - return err - } - } - - curNodeIDs, err := h.RequiredNodeIDs(ctx) - if err != nil { - return err - } - - if !identical(nodeIDs, curNodeIDs) { - nodeIDs = curNodeIDs - continue - } - - break - } - - return nil -} - // nodeLiveness is the subset of the interface satisfied by CRDB's node liveness // component that the migration manager relies upon. type nodeLiveness interface { @@ -313,29 +189,3 @@ func (m *Manager) Migrate(ctx context.Context, from, to clusterversion.ClusterVe return nil } - -// fenceVersionFor constructs the appropriate "fence version" for the given -// cluster version. Fence versions allow the migrations infrastructure to safely -// step through consecutive cluster versions in the presence of nodes (running -// any binary version) being added to the cluster. See the migration manager -// above for intended usage. -// -// Fence versions (and the migrations infrastructure entirely) were introduced -// in the 21.1 release cycle. In the same release cycle, we introduced the -// invariant that new user-defined versions (users being crdb engineers) must -// always have even-numbered Internal versions, thus reserving the odd numbers -// to slot in fence versions for each cluster version. See top-level -// documentation in pkg/clusterversion for more details. -func fenceVersionFor( - ctx context.Context, cv clusterversion.ClusterVersion, -) clusterversion.ClusterVersion { - if (cv.Internal % 2) != 0 { - log.Fatalf(ctx, "only even numbered internal versions allowed, found %s", cv.Version) - } - - // We'll pick the odd internal version preceding the cluster version, - // slotting ourselves right before it. - fenceCV := cv - fenceCV.Internal-- - return fenceCV -} diff --git a/pkg/migration/migrations.go b/pkg/migration/migrations.go index dfae1ee82155..00976d2547ee 100644 --- a/pkg/migration/migrations.go +++ b/pkg/migration/migrations.go @@ -10,7 +10,11 @@ package migration -import "github.com/cockroachdb/cockroach/pkg/clusterversion" +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/clusterversion" +) // Registry defines the global mapping between a cluster version, and the // associated migration. The migration is only executed after a cluster-wide @@ -23,3 +27,25 @@ func init() { // // Registry[clusterversion.ByKey(clusterversion.VersionWhatever)] = WhateverMigration } + +// Migration defines a program to be executed once every node in the cluster is +// (a) running a specific binary version, and (b) has completed all prior +// migrations. +// +// Each migration is associated with a specific internal cluster version and is +// idempotent in nature. When setting the cluster version (via `SET CLUSTER +// SETTING version`), a manager process determines the set of migrations needed +// to bridge the gap between the current active cluster version, and the target +// one. +// +// To introduce a migration, introduce a version key in pkg/clusterversion, and +// introduce a corresponding internal cluster version for it. See [1] for +// details. Following that, define a Migration in this package and add it to the +// Registry. Be sure to key it in with the new cluster version we just added. 
+// During cluster upgrades, once the operator is able to set a cluster version +// setting that's past the version that was introduced (typically the major +// release version the migration was introduced in), the manager will execute +// the defined migration before letting the upgrade finalize. +// +// [1]: pkg/clusterversion/cockroach_versions.go +type Migration func(context.Context, *Helper) error diff --git a/pkg/migration/util.go b/pkg/migration/util.go index e638dff8d5f4..b9c64405df20 100644 --- a/pkg/migration/util.go +++ b/pkg/migration/util.go @@ -11,9 +11,12 @@ package migration import ( + "context" "sort" + "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/log" ) // identical returns whether or not two lists of node IDs are identical as sets. @@ -35,3 +38,29 @@ func identical(a, b []roachpb.NodeID) bool { } return true } + +// fenceVersionFor constructs the appropriate "fence version" for the given +// cluster version. Fence versions allow the migrations infrastructure to safely +// step through consecutive cluster versions in the presence of nodes (running +// any binary version) being added to the cluster. See the migration manager +// above for intended usage. +// +// Fence versions (and the migrations infrastructure entirely) were introduced +// in the 21.1 release cycle. In the same release cycle, we introduced the +// invariant that new user-defined versions (users being crdb engineers) must +// always have even-numbered Internal versions, thus reserving the odd numbers +// to slot in fence versions for each cluster version. See top-level +// documentation in pkg/clusterversion for more details. +func fenceVersionFor( + ctx context.Context, cv clusterversion.ClusterVersion, +) clusterversion.ClusterVersion { + if (cv.Internal % 2) != 0 { + log.Fatalf(ctx, "only even numbered internal versions allowed, found %s", cv.Version) + } + + // We'll pick the odd internal version preceding the cluster version, + // slotting ourselves right before it. + fenceCV := cv + fenceCV.Internal-- + return fenceCV +} From 67138641341c7520bb8ed00a50be3a987445d792 Mon Sep 17 00:00:00 2001 From: irfan sharif Date: Mon, 7 Dec 2020 11:15:23 -0500 Subject: [PATCH 04/11] migration: improve documentation for EveryNode It's clearer to talk explicitly in terms of causality. Release note: None --- pkg/migration/helper.go | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/pkg/migration/helper.go b/pkg/migration/helper.go index 8b929cab3a1b..5632d40e9b23 100644 --- a/pkg/migration/helper.go +++ b/pkg/migration/helper.go @@ -53,8 +53,8 @@ func (h *Helper) RequiredNodeIDs(ctx context.Context) ([]roachpb.NodeID, error) } // EveryNode invokes the given closure (named by the informational parameter op) -// across every node in the cluster. The mechanism for ensuring that we've done -// so, while accounting for the possibility of new nodes being added to the +// across every node in the cluster[*]. 
The mechanism for ensuring that we've +// done so, while accounting for the possibility of new nodes being added to the // cluster in the interim, is provided by the following structure: // (a) We'll retrieve the list of node IDs for all nodes in the system // (b) For each node, we'll invoke the closure @@ -64,8 +64,13 @@ func (h *Helper) RequiredNodeIDs(ctx context.Context) ([]roachpb.NodeID, error) // and (c), we'll invoke the closure each node again // (e) We'll continue to loop around until the node ID list stabilizes // -// By the time EveryNode returns, we'll have thus invoked the closure against -// every node in the cluster. +// [*]: We can be a bit more precise here. What EveryNode gives us is a strict +// causal happened-before relation between running the given closure against +// every node that's currently a member of the cluster, and the next node that +// joins the cluster. Put another way: using EveryNode callers will have managed +// to run something against all nodes without a new node joining half-way +// through (which could have allowed it to pick up some state off one of the +// existing nodes that hadn't heard from us yet). // // To consider one example of how this primitive is used, let's consider our use // of it to bump the cluster version. After we return, given all nodes in the @@ -75,8 +80,8 @@ func (h *Helper) RequiredNodeIDs(ctx context.Context) ([]roachpb.NodeID, error) // all nodes in the cluster, and will always be enabled for any new nodes in the // system. // -// It may be possible however that right after we return, a new node may join. -// This means that some migrations may have to be split up into two version +// Given that it'll always be possible for new nodes to join after an EveryNode +// round, it means that some migrations may have to be split up into two version // bumps: one that phases out the old version (i.e. stops creation of stale data // or behavior) and a clean-up version, which removes any vestiges of the stale // data/behavior, and which, when active, ensures that the old data has vanished From c725f7ba6cbc5c90284d261692bb3ab71f5f41e6 Mon Sep 17 00:00:00 2001 From: irfan sharif Date: Mon, 7 Dec 2020 11:20:40 -0500 Subject: [PATCH 05/11] migration: introduce a more structured type for `Migration`s We re-define what the Migration type is to be able to annotate it a description. We'll later use this description when populating the `system.migrations` table (originally prototyped in #57445). Release note: None --- pkg/migration/migrations.go | 58 +++++++++++++++++++++++++++---------- 1 file changed, 43 insertions(+), 15 deletions(-) diff --git a/pkg/migration/migrations.go b/pkg/migration/migrations.go index 00976d2547ee..d664e93cb61a 100644 --- a/pkg/migration/migrations.go +++ b/pkg/migration/migrations.go @@ -34,18 +34,46 @@ func init() { // // Each migration is associated with a specific internal cluster version and is // idempotent in nature. When setting the cluster version (via `SET CLUSTER -// SETTING version`), a manager process determines the set of migrations needed -// to bridge the gap between the current active cluster version, and the target -// one. -// -// To introduce a migration, introduce a version key in pkg/clusterversion, and -// introduce a corresponding internal cluster version for it. See [1] for -// details. Following that, define a Migration in this package and add it to the -// Registry. Be sure to key it in with the new cluster version we just added. 
-// During cluster upgrades, once the operator is able to set a cluster version -// setting that's past the version that was introduced (typically the major -// release version the migration was introduced in), the manager will execute -// the defined migration before letting the upgrade finalize. -// -// [1]: pkg/clusterversion/cockroach_versions.go -type Migration func(context.Context, *Helper) error +// SETTING version`), the manager process determines the set of migrations +// needed to bridge the gap between the current active cluster version, and the +// target one. See [1] for where that happens. +// +// To introduce a migration, start by adding version key to pkg/clusterversion +// and introducing a corresponding internal cluster version for it. See [2] for +// more details. Following that, define a Migration in this package and add it +// to the registry. Be sure to key it in with the new cluster version we just +// added. During cluster upgrades, once the operator is able to set a cluster +// version setting that's past the version that was introduced (typically the +// major release version the migration was introduced in), the manager will +// execute the defined migration before letting the upgrade finalize. +// +// If the migration requires below-Raft level changes ([3] is one example), +// you'll need to add a version switch and the relevant KV-level migration in +// [4]. See IterateRangeDescriptors and the Migrate KV request for more details. +// +// [1]: `(*Manager).Migrate` +// [2]: pkg/clusterversion/cockroach_versions.go +// [3]: TruncatedStateMigration +// [4]: pkg/kv/kvserver/batch_eval/cmd_migrate.go +// +// TODO(irfansharif): [3] and [4] are currently referring to what was prototyped +// in #57445. Once that makes its way into master, this TODO can be removed. +type Migration struct { + cv clusterversion.ClusterVersion + fn migrationFn + desc string +} + +type migrationFn func(context.Context, *Helper) error + +// Run kickstarts the actual migration process. It's responsible for recording +// the ongoing status of the migration into a system table. +// +// TODO(irfansharif): Introduce a `system.migrations` table, and populate it here. +func (m *Migration) Run(ctx context.Context, h *Helper) (err error) { + if err := m.fn(ctx, h); err != nil { + return err + } + + return nil +} From 932a8152def45fa02d7c3b3276c4e9432e32058c Mon Sep 17 00:00:00 2001 From: irfan sharif Date: Mon, 7 Dec 2020 11:23:18 -0500 Subject: [PATCH 06/11] migration: re-define the migration registration process We make it a bit more ergonomic (this revision was originally prototyped in #57445). Release note: None --- pkg/migration/manager.go | 13 ++++++++++--- pkg/migration/migrations.go | 9 ++++----- pkg/migration/util.go | 10 ++++++++++ 3 files changed, 24 insertions(+), 8 deletions(-) diff --git a/pkg/migration/manager.go b/pkg/migration/manager.go index 57f357d9aaaa..cc129feb3c00 100644 --- a/pkg/migration/manager.go +++ b/pkg/migration/manager.go @@ -179,12 +179,19 @@ func (m *Manager) Migrate(ctx context.Context, from, to clusterversion.ClusterVe } } - // TODO(irfansharif): We'll want to retrieve the right migration off of - // our registry of migrations, if any, and execute it. // TODO(irfansharif): We'll want to be able to override which migration // is retrieved here within tests. We could make the registry be a part // of the manager, and all tests to provide their own. - _ = Registry[clusterVersion] + + // Finally, run the actual migration. 
+ migration, ok := registry[clusterVersion] + if !ok { + log.Infof(ctx, "no migration registered for %s, skipping", clusterVersion) + continue + } + if err := migration.Run(ctx, h); err != nil { + return err + } } return nil diff --git a/pkg/migration/migrations.go b/pkg/migration/migrations.go index d664e93cb61a..9382ef0abe79 100644 --- a/pkg/migration/migrations.go +++ b/pkg/migration/migrations.go @@ -16,16 +16,15 @@ import ( "github.com/cockroachdb/cockroach/pkg/clusterversion" ) -// Registry defines the global mapping between a cluster version, and the +// registry defines the global mapping between a cluster version and the // associated migration. The migration is only executed after a cluster-wide -// bump of the version gate. -var Registry = make(map[clusterversion.ClusterVersion]Migration) +// bump of the corresponding version gate. +var registry = make(map[clusterversion.ClusterVersion]Migration) func init() { // TODO(irfansharif): We'll want to register individual migrations with // specific internal cluster versions here. - // - // Registry[clusterversion.ByKey(clusterversion.VersionWhatever)] = WhateverMigration + _ = register // register(clusterversion.WhateverMigration, WhateverMigration, "whatever migration") } // Migration defines a program to be executed once every node in the cluster is diff --git a/pkg/migration/util.go b/pkg/migration/util.go index b9c64405df20..b7ab5ed6c13e 100644 --- a/pkg/migration/util.go +++ b/pkg/migration/util.go @@ -64,3 +64,13 @@ func fenceVersionFor( fenceCV.Internal-- return fenceCV } + +// register is a short hand to register a given migration within the global +// registry. +func register(key clusterversion.Key, fn migrationFn, desc string) { + cv := clusterversion.ClusterVersion{Version: clusterversion.ByKey(key)} + if _, ok := registry[cv]; ok { + log.Fatalf(context.Background(), "doubly registering migration for %s", cv) + } + registry[cv] = Migration{cv: cv, fn: fn, desc: desc} +} From b0a6d3e1b3cd01f32b0761ce60cb558751a566e2 Mon Sep 17 00:00:00 2001 From: arulajmani Date: Thu, 17 Dec 2020 12:02:17 -0500 Subject: [PATCH 07/11] roachtest: add test_9_6_diagnostics to psycopg blocklist Previously, this test was skipped as our pg server version (9.5) did not meet the threshold (9.6) for this test to run. After the pg server version bump to 13 this is no longer the case. A change to explicitly skip this test for CRDB has been merged upstream, but it won't apply until the next release (psycopg_2_8_7) comes out and we switch to testing against that. Until then, this test lives in the blocklist. Closes #57986 Release note: None --- pkg/cmd/roachtest/psycopg_blocklist.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/cmd/roachtest/psycopg_blocklist.go b/pkg/cmd/roachtest/psycopg_blocklist.go index 62b0af8be60f..3164c64990a8 100644 --- a/pkg/cmd/roachtest/psycopg_blocklist.go +++ b/pkg/cmd/roachtest/psycopg_blocklist.go @@ -29,7 +29,8 @@ var psycopgBlocklists = blocklistsForVersion{ // After a failed run, an updated version of this blocklist should be available // in the test log. 
var psycopgBlockList21_1 = blocklist{
-	"tests.test_async_keyword.CancelTests.test_async_cancel": "41335",
+	"tests.test_async_keyword.CancelTests.test_async_cancel":    "41335",
+	"tests.test_module.ExceptionsTestCase.test_9_6_diagnostics": "58035",
 }
 
 var psycopgBlockList20_2 = blocklist{

From fe1ad64f2cb342bbaf3e8b7779249ae8a3a0ec72 Mon Sep 17 00:00:00 2001
From: irfan sharif 
Date: Mon, 7 Dec 2020 11:28:34 -0500
Subject: [PATCH 08/11] migration: shimmy in a `cluster` interface for
 testability

To facilitate testing `Helper` in isolation, we introduce a `cluster`
interface that we'll mock out in tests. It's through this interface that
the migration infrastructure will be able to dial out to a specific
node, grab hold of a kv.DB instance, and retrieve the current cluster
membership.

This diff also downgrades `RequiredNodeIDs` from being a first-class
primitive, instead tucking it away for internal use only. Since
retrieving the cluster membership makes no guarantees about new nodes
being added to the cluster, it's entirely possible for that to happen
concurrently with the retrieval itself. Appropriate usage thus entails
wrapping it in a stabilizing loop, as we do in `EveryNode`. This tells
us there's no need to expose it directly to migration authors.

Release note: None
---
 pkg/migration/BUILD.bazel       |  25 ++-
 pkg/migration/helper.go         | 141 +++++++++++----
 pkg/migration/helper_test.go    | 307 ++++++++++++++++++++++++++++++++
 pkg/migration/manager.go        |   3 +-
 pkg/migration/util.go           |  76 ++++++--
 pkg/migration/util_test.go      | 101 +++++++++++
 pkg/testutils/lint/lint_test.go |   1 +
 7 files changed, 604 insertions(+), 50 deletions(-)
 create mode 100644 pkg/migration/helper_test.go
 create mode 100644 pkg/migration/util_test.go

diff --git a/pkg/migration/BUILD.bazel b/pkg/migration/BUILD.bazel
index 24976336f400..1efd725e9f86 100644
--- a/pkg/migration/BUILD.bazel
+++ b/pkg/migration/BUILD.bazel
@@ -1,4 +1,4 @@
-load("@io_bazel_rules_go//go:def.bzl", "go_library")
+load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
 
 go_library(
     name = "migration",
@@ -19,8 +19,31 @@ go_library(
         "//pkg/rpc/nodedialer",
         "//pkg/server/serverpb",
         "//pkg/sql",
+        "//pkg/sql/sqlutil",
         "//pkg/util/log",
         "//vendor/github.com/cockroachdb/errors",
         "//vendor/github.com/cockroachdb/logtags",
+        "//vendor/github.com/cockroachdb/redact",
+        "//vendor/google.golang.org/grpc",
     ],
 )
 
+go_test(
+    name = "migration_test",
+    srcs = [
+        "helper_test.go",
+        "util_test.go",
+    ],
+    embed = [":migration"],
+    deps = [
+        "//pkg/clusterversion",
+        "//pkg/kv",
+        "//pkg/kv/kvserver/liveness/livenesspb",
+        "//pkg/roachpb",
+        "//pkg/server/serverpb",
+        "//pkg/testutils",
+        "//pkg/util/leaktest",
+        "//pkg/util/syncutil",
+        "//vendor/google.golang.org/grpc",
+    ],
+)
diff --git a/pkg/migration/helper.go b/pkg/migration/helper.go
index 5632d40e9b23..bdc82b8fc6ee 100644
--- a/pkg/migration/helper.go
+++ b/pkg/migration/helper.go
@@ -13,43 +13,58 @@ package migration
 import (
 	"context"
 
+	"github.com/cockroachdb/cockroach/pkg/clusterversion"
+	"github.com/cockroachdb/cockroach/pkg/kv"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/rpc"
+	"github.com/cockroachdb/cockroach/pkg/rpc/nodedialer"
 	"github.com/cockroachdb/cockroach/pkg/server/serverpb"
+	"github.com/cockroachdb/cockroach/pkg/sql"
+	"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/errors"
+	"github.com/cockroachdb/redact"
+	"google.golang.org/grpc"
 )
 
 // Helper captures all the primitives required 
to fully specify a migration. type Helper struct { - *Manager + c cluster + cv clusterversion.ClusterVersion } -// RequiredNodeIDs returns the node IDs for all nodes that are currently part of -// the cluster (i.e. they haven't been decommissioned away). Migrations have the -// pre-requisite that all required nodes are up and running so that we're able -// to execute all relevant node-level operations on them. If any of the nodes -// are found to be unavailable, an error is returned. -func (h *Helper) RequiredNodeIDs(ctx context.Context) ([]roachpb.NodeID, error) { - var nodeIDs []roachpb.NodeID - ls, err := h.nl.GetLivenessesFromKV(ctx) - if err != nil { - return nil, err - } - for _, l := range ls { - if l.Membership.Decommissioned() { - continue - } - live, err := h.nl.IsLive(l.NodeID) - if err != nil { - return nil, err - } - if !live { - return nil, errors.Newf("n%d required, but unavailable", l.NodeID) - } - nodeIDs = append(nodeIDs, l.NodeID) - } - return nodeIDs, nil +// cluster mediates access to the crdb cluster. +type cluster interface { + // nodes returns the IDs and epochs for all nodes that are currently part of + // the cluster (i.e. they haven't been decommissioned away). Migrations have + // the pre-requisite that all nodes are up and running so that we're able to + // execute all relevant node-level operations on them. If any of the nodes + // are found to be unavailable, an error is returned. + // + // It's important to note that this makes no guarantees about new nodes + // being added to the cluster. It's entirely possible for that to happen + // concurrently with the retrieval of the current set of nodes. Appropriate + // usage of this entails wrapping it under a stabilizing loop, like we do in + // EveryNode. + nodes(ctx context.Context) (nodes, error) + + // dial returns a grpc connection to the given node. + dial(context.Context, roachpb.NodeID) (*grpc.ClientConn, error) + + // db provides access the kv.DB instance backing the cluster. + // + // TODO(irfansharif): We could hide the kv.DB instance behind an interface + // to expose only relevant, vetted bits of kv.DB. It'll make our tests less + // "integration-ey". + db() *kv.DB + + // executor provides access to an internal executor instance to run + // arbitrary SQL statements. + executor() sqlutil.InternalExecutor +} + +func newHelper(c cluster, cv clusterversion.ClusterVersion) *Helper { + return &Helper{c: c, cv: cv} } // EveryNode invokes the given closure (named by the informational parameter op) @@ -90,16 +105,17 @@ func (h *Helper) RequiredNodeIDs(ctx context.Context) ([]roachpb.NodeID, error) func (h *Helper) EveryNode( ctx context.Context, op string, fn func(context.Context, serverpb.MigrationClient) error, ) error { - nodeIDs, err := h.RequiredNodeIDs(ctx) + ns, err := h.c.nodes(ctx) if err != nil { return err } for { // TODO(irfansharif): We can/should send out these RPCs in parallel. 
-		log.Infof(ctx, "executing op=%s on nodes=%s", op, nodeIDs)
-		for _, nodeID := range nodeIDs {
-			conn, err := h.dialer.Dial(ctx, nodeID, rpc.DefaultClass)
+		log.Infof(ctx, "executing %s on nodes %s", redact.Safe(op), ns)
+
+		for _, node := range ns {
+			conn, err := h.c.dial(ctx, node.id)
 			if err != nil {
 				return err
 			}
@@ -109,13 +125,14 @@ func (h *Helper) EveryNode(
 			}
 		}
 
-		curNodeIDs, err := h.RequiredNodeIDs(ctx)
+		curNodes, err := h.c.nodes(ctx)
 		if err != nil {
 			return err
 		}
 
-		if !identical(nodeIDs, curNodeIDs) {
-			nodeIDs = curNodeIDs
+		if ok, diffs := ns.identical(curNodes); !ok {
+			log.Infof(ctx, "%s, retrying", diffs)
+			ns = curNodes
 			continue
 		}
 
@@ -124,3 +141,61 @@ func (h *Helper) EveryNode(
 
 	return nil
 }
+
+// DB exposes the underlying *kv.DB instance.
+func (h *Helper) DB() *kv.DB {
+	return h.c.db()
+}
+
+type clusterImpl struct {
+	nl     nodeLiveness
+	exec   sqlutil.InternalExecutor
+	dialer *nodedialer.Dialer
+	kvDB   *kv.DB
+}
+
+var _ cluster = &clusterImpl{}
+
+func newCluster(
+	nl nodeLiveness, dialer *nodedialer.Dialer, executor *sql.InternalExecutor, db *kv.DB,
+) *clusterImpl {
+	return &clusterImpl{nl: nl, dialer: dialer, exec: executor, kvDB: db}
+}
+
+// nodes implements the cluster interface.
+func (c *clusterImpl) nodes(ctx context.Context) (nodes, error) {
+	var ns []node
+	ls, err := c.nl.GetLivenessesFromKV(ctx)
+	if err != nil {
+		return nil, err
+	}
+	for _, l := range ls {
+		if l.Membership.Decommissioned() {
+			continue
+		}
+		live, err := c.nl.IsLive(l.NodeID)
+		if err != nil {
+			return nil, err
+		}
+		if !live {
+			return nil, errors.Newf("n%d required, but unavailable", l.NodeID)
+		}
+		ns = append(ns, node{id: l.NodeID, epoch: l.Epoch})
+	}
+	return ns, nil
+}
+
+// dial implements the cluster interface.
+func (c *clusterImpl) dial(ctx context.Context, id roachpb.NodeID) (*grpc.ClientConn, error) {
+	return c.dialer.Dial(ctx, id, rpc.DefaultClass)
+}
+
+// db implements the cluster interface.
+func (c *clusterImpl) db() *kv.DB {
+	return c.kvDB
+}
+
+// executor implements the cluster interface.
+func (c *clusterImpl) executor() sqlutil.InternalExecutor {
+	return c.exec
+}
diff --git a/pkg/migration/helper_test.go b/pkg/migration/helper_test.go
new file mode 100644
index 000000000000..54d52f51f66d
--- /dev/null
+++ b/pkg/migration/helper_test.go
@@ -0,0 +1,307 @@
+// Copyright 2020 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package migration
+
+import (
+	"context"
+	"fmt"
+	"testing"
+
+	"github.com/cockroachdb/cockroach/pkg/clusterversion"
+	"github.com/cockroachdb/cockroach/pkg/kv"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb"
+	"github.com/cockroachdb/cockroach/pkg/roachpb"
+	"github.com/cockroachdb/cockroach/pkg/server/serverpb"
+	"github.com/cockroachdb/cockroach/pkg/testutils"
+	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
+	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
+	"google.golang.org/grpc"
+)
+
+func TestHelperEveryNode(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+
+	cv := clusterversion.ClusterVersion{}
+	ctx := context.Background()
+	var mu syncutil.Mutex
+	const numNodes = 3
+
+	t.Run("with-node-addition", func(t *testing.T) {
+		// Add a node mid-way through execution. We expect EveryNode to start
+		// over from scratch and include the newly added node.
+		tc := TestingNewCluster(numNodes)
+		h := newHelper(tc, cv)
+		opCount := 0
+		err := h.EveryNode(ctx, "dummy-op", func(context.Context, serverpb.MigrationClient) error {
+			mu.Lock()
+			defer mu.Unlock()
+
+			opCount++
+			if opCount == numNodes {
+				tc.addNode()
+			}
+
+			return nil
+		})
+		if err != nil {
+			t.Fatal(err)
+		}
+
+		if exp := numNodes*2 + 1; exp != opCount {
+			t.Fatalf("expected closure to be invoked %d times, got %d", exp, opCount)
+		}
+	})
+
+	t.Run("with-node-restart", func(t *testing.T) {
+		// Restart a node mid-way through execution. We expect EveryNode to
+		// start over from scratch and include the restarted node.
+		tc := TestingNewCluster(numNodes)
+		h := newHelper(tc, cv)
+		opCount := 0
+		err := h.EveryNode(ctx, "dummy-op", func(context.Context, serverpb.MigrationClient) error {
+			mu.Lock()
+			defer mu.Unlock()
+
+			opCount++
+			if opCount == numNodes {
+				tc.restartNode(2)
+			}
+
+			return nil
+		})
+		if err != nil {
+			t.Fatal(err)
+		}
+
+		if exp := numNodes * 2; exp != opCount {
+			t.Fatalf("expected closure to be invoked %d times, got %d", exp, opCount)
+		}
+	})
+
+	t.Run("with-node-downNode", func(t *testing.T) {
+		// Down a node mid-way through execution. We expect EveryNode to error
+		// out.
+		const downedNode = 2
+		tc := TestingNewCluster(numNodes)
+		expRe := fmt.Sprintf("n%d required, but unavailable", downedNode)
+		h := newHelper(tc, cv)
+		opCount := 0
+		if err := h.EveryNode(ctx, "dummy-op", func(context.Context, serverpb.MigrationClient) error {
+			mu.Lock()
+			defer mu.Unlock()
+
+			opCount++
+			if opCount == 1 {
+				tc.downNode(downedNode)
+			}
+			return nil
+		}); !testutils.IsError(err, expRe) {
+			t.Fatalf("expected error %q, got %q", expRe, err)
+		}
+
+		tc.restartNode(downedNode)
+		if err := h.EveryNode(ctx, "dummy-op", func(context.Context, serverpb.MigrationClient) error {
+			return nil
+		}); err != nil {
+			t.Fatal(err)
+		}
+	})
+}
+
+func TestClusterNodes(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+
+	ctx := context.Background()
+	const numNodes = 3
+
+	t.Run("retrieves-all", func(t *testing.T) {
+		nl := newTestNodeLiveness(numNodes)
+		c := clusterImpl{nl: nl}
+
+		ns, err := c.nodes(ctx)
+		if err != nil {
+			t.Fatal(err)
+		}
+
+		if got := len(ns); got != numNodes {
+			t.Fatalf("expected %d nodes, got %d", numNodes, got)
+		}
+
+		for i := range ns {
+			if exp := roachpb.NodeID(i + 1); exp != ns[i].id {
+				t.Fatalf("expected to find node ID %s, got %s", exp, ns[i].id)
+			}
+			if ns[i].epoch != 1 {
+				t.Fatalf("expected to find epoch=1, got %d", ns[i].epoch)
+			}
+		}
+	})
+
+	t.Run("ignores-decommissioned", func(t *testing.T) {
+		nl := newTestNodeLiveness(numNodes)
+		c := clusterImpl{nl: nl}
+		const decommissionedNode = 3
+		nl.decommission(decommissionedNode)
+
+		ns, err := c.nodes(ctx)
+		if err != nil {
+			t.Fatal(err)
+		}
+
+		if got := len(ns); got != numNodes-1 {
+			t.Fatalf("expected %d nodes, got %d", numNodes-1, got)
+		}
+
+		for i := range ns {
+			if exp := roachpb.NodeID(i + 1); exp != ns[i].id {
+				t.Fatalf("expected to find node ID %s, got %s", exp, ns[i].id)
+			}
+			if ns[i].epoch != 1 {
+				t.Fatalf("expected to find epoch=1, got %d", ns[i].epoch)
+			}
+		}
+	})
+
+	t.Run("errors-if-down", func(t *testing.T) {
+		nl := newTestNodeLiveness(numNodes)
+		c := clusterImpl{nl: nl}
+		const downedNode = 3
+		nl.downNode(downedNode)
+
+		_, err := c.nodes(ctx)
+		expRe := fmt.Sprintf("n%d required, but unavailable", downedNode)
+		if !testutils.IsError(err, expRe) {
+			t.Fatalf("expected error %q, got %q", expRe, err)
+		}
+	})
+}
+
+// mockClusterImpl is a testing-only implementation of the cluster interface. It
+// lets callers mock out adding, killing, and restarting nodes in the cluster.
+type mockClusterImpl struct {
+	nl *mockNodeLivenessImpl
+	*clusterImpl
+}
+
+var _ cluster = &mockClusterImpl{}
+
+// TestingNewCluster is an exported constructor for a test-only implementation
+// of the cluster interface.
+func TestingNewCluster(numNodes int, options ...func(*mockClusterImpl)) *mockClusterImpl {
+	nl := newTestNodeLiveness(numNodes)
+	tc := &mockClusterImpl{
+		nl:          nl,
+		clusterImpl: newCluster(nl, nil, nil, nil),
+	}
+	for _, option := range options {
+		option(tc)
+	}
+	return tc
+}
+
+// TestingWithKV facilitates the creation of a test cluster backed by the given
+// KV instance.
+func TestingWithKV(db *kv.DB) func(*mockClusterImpl) {
+	return func(impl *mockClusterImpl) {
+		impl.clusterImpl.kvDB = db
+	}
+}
+
+// dial is part of the cluster interface. We override it here as tests don't
+// expect to make any outbound requests.
+func (t *mockClusterImpl) dial(context.Context, roachpb.NodeID) (*grpc.ClientConn, error) {
+	return nil, nil
+}
+
+func (t *mockClusterImpl) addNode() {
+	t.nl.addNode(roachpb.NodeID(len(t.nl.ls) + 1))
+}
+
+func (t *mockClusterImpl) downNode(id roachpb.NodeID) {
+	t.nl.downNode(id)
+}
+
+func (t *mockClusterImpl) restartNode(id roachpb.NodeID) {
+	t.nl.restartNode(id)
+}
+
+// mockNodeLivenessImpl is a testing-only implementation of the nodeLiveness
+// interface. It lets tests mock out restarting, killing, decommissioning and
+// adding nodes to the cluster.
+type mockNodeLivenessImpl struct {
+	ls   []livenesspb.Liveness
+	dead map[roachpb.NodeID]struct{}
+}
+
+var _ nodeLiveness = &mockNodeLivenessImpl{}
+
+func newTestNodeLiveness(numNodes int) *mockNodeLivenessImpl {
+	nl := &mockNodeLivenessImpl{
+		ls:   make([]livenesspb.Liveness, numNodes),
+		dead: make(map[roachpb.NodeID]struct{}),
+	}
+	for i := 0; i < numNodes; i++ {
+		nl.ls[i] = livenesspb.Liveness{
+			NodeID: roachpb.NodeID(i + 1), Epoch: 1,
+			Membership: livenesspb.MembershipStatus_ACTIVE,
+		}
+	}
+	return nl
+}
+
+// GetLivenessesFromKV implements the nodeLiveness interface.
+func (t *mockNodeLivenessImpl) GetLivenessesFromKV(context.Context) ([]livenesspb.Liveness, error) {
+	return t.ls, nil
+}
+
+// IsLive implements the nodeLiveness interface.
+func (t *mockNodeLivenessImpl) IsLive(id roachpb.NodeID) (bool, error) {
+	_, dead := t.dead[id]
+	return !dead, nil
+}
+
+func (t *mockNodeLivenessImpl) decommission(id roachpb.NodeID) {
+	for i := range t.ls {
+		if t.ls[i].NodeID == id {
+			t.ls[i].Membership = livenesspb.MembershipStatus_DECOMMISSIONED
+			break
+		}
+	}
+}
+
+func (t *mockNodeLivenessImpl) addNode(id roachpb.NodeID) {
+	t.ls = append(t.ls, livenesspb.Liveness{
+		NodeID:     id,
+		Epoch:      1,
+		Membership: livenesspb.MembershipStatus_ACTIVE,
+	})
+}
+
+func (t *mockNodeLivenessImpl) downNode(id roachpb.NodeID) {
+	t.dead[id] = struct{}{}
+}
+
+func (t *mockNodeLivenessImpl) restartNode(id roachpb.NodeID) {
+	for i := range t.ls {
+		if t.ls[i].NodeID == id {
+			t.ls[i].Epoch++
+			break
+		}
+	}
+
+	delete(t.dead, id)
+}
+
+// TestingNewHelper is an exported constructor for Helper for testing
+// purposes. 
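+//
+// A caller might use it along these lines (a sketch only; numNodes, cv,
+// and kvDB are whatever the test has at hand, and the TestingWithKV
+// option is only needed when the helper's DB() is exercised):
+//
+//   c := TestingNewCluster(numNodes, TestingWithKV(kvDB))
+//   h := TestingNewHelper(c, cv)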
+func TestingNewHelper(c cluster, cv clusterversion.ClusterVersion) *Helper { + return &Helper{c: c, cv: cv} +} diff --git a/pkg/migration/manager.go b/pkg/migration/manager.go index cc129feb3c00..99686c0f21d4 100644 --- a/pkg/migration/manager.go +++ b/pkg/migration/manager.go @@ -99,7 +99,8 @@ func (m *Manager) Migrate(ctx context.Context, from, to clusterversion.ClusterVe log.Infof(ctx, "migrating cluster from %s to %s (stepping through %s)", from, to, clusterVersions) for _, clusterVersion := range clusterVersions { - h := &Helper{Manager: m} + cluster := newCluster(m.nl, m.dialer, m.executor, m.db) + h := newHelper(cluster, clusterVersion) // Push out the version gate to every node in the cluster. Each node // will persist the version, bump the local version gates, and then diff --git a/pkg/migration/util.go b/pkg/migration/util.go index b7ab5ed6c13e..0814456b8c1e 100644 --- a/pkg/migration/util.go +++ b/pkg/migration/util.go @@ -12,31 +12,77 @@ package migration import ( "context" - "sort" "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/redact" ) -// identical returns whether or not two lists of node IDs are identical as sets. -func identical(a, b []roachpb.NodeID) bool { - if len(a) != len(b) { - return false +// node captures the relevant bits of each node as it pertains to the migration +// infrastructure. +type node struct { + id roachpb.NodeID + epoch int64 +} + +// nodes is a collection of node objects. +type nodes []node + +// identical returns whether or not two lists of nodes are identical as sets, +// and if not, what changed (in terms of cluster membership operations and epoch +// changes). The textual diffs are only to be used for logging purposes. +func (ns nodes) identical(other nodes) (ok bool, _ []redact.RedactableString) { + a, b := ns, other + + type ent struct { + node node + count int + epochChanged bool + } + m := map[roachpb.NodeID]ent{} + for _, node := range a { + m[node.id] = ent{count: 1, node: node, epochChanged: false} + } + for _, node := range b { + e, ok := m[node.id] + e.count-- + if ok && e.node.epoch != node.epoch { + e.epochChanged = true + } + m[node.id] = e + } + + var diffs []redact.RedactableString + for id, e := range m { + if e.epochChanged { + diffs = append(diffs, redact.Sprintf("n%d's epoch changed", id)) + } + if e.count > 0 { + diffs = append(diffs, redact.Sprintf("n%d was decommissioned", id)) + } + if e.count < 0 { + diffs = append(diffs, redact.Sprintf("n%d joined the cluster", id)) + } } - sort.Slice(a, func(i, j int) bool { - return a[i] < a[j] - }) - sort.Slice(b, func(i, j int) bool { - return b[i] < b[j] - }) - for i, v := range a { - if v != b[i] { - return false + return len(diffs) == 0, diffs +} + +func (ns nodes) String() string { + return redact.StringWithoutMarkers(ns) +} + +// SafeFormat implements redact.SafeFormatter. +func (ns nodes) SafeFormat(s redact.SafePrinter, _ rune) { + s.SafeString("n{") + if len(ns) > 0 { + s.Printf("%d", ns[0].id) + for _, node := range ns[1:] { + s.Printf(",%d", node.id) } } - return true + s.SafeString("}") } // fenceVersionFor constructs the appropriate "fence version" for the given diff --git a/pkg/migration/util_test.go b/pkg/migration/util_test.go new file mode 100644 index 000000000000..98d201da3935 --- /dev/null +++ b/pkg/migration/util_test.go @@ -0,0 +1,101 @@ +// Copyright 2020 The Cockroach Authors. 
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package migration
+
+import (
+	"sort"
+	"strconv"
+	"strings"
+	"testing"
+
+	"github.com/cockroachdb/cockroach/pkg/roachpb"
+	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
+)
+
+func TestNodesString(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+
+	ns := func(ids ...int) nodes {
+		var nodes []node
+		for _, id := range ids {
+			nodes = append(nodes, node{id: roachpb.NodeID(id)})
+		}
+		return nodes
+	}
+
+	var tests = []struct {
+		ns  nodes
+		exp string
+	}{
+		{ns(), "n{}"},
+		{ns(1), "n{1}"},
+		{ns(1, 2, 3), "n{1,2,3}"},
+		{ns(3, 4, 7), "n{3,4,7}"},
+	}
+
+	for _, test := range tests {
+		if got := test.ns.String(); got != test.exp {
+			t.Fatalf("expected %s, got %s", test.exp, got)
+		}
+	}
+}
+
+func TestNodesIdentical(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+
+	list := func(nodes ...string) nodes { // takes in strings of the form "id@epoch"
+		var ns []node
+		for _, n := range nodes {
+			parts := strings.Split(n, "@")
+			id, err := strconv.Atoi(parts[0])
+			if err != nil {
+				t.Fatal(err)
+			}
+			epoch, err := strconv.Atoi(parts[1])
+			if err != nil {
+				t.Fatal(err)
+			}
+			ns = append(ns, node{id: roachpb.NodeID(id), epoch: int64(epoch)})
+		}
+		return ns
+	}
+
+	var tests = []struct {
+		a, b    nodes
+		expOk   bool
+		expDiff string
+	}{
+		{list(), list(), true, ""},
+		{list("1@2"), list("1@2"), true, ""},
+		{list("2@1", "1@2"), list("1@2", "2@1"), true, ""},
+		{list("1@2"), list("1@3"), false, "n1's epoch changed"},
+		{list("1@2"), list("1@2", "2@1"), false, "n2 joined the cluster"},
+		{list("1@1", "2@1"), list("1@1"), false, "n2 was decommissioned"},
+		{list("3@2", "4@6"), list("4@8", "5@2"), false, "n3 was decommissioned, n4's epoch changed, n5 joined the cluster"},
+	}
+
+	for _, test := range tests {
+		ok, diffs := test.a.identical(test.b)
+		if ok != test.expOk {
+			t.Fatalf("expected identical = %t, got %t", test.expOk, ok)
+		}
+
+		strDiffs := make([]string, len(diffs))
+		for i, diff := range diffs {
+			strDiffs[i] = string(diff)
+		}
+		sort.Strings(strDiffs)
+
+		if strings.Join(strDiffs, ", ") != test.expDiff {
+			t.Fatalf("expected diff %q, got %q", test.expDiff, strDiffs)
+		}
+	}
+}
diff --git a/pkg/testutils/lint/lint_test.go b/pkg/testutils/lint/lint_test.go
index 8432e765f221..93cb19d3b709 100644
--- a/pkg/testutils/lint/lint_test.go
+++ b/pkg/testutils/lint/lint_test.go
@@ -1560,6 +1560,7 @@ func TestLint(t *testing.T) {
 		if err := stream.ForEach(
 			stream.Sequence(
 				filter,
+				stream.GrepNot("migration/.*exported func TestingNewCluster returns unexported type"),
 				stream.GrepNot("sql/.*exported func .* returns unexported type sql.planNode"),
 				stream.GrepNot("pkg/sql/types/types.go.* var Uuid should be UUID"),
 				stream.GrepNot("pkg/sql/oidext/oidext.go.*don't use underscores in Go names; const T_"),

From 3c118de377d004e8fb5d20a25a45f64ae50d3241 Mon Sep 17 00:00:00 2001
From: irfan sharif 
Date: Mon, 7 Dec 2020 11:56:45 -0500
Subject: [PATCH 09/11] migration: parallelize execution of the EveryNode primitive

Release note: None
---
 pkg/migration/BUILD.bazel |  2 ++
 pkg/migration/helper.go   | 27 +++++++++++++++++++++------
 2 files changed, 23 insertions(+), 6 deletions(-)

diff --git a/pkg/migration/BUILD.bazel b/pkg/migration/BUILD.bazel
index 
1efd725e9f86..e674c65e55a4 100644 --- a/pkg/migration/BUILD.bazel +++ b/pkg/migration/BUILD.bazel @@ -20,7 +20,9 @@ go_library( "//pkg/server/serverpb", "//pkg/sql", "//pkg/sql/sqlutil", + "//pkg/util/ctxgroup", "//pkg/util/log", + "//pkg/util/quotapool", "//vendor/github.com/cockroachdb/errors", "//vendor/github.com/cockroachdb/logtags", "//vendor/github.com/cockroachdb/redact", diff --git a/pkg/migration/helper.go b/pkg/migration/helper.go index bdc82b8fc6ee..e40e82a58e97 100644 --- a/pkg/migration/helper.go +++ b/pkg/migration/helper.go @@ -21,7 +21,9 @@ import ( "github.com/cockroachdb/cockroach/pkg/server/serverpb" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" + "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/quotapool" "github.com/cockroachdb/errors" "github.com/cockroachdb/redact" "google.golang.org/grpc" @@ -110,19 +112,32 @@ func (h *Helper) EveryNode( return err } + // We'll want to rate limit outgoing RPCs (limit pulled out of thin air). + qp := quotapool.NewIntPool("every-node", 25) for { - // TODO(irfansharif): We can/should send out these RPCs in parallel. log.Infof(ctx, "executing %s on nodes %s", redact.Safe(op), ns) + grp := ctxgroup.WithContext(ctx) for _, node := range ns { - conn, err := h.c.dial(ctx, node.id) + id := node.id // copy out of the loop variable + alloc, err := qp.Acquire(ctx, 1) if err != nil { return err } - client := serverpb.NewMigrationClient(conn) - if err := fn(ctx, client); err != nil { - return err - } + + grp.GoCtx(func(ctx context.Context) error { + defer alloc.Release() + + conn, err := h.c.dial(ctx, id) + if err != nil { + return err + } + client := serverpb.NewMigrationClient(conn) + return fn(ctx, client) + }) + } + if err := grp.Wait(); err != nil { + return err } curNodes, err := h.c.nodes(ctx) From 3f059861d406ebfc22b9d52bf1137b7181e6bfd7 Mon Sep 17 00:00:00 2001 From: irfan sharif Date: Mon, 7 Dec 2020 11:58:09 -0500 Subject: [PATCH 10/11] migration: introduce the IterateRangeDescriptors primitive It's not currently wired up to anything. We'll use it in future PRs to send out `Migrate` requests to the entire keyspace. This was originally prototyped in #57445. See the inline comments and the RFC (#48843) for the motivation here. 
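To sketch the intended shape of things (hypothetical usage only; the
`Migrate` request wiring is intentionally not part of this PR):

    var processed int
    init := func() { processed = 0 } // reset on txn retries
    _ = h.IterateRangeDescriptors(ctx, blockSize, init,
        func(descs ...roachpb.RangeDescriptor) error {
            // Eventually: issue the relevant KV request (e.g. Migrate)
            // for each range here.
            processed += len(descs)
            return nil
        },
    )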
Release note: None --- pkg/migration/BUILD.bazel | 10 ++++++ pkg/migration/client_test.go | 61 ++++++++++++++++++++++++++++++++ pkg/migration/helper.go | 68 ++++++++++++++++++++++++++++++++++++ pkg/migration/main_test.go | 29 +++++++++++++++ 4 files changed, 168 insertions(+) create mode 100644 pkg/migration/client_test.go create mode 100644 pkg/migration/main_test.go diff --git a/pkg/migration/BUILD.bazel b/pkg/migration/BUILD.bazel index e674c65e55a4..23a1b35826fc 100644 --- a/pkg/migration/BUILD.bazel +++ b/pkg/migration/BUILD.bazel @@ -12,6 +12,7 @@ go_library( visibility = ["//visibility:public"], deps = [ "//pkg/clusterversion", + "//pkg/keys", "//pkg/kv", "//pkg/kv/kvserver/liveness/livenesspb", "//pkg/roachpb", @@ -33,17 +34,26 @@ go_library( go_test( name = "migration_test", srcs = [ + "client_test.go", "helper_test.go", + "main_test.go", "util_test.go", ], embed = [":migration"], deps = [ "//pkg/clusterversion", "//pkg/kv", + "//pkg/kv/kvserver", "//pkg/kv/kvserver/liveness/livenesspb", "//pkg/roachpb", + "//pkg/security", + "//pkg/security/securitytest", + "//pkg/server", "//pkg/server/serverpb", + "//pkg/sql/tests", "//pkg/testutils", + "//pkg/testutils/serverutils", + "//pkg/testutils/testcluster", "//pkg/util/leaktest", "//pkg/util/syncutil", "//vendor/google.golang.org/grpc", diff --git a/pkg/migration/client_test.go b/pkg/migration/client_test.go new file mode 100644 index 000000000000..a56a1c490a18 --- /dev/null +++ b/pkg/migration/client_test.go @@ -0,0 +1,61 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
+
+package migration_test
+
+import (
+	"context"
+	"testing"
+
+	"github.com/cockroachdb/cockroach/pkg/clusterversion"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
+	"github.com/cockroachdb/cockroach/pkg/migration"
+	"github.com/cockroachdb/cockroach/pkg/roachpb"
+	"github.com/cockroachdb/cockroach/pkg/sql/tests"
+	"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
+	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
+)
+
+func TestHelperIterateRangeDescriptors(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+
+	cv := clusterversion.ClusterVersion{}
+	ctx := context.Background()
+	const numNodes = 1
+
+	params, _ := tests.CreateTestServerParams()
+	server, _, kvDB := serverutils.StartServer(t, params)
+	defer server.Stopper().Stop(context.Background())
+
+	var numRanges int
+	if err := server.GetStores().(*kvserver.Stores).VisitStores(func(s *kvserver.Store) error {
+		numRanges = s.ReplicaCount()
+		return nil
+	}); err != nil {
+		t.Fatal(err)
+	}
+
+	c := migration.TestingNewCluster(numNodes, migration.TestingWithKV(kvDB))
+	h := migration.TestingNewHelper(c, cv)
+
+	for _, blockSize := range []int{1, 5, 10, 50} {
+		var numDescs int
+		init := func() { numDescs = 0 }
+		if err := h.IterateRangeDescriptors(ctx, blockSize, init, func(descriptors ...roachpb.RangeDescriptor) error {
+			numDescs += len(descriptors)
+			return nil
+		}); err != nil {
+			t.Fatal(err)
+		}
+
+		if numDescs != numRanges {
+			t.Fatalf("expected to find %d ranges, found %d", numRanges, numDescs)
+		}
+	}
+}
diff --git a/pkg/migration/helper.go b/pkg/migration/helper.go
index e40e82a58e97..510142b7a162 100644
--- a/pkg/migration/helper.go
+++ b/pkg/migration/helper.go
@@ -14,6 +14,7 @@ import (
 	"context"

 	"github.com/cockroachdb/cockroach/pkg/clusterversion"
+	"github.com/cockroachdb/cockroach/pkg/keys"
 	"github.com/cockroachdb/cockroach/pkg/kv"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/rpc"
@@ -157,6 +158,73 @@ func (h *Helper) EveryNode(
 	return nil
 }

+// IterateRangeDescriptors provides a handle on every range descriptor in
+// the system, to which callers can then send arbitrary KV requests in order
+// to run KV-level migrations. These requests will typically just be the
+// `Migrate` request, with code added within [1] to do the specific things
+// intended for the specified version.
+//
+// It's important to note that the closure is being executed in the context
+// of a distributed transaction that may be automatically retried. So
+// something like the following is an anti-pattern:
+//
+//   processed := 0
+//   _ = h.IterateRangeDescriptors(...,
+//     func(descriptors ...roachpb.RangeDescriptor) error {
+//       processed += len(descriptors) // we'll overcount if retried
+//       log.Infof(ctx, "processed %d ranges", processed)
+//     },
+//   )
+//
+// Instead we allow callers to pass in a callback to signal on every attempt
+// (including the first).
+// This lets us salvage the example above:
+//
+//   var processed int
+//   init := func() { processed = 0 }
+//   _ = h.IterateRangeDescriptors(..., init,
+//     func(descriptors ...roachpb.RangeDescriptor) error {
+//       processed += len(descriptors)
+//       log.Infof(ctx, "processed %d ranges", processed)
+//     },
+//   )
+//
+// [1]: pkg/kv/kvserver/batcheval/cmd_migrate.go
+func (h *Helper) IterateRangeDescriptors(
+	ctx context.Context, blockSize int, init func(), fn func(...roachpb.RangeDescriptor) error,
+) error {
+	if err := h.DB().Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
+		// Inform the caller that we're starting a fresh attempt to page in
+		// range descriptors.
+		init()
+
+		// Iterate through meta2 to pull out all the range descriptors.
+		return txn.Iterate(ctx, keys.Meta2Prefix, keys.MetaMax, blockSize,
+			func(rows []kv.KeyValue) error {
+				descriptors := make([]roachpb.RangeDescriptor, len(rows))
+				for i, row := range rows {
+					if err := row.ValueProto(&descriptors[i]); err != nil {
+						return errors.Wrapf(err,
+							"unable to unmarshal range descriptor from %s",
+							row.Key,
+						)
+					}
+				}
+
+				// Invoke fn with the current chunk (of size ~blockSize) of
+				// range descriptors.
+				if err := fn(descriptors...); err != nil {
+					return err
+				}
+
+				return nil
+			})
+	}); err != nil {
+		return err
+	}
+
+	return nil
+}
+
 // DB exposes the underlying *kv.DB instance.
 func (h *Helper) DB() *kv.DB {
 	return h.c.db()
diff --git a/pkg/migration/main_test.go b/pkg/migration/main_test.go
new file mode 100644
index 000000000000..cdc3f7742a5a
--- /dev/null
+++ b/pkg/migration/main_test.go
@@ -0,0 +1,29 @@
+// Copyright 2020 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package migration_test
+
+import (
+	"os"
+	"testing"
+
+	"github.com/cockroachdb/cockroach/pkg/security"
+	"github.com/cockroachdb/cockroach/pkg/security/securitytest"
+	"github.com/cockroachdb/cockroach/pkg/server"
+	"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
+	"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
+)
+
+func TestMain(m *testing.M) {
+	security.SetAssetLoader(securitytest.EmbeddedAssets)
+	serverutils.InitTestServerFactory(server.TestServerFactory)
+	serverutils.InitTestClusterFactory(testcluster.TestClusterFactory)
+	os.Exit(m.Run())
+}

From a0c5160a0c5c8218052319a5c88143dce5685fd6 Mon Sep 17 00:00:00 2001
From: irfan sharif
Date: Thu, 17 Dec 2020 16:45:56 -0500
Subject: [PATCH 11/11] migration: don't abbreviate logging, for readability

Release note: None
---
 pkg/migration/manager.go | 6 +++---
 pkg/server/migration.go  | 8 ++++----
 2 files changed, 7 insertions(+), 7 deletions(-)

diff --git a/pkg/migration/manager.go b/pkg/migration/manager.go
index 99686c0f21d4..df2e77efe5b6 100644
--- a/pkg/migration/manager.go
+++ b/pkg/migration/manager.go
@@ -146,7 +146,7 @@ func (m *Manager) Migrate(ctx context.Context, from, to clusterversion.ClusterVe
 		// direct successor of the fence).
 		fenceVersion := fenceVersionFor(ctx, clusterVersion)
 		req := &serverpb.BumpClusterVersionRequest{ClusterVersion: &fenceVersion}
-		op := fmt.Sprintf("bump-cv=%s", req.ClusterVersion.PrettyPrint())
+		op := fmt.Sprintf("bump-cluster-version=%s", req.ClusterVersion.PrettyPrint())
 		err := h.EveryNode(ctx, op, func(ctx context.Context, client serverpb.MigrationClient) error {
 			_, err := client.BumpClusterVersion(ctx, req)
 			return err
@@ -159,7 +159,7 @@ func (m *Manager) Migrate(ctx context.Context, from, to clusterversion.ClusterVe
 		// Now sanity check that we'll actually be able to perform the real
 		// cluster version bump, cluster-wide.
 		req := &serverpb.ValidateTargetClusterVersionRequest{ClusterVersion: &clusterVersion}
-		op := fmt.Sprintf("validate-cv=%s", req.ClusterVersion.PrettyPrint())
+		op := fmt.Sprintf("validate-cluster-version=%s", req.ClusterVersion.PrettyPrint())
 		err := h.EveryNode(ctx, op, func(ctx context.Context, client serverpb.MigrationClient) error {
 			_, err := client.ValidateTargetClusterVersion(ctx, req)
 			return err
@@ -171,7 +171,7 @@ func (m *Manager) Migrate(ctx context.Context, from, to clusterversion.ClusterVe
 	{
 		// Finally, bump the real version cluster-wide.
 		req := &serverpb.BumpClusterVersionRequest{ClusterVersion: &clusterVersion}
-		op := fmt.Sprintf("bump-cv=%s", req.ClusterVersion.PrettyPrint())
+		op := fmt.Sprintf("bump-cluster-version=%s", req.ClusterVersion.PrettyPrint())
 		if err := h.EveryNode(ctx, op, func(ctx context.Context, client serverpb.MigrationClient) error {
 			_, err := client.BumpClusterVersion(ctx, req)
 			return err
diff --git a/pkg/server/migration.go b/pkg/server/migration.go
index 76765bac3db6..41b058f09201 100644
--- a/pkg/server/migration.go
+++ b/pkg/server/migration.go
@@ -40,9 +40,9 @@ var _ serverpb.MigrationServer = &migrationServer{}
 func (m *migrationServer) ValidateTargetClusterVersion(
 	ctx context.Context, req *serverpb.ValidateTargetClusterVersionRequest,
 ) (*serverpb.ValidateTargetClusterVersionResponse, error) {
-	ctx, span := m.server.AnnotateCtxWithSpan(ctx, "validate-cv")
+	ctx, span := m.server.AnnotateCtxWithSpan(ctx, "validate-cluster-version")
 	defer span.Finish()
-	ctx = logtags.AddTag(ctx, "validate-cv", nil)
+	ctx = logtags.AddTag(ctx, "validate-cluster-version", nil)

 	targetCV := req.ClusterVersion
 	versionSetting := m.server.ClusterSettings().Version
@@ -86,9 +86,9 @@ func (m *migrationServer) ValidateTargetClusterVersion(
 func (m *migrationServer) BumpClusterVersion(
 	ctx context.Context, req *serverpb.BumpClusterVersionRequest,
 ) (*serverpb.BumpClusterVersionResponse, error) {
-	ctx, span := m.server.AnnotateCtxWithSpan(ctx, "bump-cv")
+	ctx, span := m.server.AnnotateCtxWithSpan(ctx, "bump-cluster-version")
 	defer span.Finish()
-	ctx = logtags.AddTag(ctx, "bump-cv", nil)
+	ctx = logtags.AddTag(ctx, "bump-cluster-version", nil)

 	m.Lock()
 	defer m.Unlock()
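
A closing aside on the fence versions referenced in manager.go above: the
comments there rely only on the property that the target cluster version
is the direct successor of its fence version. A minimal sketch of that
relationship (hypothetical; fenceVersionFor in pkg/migration is the
authoritative implementation, and the one-Internal-step successor rule is
an assumption here):

    // fenceVersionSketch derives a fence from a target cluster version by
    // stepping back one Internal version, so that the target is the direct
    // successor of the fence. Illustrative only.
    func fenceVersionSketch(cv clusterversion.ClusterVersion) clusterversion.ClusterVersion {
        fence := cv
        fence.Internal-- // assumes successive versions differ by one Internal step
        return fence
    }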