From 7bf11fbca2d965e1d6e1403262221d03c776d979 Mon Sep 17 00:00:00 2001 From: chuntaojun Date: Thu, 26 May 2022 10:22:17 +0800 Subject: [PATCH 1/8] =?UTF-8?q?feat:=20=E6=B7=BB=E5=8A=A0=E5=B0=B1?= =?UTF-8?q?=E8=BF=91=E8=B7=AF=E7=94=B1=E6=94=AF=E6=8C=81=E6=96=87=E6=A1=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- examples/quickstart/provider/main.go | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/examples/quickstart/provider/main.go b/examples/quickstart/provider/main.go index b611f08a..3a09fa84 100644 --- a/examples/quickstart/provider/main.go +++ b/examples/quickstart/provider/main.go @@ -122,6 +122,23 @@ func (svr *PolarisProvider) deregisterService() { log.Fatalf("fail to deregister instance, err is %v", err) } log.Printf("deregister successfully.") + go svr.doHeartbeat() +} + +func (svr *PolarisProvider) doHeartbeat() { + log.Printf("start to invoke heartbeat operation") + ticker := time.NewTicker(time.Duration(5 * time.Second)) + for range ticker.C { + if !svr.isShutdown { + heartbeatRequest := &polaris.InstanceHeartbeatRequest{} + heartbeatRequest.Namespace = namespace + heartbeatRequest.Service = service + heartbeatRequest.Host = svr.host + heartbeatRequest.Port = svr.port + heartbeatRequest.ServiceToken = token + svr.provider.Heartbeat(heartbeatRequest) + } + } } func (svr *PolarisProvider) runMainLoop() { From f661b3ce51b4df5e3f0491453dd7f8408903fe2a Mon Sep 17 00:00:00 2001 From: chuntaojun Date: Fri, 12 Aug 2022 12:09:32 +0800 Subject: [PATCH 2/8] rebase upstream/master --- examples/quickstart/provider/main.go | 1 - 1 file changed, 1 deletion(-) diff --git a/examples/quickstart/provider/main.go b/examples/quickstart/provider/main.go index 3a09fa84..ec0fde2c 100644 --- a/examples/quickstart/provider/main.go +++ b/examples/quickstart/provider/main.go @@ -122,7 +122,6 @@ func (svr *PolarisProvider) deregisterService() { log.Fatalf("fail to deregister instance, err is %v", err) } log.Printf("deregister successfully.") - go svr.doHeartbeat() } func (svr *PolarisProvider) doHeartbeat() { From dbe2280468c4b751bcf743cdc9b7dbd7dc95d8b2 Mon Sep 17 00:00:00 2001 From: chuntaojun Date: Tue, 21 Mar 2023 11:50:16 +0800 Subject: [PATCH 3/8] =?UTF-8?q?fix:=E4=BF=AE=E5=A4=8Ddemo=E4=B8=AD?= =?UTF-8?q?=E7=BC=BA=E5=A4=B1import=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- examples/quickstart/provider/main.go | 1 + 1 file changed, 1 insertion(+) diff --git a/examples/quickstart/provider/main.go b/examples/quickstart/provider/main.go index ec0fde2c..42e2efc1 100644 --- a/examples/quickstart/provider/main.go +++ b/examples/quickstart/provider/main.go @@ -27,6 +27,7 @@ import ( "os/signal" "strings" "syscall" + "time" "github.com/polarismesh/polaris-go" ) From a6fd205008d43ca4795e43693ebe2e21706f9c14 Mon Sep 17 00:00:00 2001 From: chuntaojun Date: Tue, 21 Mar 2023 11:50:59 +0800 Subject: [PATCH 4/8] =?UTF-8?q?fix:=E4=BF=AE=E5=A4=8Ddemo=E4=B8=AD?= =?UTF-8?q?=E7=BC=BA=E5=A4=B1import=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- examples/quickstart/provider/main.go | 17 ----------------- 1 file changed, 17 deletions(-) diff --git a/examples/quickstart/provider/main.go b/examples/quickstart/provider/main.go index 42e2efc1..b611f08a 100644 --- a/examples/quickstart/provider/main.go +++ b/examples/quickstart/provider/main.go @@ -27,7 +27,6 @@ import ( "os/signal" "strings" "syscall" - "time" "github.com/polarismesh/polaris-go" ) @@ -125,22 +124,6 @@ func (svr *PolarisProvider) deregisterService() { log.Printf("deregister successfully.") } -func (svr *PolarisProvider) doHeartbeat() { - log.Printf("start to invoke heartbeat operation") - ticker := time.NewTicker(time.Duration(5 * time.Second)) - for range ticker.C { - if !svr.isShutdown { - heartbeatRequest := &polaris.InstanceHeartbeatRequest{} - heartbeatRequest.Namespace = namespace - heartbeatRequest.Service = service - heartbeatRequest.Host = svr.host - heartbeatRequest.Port = svr.port - heartbeatRequest.ServiceToken = token - svr.provider.Heartbeat(heartbeatRequest) - } - } -} - func (svr *PolarisProvider) runMainLoop() { ch := make(chan os.Signal, 1) signal.Notify(ch, []os.Signal{ From b7e0947c111d13b6d16f4651c0e1e2669ef0ea24 Mon Sep 17 00:00:00 2001 From: chuntaojun Date: Wed, 27 Dec 2023 12:03:21 +0800 Subject: [PATCH 5/8] =?UTF-8?q?fix:=E4=BF=AE=E5=A4=8DLoadBalance=E6=97=A0?= =?UTF-8?q?=E6=B3=95=E9=80=89=E6=8B=A9=E5=A4=84=E4=BA=8E=E5=8D=8A=E5=BC=80?= =?UTF-8?q?=E7=9A=84=E5=AE=9E=E4=BE=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- examples/quickstart/consumer/Dockerfile | 19 +++++++ examples/quickstart/consumer/go.mod | 2 +- examples/quickstart/consumer/go.sum | 9 +--- examples/quickstart/consumer/polaris.yaml | 2 +- .../quickstart/k8s/configmap-consumer.yaml | 11 ++++ .../quickstart/k8s/configmap-provider.yaml | 11 ++++ .../quickstart/k8s/deployment-consumer.yaml | 35 ++++++++++++ .../quickstart/k8s/deployment-provider.yaml | 54 +++++++++++++++++++ examples/quickstart/provider/Dockerfile | 20 +++++++ examples/quickstart/provider/go.mod | 2 +- examples/quickstart/provider/go.sum | 9 +--- examples/quickstart/provider/isolate.sh | 5 ++ examples/quickstart/provider/main.go | 9 ++-- examples/quickstart/provider/polaris.yaml | 2 +- pkg/plugin/loadbalancer/proxy.go | 3 +- 15 files changed, 169 insertions(+), 24 deletions(-) create mode 100644 examples/quickstart/consumer/Dockerfile create mode 100644 examples/quickstart/k8s/configmap-consumer.yaml create mode 100644 examples/quickstart/k8s/configmap-provider.yaml create mode 100644 examples/quickstart/k8s/deployment-consumer.yaml create mode 100644 examples/quickstart/k8s/deployment-provider.yaml create mode 100644 examples/quickstart/provider/Dockerfile create mode 100644 examples/quickstart/provider/isolate.sh diff --git a/examples/quickstart/consumer/Dockerfile b/examples/quickstart/consumer/Dockerfile new file mode 100644 index 00000000..f2693367 --- /dev/null +++ b/examples/quickstart/consumer/Dockerfile @@ -0,0 +1,19 @@ +FROM alpine:3.13.6 + +RUN sed -i 's!http://dl-cdn.alpinelinux.org/!https://mirrors.tencent.com/!g' /etc/apk/repositories + +RUN set -eux && \ + apk add tcpdump && \ + apk add tzdata && \ + apk add busybox-extras && \ + apk add curl && \ + apk add bash && \ + cp /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && \ + echo "Asia/Shanghai" > /etc/timezone && \ + date + +COPY consumer /root/consumer + +WORKDIR /root + +CMD ["/root/consumer"] \ No newline at end of file diff --git a/examples/quickstart/consumer/go.mod b/examples/quickstart/consumer/go.mod index 86f3aac6..91f78fac 100644 --- a/examples/quickstart/consumer/go.mod +++ b/examples/quickstart/consumer/go.mod @@ -17,7 +17,7 @@ require ( github.com/modern-go/reflect2 v1.0.2 // indirect github.com/natefinch/lumberjack v2.0.0+incompatible // indirect github.com/pkg/errors v0.9.1 // indirect - github.com/polarismesh/specification v1.3.2-alpha.2 // indirect + github.com/polarismesh/specification v1.4.1 // indirect github.com/prometheus/client_golang v1.12.2 // indirect github.com/prometheus/client_model v0.2.0 // indirect github.com/prometheus/common v0.32.1 // indirect diff --git a/examples/quickstart/consumer/go.sum b/examples/quickstart/consumer/go.sum index 093a5191..d93306f5 100644 --- a/examples/quickstart/consumer/go.sum +++ b/examples/quickstart/consumer/go.sum @@ -171,7 +171,6 @@ github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= -github.com/agiledragon/gomonkey v2.0.2+incompatible h1:eXKi9/piiC3cjJD1658mEE2o3NjkJ5vDLgYjCQu0Xlw= github.com/agiledragon/gomonkey v2.0.2+incompatible/go.mod h1:2NGfXu1a80LLr2cmWXGBDaHEjb1idR6+FVlX5T3D9hw= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= @@ -321,7 +320,6 @@ github.com/googleapis/gax-go/v2 v2.3.0/go.mod h1:b8LNqSzNabLiUpXKkY7HAR5jr6bIT99 github.com/googleapis/gax-go/v2 v2.4.0/go.mod h1:XOTVJ59hdnfJLIP/dh8n5CGryZR2LxK9wbMD5+iXC6c= github.com/googleapis/gax-go/v2 v2.5.1/go.mod h1:h6B0KMMFNtI2ddbGJn3T3ZbwkeT6yqEF02fYlzkUCyo= github.com/googleapis/go-type-adapters v1.0.0/go.mod h1:zHW75FOG2aur7gAO2B+MLby+cLsWGBF62rFAi7WjWO4= -github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGaHF6qqu48+N2wcFQ5qg5FXgOdqsJ5d8= github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw= github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= @@ -340,7 +338,6 @@ github.com/json-iterator/go v1.1.11/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/ github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU= github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk= -github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo= github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM= @@ -375,8 +372,8 @@ github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/polarismesh/specification v1.3.2-alpha.2 h1:cMghyvCnRVM5ca2kYCGHOgIIxVnokiMvw0720q8a8RA= -github.com/polarismesh/specification v1.3.2-alpha.2/go.mod h1:rDvMMtl5qebPmqiBLNa5Ps0XtwkP31ZLirbH4kXA0YU= +github.com/polarismesh/specification v1.4.1 h1:lTZqeyUhhWuKyr6NDKBwmUrNfcUDvKLxWT/uOq71T5A= +github.com/polarismesh/specification v1.4.1/go.mod h1:rDvMMtl5qebPmqiBLNa5Ps0XtwkP31ZLirbH4kXA0YU= github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw= github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo= github.com/prometheus/client_golang v1.7.1/go.mod h1:PY5Wy2awLA44sXw4AOSfFBetzPP4j5+D6mVACh+pe2M= @@ -404,9 +401,7 @@ github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFR github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88= -github.com/smartystreets/assertions v1.2.0 h1:42S6lae5dvLc7BrLu/0ugRtcFVjoJNMC/N3yZFZkDFs= github.com/smartystreets/assertions v1.2.0/go.mod h1:tcbTF8ujkAEcZ8TElKY+i30BzYlVhC/LOxJk7iOWnoo= -github.com/smartystreets/goconvey v1.7.2 h1:9RBaZCeXEQ3UselpuwUQHltGVXvdwm6cv1hgR6gDIPg= github.com/smartystreets/goconvey v1.7.2/go.mod h1:Vw0tHAZW6lzCRk3xgdin6fKYcG+G3Pg9vgXWeJpQFMM= github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI= diff --git a/examples/quickstart/consumer/polaris.yaml b/examples/quickstart/consumer/polaris.yaml index c6ac55cf..07e08516 100644 --- a/examples/quickstart/consumer/polaris.yaml +++ b/examples/quickstart/consumer/polaris.yaml @@ -1,7 +1,7 @@ global: serverConnector: addresses: - - 119.91.66.223:8091 + - 127.0.0.1:8091 statReporter: enable: true chain: diff --git a/examples/quickstart/k8s/configmap-consumer.yaml b/examples/quickstart/k8s/configmap-consumer.yaml new file mode 100644 index 00000000..c36e0b3e --- /dev/null +++ b/examples/quickstart/k8s/configmap-consumer.yaml @@ -0,0 +1,11 @@ +apiVersion: v1 +data: + polaris.yaml: |- + global: + serverConnector: + addresses: + - { PolarisServerIP:GrpcPort } +kind: ConfigMap +metadata: + name: polaris-consumer-config + namespace: examples-go diff --git a/examples/quickstart/k8s/configmap-provider.yaml b/examples/quickstart/k8s/configmap-provider.yaml new file mode 100644 index 00000000..b8a480c1 --- /dev/null +++ b/examples/quickstart/k8s/configmap-provider.yaml @@ -0,0 +1,11 @@ +apiVersion: v1 +data: + polaris.yaml: |- + global: + serverConnector: + addresses: + - { PolarisServerIP:GrpcPort } +kind: ConfigMap +metadata: + name: polaris-provider-config + namespace: examples-go diff --git a/examples/quickstart/k8s/deployment-consumer.yaml b/examples/quickstart/k8s/deployment-consumer.yaml new file mode 100644 index 00000000..9c1fcff0 --- /dev/null +++ b/examples/quickstart/k8s/deployment-consumer.yaml @@ -0,0 +1,35 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: consumer + namespace: examples-go +spec: + replicas: 1 + selector: + matchLabels: + app: consumer + template: + metadata: + labels: + app: consumer + spec: + containers: + - image: polarismesh/examples-go-consumer:latest + imagePullPolicy: Always + name: polaris-consumer + resources: + limits: + cpu: "500m" + memory: 1000Mi + terminationMessagePath: /dev/termination-log + terminationMessagePolicy: File + volumeMounts: + - mountPath: /root/polaris.yaml + name: polaris-consumer-config + subPath: polaris.yaml + restartPolicy: Always + volumes: + - configMap: + defaultMode: 420 + name: polaris-consumer-config + name: polaris-consumer-config diff --git a/examples/quickstart/k8s/deployment-provider.yaml b/examples/quickstart/k8s/deployment-provider.yaml new file mode 100644 index 00000000..6d9dbfb7 --- /dev/null +++ b/examples/quickstart/k8s/deployment-provider.yaml @@ -0,0 +1,54 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: provider + namespace: examples-go +spec: + replicas: 3 + selector: + matchLabels: + app: provider + template: + metadata: + labels: + app: provider + spec: + containers: + - image: polarismesh/examples-go-provider:latest + imagePullPolicy: Always + name: polaris-provider + env: + - name: INSTANCE_IP + valueFrom: + fieldRef: + fieldPath: status.podIP + - name: INSTANCE_PORT + value: "28080" + - name: "INSTANCE_SERVICE" + value: "DiscoverEchoServer" + - name: "INSTANCE_NAMESPACE" + value: "default" + - name: "POLARIS_OPEN_API" + value: "" + - name: "POLARIS_TOKEN" + value: "" + resources: + limits: + cpu: "500m" + memory: 1000Mi + lifecycle: + preStop: + exec: + command: ["/bin/bash", "-c", "cd /root; bash ./isolate.sh; sleep 30; exit 0;"] + terminationMessagePath: /dev/termination-log + terminationMessagePolicy: File + volumeMounts: + - mountPath: /root/polaris.yaml + name: polaris-provider-config + subPath: polaris.yaml + restartPolicy: Always + volumes: + - configMap: + defaultMode: 420 + name: polaris-provider-config + name: polaris-provider-config diff --git a/examples/quickstart/provider/Dockerfile b/examples/quickstart/provider/Dockerfile new file mode 100644 index 00000000..4ff74488 --- /dev/null +++ b/examples/quickstart/provider/Dockerfile @@ -0,0 +1,20 @@ +FROM alpine:3.13.6 + +RUN sed -i 's!http://dl-cdn.alpinelinux.org/!https://mirrors.tencent.com/!g' /etc/apk/repositories + +RUN set -eux && \ + apk add tcpdump && \ + apk add tzdata && \ + apk add busybox-extras && \ + apk add curl && \ + apk add bash && \ + cp /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && \ + echo "Asia/Shanghai" > /etc/timezone && \ + date + +COPY provider /root/provider +COPY isolate.sh /root/isolate.sh + +WORKDIR /root + +CMD ["/root/provider", "-port", "28080"] \ No newline at end of file diff --git a/examples/quickstart/provider/go.mod b/examples/quickstart/provider/go.mod index dc988a11..4c23a466 100644 --- a/examples/quickstart/provider/go.mod +++ b/examples/quickstart/provider/go.mod @@ -17,7 +17,7 @@ require ( github.com/modern-go/reflect2 v1.0.2 // indirect github.com/natefinch/lumberjack v2.0.0+incompatible // indirect github.com/pkg/errors v0.9.1 // indirect - github.com/polarismesh/specification v1.4.0 // indirect + github.com/polarismesh/specification v1.4.1 // indirect github.com/prometheus/client_golang v1.12.2 // indirect github.com/prometheus/client_model v0.2.0 // indirect github.com/prometheus/common v0.32.1 // indirect diff --git a/examples/quickstart/provider/go.sum b/examples/quickstart/provider/go.sum index 41c830a6..d93306f5 100644 --- a/examples/quickstart/provider/go.sum +++ b/examples/quickstart/provider/go.sum @@ -171,7 +171,6 @@ github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= -github.com/agiledragon/gomonkey v2.0.2+incompatible h1:eXKi9/piiC3cjJD1658mEE2o3NjkJ5vDLgYjCQu0Xlw= github.com/agiledragon/gomonkey v2.0.2+incompatible/go.mod h1:2NGfXu1a80LLr2cmWXGBDaHEjb1idR6+FVlX5T3D9hw= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= @@ -321,7 +320,6 @@ github.com/googleapis/gax-go/v2 v2.3.0/go.mod h1:b8LNqSzNabLiUpXKkY7HAR5jr6bIT99 github.com/googleapis/gax-go/v2 v2.4.0/go.mod h1:XOTVJ59hdnfJLIP/dh8n5CGryZR2LxK9wbMD5+iXC6c= github.com/googleapis/gax-go/v2 v2.5.1/go.mod h1:h6B0KMMFNtI2ddbGJn3T3ZbwkeT6yqEF02fYlzkUCyo= github.com/googleapis/go-type-adapters v1.0.0/go.mod h1:zHW75FOG2aur7gAO2B+MLby+cLsWGBF62rFAi7WjWO4= -github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGaHF6qqu48+N2wcFQ5qg5FXgOdqsJ5d8= github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw= github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= @@ -340,7 +338,6 @@ github.com/json-iterator/go v1.1.11/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/ github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU= github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk= -github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo= github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM= @@ -375,8 +372,8 @@ github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/polarismesh/specification v1.4.0 h1:fm7sUtFZC2g9+lLmRCtjGrUow47CY5JDFoZXwwCQGGY= -github.com/polarismesh/specification v1.4.0/go.mod h1:rDvMMtl5qebPmqiBLNa5Ps0XtwkP31ZLirbH4kXA0YU= +github.com/polarismesh/specification v1.4.1 h1:lTZqeyUhhWuKyr6NDKBwmUrNfcUDvKLxWT/uOq71T5A= +github.com/polarismesh/specification v1.4.1/go.mod h1:rDvMMtl5qebPmqiBLNa5Ps0XtwkP31ZLirbH4kXA0YU= github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw= github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo= github.com/prometheus/client_golang v1.7.1/go.mod h1:PY5Wy2awLA44sXw4AOSfFBetzPP4j5+D6mVACh+pe2M= @@ -404,9 +401,7 @@ github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFR github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88= -github.com/smartystreets/assertions v1.2.0 h1:42S6lae5dvLc7BrLu/0ugRtcFVjoJNMC/N3yZFZkDFs= github.com/smartystreets/assertions v1.2.0/go.mod h1:tcbTF8ujkAEcZ8TElKY+i30BzYlVhC/LOxJk7iOWnoo= -github.com/smartystreets/goconvey v1.7.2 h1:9RBaZCeXEQ3UselpuwUQHltGVXvdwm6cv1hgR6gDIPg= github.com/smartystreets/goconvey v1.7.2/go.mod h1:Vw0tHAZW6lzCRk3xgdin6fKYcG+G3Pg9vgXWeJpQFMM= github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI= diff --git a/examples/quickstart/provider/isolate.sh b/examples/quickstart/provider/isolate.sh new file mode 100644 index 00000000..0e9df827 --- /dev/null +++ b/examples/quickstart/provider/isolate.sh @@ -0,0 +1,5 @@ +#!/bin/bash + +data="[{\"service\":\"${INSTANCE_SERVICE}\",\"namespace\":\"${INSTANCE_NAMESPACE}\",\"host\":\"${INSTANCE_IP}\",\"port\":\"${INSTANCE_PORT}\",\"isolate\":true}]" +echo "${data}" +curl -H "X-Polaris-Token; ${POLARIS_TOKEN}" -H 'Content-Type: application/json' -X PUT -d "${data}" "http://${POLARIS_OPEN_API}/naming/v1/instances" diff --git a/examples/quickstart/provider/main.go b/examples/quickstart/provider/main.go index b611f08a..ea7dc25c 100644 --- a/examples/quickstart/provider/main.go +++ b/examples/quickstart/provider/main.go @@ -54,6 +54,7 @@ type PolarisProvider struct { host string port int isShutdown bool + webSvr *http.Server } // Run starts the provider @@ -83,7 +84,8 @@ func (svr *PolarisProvider) runWebServer() { go func() { log.Printf("[INFO] start http server, listen port is %v", svr.port) - if err := http.Serve(ln, nil); err != nil { + svr.webSvr = &http.Server{Handler: nil} + if err := svr.webSvr.Serve(ln); err != nil { svr.isShutdown = false log.Fatalf("[ERROR]fail to run webServer, err is %v", err) } @@ -99,8 +101,6 @@ func (svr *PolarisProvider) registerService() { registerRequest.Port = svr.port registerRequest.ServiceToken = token registerRequest.SetTTL(1) - // 实例id不是必填,如果不填,服务端会默认生成一个唯一Id,否则当提供实例id时,需要保证实例id是唯一的 - registerRequest.InstanceId = providedInstanceId(namespace, service, svr.host, svr.port) resp, err := svr.provider.RegisterInstance(registerRequest) if err != nil { log.Fatalf("fail to register instance, err is %v", err) @@ -116,8 +116,6 @@ func (svr *PolarisProvider) deregisterService() { deregisterRequest.Host = svr.host deregisterRequest.Port = svr.port deregisterRequest.ServiceToken = token - // 实例id不是必填,如果注册时指定了实例id,则反注册时需要提供同样的id - deregisterRequest.InstanceID = providedInstanceId(namespace, service, svr.host, svr.port) if err := svr.provider.Deregister(deregisterRequest); err != nil { log.Fatalf("fail to deregister instance, err is %v", err) } @@ -135,6 +133,7 @@ func (svr *PolarisProvider) runMainLoop() { log.Printf("catch signal(%+v), stop servers", s) svr.isShutdown = true svr.deregisterService() + _ = svr.webSvr.Close() return } } diff --git a/examples/quickstart/provider/polaris.yaml b/examples/quickstart/provider/polaris.yaml index c6ac55cf..07e08516 100644 --- a/examples/quickstart/provider/polaris.yaml +++ b/examples/quickstart/provider/polaris.yaml @@ -1,7 +1,7 @@ global: serverConnector: addresses: - - 119.91.66.223:8091 + - 127.0.0.1:8091 statReporter: enable: true chain: diff --git a/pkg/plugin/loadbalancer/proxy.go b/pkg/plugin/loadbalancer/proxy.go index 5da24d26..1a1c5b6d 100644 --- a/pkg/plugin/loadbalancer/proxy.go +++ b/pkg/plugin/loadbalancer/proxy.go @@ -43,7 +43,8 @@ type SelectStatus struct { // ChooseInstance proxy LoadBalancer ChooseInstance func (p *Proxy) ChooseInstance(criteria *Criteria, instances model.ServiceInstances) (model.Instance, error) { - // 第一次进行负载均衡,包括半开实例 + // 包括处于半开的实例 + criteria.Cluster.IncludeHalfOpen = true result, err := p.LoadBalancer.ChooseInstance(criteria, instances) return result, err } From 2b99eef99bdbe2b5af4c9b45302b82b47c73f781 Mon Sep 17 00:00:00 2001 From: chuntaojun Date: Tue, 9 Jan 2024 00:26:59 +0800 Subject: [PATCH 6/8] feat:support user custom sdk labels --- api/config.go | 27 +++++++++++---- pkg/config/api.go | 12 ++++++- pkg/config/default.go | 50 +++++++++++++++++++++++++++ pkg/config/impl.go | 6 ++++ pkg/flow/configuration/config_flow.go | 20 +++++------ pkg/flow/configuration/file_repo.go | 19 ++++++---- pkg/model/context.go | 19 ++++++++++ pkg/version/version.go | 2 +- 8 files changed, 130 insertions(+), 25 deletions(-) diff --git a/api/config.go b/api/config.go index be5f384c..66a52d09 100644 --- a/api/config.go +++ b/api/config.go @@ -27,7 +27,6 @@ import ( "sync/atomic" "time" - "github.com/google/uuid" "github.com/hashicorp/go-multierror" "github.com/modern-go/reflect2" "gopkg.in/yaml.v2" @@ -261,20 +260,21 @@ func InitContextByConfig(cfg config.Configuration) (SDKContext, error) { if err := cfg.Verify(); err != nil { return nil, model.NewSDKError(model.ErrCodeAPIInvalidConfig, err, "fail to verify input config") } - getSelfIP(cfg) - token := model.SDKToken{ + initSelfIP(cfg) + token := &model.SDKToken{ IP: cfg.GetGlobal().GetAPI().GetBindIP(), PID: int32(os.Getpid()), - UID: strings.ToUpper(uuid.New().String()), Client: version.ClientType, Version: version.Version, PodName: getPodName(), HostName: getHostName(), } + token.InitUID() + initSelfLabels(cfg, token) log.GetBaseLogger().Infof("\n-------Start to init SDKContext of version %s, IP: %s, PID: %d, UID: %s, CONTAINER: "+"%s, HOSTNAME:%s-------", version.Version, token.IP, token.PID, token.UID, token.PodName, token.HostName) - globalCtx.SetValue(model.ContextKeyToken, token) + globalCtx.SetValue(model.ContextKeyToken, *token) plugManager := plugin.NewPluginManager() globalCtx.SetValue(model.ContextKeyPlugins, plugManager) connManager, err := network.NewConnectionManager(cfg, globalCtx) @@ -321,8 +321,8 @@ func InitContextByConfig(cfg config.Configuration) (SDKContext, error) { return ctx, nil } -// getSelfIP 获取SDK自身的IP -func getSelfIP(cfg config.Configuration) { +// initSelfIP 获取SDK自身的IP +func initSelfIP(cfg config.Configuration) { bindIP := cfg.GetGlobal().GetAPI().GetBindIP() bindIntf := cfg.GetGlobal().GetAPI().GetBindIntf() if len(bindIP) != 0 || len(bindIntf) != 0 { @@ -346,6 +346,19 @@ func getSelfIP(cfg config.Configuration) { } } +func initSelfLabels(cfg config.Configuration, sdkToken *model.SDKToken) { + clientCfg := cfg.GetGlobal().GetClient().(*config.ClientConfigImpl) + if clientCfg.GetId() == "" { + clientCfg.SetId(sdkToken.UID) + } + clientCfg.AddLabels(map[string]string{ + "CLIENT_IP": cfg.GetGlobal().GetAPI().GetBindIP(), + "CLIENT_ID": clientCfg.GetId(), + "CLIENT_VERSION": sdkToken.Version, + "CLIENT_LANGUAGE": sdkToken.Client, + }) +} + // onContextInitialized 在全局上下文初始化完成后,触发事件回调,可针对不同插件做一些阻塞等待某个事件完成的操作 func onContextInitialized(ctx SDKContext) error { eventHandlers := ctx.GetPlugins().GetEventSubscribers(common.OnContextStarted) diff --git a/pkg/config/api.go b/pkg/config/api.go index 8ad0cf53..f0de217c 100644 --- a/pkg/config/api.go +++ b/pkg/config/api.go @@ -44,6 +44,8 @@ type GlobalConfig interface { GetStatReporter() StatReporterConfig // GetLocation global.location前缀开头的所有配置项 GetLocation() LocationConfig + // GetClient global.client前缀开头的所有配置项 + GetClient() ClientConfig } // ConsumerConfig consumer config object. @@ -213,10 +215,18 @@ type LocationConfig interface { BaseConfig // GetProvider 获取地理位置的提供者插件名称 GetProviders() []*LocationProviderConfigImpl - + // GetProvider 根据类型名称获取对应插件的配置内容信息 GetProvider(typ string) *LocationProviderConfigImpl } +type ClientConfig interface { + BaseConfig + // GetId 获取客户端ID + GetId() string + // GetLabels 获取客户端标签 + GetLabels() map[string]string +} + // ServerConnectorConfig 与名字服务服务端的连接配置. type ServerConnectorConfig interface { BaseConfig diff --git a/pkg/config/default.go b/pkg/config/default.go index aa4cb24a..b7b71de1 100644 --- a/pkg/config/default.go +++ b/pkg/config/default.go @@ -461,6 +461,8 @@ func (g *GlobalConfigImpl) Init() { g.StatReporter.Init() g.Location = &LocationConfigImpl{} g.Location.Init() + g.Client = &ClientConfigImpl{} + g.Client.Init() } // Init 初始化ConsumerConfigImpl. @@ -616,3 +618,51 @@ func (s *ServerClusterConfigImpl) Verify() error { } return errs } + +type ClientConfigImpl struct { + ID string `yaml:"id" json:"id"` + Labels map[string]string `yaml:"labels" json:"labels"` +} + +// Init 初始化 +func (c *ClientConfigImpl) Init() { +} + +func (c *ClientConfigImpl) SetId(id string) { + c.ID = id +} + +func (c *ClientConfigImpl) GetId() string { + return c.ID +} + +func (c *ClientConfigImpl) SetLabels(m map[string]string) { + c.Labels = m +} + +func (c *ClientConfigImpl) AddLabels(m map[string]string) { + if len(c.Labels) == 0 { + c.Labels = map[string]string{} + } + for k, v := range m { + c.Labels[k] = v + } +} + +func (c *ClientConfigImpl) GetLabels() map[string]string { + copyM := map[string]string{} + for k, v := range c.Labels { + copyM[k] = v + } + return copyM +} + +func (c *ClientConfigImpl) SetDefault() { + if len(c.Labels) == 0 { + c.Labels = map[string]string{} + } +} + +func (c *ClientConfigImpl) Verify() error { + return nil +} diff --git a/pkg/config/impl.go b/pkg/config/impl.go index 2f876e3e..5afaca0b 100644 --- a/pkg/config/impl.go +++ b/pkg/config/impl.go @@ -64,6 +64,7 @@ type GlobalConfigImpl struct { ServerConnector *ServerConnectorConfigImpl `yaml:"serverConnector" json:"serverConnector"` StatReporter *StatReporterConfigImpl `yaml:"statReporter" json:"statReporter"` Location *LocationConfigImpl `yaml:"location" json:"location"` + Client *ClientConfigImpl `yaml:"client" json:"client"` } // GetSystem 获取系统配置. @@ -91,6 +92,11 @@ func (g *GlobalConfigImpl) GetLocation() LocationConfig { return g.Location } +// GetClient global.client前缀开头的所有配置项. +func (g *GlobalConfigImpl) GetClient() ClientConfig { + return g.Client +} + // ConsumerConfigImpl 消费者配置. type ConsumerConfigImpl struct { LocalCache *LocalCacheConfigImpl `yaml:"localCache" json:"localCache"` diff --git a/pkg/flow/configuration/config_flow.go b/pkg/flow/configuration/config_flow.go index c9aec99e..4a0079bf 100644 --- a/pkg/flow/configuration/config_flow.go +++ b/pkg/flow/configuration/config_flow.go @@ -43,9 +43,9 @@ type ConfigFileFlow struct { configFilePool map[string]*ConfigFileRepo notifiedVersion map[string]uint64 - connector configconnector.ConfigConnector - chain configfilter.Chain - configuration config.Configuration + connector configconnector.ConfigConnector + chain configfilter.Chain + conf config.Configuration persistHandler *CachePersistHandler @@ -54,12 +54,12 @@ type ConfigFileFlow struct { // NewConfigFileFlow 创建配置中心服务 func NewConfigFileFlow(connector configconnector.ConfigConnector, chain configfilter.Chain, - configuration config.Configuration) (*ConfigFileFlow, error) { + conf config.Configuration) (*ConfigFileFlow, error) { persistHandler, err := NewCachePersistHandler( - configuration.GetConfigFile().GetLocalCache().GetPersistDir(), - configuration.GetConfigFile().GetLocalCache().GetPersistMaxWriteRetry(), - configuration.GetConfigFile().GetLocalCache().GetPersistMaxReadRetry(), - configuration.GetConfigFile().GetLocalCache().GetPersistRetryInterval(), + conf.GetConfigFile().GetLocalCache().GetPersistDir(), + conf.GetConfigFile().GetLocalCache().GetPersistMaxWriteRetry(), + conf.GetConfigFile().GetLocalCache().GetPersistMaxReadRetry(), + conf.GetConfigFile().GetLocalCache().GetPersistRetryInterval(), ) if err != nil { return nil, err @@ -68,7 +68,7 @@ func NewConfigFileFlow(connector configconnector.ConfigConnector, chain configfi configFileService := &ConfigFileFlow{ connector: connector, chain: chain, - configuration: configuration, + conf: conf, repos: make([]*ConfigFileRepo, 0, 8), configFileCache: map[string]model.ConfigFile{}, configFilePool: map[string]*ConfigFileRepo{}, @@ -112,7 +112,7 @@ func (c *ConfigFileFlow) GetConfigFile(req *model.GetConfigFileRequest) (model.C return configFile, nil } - fileRepo, err := newConfigFileRepo(configFileMetadata, c.connector, c.chain, c.configuration, c.persistHandler) + fileRepo, err := newConfigFileRepo(configFileMetadata, c.connector, c.chain, c.conf, c.persistHandler) if err != nil { return nil, err } diff --git a/pkg/flow/configuration/file_repo.go b/pkg/flow/configuration/file_repo.go index 402f03b5..08940462 100644 --- a/pkg/flow/configuration/file_repo.go +++ b/pkg/flow/configuration/file_repo.go @@ -47,9 +47,9 @@ var ( // ConfigFileRepo 服务端配置文件代理类,从服务端拉取配置并同步数据 type ConfigFileRepo struct { - connector configconnector.ConfigConnector - chain configfilter.Chain - configuration config.Configuration + connector configconnector.ConfigConnector + chain configfilter.Chain + conf config.Configuration configFileMetadata model.ConfigFileMetadata // 长轮询通知的版本号 @@ -71,12 +71,12 @@ type ConfigFileRepoChangeListener func(configFileMetadata model.ConfigFileMetada func newConfigFileRepo(metadata model.ConfigFileMetadata, connector configconnector.ConfigConnector, chain configfilter.Chain, - configuration config.Configuration, + conf config.Configuration, persistHandler *CachePersistHandler) (*ConfigFileRepo, error) { repo := &ConfigFileRepo{ connector: connector, chain: chain, - configuration: configuration, + conf: conf, configFileMetadata: metadata, notifiedVersion: initVersion, retryPolicy: retryPolicy{ @@ -85,7 +85,7 @@ func newConfigFileRepo(metadata model.ConfigFileMetadata, }, remoteConfigFileRef: &atomic.Value{}, persistHandler: persistHandler, - fallbackToLocalCache: configuration.GetConfigFile().GetLocalCache().IsFallbackToLocalCache(), + fallbackToLocalCache: conf.GetConfigFile().GetLocalCache().IsFallbackToLocalCache(), } repo.remoteConfigFileRef.Store(&configconnector.ConfigFile{ Namespace: metadata.GetNamespace(), @@ -143,6 +143,13 @@ func (r *ConfigFileRepo) pull() error { FileGroup: r.configFileMetadata.GetFileGroup(), FileName: r.configFileMetadata.GetFileName(), Version: r.notifiedVersion, + Tags: make([]*configconnector.ConfigFileTag, 0, len(r.conf.GetGlobal().GetClient().GetLabels())), + } + for k, v := range r.conf.GetGlobal().GetClient().GetLabels() { + pullConfigFileReq.Tags = append(pullConfigFileReq.Tags, &configconnector.ConfigFileTag{ + Key: k, + Value: v, + }) } log.GetBaseLogger().Infof("[Config] start pull config file. config file = %+v, version = %d", diff --git a/pkg/model/context.go b/pkg/model/context.go index 5684c970..1e7ea649 100644 --- a/pkg/model/context.go +++ b/pkg/model/context.go @@ -19,10 +19,13 @@ package model import ( "context" + "strconv" + "strings" "sync" "sync/atomic" "time" + "github.com/google/uuid" "github.com/polarismesh/polaris-go/pkg/clock" ) @@ -53,6 +56,22 @@ type SDKToken struct { HostName string } +func (sdkToken *SDKToken) InitUID() { + if sdkToken.PodName != "" { + sdkToken.UID = sdkToken.PodName + return + } + if sdkToken.HostName != "" { + sdkToken.UID = sdkToken.HostName + "-" + strconv.Itoa(int(sdkToken.PID)) + return + } + if sdkToken.IP != "" { + sdkToken.UID = sdkToken.IP + "-" + strconv.Itoa(int(sdkToken.PID)) + return + } + sdkToken.UID = strings.ToUpper(uuid.New().String()) +} + // LocationInfo 地域信息 type LocationInfo interface { // GetLocation 获取地域明细 diff --git a/pkg/version/version.go b/pkg/version/version.go index de31f009..b2320ce7 100644 --- a/pkg/version/version.go +++ b/pkg/version/version.go @@ -20,7 +20,7 @@ package version var ( // Version current version number - Version = "v1.0.0" + Version = "v1.6.0" // ClientType client type ClientType = "polaris-go" ) From f54d90ca148756ba39b0fb562bf2965c072b41e2 Mon Sep 17 00:00:00 2001 From: chuntaojun Date: Tue, 9 Jan 2024 00:45:22 +0800 Subject: [PATCH 7/8] feat:support set sdk labels --- examples/configuration/crud/go.sum | 5 ----- examples/configuration/normal/go.mod | 2 +- examples/configuration/normal/go.sum | 9 ++------- examples/configuration/normal/polaris.yaml | 3 +++ plugin/configconnector/polaris/config_connector.go | 8 ++++++++ 5 files changed, 14 insertions(+), 13 deletions(-) diff --git a/examples/configuration/crud/go.sum b/examples/configuration/crud/go.sum index 55fed020..ef33672a 100644 --- a/examples/configuration/crud/go.sum +++ b/examples/configuration/crud/go.sum @@ -171,7 +171,6 @@ github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= -github.com/agiledragon/gomonkey v2.0.2+incompatible h1:eXKi9/piiC3cjJD1658mEE2o3NjkJ5vDLgYjCQu0Xlw= github.com/agiledragon/gomonkey v2.0.2+incompatible/go.mod h1:2NGfXu1a80LLr2cmWXGBDaHEjb1idR6+FVlX5T3D9hw= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= @@ -320,7 +319,6 @@ github.com/googleapis/gax-go/v2 v2.3.0/go.mod h1:b8LNqSzNabLiUpXKkY7HAR5jr6bIT99 github.com/googleapis/gax-go/v2 v2.4.0/go.mod h1:XOTVJ59hdnfJLIP/dh8n5CGryZR2LxK9wbMD5+iXC6c= github.com/googleapis/gax-go/v2 v2.5.1/go.mod h1:h6B0KMMFNtI2ddbGJn3T3ZbwkeT6yqEF02fYlzkUCyo= github.com/googleapis/go-type-adapters v1.0.0/go.mod h1:zHW75FOG2aur7gAO2B+MLby+cLsWGBF62rFAi7WjWO4= -github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGaHF6qqu48+N2wcFQ5qg5FXgOdqsJ5d8= github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw= github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA= @@ -338,7 +336,6 @@ github.com/json-iterator/go v1.1.11/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/ github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU= github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk= -github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo= github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM= @@ -401,9 +398,7 @@ github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFR github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88= -github.com/smartystreets/assertions v1.2.0 h1:42S6lae5dvLc7BrLu/0ugRtcFVjoJNMC/N3yZFZkDFs= github.com/smartystreets/assertions v1.2.0/go.mod h1:tcbTF8ujkAEcZ8TElKY+i30BzYlVhC/LOxJk7iOWnoo= -github.com/smartystreets/goconvey v1.7.2 h1:9RBaZCeXEQ3UselpuwUQHltGVXvdwm6cv1hgR6gDIPg= github.com/smartystreets/goconvey v1.7.2/go.mod h1:Vw0tHAZW6lzCRk3xgdin6fKYcG+G3Pg9vgXWeJpQFMM= github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI= diff --git a/examples/configuration/normal/go.mod b/examples/configuration/normal/go.mod index ba846ada..1f119279 100644 --- a/examples/configuration/normal/go.mod +++ b/examples/configuration/normal/go.mod @@ -17,7 +17,7 @@ require ( github.com/modern-go/reflect2 v1.0.2 // indirect github.com/natefinch/lumberjack v2.0.0+incompatible // indirect github.com/pkg/errors v0.9.1 // indirect - github.com/polarismesh/specification v1.3.2-alpha.2 // indirect + github.com/polarismesh/specification v1.4.1 // indirect github.com/prometheus/client_golang v1.12.2 // indirect github.com/prometheus/client_model v0.2.0 // indirect github.com/prometheus/common v0.32.1 // indirect diff --git a/examples/configuration/normal/go.sum b/examples/configuration/normal/go.sum index 093a5191..d93306f5 100644 --- a/examples/configuration/normal/go.sum +++ b/examples/configuration/normal/go.sum @@ -171,7 +171,6 @@ github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= -github.com/agiledragon/gomonkey v2.0.2+incompatible h1:eXKi9/piiC3cjJD1658mEE2o3NjkJ5vDLgYjCQu0Xlw= github.com/agiledragon/gomonkey v2.0.2+incompatible/go.mod h1:2NGfXu1a80LLr2cmWXGBDaHEjb1idR6+FVlX5T3D9hw= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= @@ -321,7 +320,6 @@ github.com/googleapis/gax-go/v2 v2.3.0/go.mod h1:b8LNqSzNabLiUpXKkY7HAR5jr6bIT99 github.com/googleapis/gax-go/v2 v2.4.0/go.mod h1:XOTVJ59hdnfJLIP/dh8n5CGryZR2LxK9wbMD5+iXC6c= github.com/googleapis/gax-go/v2 v2.5.1/go.mod h1:h6B0KMMFNtI2ddbGJn3T3ZbwkeT6yqEF02fYlzkUCyo= github.com/googleapis/go-type-adapters v1.0.0/go.mod h1:zHW75FOG2aur7gAO2B+MLby+cLsWGBF62rFAi7WjWO4= -github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGaHF6qqu48+N2wcFQ5qg5FXgOdqsJ5d8= github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw= github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= @@ -340,7 +338,6 @@ github.com/json-iterator/go v1.1.11/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/ github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU= github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk= -github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo= github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM= @@ -375,8 +372,8 @@ github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/polarismesh/specification v1.3.2-alpha.2 h1:cMghyvCnRVM5ca2kYCGHOgIIxVnokiMvw0720q8a8RA= -github.com/polarismesh/specification v1.3.2-alpha.2/go.mod h1:rDvMMtl5qebPmqiBLNa5Ps0XtwkP31ZLirbH4kXA0YU= +github.com/polarismesh/specification v1.4.1 h1:lTZqeyUhhWuKyr6NDKBwmUrNfcUDvKLxWT/uOq71T5A= +github.com/polarismesh/specification v1.4.1/go.mod h1:rDvMMtl5qebPmqiBLNa5Ps0XtwkP31ZLirbH4kXA0YU= github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw= github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo= github.com/prometheus/client_golang v1.7.1/go.mod h1:PY5Wy2awLA44sXw4AOSfFBetzPP4j5+D6mVACh+pe2M= @@ -404,9 +401,7 @@ github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFR github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88= -github.com/smartystreets/assertions v1.2.0 h1:42S6lae5dvLc7BrLu/0ugRtcFVjoJNMC/N3yZFZkDFs= github.com/smartystreets/assertions v1.2.0/go.mod h1:tcbTF8ujkAEcZ8TElKY+i30BzYlVhC/LOxJk7iOWnoo= -github.com/smartystreets/goconvey v1.7.2 h1:9RBaZCeXEQ3UselpuwUQHltGVXvdwm6cv1hgR6gDIPg= github.com/smartystreets/goconvey v1.7.2/go.mod h1:Vw0tHAZW6lzCRk3xgdin6fKYcG+G3Pg9vgXWeJpQFMM= github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI= diff --git a/examples/configuration/normal/polaris.yaml b/examples/configuration/normal/polaris.yaml index fdee6156..2e43bfef 100644 --- a/examples/configuration/normal/polaris.yaml +++ b/examples/configuration/normal/polaris.yaml @@ -2,6 +2,9 @@ global: serverConnector: addresses: - 127.0.0.1:8091 + client: + labels: + env: pre config: configConnector: addresses: diff --git a/plugin/configconnector/polaris/config_connector.go b/plugin/configconnector/polaris/config_connector.go index 876821c3..8f7272d0 100644 --- a/plugin/configconnector/polaris/config_connector.go +++ b/plugin/configconnector/polaris/config_connector.go @@ -385,12 +385,20 @@ func (c *Connector) handleResponse(request string, reqID string, opKey string, r } func transferToClientConfigFileInfo(configFile *configconnector.ConfigFile) *config_manage.ClientConfigFileInfo { + tags := make([]*config_manage.ConfigFileTag, 0, len(configFile.GetLabels())) + for key, val := range configFile.GetLabels() { + tags = append(tags, &config_manage.ConfigFileTag{ + Key: wrapperspb.String(key), + Value: wrapperspb.String(val), + }) + } return &config_manage.ClientConfigFileInfo{ Namespace: wrapperspb.String(configFile.Namespace), Group: wrapperspb.String(configFile.GetFileGroup()), FileName: wrapperspb.String(configFile.GetFileName()), Version: wrapperspb.UInt64(configFile.GetVersion()), PublicKey: wrapperspb.String(configFile.GetPublicKey()), + Tags: tags, } } From c91ec9d61001ee34f8719f2c94d838db3b0c035b Mon Sep 17 00:00:00 2001 From: chuntaojun Date: Thu, 28 Mar 2024 01:02:03 +0800 Subject: [PATCH 8/8] =?UTF-8?q?fix:=E4=BF=AE=E5=A4=8D=E5=AE=A2=E6=88=B7?= =?UTF-8?q?=E7=AB=AF=E9=89=B4=E6=9D=83=E6=8E=A5=E5=8F=A3=E8=AE=BE=E7=BD=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pkg/config/api.go | 4 +++ pkg/config/config_connector.go | 12 +++++++ pkg/config/serverconnector.go | 14 +++++++++ .../polaris/config_connector.go | 12 +++---- plugin/serverconnector/common/client.go | 10 ++++-- plugin/serverconnector/common/discover.go | 20 ++++++++++-- plugin/serverconnector/common/util.go | 31 +++++++------------ .../serverconnector/grpc/operation_async.go | 10 +++--- plugin/serverconnector/grpc/operation_sync.go | 8 ++--- 9 files changed, 83 insertions(+), 38 deletions(-) diff --git a/pkg/config/api.go b/pkg/config/api.go index f0de217c..aa88af3e 100644 --- a/pkg/config/api.go +++ b/pkg/config/api.go @@ -270,6 +270,10 @@ type ServerConnectorConfig interface { GetConnectionIdleTimeout() time.Duration // SetConnectionIdleTimeout 设置连接会被释放的空闲的时长 SetConnectionIdleTimeout(time.Duration) + // GetToken . + GetToken() string + // SetToken . + SetToken(string) } // LocalCacheConfig 本地缓存相关配置项. diff --git a/pkg/config/config_connector.go b/pkg/config/config_connector.go index e27d2076..8cb4461e 100644 --- a/pkg/config/config_connector.go +++ b/pkg/config/config_connector.go @@ -51,6 +51,8 @@ type ConfigConnectorConfigImpl struct { Plugin PluginConfigs `yaml:"plugin" json:"plugin"` + Token string `yaml:"token" json:"token"` + ConnectorType string `yaml:"connectorType" json:"connectorType"` } @@ -158,6 +160,16 @@ func (c *ConfigConnectorConfigImpl) SetConnectorType(connectorType string) { c.ConnectorType = connectorType } +// GetToken . +func (c *ConfigConnectorConfigImpl) GetToken() string { + return c.Token +} + +// SetToken . +func (c *ConfigConnectorConfigImpl) SetToken(token string) { + c.Token = token +} + // Verify 检验ConfigConnector配置. func (c *ConfigConnectorConfigImpl) Verify() error { if nil == c { diff --git a/pkg/config/serverconnector.go b/pkg/config/serverconnector.go index d06dc9a6..f51f5906 100644 --- a/pkg/config/serverconnector.go +++ b/pkg/config/serverconnector.go @@ -49,6 +49,8 @@ type ServerConnectorConfigImpl struct { ReconnectInterval *time.Duration `yaml:"reconnectInterval" json:"reconnectInterval"` Plugin PluginConfigs `yaml:"plugin" json:"plugin"` + + Token string `yaml:"token" json:"token"` } // GetAddresses global.serverConnector.addresses @@ -152,6 +154,18 @@ func (s *ServerConnectorConfigImpl) SetPluginConfig(pluginName string, value Bas return s.Plugin.SetPluginConfig(common.TypeServerConnector, pluginName, value) } +// GetProtocol global.serverConnector.protocol +// 与cl5 server对接的协议. +func (s *ServerConnectorConfigImpl) GetToken() string { + return s.Token +} + +// SetProtocol 设置与cl5 server对接的协议. +func (s *ServerConnectorConfigImpl) SetToken(t string) { + s.Token = t +} + + // Verify 检验ServerConnector配置. func (s *ServerConnectorConfigImpl) Verify() error { if nil == s { diff --git a/plugin/configconnector/polaris/config_connector.go b/plugin/configconnector/polaris/config_connector.go index 8f7272d0..479abc10 100644 --- a/plugin/configconnector/polaris/config_connector.go +++ b/plugin/configconnector/polaris/config_connector.go @@ -121,7 +121,7 @@ func (c *Connector) GetConfigFile(configFile *configconnector.ConfigFile) (*conf defer conn.Release(opKey) configClient := config_manage.NewPolarisConfigGRPCClient(network.ToGRPCConn(conn.Conn)) reqID := connector.NextRegisterInstanceReqID() - ctx, cancel := connector.CreateHeaderContextWithReqId(0, reqID) + ctx, cancel := connector.CreateHeadersContext(0, connector.AppendHeaderWithReqId(reqID)) if cancel != nil { defer cancel() } @@ -153,7 +153,7 @@ func (c *Connector) WatchConfigFiles(configFileList []*configconnector.ConfigFil defer conn.Release(opKey) configClient := config_manage.NewPolarisConfigGRPCClient(network.ToGRPCConn(conn.Conn)) reqID := connector.NextWatchConfigFilesReqID() - ctx, cancel := connector.CreateHeaderContextWithReqId(0, reqID) + ctx, cancel := connector.CreateHeadersContext(0, connector.AppendHeaderWithReqId(reqID)) if cancel != nil { defer cancel() } @@ -190,7 +190,7 @@ func (c *Connector) CreateConfigFile(configFile *configconnector.ConfigFile) (*c defer conn.Release(opKey) configClient := config_manage.NewPolarisConfigGRPCClient(network.ToGRPCConn(conn.Conn)) reqID := connector.NextCreateConfigFileReqID() - ctx, cancel := connector.CreateHeaderContextWithReqId(0, reqID) + ctx, cancel := connector.CreateHeadersContext(0, connector.AppendHeaderWithReqId(reqID)) if cancel != nil { defer cancel() } @@ -222,7 +222,7 @@ func (c *Connector) UpdateConfigFile(configFile *configconnector.ConfigFile) (*c defer conn.Release(opKey) configClient := config_manage.NewPolarisConfigGRPCClient(network.ToGRPCConn(conn.Conn)) reqID := connector.NextUpdateConfigFileReqID() - ctx, cancel := connector.CreateHeaderContextWithReqId(0, reqID) + ctx, cancel := connector.CreateHeadersContext(0, connector.AppendHeaderWithReqId(reqID)) if cancel != nil { defer cancel() } @@ -254,7 +254,7 @@ func (c *Connector) PublishConfigFile(configFile *configconnector.ConfigFile) (* defer conn.Release(opKey) configClient := config_manage.NewPolarisConfigGRPCClient(network.ToGRPCConn(conn.Conn)) reqID := connector.NextPublishConfigFileReqID() - ctx, cancel := connector.CreateHeaderContextWithReqId(0, reqID) + ctx, cancel := connector.CreateHeadersContext(0, connector.AppendHeaderWithReqId(reqID)) if cancel != nil { defer cancel() } @@ -283,7 +283,7 @@ func (c *Connector) GetConfigGroup(req *configconnector.ConfigGroup) (*configcon } reqID := connector.NextPublishConfigFileReqID() - ctx, cancel := connector.CreateHeaderContextWithReqId(0, reqID) + ctx, cancel := connector.CreateHeadersContext(0, connector.AppendHeaderWithReqId(reqID)) if cancel != nil { defer cancel() } diff --git a/plugin/serverconnector/common/client.go b/plugin/serverconnector/common/client.go index 8111442f..c830361d 100644 --- a/plugin/serverconnector/common/client.go +++ b/plugin/serverconnector/common/client.go @@ -36,6 +36,12 @@ type DiscoverClient interface { CloseSend() error } +type DiscoverClientCreatorArgs struct { + ReqId string + AuthToken string + Connection *network.Connection + Timeout time.Duration +} + // DiscoverClientCreator 创建client的函数 -type DiscoverClientCreator func( - reqId string, connection *network.Connection, timeout time.Duration) (DiscoverClient, context.CancelFunc, error) +type DiscoverClientCreator func(args *DiscoverClientCreatorArgs) (DiscoverClient, context.CancelFunc, error) diff --git a/plugin/serverconnector/common/discover.go b/plugin/serverconnector/common/discover.go index e323b81b..1b42d7d8 100644 --- a/plugin/serverconnector/common/discover.go +++ b/plugin/serverconnector/common/discover.go @@ -44,6 +44,8 @@ import ( const ( // 需要发往服务端的请求跟踪标识 headerRequestID = "request-id" + // + headerAuthToken = "X-Polaris-Token" // 失败时的最大超时时间 maxConnTimeout = 100 * time.Millisecond // 任务重试间隔 @@ -68,6 +70,8 @@ type DiscoverConnector struct { ServiceConnector *plugin.PluginBase connectionIdleTimeout time.Duration messageTimeout time.Duration + // authToken + authToken string // 普通任务队列 taskChannel chan *clientTask // 高优先级重试任务队列,只会在系统服务未ready时候会往队列塞值 @@ -100,6 +104,7 @@ type clientTask struct { // Init 初始化插件 func (g *DiscoverConnector) Init(ctx *plugin.InitContext, createClient DiscoverClientCreator) { ctxConfig := ctx.Config + g.authToken = ctxConfig.GetGlobal().GetServerConnector().GetToken() g.RunContext = common.NewRunContext() g.scalableRand = rand.NewScalableRand() g.discoverKey.Namespace = ctxConfig.GetGlobal().GetSystem().GetDiscoverCluster().GetNamespace() @@ -587,8 +592,12 @@ func (g *DiscoverConnector) newStream(task *serviceUpdateTask) (streamingClient goto finally } streamingClient.reqID = NextDiscoverReqID() - streamingClient.discoverClient, streamingClient.cancel, err = g.createClient(streamingClient.reqID, - streamingClient.connection, 0) + streamingClient.discoverClient, streamingClient.cancel, err = g.createClient(&DiscoverClientCreatorArgs{ + ReqId: streamingClient.reqID, + Connection: streamingClient.connection, + Timeout: 0, + AuthToken: g.authToken, + }) if err != nil { log.GetNetworkLogger().Errorf("%s, newStream: fail to get streaming client from %s, reqID %s, err %v", g.ServiceConnector.GetSDKContextID(), streamingClient.connection, streamingClient.reqID, err) @@ -953,7 +962,12 @@ func (g *DiscoverConnector) syncUpdateTask(task *serviceUpdateTask) error { } defer connection.Release(OpKeyDiscover) reqID := NextDiscoverReqID() - discoverClient, cancel, err := g.createClient(reqID, connection, g.messageTimeout) + discoverClient, cancel, err := g.createClient(&DiscoverClientCreatorArgs{ + ReqId: reqID, + Connection: connection, + Timeout: g.messageTimeout, + AuthToken: g.authToken, + }) if cancel != nil { defer cancel() } diff --git a/plugin/serverconnector/common/util.go b/plugin/serverconnector/common/util.go index 63f526de..eb8b3d2d 100644 --- a/plugin/serverconnector/common/util.go +++ b/plugin/serverconnector/common/util.go @@ -174,8 +174,12 @@ func GetUpdateTaskRequestTime(updateTask *serviceUpdateTask) time.Duration { // return metadata.NewOutgoingContext(ctx, md) // } -// CreateHeaderContext 创建传输grpc头的valueContext -func CreateHeaderContext(timeout time.Duration, headers map[string]string) (context.Context, context.CancelFunc) { +func CreateHeadersContext(timeout time.Duration, options ...func(map[string]string)) (context.Context, context.CancelFunc) { + headers := map[string]string{} + for _, option := range options { + option(headers) + } + md := metadata.New(headers) var ctx context.Context var cancel context.CancelFunc @@ -188,25 +192,14 @@ func CreateHeaderContext(timeout time.Duration, headers map[string]string) (cont return metadata.NewOutgoingContext(ctx, md), cancel } -// CreateHeaderContextWithReqId 创建传输grpc头的valueContext -func CreateHeaderContextWithReqId(timeout time.Duration, reqID string) (context.Context, context.CancelFunc) { - md := metadata.New(map[string]string{headerRequestID: reqID}) - var ctx context.Context - var cancel context.CancelFunc - if timeout > 0 { - ctx, cancel = context.WithTimeout(context.Background(), timeout) - } else { - ctx = context.Background() - cancel = nil +func AppendAuthHeader(token string) func(map[string]string) { + return func(header map[string]string) { + header[headerAuthToken] = token } - return metadata.NewOutgoingContext(ctx, md), cancel } -func AppendHeaderWithReqId(header map[string]string, reqID string) map[string]string { - m := make(map[string]string, len(header)+1) - for k, v := range header { - m[k] = v +func AppendHeaderWithReqId(reqID string) func(map[string]string) { + return func(header map[string]string) { + header[headerRequestID] = reqID } - m[headerRequestID] = reqID - return m } diff --git a/plugin/serverconnector/grpc/operation_async.go b/plugin/serverconnector/grpc/operation_async.go index 7e76ab8c..b9127135 100644 --- a/plugin/serverconnector/grpc/operation_async.go +++ b/plugin/serverconnector/grpc/operation_async.go @@ -98,11 +98,13 @@ func (g *Connector) GetConnectionManager() network.ConnectionManager { } // 创建服务发现客户端 -func (g *Connector) createDiscoverClient(reqID string, - connection *network.Connection, timeout time.Duration) (connector.DiscoverClient, context.CancelFunc, error) { +func (g *Connector) createDiscoverClient(args *connector.DiscoverClientCreatorArgs) (connector.DiscoverClient, context.CancelFunc, error) { // 创建namingClient对象 - client := apiservice.NewPolarisGRPCClient(network.ToGRPCConn(connection.Conn)) - outgoingCtx, cancel := connector.CreateHeaderContextWithReqId(timeout, reqID) + client := apiservice.NewPolarisGRPCClient(network.ToGRPCConn(args.Connection.Conn)) + outgoingCtx, cancel := connector.CreateHeadersContext(args.Timeout, + connector.AppendAuthHeader(args.AuthToken), + connector.AppendHeaderWithReqId(args.ReqId)) + discoverClient, err := client.Discover(outgoingCtx) return discoverClient, cancel, err } diff --git a/plugin/serverconnector/grpc/operation_sync.go b/plugin/serverconnector/grpc/operation_sync.go index 4199ec5d..c14f1dbe 100644 --- a/plugin/serverconnector/grpc/operation_sync.go +++ b/plugin/serverconnector/grpc/operation_sync.go @@ -56,7 +56,7 @@ func (g *Connector) RegisterInstance(req *model.InstanceRegisterRequest, header var ( namingClient = apiservice.NewPolarisGRPCClient(network.ToGRPCConn(conn.Conn)) reqID = connector.NextRegisterInstanceReqID() - ctx, cancel = connector.CreateHeaderContext(*req.Timeout, connector.AppendHeaderWithReqId(header, reqID)) + ctx, cancel = connector.CreateHeadersContext(*req.Timeout, connector.AppendHeaderWithReqId(reqID)) ) if cancel != nil { @@ -120,7 +120,7 @@ func (g *Connector) DeregisterInstance(req *model.InstanceDeRegisterRequest) err var ( namingClient = apiservice.NewPolarisGRPCClient(network.ToGRPCConn(conn.Conn)) reqID = connector.NextDeRegisterInstanceReqID() - ctx, cancel = connector.CreateHeaderContextWithReqId(*req.Timeout, reqID) + ctx, cancel = connector.CreateHeadersContext(*req.Timeout, connector.AppendHeaderWithReqId(reqID)) ) if cancel != nil { defer cancel() @@ -181,7 +181,7 @@ func (g *Connector) Heartbeat(req *model.InstanceHeartbeatRequest) error { var ( namingClient = apiservice.NewPolarisGRPCClient(network.ToGRPCConn(conn.Conn)) reqID = connector.NextHeartbeatReqID() - ctx, cancel = connector.CreateHeaderContextWithReqId(*req.Timeout, reqID) + ctx, cancel = connector.CreateHeadersContext(*req.Timeout, connector.AppendHeaderWithReqId(reqID)) ) if cancel != nil { defer cancel() @@ -269,7 +269,7 @@ func (g *Connector) ReportClient(req *model.ReportClientRequest) (*model.ReportC var ( namingClient = apiservice.NewPolarisGRPCClient(network.ToGRPCConn(conn.Conn)) reqID = connector.NextReportClientReqID() - ctx, cancel = connector.CreateHeaderContextWithReqId(req.Timeout, reqID) + ctx, cancel = connector.CreateHeadersContext(req.Timeout, connector.AppendHeaderWithReqId(reqID)) ) if cancel != nil { defer cancel()