From 3ce0f356438d6a66f624a87772ff9343e277d530 Mon Sep 17 00:00:00 2001 From: irfan sharif Date: Mon, 9 Nov 2020 16:46:07 -0500 Subject: [PATCH] server: introduce the `Migration` service The upcoming migration manager (prototyped in #56107) will want to execute a few known RPCs on every node in the cluster. Part of being the "migration infrastructure", we also want authors of individual migrations to be able to define arbitrary node-level operations to execute on each node in the system. To this end we introduce a `Migration` service, and populate it with the two known RPCs the migration manager will want to depend on: - ValidateTargetClusterVersion: used to verify that the target node is running a binary that's able to support the given cluster version. - BumpClusterVersion: used to inform the target node about a (validated) cluster version bump. Both these RPCs are not currently wired up to anything, and BumpClusterVersion will be fleshed out just a tiny bit further in a future PR, but they'll both be used to propagate cluster version bumps across the crdb cluster through direct RPCs, supplanting our existing gossip based distribution mechanism. This will let the migration manager bump version gates in a more controlled fashion. See #56107 for what that will end up looking like, and see the long-running migrations RFC (#48843) for the motivation. Like we mentioned earlier, we expect this service to pick up more RPCs over time to service specific migrations. Release note: None --- pkg/server/BUILD.bazel | 2 + pkg/server/migration.go | 117 +++ pkg/server/migration_test.go | 94 ++ pkg/server/server.go | 22 +- pkg/server/serverpb/BUILD.bazel | 1 + pkg/server/serverpb/migration.pb.go | 929 ++++++++++++++++++ pkg/server/serverpb/migration.proto | 90 ++ pkg/server/testserver.go | 5 + pkg/testutils/serverutils/test_server_shim.go | 3 + 9 files changed, 1255 insertions(+), 8 deletions(-) create mode 100644 pkg/server/migration.go create mode 100644 pkg/server/migration_test.go create mode 100644 pkg/server/serverpb/migration.pb.go create mode 100644 pkg/server/serverpb/migration.proto diff --git a/pkg/server/BUILD.bazel b/pkg/server/BUILD.bazel index 26f4d7feacad..093a9f8af2f2 100644 --- a/pkg/server/BUILD.bazel +++ b/pkg/server/BUILD.bazel @@ -14,6 +14,7 @@ go_library( "grpc_server.go", "init.go", "loopback.go", + "migration.go", "node.go", "node_engine_health.go", "node_tombstone_storage.go", @@ -228,6 +229,7 @@ go_test( "graphite_test.go", "intent_test.go", "main_test.go", + "migration_test.go", "multi_store_test.go", "node_test.go", "node_tombstone_storage_test.go", diff --git a/pkg/server/migration.go b/pkg/server/migration.go new file mode 100644 index 000000000000..a28eb2d06400 --- /dev/null +++ b/pkg/server/migration.go @@ -0,0 +1,117 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package server + +import ( + "context" + "fmt" + + "github.com/cockroachdb/cockroach/pkg/clusterversion" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver" + "github.com/cockroachdb/cockroach/pkg/server/serverpb" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/errors" + "github.com/cockroachdb/redact" +) + +// migrationServer is an implementation of the Migration service. The RPCs here +// are used to power the migrations infrastructure in pkg/migrations. +type migrationServer struct { + server *Server + + // We use this mutex to serialize attempts to bump the cluster version. + syncutil.Mutex +} + +var _ serverpb.MigrationServer = &migrationServer{} + +// ValidateTargetClusterVersion implements the MigrationServer interface. +// It's used to verify that we're running a binary that's able to support the +// given cluster version. +func (m *migrationServer) ValidateTargetClusterVersion( + ctx context.Context, req *serverpb.ValidateTargetClusterVersionRequest, +) (*serverpb.ValidateTargetClusterVersionResponse, error) { + targetVersion := *req.Version + versionSetting := m.server.ClusterSettings().Version + + // We're validating the following: + // + // node's minimum supported version <= target version <= node's binary version + if targetVersion.Less(versionSetting.BinaryMinSupportedVersion()) { + msg := fmt.Sprintf("target version %s less than binary's min supported version %s", + targetVersion, versionSetting.BinaryMinSupportedVersion()) + log.Warningf(ctx, "%s", msg) + return nil, errors.Newf("%s", redact.Safe(msg)) + } + + if versionSetting.BinaryVersion().Less(targetVersion) { + msg := fmt.Sprintf("binary version %s less than target version %s", + versionSetting.BinaryVersion(), targetVersion) + log.Warningf(ctx, "%s", msg) + return nil, errors.Newf("%s", redact.Safe(msg)) + } + + resp := &serverpb.ValidateTargetClusterVersionResponse{} + return resp, nil +} + +// BumpClusterVersion implements the MigrationServer interface. It's used to +// inform us of a cluster version bump. Here we're responsible for durably +// persisting the cluster version and enabling the corresponding version gates. +func (m *migrationServer) BumpClusterVersion( + ctx context.Context, req *serverpb.BumpClusterVersionRequest, +) (*serverpb.BumpClusterVersionResponse, error) { + m.Lock() + defer m.Unlock() + + versionSetting := m.server.ClusterSettings().Version + prevCV, err := kvserver.SynthesizeClusterVersionFromEngines( + ctx, m.server.engines, versionSetting.BinaryVersion(), + versionSetting.BinaryMinSupportedVersion(), + ) + if err != nil { + return nil, err + } + + newCV := clusterversion.ClusterVersion{Version: *req.Version} + + if err := func() error { + if !prevCV.Version.Less(*req.Version) { + // Nothing to do. + return nil + } + + // TODO(irfansharif): We should probably capture this pattern of + // "persist the cluster version first" and only then bump the + // version setting in a better way. + + // Whenever the version changes, we want to persist that update to + // wherever the CRDB process retrieved the initial version from + // (typically a collection of storage.Engines). + if err := kvserver.WriteClusterVersionToEngines(ctx, m.server.engines, newCV); err != nil { + return err + } + + // TODO(irfansharif): We'll eventually want to bump the local version + // gate here. On 21.1 nodes we'll no longer be using gossip to propagate + // cluster version bumps. We'll still have probably disseminate it + // through gossip (do we actually have to?), but we won't listen to it. + // + // _ = s.server.ClusterSettings().<...>.SetActiveVersion(ctx, newCV) + return nil + }(); err != nil { + return nil, err + } + + resp := &serverpb.BumpClusterVersionResponse{} + return resp, nil +} diff --git a/pkg/server/migration_test.go b/pkg/server/migration_test.go new file mode 100644 index 000000000000..e5643a1bb614 --- /dev/null +++ b/pkg/server/migration_test.go @@ -0,0 +1,94 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package server + +import ( + "context" + "testing" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/server/serverpb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" +) + +func TestValidateTargetClusterVersion(t *testing.T) { + defer leaktest.AfterTest(t)() + + v := func(major, minor int32) roachpb.Version { + return roachpb.Version{Major: major, Minor: minor} + } + + var tests = []struct { + binaryVersion roachpb.Version + binaryMinSupportedVersion roachpb.Version + targetVersion roachpb.Version + expErrMatch string // empty if expecting a nil error + }{ + { + binaryVersion: v(20, 2), + binaryMinSupportedVersion: v(20, 1), + targetVersion: v(20, 1), + expErrMatch: "", + }, + { + binaryVersion: v(20, 2), + binaryMinSupportedVersion: v(20, 1), + targetVersion: v(20, 2), + expErrMatch: "", + }, + { + binaryVersion: v(20, 2), + binaryMinSupportedVersion: v(20, 1), + targetVersion: v(21, 1), + expErrMatch: "binary version.*less than target version", + }, + { + binaryVersion: v(20, 2), + binaryMinSupportedVersion: v(20, 1), + targetVersion: v(19, 2), + expErrMatch: "target version.*less than binary's min supported version", + }, + } + + // node's minimum supported version <= target version <= node's binary version + + for i, test := range tests { + st := cluster.MakeTestingClusterSettingsWithVersions( + test.binaryVersion, + test.binaryMinSupportedVersion, + false, /* initializeVersion */ + ) + + s, _, _ := serverutils.StartServer(t, base.TestServerArgs{ + Settings: st, + Knobs: base.TestingKnobs{ + Server: &TestingKnobs{ + BinaryVersionOverride: test.binaryVersion, + }, + }, + }) + + migrationServer := s.MigrationServer().(*migrationServer) + req := &serverpb.ValidateTargetClusterVersionRequest{ + Version: &test.targetVersion, + } + _, err := migrationServer.ValidateTargetClusterVersion(context.Background(), req) + if !testutils.IsError(err, test.expErrMatch) { + t.Fatalf("test %d: got error %s, wanted error matching '%s'", i, err, test.expErrMatch) + } + + s.Stopper().Stop(context.Background()) + } +} diff --git a/pkg/server/server.go b/pkg/server/server.go index 44e0ff6fabec..560728fde284 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -139,14 +139,15 @@ type Server struct { recorder *status.MetricsRecorder runtime *status.RuntimeStatSampler - admin *adminServer - status *statusServer - authentication *authenticationServer - oidc OIDC - tsDB *ts.DB - tsServer *ts.Server - raftTransport *kvserver.RaftTransport - stopper *stop.Stopper + admin *adminServer + status *statusServer + authentication *authenticationServer + migrationServer *migrationServer + oidc OIDC + tsDB *ts.DB + tsServer *ts.Server + raftTransport *kvserver.RaftTransport + stopper *stop.Stopper debug *debug.Server @@ -1218,6 +1219,11 @@ func (s *Server) PreStart(ctx context.Context) error { serverpb.RegisterInitServer(s.grpc.Server, initServer) + // Register the Migration service, to power internal crdb migrations. + migrationServer := &migrationServer{server: s} + serverpb.RegisterMigrationServer(s.grpc.Server, migrationServer) + s.migrationServer = migrationServer // only for testing via TestServer + // Pebble does its own engine health checks, that call back into an event // handler registered in storage/pebble.go when a slow disk event is // detected. Starting a separate routine for Pebble is unnecessary. diff --git a/pkg/server/serverpb/BUILD.bazel b/pkg/server/serverpb/BUILD.bazel index b0ae8cc73f2a..48bce073fefb 100644 --- a/pkg/server/serverpb/BUILD.bazel +++ b/pkg/server/serverpb/BUILD.bazel @@ -9,6 +9,7 @@ go_library( "authentication.pb.go", "authentication.pb.gw.go", "init.pb.go", + "migration.pb.go", "status.go", "status.pb.go", "status.pb.gw.go", diff --git a/pkg/server/serverpb/migration.pb.go b/pkg/server/serverpb/migration.pb.go new file mode 100644 index 000000000000..4bcdaab18c07 --- /dev/null +++ b/pkg/server/serverpb/migration.pb.go @@ -0,0 +1,929 @@ +// Code generated by protoc-gen-gogo. DO NOT EDIT. +// source: server/serverpb/migration.proto + +package serverpb + +import proto "github.com/gogo/protobuf/proto" +import fmt "fmt" +import math "math" +import roachpb "github.com/cockroachdb/cockroach/pkg/roachpb" + +import ( + context "context" + grpc "google.golang.org/grpc" +) + +import io "io" + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.GoGoProtoPackageIsVersion2 // please upgrade the proto package + +// ValidateTargetClusterVersion is used to verify that the target node is +// running a binary that's able to support the specified cluster version. +type ValidateTargetClusterVersionRequest struct { + Version *roachpb.Version `protobuf:"bytes,1,opt,name=version,proto3" json:"version,omitempty"` +} + +func (m *ValidateTargetClusterVersionRequest) Reset() { *m = ValidateTargetClusterVersionRequest{} } +func (m *ValidateTargetClusterVersionRequest) String() string { return proto.CompactTextString(m) } +func (*ValidateTargetClusterVersionRequest) ProtoMessage() {} +func (*ValidateTargetClusterVersionRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_migration_37957468907239c5, []int{0} +} +func (m *ValidateTargetClusterVersionRequest) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *ValidateTargetClusterVersionRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + b = b[:cap(b)] + n, err := m.MarshalTo(b) + if err != nil { + return nil, err + } + return b[:n], nil +} +func (dst *ValidateTargetClusterVersionRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_ValidateTargetClusterVersionRequest.Merge(dst, src) +} +func (m *ValidateTargetClusterVersionRequest) XXX_Size() int { + return m.Size() +} +func (m *ValidateTargetClusterVersionRequest) XXX_DiscardUnknown() { + xxx_messageInfo_ValidateTargetClusterVersionRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_ValidateTargetClusterVersionRequest proto.InternalMessageInfo + +// ValidateTargetClusterVersionResponse is the response to a +// ValidateTargetClusterVersionRequest. +type ValidateTargetClusterVersionResponse struct { +} + +func (m *ValidateTargetClusterVersionResponse) Reset() { *m = ValidateTargetClusterVersionResponse{} } +func (m *ValidateTargetClusterVersionResponse) String() string { return proto.CompactTextString(m) } +func (*ValidateTargetClusterVersionResponse) ProtoMessage() {} +func (*ValidateTargetClusterVersionResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_migration_37957468907239c5, []int{1} +} +func (m *ValidateTargetClusterVersionResponse) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *ValidateTargetClusterVersionResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + b = b[:cap(b)] + n, err := m.MarshalTo(b) + if err != nil { + return nil, err + } + return b[:n], nil +} +func (dst *ValidateTargetClusterVersionResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_ValidateTargetClusterVersionResponse.Merge(dst, src) +} +func (m *ValidateTargetClusterVersionResponse) XXX_Size() int { + return m.Size() +} +func (m *ValidateTargetClusterVersionResponse) XXX_DiscardUnknown() { + xxx_messageInfo_ValidateTargetClusterVersionResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_ValidateTargetClusterVersionResponse proto.InternalMessageInfo + +// BumpClusterVersionRequest is used to inform a given node of a cluster version +// bump. +type BumpClusterVersionRequest struct { + Version *roachpb.Version `protobuf:"bytes,1,opt,name=version,proto3" json:"version,omitempty"` +} + +func (m *BumpClusterVersionRequest) Reset() { *m = BumpClusterVersionRequest{} } +func (m *BumpClusterVersionRequest) String() string { return proto.CompactTextString(m) } +func (*BumpClusterVersionRequest) ProtoMessage() {} +func (*BumpClusterVersionRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_migration_37957468907239c5, []int{2} +} +func (m *BumpClusterVersionRequest) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *BumpClusterVersionRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + b = b[:cap(b)] + n, err := m.MarshalTo(b) + if err != nil { + return nil, err + } + return b[:n], nil +} +func (dst *BumpClusterVersionRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_BumpClusterVersionRequest.Merge(dst, src) +} +func (m *BumpClusterVersionRequest) XXX_Size() int { + return m.Size() +} +func (m *BumpClusterVersionRequest) XXX_DiscardUnknown() { + xxx_messageInfo_BumpClusterVersionRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_BumpClusterVersionRequest proto.InternalMessageInfo + +// BumpClusterVersionResponse is the response to an BumpClusterVersionRequest. +type BumpClusterVersionResponse struct { +} + +func (m *BumpClusterVersionResponse) Reset() { *m = BumpClusterVersionResponse{} } +func (m *BumpClusterVersionResponse) String() string { return proto.CompactTextString(m) } +func (*BumpClusterVersionResponse) ProtoMessage() {} +func (*BumpClusterVersionResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_migration_37957468907239c5, []int{3} +} +func (m *BumpClusterVersionResponse) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *BumpClusterVersionResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + b = b[:cap(b)] + n, err := m.MarshalTo(b) + if err != nil { + return nil, err + } + return b[:n], nil +} +func (dst *BumpClusterVersionResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_BumpClusterVersionResponse.Merge(dst, src) +} +func (m *BumpClusterVersionResponse) XXX_Size() int { + return m.Size() +} +func (m *BumpClusterVersionResponse) XXX_DiscardUnknown() { + xxx_messageInfo_BumpClusterVersionResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_BumpClusterVersionResponse proto.InternalMessageInfo + +func init() { + proto.RegisterType((*ValidateTargetClusterVersionRequest)(nil), "cockroach.server.serverpb.ValidateTargetClusterVersionRequest") + proto.RegisterType((*ValidateTargetClusterVersionResponse)(nil), "cockroach.server.serverpb.ValidateTargetClusterVersionResponse") + proto.RegisterType((*BumpClusterVersionRequest)(nil), "cockroach.server.serverpb.BumpClusterVersionRequest") + proto.RegisterType((*BumpClusterVersionResponse)(nil), "cockroach.server.serverpb.BumpClusterVersionResponse") +} + +// Reference imports to suppress errors if they are not otherwise used. +var _ context.Context +var _ grpc.ClientConn + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +const _ = grpc.SupportPackageIsVersion4 + +// MigrationClient is the client API for Migration service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. +type MigrationClient interface { + // ValidateTargetClusterVersion is used to verify that the target node is + // running a binary that's able to support the specified cluster version. + // Specifically: + // + // node's minimum supported version <= version <= node's binary version + ValidateTargetClusterVersion(ctx context.Context, in *ValidateTargetClusterVersionRequest, opts ...grpc.CallOption) (*ValidateTargetClusterVersionResponse, error) + // BumpClusterVersion is used to inform a given node of a cluster version + // bump. The node is responsible for durably persisting the message and + // enabling the corresponding version gates. + // + // This RPC is typically used together with ValidateTargetClusterVersion, + // which checks to see that all nodes in the cluster are running binaries + // that would be able to support the intended version bump. + // + // The migrations infrastructure makes use of internal fence/noop-versions + // when stepping through consecutive versions. It's instructive to walk + // through how we expect a version migration from v21.1 to v21.2 to take + // place, and how we behave in the presence of new v21.1 or v21.2 nodes being + // added to the cluster during. + // - All nodes are running v21.1 + // - All nodes are rolled into v21.2 binaries, but with active cluster + // version still as v21.1 + // - The first version bump will be into v21.2.0-1noop + // - Validation for setting active cluster version to v21.2.0-1noop first + // checks to see that all nodes are running v21.2 binaries + // Then concurrently: + // - A new node is added to the cluster, but running binary v21.1 + // - We try bumping the cluster gates to v21.2.0-1noop + // + // If the v21.1 nodes manages to sneak in before the version bump, it's + // fine as the version bump is a no-op one. Any subsequent bumps (including + // the "actual" one bumping to v21.2.0) will fail during validation. + // + // If the v21.1 node is only added after v21.2.0-1noop is active, it won't + // be able to actually join the cluster (it'll be prevented by the join + // RPC). + // + // The general mechanism for bumping any cluster version across every node in + // the system goes through the following steps (the complexity here again + // arising from the possibility of new nodes being added during version + // upgrades): + // (a) We'll retrieve the list of node IDs for all nodes in the system + // (b) For each node, we'll bump the cluster version + // (c) We'll load the list of node IDs again to account for the possibility + // of a new node being added during (b). It's possible for this node to + // have joined the cluster by pointing to an existing node that hadn't + // yet seen the cluster version bump + // (d) If there any discrepancies between the node ID list retrieved in (a) + // and (c), we'll bump the cluster version for the newly found node IDs + // (e) We'll continue to loop around until the node ID list stabilizes + BumpClusterVersion(ctx context.Context, in *BumpClusterVersionRequest, opts ...grpc.CallOption) (*BumpClusterVersionResponse, error) +} + +type migrationClient struct { + cc *grpc.ClientConn +} + +func NewMigrationClient(cc *grpc.ClientConn) MigrationClient { + return &migrationClient{cc} +} + +func (c *migrationClient) ValidateTargetClusterVersion(ctx context.Context, in *ValidateTargetClusterVersionRequest, opts ...grpc.CallOption) (*ValidateTargetClusterVersionResponse, error) { + out := new(ValidateTargetClusterVersionResponse) + err := c.cc.Invoke(ctx, "/cockroach.server.serverpb.Migration/ValidateTargetClusterVersion", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *migrationClient) BumpClusterVersion(ctx context.Context, in *BumpClusterVersionRequest, opts ...grpc.CallOption) (*BumpClusterVersionResponse, error) { + out := new(BumpClusterVersionResponse) + err := c.cc.Invoke(ctx, "/cockroach.server.serverpb.Migration/BumpClusterVersion", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// MigrationServer is the server API for Migration service. +type MigrationServer interface { + // ValidateTargetClusterVersion is used to verify that the target node is + // running a binary that's able to support the specified cluster version. + // Specifically: + // + // node's minimum supported version <= version <= node's binary version + ValidateTargetClusterVersion(context.Context, *ValidateTargetClusterVersionRequest) (*ValidateTargetClusterVersionResponse, error) + // BumpClusterVersion is used to inform a given node of a cluster version + // bump. The node is responsible for durably persisting the message and + // enabling the corresponding version gates. + // + // This RPC is typically used together with ValidateTargetClusterVersion, + // which checks to see that all nodes in the cluster are running binaries + // that would be able to support the intended version bump. + // + // The migrations infrastructure makes use of internal fence/noop-versions + // when stepping through consecutive versions. It's instructive to walk + // through how we expect a version migration from v21.1 to v21.2 to take + // place, and how we behave in the presence of new v21.1 or v21.2 nodes being + // added to the cluster during. + // - All nodes are running v21.1 + // - All nodes are rolled into v21.2 binaries, but with active cluster + // version still as v21.1 + // - The first version bump will be into v21.2.0-1noop + // - Validation for setting active cluster version to v21.2.0-1noop first + // checks to see that all nodes are running v21.2 binaries + // Then concurrently: + // - A new node is added to the cluster, but running binary v21.1 + // - We try bumping the cluster gates to v21.2.0-1noop + // + // If the v21.1 nodes manages to sneak in before the version bump, it's + // fine as the version bump is a no-op one. Any subsequent bumps (including + // the "actual" one bumping to v21.2.0) will fail during validation. + // + // If the v21.1 node is only added after v21.2.0-1noop is active, it won't + // be able to actually join the cluster (it'll be prevented by the join + // RPC). + // + // The general mechanism for bumping any cluster version across every node in + // the system goes through the following steps (the complexity here again + // arising from the possibility of new nodes being added during version + // upgrades): + // (a) We'll retrieve the list of node IDs for all nodes in the system + // (b) For each node, we'll bump the cluster version + // (c) We'll load the list of node IDs again to account for the possibility + // of a new node being added during (b). It's possible for this node to + // have joined the cluster by pointing to an existing node that hadn't + // yet seen the cluster version bump + // (d) If there any discrepancies between the node ID list retrieved in (a) + // and (c), we'll bump the cluster version for the newly found node IDs + // (e) We'll continue to loop around until the node ID list stabilizes + BumpClusterVersion(context.Context, *BumpClusterVersionRequest) (*BumpClusterVersionResponse, error) +} + +func RegisterMigrationServer(s *grpc.Server, srv MigrationServer) { + s.RegisterService(&_Migration_serviceDesc, srv) +} + +func _Migration_ValidateTargetClusterVersion_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(ValidateTargetClusterVersionRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(MigrationServer).ValidateTargetClusterVersion(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/cockroach.server.serverpb.Migration/ValidateTargetClusterVersion", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(MigrationServer).ValidateTargetClusterVersion(ctx, req.(*ValidateTargetClusterVersionRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _Migration_BumpClusterVersion_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(BumpClusterVersionRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(MigrationServer).BumpClusterVersion(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/cockroach.server.serverpb.Migration/BumpClusterVersion", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(MigrationServer).BumpClusterVersion(ctx, req.(*BumpClusterVersionRequest)) + } + return interceptor(ctx, in, info, handler) +} + +var _Migration_serviceDesc = grpc.ServiceDesc{ + ServiceName: "cockroach.server.serverpb.Migration", + HandlerType: (*MigrationServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "ValidateTargetClusterVersion", + Handler: _Migration_ValidateTargetClusterVersion_Handler, + }, + { + MethodName: "BumpClusterVersion", + Handler: _Migration_BumpClusterVersion_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "server/serverpb/migration.proto", +} + +func (m *ValidateTargetClusterVersionRequest) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *ValidateTargetClusterVersionRequest) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if m.Version != nil { + dAtA[i] = 0xa + i++ + i = encodeVarintMigration(dAtA, i, uint64(m.Version.Size())) + n1, err := m.Version.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err + } + i += n1 + } + return i, nil +} + +func (m *ValidateTargetClusterVersionResponse) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *ValidateTargetClusterVersionResponse) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + return i, nil +} + +func (m *BumpClusterVersionRequest) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *BumpClusterVersionRequest) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if m.Version != nil { + dAtA[i] = 0xa + i++ + i = encodeVarintMigration(dAtA, i, uint64(m.Version.Size())) + n2, err := m.Version.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err + } + i += n2 + } + return i, nil +} + +func (m *BumpClusterVersionResponse) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *BumpClusterVersionResponse) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + return i, nil +} + +func encodeVarintMigration(dAtA []byte, offset int, v uint64) int { + for v >= 1<<7 { + dAtA[offset] = uint8(v&0x7f | 0x80) + v >>= 7 + offset++ + } + dAtA[offset] = uint8(v) + return offset + 1 +} +func (m *ValidateTargetClusterVersionRequest) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.Version != nil { + l = m.Version.Size() + n += 1 + l + sovMigration(uint64(l)) + } + return n +} + +func (m *ValidateTargetClusterVersionResponse) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + return n +} + +func (m *BumpClusterVersionRequest) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.Version != nil { + l = m.Version.Size() + n += 1 + l + sovMigration(uint64(l)) + } + return n +} + +func (m *BumpClusterVersionResponse) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + return n +} + +func sovMigration(x uint64) (n int) { + for { + n++ + x >>= 7 + if x == 0 { + break + } + } + return n +} +func sozMigration(x uint64) (n int) { + return sovMigration(uint64((x << 1) ^ uint64((int64(x) >> 63)))) +} +func (m *ValidateTargetClusterVersionRequest) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowMigration + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: ValidateTargetClusterVersionRequest: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: ValidateTargetClusterVersionRequest: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Version", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowMigration + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthMigration + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Version == nil { + m.Version = &roachpb.Version{} + } + if err := m.Version.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipMigration(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthMigration + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *ValidateTargetClusterVersionResponse) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowMigration + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: ValidateTargetClusterVersionResponse: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: ValidateTargetClusterVersionResponse: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + default: + iNdEx = preIndex + skippy, err := skipMigration(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthMigration + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *BumpClusterVersionRequest) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowMigration + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: BumpClusterVersionRequest: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: BumpClusterVersionRequest: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Version", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowMigration + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthMigration + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Version == nil { + m.Version = &roachpb.Version{} + } + if err := m.Version.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipMigration(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthMigration + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *BumpClusterVersionResponse) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowMigration + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: BumpClusterVersionResponse: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: BumpClusterVersionResponse: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + default: + iNdEx = preIndex + skippy, err := skipMigration(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthMigration + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func skipMigration(dAtA []byte) (n int, err error) { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowMigration + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + wireType := int(wire & 0x7) + switch wireType { + case 0: + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowMigration + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + iNdEx++ + if dAtA[iNdEx-1] < 0x80 { + break + } + } + return iNdEx, nil + case 1: + iNdEx += 8 + return iNdEx, nil + case 2: + var length int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowMigration + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + length |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + iNdEx += length + if length < 0 { + return 0, ErrInvalidLengthMigration + } + return iNdEx, nil + case 3: + for { + var innerWire uint64 + var start int = iNdEx + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowMigration + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + innerWire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + innerWireType := int(innerWire & 0x7) + if innerWireType == 4 { + break + } + next, err := skipMigration(dAtA[start:]) + if err != nil { + return 0, err + } + iNdEx = start + next + } + return iNdEx, nil + case 4: + return iNdEx, nil + case 5: + iNdEx += 4 + return iNdEx, nil + default: + return 0, fmt.Errorf("proto: illegal wireType %d", wireType) + } + } + panic("unreachable") +} + +var ( + ErrInvalidLengthMigration = fmt.Errorf("proto: negative length found during unmarshaling") + ErrIntOverflowMigration = fmt.Errorf("proto: integer overflow") +) + +func init() { + proto.RegisterFile("server/serverpb/migration.proto", fileDescriptor_migration_37957468907239c5) +} + +var fileDescriptor_migration_37957468907239c5 = []byte{ + // 279 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x92, 0x2f, 0x4e, 0x2d, 0x2a, + 0x4b, 0x2d, 0xd2, 0x87, 0x50, 0x05, 0x49, 0xfa, 0xb9, 0x99, 0xe9, 0x45, 0x89, 0x25, 0x99, 0xf9, + 0x79, 0x7a, 0x05, 0x45, 0xf9, 0x25, 0xf9, 0x42, 0x92, 0xc9, 0xf9, 0xc9, 0xd9, 0x45, 0xf9, 0x89, + 0xc9, 0x19, 0x7a, 0x10, 0x35, 0x7a, 0x30, 0xa5, 0x52, 0x62, 0x60, 0x61, 0x90, 0x9e, 0xd4, 0x92, + 0xc4, 0x94, 0xc4, 0x92, 0x44, 0x88, 0x16, 0xa5, 0x68, 0x2e, 0xe5, 0xb0, 0xc4, 0x9c, 0xcc, 0x94, + 0xc4, 0x92, 0xd4, 0x90, 0xc4, 0xa2, 0xf4, 0xd4, 0x12, 0xe7, 0x9c, 0xd2, 0xe2, 0x92, 0xd4, 0xa2, + 0xb0, 0xd4, 0xa2, 0xe2, 0xcc, 0xfc, 0xbc, 0xa0, 0xd4, 0xc2, 0xd2, 0xd4, 0xe2, 0x12, 0x21, 0x13, + 0x2e, 0xf6, 0x32, 0x88, 0x88, 0x04, 0xa3, 0x02, 0xa3, 0x06, 0xb7, 0x91, 0x94, 0x1e, 0xc2, 0x2e, + 0xa8, 0xd1, 0x7a, 0x30, 0x3d, 0x30, 0xa5, 0x4a, 0x6a, 0x5c, 0x2a, 0xf8, 0x0d, 0x2f, 0x2e, 0xc8, + 0xcf, 0x2b, 0x4e, 0x55, 0x0a, 0xe4, 0x92, 0x74, 0x2a, 0xcd, 0x2d, 0xa0, 0xa6, 0xd5, 0x32, 0x5c, + 0x52, 0xd8, 0x8c, 0x84, 0x58, 0x68, 0xb4, 0x95, 0x89, 0x8b, 0xd3, 0x17, 0x16, 0x78, 0x42, 0x0b, + 0x19, 0xb9, 0x64, 0xf0, 0xb9, 0x53, 0xc8, 0x4e, 0x0f, 0x67, 0xc0, 0xea, 0x11, 0x11, 0x7a, 0x52, + 0xf6, 0x64, 0xeb, 0x87, 0x06, 0x10, 0x83, 0x50, 0x33, 0x23, 0x97, 0x10, 0xa6, 0x87, 0x84, 0x4c, + 0xf0, 0x98, 0x8c, 0x33, 0x48, 0xa5, 0x4c, 0x49, 0xd4, 0x05, 0x73, 0x85, 0x93, 0xd6, 0x89, 0x87, + 0x72, 0x0c, 0x27, 0x1e, 0xc9, 0x31, 0x5e, 0x78, 0x24, 0xc7, 0x78, 0xe3, 0x91, 0x1c, 0xe3, 0x83, + 0x47, 0x72, 0x8c, 0x13, 0x1e, 0xcb, 0x31, 0x5c, 0x78, 0x2c, 0xc7, 0x70, 0xe3, 0xb1, 0x1c, 0x43, + 0x14, 0x07, 0xcc, 0xa0, 0x24, 0x36, 0x70, 0x02, 0x33, 0x06, 0x04, 0x00, 0x00, 0xff, 0xff, 0xf8, + 0x76, 0x3a, 0x9f, 0xb6, 0x02, 0x00, 0x00, +} diff --git a/pkg/server/serverpb/migration.proto b/pkg/server/serverpb/migration.proto new file mode 100644 index 000000000000..590ed76a6a64 --- /dev/null +++ b/pkg/server/serverpb/migration.proto @@ -0,0 +1,90 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +syntax = "proto3"; +package cockroach.server.serverpb; +option go_package = "serverpb"; + +import "roachpb/metadata.proto"; + +// ValidateTargetClusterVersion is used to verify that the target node is +// running a binary that's able to support the specified cluster version. +message ValidateTargetClusterVersionRequest { + roachpb.Version version = 1; +} + +// ValidateTargetClusterVersionResponse is the response to a +// ValidateTargetClusterVersionRequest. +message ValidateTargetClusterVersionResponse { +} + +// BumpClusterVersionRequest is used to inform a given node of a cluster version +// bump. +message BumpClusterVersionRequest { + roachpb.Version version = 1; +} + +// BumpClusterVersionResponse is the response to an BumpClusterVersionRequest. +message BumpClusterVersionResponse { } + +service Migration { + // ValidateTargetClusterVersion is used to verify that the target node is + // running a binary that's able to support the specified cluster version. + // Specifically: + // + // node's minimum supported version <= version <= node's binary version + rpc ValidateTargetClusterVersion(ValidateTargetClusterVersionRequest) returns (ValidateTargetClusterVersionResponse) { } + + // BumpClusterVersion is used to inform a given node of a cluster version + // bump. The node is responsible for durably persisting the message and + // enabling the corresponding version gates. + // + // This RPC is typically used together with ValidateTargetClusterVersion, + // which checks to see that all nodes in the cluster are running binaries + // that would be able to support the intended version bump. + // + // The migrations infrastructure makes use of internal fence/noop-versions + // when stepping through consecutive versions. It's instructive to walk + // through how we expect a version migration from v21.1 to v21.2 to take + // place, and how we behave in the presence of new v21.1 or v21.2 nodes being + // added to the cluster during. + // - All nodes are running v21.1 + // - All nodes are rolled into v21.2 binaries, but with active cluster + // version still as v21.1 + // - The first version bump will be into v21.2.0-1noop + // - Validation for setting active cluster version to v21.2.0-1noop first + // checks to see that all nodes are running v21.2 binaries + // Then concurrently: + // - A new node is added to the cluster, but running binary v21.1 + // - We try bumping the cluster gates to v21.2.0-1noop + // + // If the v21.1 nodes manages to sneak in before the version bump, it's + // fine as the version bump is a no-op one. Any subsequent bumps (including + // the "actual" one bumping to v21.2.0) will fail during validation. + // + // If the v21.1 node is only added after v21.2.0-1noop is active, it won't + // be able to actually join the cluster (it'll be prevented by the join + // RPC). + // + // The general mechanism for bumping any cluster version across every node in + // the system goes through the following steps (the complexity here again + // arising from the possibility of new nodes being added during version + // upgrades): + // (a) We'll retrieve the list of node IDs for all nodes in the system + // (b) For each node, we'll bump the cluster version + // (c) We'll load the list of node IDs again to account for the possibility + // of a new node being added during (b). It's possible for this node to + // have joined the cluster by pointing to an existing node that hadn't + // yet seen the cluster version bump + // (d) If there any discrepancies between the node ID list retrieved in (a) + // and (c), we'll bump the cluster version for the newly found node IDs + // (e) We'll continue to loop around until the node ID list stabilizes + rpc BumpClusterVersion(BumpClusterVersionRequest) returns (BumpClusterVersionResponse) { } +} diff --git a/pkg/server/testserver.go b/pkg/server/testserver.go index 9c24172397b4..fb25d6d85cb5 100644 --- a/pkg/server/testserver.go +++ b/pkg/server/testserver.go @@ -1015,6 +1015,11 @@ func (ts *TestServer) DistSender() *kvcoord.DistSender { return ts.DistSenderI().(*kvcoord.DistSender) } +// MigrationServer is part of TestServerInterface. +func (ts *TestServer) MigrationServer() interface{} { + return ts.migrationServer +} + // SQLServer is part of TestServerInterface. func (ts *TestServer) SQLServer() interface{} { return ts.PGServer().SQLServer diff --git a/pkg/testutils/serverutils/test_server_shim.go b/pkg/testutils/serverutils/test_server_shim.go index e0738eb221c2..63ce87794818 100644 --- a/pkg/testutils/serverutils/test_server_shim.go +++ b/pkg/testutils/serverutils/test_server_shim.go @@ -100,6 +100,9 @@ type TestServerInterface interface { // The real return type is *kv.DistSender. DistSenderI() interface{} + // MigrationServer returns the internal *migrationServer as in interface{} + MigrationServer() interface{} + // SQLServer returns the *sql.Server as an interface{}. SQLServer() interface{}