Skip to content

Commit

Permalink
.
Browse files Browse the repository at this point in the history
Signed-off-by: ailinkid <[email protected]>
  • Loading branch information
AilinKid committed Mar 10, 2021
1 parent 881640a commit 6b6f29c
Show file tree
Hide file tree
Showing 4 changed files with 2 additions and 366 deletions.
23 changes: 0 additions & 23 deletions ddl/delete_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,29 +346,6 @@ func insertJobIntoDeleteRangeTable(ctx sessionctx.Context, job *model.Job) error
return nil
}

<<<<<<< HEAD
=======
func doBatchDeleteIndiceRange(s sqlexec.SQLExecutor, jobID, tableID int64, indexIDs []int64, ts uint64) error {
logutil.BgLogger().Info("[ddl] batch insert into delete-range indices", zap.Int64("jobID", jobID), zap.Int64s("elementIDs", indexIDs))
paramsList := make([]interface{}, 0, len(indexIDs)*5)
var buf strings.Builder
buf.WriteString(insertDeleteRangeSQLPrefix)
for i, indexID := range indexIDs {
startKey := tablecodec.EncodeTableIndexPrefix(tableID, indexID)
endKey := tablecodec.EncodeTableIndexPrefix(tableID, indexID+1)
startKeyEncoded := hex.EncodeToString(startKey)
endKeyEncoded := hex.EncodeToString(endKey)
buf.WriteString(insertDeleteRangeSQLValue)
if i != len(indexIDs)-1 {
buf.WriteString(",")
}
paramsList = append(paramsList, jobID, indexID, startKeyEncoded, endKeyEncoded, ts)
}
_, err := s.ExecuteInternal(context.Background(), buf.String(), paramsList...)
return errors.Trace(err)
}

>>>>>>> dedaabb80... ddl: migrate part of ddl package code from Execute/ExecRestricted to safe API (2) (#22729)
func doInsert(s sqlexec.SQLExecutor, jobID int64, elementID int64, startKey, endKey kv.Key, ts uint64) error {
logutil.BgLogger().Info("[ddl] insert into delete-range table", zap.Int64("jobID", jobID), zap.Int64("elementID", elementID))
startKeyEncoded := hex.EncodeToString(startKey)
Expand Down
343 changes: 0 additions & 343 deletions ddl/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -940,349 +940,6 @@ func onTruncateTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (int64, e
return ver, nil
}

<<<<<<< HEAD
=======
// onExchangeTablePartition exchange partition data
func (w *worker) onExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error) {
var (
// defID only for updateSchemaVersion
defID int64
ptSchemaID int64
ptID int64
partName string
withValidation bool
)

if err := job.DecodeArgs(&defID, &ptSchemaID, &ptID, &partName, &withValidation); err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}

ntDbInfo, err := checkSchemaExistAndCancelNotExistJob(t, job)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}

nt, err := getTableInfoAndCancelFaultJob(t, job, job.SchemaID)
if err != nil {
return ver, errors.Trace(err)
}

pt, err := getTableInfo(t, ptID, ptSchemaID)
if err != nil {
if infoschema.ErrDatabaseNotExists.Equal(err) || infoschema.ErrTableNotExists.Equal(err) {
job.State = model.JobStateCancelled
}
return ver, errors.Trace(err)
}

if pt.State != model.StatePublic {
job.State = model.JobStateCancelled
return ver, ErrInvalidDDLState.GenWithStack("table %s is not in public, but %s", pt.Name, pt.State)
}

err = checkExchangePartition(pt, nt)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}

err = checkTableDefCompatible(pt, nt)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}

index, _, err := getPartitionDef(pt, partName)
if err != nil {
return ver, errors.Trace(err)
}

if withValidation {
err = checkExchangePartitionRecordValidation(w, pt, index, ntDbInfo.Name, nt.Name)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
}

// partition table base auto id
ptBaseID, err := t.GetAutoTableID(ptSchemaID, pt.ID)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}

ptRandID, err := t.GetAutoRandomID(ptSchemaID, pt.ID)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}

// non-partition table base auto id
ntBaseID, err := t.GetAutoTableID(job.SchemaID, nt.ID)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}

ntRandID, err := t.GetAutoRandomID(job.SchemaID, nt.ID)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}

_, partDef, err := getPartitionDef(pt, partName)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}

if pt.TiFlashReplica != nil {
for i, id := range pt.TiFlashReplica.AvailablePartitionIDs {
if id == partDef.ID {
pt.TiFlashReplica.AvailablePartitionIDs[i] = nt.ID
break
}
}
}

// exchange table meta id
partDef.ID, nt.ID = nt.ID, partDef.ID

err = t.UpdateTable(ptSchemaID, pt)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}

failpoint.Inject("exchangePartitionErr", func(val failpoint.Value) {
if val.(bool) {
job.State = model.JobStateCancelled
failpoint.Return(ver, errors.New("occur an error after updating partition id"))
}
})

// recreate non-partition table meta info
err = t.DropTableOrView(job.SchemaID, partDef.ID, true)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}

