From 7381c2ac94fb1060e46f34269797ec0dc3ac6f3e Mon Sep 17 00:00:00 2001 From: Daniel Harrison Date: Wed, 22 Jan 2020 13:32:03 -0800 Subject: [PATCH] [WIP] migrations: introduce new system which supports long-running migrations This subsumes the cluster version cluster setting. There is a large overlap with pkg/sqlmigrations, which should be cleaned up in the future. With the new system a CockroachDB binary upgrade will work as follows: - If the customer wants the upgrade to auto-finalize, they leave the `cluster.preserve_downgrade_option` cluster setting to its default value. (same as before) - Each node is rolled one-by-one onto the new binary, waiting for it to health check and for latencies to stabilize before moving onto the next node. (same as before) - If the `cluster.preserve_downgrade_option` option is used, the cluster can be rolled back to the previous binary, but (some) new features are not available. Once the cluster stabilizes, the customer can commit to the new version to unlock these features. - To commit to the new version, the user runs: (same as before) `SET CLUSTER SETTING version = 'v20.1.0'` or `RESET CLUSTER SETTING cluster.preserve_downgrade_option` - [NEW] Before, the new features were available quickly, but now there may be a delay. To wait for the features to be available, the user polls: `SHOW UPGRADE STATUS` - [NEW] If a user quickly upgrades from major version X to major version X+1 and then onto major version X+2, they must wait for the X+1 upgrade to finish finalization before rolling any nodes onto X+2. If an X+2 node connects to this cluster before the upgrade finalization is done, it will crash (which hopefully will be noticed by the deployment tooling). This is unfortunate but a necessary trade off in allowing us to reduce our technical debt. Closes #39182 Release note (): --- pkg/migration/connect/connect.go | 31 + pkg/migration/connect/connect.pb.go | 570 ++++++++++++++++++ pkg/migration/connect/connect.proto | 32 + pkg/migration/everynode/everynode.go | 88 +++ .../fflag}/cluster_version.go | 2 +- .../fflag}/cluster_version.pb.go | 40 +- .../fflag}/cluster_version.proto | 2 +- pkg/migration/fflag/fflag.go | 120 ++++ pkg/migration/migration.go | 77 +++ pkg/server/server.go | 18 + pkg/settings/cluster/settings.go | 12 +- pkg/sql/alter_table.go | 3 +- pkg/sql/create_table.go | 5 +- pkg/sql/sqlbase/metadata.go | 3 +- 14 files changed, 975 insertions(+), 28 deletions(-) create mode 100644 pkg/migration/connect/connect.go create mode 100644 pkg/migration/connect/connect.pb.go create mode 100644 pkg/migration/connect/connect.proto create mode 100644 pkg/migration/everynode/everynode.go rename pkg/{settings/cluster => migration/fflag}/cluster_version.go (98%) rename pkg/{settings/cluster => migration/fflag}/cluster_version.pb.go (81%) rename pkg/{settings/cluster => migration/fflag}/cluster_version.proto (96%) create mode 100644 pkg/migration/fflag/fflag.go create mode 100644 pkg/migration/migration.go diff --git a/pkg/migration/connect/connect.go b/pkg/migration/connect/connect.go new file mode 100644 index 000000000000..0382b5a0d169 --- /dev/null +++ b/pkg/migration/connect/connect.go @@ -0,0 +1,31 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +// Package connect provides a no-dependency mechanism for a new or restarting +// node to discover the cluster information it needs to boot. +package connect + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/base" +) + +// Connect retrieves information that is necessary to boot a node joining an +// existing cluster. This information comes via a low-dependency rpc to an +// existing node in the cluster. +func Connect(ctx context.Context, joinList base.JoinListType) (*ConnectResponse, error) { + panic(`WIP`) + // TODO(dan): This is using the joinList to keep the dependencies minimal, but + // gossip is doing all sorts of smarts with this before actually using it + // (filtering out self, etc). On one hand it'd be nice to keep this from + // depending on gossip (so the response could be _used_ by gossip), but it + // would be unfortuante to duplicate this initial work. +} diff --git a/pkg/migration/connect/connect.pb.go b/pkg/migration/connect/connect.pb.go new file mode 100644 index 000000000000..fb1a2bbfb8b3 --- /dev/null +++ b/pkg/migration/connect/connect.pb.go @@ -0,0 +1,570 @@ +// Code generated by protoc-gen-gogo. DO NOT EDIT. +// source: migration/connect/connect.proto + +package connect + +import proto "github.com/gogo/protobuf/proto" +import fmt "fmt" +import math "math" +import fflag "github.com/cockroachdb/cockroach/pkg/migration/fflag" + +import ( + context "context" + grpc "google.golang.org/grpc" +) + +import io "io" + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.GoGoProtoPackageIsVersion2 // please upgrade the proto package + +type ConnectRequest struct { +} + +func (m *ConnectRequest) Reset() { *m = ConnectRequest{} } +func (m *ConnectRequest) String() string { return proto.CompactTextString(m) } +func (*ConnectRequest) ProtoMessage() {} +func (*ConnectRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_connect_93338a9671869cb4, []int{0} +} +func (m *ConnectRequest) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *ConnectRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + b = b[:cap(b)] + n, err := m.MarshalTo(b) + if err != nil { + return nil, err + } + return b[:n], nil +} +func (dst *ConnectRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_ConnectRequest.Merge(dst, src) +} +func (m *ConnectRequest) XXX_Size() int { + return m.Size() +} +func (m *ConnectRequest) XXX_DiscardUnknown() { + xxx_messageInfo_ConnectRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_ConnectRequest proto.InternalMessageInfo + +type ConnectResponse struct { + // BootVersion is the initial feature version to use on a booting node. + BootVersion fflag.ClusterVersion `protobuf:"bytes,1,opt,name=boot_version,json=bootVersion,proto3" json:"boot_version"` + // EverynodeHooks is the set of everynode.Hooks that must be run before this + // node can serve traffic. + EveryNodeHooks []string `protobuf:"bytes,2,rep,name=every_node_hooks,json=everyNodeHooks,proto3" json:"every_node_hooks,omitempty"` +} + +func (m *ConnectResponse) Reset() { *m = ConnectResponse{} } +func (m *ConnectResponse) String() string { return proto.CompactTextString(m) } +func (*ConnectResponse) ProtoMessage() {} +func (*ConnectResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_connect_93338a9671869cb4, []int{1} +} +func (m *ConnectResponse) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *ConnectResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + b = b[:cap(b)] + n, err := m.MarshalTo(b) + if err != nil { + return nil, err + } + return b[:n], nil +} +func (dst *ConnectResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_ConnectResponse.Merge(dst, src) +} +func (m *ConnectResponse) XXX_Size() int { + return m.Size() +} +func (m *ConnectResponse) XXX_DiscardUnknown() { + xxx_messageInfo_ConnectResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_ConnectResponse proto.InternalMessageInfo + +func init() { + proto.RegisterType((*ConnectRequest)(nil), "cockroach.connect.ConnectRequest") + proto.RegisterType((*ConnectResponse)(nil), "cockroach.connect.ConnectResponse") +} + +// Reference imports to suppress errors if they are not otherwise used. +var _ context.Context +var _ grpc.ClientConn + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +const _ = grpc.SupportPackageIsVersion4 + +// ConnectClient is the client API for Connect service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. +type ConnectClient interface { + Connect(ctx context.Context, in *ConnectRequest, opts ...grpc.CallOption) (*ConnectResponse, error) +} + +type connectClient struct { + cc *grpc.ClientConn +} + +func NewConnectClient(cc *grpc.ClientConn) ConnectClient { + return &connectClient{cc} +} + +func (c *connectClient) Connect(ctx context.Context, in *ConnectRequest, opts ...grpc.CallOption) (*ConnectResponse, error) { + out := new(ConnectResponse) + err := c.cc.Invoke(ctx, "/cockroach.connect.Connect/Connect", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// ConnectServer is the server API for Connect service. +type ConnectServer interface { + Connect(context.Context, *ConnectRequest) (*ConnectResponse, error) +} + +func RegisterConnectServer(s *grpc.Server, srv ConnectServer) { + s.RegisterService(&_Connect_serviceDesc, srv) +} + +func _Connect_Connect_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(ConnectRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(ConnectServer).Connect(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/cockroach.connect.Connect/Connect", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(ConnectServer).Connect(ctx, req.(*ConnectRequest)) + } + return interceptor(ctx, in, info, handler) +} + +var _Connect_serviceDesc = grpc.ServiceDesc{ + ServiceName: "cockroach.connect.Connect", + HandlerType: (*ConnectServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "Connect", + Handler: _Connect_Connect_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "migration/connect/connect.proto", +} + +func (m *ConnectRequest) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *ConnectRequest) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + return i, nil +} + +func (m *ConnectResponse) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *ConnectResponse) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + dAtA[i] = 0xa + i++ + i = encodeVarintConnect(dAtA, i, uint64(m.BootVersion.Size())) + n1, err := m.BootVersion.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err + } + i += n1 + if len(m.EveryNodeHooks) > 0 { + for _, s := range m.EveryNodeHooks { + dAtA[i] = 0x12 + i++ + l = len(s) + for l >= 1<<7 { + dAtA[i] = uint8(uint64(l)&0x7f | 0x80) + l >>= 7 + i++ + } + dAtA[i] = uint8(l) + i++ + i += copy(dAtA[i:], s) + } + } + return i, nil +} + +func encodeVarintConnect(dAtA []byte, offset int, v uint64) int { + for v >= 1<<7 { + dAtA[offset] = uint8(v&0x7f | 0x80) + v >>= 7 + offset++ + } + dAtA[offset] = uint8(v) + return offset + 1 +} +func (m *ConnectRequest) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + return n +} + +func (m *ConnectResponse) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = m.BootVersion.Size() + n += 1 + l + sovConnect(uint64(l)) + if len(m.EveryNodeHooks) > 0 { + for _, s := range m.EveryNodeHooks { + l = len(s) + n += 1 + l + sovConnect(uint64(l)) + } + } + return n +} + +func sovConnect(x uint64) (n int) { + for { + n++ + x >>= 7 + if x == 0 { + break + } + } + return n +} +func sozConnect(x uint64) (n int) { + return sovConnect(uint64((x << 1) ^ uint64((int64(x) >> 63)))) +} +func (m *ConnectRequest) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowConnect + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: ConnectRequest: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: ConnectRequest: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + default: + iNdEx = preIndex + skippy, err := skipConnect(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthConnect + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *ConnectResponse) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowConnect + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: ConnectResponse: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: ConnectResponse: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field BootVersion", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowConnect + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthConnect + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + if err := m.BootVersion.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field EveryNodeHooks", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowConnect + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthConnect + } + postIndex := iNdEx + intStringLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.EveryNodeHooks = append(m.EveryNodeHooks, string(dAtA[iNdEx:postIndex])) + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipConnect(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthConnect + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func skipConnect(dAtA []byte) (n int, err error) { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowConnect + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + wireType := int(wire & 0x7) + switch wireType { + case 0: + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowConnect + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + iNdEx++ + if dAtA[iNdEx-1] < 0x80 { + break + } + } + return iNdEx, nil + case 1: + iNdEx += 8 + return iNdEx, nil + case 2: + var length int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowConnect + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + length |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + iNdEx += length + if length < 0 { + return 0, ErrInvalidLengthConnect + } + return iNdEx, nil + case 3: + for { + var innerWire uint64 + var start int = iNdEx + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowConnect + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + innerWire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + innerWireType := int(innerWire & 0x7) + if innerWireType == 4 { + break + } + next, err := skipConnect(dAtA[start:]) + if err != nil { + return 0, err + } + iNdEx = start + next + } + return iNdEx, nil + case 4: + return iNdEx, nil + case 5: + iNdEx += 4 + return iNdEx, nil + default: + return 0, fmt.Errorf("proto: illegal wireType %d", wireType) + } + } + panic("unreachable") +} + +var ( + ErrInvalidLengthConnect = fmt.Errorf("proto: negative length found during unmarshaling") + ErrIntOverflowConnect = fmt.Errorf("proto: integer overflow") +) + +func init() { + proto.RegisterFile("migration/connect/connect.proto", fileDescriptor_connect_93338a9671869cb4) +} + +var fileDescriptor_connect_93338a9671869cb4 = []byte{ + // 284 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x92, 0xcf, 0xcd, 0x4c, 0x2f, + 0x4a, 0x2c, 0xc9, 0xcc, 0xcf, 0xd3, 0x4f, 0xce, 0xcf, 0xcb, 0x4b, 0x4d, 0x2e, 0x81, 0xd1, 0x7a, + 0x05, 0x45, 0xf9, 0x25, 0xf9, 0x42, 0x82, 0xc9, 0xf9, 0xc9, 0xd9, 0x45, 0xf9, 0x89, 0xc9, 0x19, + 0x7a, 0x50, 0x09, 0x29, 0x91, 0xf4, 0xfc, 0xf4, 0x7c, 0xb0, 0xac, 0x3e, 0x88, 0x05, 0x51, 0x28, + 0xa5, 0x8a, 0x30, 0x29, 0x2d, 0x2d, 0x27, 0x31, 0x5d, 0x3f, 0x39, 0xa7, 0xb4, 0xb8, 0x24, 0xb5, + 0x28, 0xbe, 0x2c, 0xb5, 0xa8, 0x38, 0x33, 0x3f, 0x0f, 0xa2, 0x4c, 0x49, 0x80, 0x8b, 0xcf, 0x19, + 0x62, 0x4e, 0x50, 0x6a, 0x61, 0x69, 0x6a, 0x71, 0x89, 0x52, 0x0b, 0x23, 0x17, 0x3f, 0x5c, 0xa8, + 0xb8, 0x20, 0x3f, 0xaf, 0x38, 0x55, 0xc8, 0x9d, 0x8b, 0x27, 0x29, 0x3f, 0xbf, 0x04, 0xa6, 0x57, + 0x82, 0x51, 0x81, 0x51, 0x83, 0xdb, 0x48, 0x4e, 0x0f, 0xe1, 0x98, 0xa4, 0xc4, 0xe2, 0x54, 0x3d, + 0x67, 0x88, 0x15, 0x61, 0x10, 0x55, 0x4e, 0x2c, 0x27, 0xee, 0xc9, 0x33, 0x04, 0x71, 0x83, 0x74, + 0x42, 0x85, 0x84, 0x34, 0xb8, 0x04, 0x52, 0xcb, 0x52, 0x8b, 0x2a, 0xe3, 0xf3, 0xf2, 0x53, 0x52, + 0xe3, 0x33, 0xf2, 0xf3, 0xb3, 0x8b, 0x25, 0x98, 0x14, 0x98, 0x35, 0x38, 0x83, 0xf8, 0xc0, 0xe2, + 0x7e, 0xf9, 0x29, 0xa9, 0x1e, 0x20, 0x51, 0xa3, 0x58, 0x2e, 0x76, 0xa8, 0x2b, 0x84, 0x82, 0x10, + 0x4c, 0x45, 0x3d, 0x0c, 0xff, 0xeb, 0xa1, 0xba, 0x5f, 0x4a, 0x09, 0x9f, 0x12, 0x88, 0x7f, 0x94, + 0x18, 0x9c, 0x34, 0x4f, 0x3c, 0x94, 0x63, 0x38, 0xf1, 0x48, 0x8e, 0xf1, 0xc2, 0x23, 0x39, 0xc6, + 0x1b, 0x8f, 0xe4, 0x18, 0x1f, 0x3c, 0x92, 0x63, 0x9c, 0xf0, 0x58, 0x8e, 0xe1, 0xc2, 0x63, 0x39, + 0x86, 0x1b, 0x8f, 0xe5, 0x18, 0xa2, 0xd8, 0xa1, 0x9a, 0x93, 0xd8, 0xc0, 0x21, 0x65, 0x0c, 0x08, + 0x00, 0x00, 0xff, 0xff, 0x43, 0xea, 0x72, 0x5e, 0x9c, 0x01, 0x00, 0x00, +} diff --git a/pkg/migration/connect/connect.proto b/pkg/migration/connect/connect.proto new file mode 100644 index 000000000000..28c30ba5be52 --- /dev/null +++ b/pkg/migration/connect/connect.proto @@ -0,0 +1,32 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +syntax = "proto3"; +package cockroach.connect; +option go_package = "connect"; + +import "gogoproto/gogo.proto"; +import "migration/fflag/cluster_version.proto"; + +message ConnectRequest { + // TODO(dan): Add an option to request a node id as described in #32574. +} + +message ConnectResponse { + // BootVersion is the initial feature version to use on a booting node. + cockroach.base.ClusterVersion boot_version = 1 [(gogoproto.nullable) = false]; + // EverynodeHooks is the set of everynode.Hooks that must be run before this + // node can serve traffic. + repeated string every_node_hooks = 2; +} + +service Connect { + rpc Connect (ConnectRequest) returns (ConnectResponse) {} +} diff --git a/pkg/migration/everynode/everynode.go b/pkg/migration/everynode/everynode.go new file mode 100644 index 000000000000..23a1f7bbf56d --- /dev/null +++ b/pkg/migration/everynode/everynode.go @@ -0,0 +1,88 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +// Package everynode provides a mechanism for running an idempotent closure on +// each node in the cluster during upgrades. +package everynode + +import ( + "context" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/errors" +) + +// Node is a minimal subset of server.Node. +type Node interface{} + +// Hook WIP +type Hook struct { + // Name is a unique name for this Hook. + Name string + + // RunFn is an idempotent function. + RunFn func(context.Context, Node) error +} + +var registry struct { + mu struct { + syncutil.Mutex + hooks map[string]Hook + } +} + +// Register is called in an init func to add a hook implementation for use with +// RunHookOnEveryNode. +func Register(h Hook) { + registry.mu.Lock() + defer registry.mu.Unlock() + if registry.mu.hooks == nil { + registry.mu.hooks = make(map[string]Hook) + } + if _, ok := registry.mu.hooks[h.Name]; ok { + panic(errors.AssertionFailedf(`multiple hooks with name: %s`, h.Name)) + } + registry.mu.hooks[h.Name] = h +} + +// RunHooksOnThisNode executes the requested closure on this node. +func RunHooksOnThisNode(ctx context.Context, node Node, names ...string) error { + for _, name := range names { + registry.mu.Lock() + hook, ok := registry.mu.hooks[name] + registry.mu.Unlock() + if !ok { + return errors.Errorf(`could not find hook with name: %s`, name) + } + if err := hook.RunFn(ctx, node); err != nil { + return errors.Wrapf(err, `running hook: %s`, name) + } + } + return nil +} + +// RunHookOnEveryNode executes the requested hook closure on each node in the +// cluster and each node added to the cluster in the future. If no error is +// returned, the caller is guaranteed that no node will ever again serve SQL or +// KV traffic without first having run this hook. +// +// This is used during cluster version upgrades and requires that each node in +// the cluster be available. It blocks until each node has been contacted via +// RPC and returned a successful response. +// +// WIP how do we communicate to the user if this is blocking while a node is +// down? +// +// WIP what about nodes that are decommissioned and later re-added? Is this even +// legal or will they get a new node id? +func RunHookOnEveryNode(ctx context.Context, name string) error { + panic(`WIP`) + // Edge cases to handle: + // - A node is added while this is running +} diff --git a/pkg/settings/cluster/cluster_version.go b/pkg/migration/fflag/cluster_version.go similarity index 98% rename from pkg/settings/cluster/cluster_version.go rename to pkg/migration/fflag/cluster_version.go index a726f42f9709..ef7da1064f73 100644 --- a/pkg/settings/cluster/cluster_version.go +++ b/pkg/migration/fflag/cluster_version.go @@ -8,7 +8,7 @@ // by the Apache License, Version 2.0, included in the file // licenses/APL.txt. -package cluster +package fflag import "github.com/cockroachdb/cockroach/pkg/roachpb" diff --git a/pkg/settings/cluster/cluster_version.pb.go b/pkg/migration/fflag/cluster_version.pb.go similarity index 81% rename from pkg/settings/cluster/cluster_version.pb.go rename to pkg/migration/fflag/cluster_version.pb.go index 1cdb2658d2fe..eaff07982f2c 100644 --- a/pkg/settings/cluster/cluster_version.pb.go +++ b/pkg/migration/fflag/cluster_version.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-gogo. DO NOT EDIT. -// source: settings/cluster/cluster_version.proto +// source: migration/fflag/cluster_version.proto -package cluster +package fflag import proto "github.com/gogo/protobuf/proto" import fmt "fmt" @@ -33,7 +33,7 @@ type ClusterVersion struct { func (m *ClusterVersion) Reset() { *m = ClusterVersion{} } func (*ClusterVersion) ProtoMessage() {} func (*ClusterVersion) Descriptor() ([]byte, []int) { - return fileDescriptor_cluster_version_6bb8528e3c56dcd8, []int{0} + return fileDescriptor_cluster_version_bf5a55c481c1f0be, []int{0} } func (m *ClusterVersion) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -306,23 +306,23 @@ var ( ) func init() { - proto.RegisterFile("settings/cluster/cluster_version.proto", fileDescriptor_cluster_version_6bb8528e3c56dcd8) + proto.RegisterFile("migration/fflag/cluster_version.proto", fileDescriptor_cluster_version_bf5a55c481c1f0be) } -var fileDescriptor_cluster_version_6bb8528e3c56dcd8 = []byte{ - // 216 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x52, 0x2b, 0x4e, 0x2d, 0x29, - 0xc9, 0xcc, 0x4b, 0x2f, 0xd6, 0x4f, 0xce, 0x29, 0x2d, 0x2e, 0x49, 0x2d, 0x82, 0xd1, 0xf1, 0x65, - 0xa9, 0x45, 0xc5, 0x99, 0xf9, 0x79, 0x7a, 0x05, 0x45, 0xf9, 0x25, 0xf9, 0x42, 0x7c, 0xc9, 0xf9, - 0xc9, 0xd9, 0x45, 0xf9, 0x89, 0xc9, 0x19, 0x7a, 0x49, 0x89, 0xc5, 0xa9, 0x52, 0x62, 0x60, 0x76, - 0x41, 0x92, 0x7e, 0x6e, 0x6a, 0x49, 0x62, 0x4a, 0x62, 0x49, 0x22, 0x44, 0x9d, 0x94, 0x48, 0x7a, - 0x7e, 0x7a, 0x3e, 0x98, 0xa9, 0x0f, 0x62, 0x41, 0x44, 0x95, 0x32, 0xb9, 0xf8, 0x9c, 0x21, 0xc6, - 0x86, 0x41, 0x4c, 0x15, 0xf2, 0xe6, 0xe2, 0x4b, 0x4c, 0x2e, 0xc9, 0x2c, 0x4b, 0x85, 0xd9, 0x23, - 0xc1, 0xa4, 0xc0, 0xa8, 0xc1, 0x6d, 0x24, 0xa5, 0x87, 0xb0, 0x08, 0x6a, 0x85, 0x1e, 0x54, 0x8f, - 0x13, 0xc7, 0x89, 0x7b, 0xf2, 0x0c, 0x17, 0xee, 0xc9, 0x33, 0x06, 0xf1, 0x42, 0xf4, 0x42, 0x25, - 0xac, 0x58, 0x66, 0x2c, 0x90, 0x67, 0xf0, 0x62, 0xe1, 0x60, 0x14, 0x60, 0x72, 0xd2, 0x3c, 0xf1, - 0x50, 0x8e, 0xe1, 0xc4, 0x23, 0x39, 0xc6, 0x0b, 0x8f, 0xe4, 0x18, 0x6f, 0x3c, 0x92, 0x63, 0x7c, - 0xf0, 0x48, 0x8e, 0x71, 0xc2, 0x63, 0x39, 0x86, 0x0b, 0x8f, 0xe5, 0x18, 0x6e, 0x3c, 0x96, 0x63, - 0x88, 0x62, 0x87, 0xfa, 0x30, 0x89, 0x0d, 0xec, 0x38, 0x63, 0x40, 0x00, 0x00, 0x00, 0xff, 0xff, - 0x45, 0xdc, 0x17, 0x61, 0x04, 0x01, 0x00, 0x00, +var fileDescriptor_cluster_version_bf5a55c481c1f0be = []byte{ + // 221 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x52, 0xcd, 0xcd, 0x4c, 0x2f, + 0x4a, 0x2c, 0xc9, 0xcc, 0xcf, 0xd3, 0x4f, 0x4b, 0xcb, 0x49, 0x4c, 0xd7, 0x4f, 0xce, 0x29, 0x2d, + 0x2e, 0x49, 0x2d, 0x8a, 0x2f, 0x4b, 0x2d, 0x2a, 0xce, 0xcc, 0xcf, 0xd3, 0x2b, 0x28, 0xca, 0x2f, + 0xc9, 0x17, 0xe2, 0x4b, 0xce, 0x4f, 0xce, 0x2e, 0xca, 0x4f, 0x4c, 0xce, 0xd0, 0x4b, 0x4a, 0x2c, + 0x4e, 0x95, 0x12, 0x03, 0xb3, 0x0b, 0x92, 0xf4, 0x73, 0x53, 0x4b, 0x12, 0x53, 0x12, 0x4b, 0x12, + 0x21, 0xea, 0xa4, 0x44, 0xd2, 0xf3, 0xd3, 0xf3, 0xc1, 0x4c, 0x7d, 0x10, 0x0b, 0x22, 0xaa, 0x94, + 0xc9, 0xc5, 0xe7, 0x0c, 0x31, 0x36, 0x0c, 0x62, 0xaa, 0x90, 0x37, 0x17, 0x5f, 0x62, 0x72, 0x49, + 0x66, 0x59, 0x2a, 0xcc, 0x1e, 0x09, 0x26, 0x05, 0x46, 0x0d, 0x6e, 0x23, 0x29, 0x3d, 0x84, 0x45, + 0x50, 0x2b, 0xf4, 0xa0, 0x7a, 0x9c, 0x38, 0x4e, 0xdc, 0x93, 0x67, 0xb8, 0x70, 0x4f, 0x9e, 0x31, + 0x88, 0x17, 0xa2, 0x17, 0x2a, 0x61, 0xc5, 0x32, 0x63, 0x81, 0x3c, 0x83, 0x17, 0x0b, 0x07, 0xa3, + 0x00, 0x93, 0x93, 0xfa, 0x89, 0x87, 0x72, 0x0c, 0x27, 0x1e, 0xc9, 0x31, 0x5e, 0x78, 0x24, 0xc7, + 0x78, 0xe3, 0x91, 0x1c, 0xe3, 0x83, 0x47, 0x72, 0x8c, 0x13, 0x1e, 0xcb, 0x31, 0x5c, 0x78, 0x2c, + 0xc7, 0x70, 0xe3, 0xb1, 0x1c, 0x43, 0x14, 0x2b, 0xd8, 0x9f, 0x49, 0x6c, 0x60, 0xa7, 0x19, 0x03, + 0x02, 0x00, 0x00, 0xff, 0xff, 0xab, 0x74, 0x47, 0x91, 0x01, 0x01, 0x00, 0x00, } diff --git a/pkg/settings/cluster/cluster_version.proto b/pkg/migration/fflag/cluster_version.proto similarity index 96% rename from pkg/settings/cluster/cluster_version.proto rename to pkg/migration/fflag/cluster_version.proto index 9353a01cd8aa..d6b8d8299938 100644 --- a/pkg/settings/cluster/cluster_version.proto +++ b/pkg/migration/fflag/cluster_version.proto @@ -10,7 +10,7 @@ syntax = "proto3"; package cockroach.base; -option go_package = "cluster"; +option go_package = "fflag"; import "roachpb/metadata.proto"; import "gogoproto/gogo.proto"; diff --git a/pkg/migration/fflag/fflag.go b/pkg/migration/fflag/fflag.go new file mode 100644 index 000000000000..1397b338a98b --- /dev/null +++ b/pkg/migration/fflag/fflag.go @@ -0,0 +1,120 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +// Package fflag WIP +package fflag + +import ( + "context" + "github.com/cockroachdb/cockroach/pkg/util/log" + + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/kr/pretty" + + "github.com/cockroachdb/cockroach/pkg/util/syncutil" +) + +// Handle is a read-only view of the cluster's currently supported features. +// This is based both on the binaries present in the cluster as well as the +// backward-incompatible migrations that have been run. +type Handle struct { + mu struct { + syncutil.Mutex + version ClusterVersion + } +} + +// GetHandle returns a read-only handle for querying which features are enabled. +// The argument must be the BootVersion field from a ConnectResponse proto +// returned by connect.Connect. +// +// WIP how does this work for cluster init? separate method in this package? +func GetHandle(bootVersion ClusterVersion) *Handle { + h := &Handle{} + h.mu.version = bootVersion + return h +} + +// IsActive returns true if the features of the supplied version are active at +// the running version. This may change over time so the results of calling this +// method should not be cached. +func (h *Handle) IsActive(feature VersionKey) bool { + h.mu.Lock() + defer h.mu.Unlock() + panic(`WIP`) +} + +// Forward migrates this cluster to activate the target features. +func (h *Handle) Forward(ctx context.Context, target ClusterVersion) error { + panic(`WIP`) +} + +// VersionKey is cluster.VersionKey but moved here. +type VersionKey = int + +// Feature constants, the same as the VersionKey ones in cluster. +const ( + Version19_2 VersionKey = 0 + VersionStart20_1 VersionKey = 1 + VersionNoNewPreemptiveSnapshotsStarted VersionKey = 2 + VersionNoPreemptiveSnapshotsOnDisk VersionKey = 3 + VersionFillInTableDescriptor VersionKey = 4 +) + +// keyedVersion is the same thing as cluster.keyedVersion. +type keyedVersion struct { + Key VersionKey + roachpb.Version +} + +// versionsSingleton is the same thing as cluster.versionsSingleton. +var versionsSingleton = keyedVersions{ + { + // Version19_2 is CockroachDB v19.2. It's used for all v19.2.x patch releases. + Key: Version19_2, + Version: roachpb.Version{Major: 19, Minor: 2}, + }, + { + // VersionStart20_1 demarcates work towards CockroachDB v20.1. + Key: VersionStart20_1, + Version: roachpb.Version{Major: 19, Minor: 2, Unstable: 1}, + }, + { + Key: VersionNoNewPreemptiveSnapshotsStarted, + Version: roachpb.Version{Major: 19, Minor: 2, Unstable: 2}, + }, + { + Key: VersionNoPreemptiveSnapshotsOnDisk, + Version: roachpb.Version{Major: 19, Minor: 2, Unstable: 3}, + }, + { + Key: VersionFillInTableDescriptor, + Version: roachpb.Version{Major: 19, Minor: 2, Unstable: 4}, + }, +} + +// keyedVersions is a container for managing the versions of CockroachDB. +type keyedVersions []keyedVersion + +// MustByKey asserts that the version specified by this key exists, and returns it. +func (kv keyedVersions) MustByKey(k VersionKey) roachpb.Version { + key := int(k) + if key >= len(kv) || key < 0 { + log.Fatalf(context.Background(), "version with key %d does not exist, have:\n%s", + key, pretty.Sprint(kv)) + } + return kv[key].Version +} + +// VersionByKey returns the roachpb.Version for a given key. +// It is a fatal error to use an invalid key. +func VersionByKey(key VersionKey) roachpb.Version { + return versionsSingleton.MustByKey(key) +} diff --git a/pkg/migration/migration.go b/pkg/migration/migration.go new file mode 100644 index 000000000000..1a9f62ef13ac --- /dev/null +++ b/pkg/migration/migration.go @@ -0,0 +1,77 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +// Package migration WIP +package migration + +import ( + "context" + gosql "database/sql" + "sort" + + "github.com/cockroachdb/cockroach/pkg/internal/client" + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" +) + +// Hook WIP +type Hook struct { + Version fflag.VersionKey + + // RunFn is an idempotent closure that (if non-nil) must run without error + // before advancing the cluster onto this version. + RunFn func(context.Context, *client.DB, *gosql.DB) error +} + +var registry struct { + mu struct { + syncutil.Mutex + hooks []Hook + } +} + +// Register is called in an init func to add a hook implementation for use with +// RunHookOnEveryNode. +func Register(h Hook) { + registry.mu.Lock() + defer registry.mu.Unlock() + registry.mu.hooks = append(registry.mu.hooks, h) + sort.Slice(registry.mu.hooks, func(i, j int) bool { + return registry.mu.hooks[i].Version < registry.mu.hooks[j].Version + }) +} + +func init() { + Register(Hook{ + Version: fflag.VersionNoPreemptiveSnapshotsOnDisk, + RunFn: func(context.Context, *client.DB, *gosql.DB) error { + // Before this hook is called, we're guaranteed by the new system that + // `IsActive(fflag.VersionNoNewPreemptiveSnapshotsStarted)` will never + // return false on any node in this cluster ever again. If that check is + // what is used to determine whether a preemptive or a learner snapshot is + // sent, then the cleanup becomes much easier. First we have to wait for + // any in-flight replica changes to finish (or abort them). Then we use an + // everynode.Hook to clean out the ones on disk. + panic(`WIP`) + }, + }) + Register(Hook{ + Version: fflag.VersionFillInTableDescriptor, + RunFn: func(context.Context, *client.DB, *gosql.DB) error { + tableDescsSpan := roachpb.Span{Key: keys.MakeTablePrefix(keys.DescriptorTableID)} + tableDescsSpan.EndKey = tableDescsSpan.Key.PrefixEnd() + // This hook pages through every record in tableDescsSpan, runs + // MaybeFillInDescriptor on each table descriptor and writes it back to + // kv. This allows us to get rid of MaybeFillInDescriptor entirely in the + // next major version. + }, + }) +} diff --git a/pkg/server/server.go b/pkg/server/server.go index 21f0961b1511..0aa5ce0de758 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -15,6 +15,7 @@ import ( "context" "crypto/tls" "fmt" + "github.com/cockroachdb/cockroach/pkg/migration/fflag" "io" "io/ioutil" "math" @@ -37,6 +38,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/migration/connect" + "github.com/cockroachdb/cockroach/pkg/migration/everynode" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/rpc" "github.com/cockroachdb/cockroach/pkg/rpc/nodedialer" @@ -206,11 +209,20 @@ type Server struct { // NewServer creates a Server from a server.Config. func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { + // WIP do we know way up here whether this node is bootstrapping the cluster + // or not? + connectRes, err := connect.Connect(context.Background(), cfg.JoinList) + if err != nil { + return nil, err + } + version := fflag.GetHandle(connectRes.BootVersion) + if err := cfg.ValidateAddrs(context.Background()); err != nil { return nil, err } st := cfg.Settings + st.Version = version if cfg.AmbientCtx.Tracer == nil { panic(errors.New("no tracer set in AmbientCtx")) @@ -540,6 +552,12 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { s.node = NewNode( storeCfg, s.recorder, s.registry, s.stopper, txnMetrics, nil /* execCfg */, &s.rpcContext.ClusterID) + + // WIP is this too late in the boot process? + if err := everynode.RunHooksOnThisNode(ctx, s.node, connectRes.EveryNodeHooks...); err != nil { + return nil, err + } + roachpb.RegisterInternalServer(s.grpc.Server, s.node) storage.RegisterPerReplicaServer(s.grpc.Server, s.node.perReplicaServer) s.node.storeCfg.ClosedTimestamp.RegisterClosedTimestampServer(s.grpc.Server) diff --git a/pkg/settings/cluster/settings.go b/pkg/settings/cluster/settings.go index e6dc72727522..506fad41d8ce 100644 --- a/pkg/settings/cluster/settings.go +++ b/pkg/settings/cluster/settings.go @@ -12,6 +12,7 @@ package cluster import ( "context" + "github.com/cockroachdb/cockroach/pkg/migration/fflag" "sync/atomic" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -24,6 +25,10 @@ import ( "github.com/cockroachdb/errors" ) +// WIP migrate all cluster.ClusterVersion references to the one in fflag and +// remove this alias. +type ClusterVersion = fflag.ClusterVersion + // Settings is the collection of cluster settings. For a running CockroachDB // node, there is a single instance of ClusterSetting which is shared across all // of its components. @@ -51,7 +56,10 @@ type Settings struct { // the `pprofui` server as opposed to the raw endpoint). cpuProfiling int32 // atomic - // Versions describing the range supported by this binary. + // WIP this should be used instead of the below. + Version *fflag.Handle + + // WIP REMOVE Versions describing the range supported by this binary. binaryMinSupportedVersion roachpb.Version binaryServerVersion roachpb.Version beforeClusterVersionChangeMu struct { @@ -320,7 +328,7 @@ func (cv *clusterVersionSetting) ActiveVersionOrEmpty( func (cv *clusterVersionSetting) IsActive( ctx context.Context, st *Settings, versionKey VersionKey, ) bool { - return cv.ActiveVersion(ctx, st).IsActive(versionKey) + return cv.ActiveVersion(ctx, st).IsActive(fflag.VersionKey(versionKey)) } // BeforeChange is part of the StateMachineSettingImpl interface diff --git a/pkg/sql/alter_table.go b/pkg/sql/alter_table.go index e59f549417c7..18eb3ef4e181 100644 --- a/pkg/sql/alter_table.go +++ b/pkg/sql/alter_table.go @@ -11,6 +11,7 @@ package sql import ( + "github.com/cockroachdb/cockroach/pkg/migration/fflag" "bytes" "context" gojson "encoding/json" @@ -306,7 +307,7 @@ func (n *alterTableNode) startExec(params runParams) error { case *tree.AlterTableAlterPrimaryKey: // Make sure that all nodes in the cluster are able to perform primary key changes before proceeding. version := cluster.Version.ActiveVersionOrEmpty(params.ctx, params.p.ExecCfg().Settings) - if !version.IsActive(cluster.VersionPrimaryKeyChanges) { + if !version.IsActive(fflag.VersionKey(cluster.VersionPrimaryKeyChanges)) { return pgerror.Newf(pgcode.FeatureNotSupported, "all nodes are not the correct version for primary key changes") } diff --git a/pkg/sql/create_table.go b/pkg/sql/create_table.go index 6e0f4bdfab9d..2259dd664cf6 100644 --- a/pkg/sql/create_table.go +++ b/pkg/sql/create_table.go @@ -14,6 +14,7 @@ import ( "bytes" "context" "fmt" + "github.com/cockroachdb/cockroach/pkg/migration/fflag" "math" "sort" "strings" @@ -1105,7 +1106,7 @@ func MakeTableDesc( // cases where this function is called in tests where st is nil. if st != nil { if version := cluster.Version.ActiveVersionOrEmpty(ctx, st); version != (cluster.ClusterVersion{}) && - version.IsActive(cluster.VersionSecondaryIndexColumnFamilies) { + version.IsActive(fflag.VersionKey(cluster.VersionSecondaryIndexColumnFamilies)) { indexEncodingVersion = sqlbase.SecondaryIndexFamilyFormatVersion } } @@ -1275,7 +1276,7 @@ func MakeTableDesc( // if a primary key column is not in column family 0. if st != nil { if version := cluster.Version.ActiveVersionOrEmpty(ctx, st); version != (cluster.ClusterVersion{}) && - !version.IsActive(cluster.VersionPrimaryKeyColumnsOutOfFamilyZero) { + !version.IsActive(fflag.VersionKey(cluster.VersionPrimaryKeyColumnsOutOfFamilyZero)) { var colsInFamZero util.FastIntSet for _, colID := range desc.Families[0].ColumnIDs { colsInFamZero.Add(int(colID)) diff --git a/pkg/sql/sqlbase/metadata.go b/pkg/sql/sqlbase/metadata.go index ceb39113a391..2f6dffb3da08 100644 --- a/pkg/sql/sqlbase/metadata.go +++ b/pkg/sql/sqlbase/metadata.go @@ -13,6 +13,7 @@ package sqlbase import ( "context" "fmt" + "github.com/cockroachdb/cockroach/pkg/migration/fflag" "sort" "github.com/cockroachdb/cockroach/pkg/config/zonepb" @@ -146,7 +147,7 @@ func (ms MetadataSchema) GetInitialValues( // TODO(solon): This if/else can be removed in 20.2, as there will be no // need to support the deprecated namespace table. - if bootstrapVersion.IsActive(cluster.VersionNamespaceTableWithSchemas) { + if bootstrapVersion.IsActive(fflag.VersionKey(cluster.VersionNamespaceTableWithSchemas)) { if parentID != keys.RootNamespaceID { ret = append(ret, roachpb.KeyValue{ Key: NewPublicTableKey(parentID, desc.GetName()).Key(),