Skip to content

Commit

Permalink
feature: support protobuf
Browse files Browse the repository at this point in the history
  • Loading branch information
skyitachi authored and xujianhai666 committed Mar 21, 2020
1 parent 8d46ac0 commit 46978e8
Show file tree
Hide file tree
Showing 26 changed files with 2,522 additions and 219 deletions.
1 change: 1 addition & 0 deletions common/constant/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ const (
DEFAULT_FAILBACK_TIMES = "3"
DEFAULT_FAILBACK_TIMES_INT = 3
DEFAULT_FAILBACK_TASKS = 100
DEFAULT_SERIALIZATION = HESSIAN2_SERIALIZATION
)

const (
Expand Down
1 change: 1 addition & 0 deletions common/constant/key.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ const (
EXECUTE_REJECTED_EXECUTION_HANDLER_KEY = "execute.limit.rejected.handler"
PROVIDER_SHUTDOWN_FILTER = "pshutdown"
CONSUMER_SHUTDOWN_FILTER = "cshutdown"
SERIALIZATION_KEY = "serialization"
)

const (
Expand Down
28 changes: 28 additions & 0 deletions common/constant/serializtion.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package constant

const (
S_Hessian2 byte = 2
S_Proto byte = 21
)

const (
HESSIAN2_SERIALIZATION = "hessian2"
PROTOBUF_SERIALIZATION = "protobuf"
)
58 changes: 58 additions & 0 deletions common/extension/serialization.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package extension

import (
"github.com/pkg/errors"
)

import (
"github.com/apache/dubbo-go/common/constant"
)

var (
serializers = make(map[string]interface{})
nameMaps = make(map[byte]string)
)

func init() {
nameMaps = map[byte]string{
constant.S_Hessian2: constant.HESSIAN2_SERIALIZATION,
constant.S_Proto: constant.PROTOBUF_SERIALIZATION,
}
}

func SetSerializer(name string, serializer interface{}) {
serializers[name] = serializer
}

func GetSerializer(name string) interface{} {
return serializers[name]
}

func GetSerializerById(id byte) (interface{}, error) {
name, ok := nameMaps[id]
if !ok {
return nil, errors.Errorf("serialId %d not found", id)
}
serializer, ok := serializers[name]
if !ok {
return nil, errors.Errorf("serialization %s not found", name)
}
return serializer, nil
}
27 changes: 27 additions & 0 deletions common/url.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"math"
"net"
"net/url"
"sort"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -565,3 +566,29 @@ func mergeNormalParam(mergedUrl *URL, referenceUrl *URL, paramKeys []string) []f
}
return methodConfigMergeFcn
}

