diff --git a/balancer.go b/balancer.go index 3d917d9..ea9bd47 100644 --- a/balancer.go +++ b/balancer.go @@ -40,6 +40,7 @@ type balancerBuilder struct { // Build 创建一个Balancer func (bb *balancerBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer { + grpclog.Infof("[POLARIS] start to build polaris balancer\n") return &polarisNamingBalancer{ cc: cc, target: opts.Target, diff --git a/examples/quickstart/consumer/main.go b/examples/quickstart/consumer/main.go index 7e3d9f6..3db1be8 100644 --- a/examples/quickstart/consumer/main.go +++ b/examples/quickstart/consumer/main.go @@ -25,9 +25,9 @@ import ( "google.golang.org/grpc" + _ "github.com/polarismesh/grpc-go-polaris" "github.com/polarismesh/grpc-go-polaris/examples/quickstart/pb" - polaris "github.com/polarismesh/grpc-go-polaris" ) const ( @@ -38,8 +38,7 @@ func main() { // grpc客户端连接获取 ctx, cancel := context.WithCancel(context.Background()) defer cancel() - conn, err := grpc.DialContext(ctx, "polaris://EchoServerGRPC/", grpc.WithInsecure(), - grpc.WithDefaultServiceConfig(polaris.LoadBalanceConfig)) + conn, err := grpc.DialContext(ctx, "polaris://EchoServerGRPC/", grpc.WithInsecure()) if err != nil { log.Fatal(err) } diff --git a/examples/quickstart/provider/polaris.yaml b/examples/quickstart/provider/polaris.yaml index 3f0b683..d926e60 100644 --- a/examples/quickstart/provider/polaris.yaml +++ b/examples/quickstart/provider/polaris.yaml @@ -1,4 +1,4 @@ global: serverConnector: addresses: - - 127.0.0.1:8091 \ No newline at end of file + - 9.134.15.118:8091 \ No newline at end of file diff --git a/resolver.go b/resolver.go index d8c087a..c70d362 100644 --- a/resolver.go +++ b/resolver.go @@ -24,10 +24,16 @@ import ( "fmt" "sync" - "github.com/polarismesh/polaris-go/api" + "google.golang.org/grpc" + "google.golang.org/grpc/serviceconfig" + + "google.golang.org/grpc/grpclog" + + "github.com/golang/protobuf/proto" "github.com/polarismesh/polaris-go/pkg/model" + + "github.com/polarismesh/polaris-go/api" "google.golang.org/grpc/attributes" - "google.golang.org/grpc/grpclog" "google.golang.org/grpc/resolver" ) @@ -95,10 +101,11 @@ type polarisNamingResolver struct { cancel context.CancelFunc cc resolver.ClientConn // rn channel is used by ResolveNow() to force an immediate resolution of the target. - rn chan struct{} - wg sync.WaitGroup - options *dialOptions - target resolver.Target + rn chan struct{} + wg sync.WaitGroup + options *dialOptions + target resolver.Target + balanceOnce sync.Once } // ResolveNow The method is called by the gRPC framework to resolve the target name @@ -181,7 +188,17 @@ func (pr *polarisNamingResolver) watcher() { if err != nil { pr.cc.ReportError(err) } else { - pr.cc.UpdateState(*state) + pr.balanceOnce.Do(func() { + state.ServiceConfig = &serviceconfig.ParseResult{ + Config: &grpc.ServiceConfig{ + LB: proto.String(scheme), + }, + } + }) + err = pr.cc.UpdateState(*state) + if nil != err { + grpclog.Errorf("fail to do update service %s: %v", pr.target.URL.Host, err) + } var svcKey model.ServiceKey svcKey, eventChan, err = pr.doWatch(consumerAPI) if nil != err { diff --git a/server.go b/server.go index 0f73735..fbee631 100644 --- a/server.go +++ b/server.go @@ -46,6 +46,7 @@ type serverOptions struct { metadata map[string]string host string port int + token string } func (s *serverOptions) setDefault() { @@ -92,6 +93,13 @@ func WithGRPCServerOptions(opts ...grpc.ServerOption) ServerOption { }) } +// WithToken set the token to do server operations +func WithToken(token string) ServerOption { + return newFuncServerOption(func(options *serverOptions) { + options.token = token + }) +} + // WithServerNamespace set the namespace to register instance func WithServerNamespace(namespace string) ServerOption { return newFuncServerOption(func(options *serverOptions) { @@ -166,6 +174,7 @@ func deregisterServices(registerContext *RegisterContext) { deregisterRequest.Service = registerRequest.Service deregisterRequest.Host = registerRequest.Host deregisterRequest.Port = registerRequest.Port + deregisterRequest.ServiceToken = registerRequest.ServiceToken err := registerContext.providerAPI.Deregister(deregisterRequest) if nil != err { grpclog.Errorf("[Polaris]fail to deregister %s:%d to service %s(%s)", @@ -229,6 +238,7 @@ func (s *Server) startHeartbeat(ctx context.Context, hbRequest.Service = registerRequest.Service hbRequest.Host = registerRequest.Host hbRequest.Port = registerRequest.Port + hbRequest.ServiceToken = registerRequest.ServiceToken err := providerAPI.Heartbeat(hbRequest) if nil != err { grpclog.Errorf("[Polaris]fail to heartbeat %s:%d to service %s(%s): %v", @@ -291,6 +301,7 @@ func Register(gSrv *grpc.Server, lis net.Listener, opts ...ServerOption) (*Serve registerRequest.SetTTL(srv.serverOptions.ttl) registerRequest.Protocol = proto.String(lis.Addr().Network()) registerRequest.Metadata = srv.serverOptions.metadata + registerRequest.ServiceToken = srv.serverOptions.token registerContext.registerRequests = append(registerContext.registerRequests, registerRequest) resp, err := registerContext.providerAPI.Register(registerRequest) if nil != err {