diff --git a/go.mod b/go.mod index 09bb4fab1..e04c13223 100644 --- a/go.mod +++ b/go.mod @@ -13,6 +13,9 @@ require ( github.com/stretchr/testify v1.7.1 go.uber.org/atomic v1.9.0 go.uber.org/zap v1.21.0 + golang.org/x/net v0.0.0-20220722155237-a158d28d115b // indirect + golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4 // indirect + golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f // indirect vimagination.zapto.org/byteio v0.0.0-20200222190125-d27cba0f0b10 vimagination.zapto.org/memio v0.0.0-20200222190306-588ebc67b97d // indirect ) diff --git a/go.sum b/go.sum index 33db763d0..a2f15438c 100644 --- a/go.sum +++ b/go.sum @@ -925,8 +925,9 @@ golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qx golang.org/x/net v0.0.0-20210917221730-978cfadd31cf/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20211029224645-99673261e6eb/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20211105192438-b53810dc28af/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= -golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2 h1:CIJ76btIcR3eFI5EgSo6k1qKw9KJexJuRLI9G7Hp5wE= golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/net v0.0.0-20220722155237-a158d28d115b h1:PxfKdU9lEEDYjdIzOtC4qFWgkU2rGHdKlKowJSMN9h0= +golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -945,8 +946,9 @@ golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4 h1:uVc8UZUe6tr40fFVnUP5Oj+veunVezqYl9z7DYw9xzw= +golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -1017,9 +1019,12 @@ golang.org/x/sys v0.0.0-20211117180635-dee7805ff2e1/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220111092808-5a964db01320/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220128215802-99c3d69c2c27/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220412211240-33da011f77ad h1:ntjMns5wyP/fN65tdBD4g8J5w8n015+iIIs9rtjXkY0= golang.org/x/sys v0.0.0-20220412211240-33da011f77ad/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f h1:v4INt8xihDGvnrfjMDVXGxw9wrfxYyCjk0KbXjhR55s= +golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= diff --git a/pkg/imports/imports.go b/pkg/client/client.go similarity index 87% rename from pkg/imports/imports.go rename to pkg/client/client.go index 6ee568a9d..0637e8c7a 100644 --- a/pkg/imports/imports.go +++ b/pkg/client/client.go @@ -15,10 +15,16 @@ * limitations under the License. */ -package imports +package client import ( - _ "github.com/seata/seata-go/pkg/remoting/getty" + _ "github.com/seata/seata-go/pkg/integration" _ "github.com/seata/seata-go/pkg/remoting/processor/client" _ "github.com/seata/seata-go/pkg/rm/tcc" ) + +// Init init seata client +func Init() { + initRmClient() + initTmClient() +} diff --git a/pkg/tc/tc_server.go b/pkg/client/rm_client.go similarity index 91% rename from pkg/tc/tc_server.go rename to pkg/client/rm_client.go index a97d29696..c48e2e014 100644 --- a/pkg/tc/tc_server.go +++ b/pkg/client/rm_client.go @@ -15,8 +15,8 @@ * limitations under the License. */ -package main +package client -func main() { - // start seata server +// InitRmClient init seata rm client +func initRmClient() { } diff --git a/pkg/client/tm_client.go b/pkg/client/tm_client.go new file mode 100644 index 000000000..b58f4230e --- /dev/null +++ b/pkg/client/tm_client.go @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package client + +import ( + "sync" + + "github.com/seata/seata-go/pkg/remoting/getty" +) + +var onceInitTmClient sync.Once + +// InitTmClient init seata tm client +func initTmClient() { + onceInitTmClient.Do(func() { + initConfig() + initRemoting() + }) +} + +// todo +// initConfig init config processor +func initConfig() { +} + +// initRemoting init rpc client +func initRemoting() { + getty.InitRpcClient() +} diff --git a/pkg/integration/dubbo/dubbo_transaction_filter.go b/pkg/integration/dubbo/dubbo_transaction_filter.go index c6bb1e6b3..4e7954537 100644 --- a/pkg/integration/dubbo/dubbo_transaction_filter.go +++ b/pkg/integration/dubbo/dubbo_transaction_filter.go @@ -22,6 +22,7 @@ import ( "strings" "sync" + "dubbo.apache.org/dubbo-go/v3/common/extension" "dubbo.apache.org/dubbo-go/v3/filter" "dubbo.apache.org/dubbo-go/v3/protocol" "github.com/seata/seata-go/pkg/common" @@ -34,6 +35,10 @@ var ( once sync.Once ) +func InitSeataDubbo() { + extension.SetFilter(common.SeataFilterKey, GetDubboTransactionFilter) +} + type Filter interface { } diff --git a/pkg/integration/integration.go b/pkg/integration/integration.go index 9f86ea900..e571944ff 100644 --- a/pkg/integration/integration.go +++ b/pkg/integration/integration.go @@ -18,12 +18,9 @@ package integration import ( - "dubbo.apache.org/dubbo-go/v3/common/extension" - - "github.com/seata/seata-go/pkg/common" "github.com/seata/seata-go/pkg/integration/dubbo" ) -func UseDubbo() { - extension.SetFilter(common.SeataFilterKey, dubbo.GetDubboTransactionFilter) +func init() { + dubbo.InitSeataDubbo() } diff --git a/pkg/protocol/codec/global_report_request_codec.go b/pkg/protocol/codec/global_report_request_codec.go index f47107c3b..817af8628 100644 --- a/pkg/protocol/codec/global_report_request_codec.go +++ b/pkg/protocol/codec/global_report_request_codec.go @@ -17,22 +17,40 @@ package codec -//func init() { -// GetCodecManager().RegisterCodec(CodecTypeSeata, &GlobalReportRequestCodec{}) -//} -// -//type GlobalReportRequestCodec struct { -// CommonGlobalEndRequestCodec -//} -// -//func (g *GlobalReportRequestCodec) Decode(in []byte) interface{} { -// req := g.CommonGlobalEndRequestCodec.Decode(in) -// abstractGlobalEndRequest := req.(message.AbstractGlobalEndRequest) -// return message.GlobalCommitRequest{ -// AbstractGlobalEndRequest: abstractGlobalEndRequest, -// } -//} -// -//func (g *GlobalReportRequestCodec) GetMessageType() message.MessageType { -// return message.MessageType_GlobalCommit -//} +import ( + "github.com/seata/seata-go/pkg/common/bytes" + "github.com/seata/seata-go/pkg/protocol/message" +) + +type GlobalReportRequestCodec struct { + CommonGlobalEndRequestCodec +} + +// Decode decode global report request +func (g *GlobalReportRequestCodec) Decode(in []byte) interface{} { + data := message.AbstractGlobalEndRequest{} + buf := bytes.NewByteBuffer(in) + + data.Xid = bytes.ReadString16Length(buf) + data.ExtraData = []byte(bytes.ReadString16Length(buf)) + status := bytes.ReadByte(buf) + + return message.GlobalReportRequest{ + AbstractGlobalEndRequest: data, + GlobalStatus: message.GlobalStatus(status), + } +} + +// Encode encode global report request +func (g *GlobalReportRequestCodec) Encode(in interface{}) []byte { + req := in.(message.GlobalReportRequest) + b := g.CommonGlobalEndRequestCodec.Encode(req.AbstractGlobalEndRequest) + buf := bytes.NewByteBuffer(b) + buf.WriteByte(byte(req.GlobalStatus)) + return buf.Bytes() +} + +// GetMessageType get global report request's message type +func (g *GlobalReportRequestCodec) GetMessageType() message.MessageType { + return message.MessageType_GlobalReportResult +} diff --git a/pkg/protocol/codec/global_report_request_codec_test.go b/pkg/protocol/codec/global_report_request_codec_test.go new file mode 100644 index 000000000..a06a7e2c4 --- /dev/null +++ b/pkg/protocol/codec/global_report_request_codec_test.go @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package codec + +import ( + "testing" + + "github.com/seata/seata-go/pkg/protocol/message" + "github.com/stretchr/testify/assert" +) + +func TestGlobalReportRequestCodec(t *testing.T) { + msg := message.GlobalReportRequest{ + AbstractGlobalEndRequest: message.AbstractGlobalEndRequest{ + Xid: "test-transaction-id", + ExtraData: []byte("TestExtraData"), + }, + GlobalStatus: message.GlobalStatusBegin, + } + + codec := GlobalReportRequestCodec{} + bytes := codec.Encode(msg) + msg2 := codec.Decode(bytes) + + assert.Equal(t, msg, msg2) +} diff --git a/pkg/protocol/message/constant.go b/pkg/protocol/message/constant.go index 071f9cfca..44f993b5a 100644 --- a/pkg/protocol/message/constant.go +++ b/pkg/protocol/message/constant.go @@ -22,7 +22,7 @@ var MAGIC_CODE_BYTES = [2]byte{0xda, 0xda} type ( MessageType int GettyRequestType byte - GlobalStatus int64 + GlobalStatus byte ) const ( @@ -141,6 +141,11 @@ const ( * the constant TYPE_HEARTBEAT_MSG */ MessageType_HeartbeatMsg MessageType = 120 + + /** + * the constant MessageType_BatchResultMsg + */ + MessageType_BatchResultMsg MessageType = 121 ) const ( diff --git a/pkg/protocol/message/request_message.go b/pkg/protocol/message/request_message.go index eeb0a0910..27b02b157 100644 --- a/pkg/protocol/message/request_message.go +++ b/pkg/protocol/message/request_message.go @@ -108,7 +108,7 @@ type GlobalReportRequest struct { } func (req GlobalReportRequest) GetTypeCode() MessageType { - return MessageType_GlobalStatus + return MessageType_GlobalReport } type GlobalCommitRequest struct { diff --git a/pkg/protocol/message/request_message_test.go b/pkg/protocol/message/request_message_test.go index 62f80a7d1..f02b7408b 100644 --- a/pkg/protocol/message/request_message_test.go +++ b/pkg/protocol/message/request_message_test.go @@ -56,7 +56,7 @@ func TestGlobalLockQueryRequest_GetTypeCode(t *testing.T) { } func TestGlobalReportRequest_GetTypeCode(t *testing.T) { - assert.Equal(t, MessageType_GlobalStatus, GlobalReportRequest{}.GetTypeCode()) + assert.Equal(t, MessageType_GlobalReport, GlobalReportRequest{}.GetTypeCode()) } func TestUndoLogDeleteRequest_GetTypeCode(t *testing.T) { diff --git a/pkg/protocol/message/response_message.go b/pkg/protocol/message/response_message.go index ad9c24c22..fb72673bc 100644 --- a/pkg/protocol/message/response_message.go +++ b/pkg/protocol/message/response_message.go @@ -114,7 +114,7 @@ type GlobalReportResponse struct { } func (resp GlobalReportResponse) GetTypeCode() MessageType { - return MessageType_GlobalStatusResult + return MessageType_GlobalReportResult } type GlobalCommitResponse struct { diff --git a/pkg/protocol/message/response_message_test.go b/pkg/protocol/message/response_message_test.go index f5981ea6d..036ab0a76 100644 --- a/pkg/protocol/message/response_message_test.go +++ b/pkg/protocol/message/response_message_test.go @@ -32,7 +32,7 @@ func TestRegisterTMResponse_GetTypeCode(t *testing.T) { } func TestGlobalReportResponse_GetTypeCode(t *testing.T) { - assert.Equal(t, MessageType_GlobalStatusResult, GlobalReportResponse{}.GetTypeCode()) + assert.Equal(t, MessageType_GlobalReportResult, GlobalReportResponse{}.GetTypeCode()) } func TestGlobalLockQueryResponse_GetTypeCode(t *testing.T) { diff --git a/pkg/remoting/getty/rpc_client.go b/pkg/remoting/getty/rpc_client.go index 5a7697632..3ae91098f 100644 --- a/pkg/remoting/getty/rpc_client.go +++ b/pkg/remoting/getty/rpc_client.go @@ -37,17 +37,12 @@ type RpcClient struct { futures *sync.Map } -func init() { - newRpcClient() -} - -func newRpcClient() *RpcClient { +func InitRpcClient() { rpcClient := &RpcClient{ conf: config.GetClientConfig(), gettyClients: make([]getty.Client, 0), } rpcClient.init() - return rpcClient } func (c *RpcClient) init() { diff --git a/pkg/remoting/processor/remoting_processor.go b/pkg/remoting/processor/remoting_processor.go index 74e838f1c..2878716bf 100644 --- a/pkg/remoting/processor/remoting_processor.go +++ b/pkg/remoting/processor/remoting_processor.go @@ -23,6 +23,7 @@ import ( "github.com/seata/seata-go/pkg/protocol/message" ) +// RemotingProcessor remoting message processor type RemotingProcessor interface { Process(ctx context.Context, rpcMessage message.RpcMessage) error } diff --git a/sample/tcc/dubbo/client/cmd/client.go b/sample/tcc/dubbo/client/cmd/client.go index dace06f4d..9e1318994 100644 --- a/sample/tcc/dubbo/client/cmd/client.go +++ b/sample/tcc/dubbo/client/cmd/client.go @@ -20,11 +20,11 @@ package main import ( "context" + "github.com/seata/seata-go/pkg/client" + "dubbo.apache.org/dubbo-go/v3/common/logger" "dubbo.apache.org/dubbo-go/v3/config" _ "dubbo.apache.org/dubbo-go/v3/imports" - _ "github.com/seata/seata-go/pkg/imports" - "github.com/seata/seata-go/pkg/integration" "github.com/seata/seata-go/pkg/rm/tcc" "github.com/seata/seata-go/pkg/tm" "github.com/seata/seata-go/sample/tcc/dubbo/client/service" @@ -32,7 +32,7 @@ import ( // need to setup environment variable "DUBBO_GO_CONFIG_PATH" to "conf/dubbogo.yml" before run func main() { - integration.UseDubbo() + client.Init() config.SetConsumerService(service.UserProviderInstance) err := config.Load() if err != nil { diff --git a/sample/tcc/dubbo/server/cmd/server.go b/sample/tcc/dubbo/server/cmd/server.go index 4b65358a0..e5a8b6eac 100644 --- a/sample/tcc/dubbo/server/cmd/server.go +++ b/sample/tcc/dubbo/server/cmd/server.go @@ -24,18 +24,18 @@ import ( "syscall" "time" + "github.com/seata/seata-go/pkg/client" + "dubbo.apache.org/dubbo-go/v3/common/logger" "dubbo.apache.org/dubbo-go/v3/config" _ "dubbo.apache.org/dubbo-go/v3/imports" - _ "github.com/seata/seata-go/pkg/imports" - "github.com/seata/seata-go/pkg/integration" "github.com/seata/seata-go/pkg/rm/tcc" "github.com/seata/seata-go/sample/tcc/dubbo/server/service" ) // need to setup environment variable "DUBBO_GO_CONFIG_PATH" to "conf/dubbogo.yml" before run func main() { - integration.UseDubbo() + client.Init() userProviderProxy, err := tcc.NewTCCServiceProxy(&service.UserProvider{}) if err != nil { logger.Errorf("get userProviderProxy tcc service proxy error, %v", err.Error()) diff --git a/sample/tcc/grpc/cmd/tcc_grpc_transation.go b/sample/tcc/grpc/cmd/tcc_grpc_transation.go index a3f8bb4c6..a673c1ac2 100644 --- a/sample/tcc/grpc/cmd/tcc_grpc_transation.go +++ b/sample/tcc/grpc/cmd/tcc_grpc_transation.go @@ -17,10 +17,6 @@ package main -import ( - _ "github.com/seata/seata-go/pkg/imports" -) - func main() { } diff --git a/sample/tcc/local/cmd/local.go b/sample/tcc/local/cmd/local.go index 99429d58e..46d6b4c27 100644 --- a/sample/tcc/local/cmd/local.go +++ b/sample/tcc/local/cmd/local.go @@ -20,13 +20,14 @@ package main import ( "context" + "github.com/seata/seata-go/pkg/client" "github.com/seata/seata-go/pkg/common/log" - _ "github.com/seata/seata-go/pkg/imports" "github.com/seata/seata-go/pkg/tm" "github.com/seata/seata-go/sample/tcc/local/service" ) func main() { + client.Init() var err error ctx := tm.Begin(context.Background(), "TestTCCServiceBusiness") defer func() { diff --git a/test/rpc_remoting_client_test.go b/test/rpc_remoting_client_test.go index f891a30d7..fef1f9f8f 100644 --- a/test/rpc_remoting_client_test.go +++ b/test/rpc_remoting_client_test.go @@ -19,8 +19,6 @@ package test import ( "testing" - - _ "github.com/seata/seata-go/pkg/imports" ) func TestSendMsgWithResponse(test *testing.T) {