From 62e246f7f82b9fbeeb4fdf0d094330fbefed7e1b Mon Sep 17 00:00:00 2001 From: 5kbpers Date: Mon, 3 Aug 2020 14:37:33 +0800 Subject: [PATCH] sink: support to output old value (#708) Signed-off-by: 5kbpers --- cdc/entry/mounter.go | 227 +++++++++++++++++++++------------ cdc/kv/client.go | 28 ++-- cdc/kv/client_test.go | 4 +- cdc/kv/matcher.go | 16 ++- cdc/kv/matcher_test.go | 2 +- cdc/kv/testing.go | 6 +- cdc/model/kv.go | 8 +- cdc/model/sink.go | 1 + cdc/owner_operator.go | 4 +- cdc/processor.go | 13 +- cdc/puller/puller.go | 41 +++--- cdc/sink/codec/canal.go | 15 ++- cdc/sink/codec/canal_test.go | 4 +- cdc/sink/codec/json.go | 11 +- cdc/sink/common/common.go | 125 ++++++++++++++++++ cdc/sink/common/common_test.go | 96 ++++++++++++++ cdc/sink/dispatcher/default.go | 9 +- cdc/sink/mysql.go | 124 +++++------------- cdc/sink/mysql_test.go | 76 +---------- pkg/config/config.go | 16 ++- pkg/regionspan/span.go | 15 ++- pkg/regionspan/span_test.go | 9 +- 22 files changed, 528 insertions(+), 322 deletions(-) create mode 100644 cdc/sink/common/common.go create mode 100644 cdc/sink/common/common_test.go diff --git a/cdc/entry/mounter.go b/cdc/entry/mounter.go index 2c163e09214..171ef3098b5 100644 --- a/cdc/entry/mounter.go +++ b/cdc/entry/mounter.go @@ -54,7 +54,8 @@ type baseKVEntry struct { type rowKVEntry struct { baseKVEntry - Row map[int64]types.Datum + Row map[int64]types.Datum + PreRow map[int64]types.Datum } type indexKVEntry struct { @@ -124,10 +125,11 @@ type mounterImpl struct { rawRowChangedChs []chan *model.PolymorphicEvent tz *time.Location workerNum int + enableOldValue bool } // NewMounter creates a mounter -func NewMounter(schemaStorage *SchemaStorage, workerNum int) Mounter { +func NewMounter(schemaStorage *SchemaStorage, workerNum int, enableOldValue bool) Mounter { if workerNum <= 0 { workerNum = defaultMounterWorkerNum } @@ -139,6 +141,7 @@ func NewMounter(schemaStorage *SchemaStorage, workerNum int) Mounter { schemaStorage: schemaStorage, rawRowChangedChs: chs, workerNum: workerNum, + enableOldValue: enableOldValue, } } @@ -245,7 +248,7 @@ func (m *mounterImpl) unmarshalAndMountRowChanged(ctx context.Context, raw *mode } switch { case bytes.HasPrefix(key, recordPrefix): - rowKV, err := m.unmarshalRowKVEntry(tableInfo, key, raw.Value, baseInfo) + rowKV, err := m.unmarshalRowKVEntry(tableInfo, key, raw.Value, raw.OldValue, baseInfo) if err != nil { return nil, errors.Trace(err) } @@ -254,7 +257,7 @@ func (m *mounterImpl) unmarshalAndMountRowChanged(ctx context.Context, raw *mode } return m.mountRowKVEntry(tableInfo, rowKV) case bytes.HasPrefix(key, indexPrefix): - indexKV, err := m.unmarshalIndexKVEntry(key, raw.Value, baseInfo) + indexKV, err := m.unmarshalIndexKVEntry(key, raw.Value, raw.OldValue, baseInfo) if err != nil { return nil, errors.Trace(err) } @@ -272,7 +275,7 @@ func (m *mounterImpl) unmarshalAndMountRowChanged(ctx context.Context, raw *mode return row, err } -func (m *mounterImpl) unmarshalRowKVEntry(tableInfo *model.TableInfo, restKey []byte, rawValue []byte, base baseKVEntry) (*rowKVEntry, error) { +func (m *mounterImpl) unmarshalRowKVEntry(tableInfo *model.TableInfo, restKey []byte, rawValue []byte, rawOldValue []byte, base baseKVEntry) (*rowKVEntry, error) { key, recordID, err := decodeRecordID(restKey) if err != nil { return nil, errors.Trace(err) @@ -284,16 +287,27 @@ func (m *mounterImpl) unmarshalRowKVEntry(tableInfo *model.TableInfo, restKey [] if err != nil { return nil, errors.Trace(err) } + var preRow map[int64]types.Datum + if rawOldValue != nil { + preRow, err = decodeRow(rawOldValue, recordID, tableInfo, m.tz) + if err != nil { + return nil, errors.Trace(err) + } + } base.RecordID = recordID return &rowKVEntry{ baseKVEntry: base, Row: row, + PreRow: preRow, }, nil } -func (m *mounterImpl) unmarshalIndexKVEntry(restKey []byte, rawValue []byte, base baseKVEntry) (*indexKVEntry, error) { - // skip set index KV - if !base.Delete { +func (m *mounterImpl) unmarshalIndexKVEntry(restKey []byte, rawValue []byte, rawOldValue []byte, base baseKVEntry) (*indexKVEntry, error) { + // Skip set index KV. + // By default we cannot get the old value of a deleted row, then we must get the value of unique key + // or primary key for seeking the deleted row through its index key. + // After the old value was enabled, we can skip the index key. + if !base.Delete || m.enableOldValue { return nil, nil } @@ -353,23 +367,22 @@ func UnmarshalDDL(raw *model.RawKVEntry) (*timodel.Job, error) { return job, nil } -func (m *mounterImpl) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntry) (*model.RowChangedEvent, error) { - if row.Delete && !tableInfo.PKIsHandle { - return nil, nil - } - - datumsNum := 1 - if !row.Delete { - datumsNum = len(tableInfo.Columns) +func datum2Column(tableInfo *model.TableInfo, datums map[int64]types.Datum, fillWithDefaultValue bool) (map[string]*model.Column, error) { + estimateLen := len(datums) + if fillWithDefaultValue { + estimateLen = len(tableInfo.Columns) } - - values := make(map[string]*model.Column, datumsNum) - for index, colValue := range row.Row { + cols := make(map[string]*model.Column, estimateLen) + for index, colValue := range datums { colInfo, exist := tableInfo.GetColumnInfo(index) if !exist { return nil, errors.NotFoundf("column info, colID: %d", index) } - if !row.Delete && !tableInfo.IsColWritable(colInfo) { + // the judge about `fillWithDefaultValue` is tricky + // if the `fillWithDefaultValue` is true, the event must be deletion + // we should output the generated column in deletion event + // this tricky code will be improve after pingcap/ticdc#787 merged + if !tableInfo.IsColWritable(colInfo) && fillWithDefaultValue { continue } colName := colInfo.Name.O @@ -386,52 +399,90 @@ func (m *mounterImpl) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntr whereHandle := true col.WhereHandle = &whereHandle } - values[colName] = col + cols[colName] = col + } + if !fillWithDefaultValue { + return cols, nil + } + for _, col := range tableInfo.Columns { + _, ok := cols[col.Name.O] + if !ok && tableInfo.IsColWritable(col) { + column := &model.Column{ + Type: col.Tp, + Value: getDefaultOrZeroValue(col), + Flag: transColumnFlag(col), + } + if tableInfo.IsColumnUnique(col.ID) { + whereHandle := true + column.WhereHandle = &whereHandle + } + cols[col.Name.O] = column + } + } + return cols, nil +} + +func (m *mounterImpl) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntry) (*model.RowChangedEvent, error) { + // if m.enableOldValue == true, go into this function + // if m.enableNewValue == false and row.Delete == false, go into this function + // if m.enableNewValue == false and row.Delete == true and tableInfo.PKIsHandle = true, go into this function + // only if m.enableNewValue == false and row.Delete == true and tableInfo.PKIsHandle == false, skip this function + if !m.enableOldValue && row.Delete && !tableInfo.PKIsHandle { + return nil, nil + } + + var err error + // Decode previous columns. + var preCols map[string]*model.Column + if len(row.PreRow) != 0 { + // FIXME(leoppro): using pre table info to mounter pre column datum + // the pre column and current column in one event may using different table info + preCols, err = datum2Column(tableInfo, row.PreRow, true) + if err != nil { + return nil, errors.Trace(err) + } + } + + var cols map[string]*model.Column + oldValueDisabledAndRowIsDelete := !m.enableOldValue && row.Delete + cols, err = datum2Column(tableInfo, row.Row, !oldValueDisabledAndRowIsDelete) + if err != nil { + return nil, errors.Trace(err) } + if oldValueDisabledAndRowIsDelete { + preCols = cols + cols = nil + } + var partitionID int64 if tableInfo.GetPartitionInfo() != nil { partitionID = row.PhysicalTableID } - event := &model.RowChangedEvent{ + schemaName := tableInfo.TableName.Schema + tableName := tableInfo.TableName.Table + return &model.RowChangedEvent{ StartTs: row.StartTs, CommitTs: row.CRTs, RowID: row.RecordID, TableInfoVersion: tableInfo.TableInfoVersion, Table: &model.TableName{ - Schema: tableInfo.TableName.Schema, - Table: tableInfo.TableName.Table, + Schema: schemaName, + Table: tableName, Partition: partitionID, }, IndieMarkCol: tableInfo.IndieMarkCol, - } - - if !row.Delete { - for _, col := range tableInfo.Columns { - _, ok := values[col.Name.O] - if !ok && tableInfo.IsColWritable(col) { - column := &model.Column{ - Type: col.Tp, - Value: getDefaultOrZeroValue(col), - Flag: transColumnFlag(col), - } - if tableInfo.IsColumnUnique(col.ID) { - whereHandle := true - column.WhereHandle = &whereHandle - } - values[col.Name.O] = column - } - } - } - event.Delete = row.Delete - event.Columns = values - event.Keys = genMultipleKeys(tableInfo.TableInfo, values, quotes.QuoteSchema(event.Table.Schema, event.Table.Table)) - return event, nil + Delete: row.Delete, + Columns: cols, + PreColumns: preCols, + // FIXME(leoppor): Correctness of conflict detection with old values + Keys: genMultipleKeys(tableInfo.TableInfo, preCols, cols, quotes.QuoteSchema(schemaName, tableName)), + }, nil } func (m *mounterImpl) mountIndexKVEntry(tableInfo *model.TableInfo, idx *indexKVEntry) (*model.RowChangedEvent, error) { // skip set index KV - if !idx.Delete { + if !idx.Delete || m.enableOldValue { return nil, nil } @@ -450,14 +501,14 @@ func (m *mounterImpl) mountIndexKVEntry(tableInfo *model.TableInfo, idx *indexKV return nil, errors.Trace(err) } - values := make(map[string]*model.Column, len(idx.IndexValue)) + preCols := make(map[string]*model.Column, len(idx.IndexValue)) for i, idxCol := range indexInfo.Columns { value, err := formatColVal(idx.IndexValue[i], tableInfo.Columns[idxCol.Offset].Tp) if err != nil { return nil, errors.Trace(err) } whereHandle := true - values[idxCol.Name.O] = &model.Column{ + preCols[idxCol.Name.O] = &model.Column{ Type: tableInfo.Columns[idxCol.Offset].Tp, WhereHandle: &whereHandle, Value: value, @@ -474,8 +525,8 @@ func (m *mounterImpl) mountIndexKVEntry(tableInfo *model.TableInfo, idx *indexKV }, IndieMarkCol: tableInfo.IndieMarkCol, Delete: true, - Columns: values, - Keys: genMultipleKeys(tableInfo.TableInfo, values, quotes.QuoteSchema(tableInfo.TableName.Schema, tableInfo.TableName.Table)), + PreColumns: preCols, + Keys: genMultipleKeys(tableInfo.TableInfo, preCols, nil, quotes.QuoteSchema(tableInfo.TableName.Schema, tableInfo.TableName.Table)), }, nil } @@ -553,43 +604,55 @@ func fetchHandleValue(tableInfo *model.TableInfo, recordID int64) (pkCoID int64, return } -func genMultipleKeys(ti *timodel.TableInfo, values map[string]*model.Column, table string) []string { - multipleKeys := make([]string, 0, len(ti.Indices)+1) - if ti.PKIsHandle { - if pk := ti.GetPkColInfo(); pk != nil && !pk.IsGenerated() { - cols := []*timodel.ColumnInfo{pk} - key := genKeyList(table, cols, values) - if len(key) > 0 { // ignore `null` value. - multipleKeys = append(multipleKeys, key) - } else { - log.L().Debug("ignore empty primary key", zap.String("table", table)) - } - } +func genMultipleKeys(ti *timodel.TableInfo, preCols, cols map[string]*model.Column, table string) []string { + estimateLen := len(ti.Indices) + 1 + if len(preCols) != 0 && len(cols) != 0 { + estimateLen *= 2 } - - for _, indexCols := range ti.Indices { - if !indexCols.Unique { - continue + multipleKeys := make([]string, 0, estimateLen) + buildKeys := func(colValues map[string]*model.Column) { + if len(colValues) == 0 { + return } - cols := getIndexColumns(ti.Columns, indexCols) - key := genKeyList(table, cols, values) - if len(key) > 0 { // ignore `null` value. - noGeneratedColumn := true - for _, col := range cols { - if col.IsGenerated() { - noGeneratedColumn = false - break + if ti.PKIsHandle { + if pk := ti.GetPkColInfo(); pk != nil && !pk.IsGenerated() { + cols := []*timodel.ColumnInfo{pk} + + key := genKeyList(table, cols, colValues) + if len(key) > 0 { // ignore `null` value. + multipleKeys = append(multipleKeys, key) + } else { + log.L().Debug("ignore empty primary key", zap.String("table", table)) } } - // If the index contain generated column, we can't use this key to detect conflict with other DML, - // Because such as insert can't specified the generated value. - if noGeneratedColumn { - multipleKeys = append(multipleKeys, key) + } + + for _, indexCols := range ti.Indices { + if !indexCols.Unique { + continue + } + cols := getIndexColumns(ti.Columns, indexCols) + key := genKeyList(table, cols, colValues) + if len(key) > 0 { // ignore `null` value. + noGeneratedColumn := true + for _, col := range cols { + if col.IsGenerated() { + noGeneratedColumn = false + break + } + } + // If the index contain generated column, we can't use this key to detect conflict with other DML, + // Because such as insert can't specified the generated value. + if noGeneratedColumn { + multipleKeys = append(multipleKeys, key) + } + } else { + log.L().Debug("ignore empty index key", zap.String("table", table)) } - } else { - log.L().Debug("ignore empty index key", zap.String("table", table)) } } + buildKeys(preCols) + buildKeys(cols) if len(multipleKeys) == 0 { // use table name as key if no key generated (no PK/UK), diff --git a/cdc/kv/client.go b/cdc/kv/client.go index 1adce972240..2eba4222fe1 100644 --- a/cdc/kv/client.go +++ b/cdc/kv/client.go @@ -330,9 +330,9 @@ func (c *CDCClient) newStream(ctx context.Context, addr string, storeID uint64) // provided channel. // The `Start` and `End` field in input span must be memcomparable encoded. func (c *CDCClient) EventFeed( - ctx context.Context, span regionspan.ComparableSpan, ts uint64, eventCh chan<- *model.RegionFeedEvent, + ctx context.Context, span regionspan.ComparableSpan, ts uint64, enableOldValue bool, eventCh chan<- *model.RegionFeedEvent, ) error { - s := newEventFeedSession(c, c.regionCache, c.kvStorage, span, eventCh) + s := newEventFeedSession(c, c.regionCache, c.kvStorage, span, enableOldValue, eventCh) return s.eventFeed(ctx, ts) } @@ -360,7 +360,8 @@ type eventFeedSession struct { // The channel to schedule scanning and requesting regions in a specified range. requestRangeCh chan rangeRequestTask - rangeLock *regionspan.RegionRangeLock + rangeLock *regionspan.RegionRangeLock + enableOldValue bool // To identify metrics of different eventFeedSession id string @@ -379,6 +380,7 @@ func newEventFeedSession( regionCache *tikv.RegionCache, kvStorage tikv.Storage, totalSpan regionspan.ComparableSpan, + enableOldValue bool, eventCh chan<- *model.RegionFeedEvent, ) *eventFeedSession { id := strconv.FormatUint(allocID(), 10) @@ -392,6 +394,7 @@ func newEventFeedSession( errCh: make(chan regionErrorInfo, 16), requestRangeCh: make(chan rangeRequestTask, 16), rangeLock: regionspan.NewRegionRangeLock(), + enableOldValue: enableOldValue, id: strconv.FormatUint(allocID(), 10), regionChSizeGauge: clientChannelSize.WithLabelValues(id, "region"), errChSizeGauge: clientChannelSize.WithLabelValues(id, "err"), @@ -604,6 +607,11 @@ MainLoop: requestID := allocID() + extraOp := kvrpcpb.ExtraOp_Noop + if s.enableOldValue { + extraOp = kvrpcpb.ExtraOp_ReadOldValue + } + req := &cdcpb.ChangeDataRequest{ Header: &cdcpb.Header{ ClusterId: s.client.clusterID, @@ -614,6 +622,7 @@ MainLoop: CheckpointTs: sri.ts, StartKey: sri.span.Start, EndKey: sri.span.End, + ExtraOp: extraOp, } // The receiver thread need to know the span, which is only known in the sender thread. So create the @@ -1363,7 +1372,7 @@ func (s *eventFeedSession) resolveLock(ctx context.Context, regionID uint64, max return nil } -func assembleCommitEvent(entry *cdcpb.Event_Row, value []byte) (*model.RegionFeedEvent, error) { +func assembleCommitEvent(entry *cdcpb.Event_Row, value *pendingValue) (*model.RegionFeedEvent, error) { var opType model.OpType switch entry.GetOpType() { case cdcpb.Event_Row_DELETE: @@ -1376,11 +1385,12 @@ func assembleCommitEvent(entry *cdcpb.Event_Row, value []byte) (*model.RegionFee revent := &model.RegionFeedEvent{ Val: &model.RawKVEntry{ - OpType: opType, - Key: entry.Key, - Value: value, - StartTs: entry.StartTs, - CRTs: entry.CommitTs, + OpType: opType, + Key: entry.Key, + Value: value.value, + OldValue: value.oldValue, + StartTs: entry.StartTs, + CRTs: entry.CommitTs, }, } return revent, nil diff --git a/cdc/kv/client_test.go b/cdc/kv/client_test.go index 10ce3f8f493..d90dd92a15a 100644 --- a/cdc/kv/client_test.go +++ b/cdc/kv/client_test.go @@ -151,7 +151,7 @@ func (s *etcdSuite) TestConnectOfflineTiKV(c *check.C) { eventCh := make(chan *model.RegionFeedEvent, 10) wg.Add(1) go func() { - err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 1, eventCh) + err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 1, false, eventCh) c.Assert(errors.Cause(err), check.Equals, context.Canceled) wg.Done() }() @@ -212,7 +212,7 @@ func (s *etcdSuite) TodoTestIncompatibleTiKV(c *check.C) { ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) defer cancel() eventCh := make(chan *model.RegionFeedEvent, 10) - err = cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 1, eventCh) + err = cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 1, false, eventCh) _ = err // TODO find a way to verify the error } diff --git a/cdc/kv/matcher.go b/cdc/kv/matcher.go index 6ecd72cc02a..1d60157f084 100644 --- a/cdc/kv/matcher.go +++ b/cdc/kv/matcher.go @@ -17,9 +17,14 @@ import ( "github.com/pingcap/kvproto/pkg/cdcpb" ) +type pendingValue struct { + value []byte + oldValue []byte +} + type matcher struct { // TODO : clear the single prewrite - unmatchedValue map[matchKey][]byte + unmatchedValue map[matchKey]*pendingValue cachedCommit []*cdcpb.Event_Row } @@ -34,7 +39,7 @@ func newMatchKey(row *cdcpb.Event_Row) matchKey { func newMatcher() *matcher { return &matcher{ - unmatchedValue: make(map[matchKey][]byte), + unmatchedValue: make(map[matchKey]*pendingValue), } } @@ -46,10 +51,13 @@ func (m *matcher) putPrewriteRow(row *cdcpb.Event_Row) { if _, exist := m.unmatchedValue[key]; exist && len(value) == 0 { return } - m.unmatchedValue[key] = value + m.unmatchedValue[key] = &pendingValue{ + value: value, + oldValue: row.GetOldValue(), + } } -func (m *matcher) matchRow(row *cdcpb.Event_Row) ([]byte, bool) { +func (m *matcher) matchRow(row *cdcpb.Event_Row) (*pendingValue, bool) { if value, exist := m.unmatchedValue[newMatchKey(row)]; exist { delete(m.unmatchedValue, newMatchKey(row)) return value, true diff --git a/cdc/kv/matcher_test.go b/cdc/kv/matcher_test.go index 3d06f094801..c0db0136a18 100644 --- a/cdc/kv/matcher_test.go +++ b/cdc/kv/matcher_test.go @@ -50,5 +50,5 @@ func (s *MatcherSuite) TestMatcher(c *check.C) { } value2, ok := matcher.matchRow(commitRow2) c.Assert(ok, check.IsTrue) - c.Assert(value2, check.BytesEquals, []byte("v2")) + c.Assert(value2.value, check.BytesEquals, []byte("v2")) } diff --git a/cdc/kv/testing.go b/cdc/kv/testing.go index 2d7c55b9b7c..12faf04a546 100644 --- a/cdc/kv/testing.go +++ b/cdc/kv/testing.go @@ -149,7 +149,7 @@ func TestSplit(t require.TestingT, pdCli pd.Client, storage kv.Storage) { startTS := mustGetTimestamp(t, storage) go func() { - err := cli.EventFeed(ctx, regionspan.ComparableSpan{Start: nil, End: nil}, startTS, eventCh) + err := cli.EventFeed(ctx, regionspan.ComparableSpan{Start: nil, End: nil}, startTS, false, eventCh) require.Equal(t, err, context.Canceled) }() @@ -236,7 +236,7 @@ func TestGetKVSimple(t require.TestingT, pdCli pd.Client, storage kv.Storage) { startTS := mustGetTimestamp(t, storage) go func() { - err := cli.EventFeed(ctx, regionspan.ComparableSpan{Start: nil, End: nil}, startTS, checker.eventCh) + err := cli.EventFeed(ctx, regionspan.ComparableSpan{Start: nil, End: nil}, startTS, false, checker.eventCh) require.Equal(t, err, context.Canceled) }() @@ -258,7 +258,7 @@ func TestGetKVSimple(t require.TestingT, pdCli pd.Client, storage kv.Storage) { if i == 1 { checker = newEventChecker(t) go func() { - err := cli.EventFeed(ctx, regionspan.ComparableSpan{Start: nil, End: nil}, startTS, checker.eventCh) + err := cli.EventFeed(ctx, regionspan.ComparableSpan{Start: nil, End: nil}, startTS, false, checker.eventCh) require.Equal(t, err, context.Canceled) }() } diff --git a/cdc/model/kv.go b/cdc/model/kv.go index 8efbb2f4d18..e2fd3af965b 100644 --- a/cdc/model/kv.go +++ b/cdc/model/kv.go @@ -59,9 +59,11 @@ type ResolvedSpan struct { type RawKVEntry struct { OpType OpType Key []byte - // Nil fro delete type - Value []byte - StartTs uint64 + // nil for delete type + Value []byte + // nil for insert type + OldValue []byte + StartTs uint64 // Commit or resolved TS CRTs uint64 } diff --git a/cdc/model/sink.go b/cdc/model/sink.go index 66f9f45be43..3e18fe7fc32 100644 --- a/cdc/model/sink.go +++ b/cdc/model/sink.go @@ -103,6 +103,7 @@ type RowChangedEvent struct { // IndieMarkCol will be set to the name of the unique index IndieMarkCol string `json:"indie-mark-col"` Columns map[string]*Column `json:"columns"` + PreColumns map[string]*Column `json:"pre-columns"` Keys []string `json:"keys"` } diff --git a/cdc/owner_operator.go b/cdc/owner_operator.go index d189a113b03..b9e2c9ba71e 100644 --- a/cdc/owner_operator.go +++ b/cdc/owner_operator.go @@ -43,9 +43,7 @@ type ddlHandler struct { } func newDDLHandler(pdCli pd.Client, credential *security.Credential, kvStorage tidbkv.Storage, checkpointTS uint64) *ddlHandler { - // The key in DDL kv pair returned from TiKV is already memcompariable encoded, - // so we set `needEncode` to false. - plr := puller.NewPuller(pdCli, credential, kvStorage, checkpointTS, []regionspan.Span{regionspan.GetDDLSpan(), regionspan.GetAddIndexDDLSpan()}, nil) + plr := puller.NewPuller(pdCli, credential, kvStorage, checkpointTS, []regionspan.Span{regionspan.GetDDLSpan(), regionspan.GetAddIndexDDLSpan()}, nil, false) ctx, cancel := context.WithCancel(context.Background()) h := &ddlHandler{ puller: plr, diff --git a/cdc/processor.go b/cdc/processor.go index cb8595421ed..5ebc08a6636 100644 --- a/cdc/processor.go +++ b/cdc/processor.go @@ -166,11 +166,9 @@ func newProcessor( limitter := puller.NewBlurResourceLimmter(defaultMemBufferCapacity) - // The key in DDL kv pair returned from TiKV is already memcompariable encoded, - // so we set `needEncode` to false. log.Info("start processor with startts", zap.Uint64("startts", checkpointTs)) ddlspans := []regionspan.Span{regionspan.GetDDLSpan(), regionspan.GetAddIndexDDLSpan()} - ddlPuller := puller.NewPuller(pdCli, credential, kvStorage, checkpointTs, ddlspans, limitter) + ddlPuller := puller.NewPuller(pdCli, credential, kvStorage, checkpointTs, ddlspans, limitter, false) filter, err := filter.NewFilter(changefeed.Config) if err != nil { return nil, errors.Trace(err) @@ -195,7 +193,7 @@ func newProcessor( session: session, sink: sink, ddlPuller: ddlPuller, - mounter: entry.NewMounter(schemaStorage, changefeed.Config.Mounter.WorkerNum), + mounter: entry.NewMounter(schemaStorage, changefeed.Config.Mounter.WorkerNum, changefeed.Config.EnableOldValue), schemaStorage: schemaStorage, errCh: errCh, @@ -808,10 +806,9 @@ func (p *processor) addTable(ctx context.Context, tableID int64, replicaInfo *mo startPuller := func(tableID model.TableID, pResolvedTs *uint64) { // start table puller - // The key in DML kv pair returned from TiKV is not memcompariable encoded, - // so we set `needEncode` to true. - span := regionspan.GetTableSpan(tableID) - plr := puller.NewPuller(p.pdCli, p.credential, p.kvStorage, replicaInfo.StartTs, []regionspan.Span{span}, p.limitter) + enableOldValue := p.changefeed.Config.EnableOldValue + span := regionspan.GetTableSpan(tableID, enableOldValue) + plr := puller.NewPuller(p.pdCli, p.credential, p.kvStorage, replicaInfo.StartTs, []regionspan.Span{span}, p.limitter, enableOldValue) go func() { err := plr.Run(ctx) if errors.Cause(err) != context.Canceled { diff --git a/cdc/puller/puller.go b/cdc/puller/puller.go index b3f9ec45d1d..891d8203270 100644 --- a/cdc/puller/puller.go +++ b/cdc/puller/puller.go @@ -48,15 +48,16 @@ type Puller interface { } type pullerImpl struct { - pdCli pd.Client - credential *security.Credential - kvStorage tikv.Storage - checkpointTs uint64 - spans []regionspan.ComparableSpan - buffer *memBuffer - outputCh chan *model.RawKVEntry - tsTracker frontier.Frontier - resolvedTs uint64 + pdCli pd.Client + credential *security.Credential + kvStorage tikv.Storage + checkpointTs uint64 + spans []regionspan.ComparableSpan + buffer *memBuffer + outputCh chan *model.RawKVEntry + tsTracker frontier.Frontier + resolvedTs uint64 + enableOldValue bool } // NewPuller create a new Puller fetch event start from checkpointTs @@ -68,6 +69,7 @@ func NewPuller( checkpointTs uint64, spans []regionspan.Span, limitter *BlurResourceLimitter, + enableOldValue bool, ) Puller { tikvStorage, ok := kvStorage.(tikv.Storage) if !ok { @@ -78,15 +80,16 @@ func NewPuller( comparableSpans[i] = regionspan.ToComparableSpan(spans[i]) } p := &pullerImpl{ - pdCli: pdCli, - credential: credential, - kvStorage: tikvStorage, - checkpointTs: checkpointTs, - spans: comparableSpans, - buffer: makeMemBuffer(limitter), - outputCh: make(chan *model.RawKVEntry, defaultPullerOutputChanSize), - tsTracker: frontier.NewFrontier(checkpointTs, comparableSpans...), - resolvedTs: checkpointTs, + pdCli: pdCli, + credential: credential, + kvStorage: tikvStorage, + checkpointTs: checkpointTs, + spans: comparableSpans, + buffer: makeMemBuffer(limitter), + outputCh: make(chan *model.RawKVEntry, defaultPullerOutputChanSize), + tsTracker: frontier.NewFrontier(checkpointTs, comparableSpans...), + resolvedTs: checkpointTs, + enableOldValue: enableOldValue, } return p } @@ -113,7 +116,7 @@ func (p *pullerImpl) Run(ctx context.Context) error { span := span g.Go(func() error { - return cli.EventFeed(ctx, span, checkpointTs, eventCh) + return cli.EventFeed(ctx, span, checkpointTs, p.enableOldValue, eventCh) }) } diff --git a/cdc/sink/codec/canal.go b/cdc/sink/codec/canal.go index 23b8c4fdc15..9da68ae9165 100644 --- a/cdc/sink/codec/canal.go +++ b/cdc/sink/codec/canal.go @@ -219,13 +219,18 @@ func (b *canalEntryBuilder) buildRowData(e *model.RowChangedEvent) (*canal.RowDa } columns = append(columns, c) } + var preColumns []*canal.Column + for name, column := range e.PreColumns { + c, err := b.buildColumn(column, name, !e.Delete) + if err != nil { + return nil, errors.Trace(err) + } + preColumns = append(preColumns, c) + } rowData := &canal.RowData{} - if e.Delete { - rowData.BeforeColumns = columns - } else { - rowData.AfterColumns = columns - } + rowData.BeforeColumns = preColumns + rowData.AfterColumns = columns return rowData, nil } diff --git a/cdc/sink/codec/canal_test.go b/cdc/sink/codec/canal_test.go index 0846a8329d2..645c757d4bd 100644 --- a/cdc/sink/codec/canal_test.go +++ b/cdc/sink/codec/canal_test.go @@ -152,7 +152,7 @@ func (s *canalEntrySuite) TestConvertEntry(c *check.C) { Table: "person", }, Delete: true, - Columns: map[string]*model.Column{ + PreColumns: map[string]*model.Column{ "id": {Type: mysql.TypeLong, WhereHandle: &trueVar, Value: 1}, }, } @@ -241,7 +241,7 @@ func (s *canalEntrySuite) TestConvertEntry(c *check.C) { rowDatas = rc.GetRowDatas() c.Assert(len(rowDatas), check.Equals, 1) columns = rowDatas[0].BeforeColumns - c.Assert(len(columns), check.Equals, len(testCaseDelete.Columns)) + c.Assert(len(columns), check.Equals, len(testCaseDelete.PreColumns)) for _, col := range columns { c.Assert(col.GetUpdated(), check.IsFalse) switch col.GetName() { diff --git a/cdc/sink/codec/json.go b/cdc/sink/codec/json.go index 7cfbc8a5bfb..aa80f611c95 100644 --- a/cdc/sink/codec/json.go +++ b/cdc/sink/codec/json.go @@ -74,8 +74,9 @@ func (m *mqMessageKey) Decode(data []byte) error { } type mqMessageRow struct { - Update map[string]*column `json:"u,omitempty"` - Delete map[string]*column `json:"d,omitempty"` + Update map[string]*column `json:"u,omitempty"` + PreColumns map[string]*column `json:"p,omitempty"` + Delete map[string]*column `json:"d,omitempty"` } func (m *mqMessageRow) Encode() ([]byte, error) { @@ -132,9 +133,10 @@ func rowEventToMqMessage(e *model.RowChangedEvent) (*mqMessageKey, *mqMessageRow } value := &mqMessageRow{} if e.Delete { - value.Delete = e.Columns + value.Delete = e.PreColumns } else { value.Update = e.Columns + value.PreColumns = e.PreColumns } return key, value } @@ -154,10 +156,11 @@ func mqMessageToRowEvent(key *mqMessageKey, value *mqMessageRow) *model.RowChang if len(value.Delete) != 0 { e.Delete = true - e.Columns = value.Delete + e.PreColumns = value.Delete } else { e.Delete = false e.Columns = value.Update + e.PreColumns = value.PreColumns } return e } diff --git a/cdc/sink/common/common.go b/cdc/sink/common/common.go new file mode 100644 index 00000000000..4e87c298cb8 --- /dev/null +++ b/cdc/sink/common/common.go @@ -0,0 +1,125 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package common + +import ( + "sort" + "sync" + "sync/atomic" + + "github.com/pingcap/log" + "github.com/pingcap/ticdc/cdc/model" + "github.com/pingcap/ticdc/pkg/filter" + "go.uber.org/zap" +) + +// UnresolvedTxnCache caches unresolved txns +type UnresolvedTxnCache struct { + unresolvedTxnsMu sync.Mutex + unresolvedTxns map[model.TableName][]*model.Txn + checkpointTs uint64 +} + +// NewUnresolvedTxnCache returns a new UnresolvedTxnCache +func NewUnresolvedTxnCache() *UnresolvedTxnCache { + return &UnresolvedTxnCache{ + unresolvedTxns: make(map[model.TableName][]*model.Txn), + } +} + +// Append adds unresolved rows to cache +func (c *UnresolvedTxnCache) Append(filter *filter.Filter, rows ...*model.RowChangedEvent) { + c.unresolvedTxnsMu.Lock() + defer c.unresolvedTxnsMu.Unlock() + for _, row := range rows { + if filter.ShouldIgnoreDMLEvent(row.StartTs, row.Table.Schema, row.Table.Table) { + log.Info("Row changed event ignored", zap.Uint64("start-ts", row.StartTs)) + continue + } + key := *row.Table + txns := c.unresolvedTxns[key] + if len(txns) == 0 || txns[len(txns)-1].StartTs != row.StartTs { + // fail-fast check + if len(txns) != 0 && txns[len(txns)-1].CommitTs > row.CommitTs { + log.Fatal("the commitTs of the emit row is less than the received row", + zap.Stringer("table", row.Table), + zap.Uint64("emit row startTs", row.StartTs), + zap.Uint64("emit row commitTs", row.CommitTs), + zap.Uint64("last received row startTs", txns[len(txns)-1].StartTs), + zap.Uint64("last received row commitTs", txns[len(txns)-1].CommitTs)) + } + txns = append(txns, &model.Txn{ + StartTs: row.StartTs, + CommitTs: row.CommitTs, + }) + c.unresolvedTxns[key] = txns + } + txns[len(txns)-1].Append(row) + } +} + +// Resolved returns resolved txns according to resolvedTs +func (c *UnresolvedTxnCache) Resolved(resolvedTs uint64) map[model.TableName][]*model.Txn { + if resolvedTs <= atomic.LoadUint64(&c.checkpointTs) { + return nil + } + + c.unresolvedTxnsMu.Lock() + defer c.unresolvedTxnsMu.Unlock() + if len(c.unresolvedTxns) == 0 { + return nil + } + + _, resolvedTxnsMap := splitResolvedTxn(resolvedTs, c.unresolvedTxns) + return resolvedTxnsMap +} + +// Unresolved returns unresolved txns +func (c *UnresolvedTxnCache) Unresolved() map[model.TableName][]*model.Txn { + return c.unresolvedTxns +} + +// UpdateCheckpoint updates the checkpoint ts +func (c *UnresolvedTxnCache) UpdateCheckpoint(checkpointTs uint64) { + atomic.StoreUint64(&c.checkpointTs, checkpointTs) +} + +func splitResolvedTxn( + resolvedTs uint64, unresolvedTxns map[model.TableName][]*model.Txn, +) (minTs uint64, resolvedRowsMap map[model.TableName][]*model.Txn) { + resolvedRowsMap = make(map[model.TableName][]*model.Txn, len(unresolvedTxns)) + minTs = resolvedTs + for key, txns := range unresolvedTxns { + i := sort.Search(len(txns), func(i int) bool { + return txns[i].CommitTs > resolvedTs + }) + if i == 0 { + continue + } + var resolvedTxns []*model.Txn + if i == len(txns) { + resolvedTxns = txns + delete(unresolvedTxns, key) + } else { + resolvedTxns = txns[:i] + unresolvedTxns[key] = txns[i:] + } + resolvedRowsMap[key] = resolvedTxns + + if len(resolvedTxns) > 0 && resolvedTxns[0].CommitTs < minTs { + minTs = resolvedTxns[0].CommitTs + } + } + return +} diff --git a/cdc/sink/common/common_test.go b/cdc/sink/common/common_test.go new file mode 100644 index 00000000000..26352d0b005 --- /dev/null +++ b/cdc/sink/common/common_test.go @@ -0,0 +1,96 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package common + +import ( + "testing" + + "github.com/pingcap/check" + "github.com/pingcap/ticdc/cdc/model" +) + +type SinkCommonSuite struct{} + +func Test(t *testing.T) { check.TestingT(t) } + +var _ = check.Suite(&SinkCommonSuite{}) + +func (s SinkCommonSuite) TestSplitResolvedTxn(c *check.C) { + testCases := []struct { + unresolvedTxns map[model.TableName][]*model.Txn + resolvedTs uint64 + expectedResolvedTxns map[model.TableName][]*model.Txn + expectedUnresolvedTxns map[model.TableName][]*model.Txn + expectedMinTs uint64 + }{{ + unresolvedTxns: map[model.TableName][]*model.Txn{ + {Table: "t1"}: {{CommitTs: 11}, {CommitTs: 21}, {CommitTs: 21}, {CommitTs: 23}, {CommitTs: 33}, {CommitTs: 34}}, + {Table: "t2"}: {{CommitTs: 23}, {CommitTs: 24}, {CommitTs: 26}, {CommitTs: 26}, {CommitTs: 26}, {CommitTs: 29}}, + }, + resolvedTs: 5, + expectedResolvedTxns: map[model.TableName][]*model.Txn{}, + expectedUnresolvedTxns: map[model.TableName][]*model.Txn{ + {Table: "t1"}: {{CommitTs: 11}, {CommitTs: 21}, {CommitTs: 21}, {CommitTs: 23}, {CommitTs: 33}, {CommitTs: 34}}, + {Table: "t2"}: {{CommitTs: 23}, {CommitTs: 24}, {CommitTs: 26}, {CommitTs: 26}, {CommitTs: 26}, {CommitTs: 29}}, + }, + expectedMinTs: 5, + }, { + unresolvedTxns: map[model.TableName][]*model.Txn{ + {Table: "t1"}: {{CommitTs: 11}, {CommitTs: 21}, {CommitTs: 21}, {CommitTs: 23}, {CommitTs: 33}, {CommitTs: 34}}, + {Table: "t2"}: {{CommitTs: 23}, {CommitTs: 24}, {CommitTs: 26}, {CommitTs: 26}, {CommitTs: 26}, {CommitTs: 29}}, + }, + resolvedTs: 23, + expectedResolvedTxns: map[model.TableName][]*model.Txn{ + {Table: "t1"}: {{CommitTs: 11}, {CommitTs: 21}, {CommitTs: 21}, {CommitTs: 23}}, + {Table: "t2"}: {{CommitTs: 23}}, + }, + expectedUnresolvedTxns: map[model.TableName][]*model.Txn{ + {Table: "t1"}: {{CommitTs: 33}, {CommitTs: 34}}, + {Table: "t2"}: {{CommitTs: 24}, {CommitTs: 26}, {CommitTs: 26}, {CommitTs: 26}, {CommitTs: 29}}, + }, + expectedMinTs: 11, + }, { + unresolvedTxns: map[model.TableName][]*model.Txn{ + {Table: "t1"}: {{CommitTs: 11}, {CommitTs: 21}, {CommitTs: 21}, {CommitTs: 23}, {CommitTs: 33}, {CommitTs: 34}}, + {Table: "t2"}: {{CommitTs: 23}, {CommitTs: 24}, {CommitTs: 26}, {CommitTs: 26}, {CommitTs: 26}, {CommitTs: 29}}, + }, + resolvedTs: 30, + expectedResolvedTxns: map[model.TableName][]*model.Txn{ + {Table: "t1"}: {{CommitTs: 11}, {CommitTs: 21}, {CommitTs: 21}, {CommitTs: 23}}, + {Table: "t2"}: {{CommitTs: 23}, {CommitTs: 24}, {CommitTs: 26}, {CommitTs: 26}, {CommitTs: 26}, {CommitTs: 29}}, + }, + expectedUnresolvedTxns: map[model.TableName][]*model.Txn{ + {Table: "t1"}: {{CommitTs: 33}, {CommitTs: 34}}, + }, + expectedMinTs: 11, + }, { + unresolvedTxns: map[model.TableName][]*model.Txn{ + {Table: "t1"}: {{CommitTs: 11}, {CommitTs: 21}, {CommitTs: 21}, {CommitTs: 23}, {CommitTs: 33}, {CommitTs: 34}}, + {Table: "t2"}: {{CommitTs: 23}, {CommitTs: 24}, {CommitTs: 26}, {CommitTs: 26}, {CommitTs: 26}, {CommitTs: 29}}, + }, + resolvedTs: 40, + expectedResolvedTxns: map[model.TableName][]*model.Txn{ + {Table: "t1"}: {{CommitTs: 11}, {CommitTs: 21}, {CommitTs: 21}, {CommitTs: 23}, {CommitTs: 33}, {CommitTs: 34}}, + {Table: "t2"}: {{CommitTs: 23}, {CommitTs: 24}, {CommitTs: 26}, {CommitTs: 26}, {CommitTs: 26}, {CommitTs: 29}}, + }, + expectedUnresolvedTxns: map[model.TableName][]*model.Txn{}, + expectedMinTs: 11, + }} + for _, tc := range testCases { + minTs, resolvedTxns := splitResolvedTxn(tc.resolvedTs, tc.unresolvedTxns) + c.Assert(minTs, check.Equals, tc.expectedMinTs) + c.Assert(resolvedTxns, check.DeepEquals, tc.expectedResolvedTxns) + c.Assert(tc.unresolvedTxns, check.DeepEquals, tc.expectedUnresolvedTxns) + } +} diff --git a/cdc/sink/dispatcher/default.go b/cdc/sink/dispatcher/default.go index 0c49ec24ec7..229c4460b34 100644 --- a/cdc/sink/dispatcher/default.go +++ b/cdc/sink/dispatcher/default.go @@ -40,8 +40,15 @@ func (d *defaultDispatcher) Dispatch(row *model.RowChangedEvent) int32 { } return int32(hash.Sum32() % uint32(d.partitionNum)) } + // FIXME(leoppro): if the row events includes both pre-cols and cols + // the dispatch logic here is wrong + // distribute partition by rowid or unique column value - value := row.Columns[row.IndieMarkCol].Value + dispatchCols := row.Columns + if len(row.Columns) == 0 { + dispatchCols = row.PreColumns + } + value := dispatchCols[row.IndieMarkCol].Value b, err := json.Marshal(value) if err != nil { log.Fatal("calculate hash of message key failed, please report a bug", zap.Error(err)) diff --git a/cdc/sink/mysql.go b/cdc/sink/mysql.go index 8d654af6e97..cbb6284727a 100644 --- a/cdc/sink/mysql.go +++ b/cdc/sink/mysql.go @@ -19,11 +19,9 @@ import ( "fmt" "net/url" "runtime" - "sort" "strconv" "strings" "sync" - "sync/atomic" "time" "github.com/cenkalti/backoff" @@ -35,6 +33,7 @@ import ( "github.com/pingcap/parser/mysql" "github.com/pingcap/parser/terror" "github.com/pingcap/ticdc/cdc/model" + "github.com/pingcap/ticdc/cdc/sink/common" "github.com/pingcap/ticdc/pkg/config" "github.com/pingcap/ticdc/pkg/cyclic" "github.com/pingcap/ticdc/pkg/cyclic/mark" @@ -62,18 +61,16 @@ const ( ) type mysqlSink struct { - db *sql.DB - checkpointTs uint64 - params *sinkParams + db *sql.DB + params *sinkParams filter *filter.Filter cyclic *cyclic.Cyclic - unresolvedTxnsMu sync.Mutex - unresolvedTxns map[model.TableName][]*model.Txn - workers []*mysqlSinkWorker - notifier *notify.Notifier - errCh chan error + txnCache *common.UnresolvedTxnCache + workers []*mysqlSinkWorker + notifier *notify.Notifier + errCh chan error statistics *Statistics @@ -83,55 +80,14 @@ type mysqlSink struct { } func (s *mysqlSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) error { - s.unresolvedTxnsMu.Lock() - defer s.unresolvedTxnsMu.Unlock() - for _, row := range rows { - if s.filter.ShouldIgnoreDMLEvent(row.StartTs, row.Table.Schema, row.Table.Table) { - log.Info("Row changed event ignored", zap.Uint64("start-ts", row.StartTs)) - continue - } - key := *row.Table - txns := s.unresolvedTxns[key] - if len(txns) == 0 || txns[len(txns)-1].StartTs != row.StartTs { - // fail-fast check - if len(txns) != 0 && txns[len(txns)-1].CommitTs > row.CommitTs { - log.Fatal("the commitTs of the emit row is less than the received row", - zap.Stringer("table", row.Table), - zap.Uint64("emit row startTs", row.StartTs), - zap.Uint64("emit row commitTs", row.CommitTs), - zap.Uint64("last received row startTs", txns[len(txns)-1].StartTs), - zap.Uint64("last received row commitTs", txns[len(txns)-1].CommitTs)) - } - txns = append(txns, &model.Txn{ - StartTs: row.StartTs, - CommitTs: row.CommitTs, - }) - s.unresolvedTxns[key] = txns - } - txns[len(txns)-1].Append(row) - } + s.txnCache.Append(s.filter, rows...) return nil } func (s *mysqlSink) FlushRowChangedEvents(ctx context.Context, resolvedTs uint64) error { - if resolvedTs <= atomic.LoadUint64(&s.checkpointTs) { - return nil - } - - defer s.statistics.PrintStatus() - - s.unresolvedTxnsMu.Lock() - if len(s.unresolvedTxns) == 0 { - atomic.StoreUint64(&s.checkpointTs, resolvedTs) - s.unresolvedTxnsMu.Unlock() - return nil - } - - _, resolvedTxnsMap := splitResolvedTxn(resolvedTs, s.unresolvedTxns) - s.unresolvedTxnsMu.Unlock() - + resolvedTxnsMap := s.txnCache.Resolved(resolvedTs) if len(resolvedTxnsMap) == 0 { - atomic.StoreUint64(&s.checkpointTs, resolvedTs) + s.txnCache.UpdateCheckpoint(resolvedTs) return nil } @@ -143,7 +99,7 @@ func (s *mysqlSink) FlushRowChangedEvents(ctx context.Context, resolvedTs uint64 if err := s.concurrentExec(ctx, resolvedTxnsMap); err != nil { return errors.Trace(err) } - atomic.StoreUint64(&s.checkpointTs, resolvedTs) + s.txnCache.UpdateCheckpoint(resolvedTs) return nil } @@ -222,35 +178,6 @@ func (s *mysqlSink) execDDL(ctx context.Context, ddl *model.DDLEvent) error { return nil } -func splitResolvedTxn( - resolvedTs uint64, unresolvedTxns map[model.TableName][]*model.Txn, -) (minTs uint64, resolvedRowsMap map[model.TableName][]*model.Txn) { - resolvedRowsMap = make(map[model.TableName][]*model.Txn, len(unresolvedTxns)) - minTs = resolvedTs - for key, txns := range unresolvedTxns { - i := sort.Search(len(txns), func(i int) bool { - return txns[i].CommitTs > resolvedTs - }) - if i == 0 { - continue - } - var resolvedTxns []*model.Txn - if i == len(txns) { - resolvedTxns = txns - delete(unresolvedTxns, key) - } else { - resolvedTxns = txns[:i] - unresolvedTxns[key] = txns[i:] - } - resolvedRowsMap[key] = resolvedTxns - - if len(resolvedTxns) > 0 && resolvedTxns[0].CommitTs < minTs { - minTs = resolvedTxns[0].CommitTs - } - } - return -} - // adjustSQLMode adjust sql mode according to sink config. func (s *mysqlSink) adjustSQLMode(ctx context.Context) error { // Must relax sql mode to support cyclic replication, as downstream may have @@ -444,9 +371,9 @@ func newMySQLSink(ctx context.Context, changefeedID model.ChangeFeedID, sinkURI sink := &mysqlSink{ db: db, - unresolvedTxns: make(map[model.TableName][]*model.Txn), params: params, filter: filter, + txnCache: common.NewUnresolvedTxnCache(), statistics: NewStatistics("mysql", opts), metricConflictDetectDurationHis: metricConflictDetectDurationHis, metricBucketSizeCounters: metricBucketSizeCounters, @@ -664,6 +591,9 @@ func (s *mysqlSink) Close() error { func (s *mysqlSink) execDMLWithMaxRetries( ctx context.Context, sqls []string, values [][]interface{}, maxRetries uint64, bucket int, ) error { + if len(sqls) != len(values) { + log.Fatal("unexpected number of sqls and values", zap.Strings("sqls", sqls), zap.Any("values", values)) + } checkTxnErr := func(err error) error { if errors.Cause(err) == context.Canceled { return backoff.Permanent(err) @@ -713,16 +643,23 @@ func (s *mysqlSink) prepareDMLs(rows []*model.RowChangedEvent, replicaID uint64, var query string var args []interface{} var err error - if row.Delete { - query, args, err = prepareDelete(row.Table.Schema, row.Table.Table, row.Columns) - } else { - query, args, err = prepareReplace(row.Table.Schema, row.Table.Table, row.Columns) + // TODO(leoppro): using `UPDATE` instead of `REPLACE` if the old value is enabled + if len(row.PreColumns) != 0 { + query, args, err = prepareDelete(row.Table.Schema, row.Table.Table, row.PreColumns) + if err != nil { + return nil, nil, errors.Trace(err) + } + sqls = append(sqls, query) + values = append(values, args) } - if err != nil { - return nil, nil, errors.Trace(err) + if len(row.Columns) != 0 { + query, args, err = prepareReplace(row.Table.Schema, row.Table.Table, row.Columns) + if err != nil { + return nil, nil, errors.Trace(err) + } + sqls = append(sqls, query) + values = append(values, args) } - sqls = append(sqls, query) - values = append(values, args) } if s.cyclic != nil && len(rows) > 0 { // Write mark table with the current replica ID. @@ -740,6 +677,7 @@ func (s *mysqlSink) execDMLs(ctx context.Context, rows []*model.RowChangedEvent, if err != nil { return errors.Trace(err) } + log.Debug("show prepareDMLs", zap.Any("rows", rows), zap.Strings("sqls", sqls), zap.Any("values", values)) if err := s.execDMLWithMaxRetries(ctx, sqls, values, defaultDMLMaxRetryTime, bucket); err != nil { ts := make([]uint64, 0, len(rows)) for _, row := range rows { diff --git a/cdc/sink/mysql_test.go b/cdc/sink/mysql_test.go index 40ae963ce13..539a1133fff 100644 --- a/cdc/sink/mysql_test.go +++ b/cdc/sink/mysql_test.go @@ -21,6 +21,7 @@ import ( "github.com/davecgh/go-spew/spew" "github.com/pingcap/check" "github.com/pingcap/ticdc/cdc/model" + "github.com/pingcap/ticdc/cdc/sink/common" "github.com/pingcap/ticdc/pkg/config" "github.com/pingcap/ticdc/pkg/filter" "github.com/pingcap/ticdc/pkg/notify" @@ -37,8 +38,8 @@ func newMySQLSink4Test(c *check.C) *mysqlSink { f, err := filter.NewFilter(config.GetDefaultReplicaConfig()) c.Assert(err, check.IsNil) return &mysqlSink{ - unresolvedTxns: make(map[model.TableName][]*model.Txn), - filter: f, + txnCache: common.NewUnresolvedTxnCache(), + filter: f, } } @@ -246,76 +247,7 @@ func (s MySQLSinkSuite) TestEmitRowChangedEvents(c *check.C) { ms := newMySQLSink4Test(c) err := ms.EmitRowChangedEvents(ctx, tc.input...) c.Assert(err, check.IsNil) - c.Assert(ms.unresolvedTxns, check.DeepEquals, tc.expected) - } -} - -func (s MySQLSinkSuite) TestSplitResolvedTxn(c *check.C) { - testCases := []struct { - unresolvedTxns map[model.TableName][]*model.Txn - resolvedTs uint64 - expectedResolvedTxns map[model.TableName][]*model.Txn - expectedUnresolvedTxns map[model.TableName][]*model.Txn - expectedMinTs uint64 - }{{ - unresolvedTxns: map[model.TableName][]*model.Txn{ - {Table: "t1"}: {{CommitTs: 11}, {CommitTs: 21}, {CommitTs: 21}, {CommitTs: 23}, {CommitTs: 33}, {CommitTs: 34}}, - {Table: "t2"}: {{CommitTs: 23}, {CommitTs: 24}, {CommitTs: 26}, {CommitTs: 26}, {CommitTs: 26}, {CommitTs: 29}}, - }, - resolvedTs: 5, - expectedResolvedTxns: map[model.TableName][]*model.Txn{}, - expectedUnresolvedTxns: map[model.TableName][]*model.Txn{ - {Table: "t1"}: {{CommitTs: 11}, {CommitTs: 21}, {CommitTs: 21}, {CommitTs: 23}, {CommitTs: 33}, {CommitTs: 34}}, - {Table: "t2"}: {{CommitTs: 23}, {CommitTs: 24}, {CommitTs: 26}, {CommitTs: 26}, {CommitTs: 26}, {CommitTs: 29}}, - }, - expectedMinTs: 5, - }, { - unresolvedTxns: map[model.TableName][]*model.Txn{ - {Table: "t1"}: {{CommitTs: 11}, {CommitTs: 21}, {CommitTs: 21}, {CommitTs: 23}, {CommitTs: 33}, {CommitTs: 34}}, - {Table: "t2"}: {{CommitTs: 23}, {CommitTs: 24}, {CommitTs: 26}, {CommitTs: 26}, {CommitTs: 26}, {CommitTs: 29}}, - }, - resolvedTs: 23, - expectedResolvedTxns: map[model.TableName][]*model.Txn{ - {Table: "t1"}: {{CommitTs: 11}, {CommitTs: 21}, {CommitTs: 21}, {CommitTs: 23}}, - {Table: "t2"}: {{CommitTs: 23}}, - }, - expectedUnresolvedTxns: map[model.TableName][]*model.Txn{ - {Table: "t1"}: {{CommitTs: 33}, {CommitTs: 34}}, - {Table: "t2"}: {{CommitTs: 24}, {CommitTs: 26}, {CommitTs: 26}, {CommitTs: 26}, {CommitTs: 29}}, - }, - expectedMinTs: 11, - }, { - unresolvedTxns: map[model.TableName][]*model.Txn{ - {Table: "t1"}: {{CommitTs: 11}, {CommitTs: 21}, {CommitTs: 21}, {CommitTs: 23}, {CommitTs: 33}, {CommitTs: 34}}, - {Table: "t2"}: {{CommitTs: 23}, {CommitTs: 24}, {CommitTs: 26}, {CommitTs: 26}, {CommitTs: 26}, {CommitTs: 29}}, - }, - resolvedTs: 30, - expectedResolvedTxns: map[model.TableName][]*model.Txn{ - {Table: "t1"}: {{CommitTs: 11}, {CommitTs: 21}, {CommitTs: 21}, {CommitTs: 23}}, - {Table: "t2"}: {{CommitTs: 23}, {CommitTs: 24}, {CommitTs: 26}, {CommitTs: 26}, {CommitTs: 26}, {CommitTs: 29}}, - }, - expectedUnresolvedTxns: map[model.TableName][]*model.Txn{ - {Table: "t1"}: {{CommitTs: 33}, {CommitTs: 34}}, - }, - expectedMinTs: 11, - }, { - unresolvedTxns: map[model.TableName][]*model.Txn{ - {Table: "t1"}: {{CommitTs: 11}, {CommitTs: 21}, {CommitTs: 21}, {CommitTs: 23}, {CommitTs: 33}, {CommitTs: 34}}, - {Table: "t2"}: {{CommitTs: 23}, {CommitTs: 24}, {CommitTs: 26}, {CommitTs: 26}, {CommitTs: 26}, {CommitTs: 29}}, - }, - resolvedTs: 40, - expectedResolvedTxns: map[model.TableName][]*model.Txn{ - {Table: "t1"}: {{CommitTs: 11}, {CommitTs: 21}, {CommitTs: 21}, {CommitTs: 23}, {CommitTs: 33}, {CommitTs: 34}}, - {Table: "t2"}: {{CommitTs: 23}, {CommitTs: 24}, {CommitTs: 26}, {CommitTs: 26}, {CommitTs: 26}, {CommitTs: 29}}, - }, - expectedUnresolvedTxns: map[model.TableName][]*model.Txn{}, - expectedMinTs: 11, - }} - for _, tc := range testCases { - minTs, resolvedTxns := splitResolvedTxn(tc.resolvedTs, tc.unresolvedTxns) - c.Assert(minTs, check.Equals, tc.expectedMinTs) - c.Assert(resolvedTxns, check.DeepEquals, tc.expectedResolvedTxns) - c.Assert(tc.unresolvedTxns, check.DeepEquals, tc.expectedUnresolvedTxns) + c.Assert(ms.txnCache.Unresolved(), check.DeepEquals, tc.expected) } } diff --git a/pkg/config/config.go b/pkg/config/config.go index 8dd4b801e57..edcc331fcf9 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -25,7 +25,8 @@ import ( ) var defaultReplicaConfig = &ReplicaConfig{ - CaseSensitive: true, + CaseSensitive: true, + EnableOldValue: false, Filter: &FilterConfig{ Rules: []string{"*.*"}, }, @@ -48,12 +49,13 @@ var defaultReplicaConfig = &ReplicaConfig{ type ReplicaConfig replicaConfig type replicaConfig struct { - CaseSensitive bool `toml:"case-sensitive" json:"case-sensitive"` - Filter *FilterConfig `toml:"filter" json:"filter"` - Mounter *MounterConfig `toml:"mounter" json:"mounter"` - Sink *SinkConfig `toml:"sink" json:"sink"` - Cyclic *CyclicConfig `toml:"cyclic-replication" json:"cyclic-replication"` - Scheduler *SchedulerConfig `toml:"scheduler" json:"scheduler"` + CaseSensitive bool `toml:"case-sensitive" json:"case-sensitive"` + EnableOldValue bool `toml:"enable-old-value" json:"enable-old-value"` + Filter *FilterConfig `toml:"filter" json:"filter"` + Mounter *MounterConfig `toml:"mounter" json:"mounter"` + Sink *SinkConfig `toml:"sink" json:"sink"` + Cyclic *CyclicConfig `toml:"cyclic-replication" json:"cyclic-replication"` + Scheduler *SchedulerConfig `toml:"scheduler" json:"scheduler"` } // Marshal returns the json marshal format of a ReplicationConfig diff --git a/pkg/regionspan/span.go b/pkg/regionspan/span.go index 3424d5ed19a..b554c4ce0bd 100644 --- a/pkg/regionspan/span.go +++ b/pkg/regionspan/span.go @@ -18,6 +18,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/log" + "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/util/codec" "go.uber.org/zap" @@ -62,11 +63,19 @@ func hackSpan(originStart []byte, originEnd []byte) (start []byte, end []byte) { } // GetTableSpan returns the span to watch for the specified table -func GetTableSpan(tableID int64) Span { +func GetTableSpan(tableID int64, exceptIndexSpan bool) Span { sep := byte('_') + recordMarker := byte('r') tablePrefix := tablecodec.GenTablePrefix(tableID) - start := append(tablePrefix, sep) - end := append(tablePrefix, sep+1) + var start, end kv.Key + // ignore index keys if we don't need them + if exceptIndexSpan { + start = append(tablePrefix, sep, recordMarker) + end = append(tablePrefix, sep, recordMarker+1) + } else { + start = append(tablePrefix, sep) + end = append(tablePrefix, sep+1) + } return Span{ Start: start, End: end, diff --git a/pkg/regionspan/span_test.go b/pkg/regionspan/span_test.go index 5b8b0dfc4d1..bdd230413b6 100644 --- a/pkg/regionspan/span_test.go +++ b/pkg/regionspan/span_test.go @@ -100,12 +100,19 @@ func (s *spanSuite) TestIntersect(c *check.C) { } func (s *spanSuite) TestGetTableSpan(c *check.C) { - span := GetTableSpan(123) + span := GetTableSpan(123, false) c.Assert(span.Start, check.Less, span.End) prefix := []byte(tablecodec.GenTablePrefix(123)) c.Assert(span.Start, check.Greater, prefix) prefix[len(prefix)-1]++ c.Assert(span.End, check.Less, prefix) + + span = GetTableSpan(123, true) + c.Assert(span.Start, check.Less, span.End) + prefix = []byte(tablecodec.GenTableRecordPrefix(123)) + c.Assert(span.Start, check.GreaterEqual, prefix) + prefix[len(prefix)-1]++ + c.Assert(span.End, check.LessEqual, prefix) } func (s *spanSuite) TestSpanHack(c *check.C) {