err = t.CreateTableOrView(job.SchemaID, nt)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}

// both pt and nt set the maximum auto_id between ntBaseID and ptBaseID
if ntBaseID > ptBaseID {
_, err = t.GenAutoTableID(ptSchemaID, pt.ID, ntBaseID-ptBaseID)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
}

_, err = t.GenAutoTableID(job.SchemaID, nt.ID, mathutil.MaxInt64(ptBaseID, ntBaseID))
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}

if ntRandID != 0 || ptRandID != 0 {
if ntRandID > ptRandID {
_, err = t.GenAutoRandomID(ptSchemaID, pt.ID, ntRandID-ptRandID)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
}

_, err = t.GenAutoRandomID(job.SchemaID, nt.ID, mathutil.MaxInt64(ptRandID, ntRandID))
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
}

// the follow code is a swap function for rules of two partitions
// though partitions has exchanged their ID, swap still take effect
if d.infoHandle != nil && d.infoHandle.IsValid() {
bundles := make([]*placement.Bundle, 0, 2)
ptBundle, ptOK := d.infoHandle.Get().BundleByName(placement.GroupID(partDef.ID))
ptOK = ptOK && !ptBundle.IsEmpty()
ntBundle, ntOK := d.infoHandle.Get().BundleByName(placement.GroupID(nt.ID))
ntOK = ntOK && !ntBundle.IsEmpty()
if ptOK && ntOK {
bundles = append(bundles, placement.BuildPlacementCopyBundle(ptBundle, nt.ID))
bundles = append(bundles, placement.BuildPlacementCopyBundle(ntBundle, partDef.ID))
} else if ptOK {
bundles = append(bundles, placement.BuildPlacementDropBundle(partDef.ID))
bundles = append(bundles, placement.BuildPlacementCopyBundle(ptBundle, nt.ID))
} else if ntOK {
bundles = append(bundles, placement.BuildPlacementDropBundle(nt.ID))
bundles = append(bundles, placement.BuildPlacementCopyBundle(ntBundle, partDef.ID))
}
err = infosync.PutRuleBundles(nil, bundles)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Wrapf(err, "failed to notify PD the placement rules")
}
}

ver, err = updateSchemaVersion(t, job)
if err != nil {
return ver, errors.Trace(err)
}

job.FinishTableJob(model.JobStateDone, model.StateNone, ver, pt)
return ver, nil
}

func checkExchangePartitionRecordValidation(w *worker, pt *model.TableInfo, index int, schemaName, tableName model.CIStr) error {
var sql string
var paramList []interface{}

pi := pt.Partition

switch pi.Type {
case model.PartitionTypeHash:
if pi.Num == 1 {
return nil
}
var buf strings.Builder
buf.WriteString("select 1 from %n.%n where mod(")
buf.WriteString(pi.Expr)
buf.WriteString(", %?) != %? limit 1")
sql = buf.String()
paramList = append(paramList, schemaName.L, tableName.L, pi.Num, index)
case model.PartitionTypeRange:
// Table has only one partition and has the maximum value
if len(pi.Definitions) == 1 && strings.EqualFold(pi.Definitions[index].LessThan[0], partitionMaxValue) {
return nil
}
// For range expression and range columns
if len(pi.Columns) == 0 {
sql, paramList = buildCheckSQLForRangeExprPartition(pi, index, schemaName, tableName)
} else if len(pi.Columns) == 1 {
sql, paramList = buildCheckSQLForRangeColumnsPartition(pi, index, schemaName, tableName)
}
case model.PartitionTypeList:
if len(pi.Columns) == 0 {
sql, paramList = buildCheckSQLForListPartition(pi, index, schemaName, tableName)
} else if len(pi.Columns) == 1 {
sql, paramList = buildCheckSQLForListColumnsPartition(pi, index, schemaName, tableName)
}
default:
return errUnsupportedPartitionType.GenWithStackByArgs(pt.Name.O)
}

var ctx sessionctx.Context
ctx, err := w.sessPool.get()
if err != nil {
return errors.Trace(err)
}
defer w.sessPool.put(ctx)

stmt, err := ctx.(sqlexec.RestrictedSQLExecutor).ParseWithParams(context.Background(), sql, paramList...)
if err != nil {
return errors.Trace(err)
}
rows, _, err := ctx.(sqlexec.RestrictedSQLExecutor).ExecRestrictedStmt(context.Background(), stmt)
if err != nil {
return errors.Trace(err)
}
rowCount := len(rows)
if rowCount != 0 {
return errors.Trace(ErrRowDoesNotMatchPartition)
}
return nil
}