// doesn't encode url reserve character, url.QueryEscape will do this work
// reference: https://github.com/golang/go.git, src/net/url/url.go, Encode method
func ParamsUnescapeEncode(params url.Values) string {
if params == nil {
return ""
}
var buf strings.Builder
keys := make([]string, len(params))
for k := range params {
keys = append(keys, k)
}
sort.Strings(keys)
for _, k := range keys {
vs := params[k]
for _, v := range vs {
if buf.Len() > 0 {
buf.WriteByte('&')
}
buf.WriteString(k)
buf.WriteByte('=')
buf.WriteString(v)
}
}
return buf.String()
}
2 changes: 2 additions & 0 deletions config/service_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ type ServiceConfig struct {
Methods []*MethodConfig `yaml:"methods" json:"methods,omitempty" property:"methods"`
Warmup string `yaml:"warmup" json:"warmup,omitempty" property:"warmup"`
Retries string `yaml:"retries" json:"retries,omitempty" property:"retries"`
Serialization string `yaml:"serialization" json:"serialization" property:"serialization"`
Params map[string]string `yaml:"params" json:"params,omitempty" property:"params"`
Token string `yaml:"token" json:"token,omitempty" property:"token"`
AccessLog string `yaml:"accesslog" json:"accesslog,omitempty" property:"accesslog"`
Expand Down Expand Up @@ -185,6 +186,7 @@ func (srvconfig *ServiceConfig) getUrlMap() url.Values {
urlMap.Set(constant.GROUP_KEY, srvconfig.Group)
urlMap.Set(constant.VERSION_KEY, srvconfig.Version)
urlMap.Set(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER))
urlMap.Set(constant.SERIALIZATION_KEY, srvconfig.Serialization)
// application info
urlMap.Set(constant.APPLICATION_KEY, providerConfig.ApplicationConfig.Name)
urlMap.Set(constant.ORGANIZATION_KEY, providerConfig.ApplicationConfig.Organization)
Expand Down
4 changes: 3 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ require (
github.com/Workiva/go-datastructures v1.0.50
github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5
github.com/aliyun/alibaba-cloud-sdk-go v0.0.0-20190802083043-4cd0c391755e // indirect
github.com/apache/dubbo-go-hessian2 v1.2.5-0.20191029001541-894e45c9aaaa
github.com/apache/dubbo-go-hessian2 v1.2.5-0.20191029001541-894e45c9aaaa // indirect
github.com/buger/jsonparser v0.0.0-20181115193947-bf1c66bbce23 // indirect
github.com/coreos/bbolt v1.3.3 // indirect
github.com/coreos/etcd v3.3.13+incompatible
Expand All @@ -18,6 +18,7 @@ require (
github.com/go-errors/errors v1.0.1 // indirect
github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6 // indirect
github.com/golang/mock v1.3.1
github.com/golang/protobuf v1.3.2
github.com/google/btree v1.0.0 // indirect
github.com/grpc-ecosystem/go-grpc-middleware v1.0.0 // indirect
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect
Expand All @@ -31,6 +32,7 @@ require (
github.com/lestrrat/go-file-rotatelogs v0.0.0-20180223000712-d3151e2a480f // indirect
github.com/lestrrat/go-strftime v0.0.0-20180220042222-ba3bf9c1d042 // indirect
github.com/magiconair/properties v1.8.1
github.com/matttproud/golang_protobuf_extensions v1.0.1
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd
github.com/nacos-group/nacos-sdk-go v0.0.0-20190723125407-0242d42e3dbb
github.com/pkg/errors v0.8.1
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,7 @@ github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81P
github.com/tebeka/strftime v0.1.3 h1:5HQXOqWKYRFfNyBMNVc9z5+QzuBtIXy03psIhtdJYto=
github.com/tebeka/strftime v0.1.3/go.mod h1:7wJm3dZlpr4l/oVK0t1HYIc4rMzQ2XJlOMIUJUJH6XQ=
github.com/tent/http-link-go v0.0.0-20130702225549-ac974c61c2f9/go.mod h1:RHkNRtSLfOK7qBTHaeSX1D6BNpI3qw7NTxsmNr4RvN8=
github.com/tevid/gohamcrest v1.1.1 h1:ou+xSqlIw1xfGTg1uq1nif/htZ2S3EzRqLm2BP+tYU0=
github.com/tevid/gohamcrest v1.1.1/go.mod h1:3UvtWlqm8j5JbwYZh80D/PVBt0mJ1eJiYgZMibh0H/k=
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5 h1:LnC5Kc/wtumK+WB441p7ynQJzVuNRJiqddSIE3IlSEQ=
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
Expand Down
79 changes: 51 additions & 28 deletions protocol/dubbo/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
)

import (
"github.com/apache/dubbo-go-hessian2"
"github.com/dubbogo/getty"
"github.com/dubbogo/gost/sync"
perrors "github.com/pkg/errors"
Expand Down Expand Up @@ -130,6 +129,7 @@ type Client struct {
sequence atomic.Uint64

pendingResponses *sync.Map
codec DubboCodec
}

func NewClient(opt Options) *Client {
Expand All @@ -146,6 +146,7 @@ func NewClient(opt Options) *Client {
opts: opt,
pendingResponses: new(sync.Map),
conf: *clientConf,
codec: DubboCodec{},
}
c.pool = newGettyRPCClientConnPool(c, clientConf.PoolSize, time.Duration(int(time.Second)*clientConf.PoolTTL))

Expand All @@ -161,6 +162,10 @@ type Request struct {
}

func NewRequest(addr string, svcUrl common.URL, method string, args interface{}, atta map[string]string) *Request {
// NOTE: compatible with old versions
if svcUrl.GetParam(constant.SERIALIZATION_KEY, "") == "" {
svcUrl.SetParam(constant.SERIALIZATION_KEY, constant.DEFAULT_SERIALIZATION)
}
return &Request{
addr: addr,
svcUrl: svcUrl,
Expand Down Expand Up @@ -205,27 +210,6 @@ func (c *Client) AsyncCall(request *Request, callback common.AsyncCallback, resp
}

func (c *Client) call(ct CallType, request *Request, response *Response, callback common.AsyncCallback) error {

p := &DubboPackage{}
p.Service.Path = strings.TrimPrefix(request.svcUrl.Path, "/")
p.Service.Interface = request.svcUrl.GetParam(constant.INTERFACE_KEY, "")
p.Service.Version = request.svcUrl.GetParam(constant.VERSION_KEY, "")
p.Service.Group = request.svcUrl.GetParam(constant.GROUP_KEY, "")
p.Service.Method = request.method
p.Service.Timeout = c.opts.RequestTimeout
p.Header.SerialID = byte(S_Dubbo)
p.Body = hessian.NewRequest(request.args, request.atta)

var rsp *PendingResponse
if ct != CT_OneWay {
p.Header.Type = hessian.PackageRequest_TwoWay
rsp = NewPendingResponse()
rsp.response = response
rsp.callback = callback
} else {
p.Header.Type = hessian.PackageRequest
}

var (
err error
session getty.Session
Expand All @@ -240,6 +224,37 @@ func (c *Client) call(ct CallType, request *Request, response *Response, callbac
}
defer c.pool.release(conn, err)

var rsp *PendingResponse
svc := Service{}
header := DubboHeader{}
svc.Path = strings.TrimPrefix(request.svcUrl.Path, "/")
svc.Interface = request.svcUrl.GetParam(constant.INTERFACE_KEY, "")
svc.Version = request.svcUrl.GetParam(constant.VERSION_KEY, "")
svc.Group = request.svcUrl.GetParam(constant.GROUP_KEY, "")
svc.Method = request.method
svc.Timeout = c.opts.RequestTimeout
p := NewClientRequestPackage(header, svc)

serialization := request.svcUrl.GetParam(constant.SERIALIZATION_KEY, c.conf.Serialization)
if serialization == constant.HESSIAN2_SERIALIZATION {
p.Header.SerialID = constant.S_Hessian2
} else if serialization == constant.PROTOBUF_SERIALIZATION {
p.Header.SerialID = constant.S_Proto
}
p.SetBody(NewRequestPayload(request.args, request.atta))

if err := loadSerializer(p); err != nil {
return err
}

if ct != CT_OneWay {
p.Header.Type = PackageRequest_TwoWay
rsp = NewPendingResponse()
rsp.response = response
rsp.callback = callback
} else {
p.Header.Type = PackageRequest
}
if err = c.transfer(session, p, rsp); err != nil {
return perrors.WithStack(err)
}
Expand Down Expand Up @@ -289,13 +304,21 @@ func (c *Client) transfer(session getty.Session, pkg *DubboPackage,
sequence = c.sequence.Add(1)

if pkg == nil {
pkg = &DubboPackage{}
pkg.Body = hessian.NewRequest([]interface{}{}, nil)
pkg.Body = []interface{}{}
pkg.Header.Type = hessian.PackageHeartbeat
pkg.Header.SerialID = byte(S_Dubbo)
// make heartbeat package
header := DubboHeader{
Type: PackageHeartbeat,
SerialID: constant.S_Hessian2,
}
pkg = NewClientRequestPackage(header, Service{})
// SetBody
reqPayload := NewRequestPayload([]interface{}{}, nil)
pkg.SetBody(reqPayload)
// set serializer
if err := loadSerializer(pkg); err != nil {
return err
}
}
pkg.Header.ID = int64(sequence)
pkg.SetID(int64(sequence))

// cond1
if rsp != nil {
Expand Down
Loading

0 comments on commit 46978e8

Please sign in to comment.