Skip to content

Commit

Permalink
Merge pull request #27 from andrewshan/resolve_branch
Browse files Browse the repository at this point in the history
fix: serviceconfig auto set, support token
  • Loading branch information
andrewshan authored Mar 6, 2022
2 parents de48ea3 + b14e011 commit 32a215b
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 11 deletions.
1 change: 1 addition & 0 deletions balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type balancerBuilder struct {

// Build 创建一个Balancer
func (bb *balancerBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
grpclog.Infof("[POLARIS] start to build polaris balancer\n")
return &polarisNamingBalancer{
cc: cc,
target: opts.Target,
Expand Down
5 changes: 2 additions & 3 deletions examples/quickstart/consumer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ import (

"google.golang.org/grpc"

_ "github.com/polarismesh/grpc-go-polaris"
"github.com/polarismesh/grpc-go-polaris/examples/quickstart/pb"

polaris "github.com/polarismesh/grpc-go-polaris"
)

const (
Expand All @@ -38,8 +38,7 @@ func main() {
// grpc客户端连接获取
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
conn, err := grpc.DialContext(ctx, "polaris://EchoServerGRPC/", grpc.WithInsecure(),
grpc.WithDefaultServiceConfig(polaris.LoadBalanceConfig))
conn, err := grpc.DialContext(ctx, "polaris://EchoServerGRPC/", grpc.WithInsecure())
if err != nil {
log.Fatal(err)
}
Expand Down
2 changes: 1 addition & 1 deletion examples/quickstart/provider/polaris.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
global:
serverConnector:
addresses:
- 127.0.0.1:8091
- 9.134.15.118:8091
31 changes: 24 additions & 7 deletions resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,16 @@ import (
"fmt"
"sync"

"github.com/polarismesh/polaris-go/api"
"google.golang.org/grpc"
"google.golang.org/grpc/serviceconfig"

"google.golang.org/grpc/grpclog"

"github.com/golang/protobuf/proto"
"github.com/polarismesh/polaris-go/pkg/model"

"github.com/polarismesh/polaris-go/api"
"google.golang.org/grpc/attributes"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/resolver"
)

Expand Down Expand Up @@ -95,10 +101,11 @@ type polarisNamingResolver struct {
cancel context.CancelFunc
cc resolver.ClientConn
// rn channel is used by ResolveNow() to force an immediate resolution of the target.
rn chan struct{}
wg sync.WaitGroup
options *dialOptions
target resolver.Target
rn chan struct{}
wg sync.WaitGroup
options *dialOptions
target resolver.Target
balanceOnce sync.Once
}

// ResolveNow The method is called by the gRPC framework to resolve the target name
Expand Down Expand Up @@ -181,7 +188,17 @@ func (pr *polarisNamingResolver) watcher() {
if err != nil {
pr.cc.ReportError(err)
} else {
pr.cc.UpdateState(*state)
pr.balanceOnce.Do(func() {
state.ServiceConfig = &serviceconfig.ParseResult{
Config: &grpc.ServiceConfig{
LB: proto.String(scheme),
},
}
})
err = pr.cc.UpdateState(*state)
if nil != err {
grpclog.Errorf("fail to do update service %s: %v", pr.target.URL.Host, err)
}
var svcKey model.ServiceKey
svcKey, eventChan, err = pr.doWatch(consumerAPI)
if nil != err {
Expand Down
11 changes: 11 additions & 0 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type serverOptions struct {
metadata map[string]string
host string
port int
token string
}

func (s *serverOptions) setDefault() {
Expand Down Expand Up @@ -92,6 +93,13 @@ func WithGRPCServerOptions(opts ...grpc.ServerOption) ServerOption {
})
}

// WithToken set the token to do server operations
func WithToken(token string) ServerOption {
return newFuncServerOption(func(options *serverOptions) {
options.token = token
})
}

// WithServerNamespace set the namespace to register instance
func WithServerNamespace(namespace string) ServerOption {
return newFuncServerOption(func(options *serverOptions) {
Expand Down Expand Up @@ -166,6 +174,7 @@ func deregisterServices(registerContext *RegisterContext) {
deregisterRequest.Service = registerRequest.Service
deregisterRequest.Host = registerRequest.Host
deregisterRequest.Port = registerRequest.Port
deregisterRequest.ServiceToken = registerRequest.ServiceToken
err := registerContext.providerAPI.Deregister(deregisterRequest)
if nil != err {
grpclog.Errorf("[Polaris]fail to deregister %s:%d to service %s(%s)",
Expand Down Expand Up @@ -229,6 +238,7 @@ func (s *Server) startHeartbeat(ctx context.Context,
hbRequest.Service = registerRequest.Service
hbRequest.Host = registerRequest.Host
hbRequest.Port = registerRequest.Port
hbRequest.ServiceToken = registerRequest.ServiceToken
err := providerAPI.Heartbeat(hbRequest)
if nil != err {
grpclog.Errorf("[Polaris]fail to heartbeat %s:%d to service %s(%s): %v",
Expand Down Expand Up @@ -291,6 +301,7 @@ func Register(gSrv *grpc.Server, lis net.Listener, opts ...ServerOption) (*Serve
registerRequest.SetTTL(srv.serverOptions.ttl)
registerRequest.Protocol = proto.String(lis.Addr().Network())
registerRequest.Metadata = srv.serverOptions.metadata
registerRequest.ServiceToken = srv.serverOptions.token
registerContext.registerRequests = append(registerContext.registerRequests, registerRequest)
resp, err := registerContext.providerAPI.Register(registerRequest)
if nil != err {
Expand Down

0 comments on commit 32a215b

Please sign in to comment.