Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use undo config #419

Merged
merged 7 commits into from
Jan 2, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func initRmClient(cfg *Config) {
client.RegisterProcessor()
integration.Init()
tcc.InitTCC()
at.InitAT()
at.InitAT(cfg.ClientConfig.UndoConfig)
})
}

Expand Down
44 changes: 0 additions & 44 deletions pkg/constant/client_table_columns_name.go

This file was deleted.

28 changes: 0 additions & 28 deletions pkg/constant/undo.go

This file was deleted.

8 changes: 2 additions & 6 deletions pkg/datasource/sql/at.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,7 @@ import (
"github.com/seata/seata-go/pkg/rm"
)

const (
_defaultResourceSize = 16
_undoLogDeleteLimitSize = 1000
)

func InitAT() {
func InitAT(cfg undo.Config) {
atSourceManager := &ATSourceManager{
resourceCache: sync.Map{},
basic: datasource.NewBasicSourceManager(),
Expand All @@ -49,6 +44,7 @@ func InitAT() {
asyncWorkerConf := AsyncWorkerConfig{}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里需要增加async的配置

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

加好了

asyncWorkerConf.RegisterFlags(fs)
_ = fs.Parse([]string{})
undo.InitUndoConfig(cfg)

atSourceManager.worker = NewAsyncWorker(prometheus.DefaultRegisterer, asyncWorkerConf, atSourceManager)
rm.GetRmCacheInstance().RegisterResourceManager(atSourceManager)
Expand Down
9 changes: 4 additions & 5 deletions pkg/datasource/sql/exec/at/update_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,14 @@ import (
"github.com/seata/seata-go/pkg/datasource/sql/datasource"
"github.com/seata/seata-go/pkg/datasource/sql/exec"
"github.com/seata/seata-go/pkg/datasource/sql/types"
"github.com/seata/seata-go/pkg/datasource/sql/undo"
"github.com/seata/seata-go/pkg/datasource/sql/util"
"github.com/seata/seata-go/pkg/util/bytes"
"github.com/seata/seata-go/pkg/util/log"
)

var (
// todo: OnlyCareUpdateColumns should load from config first
onlyCareUpdateColumns = true
maxInSize = 1000
maxInSize = 1000
)

// updateExecutor execute update SQL
Expand Down Expand Up @@ -185,7 +184,7 @@ func (u *updateExecutor) buildAfterImageSQL(beforeImage types.RecordImage, meta
// todo: OnlyCareUpdateColumns should load from config first
var selectFields string
var separator = ","
if onlyCareUpdateColumns {
if undo.UndoConfig.OnlyCareUpdateColumns {
for _, row := range beforeImage.Rows {
for _, column := range row.Columns {
selectFields += column.ColumnName + separator
Expand All @@ -211,7 +210,7 @@ func (u *updateExecutor) buildBeforeImageSQL(ctx context.Context, args []driver.
updateStmt := u.parserCtx.UpdateStmt
fields := make([]*ast.SelectField, 0, len(updateStmt.List))

if onlyCareUpdateColumns {
if undo.UndoConfig.OnlyCareUpdateColumns {
for _, column := range updateStmt.List {
fields = append(fields, &ast.SelectField{
Expr: &ast.ColumnNameExpr{
Expand Down
2 changes: 2 additions & 0 deletions pkg/datasource/sql/exec/at/update_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package at
import (
"context"
"database/sql/driver"
"github.com/seata/seata-go/pkg/datasource/sql/undo"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

format

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

好了

"reflect"
"testing"

Expand All @@ -39,6 +40,7 @@ import (
)

func TestBuildSelectSQLByUpdate(t *testing.T) {
undo.InitUndoConfig(undo.Config{OnlyCareUpdateColumns: true})
datasource.RegisterTableCache(types.DBTypeMySQL, mysql.NewTableMetaInstance(nil))
stub := gomonkey.ApplyMethod(reflect.TypeOf(datasource.GetTableCache(types.DBTypeMySQL)), "GetTableMeta",
func(_ *mysql.TableMetaCache, ctx context.Context, dbName, tableName string) (*types.TableMeta, error) {
Expand Down
82 changes: 43 additions & 39 deletions pkg/datasource/sql/undo/base/undo.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,43 +22,51 @@ import (
"database/sql"
"database/sql/driver"
"encoding/json"
"fmt"
"strconv"
"strings"

"github.com/arana-db/parser/mysql"
"github.com/pkg/errors"
"github.com/seata/seata-go/pkg/constant"
"github.com/seata/seata-go/pkg/datasource/sql/datasource"
"github.com/seata/seata-go/pkg/datasource/sql/types"
"github.com/seata/seata-go/pkg/datasource/sql/undo"
"github.com/seata/seata-go/pkg/datasource/sql/undo/factor"
"github.com/seata/seata-go/pkg/util/log"
)

// checkUndoLogTableExistSql check undo log if exist
var (
ErrorDeleteUndoLogParamsFault = errors.New("xid or branch_id can't nil")
)

var (
checkUndoLogTableExistSql = "SELECT 1 FROM " + constant.UndoLogTableName + " LIMIT 1"
insertUndoLogSql = "INSERT INTO " + constant.UndoLogTableName + "(branch_id,xid,context,rollback_info,log_status,log_created,log_modified) VALUES (?, ?, ?, ?, ?, now(6), now(6))"
selectUndoLogSql = "SELECT `branch_id`,`xid`,`context`,`rollback_info`,`log_status` FROM " + constant.UndoLogTableName + " WHERE " + constant.UndoLogBranchXid + " = ? AND " + constant.UndoLogXid + " = ? FOR UPDATE" // todo 替换成常量吧,不用使用变量来表示字段名
)

const (
PairSplit = "&"
KvSplit = "="

CompressorTypeKey = "compressorTypeKey"
SerializerKey = "serializerKey"

// CheckUndoLogTableExistSql check undo log if exist
CheckUndoLogTableExistSql = "SELECT 1 FROM " + constant.UndoLogTableName + " LIMIT 1"
// DeleteUndoLogSql delete undo log
DeleteUndoLogSql = constant.DeleteFrom + constant.UndoLogTableName + " WHERE " + constant.UndoLogBranchXid + " = ? AND " + constant.UndoLogXid + " = ?"
compressorTypeKey = "compressorTypeKey"
serializerKey = "serializerKey"
defaultUndoLogTableName = " undo_log "
)

func getUndoLogTableName() string {
if undo.UndoConfig.LogTable != "" {
return undo.UndoConfig.LogTable
}
return defaultUndoLogTableName
}

func getCheckUndoLogTableExistSql() string {
return "SELECT 1 FROM " + getUndoLogTableName() + " LIMIT 1"
}

func getInsertUndoLogSql() string {
return "INSERT INTO " + getUndoLogTableName() + "(branch_id,xid,context,rollback_info,log_status,log_created,log_modified) VALUES (?, ?, ?, ?, ?, now(6), now(6))"
}

func getSelectUndoLogSql() string {
return "SELECT `branch_id`,`xid`,`context`,`rollback_info`,`log_status` FROM " + getUndoLogTableName() + " WHERE branch_id = ? AND xid = ? FOR UPDATE"
}

func getDeleteUndoLogSql() string {
return "DELETE FROM " + getUndoLogTableName() + " WHERE branch_id = ? AND xid = ?"
}

// undo log status
const (
// UndoLogStatusNormal This state can be properly rolled back by services
Expand All @@ -81,7 +89,7 @@ func (m *BaseUndoLogManager) Init() {
// InsertUndoLog
func (m *BaseUndoLogManager) InsertUndoLog(record undo.UndologRecord, conn driver.Conn) error {
log.Infof("begin to insert undo log, xid %v, branch id %v", record.XID, record.BranchID)
stmt, err := conn.Prepare(insertUndoLogSql)
stmt, err := conn.Prepare(getInsertUndoLogSql())
if err != nil {
return err
}
Expand All @@ -93,7 +101,7 @@ func (m *BaseUndoLogManager) InsertUndoLog(record undo.UndologRecord, conn drive
}

func (m *BaseUndoLogManager) InsertUndoLogWithSqlConn(ctx context.Context, record undo.UndologRecord, conn *sql.Conn) error {
stmt, err := conn.PrepareContext(ctx, insertUndoLogSql)
stmt, err := conn.PrepareContext(ctx, getInsertUndoLogSql())
if err != nil {
return err
}
Expand All @@ -106,7 +114,7 @@ func (m *BaseUndoLogManager) InsertUndoLogWithSqlConn(ctx context.Context, recor

// DeleteUndoLog exec delete single undo log operate
func (m *BaseUndoLogManager) DeleteUndoLog(ctx context.Context, xid string, branchID int64, conn *sql.Conn) error {
stmt, err := conn.PrepareContext(ctx, constant.DeleteUndoLogSql)
stmt, err := conn.PrepareContext(ctx, getDeleteUndoLogSql())
if err != nil {
log.Errorf("[DeleteUndoLog] prepare sql fail, err: %v", err)
return err
Expand Down Expand Up @@ -205,8 +213,8 @@ func (m *BaseUndoLogManager) FlushUndoLog(tranCtx *types.TransactionContext, con
}

parseContext := make(map[string]string, 0)
parseContext[SerializerKey] = "jackson"
parseContext[CompressorTypeKey] = "NONE"
parseContext[serializerKey] = "jackson"
parseContext[compressorTypeKey] = "NONE"
undoLogContent, err := json.Marshal(parseContext)
if err != nil {
return err
Expand Down Expand Up @@ -246,7 +254,7 @@ func (m *BaseUndoLogManager) Undo(ctx context.Context, dbType types.DBType, xid
}
}()

stmt, err := conn.PrepareContext(ctx, selectUndoLogSql)
stmt, err := conn.PrepareContext(ctx, getSelectUndoLogSql())
if err != nil {
log.Errorf("prepare sql fail, err: %v", err)
return err
Expand Down Expand Up @@ -346,8 +354,8 @@ func (m *BaseUndoLogManager) Undo(ctx context.Context, dbType types.DBType, xid
func (m *BaseUndoLogManager) insertUndoLogWithGlobalFinished(ctx context.Context, xid string, branchID uint64, conn *sql.Conn) error {
// todo use config to replace
parseContext := make(map[string]string, 0)
parseContext[SerializerKey] = "jackson"
parseContext[CompressorTypeKey] = "NONE"
parseContext[serializerKey] = "jackson"
parseContext[compressorTypeKey] = "NONE"
undoLogContent, err := json.Marshal(parseContext)
if err != nil {
return err
Expand Down Expand Up @@ -375,7 +383,7 @@ func (m *BaseUndoLogManager) DBType() types.DBType {

// HasUndoLogTable check undo log table if exist
func (m *BaseUndoLogManager) HasUndoLogTable(ctx context.Context, conn *sql.Conn) (res bool, err error) {
if _, err = conn.QueryContext(ctx, checkUndoLogTableExistSql); err != nil {
if _, err = conn.QueryContext(ctx, getCheckUndoLogTableExistSql()); err != nil {
// 1146 mysql table not exist fault code
if e, ok := err.(*mysql.SQLError); ok && e.Code == mysql.ErrNoSuchTable {
return false, nil
Expand All @@ -390,19 +398,15 @@ func (m *BaseUndoLogManager) HasUndoLogTable(ctx context.Context, conn *sql.Conn
// getBatchDeleteUndoLogSql build batch delete undo log
func (m *BaseUndoLogManager) getBatchDeleteUndoLogSql(xid []string, branchID []int64) (string, error) {
if len(xid) == 0 || len(branchID) == 0 {
return "", ErrorDeleteUndoLogParamsFault
return "", fmt.Errorf("xid or branch_id can't nil")
}

var undoLogDeleteSql strings.Builder
undoLogDeleteSql.WriteString(constant.DeleteFrom)
undoLogDeleteSql.WriteString(constant.UndoLogTableName)
undoLogDeleteSql.WriteString(" WHERE ")
undoLogDeleteSql.WriteString(constant.UndoLogBranchXid)
undoLogDeleteSql.WriteString(" IN ")
undoLogDeleteSql.WriteString(" DELETE FROM ")
undoLogDeleteSql.WriteString(getUndoLogTableName())
undoLogDeleteSql.WriteString(" WHERE branch_id IN ")
m.appendInParam(len(branchID), &undoLogDeleteSql)
undoLogDeleteSql.WriteString(" AND ")
undoLogDeleteSql.WriteString(constant.UndoLogXid)
undoLogDeleteSql.WriteString(" IN ")
undoLogDeleteSql.WriteString(" AND xid IN ")
m.appendInParam(len(xid), &undoLogDeleteSql)

return undoLogDeleteSql.String(), nil
Expand Down Expand Up @@ -481,7 +485,7 @@ func (m *BaseUndoLogManager) DecodeMap(str string) map[string]string {
func (m *BaseUndoLogManager) getRollbackInfo(rollbackInfo []byte, undoContext map[string]string) []byte {
// Todo use compressor
// get compress type
/*compressorType, ok := undoContext[constant.CompressorTypeKey]
/*compressorType, ok := undoContext[constant.compressorTypeKey]
if ok {

}*/
Expand All @@ -494,6 +498,6 @@ func (m *BaseUndoLogManager) getSerializer(undoLogContext map[string]string) (se
if undoLogContext == nil {
return
}
serializer, _ = undoLogContext[SerializerKey]
serializer, _ = undoLogContext[serializerKey]
return
}
8 changes: 8 additions & 0 deletions pkg/datasource/sql/undo/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,14 @@ import (
"flag"
)

var (
UndoConfig Config
)

func InitUndoConfig(cfg Config) {
UndoConfig = cfg
}

type CompressConfig struct {
Comment on lines +24 to 32
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里没看懂,为啥是这么写的

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

因为这个undo config在多个地方用了,所以搞了一个全局变量来使用。感觉是不太好

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

一下没想好放哪里比较合适。后面重构,可以把这个放到具体的struct的初始化里面去,会更好点

Enable bool `yaml:"enable" json:"enable,omitempty" koanf:"enable"`
Type string `yaml:"type" json:"type,omitempty" koanf:"type"`
Expand Down
3 changes: 3 additions & 0 deletions pkg/datasource/sql/undo/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ func (b *BaseExecutor) UndoPrepare(undoPST *sql.Stmt, undoValues []types.ColumnI
}

func (b *BaseExecutor) dataValidationAndGoOn(ctx context.Context, conn *sql.Conn) (bool, error) {
if !undo.UndoConfig.DataValidation {
return true, nil
}
beforeImage := b.sqlUndoLog.BeforeImage
afterImage := b.sqlUndoLog.AfterImage

Expand Down
15 changes: 3 additions & 12 deletions sample/at/basic/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,8 @@ import (
"github.com/seata/seata-go/pkg/tm"
)

type OrderTbl struct {
id int
userID string
commodityCode string
count int64
money int64
descs string
}

func main() {
client.InitPath("./testdata/conf/seatago.yml")
client.InitPath("./sample/conf/seatago.yml")
initService()
tm.WithGlobalTx(context.Background(), &tm.GtxConfig{
Name: "ATSampleLocalGlobalTx",
Expand All @@ -47,7 +38,7 @@ func main() {

func insertData(ctx context.Context) error {
sql := "INSERT INTO `order_tbl` (`id`, `user_id`, `commodity_code`, `count`, `money`, `descs`) VALUES (?, ?, ?, ?, ?, ?);"
ret, err := db.ExecContext(ctx, sql, 3, "NO-100001", "C100000", 100, nil, "init desc")
ret, err := db.ExecContext(ctx, sql, 333, "NO-100001", "C100000", 100, nil, "init desc")
if err != nil {
fmt.Printf("insert failed, err:%v\n", err)
return err
Expand All @@ -58,7 +49,7 @@ func insertData(ctx context.Context) error {
return err
}
fmt.Printf("insert success: %d.\n", rows)
return nil
return fmt.Errorf("111")
}

func deleteData(ctx context.Context) error {
Expand Down
Loading