Skip to content

Commit

Permalink
feat: use undo config (apache#419)
Browse files Browse the repository at this point in the history
* use undo config
  • Loading branch information
luky116 authored Jan 2, 2023
1 parent 33151a4 commit f87fbea
Show file tree
Hide file tree
Showing 28 changed files with 258 additions and 173 deletions.
2 changes: 1 addition & 1 deletion pkg/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func initRmClient(cfg *Config) {
client.RegisterProcessor()
integration.Init()
tcc.InitTCC()
at.InitAT()
at.InitAT(cfg.ClientConfig.UndoConfig, cfg.AsyncWorkerConfig)
})
}

Expand Down
13 changes: 8 additions & 5 deletions pkg/client/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/knadh/koanf/parsers/toml"
"github.com/knadh/koanf/parsers/yaml"
"github.com/knadh/koanf/providers/rawbytes"
"github.com/seata/seata-go/pkg/datasource/sql"
"github.com/seata/seata-go/pkg/datasource/sql/undo"
"github.com/seata/seata-go/pkg/remoting/getty"
"github.com/seata/seata-go/pkg/rm"
Expand Down Expand Up @@ -72,11 +73,12 @@ type Config struct {
EnableAutoDataSourceProxy bool `yaml:"enable-auto-data-source-proxy" json:"enable-auto-data-source-proxy,omitempty" koanf:"enable-auto-data-source-proxy"`
DataSourceProxyMode string `yaml:"data-source-proxy-mode" json:"data-source-proxy-mode,omitempty" koanf:"data-source-proxy-mode"`

TCCConfig tcc.Config `yaml:"tcc" json:"tcc" koanf:"tcc"`
ClientConfig ClientConfig `yaml:"client" json:"client" koanf:"client"`
GettyConfig getty.Config `yaml:"getty" json:"getty" koanf:"getty"`
TransportConfig getty.TransportConfig `yaml:"transport" json:"transport" koanf:"transport"`
ServiceConfig tm.ServiceConfig `yaml:"service" json:"service" koanf:"service"`
AsyncWorkerConfig sql.AsyncWorkerConfig `yaml:"async" json:"async" koanf:"async"`
TCCConfig tcc.Config `yaml:"tcc" json:"tcc" koanf:"tcc"`
ClientConfig ClientConfig `yaml:"client" json:"client" koanf:"client"`
GettyConfig getty.Config `yaml:"getty" json:"getty" koanf:"getty"`
TransportConfig getty.TransportConfig `yaml:"transport" json:"transport" koanf:"transport"`
ServiceConfig tm.ServiceConfig `yaml:"service" json:"service" koanf:"service"`
}

