Skip to content

Commit

Permalink
server: introduce the Migration service
Browse files Browse the repository at this point in the history
The upcoming migration manager (prototyped in cockroachdb#56107) will want to
execute a few known RPCs on every node in the cluster. Part of being the
"migration infrastructure", we also want authors of individual
migrations to be able to define arbitrary node-level operations to
execute on each node in the system.

To this end we introduce a `Migration` service, and populate it with the
two known RPCs the migration manager will want to depend on:
- ValidateTargetClusterVersion: used to verify that the target node is
  running a binary that's able to support the given cluster version.
- BumpClusterVersion: used to inform the target node about a (validated)
  cluster version bump.

Both these RPCs are not currently wired up to anything, and
BumpClusterVersion will be fleshed out just a tiny bit further in a
future PR, but they'll both be used to propagate cluster version bumps
across the crdb cluster through direct RPCs, supplanting our existing
gossip based distribution mechanism. This will let the migration manager
bump version gates in a more controlled fashion. See cockroachdb#56107 for what
that will end up looking like, and see the long-running migrations RFC
(cockroachdb#48843) for the motivation.

Like we mentioned earlier, we expect this service to pick up more RPCs
over time to service specific migrations.

Release note: None
  • Loading branch information
irfansharif committed Nov 10, 2020
1 parent 9a7a35c commit 33c2588
Show file tree
Hide file tree
Showing 9 changed files with 1,177 additions and 8 deletions.
2 changes: 2 additions & 0 deletions pkg/server/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ go_library(
"grpc_server.go",
"init.go",
"loopback.go",
"migration.go",
"node.go",
"node_engine_health.go",
"node_tombstone_storage.go",
Expand Down Expand Up @@ -228,6 +229,7 @@ go_test(
"graphite_test.go",
"intent_test.go",
"main_test.go",
"migration_test.go",
"multi_store_test.go",
"node_test.go",
"node_tombstone_storage_test.go",
Expand Down
117 changes: 117 additions & 0 deletions pkg/server/migration.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
// Copyright 2020 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package server

import (
"context"
"fmt"

"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/server/serverpb"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/redact"
)

// migrationServer is an implementation of the Migration service. The RPCs here
// are used to power the migrations infrastructure in pkg/migrations.
type migrationServer struct {
server *Server

// We used this mutex to serialize attempts to bump the cluster version.
syncutil.Mutex
}

var _ serverpb.MigrationServer = &migrationServer{}

// ValidateTargetClusterVersion implements the MigrationServer interface.
// It's used to verify that we're running a binary that's able to support the
// given cluster version.
func (m *migrationServer) ValidateTargetClusterVersion(
ctx context.Context, req *serverpb.ValidateTargetClusterVersionRequest,
) (*serverpb.ValidateTargetClusterVersionResponse, error) {
targetVersion := *req.Version
versionSetting := m.server.ClusterSettings().Version

// We're validating the following:
//
// node's minimum supported version <= target version <= node's binary version
if targetVersion.Less(versionSetting.BinaryMinSupportedVersion()) {
msg := fmt.Sprintf("target version %s less than binary's min supported version %s",
targetVersion, versionSetting.BinaryMinSupportedVersion())
log.Warningf(ctx, "%s", msg)
return nil, errors.Newf("%s", redact.Safe(msg))
}

if versionSetting.BinaryVersion().Less(targetVersion) {
msg := fmt.Sprintf("binary version %s less than target version %s",
versionSetting.BinaryVersion(), targetVersion)
log.Warningf(ctx, "%s", msg)
return nil, errors.Newf("%s", redact.Safe(msg))
}

resp := &serverpb.ValidateTargetClusterVersionResponse{}
return resp, nil
}

// BumpClusterVersion implements the MigrationServer interface. It's used to
// inform us a cluster version bump. Here we're responsible for durably
// persisting the cluster version and enabling the corresponding version gates.
func (m *migrationServer) BumpClusterVersion(
ctx context.Context, req *serverpb.BumpClusterVersionRequest,
) (*serverpb.BumpClusterVersionResponse, error) {
versionSetting := m.server.ClusterSettings().Version
prevCV, err := kvserver.SynthesizeClusterVersionFromEngines(
ctx, m.server.engines, versionSetting.BinaryVersion(),
versionSetting.BinaryMinSupportedVersion(),
)
if err != nil {
return nil, err
}

newCV := clusterversion.ClusterVersion{Version: *req.Version}

if err := func() error {
if !prevCV.Version.Less(*req.Version) {
// Nothing to do.
return nil
}

m.Lock()
defer m.Unlock()

// TODO(irfansharif): We should probably capture this pattern of
// "persist the cluster version first" and only then bump the
// version setting in a better way.

// Whenever the version changes, we want to persist that update to
// wherever the CRDB process retrieved the initial version from
// (typically a collection of storage.Engines).
if err := kvserver.WriteClusterVersionToEngines(ctx, m.server.engines, newCV); err != nil {
return err
}

// TODO(irfansharif): We'll eventually want to bump the local version
// gate here. On 21.1 nodes we'll no longer be using gossip to propagate
// cluster version bumps. We'll still have probably disseminate it
// through gossip (do we actually have to?), but we won't listen to it.
//
// _ = s.server.ClusterSettings().<...>.SetActiveVersion(ctx, newCV)
return nil
}(); err != nil {
return nil, err
}

resp := &serverpb.BumpClusterVersionResponse{}
return resp, nil
}
94 changes: 94 additions & 0 deletions pkg/server/migration_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
// Copyright 2020 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package server

import (
"context"
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/server/serverpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
)

func TestValidateTargetClusterVersion(t *testing.T) {
defer leaktest.AfterTest(t)()

v := func(major, minor int32) roachpb.Version {
return roachpb.Version{Major: major, Minor: minor}
}

var tests = []struct {
binaryVersion roachpb.Version
binaryMinSupportedVersion roachpb.Version
targetVersion roachpb.Version
expErrMatch string // Empty if expecting a nil error.
}{
{
binaryVersion: v(20, 2),
binaryMinSupportedVersion: v(20, 1),
targetVersion: v(20, 1),
expErrMatch: "",
},
{
binaryVersion: v(20, 2),
binaryMinSupportedVersion: v(20, 1),
targetVersion: v(20, 2),
expErrMatch: "",
},
{
binaryVersion: v(20, 2),
binaryMinSupportedVersion: v(20, 1),
targetVersion: v(21, 1),
expErrMatch: "binary version.*less than target version",
},
{
binaryVersion: v(20, 2),
binaryMinSupportedVersion: v(20, 1),
targetVersion: v(19, 2),
expErrMatch: "target version.*less than binary's min supported version",
},
}

// node's minimum supported version <= target version <= node's binary version

for i, test := range tests {
st := cluster.MakeTestingClusterSettingsWithVersions(
test.binaryVersion,
test.binaryMinSupportedVersion,
false, /* initializeVersion */
)

s, _, _ := serverutils.StartServer(t, base.TestServerArgs{
Settings: st,
Knobs: base.TestingKnobs{
Server: &TestingKnobs{
BinaryVersionOverride: test.binaryVersion,
},
},
})

migrationServer := s.MigrationServer().(*migrationServer)
req := &serverpb.ValidateTargetClusterVersionRequest{
Version: &test.targetVersion,
}
_, err := migrationServer.ValidateTargetClusterVersion(context.Background(), req)
if !testutils.IsError(err, test.expErrMatch) {
t.Fatalf("test %d: got error %s, wanted error matching '%s'", i, err, test.expErrMatch)
}

s.Stopper().Stop(context.Background())
}
}
22 changes: 14 additions & 8 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,14 +139,15 @@ type Server struct {
recorder *status.MetricsRecorder
runtime *status.RuntimeStatSampler

admin *adminServer
status *statusServer
authentication *authenticationServer
oidc OIDC
tsDB *ts.DB
tsServer *ts.Server
raftTransport *kvserver.RaftTransport
stopper *stop.Stopper
admin *adminServer
status *statusServer
authentication *authenticationServer
migrationServer *migrationServer
oidc OIDC
tsDB *ts.DB
tsServer *ts.Server
raftTransport *kvserver.RaftTransport
stopper *stop.Stopper

debug *debug.Server

Expand Down Expand Up @@ -1218,6 +1219,11 @@ func (s *Server) PreStart(ctx context.Context) error {

serverpb.RegisterInitServer(s.grpc.Server, initServer)

// Register the Migration service, to power internal crdb migrations.
migrationServer := &migrationServer{server: s}
serverpb.RegisterMigrationServer(s.grpc.Server, migrationServer)
s.migrationServer = migrationServer // only for testing via TestServer

// Pebble does its own engine health checks, that call back into an event
// handler registered in storage/pebble.go when a slow disk event is
// detected. Starting a separate routine for Pebble is unnecessary.
Expand Down
1 change: 1 addition & 0 deletions pkg/server/serverpb/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ go_library(
"authentication.pb.go",
"authentication.pb.gw.go",
"init.pb.go",
"migration.pb.go",
"status.go",
"status.pb.go",
"status.pb.gw.go",
Expand Down
Loading

0 comments on commit 33c2588

Please sign in to comment.