Skip to content

Commit

Permalink
optimize: optimize global transaction usage (apache#281)
Browse files Browse the repository at this point in the history
* refact the tm executor and adjust the sample

* optimize interface, adjust samples

* fix ci

* optimize comment

* upgrade go version

* add tx name configuration

* add panic handler
  • Loading branch information
106umao authored Sep 26, 2022
1 parent b753049 commit 533afe8
Show file tree
Hide file tree
Showing 9 changed files with 122 additions and 114 deletions.
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -840,6 +840,7 @@ go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
go.etcd.io/bbolt v1.3.3/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
go.etcd.io/bbolt v1.3.5 h1:XAzx9gjCb0Rxj7EoqcClPD1d5ZBxZJk0jbuoPHenBt0=
go.etcd.io/bbolt v1.3.5/go.mod h1:G5EMThwa9y8QZGBClrRx5EY+Yw9kAhnjy3bSjsnlVTQ=
go.etcd.io/etcd v0.0.0-20191023171146-3cf2f69b5738 h1:VcrIfasaLFkyjk6KNlXQSzO+B0fZcnECiDrKJsfxka0=
go.etcd.io/etcd v0.0.0-20191023171146-3cf2f69b5738/go.mod h1:dnLIgRNXwCJa5e+c6mIZCrds/GIG4ncV9HhK5PX7jPg=
go.etcd.io/etcd/api/v3 v3.5.0-alpha.0/go.mod h1:mPcW6aZJukV6Aa81LSKpBjQXTWlXB5r74ymPoSWa3Sw=
go.etcd.io/etcd/api/v3 v3.5.4 h1:OHVyt3TopwtUQ2GKdd5wu3PmmipR4FTwCqoEjSyRdIc=
Expand Down
39 changes: 34 additions & 5 deletions pkg/tm/transaction_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"fmt"
"time"

"github.com/pkg/errors"

"github.com/seata/seata-go/pkg/common/log"
"github.com/seata/seata-go/pkg/protocol/message"
)
Expand All @@ -34,7 +36,34 @@ type TransactionInfo struct {
LockRetryTimes int64
}

