Skip to content

Commit

Permalink
enhance: remove the rpc layer of coordinator when enabling standalone…
Browse files Browse the repository at this point in the history
… or mixcoord (#38246)

issue: #33285
pr: #37815

- remove the rpc layer of coordinator when enabling standalone or
mixcoord
- move health check into init

---------

Signed-off-by: chyezh <[email protected]>
  • Loading branch information
chyezh authored Dec 5, 2024
1 parent c4df6b5 commit 99279e0
Show file tree
Hide file tree
Showing 13 changed files with 187 additions and 36 deletions.
8 changes: 1 addition & 7 deletions cmd/milvus/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"go.uber.org/zap"

"github.com/milvus-io/milvus/cmd/roles"
"github.com/milvus-io/milvus/internal/coordinator/coordclient"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/etcd"
Expand Down Expand Up @@ -172,12 +171,7 @@ func GetMilvusRoles(args []string, flags *flag.FlagSet) *roles.MilvusRoles {
fmt.Fprintf(os.Stderr, "Unknown server type = %s\n%s", serverType, getHelp())
os.Exit(-1)
}
coordclient.EnableLocalClientRole(&coordclient.LocalClientRoleConfig{
ServerType: serverType,
EnableQueryCoord: role.EnableQueryCoord,
EnableDataCoord: role.EnableDataCoord,
EnableRootCoord: role.EnableRootCoord,
})

return role
}

Expand Down
7 changes: 7 additions & 0 deletions cmd/roles/roles.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/cmd/components"
"github.com/milvus-io/milvus/internal/coordinator/coordclient"
"github.com/milvus-io/milvus/internal/http"
"github.com/milvus-io/milvus/internal/http/healthz"
"github.com/milvus-io/milvus/internal/util/dependency"
Expand Down Expand Up @@ -375,6 +376,12 @@ func (mr *MilvusRoles) Run() {
paramtable.Init()
paramtable.SetRole(mr.ServerType)
}
coordclient.EnableLocalClientRole(&coordclient.LocalClientRoleConfig{
ServerType: mr.ServerType,
EnableQueryCoord: mr.EnableQueryCoord,
EnableDataCoord: mr.EnableDataCoord,
EnableRootCoord: mr.EnableRootCoord,
})

