Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

optimize named for the resource manager api and tcc resource, adjust … #125

Merged
merged 1 commit into from
Jul 23, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,12 @@ require (
dubbo.apache.org/dubbo-go/v3 v3.0.2-0.20220508105316-b27ec53b7bab
github.com/BurntSushi/toml v1.1.0 // indirect
github.com/apache/dubbo-getty v1.4.8
github.com/apache/dubbo-go-hessian2 v1.11.0
github.com/dubbogo/gost v1.12.5
github.com/natefinch/lumberjack v2.0.0+incompatible
github.com/pkg/errors v0.9.1
github.com/stretchr/testify v1.7.1
go.uber.org/atomic v1.9.0
go.uber.org/zap v1.21.0
golang.org/x/tools v0.1.11 // indirect
vimagination.zapto.org/byteio v0.0.0-20200222190125-d27cba0f0b10
vimagination.zapto.org/memio v0.0.0-20200222190306-588ebc67b97d // indirect
)
8 changes: 0 additions & 8 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -770,7 +770,6 @@ github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de
github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
github.com/yuin/goldmark v1.4.1/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
github.com/yusufpapurcu/wmi v1.2.2 h1:KBNDSne4vP5mbSWnJbO+51IMOXJB67QiYCSBrubbPRg=
github.com/yusufpapurcu/wmi v1.2.2/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0=
github.com/zouyx/agollo/v3 v3.4.5 h1:7YCxzY9ZYaH9TuVUBvmI6Tk0mwMggikah+cfbYogcHQ=
Expand Down Expand Up @@ -841,7 +840,6 @@ golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8U
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.0.0-20211215153901-e495a2d5b3d3 h1:0es+/5331RGQPcXlMfP+WrnIIS6dNnNRe0WB02W0F4M=
golang.org/x/crypto v0.0.0-20211215153901-e495a2d5b3d3/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
Expand Down Expand Up @@ -881,8 +879,6 @@ golang.org/x/mod v0.1.1-0.20191107180719-034126e5016b/go.mod h1:QqPTAvyqsEbceGzB
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 h1:6zppjxzCulZykYSLyVDYbneBfbaBIQPYMevg0bEwv2s=
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
golang.org/x/net v0.0.0-20180530234432-1e491301e022/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
Expand Down Expand Up @@ -927,7 +923,6 @@ golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20210917221730-978cfadd31cf/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20211015210444-4f30a5c0130f/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20211029224645-99673261e6eb/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20211105192438-b53810dc28af/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2 h1:CIJ76btIcR3eFI5EgSo6k1qKw9KJexJuRLI9G7Hp5wE=
Expand Down Expand Up @@ -1017,7 +1012,6 @@ golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210806184541-e5e7981a1069/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210816074244-15123e1e1f71/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211019181941-9d821ace8654/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211106132015-ebca88c72f68/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211117180635-dee7805ff2e1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220111092808-5a964db01320/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
Expand Down Expand Up @@ -1099,8 +1093,6 @@ golang.org/x/tools v0.0.0-20201014170642-d1624618ad65/go.mod h1:z6u4i615ZeAfBE4X
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.1.2/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/tools v0.1.11 h1:loJ25fNOEhSXfHrpoGj91eCUThwdNX6u24rO1xnNteY=
golang.org/x/tools v0.1.11/go.mod h1:SgwaegtQh8clINPpECJMqnxLv9I09HLqnW3RMqW0CA4=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func (f *rmBranchCommitProcessor) Process(ctx context.Context, rpcMessage messag
applicationData := request.ApplicationData
log.Infof("Branch committing: xid %s, branchID %s, resourceID %s, applicationData %s", xid, branchID, resourceID, applicationData)

status, err := rm.GetResourceManagerInstance().GetResourceManager(request.BranchType).BranchCommit(ctx, request.BranchType, xid, branchID, resourceID, applicationData)
status, err := rm.GetRmCacheInstance().GetResourceManager(request.BranchType).BranchCommit(ctx, request.BranchType, xid, branchID, resourceID, applicationData)
if err != nil {
log.Infof("branch commit error: %s", err.Error())
return err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func TestRmBranchCommitProcessor(t *testing.T) {
var ctx context.Context
var rbcProcessor rmBranchCommitProcessor

rm.GetResourceManagerInstance().RegisterResourceManager(tcc.GetTCCResourceManagerInstance())
rm.GetRmCacheInstance().RegisterResourceManager(tcc.GetTCCResourceManagerInstance())

// run tests
for _, tc := range tests {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func (f *rmBranchRollbackProcessor) Process(ctx context.Context, rpcMessage mess
applicationData := request.ApplicationData
log.Infof("Branch rollback request: xid %s, branchID %s, resourceID %s, applicationData %s", xid, branchID, resourceID, applicationData)

status, err := rm.GetResourceManagerInstance().GetResourceManager(request.BranchType).BranchRollback(ctx, request.BranchType, xid, branchID, resourceID, applicationData)
status, err := rm.GetRmCacheInstance().GetResourceManager(request.BranchType).BranchRollback(ctx, request.BranchType, xid, branchID, resourceID, applicationData)
if err != nil {
log.Infof("branch rollback error: %s", err.Error())
return err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func TestRmBranchRollbackProcessor(t *testing.T) {
var ctx context.Context
var rbrProcessor rmBranchRollbackProcessor

rm.GetResourceManagerInstance().RegisterResourceManager(tcc.GetTCCResourceManagerInstance())
rm.GetRmCacheInstance().RegisterResourceManager(tcc.GetTCCResourceManagerInstance())

// run tests
for _, tc := range tests {
Expand Down
105 changes: 0 additions & 105 deletions pkg/rm/resource_manager.go

This file was deleted.

6 changes: 3 additions & 3 deletions pkg/protocol/resource/resource.go → pkg/rm/rm_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package resource
package rm

import (
"context"
Expand Down Expand Up @@ -59,11 +59,11 @@ type ResourceManager interface {
// Unregister a Resource from the Resource Manager
UnregisterResource(resource Resource) error
// Get all resources managed by this manager
GetManagedResources() *sync.Map
GetCachedResources() *sync.Map
// Get the BranchType
GetBranchType() branch.BranchType
}

type ResourceManagerGetter interface {
GetResourceManager() ResourceManager
GetResourceManager(branchType branch.BranchType) ResourceManager
}
57 changes: 57 additions & 0 deletions pkg/rm/rm_cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package rm

import (
"fmt"
"sync"

"github.com/seata/seata-go/pkg/protocol/branch"
)

var (
// singletone ResourceManagerCache
rmCacheInstance *ResourceManagerCache
onceRMFacade = &sync.Once{}
)

func GetRmCacheInstance() *ResourceManagerCache {
if rmCacheInstance == nil {
onceRMFacade.Do(func() {
rmCacheInstance = &ResourceManagerCache{}
})
}
return rmCacheInstance
}

type ResourceManagerCache struct {
// BranchType -> ResourceManagerCache
resourceManagerMap sync.Map
}

func (d *ResourceManagerCache) RegisterResourceManager(resourceManager ResourceManager) {
d.resourceManagerMap.Store(resourceManager.GetBranchType(), resourceManager)
}

func (d *ResourceManagerCache) GetResourceManager(branchType branch.BranchType) ResourceManager {
rm, ok := d.resourceManagerMap.Load(branchType)
if !ok {
panic(fmt.Sprintf("No ResourceManagerCache for BranchType: %v", branchType))
}
return rm.(ResourceManager)
}
4 changes: 1 addition & 3 deletions pkg/rm/rm_remoting.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ package rm
import (
"sync"

"github.com/seata/seata-go/pkg/protocol/resource"

"github.com/seata/seata-go/pkg/common/log"
"github.com/seata/seata-go/pkg/protocol/branch"
"github.com/seata/seata-go/pkg/protocol/message"
Expand Down Expand Up @@ -84,7 +82,7 @@ func (RMRemoting) LockQuery(branchType branch.BranchType, resourceId, xid, lockK
return false, nil
}

func (r *RMRemoting) RegisterResource(resource resource.Resource) error {
func (r *RMRemoting) RegisterResource(resource Resource) error {
req := message.RegisterRMRequest{
AbstractIdentifyRequest: message.AbstractIdentifyRequest{
//todo replace with config
Expand Down
13 changes: 5 additions & 8 deletions pkg/rm/tcc/tcc_resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,9 @@ import (

"github.com/seata/seata-go/pkg/common"
"github.com/seata/seata-go/pkg/common/log"

"github.com/seata/seata-go/pkg/protocol/resource"
"github.com/seata/seata-go/pkg/tm"

"github.com/seata/seata-go/pkg/protocol/branch"
"github.com/seata/seata-go/pkg/rm"
"github.com/seata/seata-go/pkg/tm"
)

var (
Expand Down Expand Up @@ -71,7 +68,7 @@ func (t *TCCResource) GetBranchType() branch.BranchType {
}

func init() {
rm.GetResourceManagerInstance().RegisterResourceManager(GetTCCResourceManagerInstance())
rm.GetRmCacheInstance().RegisterResourceManager(GetTCCResourceManagerInstance())
}

func GetTCCResourceManagerInstance() *TCCResourceManager {
Expand Down Expand Up @@ -107,20 +104,20 @@ func (t *TCCResourceManager) LockQuery(ctx context.Context, ranchType branch.Bra
panic("implement me")
}

func (t *TCCResourceManager) UnregisterResource(resource resource.Resource) error {
func (t *TCCResourceManager) UnregisterResource(resource rm.Resource) error {
//TODO implement me
panic("implement me")
}

func (t *TCCResourceManager) RegisterResource(resource resource.Resource) error {
func (t *TCCResourceManager) RegisterResource(resource rm.Resource) error {
if _, ok := resource.(*TCCResource); !ok {
panic(fmt.Sprintf("register tcc resource error, TCCResource is needed, param %v", resource))
}
t.resourceManagerMap.Store(resource.GetResourceId(), resource)
return t.rmRemoting.RegisterResource(resource)
}

func (t *TCCResourceManager) GetManagedResources() *sync.Map {
func (t *TCCResourceManager) GetCachedResources() *sync.Map {
return &t.resourceManagerMap
}

Expand Down
5 changes: 2 additions & 3 deletions pkg/rm/tcc/tcc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,13 @@ import (

"github.com/seata/seata-go/pkg/common/types"

"github.com/seata/seata-go/pkg/tm"

"github.com/pkg/errors"
"github.com/seata/seata-go/pkg/common"
"github.com/seata/seata-go/pkg/common/log"
"github.com/seata/seata-go/pkg/common/net"
"github.com/seata/seata-go/pkg/protocol/branch"
"github.com/seata/seata-go/pkg/rm"
"github.com/seata/seata-go/pkg/tm"
)

type TCCServiceProxy struct {
Expand All @@ -56,7 +55,7 @@ func NewTCCServiceProxy(service interface{}) (*TCCServiceProxy, error) {
func (t *TCCServiceProxy) RegisterResource() error {
var err error
t.registerResourceOnce.Do(func() {
err = rm.GetResourceManagerInstance().GetResourceManager(branch.BranchTypeTCC).RegisterResource(t.TCCResource)
err = rm.GetRmCacheInstance().GetResourceManager(branch.BranchTypeTCC).RegisterResource(t.TCCResource)
if err != nil {
log.Errorf("NewTCCServiceProxy RegisterResource error: %#v", err.Error())
}
Expand Down