Skip to content

Commit

Permalink
mounter: support to output pre table info in ddl events (#799)
Browse files Browse the repository at this point in the history
* support to output pre table info in ddl events

* fix check

* Update cdc/entry/schema_storage.go

Co-authored-by: Zixiong Liu <[email protected]>

* Update cdc/model/schema_storage.go

Co-authored-by: Zixiong Liu <[email protected]>

* remove the branch about partition in pre table info function

Co-authored-by: Zixiong Liu <[email protected]>
Co-authored-by: ti-srebot <[email protected]>
  • Loading branch information
3 people authored Aug 3, 2020
1 parent 427cfd2 commit 9f55889
Show file tree
Hide file tree
Showing 18 changed files with 384 additions and 291 deletions.
12 changes: 8 additions & 4 deletions cdc/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func (c *changeFeed) dropSchema(schemaID model.SchemaID, targetTs model.Ts) {
delete(c.schemas, schemaID)
}

func (c *changeFeed) addTable(tblInfo *entry.TableInfo, targetTs model.Ts) {
func (c *changeFeed) addTable(tblInfo *model.TableInfo, targetTs model.Ts) {
if c.filter.ShouldIgnoreTable(tblInfo.TableName.Schema, tblInfo.TableName.Table) {
return
}
Expand Down Expand Up @@ -621,7 +621,12 @@ func (c *changeFeed) handleDDL(ctx context.Context, captures map[string]*model.C
zap.String("query", todoDDLJob.Query),
zap.Uint64("ts", todoDDLJob.BinlogInfo.FinishedTS))

err := c.schema.HandleDDL(todoDDLJob)
ddlEvent := new(model.DDLEvent)
preTableInfo, err := c.schema.PreTableInfo(todoDDLJob)
if err != nil {
return errors.Trace(err)
}
err = c.schema.HandleDDL(todoDDLJob)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -630,8 +635,7 @@ func (c *changeFeed) handleDDL(ctx context.Context, captures map[string]*model.C
return errors.Trace(err)
}

ddlEvent := new(model.DDLEvent)
ddlEvent.FromJob(todoDDLJob)
ddlEvent.FromJob(todoDDLJob, preTableInfo)

// Execute DDL Job asynchronously
c.ddlState = model.ChangeFeedExecDDL
Expand Down
8 changes: 5 additions & 3 deletions cdc/entry/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"fmt"
"time"

"github.com/pingcap/ticdc/cdc/model"

"github.com/pingcap/errors"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/kv"
Expand Down Expand Up @@ -182,7 +184,7 @@ func decodeMetaKey(ek []byte) (meta, error) {
}

// decodeRow decodes a byte slice into datums with a existing row map.
func decodeRow(b []byte, recordID int64, tableInfo *TableInfo, tz *time.Location) (map[int64]types.Datum, error) {
func decodeRow(b []byte, recordID int64, tableInfo *model.TableInfo, tz *time.Location) (map[int64]types.Datum, error) {
if len(b) == 0 {
if tableInfo.PKIsHandle {
id, pkValue, err := fetchHandleValue(tableInfo, recordID)
Expand All @@ -201,7 +203,7 @@ func decodeRow(b []byte, recordID int64, tableInfo *TableInfo, tz *time.Location

// decodeRowV1 decodes value data using old encoding format.
// Row layout: colID1, value1, colID2, value2, .....
func decodeRowV1(b []byte, recordID int64, tableInfo *TableInfo, tz *time.Location) (map[int64]types.Datum, error) {
func decodeRowV1(b []byte, recordID int64, tableInfo *model.TableInfo, tz *time.Location) (map[int64]types.Datum, error) {
row := make(map[int64]types.Datum)
if len(b) == 1 && b[0] == codec.NilFlag {
b = b[1:]
Expand Down Expand Up @@ -257,7 +259,7 @@ func decodeRowV1(b []byte, recordID int64, tableInfo *TableInfo, tz *time.Locati
// decodeRowV2 decodes value data using new encoding format.
// Ref: https://github.com/pingcap/tidb/pull/12634
// https://github.com/pingcap/tidb/blob/master/docs/design/2018-07-19-row-format.md
func decodeRowV2(data []byte, recordID int64, tableInfo *TableInfo, tz *time.Location) (map[int64]types.Datum, error) {
func decodeRowV2(data []byte, recordID int64, tableInfo *model.TableInfo, tz *time.Location) (map[int64]types.Datum, error) {
handleColID, reqCols := tableInfo.GetRowColInfos()
decoder := rowcodec.NewDatumMapDecoder(reqCols, []int64{handleColID}, tz)
return decoder.DecodeToDatumMap(data, kv.IntHandle(recordID), nil)
Expand Down
10 changes: 5 additions & 5 deletions cdc/entry/mounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ type indexKVEntry struct {
IndexValue []types.Datum
}

func (idx *indexKVEntry) unflatten(tableInfo *TableInfo, tz *time.Location) error {
func (idx *indexKVEntry) unflatten(tableInfo *model.TableInfo, tz *time.Location) error {
if tableInfo.ID != idx.PhysicalTableID {
isPartition := false
if pi := tableInfo.GetPartitionInfo(); pi != nil {
Expand Down Expand Up @@ -272,7 +272,7 @@ func (m *mounterImpl) unmarshalAndMountRowChanged(ctx context.Context, raw *mode
return row, err
}

func (m *mounterImpl) unmarshalRowKVEntry(tableInfo *TableInfo, restKey []byte, rawValue []byte, base baseKVEntry) (*rowKVEntry, error) {
func (m *mounterImpl) unmarshalRowKVEntry(tableInfo *model.TableInfo, restKey []byte, rawValue []byte, base baseKVEntry) (*rowKVEntry, error) {
key, recordID, err := decodeRecordID(restKey)
if err != nil {
return nil, errors.Trace(err)
Expand Down Expand Up @@ -353,7 +353,7 @@ func UnmarshalDDL(raw *model.RawKVEntry) (*timodel.Job, error) {
return job, nil
}

func (m *mounterImpl) mountRowKVEntry(tableInfo *TableInfo, row *rowKVEntry) (*model.RowChangedEvent, error) {
func (m *mounterImpl) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntry) (*model.RowChangedEvent, error) {
if row.Delete && !tableInfo.PKIsHandle {
return nil, nil
}
Expand Down Expand Up @@ -429,7 +429,7 @@ func (m *mounterImpl) mountRowKVEntry(tableInfo *TableInfo, row *rowKVEntry) (*m
return event, nil
}

func (m *mounterImpl) mountIndexKVEntry(tableInfo *TableInfo, idx *indexKVEntry) (*model.RowChangedEvent, error) {
func (m *mounterImpl) mountIndexKVEntry(tableInfo *model.TableInfo, idx *indexKVEntry) (*model.RowChangedEvent, error) {
// skip set index KV
if !idx.Delete {
return nil, nil
Expand Down Expand Up @@ -531,7 +531,7 @@ func getDefaultOrZeroValue(col *timodel.ColumnInfo) interface{} {
return d.GetValue()
}

func fetchHandleValue(tableInfo *TableInfo, recordID int64) (pkCoID int64, pkValue *types.Datum, err error) {
func fetchHandleValue(tableInfo *model.TableInfo, recordID int64) (pkCoID int64, pkValue *types.Datum, err error) {
handleColOffset := -1
for i, col := range tableInfo.Columns {
if mysql.HasPriKeyFlag(col.Flag) {
Expand Down
Loading

0 comments on commit 9f55889

Please sign in to comment.