Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#55477
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
joechenrh authored and ti-chi-bot committed Dec 6, 2024
1 parent 9f4dec9 commit 1eca5b3
Show file tree
Hide file tree
Showing 23 changed files with 11,274 additions and 0 deletions.
5 changes: 5 additions & 0 deletions br/pkg/lightning/errormanager/errormanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,13 @@ const (
`

insertIntoConflictErrorData = `
<<<<<<< HEAD:br/pkg/lightning/errormanager/errormanager.go
INSERT INTO %s.` + ConflictErrorTableName + `
(task_id, table_name, index_name, key_data, row_data, raw_key, raw_value, raw_handle, raw_row)
=======
INSERT IGNORE INTO %s.` + ConflictErrorTableName + `
(task_id, table_name, index_name, key_data, row_data, raw_key, raw_value, raw_handle, raw_row, kv_type)
>>>>>>> 91beef4bb14 (*: disable insert null to not-null column for single-row insertion in non-strict mode (#55477)):pkg/lightning/errormanager/errormanager.go
VALUES
`

Expand Down
37 changes: 37 additions & 0 deletions br/pkg/lightning/errormanager/errormanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,7 @@ func TestReplaceConflictKeys(t *testing.T) {
WillReturnResult(sqlmock.NewResult(1, 1))
mockDB.ExpectExec("CREATE TABLE IF NOT EXISTS `lightning_task_info`\\.conflict_error_v1.*").
WillReturnResult(sqlmock.NewResult(2, 1))
<<<<<<< HEAD:br/pkg/lightning/errormanager/errormanager_test.go
mockDB.ExpectQuery("\\QSELECT raw_key, index_name, raw_value, raw_handle FROM `lightning_task_info`.conflict_error_v1 WHERE table_name = ? AND index_name <> 'PRIMARY' ORDER BY raw_key\\E").
WillReturnRows(sqlmock.NewRows([]string{"raw_key", "index_name", "raw_value", "raw_handle"}).
AddRow(data1IndexKey, "uni_b", data1IndexValue, data1RowKey).
Expand All @@ -492,6 +493,42 @@ func TestReplaceConflictKeys(t *testing.T) {
WillReturnRows(sqlmock.NewRows([]string{"raw_key", "raw_value", "raw_handle"}).
AddRow(data1RowKey, data1RowValue, data1RowKey).
AddRow(data1RowKey, data3RowValue, data1RowKey))
=======
mockDB.ExpectExec("CREATE OR REPLACE VIEW `lightning_task_info`\\.conflict_view.*").
WillReturnResult(sqlmock.NewResult(3, 1))
mockDB.ExpectQuery("\\QSELECT _tidb_rowid, raw_key, index_name, raw_value, raw_handle FROM `lightning_task_info`.conflict_error_v3 WHERE table_name = ? AND kv_type = 0 AND _tidb_rowid >= ? and _tidb_rowid < ? ORDER BY _tidb_rowid LIMIT ?\\E").
WillReturnRows(sqlmock.NewRows([]string{"_tidb_rowid", "raw_key", "index_name", "raw_value", "raw_handle"}).
AddRow(1, data1IndexKey, "uni_b", data1IndexValue, data1RowKey).
AddRow(2, data1IndexKey, "uni_b", data2IndexValue, data2RowKey).
AddRow(3, data3IndexKey, "uni_b", data3IndexValue, data3RowKey).
AddRow(4, data3IndexKey, "uni_b", data4IndexValue, data4RowKey))
mockDB.ExpectBegin()
mockDB.ExpectExec("INSERT IGNORE INTO `lightning_task_info`\\.conflict_error_v3.*").
WithArgs(0, "test", nil, nil, data2RowKey, data2RowValue, 2,
0, "test", nil, nil, data4RowKey, data4RowValue, 2).
WillReturnResult(driver.ResultNoRows)
mockDB.ExpectCommit()
for i := 0; i < 2; i++ {
mockDB.ExpectQuery("\\QSELECT _tidb_rowid, raw_key, index_name, raw_value, raw_handle FROM `lightning_task_info`.conflict_error_v3 WHERE table_name = ? AND kv_type = 0 AND _tidb_rowid >= ? and _tidb_rowid < ? ORDER BY _tidb_rowid LIMIT ?\\E").
WillReturnRows(sqlmock.NewRows([]string{"_tidb_rowid", "raw_key", "index_name", "raw_value", "raw_handle"}))
}
mockDB.ExpectQuery("\\QSELECT _tidb_rowid, raw_key, raw_value FROM `lightning_task_info`.conflict_error_v3 WHERE table_name = ? AND kv_type <> 0 AND _tidb_rowid >= ? and _tidb_rowid < ? ORDER BY _tidb_rowid LIMIT ?\\E").
WillReturnRows(sqlmock.NewRows([]string{"_tidb_rowid", "raw_key", "raw_value"}).
AddRow(1, data1RowKey, data1RowValue).
AddRow(2, data1RowKey, data3RowValue))
for i := 0; i < 2; i++ {
mockDB.ExpectQuery("\\QSELECT _tidb_rowid, raw_key, raw_value FROM `lightning_task_info`.conflict_error_v3 WHERE table_name = ? AND kv_type <> 0 AND _tidb_rowid >= ? and _tidb_rowid < ? ORDER BY _tidb_rowid LIMIT ?\\E").
WillReturnRows(sqlmock.NewRows([]string{"_tidb_rowid", "raw_key", "raw_value"}))
}
mockDB.ExpectBegin()
mockDB.ExpectExec("DELETE FROM `lightning_task_info`\\.conflict_error_v3.*").
WillReturnResult(sqlmock.NewResult(0, 2))
mockDB.ExpectCommit()
mockDB.ExpectBegin()
mockDB.ExpectExec("DELETE FROM `lightning_task_info`\\.conflict_error_v3.*").
WillReturnResult(sqlmock.NewResult(0, 0))
mockDB.ExpectCommit()
>>>>>>> 91beef4bb14 (*: disable insert null to not-null column for single-row insertion in non-strict mode (#55477)):pkg/lightning/errormanager/errormanager_test.go

cfg := config.NewConfig()
cfg.TikvImporter.DuplicateResolution = config.DupeResAlgReplace
Expand Down
161 changes: 161 additions & 0 deletions pkg/ddl/reorg.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,169 @@ func newContext(store kv.Storage) sessionctx.Context {

const defaultWaitReorgTimeout = 10 * time.Second

<<<<<<< HEAD
// ReorgWaitTimeout is the timeout that wait ddl in write reorganization stage.
var ReorgWaitTimeout = 5 * time.Second
=======
ctx := newReorgExprCtx()
evalCtx := ctx.GetStaticEvalCtx().Apply(
exprstatic.WithSQLMode(reorgMeta.SQLMode),
exprstatic.WithLocation(loc),
exprstatic.WithTypeFlags(reorgTypeFlagsWithSQLMode(reorgMeta.SQLMode)),
exprstatic.WithErrLevelMap(reorgErrLevelsWithSQLMode(reorgMeta.SQLMode)),
exprstatic.WithWarnHandler(warnHandler),
)
return ctx.Apply(exprstatic.WithEvalCtx(evalCtx)), nil
}

// reorgTableMutateContext implements table.MutateContext for reorganization.
type reorgTableMutateContext struct {
exprCtx exprctx.ExprContext
encodingConfig tblctx.RowEncodingConfig
mutateBuffers *tblctx.MutateBuffers
shardID *variable.RowIDShardGenerator
reservedRowIDAlloc stmtctx.ReservedRowIDAlloc
}

// AlternativeAllocators implements table.MutateContext.AlternativeAllocators.
func (*reorgTableMutateContext) AlternativeAllocators(*model.TableInfo) (autoid.Allocators, bool) {
// No alternative allocators for all tables because temporary tables
// are not supported (temporary tables do not have any data in TiKV) in reorganization.
return autoid.Allocators{}, false
}

// GetExprCtx implements table.MutateContext.GetExprCtx.
func (ctx *reorgTableMutateContext) GetExprCtx() exprctx.ExprContext {
return ctx.exprCtx
}

// ConnectionID implements table.MutateContext.ConnectionID.
func (*reorgTableMutateContext) ConnectionID() uint64 {
return 0
}

// InRestrictedSQL implements table.MutateContext.InRestrictedSQL.
func (*reorgTableMutateContext) InRestrictedSQL() bool {
return false
}

// TxnAssertionLevel implements table.MutateContext.TxnAssertionLevel.
func (*reorgTableMutateContext) TxnAssertionLevel() variable.AssertionLevel {
// Because only `index.Create` and `index.Delete` are invoked in reorganization which does not use this method,
// we can just return `AssertionLevelOff`.
return variable.AssertionLevelOff
}

// EnableMutationChecker implements table.MutateContext.EnableMutationChecker.
func (*reorgTableMutateContext) EnableMutationChecker() bool {
// Because only `index.Create` and `index.Delete` are invoked in reorganization which does not use this method,
// we can just return false.
return false
}

// GetRowEncodingConfig implements table.MutateContext.GetRowEncodingConfig.
func (ctx *reorgTableMutateContext) GetRowEncodingConfig() tblctx.RowEncodingConfig {
return ctx.encodingConfig
}

// GetMutateBuffers implements table.MutateContext.GetMutateBuffers.
func (ctx *reorgTableMutateContext) GetMutateBuffers() *tblctx.MutateBuffers {
return ctx.mutateBuffers
}

// GetRowIDShardGenerator implements table.MutateContext.GetRowIDShardGenerator.
func (ctx *reorgTableMutateContext) GetRowIDShardGenerator() *variable.RowIDShardGenerator {
return ctx.shardID
}

// GetReservedRowIDAlloc implements table.MutateContext.GetReservedRowIDAlloc.
func (ctx *reorgTableMutateContext) GetReservedRowIDAlloc() (*stmtctx.ReservedRowIDAlloc, bool) {
return &ctx.reservedRowIDAlloc, true
}

// GetStatisticsSupport implements table.MutateContext.GetStatisticsSupport.
func (*reorgTableMutateContext) GetStatisticsSupport() (tblctx.StatisticsSupport, bool) {
// We can just return `(nil, false)` because:
// - Only `index.Create` and `index.Delete` are invoked in reorganization which does not use this method.
// - DDL reorg do need to collect statistics in this way.
return nil, false
}

// GetCachedTableSupport implements table.MutateContext.GetCachedTableSupport.
func (*reorgTableMutateContext) GetCachedTableSupport() (tblctx.CachedTableSupport, bool) {
// We can just return `(nil, false)` because:
// - Only `index.Create` and `index.Delete` are invoked in reorganization which does not use this method.
// - It is not allowed to execute DDL on a cached table.
return nil, false
}

// GetTemporaryTableSupport implements table.MutateContext.GetTemporaryTableSupport.
func (*reorgTableMutateContext) GetTemporaryTableSupport() (tblctx.TemporaryTableSupport, bool) {
// We can just return `(nil, false)` because:
// - Only `index.Create` and `index.Delete` are invoked in reorganization which does not use this method.
// - Temporary tables do not have any data in TiKV.
return nil, false
}

// GetExchangePartitionDMLSupport implements table.MutateContext.GetExchangePartitionDMLSupport.
func (*reorgTableMutateContext) GetExchangePartitionDMLSupport() (tblctx.ExchangePartitionDMLSupport, bool) {
// We can just return `(nil, false)` because:
// - Only `index.Create` and `index.Delete` are invoked in reorganization which does not use this method.
return nil, false
}

// newReorgTableMutateContext creates a new table.MutateContext for reorganization.
func newReorgTableMutateContext(exprCtx exprctx.ExprContext) table.MutateContext {
rowEncoder := &rowcodec.Encoder{
Enable: variable.GetDDLReorgRowFormat() != variable.DefTiDBRowFormatV1,
}

encodingConfig := tblctx.RowEncodingConfig{
IsRowLevelChecksumEnabled: rowEncoder.Enable,
RowEncoder: rowEncoder,
}

return &reorgTableMutateContext{
exprCtx: exprCtx,
encodingConfig: encodingConfig,
mutateBuffers: tblctx.NewMutateBuffers(&variable.WriteStmtBufs{}),
// Though currently, `RowIDShardGenerator` is not required in DDL reorg,
// we still provide a valid one to keep the context complete and to avoid panic if it is used in the future.
shardID: variable.NewRowIDShardGenerator(
rand.New(rand.NewSource(time.Now().UnixNano())), // #nosec G404
variable.DefTiDBShardAllocateStep,
),
}
}

func reorgTypeFlagsWithSQLMode(mode mysql.SQLMode) types.Flags {
return types.StrictFlags.
WithTruncateAsWarning(!mode.HasStrictMode()).
WithIgnoreInvalidDateErr(mode.HasAllowInvalidDatesMode()).
WithIgnoreZeroInDate(!mode.HasStrictMode() || mode.HasAllowInvalidDatesMode()).
WithCastTimeToYearThroughConcat(true)
}

func reorgErrLevelsWithSQLMode(mode mysql.SQLMode) errctx.LevelMap {
return errctx.LevelMap{
errctx.ErrGroupTruncate: errctx.ResolveErrLevel(false, !mode.HasStrictMode()),
errctx.ErrGroupBadNull: errctx.ResolveErrLevel(false, !mode.HasStrictMode()),
errctx.ErrGroupNoDefault: errctx.ResolveErrLevel(false, !mode.HasStrictMode()),
errctx.ErrGroupDividedByZero: errctx.ResolveErrLevel(
!mode.HasErrorForDivisionByZeroMode(),
!mode.HasStrictMode(),
),
}
}

func reorgTimeZoneWithTzLoc(tzLoc *model.TimeZoneLocation) (*time.Location, error) {
if tzLoc == nil {
// It is set to SystemLocation to be compatible with nil LocationInfo.
return timeutil.SystemLocation(), nil
}
return tzLoc.GetLocation()
}
>>>>>>> 91beef4bb14 (*: disable insert null to not-null column for single-row insertion in non-strict mode (#55477))

func (rc *reorgCtx) notifyJobState(state model.JobState) {
atomic.StoreInt32((*int32)(&rc.jobState), int32(state))
Expand Down
Loading

0 comments on commit 1eca5b3

Please sign in to comment.