enableComponents := []bool{
mr.EnableRootCoord,
Expand Down
1 change: 1 addition & 0 deletions configs/milvus.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -822,6 +822,7 @@ common:
usePartitionKeyAsClusteringKey: false # if true, do clustering compaction and segment prune on partition key field
useVectorAsClusteringKey: false # if true, do clustering compaction and segment prune on vector field
enableVectorClusteringKey: false # if true, enable vector clustering key and vector clustering compaction
localRPCEnabled: true # enable local rpc for internal communication when mix or standalone mode.

# QuotaConfig, configurations of Milvus quota and limits.
# By default, we enable:
Expand Down
2 changes: 0 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ require github.com/milvus-io/milvus-storage/go v0.0.0-20231227072638-ebd0b8e56d7
require (
github.com/bits-and-blooms/bitset v1.10.0
github.com/bytedance/sonic v1.9.1
github.com/fullstorydev/grpchan v1.1.1
github.com/greatroar/blobloom v0.8.0
github.com/jolestar/go-commons-pool/v2 v2.1.2
github.com/milvus-io/milvus/pkg v0.0.0-00010101000000-000000000000
Expand Down Expand Up @@ -140,7 +139,6 @@ require (
github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/ianlancetaylor/cgosymbolizer v0.0.0-20221217025313-27d3c9f66b6a // indirect
github.com/jhump/protoreflect v1.12.0 // indirect
github.com/jonboulle/clockwork v0.2.2 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/asmfmt v1.3.2 // indirect
Expand Down
3 changes: 0 additions & 3 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -265,8 +265,6 @@ github.com/frankban/quicktest v1.14.5/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7z
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4=
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
github.com/fullstorydev/grpchan v1.1.1 h1:heQqIJlAv5Cnks9a70GRL2EJke6QQoUB25VGR6TZQas=
github.com/fullstorydev/grpchan v1.1.1/go.mod h1:f4HpiV8V6htfY/K44GWV1ESQzHBTq7DinhzqQ95lpgc=
github.com/gabriel-vasile/mimetype v1.4.2 h1:w5qFW6JKBz9Y393Y4q372O9A7cUSequkh1Q7OhCmWKU=
github.com/gabriel-vasile/mimetype v1.4.2/go.mod h1:zApsH/mKG4w07erKIaJPFiX0Tsq9BFQgN3qGY5GnNgA=
github.com/gavv/httpexpect v2.0.0+incompatible/go.mod h1:x+9tiU1YnrOvnB725RkpoLv1M62hOWzwo5OXotisrKc=
Expand Down Expand Up @@ -499,7 +497,6 @@ github.com/jhump/gopoet v0.0.0-20190322174617-17282ff210b3/go.mod h1:me9yfT6IJSl
github.com/jhump/gopoet v0.1.0/go.mod h1:me9yfT6IJSlOL3FCfrg+L6yzUEZ+5jW6WHt4Sk+UPUI=
github.com/jhump/goprotoc v0.5.0/go.mod h1:VrbvcYrQOrTi3i0Vf+m+oqQWk9l72mjkJCYo7UvLHRQ=
github.com/jhump/protoreflect v1.11.0/go.mod h1:U7aMIjN0NWq9swDP7xDdoMfRHb35uiuTd3Z9nFXJf5E=
github.com/jhump/protoreflect v1.12.0 h1:1NQ4FpWMgn3by/n1X0fbeKEUxP1wBt7+Oitpv01HR10=
github.com/jhump/protoreflect v1.12.0/go.mod h1:JytZfP5d0r8pVNLZvai7U/MCuTWITgrI4tTg7puQFKI=
github.com/jmespath/go-jmespath v0.3.0/go.mod h1:9QtRXoHjLGCJ5IBSaohpXITPlowMeeYCZ7fLUTSywik=
github.com/jolestar/go-commons-pool/v2 v2.1.2 h1:E+XGo58F23t7HtZiC/W6jzO2Ux2IccSH/yx4nD+J1CM=
Expand Down
18 changes: 8 additions & 10 deletions internal/coordinator/coordclient/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,16 @@ import (

"go.uber.org/zap"

"github.com/fullstorydev/grpchan/inprocgrpc"
dcc "github.com/milvus-io/milvus/internal/distributed/datacoord/client"
qcc "github.com/milvus-io/milvus/internal/distributed/querycoord/client"
rcc "github.com/milvus-io/milvus/internal/distributed/rootcoord/client"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/proto/rootcoordpb"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/grpcclient"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/syncutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
Expand Down Expand Up @@ -49,6 +50,9 @@ type LocalClientRoleConfig struct {

// EnableLocalClientRole init localable roles
func EnableLocalClientRole(cfg *LocalClientRoleConfig) {
if !paramtable.Get().CommonCfg.LocalRPCEnabled.GetAsBool() {
return
}
if cfg.ServerType != typeutil.StandaloneRole && cfg.ServerType != typeutil.MixtureRole {
return
}
Expand All @@ -60,9 +64,7 @@ func RegisterQueryCoordServer(server querypb.QueryCoordServer) {
if !enableLocal.EnableQueryCoord {
return
}
channel := &inprocgrpc.Channel{}
channel.RegisterService(&querypb.QueryCoord_ServiceDesc, server)
newLocalClient := querypb.NewQueryCoordClient(channel)
newLocalClient := grpcclient.NewLocalGRPCClient(&querypb.QueryCoord_ServiceDesc, server, querypb.NewQueryCoordClient)
glocalClient.queryCoordClient.Set(&nopCloseQueryCoordClient{newLocalClient})
log.Info("register query coord server", zap.Any("enableLocalClient", enableLocal))
}
Expand All @@ -72,9 +74,7 @@ func RegisterDataCoordServer(server datapb.DataCoordServer) {
if !enableLocal.EnableDataCoord {
return
}
channel := &inprocgrpc.Channel{}
channel.RegisterService(&datapb.DataCoord_ServiceDesc, server)
newLocalClient := datapb.NewDataCoordClient(channel)
newLocalClient := grpcclient.NewLocalGRPCClient(&datapb.DataCoord_ServiceDesc, server, datapb.NewDataCoordClient)
glocalClient.dataCoordClient.Set(&nopCloseDataCoordClient{newLocalClient})
log.Info("register data coord server", zap.Any("enableLocalClient", enableLocal))
}
Expand All @@ -84,9 +84,7 @@ func RegisterRootCoordServer(server rootcoordpb.RootCoordServer) {
if !enableLocal.EnableRootCoord {
return
}
channel := &inprocgrpc.Channel{}
channel.RegisterService(&rootcoordpb.RootCoord_ServiceDesc, server)
newLocalClient := rootcoordpb.NewRootCoordClient(channel)
newLocalClient := grpcclient.NewLocalGRPCClient(&rootcoordpb.RootCoord_ServiceDesc, server, rootcoordpb.NewRootCoordClient)
glocalClient.rootCoordClient.Set(&nopCloseRootCoordClient{newLocalClient})
log.Info("register root coord server", zap.Any("enableLocalClient", enableLocal))
}
Expand Down
4 changes: 4 additions & 0 deletions internal/coordinator/coordclient/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,14 @@ import (
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/proto/rootcoordpb"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

func TestRegistry(t *testing.T) {
paramtable.Init()
paramtable.Get().Save(paramtable.Get().CommonCfg.LocalRPCEnabled.Key, "true")

assert.False(t, enableLocal.EnableQueryCoord)
assert.False(t, enableLocal.EnableDataCoord)
assert.False(t, enableLocal.EnableRootCoord)
Expand Down
14 changes: 0 additions & 14 deletions internal/distributed/querycoord/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ import (
"github.com/milvus-io/milvus/internal/proto/querypb"
qc "github.com/milvus-io/milvus/internal/querycoordv2"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/componentutil"
"github.com/milvus-io/milvus/internal/util/dependency"
_ "github.com/milvus-io/milvus/internal/util/grpcclient"
"github.com/milvus-io/milvus/pkg/log"
Expand Down Expand Up @@ -172,29 +171,16 @@ func (s *Server) init() error {
}

// wait for master init or healthy
log.Info("QueryCoord try to wait for RootCoord ready")
err = componentutil.WaitForComponentHealthy(s.loopCtx, s.rootCoord, "RootCoord", 1000000, time.Millisecond*200)
if err != nil {
log.Error("QueryCoord wait for RootCoord ready failed", zap.Error(err))
panic(err)
}

if err := s.SetRootCoord(s.rootCoord); err != nil {
panic(err)
}
log.Info("QueryCoord report RootCoord ready")

// --- Data service client ---
if s.dataCoord == nil {
s.dataCoord = coordclient.GetDataCoordClient(s.loopCtx)
}

log.Info("QueryCoord try to wait for DataCoord ready")
err = componentutil.WaitForComponentHealthy(s.loopCtx, s.dataCoord, "DataCoord", 1000000, time.Millisecond*200)
if err != nil {
log.Error("QueryCoord wait for DataCoord ready failed", zap.Error(err))
panic(err)
}
if err := s.SetDataCoord(s.dataCoord); err != nil {
panic(err)
}
Expand Down
17 changes: 17 additions & 0 deletions internal/querycoordv2/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ import (
"github.com/milvus-io/milvus/internal/querycoordv2/session"
"github.com/milvus-io/milvus/internal/querycoordv2/task"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/componentutil"
"github.com/milvus-io/milvus/internal/util/proxyutil"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/internal/util/tsoutil"
Expand Down Expand Up @@ -218,6 +219,22 @@ func (s *Server) Init() error {
}

func (s *Server) initQueryCoord() error {
// wait for master init or healthy
log.Info("QueryCoord try to wait for RootCoord ready")
if err := componentutil.WaitForComponentHealthy(s.ctx, s.rootCoord, "RootCoord", 1000000, time.Millisecond*200); err != nil {
log.Error("QueryCoord wait for RootCoord ready failed", zap.Error(err))
panic(err)
}
log.Info("QueryCoord report RootCoord ready")

// wait for master init or healthy
log.Info("QueryCoord try to wait for DataCoord ready")
if err := componentutil.WaitForComponentHealthy(s.ctx, s.dataCoord, "DataCoord", 1000000, time.Millisecond*200); err != nil {
log.Error("QueryCoord wait for DataCoord ready failed", zap.Error(err))
panic(err)
}
log.Info("QueryCoord report DataCoord ready")

s.UpdateStateCode(commonpb.StateCode_Initializing)
log.Info("start init querycoord", zap.Any("State", commonpb.StateCode_Initializing))
// Init KV and ID allocator
Expand Down
68 changes: 68 additions & 0 deletions internal/util/grpcclient/local_grpc_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package grpcclient

import (
"context"
"fmt"
"reflect"
"strings"

"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

var _ grpc.ClientConnInterface = &localConn{}

// NewLocalGRPCClient creates a grpc client that calls the server directly.
// !!! Warning: it didn't make any network or serialization/deserialization, so it's not promise concurrent safe.
// and there's no interceptor for client and server like the common grpc client/server.
func NewLocalGRPCClient[C any, S any](desc *grpc.ServiceDesc, server S, clientCreator func(grpc.ClientConnInterface) C) C {
return clientCreator(&localConn{
serviceDesc: desc,
server: server,
})
}

// localConn is a grpc.ClientConnInterface implementation that calls the server directly.
type localConn struct {
serviceDesc *grpc.ServiceDesc // ServiceDesc is the descriptor for this service.
server interface{} // the server object.
}

// Invoke calls the server method directly.
func (c *localConn) Invoke(ctx context.Context, method string, args, reply interface{}, opts ...grpc.CallOption) error {
methodDesc := c.findMethod(method)
if methodDesc == nil {
return status.Errorf(codes.Unimplemented, fmt.Sprintf("method %s not implemented", method))
}
resp, err := methodDesc.Handler(c.server, ctx, func(in any) error {
reflect.ValueOf(in).Elem().Set(reflect.ValueOf(args).Elem())
return nil
}, nil)
if err != nil {
return err
}
reflect.ValueOf(reply).Elem().Set(reflect.ValueOf(resp).Elem())
return nil
}

// NewStream is not supported by now, wait for implementation.
func (c *localConn) NewStream(ctx context.Context, desc *grpc.StreamDesc, method string, opts ...grpc.CallOption) (grpc.ClientStream, error) {
panic("we don't support local stream rpc by now")
}

// findMethod finds the method descriptor by the full method name.
func (c *localConn) findMethod(fullMethodName string) *grpc.MethodDesc {
strs := strings.SplitN(fullMethodName[1:], "/", 2)
serviceName := strs[0]
methodName := strs[1]
if c.serviceDesc.ServiceName != serviceName {
return nil
}
for i := range c.serviceDesc.Methods {
if c.serviceDesc.Methods[i].MethodName == methodName {
return &c.serviceDesc.Methods[i]
}
}
return nil
}
46 changes: 46 additions & 0 deletions internal/util/grpcclient/local_grpc_client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package grpcclient

import (
"context"
"testing"

"github.com/stretchr/testify/assert"

"github.com/milvus-io/milvus/internal/proto/rootcoordpb"
)

type mockRootCoordServer struct {
t *testing.T
*rootcoordpb.UnimplementedRootCoordServer
}

func (s *mockRootCoordServer) AllocID(ctx context.Context, req *rootcoordpb.AllocIDRequest) (*rootcoordpb.AllocIDResponse, error) {
assert.NotNil(s.t, req)
assert.Equal(s.t, uint32(100), req.Count)
return &rootcoordpb.AllocIDResponse{
ID: 1,
Count: 2,
}, nil
}

func TestLocalGRPCClient(t *testing.T) {
localClient := NewLocalGRPCClient(
&rootcoordpb.RootCoord_ServiceDesc,
&mockRootCoordServer{
t: t,
UnimplementedRootCoordServer: &rootcoordpb.UnimplementedRootCoordServer{},
},
rootcoordpb.NewRootCoordClient,
)
result, err := localClient.AllocTimestamp(context.Background(), &rootcoordpb.AllocTimestampRequest{})
assert.Error(t, err)
assert.Nil(t, result)

result2, err := localClient.AllocID(context.Background(), &rootcoordpb.AllocIDRequest{
Count: 100,
})
assert.NoError(t, err)
assert.NotNil(t, result2)
assert.Equal(t, int64(1), result2.ID)
assert.Equal(t, uint32(2), result2.Count)
}
31 changes: 31 additions & 0 deletions pkg/util/paramtable/component_param.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,12 @@ type commonConfig struct {
ReadOnlyPrivileges ParamItem `refreshable:"false"`
ReadWritePrivileges ParamItem `refreshable:"false"`
AdminPrivileges ParamItem `refreshable:"false"`

HealthCheckInterval ParamItem `refreshable:"true"`
HealthCheckRPCTimeout ParamItem `refreshable:"true"`

// Local RPC enabled for milvus internal communication when mix or standalone mode.
LocalRPCEnabled ParamItem `refreshable:"false"`
}

func (p *commonConfig) init(base *BaseTable) {
Expand Down Expand Up @@ -913,6 +919,31 @@ This helps Milvus-CDC synchronize incremental data`,
Doc: `use to override the default value of admin privileges, example: "PrivilegeCreateOwnership,PrivilegeDropOwnership"`,
}
p.AdminPrivileges.Init(base.mgr)

p.HealthCheckInterval = ParamItem{
Key: "common.healthcheck.interval.seconds",
Version: "2.4.8",
DefaultValue: "30",
Doc: `health check interval in seconds, default 30s`,
}
p.HealthCheckInterval.Init(base.mgr)

p.HealthCheckRPCTimeout = ParamItem{
Key: "common.healthcheck.timeout.seconds",
Version: "2.4.8",
DefaultValue: "10",
Doc: `RPC timeout for health check request`,
}
p.HealthCheckRPCTimeout.Init(base.mgr)

p.LocalRPCEnabled = ParamItem{
Key: "common.localRPCEnabled",
Version: "2.4.18",
DefaultValue: "true",
Doc: `enable local rpc for internal communication when mix or standalone mode.`,
Export: true,
}
p.LocalRPCEnabled.Init(base.mgr)
}

type gpuConfig struct {
Expand Down
Loading

0 comments on commit 99279e0

Please sign in to comment.