-
Notifications
You must be signed in to change notification settings - Fork 3.8k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
56476: server: introduce the `Migration` service r=irfansharif a=irfansharif The upcoming migration manager (prototyped in #56107) will want to execute a few known RPCs on every node in the cluster. Part of being the "migration infrastructure", we also want authors of individual migrations to be able to define arbitrary node-level operations to execute on each node in the system. To this end we introduce a `Migration` service, and populate it with the two known RPCs the migration manager will want to depend on: - ValidateTargetClusterVersion: used to verify that the target node is running a binary that's able to support the given cluster version. - BumpClusterVersion: used to inform the target node about a (validated) cluster version bump. Both these RPCs are not currently wired up to anything, and BumpClusterVersion will be fleshed out just a tiny bit further in a future PR, but they'll both be used to propagate cluster version bumps across the crdb cluster through direct RPCs, supplanting our existing gossip based distribution mechanism. This will let the migration manager bump version gates in a more controlled fashion. See #56107 for what that will end up looking like, and see the long-running migrations RFC (#48843) for the motivation. Like we mentioned earlier, we expect this service to pick up more RPCs over time to service specific migrations. Release note: None --- Ignore the first four commits. They're from #56368 and #56474 Co-authored-by: irfan sharif <[email protected]>
- Loading branch information
Showing
9 changed files
with
1,255 additions
and
8 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,117 @@ | ||
// Copyright 2020 The Cockroach Authors. | ||
// | ||
// Use of this software is governed by the Business Source License | ||
// included in the file licenses/BSL.txt. | ||
// | ||
// As of the Change Date specified in that file, in accordance with | ||
// the Business Source License, use of this software will be governed | ||
// by the Apache License, Version 2.0, included in the file | ||
// licenses/APL.txt. | ||
|
||
package server | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
|
||
"github.com/cockroachdb/cockroach/pkg/clusterversion" | ||
"github.com/cockroachdb/cockroach/pkg/kv/kvserver" | ||
"github.com/cockroachdb/cockroach/pkg/server/serverpb" | ||
"github.com/cockroachdb/cockroach/pkg/util/log" | ||
"github.com/cockroachdb/cockroach/pkg/util/syncutil" | ||
"github.com/cockroachdb/errors" | ||
"github.com/cockroachdb/redact" | ||
) | ||
|
||
// migrationServer is an implementation of the Migration service. The RPCs here | ||
// are used to power the migrations infrastructure in pkg/migrations. | ||
type migrationServer struct { | ||
server *Server | ||
|
||
// We use this mutex to serialize attempts to bump the cluster version. | ||
syncutil.Mutex | ||
} | ||
|
||
var _ serverpb.MigrationServer = &migrationServer{} | ||
|
||
// ValidateTargetClusterVersion implements the MigrationServer interface. | ||
// It's used to verify that we're running a binary that's able to support the | ||
// given cluster version. | ||
func (m *migrationServer) ValidateTargetClusterVersion( | ||
ctx context.Context, req *serverpb.ValidateTargetClusterVersionRequest, | ||
) (*serverpb.ValidateTargetClusterVersionResponse, error) { | ||
targetVersion := *req.Version | ||
versionSetting := m.server.ClusterSettings().Version | ||
|
||
// We're validating the following: | ||
// | ||
// node's minimum supported version <= target version <= node's binary version | ||
if targetVersion.Less(versionSetting.BinaryMinSupportedVersion()) { | ||
msg := fmt.Sprintf("target version %s less than binary's min supported version %s", | ||
targetVersion, versionSetting.BinaryMinSupportedVersion()) | ||
log.Warningf(ctx, "%s", msg) | ||
return nil, errors.Newf("%s", redact.Safe(msg)) | ||
} | ||
|
||
if versionSetting.BinaryVersion().Less(targetVersion) { | ||
msg := fmt.Sprintf("binary version %s less than target version %s", | ||
versionSetting.BinaryVersion(), targetVersion) | ||
log.Warningf(ctx, "%s", msg) | ||
return nil, errors.Newf("%s", redact.Safe(msg)) | ||
} | ||
|
||
resp := &serverpb.ValidateTargetClusterVersionResponse{} | ||
return resp, nil | ||
} | ||
|
||
// BumpClusterVersion implements the MigrationServer interface. It's used to | ||
// inform us of a cluster version bump. Here we're responsible for durably | ||
// persisting the cluster version and enabling the corresponding version gates. | ||
func (m *migrationServer) BumpClusterVersion( | ||
ctx context.Context, req *serverpb.BumpClusterVersionRequest, | ||
) (*serverpb.BumpClusterVersionResponse, error) { | ||
m.Lock() | ||
defer m.Unlock() | ||
|
||
versionSetting := m.server.ClusterSettings().Version | ||
prevCV, err := kvserver.SynthesizeClusterVersionFromEngines( | ||
ctx, m.server.engines, versionSetting.BinaryVersion(), | ||
versionSetting.BinaryMinSupportedVersion(), | ||
) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
newCV := clusterversion.ClusterVersion{Version: *req.Version} | ||
|
||
if err := func() error { | ||
if !prevCV.Version.Less(*req.Version) { | ||
// Nothing to do. | ||
return nil | ||
} | ||
|
||
// TODO(irfansharif): We should probably capture this pattern of | ||
// "persist the cluster version first" and only then bump the | ||
// version setting in a better way. | ||
|
||
// Whenever the version changes, we want to persist that update to | ||
// wherever the CRDB process retrieved the initial version from | ||
// (typically a collection of storage.Engines). | ||
if err := kvserver.WriteClusterVersionToEngines(ctx, m.server.engines, newCV); err != nil { | ||
return err | ||
} | ||
|
||
// TODO(irfansharif): We'll eventually want to bump the local version | ||
// gate here. On 21.1 nodes we'll no longer be using gossip to propagate | ||
// cluster version bumps. We'll still have probably disseminate it | ||
// through gossip (do we actually have to?), but we won't listen to it. | ||
// | ||
// _ = s.server.ClusterSettings().<...>.SetActiveVersion(ctx, newCV) | ||
return nil | ||
}(); err != nil { | ||
return nil, err | ||
} | ||
|
||
resp := &serverpb.BumpClusterVersionResponse{} | ||
return resp, nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,94 @@ | ||
// Copyright 2020 The Cockroach Authors. | ||
// | ||
// Use of this software is governed by the Business Source License | ||
// included in the file licenses/BSL.txt. | ||
// | ||
// As of the Change Date specified in that file, in accordance with | ||
// the Business Source License, use of this software will be governed | ||
// by the Apache License, Version 2.0, included in the file | ||
// licenses/APL.txt. | ||
|
||
package server | ||
|
||
import ( | ||
"context" | ||
"testing" | ||
|
||
"github.com/cockroachdb/cockroach/pkg/base" | ||
"github.com/cockroachdb/cockroach/pkg/roachpb" | ||
"github.com/cockroachdb/cockroach/pkg/server/serverpb" | ||
"github.com/cockroachdb/cockroach/pkg/settings/cluster" | ||
"github.com/cockroachdb/cockroach/pkg/testutils" | ||
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils" | ||
"github.com/cockroachdb/cockroach/pkg/util/leaktest" | ||
) | ||
|
||
func TestValidateTargetClusterVersion(t *testing.T) { | ||
defer leaktest.AfterTest(t)() | ||
|
||
v := func(major, minor int32) roachpb.Version { | ||
return roachpb.Version{Major: major, Minor: minor} | ||
} | ||
|
||
var tests = []struct { | ||
binaryVersion roachpb.Version | ||
binaryMinSupportedVersion roachpb.Version | ||
targetVersion roachpb.Version | ||
expErrMatch string // empty if expecting a nil error | ||
}{ | ||
{ | ||
binaryVersion: v(20, 2), | ||
binaryMinSupportedVersion: v(20, 1), | ||
targetVersion: v(20, 1), | ||
expErrMatch: "", | ||
}, | ||
{ | ||
binaryVersion: v(20, 2), | ||
binaryMinSupportedVersion: v(20, 1), | ||
targetVersion: v(20, 2), | ||
expErrMatch: "", | ||
}, | ||
{ | ||
binaryVersion: v(20, 2), | ||
binaryMinSupportedVersion: v(20, 1), | ||
targetVersion: v(21, 1), | ||
expErrMatch: "binary version.*less than target version", | ||
}, | ||
{ | ||
binaryVersion: v(20, 2), | ||
binaryMinSupportedVersion: v(20, 1), | ||
targetVersion: v(19, 2), | ||
expErrMatch: "target version.*less than binary's min supported version", | ||
}, | ||
} | ||
|
||
// node's minimum supported version <= target version <= node's binary version | ||
|
||
for i, test := range tests { | ||
st := cluster.MakeTestingClusterSettingsWithVersions( | ||
test.binaryVersion, | ||
test.binaryMinSupportedVersion, | ||
false, /* initializeVersion */ | ||
) | ||
|
||
s, _, _ := serverutils.StartServer(t, base.TestServerArgs{ | ||
Settings: st, | ||
Knobs: base.TestingKnobs{ | ||
Server: &TestingKnobs{ | ||
BinaryVersionOverride: test.binaryVersion, | ||
}, | ||
}, | ||
}) | ||
|
||
migrationServer := s.MigrationServer().(*migrationServer) | ||
req := &serverpb.ValidateTargetClusterVersionRequest{ | ||
Version: &test.targetVersion, | ||
} | ||
_, err := migrationServer.ValidateTargetClusterVersion(context.Background(), req) | ||
if !testutils.IsError(err, test.expErrMatch) { | ||
t.Fatalf("test %d: got error %s, wanted error matching '%s'", i, err, test.expErrMatch) | ||
} | ||
|
||
s.Stopper().Stop(context.Background()) | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.