Skip to content

Commit

Permalink
feature: support protobuf
Browse files Browse the repository at this point in the history
  • Loading branch information
skyitachi authored and xujianhai666 committed Mar 21, 2020
1 parent bde7db3 commit c37b2d2
Show file tree
Hide file tree
Showing 25 changed files with 2,511 additions and 238 deletions.
1 change: 1 addition & 0 deletions common/constant/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ const (
DEFAULT_FAILBACK_TASKS = 100
DEFAULT_REST_CLIENT = "resty"
DEFAULT_REST_SERVER = "go-restful"
DEFAULT_SERIALIZATION = HESSIAN2_SERIALIZATION
)

const (
Expand Down
1 change: 1 addition & 0 deletions common/constant/key.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ const (
EXECUTE_REJECTED_EXECUTION_HANDLER_KEY = "execute.limit.rejected.handler"
PROVIDER_SHUTDOWN_FILTER = "pshutdown"
CONSUMER_SHUTDOWN_FILTER = "cshutdown"
SERIALIZATION_KEY = "serialization"
)

const (
Expand Down
28 changes: 28 additions & 0 deletions common/constant/serializtion.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package constant

const (
S_Hessian2 byte = 2
S_Proto byte = 21
)

const (
HESSIAN2_SERIALIZATION = "hessian2"
PROTOBUF_SERIALIZATION = "protobuf"
)
58 changes: 58 additions & 0 deletions common/extension/serialization.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package extension

import (
"github.com/pkg/errors"
)

import (
"github.com/apache/dubbo-go/common/constant"
)

var (
serializers = make(map[string]interface{})
nameMaps = make(map[byte]string)
)

func init() {
nameMaps = map[byte]string{
constant.S_Hessian2: constant.HESSIAN2_SERIALIZATION,
constant.S_Proto: constant.PROTOBUF_SERIALIZATION,
}
}

func SetSerializer(name string, serializer interface{}) {
serializers[name] = serializer
}

func GetSerializer(name string) interface{} {
return serializers[name]
}

func GetSerializerById(id byte) (interface{}, error) {
name, ok := nameMaps[id]
if !ok {
return nil, errors.Errorf("serialId %d not found", id)
}
serializer, ok := serializers[name]
if !ok {
return nil, errors.Errorf("serialization %s not found", name)
}
return serializer, nil
}
27 changes: 27 additions & 0 deletions common/url.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"math"
"net"
"net/url"
"sort"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -629,3 +630,29 @@ func mergeNormalParam(mergedUrl *URL, referenceUrl *URL, paramKeys []string) []f
}
return methodConfigMergeFcn
}

