Skip to content

Commit

Permalink
enhance: remove the rpc layer of coordinator when enabling standalone…
Browse files Browse the repository at this point in the history
… or mixcoord (#38207)

issue: #37764
pr: #37815 
also see: #38259

- add a local client to call local server directly for
querycoord/rootcoord/datacoord.
- enable local client if milvus is running mixcoord or standalone mode.
- after removing rpc layer from mixcoord, the querycoord at standby mode
will be blocked forever of deployment rolling

---------

Signed-off-by: chyezh <[email protected]>
  • Loading branch information
chyezh authored Dec 10, 2024
1 parent c4b6460 commit 6b310e1
Show file tree
Hide file tree
Showing 18 changed files with 548 additions and 65 deletions.
7 changes: 7 additions & 0 deletions cmd/roles/roles.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/cmd/components"
"github.com/milvus-io/milvus/internal/coordinator/coordclient"
"github.com/milvus-io/milvus/internal/http"
"github.com/milvus-io/milvus/internal/http/healthz"
"github.com/milvus-io/milvus/internal/util/dependency"
Expand Down Expand Up @@ -380,6 +381,12 @@ func (mr *MilvusRoles) Run() {
paramtable.Init()
paramtable.SetRole(mr.ServerType)
}
coordclient.EnableLocalClientRole(&coordclient.LocalClientRoleConfig{
ServerType: mr.ServerType,
EnableQueryCoord: mr.EnableQueryCoord,
EnableDataCoord: mr.EnableDataCoord,
EnableRootCoord: mr.EnableRootCoord,
})

enableComponents := []bool{
mr.EnableRootCoord,
Expand Down
1 change: 1 addition & 0 deletions configs/milvus.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -846,6 +846,7 @@ common:
usePartitionKeyAsClusteringKey: false # if true, do clustering compaction and segment prune on partition key field
useVectorAsClusteringKey: false # if true, do clustering compaction and segment prune on vector field
enableVectorClusteringKey: false # if true, enable vector clustering key and vector clustering compaction
localRPCEnabled: false # enable local rpc for internal communication when mix or standalone mode.

# QuotaConfig, configurations of Milvus quota and limits.
# By default, we enable:
Expand Down
162 changes: 162 additions & 0 deletions internal/coordinator/coordclient/registry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
package coordclient

import (
"context"
"fmt"

"go.uber.org/zap"

dcc "github.com/milvus-io/milvus/internal/distributed/datacoord/client"
qcc "github.com/milvus-io/milvus/internal/distributed/querycoord/client"
rcc "github.com/milvus-io/milvus/internal/distributed/rootcoord/client"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/proto/rootcoordpb"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/grpcclient"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/syncutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

// localClient is a client that can access local server directly
type localClient struct {
queryCoordClient *syncutil.Future[types.QueryCoordClient]
dataCoordClient *syncutil.Future[types.DataCoordClient]
rootCoordClient *syncutil.Future[types.RootCoordClient]
}

var (
enableLocal *LocalClientRoleConfig // a global map to store all can be local accessible roles.
glocalClient *localClient // !!! WARNING: local client will ignore all interceptor of grpc client and server.
)

func init() {
enableLocal = &LocalClientRoleConfig{}
glocalClient = &localClient{
queryCoordClient: syncutil.NewFuture[types.QueryCoordClient](),
dataCoordClient: syncutil.NewFuture[types.DataCoordClient](),
rootCoordClient: syncutil.NewFuture[types.RootCoordClient](),
}
}

type LocalClientRoleConfig struct {
ServerType string
EnableQueryCoord bool
EnableDataCoord bool
EnableRootCoord bool
}

// EnableLocalClientRole init localable roles
func EnableLocalClientRole(cfg *LocalClientRoleConfig) {
if !paramtable.Get().CommonCfg.LocalRPCEnabled.GetAsBool() {
return
}
if cfg.ServerType != typeutil.StandaloneRole && cfg.ServerType != typeutil.MixtureRole {
return
}
enableLocal = cfg
}

// RegisterQueryCoordServer register query coord server
func RegisterQueryCoordServer(server querypb.QueryCoordServer) {
if !enableLocal.EnableQueryCoord {
return
}
newLocalClient := grpcclient.NewLocalGRPCClient(&querypb.QueryCoord_ServiceDesc, server, querypb.NewQueryCoordClient)
glocalClient.queryCoordClient.Set(&nopCloseQueryCoordClient{newLocalClient})
log.Info("register query coord server", zap.Any("enableLocalClient", enableLocal))
}

// RegsterDataCoordServer register data coord server
func RegisterDataCoordServer(server datapb.DataCoordServer) {
if !enableLocal.EnableDataCoord {
return
}
newLocalClient := grpcclient.NewLocalGRPCClient(&datapb.DataCoord_ServiceDesc, server, datapb.NewDataCoordClient)
glocalClient.dataCoordClient.Set(&nopCloseDataCoordClient{newLocalClient})
log.Info("register data coord server", zap.Any("enableLocalClient", enableLocal))
}

// RegisterRootCoordServer register root coord server
func RegisterRootCoordServer(server rootcoordpb.RootCoordServer) {
if !enableLocal.EnableRootCoord {
return
}
newLocalClient := grpcclient.NewLocalGRPCClient(&rootcoordpb.RootCoord_ServiceDesc, server, rootcoordpb.NewRootCoordClient)
glocalClient.rootCoordClient.Set(&nopCloseRootCoordClient{newLocalClient})
log.Info("register root coord server", zap.Any("enableLocalClient", enableLocal))
}

// GetQueryCoordClient return query coord client
func GetQueryCoordClient(ctx context.Context) types.QueryCoordClient {
var client types.QueryCoordClient
var err error
if enableLocal.EnableQueryCoord {
client, err = glocalClient.queryCoordClient.GetWithContext(ctx)
} else {
// TODO: we should make a singleton here. but most unittest rely on a dedicated client.
client, err = qcc.NewClient(ctx)
}
if err != nil {
panic(fmt.Sprintf("get query coord client failed: %v", err))
}
return client
}

// GetDataCoordClient return data coord client
func GetDataCoordClient(ctx context.Context) types.DataCoordClient {
var client types.DataCoordClient
var err error
if enableLocal.EnableDataCoord {
client, err = glocalClient.dataCoordClient.GetWithContext(ctx)
} else {
// TODO: we should make a singleton here. but most unittest rely on a dedicated client.
client, err = dcc.NewClient(ctx)
}
if err != nil {
panic(fmt.Sprintf("get data coord client failed: %v", err))
}
return client
}

// GetRootCoordClient return root coord client
func GetRootCoordClient(ctx context.Context) types.RootCoordClient {
var client types.RootCoordClient
var err error
if enableLocal.EnableRootCoord {
client, err = glocalClient.rootCoordClient.GetWithContext(ctx)
} else {
// TODO: we should make a singleton here. but most unittest rely on a dedicated client.
client, err = rcc.NewClient(ctx)
}
if err != nil {
panic(fmt.Sprintf("get root coord client failed: %v", err))
}
return client
}

type nopCloseQueryCoordClient struct {
querypb.QueryCoordClient
}

func (n *nopCloseQueryCoordClient) Close() error {
return nil
}

type nopCloseDataCoordClient struct {
datapb.DataCoordClient
}

func (n *nopCloseDataCoordClient) Close() error {
return nil
}

type nopCloseRootCoordClient struct {
rootcoordpb.RootCoordClient
}

func (n *nopCloseRootCoordClient) Close() error {
return nil
}
78 changes: 78 additions & 0 deletions internal/coordinator/coordclient/registry_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package coordclient

import (
"context"
"testing"

"github.com/stretchr/testify/assert"

"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/proto/rootcoordpb"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

func TestRegistry(t *testing.T) {
paramtable.Init()
paramtable.Get().Save(paramtable.Get().CommonCfg.LocalRPCEnabled.Key, "true")

assert.False(t, enableLocal.EnableQueryCoord)
assert.False(t, enableLocal.EnableDataCoord)
assert.False(t, enableLocal.EnableRootCoord)

EnableLocalClientRole(&LocalClientRoleConfig{
ServerType: typeutil.RootCoordRole,
EnableQueryCoord: true,
EnableDataCoord: true,
EnableRootCoord: true,
})
assert.False(t, enableLocal.EnableQueryCoord)
assert.False(t, enableLocal.EnableDataCoord)
assert.False(t, enableLocal.EnableRootCoord)

RegisterRootCoordServer(&rootcoordpb.UnimplementedRootCoordServer{})
RegisterDataCoordServer(&datapb.UnimplementedDataCoordServer{})
RegisterQueryCoordServer(&querypb.UnimplementedQueryCoordServer{})
assert.False(t, glocalClient.dataCoordClient.Ready())
assert.False(t, glocalClient.queryCoordClient.Ready())
assert.False(t, glocalClient.rootCoordClient.Ready())

enableLocal = &LocalClientRoleConfig{}

EnableLocalClientRole(&LocalClientRoleConfig{
ServerType: typeutil.StandaloneRole,
EnableQueryCoord: true,
EnableDataCoord: true,
EnableRootCoord: true,
})
assert.True(t, enableLocal.EnableDataCoord)
assert.True(t, enableLocal.EnableQueryCoord)
assert.True(t, enableLocal.EnableRootCoord)

RegisterRootCoordServer(&rootcoordpb.UnimplementedRootCoordServer{})
RegisterDataCoordServer(&datapb.UnimplementedDataCoordServer{})
RegisterQueryCoordServer(&querypb.UnimplementedQueryCoordServer{})
assert.True(t, glocalClient.dataCoordClient.Ready())
assert.True(t, glocalClient.queryCoordClient.Ready())
assert.True(t, glocalClient.rootCoordClient.Ready())

enableLocal = &LocalClientRoleConfig{}

EnableLocalClientRole(&LocalClientRoleConfig{
ServerType: typeutil.MixtureRole,
EnableQueryCoord: true,
EnableDataCoord: true,
EnableRootCoord: true,
})
assert.True(t, enableLocal.EnableDataCoord)
assert.True(t, enableLocal.EnableQueryCoord)
assert.True(t, enableLocal.EnableRootCoord)

assert.NotNil(t, GetQueryCoordClient(context.Background()))
assert.NotNil(t, GetDataCoordClient(context.Background()))
assert.NotNil(t, GetRootCoordClient(context.Background()))
GetQueryCoordClient(context.Background()).Close()
GetDataCoordClient(context.Background()).Close()
GetRootCoordClient(context.Background()).Close()
}
4 changes: 2 additions & 2 deletions internal/datacoord/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@ import (

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
globalIDAllocator "github.com/milvus-io/milvus/internal/allocator"
"github.com/milvus-io/milvus/internal/coordinator/coordclient"
"github.com/milvus-io/milvus/internal/datacoord/broker"
datanodeclient "github.com/milvus-io/milvus/internal/distributed/datanode/client"
indexnodeclient "github.com/milvus-io/milvus/internal/distributed/indexnode/client"
rootcoordclient "github.com/milvus-io/milvus/internal/distributed/rootcoord/client"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/kv/tikv"
"github.com/milvus-io/milvus/internal/metastore/kv/datacoord"
Expand Down Expand Up @@ -255,7 +255,7 @@ func defaultIndexNodeCreatorFunc(ctx context.Context, addr string, nodeID int64)
}

func defaultRootCoordCreatorFunc(ctx context.Context) (types.RootCoordClient, error) {
return rootcoordclient.NewClient(ctx)
return coordclient.GetRootCoordClient(ctx), nil
}

// QuitSignal returns signal when server quits
Expand Down
2 changes: 2 additions & 0 deletions internal/distributed/datacoord/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus/internal/coordinator/coordclient"
"github.com/milvus-io/milvus/internal/datacoord"
"github.com/milvus-io/milvus/internal/distributed/utils"
"github.com/milvus-io/milvus/internal/proto/datapb"
Expand Down Expand Up @@ -200,6 +201,7 @@ func (s *Server) startGrpcLoop() {
grpc.StatsHandler(tracer.GetDynamicOtelGrpcServerStatsHandler()))
indexpb.RegisterIndexCoordServer(s.grpcServer, s)
datapb.RegisterDataCoordServer(s.grpcServer, s)
coordclient.RegisterDataCoordServer(s)
go funcutil.CheckGrpcReady(ctx, s.grpcErrChan)
if err := s.grpcServer.Serve(s.listener); err != nil {
s.grpcErrChan <- err
Expand Down
32 changes: 4 additions & 28 deletions internal/distributed/querycoord/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,12 @@ import (

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
dcc "github.com/milvus-io/milvus/internal/distributed/datacoord/client"
rcc "github.com/milvus-io/milvus/internal/distributed/rootcoord/client"
"github.com/milvus-io/milvus/internal/coordinator/coordclient"
"github.com/milvus-io/milvus/internal/distributed/utils"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/querypb"
qc "github.com/milvus-io/milvus/internal/querycoordv2"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/componentutil"
"github.com/milvus-io/milvus/internal/util/dependency"
_ "github.com/milvus-io/milvus/internal/util/grpcclient"
"github.com/milvus-io/milvus/pkg/log"
Expand Down Expand Up @@ -169,45 +167,22 @@ func (s *Server) init() error {

// --- Master Server Client ---
if s.rootCoord == nil {
s.rootCoord, err = rcc.NewClient(s.loopCtx)
if err != nil {
log.Error("QueryCoord try to new RootCoord client failed", zap.Error(err))
panic(err)
}
s.rootCoord = coordclient.GetRootCoordClient(s.loopCtx)
}

// wait for master init or healthy
log.Info("QueryCoord try to wait for RootCoord ready")
err = componentutil.WaitForComponentHealthy(s.loopCtx, s.rootCoord, "RootCoord", 1000000, time.Millisecond*200)
if err != nil {
log.Error("QueryCoord wait for RootCoord ready failed", zap.Error(err))
panic(err)
}

if err := s.SetRootCoord(s.rootCoord); err != nil {
panic(err)
}
log.Info("QueryCoord report RootCoord ready")

// --- Data service client ---
if s.dataCoord == nil {
s.dataCoord, err = dcc.NewClient(s.loopCtx)
if err != nil {
log.Error("QueryCoord try to new DataCoord client failed", zap.Error(err))
panic(err)
}
s.dataCoord = coordclient.GetDataCoordClient(s.loopCtx)
}

log.Info("QueryCoord try to wait for DataCoord ready")
err = componentutil.WaitForComponentHealthy(s.loopCtx, s.dataCoord, "DataCoord", 1000000, time.Millisecond*200)
if err != nil {
log.Error("QueryCoord wait for DataCoord ready failed", zap.Error(err))
panic(err)
}
if err := s.SetDataCoord(s.dataCoord); err != nil {
panic(err)
}
log.Info("QueryCoord report DataCoord ready")

if err := s.queryCoord.Init(); err != nil {
return err
Expand Down Expand Up @@ -258,6 +233,7 @@ func (s *Server) startGrpcLoop() {
grpc.StatsHandler(tracer.GetDynamicOtelGrpcServerStatsHandler()),
)
querypb.RegisterQueryCoordServer(s.grpcServer, s)
coordclient.RegisterQueryCoordServer(s)

go funcutil.CheckGrpcReady(ctx, s.grpcErrChan)
if err := s.grpcServer.Serve(s.listener); err != nil {
Expand Down
8 changes: 0 additions & 8 deletions internal/distributed/querycoord/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,16 +59,8 @@ func Test_NewServer(t *testing.T) {
assert.NotNil(t, server)

mdc := mocks.NewMockDataCoordClient(t)
mdc.EXPECT().GetComponentStates(mock.Anything, mock.Anything).Return(&milvuspb.ComponentStates{
State: &milvuspb.ComponentInfo{StateCode: commonpb.StateCode_Healthy},
Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
}, nil)

mrc := mocks.NewMockRootCoordClient(t)
mrc.EXPECT().GetComponentStates(mock.Anything, mock.Anything).Return(&milvuspb.ComponentStates{
State: &milvuspb.ComponentInfo{StateCode: commonpb.StateCode_Healthy},
Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
}, nil)

mqc := getQueryCoord()
successStatus := merr.Success()
Expand Down
Loading

0 comments on commit 6b310e1

Please sign in to comment.