Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

optimize at #336

Merged
merged 9 commits into from
Nov 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions pkg/datasource/sql/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type Conn struct {
txCtx *types.TransactionContext
targetConn driver.Conn
autoCommit bool
dbName string
}

// ResetSession is called prior to executing a query on the connection
Expand Down Expand Up @@ -93,7 +94,7 @@ func (c *Conn) PrepareContext(ctx context.Context, query string) (driver.Stmt, e
}, nil
}

// Exec
// Exec warning: if you want to use global transaction, please use ExecContext function
func (c *Conn) Exec(query string, args []driver.Value) (driver.Result, error) {
conn, ok := c.targetConn.(driver.Execer)
if !ok {
Expand All @@ -113,7 +114,7 @@ func (c *Conn) Exec(query string, args []driver.Value) (driver.Result, error) {
Conn: c.targetConn,
}

return executor.ExecWithValue(context.Background(), execCtx,
return executor.ExecWithValue(context.Background(), execCtx, // todo context传的不对
func(ctx context.Context, query string, args []driver.Value) (types.ExecResult, error) {
ret, err := conn.Exec(query, args)
if err != nil {
Expand All @@ -132,7 +133,7 @@ func (c *Conn) Exec(query string, args []driver.Value) (driver.Result, error) {
// ExecContext
func (c *Conn) ExecContext(ctx context.Context, query string, args []driver.NamedValue) (driver.Result, error) {
targetConn, ok := c.targetConn.(driver.ExecerContext)
if ok {
if !ok {
values := make([]driver.Value, 0, len(args))

for i := range args {
Expand All @@ -153,6 +154,7 @@ func (c *Conn) ExecContext(ctx context.Context, query string, args []driver.Name
Query: query,
NamedValues: args,
Conn: c.targetConn,
DBName: c.dbName,
}

ret, err := executor.ExecWithNamedValue(ctx, execCtx,
Expand Down
6 changes: 4 additions & 2 deletions pkg/datasource/sql/conn_at.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (c *ATConn) BeginTx(ctx context.Context, opts driver.TxOptions) (driver.Tx,
c.txCtx.DBType = c.res.dbType
c.txCtx.TxOpt = opts

if IsGlobalTx(ctx) {
if tm.IsGlobalTx(ctx) {
c.txCtx.XaID = tm.GetXID(ctx)
c.txCtx.TransType = types.ATMode
}
Expand All @@ -85,13 +85,15 @@ func (c *ATConn) BeginTx(ctx context.Context, opts driver.TxOptions) (driver.Tx,
}

func (c *ATConn) createOnceTxContext(ctx context.Context) bool {
onceTx := IsGlobalTx(ctx) && c.autoCommit
onceTx := tm.IsGlobalTx(ctx) && c.autoCommit

if onceTx {
c.txCtx = types.NewTxCtx()
c.txCtx.DBType = c.res.dbType
c.txCtx.XaID = tm.GetXID(ctx)
c.txCtx.XID = tm.GetXID(ctx)
c.txCtx.TransType = types.ATMode
c.txCtx.GlobalLockRequire = true
}

return onceTx
Expand Down
2 changes: 1 addition & 1 deletion pkg/datasource/sql/conn_at_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func initAtConnTestResource(t *testing.T) (*gomock.Controller, *sql.DB, *mockSQL
mockMgr := initMockResourceManager(t, ctrl)
_ = mockMgr

db, err := sql.Open("seata-at-mysql", "root:seata_go@tcp(127.0.0.1:3306)/seata_go_test?multiStatements=true")
db, err := sql.Open(SeataATMySQLDriver, "root:12345678@tcp(127.0.0.1:3306)/seata_client?multiStatements=true")
if err != nil {
t.Fatal(err)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/datasource/sql/conn_xa.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (c *XAConn) BeginTx(ctx context.Context, opts driver.TxOptions) (driver.Tx,
c.txCtx.DBType = c.res.dbType
c.txCtx.TxOpt = opts

if IsGlobalTx(ctx) {
if tm.IsGlobalTx(ctx) {
c.txCtx.TransType = types.XAMode
c.txCtx.XaID = tm.GetXID(ctx)
}
Expand All @@ -86,7 +86,7 @@ func (c *XAConn) BeginTx(ctx context.Context, opts driver.TxOptions) (driver.Tx,
}

func (c *XAConn) createOnceTxContext(ctx context.Context) bool {
onceTx := IsGlobalTx(ctx) && c.autoCommit
onceTx := tm.IsGlobalTx(ctx) && c.autoCommit

if onceTx {
c.txCtx = types.NewTxCtx()
Expand Down
4 changes: 4 additions & 0 deletions pkg/datasource/sql/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"database/sql/driver"
"sync"

"github.com/go-sql-driver/mysql"

"github.com/seata/seata-go/pkg/datasource/sql/types"
)

Expand Down Expand Up @@ -92,6 +94,7 @@ type seataConnector struct {
once sync.Once
driver driver.Driver
target driver.Connector
cfg *mysql.Config
}

// Connect returns a connection to the database.
Expand All @@ -118,6 +121,7 @@ func (c *seataConnector) Connect(ctx context.Context) (driver.Conn, error) {
res: c.res,
txCtx: types.NewTxCtx(),
autoCommit: true,
dbName: c.cfg.DBName,
}, nil
}

Expand Down
12 changes: 5 additions & 7 deletions pkg/datasource/sql/datasource/base/meta_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package base

import (
"context"
"database/sql"
"database/sql/driver"
"errors"
"sync"
"time"
Expand All @@ -30,7 +30,7 @@ import (
type (
// trigger
trigger interface {
LoadOne(ctx context.Context, dbName string, table string, conn *sql.Conn) (*types.TableMeta, error)
LoadOne(ctx context.Context, dbName string, table string, conn driver.Conn) (*types.TableMeta, error)

LoadAll() ([]types.TableMeta, error)
}
Expand All @@ -47,14 +47,13 @@ type BaseTableMetaCache struct {
expireDuration time.Duration
capity int32
size int32
dbName string
cache map[string]*entry
cancel context.CancelFunc
trigger trigger
}

// NewBaseCache
func NewBaseCache(capity int32, dbName string, expireDuration time.Duration, trigger trigger) *BaseTableMetaCache {
func NewBaseCache(capity int32, expireDuration time.Duration, trigger trigger) *BaseTableMetaCache {
ctx, cancel := context.WithCancel(context.Background())

c := &BaseTableMetaCache{
Expand All @@ -63,7 +62,6 @@ func NewBaseCache(capity int32, dbName string, expireDuration time.Duration, tri
size: 0,
expireDuration: expireDuration,
cache: map[string]*entry{},
dbName: dbName,
cancel: cancel,
trigger: trigger,
}
Expand Down Expand Up @@ -136,13 +134,13 @@ func (c *BaseTableMetaCache) scanExpire(ctx context.Context) {
}

// GetTableMeta
func (c *BaseTableMetaCache) GetTableMeta(ctx context.Context, tableName string, conn *sql.Conn) (types.TableMeta, error) {
func (c *BaseTableMetaCache) GetTableMeta(ctx context.Context, dbName, tableName string, conn driver.Conn) (types.TableMeta, error) {
c.lock.Lock()
defer c.lock.Unlock()

v, ok := c.cache[tableName]
if !ok {
meta, err := c.trigger.LoadOne(ctx, c.dbName, tableName, conn)
meta, err := c.trigger.LoadOne(ctx, dbName, tableName, conn)
if err != nil {
return types.TableMeta{}, err
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/datasource/sql/datasource/datasource_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package datasource
import (
"context"
"database/sql"
"database/sql/driver"
"errors"
"sync"

Expand Down Expand Up @@ -61,6 +62,7 @@ func GetDataSourceManager(b branch.BranchType) DataSourceManager {
return nil
}

// todo implements ResourceManagerOutbound interface
// DataSourceManager
type DataSourceManager interface {
// Register a Resource to be managed by Resource Manager
Expand Down Expand Up @@ -176,7 +178,7 @@ type TableMetaCache interface {
// Init
Init(ctx context.Context, conn *sql.DB) error
// GetTableMeta
GetTableMeta(ctx context.Context, table string, conn *sql.Conn) (*types.TableMeta, error)
GetTableMeta(ctx context.Context, dbName, table string, conn driver.Conn) (*types.TableMeta, error)
// Destroy
Destroy() error
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/datasource/sql/datasource/mysql/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,6 @@ import (
// todo
func init() {
datasource.RegisterTableCache(types.DBTypeMySQL, func() datasource.TableMetaCache {
return &tableMetaCache{}
return &TableMetaCache{}
})
}
20 changes: 10 additions & 10 deletions pkg/datasource/sql/datasource/mysql/meta_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package mysql
import (
"context"
"database/sql"
"database/sql/driver"
"sync"
"time"

Expand All @@ -32,38 +33,37 @@ import (
var (
capacity int32 = 1024
EexpireTime = 15 * time.Minute
tableMetaInstance *tableMetaCache
tableMetaInstance *TableMetaCache
tableMetaOnce sync.Once
DBName = "seata"
)

type tableMetaCache struct {
type TableMetaCache struct {
tableMetaCache *base.BaseTableMetaCache
}

func GetTableMetaInstance() *tableMetaCache {
func GetTableMetaInstance() *TableMetaCache {
// Todo constant.DBName get from config
tableMetaOnce.Do(func() {
tableMetaInstance = &tableMetaCache{
tableMetaCache: base.NewBaseCache(capacity, DBName, EexpireTime, NewMysqlTrigger()),
tableMetaInstance = &TableMetaCache{
tableMetaCache: base.NewBaseCache(capacity, EexpireTime, NewMysqlTrigger()),
}
})

return tableMetaInstance
}

// Init
func (c *tableMetaCache) Init(ctx context.Context, conn *sql.DB) error {
func (c *TableMetaCache) Init(ctx context.Context, conn *sql.DB) error {
return nil
}

// GetTableMeta get table info from cache or information schema
func (c *tableMetaCache) GetTableMeta(ctx context.Context, tableName string, conn *sql.Conn) (*types.TableMeta, error) {
func (c *TableMetaCache) GetTableMeta(ctx context.Context, dbName, tableName string, conn driver.Conn) (*types.TableMeta, error) {
if tableName == "" {
return nil, errors.New("TableMeta cannot be fetched without tableName")
}

tableMeta, err := c.tableMetaCache.GetTableMeta(ctx, tableName, conn)
tableMeta, err := c.tableMetaCache.GetTableMeta(ctx, dbName, tableName, conn)
if err != nil {
return nil, err
}
Expand All @@ -72,6 +72,6 @@ func (c *tableMetaCache) GetTableMeta(ctx context.Context, tableName string, con
}

// Destroy
func (c *tableMetaCache) Destroy() error {
func (c *TableMetaCache) Destroy() error {
return nil
}
3 changes: 1 addition & 2 deletions pkg/datasource/sql/datasource/mysql/meta_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,8 @@ func TestGetTableMeta(t *testing.T) {
defer db.Close()

ctx := context.Background()
conn, _ := db.Conn(ctx)

tableMeta, err := metaInstance.GetTableMeta(ctx, "undo_log", conn)
tableMeta, err := metaInstance.GetTableMeta(ctx, "seata_client", "undo_log", nil)
assert.NilError(t, err)

t.Logf("%+v", tableMeta)
Expand Down
Loading