cherry-pick: Fix affect of online DDL #466 #476

Merged
merged 1 commit on Feb 28, 2019
49 changes: 32 additions & 17 deletions drainer/schema.go
@@ -21,6 +21,8 @@ type Schema struct {
schemas map[int64]*model.DBInfo
tables map[int64]*model.TableInfo

truncateTableID map[int64]struct{}

schemaMetaVersion int64

hasImplicitCol bool
@@ -41,6 +43,7 @@ func NewSchema(jobs []*model.Job, hasImplicitCol bool) (*Schema, error) {
s := &Schema{
hasImplicitCol: hasImplicitCol,
version2SchemaTable: make(map[int64]TableName),
truncateTableID: make(map[int64]struct{}),
jobs: jobs,
}

@@ -253,12 +256,12 @@ func (s *Schema) handlePreviousDDLJobIfNeed(version int64) error {
// the second value[string]: the table name
// the third value[string]: the sql that is corresponding to the job
// the fourth value[error]: the handleDDL execution's err
func (s *Schema) handleDDL(job *model.Job) (string, string, string, error) {
func (s *Schema) handleDDL(job *model.Job) (schemaName string, tableName string, sql string, err error) {
if skipJob(job) {
return "", "", "", nil
}

sql := job.Query
sql = job.Query
if sql == "" {
return "", "", "", errors.Errorf("[ddl job sql miss]%+v", job)
}
@@ -275,17 +278,16 @@ func (s *Schema) handleDDL(job *model.Job) (string, string, string, error) {

s.version2SchemaTable[job.BinlogInfo.SchemaVersion] = TableName{schema.Name.O, ""}
s.currentVersion = job.BinlogInfo.SchemaVersion
return schema.Name.O, "", sql, nil
schemaName = schema.Name.O

case model.ActionDropSchema:
schemaName, err := s.DropSchema(job.SchemaID)
schemaName, err = s.DropSchema(job.SchemaID)
if err != nil {
return "", "", "", errors.Trace(err)
}

s.version2SchemaTable[job.BinlogInfo.SchemaVersion] = TableName{schemaName, ""}
s.currentVersion = job.BinlogInfo.SchemaVersion
return schemaName, "", sql, nil

case model.ActionRenameTable:
// ignore schema doesn't support rename ddl
@@ -312,7 +314,8 @@ func (s *Schema) handleDDL(job *model.Job) (string, string, string, error) {

s.version2SchemaTable[job.BinlogInfo.SchemaVersion] = TableName{schema.Name.O, table.Name.O}
s.currentVersion = job.BinlogInfo.SchemaVersion
return schema.Name.O, table.Name.O, sql, nil
schemaName = schema.Name.O
tableName = table.Name.O

case model.ActionCreateTable:
table := job.BinlogInfo.TableInfo
@@ -332,29 +335,31 @@ func (s *Schema) handleDDL(job *model.Job) (string, string, string, error) {

s.version2SchemaTable[job.BinlogInfo.SchemaVersion] = TableName{schema.Name.O, table.Name.O}
s.currentVersion = job.BinlogInfo.SchemaVersion
return schema.Name.O, table.Name.O, sql, nil
schemaName = schema.Name.O
tableName = table.Name.O

case model.ActionDropTable:
schema, ok := s.SchemaByID(job.SchemaID)
if !ok {
return "", "", "", errors.NotFoundf("schema %d", job.SchemaID)
}

tableName, err := s.DropTable(job.TableID)
tableName, err = s.DropTable(job.TableID)
if err != nil {
return "", "", "", errors.Trace(err)
}

s.version2SchemaTable[job.BinlogInfo.SchemaVersion] = TableName{schema.Name.O, tableName}
s.currentVersion = job.BinlogInfo.SchemaVersion
return schema.Name.O, tableName, sql, nil
schemaName = schema.Name.O

case model.ActionTruncateTable:
schema, ok := s.SchemaByID(job.SchemaID)
if !ok {
return "", "", "", errors.NotFoundf("schema %d", job.SchemaID)
}

// job.TableID is the old table id, different from table.ID
_, err := s.DropTable(job.TableID)
if err != nil {
return "", "", "", errors.Trace(err)
@@ -372,7 +377,9 @@ func (s *Schema) handleDDL(job *model.Job) (string, string, string, error) {

s.version2SchemaTable[job.BinlogInfo.SchemaVersion] = TableName{schema.Name.O, table.Name.O}
s.currentVersion = job.BinlogInfo.SchemaVersion
return schema.Name.O, table.Name.O, sql, nil
schemaName = schema.Name.O
tableName = table.Name.O
s.truncateTableID[job.TableID] = struct{}{}

default:
binlogInfo := job.BinlogInfo
@@ -396,8 +403,17 @@ func (s *Schema) handleDDL(job *model.Job) (string, string, string, error) {

s.version2SchemaTable[job.BinlogInfo.SchemaVersion] = TableName{schema.Name.O, tbInfo.Name.O}
s.currentVersion = job.BinlogInfo.SchemaVersion
return schema.Name.O, tbInfo.Name.O, sql, nil
schemaName = schema.Name.O
tableName = tbInfo.Name.O
}

return
}

// IsTruncateTableID returns true if the table ID has been truncated by a TRUNCATE TABLE DDL
func (s *Schema) IsTruncateTableID(id int64) bool {
_, ok := s.truncateTableID[id]
return ok
}
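
To illustrate the bookkeeping above (not part of the diff): for an ActionTruncateTable job, job.TableID is the old table's ID while job.BinlogInfo.TableInfo.ID belongs to the freshly created table, so only the old ID lands in truncateTableID. A minimal self-contained sketch with made-up IDs:

package main

import "fmt"

func main() {
	// truncateTableID in Schema is just a set of old table IDs.
	truncated := make(map[int64]struct{})

	// For an ActionTruncateTable job, job.TableID is the *old* table ID;
	// job.BinlogInfo.TableInfo.ID is the ID of the table created by the truncate.
	oldTableID, newTableID := int64(45), int64(47) // made-up IDs
	truncated[oldTableID] = struct{}{}             // what handleDDL records

	isTruncated := func(id int64) bool { _, ok := truncated[id]; return ok }

	fmt.Println(isTruncated(oldTableID)) // true: DML against the old table can be dropped
	fmt.Println(isTruncated(newTableID)) // false: DML against the new table is replicated as usual
}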

func (s *Schema) getSchemaTableAndDelete(version int64) (string, string, error) {
@@ -425,11 +441,10 @@ func addImplicitColumn(table *model.TableInfo) {
table.Indices = []*model.IndexInfo{newIndex}
}

// there's only two status will be in HistoryDDLJob(we fetch at start time):
// JobStateSynced and JobStateRollbackDone
// If it fail to commit(to tikv) in 2pc phrase (when changing JobStateDone -> JobStateSynced and add to HistoryDDLJob),
// then is would't not be add to HistoryDDLJob, and we may get (prewrite + rollback binlog),
// this binlog event would reach drainer, finally we will get a (p + commit binlog) when tidb retry and successfully commit
// TiDB writes a DDL binlog for every DDL job, so we must ignore jobs that are cancelled or rolled back.
// Older versions of TiDB wrote the DDL binlog in the txn that changed the job state to *synced*.
// Now it is written in the txn that changes the job state to *done* (before the change to *synced*).
// At state *done*, the job will always and only be changed to *synced*.
func skipJob(job *model.Job) bool {
return !job.IsSynced()
return !job.IsSynced() && !job.IsDone()
}
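
A small sketch of what the new skipJob accepts, assuming the github.com/pingcap/tidb/model package already used elsewhere in this repo (not part of the diff): jobs at *synced* or *done* are handled, cancelled or rolled-back jobs are skipped.

package main

import (
	"fmt"

	"github.com/pingcap/tidb/model"
)

// same predicate as the new skipJob above
func skipJob(job *model.Job) bool {
	return !job.IsSynced() && !job.IsDone()
}

func main() {
	for _, state := range []model.JobState{
		model.JobStateSynced,       // handled, as before
		model.JobStateDone,         // also handled now: the DDL binlog is written at *done*
		model.JobStateCancelled,    // skipped
		model.JobStateRollbackDone, // skipped
	} {
		fmt.Println(state, skipJob(&model.Job{State: state}))
	}
}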
38 changes: 35 additions & 3 deletions drainer/syncer.go
@@ -48,9 +48,6 @@ type Syncer struct {
positions map[string]int64
initCommitTS int64

// because TiDB is case-insensitive, only lower-case here.
ignoreSchemaNames map[string]struct{}

ctx context.Context
cancel context.CancelFunc

@@ -418,6 +415,9 @@ func (s *Syncer) run(jobs []*model.Job) error {
}
}
s.schema, err = NewSchema(jobs, false)
if err != nil {
return errors.Trace(err)
}

s.executors, err = createExecutors(s.cfg.DestDBType, s.cfg.To, s.cfg.WorkerCount)
if err != nil {
@@ -436,6 +436,7 @@ func (s *Syncer) run(jobs []*model.Job) error {
go s.sync(s.executors[i], s.jobCh[i], i)
}

var lastDDLSchemaVersion int64
var b *binlogItem
for {
select {
@@ -463,7 +464,16 @@ func (s *Syncer) run(jobs []*model.Job) error {
return errors.Errorf("prewrite %s unmarshal error %v", preWriteValue, err)
}

err = s.rewriteForOldVersion(preWrite)
if err != nil {
return errors.Annotate(err, "rewrite for old version fail")
}

log.Debug("DML SchemaVersion: ", preWrite.SchemaVersion)
if preWrite.SchemaVersion < lastDDLSchemaVersion {
log.Debug("encounter older schema dml")
}

err = s.schema.handlePreviousDDLJobIfNeed(preWrite.SchemaVersion)
if err != nil {
return errors.Trace(err)
@@ -481,6 +491,8 @@ func (s *Syncer) run(jobs []*model.Job) error {
s.schema.addJob(b.job)

log.Debug("DDL SchemaVersion: ", b.job.BinlogInfo.SchemaVersion)
lastDDLSchemaVersion = b.job.BinlogInfo.SchemaVersion

err = s.schema.handlePreviousDDLJobIfNeed(b.job.BinlogInfo.SchemaVersion)
if err != nil {
return errors.Trace(err)
@@ -645,3 +657,23 @@ func (s *Syncer) GetLastSyncTime() time.Time {
func (s *Syncer) GetLatestCommitTS() int64 {
return s.cp.TS()
}

// see https://github.com/pingcap/tidb/issues/9304
// Currently we only drop the data whose table ID has been truncated.
// Because of online DDL, different TiDB instances may see different schemas,
// so DML and DDL cannot simply be treated as a single timeline;
// we must handle every DDL type carefully and still need a better design.
func (s *Syncer) rewriteForOldVersion(pv *pb.PrewriteValue) (err error) {
var mutations = make([]pb.TableMutation, 0, len(pv.GetMutations()))
for _, mutation := range pv.GetMutations() {
if s.schema.IsTruncateTableID(mutation.TableId) {
log.Infof("skip old version truncate dml, table id: %d", mutation.TableId)
continue
}

mutations = append(mutations, mutation)
}
pv.Mutations = mutations

return nil
}
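
For clarity, a self-contained sketch of the filtering rewriteForOldVersion performs (not part of the diff; tableMutation stands in for pb.TableMutation, the map for Schema.truncateTableID, and the IDs are made up):

package main

import "fmt"

type tableMutation struct{ TableID int64 }

func main() {
	// 45 was recorded in truncateTableID by an earlier TRUNCATE TABLE DDL.
	truncated := map[int64]struct{}{45: {}}

	// Mutations decoded from one prewrite value: one against the old,
	// truncated table and one against a live table.
	prewrite := []tableMutation{{TableID: 45}, {TableID: 47}}

	kept := make([]tableMutation, 0, len(prewrite))
	for _, m := range prewrite {
		if _, ok := truncated[m.TableID]; ok {
			continue // stale DML from a TiDB instance that still saw the pre-truncate schema
		}
		kept = append(kept, m)
	}

	fmt.Println(kept) // [{47}]: only the mutation for the live table survives
}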
4 changes: 2 additions & 2 deletions drainer/syncer_test.go
@@ -24,7 +24,7 @@ func (t *testDrainerSuite) TestHandleDDL(c *C) {
c.Assert(sql, Equals, "")

// check job.Query is empty
job = &model.Job{ID: 1, State: model.JobStateSynced}
job = &model.Job{ID: 1, State: model.JobStateDone}
_, _, sql, err = s.schema.handleDDL(job)
c.Assert(sql, Equals, "")
c.Assert(err, NotNil, Commentf("should return not found job.Query"))
@@ -82,7 +82,7 @@ func (t *testDrainerSuite) TestHandleDDL(c *C) {

job = &model.Job{
ID: testCase.jobID,
State: model.JobStateSynced,
State: model.JobStateDone,
SchemaID: testCase.schemaID,
TableID: testCase.tableID,
Type: testCase.jobType,
26 changes: 3 additions & 23 deletions drainer/translator/flash.go
@@ -16,7 +16,6 @@ import (
"github.com/pingcap/tidb/mysql"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/codec"
)

// flashTranslator translates TiDB binlog to flash sqls
@@ -42,45 +41,26 @@ func (f *flashTranslator) GenInsertSQLs(schema string, table *model.TableInfo, r
version := makeInternalVersionValue(uint64(commitTS))
delFlag := makeInternalDelmarkValue(false)

colsTypeMap := toFlashColumnTypeMap(columns)
columnList := genColumnList(columns)
// the additional 2 placeholders are for the del flag and version
columnPlaceholders := dml.GenColumnPlaceholders(len(columns) + 2)
sql := fmt.Sprintf("IMPORT INTO `%s`.`%s` (%s) values (%s);", schema, table.Name.L, columnList, columnPlaceholders)

for _, row := range rows {
//decode the pk value
remain, pk, err := codec.DecodeOne(row)
hashKey := pk.GetInt64()
pk, columnValues, err := insertRowToDatums(table, row)
if err != nil {
return nil, nil, nil, errors.Trace(err)
}

columnValues, err := tablecodec.DecodeRow(remain, colsTypeMap, gotime.Local)
if err != nil {
return nil, nil, nil, errors.Trace(err)
}

if columnValues == nil {
columnValues = make(map[int64]types.Datum)
}
hashKey := pk.GetInt64()

var vals []interface{}
vals = append(vals, hashKey)
for _, col := range columns {
if IsPKHandleColumn(table, col) {
columnValues[col.ID] = pk
pkVal, err := formatFlashData(&pk, &col.FieldType)
if err != nil {
return nil, nil, nil, errors.Trace(err)
}
vals = append(vals, pkVal)
continue
}

val, ok := columnValues[col.ID]
if !ok {
vals = append(vals, col.DefaultValue)
vals = append(vals, col.GetDefaultValue())
} else {
value, err := formatFlashData(&val, &col.FieldType)
if err != nil {
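
The insertRowToDatums helper that replaces the inline decoding is not shown in this diff; based on the removed code it presumably does roughly what the sketch below does. Names, signature, and import paths here are assumptions chosen to match the other imports in this file.

package translator

import (
	"time"

	"github.com/pingcap/tidb/model"
	"github.com/pingcap/tidb/tablecodec"
	"github.com/pingcap/tidb/types"
	"github.com/pingcap/tidb/util/codec"
)

// insertRowToDatumsSketch mirrors what the removed inline code did: decode the
// handle first, then decode the remaining row bytes with a colID -> FieldType map.
func insertRowToDatumsSketch(table *model.TableInfo, row []byte) (types.Datum, map[int64]types.Datum, error) {
	colsTypeMap := make(map[int64]*types.FieldType)
	for _, col := range table.Columns {
		colsTypeMap[col.ID] = &col.FieldType
	}

	remain, pk, err := codec.DecodeOne(row)
	if err != nil {
		return pk, nil, err
	}

	columnValues, err := tablecodec.DecodeRow(remain, colsTypeMap, time.Local)
	if err != nil {
		return pk, nil, err
	}
	if columnValues == nil {
		columnValues = make(map[int64]types.Datum)
	}
	return pk, columnValues, nil
}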
9 changes: 0 additions & 9 deletions drainer/translator/flash_util.go
@@ -87,15 +87,6 @@ func isHandleTypeColumn(colDef *ast.ColumnDef) bool {
tp == mysql.TypeLonglong
}

func toFlashColumnTypeMap(columns []*model.ColumnInfo) map[int64]*types.FieldType {
colTypeMap := make(map[int64]*types.FieldType)
for _, col := range columns {
colTypeMap[col.ID] = &col.FieldType
}

return colTypeMap
}

func makeRow(pk int64, values []interface{}, version uint64, delFlag uint8) []interface{} {
var row []interface{}
row = append(row, pk)