Skip to content

Commit

Permalink
bugfix apache#704
Browse files Browse the repository at this point in the history
  • Loading branch information
AsterZephyr committed Dec 2, 2024
1 parent abdd938 commit 4debbde
Showing 1 changed file with 0 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -110,18 +110,13 @@ func (u *MySQLInsertOnDuplicateUndoLogBuilder) buildBeforeImageSQL(insertStmt *a
if err != nil {
return "", nil, err
}

paramMap, err := u.buildImageParameters(insertStmt, args, insertRows)
if err != nil {
return "", nil, err
}

// 如果没有参数或没有主键索引,直接返回空
if len(paramMap) == 0 || len(metaData.Indexs) == 0 {
return "", nil, nil
}

// 检查是否有主键
hasPK := false
for _, index := range metaData.Indexs {
if strings.EqualFold("PRIMARY", index.Name) {
Expand All @@ -132,29 +127,24 @@ func (u *MySQLInsertOnDuplicateUndoLogBuilder) buildBeforeImageSQL(insertStmt *a
if !hasPK {
return "", nil, nil
}

var sql strings.Builder
sql.WriteString("SELECT * FROM " + metaData.TableName + " ")

var selectArgs []driver.Value
isContainWhere := false
hasConditions := false

for i := 0; i < len(insertRows); i++ {
var rowConditions []string
var rowArgs []driver.Value
usedParams := make(map[string]bool)

// First try unique indexes
for _, index := range metaData.Indexs {
if index.NonUnique || strings.EqualFold("PRIMARY", index.Name) {
continue
}

if !isIndexValueNotNull(index, paramMap, i) {
continue
}

var indexConditions []string
var indexArgs []driver.Value
allColumnsPresent := true
Expand All @@ -171,20 +161,17 @@ func (u *MySQLInsertOnDuplicateUndoLogBuilder) buildBeforeImageSQL(insertStmt *a
break
}
}

if allColumnsPresent && len(indexConditions) > 0 {
rowConditions = append(rowConditions, "("+strings.Join(indexConditions, " and ")+")")
rowArgs = append(rowArgs, indexArgs...)
hasConditions = true
}
}

// Then try primary key
for _, index := range metaData.Indexs {
if !strings.EqualFold("PRIMARY", index.Name) {
continue
}

var pkConditions []string
var pkArgs []driver.Value
for _, colMeta := range index.Columns {
Expand All @@ -195,14 +182,12 @@ func (u *MySQLInsertOnDuplicateUndoLogBuilder) buildBeforeImageSQL(insertStmt *a
pkArgs = append(pkArgs, params[i])
}
}

if len(pkConditions) > 0 {
rowConditions = append(rowConditions, "("+strings.Join(pkConditions, " and ")+")")
rowArgs = append(rowArgs, pkArgs...)
hasConditions = true
}
}

if len(rowConditions) > 0 {
if !isContainWhere {
sql.WriteString("WHERE ")
Expand All @@ -219,11 +204,9 @@ func (u *MySQLInsertOnDuplicateUndoLogBuilder) buildBeforeImageSQL(insertStmt *a
selectArgs = append(selectArgs, rowArgs...)
}
}

if !hasConditions {
return "", nil, nil
}

sqlStr := sql.String()
log.Infof("build select sql by insert on duplicate sourceQuery, sql: %s", sqlStr)
return sqlStr, selectArgs, nil
Expand All @@ -237,38 +220,29 @@ func (u *MySQLInsertOnDuplicateUndoLogBuilder) AfterImage(ctx context.Context, e
return nil, err
}
defer stmt.Close()

tableName := execCtx.ParseContext.InsertStmt.Table.TableRefs.Left.(*ast.TableSource).Source.(*ast.TableName).Name.O
metaData := execCtx.MetaDataMap[tableName]

rows, err := stmt.Query(selectArgs)
if err != nil {
return nil, err
}
defer rows.Close()

image, err := u.buildRecordImages(rows, &metaData)
if err != nil {
return nil, err
}

return []*types.RecordImage{image}, nil
}

func (u *MySQLInsertOnDuplicateUndoLogBuilder) buildAfterImageSQL(ctx context.Context, beforeImages []*types.RecordImage) (string, []driver.Value) {
selectSQL, selectArgs := u.BeforeSelectSql, u.Args

var beforeImage *types.RecordImage
if len(beforeImages) > 0 {
beforeImage = beforeImages[0]
}

// 如果没有before image,直接返回原始SQL和参数
if beforeImage == nil || len(beforeImage.Rows) == 0 {
return selectSQL, selectArgs
}

// 收集主键值
primaryValueMap := make(map[string][]interface{})
for _, row := range beforeImage.Rows {
for _, col := range row.Columns {
Expand All @@ -277,19 +251,13 @@ func (u *MySQLInsertOnDuplicateUndoLogBuilder) buildAfterImageSQL(ctx context.Co
}
}
}

var afterImageSql strings.Builder
afterImageSql.WriteString(selectSQL)

// 如果原始SQL已经包含了所有需要的条件,直接返回
if len(primaryValueMap) == 0 || len(selectArgs) == len(beforeImage.Rows)*len(primaryValueMap) {
return selectSQL, selectArgs
}

// 添加主键条件
var primaryValues []driver.Value
usedPrimaryKeys := make(map[string]bool)

for name := range primaryValueMap {
if !u.BeforeImageSqlPrimaryKeys[name] {
usedPrimaryKeys[name] = true
Expand All @@ -304,15 +272,12 @@ func (u *MySQLInsertOnDuplicateUndoLogBuilder) buildAfterImageSQL(ctx context.Co
}
}
}

if len(primaryValues) > 0 {
afterImageSql.WriteString(" OR (" + strings.Join(u.buildPrimaryKeyConditions(primaryValueMap, usedPrimaryKeys), " and ") + ") ")
}

finalArgs := make([]driver.Value, len(selectArgs)+len(primaryValues))
copy(finalArgs, selectArgs)
copy(finalArgs[len(selectArgs):], primaryValues)

sqlStr := afterImageSql.String()
log.Infof("build after select sql by insert on duplicate sourceQuery, sql %s", sqlStr)
return sqlStr, finalArgs
Expand Down

0 comments on commit 4debbde

Please sign in to comment.