Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix:优化example #70

Merged
merged 6 commits into from
Aug 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func (p *polarisNamingBalancer) createSubConnection(key string, addr resolver.Ad
}
// is a new address (not existing in b.subConns).
sc, err := p.cc.NewSubConn(
[]resolver.Address{addr}, balancer.NewSubConnOptions{HealthCheckEnabled: true})
[]resolver.Address{addr}, balancer.NewSubConnOptions{HealthCheckEnabled: false})
if err != nil {
GetLogger().Error("[Polaris][Balancer] failed to create new SubConn: %v", err)
return
Expand Down Expand Up @@ -245,6 +245,8 @@ func (p *polarisNamingBalancer) ResolverError(err error) {
// report an error.
return
}
p.rwMutex.RLock()
defer p.rwMutex.RUnlock()
p.regeneratePicker(nil)
p.cc.UpdateState(balancer.State{
ConnectivityState: p.state,
Expand Down Expand Up @@ -308,8 +310,6 @@ func (p *polarisNamingBalancer) regeneratePicker(options *dialOptions) {
return
}
readySCs := make(map[string]balancer.SubConn)
p.rwMutex.RLock()
defer p.rwMutex.RUnlock()
// Filter out all ready SCs from full subConn map.
for addr, sc := range p.subConns {
if st, ok := p.scStates[sc]; ok && st == connectivity.Ready {
Expand Down Expand Up @@ -436,6 +436,7 @@ func (pnp *polarisNamingPicker) Pick(info balancer.PickInfo) (balancer.PickResul
subSc, ok := pnp.readySCs[addr]
if ok {
reporter := &resultReporter{
method: info.FullMethodName,
instance: targetInstance,
consumerAPI: pnp.balancer.consumerAPI,
startTime: time.Now(),
Expand Down Expand Up @@ -543,6 +544,7 @@ func collectRouteLabels(routings []*traffic_manage.Route) []string {
}

type resultReporter struct {
method string
instance model.Instance
consumerAPI polaris.ConsumerAPI
startTime time.Time
Expand All @@ -559,6 +561,7 @@ func (r *resultReporter) report(info balancer.DoneInfo) {
callResult.CalledInstance = r.instance
callResult.RetStatus = retStatus
callResult.SourceService = r.sourceService
callResult.SetMethod(r.method)
callResult.SetDelay(time.Since(r.startTime))
callResult.SetRetCode(int32(code))
if err := r.consumerAPI.UpdateServiceCallResult(callResult); err != nil {
Expand Down
4 changes: 2 additions & 2 deletions examples/quickstart/README-zh.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,10 @@ go build -o consumer

#### HTTP调用

执行http调用,其中`${app.port}`替换为consumer的监听端口(默认为16011)。
执行http调用,其中`${app.port}`替换为consumer的监听端口(默认为 18080 )。

```shell
curl -L -X GET 'http://localhost:${app.port}/echo?value=hello_world''
curl -H 'uid: 12313' -L -X GET 'http://localhost:${app.port}/echo?value=hello_world'
```

预期返回值:`echo: hello_world`
4 changes: 2 additions & 2 deletions examples/quickstart/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,9 @@ Login into polaris console, and check the instances in Service `EchoServerGRPC`.

#### Invoke by http call

Invoke http call,replace `${app.port}` to the consumer port (16011 by default).
Invoke http call,replace `${app.port}` to the consumer port (18080 by default).
```shell
curl -L -X GET 'http://localhost:47080/quickstart/feign?msg=hello_world''
curl -L -X GET 'http://localhost:${app.port}/echo?value=hello_world'
```

expect:`echo: hello_world`
45 changes: 34 additions & 11 deletions examples/quickstart/consumer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,24 +22,27 @@ import (
"fmt"
"log"
"net/http"
"os"
"os/signal"
"syscall"

"github.com/polarismesh/polaris-go/api"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/metadata"

polaris "github.com/polarismesh/grpc-go-polaris"
"github.com/polarismesh/grpc-go-polaris/examples/common/pb"
"github.com/polarismesh/polaris-go/api"
)

const (
listenPort = 16011
listenPort = 18080
)

func main() {
// grpc客户端连接获取
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
polaris.GetLogger().SetLevel(polaris.LogDebug)

conn, err := polaris.DialContext(ctx, "polaris://QuickStartEchoServerGRPC",
polaris.WithGRPCDialOptions(grpc.WithTransportCredentials(insecure.NewCredentials())),
Expand All @@ -49,7 +52,6 @@ func main() {
if err != nil {
log.Fatal(err)
}
defer conn.Close()
echoClient := pb.NewEchoServerClient(conn)

indexHandler := func(w http.ResponseWriter, r *http.Request) {
Expand All @@ -68,11 +70,14 @@ func main() {
}

ctx := metadata.NewIncomingContext(context.Background(), metadata.MD{})
ctx = metadata.AppendToOutgoingContext(ctx, "uid", r.Header.Get("uid"))

// 请求时设置本次请求的负载均衡算法
ctx = polaris.RequestScopeLbPolicy(ctx, api.LBPolicyRingHash)
ctx = polaris.RequestScopeLbHashKey(ctx, r.Header.Get("uid"))
if len(r.Header.Get("uid")) != 0 {
ctx = metadata.AppendToOutgoingContext(ctx, "uid", r.Header.Get("uid"))
// 请求时设置本次请求的负载均衡算法
ctx = polaris.RequestScopeLbPolicy(ctx, api.LBPolicyRingHash)
ctx = polaris.RequestScopeLbHashKey(ctx, r.Header.Get("uid"))
}

resp, err := echoClient.Echo(ctx, &pb.EchoRequest{Value: value})
log.Printf("send message, resp (%v), err(%v)", resp, err)
if nil != err {
Expand All @@ -84,8 +89,26 @@ func main() {
_, _ = w.Write([]byte(resp.GetValue()))
}
http.HandleFunc("/echo", indexHandler)
if err := http.ListenAndServe(fmt.Sprintf(":%d", listenPort), nil); nil != err {
log.Fatal(err)
}
go func() {
if err := http.ListenAndServe(fmt.Sprintf(":%d", listenPort), nil); nil != err {
log.Fatal(err)
}
}()
runMainLoop(conn, cancel)
}

func runMainLoop(conn *grpc.ClientConn, cancel context.CancelFunc) {
ch := make(chan os.Signal, 1)
signal.Notify(ch, []os.Signal{
syscall.SIGINT, syscall.SIGTERM,
syscall.SIGSEGV,
}...)

for s := range ch {
log.Printf("catch signal(%+v), stop servers", s)
cancel()
conn.Close()
polaris.ClosePolarisContext()
return
}
}
16 changes: 15 additions & 1 deletion examples/quickstart/consumer/polaris.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,18 @@
global:
serverConnector:
addresses:
- 127.0.0.1:8091
- 127.0.0.1:8091
statReporter:
#描述:是否将统计信息上报至monitor
#类型:bool
enable: true
#描述:启用的统计上报插件类型
#类型:list
#范围:已经注册的统计上报插件的名字
chain:
- prometheus
plugin:
prometheus:
type: push
address: 127.0.0.1:9091
interval: 10s
17 changes: 16 additions & 1 deletion global.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
// DefaultNamespace default namespace when namespace is not set
DefaultNamespace = "default"
// DefaultTTL default ttl value when ttl is not set
DefaultTTL = 20
DefaultTTL = 5
// DefaultGracefulStopMaxWaitDuration default stop max wait duration when not set
DefaultGracefulStopMaxWaitDuration = 30 * time.Second
// DefaultDelayStopWaitDuration default delay time before stop
Expand All @@ -57,16 +57,31 @@
)

var (
ctxRef = 0
polarisContext api.SDKContext
polarisConfig config.Configuration
mutexPolarisContext sync.Mutex
oncePolarisConfig sync.Once
)

func ClosePolarisContext() {

Check failure on line 67 in global.go

View workflow job for this annotation

GitHub Actions / build (1.16)

exported function ClosePolarisContext should have comment or be unexported
mutexPolarisContext.Lock()
defer mutexPolarisContext.Unlock()
if nil == polarisContext {
return
}
ctxRef--
if ctxRef == 0 {
polarisContext.Destroy()
polarisContext = nil
}
}

// PolarisContext get or init the global polaris context
func PolarisContext() (api.SDKContext, error) {
mutexPolarisContext.Lock()
defer mutexPolarisContext.Unlock()
ctxRef++
if nil != polarisContext {
return polarisContext, nil
}
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@ require (
github.com/google/uuid v1.6.0
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/natefinch/lumberjack v2.0.0+incompatible
github.com/polarismesh/polaris-go v1.6.0-beta.5
github.com/polarismesh/polaris-go v1.6.0-alpha.8
github.com/polarismesh/specification v1.5.1
github.com/prometheus/client_golang v1.19.1 // indirect
github.com/prometheus/common v0.54.0 // indirect
github.com/prometheus/procfs v0.15.1 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.27.0 // indirect
go.uber.org/zap v1.27.0
golang.org/x/net v0.26.0
google.golang.org/genproto/googleapis/rpc v0.0.0-20240617180043-68d350f18fd4 // indirect
google.golang.org/grpc v1.64.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1695,8 +1695,8 @@ github.com/pkg/sftp v1.10.1/go.mod h1:lYOWFsE0bwd1+KfKJaKeuokY15vzFx25BLbzYYoAxZ
github.com/pkg/sftp v1.13.1/go.mod h1:3HaPG6Dq1ILlpPZRO0HVMrsydcdLt6HRDccSgb87qRg=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/polarismesh/polaris-go v1.6.0-beta.5 h1:llucvfydWlFWTNeABHbbuVL2ijR7AITx8UG02tx0c/Y=
github.com/polarismesh/polaris-go v1.6.0-beta.5/go.mod h1:CuXO9bhHGjSoOIMWr4NXf3bJAkRBp5YoM7ibBzENC+c=
github.com/polarismesh/polaris-go v1.6.0-alpha.8 h1:KzANbn7gumZLfbJEA1KavDiFBqlDKxeMVS3eTxZXFR0=
github.com/polarismesh/polaris-go v1.6.0-alpha.8/go.mod h1:CuXO9bhHGjSoOIMWr4NXf3bJAkRBp5YoM7ibBzENC+c=
github.com/polarismesh/specification v1.4.1/go.mod h1:rDvMMtl5qebPmqiBLNa5Ps0XtwkP31ZLirbH4kXA0YU=
github.com/polarismesh/specification v1.5.1 h1:cJ2m0RBepdopGo/e3UpKdsab3NpDZnw5IsVTB1sFc5I=
github.com/polarismesh/specification v1.5.1/go.mod h1:rDvMMtl5qebPmqiBLNa5Ps0XtwkP31ZLirbH4kXA0YU=
Expand Down
75 changes: 63 additions & 12 deletions logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,33 +19,51 @@

import (
"fmt"
"log"
"runtime"
"sync/atomic"
"time"

"github.com/natefinch/lumberjack"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)

type LogLevel int

Check failure on line 31 in logger.go

View workflow job for this annotation

GitHub Actions / build (1.16)

exported type LogLevel should have comment or be unexported

const (
_ LogLevel = iota
LogDebug

Check failure on line 35 in logger.go

View workflow job for this annotation

GitHub Actions / build (1.16)

exported const LogDebug should have comment (or a comment on this block) or be unexported
LogInfo
LogWarn
LogError
)

func (l LogLevel) String() string {
switch l {
case LogDebug:
return "[debug]"
case LogInfo:
return "[info]"
case LogWarn:
return "[warn]"
case LogError:
return "[error]"
default:
return ""
}
}

var _log Logger = newDefaultLogger()

func SetLogger(logger Logger) {

Check failure on line 58 in logger.go

View workflow job for this annotation

GitHub Actions / build (1.16)

exported function SetLogger should have comment or be unexported
_log = logger
}

func GetLogger() Logger {

Check failure on line 62 in logger.go

View workflow job for this annotation

GitHub Actions / build (1.16)

exported function GetLogger should have comment or be unexported
return _log
}

type Logger interface {

Check failure on line 66 in logger.go

View workflow job for this annotation

GitHub Actions / build (1.16)

exported type Logger should have comment or be unexported
SetLevel(LogLevel)
Debug(format string, args ...interface{})
Info(format string, args ...interface{})
Expand All @@ -54,25 +72,35 @@
}

type defaultLogger struct {
writer *log.Logger
writer zapcore.Core
levelRef atomic.Value
}

func newDefaultLogger() *defaultLogger {
lumberJackLogger := &lumberjack.Logger{
encoderCfg := zapcore.EncoderConfig{
MessageKey: "msg",
LevelKey: "level",
TimeKey: "time",
NameKey: "name",
CallerKey: "caller",
StacktraceKey: "stacktrace",
LineEnding: zapcore.DefaultLineEnding,
EncodeLevel: zapcore.LowercaseLevelEncoder,
EncodeTime: zapcore.ISO8601TimeEncoder,
EncodeDuration: zapcore.StringDurationEncoder,
EncodeCaller: zapcore.ShortCallerEncoder,
}
w := zapcore.AddSync(&lumberjack.Logger{
Filename: "./logs/grpc-go-polaris.log", // 文件位置
MaxSize: 100, // 进行切割之前,日志文件的最大大小(MB为单位)
MaxAge: 7, // 保留旧文件的最大天数
MaxBackups: 100, // 保留旧文件的最大个数
Compress: true, // 是否压缩/归档旧文件
}
})

levelRef := atomic.Value{}

levelRef.Store(LogInfo)
core := zapcore.NewCore(zapcore.NewConsoleEncoder(encoderCfg), w, zap.InfoLevel)
return &defaultLogger{
writer: log.New(lumberJackLogger, "", log.Lshortfile|log.Ldate|log.Ltime),
levelRef: levelRef,
writer: core,
}
}

Expand All @@ -98,9 +126,32 @@
}

func (l *defaultLogger) printf(expectLevel LogLevel, format string, args ...interface{}) {
curLevel := l.levelRef.Load().(LogLevel)
if curLevel > expectLevel {
zapL := func() zapcore.Level {
switch expectLevel {
case LogDebug:
return zapcore.DebugLevel
case LogInfo:
return zapcore.InfoLevel
case LogWarn:
return zapcore.WarnLevel
case LogError:
return zapcore.ErrorLevel
default:
return zapcore.InfoLevel
}
}()

if !l.writer.Enabled(zapL) {
return
}
_ = l.writer.Output(3, fmt.Sprintf(format, args...))

msg := fmt.Sprintf(format, args...)
e := zapcore.Entry{
Message: msg,
Level: zapL,
Time: time.Now(),
}

e.Caller = zapcore.NewEntryCaller(runtime.Caller(2))
_ = l.writer.Write(e, nil)
}
Loading
Loading