func Begin(ctx context.Context, name string) context.Context {
// CallbackWithCtx business callback definition
type CallbackWithCtx func(ctx context.Context) error

// WithGlobalTx begin a global transaction and make it step into committed or rollbacked status.
func WithGlobalTx(ctx context.Context, ti *TransactionInfo, business CallbackWithCtx) (re error) {
if ti == nil {
return errors.New("global transaction config info is required.")
}
if ti.Name == "" {
return errors.New("global transaction name is required.")
}

if ctx, re = begin(ctx, ti.Name); re != nil {
return
}
defer func() {
// business maybe to throw panic, so need to recover it here.
re = commitOrRollback(ctx, recover() == nil && re == nil)
log.Infof("global transaction result %v", re)
}()

re = business(ctx)

return
}

// begin a global transaction, it will obtain a xid from tc in tcp call.
func begin(ctx context.Context, name string) (rc context.Context, re error) {
if !IsSeataContext(ctx) {
ctx = InitSeataContext(ctx)
}
Expand Down Expand Up @@ -67,14 +96,14 @@ func Begin(ctx context.Context, name string) context.Context {
// todo timeout should read from config
err := GetGlobalTransactionManager().Begin(ctx, tx, time.Second*30, name)
if err != nil {
panic(fmt.Sprintf("transactionTemplate: begin transaction failed, error %v", err))
re = fmt.Errorf("transactionTemplate: begin transaction failed, error %v", err)
}

return ctx
return ctx, re
}

// CommitOrRollback commit global transaction
func CommitOrRollback(ctx context.Context, isSuccess bool) (re error) {
// commitOrRollback commit or rollback the global transaction
func commitOrRollback(ctx context.Context, isSuccess bool) (re error) {
role := *GetTransactionRole(ctx)
if role == PARTICIPANT {
// Participant has no responsibility of rollback
Expand Down
6 changes: 3 additions & 3 deletions pkg/tm/transaction_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func TestTransactionExecutorBegin(t *testing.T) {
assert.Equal(t, v.wantErrString, err.Error)
}
}(v)
Begin(v.ctx, v.name)
begin(v.ctx, v.name)
}()

// rest up stub
Expand All @@ -101,7 +101,7 @@ func TestTransactionExecutorCommit(t *testing.T) {
SetTransactionRole(ctx, LAUNCHER)
SetTxStatus(ctx, message.GlobalStatusBegin)
SetXID(ctx, "")
assert.Equal(t, "Commit xid should not be empty", CommitOrRollback(ctx, true).Error())
assert.Equal(t, "Commit xid should not be empty", commitOrRollback(ctx, true).Error())
}

func TestTransactionExecurotRollback(t *testing.T) {
Expand All @@ -110,6 +110,6 @@ func TestTransactionExecurotRollback(t *testing.T) {
SetTransactionRole(ctx, LAUNCHER)
SetTxStatus(ctx, message.GlobalStatusBegin)
SetXID(ctx, "")
errActual := CommitOrRollback(ctx, false)
errActual := commitOrRollback(ctx, false)
assert.Equal(t, "Rollback xid should not be empty", errActual.Error())
}
30 changes: 14 additions & 16 deletions sample/tcc/dubbo/client/cmd/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,26 +33,24 @@ import (
func main() {
client.Init()
config.SetConsumerService(service.UserProviderInstance)
err := config.Load()
if err != nil {
if err := config.Load(); err != nil {
panic(err)
}
test()
run()
}

func test() {
var err error
ctx := tm.Begin(context.Background(), "TestTCCServiceBusiness")
defer func() {
resp := tm.CommitOrRollback(ctx, err == nil)
logger.Infof("tx result %v", resp)
<-make(chan struct{})
}()
func run() {
tm.WithGlobalTx(context.Background(), &tm.TransactionInfo{
Name: "TccSampleLocalGlobalTx",
}, business)
<-make(chan struct{})
}

resp, err := service.UserProviderInstance.Prepare(ctx, 1)
if err != nil {
logger.Infof("response prepare: %v", err)
return
func business(ctx context.Context) (re error) {
if resp, re := service.UserProviderInstance.Prepare(ctx, 1); re != nil {
logger.Infof("response prepare: %v", re)
} else {
logger.Infof("get resp %#v", resp)
}
logger.Infof("get resp %#v", resp)
return
}
28 changes: 14 additions & 14 deletions sample/tcc/fence/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,25 +30,25 @@ import (

func main() {
client.Init()
var err error
ctx := tm.Begin(context.Background(), "TestTCCServiceBusiness")
defer func() {
resp := tm.CommitOrRollback(ctx, err == nil)
log.Infof("tx result %v", resp)
<-make(chan struct{})
}()
tm.WithGlobalTx(context.Background(), &tm.TransactionInfo{
Name: "TccSampleLocalGlobalTx",
}, business)
<-make(chan struct{})
}

func business(ctx context.Context) (re error) {
tccService := service.NewTestTCCServiceBusinessProxy()
tccService2 := service.NewTestTCCServiceBusiness2Proxy()

_, err = tccService.Prepare(ctx, 1)
if err != nil {
log.Errorf("TestTCCServiceBusiness prepare error, %v", err.Error())
_, re = tccService.Prepare(ctx, 1)
if re != nil {
log.Errorf("TestTCCServiceBusiness prepare error, %v", re.Error())
return
}
_, err = tccService2.Prepare(ctx, 3)
if err != nil {
log.Errorf("TestTCCServiceBusiness2 prepare error, %v", err.Error())
_, re = tccService2.Prepare(ctx, 3)
if re != nil {
log.Errorf("TestTCCServiceBusiness2 prepare error, %v", re.Error())
return
}

return
}
46 changes: 18 additions & 28 deletions sample/tcc/gin/client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,37 +14,27 @@ import (
)

func main() {
client.Init()
flag.Parse()

ctx, cancel := context.WithTimeout(context.Background(), time.Minute*10)
client.Init()
bgCtx, cancel := context.WithTimeout(context.Background(), time.Minute*10)
defer cancel()

var err error

// GlobalTransactional is starting
log.Infof("global transaction begin")
ctx = tm.Begin(ctx, "TestTCCServiceBusiness")
defer func() {
resp := tm.CommitOrRollback(ctx, err == nil)
log.Infof("tx result %v", resp)
<-make(chan bool)
}()

var serverIpPort = "http://127.0.0.1:8080"
request := gorequest.New()

var xid string
if tm.IsSeataContext(ctx) {
xid = tm.GetXID(ctx)
}

log.Infof("branch transaction begin")
request.Post(serverIpPort+"/prepare").
Set(common.XidKey, xid).
End(func(response gorequest.Response, body string, errs []error) {
if len(errs) != 0 {
err = errs[0]
}
tm.WithGlobalTx(
bgCtx,
&tm.TransactionInfo{
Name: "TccSampleLocalGlobalTx",
},
func(ctx context.Context) (re error) {
request := gorequest.New()
log.Infof("branch transaction begin")
request.Post(serverIpPort+"/prepare").
Set(common.XidKey, tm.GetXID(ctx)).
End(func(response gorequest.Response, body string, errs []error) {
if len(errs) != 0 {
re = errs[0]
}
})
return
})
}
55 changes: 25 additions & 30 deletions sample/tcc/grpc/cmd/client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ package main
import (
"context"
"flag"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
Expand All @@ -34,44 +33,40 @@ import (
)

func main() {
client.Init()
flag.Parse()
// Set up a connection to the server.
// to set up grpc env
// set up a connection to the server.
conn, err := grpc.Dial("localhost:50051",
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithUnaryInterceptor(grpc2.ClientTransactionInterceptor))
if err != nil {
log.Fatalf("did not connect: %v", err)
}
defer conn.Close()
c1 := pb.NewTCCServiceBusiness1Client(conn)
c2 := pb.NewTCCServiceBusiness2Client(conn)
// Contact the server and print out its response.
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
c1, c2 := pb.NewTCCServiceBusiness1Client(conn), pb.NewTCCServiceBusiness2Client(conn)

// GlobalTransactional is starting
log.Infof("global transaction begin")
ctx = tm.Begin(ctx, "TestTCCServiceBusiness")
defer func() {
resp := tm.CommitOrRollback(ctx, err == nil)
log.Infof("tx result %v", resp)
<-make(chan bool)
}()
defer cancel()
client.Init()
tm.WithGlobalTx(
context.Background(),
&tm.TransactionInfo{
Name: "TccSampleLocalGlobalTx",
},
func(ctx context.Context) (re error) {
r1, re := c1.Remoting(ctx, &pb.Params{A: "1", B: "2"})
if re != nil {
log.Fatalf("could not do TestTCCServiceBusiness 1: %v", re)
return
}
log.Infof("TestTCCServiceBusiness#Prepare res: %s", r1)

// BranchTransactional 1 remoting running
log.Infof("branch transaction 1 begin")
r1, err := c1.Remoting(ctx, &pb.Params{A: "1", B: "2"})
if err != nil {
log.Fatalf("could not do TestTCCServiceBusiness 1: %v", err)
}
log.Infof("TestTCCServiceBusiness#Prepare res: %s", r1)
r2, re := c2.Remoting(ctx, &pb.Params{A: "3", B: "4"})
if re != nil {
log.Fatalf("could not do TestTCCServiceBusiness 2: %v", re)
return
}
log.Infof("TestTCCServiceBusiness#Prepare res: %v", r2)

// BranchTransactional 2 remoting running
log.Infof("branch transaction 2 begin")
r2, err := c2.Remoting(ctx, &pb.Params{A: "3", B: "4"})
if err != nil {
log.Fatalf("could not do TestTCCServiceBusiness 2: %v", err)
}
log.Infof("TestTCCServiceBusiness#Prepare res: %v", r2)
return
})
<-make(chan struct{})
}
29 changes: 12 additions & 17 deletions sample/tcc/local/cmd/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,26 +28,21 @@ import (

func main() {
client.Init()
var err error
ctx := tm.Begin(context.Background(), "TestTCCServiceBusiness")
defer func() {
resp := tm.CommitOrRollback(ctx, err == nil)
log.Infof("tx result %v", resp)
<-make(chan struct{})
}()

tccService := service.NewTestTCCServiceBusinessProxy()
tccService2 := service.NewTestTCCServiceBusiness2Proxy()
tm.WithGlobalTx(context.Background(), &tm.TransactionInfo{
Name: "TccSampleLocalGlobalTx",
}, business)
<-make(chan struct{})
}

_, err = tccService.Prepare(ctx, 1)
if err != nil {
log.Errorf("TestTCCServiceBusiness prepare error, %v", err.Error())
func business(ctx context.Context) (re error) {
if _, re = service.NewTestTCCServiceBusiness1Proxy().Prepare(ctx, 1); re != nil {
log.Errorf("TestTCCServiceBusiness1 prepare error, %v", re)
return
}
_, err = tccService2.Prepare(ctx, 3)
if err != nil {
log.Errorf("TestTCCServiceBusiness2 prepare error, %v", err.Error())

if _, re = service.NewTestTCCServiceBusiness2Proxy().Prepare(ctx, 3); re != nil {
log.Errorf("TestTCCServiceBusiness2 prepare error, %v", re)
return
}

return
}
2 changes: 1 addition & 1 deletion sample/tcc/local/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ var (
type TestTCCServiceBusiness struct {
}

func NewTestTCCServiceBusinessProxy() *tcc.TCCServiceProxy {
func NewTestTCCServiceBusiness1Proxy() *tcc.TCCServiceProxy {
if tccService != nil {
return tccService
}
Expand Down

0 comments on commit 533afe8

Please sign in to comment.