Skip to content

Commit

Permalink
optimize at base executor (#394)
Browse files Browse the repository at this point in the history
optimize at executor
  • Loading branch information
luky116 authored Dec 10, 2022
1 parent c3cd13b commit 334879e
Show file tree
Hide file tree
Showing 8 changed files with 170 additions and 180 deletions.
14 changes: 1 addition & 13 deletions pkg/datasource/sql/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,11 @@
package sql

import (
"reflect"
"testing"

"github.com/seata/seata-go/pkg/datasource/sql/exec"
"github.com/seata/seata-go/pkg/datasource/sql/exec/xa"
"github.com/seata/seata-go/pkg/datasource/sql/types"
"github.com/seata/seata-go/pkg/util/reflectx"
"github.com/stretchr/testify/assert"
)

Expand All @@ -40,17 +38,7 @@ func TestConn_BuildXAExecutor(t *testing.T) {
executor, err := exec.BuildExecutor(types.DBTypeMySQL, types.XAMode, "SELECT * FROM user")

assert.NoError(t, err)
val, ok := executor.(*exec.BaseExecutor)
assert.True(t, ok, "need base executor")

v := reflect.ValueOf(val)
if v.Kind() == reflect.Ptr {
v = v.Elem()
}
field := v.FieldByName("ex")

fieldVal := reflectx.GetUnexportedField(field)

_, ok = fieldVal.(*xa.XAExecutor)
_, ok := executor.(*xa.XAExecutor)
assert.True(t, ok, "need xa executor")
}
165 changes: 146 additions & 19 deletions pkg/datasource/sql/exec/at/executor_at.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,55 +19,182 @@ package at

import (
"context"
"fmt"

"github.com/mitchellh/copystructure"
"github.com/pkg/errors"
"github.com/seata/seata-go/pkg/datasource/sql/parser"
"github.com/seata/seata-go/pkg/datasource/sql/undo"
"github.com/seata/seata-go/pkg/tm"

"github.com/seata/seata-go/pkg/datasource/sql/exec"
"github.com/seata/seata-go/pkg/datasource/sql/types"
)

type ATExecutor struct {
is []exec.SQLHook
ex exec.SQLExecutor
hooks []exec.SQLHook
ex exec.SQLExecutor
}

// Interceptors
func (e *ATExecutor) Interceptors(interceptors []exec.SQLHook) {
e.is = interceptors
func (e *ATExecutor) Interceptors(hooks []exec.SQLHook) {
e.hooks = hooks
}

// ExecWithNamedValue
func (e *ATExecutor) ExecWithNamedValue(ctx context.Context, execCtx *types.ExecContext, f exec.CallbackWithNamedValue) (types.ExecResult, error) {
for i := range e.is {
e.is[i].Before(ctx, execCtx)
for _, hook := range e.hooks {
hook.Before(ctx, execCtx)
}

var (
beforeImages []*types.RecordImage
afterImages []*types.RecordImage
result types.ExecResult
err error
)

beforeImages, err = e.beforeImage(ctx, execCtx)
if err != nil {
return nil, err
}
if beforeImages != nil {
beforeImagesTmp, err := copystructure.Copy(beforeImages)
if err != nil {
return nil, err
}
newBeforeImages, ok := beforeImagesTmp.([]*types.RecordImage)
if !ok {
return nil, errors.New("copy beforeImages failed")
}
execCtx.TxCtx.RoundImages.AppendBeofreImages(newBeforeImages)
}

defer func() {
for i := range e.is {
e.is[i].After(ctx, execCtx)
for _, hook := range e.hooks {
hook.After(ctx, execCtx)
}
}()

if e.ex != nil {
return e.ex.ExecWithNamedValue(ctx, execCtx, f)
result, err = e.ex.ExecWithNamedValue(ctx, execCtx, f)
} else {
result, err = f(ctx, execCtx.Query, execCtx.NamedValues)
}

if err != nil {
return nil, err
}

return f(ctx, execCtx.Query, execCtx.NamedValues)
afterImages, err = e.afterImage(ctx, execCtx, beforeImages)
if err != nil {
return nil, err
}
if afterImages != nil {
execCtx.TxCtx.RoundImages.AppendAfterImages(afterImages)
}

return result, err
}

func (e *ATExecutor) prepareUndoLog(ctx context.Context, execCtx *types.ExecContext) error {
if execCtx.TxCtx.RoundImages.IsEmpty() {
return nil
}

if execCtx.ParseContext.UpdateStmt != nil {
if !execCtx.TxCtx.RoundImages.IsBeforeAfterSizeEq() {
return fmt.Errorf("Before image size is not equaled to after image size, probably because you updated the primary keys.")
}
}
undoLogManager, err := undo.GetUndoLogManager(execCtx.DBType)
if err != nil {
return err
}
return undoLogManager.FlushUndoLog(execCtx.TxCtx, execCtx.Conn)
}

// ExecWithValue
func (e *ATExecutor) ExecWithValue(ctx context.Context, execCtx *types.ExecContext, f exec.CallbackWithValue) (types.ExecResult, error) {
for i := range e.is {
e.is[i].Before(ctx, execCtx)
for _, hook := range e.hooks {
hook.Before(ctx, execCtx)
}

var (
beforeImages []*types.RecordImage
afterImages []*types.RecordImage
result types.ExecResult
err error
)

beforeImages, err = e.beforeImage(ctx, execCtx)
if err != nil {
return nil, err
}
if beforeImages != nil {
execCtx.TxCtx.RoundImages.AppendBeofreImages(beforeImages)
}

defer func() {
for i := range e.is {
e.is[i].After(ctx, execCtx)
for _, hook := range e.hooks {
hook.After(ctx, execCtx)
}
}()

if e.ex != nil {
return e.ex.ExecWithValue(ctx, execCtx, f)
result, err = e.ex.ExecWithValue(ctx, execCtx, f)
} else {
result, err = f(ctx, execCtx.Query, execCtx.Values)
}
if err != nil {
return nil, err
}

afterImages, err = e.afterImage(ctx, execCtx, beforeImages)
if err != nil {
return nil, err
}
if afterImages != nil {
execCtx.TxCtx.RoundImages.AppendAfterImages(afterImages)
}

return result, err
}

func (e *ATExecutor) beforeImage(ctx context.Context, execCtx *types.ExecContext) ([]*types.RecordImage, error) {
if !tm.IsGlobalTx(ctx) {
return nil, nil
}

return f(ctx, execCtx.Query, execCtx.Values)
pc, err := parser.DoParser(execCtx.Query)
if err != nil {
return nil, err
}
if !pc.HasValidStmt() {
return nil, nil
}
execCtx.ParseContext = pc

builder := undo.GetUndologBuilder(pc.ExecutorType)
if builder == nil {
return nil, nil
}
return builder.BeforeImage(ctx, execCtx)
}

// After
func (e *ATExecutor) afterImage(ctx context.Context, execCtx *types.ExecContext, beforeImages []*types.RecordImage) ([]*types.RecordImage, error) {
if !tm.IsGlobalTx(ctx) {
return nil, nil
}
pc, err := parser.DoParser(execCtx.Query)
if err != nil {
return nil, err
}
if !pc.HasValidStmt() {
return nil, nil
}
execCtx.ParseContext = pc
builder := undo.GetUndologBuilder(pc.ExecutorType)
if builder == nil {
return nil, nil
}
return builder.AfterImage(ctx, execCtx, beforeImages)
}
Loading

0 comments on commit 334879e

Please sign in to comment.