func buildCheckSQLForRangeExprPartition(pi *model.PartitionInfo, index int, schemaName, tableName model.CIStr) (string, []interface{}) {
var buf strings.Builder
paramList := make([]interface{}, 0, 4)
// Since the pi.Expr string may contain the identifier, which couldn't be escaped in our ParseWithParams(...)
// So we write it to the origin sql string here.
if index == 0 {
buf.WriteString("select 1 from %n.%n where ")
buf.WriteString(pi.Expr)
buf.WriteString(" >= %? limit 1")
paramList = append(paramList, schemaName.L, tableName.L, trimQuotation(pi.Definitions[index].LessThan[0]))
return buf.String(), paramList
} else if index == len(pi.Definitions)-1 && strings.EqualFold(pi.Definitions[index].LessThan[0], partitionMaxValue) {
buf.WriteString("select 1 from %n.%n where ")
buf.WriteString(pi.Expr)
buf.WriteString(" < %? limit 1")
paramList = append(paramList, schemaName.L, tableName.L, trimQuotation(pi.Definitions[index-1].LessThan[0]))
return buf.String(), paramList
} else {
buf.WriteString("select 1 from %n.%n where ")
buf.WriteString(pi.Expr)
buf.WriteString(" < %? or ")
buf.WriteString(pi.Expr)
buf.WriteString(" >= %? limit 1")
paramList = append(paramList, schemaName.L, tableName.L, trimQuotation(pi.Definitions[index-1].LessThan[0]), trimQuotation(pi.Definitions[index].LessThan[0]))
return buf.String(), paramList
}
}

func trimQuotation(str string) string {
return strings.Trim(str, "\"")
}

func buildCheckSQLForRangeColumnsPartition(pi *model.PartitionInfo, index int, schemaName, tableName model.CIStr) (string, []interface{}) {
paramList := make([]interface{}, 0, 6)
colName := pi.Columns[0].L
if index == 0 {
paramList = append(paramList, schemaName.L, tableName.L, colName, trimQuotation(pi.Definitions[index].LessThan[0]))
return "select 1 from %n.%n where %n >= %? limit 1", paramList
} else if index == len(pi.Definitions)-1 && strings.EqualFold(pi.Definitions[index].LessThan[0], partitionMaxValue) {
paramList = append(paramList, schemaName.L, tableName.L, colName, trimQuotation(pi.Definitions[index-1].LessThan[0]))
return "select 1 from %n.%n where %n < %? limit 1", paramList
} else {
paramList = append(paramList, schemaName.L, tableName.L, colName, trimQuotation(pi.Definitions[index-1].LessThan[0]), colName, trimQuotation(pi.Definitions[index].LessThan[0]))
return "select 1 from %n.%n where %n < %? or %n >= %? limit 1", paramList
}
}

func buildCheckSQLForListPartition(pi *model.PartitionInfo, index int, schemaName, tableName model.CIStr) (string, []interface{}) {
var buf strings.Builder
buf.WriteString("select 1 from %n.%n where ")
buf.WriteString(pi.Expr)
buf.WriteString(" not in (%?) limit 1")
inValues := getInValues(pi, index)

paramList := make([]interface{}, 0, 3)
paramList = append(paramList, schemaName.L, tableName.L, inValues)
return buf.String(), paramList
}

func buildCheckSQLForListColumnsPartition(pi *model.PartitionInfo, index int, schemaName, tableName model.CIStr) (string, []interface{}) {
colName := pi.Columns[0].L
var buf strings.Builder
buf.WriteString("select 1 from %n.%n where %n not in (%?) limit 1")
inValues := getInValues(pi, index)

paramList := make([]interface{}, 0, 4)
paramList = append(paramList, schemaName.L, tableName.L, colName, inValues)
return buf.String(), paramList
}

func getInValues(pi *model.PartitionInfo, index int) []string {
inValues := make([]string, 0, len(pi.Definitions[index].InValues))
for _, inValue := range pi.Definitions[index].InValues {
for _, one := range inValue {
inValues = append(inValues, one)
}
}
return inValues
}

>>>>>>> dedaabb80... ddl: migrate part of ddl package code from Execute/ExecRestricted to safe API (2) (#22729)
func checkAddPartitionTooManyPartitions(piDefs uint64) error {
if piDefs > uint64(PartitionCountLimit) {
return errors.Trace(ErrTooManyPartitions)
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ require (
github.com/grpc-ecosystem/go-grpc-middleware v1.1.0
github.com/grpc-ecosystem/grpc-gateway v1.14.3 // indirect
github.com/iancoleman/strcase v0.0.0-20191112232945-16388991a334
github.com/kisielk/errcheck v1.2.0 // indirect
github.com/klauspost/cpuid v1.2.1
github.com/kr/pretty v0.2.0 // indirect
github.com/mattn/go-colorable v0.1.7 // indirect
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,7 @@ github.com/juju/ratelimit v1.0.1 h1:+7AIFJVQ0EQgq/K9+0Krm7m530Du7tIz0METWzN0RgY=
github.com/juju/ratelimit v1.0.1/go.mod h1:qapgC/Gy+xNh9UxzV13HGGl/6UXNN+ct+vwSgWNm/qk=
github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q=
github.com/kisielk/errcheck v1.2.0 h1:reN85Pxc5larApoH1keMBiu2GWtPqXQ1nc9gx+jOU+E=
github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/cpuid v1.2.1 h1:vJi+O/nMdFt0vqm8NZBI6wzALWdA2X+egi0ogNyrC/w=
Expand Down

0 comments on commit 6b6f29c

Please sign in to comment.