diff --git a/pkg/kv/kvserver/batcheval/BUILD.bazel b/pkg/kv/kvserver/batcheval/BUILD.bazel index 77a5c3c86544..20f81184284b 100644 --- a/pkg/kv/kvserver/batcheval/BUILD.bazel +++ b/pkg/kv/kvserver/batcheval/BUILD.bazel @@ -20,6 +20,7 @@ go_library( "cmd_lease_request.go", "cmd_lease_transfer.go", "cmd_merge.go", + "cmd_migrate.go", "cmd_push_txn.go", "cmd_put.go", "cmd_query_intent.go", diff --git a/pkg/kv/kvserver/queue_helpers_testutil.go b/pkg/kv/kvserver/queue_helpers_testutil.go index 31cf64f7154f..60bc8684cd35 100644 --- a/pkg/kv/kvserver/queue_helpers_testutil.go +++ b/pkg/kv/kvserver/queue_helpers_testutil.go @@ -14,7 +14,6 @@ import ( "context" "github.com/cockroachdb/cockroach/pkg/util/log" - "github.com/cockroachdb/errors" ) // Code in this file is for testing usage only. It is exported only because it @@ -26,23 +25,6 @@ func (bq *baseQueue) testingAdd( return bq.addInternal(ctx, repl.Desc(), repl.ReplicaID(), priority) } -func forceScanAndProcess(s *Store, q *baseQueue) error { - // Check that the system config is available. It is needed by many queues. If - // it's not available, some queues silently fail to process any replicas, - // which is undesirable for this method. - if cfg := s.Gossip().GetSystemConfig(); cfg == nil { - return errors.Errorf("system config not available in gossip") - } - - newStoreReplicaVisitor(s).Visit(func(repl *Replica) bool { - q.maybeAdd(context.Background(), repl, s.cfg.Clock.Now()) - return true - }) - - q.DrainQueue(s.stopper) - return nil -} - func mustForceScanAndProcess(ctx context.Context, s *Store, q *baseQueue) { if err := forceScanAndProcess(s, q); err != nil { log.Fatalf(ctx, "%v", err) diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index 4fe6d9a83221..a501b272cbda 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -2763,6 +2763,32 @@ func (s *Store) ManuallyEnqueue( return collect(), processErr, nil } +// GCReplicas iterates over all ranges and processes any that may need to be +// GC'd. +func (s *Store) GCReplicas() error { + if interceptor := s.TestingKnobs().GCReplicasInterceptor; interceptor != nil { + interceptor() + } + return forceScanAndProcess(s, s.replicaGCQueue.baseQueue) +} + +func forceScanAndProcess(s *Store, q *baseQueue) error { + // Check that the system config is available. It is needed by many queues. If + // it's not available, some queues silently fail to process any replicas, + // which is undesirable for this method. + if cfg := s.Gossip().GetSystemConfig(); cfg == nil { + return errors.Errorf("system config not available in gossip") + } + + newStoreReplicaVisitor(s).Visit(func(repl *Replica) bool { + q.maybeAdd(context.Background(), repl, s.cfg.Clock.Now()) + return true + }) + + q.DrainQueue(s.stopper) + return nil +} + // WriteClusterVersion writes the given cluster version to the store-local // cluster version key. func WriteClusterVersion( diff --git a/pkg/kv/kvserver/testing_knobs.go b/pkg/kv/kvserver/testing_knobs.go index 2fdc13fb1706..5e45e275d594 100644 --- a/pkg/kv/kvserver/testing_knobs.go +++ b/pkg/kv/kvserver/testing_knobs.go @@ -259,6 +259,9 @@ type StoreTestingKnobs struct { // in execChangeReplicasTxn that prevent moving // to a configuration that cannot make progress. AllowDangerousReplicationChanges bool + // GCReplicasInterceptor intercepts attempts to GC all replicas in the + // store. + GCReplicasInterceptor func() } // ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface. diff --git a/pkg/server/migration.go b/pkg/server/migration.go index 76765bac3db6..a2502fd2ea1f 100644 --- a/pkg/server/migration.go +++ b/pkg/server/migration.go @@ -143,3 +143,31 @@ func (m *migrationServer) BumpClusterVersion( resp := &serverpb.BumpClusterVersionResponse{} return resp, nil } + +// FlushAllEngines implements the MigrationServer interface. +func (m *migrationServer) FlushAllEngines( + _ context.Context, _ *serverpb.FlushAllEnginesRequest, +) (*serverpb.FlushAllEnginesResponse, error) { + for _, eng := range m.server.engines { + if err := eng.Flush(); err != nil { + return nil, err + } + } + + resp := &serverpb.FlushAllEnginesResponse{} + return resp, nil +} + +// GCReplicas implements the MigrationServer interface. +func (m *migrationServer) GCReplicas( + _ context.Context, _ *serverpb.GCReplicasRequest, +) (*serverpb.GCReplicasResponse, error) { + if err := m.server.node.stores.VisitStores(func(s *kvserver.Store) error { + return s.GCReplicas() + }); err != nil { + return nil, err + } + + resp := &serverpb.GCReplicasResponse{} + return resp, nil +} diff --git a/pkg/server/migration_test.go b/pkg/server/migration_test.go index 0056a4d8985e..6c8e74b03bec 100644 --- a/pkg/server/migration_test.go +++ b/pkg/server/migration_test.go @@ -16,6 +16,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/clusterversion" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/server/serverpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" @@ -96,3 +97,36 @@ func TestValidateTargetClusterVersion(t *testing.T) { s.Stopper().Stop(context.Background()) } } + +func TestMigrationGCReplicas(t *testing.T) { + defer leaktest.AfterTest(t)() + + const numStores = 3 + var storeSpecs []base.StoreSpec + for i := 0; i < numStores; i++ { + storeSpecs = append(storeSpecs, base.StoreSpec{InMemory: true}) + } + + intercepted := 0 + s, _, _ := serverutils.StartServer(t, base.TestServerArgs{ + StoreSpecs: storeSpecs, + Knobs: base.TestingKnobs{ + Store: &kvserver.StoreTestingKnobs{ + GCReplicasInterceptor: func() { + intercepted++ + }, + }, + }, + }) + + migrationServer := s.MigrationServer().(*migrationServer) + if _, err := migrationServer.GCReplicas(context.Background(), &serverpb.GCReplicasRequest{}); err != nil { + t.Fatal(err) + } + + if intercepted != numStores { + t.Fatalf("expected to have GC-ed replicas on %d stores, found %d", numStores, intercepted) + } + + s.Stopper().Stop(context.Background()) +} diff --git a/pkg/server/serverpb/migration.pb.go b/pkg/server/serverpb/migration.pb.go index e2eba51c12f3..3dc79f4664b6 100644 --- a/pkg/server/serverpb/migration.pb.go +++ b/pkg/server/serverpb/migration.pb.go @@ -36,7 +36,7 @@ func (m *ValidateTargetClusterVersionRequest) Reset() { *m = ValidateTar func (m *ValidateTargetClusterVersionRequest) String() string { return proto.CompactTextString(m) } func (*ValidateTargetClusterVersionRequest) ProtoMessage() {} func (*ValidateTargetClusterVersionRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_migration_8dfeb6fcf9144e4c, []int{0} + return fileDescriptor_migration_02bb88ff52938b97, []int{0} } func (m *ValidateTargetClusterVersionRequest) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -70,7 +70,7 @@ func (m *ValidateTargetClusterVersionResponse) Reset() { *m = ValidateTa func (m *ValidateTargetClusterVersionResponse) String() string { return proto.CompactTextString(m) } func (*ValidateTargetClusterVersionResponse) ProtoMessage() {} func (*ValidateTargetClusterVersionResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_migration_8dfeb6fcf9144e4c, []int{1} + return fileDescriptor_migration_02bb88ff52938b97, []int{1} } func (m *ValidateTargetClusterVersionResponse) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -95,8 +95,8 @@ func (m *ValidateTargetClusterVersionResponse) XXX_DiscardUnknown() { var xxx_messageInfo_ValidateTargetClusterVersionResponse proto.InternalMessageInfo -// BumpClusterVersionRequest is used to inform a given node of a cluster version -// bump. +// BumpClusterVersionRequest is used to inform the target node of a cluster +// version bump. type BumpClusterVersionRequest struct { ClusterVersion *clusterversion.ClusterVersion `protobuf:"bytes,1,opt,name=cluster_version,json=clusterVersion,proto3" json:"cluster_version,omitempty"` } @@ -105,7 +105,7 @@ func (m *BumpClusterVersionRequest) Reset() { *m = BumpClusterVersionReq func (m *BumpClusterVersionRequest) String() string { return proto.CompactTextString(m) } func (*BumpClusterVersionRequest) ProtoMessage() {} func (*BumpClusterVersionRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_migration_8dfeb6fcf9144e4c, []int{2} + return fileDescriptor_migration_02bb88ff52938b97, []int{2} } func (m *BumpClusterVersionRequest) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -138,7 +138,7 @@ func (m *BumpClusterVersionResponse) Reset() { *m = BumpClusterVersionRe func (m *BumpClusterVersionResponse) String() string { return proto.CompactTextString(m) } func (*BumpClusterVersionResponse) ProtoMessage() {} func (*BumpClusterVersionResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_migration_8dfeb6fcf9144e4c, []int{3} + return fileDescriptor_migration_02bb88ff52938b97, []int{3} } func (m *BumpClusterVersionResponse) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -163,11 +163,149 @@ func (m *BumpClusterVersionResponse) XXX_DiscardUnknown() { var xxx_messageInfo_BumpClusterVersionResponse proto.InternalMessageInfo +// GCReplicasRequest is used to instruct the target node to process all GC-able +// replicas. +type GCReplicasRequest struct { +} + +func (m *GCReplicasRequest) Reset() { *m = GCReplicasRequest{} } +func (m *GCReplicasRequest) String() string { return proto.CompactTextString(m) } +func (*GCReplicasRequest) ProtoMessage() {} +func (*GCReplicasRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_migration_02bb88ff52938b97, []int{4} +} +func (m *GCReplicasRequest) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *GCReplicasRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + b = b[:cap(b)] + n, err := m.MarshalTo(b) + if err != nil { + return nil, err + } + return b[:n], nil +} +func (dst *GCReplicasRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_GCReplicasRequest.Merge(dst, src) +} +func (m *GCReplicasRequest) XXX_Size() int { + return m.Size() +} +func (m *GCReplicasRequest) XXX_DiscardUnknown() { + xxx_messageInfo_GCReplicasRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_GCReplicasRequest proto.InternalMessageInfo + +// GCReplicasResponse is the response to a GCReplicasRequest. +type GCReplicasResponse struct { +} + +func (m *GCReplicasResponse) Reset() { *m = GCReplicasResponse{} } +func (m *GCReplicasResponse) String() string { return proto.CompactTextString(m) } +func (*GCReplicasResponse) ProtoMessage() {} +func (*GCReplicasResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_migration_02bb88ff52938b97, []int{5} +} +func (m *GCReplicasResponse) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *GCReplicasResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + b = b[:cap(b)] + n, err := m.MarshalTo(b) + if err != nil { + return nil, err + } + return b[:n], nil +} +func (dst *GCReplicasResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_GCReplicasResponse.Merge(dst, src) +} +func (m *GCReplicasResponse) XXX_Size() int { + return m.Size() +} +func (m *GCReplicasResponse) XXX_DiscardUnknown() { + xxx_messageInfo_GCReplicasResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_GCReplicasResponse proto.InternalMessageInfo + +// FlushAllEnginesRequest is used to instruct the target node to flush all its +// engines. +type FlushAllEnginesRequest struct { +} + +func (m *FlushAllEnginesRequest) Reset() { *m = FlushAllEnginesRequest{} } +func (m *FlushAllEnginesRequest) String() string { return proto.CompactTextString(m) } +func (*FlushAllEnginesRequest) ProtoMessage() {} +func (*FlushAllEnginesRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_migration_02bb88ff52938b97, []int{6} +} +func (m *FlushAllEnginesRequest) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *FlushAllEnginesRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + b = b[:cap(b)] + n, err := m.MarshalTo(b) + if err != nil { + return nil, err + } + return b[:n], nil +} +func (dst *FlushAllEnginesRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_FlushAllEnginesRequest.Merge(dst, src) +} +func (m *FlushAllEnginesRequest) XXX_Size() int { + return m.Size() +} +func (m *FlushAllEnginesRequest) XXX_DiscardUnknown() { + xxx_messageInfo_FlushAllEnginesRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_FlushAllEnginesRequest proto.InternalMessageInfo + +// FlushAllEnginesResponse is the response to a FlushAllEnginesRequest. +type FlushAllEnginesResponse struct { +} + +func (m *FlushAllEnginesResponse) Reset() { *m = FlushAllEnginesResponse{} } +func (m *FlushAllEnginesResponse) String() string { return proto.CompactTextString(m) } +func (*FlushAllEnginesResponse) ProtoMessage() {} +func (*FlushAllEnginesResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_migration_02bb88ff52938b97, []int{7} +} +func (m *FlushAllEnginesResponse) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *FlushAllEnginesResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + b = b[:cap(b)] + n, err := m.MarshalTo(b) + if err != nil { + return nil, err + } + return b[:n], nil +} +func (dst *FlushAllEnginesResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_FlushAllEnginesResponse.Merge(dst, src) +} +func (m *FlushAllEnginesResponse) XXX_Size() int { + return m.Size() +} +func (m *FlushAllEnginesResponse) XXX_DiscardUnknown() { + xxx_messageInfo_FlushAllEnginesResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_FlushAllEnginesResponse proto.InternalMessageInfo + func init() { proto.RegisterType((*ValidateTargetClusterVersionRequest)(nil), "cockroach.server.serverpb.ValidateTargetClusterVersionRequest") proto.RegisterType((*ValidateTargetClusterVersionResponse)(nil), "cockroach.server.serverpb.ValidateTargetClusterVersionResponse") proto.RegisterType((*BumpClusterVersionRequest)(nil), "cockroach.server.serverpb.BumpClusterVersionRequest") proto.RegisterType((*BumpClusterVersionResponse)(nil), "cockroach.server.serverpb.BumpClusterVersionResponse") + proto.RegisterType((*GCReplicasRequest)(nil), "cockroach.server.serverpb.GCReplicasRequest") + proto.RegisterType((*GCReplicasResponse)(nil), "cockroach.server.serverpb.GCReplicasResponse") + proto.RegisterType((*FlushAllEnginesRequest)(nil), "cockroach.server.serverpb.FlushAllEnginesRequest") + proto.RegisterType((*FlushAllEnginesResponse)(nil), "cockroach.server.serverpb.FlushAllEnginesResponse") } // Reference imports to suppress errors if they are not otherwise used. @@ -196,6 +334,12 @@ type MigrationClient interface { // which checks to see that all nodes in the cluster are running binaries // that would be able to support the intended version bump. BumpClusterVersion(ctx context.Context, in *BumpClusterVersionRequest, opts ...grpc.CallOption) (*BumpClusterVersionResponse, error) + // FlushAllEngines is used to instruct the target node to flush all its + // engines. + FlushAllEngines(ctx context.Context, in *FlushAllEnginesRequest, opts ...grpc.CallOption) (*FlushAllEnginesResponse, error) + // GCReplicas is used to instruct the target node to process all GC-able + // replicas. + GCReplicas(ctx context.Context, in *GCReplicasRequest, opts ...grpc.CallOption) (*GCReplicasResponse, error) } type migrationClient struct { @@ -224,6 +368,24 @@ func (c *migrationClient) BumpClusterVersion(ctx context.Context, in *BumpCluste return out, nil } +func (c *migrationClient) FlushAllEngines(ctx context.Context, in *FlushAllEnginesRequest, opts ...grpc.CallOption) (*FlushAllEnginesResponse, error) { + out := new(FlushAllEnginesResponse) + err := c.cc.Invoke(ctx, "/cockroach.server.serverpb.Migration/FlushAllEngines", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *migrationClient) GCReplicas(ctx context.Context, in *GCReplicasRequest, opts ...grpc.CallOption) (*GCReplicasResponse, error) { + out := new(GCReplicasResponse) + err := c.cc.Invoke(ctx, "/cockroach.server.serverpb.Migration/GCReplicas", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + // MigrationServer is the server API for Migration service. type MigrationServer interface { // ValidateTargetClusterVersion is used to verify that the target node is @@ -240,6 +402,12 @@ type MigrationServer interface { // which checks to see that all nodes in the cluster are running binaries // that would be able to support the intended version bump. BumpClusterVersion(context.Context, *BumpClusterVersionRequest) (*BumpClusterVersionResponse, error) + // FlushAllEngines is used to instruct the target node to flush all its + // engines. + FlushAllEngines(context.Context, *FlushAllEnginesRequest) (*FlushAllEnginesResponse, error) + // GCReplicas is used to instruct the target node to process all GC-able + // replicas. + GCReplicas(context.Context, *GCReplicasRequest) (*GCReplicasResponse, error) } func RegisterMigrationServer(s *grpc.Server, srv MigrationServer) { @@ -282,6 +450,42 @@ func _Migration_BumpClusterVersion_Handler(srv interface{}, ctx context.Context, return interceptor(ctx, in, info, handler) } +func _Migration_FlushAllEngines_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(FlushAllEnginesRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(MigrationServer).FlushAllEngines(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/cockroach.server.serverpb.Migration/FlushAllEngines", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(MigrationServer).FlushAllEngines(ctx, req.(*FlushAllEnginesRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _Migration_GCReplicas_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(GCReplicasRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(MigrationServer).GCReplicas(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/cockroach.server.serverpb.Migration/GCReplicas", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(MigrationServer).GCReplicas(ctx, req.(*GCReplicasRequest)) + } + return interceptor(ctx, in, info, handler) +} + var _Migration_serviceDesc = grpc.ServiceDesc{ ServiceName: "cockroach.server.serverpb.Migration", HandlerType: (*MigrationServer)(nil), @@ -294,6 +498,14 @@ var _Migration_serviceDesc = grpc.ServiceDesc{ MethodName: "BumpClusterVersion", Handler: _Migration_BumpClusterVersion_Handler, }, + { + MethodName: "FlushAllEngines", + Handler: _Migration_FlushAllEngines_Handler, + }, + { + MethodName: "GCReplicas", + Handler: _Migration_GCReplicas_Handler, + }, }, Streams: []grpc.StreamDesc{}, Metadata: "server/serverpb/migration.proto", @@ -391,6 +603,78 @@ func (m *BumpClusterVersionResponse) MarshalTo(dAtA []byte) (int, error) { return i, nil } +func (m *GCReplicasRequest) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *GCReplicasRequest) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + return i, nil +} + +func (m *GCReplicasResponse) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *GCReplicasResponse) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + return i, nil +} + +func (m *FlushAllEnginesRequest) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *FlushAllEnginesRequest) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + return i, nil +} + +func (m *FlushAllEnginesResponse) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *FlushAllEnginesResponse) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + return i, nil +} + func encodeVarintMigration(dAtA []byte, offset int, v uint64) int { for v >= 1<<7 { dAtA[offset] = uint8(v&0x7f | 0x80) @@ -444,6 +728,42 @@ func (m *BumpClusterVersionResponse) Size() (n int) { return n } +func (m *GCReplicasRequest) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + return n +} + +func (m *GCReplicasResponse) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + return n +} + +func (m *FlushAllEnginesRequest) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + return n +} + +func (m *FlushAllEnginesResponse) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + return n +} + func sovMigration(x uint64) (n int) { for { n++ @@ -723,6 +1043,206 @@ func (m *BumpClusterVersionResponse) Unmarshal(dAtA []byte) error { } return nil } +func (m *GCReplicasRequest) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowMigration + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: GCReplicasRequest: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: GCReplicasRequest: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + default: + iNdEx = preIndex + skippy, err := skipMigration(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthMigration + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *GCReplicasResponse) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowMigration + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: GCReplicasResponse: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: GCReplicasResponse: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + default: + iNdEx = preIndex + skippy, err := skipMigration(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthMigration + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *FlushAllEnginesRequest) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowMigration + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: FlushAllEnginesRequest: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: FlushAllEnginesRequest: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + default: + iNdEx = preIndex + skippy, err := skipMigration(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthMigration + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *FlushAllEnginesResponse) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowMigration + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: FlushAllEnginesResponse: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: FlushAllEnginesResponse: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + default: + iNdEx = preIndex + skippy, err := skipMigration(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthMigration + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} func skipMigration(dAtA []byte) (n int, err error) { l := len(dAtA) iNdEx := 0 @@ -829,11 +1349,11 @@ var ( ) func init() { - proto.RegisterFile("server/serverpb/migration.proto", fileDescriptor_migration_8dfeb6fcf9144e4c) + proto.RegisterFile("server/serverpb/migration.proto", fileDescriptor_migration_02bb88ff52938b97) } -var fileDescriptor_migration_8dfeb6fcf9144e4c = []byte{ - // 278 bytes of a gzipped FileDescriptorProto +var fileDescriptor_migration_02bb88ff52938b97 = []byte{ + // 366 bytes of a gzipped FileDescriptorProto 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x92, 0x2f, 0x4e, 0x2d, 0x2a, 0x4b, 0x2d, 0xd2, 0x87, 0x50, 0x05, 0x49, 0xfa, 0xb9, 0x99, 0xe9, 0x45, 0x89, 0x25, 0x99, 0xf9, 0x79, 0x7a, 0x05, 0x45, 0xf9, 0x25, 0xf9, 0x42, 0x92, 0xc9, 0xf9, 0xc9, 0xd9, 0x45, 0xf9, 0x89, @@ -844,12 +1364,17 @@ var fileDescriptor_migration_8dfeb6fcf9144e4c = []byte{ 0xd3, 0x2f, 0xc1, 0xa8, 0xc0, 0xa8, 0xc1, 0x6d, 0xa4, 0xa1, 0x87, 0x70, 0x01, 0xaa, 0x85, 0x7a, 0x68, 0x26, 0xf1, 0x25, 0xa3, 0xf0, 0x95, 0xd4, 0xb8, 0x54, 0xf0, 0xdb, 0x5c, 0x5c, 0x90, 0x9f, 0x57, 0x9c, 0xaa, 0x94, 0xc7, 0x25, 0xe9, 0x54, 0x9a, 0x5b, 0x40, 0x37, 0x77, 0xc9, 0x70, 0x49, - 0x61, 0xb3, 0x0f, 0xe2, 0x1a, 0xa3, 0xad, 0x4c, 0x5c, 0x9c, 0xbe, 0xb0, 0x48, 0x10, 0x5a, 0xc8, - 0xc8, 0x25, 0x83, 0xcf, 0x13, 0x42, 0x76, 0x7a, 0x38, 0x23, 0x48, 0x8f, 0x88, 0x70, 0x97, 0xb2, - 0x27, 0x5b, 0x3f, 0x34, 0xf4, 0x18, 0x84, 0x9a, 0x19, 0xb9, 0x84, 0x30, 0x3d, 0x24, 0x64, 0x82, - 0xc7, 0x64, 0x9c, 0xe1, 0x2d, 0x65, 0x4a, 0xa2, 0x2e, 0x98, 0x2b, 0x9c, 0xb4, 0x4e, 0x3c, 0x94, - 0x63, 0x38, 0xf1, 0x48, 0x8e, 0xf1, 0xc2, 0x23, 0x39, 0xc6, 0x1b, 0x8f, 0xe4, 0x18, 0x1f, 0x3c, - 0x92, 0x63, 0x9c, 0xf0, 0x58, 0x8e, 0xe1, 0xc2, 0x63, 0x39, 0x86, 0x1b, 0x8f, 0xe5, 0x18, 0xa2, - 0x38, 0x60, 0x06, 0x25, 0xb1, 0x81, 0x93, 0xa6, 0x31, 0x20, 0x00, 0x00, 0xff, 0xff, 0x08, 0xa5, - 0xf0, 0xc3, 0xfe, 0x02, 0x00, 0x00, + 0x61, 0xb3, 0x0f, 0xea, 0x1a, 0x61, 0x2e, 0x41, 0x77, 0xe7, 0xa0, 0xd4, 0x82, 0x9c, 0xcc, 0xe4, + 0xc4, 0x62, 0xa8, 0x2b, 0x94, 0x44, 0xb8, 0x84, 0x90, 0x05, 0xa1, 0x4a, 0x25, 0xb8, 0xc4, 0xdc, + 0x72, 0x4a, 0x8b, 0x33, 0x1c, 0x73, 0x72, 0x5c, 0xf3, 0xd2, 0x33, 0xf3, 0x52, 0xe1, 0xea, 0x25, + 0xb9, 0xc4, 0x31, 0x64, 0x20, 0x9a, 0x8c, 0xe6, 0xb1, 0x70, 0x71, 0xfa, 0xc2, 0x22, 0x59, 0x68, + 0x21, 0x23, 0x97, 0x0c, 0xbe, 0x40, 0x12, 0xb2, 0xd3, 0xc3, 0x99, 0x00, 0xf4, 0x88, 0x88, 0x57, + 0x29, 0x7b, 0xb2, 0xf5, 0x43, 0x3d, 0xc9, 0x20, 0xd4, 0xcc, 0xc8, 0x25, 0x84, 0x19, 0x60, 0x42, + 0x26, 0x78, 0x4c, 0xc6, 0x19, 0x9f, 0x52, 0xa6, 0x24, 0xea, 0x82, 0xbb, 0xa2, 0x8a, 0x8b, 0x1f, + 0x2d, 0x48, 0x85, 0x0c, 0xf1, 0x98, 0x85, 0x3d, 0x62, 0xa4, 0x8c, 0x48, 0xd1, 0x02, 0xb7, 0x3b, + 0x9b, 0x8b, 0x0b, 0x11, 0xfd, 0x42, 0x3a, 0x78, 0xcc, 0xc0, 0x48, 0x3a, 0x52, 0xba, 0x44, 0xaa, + 0x86, 0x59, 0xe6, 0xa4, 0x75, 0xe2, 0xa1, 0x1c, 0xc3, 0x89, 0x47, 0x72, 0x8c, 0x17, 0x1e, 0xc9, + 0x31, 0xde, 0x78, 0x24, 0xc7, 0xf8, 0xe0, 0x91, 0x1c, 0xe3, 0x84, 0xc7, 0x72, 0x0c, 0x17, 0x1e, + 0xcb, 0x31, 0xdc, 0x78, 0x2c, 0xc7, 0x10, 0xc5, 0x01, 0x33, 0x20, 0x89, 0x0d, 0x9c, 0xc7, 0x8d, + 0x01, 0x01, 0x00, 0x00, 0xff, 0xff, 0xc5, 0x1c, 0x4a, 0x9f, 0x47, 0x04, 0x00, 0x00, } diff --git a/pkg/server/serverpb/migration.proto b/pkg/server/serverpb/migration.proto index 871b00dc8543..fd8d3394e11b 100644 --- a/pkg/server/serverpb/migration.proto +++ b/pkg/server/serverpb/migration.proto @@ -25,8 +25,8 @@ message ValidateTargetClusterVersionRequest { message ValidateTargetClusterVersionResponse { } -// BumpClusterVersionRequest is used to inform a given node of a cluster version -// bump. +// BumpClusterVersionRequest is used to inform the target node of a cluster +// version bump. message BumpClusterVersionRequest { clusterversion.ClusterVersion cluster_version = 1; } @@ -34,6 +34,20 @@ message BumpClusterVersionRequest { // BumpClusterVersionResponse is the response to an BumpClusterVersionRequest. message BumpClusterVersionResponse { } +// GCReplicasRequest is used to instruct the target node to process all GC-able +// replicas. +message GCReplicasRequest{} + +// GCReplicasResponse is the response to a GCReplicasRequest. +message GCReplicasResponse{} + +// FlushAllEnginesRequest is used to instruct the target node to flush all its +// engines. +message FlushAllEnginesRequest{} + +// FlushAllEnginesResponse is the response to a FlushAllEnginesRequest. +message FlushAllEnginesResponse{} + service Migration { // ValidateTargetClusterVersion is used to verify that the target node is // running a binary that's able to support the specified cluster version. @@ -50,4 +64,12 @@ service Migration { // which checks to see that all nodes in the cluster are running binaries // that would be able to support the intended version bump. rpc BumpClusterVersion(BumpClusterVersionRequest) returns (BumpClusterVersionResponse) { } + + // FlushAllEngines is used to instruct the target node to flush all its + // engines. + rpc FlushAllEngines (FlushAllEnginesRequest) returns (FlushAllEnginesResponse) { } + + // GCReplicas is used to instruct the target node to process all GC-able + // replicas. + rpc GCReplicas (GCReplicasRequest) returns (GCReplicasResponse) { } }