// doesn't encode url reserve character, url.QueryEscape will do this work
// reference: https://github.com/golang/go.git, src/net/url/url.go, Encode method
func ParamsUnescapeEncode(params url.Values) string {
if params == nil {
return ""
}
var buf strings.Builder
keys := make([]string, len(params))
for k := range params {
keys = append(keys, k)
}
sort.Strings(keys)
for _, k := range keys {
vs := params[k]
for _, v := range vs {
if buf.Len() > 0 {
buf.WriteByte('&')
}
buf.WriteString(k)
buf.WriteByte('=')
buf.WriteString(v)
}
}
return buf.String()
}
2 changes: 2 additions & 0 deletions config/service_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ type ServiceConfig struct {
Methods []*MethodConfig `yaml:"methods" json:"methods,omitempty" property:"methods"`
Warmup string `yaml:"warmup" json:"warmup,omitempty" property:"warmup"`
Retries string `yaml:"retries" json:"retries,omitempty" property:"retries"`
Serialization string `yaml:"serialization" json:"serialization" property:"serialization"`
Params map[string]string `yaml:"params" json:"params,omitempty" property:"params"`
Token string `yaml:"token" json:"token,omitempty" property:"token"`
AccessLog string `yaml:"accesslog" json:"accesslog,omitempty" property:"accesslog"`
Expand Down Expand Up @@ -193,6 +194,7 @@ func (c *ServiceConfig) getUrlMap() url.Values {
urlMap.Set(constant.GROUP_KEY, c.Group)
urlMap.Set(constant.VERSION_KEY, c.Version)
urlMap.Set(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER))
urlMap.Set(constant.SERIALIZATION_KEY, srvconfig.Serialization)
// application info
urlMap.Set(constant.APPLICATION_KEY, providerConfig.ApplicationConfig.Name)
urlMap.Set(constant.ORGANIZATION_KEY, providerConfig.ApplicationConfig.Organization)
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ require (
github.com/lestrrat/go-file-rotatelogs v0.0.0-20180223000712-d3151e2a480f // indirect
github.com/lestrrat/go-strftime v0.0.0-20180220042222-ba3bf9c1d042 // indirect
github.com/magiconair/properties v1.8.1
github.com/matttproud/golang_protobuf_extensions v1.0.1
github.com/mitchellh/mapstructure v1.1.2
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd
github.com/nacos-group/nacos-sdk-go v0.0.0-20190723125407-0242d42e3dbb
Expand Down
87 changes: 51 additions & 36 deletions protocol/dubbo/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
)

import (
hessian "github.com/apache/dubbo-go-hessian2"
"github.com/dubbogo/getty"
gxsync "github.com/dubbogo/gost/sync"
perrors "github.com/pkg/errors"
Expand Down Expand Up @@ -137,6 +136,7 @@ type Client struct {
sequence atomic.Uint64

pendingResponses *sync.Map
codec DubboCodec
}

// NewClient ...
Expand All @@ -160,6 +160,7 @@ func NewClient(opt Options) *Client {
opts: opt,
pendingResponses: new(sync.Map),
conf: *clientConf,
codec: DubboCodec{},
}
c.sequence.Store(initSequence)
c.pool = newGettyRPCClientConnPool(c, clientConf.PoolSize, time.Duration(int(time.Second)*clientConf.PoolTTL))
Expand All @@ -178,6 +179,10 @@ type Request struct {

// NewRequest ...
func NewRequest(addr string, svcUrl common.URL, method string, args interface{}, atta map[string]string) *Request {
// NOTE: compatible with old versions
if svcUrl.GetParam(constant.SERIALIZATION_KEY, "") == "" {
svcUrl.SetParam(constant.SERIALIZATION_KEY, constant.DEFAULT_SERIALIZATION)
}
return &Request{
addr: addr,
svcUrl: svcUrl,
Expand Down Expand Up @@ -225,35 +230,6 @@ func (c *Client) AsyncCall(request *Request, callback common.AsyncCallback, resp
}

func (c *Client) call(ct CallType, request *Request, response *Response, callback common.AsyncCallback) error {

p := &DubboPackage{}
p.Service.Path = strings.TrimPrefix(request.svcUrl.Path, "/")
p.Service.Interface = request.svcUrl.GetParam(constant.INTERFACE_KEY, "")
p.Service.Version = request.svcUrl.GetParam(constant.VERSION_KEY, "")
p.Service.Group = request.svcUrl.GetParam(constant.GROUP_KEY, "")
p.Service.Method = request.method

p.Service.Timeout = c.opts.RequestTimeout
var timeout = request.svcUrl.GetParam(strings.Join([]string{constant.METHOD_KEYS, request.method + constant.RETRIES_KEY}, "."), "")
if len(timeout) != 0 {
if t, err := time.ParseDuration(timeout); err == nil {
p.Service.Timeout = t
}
}

p.Header.SerialID = byte(S_Dubbo)
p.Body = hessian.NewRequest(request.args, request.atta)

var rsp *PendingResponse
if ct != CT_OneWay {
p.Header.Type = hessian.PackageRequest_TwoWay
rsp = NewPendingResponse()
rsp.response = response
rsp.callback = callback
} else {
p.Header.Type = hessian.PackageRequest
}

var (
err error
session getty.Session
Expand All @@ -274,6 +250,37 @@ func (c *Client) call(ct CallType, request *Request, response *Response, callbac
conn.close()
}()

var rsp *PendingResponse
svc := Service{}
header := DubboHeader{}
svc.Path = strings.TrimPrefix(request.svcUrl.Path, "/")
svc.Interface = request.svcUrl.GetParam(constant.INTERFACE_KEY, "")
svc.Version = request.svcUrl.GetParam(constant.VERSION_KEY, "")
svc.Group = request.svcUrl.GetParam(constant.GROUP_KEY, "")
svc.Method = request.method
svc.Timeout = c.opts.RequestTimeout
p := NewClientRequestPackage(header, svc)

serialization := request.svcUrl.GetParam(constant.SERIALIZATION_KEY, c.conf.Serialization)
if serialization == constant.HESSIAN2_SERIALIZATION {
p.Header.SerialID = constant.S_Hessian2
} else if serialization == constant.PROTOBUF_SERIALIZATION {
p.Header.SerialID = constant.S_Proto
}
p.SetBody(NewRequestPayload(request.args, request.atta))

if err := loadSerializer(p); err != nil {
return err
}

if ct != CT_OneWay {
p.Header.Type = PackageRequest_TwoWay
rsp = NewPendingResponse()
rsp.response = response
rsp.callback = callback
} else {
p.Header.Type = PackageRequest
}
if err = c.transfer(session, p, rsp); err != nil {
return perrors.WithStack(err)
}
Expand Down Expand Up @@ -324,13 +331,21 @@ func (c *Client) transfer(session getty.Session, pkg *DubboPackage,
sequence = c.sequence.Add(1)

if pkg == nil {
pkg = &DubboPackage{}
pkg.Body = hessian.NewRequest([]interface{}{}, nil)
pkg.Body = []interface{}{}
pkg.Header.Type = hessian.PackageHeartbeat
pkg.Header.SerialID = byte(S_Dubbo)
// make heartbeat package
header := DubboHeader{
Type: PackageHeartbeat,
SerialID: constant.S_Hessian2,
}
pkg = NewClientRequestPackage(header, Service{})
// SetBody
reqPayload := NewRequestPayload([]interface{}{}, nil)
pkg.SetBody(reqPayload)
// set serializer
if err := loadSerializer(pkg); err != nil {
return err
}
}
pkg.Header.ID = int64(sequence)
pkg.SetID(int64(sequence))

// cond1
if rsp != nil {
Expand Down
Loading

0 comments on commit c37b2d2

Please sign in to comment.