From 00f1a419fe7a3b8103a35c15f66320af4ea9816c Mon Sep 17 00:00:00 2001 From: irfan sharif Date: Tue, 8 Dec 2020 00:11:53 -0500 Subject: [PATCH] *: introduce `GCReplicas` and `FlushAllEngines` RPCs We introduce two new RPCs for the migration infrastructure to use. `GCReplicas` will be used to instruct the target node to process all GC-able replicas. `FlushAllEngines` will be used to instruct the target node to persist all in-memory state to disk. Both of these are necessary primitives for the migration infastructure. Specifically this comes up in the context of wanting the ensure that ranges where we've executed a ranged `Migrate` command over have no way of ever surfacing pre-migrated state. This can happen with older replicas in the replica GC queue and with applied state that is not yet persisted. We elaborate on both of these below: Motivation for `GCReplicas`: Currently we wait for the `Migrate` to have applied on all replicas of a range before returning to the caller. This does not include earlier incarnations of the range, possibly sitting idle in the replica GC queue. These replicas can still request leases, and go through the request evaluation paths, possibly tripping up assertions that check to see no pre-migrated state is found. For this reason we introduce the `GCReplicas` RPC that the migration manager can use to ensure all GC-able replicas are processed before declaring the specific cluster version bump complete. Motivation for `FlushAllEngines`: Like we mentioned above, KV currently waits for the `Migrate` command to have applied on all replicas before returning. With the applied state, there's no necessity to durably persist it (the representative version is already stored in the raft log). Out of an abundance of caution, and to really really ensure that no pre-migrated state is ever seen in the system, we provide the migration manager a mechanism to flush out all in-memory state to disk. This way the manager can guarantee that by the time a specific cluster version is bumped, all pre-migrated state from prior to that cluster version will have been fully purged from the system. --- The ideas here follow from our original prototype in #57445. Neither of these RPCs are currently wired up to anything. That'll happen in a future commit introducing the raft truncated state migration. Release note: None --- pkg/kv/kvserver/queue_helpers_testutil.go | 18 - pkg/kv/kvserver/store.go | 26 + pkg/kv/kvserver/testing_knobs.go | 3 + pkg/server/migration.go | 28 ++ pkg/server/migration_test.go | 34 ++ pkg/server/serverpb/migration.pb.go | 559 +++++++++++++++++++++- pkg/server/serverpb/migration.proto | 26 +- 7 files changed, 657 insertions(+), 37 deletions(-) diff --git a/pkg/kv/kvserver/queue_helpers_testutil.go b/pkg/kv/kvserver/queue_helpers_testutil.go index 31cf64f7154f..60bc8684cd35 100644 --- a/pkg/kv/kvserver/queue_helpers_testutil.go +++ b/pkg/kv/kvserver/queue_helpers_testutil.go @@ -14,7 +14,6 @@ import ( "context" "github.com/cockroachdb/cockroach/pkg/util/log" - "github.com/cockroachdb/errors" ) // Code in this file is for testing usage only. It is exported only because it @@ -26,23 +25,6 @@ func (bq *baseQueue) testingAdd( return bq.addInternal(ctx, repl.Desc(), repl.ReplicaID(), priority) } -func forceScanAndProcess(s *Store, q *baseQueue) error { - // Check that the system config is available. It is needed by many queues. If - // it's not available, some queues silently fail to process any replicas, - // which is undesirable for this method. - if cfg := s.Gossip().GetSystemConfig(); cfg == nil { - return errors.Errorf("system config not available in gossip") - } - - newStoreReplicaVisitor(s).Visit(func(repl *Replica) bool { - q.maybeAdd(context.Background(), repl, s.cfg.Clock.Now()) - return true - }) - - q.DrainQueue(s.stopper) - return nil -} - func mustForceScanAndProcess(ctx context.Context, s *Store, q *baseQueue) { if err := forceScanAndProcess(s, q); err != nil { log.Fatalf(ctx, "%v", err) diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index 4fe6d9a83221..a501b272cbda 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -2763,6 +2763,32 @@ func (s *Store) ManuallyEnqueue( return collect(), processErr, nil } +// GCReplicas iterates over all ranges and processes any that may need to be +// GC'd. +func (s *Store) GCReplicas() error { + if interceptor := s.TestingKnobs().GCReplicasInterceptor; interceptor != nil { + interceptor() + } + return forceScanAndProcess(s, s.replicaGCQueue.baseQueue) +} + +func forceScanAndProcess(s *Store, q *baseQueue) error { + // Check that the system config is available. It is needed by many queues. If + // it's not available, some queues silently fail to process any replicas, + // which is undesirable for this method. + if cfg := s.Gossip().GetSystemConfig(); cfg == nil { + return errors.Errorf("system config not available in gossip") + } + + newStoreReplicaVisitor(s).Visit(func(repl *Replica) bool { + q.maybeAdd(context.Background(), repl, s.cfg.Clock.Now()) + return true + }) + + q.DrainQueue(s.stopper) + return nil +} + // WriteClusterVersion writes the given cluster version to the store-local // cluster version key. func WriteClusterVersion( diff --git a/pkg/kv/kvserver/testing_knobs.go b/pkg/kv/kvserver/testing_knobs.go index 2fdc13fb1706..5e45e275d594 100644 --- a/pkg/kv/kvserver/testing_knobs.go +++ b/pkg/kv/kvserver/testing_knobs.go @@ -259,6 +259,9 @@ type StoreTestingKnobs struct { // in execChangeReplicasTxn that prevent moving // to a configuration that cannot make progress. AllowDangerousReplicationChanges bool + // GCReplicasInterceptor intercepts attempts to GC all replicas in the + // store. + GCReplicasInterceptor func() } // ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface. diff --git a/pkg/server/migration.go b/pkg/server/migration.go index 76765bac3db6..a2502fd2ea1f 100644 --- a/pkg/server/migration.go +++ b/pkg/server/migration.go @@ -143,3 +143,31 @@ func (m *migrationServer) BumpClusterVersion( resp := &serverpb.BumpClusterVersionResponse{} return resp, nil } + +// FlushAllEngines implements the MigrationServer interface. +func (m *migrationServer) FlushAllEngines( + _ context.Context, _ *serverpb.FlushAllEnginesRequest, +) (*serverpb.FlushAllEnginesResponse, error) { + for _, eng := range m.server.engines { + if err := eng.Flush(); err != nil { + return nil, err + } + } + + resp := &serverpb.FlushAllEnginesResponse{} + return resp, nil +} + +// GCReplicas implements the MigrationServer interface. +func (m *migrationServer) GCReplicas( + _ context.Context, _ *serverpb.GCReplicasRequest, +) (*serverpb.GCReplicasResponse, error) { + if err := m.server.node.stores.VisitStores(func(s *kvserver.Store) error { + return s.GCReplicas() + }); err != nil { + return nil, err + } + + resp := &serverpb.GCReplicasResponse{} + return resp, nil +} diff --git a/pkg/server/migration_test.go b/pkg/server/migration_test.go index 0056a4d8985e..6c8e74b03bec 100644 --- a/pkg/server/migration_test.go +++ b/pkg/server/migration_test.go @@ -16,6 +16,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/clusterversion" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/server/serverpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" @@ -96,3 +97,36 @@ func TestValidateTargetClusterVersion(t *testing.T) { s.Stopper().Stop(context.Background()) } } + +func TestMigrationGCReplicas(t *testing.T) { + defer leaktest.AfterTest(t)() + + const numStores = 3 + var storeSpecs []base.StoreSpec + for i := 0; i < numStores; i++ { + storeSpecs = append(storeSpecs, base.StoreSpec{InMemory: true}) + } + + intercepted := 0 + s, _, _ := serverutils.StartServer(t, base.TestServerArgs{ + StoreSpecs: storeSpecs, + Knobs: base.TestingKnobs{ + Store: &kvserver.StoreTestingKnobs{ + GCReplicasInterceptor: func() { + intercepted++ + }, + }, + }, + }) + + migrationServer := s.MigrationServer().(*migrationServer) + if _, err := migrationServer.GCReplicas(context.Background(), &serverpb.GCReplicasRequest{}); err != nil { + t.Fatal(err) + } + + if intercepted != numStores { + t.Fatalf("expected to have GC-ed replicas on %d stores, found %d", numStores, intercepted) + } + + s.Stopper().Stop(context.Background()) +} diff --git a/pkg/server/serverpb/migration.pb.go b/pkg/server/serverpb/migration.pb.go index e2eba51c12f3..3dc79f4664b6 100644 --- a/pkg/server/serverpb/migration.pb.go +++ b/pkg/server/serverpb/migration.pb.go @@ -36,7 +36,7 @@ func (m *ValidateTargetClusterVersionRequest) Reset() { *m = ValidateTar func (m *ValidateTargetClusterVersionRequest) String() string { return proto.CompactTextString(m) } func (*ValidateTargetClusterVersionRequest) ProtoMessage() {} func (*ValidateTargetClusterVersionRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_migration_8dfeb6fcf9144e4c, []int{0} + return fileDescriptor_migration_02bb88ff52938b97, []int{0} } func (m *ValidateTargetClusterVersionRequest) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -70,7 +70,7 @@ func (m *ValidateTargetClusterVersionResponse) Reset() { *m = ValidateTa func (m *ValidateTargetClusterVersionResponse) String() string { return proto.CompactTextString(m) } func (*ValidateTargetClusterVersionResponse) ProtoMessage() {} func (*ValidateTargetClusterVersionResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_migration_8dfeb6fcf9144e4c, []int{1} + return fileDescriptor_migration_02bb88ff52938b97, []int{1} } func (m *ValidateTargetClusterVersionResponse) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -95,8 +95,8 @@ func (m *ValidateTargetClusterVersionResponse) XXX_DiscardUnknown() { var xxx_messageInfo_ValidateTargetClusterVersionResponse proto.InternalMessageInfo -// BumpClusterVersionRequest is used to inform a given node of a cluster version -// bump. +// BumpClusterVersionRequest is used to inform the target node of a cluster +// version bump. type BumpClusterVersionRequest struct { ClusterVersion *clusterversion.ClusterVersion `protobuf:"bytes,1,opt,name=cluster_version,json=clusterVersion,proto3" json:"cluster_version,omitempty"` } @@ -105,7 +105,7 @@ func (m *BumpClusterVersionRequest) Reset() { *m = BumpClusterVersionReq func (m *BumpClusterVersionRequest) String() string { return proto.CompactTextString(m) } func (*BumpClusterVersionRequest) ProtoMessage() {} func (*BumpClusterVersionRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_migration_8dfeb6fcf9144e4c, []int{2} + return fileDescriptor_migration_02bb88ff52938b97, []int{2} } func (m *BumpClusterVersionRequest) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -138,7 +138,7 @@ func (m *BumpClusterVersionResponse) Reset() { *m = BumpClusterVersionRe func (m *BumpClusterVersionResponse) String() string { return proto.CompactTextString(m) } func (*BumpClusterVersionResponse) ProtoMessage() {} func (*BumpClusterVersionResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_migration_8dfeb6fcf9144e4c, []int{3} + return fileDescriptor_migration_02bb88ff52938b97, []int{3} } func (m *BumpClusterVersionResponse) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -163,11 +163,149 @@ func (m *BumpClusterVersionResponse) XXX_DiscardUnknown() { var xxx_messageInfo_BumpClusterVersionResponse proto.InternalMessageInfo +// GCReplicasRequest is used to instruct the target node to process all GC-able +// replicas. +type GCReplicasRequest struct { +} + +func (m *GCReplicasRequest) Reset() { *m = GCReplicasRequest{} } +func (m *GCReplicasRequest) String() string { return proto.CompactTextString(m) } +func (*GCReplicasRequest) ProtoMessage() {} +func (*GCReplicasRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_migration_02bb88ff52938b97, []int{4} +} +func (m *GCReplicasRequest) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *GCReplicasRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + b = b[:cap(b)] + n, err := m.MarshalTo(b) + if err != nil { + return nil, err + } + return b[:n], nil +} +func (dst *GCReplicasRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_GCReplicasRequest.Merge(dst, src) +} +func (m *GCReplicasRequest) XXX_Size() int { + return m.Size() +} +func (m *GCReplicasRequest) XXX_DiscardUnknown() { + xxx_messageInfo_GCReplicasRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_GCReplicasRequest proto.InternalMessageInfo + +// GCReplicasResponse is the response to a GCReplicasRequest. +type GCReplicasResponse struct { +} + +func (m *GCReplicasResponse) Reset() { *m = GCReplicasResponse{} } +func (m *GCReplicasResponse) String() string { return proto.CompactTextString(m) } +func (*GCReplicasResponse) ProtoMessage() {} +func (*GCReplicasResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_migration_02bb88ff52938b97, []int{5} +} +func (m *GCReplicasResponse) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *GCReplicasResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + b = b[:cap(b)] + n, err := m.MarshalTo(b) + if err != nil { + return nil, err + } + return b[:n], nil +} +func (dst *GCReplicasResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_GCReplicasResponse.Merge(dst, src) +} +func (m *GCReplicasResponse) XXX_Size() int { + return m.Size() +} +func (m *GCReplicasResponse) XXX_DiscardUnknown() { + xxx_messageInfo_GCReplicasResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_GCReplicasResponse proto.InternalMessageInfo + +// FlushAllEnginesRequest is used to instruct the target node to flush all its +// engines. +type FlushAllEnginesRequest struct { +} + +func (m *FlushAllEnginesRequest) Reset() { *m = FlushAllEnginesRequest{} } +func (m *FlushAllEnginesRequest) String() string { return proto.CompactTextString(m) } +func (*FlushAllEnginesRequest) ProtoMessage() {} +func (*FlushAllEnginesRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_migration_02bb88ff52938b97, []int{6} +} +func (m *FlushAllEnginesRequest) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *FlushAllEnginesRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + b = b[:cap(b)] + n, err := m.MarshalTo(b) + if err != nil { + return nil, err + } + return b[:n], nil +} +func (dst *FlushAllEnginesRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_FlushAllEnginesRequest.Merge(dst, src) +} +func (m *FlushAllEnginesRequest) XXX_Size() int { + return m.Size() +} +func (m *FlushAllEnginesRequest) XXX_DiscardUnknown() { + xxx_messageInfo_FlushAllEnginesRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_FlushAllEnginesRequest proto.InternalMessageInfo + +// FlushAllEnginesResponse is the response to a FlushAllEnginesRequest. +type FlushAllEnginesResponse struct { +} + +func (m *FlushAllEnginesResponse) Reset() { *m = FlushAllEnginesResponse{} } +func (m *FlushAllEnginesResponse) String() string { return proto.CompactTextString(m) } +func (*FlushAllEnginesResponse) ProtoMessage() {} +func (*FlushAllEnginesResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_migration_02bb88ff52938b97, []int{7} +} +func (m *FlushAllEnginesResponse) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *FlushAllEnginesResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + b = b[:cap(b)] + n, err := m.MarshalTo(b) + if err != nil { + return nil, err + } + return b[:n], nil +} +func (dst *FlushAllEnginesResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_FlushAllEnginesResponse.Merge(dst, src) +} +func (m *FlushAllEnginesResponse) XXX_Size() int { + return m.Size() +} +func (m *FlushAllEnginesResponse) XXX_DiscardUnknown() { + xxx_messageInfo_FlushAllEnginesResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_FlushAllEnginesResponse proto.InternalMessageInfo + func init() { proto.RegisterType((*ValidateTargetClusterVersionRequest)(nil), "cockroach.server.serverpb.ValidateTargetClusterVersionRequest") proto.RegisterType((*ValidateTargetClusterVersionResponse)(nil), "cockroach.server.serverpb.ValidateTargetClusterVersionResponse") proto.RegisterType((*BumpClusterVersionRequest)(nil), "cockroach.server.serverpb.BumpClusterVersionRequest") proto.RegisterType((*BumpClusterVersionResponse)(nil), "cockroach.server.serverpb.BumpClusterVersionResponse") + proto.RegisterType((*GCReplicasRequest)(nil), "cockroach.server.serverpb.GCReplicasRequest") + proto.RegisterType((*GCReplicasResponse)(nil), "cockroach.server.serverpb.GCReplicasResponse") + proto.RegisterType((*FlushAllEnginesRequest)(nil), "cockroach.server.serverpb.FlushAllEnginesRequest") + proto.RegisterType((*FlushAllEnginesResponse)(nil), "cockroach.server.serverpb.FlushAllEnginesResponse") } // Reference imports to suppress errors if they are not otherwise used. @@ -196,6 +334,12 @@ type MigrationClient interface { // which checks to see that all nodes in the cluster are running binaries // that would be able to support the intended version bump. BumpClusterVersion(ctx context.Context, in *BumpClusterVersionRequest, opts ...grpc.CallOption) (*BumpClusterVersionResponse, error) + // FlushAllEngines is used to instruct the target node to flush all its + // engines. + FlushAllEngines(ctx context.Context, in *FlushAllEnginesRequest, opts ...grpc.CallOption) (*FlushAllEnginesResponse, error) + // GCReplicas is used to instruct the target node to process all GC-able + // replicas. + GCReplicas(ctx context.Context, in *GCReplicasRequest, opts ...grpc.CallOption) (*GCReplicasResponse, error) } type migrationClient struct { @@ -224,6 +368,24 @@ func (c *migrationClient) BumpClusterVersion(ctx context.Context, in *BumpCluste return out, nil } +func (c *migrationClient) FlushAllEngines(ctx context.Context, in *FlushAllEnginesRequest, opts ...grpc.CallOption) (*FlushAllEnginesResponse, error) { + out := new(FlushAllEnginesResponse) + err := c.cc.Invoke(ctx, "/cockroach.server.serverpb.Migration/FlushAllEngines", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *migrationClient) GCReplicas(ctx context.Context, in *GCReplicasRequest, opts ...grpc.CallOption) (*GCReplicasResponse, error) { + out := new(GCReplicasResponse) + err := c.cc.Invoke(ctx, "/cockroach.server.serverpb.Migration/GCReplicas", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + // MigrationServer is the server API for Migration service. type MigrationServer interface { // ValidateTargetClusterVersion is used to verify that the target node is @@ -240,6 +402,12 @@ type MigrationServer interface { // which checks to see that all nodes in the cluster are running binaries // that would be able to support the intended version bump. BumpClusterVersion(context.Context, *BumpClusterVersionRequest) (*BumpClusterVersionResponse, error) + // FlushAllEngines is used to instruct the target node to flush all its + // engines. + FlushAllEngines(context.Context, *FlushAllEnginesRequest) (*FlushAllEnginesResponse, error) + // GCReplicas is used to instruct the target node to process all GC-able + // replicas. + GCReplicas(context.Context, *GCReplicasRequest) (*GCReplicasResponse, error) } func RegisterMigrationServer(s *grpc.Server, srv MigrationServer) { @@ -282,6 +450,42 @@ func _Migration_BumpClusterVersion_Handler(srv interface{}, ctx context.Context, return interceptor(ctx, in, info, handler) } +func _Migration_FlushAllEngines_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(FlushAllEnginesRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(MigrationServer).FlushAllEngines(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/cockroach.server.serverpb.Migration/FlushAllEngines", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(MigrationServer).FlushAllEngines(ctx, req.(*FlushAllEnginesRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _Migration_GCReplicas_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(GCReplicasRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(MigrationServer).GCReplicas(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/cockroach.server.serverpb.Migration/GCReplicas", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(MigrationServer).GCReplicas(ctx, req.(*GCReplicasRequest)) + } + return interceptor(ctx, in, info, handler) +} + var _Migration_serviceDesc = grpc.ServiceDesc{ ServiceName: "cockroach.server.serverpb.Migration", HandlerType: (*MigrationServer)(nil), @@ -294,6 +498,14 @@ var _Migration_serviceDesc = grpc.ServiceDesc{ MethodName: "BumpClusterVersion", Handler: _Migration_BumpClusterVersion_Handler, }, + { + MethodName: "FlushAllEngines", + Handler: _Migration_FlushAllEngines_Handler, + }, + { + MethodName: "GCReplicas", + Handler: _Migration_GCReplicas_Handler, + }, }, Streams: []grpc.StreamDesc{}, Metadata: "server/serverpb/migration.proto", @@ -391,6 +603,78 @@ func (m *BumpClusterVersionResponse) MarshalTo(dAtA []byte) (int, error) { return i, nil } +func (m *GCReplicasRequest) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *GCReplicasRequest) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + return i, nil +} + +func (m *GCReplicasResponse) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *GCReplicasResponse) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + return i, nil +} + +func (m *FlushAllEnginesRequest) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *FlushAllEnginesRequest) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + return i, nil +} + +func (m *FlushAllEnginesResponse) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *FlushAllEnginesResponse) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + return i, nil +} + func encodeVarintMigration(dAtA []byte, offset int, v uint64) int { for v >= 1<<7 { dAtA[offset] = uint8(v&0x7f | 0x80) @@ -444,6 +728,42 @@ func (m *BumpClusterVersionResponse) Size() (n int) { return n } +func (m *GCReplicasRequest) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + return n +} + +func (m *GCReplicasResponse) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + return n +} + +func (m *FlushAllEnginesRequest) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + return n +} + +func (m *FlushAllEnginesResponse) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + return n +} + func sovMigration(x uint64) (n int) { for { n++ @@ -723,6 +1043,206 @@ func (m *BumpClusterVersionResponse) Unmarshal(dAtA []byte) error { } return nil } +func (m *GCReplicasRequest) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowMigration + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: GCReplicasRequest: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: GCReplicasRequest: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + default: + iNdEx = preIndex + skippy, err := skipMigration(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthMigration + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *GCReplicasResponse) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowMigration + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: GCReplicasResponse: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: GCReplicasResponse: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + default: + iNdEx = preIndex + skippy, err := skipMigration(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthMigration + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *FlushAllEnginesRequest) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowMigration + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: FlushAllEnginesRequest: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: FlushAllEnginesRequest: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + default: + iNdEx = preIndex + skippy, err := skipMigration(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthMigration + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *FlushAllEnginesResponse) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowMigration + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: FlushAllEnginesResponse: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: FlushAllEnginesResponse: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + default: + iNdEx = preIndex + skippy, err := skipMigration(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthMigration + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} func skipMigration(dAtA []byte) (n int, err error) { l := len(dAtA) iNdEx := 0 @@ -829,11 +1349,11 @@ var ( ) func init() { - proto.RegisterFile("server/serverpb/migration.proto", fileDescriptor_migration_8dfeb6fcf9144e4c) + proto.RegisterFile("server/serverpb/migration.proto", fileDescriptor_migration_02bb88ff52938b97) } -var fileDescriptor_migration_8dfeb6fcf9144e4c = []byte{ - // 278 bytes of a gzipped FileDescriptorProto +var fileDescriptor_migration_02bb88ff52938b97 = []byte{ + // 366 bytes of a gzipped FileDescriptorProto 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x92, 0x2f, 0x4e, 0x2d, 0x2a, 0x4b, 0x2d, 0xd2, 0x87, 0x50, 0x05, 0x49, 0xfa, 0xb9, 0x99, 0xe9, 0x45, 0x89, 0x25, 0x99, 0xf9, 0x79, 0x7a, 0x05, 0x45, 0xf9, 0x25, 0xf9, 0x42, 0x92, 0xc9, 0xf9, 0xc9, 0xd9, 0x45, 0xf9, 0x89, @@ -844,12 +1364,17 @@ var fileDescriptor_migration_8dfeb6fcf9144e4c = []byte{ 0xd3, 0x2f, 0xc1, 0xa8, 0xc0, 0xa8, 0xc1, 0x6d, 0xa4, 0xa1, 0x87, 0x70, 0x01, 0xaa, 0x85, 0x7a, 0x68, 0x26, 0xf1, 0x25, 0xa3, 0xf0, 0x95, 0xd4, 0xb8, 0x54, 0xf0, 0xdb, 0x5c, 0x5c, 0x90, 0x9f, 0x57, 0x9c, 0xaa, 0x94, 0xc7, 0x25, 0xe9, 0x54, 0x9a, 0x5b, 0x40, 0x37, 0x77, 0xc9, 0x70, 0x49, - 0x61, 0xb3, 0x0f, 0xe2, 0x1a, 0xa3, 0xad, 0x4c, 0x5c, 0x9c, 0xbe, 0xb0, 0x48, 0x10, 0x5a, 0xc8, - 0xc8, 0x25, 0x83, 0xcf, 0x13, 0x42, 0x76, 0x7a, 0x38, 0x23, 0x48, 0x8f, 0x88, 0x70, 0x97, 0xb2, - 0x27, 0x5b, 0x3f, 0x34, 0xf4, 0x18, 0x84, 0x9a, 0x19, 0xb9, 0x84, 0x30, 0x3d, 0x24, 0x64, 0x82, - 0xc7, 0x64, 0x9c, 0xe1, 0x2d, 0x65, 0x4a, 0xa2, 0x2e, 0x98, 0x2b, 0x9c, 0xb4, 0x4e, 0x3c, 0x94, - 0x63, 0x38, 0xf1, 0x48, 0x8e, 0xf1, 0xc2, 0x23, 0x39, 0xc6, 0x1b, 0x8f, 0xe4, 0x18, 0x1f, 0x3c, - 0x92, 0x63, 0x9c, 0xf0, 0x58, 0x8e, 0xe1, 0xc2, 0x63, 0x39, 0x86, 0x1b, 0x8f, 0xe5, 0x18, 0xa2, - 0x38, 0x60, 0x06, 0x25, 0xb1, 0x81, 0x93, 0xa6, 0x31, 0x20, 0x00, 0x00, 0xff, 0xff, 0x08, 0xa5, - 0xf0, 0xc3, 0xfe, 0x02, 0x00, 0x00, + 0x61, 0xb3, 0x0f, 0xea, 0x1a, 0x61, 0x2e, 0x41, 0x77, 0xe7, 0xa0, 0xd4, 0x82, 0x9c, 0xcc, 0xe4, + 0xc4, 0x62, 0xa8, 0x2b, 0x94, 0x44, 0xb8, 0x84, 0x90, 0x05, 0xa1, 0x4a, 0x25, 0xb8, 0xc4, 0xdc, + 0x72, 0x4a, 0x8b, 0x33, 0x1c, 0x73, 0x72, 0x5c, 0xf3, 0xd2, 0x33, 0xf3, 0x52, 0xe1, 0xea, 0x25, + 0xb9, 0xc4, 0x31, 0x64, 0x20, 0x9a, 0x8c, 0xe6, 0xb1, 0x70, 0x71, 0xfa, 0xc2, 0x22, 0x59, 0x68, + 0x21, 0x23, 0x97, 0x0c, 0xbe, 0x40, 0x12, 0xb2, 0xd3, 0xc3, 0x99, 0x00, 0xf4, 0x88, 0x88, 0x57, + 0x29, 0x7b, 0xb2, 0xf5, 0x43, 0x3d, 0xc9, 0x20, 0xd4, 0xcc, 0xc8, 0x25, 0x84, 0x19, 0x60, 0x42, + 0x26, 0x78, 0x4c, 0xc6, 0x19, 0x9f, 0x52, 0xa6, 0x24, 0xea, 0x82, 0xbb, 0xa2, 0x8a, 0x8b, 0x1f, + 0x2d, 0x48, 0x85, 0x0c, 0xf1, 0x98, 0x85, 0x3d, 0x62, 0xa4, 0x8c, 0x48, 0xd1, 0x02, 0xb7, 0x3b, + 0x9b, 0x8b, 0x0b, 0x11, 0xfd, 0x42, 0x3a, 0x78, 0xcc, 0xc0, 0x48, 0x3a, 0x52, 0xba, 0x44, 0xaa, + 0x86, 0x59, 0xe6, 0xa4, 0x75, 0xe2, 0xa1, 0x1c, 0xc3, 0x89, 0x47, 0x72, 0x8c, 0x17, 0x1e, 0xc9, + 0x31, 0xde, 0x78, 0x24, 0xc7, 0xf8, 0xe0, 0x91, 0x1c, 0xe3, 0x84, 0xc7, 0x72, 0x0c, 0x17, 0x1e, + 0xcb, 0x31, 0xdc, 0x78, 0x2c, 0xc7, 0x10, 0xc5, 0x01, 0x33, 0x20, 0x89, 0x0d, 0x9c, 0xc7, 0x8d, + 0x01, 0x01, 0x00, 0x00, 0xff, 0xff, 0xc5, 0x1c, 0x4a, 0x9f, 0x47, 0x04, 0x00, 0x00, } diff --git a/pkg/server/serverpb/migration.proto b/pkg/server/serverpb/migration.proto index 871b00dc8543..fd8d3394e11b 100644 --- a/pkg/server/serverpb/migration.proto +++ b/pkg/server/serverpb/migration.proto @@ -25,8 +25,8 @@ message ValidateTargetClusterVersionRequest { message ValidateTargetClusterVersionResponse { } -// BumpClusterVersionRequest is used to inform a given node of a cluster version -// bump. +// BumpClusterVersionRequest is used to inform the target node of a cluster +// version bump. message BumpClusterVersionRequest { clusterversion.ClusterVersion cluster_version = 1; } @@ -34,6 +34,20 @@ message BumpClusterVersionRequest { // BumpClusterVersionResponse is the response to an BumpClusterVersionRequest. message BumpClusterVersionResponse { } +// GCReplicasRequest is used to instruct the target node to process all GC-able +// replicas. +message GCReplicasRequest{} + +// GCReplicasResponse is the response to a GCReplicasRequest. +message GCReplicasResponse{} + +// FlushAllEnginesRequest is used to instruct the target node to flush all its +// engines. +message FlushAllEnginesRequest{} + +// FlushAllEnginesResponse is the response to a FlushAllEnginesRequest. +message FlushAllEnginesResponse{} + service Migration { // ValidateTargetClusterVersion is used to verify that the target node is // running a binary that's able to support the specified cluster version. @@ -50,4 +64,12 @@ service Migration { // which checks to see that all nodes in the cluster are running binaries // that would be able to support the intended version bump. rpc BumpClusterVersion(BumpClusterVersionRequest) returns (BumpClusterVersionResponse) { } + + // FlushAllEngines is used to instruct the target node to flush all its + // engines. + rpc FlushAllEngines (FlushAllEnginesRequest) returns (FlushAllEnginesResponse) { } + + // GCReplicas is used to instruct the target node to process all GC-able + // replicas. + rpc GCReplicas (GCReplicasRequest) returns (GCReplicasResponse) { } }