Skip to content

Commit

Permalink
sink: support to output old value (#708)
Browse files Browse the repository at this point in the history
Signed-off-by: 5kbpers <[email protected]>
  • Loading branch information
5kbpers authored Aug 3, 2020
1 parent efd1df7 commit 62e246f
Show file tree
Hide file tree
Showing 22 changed files with 528 additions and 322 deletions.
227 changes: 145 additions & 82 deletions cdc/entry/mounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ type baseKVEntry struct {

type rowKVEntry struct {
baseKVEntry
Row map[int64]types.Datum
Row map[int64]types.Datum
PreRow map[int64]types.Datum
}

type indexKVEntry struct {
Expand Down Expand Up @@ -124,10 +125,11 @@ type mounterImpl struct {
rawRowChangedChs []chan *model.PolymorphicEvent
tz *time.Location
workerNum int
enableOldValue bool
}

// NewMounter creates a mounter
func NewMounter(schemaStorage *SchemaStorage, workerNum int) Mounter {
func NewMounter(schemaStorage *SchemaStorage, workerNum int, enableOldValue bool) Mounter {
if workerNum <= 0 {
workerNum = defaultMounterWorkerNum
}
Expand All @@ -139,6 +141,7 @@ func NewMounter(schemaStorage *SchemaStorage, workerNum int) Mounter {
schemaStorage: schemaStorage,
rawRowChangedChs: chs,
workerNum: workerNum,
enableOldValue: enableOldValue,
}
}

Expand Down Expand Up @@ -245,7 +248,7 @@ func (m *mounterImpl) unmarshalAndMountRowChanged(ctx context.Context, raw *mode
}
switch {
case bytes.HasPrefix(key, recordPrefix):
rowKV, err := m.unmarshalRowKVEntry(tableInfo, key, raw.Value, baseInfo)
rowKV, err := m.unmarshalRowKVEntry(tableInfo, key, raw.Value, raw.OldValue, baseInfo)
if err != nil {
return nil, errors.Trace(err)
}
Expand All @@ -254,7 +257,7 @@ func (m *mounterImpl) unmarshalAndMountRowChanged(ctx context.Context, raw *mode
}
return m.mountRowKVEntry(tableInfo, rowKV)
case bytes.HasPrefix(key, indexPrefix):
indexKV, err := m.unmarshalIndexKVEntry(key, raw.Value, baseInfo)
indexKV, err := m.unmarshalIndexKVEntry(key, raw.Value, raw.OldValue, baseInfo)
if err != nil {
return nil, errors.Trace(err)
}
Expand All @@ -272,7 +275,7 @@ func (m *mounterImpl) unmarshalAndMountRowChanged(ctx context.Context, raw *mode
return row, err
}

func (m *mounterImpl) unmarshalRowKVEntry(tableInfo *model.TableInfo, restKey []byte, rawValue []byte, base baseKVEntry) (*rowKVEntry, error) {
func (m *mounterImpl) unmarshalRowKVEntry(tableInfo *model.TableInfo, restKey []byte, rawValue []byte, rawOldValue []byte, base baseKVEntry) (*rowKVEntry, error) {
key, recordID, err := decodeRecordID(restKey)
if err != nil {
return nil, errors.Trace(err)
Expand All @@ -284,16 +287,27 @@ func (m *mounterImpl) unmarshalRowKVEntry(tableInfo *model.TableInfo, restKey []
if err != nil {
return nil, errors.Trace(err)
}
var preRow map[int64]types.Datum
if rawOldValue != nil {
preRow, err = decodeRow(rawOldValue, recordID, tableInfo, m.tz)
if err != nil {
return nil, errors.Trace(err)
}
}
base.RecordID = recordID
return &rowKVEntry{
baseKVEntry: base,
Row: row,
PreRow: preRow,
}, nil
}

func (m *mounterImpl) unmarshalIndexKVEntry(restKey []byte, rawValue []byte, base baseKVEntry) (*indexKVEntry, error) {
// skip set index KV
if !base.Delete {
func (m *mounterImpl) unmarshalIndexKVEntry(restKey []byte, rawValue []byte, rawOldValue []byte, base baseKVEntry) (*indexKVEntry, error) {
// Skip set index KV.
// By default we cannot get the old value of a deleted row, then we must get the value of unique key
// or primary key for seeking the deleted row through its index key.
// After the old value was enabled, we can skip the index key.
if !base.Delete || m.enableOldValue {
return nil, nil
}

Expand Down Expand Up @@ -353,23 +367,22 @@ func UnmarshalDDL(raw *model.RawKVEntry) (*timodel.Job, error) {
return job, nil
}

func (m *mounterImpl) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntry) (*model.RowChangedEvent, error) {
if row.Delete && !tableInfo.PKIsHandle {
return nil, nil
}

datumsNum := 1
if !row.Delete {
datumsNum = len(tableInfo.Columns)
func datum2Column(tableInfo *model.TableInfo, datums map[int64]types.Datum, fillWithDefaultValue bool) (map[string]*model.Column, error) {
estimateLen := len(datums)
if fillWithDefaultValue {
estimateLen = len(tableInfo.Columns)
}

values := make(map[string]*model.Column, datumsNum)
for index, colValue := range row.Row {
cols := make(map[string]*model.Column, estimateLen)
for index, colValue := range datums {
colInfo, exist := tableInfo.GetColumnInfo(index)
if !exist {
return nil, errors.NotFoundf("column info, colID: %d", index)
}
if !row.Delete && !tableInfo.IsColWritable(colInfo) {
// the judge about `fillWithDefaultValue` is tricky
// if the `fillWithDefaultValue` is true, the event must be deletion
// we should output the generated column in deletion event
// this tricky code will be improve after pingcap/ticdc#787 merged
if !tableInfo.IsColWritable(colInfo) && fillWithDefaultValue {
continue
}
colName := colInfo.Name.O
Expand All @@ -386,52 +399,90 @@ func (m *mounterImpl) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntr
whereHandle := true
col.WhereHandle = &whereHandle
}
values[colName] = col
cols[colName] = col
}
if !fillWithDefaultValue {
return cols, nil
}
for _, col := range tableInfo.Columns {
_, ok := cols[col.Name.O]
if !ok && tableInfo.IsColWritable(col) {
column := &model.Column{
Type: col.Tp,
Value: getDefaultOrZeroValue(col),
Flag: transColumnFlag(col),
}
if tableInfo.IsColumnUnique(col.ID) {
whereHandle := true
column.WhereHandle = &whereHandle
}
cols[col.Name.O] = column
}
}
return cols, nil
}

func (m *mounterImpl) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntry) (*model.RowChangedEvent, error) {
// if m.enableOldValue == true, go into this function
// if m.enableNewValue == false and row.Delete == false, go into this function
// if m.enableNewValue == false and row.Delete == true and tableInfo.PKIsHandle = true, go into this function
// only if m.enableNewValue == false and row.Delete == true and tableInfo.PKIsHandle == false, skip this function
if !m.enableOldValue && row.Delete && !tableInfo.PKIsHandle {
return nil, nil
}

var err error
// Decode previous columns.
var preCols map[string]*model.Column
if len(row.PreRow) != 0 {
// FIXME(leoppro): using pre table info to mounter pre column datum
// the pre column and current column in one event may using different table info
preCols, err = datum2Column(tableInfo, row.PreRow, true)
if err != nil {
return nil, errors.Trace(err)
}
}

var cols map[string]*model.Column
oldValueDisabledAndRowIsDelete := !m.enableOldValue && row.Delete
cols, err = datum2Column(tableInfo, row.Row, !oldValueDisabledAndRowIsDelete)
if err != nil {
return nil, errors.Trace(err)
}
if oldValueDisabledAndRowIsDelete {
preCols = cols
cols = nil
}

var partitionID int64
if tableInfo.GetPartitionInfo() != nil {
partitionID = row.PhysicalTableID
}

event := &model.RowChangedEvent{
schemaName := tableInfo.TableName.Schema
tableName := tableInfo.TableName.Table
return &model.RowChangedEvent{
StartTs: row.StartTs,
CommitTs: row.CRTs,
RowID: row.RecordID,
TableInfoVersion: tableInfo.TableInfoVersion,
Table: &model.TableName{
Schema: tableInfo.TableName.Schema,
Table: tableInfo.TableName.Table,
Schema: schemaName,
Table: tableName,
Partition: partitionID,
},
IndieMarkCol: tableInfo.IndieMarkCol,
}

if !row.Delete {
for _, col := range tableInfo.Columns {
_, ok := values[col.Name.O]
if !ok && tableInfo.IsColWritable(col) {
column := &model.Column{
Type: col.Tp,
Value: getDefaultOrZeroValue(col),
Flag: transColumnFlag(col),
}
if tableInfo.IsColumnUnique(col.ID) {
whereHandle := true
column.WhereHandle = &whereHandle
}
values[col.Name.O] = column
}
}
}
event.Delete = row.Delete
event.Columns = values
event.Keys = genMultipleKeys(tableInfo.TableInfo, values, quotes.QuoteSchema(event.Table.Schema, event.Table.Table))
return event, nil
Delete: row.Delete,
Columns: cols,
PreColumns: preCols,
// FIXME(leoppor): Correctness of conflict detection with old values
Keys: genMultipleKeys(tableInfo.TableInfo, preCols, cols, quotes.QuoteSchema(schemaName, tableName)),
}, nil
}

func (m *mounterImpl) mountIndexKVEntry(tableInfo *model.TableInfo, idx *indexKVEntry) (*model.RowChangedEvent, error) {
// skip set index KV
if !idx.Delete {
if !idx.Delete || m.enableOldValue {
return nil, nil
}

Expand All @@ -450,14 +501,14 @@ func (m *mounterImpl) mountIndexKVEntry(tableInfo *model.TableInfo, idx *indexKV
return nil, errors.Trace(err)
}

values := make(map[string]*model.Column, len(idx.IndexValue))
preCols := make(map[string]*model.Column, len(idx.IndexValue))
for i, idxCol := range indexInfo.Columns {
value, err := formatColVal(idx.IndexValue[i], tableInfo.Columns[idxCol.Offset].Tp)
if err != nil {
return nil, errors.Trace(err)
}
whereHandle := true
values[idxCol.Name.O] = &model.Column{
preCols[idxCol.Name.O] = &model.Column{
Type: tableInfo.Columns[idxCol.Offset].Tp,
WhereHandle: &whereHandle,
Value: value,
Expand All @@ -474,8 +525,8 @@ func (m *mounterImpl) mountIndexKVEntry(tableInfo *model.TableInfo, idx *indexKV
},
IndieMarkCol: tableInfo.IndieMarkCol,
Delete: true,
Columns: values,
Keys: genMultipleKeys(tableInfo.TableInfo, values, quotes.QuoteSchema(tableInfo.TableName.Schema, tableInfo.TableName.Table)),
PreColumns: preCols,
Keys: genMultipleKeys(tableInfo.TableInfo, preCols, nil, quotes.QuoteSchema(tableInfo.TableName.Schema, tableInfo.TableName.Table)),
}, nil
}

Expand Down Expand Up @@ -553,43 +604,55 @@ func fetchHandleValue(tableInfo *model.TableInfo, recordID int64) (pkCoID int64,
return
}

func genMultipleKeys(ti *timodel.TableInfo, values map[string]*model.Column, table string) []string {
multipleKeys := make([]string, 0, len(ti.Indices)+1)
if ti.PKIsHandle {
if pk := ti.GetPkColInfo(); pk != nil && !pk.IsGenerated() {
cols := []*timodel.ColumnInfo{pk}
key := genKeyList(table, cols, values)
if len(key) > 0 { // ignore `null` value.
multipleKeys = append(multipleKeys, key)
} else {
log.L().Debug("ignore empty primary key", zap.String("table", table))
}
}
func genMultipleKeys(ti *timodel.TableInfo, preCols, cols map[string]*model.Column, table string) []string {
estimateLen := len(ti.Indices) + 1
if len(preCols) != 0 && len(cols) != 0 {
estimateLen *= 2
}

for _, indexCols := range ti.Indices {
if !indexCols.Unique {
continue
multipleKeys := make([]string, 0, estimateLen)
buildKeys := func(colValues map[string]*model.Column) {
if len(colValues) == 0 {
return
}
cols := getIndexColumns(ti.Columns, indexCols)
key := genKeyList(table, cols, values)
if len(key) > 0 { // ignore `null` value.
noGeneratedColumn := true
for _, col := range cols {
if col.IsGenerated() {
noGeneratedColumn = false
break
if ti.PKIsHandle {
if pk := ti.GetPkColInfo(); pk != nil && !pk.IsGenerated() {
cols := []*timodel.ColumnInfo{pk}

key := genKeyList(table, cols, colValues)
if len(key) > 0 { // ignore `null` value.
multipleKeys = append(multipleKeys, key)
} else {
log.L().Debug("ignore empty primary key", zap.String("table", table))
}
}
// If the index contain generated column, we can't use this key to detect conflict with other DML,
// Because such as insert can't specified the generated value.
if noGeneratedColumn {
multipleKeys = append(multipleKeys, key)
}

for _, indexCols := range ti.Indices {
if !indexCols.Unique {
continue
}
cols := getIndexColumns(ti.Columns, indexCols)
key := genKeyList(table, cols, colValues)
if len(key) > 0 { // ignore `null` value.
noGeneratedColumn := true
for _, col := range cols {
if col.IsGenerated() {
noGeneratedColumn = false
break
}
}
// If the index contain generated column, we can't use this key to detect conflict with other DML,
// Because such as insert can't specified the generated value.
if noGeneratedColumn {
multipleKeys = append(multipleKeys, key)
}
} else {
log.L().Debug("ignore empty index key", zap.String("table", table))
}
} else {
log.L().Debug("ignore empty index key", zap.String("table", table))
}
}
buildKeys(preCols)
buildKeys(cols)

if len(multipleKeys) == 0 {
// use table name as key if no key generated (no PK/UK),
Expand Down
Loading

0 comments on commit 62e246f

Please sign in to comment.