Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sink: support to output old value #708

Merged
merged 28 commits into from
Aug 3, 2020
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 81 additions & 53 deletions cdc/entry/mounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ type baseKVEntry struct {

type rowKVEntry struct {
baseKVEntry
Row map[int64]types.Datum
Row map[int64]types.Datum
ChangedRow map[int64]types.Datum
zier-one marked this conversation as resolved.
Show resolved Hide resolved
}

type indexKVEntry struct {
Expand Down Expand Up @@ -124,10 +125,11 @@ type mounterImpl struct {
rawRowChangedChs []chan *model.PolymorphicEvent
tz *time.Location
workerNum int
enableOldValue bool
}

// NewMounter creates a mounter
func NewMounter(schemaStorage *SchemaStorage, workerNum int) Mounter {
func NewMounter(schemaStorage *SchemaStorage, workerNum int, enableOldValue bool) Mounter {
if workerNum <= 0 {
workerNum = defaultMounterWorkerNum
}
Expand All @@ -139,6 +141,7 @@ func NewMounter(schemaStorage *SchemaStorage, workerNum int) Mounter {
schemaStorage: schemaStorage,
rawRowChangedChs: chs,
workerNum: workerNum,
enableOldValue: enableOldValue,
}
}

Expand Down Expand Up @@ -245,7 +248,7 @@ func (m *mounterImpl) unmarshalAndMountRowChanged(ctx context.Context, raw *mode
}
switch {
case bytes.HasPrefix(key, recordPrefix):
rowKV, err := m.unmarshalRowKVEntry(tableInfo, key, raw.Value, baseInfo)
rowKV, err := m.unmarshalRowKVEntry(tableInfo, key, raw.Value, raw.OldValue, baseInfo)
if err != nil {
return nil, errors.Trace(err)
}
Expand All @@ -254,7 +257,7 @@ func (m *mounterImpl) unmarshalAndMountRowChanged(ctx context.Context, raw *mode
}
return m.mountRowKVEntry(tableInfo, rowKV)
case bytes.HasPrefix(key, indexPrefix):
indexKV, err := m.unmarshalIndexKVEntry(key, raw.Value, baseInfo)
indexKV, err := m.unmarshalIndexKVEntry(key, raw.Value, raw.OldValue, baseInfo)
if err != nil {
return nil, errors.Trace(err)
}
Expand All @@ -272,7 +275,7 @@ func (m *mounterImpl) unmarshalAndMountRowChanged(ctx context.Context, raw *mode
return row, err
}

func (m *mounterImpl) unmarshalRowKVEntry(tableInfo *TableInfo, restKey []byte, rawValue []byte, base baseKVEntry) (*rowKVEntry, error) {
func (m *mounterImpl) unmarshalRowKVEntry(tableInfo *TableInfo, restKey []byte, rawValue []byte, rawOldValue []byte, base baseKVEntry) (*rowKVEntry, error) {
key, recordID, err := decodeRecordID(restKey)
if err != nil {
return nil, errors.Trace(err)
Expand All @@ -284,16 +287,24 @@ func (m *mounterImpl) unmarshalRowKVEntry(tableInfo *TableInfo, restKey []byte,
if err != nil {
return nil, errors.Trace(err)
}
var changedRow map[int64]types.Datum
zier-one marked this conversation as resolved.
Show resolved Hide resolved
if rawOldValue != nil {
changedRow, err = decodeRow(rawOldValue, recordID, tableInfo, m.tz)
if err != nil {
return nil, errors.Trace(err)
}
}
base.RecordID = recordID
return &rowKVEntry{
baseKVEntry: base,
Row: row,
ChangedRow: changedRow,
}, nil
}

func (m *mounterImpl) unmarshalIndexKVEntry(restKey []byte, rawValue []byte, base baseKVEntry) (*indexKVEntry, error) {
func (m *mounterImpl) unmarshalIndexKVEntry(restKey []byte, rawValue []byte, rawOldValue []byte, base baseKVEntry) (*indexKVEntry, error) {
// skip set index KV
if !base.Delete {
if !base.Delete || m.enableOldValue {
zier-one marked this conversation as resolved.
Show resolved Hide resolved
return nil, nil
}

Expand Down Expand Up @@ -353,23 +364,14 @@ func UnmarshalDDL(raw *model.RawKVEntry) (*timodel.Job, error) {
return job, nil
}

func (m *mounterImpl) mountRowKVEntry(tableInfo *TableInfo, row *rowKVEntry) (*model.RowChangedEvent, error) {
if row.Delete && !tableInfo.PKIsHandle {
return nil, nil
}

datumsNum := 1
if !row.Delete {
datumsNum = len(tableInfo.Columns)
}

values := make(map[string]*model.Column, datumsNum)
for index, colValue := range row.Row {
func datum2Column(tableInfo *TableInfo, datums map[int64]types.Datum) (map[string]*model.Column, error) {
cols := make(map[string]*model.Column, len(datums))
for index, colValue := range datums {
colInfo, exist := tableInfo.GetColumnInfo(index)
if !exist {
return nil, errors.NotFoundf("column info, colID: %d", index)
}
if !row.Delete && !tableInfo.IsColWritable(colInfo) {
if !tableInfo.IsColWritable(colInfo) {
continue
}
colName := colInfo.Name.O
Expand All @@ -386,53 +388,79 @@ func (m *mounterImpl) mountRowKVEntry(tableInfo *TableInfo, row *rowKVEntry) (*m
whereHandle := true
col.WhereHandle = &whereHandle
}
values[colName] = col
cols[colName] = col
}
for _, col := range tableInfo.Columns {
amyangfei marked this conversation as resolved.
Show resolved Hide resolved
_, ok := cols[col.Name.O]
if !ok && tableInfo.IsColWritable(col) {
column := &model.Column{
Type: col.Tp,
Value: getDefaultOrZeroValue(col),
}
if tableInfo.IsColumnUnique(col.ID) {
whereHandle := true
column.WhereHandle = &whereHandle
}
cols[col.Name.O] = column
}
}
return cols, nil
}

func (m *mounterImpl) mountRowKVEntry(tableInfo *TableInfo, row *rowKVEntry) (*model.RowChangedEvent, error) {
if !m.enableOldValue && row.Delete && !tableInfo.PKIsHandle {
return nil, nil
}

var err error
// Decode changed columns.
var changedCols map[string]*model.Column
if len(row.ChangedRow) != 0 {
changedCols, err = datum2Column(tableInfo, row.ChangedRow)
if err != nil {
return nil, errors.Trace(err)
}
}
// Decode the current columns. A delete event has no new row to decode, so its
// columns are taken from the changed columns and the changed columns are cleared.
var cols map[string]*model.Column
if !row.Delete {
zier-one marked this conversation as resolved.
Show resolved Hide resolved
cols, err = datum2Column(tableInfo, row.Row)
zier-one marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil, errors.Trace(err)
}
} else {
cols = changedCols
changedCols = nil
}
var partitionID int64
if tableInfo.GetPartitionInfo() != nil {
partitionID = row.PhysicalTableID
}

event := &model.RowChangedEvent{
schemaName := tableInfo.TableName.Schema
tableName := tableInfo.TableName.Table
return &model.RowChangedEvent{
StartTs: row.StartTs,
CommitTs: row.CRTs,
RowID: row.RecordID,
SchemaID: tableInfo.SchemaID,
TableUpdateTs: tableInfo.UpdateTS,
Table: &model.TableName{
Schema: tableInfo.TableName.Schema,
Table: tableInfo.TableName.Table,
Schema: schemaName,
Table: tableName,
Partition: partitionID,
},
IndieMarkCol: tableInfo.IndieMarkCol,
}

if !row.Delete {
for _, col := range tableInfo.Columns {
_, ok := values[col.Name.O]
if !ok && tableInfo.IsColWritable(col) {
column := &model.Column{
Type: col.Tp,
Value: getDefaultOrZeroValue(col),
Flag: transColumnFlag(col),
}
if tableInfo.IsColumnUnique(col.ID) {
whereHandle := true
column.WhereHandle = &whereHandle
}
values[col.Name.O] = column
}
}
}
event.Delete = row.Delete
event.Columns = values
event.Keys = genMultipleKeys(tableInfo.TableInfo, values, quotes.QuoteSchema(event.Table.Schema, event.Table.Table))
return event, nil
IndieMarkCol: tableInfo.IndieMarkCol,
Delete: row.Delete,
Columns: cols,
ChangedColumns: changedCols,
Keys: genMultipleKeys(tableInfo.TableInfo, cols, quotes.QuoteSchema(schemaName, tableName)),
}, nil
}

func (m *mounterImpl) mountIndexKVEntry(tableInfo *TableInfo, idx *indexKVEntry) (*model.RowChangedEvent, error) {
// skip set index KV
if !idx.Delete {
if !idx.Delete || m.enableOldValue {
return nil, nil
}

Expand All @@ -451,14 +479,14 @@ func (m *mounterImpl) mountIndexKVEntry(tableInfo *TableInfo, idx *indexKVEntry)
return nil, errors.Trace(err)
}

values := make(map[string]*model.Column, len(idx.IndexValue))
cols := make(map[string]*model.Column, len(idx.IndexValue))
for i, idxCol := range indexInfo.Columns {
value, err := formatColVal(idx.IndexValue[i], tableInfo.Columns[idxCol.Offset].Tp)
if err != nil {
return nil, errors.Trace(err)
}
whereHandle := true
values[idxCol.Name.O] = &model.Column{
cols[idxCol.Name.O] = &model.Column{
Type: tableInfo.Columns[idxCol.Offset].Tp,
WhereHandle: &whereHandle,
Value: value,
Expand All @@ -476,8 +504,8 @@ func (m *mounterImpl) mountIndexKVEntry(tableInfo *TableInfo, idx *indexKVEntry)
},
IndieMarkCol: tableInfo.IndieMarkCol,
Delete: true,
Columns: values,
Keys: genMultipleKeys(tableInfo.TableInfo, values, quotes.QuoteSchema(tableInfo.TableName.Schema, tableInfo.TableName.Table)),
Columns: cols,
Keys: genMultipleKeys(tableInfo.TableInfo, cols, quotes.QuoteSchema(tableInfo.TableName.Schema, tableInfo.TableName.Table)),
}, nil
}

Expand Down
26 changes: 20 additions & 6 deletions cdc/kv/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,8 @@ type CDCClient struct {

regionCache *tikv.RegionCache
kvStorage tikv.Storage

enableOldValue bool
}

// NewCDCClient creates a CDCClient instance
Expand Down Expand Up @@ -286,6 +288,11 @@ func (c *CDCClient) Close() error {
return nil
}

// EnableOldValue sets the client's enableOldValue flag. When the flag is set,
// change-data requests are built with ExtraOp_ReadOldValue instead of
// ExtraOp_Noop, so old values are requested together with row changes.
// The flag is only ever switched on here; nothing in view switches it back off.
func (c *CDCClient) EnableOldValue() {
c.enableOldValue = true
}

func (c *CDCClient) getConn(ctx context.Context, addr string) (*grpc.ClientConn, error) {
c.mu.Lock()
defer c.mu.Unlock()
Expand Down Expand Up @@ -604,6 +611,11 @@ MainLoop:

requestID := allocID()

extraOp := kvrpcpb.ExtraOp_Noop
if s.client.enableOldValue {
extraOp = kvrpcpb.ExtraOp_ReadOldValue
}

req := &cdcpb.ChangeDataRequest{
Header: &cdcpb.Header{
ClusterId: s.client.clusterID,
Expand All @@ -614,6 +626,7 @@ MainLoop:
CheckpointTs: sri.ts,
StartKey: sri.span.Start,
EndKey: sri.span.End,
ExtraOp: extraOp,
}

// The receiver thread need to know the span, which is only known in the sender thread. So create the
Expand Down Expand Up @@ -1363,7 +1376,7 @@ func (s *eventFeedSession) resolveLock(ctx context.Context, regionID uint64, max
return nil
}

func assembleCommitEvent(entry *cdcpb.Event_Row, value []byte) (*model.RegionFeedEvent, error) {
func assembleCommitEvent(entry *cdcpb.Event_Row, value *pendingValue) (*model.RegionFeedEvent, error) {
var opType model.OpType
switch entry.GetOpType() {
case cdcpb.Event_Row_DELETE:
Expand All @@ -1376,11 +1389,12 @@ func assembleCommitEvent(entry *cdcpb.Event_Row, value []byte) (*model.RegionFee

revent := &model.RegionFeedEvent{
Val: &model.RawKVEntry{
OpType: opType,
Key: entry.Key,
Value: value,
StartTs: entry.StartTs,
CRTs: entry.CommitTs,
OpType: opType,
Key: entry.Key,
Value: value.value,
OldValue: value.oldValue,
StartTs: entry.StartTs,
CRTs: entry.CommitTs,
},
}
return revent, nil
Expand Down
16 changes: 12 additions & 4 deletions cdc/kv/matcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,14 @@ import (
"github.com/pingcap/kvproto/pkg/cdcpb"
)

// pendingValue bundles the data cached for a prewrite row in
// matcher.unmatchedValue until matchRow hands it back to the caller.
type pendingValue struct {
// value is the value stored by putPrewriteRow for the row.
// NOTE(review): its source is assigned outside the visible hunk — presumably
// the prewrite row's value; confirm.
value []byte
// oldValue is the prewrite row's GetOldValue() result.
oldValue []byte
}

type matcher struct {
// TODO : clear the single prewrite
unmatchedValue map[matchKey][]byte
unmatchedValue map[matchKey]*pendingValue
cachedCommit []*cdcpb.Event_Row
}

Expand All @@ -34,7 +39,7 @@ func newMatchKey(row *cdcpb.Event_Row) matchKey {

func newMatcher() *matcher {
return &matcher{
unmatchedValue: make(map[matchKey][]byte),
unmatchedValue: make(map[matchKey]*pendingValue),
}
}

Expand All @@ -46,10 +51,13 @@ func (m *matcher) putPrewriteRow(row *cdcpb.Event_Row) {
if _, exist := m.unmatchedValue[key]; exist && len(value) == 0 {
return
}
m.unmatchedValue[key] = value
m.unmatchedValue[key] = &pendingValue{
value: value,
oldValue: row.GetOldValue(),
}
}

func (m *matcher) matchRow(row *cdcpb.Event_Row) ([]byte, bool) {
func (m *matcher) matchRow(row *cdcpb.Event_Row) (*pendingValue, bool) {
if value, exist := m.unmatchedValue[newMatchKey(row)]; exist {
delete(m.unmatchedValue, newMatchKey(row))
return value, true
Expand Down
2 changes: 1 addition & 1 deletion cdc/kv/matcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,5 +50,5 @@ func (s *MatcherSuite) TestMatcher(c *check.C) {
}
value2, ok := matcher.matchRow(commitRow2)
c.Assert(ok, check.IsTrue)
c.Assert(value2, check.BytesEquals, []byte("v2"))
c.Assert(value2.value, check.BytesEquals, []byte("v2"))
}
8 changes: 5 additions & 3 deletions cdc/model/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,11 @@ type ResolvedSpan struct {
type RawKVEntry struct {
OpType OpType
Key []byte
// Nil fro delete type
Value []byte
StartTs uint64
// nil for delete type
Value []byte
// nil for insert type
OldValue []byte
StartTs uint64
// Commit or resolved TS
CRTs uint64
}
Expand Down
7 changes: 4 additions & 3 deletions cdc/model/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,10 @@ type RowChangedEvent struct {

// if the table of this row only has one unique index(includes primary key),
// IndieMarkCol will be set to the name of the unique index
IndieMarkCol string `json:"indie-mark-col"`
Columns map[string]*Column `json:"columns"`
Keys []string `json:"keys"`
IndieMarkCol string `json:"indie-mark-col"`
Columns map[string]*Column `json:"columns"`
ChangedColumns map[string]*Column `json:"changed-columns"`
zier-one marked this conversation as resolved.
Show resolved Hide resolved
Keys []string `json:"keys"`
}

// Column represents a column value in row changed event
Expand Down
Loading