diff --git a/pkg/server/BUILD.bazel b/pkg/server/BUILD.bazel index 26f4d7feacad..093a9f8af2f2 100644 --- a/pkg/server/BUILD.bazel +++ b/pkg/server/BUILD.bazel @@ -14,6 +14,7 @@ go_library( "grpc_server.go", "init.go", "loopback.go", + "migration.go", "node.go", "node_engine_health.go", "node_tombstone_storage.go", @@ -228,6 +229,7 @@ go_test( "graphite_test.go", "intent_test.go", "main_test.go", + "migration_test.go", "multi_store_test.go", "node_test.go", "node_tombstone_storage_test.go", diff --git a/pkg/server/migration.go b/pkg/server/migration.go new file mode 100644 index 000000000000..48c85eb30dfe --- /dev/null +++ b/pkg/server/migration.go @@ -0,0 +1,117 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package server + +import ( + "context" + "fmt" + + "github.com/cockroachdb/cockroach/pkg/clusterversion" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver" + "github.com/cockroachdb/cockroach/pkg/server/serverpb" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/errors" + "github.com/cockroachdb/redact" +) + +// migrationServer is an implementation of the Migration service. The RPCs here +// are used to power the migrations infrastructure in pkg/migrations. +type migrationServer struct { + server *Server + + // We used this mutex to serialize attempts to bump the cluster version. + syncutil.Mutex +} + +var _ serverpb.MigrationServer = &migrationServer{} + +// ValidateTargetClusterVersion implements the MigrationServer interface. +// It's used to verify that we're running a binary that's able to support the +// given cluster version. +func (m *migrationServer) ValidateTargetClusterVersion( + ctx context.Context, req *serverpb.ValidateTargetClusterVersionRequest, +) (*serverpb.ValidateTargetClusterVersionResponse, error) { + targetVersion := *req.Version + versionSetting := m.server.ClusterSettings().Version + + // We're validating the following: + // + // node's minimum supported version <= target version <= node's binary version + if targetVersion.Less(versionSetting.BinaryMinSupportedVersion()) { + msg := fmt.Sprintf("target version %s less than binary's min supported version %s", + targetVersion, versionSetting.BinaryMinSupportedVersion()) + log.Warningf(ctx, "%s", msg) + return nil, errors.Newf("%s", redact.Safe(msg)) + } + + if versionSetting.BinaryVersion().Less(targetVersion) { + msg := fmt.Sprintf("binary version %s less than target version %s", + versionSetting.BinaryVersion(), targetVersion) + log.Warningf(ctx, "%s", msg) + return nil, errors.Newf("%s", redact.Safe(msg)) + } + + resp := &serverpb.ValidateTargetClusterVersionResponse{} + return resp, nil +} + +// BumpClusterVersion implements the MigrationServer interface. It's used to +// inform us a cluster version bump. Here we're responsible for durably +// persisting the cluster version and enabling the corresponding version gates. +func (m *migrationServer) BumpClusterVersion( + ctx context.Context, req *serverpb.BumpClusterVersionRequest, +) (*serverpb.BumpClusterVersionResponse, error) { + versionSetting := m.server.ClusterSettings().Version + prevCV, err := kvserver.SynthesizeClusterVersionFromEngines( + ctx, m.server.engines, versionSetting.BinaryVersion(), + versionSetting.BinaryMinSupportedVersion(), + ) + if err != nil { + return nil, err + } + + newCV := clusterversion.ClusterVersion{Version: *req.Version} + + if err := func() error { + if !prevCV.Version.Less(*req.Version) { + // Nothing to do. + return nil + } + + m.Lock() + defer m.Unlock() + + // TODO(irfansharif): We should probably capture this pattern of + // "persist the cluster version first" and only then bump the + // version setting in a better way. + + // Whenever the version changes, we want to persist that update to + // wherever the CRDB process retrieved the initial version from + // (typically a collection of storage.Engines). + if err := kvserver.WriteClusterVersionToEngines(ctx, m.server.engines, newCV); err != nil { + return err + } + + // TODO(irfansharif): We'll eventually want to bump the local version + // gate here. On 21.1 nodes we'll no longer be using gossip to propagate + // cluster version bumps. We'll still have probably disseminate it + // through gossip (do we actually have to?), but we won't listen to it. + // + // _ = s.server.ClusterSettings().<...>.SetActiveVersion(ctx, newCV) + return nil + }(); err != nil { + return nil, err + } + + resp := &serverpb.BumpClusterVersionResponse{} + return resp, nil +} diff --git a/pkg/server/migration_test.go b/pkg/server/migration_test.go new file mode 100644 index 000000000000..e555b05760c9 --- /dev/null +++ b/pkg/server/migration_test.go @@ -0,0 +1,94 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package server + +import ( + "context" + "testing" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/server/serverpb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" +) + +func TestValidateTargetClusterVersion(t *testing.T) { + defer leaktest.AfterTest(t)() + + v := func(major, minor int32) roachpb.Version { + return roachpb.Version{Major: major, Minor: minor} + } + + var tests = []struct { + binaryVersion roachpb.Version + binaryMinSupportedVersion roachpb.Version + targetVersion roachpb.Version + expErrMatch string // Empty if expecting a nil error. + }{ + { + binaryVersion: v(20, 2), + binaryMinSupportedVersion: v(20, 1), + targetVersion: v(20, 1), + expErrMatch: "", + }, + { + binaryVersion: v(20, 2), + binaryMinSupportedVersion: v(20, 1), + targetVersion: v(20, 2), + expErrMatch: "", + }, + { + binaryVersion: v(20, 2), + binaryMinSupportedVersion: v(20, 1), + targetVersion: v(21, 1), + expErrMatch: "binary version.*less than target version", + }, + { + binaryVersion: v(20, 2), + binaryMinSupportedVersion: v(20, 1), + targetVersion: v(19, 2), + expErrMatch: "target version.*less than binary's min supported version", + }, + } + + // node's minimum supported version <= target version <= node's binary version + + for i, test := range tests { + st := cluster.MakeTestingClusterSettingsWithVersions( + test.binaryVersion, + test.binaryMinSupportedVersion, + false, /* initializeVersion */ + ) + + s, _, _ := serverutils.StartServer(t, base.TestServerArgs{ + Settings: st, + Knobs: base.TestingKnobs{ + Server: &TestingKnobs{ + BinaryVersionOverride: test.binaryVersion, + }, + }, + }) + + migrationServer := s.MigrationServer().(*migrationServer) + req := &serverpb.ValidateTargetClusterVersionRequest{ + Version: &test.targetVersion, + } + _, err := migrationServer.ValidateTargetClusterVersion(context.Background(), req) + if !testutils.IsError(err, test.expErrMatch) { + t.Fatalf("test %d: got error %s, wanted error matching '%s'", i, err, test.expErrMatch) + } + + s.Stopper().Stop(context.Background()) + } +} diff --git a/pkg/server/server.go b/pkg/server/server.go index 701e0e4b3504..9e25a4db50cc 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -139,14 +139,15 @@ type Server struct { recorder *status.MetricsRecorder runtime *status.RuntimeStatSampler - admin *adminServer - status *statusServer - authentication *authenticationServer - oidc OIDC - tsDB *ts.DB - tsServer *ts.Server - raftTransport *kvserver.RaftTransport - stopper *stop.Stopper + admin *adminServer + status *statusServer + authentication *authenticationServer + migrationServer *migrationServer + oidc OIDC + tsDB *ts.DB + tsServer *ts.Server + raftTransport *kvserver.RaftTransport + stopper *stop.Stopper debug *debug.Server @@ -1218,6 +1219,11 @@ func (s *Server) PreStart(ctx context.Context) error { serverpb.RegisterInitServer(s.grpc.Server, initServer) + // Register the Migration service, to power internal crdb migrations. + migrationServer := &migrationServer{server: s} + serverpb.RegisterMigrationServer(s.grpc.Server, migrationServer) + s.migrationServer = migrationServer // only for testing via TestServer + // Pebble does its own engine health checks, that call back into an event // handler registered in storage/pebble.go when a slow disk event is // detected. Starting a separate routine for Pebble is unnecessary. diff --git a/pkg/server/serverpb/BUILD.bazel b/pkg/server/serverpb/BUILD.bazel index b0ae8cc73f2a..48bce073fefb 100644 --- a/pkg/server/serverpb/BUILD.bazel +++ b/pkg/server/serverpb/BUILD.bazel @@ -9,6 +9,7 @@ go_library( "authentication.pb.go", "authentication.pb.gw.go", "init.pb.go", + "migration.pb.go", "status.go", "status.pb.go", "status.pb.gw.go", diff --git a/pkg/server/serverpb/migration.pb.go b/pkg/server/serverpb/migration.pb.go new file mode 100644 index 000000000000..3b5d1626098e --- /dev/null +++ b/pkg/server/serverpb/migration.pb.go @@ -0,0 +1,877 @@ +// Code generated by protoc-gen-gogo. DO NOT EDIT. +// source: server/serverpb/migration.proto + +package serverpb + +import proto "github.com/gogo/protobuf/proto" +import fmt "fmt" +import math "math" +import roachpb "github.com/cockroachdb/cockroach/pkg/roachpb" + +import ( + context "context" + grpc "google.golang.org/grpc" +) + +import io "io" + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.GoGoProtoPackageIsVersion2 // please upgrade the proto package + +// ValidateTargetClusterVersion is used to verify that the target node is +// running a binary that's able to support the specified cluster version. +type ValidateTargetClusterVersionRequest struct { + Version *roachpb.Version `protobuf:"bytes,1,opt,name=version,proto3" json:"version,omitempty"` +} + +func (m *ValidateTargetClusterVersionRequest) Reset() { *m = ValidateTargetClusterVersionRequest{} } +func (m *ValidateTargetClusterVersionRequest) String() string { return proto.CompactTextString(m) } +func (*ValidateTargetClusterVersionRequest) ProtoMessage() {} +func (*ValidateTargetClusterVersionRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_migration_c69e816c34ca9e47, []int{0} +} +func (m *ValidateTargetClusterVersionRequest) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *ValidateTargetClusterVersionRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + b = b[:cap(b)] + n, err := m.MarshalTo(b) + if err != nil { + return nil, err + } + return b[:n], nil +} +func (dst *ValidateTargetClusterVersionRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_ValidateTargetClusterVersionRequest.Merge(dst, src) +} +func (m *ValidateTargetClusterVersionRequest) XXX_Size() int { + return m.Size() +} +func (m *ValidateTargetClusterVersionRequest) XXX_DiscardUnknown() { + xxx_messageInfo_ValidateTargetClusterVersionRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_ValidateTargetClusterVersionRequest proto.InternalMessageInfo + +// ValidateTargetClusterVersionResponse is the response to a +// ValidateTargetClusterVersionRequest. +type ValidateTargetClusterVersionResponse struct { +} + +func (m *ValidateTargetClusterVersionResponse) Reset() { *m = ValidateTargetClusterVersionResponse{} } +func (m *ValidateTargetClusterVersionResponse) String() string { return proto.CompactTextString(m) } +func (*ValidateTargetClusterVersionResponse) ProtoMessage() {} +func (*ValidateTargetClusterVersionResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_migration_c69e816c34ca9e47, []int{1} +} +func (m *ValidateTargetClusterVersionResponse) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *ValidateTargetClusterVersionResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + b = b[:cap(b)] + n, err := m.MarshalTo(b) + if err != nil { + return nil, err + } + return b[:n], nil +} +func (dst *ValidateTargetClusterVersionResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_ValidateTargetClusterVersionResponse.Merge(dst, src) +} +func (m *ValidateTargetClusterVersionResponse) XXX_Size() int { + return m.Size() +} +func (m *ValidateTargetClusterVersionResponse) XXX_DiscardUnknown() { + xxx_messageInfo_ValidateTargetClusterVersionResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_ValidateTargetClusterVersionResponse proto.InternalMessageInfo + +// BumpClusterVersionRequest is used to inform a given node of a cluster version +// bump. +type BumpClusterVersionRequest struct { + Version *roachpb.Version `protobuf:"bytes,1,opt,name=version,proto3" json:"version,omitempty"` +} + +func (m *BumpClusterVersionRequest) Reset() { *m = BumpClusterVersionRequest{} } +func (m *BumpClusterVersionRequest) String() string { return proto.CompactTextString(m) } +func (*BumpClusterVersionRequest) ProtoMessage() {} +func (*BumpClusterVersionRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_migration_c69e816c34ca9e47, []int{2} +} +func (m *BumpClusterVersionRequest) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *BumpClusterVersionRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + b = b[:cap(b)] + n, err := m.MarshalTo(b) + if err != nil { + return nil, err + } + return b[:n], nil +} +func (dst *BumpClusterVersionRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_BumpClusterVersionRequest.Merge(dst, src) +} +func (m *BumpClusterVersionRequest) XXX_Size() int { + return m.Size() +} +func (m *BumpClusterVersionRequest) XXX_DiscardUnknown() { + xxx_messageInfo_BumpClusterVersionRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_BumpClusterVersionRequest proto.InternalMessageInfo + +// BumpClusterVersionResponse is the response to an BumpClusterVersionRequest. +type BumpClusterVersionResponse struct { +} + +func (m *BumpClusterVersionResponse) Reset() { *m = BumpClusterVersionResponse{} } +func (m *BumpClusterVersionResponse) String() string { return proto.CompactTextString(m) } +func (*BumpClusterVersionResponse) ProtoMessage() {} +func (*BumpClusterVersionResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_migration_c69e816c34ca9e47, []int{3} +} +func (m *BumpClusterVersionResponse) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *BumpClusterVersionResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + b = b[:cap(b)] + n, err := m.MarshalTo(b) + if err != nil { + return nil, err + } + return b[:n], nil +} +func (dst *BumpClusterVersionResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_BumpClusterVersionResponse.Merge(dst, src) +} +func (m *BumpClusterVersionResponse) XXX_Size() int { + return m.Size() +} +func (m *BumpClusterVersionResponse) XXX_DiscardUnknown() { + xxx_messageInfo_BumpClusterVersionResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_BumpClusterVersionResponse proto.InternalMessageInfo + +func init() { + proto.RegisterType((*ValidateTargetClusterVersionRequest)(nil), "cockroach.server.serverpb.ValidateTargetClusterVersionRequest") + proto.RegisterType((*ValidateTargetClusterVersionResponse)(nil), "cockroach.server.serverpb.ValidateTargetClusterVersionResponse") + proto.RegisterType((*BumpClusterVersionRequest)(nil), "cockroach.server.serverpb.BumpClusterVersionRequest") + proto.RegisterType((*BumpClusterVersionResponse)(nil), "cockroach.server.serverpb.BumpClusterVersionResponse") +} + +// Reference imports to suppress errors if they are not otherwise used. +var _ context.Context +var _ grpc.ClientConn + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +const _ = grpc.SupportPackageIsVersion4 + +// MigrationClient is the client API for Migration service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. +type MigrationClient interface { + // ValidateTargetClusterVersion is used to verify that the target node is + // running a binary that's able to support the specified cluster version. + // Specifically: + // + // node's minimum supported version <= version <= node's binary version + ValidateTargetClusterVersion(ctx context.Context, in *ValidateTargetClusterVersionRequest, opts ...grpc.CallOption) (*ValidateTargetClusterVersionResponse, error) + // BumpClusterVersion is used to inform a given node of a cluster version + // bump. The node is responsible for durably persisting the message and + // enabling the corresponding version gates. + // + // This rpc is typically used together with ValidateTargetClusterVersion, + // which checks to see that all nodes in the cluster are running binaries + // that would be able to support the intended version bump. + // + // NB: We should note that this validation is a best-effort one. Albeit + // unlikely, it's possible for the following scenario to occur: + // - N nodes running v21.1 + // - N nodes rolled into v21.2 binaries, but with active cluster version + // still as v21.1 + // - Validation for setting active cluster version to v21.2 checks to see + // that all nodes are running v21.2 binaries + // - A new node is added to the cluster, but running binary v21.1 + // - We start bumping the cluster gates to v21.2, despite there now being + // a v21.1 node in the cluster. + BumpClusterVersion(ctx context.Context, in *BumpClusterVersionRequest, opts ...grpc.CallOption) (*BumpClusterVersionResponse, error) +} + +type migrationClient struct { + cc *grpc.ClientConn +} + +func NewMigrationClient(cc *grpc.ClientConn) MigrationClient { + return &migrationClient{cc} +} + +func (c *migrationClient) ValidateTargetClusterVersion(ctx context.Context, in *ValidateTargetClusterVersionRequest, opts ...grpc.CallOption) (*ValidateTargetClusterVersionResponse, error) { + out := new(ValidateTargetClusterVersionResponse) + err := c.cc.Invoke(ctx, "/cockroach.server.serverpb.Migration/ValidateTargetClusterVersion", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *migrationClient) BumpClusterVersion(ctx context.Context, in *BumpClusterVersionRequest, opts ...grpc.CallOption) (*BumpClusterVersionResponse, error) { + out := new(BumpClusterVersionResponse) + err := c.cc.Invoke(ctx, "/cockroach.server.serverpb.Migration/BumpClusterVersion", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// MigrationServer is the server API for Migration service. +type MigrationServer interface { + // ValidateTargetClusterVersion is used to verify that the target node is + // running a binary that's able to support the specified cluster version. + // Specifically: + // + // node's minimum supported version <= version <= node's binary version + ValidateTargetClusterVersion(context.Context, *ValidateTargetClusterVersionRequest) (*ValidateTargetClusterVersionResponse, error) + // BumpClusterVersion is used to inform a given node of a cluster version + // bump. The node is responsible for durably persisting the message and + // enabling the corresponding version gates. + // + // This rpc is typically used together with ValidateTargetClusterVersion, + // which checks to see that all nodes in the cluster are running binaries + // that would be able to support the intended version bump. + // + // NB: We should note that this validation is a best-effort one. Albeit + // unlikely, it's possible for the following scenario to occur: + // - N nodes running v21.1 + // - N nodes rolled into v21.2 binaries, but with active cluster version + // still as v21.1 + // - Validation for setting active cluster version to v21.2 checks to see + // that all nodes are running v21.2 binaries + // - A new node is added to the cluster, but running binary v21.1 + // - We start bumping the cluster gates to v21.2, despite there now being + // a v21.1 node in the cluster. + BumpClusterVersion(context.Context, *BumpClusterVersionRequest) (*BumpClusterVersionResponse, error) +} + +func RegisterMigrationServer(s *grpc.Server, srv MigrationServer) { + s.RegisterService(&_Migration_serviceDesc, srv) +} + +func _Migration_ValidateTargetClusterVersion_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(ValidateTargetClusterVersionRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(MigrationServer).ValidateTargetClusterVersion(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/cockroach.server.serverpb.Migration/ValidateTargetClusterVersion", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(MigrationServer).ValidateTargetClusterVersion(ctx, req.(*ValidateTargetClusterVersionRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _Migration_BumpClusterVersion_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(BumpClusterVersionRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(MigrationServer).BumpClusterVersion(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/cockroach.server.serverpb.Migration/BumpClusterVersion", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(MigrationServer).BumpClusterVersion(ctx, req.(*BumpClusterVersionRequest)) + } + return interceptor(ctx, in, info, handler) +} + +var _Migration_serviceDesc = grpc.ServiceDesc{ + ServiceName: "cockroach.server.serverpb.Migration", + HandlerType: (*MigrationServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "ValidateTargetClusterVersion", + Handler: _Migration_ValidateTargetClusterVersion_Handler, + }, + { + MethodName: "BumpClusterVersion", + Handler: _Migration_BumpClusterVersion_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "server/serverpb/migration.proto", +} + +func (m *ValidateTargetClusterVersionRequest) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *ValidateTargetClusterVersionRequest) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if m.Version != nil { + dAtA[i] = 0xa + i++ + i = encodeVarintMigration(dAtA, i, uint64(m.Version.Size())) + n1, err := m.Version.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err + } + i += n1 + } + return i, nil +} + +func (m *ValidateTargetClusterVersionResponse) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *ValidateTargetClusterVersionResponse) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + return i, nil +} + +func (m *BumpClusterVersionRequest) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *BumpClusterVersionRequest) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if m.Version != nil { + dAtA[i] = 0xa + i++ + i = encodeVarintMigration(dAtA, i, uint64(m.Version.Size())) + n2, err := m.Version.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err + } + i += n2 + } + return i, nil +} + +func (m *BumpClusterVersionResponse) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *BumpClusterVersionResponse) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + return i, nil +} + +func encodeVarintMigration(dAtA []byte, offset int, v uint64) int { + for v >= 1<<7 { + dAtA[offset] = uint8(v&0x7f | 0x80) + v >>= 7 + offset++ + } + dAtA[offset] = uint8(v) + return offset + 1 +} +func (m *ValidateTargetClusterVersionRequest) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.Version != nil { + l = m.Version.Size() + n += 1 + l + sovMigration(uint64(l)) + } + return n +} + +func (m *ValidateTargetClusterVersionResponse) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + return n +} + +func (m *BumpClusterVersionRequest) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.Version != nil { + l = m.Version.Size() + n += 1 + l + sovMigration(uint64(l)) + } + return n +} + +func (m *BumpClusterVersionResponse) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + return n +} + +func sovMigration(x uint64) (n int) { + for { + n++ + x >>= 7 + if x == 0 { + break + } + } + return n +} +func sozMigration(x uint64) (n int) { + return sovMigration(uint64((x << 1) ^ uint64((int64(x) >> 63)))) +} +func (m *ValidateTargetClusterVersionRequest) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowMigration + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: ValidateTargetClusterVersionRequest: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: ValidateTargetClusterVersionRequest: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Version", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowMigration + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthMigration + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Version == nil { + m.Version = &roachpb.Version{} + } + if err := m.Version.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipMigration(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthMigration + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *ValidateTargetClusterVersionResponse) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowMigration + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: ValidateTargetClusterVersionResponse: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: ValidateTargetClusterVersionResponse: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + default: + iNdEx = preIndex + skippy, err := skipMigration(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthMigration + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *BumpClusterVersionRequest) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowMigration + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: BumpClusterVersionRequest: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: BumpClusterVersionRequest: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Version", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowMigration + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthMigration + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Version == nil { + m.Version = &roachpb.Version{} + } + if err := m.Version.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipMigration(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthMigration + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *BumpClusterVersionResponse) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowMigration + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: BumpClusterVersionResponse: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: BumpClusterVersionResponse: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + default: + iNdEx = preIndex + skippy, err := skipMigration(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthMigration + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func skipMigration(dAtA []byte) (n int, err error) { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowMigration + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + wireType := int(wire & 0x7) + switch wireType { + case 0: + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowMigration + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + iNdEx++ + if dAtA[iNdEx-1] < 0x80 { + break + } + } + return iNdEx, nil + case 1: + iNdEx += 8 + return iNdEx, nil + case 2: + var length int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowMigration + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + length |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + iNdEx += length + if length < 0 { + return 0, ErrInvalidLengthMigration + } + return iNdEx, nil + case 3: + for { + var innerWire uint64 + var start int = iNdEx + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowMigration + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + innerWire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + innerWireType := int(innerWire & 0x7) + if innerWireType == 4 { + break + } + next, err := skipMigration(dAtA[start:]) + if err != nil { + return 0, err + } + iNdEx = start + next + } + return iNdEx, nil + case 4: + return iNdEx, nil + case 5: + iNdEx += 4 + return iNdEx, nil + default: + return 0, fmt.Errorf("proto: illegal wireType %d", wireType) + } + } + panic("unreachable") +} + +var ( + ErrInvalidLengthMigration = fmt.Errorf("proto: negative length found during unmarshaling") + ErrIntOverflowMigration = fmt.Errorf("proto: integer overflow") +) + +func init() { + proto.RegisterFile("server/serverpb/migration.proto", fileDescriptor_migration_c69e816c34ca9e47) +} + +var fileDescriptor_migration_c69e816c34ca9e47 = []byte{ + // 279 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x92, 0x2f, 0x4e, 0x2d, 0x2a, + 0x4b, 0x2d, 0xd2, 0x87, 0x50, 0x05, 0x49, 0xfa, 0xb9, 0x99, 0xe9, 0x45, 0x89, 0x25, 0x99, 0xf9, + 0x79, 0x7a, 0x05, 0x45, 0xf9, 0x25, 0xf9, 0x42, 0x92, 0xc9, 0xf9, 0xc9, 0xd9, 0x45, 0xf9, 0x89, + 0xc9, 0x19, 0x7a, 0x10, 0x35, 0x7a, 0x30, 0xa5, 0x52, 0x62, 0x60, 0x61, 0x90, 0x9e, 0xd4, 0x92, + 0xc4, 0x94, 0xc4, 0x92, 0x44, 0x88, 0x16, 0xa5, 0x68, 0x2e, 0xe5, 0xb0, 0xc4, 0x9c, 0xcc, 0x94, + 0xc4, 0x92, 0xd4, 0x90, 0xc4, 0xa2, 0xf4, 0xd4, 0x12, 0xe7, 0x9c, 0xd2, 0xe2, 0x92, 0xd4, 0xa2, + 0xb0, 0xd4, 0xa2, 0xe2, 0xcc, 0xfc, 0xbc, 0xa0, 0xd4, 0xc2, 0xd2, 0xd4, 0xe2, 0x12, 0x21, 0x13, + 0x2e, 0xf6, 0x32, 0x88, 0x88, 0x04, 0xa3, 0x02, 0xa3, 0x06, 0xb7, 0x91, 0x94, 0x1e, 0xc2, 0x2e, + 0xa8, 0xd1, 0x7a, 0x30, 0x3d, 0x30, 0xa5, 0x4a, 0x6a, 0x5c, 0x2a, 0xf8, 0x0d, 0x2f, 0x2e, 0xc8, + 0xcf, 0x2b, 0x4e, 0x55, 0x0a, 0xe4, 0x92, 0x74, 0x2a, 0xcd, 0x2d, 0xa0, 0xa6, 0xd5, 0x32, 0x5c, + 0x52, 0xd8, 0x8c, 0x84, 0x58, 0x68, 0xb4, 0x95, 0x89, 0x8b, 0xd3, 0x17, 0x16, 0x78, 0x42, 0x0b, + 0x19, 0xb9, 0x64, 0xf0, 0xb9, 0x53, 0xc8, 0x4e, 0x0f, 0x67, 0xc0, 0xea, 0x11, 0x11, 0x7a, 0x52, + 0xf6, 0x64, 0xeb, 0x87, 0x06, 0x10, 0x83, 0x50, 0x33, 0x23, 0x97, 0x10, 0xa6, 0x87, 0x84, 0x4c, + 0xf0, 0x98, 0x8c, 0x33, 0x48, 0xa5, 0x4c, 0x49, 0xd4, 0x05, 0x73, 0x85, 0x93, 0xd6, 0x89, 0x87, + 0x72, 0x0c, 0x27, 0x1e, 0xc9, 0x31, 0x5e, 0x78, 0x24, 0xc7, 0x78, 0xe3, 0x91, 0x1c, 0xe3, 0x83, + 0x47, 0x72, 0x8c, 0x13, 0x1e, 0xcb, 0x31, 0x5c, 0x78, 0x2c, 0xc7, 0x70, 0xe3, 0xb1, 0x1c, 0x43, + 0x14, 0x07, 0xcc, 0xa0, 0x24, 0x36, 0x70, 0x02, 0x33, 0x06, 0x04, 0x00, 0x00, 0xff, 0xff, 0xf8, + 0x76, 0x3a, 0x9f, 0xb6, 0x02, 0x00, 0x00, +} diff --git a/pkg/server/serverpb/migration.proto b/pkg/server/serverpb/migration.proto new file mode 100644 index 000000000000..fdb4c58a72e4 --- /dev/null +++ b/pkg/server/serverpb/migration.proto @@ -0,0 +1,64 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +syntax = "proto3"; +package cockroach.server.serverpb; +option go_package = "serverpb"; + +import "roachpb/metadata.proto"; + +// ValidateTargetClusterVersion is used to verify that the target node is +// running a binary that's able to support the specified cluster version. +message ValidateTargetClusterVersionRequest { + roachpb.Version version = 1; +} + +// ValidateTargetClusterVersionResponse is the response to a +// ValidateTargetClusterVersionRequest. +message ValidateTargetClusterVersionResponse { +} + +// BumpClusterVersionRequest is used to inform a given node of a cluster version +// bump. +message BumpClusterVersionRequest { + roachpb.Version version = 1; +} + +// BumpClusterVersionResponse is the response to an BumpClusterVersionRequest. +message BumpClusterVersionResponse { } + +service Migration { + // ValidateTargetClusterVersion is used to verify that the target node is + // running a binary that's able to support the specified cluster version. + // Specifically: + // + // node's minimum supported version <= version <= node's binary version + rpc ValidateTargetClusterVersion(ValidateTargetClusterVersionRequest) returns (ValidateTargetClusterVersionResponse) { } + + // BumpClusterVersion is used to inform a given node of a cluster version + // bump. The node is responsible for durably persisting the message and + // enabling the corresponding version gates. + // + // This rpc is typically used together with ValidateTargetClusterVersion, + // which checks to see that all nodes in the cluster are running binaries + // that would be able to support the intended version bump. + // + // NB: We should note that this validation is a best-effort one. Albeit + // unlikely, it's possible for the following scenario to occur: + // - N nodes running v21.1 + // - N nodes rolled into v21.2 binaries, but with active cluster version + // still as v21.1 + // - Validation for setting active cluster version to v21.2 checks to see + // that all nodes are running v21.2 binaries + // - A new node is added to the cluster, but running binary v21.1 + // - We start bumping the cluster gates to v21.2, despite there now being + // a v21.1 node in the cluster. + rpc BumpClusterVersion(BumpClusterVersionRequest) returns (BumpClusterVersionResponse) { } +} diff --git a/pkg/server/testserver.go b/pkg/server/testserver.go index 9c24172397b4..fb25d6d85cb5 100644 --- a/pkg/server/testserver.go +++ b/pkg/server/testserver.go @@ -1015,6 +1015,11 @@ func (ts *TestServer) DistSender() *kvcoord.DistSender { return ts.DistSenderI().(*kvcoord.DistSender) } +// MigrationServer is part of TestServerInterface. +func (ts *TestServer) MigrationServer() interface{} { + return ts.migrationServer +} + // SQLServer is part of TestServerInterface. func (ts *TestServer) SQLServer() interface{} { return ts.PGServer().SQLServer diff --git a/pkg/testutils/serverutils/test_server_shim.go b/pkg/testutils/serverutils/test_server_shim.go index e0738eb221c2..63ce87794818 100644 --- a/pkg/testutils/serverutils/test_server_shim.go +++ b/pkg/testutils/serverutils/test_server_shim.go @@ -100,6 +100,9 @@ type TestServerInterface interface { // The real return type is *kv.DistSender. DistSenderI() interface{} + // MigrationServer returns the internal *migrationServer as in interface{} + MigrationServer() interface{} + // SQLServer returns the *sql.Server as an interface{}. SQLServer() interface{}