From d4d9b9b564e593755e90bd365db328e04f74867d Mon Sep 17 00:00:00 2001 From: georgehao Date: Sun, 20 Nov 2022 19:41:12 +0800 Subject: [PATCH] feat: add gin for at and fix async worker bug (#357) --- pkg/datasource/sql/async_worker.go | 4 ++ sample/at/gin/client/main.go | 64 ++++++++++++++++++++++++++++++ sample/at/gin/server/main.go | 53 +++++++++++++++++++++++++ sample/at/gin/server/service.go | 57 ++++++++++++++++++++++++++ 4 files changed, 178 insertions(+) create mode 100644 sample/at/gin/client/main.go create mode 100644 sample/at/gin/server/main.go create mode 100644 sample/at/gin/server/service.go diff --git a/pkg/datasource/sql/async_worker.go b/pkg/datasource/sql/async_worker.go index 056b6a096..c48dfa445 100644 --- a/pkg/datasource/sql/async_worker.go +++ b/pkg/datasource/sql/async_worker.go @@ -139,6 +139,10 @@ func (aw *AsyncWorker) run() { } func (aw *AsyncWorker) doBranchCommit(phaseCtxs *[]phaseTwoContext) { + if len(*phaseCtxs) == 0 { + return + } + copyPhaseCtxs := make([]phaseTwoContext, len(*phaseCtxs)) copy(copyPhaseCtxs, *phaseCtxs) *phaseCtxs = (*phaseCtxs)[:0] diff --git a/sample/at/gin/client/main.go b/sample/at/gin/client/main.go new file mode 100644 index 000000000..f45658ed1 --- /dev/null +++ b/sample/at/gin/client/main.go @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package main + +import ( + "context" + "flag" + "fmt" + "time" + + "github.com/parnurzeal/gorequest" + + "github.com/seata/seata-go/pkg/client" + "github.com/seata/seata-go/pkg/constant" + "github.com/seata/seata-go/pkg/tm" + "github.com/seata/seata-go/pkg/util/log" +) + +var serverIpPort = "http://127.0.0.1:8080" + +func main() { + flag.Parse() + client.Init() + + bgCtx, cancel := context.WithTimeout(context.Background(), time.Minute*10) + defer cancel() + + transInfo := &tm.TransactionInfo{ + Name: "ATSampleLocalGlobalTx", + TimeOut: time.Second * 30, + } + + if err := tm.WithGlobalTx(bgCtx, transInfo, updateData); err != nil { + panic(fmt.Sprintf("tm update data err, %v", err)) + } +} + +func updateData(ctx context.Context) (re error) { + request := gorequest.New() + log.Infof("branch transaction begin") + request.Post(serverIpPort+"/updateData"). + Set(constant.XidKey, tm.GetXID(ctx)). + End(func(response gorequest.Response, body string, errs []error) { + if len(errs) != 0 { + re = errs[0] + } + }) + return +} diff --git a/sample/at/gin/server/main.go b/sample/at/gin/server/main.go new file mode 100644 index 000000000..859b9bc2d --- /dev/null +++ b/sample/at/gin/server/main.go @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package main + +import ( + "net/http" + + "github.com/gin-gonic/gin" + + "github.com/seata/seata-go/pkg/client" + ginmiddleware "github.com/seata/seata-go/pkg/integration/gin" + "github.com/seata/seata-go/pkg/util/log" +) + +func main() { + client.Init() + initService() + + r := gin.Default() + + // NOTE: when use gin,must set ContextWithFallback true when gin version >= 1.8.1 + // r.ContextWithFallback = true + + r.Use(ginmiddleware.TransactionMiddleware()) + + r.POST("/updateData", func(c *gin.Context) { + log.Infof("get tm updateData") + if err := updateData(c); err != nil { + c.JSON(http.StatusOK, "updateData failure") + return + } + c.JSON(http.StatusOK, "updateData ok") + }) + + if err := r.Run(":8080"); err != nil { + log.Fatalf("start tcc server fatal: %v", err) + } +} diff --git a/sample/at/gin/server/service.go b/sample/at/gin/server/service.go new file mode 100644 index 000000000..038c0e3d6 --- /dev/null +++ b/sample/at/gin/server/service.go @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package main + +import ( + "context" + "database/sql" + "fmt" + "time" + + sql2 "github.com/seata/seata-go/pkg/datasource/sql" +) + +var ( + db *sql.DB +) + +func initService() { + var err error + db, err = sql.Open(sql2.SeataATMySQLDriver, "root:123456@tcp(127.0.0.1:3306)/seata_client?multiStatements=true&interpolateParams=true") + if err != nil { + panic("init service error") + } +} + +// case1 : tm commit success +func updateData(ctx context.Context) error { + sql := "update order_tbl set descs=? where id=?" + ret, err := db.ExecContext(ctx, sql, fmt.Sprintf("NewDescs-%d", time.Now().UnixMilli()), 1) + if err != nil { + fmt.Printf("update failed, err:%v\n", err) + return nil + } + + rows, err := ret.RowsAffected() + if err != nil { + fmt.Printf("update failed, err:%v\n", err) + return nil + } + fmt.Printf("update success: %d.\n", rows) + return nil +}