From 0848713200babcfa1ecb7c64ba9eba4068065f1e Mon Sep 17 00:00:00 2001 From: Arenatlx <314806019@qq.com> Date: Fri, 5 Feb 2021 18:05:45 +0800 Subject: [PATCH 1/3] cherry pick #22729 to release-4.0 Signed-off-by: ti-srebot --- ddl/delete_range.go | 41 ++++-- ddl/partition.go | 343 ++++++++++++++++++++++++++++++++++++++++++++ ddl/reorg.go | 8 +- 3 files changed, 382 insertions(+), 10 deletions(-) diff --git a/ddl/delete_range.go b/ddl/delete_range.go index 45059716a824d..4cfc68a80a2cc 100644 --- a/ddl/delete_range.go +++ b/ddl/delete_range.go @@ -16,8 +16,8 @@ package ddl import ( "context" "encoding/hex" - "fmt" "math" + "strings" "sync" "sync/atomic" @@ -35,7 +35,7 @@ import ( const ( insertDeleteRangeSQLPrefix = `INSERT IGNORE INTO mysql.gc_delete_range VALUES ` - insertDeleteRangeSQLValue = `("%d", "%d", "%s", "%s", "%d")` + insertDeleteRangeSQLValue = `(%?, %?, %?, %?, %?)` insertDeleteRangeSQL = insertDeleteRangeSQLPrefix + insertDeleteRangeSQLValue delBatchSize = 65536 @@ -346,29 +346,54 @@ func insertJobIntoDeleteRangeTable(ctx sessionctx.Context, job *model.Job) error return nil } +<<<<<<< HEAD +======= +func doBatchDeleteIndiceRange(s sqlexec.SQLExecutor, jobID, tableID int64, indexIDs []int64, ts uint64) error { + logutil.BgLogger().Info("[ddl] batch insert into delete-range indices", zap.Int64("jobID", jobID), zap.Int64s("elementIDs", indexIDs)) + paramsList := make([]interface{}, 0, len(indexIDs)*5) + var buf strings.Builder + buf.WriteString(insertDeleteRangeSQLPrefix) + for i, indexID := range indexIDs { + startKey := tablecodec.EncodeTableIndexPrefix(tableID, indexID) + endKey := tablecodec.EncodeTableIndexPrefix(tableID, indexID+1) + startKeyEncoded := hex.EncodeToString(startKey) + endKeyEncoded := hex.EncodeToString(endKey) + buf.WriteString(insertDeleteRangeSQLValue) + if i != len(indexIDs)-1 { + buf.WriteString(",") + } + paramsList = append(paramsList, jobID, indexID, startKeyEncoded, endKeyEncoded, ts) + } + _, err := s.ExecuteInternal(context.Background(), buf.String(), paramsList...) + return errors.Trace(err) +} + +>>>>>>> dedaabb80... 
ddl: migrate part of ddl package code from Execute/ExecRestricted to safe API (2) (#22729) func doInsert(s sqlexec.SQLExecutor, jobID int64, elementID int64, startKey, endKey kv.Key, ts uint64) error { logutil.BgLogger().Info("[ddl] insert into delete-range table", zap.Int64("jobID", jobID), zap.Int64("elementID", elementID)) startKeyEncoded := hex.EncodeToString(startKey) endKeyEncoded := hex.EncodeToString(endKey) - sql := fmt.Sprintf(insertDeleteRangeSQL, jobID, elementID, startKeyEncoded, endKeyEncoded, ts) - _, err := s.Execute(context.Background(), sql) + _, err := s.ExecuteInternal(context.Background(), insertDeleteRangeSQL, jobID, elementID, startKeyEncoded, endKeyEncoded, ts) return errors.Trace(err) } func doBatchInsert(s sqlexec.SQLExecutor, jobID int64, tableIDs []int64, ts uint64) error { logutil.BgLogger().Info("[ddl] batch insert into delete-range table", zap.Int64("jobID", jobID), zap.Int64s("elementIDs", tableIDs)) - sql := insertDeleteRangeSQLPrefix + var buf strings.Builder + buf.WriteString(insertDeleteRangeSQLPrefix) + paramsList := make([]interface{}, 0, len(tableIDs)*5) for i, tableID := range tableIDs { startKey := tablecodec.EncodeTablePrefix(tableID) endKey := tablecodec.EncodeTablePrefix(tableID + 1) startKeyEncoded := hex.EncodeToString(startKey) endKeyEncoded := hex.EncodeToString(endKey) - sql += fmt.Sprintf(insertDeleteRangeSQLValue, jobID, tableID, startKeyEncoded, endKeyEncoded, ts) + buf.WriteString(insertDeleteRangeSQLValue) if i != len(tableIDs)-1 { - sql += "," + buf.WriteString(",") } + paramsList = append(paramsList, jobID, tableID, startKeyEncoded, endKeyEncoded, ts) } - _, err := s.Execute(context.Background(), sql) + _, err := s.ExecuteInternal(context.Background(), buf.String(), paramsList...) return errors.Trace(err) } diff --git a/ddl/partition.go b/ddl/partition.go index 7ea5815036b18..243a0bbbfc3f2 100644 --- a/ddl/partition.go +++ b/ddl/partition.go @@ -940,6 +940,349 @@ func onTruncateTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (int64, e return ver, nil } +<<<<<<< HEAD +======= +// onExchangeTablePartition exchange partition data +func (w *worker) onExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error) { + var ( + // defID only for updateSchemaVersion + defID int64 + ptSchemaID int64 + ptID int64 + partName string + withValidation bool + ) + + if err := job.DecodeArgs(&defID, &ptSchemaID, &ptID, &partName, &withValidation); err != nil { + job.State = model.JobStateCancelled + return ver, errors.Trace(err) + } + + ntDbInfo, err := checkSchemaExistAndCancelNotExistJob(t, job) + if err != nil { + job.State = model.JobStateCancelled + return ver, errors.Trace(err) + } + + nt, err := getTableInfoAndCancelFaultJob(t, job, job.SchemaID) + if err != nil { + return ver, errors.Trace(err) + } + + pt, err := getTableInfo(t, ptID, ptSchemaID) + if err != nil { + if infoschema.ErrDatabaseNotExists.Equal(err) || infoschema.ErrTableNotExists.Equal(err) { + job.State = model.JobStateCancelled + } + return ver, errors.Trace(err) + } + + if pt.State != model.StatePublic { + job.State = model.JobStateCancelled + return ver, ErrInvalidDDLState.GenWithStack("table %s is not in public, but %s", pt.Name, pt.State) + } + + err = checkExchangePartition(pt, nt) + if err != nil { + job.State = model.JobStateCancelled + return ver, errors.Trace(err) + } + + err = checkTableDefCompatible(pt, nt) + if err != nil { + job.State = model.JobStateCancelled + return ver, errors.Trace(err) + } + + index, _, err := getPartitionDef(pt, 
partName) + if err != nil { + return ver, errors.Trace(err) + } + + if withValidation { + err = checkExchangePartitionRecordValidation(w, pt, index, ntDbInfo.Name, nt.Name) + if err != nil { + job.State = model.JobStateCancelled + return ver, errors.Trace(err) + } + } + + // partition table base auto id + ptBaseID, err := t.GetAutoTableID(ptSchemaID, pt.ID) + if err != nil { + job.State = model.JobStateCancelled + return ver, errors.Trace(err) + } + + ptRandID, err := t.GetAutoRandomID(ptSchemaID, pt.ID) + if err != nil { + job.State = model.JobStateCancelled + return ver, errors.Trace(err) + } + + // non-partition table base auto id + ntBaseID, err := t.GetAutoTableID(job.SchemaID, nt.ID) + if err != nil { + job.State = model.JobStateCancelled + return ver, errors.Trace(err) + } + + ntRandID, err := t.GetAutoRandomID(job.SchemaID, nt.ID) + if err != nil { + job.State = model.JobStateCancelled + return ver, errors.Trace(err) + } + + _, partDef, err := getPartitionDef(pt, partName) + if err != nil { + job.State = model.JobStateCancelled + return ver, errors.Trace(err) + } + + if pt.TiFlashReplica != nil { + for i, id := range pt.TiFlashReplica.AvailablePartitionIDs { + if id == partDef.ID { + pt.TiFlashReplica.AvailablePartitionIDs[i] = nt.ID + break + } + } + } + + // exchange table meta id + partDef.ID, nt.ID = nt.ID, partDef.ID + + err = t.UpdateTable(ptSchemaID, pt) + if err != nil { + job.State = model.JobStateCancelled + return ver, errors.Trace(err) + } + + failpoint.Inject("exchangePartitionErr", func(val failpoint.Value) { + if val.(bool) { + job.State = model.JobStateCancelled + failpoint.Return(ver, errors.New("occur an error after updating partition id")) + } + }) + + // recreate non-partition table meta info + err = t.DropTableOrView(job.SchemaID, partDef.ID, true) + if err != nil { + job.State = model.JobStateCancelled + return ver, errors.Trace(err) + } + + err = t.CreateTableOrView(job.SchemaID, nt) + if err != nil { + job.State = model.JobStateCancelled + return ver, errors.Trace(err) + } + + // both pt and nt set the maximum auto_id between ntBaseID and ptBaseID + if ntBaseID > ptBaseID { + _, err = t.GenAutoTableID(ptSchemaID, pt.ID, ntBaseID-ptBaseID) + if err != nil { + job.State = model.JobStateCancelled + return ver, errors.Trace(err) + } + } + + _, err = t.GenAutoTableID(job.SchemaID, nt.ID, mathutil.MaxInt64(ptBaseID, ntBaseID)) + if err != nil { + job.State = model.JobStateCancelled + return ver, errors.Trace(err) + } + + if ntRandID != 0 || ptRandID != 0 { + if ntRandID > ptRandID { + _, err = t.GenAutoRandomID(ptSchemaID, pt.ID, ntRandID-ptRandID) + if err != nil { + job.State = model.JobStateCancelled + return ver, errors.Trace(err) + } + } + + _, err = t.GenAutoRandomID(job.SchemaID, nt.ID, mathutil.MaxInt64(ptRandID, ntRandID)) + if err != nil { + job.State = model.JobStateCancelled + return ver, errors.Trace(err) + } + } + + // the follow code is a swap function for rules of two partitions + // though partitions has exchanged their ID, swap still take effect + if d.infoHandle != nil && d.infoHandle.IsValid() { + bundles := make([]*placement.Bundle, 0, 2) + ptBundle, ptOK := d.infoHandle.Get().BundleByName(placement.GroupID(partDef.ID)) + ptOK = ptOK && !ptBundle.IsEmpty() + ntBundle, ntOK := d.infoHandle.Get().BundleByName(placement.GroupID(nt.ID)) + ntOK = ntOK && !ntBundle.IsEmpty() + if ptOK && ntOK { + bundles = append(bundles, placement.BuildPlacementCopyBundle(ptBundle, nt.ID)) + bundles = append(bundles, placement.BuildPlacementCopyBundle(ntBundle, 
partDef.ID)) + } else if ptOK { + bundles = append(bundles, placement.BuildPlacementDropBundle(partDef.ID)) + bundles = append(bundles, placement.BuildPlacementCopyBundle(ptBundle, nt.ID)) + } else if ntOK { + bundles = append(bundles, placement.BuildPlacementDropBundle(nt.ID)) + bundles = append(bundles, placement.BuildPlacementCopyBundle(ntBundle, partDef.ID)) + } + err = infosync.PutRuleBundles(nil, bundles) + if err != nil { + job.State = model.JobStateCancelled + return ver, errors.Wrapf(err, "failed to notify PD the placement rules") + } + } + + ver, err = updateSchemaVersion(t, job) + if err != nil { + return ver, errors.Trace(err) + } + + job.FinishTableJob(model.JobStateDone, model.StateNone, ver, pt) + return ver, nil +} + +func checkExchangePartitionRecordValidation(w *worker, pt *model.TableInfo, index int, schemaName, tableName model.CIStr) error { + var sql string + var paramList []interface{} + + pi := pt.Partition + + switch pi.Type { + case model.PartitionTypeHash: + if pi.Num == 1 { + return nil + } + var buf strings.Builder + buf.WriteString("select 1 from %n.%n where mod(") + buf.WriteString(pi.Expr) + buf.WriteString(", %?) != %? limit 1") + sql = buf.String() + paramList = append(paramList, schemaName.L, tableName.L, pi.Num, index) + case model.PartitionTypeRange: + // Table has only one partition and has the maximum value + if len(pi.Definitions) == 1 && strings.EqualFold(pi.Definitions[index].LessThan[0], partitionMaxValue) { + return nil + } + // For range expression and range columns + if len(pi.Columns) == 0 { + sql, paramList = buildCheckSQLForRangeExprPartition(pi, index, schemaName, tableName) + } else if len(pi.Columns) == 1 { + sql, paramList = buildCheckSQLForRangeColumnsPartition(pi, index, schemaName, tableName) + } + case model.PartitionTypeList: + if len(pi.Columns) == 0 { + sql, paramList = buildCheckSQLForListPartition(pi, index, schemaName, tableName) + } else if len(pi.Columns) == 1 { + sql, paramList = buildCheckSQLForListColumnsPartition(pi, index, schemaName, tableName) + } + default: + return errUnsupportedPartitionType.GenWithStackByArgs(pt.Name.O) + } + + var ctx sessionctx.Context + ctx, err := w.sessPool.get() + if err != nil { + return errors.Trace(err) + } + defer w.sessPool.put(ctx) + + stmt, err := ctx.(sqlexec.RestrictedSQLExecutor).ParseWithParams(context.Background(), sql, paramList...) + if err != nil { + return errors.Trace(err) + } + rows, _, err := ctx.(sqlexec.RestrictedSQLExecutor).ExecRestrictedStmt(context.Background(), stmt) + if err != nil { + return errors.Trace(err) + } + rowCount := len(rows) + if rowCount != 0 { + return errors.Trace(ErrRowDoesNotMatchPartition) + } + return nil +} + +func buildCheckSQLForRangeExprPartition(pi *model.PartitionInfo, index int, schemaName, tableName model.CIStr) (string, []interface{}) { + var buf strings.Builder + paramList := make([]interface{}, 0, 4) + // Since the pi.Expr string may contain the identifier, which couldn't be escaped in our ParseWithParams(...) + // So we write it to the origin sql string here. + if index == 0 { + buf.WriteString("select 1 from %n.%n where ") + buf.WriteString(pi.Expr) + buf.WriteString(" >= %? 
limit 1") + paramList = append(paramList, schemaName.L, tableName.L, trimQuotation(pi.Definitions[index].LessThan[0])) + return buf.String(), paramList + } else if index == len(pi.Definitions)-1 && strings.EqualFold(pi.Definitions[index].LessThan[0], partitionMaxValue) { + buf.WriteString("select 1 from %n.%n where ") + buf.WriteString(pi.Expr) + buf.WriteString(" < %? limit 1") + paramList = append(paramList, schemaName.L, tableName.L, trimQuotation(pi.Definitions[index-1].LessThan[0])) + return buf.String(), paramList + } else { + buf.WriteString("select 1 from %n.%n where ") + buf.WriteString(pi.Expr) + buf.WriteString(" < %? or ") + buf.WriteString(pi.Expr) + buf.WriteString(" >= %? limit 1") + paramList = append(paramList, schemaName.L, tableName.L, trimQuotation(pi.Definitions[index-1].LessThan[0]), trimQuotation(pi.Definitions[index].LessThan[0])) + return buf.String(), paramList + } +} + +func trimQuotation(str string) string { + return strings.Trim(str, "\"") +} + +func buildCheckSQLForRangeColumnsPartition(pi *model.PartitionInfo, index int, schemaName, tableName model.CIStr) (string, []interface{}) { + paramList := make([]interface{}, 0, 6) + colName := pi.Columns[0].L + if index == 0 { + paramList = append(paramList, schemaName.L, tableName.L, colName, trimQuotation(pi.Definitions[index].LessThan[0])) + return "select 1 from %n.%n where %n >= %? limit 1", paramList + } else if index == len(pi.Definitions)-1 && strings.EqualFold(pi.Definitions[index].LessThan[0], partitionMaxValue) { + paramList = append(paramList, schemaName.L, tableName.L, colName, trimQuotation(pi.Definitions[index-1].LessThan[0])) + return "select 1 from %n.%n where %n < %? limit 1", paramList + } else { + paramList = append(paramList, schemaName.L, tableName.L, colName, trimQuotation(pi.Definitions[index-1].LessThan[0]), colName, trimQuotation(pi.Definitions[index].LessThan[0])) + return "select 1 from %n.%n where %n < %? or %n >= %? limit 1", paramList + } +} + +func buildCheckSQLForListPartition(pi *model.PartitionInfo, index int, schemaName, tableName model.CIStr) (string, []interface{}) { + var buf strings.Builder + buf.WriteString("select 1 from %n.%n where ") + buf.WriteString(pi.Expr) + buf.WriteString(" not in (%?) limit 1") + inValues := getInValues(pi, index) + + paramList := make([]interface{}, 0, 3) + paramList = append(paramList, schemaName.L, tableName.L, inValues) + return buf.String(), paramList +} + +func buildCheckSQLForListColumnsPartition(pi *model.PartitionInfo, index int, schemaName, tableName model.CIStr) (string, []interface{}) { + colName := pi.Columns[0].L + var buf strings.Builder + buf.WriteString("select 1 from %n.%n where %n not in (%?) limit 1") + inValues := getInValues(pi, index) + + paramList := make([]interface{}, 0, 4) + paramList = append(paramList, schemaName.L, tableName.L, colName, inValues) + return buf.String(), paramList +} + +func getInValues(pi *model.PartitionInfo, index int) []string { + inValues := make([]string, 0, len(pi.Definitions[index].InValues)) + for _, inValue := range pi.Definitions[index].InValues { + for _, one := range inValue { + inValues = append(inValues, one) + } + } + return inValues +} + +>>>>>>> dedaabb80... 
ddl: migrate part of ddl package code from Execute/ExecRestricted to safe API (2) (#22729) func checkAddPartitionTooManyPartitions(piDefs uint64) error { if piDefs > uint64(PartitionCountLimit) { return errors.Trace(ErrTooManyPartitions) diff --git a/ddl/reorg.go b/ddl/reorg.go index 1eb68a822a265..2d5a9dd7b4c31 100644 --- a/ddl/reorg.go +++ b/ddl/reorg.go @@ -198,8 +198,12 @@ func getTableTotalCount(w *worker, tblInfo *model.TableInfo) int64 { if !ok { return statistics.PseudoRowCount } - sql := fmt.Sprintf("select table_rows from information_schema.tables where tidb_table_id=%v;", tblInfo.ID) - rows, _, err := executor.ExecRestrictedSQL(sql) + sql := "select table_rows from information_schema.tables where tidb_table_id=%?;" + stmt, err := executor.ParseWithParams(context.Background(), sql, tblInfo.ID) + if err != nil { + return statistics.PseudoRowCount + } + rows, _, err := executor.ExecRestrictedStmt(context.Background(), stmt) if err != nil { return statistics.PseudoRowCount } From 6b6f29c34f6736f5a426ebcd35bc183c03f4dd06 Mon Sep 17 00:00:00 2001 From: ailinkid <314806019@qq.com> Date: Wed, 10 Mar 2021 09:54:56 +0800 Subject: [PATCH 2/3] . Signed-off-by: ailinkid <314806019@qq.com> --- ddl/delete_range.go | 23 --- ddl/partition.go | 343 -------------------------------------------- go.mod | 1 + go.sum | 1 + 4 files changed, 2 insertions(+), 366 deletions(-) diff --git a/ddl/delete_range.go b/ddl/delete_range.go index 4cfc68a80a2cc..bf06b6392fea6 100644 --- a/ddl/delete_range.go +++ b/ddl/delete_range.go @@ -346,29 +346,6 @@ func insertJobIntoDeleteRangeTable(ctx sessionctx.Context, job *model.Job) error return nil } -<<<<<<< HEAD -======= -func doBatchDeleteIndiceRange(s sqlexec.SQLExecutor, jobID, tableID int64, indexIDs []int64, ts uint64) error { - logutil.BgLogger().Info("[ddl] batch insert into delete-range indices", zap.Int64("jobID", jobID), zap.Int64s("elementIDs", indexIDs)) - paramsList := make([]interface{}, 0, len(indexIDs)*5) - var buf strings.Builder - buf.WriteString(insertDeleteRangeSQLPrefix) - for i, indexID := range indexIDs { - startKey := tablecodec.EncodeTableIndexPrefix(tableID, indexID) - endKey := tablecodec.EncodeTableIndexPrefix(tableID, indexID+1) - startKeyEncoded := hex.EncodeToString(startKey) - endKeyEncoded := hex.EncodeToString(endKey) - buf.WriteString(insertDeleteRangeSQLValue) - if i != len(indexIDs)-1 { - buf.WriteString(",") - } - paramsList = append(paramsList, jobID, indexID, startKeyEncoded, endKeyEncoded, ts) - } - _, err := s.ExecuteInternal(context.Background(), buf.String(), paramsList...) - return errors.Trace(err) -} - ->>>>>>> dedaabb80... 
ddl: migrate part of ddl package code from Execute/ExecRestricted to safe API (2) (#22729) func doInsert(s sqlexec.SQLExecutor, jobID int64, elementID int64, startKey, endKey kv.Key, ts uint64) error { logutil.BgLogger().Info("[ddl] insert into delete-range table", zap.Int64("jobID", jobID), zap.Int64("elementID", elementID)) startKeyEncoded := hex.EncodeToString(startKey) diff --git a/ddl/partition.go b/ddl/partition.go index 243a0bbbfc3f2..7ea5815036b18 100644 --- a/ddl/partition.go +++ b/ddl/partition.go @@ -940,349 +940,6 @@ func onTruncateTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (int64, e return ver, nil } -<<<<<<< HEAD -======= -// onExchangeTablePartition exchange partition data -func (w *worker) onExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error) { - var ( - // defID only for updateSchemaVersion - defID int64 - ptSchemaID int64 - ptID int64 - partName string - withValidation bool - ) - - if err := job.DecodeArgs(&defID, &ptSchemaID, &ptID, &partName, &withValidation); err != nil { - job.State = model.JobStateCancelled - return ver, errors.Trace(err) - } - - ntDbInfo, err := checkSchemaExistAndCancelNotExistJob(t, job) - if err != nil { - job.State = model.JobStateCancelled - return ver, errors.Trace(err) - } - - nt, err := getTableInfoAndCancelFaultJob(t, job, job.SchemaID) - if err != nil { - return ver, errors.Trace(err) - } - - pt, err := getTableInfo(t, ptID, ptSchemaID) - if err != nil { - if infoschema.ErrDatabaseNotExists.Equal(err) || infoschema.ErrTableNotExists.Equal(err) { - job.State = model.JobStateCancelled - } - return ver, errors.Trace(err) - } - - if pt.State != model.StatePublic { - job.State = model.JobStateCancelled - return ver, ErrInvalidDDLState.GenWithStack("table %s is not in public, but %s", pt.Name, pt.State) - } - - err = checkExchangePartition(pt, nt) - if err != nil { - job.State = model.JobStateCancelled - return ver, errors.Trace(err) - } - - err = checkTableDefCompatible(pt, nt) - if err != nil { - job.State = model.JobStateCancelled - return ver, errors.Trace(err) - } - - index, _, err := getPartitionDef(pt, partName) - if err != nil { - return ver, errors.Trace(err) - } - - if withValidation { - err = checkExchangePartitionRecordValidation(w, pt, index, ntDbInfo.Name, nt.Name) - if err != nil { - job.State = model.JobStateCancelled - return ver, errors.Trace(err) - } - } - - // partition table base auto id - ptBaseID, err := t.GetAutoTableID(ptSchemaID, pt.ID) - if err != nil { - job.State = model.JobStateCancelled - return ver, errors.Trace(err) - } - - ptRandID, err := t.GetAutoRandomID(ptSchemaID, pt.ID) - if err != nil { - job.State = model.JobStateCancelled - return ver, errors.Trace(err) - } - - // non-partition table base auto id - ntBaseID, err := t.GetAutoTableID(job.SchemaID, nt.ID) - if err != nil { - job.State = model.JobStateCancelled - return ver, errors.Trace(err) - } - - ntRandID, err := t.GetAutoRandomID(job.SchemaID, nt.ID) - if err != nil { - job.State = model.JobStateCancelled - return ver, errors.Trace(err) - } - - _, partDef, err := getPartitionDef(pt, partName) - if err != nil { - job.State = model.JobStateCancelled - return ver, errors.Trace(err) - } - - if pt.TiFlashReplica != nil { - for i, id := range pt.TiFlashReplica.AvailablePartitionIDs { - if id == partDef.ID { - pt.TiFlashReplica.AvailablePartitionIDs[i] = nt.ID - break - } - } - } - - // exchange table meta id - partDef.ID, nt.ID = nt.ID, partDef.ID - - err = t.UpdateTable(ptSchemaID, pt) - if err != nil { - 
job.State = model.JobStateCancelled - return ver, errors.Trace(err) - } - - failpoint.Inject("exchangePartitionErr", func(val failpoint.Value) { - if val.(bool) { - job.State = model.JobStateCancelled - failpoint.Return(ver, errors.New("occur an error after updating partition id")) - } - }) - - // recreate non-partition table meta info - err = t.DropTableOrView(job.SchemaID, partDef.ID, true) - if err != nil { - job.State = model.JobStateCancelled - return ver, errors.Trace(err) - } - - err = t.CreateTableOrView(job.SchemaID, nt) - if err != nil { - job.State = model.JobStateCancelled - return ver, errors.Trace(err) - } - - // both pt and nt set the maximum auto_id between ntBaseID and ptBaseID - if ntBaseID > ptBaseID { - _, err = t.GenAutoTableID(ptSchemaID, pt.ID, ntBaseID-ptBaseID) - if err != nil { - job.State = model.JobStateCancelled - return ver, errors.Trace(err) - } - } - - _, err = t.GenAutoTableID(job.SchemaID, nt.ID, mathutil.MaxInt64(ptBaseID, ntBaseID)) - if err != nil { - job.State = model.JobStateCancelled - return ver, errors.Trace(err) - } - - if ntRandID != 0 || ptRandID != 0 { - if ntRandID > ptRandID { - _, err = t.GenAutoRandomID(ptSchemaID, pt.ID, ntRandID-ptRandID) - if err != nil { - job.State = model.JobStateCancelled - return ver, errors.Trace(err) - } - } - - _, err = t.GenAutoRandomID(job.SchemaID, nt.ID, mathutil.MaxInt64(ptRandID, ntRandID)) - if err != nil { - job.State = model.JobStateCancelled - return ver, errors.Trace(err) - } - } - - // the follow code is a swap function for rules of two partitions - // though partitions has exchanged their ID, swap still take effect - if d.infoHandle != nil && d.infoHandle.IsValid() { - bundles := make([]*placement.Bundle, 0, 2) - ptBundle, ptOK := d.infoHandle.Get().BundleByName(placement.GroupID(partDef.ID)) - ptOK = ptOK && !ptBundle.IsEmpty() - ntBundle, ntOK := d.infoHandle.Get().BundleByName(placement.GroupID(nt.ID)) - ntOK = ntOK && !ntBundle.IsEmpty() - if ptOK && ntOK { - bundles = append(bundles, placement.BuildPlacementCopyBundle(ptBundle, nt.ID)) - bundles = append(bundles, placement.BuildPlacementCopyBundle(ntBundle, partDef.ID)) - } else if ptOK { - bundles = append(bundles, placement.BuildPlacementDropBundle(partDef.ID)) - bundles = append(bundles, placement.BuildPlacementCopyBundle(ptBundle, nt.ID)) - } else if ntOK { - bundles = append(bundles, placement.BuildPlacementDropBundle(nt.ID)) - bundles = append(bundles, placement.BuildPlacementCopyBundle(ntBundle, partDef.ID)) - } - err = infosync.PutRuleBundles(nil, bundles) - if err != nil { - job.State = model.JobStateCancelled - return ver, errors.Wrapf(err, "failed to notify PD the placement rules") - } - } - - ver, err = updateSchemaVersion(t, job) - if err != nil { - return ver, errors.Trace(err) - } - - job.FinishTableJob(model.JobStateDone, model.StateNone, ver, pt) - return ver, nil -} - -func checkExchangePartitionRecordValidation(w *worker, pt *model.TableInfo, index int, schemaName, tableName model.CIStr) error { - var sql string - var paramList []interface{} - - pi := pt.Partition - - switch pi.Type { - case model.PartitionTypeHash: - if pi.Num == 1 { - return nil - } - var buf strings.Builder - buf.WriteString("select 1 from %n.%n where mod(") - buf.WriteString(pi.Expr) - buf.WriteString(", %?) != %? 
limit 1") - sql = buf.String() - paramList = append(paramList, schemaName.L, tableName.L, pi.Num, index) - case model.PartitionTypeRange: - // Table has only one partition and has the maximum value - if len(pi.Definitions) == 1 && strings.EqualFold(pi.Definitions[index].LessThan[0], partitionMaxValue) { - return nil - } - // For range expression and range columns - if len(pi.Columns) == 0 { - sql, paramList = buildCheckSQLForRangeExprPartition(pi, index, schemaName, tableName) - } else if len(pi.Columns) == 1 { - sql, paramList = buildCheckSQLForRangeColumnsPartition(pi, index, schemaName, tableName) - } - case model.PartitionTypeList: - if len(pi.Columns) == 0 { - sql, paramList = buildCheckSQLForListPartition(pi, index, schemaName, tableName) - } else if len(pi.Columns) == 1 { - sql, paramList = buildCheckSQLForListColumnsPartition(pi, index, schemaName, tableName) - } - default: - return errUnsupportedPartitionType.GenWithStackByArgs(pt.Name.O) - } - - var ctx sessionctx.Context - ctx, err := w.sessPool.get() - if err != nil { - return errors.Trace(err) - } - defer w.sessPool.put(ctx) - - stmt, err := ctx.(sqlexec.RestrictedSQLExecutor).ParseWithParams(context.Background(), sql, paramList...) - if err != nil { - return errors.Trace(err) - } - rows, _, err := ctx.(sqlexec.RestrictedSQLExecutor).ExecRestrictedStmt(context.Background(), stmt) - if err != nil { - return errors.Trace(err) - } - rowCount := len(rows) - if rowCount != 0 { - return errors.Trace(ErrRowDoesNotMatchPartition) - } - return nil -} - -func buildCheckSQLForRangeExprPartition(pi *model.PartitionInfo, index int, schemaName, tableName model.CIStr) (string, []interface{}) { - var buf strings.Builder - paramList := make([]interface{}, 0, 4) - // Since the pi.Expr string may contain the identifier, which couldn't be escaped in our ParseWithParams(...) - // So we write it to the origin sql string here. - if index == 0 { - buf.WriteString("select 1 from %n.%n where ") - buf.WriteString(pi.Expr) - buf.WriteString(" >= %? limit 1") - paramList = append(paramList, schemaName.L, tableName.L, trimQuotation(pi.Definitions[index].LessThan[0])) - return buf.String(), paramList - } else if index == len(pi.Definitions)-1 && strings.EqualFold(pi.Definitions[index].LessThan[0], partitionMaxValue) { - buf.WriteString("select 1 from %n.%n where ") - buf.WriteString(pi.Expr) - buf.WriteString(" < %? limit 1") - paramList = append(paramList, schemaName.L, tableName.L, trimQuotation(pi.Definitions[index-1].LessThan[0])) - return buf.String(), paramList - } else { - buf.WriteString("select 1 from %n.%n where ") - buf.WriteString(pi.Expr) - buf.WriteString(" < %? or ") - buf.WriteString(pi.Expr) - buf.WriteString(" >= %? limit 1") - paramList = append(paramList, schemaName.L, tableName.L, trimQuotation(pi.Definitions[index-1].LessThan[0]), trimQuotation(pi.Definitions[index].LessThan[0])) - return buf.String(), paramList - } -} - -func trimQuotation(str string) string { - return strings.Trim(str, "\"") -} - -func buildCheckSQLForRangeColumnsPartition(pi *model.PartitionInfo, index int, schemaName, tableName model.CIStr) (string, []interface{}) { - paramList := make([]interface{}, 0, 6) - colName := pi.Columns[0].L - if index == 0 { - paramList = append(paramList, schemaName.L, tableName.L, colName, trimQuotation(pi.Definitions[index].LessThan[0])) - return "select 1 from %n.%n where %n >= %? 
limit 1", paramList - } else if index == len(pi.Definitions)-1 && strings.EqualFold(pi.Definitions[index].LessThan[0], partitionMaxValue) { - paramList = append(paramList, schemaName.L, tableName.L, colName, trimQuotation(pi.Definitions[index-1].LessThan[0])) - return "select 1 from %n.%n where %n < %? limit 1", paramList - } else { - paramList = append(paramList, schemaName.L, tableName.L, colName, trimQuotation(pi.Definitions[index-1].LessThan[0]), colName, trimQuotation(pi.Definitions[index].LessThan[0])) - return "select 1 from %n.%n where %n < %? or %n >= %? limit 1", paramList - } -} - -func buildCheckSQLForListPartition(pi *model.PartitionInfo, index int, schemaName, tableName model.CIStr) (string, []interface{}) { - var buf strings.Builder - buf.WriteString("select 1 from %n.%n where ") - buf.WriteString(pi.Expr) - buf.WriteString(" not in (%?) limit 1") - inValues := getInValues(pi, index) - - paramList := make([]interface{}, 0, 3) - paramList = append(paramList, schemaName.L, tableName.L, inValues) - return buf.String(), paramList -} - -func buildCheckSQLForListColumnsPartition(pi *model.PartitionInfo, index int, schemaName, tableName model.CIStr) (string, []interface{}) { - colName := pi.Columns[0].L - var buf strings.Builder - buf.WriteString("select 1 from %n.%n where %n not in (%?) limit 1") - inValues := getInValues(pi, index) - - paramList := make([]interface{}, 0, 4) - paramList = append(paramList, schemaName.L, tableName.L, colName, inValues) - return buf.String(), paramList -} - -func getInValues(pi *model.PartitionInfo, index int) []string { - inValues := make([]string, 0, len(pi.Definitions[index].InValues)) - for _, inValue := range pi.Definitions[index].InValues { - for _, one := range inValue { - inValues = append(inValues, one) - } - } - return inValues -} - ->>>>>>> dedaabb80... ddl: migrate part of ddl package code from Execute/ExecRestricted to safe API (2) (#22729) func checkAddPartitionTooManyPartitions(piDefs uint64) error { if piDefs > uint64(PartitionCountLimit) { return errors.Trace(ErrTooManyPartitions) diff --git a/go.mod b/go.mod index fcf11d9da816a..0601407e1421d 100644 --- a/go.mod +++ b/go.mod @@ -28,6 +28,7 @@ require ( github.com/grpc-ecosystem/go-grpc-middleware v1.1.0 github.com/grpc-ecosystem/grpc-gateway v1.14.3 // indirect github.com/iancoleman/strcase v0.0.0-20191112232945-16388991a334 + github.com/kisielk/errcheck v1.2.0 // indirect github.com/klauspost/cpuid v1.2.1 github.com/kr/pretty v0.2.0 // indirect github.com/mattn/go-colorable v0.1.7 // indirect diff --git a/go.sum b/go.sum index 9d904cff63883..c90c0a7cfd276 100644 --- a/go.sum +++ b/go.sum @@ -281,6 +281,7 @@ github.com/juju/ratelimit v1.0.1 h1:+7AIFJVQ0EQgq/K9+0Krm7m530Du7tIz0METWzN0RgY= github.com/juju/ratelimit v1.0.1/go.mod h1:qapgC/Gy+xNh9UxzV13HGGl/6UXNN+ct+vwSgWNm/qk= github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q= +github.com/kisielk/errcheck v1.2.0 h1:reN85Pxc5larApoH1keMBiu2GWtPqXQ1nc9gx+jOU+E= github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/klauspost/cpuid v1.2.1 h1:vJi+O/nMdFt0vqm8NZBI6wzALWdA2X+egi0ogNyrC/w= From a07ef87461a9215ab961080ad90c83c6d1ae0a91 Mon Sep 17 00:00:00 2001 From: ailinkid <314806019@qq.com> Date: Wed, 10 Mar 2021 09:58:30 +0800 Subject: [PATCH 3/3] . 
Signed-off-by: ailinkid <314806019@qq.com> --- go.mod | 1 - go.sum | 1 - 2 files changed, 2 deletions(-) diff --git a/go.mod b/go.mod index 0601407e1421d..fcf11d9da816a 100644 --- a/go.mod +++ b/go.mod @@ -28,7 +28,6 @@ require ( github.com/grpc-ecosystem/go-grpc-middleware v1.1.0 github.com/grpc-ecosystem/grpc-gateway v1.14.3 // indirect github.com/iancoleman/strcase v0.0.0-20191112232945-16388991a334 - github.com/kisielk/errcheck v1.2.0 // indirect github.com/klauspost/cpuid v1.2.1 github.com/kr/pretty v0.2.0 // indirect github.com/mattn/go-colorable v0.1.7 // indirect diff --git a/go.sum b/go.sum index c90c0a7cfd276..9d904cff63883 100644 --- a/go.sum +++ b/go.sum @@ -281,7 +281,6 @@ github.com/juju/ratelimit v1.0.1 h1:+7AIFJVQ0EQgq/K9+0Krm7m530Du7tIz0METWzN0RgY= github.com/juju/ratelimit v1.0.1/go.mod h1:qapgC/Gy+xNh9UxzV13HGGl/6UXNN+ct+vwSgWNm/qk= github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q= -github.com/kisielk/errcheck v1.2.0 h1:reN85Pxc5larApoH1keMBiu2GWtPqXQ1nc9gx+jOU+E= github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/klauspost/cpuid v1.2.1 h1:vJi+O/nMdFt0vqm8NZBI6wzALWdA2X+egi0ogNyrC/w=
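
Note on the series: patch 1 is the raw ti-srebot cherry-pick of #22729 onto release-4.0 (conflict markers committed as-is), patch 2 resolves those conflicts by dropping the duplicated hunks, and patch 3 removes the go.mod/go.sum lines that patch 2 added. The substantive change is the move away from SQL strings assembled with fmt.Sprintf and run through Execute/ExecRestrictedSQL, to the parameterized safe API: ExecuteInternal on the write path, and ParseWithParams plus ExecRestrictedStmt on the restricted read path, with %? as the value placeholder and %n for identifiers. The sketch below illustrates that pattern with the same calls that appear in the diffs; it is not part of any of the patches, and the helper names insertDeleteRange and tableRowCount are invented for the example.

package example

import (
	"context"

	"github.com/pingcap/errors"
	"github.com/pingcap/tidb/util/sqlexec"
)

// insertDeleteRange sketches the write path: values are bound with %?
// placeholders and passed to ExecuteInternal instead of being formatted
// into the SQL string (compare doInsert in ddl/delete_range.go above).
func insertDeleteRange(s sqlexec.SQLExecutor, jobID, elementID int64, startHex, endHex string, ts uint64) error {
	_, err := s.ExecuteInternal(context.Background(),
		`INSERT IGNORE INTO mysql.gc_delete_range VALUES (%?, %?, %?, %?, %?)`,
		jobID, elementID, startHex, endHex, ts)
	return errors.Trace(err)
}

// tableRowCount sketches the restricted read path used in ddl/reorg.go above:
// the statement is first built with ParseWithParams (which also accepts %n
// for identifiers) and then executed with ExecRestrictedStmt.
func tableRowCount(exec sqlexec.RestrictedSQLExecutor, tableID int64) (int, error) {
	stmt, err := exec.ParseWithParams(context.Background(),
		"select table_rows from information_schema.tables where tidb_table_id=%?", tableID)
	if err != nil {
		return 0, errors.Trace(err)
	}
	rows, _, err := exec.ExecRestrictedStmt(context.Background(), stmt)
	if err != nil {
		return 0, errors.Trace(err)
	}
	return len(rows), nil
}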