Commit

migration: break down pkg across a few files
We can separate out the `Helper`, `Migration`, and various utilities
into their own files. We'll add tests for individual components in
future commits; the physical separation here sets the foundation for
doing so (prototyped in cockroachdb#57445). This commit is purely code movement.

Release note: None
irfansharif committed Dec 11, 2020
1 parent 5a53b00 commit fd6bd4f
Showing 5 changed files with 178 additions and 151 deletions.
1 change: 1 addition & 0 deletions pkg/migration/BUILD.bazel
@@ -3,6 +3,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "migration",
srcs = [
"helper.go",
"manager.go",
"migrations.go",
"util.go",
121 changes: 121 additions & 0 deletions pkg/migration/helper.go
@@ -0,0 +1,121 @@
// Copyright 2020 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package migration

import (
"context"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc"
"github.com/cockroachdb/cockroach/pkg/server/serverpb"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
)

// Helper captures all the primitives required to fully specify a migration.
type Helper struct {
*Manager
}

// RequiredNodeIDs returns the node IDs for all nodes that are currently part of
// the cluster (i.e. they haven't been decommissioned away). Migrations have the
// pre-requisite that all required nodes are up and running so that we're able
// to execute all relevant node-level operations on them. If any of the nodes
// are found to be unavailable, an error is returned.
func (h *Helper) RequiredNodeIDs(ctx context.Context) ([]roachpb.NodeID, error) {
var nodeIDs []roachpb.NodeID
ls, err := h.nl.GetLivenessesFromKV(ctx)
if err != nil {
return nil, err
}
for _, l := range ls {
if l.Membership.Decommissioned() {
continue
}
live, err := h.nl.IsLive(l.NodeID)
if err != nil {
return nil, err
}
if !live {
return nil, errors.Newf("n%d required, but unavailable", l.NodeID)
}
nodeIDs = append(nodeIDs, l.NodeID)
}
return nodeIDs, nil
}

// EveryNode invokes the given closure (named by the informational parameter op)
// across every node in the cluster. The mechanism for ensuring that we've done
// so, while accounting for the possibility of new nodes being added to the
// cluster in the interim, is provided by the following structure:
// (a) We'll retrieve the list of node IDs for all nodes in the system
// (b) For each node, we'll invoke the closure
// (c) We'll retrieve the list of node IDs again to account for the
// possibility of a new node being added during (b)
// (d) If there are any discrepancies between the lists retrieved in (a)
// and (c), we'll invoke the closure on each node again
// (e) We'll continue to loop around until the node ID list stabilizes
//
// By the time EveryNode returns, we'll have thus invoked the closure against
// every node in the cluster.
//
// To consider one example of how this primitive is used, let's look at our use
// of it to bump the cluster version. After we return, all nodes in the
// cluster will have had their cluster versions bumped, and future node additions
// will observe the latest version (through the join RPC). This lets us author
// migrations that can assume that a certain version gate has been enabled on
// all nodes in the cluster, and will always be enabled for any new nodes in the
// system.
//
// It is possible, however, that a new node joins right after we return.
// This means that some migrations may have to be split up into two version
// bumps: one that phases out the old version (i.e. stops creation of stale data
// or behavior) and a clean-up version, which removes any vestiges of the stale
// data/behavior, and which, when active, ensures that the old data has vanished
// from the system. This is similar in spirit to how schema changes are split up
// into multiple smaller steps that are carried out sequentially.
func (h *Helper) EveryNode(
ctx context.Context, op string, fn func(context.Context, serverpb.MigrationClient) error,
) error {
nodeIDs, err := h.RequiredNodeIDs(ctx)
if err != nil {
return err
}

for {
// TODO(irfansharif): We can/should send out these RPCs in parallel.
log.Infof(ctx, "executing op=%s on nodes=%s", op, nodeIDs)
for _, nodeID := range nodeIDs {
conn, err := h.dialer.Dial(ctx, nodeID, rpc.DefaultClass)
if err != nil {
return err
}
client := serverpb.NewMigrationClient(conn)
if err := fn(ctx, client); err != nil {
return err
}
}

curNodeIDs, err := h.RequiredNodeIDs(ctx)
if err != nil {
return err
}

if !identical(nodeIDs, curNodeIDs) {
nodeIDs = curNodeIDs
continue
}

break
}

return nil
}
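
To make the EveryNode contract above concrete, here is a minimal sketch (not part of this commit) of how the cluster-version bump described in the comment might be driven through it. It assumes the Migration service exposes a BumpClusterVersion RPC whose request carries the target ClusterVersion; the helper name and exact field names are illustrative only.

package migration

import (
	"context"
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/clusterversion"
	"github.com/cockroachdb/cockroach/pkg/server/serverpb"
)

// bumpClusterVersion is a hypothetical helper (not part of this diff) that
// uses EveryNode to push a target cluster version to every node currently in
// the cluster. It assumes the Migration service offers a BumpClusterVersion
// RPC taking the target ClusterVersion; the actual request shape may differ.
func bumpClusterVersion(
	ctx context.Context, h *Helper, cv clusterversion.ClusterVersion,
) error {
	req := &serverpb.BumpClusterVersionRequest{ClusterVersion: &cv}
	op := fmt.Sprintf("bump-cluster-version=%s", cv)
	return h.EveryNode(ctx, op,
		func(ctx context.Context, client serverpb.MigrationClient) error {
			_, err := client.BumpClusterVersion(ctx, req)
			return err
		})
}

Because EveryNode re-fetches the node list and loops until it stabilizes, a sketch like this can assume that, by the time it returns, every node then in the cluster has acknowledged the bump.
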
150 changes: 0 additions & 150 deletions pkg/migration/manager.go
@@ -29,37 +29,13 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc"
"github.com/cockroachdb/cockroach/pkg/rpc/nodedialer"
"github.com/cockroachdb/cockroach/pkg/server/serverpb"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/logtags"
)

// Migration defines a program to be executed once every node in the cluster is
// (a) running a specific binary version, and (b) has completed all prior
// migrations.
//
// Each migration is associated with a specific internal cluster version and is
// idempotent in nature. When setting the cluster version (via `SET CLUSTER
// SETTING version`), a manager process determines the set of migrations needed
// to bridge the gap between the current active cluster version, and the target
// one.
//
// To introduce a migration, add a version key in pkg/clusterversion, along with
// a corresponding internal cluster version for it. See [1] for
// details. Following that, define a Migration in this package and add it to the
// Registry. Be sure to key it in with the new cluster version we just added.
// During cluster upgrades, once the operator is able to set a cluster version
// setting that's past the version that was introduced (typically the major
// release version the migration was introduced in), the manager will execute
// the defined migration before letting the upgrade finalize.
//
// [1]: pkg/clusterversion/cockroach_versions.go
type Migration func(context.Context, *Helper) error

// Manager is the instance responsible for executing migrations across the
// cluster.
type Manager struct {
@@ -69,106 +45,6 @@ type Manager struct {
db *kv.DB
}

// Helper captures all the primitives required to fully specify a migration.
type Helper struct {
*Manager
}

// RequiredNodeIDs returns the node IDs for all nodes that are currently part of
// the cluster (i.e. they haven't been decommissioned away). Migrations have the
// pre-requisite that all required nodes are up and running so that we're able
// to execute all relevant node-level operations on them. If any of the nodes
// are found to be unavailable, an error is returned.
func (h *Helper) RequiredNodeIDs(ctx context.Context) ([]roachpb.NodeID, error) {
var nodeIDs []roachpb.NodeID
ls, err := h.nl.GetLivenessesFromKV(ctx)
if err != nil {
return nil, err
}
for _, l := range ls {
if l.Membership.Decommissioned() {
continue
}
live, err := h.nl.IsLive(l.NodeID)
if err != nil {
return nil, err
}
if !live {
return nil, errors.Newf("n%d required, but unavailable", l.NodeID)
}
nodeIDs = append(nodeIDs, l.NodeID)
}
return nodeIDs, nil
}

// EveryNode invokes the given closure (named by the informational parameter op)
// across every node in the cluster. The mechanism for ensuring that we've done
// so, while accounting for the possibility of new nodes being added to the
// cluster in the interim, is provided by the following structure:
// (a) We'll retrieve the list of node IDs for all nodes in the system
// (b) For each node, we'll invoke the closure
// (c) We'll retrieve the list of node IDs again to account for the
// possibility of a new node being added during (b)
// (d) If there are any discrepancies between the lists retrieved in (a)
// and (c), we'll invoke the closure on each node again
// (e) We'll continue to loop around until the node ID list stabilizes
//
// By the time EveryNode returns, we'll have thus invoked the closure against
// every node in the cluster.
//
// To consider one example of how this primitive is used, let's look at our use
// of it to bump the cluster version. After we return, all nodes in the
// cluster will have had their cluster versions bumped, and future node additions
// will observe the latest version (through the join RPC). This lets us author
// migrations that can assume that a certain version gate has been enabled on
// all nodes in the cluster, and will always be enabled for any new nodes in the
// system.
//
// It is possible, however, that a new node joins right after we return.
// This means that some migrations may have to be split up into two version
// bumps: one that phases out the old version (i.e. stops creation of stale data
// or behavior) and a clean-up version, which removes any vestiges of the stale
// data/behavior, and which, when active, ensures that the old data has vanished
// from the system. This is similar in spirit to how schema changes are split up
// into multiple smaller steps that are carried out sequentially.
func (h *Helper) EveryNode(
ctx context.Context, op string, fn func(context.Context, serverpb.MigrationClient) error,
) error {
nodeIDs, err := h.RequiredNodeIDs(ctx)
if err != nil {
return err
}

for {
// TODO(irfansharif): We can/should send out these RPCs in parallel.
log.Infof(ctx, "executing op=%s on nodes=%s", op, nodeIDs)
for _, nodeID := range nodeIDs {
conn, err := h.dialer.Dial(ctx, nodeID, rpc.DefaultClass)
if err != nil {
return err
}
client := serverpb.NewMigrationClient(conn)
if err := fn(ctx, client); err != nil {
return err
}
}

curNodeIDs, err := h.RequiredNodeIDs(ctx)
if err != nil {
return err
}

if !identical(nodeIDs, curNodeIDs) {
nodeIDs = curNodeIDs
continue
}

break
}

return nil
}

// nodeLiveness is the subset of the interface satisfied by CRDB's node liveness
// component that the migration manager relies upon.
type nodeLiveness interface {
@@ -313,29 +189,3 @@ func (m *Manager) Migrate(ctx context.Context, from, to clusterversion.ClusterVe

return nil
}

// fenceVersionFor constructs the appropriate "fence version" for the given
// cluster version. Fence versions allow the migrations infrastructure to safely
// step through consecutive cluster versions in the presence of nodes (running
// any binary version) being added to the cluster. See the migration manager
// above for intended usage.
//
// Fence versions (and the migrations infrastructure entirely) were introduced
// in the 21.1 release cycle. In the same release cycle, we introduced the
// invariant that new user-defined versions (users being crdb engineers) must
// always have even-numbered Internal versions, thus reserving the odd numbers
// to slot in fence versions for each cluster version. See top-level
// documentation in pkg/clusterversion for more details.
func fenceVersionFor(
ctx context.Context, cv clusterversion.ClusterVersion,
) clusterversion.ClusterVersion {
if (cv.Internal % 2) != 0 {
log.Fatalf(ctx, "only even numbered internal versions allowed, found %s", cv.Version)
}

// We'll pick the odd internal version preceding the cluster version,
// slotting ourselves right before it.
fenceCV := cv
fenceCV.Internal--
return fenceCV
}
28 changes: 27 additions & 1 deletion pkg/migration/migrations.go
@@ -10,7 +10,11 @@

package migration

import "github.com/cockroachdb/cockroach/pkg/clusterversion"
import (
"context"

"github.com/cockroachdb/cockroach/pkg/clusterversion"
)

// Registry defines the global mapping between a cluster version, and the
// associated migration. The migration is only executed after a cluster-wide
@@ -23,3 +27,25 @@ func init() {
//
// Registry[clusterversion.ByKey(clusterversion.VersionWhatever)] = WhateverMigration
}

// Migration defines a program to be executed once every node in the cluster is
// (a) running a specific binary version, and (b) has completed all prior
// migrations.
//
// Each migration is associated with a specific internal cluster version and is
// idempotent in nature. When setting the cluster version (via `SET CLUSTER
// SETTING version`), a manager process determines the set of migrations needed
// to bridge the gap between the current active cluster version, and the target
// one.
//
// To introduce a migration, add a version key in pkg/clusterversion, along with
// a corresponding internal cluster version for it. See [1] for
// details. Following that, define a Migration in this package and add it to the
// Registry. Be sure to key it in with the new cluster version we just added.
// During cluster upgrades, once the operator is able to set a cluster version
// setting that's past the version that was introduced (typically the major
// release version the migration was introduced in), the manager will execute
// the defined migration before letting the upgrade finalize.
//
// [1]: pkg/clusterversion/cockroach_versions.go
type Migration func(context.Context, *Helper) error
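
To ground the registration recipe in the comment above, here is a minimal sketch of what a concrete entry could look like. The version key (VersionWhatever, echoing the placeholder in init above) and the migration body are hypothetical, and the sketch assumes Registry is keyed on clusterversion.ClusterVersion; a real migration would be keyed to an actual cluster version and perform idempotent work via the Helper.

package migration

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/clusterversion"
)

// whateverMigration is a placeholder body: a real migration would use the
// Helper's primitives (EveryNode, etc.) to carry out idempotent, node-level
// work tied to its version gate.
func whateverMigration(ctx context.Context, h *Helper) error {
	return nil
}

func init() {
	// VersionWhatever is the hypothetical placeholder from the comment above;
	// a real entry would name the cluster version the migration is gated on.
	cv := clusterversion.ClusterVersion{
		Version: clusterversion.ByKey(clusterversion.VersionWhatever),
	}
	Registry[cv] = whateverMigration
}
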
29 changes: 29 additions & 0 deletions pkg/migration/util.go
@@ -11,9 +11,12 @@
package migration

import (
"context"
"sort"

"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/log"
)

// identical returns whether or not two lists of node IDs are identical as sets.
@@ -35,3 +38,29 @@ func identical(a, b []roachpb.NodeID) bool {
}
return true
}

// fenceVersionFor constructs the appropriate "fence version" for the given
// cluster version. Fence versions allow the migrations infrastructure to safely
// step through consecutive cluster versions in the presence of nodes (running
// any binary version) being added to the cluster. See the migration manager
// (in manager.go) for intended usage.
//
// Fence versions (and the migrations infrastructure entirely) were introduced
// in the 21.1 release cycle. In the same release cycle, we introduced the
// invariant that new user-defined versions (users being crdb engineers) must
// always have even-numbered Internal versions, thus reserving the odd numbers
// to slot in fence versions for each cluster version. See top-level
// documentation in pkg/clusterversion for more details.
func fenceVersionFor(
ctx context.Context, cv clusterversion.ClusterVersion,
) clusterversion.ClusterVersion {
if (cv.Internal % 2) != 0 {
log.Fatalf(ctx, "only even numbered internal versions allowed, found %s", cv.Version)
}

// We'll pick the odd internal version preceding the cluster version,
// slotting ourselves right before it.
fenceCV := cv
fenceCV.Internal--
return fenceCV
}
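
For a concrete sense of the fence-version arithmetic, a small sketch with made-up version numbers:

package migration

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/clusterversion"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
)

// exampleFenceVersion shows the arithmetic only; the version numbers are
// hypothetical. For a cluster version at Internal: 4, the fence version
// slots in immediately before it at Internal: 3, with Major/Minor unchanged.
func exampleFenceVersion(ctx context.Context) clusterversion.ClusterVersion {
	cv := clusterversion.ClusterVersion{
		Version: roachpb.Version{Major: 20, Minor: 2, Internal: 4},
	}
	return fenceVersionFor(ctx, cv) // Internal == 3
}

Since user-defined versions always carry even internal versions, decrementing by one always lands on an odd, otherwise-unused slot reserved for the fence.
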
