From c37b2d284f0cdecc3dadbc0ede346b592eab3192 Mon Sep 17 00:00:00 2001 From: skyitachi Date: Thu, 19 Dec 2019 10:28:10 +0800 Subject: [PATCH] feature: support protobuf --- common/constant/default.go | 1 + common/constant/key.go | 1 + common/constant/serializtion.go | 28 ++ common/extension/serialization.go | 58 ++++ common/url.go | 27 ++ config/service_config.go | 2 + go.mod | 1 + protocol/dubbo/client.go | 87 ++--- protocol/dubbo/codec.go | 301 +++++++++++++---- protocol/dubbo/codec_test.go | 166 ++++++++-- protocol/dubbo/config.go | 4 + protocol/dubbo/const.go | 238 ++++++++++++++ protocol/dubbo/dubbo_invoker.go | 4 + protocol/dubbo/dubbo_protocol.go | 1 - protocol/dubbo/hessian.go | 504 +++++++++++++++++++++++++++++ protocol/dubbo/listener.go | 98 +++--- protocol/dubbo/package.go | 198 ++++++++++++ protocol/dubbo/proto.go | 392 ++++++++++++++++++++++ protocol/dubbo/proto/payload.pb.go | 328 +++++++++++++++++++ protocol/dubbo/proto/payload.proto | 78 +++++ protocol/dubbo/readwriter.go | 137 ++++---- protocol/dubbo/request.go | 40 +++ protocol/dubbo/response.go | 46 +++ protocol/dubbo/serialize.go | 6 + registry/zookeeper/registry.go | 3 +- 25 files changed, 2511 insertions(+), 238 deletions(-) create mode 100644 common/constant/serializtion.go create mode 100644 common/extension/serialization.go create mode 100644 protocol/dubbo/const.go create mode 100644 protocol/dubbo/hessian.go create mode 100644 protocol/dubbo/package.go create mode 100644 protocol/dubbo/proto.go create mode 100644 protocol/dubbo/proto/payload.pb.go create mode 100644 protocol/dubbo/proto/payload.proto create mode 100644 protocol/dubbo/request.go create mode 100644 protocol/dubbo/response.go create mode 100644 protocol/dubbo/serialize.go diff --git a/common/constant/default.go b/common/constant/default.go index 3c889158e4..e4453068c2 100644 --- a/common/constant/default.go +++ b/common/constant/default.go @@ -43,6 +43,7 @@ const ( DEFAULT_FAILBACK_TASKS = 100 DEFAULT_REST_CLIENT = "resty" DEFAULT_REST_SERVER = "go-restful" + DEFAULT_SERIALIZATION = HESSIAN2_SERIALIZATION ) const ( diff --git a/common/constant/key.go b/common/constant/key.go index 07335bed59..8c38ebc7fe 100644 --- a/common/constant/key.go +++ b/common/constant/key.go @@ -75,6 +75,7 @@ const ( EXECUTE_REJECTED_EXECUTION_HANDLER_KEY = "execute.limit.rejected.handler" PROVIDER_SHUTDOWN_FILTER = "pshutdown" CONSUMER_SHUTDOWN_FILTER = "cshutdown" + SERIALIZATION_KEY = "serialization" ) const ( diff --git a/common/constant/serializtion.go b/common/constant/serializtion.go new file mode 100644 index 0000000000..f27598ccf5 --- /dev/null +++ b/common/constant/serializtion.go @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package constant + +const ( + S_Hessian2 byte = 2 + S_Proto byte = 21 +) + +const ( + HESSIAN2_SERIALIZATION = "hessian2" + PROTOBUF_SERIALIZATION = "protobuf" +) diff --git a/common/extension/serialization.go b/common/extension/serialization.go new file mode 100644 index 0000000000..a0b8b889cf --- /dev/null +++ b/common/extension/serialization.go @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package extension + +import ( + "github.com/pkg/errors" +) + +import ( + "github.com/apache/dubbo-go/common/constant" +) + +var ( + serializers = make(map[string]interface{}) + nameMaps = make(map[byte]string) +) + +func init() { + nameMaps = map[byte]string{ + constant.S_Hessian2: constant.HESSIAN2_SERIALIZATION, + constant.S_Proto: constant.PROTOBUF_SERIALIZATION, + } +} + +func SetSerializer(name string, serializer interface{}) { + serializers[name] = serializer +} + +func GetSerializer(name string) interface{} { + return serializers[name] +} + +func GetSerializerById(id byte) (interface{}, error) { + name, ok := nameMaps[id] + if !ok { + return nil, errors.Errorf("serialId %d not found", id) + } + serializer, ok := serializers[name] + if !ok { + return nil, errors.Errorf("serialization %s not found", name) + } + return serializer, nil +} diff --git a/common/url.go b/common/url.go index ebb648db27..3d41dd32f4 100644 --- a/common/url.go +++ b/common/url.go @@ -24,6 +24,7 @@ import ( "math" "net" "net/url" + "sort" "strconv" "strings" "sync" @@ -629,3 +630,29 @@ func mergeNormalParam(mergedUrl *URL, referenceUrl *URL, paramKeys []string) []f } return methodConfigMergeFcn } + +// doesn't encode url reserve character, url.QueryEscape will do this work +// reference: https://github.com/golang/go.git, src/net/url/url.go, Encode method +func ParamsUnescapeEncode(params url.Values) string { + if params == nil { + return "" + } + var buf strings.Builder + keys := make([]string, len(params)) + for k := range params { + keys = append(keys, k) + } + sort.Strings(keys) + for _, k := range keys { + vs := params[k] + for _, v := range vs { + if buf.Len() > 0 { + buf.WriteByte('&') + } + buf.WriteString(k) + buf.WriteByte('=') + buf.WriteString(v) + } + } + return buf.String() +} diff --git a/config/service_config.go b/config/service_config.go index 7d97fa4d1e..31317663d0 100644 --- a/config/service_config.go +++ b/config/service_config.go @@ -57,6 +57,7 @@ type ServiceConfig struct { Methods []*MethodConfig `yaml:"methods" json:"methods,omitempty" property:"methods"` Warmup string `yaml:"warmup" json:"warmup,omitempty" property:"warmup"` Retries string `yaml:"retries" json:"retries,omitempty" property:"retries"` + Serialization string `yaml:"serialization" json:"serialization" property:"serialization"` Params map[string]string `yaml:"params" json:"params,omitempty" property:"params"` Token string `yaml:"token" json:"token,omitempty" property:"token"` AccessLog string `yaml:"accesslog" json:"accesslog,omitempty" property:"accesslog"` @@ -193,6 +194,7 @@ func (c *ServiceConfig) getUrlMap() url.Values { urlMap.Set(constant.GROUP_KEY, c.Group) urlMap.Set(constant.VERSION_KEY, c.Version) urlMap.Set(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER)) + urlMap.Set(constant.SERIALIZATION_KEY, srvconfig.Serialization) // application info urlMap.Set(constant.APPLICATION_KEY, providerConfig.ApplicationConfig.Name) urlMap.Set(constant.ORGANIZATION_KEY, providerConfig.ApplicationConfig.Organization) diff --git a/go.mod b/go.mod index 83091cf8b9..e9a9f0a7d9 100644 --- a/go.mod +++ b/go.mod @@ -36,6 +36,7 @@ require ( github.com/lestrrat/go-file-rotatelogs v0.0.0-20180223000712-d3151e2a480f // indirect github.com/lestrrat/go-strftime v0.0.0-20180220042222-ba3bf9c1d042 // indirect github.com/magiconair/properties v1.8.1 + github.com/matttproud/golang_protobuf_extensions v1.0.1 github.com/mitchellh/mapstructure v1.1.2 github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd github.com/nacos-group/nacos-sdk-go v0.0.0-20190723125407-0242d42e3dbb diff --git a/protocol/dubbo/client.go b/protocol/dubbo/client.go index 5ec7db51af..07cac4a3d3 100644 --- a/protocol/dubbo/client.go +++ b/protocol/dubbo/client.go @@ -25,7 +25,6 @@ import ( ) import ( - hessian "github.com/apache/dubbo-go-hessian2" "github.com/dubbogo/getty" gxsync "github.com/dubbogo/gost/sync" perrors "github.com/pkg/errors" @@ -137,6 +136,7 @@ type Client struct { sequence atomic.Uint64 pendingResponses *sync.Map + codec DubboCodec } // NewClient ... @@ -160,6 +160,7 @@ func NewClient(opt Options) *Client { opts: opt, pendingResponses: new(sync.Map), conf: *clientConf, + codec: DubboCodec{}, } c.sequence.Store(initSequence) c.pool = newGettyRPCClientConnPool(c, clientConf.PoolSize, time.Duration(int(time.Second)*clientConf.PoolTTL)) @@ -178,6 +179,10 @@ type Request struct { // NewRequest ... func NewRequest(addr string, svcUrl common.URL, method string, args interface{}, atta map[string]string) *Request { + // NOTE: compatible with old versions + if svcUrl.GetParam(constant.SERIALIZATION_KEY, "") == "" { + svcUrl.SetParam(constant.SERIALIZATION_KEY, constant.DEFAULT_SERIALIZATION) + } return &Request{ addr: addr, svcUrl: svcUrl, @@ -225,35 +230,6 @@ func (c *Client) AsyncCall(request *Request, callback common.AsyncCallback, resp } func (c *Client) call(ct CallType, request *Request, response *Response, callback common.AsyncCallback) error { - - p := &DubboPackage{} - p.Service.Path = strings.TrimPrefix(request.svcUrl.Path, "/") - p.Service.Interface = request.svcUrl.GetParam(constant.INTERFACE_KEY, "") - p.Service.Version = request.svcUrl.GetParam(constant.VERSION_KEY, "") - p.Service.Group = request.svcUrl.GetParam(constant.GROUP_KEY, "") - p.Service.Method = request.method - - p.Service.Timeout = c.opts.RequestTimeout - var timeout = request.svcUrl.GetParam(strings.Join([]string{constant.METHOD_KEYS, request.method + constant.RETRIES_KEY}, "."), "") - if len(timeout) != 0 { - if t, err := time.ParseDuration(timeout); err == nil { - p.Service.Timeout = t - } - } - - p.Header.SerialID = byte(S_Dubbo) - p.Body = hessian.NewRequest(request.args, request.atta) - - var rsp *PendingResponse - if ct != CT_OneWay { - p.Header.Type = hessian.PackageRequest_TwoWay - rsp = NewPendingResponse() - rsp.response = response - rsp.callback = callback - } else { - p.Header.Type = hessian.PackageRequest - } - var ( err error session getty.Session @@ -274,6 +250,37 @@ func (c *Client) call(ct CallType, request *Request, response *Response, callbac conn.close() }() + var rsp *PendingResponse + svc := Service{} + header := DubboHeader{} + svc.Path = strings.TrimPrefix(request.svcUrl.Path, "/") + svc.Interface = request.svcUrl.GetParam(constant.INTERFACE_KEY, "") + svc.Version = request.svcUrl.GetParam(constant.VERSION_KEY, "") + svc.Group = request.svcUrl.GetParam(constant.GROUP_KEY, "") + svc.Method = request.method + svc.Timeout = c.opts.RequestTimeout + p := NewClientRequestPackage(header, svc) + + serialization := request.svcUrl.GetParam(constant.SERIALIZATION_KEY, c.conf.Serialization) + if serialization == constant.HESSIAN2_SERIALIZATION { + p.Header.SerialID = constant.S_Hessian2 + } else if serialization == constant.PROTOBUF_SERIALIZATION { + p.Header.SerialID = constant.S_Proto + } + p.SetBody(NewRequestPayload(request.args, request.atta)) + + if err := loadSerializer(p); err != nil { + return err + } + + if ct != CT_OneWay { + p.Header.Type = PackageRequest_TwoWay + rsp = NewPendingResponse() + rsp.response = response + rsp.callback = callback + } else { + p.Header.Type = PackageRequest + } if err = c.transfer(session, p, rsp); err != nil { return perrors.WithStack(err) } @@ -324,13 +331,21 @@ func (c *Client) transfer(session getty.Session, pkg *DubboPackage, sequence = c.sequence.Add(1) if pkg == nil { - pkg = &DubboPackage{} - pkg.Body = hessian.NewRequest([]interface{}{}, nil) - pkg.Body = []interface{}{} - pkg.Header.Type = hessian.PackageHeartbeat - pkg.Header.SerialID = byte(S_Dubbo) + // make heartbeat package + header := DubboHeader{ + Type: PackageHeartbeat, + SerialID: constant.S_Hessian2, + } + pkg = NewClientRequestPackage(header, Service{}) + // SetBody + reqPayload := NewRequestPayload([]interface{}{}, nil) + pkg.SetBody(reqPayload) + // set serializer + if err := loadSerializer(pkg); err != nil { + return err + } } - pkg.Header.ID = int64(sequence) + pkg.SetID(int64(sequence)) // cond1 if rsp != nil { diff --git a/protocol/dubbo/codec.go b/protocol/dubbo/codec.go index 76416b2baf..85bfbbe788 100644 --- a/protocol/dubbo/codec.go +++ b/protocol/dubbo/codec.go @@ -19,23 +19,37 @@ package dubbo import ( "bufio" - "bytes" - "fmt" + "encoding/binary" "time" ) +import ( + "github.com/pkg/errors" +) + import ( "github.com/apache/dubbo-go-hessian2" "github.com/apache/dubbo-go/common" - perrors "github.com/pkg/errors" + "github.com/apache/dubbo-go/common/logger" ) -//SerialID serial ID -type SerialID byte +type DubboCodec struct { + reader *bufio.Reader + pkgType PackageType + bodyLen int + serializer Serializer + headerRead bool +} +// enum part const ( - // S_Dubbo dubbo serial id - S_Dubbo SerialID = 2 + PackageError = PackageType(0x01) + PackageRequest = PackageType(0x02) + PackageResponse = PackageType(0x04) + PackageHeartbeat = PackageType(0x08) + PackageRequest_TwoWay = PackageType(0x10) + PackageResponse_Exception = PackageType(0x20) + PackageType_BitSize = 0x2f ) //CallType call type @@ -53,77 +67,252 @@ const ( //////////////////////////////////////////// // dubbo package //////////////////////////////////////////// - -// SequenceType ... type SequenceType int64 -// DubboPackage ... -type DubboPackage struct { - Header hessian.DubboHeader - Service hessian.Service - Body interface{} - Err error -} +// PackageType ... +type PackageType int -func (p DubboPackage) String() string { - return fmt.Sprintf("DubboPackage: Header-%v, Path-%v, Body-%v", p.Header, p.Service, p.Body) -} +func (c *DubboCodec) ReadHeader(header *DubboHeader) error { + var err error + if c.reader.Size() < HEADER_LENGTH { + return hessian.ErrHeaderNotEnough + } + buf, err := c.reader.Peek(HEADER_LENGTH) + if err != nil { // this is impossible + return errors.WithStack(err) + } + _, err = c.reader.Discard(HEADER_LENGTH) + if err != nil { // this is impossible + return errors.WithStack(err) + } -// Marshal ... -func (p *DubboPackage) Marshal() (*bytes.Buffer, error) { - codec := hessian.NewHessianCodec(nil) + //// read header + if buf[0] != MAGIC_HIGH && buf[1] != MAGIC_LOW { + return hessian.ErrIllegalPackage + } - pkg, err := codec.Write(p.Service, p.Header, p.Body) - if err != nil { - return nil, perrors.WithStack(err) + // Header{serialization id(5 bit), event, two way, req/response} + if header.SerialID = buf[2] & SERIAL_MASK; header.SerialID == Zero { + return errors.Errorf("serialization ID:%v", header.SerialID) + } + + flag := buf[2] & FLAG_EVENT + if flag != Zero { + header.Type |= PackageHeartbeat + } + flag = buf[2] & FLAG_REQUEST + if flag != Zero { + header.Type |= PackageRequest + flag = buf[2] & FLAG_TWOWAY + if flag != Zero { + header.Type |= PackageRequest_TwoWay + } + } else { + header.Type |= PackageResponse + header.ResponseStatus = buf[3] + if header.ResponseStatus != Response_OK { + header.Type |= PackageResponse_Exception + } + } + + // Header{req id} + header.ID = int64(binary.BigEndian.Uint64(buf[4:])) + + // Header{body len} + header.BodyLen = int(binary.BigEndian.Uint32(buf[12:])) + if header.BodyLen < 0 { + return hessian.ErrIllegalPackage } - return bytes.NewBuffer(pkg), nil + c.pkgType = header.Type + c.bodyLen = header.BodyLen + + if c.reader.Buffered() < c.bodyLen { + return hessian.ErrBodyNotEnough + } + c.headerRead = true + return errors.WithStack(err) +} + +func (c *DubboCodec) EncodeHeader(p DubboPackage) []byte { + header := p.Header + bs := make([]byte, 0) + switch header.Type { + case PackageHeartbeat: + if header.ResponseStatus == Zero { + bs = append(bs, hessian.DubboRequestHeartbeatHeader[:]...) + } else { + bs = append(bs, hessian.DubboResponseHeartbeatHeader[:]...) + } + case PackageResponse: + bs = append(bs, hessian.DubboResponseHeaderBytes[:]...) + if header.ResponseStatus != 0 { + bs[3] = header.ResponseStatus + } + case PackageRequest_TwoWay: + bs = append(bs, hessian.DubboRequestHeaderBytesTwoWay[:]...) + } + bs[2] |= header.SerialID & hessian.SERIAL_MASK + binary.BigEndian.PutUint64(bs[4:], uint64(header.ID)) + return bs } -// Unmarshal ... -func (p *DubboPackage) Unmarshal(buf *bytes.Buffer, opts ...interface{}) error { - // fix issue https://github.com/apache/dubbo-go/issues/380 - bufLen := buf.Len() - if bufLen < hessian.HEADER_LENGTH { - return perrors.WithStack(hessian.ErrHeaderNotEnough) +func (c *DubboCodec) Write(p DubboPackage) ([]byte, error) { + // header + if c.serializer == nil { + return nil, errors.New("serializer should not be nil") } + header := p.Header + switch header.Type { + case PackageHeartbeat: + if header.ResponseStatus == Zero { + return packRequest(p, c.serializer) + } + return packResponse(p, c.serializer) - codec := hessian.NewHessianCodec(bufio.NewReaderSize(buf, bufLen)) + case PackageRequest, PackageRequest_TwoWay: + return packRequest(p, c.serializer) - // read header - err := codec.ReadHeader(&p.Header) - if err != nil { - return perrors.WithStack(err) + case PackageResponse: + return packResponse(p, c.serializer) + + default: + return nil, errors.Errorf("Unrecognised message type: %v", header.Type) } +} - if len(opts) != 0 { // for client - client, ok := opts[0].(*Client) +func (c *DubboCodec) Read(p *DubboPackage) error { + if !c.headerRead { + if err := c.ReadHeader(&p.Header); err != nil { + return err + } + } + body, err := c.reader.Peek(p.GetBodyLen()) + if err != nil { + return err + } + if p.IsResponseWithException() { + logger.Infof("response with exception: %+v", p.Header) + decoder := hessian.NewDecoder(body) + exception, err := decoder.Decode() + if err != nil { + return errors.WithStack(err) + } + rsp, ok := p.Body.(*ResponsePayload) if !ok { - return perrors.Errorf("opts[0] is not of type *Client") + return errors.Errorf("java exception:%s", exception.(string)) } + rsp.Exception = errors.Errorf("java exception:%s", exception.(string)) + return nil + } else if p.IsHeartBeat() { + // heartbeat no need to unmarshal contents + return nil + } + if c.serializer == nil { + return errors.New("codec serializer is nil") + } + return c.serializer.Unmarshal(body, p) +} - if p.Header.Type&hessian.PackageRequest != 0x00 { - // size of this array must be '7' - // https://github.com/apache/dubbo-go-hessian2/blob/master/request.go#L272 - p.Body = make([]interface{}, 7) - } else { - pendingRsp, ok := client.pendingResponses.Load(SequenceType(p.Header.ID)) - if !ok { - return perrors.Errorf("client.GetPendingResponse(%v) = nil", p.Header.ID) - } - p.Body = &hessian.Response{RspObj: pendingRsp.(*PendingResponse).response.reply} +func (c *DubboCodec) SetSerializer(serializer Serializer) { + c.serializer = serializer +} + +func packRequest(p DubboPackage, serializer Serializer) ([]byte, error) { + var ( + byteArray []byte + pkgLen int + ) + + header := p.Header + + ////////////////////////////////////////// + // byteArray + ////////////////////////////////////////// + // magic + switch header.Type { + case PackageHeartbeat: + byteArray = append(byteArray, DubboRequestHeartbeatHeader[:]...) + case PackageRequest_TwoWay: + byteArray = append(byteArray, DubboRequestHeaderBytesTwoWay[:]...) + default: + byteArray = append(byteArray, DubboRequestHeaderBytes[:]...) + } + + // serialization id, two way flag, event, request/response flag + // SerialID is id of serialization approach in java dubbo + byteArray[2] |= header.SerialID & SERIAL_MASK + // request id + binary.BigEndian.PutUint64(byteArray[4:], uint64(header.ID)) + + ////////////////////////////////////////// + // body + ////////////////////////////////////////// + if p.IsHeartBeat() { + byteArray = append(byteArray, byte('N')) + pkgLen = 1 + } else { + body, err := serializer.Marshal(p) + if err != nil { + return nil, err } + pkgLen = len(body) + if pkgLen > int(DEFAULT_LEN) { // 8M + return nil, errors.Errorf("Data length %d too large, max payload %d", pkgLen, DEFAULT_LEN) + } + byteArray = append(byteArray, body...) + } + binary.BigEndian.PutUint32(byteArray[12:], uint32(pkgLen)) + return byteArray, nil +} + +func packResponse(p DubboPackage, serializer Serializer) ([]byte, error) { + var ( + byteArray []byte + ) + header := p.Header + hb := p.IsHeartBeat() + + // magic + if hb { + byteArray = append(byteArray, DubboResponseHeartbeatHeader[:]...) + } else { + byteArray = append(byteArray, DubboResponseHeaderBytes[:]...) + } + // set serialID, identify serialization types, eg: fastjson->6, hessian2->2 + byteArray[2] |= header.SerialID & SERIAL_MASK + // response status + if header.ResponseStatus != 0 { + byteArray[3] = header.ResponseStatus } - // read body - err = codec.ReadBody(p.Body) - return perrors.WithStack(err) + // request id + binary.BigEndian.PutUint64(byteArray[4:], uint64(header.ID)) + + // body + body, err := serializer.Marshal(p) + if err != nil { + return nil, err + } + + pkgLen := len(body) + if pkgLen > int(DEFAULT_LEN) { // 8M + return nil, errors.Errorf("Data length %d too large, max payload %d", pkgLen, DEFAULT_LEN) + } + // byteArray{body length} + binary.BigEndian.PutUint32(byteArray[12:], uint32(pkgLen)) + byteArray = append(byteArray, body...) + return byteArray, nil } -//////////////////////////////////////////// -// PendingResponse -//////////////////////////////////////////// +func NewDubboCodec(reader *bufio.Reader) *DubboCodec { + return &DubboCodec{ + reader: reader, + pkgType: 0, + bodyLen: 0, + headerRead: false, + } +} // PendingResponse ... type PendingResponse struct { diff --git a/protocol/dubbo/codec_test.go b/protocol/dubbo/codec_test.go index 5dc71f0d08..e488df7be0 100644 --- a/protocol/dubbo/codec_test.go +++ b/protocol/dubbo/codec_test.go @@ -18,39 +18,47 @@ package dubbo import ( - "bytes" "testing" "time" ) import ( hessian "github.com/apache/dubbo-go-hessian2" + "github.com/golang/protobuf/proto" perrors "github.com/pkg/errors" "github.com/stretchr/testify/assert" ) +import ( + "github.com/apache/dubbo-go/common/constant" + pb "github.com/apache/dubbo-go/protocol/dubbo/proto" +) + func TestDubboPackage_MarshalAndUnmarshal(t *testing.T) { - pkg := &DubboPackage{} + pkg := NewDubboPackage(nil) pkg.Body = []interface{}{"a"} - pkg.Header.Type = hessian.PackageHeartbeat - pkg.Header.SerialID = byte(S_Dubbo) + pkg.Header.Type = PackageHeartbeat + pkg.Header.SerialID = constant.S_Hessian2 pkg.Header.ID = 10086 + pkg.SetSerializer(HessianSerializer{}) // heartbeat data, err := pkg.Marshal() assert.NoError(t, err) - pkgres := &DubboPackage{} + pkgres := NewDubboPackage(data) + pkgres.SetSerializer(HessianSerializer{}) + pkgres.Body = []interface{}{} - err = pkgres.Unmarshal(data) + err = pkgres.Unmarshal() assert.NoError(t, err) - assert.Equal(t, hessian.PackageHeartbeat|hessian.PackageRequest|hessian.PackageRequest_TwoWay, pkgres.Header.Type) - assert.Equal(t, byte(S_Dubbo), pkgres.Header.SerialID) + assert.Equal(t, PackageHeartbeat|PackageRequest|PackageRequest_TwoWay, pkgres.Header.Type) + assert.Equal(t, constant.S_Hessian2, pkgres.Header.SerialID) assert.Equal(t, int64(10086), pkgres.Header.ID) assert.Equal(t, 0, len(pkgres.Body.([]interface{}))) // request - pkg.Header.Type = hessian.PackageRequest + pkg.Header.Type = PackageRequest pkg.Service.Interface = "Service" pkg.Service.Path = "path" pkg.Service.Version = "2.6" @@ -59,25 +67,139 @@ func TestDubboPackage_MarshalAndUnmarshal(t *testing.T) { data, err = pkg.Marshal() assert.NoError(t, err) - pkgres = &DubboPackage{} + pkgres = NewDubboPackage(data) + pkgres.SetSerializer(HessianSerializer{}) pkgres.Body = make([]interface{}, 7) - err = pkgres.Unmarshal(data) + err = pkgres.Unmarshal() + reassembleBody := pkgres.GetBody().(map[string]interface{}) + assert.NoError(t, err) + assert.Equal(t, PackageRequest, pkgres.Header.Type) + assert.Equal(t, constant.S_Hessian2, pkgres.Header.SerialID) + assert.Equal(t, int64(10086), pkgres.Header.ID) + assert.Equal(t, "2.0.2", reassembleBody["dubboVersion"].(string)) + assert.Equal(t, "path", pkgres.Service.Path) + assert.Equal(t, "2.6", pkgres.Service.Version) + assert.Equal(t, "Method", pkgres.Service.Method) + assert.Equal(t, "Ljava/lang/String;", reassembleBody["argsTypes"].(string)) + assert.Equal(t, []interface{}{"a"}, reassembleBody["args"]) + assert.Equal(t, map[string]string{"dubbo": "2.0.2", "interface": "Service", "path": "path", "timeout": "1000", "version": "2.6"}, reassembleBody["attachments"].(map[string]string)) +} + +func TestDubboPackage_Protobuf_Serialization_Request(t *testing.T) { + pkg := NewDubboPackage(nil) + pkg.Body = []interface{}{"a"} + pkg.Header.Type = PackageHeartbeat + pkg.Header.SerialID = constant.S_Proto + pkg.Header.ID = 10086 + pkg.SetSerializer(ProtoSerializer{}) + + // heartbeat + data, err := pkg.Marshal() assert.NoError(t, err) - assert.Equal(t, hessian.PackageRequest, pkgres.Header.Type) - assert.Equal(t, byte(S_Dubbo), pkgres.Header.SerialID) + + pkgres := NewDubboPackage(data) + pkgres.SetSerializer(HessianSerializer{}) + + pkgres.Body = []interface{}{} + err = pkgres.Unmarshal() + assert.NoError(t, err) + assert.Equal(t, PackageHeartbeat|PackageRequest|PackageRequest_TwoWay, pkgres.Header.Type) + assert.Equal(t, constant.S_Proto, pkgres.Header.SerialID) assert.Equal(t, int64(10086), pkgres.Header.ID) - assert.Equal(t, "2.0.2", pkgres.Body.([]interface{})[0]) - assert.Equal(t, "path", pkgres.Body.([]interface{})[1]) - assert.Equal(t, "2.6", pkgres.Body.([]interface{})[2]) - assert.Equal(t, "Method", pkgres.Body.([]interface{})[3]) - assert.Equal(t, "Ljava/lang/String;", pkgres.Body.([]interface{})[4]) - assert.Equal(t, []interface{}{"a"}, pkgres.Body.([]interface{})[5]) - assert.Equal(t, map[string]string{"dubbo": "2.0.2", "interface": "Service", "path": "path", "timeout": "1000", "version": "2.6"}, pkgres.Body.([]interface{})[6]) + assert.Equal(t, 0, len(pkgres.Body.([]interface{}))) + + // request + pkg.Header.Type = PackageRequest + pkg.Service.Interface = "Service" + pkg.Service.Path = "path" + pkg.Service.Version = "2.6" + pkg.Service.Method = "Method" + pkg.Service.Timeout = time.Second + pkg.SetBody([]interface{}{&pb.StringValue{Value: "hello world"}}) + data, err = pkg.Marshal() + assert.NoError(t, err) + + pkgres = NewDubboPackage(data) + pkgres.SetSerializer(ProtoSerializer{}) + err = pkgres.Unmarshal() + assert.NoError(t, err) + body, ok := pkgres.Body.(map[string]interface{}) + assert.Equal(t, ok, true) + req, ok := body["args"].([]interface{}) + assert.Equal(t, ok, true) + // protobuf rpc just has exact one parameter + assert.Equal(t, len(req), 1) + argsBytes, ok := req[0].([]byte) + assert.Equal(t, ok, true) + sv := pb.StringValue{} + buf := proto.NewBuffer(argsBytes) + err = buf.Unmarshal(&sv) + assert.NoError(t, err) + assert.Equal(t, sv.Value, "hello world") +} + +func TestDubboCodec_Protobuf_Serialization_Response(t *testing.T) { + { + pkg := NewDubboPackage(nil) + pkg.Header.Type = PackageResponse + pkg.Header.SerialID = constant.S_Proto + pkg.Header.ID = 10086 + pkg.SetSerializer(ProtoSerializer{}) + pkg.SetBody(&pb.StringValue{Value: "hello world"}) + + // heartbeat + data, err := pkg.Marshal() + assert.NoError(t, err) + + pkgres := NewDubboPackage(data) + pkgres.SetSerializer(ProtoSerializer{}) + + pkgres.SetBody(&pb.StringValue{}) + err = pkgres.Unmarshal() + + assert.NoError(t, err) + assert.Equal(t, pkgres.Header.Type, PackageResponse) + assert.Equal(t, constant.S_Proto, pkgres.Header.SerialID) + assert.Equal(t, int64(10086), pkgres.Header.ID) + + res, ok := pkgres.Body.(*pb.StringValue) + assert.Equal(t, ok, true) + assert.Equal(t, res.Value, "hello world") + } + + // with attachments + { + attas := make(map[string]string) + attas["k1"] = "test" + resp := NewResponsePayload(&pb.StringValue{Value: "attachments"}, nil, attas) + p := NewDubboPackage(nil) + p.Header.Type = PackageResponse + p.Header.SerialID = constant.S_Proto + p.SetSerializer(ProtoSerializer{}) + p.SetBody(resp) + data, err := p.Marshal() + assert.NoError(t, err) + + pkgres := NewDubboPackage(data) + pkgres.Header.Type = PackageResponse + pkgres.Header.SerialID = constant.S_Proto + pkgres.Header.ID = 10086 + pkgres.SetSerializer(ProtoSerializer{}) + + resAttachment := make(map[string]string) + resBody := &pb.StringValue{} + pkgres.SetBody(NewResponsePayload(resBody, nil, resAttachment)) + + err = pkgres.Unmarshal() + assert.NoError(t, err) + assert.Equal(t, "attachments", resBody.Value) + assert.Equal(t, "test", resAttachment["k1"]) + } + } func TestIssue380(t *testing.T) { pkg := &DubboPackage{} - buf := bytes.NewBuffer([]byte("hello")) - err := pkg.Unmarshal(buf) + err := pkg.Unmarshal() assert.True(t, perrors.Cause(err) == hessian.ErrHeaderNotEnough) } diff --git a/protocol/dubbo/config.go b/protocol/dubbo/config.go index dbc6989c54..a2f3e70d4d 100644 --- a/protocol/dubbo/config.go +++ b/protocol/dubbo/config.go @@ -91,6 +91,9 @@ type ( // session tcp parameters GettySessionParam GettySessionParam `required:"true" yaml:"getty_session_param" json:"getty_session_param,omitempty"` + + // serialization + Serialization string `default:"hessian2" yaml:"serialization" json:"serialization"` } ) @@ -106,6 +109,7 @@ func GetDefaultClientConfig() ClientConfig { GrPoolSize: 200, QueueLen: 64, QueueNumber: 10, + Serialization: "hessian2", GettySessionParam: GettySessionParam{ CompressEncoding: false, TcpNoDelay: true, diff --git a/protocol/dubbo/const.go b/protocol/dubbo/const.go new file mode 100644 index 0000000000..936b8d83ac --- /dev/null +++ b/protocol/dubbo/const.go @@ -0,0 +1,238 @@ +package dubbo + +import ( + "github.com/pkg/errors" + "reflect" + "regexp" +) + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +const ( + mask = byte(127) + flag = byte(128) +) + +const ( + // Zero : byte zero + Zero = byte(0x00) +) + +// constansts +const ( + TAG_READ = int32(-1) + ASCII_GAP = 32 + CHUNK_SIZE = 4096 + BC_BINARY = byte('B') // final chunk + BC_BINARY_CHUNK = byte('A') // non-final chunk + + BC_BINARY_DIRECT = byte(0x20) // 1-byte length binary + BINARY_DIRECT_MAX = byte(0x0f) + BC_BINARY_SHORT = byte(0x34) // 2-byte length binary + BINARY_SHORT_MAX = 0x3ff // 0-1023 binary + + BC_DATE = byte(0x4a) // 64-bit millisecond UTC date + BC_DATE_MINUTE = byte(0x4b) // 32-bit minute UTC date + + BC_DOUBLE = byte('D') // IEEE 64-bit double + + BC_DOUBLE_ZERO = byte(0x5b) + BC_DOUBLE_ONE = byte(0x5c) + BC_DOUBLE_BYTE = byte(0x5d) + BC_DOUBLE_SHORT = byte(0x5e) + BC_DOUBLE_MILL = byte(0x5f) + + BC_FALSE = byte('F') // boolean false + + BC_INT = byte('I') // 32-bit int + + INT_DIRECT_MIN = -0x10 + INT_DIRECT_MAX = byte(0x2f) + BC_INT_ZERO = byte(0x90) + + INT_BYTE_MIN = -0x800 + INT_BYTE_MAX = 0x7ff + BC_INT_BYTE_ZERO = byte(0xc8) + + BC_END = byte('Z') + + INT_SHORT_MIN = -0x40000 + INT_SHORT_MAX = 0x3ffff + BC_INT_SHORT_ZERO = byte(0xd4) + + BC_LIST_VARIABLE = byte(0x55) + BC_LIST_FIXED = byte('V') + BC_LIST_VARIABLE_UNTYPED = byte(0x57) + BC_LIST_FIXED_UNTYPED = byte(0x58) + _listFixedTypedLenTagMin = byte(0x70) + _listFixedTypedLenTagMax = byte(0x77) + _listFixedUntypedLenTagMin = byte(0x78) + _listFixedUntypedLenTagMax = byte(0x7f) + + BC_LIST_DIRECT = byte(0x70) + BC_LIST_DIRECT_UNTYPED = byte(0x78) + LIST_DIRECT_MAX = byte(0x7) + + BC_LONG = byte('L') // 64-bit signed integer + LONG_DIRECT_MIN = -0x08 + LONG_DIRECT_MAX = byte(0x0f) + BC_LONG_ZERO = byte(0xe0) + + LONG_BYTE_MIN = -0x800 + LONG_BYTE_MAX = 0x7ff + BC_LONG_BYTE_ZERO = byte(0xf8) + + LONG_SHORT_MIN = -0x40000 + LONG_SHORT_MAX = 0x3ffff + BC_LONG_SHORT_ZERO = byte(0x3c) + + BC_LONG_INT = byte(0x59) + + BC_MAP = byte('M') + BC_MAP_UNTYPED = byte('H') + + BC_NULL = byte('N') // x4e + + BC_OBJECT = byte('O') + BC_OBJECT_DEF = byte('C') + + BC_OBJECT_DIRECT = byte(0x60) + OBJECT_DIRECT_MAX = byte(0x0f) + + BC_REF = byte(0x51) + + BC_STRING = byte('S') // final string + BC_STRING_CHUNK = byte('R') // non-final string + + BC_STRING_DIRECT = byte(0x00) + STRING_DIRECT_MAX = byte(0x1f) + BC_STRING_SHORT = byte(0x30) + STRING_SHORT_MAX = 0x3ff + + BC_TRUE = byte('T') + + P_PACKET_CHUNK = byte(0x4f) + P_PACKET = byte('P') + + P_PACKET_DIRECT = byte(0x80) + PACKET_DIRECT_MAX = byte(0x7f) + + P_PACKET_SHORT = byte(0x70) + PACKET_SHORT_MAX = 0xfff + ARRAY_STRING = "[string" + ARRAY_INT = "[int" + ARRAY_DOUBLE = "[double" + ARRAY_FLOAT = "[float" + ARRAY_BOOL = "[boolean" + ARRAY_LONG = "[long" + + PATH_KEY = "path" + GROUP_KEY = "group" + INTERFACE_KEY = "interface" + VERSION_KEY = "version" + TIMEOUT_KEY = "timeout" + + STRING_NIL = "" + STRING_TRUE = "true" + STRING_FALSE = "false" + STRING_ZERO = "0.0" + STRING_ONE = "1.0" +) + +// ResponsePayload related consts +const ( + Response_OK byte = 20 + Response_CLIENT_TIMEOUT byte = 30 + Response_SERVER_TIMEOUT byte = 31 + Response_BAD_REQUEST byte = 40 + Response_BAD_RESPONSE byte = 50 + Response_SERVICE_NOT_FOUND byte = 60 + Response_SERVICE_ERROR byte = 70 + Response_SERVER_ERROR byte = 80 + Response_CLIENT_ERROR byte = 90 + + // According to "java dubbo" There are two cases of response: + // 1. with attachments + // 2. no attachments + RESPONSE_WITH_EXCEPTION int32 = 0 + RESPONSE_VALUE int32 = 1 + RESPONSE_NULL_VALUE int32 = 2 + RESPONSE_WITH_EXCEPTION_WITH_ATTACHMENTS int32 = 3 + RESPONSE_VALUE_WITH_ATTACHMENTS int32 = 4 + RESPONSE_NULL_VALUE_WITH_ATTACHMENTS int32 = 5 +) + +/** + * the dubbo protocol header length is 16 Bytes. + * the first 2 Bytes is magic code '0xdabb' + * the next 1 Byte is message flags, in which its 16-20 bit is serial id, 21 for event, 22 for two way, 23 for request/response flag + * the next 1 Bytes is response state. + * the next 8 Bytes is package DI. + * the next 4 Bytes is package length. + **/ +const ( + // header length. + HEADER_LENGTH = 16 + + // magic header + MAGIC = uint16(0xdabb) + MAGIC_HIGH = byte(0xda) + MAGIC_LOW = byte(0xbb) + + // message flag. + FLAG_REQUEST = byte(0x80) + FLAG_TWOWAY = byte(0x40) + FLAG_EVENT = byte(0x20) // for heartbeat + SERIAL_MASK = 0x1f + + DUBBO_VERSION = "2.5.4" + DUBBO_VERSION_KEY = "dubbo" + DEFAULT_DUBBO_PROTOCOL_VERSION = "2.0.2" // Dubbo RPC protocol version, for compatibility, it must not be between 2.0.10 ~ 2.6.2 + LOWEST_VERSION_FOR_RESPONSE_ATTACHMENT = 2000200 + DEFAULT_LEN = 8388608 // 8 * 1024 * 1024 default body max length +) + +// regular +const ( + JAVA_IDENT_REGEX = "(?:[_$a-zA-Z][_$a-zA-Z0-9]*)" + CLASS_DESC = "(?:L" + JAVA_IDENT_REGEX + "(?:\\/" + JAVA_IDENT_REGEX + ")*;)" + ARRAY_DESC = "(?:\\[+(?:(?:[VZBCDFIJS])|" + CLASS_DESC + "))" + DESC_REGEX = "(?:(?:[VZBCDFIJS])|" + CLASS_DESC + "|" + ARRAY_DESC + ")" +) + +// Dubbo request response related consts +var ( + DubboRequestHeaderBytesTwoWay = [HEADER_LENGTH]byte{MAGIC_HIGH, MAGIC_LOW, FLAG_REQUEST | FLAG_TWOWAY} + DubboRequestHeaderBytes = [HEADER_LENGTH]byte{MAGIC_HIGH, MAGIC_LOW, FLAG_REQUEST} + DubboResponseHeaderBytes = [HEADER_LENGTH]byte{MAGIC_HIGH, MAGIC_LOW, Zero, Response_OK} + DubboRequestHeartbeatHeader = [HEADER_LENGTH]byte{MAGIC_HIGH, MAGIC_LOW, FLAG_REQUEST | FLAG_TWOWAY | FLAG_EVENT} + DubboResponseHeartbeatHeader = [HEADER_LENGTH]byte{MAGIC_HIGH, MAGIC_LOW, FLAG_EVENT} +) + +// Error part +var ( + ErrHeaderNotEnough = errors.New("header buffer too short") + ErrBodyNotEnough = errors.New("body buffer too short") + ErrJavaException = errors.New("got java exception") + ErrIllegalPackage = errors.New("illegal package!") +) + +// DescRegex ... +var DescRegex, _ = regexp.Compile(DESC_REGEX) + +var NilValue = reflect.Zero(reflect.TypeOf((*interface{})(nil)).Elem()) diff --git a/protocol/dubbo/dubbo_invoker.go b/protocol/dubbo/dubbo_invoker.go index 09c3725710..392e9af1dc 100644 --- a/protocol/dubbo/dubbo_invoker.go +++ b/protocol/dubbo/dubbo_invoker.go @@ -93,6 +93,10 @@ func (di *DubboInvoker) Invoke(ctx context.Context, invocation protocol.Invocati di.appendCtx(ctx, inv) url := di.GetUrl() + // default hessian2 serialization, compatible + if url.GetParam("serialization", "") == "" { + url.SetParam("serialization", constant.HESSIAN2_SERIALIZATION) + } // async async, err := strconv.ParseBool(inv.AttachmentsByKey(constant.ASYNC_KEY, "false")) if err != nil { diff --git a/protocol/dubbo/dubbo_protocol.go b/protocol/dubbo/dubbo_protocol.go index 355dbc8024..08140606d0 100644 --- a/protocol/dubbo/dubbo_protocol.go +++ b/protocol/dubbo/dubbo_protocol.go @@ -67,7 +67,6 @@ func (dp *DubboProtocol) Export(invoker protocol.Invoker) protocol.Exporter { exporter := NewDubboExporter(serviceKey, invoker, dp.ExporterMap()) dp.SetExporterMap(serviceKey, exporter) logger.Infof("Export service: %s", url.String()) - // start server dp.openServer(url) return exporter diff --git a/protocol/dubbo/hessian.go b/protocol/dubbo/hessian.go new file mode 100644 index 0000000000..713da84885 --- /dev/null +++ b/protocol/dubbo/hessian.go @@ -0,0 +1,504 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dubbo + +import ( + "math" + "reflect" + "strconv" + "strings" + "time" +) + +import ( + hessian "github.com/apache/dubbo-go-hessian2" + "github.com/apache/dubbo-go-hessian2/java_exception" + "github.com/pkg/errors" +) + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/common/logger" +) + +type Object interface{} + +func getArgType(v interface{}) string { + if v == nil { + return "V" + } + + switch v.(type) { + // Serialized tags for base types + case nil: + return "V" + case bool: + return "Z" + case []bool: + return "[Z" + case byte: + return "B" + case []byte: + return "[B" + case int8: + return "B" + case []int8: + return "[B" + case int16: + return "S" + case []int16: + return "[S" + case uint16: // Equivalent to Char of Java + return "C" + case []uint16: + return "[C" + // case rune: + // return "C" + case int: + return "J" + case []int: + return "[J" + case int32: + return "I" + case []int32: + return "[I" + case int64: + return "J" + case []int64: + return "[J" + case time.Time: + return "java.util.Date" + case []time.Time: + return "[Ljava.util.Date" + case float32: + return "F" + case []float32: + return "[F" + case float64: + return "D" + case []float64: + return "[D" + case string: + return "java.lang.String" + case []string: + return "[Ljava.lang.String;" + case []Object: + return "[Ljava.lang.Object;" + case map[interface{}]interface{}: + // return "java.util.HashMap" + return "java.util.Map" + + // Serialized tags for complex types + default: + t := reflect.TypeOf(v) + if reflect.Ptr == t.Kind() { + t = reflect.TypeOf(reflect.ValueOf(v).Elem()) + } + switch t.Kind() { + case reflect.Struct: + return "java.lang.Object" + case reflect.Slice, reflect.Array: + if t.Elem().Kind() == reflect.Struct { + return "[Ljava.lang.Object;" + } + // return "java.util.ArrayList" + return "java.util.List" + case reflect.Map: // Enter here, map may be map[string]int + return "java.util.Map" + default: + return "" + } + } +} + +func getArgsTypeList(args []interface{}) (string, error) { + var ( + typ string + types string + ) + for i := range args { + typ = getArgType(args[i]) + if typ == "" { + return types, errors.Errorf("cat not get arg %#v type", args[i]) + } + if !strings.Contains(typ, ".") { + types += typ + } else if strings.Index(typ, "[") == 0 { + types += strings.Replace(typ, ".", "/", -1) + } else { + // java.util.List -> Ljava/util/List; + types += "L" + strings.Replace(typ, ".", "/", -1) + ";" + } + } + + return types, nil +} + +type HessianSerializer struct { +} + +func (h HessianSerializer) Marshal(p DubboPackage) ([]byte, error) { + encoder := hessian.NewEncoder() + if p.IsRequest() { + return marshalRequest(encoder, p) + } + return marshalResponse(encoder, p) +} + +func (h HessianSerializer) Unmarshal(input []byte, p *DubboPackage) error { + if p.IsHeartBeat() { + return nil + } + if p.IsRequest() { + return unmarshalRequestBody(input, p) + } + return unmarshalResponseBody(input, p) +} + +func marshalResponse(encoder *hessian.Encoder, p DubboPackage) ([]byte, error) { + header := p.Header + response := EnsureResponsePayload(p.Body) + if header.ResponseStatus == Response_OK { + if p.IsHeartBeat() { + encoder.Encode(nil) + } else { + atta := isSupportResponseAttachment(response.Attachments[DUBBO_VERSION_KEY]) + + var resWithException, resValue, resNullValue int32 + if atta { + resWithException = RESPONSE_WITH_EXCEPTION_WITH_ATTACHMENTS + resValue = RESPONSE_VALUE_WITH_ATTACHMENTS + resNullValue = RESPONSE_NULL_VALUE_WITH_ATTACHMENTS + } else { + resWithException = RESPONSE_WITH_EXCEPTION + resValue = RESPONSE_VALUE + resNullValue = RESPONSE_NULL_VALUE + } + + if response.Exception != nil { // throw error + encoder.Encode(resWithException) + if t, ok := response.Exception.(java_exception.Throwabler); ok { + encoder.Encode(t) + } else { + encoder.Encode(java_exception.NewThrowable(response.Exception.Error())) + } + } else { + if response.RspObj == nil { + encoder.Encode(resNullValue) + } else { + encoder.Encode(resValue) + encoder.Encode(response.RspObj) // result + } + } + + if atta { + encoder.Encode(response.Attachments) // attachments + } + } + } else { + if response.Exception != nil { // throw error + encoder.Encode(response.Exception.Error()) + } else { + encoder.Encode(response.RspObj) + } + } + bs := encoder.Buffer() + // encNull + bs = append(bs, byte('N')) + return bs, nil +} + +func marshalRequest(encoder *hessian.Encoder, p DubboPackage) ([]byte, error) { + service := p.Service + request := EnsureRequestPayload(p.Body) + encoder.Encode(DEFAULT_DUBBO_PROTOCOL_VERSION) + encoder.Encode(service.Path) + encoder.Encode(service.Version) + encoder.Encode(service.Method) + + args, ok := request.Params.([]interface{}) + + if !ok { + logger.Infof("request args are: %+v", request.Params) + return nil, errors.Errorf("@params is not of type: []interface{}") + } + types, err := getArgsTypeList(args) + if err != nil { + return nil, errors.Wrapf(err, " PackRequest(args:%+v)", args) + } + encoder.Encode(types) + for _, v := range args { + encoder.Encode(v) + } + + request.Attachments[PATH_KEY] = service.Path + request.Attachments[VERSION_KEY] = service.Version + if len(service.Group) > 0 { + request.Attachments[GROUP_KEY] = service.Group + } + if len(service.Interface) > 0 { + request.Attachments[INTERFACE_KEY] = service.Interface + } + if service.Timeout != 0 { + request.Attachments[TIMEOUT_KEY] = strconv.Itoa(int(service.Timeout / time.Millisecond)) + } + + encoder.Encode(request.Attachments) + return encoder.Buffer(), nil + +} + +var versionInt = make(map[string]int) + +// https://github.com/apache/dubbo/blob/dubbo-2.7.1/dubbo-common/src/main/java/org/apache/dubbo/common/Version.java#L96 +// isSupportResponseAttachment is for compatibility among some dubbo version +func isSupportResponseAttachment(version string) bool { + if version == "" { + return false + } + + v, ok := versionInt[version] + if !ok { + v = version2Int(version) + if v == -1 { + return false + } + } + + if v >= 2001000 && v <= 2060200 { // 2.0.10 ~ 2.6.2 + return false + } + return v >= LOWEST_VERSION_FOR_RESPONSE_ATTACHMENT +} + +func version2Int(version string) int { + var v = 0 + varr := strings.Split(version, ".") + length := len(varr) + for key, value := range varr { + v0, err := strconv.Atoi(value) + if err != nil { + return -1 + } + v += v0 * int(math.Pow10((length-key-1)*2)) + } + if length == 3 { + return v * 100 + } + return v +} + +func unmarshalRequestBody(body []byte, p *DubboPackage) error { + if p.Body == nil { + p.SetBody(make([]interface{}, 7)) + } + decoder := hessian.NewDecoder(body) + var ( + err error + dubboVersion, target, serviceVersion, method, argsTypes interface{} + args []interface{} + ) + req, ok := p.Body.([]interface{}) + if !ok { + return errors.Errorf("@reqObj is not of type: []interface{}") + } + dubboVersion, err = decoder.Decode() + if err != nil { + return errors.WithStack(err) + } + req[0] = dubboVersion + + target, err = decoder.Decode() + if err != nil { + return errors.WithStack(err) + } + req[1] = target + + serviceVersion, err = decoder.Decode() + if err != nil { + return errors.WithStack(err) + } + req[2] = serviceVersion + + method, err = decoder.Decode() + if err != nil { + return errors.WithStack(err) + } + req[3] = method + + argsTypes, err = decoder.Decode() + if err != nil { + return errors.WithStack(err) + } + req[4] = argsTypes + + ats := hessian.DescRegex.FindAllString(argsTypes.(string), -1) + var arg interface{} + for i := 0; i < len(ats); i++ { + arg, err = decoder.Decode() + if err != nil { + return errors.WithStack(err) + } + args = append(args, arg) + } + req[5] = args + + attachments, err := decoder.Decode() + if err != nil { + return errors.WithStack(err) + } + + if v, ok := attachments.(map[interface{}]interface{}); ok { + v[DUBBO_VERSION_KEY] = dubboVersion + req[6] = hessian.ToMapStringString(v) + buildServerSidePackageBody(p) + return nil + } + return errors.Errorf("get wrong attachments: %+v", attachments) +} + +func unmarshalResponseBody(body []byte, p *DubboPackage) error { + decoder := hessian.NewDecoder(body) + rspType, err := decoder.Decode() + if p.Body == nil { + p.SetBody(&ResponsePayload{}) + } + if err != nil { + return errors.WithStack(err) + } + response := EnsureResponsePayload(p.Body) + + switch rspType { + case RESPONSE_WITH_EXCEPTION, RESPONSE_WITH_EXCEPTION_WITH_ATTACHMENTS: + expt, err := decoder.Decode() + if err != nil { + return errors.WithStack(err) + } + if rspType == RESPONSE_WITH_EXCEPTION_WITH_ATTACHMENTS { + attachments, err := decoder.Decode() + if err != nil { + return errors.WithStack(err) + } + if v, ok := attachments.(map[interface{}]interface{}); ok { + atta := hessian.ToMapStringString(v) + response.Attachments = atta + } else { + return errors.Errorf("get wrong attachments: %+v", attachments) + } + } + + if e, ok := expt.(error); ok { + response.Exception = e + } else { + response.Exception = errors.Errorf("got exception: %+v", expt) + } + return nil + + case RESPONSE_VALUE, RESPONSE_VALUE_WITH_ATTACHMENTS: + rsp, err := decoder.Decode() + if err != nil { + return errors.WithStack(err) + } + if rspType == RESPONSE_VALUE_WITH_ATTACHMENTS { + attachments, err := decoder.Decode() + if err != nil { + return errors.WithStack(err) + } + if v, ok := attachments.(map[interface{}]interface{}); ok { + atta := hessian.ToMapStringString(v) + response.Attachments = atta + } else { + return errors.Errorf("get wrong attachments: %+v", attachments) + } + } + + return errors.WithStack(hessian.ReflectResponse(rsp, response.RspObj)) + + case RESPONSE_NULL_VALUE, RESPONSE_NULL_VALUE_WITH_ATTACHMENTS: + if rspType == RESPONSE_NULL_VALUE_WITH_ATTACHMENTS { + attachments, err := decoder.Decode() + if err != nil { + return errors.WithStack(err) + } + if v, ok := attachments.(map[interface{}]interface{}); ok { + atta := hessian.ToMapStringString(v) + response.Attachments = atta + } else { + return errors.Errorf("get wrong attachments: %+v", attachments) + } + } + return nil + } + return nil +} + +func buildServerSidePackageBody(pkg *DubboPackage) { + req := pkg.GetBody().([]interface{}) // length of body should be 7 + if len(req) > 0 { + var dubboVersion, argsTypes string + var args []interface{} + var attachments map[string]string + svc := Service{} + if req[0] != nil { + dubboVersion = req[0].(string) + } + if req[1] != nil { + svc.Path = req[1].(string) + } + if req[2] != nil { + svc.Version = req[2].(string) + } + if req[3] != nil { + svc.Method = req[3].(string) + } + if req[4] != nil { + argsTypes = req[4].(string) + } + if req[5] != nil { + args = req[5].([]interface{}) + } + if req[6] != nil { + attachments = req[6].(map[string]string) + } + if svc.Path == "" && len(attachments[constant.PATH_KEY]) > 0 { + svc.Path = attachments[constant.PATH_KEY] + } + if _, ok := attachments[constant.INTERFACE_KEY]; ok { + svc.Interface = attachments[constant.INTERFACE_KEY] + } else { + svc.Interface = svc.Path + } + if len(attachments[constant.GROUP_KEY]) > 0 { + svc.Group = attachments[constant.GROUP_KEY] + } + pkg.SetService(svc) + pkg.SetBody(map[string]interface{}{ + "dubboVersion": dubboVersion, + "argsTypes": argsTypes, + "args": args, + "service": common.ServiceMap.GetService(DUBBO, svc.Path), // path as a key + "attachments": attachments, + }) + } +} + +func init() { + extension.SetSerializer("hessian2", HessianSerializer{}) +} diff --git a/protocol/dubbo/listener.go b/protocol/dubbo/listener.go index 0251b78a2b..31eaed6f71 100644 --- a/protocol/dubbo/listener.go +++ b/protocol/dubbo/listener.go @@ -104,17 +104,10 @@ func (h *RpcClientHandler) OnMessage(session getty.Session, pkg interface{}) { return } - if p.Header.Type&hessian.PackageHeartbeat != 0x00 { - if p.Header.Type&hessian.PackageResponse != 0x00 { - logger.Debugf("get rpc heartbeat response{header: %#v, body: %#v}", p.Header, p.Body) - if p.Err != nil { - logger.Errorf("rpc heartbeat response{error: %#v}", p.Err) - } - h.conn.pool.rpcClient.removePendingResponse(SequenceType(p.Header.ID)) - } else { - logger.Debugf("get rpc heartbeat request{header: %#v, service: %#v, body: %#v}", p.Header, p.Service, p.Body) - p.Header.ResponseStatus = hessian.Response_OK - reply(session, p, hessian.PackageHeartbeat) + if p.Header.Type&PackageHeartbeat != 0x00 { + logger.Debugf("get rpc heartbeat response{header: %#v, body: %#v}", p.Header, p.Body) + if p.Err != nil { + logger.Errorf("rpc heartbeat response{error: %#v}", p.Err) } return } @@ -137,6 +130,7 @@ func (h *RpcClientHandler) OnMessage(session getty.Session, pkg interface{}) { if pendingResponse.callback == nil { pendingResponse.done <- struct{}{} } else { + logger.Info("proxy service callback") pendingResponse.callback(pendingResponse.GetCallResponse()) } } @@ -228,82 +222,84 @@ func (h *RpcServerHandler) OnMessage(session getty.Session, pkg interface{}) { logger.Errorf("illegal package{%#v}", pkg) return } - p.Header.ResponseStatus = hessian.Response_OK + p.SetResponseStatus(hessian.Response_OK) + //p.Header.ResponseStatus = hessian.Response_OK // heartbeat - if p.Header.Type&hessian.PackageHeartbeat != 0x00 { - logger.Debugf("get rpc heartbeat request{header: %#v, service: %#v, body: %#v}", p.Header, p.Service, p.Body) - reply(session, p, hessian.PackageHeartbeat) + if p.GetHeader().Type&PackageHeartbeat != 0x00 { + logger.Debugf("get rpc heartbeat request{header: %#v, service: %#v, body: %#v}", p.GetHeader(), p.GetService(), p.GetBody()) + h.reply(session, p, PackageHeartbeat) return } twoway := true // not twoway - if p.Header.Type&hessian.PackageRequest_TwoWay == 0x00 { + if p.GetHeader().Type&PackageRequest_TwoWay == 0x00 { twoway = false } defer func() { if e := recover(); e != nil { - p.Header.ResponseStatus = hessian.Response_SERVER_ERROR + p.SetResponseStatus(hessian.Response_SERVER_ERROR) if err, ok := e.(error); ok { logger.Errorf("OnMessage panic: %+v", perrors.WithStack(err)) - p.Body = perrors.WithStack(err) + p.SetBody(perrors.WithStack(err)) } else if err, ok := e.(string); ok { logger.Errorf("OnMessage panic: %+v", perrors.New(err)) - p.Body = perrors.New(err) + p.SetBody(perrors.New(err)) } else { logger.Errorf("OnMessage panic: %+v, this is impossible.", e) - p.Body = e + p.SetBody(e) } if !twoway { return } - reply(session, p, hessian.PackageResponse) + h.reply(session, p, PackageResponse) } }() - u := common.NewURLWithOptions(common.WithPath(p.Service.Path), common.WithParams(url.Values{}), - common.WithParamsValue(constant.GROUP_KEY, p.Service.Group), - common.WithParamsValue(constant.INTERFACE_KEY, p.Service.Interface), - common.WithParamsValue(constant.VERSION_KEY, p.Service.Version)) + u := common.NewURLWithOptions(common.WithPath(p.GetService().Path), common.WithParams(url.Values{}), + common.WithParamsValue(constant.GROUP_KEY, p.GetService().Group), + common.WithParamsValue(constant.INTERFACE_KEY, p.GetService().Interface), + common.WithParamsValue(constant.VERSION_KEY, p.GetService().Version)) exporter, _ := dubboProtocol.ExporterMap().Load(u.ServiceKey()) if exporter == nil { err := fmt.Errorf("don't have this exporter, key: %s", u.ServiceKey()) logger.Errorf(err.Error()) - p.Header.ResponseStatus = hessian.Response_OK - p.Body = err - reply(session, p, hessian.PackageResponse) + p.SetResponseStatus(Response_OK) + p.SetBody(err) + h.reply(session, p, PackageResponse) return } invoker := exporter.(protocol.Exporter).GetInvoker() if invoker != nil { - attachments := p.Body.(map[string]interface{})["attachments"].(map[string]string) + attachments := p.GetBody().(map[string]interface{})["attachments"].(map[string]string) attachments[constant.LOCAL_ADDR] = session.LocalAddr() attachments[constant.REMOTE_ADDR] = session.RemoteAddr() - args := p.Body.(map[string]interface{})["args"].([]interface{}) - inv := invocation.NewRPCInvocation(p.Service.Method, args, attachments) + args := p.GetBody().(map[string]interface{})["args"].([]interface{}) + inv := invocation.NewRPCInvocation(p.GetService().Method, args, attachments) ctx := rebuildCtx(inv) - result := invoker.Invoke(ctx, inv) + logger.Debugf("invoker result: %+v", result) if err := result.Error(); err != nil { - p.Header.ResponseStatus = hessian.Response_OK - p.Body = hessian.NewResponse(nil, err, result.Attachments()) + p.SetResponseStatus(Response_OK) + p.SetBody(&ResponsePayload{nil, err, result.Attachments()}) } else { res := result.Result() - p.Header.ResponseStatus = hessian.Response_OK - p.Body = hessian.NewResponse(res, nil, result.Attachments()) + p.SetResponseStatus(Response_OK) + p.SetBody(&ResponsePayload{res, nil, result.Attachments()}) + //logger.Debugf("service return response %v", res) } } if !twoway { return } - reply(session, p, hessian.PackageResponse) + h.reply(session, p, PackageResponse) } // OnCron ... @@ -347,23 +343,25 @@ func rebuildCtx(inv *invocation.RPCInvocation) context.Context { return ctx } -func reply(session getty.Session, req *DubboPackage, tp hessian.PackageType) { - resp := &DubboPackage{ - Header: hessian.DubboHeader{ - SerialID: req.Header.SerialID, - Type: tp, - ID: req.Header.ID, - ResponseStatus: req.Header.ResponseStatus, - }, +func (h *RpcServerHandler) reply(session getty.Session, req *DubboPackage, tp PackageType) { + header := DubboHeader{ + SerialID: req.GetHeader().SerialID, + Type: tp, + ID: req.GetHeader().ID, + BodyLen: 0, + ResponseStatus: req.GetHeader().ResponseStatus, + } + resp := NewServerResponsePackage(header) + if err := loadSerializer(resp); err != nil { + logger.Errorf("reply error %v", err) + return } - if req.Header.Type&hessian.PackageRequest != 0x00 { - resp.Body = req.Body - } else { - resp.Body = nil + if req.GetHeader().Type&PackageRequest != 0x00 { + resp.SetBody(req.GetBody()) } if err := session.WritePkg(resp, WritePkg_Timeout); err != nil { - logger.Errorf("WritePkg error: %#v, %#v", perrors.WithStack(err), req.Header) + logger.Errorf("WritePkg error: %#v, %#v", perrors.WithStack(err), req.GetHeader()) } } diff --git a/protocol/dubbo/package.go b/protocol/dubbo/package.go new file mode 100644 index 0000000000..15d3b0705e --- /dev/null +++ b/protocol/dubbo/package.go @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dubbo + +import ( + "bufio" + "bytes" + "fmt" + "time" +) + +import ( + "github.com/pkg/errors" +) + +type DubboHeader struct { + SerialID byte + Type PackageType + ID int64 + BodyLen int + ResponseStatus byte +} + +// Service defines service instance +type Service struct { + Path string + Interface string + Group string + Version string + Method string + Timeout time.Duration // request timeout +} + +type DubboPackage struct { + Header DubboHeader + Service Service + Body interface{} + Err error + codec *DubboCodec +} + +func (p DubboPackage) String() string { + return fmt.Sprintf("HessianPackage: Header-%v, Path-%v, Body-%v", p.Header, p.Service, p.Body) +} + +func (p *DubboPackage) ReadHeader() error { + return p.codec.ReadHeader(&p.Header) +} + +func (p *DubboPackage) Marshal() (*bytes.Buffer, error) { + if p.codec == nil { + return nil, errors.New("codec is nil") + } + pkg, err := p.codec.Write(*p) + if err != nil { + return nil, errors.WithStack(err) + } + return bytes.NewBuffer(pkg), nil +} + +func (p *DubboPackage) Unmarshal() error { + return p.codec.Read(p) +} + +func (p DubboPackage) IsHeartBeat() bool { + return p.Header.Type&PackageHeartbeat != 0 +} + +func (p DubboPackage) IsRequest() bool { + return p.Header.Type&(PackageRequest_TwoWay|PackageRequest) != 0 +} + +func (p DubboPackage) IsResponse() bool { + return p.Header.Type == PackageResponse +} + +func (p DubboPackage) IsResponseWithException() bool { + flag := PackageResponse | PackageResponse_Exception + return p.Header.Type&flag == flag +} + +func (p DubboPackage) GetBodyLen() int { + return p.Header.BodyLen +} + +func (p DubboPackage) GetLen() int { + return HEADER_LENGTH + p.Header.BodyLen +} + +func (p DubboPackage) GetBody() interface{} { + return p.Body +} + +func (p *DubboPackage) SetBody(body interface{}) { + p.Body = body +} + +func (p *DubboPackage) SetHeader(header DubboHeader) { + p.Header = header +} + +func (p *DubboPackage) SetService(svc Service) { + p.Service = svc +} + +func (p *DubboPackage) SetID(id int64) { + p.Header.ID = id +} + +func (p DubboPackage) GetHeader() DubboHeader { + return p.Header +} + +func (p DubboPackage) GetService() Service { + return p.Service +} + +func (p *DubboPackage) SetResponseStatus(status byte) { + p.Header.ResponseStatus = status +} + +func (p *DubboPackage) SetSerializer(serializer Serializer) { + p.codec.SetSerializer(serializer) +} + +func NewClientResponsePackage(data []byte) *DubboPackage { + return &DubboPackage{ + Header: DubboHeader{}, + Service: Service{}, + Body: &ResponsePayload{}, + Err: nil, + codec: NewDubboCodec(bufio.NewReaderSize(bytes.NewBuffer(data), len(data))), + } +} + +// server side receive request package, just for deserialization +func NewServerRequestPackage(data []byte) *DubboPackage { + return &DubboPackage{ + Header: DubboHeader{}, + Service: Service{}, + Body: make([]interface{}, 7), + Err: nil, + codec: NewDubboCodec(bufio.NewReaderSize(bytes.NewBuffer(data), len(data))), + } + +} + +// client side request package, just for serialization +func NewClientRequestPackage(header DubboHeader, svc Service) *DubboPackage { + return &DubboPackage{ + Header: header, + Service: svc, + Body: nil, + Err: nil, + codec: NewDubboCodec(nil), + } +} + +// server side response package, just for serialization +func NewServerResponsePackage(header DubboHeader) *DubboPackage { + return &DubboPackage{ + Header: header, + Body: nil, + Err: nil, + codec: NewDubboCodec(nil), + } +} + +func NewDubboPackage(data *bytes.Buffer) *DubboPackage { + var codec *DubboCodec + if data == nil { + codec = NewDubboCodec(nil) + } else { + codec = NewDubboCodec(bufio.NewReaderSize(data, len(data.Bytes()))) + } + return &DubboPackage{ + Header: DubboHeader{}, + Service: Service{}, + Body: nil, + Err: nil, + codec: codec, + } +} diff --git a/protocol/dubbo/proto.go b/protocol/dubbo/proto.go new file mode 100644 index 0000000000..17e9aeb6f0 --- /dev/null +++ b/protocol/dubbo/proto.go @@ -0,0 +1,392 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dubbo + +import ( + "bytes" + "fmt" + "io" + "reflect" + "strconv" + "strings" + "time" + "encoding/binary" +) + +import ( + "github.com/pkg/errors" + "github.com/golang/protobuf/proto" + "github.com/matttproud/golang_protobuf_extensions/pbutil" +) + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/common/constant" + pb "github.com/apache/dubbo-go/protocol/dubbo/proto" +) + +type ProtoSerializer struct{} + +func (p ProtoSerializer) Marshal(pkg DubboPackage) ([]byte, error) { + if pkg.IsHeartBeat() { + return []byte{byte('N')}, nil + } + if pkg.Body == nil { + return nil, errors.New("package body should not be nil") + } + if pkg.IsRequest() { + return marshalRequestProto(pkg) + } + return marshalResponseProto(pkg) +} + +func (p ProtoSerializer) Unmarshal(data []byte, pkg *DubboPackage) error { + if pkg.IsRequest() { + return unmarshalRequestProto(data, pkg) + } + return unmarshalResponseProto(data, pkg) +} + +func unmarshalResponseProto(data []byte, pkg *DubboPackage) error { + if pkg.Body == nil { + pkg.SetBody(NewResponsePayload(nil, nil, nil)) + } + response := EnsureResponsePayload(pkg.Body) + buf := bytes.NewBuffer(data) + + var responseType int32 + if err := readByte(buf, &responseType); err != nil { + return err + } + + hasAttachments := false + hasException := false + switch responseType { + case RESPONSE_VALUE_WITH_ATTACHMENTS: + hasAttachments = true + case RESPONSE_WITH_EXCEPTION: + hasException = true + case RESPONSE_WITH_EXCEPTION_WITH_ATTACHMENTS: + hasAttachments = true + hasException = true + } + if hasException { + throwable := pb.ThrowableProto{} + if err := readObject(buf, &throwable); err != nil { + return err + } + // generate error only with error message + response.Exception = errors.New(throwable.OriginalMessage) + } else { + // read response body + protoMsg, ok := response.RspObj.(proto.Message) + if !ok { + return errors.New("response rspobj not protobuf message") + } + if err := readObject(buf, protoMsg); err != nil { + return err + } + } + + if hasAttachments { + atta := pb.Map{} + if err := readObject(buf, &atta); err != nil { + return err + } + if response.Attachments == nil { + response.Attachments = atta.Attachments + } else { + for k, v := range atta.Attachments { + response.Attachments[k] = v + } + } + + } + return nil +} + +func unmarshalRequestProto(data []byte, pkg *DubboPackage) error { + var dubboVersion string + var svcPath string + var svcVersion string + var svcMethod string + buf := bytes.NewBuffer(data) + if err := readUTF(buf, &dubboVersion); err != nil { + return err + } + if err := readUTF(buf, &svcPath); err != nil { + return err + } + if err := readUTF(buf, &svcVersion); err != nil { + return err + } + if err := readUTF(buf, &svcMethod); err != nil { + return err + } + // NOTE: protobuf rpc just have exact one parameter, while golang doesn't need this field + var argsType string + if err := readUTF(buf, &argsType); err != nil { + return err + } + // get raw body bytes for proxy methods to unmarshal + var protoMsgLength int + if err := readDelimitedLength(buf, &protoMsgLength); err != nil { + return err + } + argBytes := make([]byte, protoMsgLength) + if n, err := buf.Read(argBytes); err != nil { + if n != protoMsgLength { + return errors.New("illegal msg length") + } + return err + } + // unmarshal attachments + m := &pb.Map{} + if err := readObject(buf, m); err != nil { + return err + } + svc := Service{} + svc.Version = svcVersion + svc.Method = svcMethod + // just as hessian + svc.Path = svcPath + if svc.Path == "" && len(m.Attachments[constant.PATH_KEY]) > 0 { + svc.Path = m.Attachments[constant.PATH_KEY] + } + + if _, ok := m.Attachments[constant.INTERFACE_KEY]; ok { + svc.Interface = m.Attachments[constant.INTERFACE_KEY] + } else { + svc.Interface = svc.Path + } + pkg.SetService(svc) + pkg.SetBody(map[string]interface{}{ + "dubboVersion": dubboVersion, + "args": []interface{}{argBytes}, + "service": common.ServiceMap.GetService(DUBBO, svc.Path), // path as a key + "attachments": m.Attachments, + }) + + return nil +} + +func marshalRequestProto(pkg DubboPackage) ([]byte, error) { + request := EnsureRequestPayload(pkg.Body) + args, ok := request.Params.([]interface{}) + buf := bytes.NewBuffer(make([]byte, 0)) + if !ok { + return nil, errors.New("proto buffer args should be marshaled in []byte") + } + // NOTE: protobuf rpc just has exact one parameter + if len(args) != 1 { + return nil, errors.New("illegal protobuf service, len(arg) should equal 1") + } + // dubbo version + if err := writeUTF(buf, DUBBO_VERSION); err != nil { + return nil, err + } + // service path + if err := writeUTF(buf, pkg.Service.Path); err != nil { + return nil, err + } + // service version + if err := writeUTF(buf, pkg.Service.Version); err != nil { + return nil, err + } + // service method + if err := writeUTF(buf, pkg.Service.Method); err != nil { + return nil, err + } + // parameter types desc + v := reflect.ValueOf(args[0]) + mv := v.MethodByName("JavaClassName") + if mv.IsValid() { + javaCls := mv.Call([]reflect.Value{}) + if len(javaCls) != 1 { + return nil, errors.New("JavaStringName method should return exact 1 result") + } + javaClsStr, ok := javaCls[0].Interface().(string) + if !ok { + return nil, errors.New("JavaClassName method should return string") + } + if err := writeUTF(buf, getJavaArgType(javaClsStr)); err != nil { + return nil, err + } + } else { + // defensive code + if err := writeUTF(buf, ""); err != nil { + return nil, err + } + } + // consumer args + protoMsg := args[0].(proto.Message) + if err := writeObject(buf, protoMsg); err != nil { + return nil, err + } + // attachments + atta := make(map[string]string) + atta[PATH_KEY] = pkg.Service.Path + atta[VERSION_KEY] = pkg.Service.Version + if len(pkg.Service.Group) > 0 { + atta[GROUP_KEY] = pkg.Service.Group + } + if len(pkg.Service.Interface) > 0 { + atta[INTERFACE_KEY] = pkg.Service.Interface + } + if pkg.Service.Timeout != 0 { + atta[TIMEOUT_KEY] = strconv.Itoa(int(pkg.Service.Timeout / time.Millisecond)) + } + m := pb.Map{Attachments: atta} + if err := writeObject(buf, &m); err != nil { + return nil, err + } + return buf.Bytes(), nil +} + +func marshalResponseProto(pkg DubboPackage) ([]byte, error) { + response := EnsureResponsePayload(pkg.Body) + buf := bytes.NewBuffer(make([]byte, 0)) + responseType := RESPONSE_VALUE + hasAttachments := false + if response.Attachments != nil { + responseType = RESPONSE_VALUE_WITH_ATTACHMENTS + hasAttachments = true + } else { + responseType = RESPONSE_VALUE + } + if response.Exception != nil { + if hasAttachments { + responseType = RESPONSE_WITH_EXCEPTION_WITH_ATTACHMENTS + } else { + responseType = RESPONSE_WITH_EXCEPTION + } + } + // write response type + if err := writeByte(buf, responseType); err != nil { + return nil, err + } + if response.Exception != nil { + // deal with exception + throwable := pb.ThrowableProto{OriginalMessage: response.Exception.Error()} + if err := writeObject(buf, &throwable); err != nil { + return nil, err + } + } else { + res, ok := response.RspObj.(proto.Message) + if !ok { + return nil, errors.New("proto buffer params should be marshaled in proto.Message") + } + // response body + if err := writeObject(buf, res); err != nil { + return nil, err + } + } + + if hasAttachments { + attachments := pb.Map{Attachments: response.Attachments} + if err := writeObject(buf, &attachments); err != nil { + return nil, err + } + } + return buf.Bytes(), nil +} + +func init() { + extension.SetSerializer("protobuf", ProtoSerializer{}) +} + +func getJavaArgType(javaClsName string) string { + return fmt.Sprintf("L%s;", strings.ReplaceAll(javaClsName, ".", "/")) +} + +func writeUTF(writer io.Writer, value string) error { + _, err := pbutil.WriteDelimited(writer, &pb.StringValue{Value: value}) + return err +} + +func writeObject(writer io.Writer, value proto.Message) error { + _, err := pbutil.WriteDelimited(writer, value) + return err +} + +func writeByte(writer io.Writer, v int32) error { + i32v := &pb.Int32Value{Value: v} + _, err := pbutil.WriteDelimited(writer, i32v) + return err +} + +func readUTF(reader io.Reader, value *string) error { + sv := &pb.StringValue{} + _, err := pbutil.ReadDelimited(reader, sv) + if err != nil { + return err + } + *value = sv.Value + return nil +} + +func readObject(reader io.Reader, value proto.Message) error { + _, err := pbutil.ReadDelimited(reader, value) + if err != nil { + return err + } + return nil +} + +// just as java protobuf serialize +func readByte(reader io.Reader, value *int32) error { + i32v := &pb.Int32Value{} + _, err := pbutil.ReadDelimited(reader, i32v) + if err != nil { + return err + } + *value = i32v.Value + return nil +} + +// +func readDelimitedLength(reader io.Reader, length *int) error { + var headerBuf [binary.MaxVarintLen32]byte + var bytesRead, varIntBytes int + var messageLength uint64 + for varIntBytes == 0 { // i.e. no varint has been decoded yet. + if bytesRead >= len(headerBuf) { + return errors.New("invalid varint32 encountered") + } + // We have to read byte by byte here to avoid reading more bytes + // than required. Each read byte is appended to what we have + // read before. + newBytesRead, err := reader.Read(headerBuf[bytesRead : bytesRead+1]) + if newBytesRead == 0 { + if err != nil { + return err + } + // A Reader should not return (0, nil), but if it does, + // it should be treated as no-op (according to the + // Reader contract). So let's go on... + continue + } + bytesRead += newBytesRead + // Now present everything read so far to the varint decoder and + // see if a varint can be decoded already. + messageLength, varIntBytes = proto.DecodeVarint(headerBuf[:bytesRead]) + } + *length = int(messageLength) + return nil +} diff --git a/protocol/dubbo/proto/payload.pb.go b/protocol/dubbo/proto/payload.pb.go new file mode 100644 index 0000000000..eeca59a2ee --- /dev/null +++ b/protocol/dubbo/proto/payload.pb.go @@ -0,0 +1,328 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// source: proto/payload.proto + +package payload + +import ( + fmt "fmt" + proto "github.com/golang/protobuf/proto" + math "math" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package + +// equivalent java StringValue +type StringValue struct { + Value string `protobuf:"bytes,1,opt,name=value,proto3" json:"value,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *StringValue) Reset() { *m = StringValue{} } +func (m *StringValue) String() string { return proto.CompactTextString(m) } +func (*StringValue) ProtoMessage() {} +func (*StringValue) Descriptor() ([]byte, []int) { + return fileDescriptor_434bbf44284586dc, []int{0} +} + +func (m *StringValue) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_StringValue.Unmarshal(m, b) +} +func (m *StringValue) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_StringValue.Marshal(b, m, deterministic) +} +func (m *StringValue) XXX_Merge(src proto.Message) { + xxx_messageInfo_StringValue.Merge(m, src) +} +func (m *StringValue) XXX_Size() int { + return xxx_messageInfo_StringValue.Size(m) +} +func (m *StringValue) XXX_DiscardUnknown() { + xxx_messageInfo_StringValue.DiscardUnknown(m) +} + +var xxx_messageInfo_StringValue proto.InternalMessageInfo + +func (m *StringValue) GetValue() string { + if m != nil { + return m.Value + } + return "" +} + +// equivalent java Int32Value +type Int32Value struct { + Value int32 `protobuf:"varint,1,opt,name=value,proto3" json:"value,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *Int32Value) Reset() { *m = Int32Value{} } +func (m *Int32Value) String() string { return proto.CompactTextString(m) } +func (*Int32Value) ProtoMessage() {} +func (*Int32Value) Descriptor() ([]byte, []int) { + return fileDescriptor_434bbf44284586dc, []int{1} +} + +func (m *Int32Value) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_Int32Value.Unmarshal(m, b) +} +func (m *Int32Value) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_Int32Value.Marshal(b, m, deterministic) +} +func (m *Int32Value) XXX_Merge(src proto.Message) { + xxx_messageInfo_Int32Value.Merge(m, src) +} +func (m *Int32Value) XXX_Size() int { + return xxx_messageInfo_Int32Value.Size(m) +} +func (m *Int32Value) XXX_DiscardUnknown() { + xxx_messageInfo_Int32Value.DiscardUnknown(m) +} + +var xxx_messageInfo_Int32Value proto.InternalMessageInfo + +func (m *Int32Value) GetValue() int32 { + if m != nil { + return m.Value + } + return 0 +} + +// equivalent java MapValue +type Map struct { + Attachments map[string]string `protobuf:"bytes,1,rep,name=attachments,proto3" json:"attachments,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *Map) Reset() { *m = Map{} } +func (m *Map) String() string { return proto.CompactTextString(m) } +func (*Map) ProtoMessage() {} +func (*Map) Descriptor() ([]byte, []int) { + return fileDescriptor_434bbf44284586dc, []int{2} +} + +func (m *Map) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_Map.Unmarshal(m, b) +} +func (m *Map) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_Map.Marshal(b, m, deterministic) +} +func (m *Map) XXX_Merge(src proto.Message) { + xxx_messageInfo_Map.Merge(m, src) +} +func (m *Map) XXX_Size() int { + return xxx_messageInfo_Map.Size(m) +} +func (m *Map) XXX_DiscardUnknown() { + xxx_messageInfo_Map.DiscardUnknown(m) +} + +var xxx_messageInfo_Map proto.InternalMessageInfo + +func (m *Map) GetAttachments() map[string]string { + if m != nil { + return m.Attachments + } + return nil +} + +// copied from dubbo GenericProtobufObjectOutput.java +// Messages used for transporting debug information between server and client. +// An element in a stack trace, based on the Java type of the same name. +// +// See: https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/lang/StackTraceElement.html +type StackTraceElementProto struct { + // The fully qualified name of the class containing the execution point + // represented by the stack trace element. + ClassName string `protobuf:"bytes,1,opt,name=class_name,json=className,proto3" json:"class_name,omitempty"` + // The name of the method containing the execution point represented by the + // stack trace element + MethodName string `protobuf:"bytes,2,opt,name=method_name,json=methodName,proto3" json:"method_name,omitempty"` + // The name of the file containing the execution point represented by the + // stack trace element, or null if this information is unavailable. + FileName string `protobuf:"bytes,3,opt,name=file_name,json=fileName,proto3" json:"file_name,omitempty"` + // The line number of the source line containing the execution point represented + // by this stack trace element, or a negative number if this information is + // unavailable. + LineNumber int32 `protobuf:"varint,4,opt,name=line_number,json=lineNumber,proto3" json:"line_number,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *StackTraceElementProto) Reset() { *m = StackTraceElementProto{} } +func (m *StackTraceElementProto) String() string { return proto.CompactTextString(m) } +func (*StackTraceElementProto) ProtoMessage() {} +func (*StackTraceElementProto) Descriptor() ([]byte, []int) { + return fileDescriptor_434bbf44284586dc, []int{3} +} + +func (m *StackTraceElementProto) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_StackTraceElementProto.Unmarshal(m, b) +} +func (m *StackTraceElementProto) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_StackTraceElementProto.Marshal(b, m, deterministic) +} +func (m *StackTraceElementProto) XXX_Merge(src proto.Message) { + xxx_messageInfo_StackTraceElementProto.Merge(m, src) +} +func (m *StackTraceElementProto) XXX_Size() int { + return xxx_messageInfo_StackTraceElementProto.Size(m) +} +func (m *StackTraceElementProto) XXX_DiscardUnknown() { + xxx_messageInfo_StackTraceElementProto.DiscardUnknown(m) +} + +var xxx_messageInfo_StackTraceElementProto proto.InternalMessageInfo + +func (m *StackTraceElementProto) GetClassName() string { + if m != nil { + return m.ClassName + } + return "" +} + +func (m *StackTraceElementProto) GetMethodName() string { + if m != nil { + return m.MethodName + } + return "" +} + +func (m *StackTraceElementProto) GetFileName() string { + if m != nil { + return m.FileName + } + return "" +} + +func (m *StackTraceElementProto) GetLineNumber() int32 { + if m != nil { + return m.LineNumber + } + return 0 +} + +// An exception that was thrown by some code, based on the Java type of the same name. +// +// See: https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/lang/Throwable.html +type ThrowableProto struct { + // The name of the class of the exception that was actually thrown. Downstream readers + // of this message may or may not have the actual class available to initialize, so + // this is just used to prefix the message of a generic exception type. + OriginalClassName string `protobuf:"bytes,1,opt,name=original_class_name,json=originalClassName,proto3" json:"original_class_name,omitempty"` + // The message of this throwable. Not filled if there is no message. + OriginalMessage string `protobuf:"bytes,2,opt,name=original_message,json=originalMessage,proto3" json:"original_message,omitempty"` + // The stack trace of this Throwable. + StackTrace []*StackTraceElementProto `protobuf:"bytes,3,rep,name=stack_trace,json=stackTrace,proto3" json:"stack_trace,omitempty"` + // The cause of this Throwable. Not filled if there is no cause. + Cause *ThrowableProto `protobuf:"bytes,4,opt,name=cause,proto3" json:"cause,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *ThrowableProto) Reset() { *m = ThrowableProto{} } +func (m *ThrowableProto) String() string { return proto.CompactTextString(m) } +func (*ThrowableProto) ProtoMessage() {} +func (*ThrowableProto) Descriptor() ([]byte, []int) { + return fileDescriptor_434bbf44284586dc, []int{4} +} + +func (m *ThrowableProto) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_ThrowableProto.Unmarshal(m, b) +} +func (m *ThrowableProto) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_ThrowableProto.Marshal(b, m, deterministic) +} +func (m *ThrowableProto) XXX_Merge(src proto.Message) { + xxx_messageInfo_ThrowableProto.Merge(m, src) +} +func (m *ThrowableProto) XXX_Size() int { + return xxx_messageInfo_ThrowableProto.Size(m) +} +func (m *ThrowableProto) XXX_DiscardUnknown() { + xxx_messageInfo_ThrowableProto.DiscardUnknown(m) +} + +var xxx_messageInfo_ThrowableProto proto.InternalMessageInfo + +func (m *ThrowableProto) GetOriginalClassName() string { + if m != nil { + return m.OriginalClassName + } + return "" +} + +func (m *ThrowableProto) GetOriginalMessage() string { + if m != nil { + return m.OriginalMessage + } + return "" +} + +func (m *ThrowableProto) GetStackTrace() []*StackTraceElementProto { + if m != nil { + return m.StackTrace + } + return nil +} + +func (m *ThrowableProto) GetCause() *ThrowableProto { + if m != nil { + return m.Cause + } + return nil +} + +func init() { + proto.RegisterType((*StringValue)(nil), "StringValue") + proto.RegisterType((*Int32Value)(nil), "Int32Value") + proto.RegisterType((*Map)(nil), "Map") + proto.RegisterMapType((map[string]string)(nil), "Map.AttachmentsEntry") + proto.RegisterType((*StackTraceElementProto)(nil), "StackTraceElementProto") + proto.RegisterType((*ThrowableProto)(nil), "ThrowableProto") +} + +func init() { proto.RegisterFile("proto/payload.proto", fileDescriptor_434bbf44284586dc) } + +var fileDescriptor_434bbf44284586dc = []byte{ + // 353 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x6c, 0x92, 0x4f, 0x4f, 0xea, 0x40, + 0x14, 0xc5, 0x53, 0xfa, 0x78, 0x79, 0xdc, 0x26, 0x0f, 0x1c, 0xfc, 0xd3, 0x68, 0x8c, 0xa4, 0xc6, + 0x04, 0x37, 0x35, 0x81, 0x85, 0xc4, 0x85, 0x89, 0x31, 0x2c, 0x5c, 0x40, 0x4c, 0x21, 0x6e, 0x9b, + 0x4b, 0x19, 0xa1, 0x61, 0x3a, 0x6d, 0x66, 0x06, 0x0d, 0x1b, 0x3f, 0x86, 0x9f, 0xca, 0x0f, 0x65, + 0x66, 0xc6, 0x02, 0x2a, 0xbb, 0x99, 0xdf, 0x39, 0xbd, 0x3d, 0xf7, 0x64, 0xa0, 0x59, 0x88, 0x5c, + 0xe5, 0x57, 0x05, 0xae, 0x58, 0x8e, 0xd3, 0xd0, 0xdc, 0x82, 0x73, 0xf0, 0x46, 0x4a, 0xa4, 0x7c, + 0xf6, 0x84, 0x6c, 0x49, 0xc9, 0x3e, 0x54, 0x5f, 0xf4, 0xc1, 0x77, 0x5a, 0x4e, 0xbb, 0x16, 0xd9, + 0x4b, 0x10, 0x00, 0x3c, 0x70, 0xd5, 0xed, 0xec, 0xf0, 0x54, 0x4b, 0xcf, 0x1b, 0xb8, 0x03, 0x2c, + 0xc8, 0x35, 0x78, 0xa8, 0x14, 0x26, 0xf3, 0x8c, 0x72, 0x25, 0x7d, 0xa7, 0xe5, 0xb6, 0xbd, 0xce, + 0x41, 0x38, 0xc0, 0x22, 0xbc, 0xdb, 0xf0, 0x3e, 0x57, 0x62, 0x15, 0x6d, 0x3b, 0x8f, 0x6f, 0xa1, + 0xf1, 0xd3, 0x40, 0x1a, 0xe0, 0x2e, 0xe8, 0xea, 0x2b, 0x8b, 0x3e, 0x6e, 0xfe, 0x5d, 0xd9, 0xca, + 0x77, 0x53, 0xe9, 0x39, 0xc1, 0xbb, 0x03, 0x87, 0x23, 0x85, 0xc9, 0x62, 0x2c, 0x30, 0xa1, 0x7d, + 0x46, 0xf5, 0x9c, 0x47, 0xbd, 0x23, 0x39, 0x05, 0x48, 0x18, 0x4a, 0x19, 0x73, 0xcc, 0xca, 0xcd, + 0x6a, 0x86, 0x0c, 0x31, 0xa3, 0xe4, 0x0c, 0xbc, 0x8c, 0xaa, 0x79, 0x3e, 0xb5, 0xba, 0x9d, 0x0c, + 0x16, 0x19, 0xc3, 0x09, 0xd4, 0x9e, 0x53, 0x46, 0xad, 0xec, 0x1a, 0xf9, 0x9f, 0x06, 0xe5, 0xd7, + 0x2c, 0xe5, 0x34, 0xe6, 0xcb, 0x6c, 0x42, 0x85, 0xff, 0xc7, 0x74, 0x02, 0x1a, 0x0d, 0x0d, 0x09, + 0x3e, 0x1c, 0xf8, 0x3f, 0x9e, 0x8b, 0xfc, 0x15, 0x27, 0x8c, 0xda, 0x40, 0x21, 0x34, 0x73, 0x91, + 0xce, 0x52, 0x8e, 0x2c, 0xfe, 0x95, 0x6c, 0xaf, 0x94, 0xee, 0xd7, 0x09, 0x2f, 0xa1, 0xb1, 0xf6, + 0x67, 0x54, 0x4a, 0x9c, 0x95, 0x31, 0xeb, 0x25, 0x1f, 0x58, 0x4c, 0x7a, 0xe0, 0x49, 0xdd, 0x42, + 0xac, 0x74, 0x0d, 0xbe, 0x6b, 0xfa, 0x3f, 0x0a, 0x77, 0x37, 0x13, 0x81, 0x5c, 0x73, 0x72, 0x01, + 0xd5, 0x04, 0x97, 0x92, 0x9a, 0x15, 0xbc, 0x4e, 0x3d, 0xfc, 0x1e, 0x3a, 0xb2, 0xea, 0xe4, 0xaf, + 0x79, 0x37, 0xdd, 0xcf, 0x00, 0x00, 0x00, 0xff, 0xff, 0x04, 0x4d, 0x68, 0x3a, 0x4e, 0x02, 0x00, + 0x00, +} diff --git a/protocol/dubbo/proto/payload.proto b/protocol/dubbo/proto/payload.proto new file mode 100644 index 0000000000..19f644ee91 --- /dev/null +++ b/protocol/dubbo/proto/payload.proto @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +syntax = "proto3"; + +// equivalent java StringValue +message StringValue { + string value = 1; +} + +// equivalent java Int32Value +message Int32Value { + int32 value = 1; +} + +// equivalent java MapValue +message Map { + map attachments = 1; +} + +// copied from dubbo GenericProtobufObjectOutput.java +// Messages used for transporting debug information between server and client. +// An element in a stack trace, based on the Java type of the same name. +// +// See: https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/lang/StackTraceElement.html +message StackTraceElementProto { + // The fully qualified name of the class containing the execution point + // represented by the stack trace element. + string class_name = 1; + + // The name of the method containing the execution point represented by the + // stack trace element + string method_name = 2; + + // The name of the file containing the execution point represented by the + // stack trace element, or null if this information is unavailable. + string file_name = 3; + + // The line number of the source line containing the execution point represented + // by this stack trace element, or a negative number if this information is + // unavailable. + int32 line_number = 4; +} + +// An exception that was thrown by some code, based on the Java type of the same name. +// +// See: https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/lang/Throwable.html +message ThrowableProto { + // The name of the class of the exception that was actually thrown. Downstream readers + // of this message may or may not have the actual class available to initialize, so + // this is just used to prefix the message of a generic exception type. + string original_class_name = 1; + + // The message of this throwable. Not filled if there is no message. + string original_message = 2; + + // The stack trace of this Throwable. + repeated StackTraceElementProto stack_trace = 3; + + // The cause of this Throwable. Not filled if there is no cause. + ThrowableProto cause = 4; +} + + diff --git a/protocol/dubbo/readwriter.go b/protocol/dubbo/readwriter.go index b5c4f50919..9841b81947 100644 --- a/protocol/dubbo/readwriter.go +++ b/protocol/dubbo/readwriter.go @@ -18,7 +18,6 @@ package dubbo import ( - "bytes" "reflect" ) @@ -29,8 +28,8 @@ import ( ) import ( - "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/common/logger" ) @@ -49,42 +48,58 @@ func NewRpcClientPackageHandler(client *Client) *RpcClientPackageHandler { } func (p *RpcClientPackageHandler) Read(ss getty.Session, data []byte) (interface{}, int, error) { - pkg := &DubboPackage{} - - buf := bytes.NewBuffer(data) - err := pkg.Unmarshal(buf, p.client) - if err != nil { + pkg := NewClientResponsePackage(data) + if err := pkg.ReadHeader(); err != nil { originErr := perrors.Cause(err) if originErr == hessian.ErrHeaderNotEnough || originErr == hessian.ErrBodyNotEnough { return nil, 0, nil } - - logger.Errorf("pkg.Unmarshal(ss:%+v, len(@data):%d) = error:%+v", ss, len(data), err) - + logger.Errorf("[RpcClientPackageHandler.Read] ss:%+v, len(@data):%d) = error:%+v ", ss, len(data), err) return nil, 0, perrors.WithStack(err) } + if pkg.IsHeartBeat() { + // heartbeat package doesn't need deserialize + return pkg, pkg.GetLen(), nil + } +<<<<<<< HEAD if pkg.Header.Type&hessian.PackageRequest == 0x00 { pkg.Err = pkg.Body.(*hessian.Response).Exception pkg.Body = NewResponse(pkg.Body.(*hessian.Response).RspObj, pkg.Body.(*hessian.Response).Attachments) +======= + if err := loadSerializer(pkg); err != nil { + return nil, 0, err +>>>>>>> feature: support protobuf } - return pkg, hessian.HEADER_LENGTH + pkg.Header.BodyLen, nil + // load response + pendingRsp, ok := p.client.pendingResponses.Load(SequenceType(pkg.GetHeader().ID)) + if !ok { + return nil, 0, perrors.Errorf("client.GetPendingResopnse(%v) = nil", pkg.GetHeader().ID) + } + // set package body + body := NewResponsePayload(pendingRsp.(*PendingResponse).response.reply, nil, nil) + pkg.SetBody(body) + err := pkg.Unmarshal() + if err != nil { + return nil, 0, perrors.WithStack(err) + } + resp := pkg.Body.(*ResponsePayload) + pkg.Err = resp.Exception + pkg.Body = NewResponse(resp.RspObj, resp.Attachments) + return pkg, pkg.GetLen(), nil } func (p *RpcClientPackageHandler) Write(ss getty.Session, pkg interface{}) ([]byte, error) { req, ok := pkg.(*DubboPackage) if !ok { - logger.Errorf("illegal pkg:%+v\n", pkg) return nil, perrors.New("invalid rpc request") } - buf, err := req.Marshal() if err != nil { logger.Warnf("binary.Write(req{%#v}) = err{%#v}", req, perrors.WithStack(err)) return nil, perrors.WithStack(err) } - return buf.Bytes(), nil } @@ -96,16 +111,33 @@ var ( rpcServerPkgHandler = &RpcServerPackageHandler{} ) +<<<<<<< HEAD // RpcServerPackageHandler ... type RpcServerPackageHandler struct{} +======= +type RpcServerPackageHandler struct { +} +>>>>>>> feature: support protobuf func (p *RpcServerPackageHandler) Read(ss getty.Session, data []byte) (interface{}, int, error) { - pkg := &DubboPackage{ - Body: make([]interface{}, 7), + pkg := NewServerRequestPackage(data) + if err := pkg.ReadHeader(); err != nil { + originErr := perrors.Cause(err) + if originErr == hessian.ErrHeaderNotEnough || originErr == hessian.ErrBodyNotEnough { + return nil, 0, nil + } + return nil, 0, perrors.WithStack(err) + } + + if pkg.IsHeartBeat() { + return pkg, pkg.GetLen(), nil + } + + if err := loadSerializer(pkg); err != nil { + return nil, 0, err } - buf := bytes.NewBuffer(data) - err := pkg.Unmarshal(buf) + err := pkg.Unmarshal() if err != nil { originErr := perrors.Cause(err) if originErr == hessian.ErrHeaderNotEnough || originErr == hessian.ErrBodyNotEnough { @@ -113,60 +145,9 @@ func (p *RpcServerPackageHandler) Read(ss getty.Session, data []byte) (interface } logger.Errorf("pkg.Unmarshal(ss:%+v, len(@data):%d) = error:%+v", ss, len(data), err) - return nil, 0, perrors.WithStack(err) } - - if pkg.Header.Type&hessian.PackageHeartbeat == 0x00 { - // convert params of request - req := pkg.Body.([]interface{}) // length of body should be 7 - if len(req) > 0 { - var dubboVersion, argsTypes string - var args []interface{} - var attachments map[string]string - if req[0] != nil { - dubboVersion = req[0].(string) - } - if req[1] != nil { - pkg.Service.Path = req[1].(string) - } - if req[2] != nil { - pkg.Service.Version = req[2].(string) - } - if req[3] != nil { - pkg.Service.Method = req[3].(string) - } - if req[4] != nil { - argsTypes = req[4].(string) - } - if req[5] != nil { - args = req[5].([]interface{}) - } - if req[6] != nil { - attachments = req[6].(map[string]string) - } - if pkg.Service.Path == "" && len(attachments[constant.PATH_KEY]) > 0 { - pkg.Service.Path = attachments[constant.PATH_KEY] - } - if _, ok := attachments[constant.INTERFACE_KEY]; ok { - pkg.Service.Interface = attachments[constant.INTERFACE_KEY] - } else { - pkg.Service.Interface = pkg.Service.Path - } - if len(attachments[constant.GROUP_KEY]) > 0 { - pkg.Service.Group = attachments[constant.GROUP_KEY] - } - pkg.Body = map[string]interface{}{ - "dubboVersion": dubboVersion, - "argsTypes": argsTypes, - "args": args, - "service": common.ServiceMap.GetService(DUBBO, pkg.Service.Path), // path as a key - "attachments": attachments, - } - } - } - - return pkg, hessian.HEADER_LENGTH + pkg.Header.BodyLen, nil + return pkg, pkg.GetLen(), nil } func (p *RpcServerPackageHandler) Write(ss getty.Session, pkg interface{}) ([]byte, error) { @@ -175,12 +156,24 @@ func (p *RpcServerPackageHandler) Write(ss getty.Session, pkg interface{}) ([]by logger.Errorf("illegal pkg:%+v\n, it is %+v", pkg, reflect.TypeOf(pkg)) return nil, perrors.New("invalid rpc response") } - buf, err := res.Marshal() if err != nil { logger.Warnf("binary.Write(res{%#v}) = err{%#v}", res, perrors.WithStack(err)) return nil, perrors.WithStack(err) } - return buf.Bytes(), nil } + +func loadSerializer(p *DubboPackage) error { + // NOTE: default serialID is S_Hessian + serialID := p.Header.SerialID + if serialID == 0 { + serialID = constant.S_Hessian2 + } + serializer, err := extension.GetSerializerById(serialID) + if err != nil { + return err + } + p.SetSerializer(serializer.(Serializer)) + return nil +} diff --git a/protocol/dubbo/request.go b/protocol/dubbo/request.go new file mode 100644 index 0000000000..54568dfc43 --- /dev/null +++ b/protocol/dubbo/request.go @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dubbo + +type RequestPayload struct { + Params interface{} + Attachments map[string]string +} + +func NewRequestPayload(args interface{}, atta map[string]string) *RequestPayload { + if atta == nil { + atta = make(map[string]string) + } + return &RequestPayload{ + Params: args, + Attachments: atta, + } +} + +func EnsureRequestPayload(body interface{}) *RequestPayload { + if req, ok := body.(*RequestPayload); ok { + return req + } + return NewRequestPayload(body, nil) +} diff --git a/protocol/dubbo/response.go b/protocol/dubbo/response.go new file mode 100644 index 0000000000..95dcc98411 --- /dev/null +++ b/protocol/dubbo/response.go @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dubbo + +type ResponsePayload struct { + RspObj interface{} + Exception error + Attachments map[string]string +} + +// NewResponse create a new ResponsePayload +func NewResponsePayload(rspObj interface{}, exception error, attachments map[string]string) *ResponsePayload { + if attachments == nil { + attachments = make(map[string]string) + } + return &ResponsePayload{ + RspObj: rspObj, + Exception: exception, + Attachments: attachments, + } +} + +func EnsureResponsePayload(body interface{}) *ResponsePayload { + if res, ok := body.(*ResponsePayload); ok { + return res + } + if exp, ok := body.(error); ok { + return NewResponsePayload(nil, exp, nil) + } + return NewResponsePayload(body, nil, nil) +} diff --git a/protocol/dubbo/serialize.go b/protocol/dubbo/serialize.go new file mode 100644 index 0000000000..4da39474af --- /dev/null +++ b/protocol/dubbo/serialize.go @@ -0,0 +1,6 @@ +package dubbo + +type Serializer interface { + Marshal(p DubboPackage) ([]byte, error) + Unmarshal([]byte, *DubboPackage) error +} diff --git a/registry/zookeeper/registry.go b/registry/zookeeper/registry.go index e13443d57d..c5252a92e4 100644 --- a/registry/zookeeper/registry.go +++ b/registry/zookeeper/registry.go @@ -218,7 +218,8 @@ func (r *zkRegistry) getListener(conf *common.URL) (*RegistryConfigurationListen //Interested register to dataconfig. r.dataListener.AddInterestedURL(conf) for _, v := range strings.Split(conf.GetParam(constant.CATEGORY_KEY, constant.DEFAULT_CATEGORY), ",") { - go r.listener.ListenServiceEvent(fmt.Sprintf("/dubbo/%s/"+v, url.QueryEscape(conf.Service())), r.dataListener) + u := common.URL{Path: fmt.Sprintf("/dubbo/%s/"+v, url.QueryEscape(conf.Service()))} + go r.listener.ListenServiceEvent(u.Path, r.dataListener) } return zkListener, nil