func (c *Config) RegisterFlags(f *flag.FlagSet) {
Expand All @@ -88,6 +90,7 @@ func (c *Config) RegisterFlags(f *flag.FlagSet) {
f.BoolVar(&c.EnableAutoDataSourceProxy, "enable-auto-data-source-proxy", true, "Whether enable auto proxying of datasource bean.")
f.StringVar(&c.DataSourceProxyMode, "data-source-proxy-mode", "AT", "Data source proxy mode.")

c.AsyncWorkerConfig.RegisterFlagsWithPrefix("async-worker", f)
c.TCCConfig.RegisterFlagsWithPrefix("tcc", f)
c.ClientConfig.RegisterFlagsWithPrefix("client", f)
c.GettyConfig.RegisterFlagsWithPrefix("getty", f)
Expand Down
44 changes: 0 additions & 44 deletions pkg/constant/client_table_columns_name.go

This file was deleted.

28 changes: 0 additions & 28 deletions pkg/constant/undo.go

This file was deleted.

12 changes: 6 additions & 6 deletions pkg/datasource/sql/async_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,12 @@ type AsyncWorkerConfig struct {
CommitWorkerBufferSize int `yaml:"commit_worker_buffer_size" json:"commit_worker_buffer_size"`
}

func (cfg *AsyncWorkerConfig) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&cfg.BufferLimit, "async-worker.commit.buffer_size", 10000, "async worker commit buffer limit.")
f.DurationVar(&cfg.BufferCleanInterval, "async-worker.commit.buffer.clean_interval", time.Second, "async worker commit buffer interval")
f.IntVar(&cfg.ReceiveChanSize, "async-worker.commit.channel_size", 10000, "async worker commit channel size")
f.IntVar(&cfg.CommitWorkerCount, "async-worker.commit.worker_count", 10, "async worker commit worker count")
f.IntVar(&cfg.CommitWorkerBufferSize, "async-worker.commit.worker_buffer_size", 1000, "async worker commit worker buffer size")
func (cfg *AsyncWorkerConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.IntVar(&cfg.BufferLimit, prefix+".buffer_size", 10000, "async worker commit buffer limit.")
f.DurationVar(&cfg.BufferCleanInterval, prefix+".buffer.clean_interval", time.Second, "async worker commit buffer interval")
f.IntVar(&cfg.ReceiveChanSize, prefix+".channel_size", 10000, "async worker commit channel size")
f.IntVar(&cfg.CommitWorkerCount, prefix+".worker_count", 10, "async worker commit worker count")
f.IntVar(&cfg.CommitWorkerBufferSize, prefix+".worker_buffer_size", 1000, "async worker commit worker buffer size")
}

// AsyncWorker executor for branch transaction commit and undo log
Expand Down
16 changes: 3 additions & 13 deletions pkg/datasource/sql/at.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ package sql
import (
"context"
"database/sql"
"flag"
"fmt"
"sync"

Expand All @@ -33,24 +32,15 @@ import (
"github.com/seata/seata-go/pkg/rm"
)

const (
_defaultResourceSize = 16
_undoLogDeleteLimitSize = 1000
)

func InitAT() {
func InitAT(cfg undo.Config, asyncCfg AsyncWorkerConfig) {
atSourceManager := &ATSourceManager{
resourceCache: sync.Map{},
basic: datasource.NewBasicSourceManager(),
rmRemoting: rm.GetRMRemotingInstance(),
}

fs := flag.NewFlagSet("", flag.PanicOnError)
asyncWorkerConf := AsyncWorkerConfig{}
asyncWorkerConf.RegisterFlags(fs)
_ = fs.Parse([]string{})

atSourceManager.worker = NewAsyncWorker(prometheus.DefaultRegisterer, asyncWorkerConf, atSourceManager)
undo.InitUndoConfig(cfg)
atSourceManager.worker = NewAsyncWorker(prometheus.DefaultRegisterer, asyncCfg, atSourceManager)
rm.GetRmCacheInstance().RegisterResourceManager(atSourceManager)
}

Expand Down
9 changes: 4 additions & 5 deletions pkg/datasource/sql/exec/at/update_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,14 @@ import (
"github.com/seata/seata-go/pkg/datasource/sql/datasource"
"github.com/seata/seata-go/pkg/datasource/sql/exec"
"github.com/seata/seata-go/pkg/datasource/sql/types"
"github.com/seata/seata-go/pkg/datasource/sql/undo"
"github.com/seata/seata-go/pkg/datasource/sql/util"
"github.com/seata/seata-go/pkg/util/bytes"
"github.com/seata/seata-go/pkg/util/log"
)

var (
// todo: OnlyCareUpdateColumns should load from config first
onlyCareUpdateColumns = true
maxInSize = 1000
maxInSize = 1000
)

// updateExecutor execute update SQL
Expand Down Expand Up @@ -185,7 +184,7 @@ func (u *updateExecutor) buildAfterImageSQL(beforeImage types.RecordImage, meta
// todo: OnlyCareUpdateColumns should load from config first
var selectFields string
var separator = ","
if onlyCareUpdateColumns {
if undo.UndoConfig.OnlyCareUpdateColumns {
for _, row := range beforeImage.Rows {
for _, column := range row.Columns {
selectFields += column.ColumnName + separator
Expand All @@ -211,7 +210,7 @@ func (u *updateExecutor) buildBeforeImageSQL(ctx context.Context, args []driver.
updateStmt := u.parserCtx.UpdateStmt
fields := make([]*ast.SelectField, 0, len(updateStmt.List))

if onlyCareUpdateColumns {
if undo.UndoConfig.OnlyCareUpdateColumns {
for _, column := range updateStmt.List {
fields = append(fields, &ast.SelectField{
Expr: &ast.ColumnNameExpr{
Expand Down
11 changes: 5 additions & 6 deletions pkg/datasource/sql/exec/at/update_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,21 +24,20 @@ import (
"testing"

"github.com/agiledragon/gomonkey"
_ "github.com/arana-db/parser/test_driver"
"github.com/seata/seata-go/pkg/datasource/sql/datasource"
"github.com/seata/seata-go/pkg/datasource/sql/datasource/mysql"
"github.com/seata/seata-go/pkg/datasource/sql/exec"
"github.com/seata/seata-go/pkg/datasource/sql/util"

"github.com/seata/seata-go/pkg/datasource/sql/types"

"github.com/seata/seata-go/pkg/datasource/sql/parser"

_ "github.com/arana-db/parser/test_driver"
"github.com/seata/seata-go/pkg/datasource/sql/types"
"github.com/seata/seata-go/pkg/datasource/sql/undo"
"github.com/seata/seata-go/pkg/datasource/sql/util"
_ "github.com/seata/seata-go/pkg/util/log"
"github.com/stretchr/testify/assert"
)

func TestBuildSelectSQLByUpdate(t *testing.T) {
undo.InitUndoConfig(undo.Config{OnlyCareUpdateColumns: true})
datasource.RegisterTableCache(types.DBTypeMySQL, mysql.NewTableMetaInstance(nil))
stub := gomonkey.ApplyMethod(reflect.TypeOf(datasource.GetTableCache(types.DBTypeMySQL)), "GetTableMeta",
func(_ *mysql.TableMetaCache, ctx context.Context, dbName, tableName string) (*types.TableMeta, error) {
Expand Down
Loading

0 comments on commit f87fbea

Please sign in to comment.