Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sink: support to output old value #708

Merged
merged 28 commits into from
Aug 3, 2020
Merged
Show file tree
Hide file tree
Changes from 27 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
227 changes: 145 additions & 82 deletions cdc/entry/mounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ type baseKVEntry struct {

type rowKVEntry struct {
baseKVEntry
Row map[int64]types.Datum
Row map[int64]types.Datum
PreRow map[int64]types.Datum
}

type indexKVEntry struct {
Expand Down Expand Up @@ -124,10 +125,11 @@ type mounterImpl struct {
rawRowChangedChs []chan *model.PolymorphicEvent
tz *time.Location
workerNum int
enableOldValue bool
}

// NewMounter creates a mounter
func NewMounter(schemaStorage *SchemaStorage, workerNum int) Mounter {
func NewMounter(schemaStorage *SchemaStorage, workerNum int, enableOldValue bool) Mounter {
if workerNum <= 0 {
workerNum = defaultMounterWorkerNum
}
Expand All @@ -139,6 +141,7 @@ func NewMounter(schemaStorage *SchemaStorage, workerNum int) Mounter {
schemaStorage: schemaStorage,
rawRowChangedChs: chs,
workerNum: workerNum,
enableOldValue: enableOldValue,
}
}

Expand Down Expand Up @@ -245,7 +248,7 @@ func (m *mounterImpl) unmarshalAndMountRowChanged(ctx context.Context, raw *mode
}
switch {
case bytes.HasPrefix(key, recordPrefix):
rowKV, err := m.unmarshalRowKVEntry(tableInfo, key, raw.Value, baseInfo)
rowKV, err := m.unmarshalRowKVEntry(tableInfo, key, raw.Value, raw.OldValue, baseInfo)
if err != nil {
return nil, errors.Trace(err)
}
Expand All @@ -254,7 +257,7 @@ func (m *mounterImpl) unmarshalAndMountRowChanged(ctx context.Context, raw *mode
}
return m.mountRowKVEntry(tableInfo, rowKV)
case bytes.HasPrefix(key, indexPrefix):
indexKV, err := m.unmarshalIndexKVEntry(key, raw.Value, baseInfo)
indexKV, err := m.unmarshalIndexKVEntry(key, raw.Value, raw.OldValue, baseInfo)
if err != nil {
return nil, errors.Trace(err)
}
Expand All @@ -272,7 +275,7 @@ func (m *mounterImpl) unmarshalAndMountRowChanged(ctx context.Context, raw *mode
return row, err
}

func (m *mounterImpl) unmarshalRowKVEntry(tableInfo *model.TableInfo, restKey []byte, rawValue []byte, base baseKVEntry) (*rowKVEntry, error) {
func (m *mounterImpl) unmarshalRowKVEntry(tableInfo *model.TableInfo, restKey []byte, rawValue []byte, rawOldValue []byte, base baseKVEntry) (*rowKVEntry, error) {
key, recordID, err := decodeRecordID(restKey)
if err != nil {
return nil, errors.Trace(err)
Expand All @@ -284,16 +287,27 @@ func (m *mounterImpl) unmarshalRowKVEntry(tableInfo *model.TableInfo, restKey []
if err != nil {
return nil, errors.Trace(err)
}
var preRow map[int64]types.Datum
if rawOldValue != nil {
preRow, err = decodeRow(rawOldValue, recordID, tableInfo, m.tz)
if err != nil {
return nil, errors.Trace(err)
}
}
base.RecordID = recordID
return &rowKVEntry{
baseKVEntry: base,
Row: row,
PreRow: preRow,
}, nil
}

func (m *mounterImpl) unmarshalIndexKVEntry(restKey []byte, rawValue []byte, base baseKVEntry) (*indexKVEntry, error) {
// skip set index KV
if !base.Delete {
func (m *mounterImpl) unmarshalIndexKVEntry(restKey []byte, rawValue []byte, rawOldValue []byte, base baseKVEntry) (*indexKVEntry, error) {
// Skip set index KV.
// By default we cannot get the old value of a deleted row, so we must get the value of the unique key
// or primary key from the index key in order to locate the deleted row.
// When old value is enabled, the index key can be skipped as well.
if !base.Delete || m.enableOldValue {
zier-one marked this conversation as resolved.
Show resolved Hide resolved
return nil, nil
}

Expand Down Expand Up @@ -353,23 +367,22 @@ func UnmarshalDDL(raw *model.RawKVEntry) (*timodel.Job, error) {
return job, nil
}

func (m *mounterImpl) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntry) (*model.RowChangedEvent, error) {
if row.Delete && !tableInfo.PKIsHandle {
return nil, nil
}

datumsNum := 1
if !row.Delete {
datumsNum = len(tableInfo.Columns)
func datum2Column(tableInfo *model.TableInfo, datums map[int64]types.Datum, fillWithDefaultValue bool) (map[string]*model.Column, error) {
estimateLen := len(datums)
if fillWithDefaultValue {
estimateLen = len(tableInfo.Columns)
}

values := make(map[string]*model.Column, datumsNum)
for index, colValue := range row.Row {
cols := make(map[string]*model.Column, estimateLen)
for index, colValue := range datums {
colInfo, exist := tableInfo.GetColumnInfo(index)
if !exist {
return nil, errors.NotFoundf("column info, colID: %d", index)
}
if !row.Delete && !tableInfo.IsColWritable(colInfo) {
// The check on `fillWithDefaultValue` is tricky:
// if `fillWithDefaultValue` is false, the event must be a deletion (with old value disabled),
// and we should still output the generated columns in a deletion event.
// This tricky code will be improved after pingcap/ticdc#787 is merged.
if !tableInfo.IsColWritable(colInfo) && fillWithDefaultValue {
continue
}
colName := colInfo.Name.O
Expand All @@ -386,52 +399,90 @@ func (m *mounterImpl) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntr
whereHandle := true
col.WhereHandle = &whereHandle
}
values[colName] = col
cols[colName] = col
}
if !fillWithDefaultValue {
return cols, nil
}
for _, col := range tableInfo.Columns {
amyangfei marked this conversation as resolved.
Show resolved Hide resolved
_, ok := cols[col.Name.O]
if !ok && tableInfo.IsColWritable(col) {
column := &model.Column{
Type: col.Tp,
Value: getDefaultOrZeroValue(col),
Flag: transColumnFlag(col),
}
if tableInfo.IsColumnUnique(col.ID) {
whereHandle := true
column.WhereHandle = &whereHandle
}
cols[col.Name.O] = column
}
}
return cols, nil
}

func (m *mounterImpl) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntry) (*model.RowChangedEvent, error) {
// If m.enableOldValue == true, go into this function.
// If m.enableOldValue == false and row.Delete == false, go into this function.
// If m.enableOldValue == false and row.Delete == true and tableInfo.PKIsHandle == true, go into this function.
// Only if m.enableOldValue == false and row.Delete == true and tableInfo.PKIsHandle == false, skip this function.
if !m.enableOldValue && row.Delete && !tableInfo.PKIsHandle {
return nil, nil
}

var err error
// Decode previous columns.
var preCols map[string]*model.Column
if len(row.PreRow) != 0 {
// FIXME(leoppro): use the pre table info to mount the pre column datum;
// the pre columns and current columns in one event may use different table info.
preCols, err = datum2Column(tableInfo, row.PreRow, true)
if err != nil {
return nil, errors.Trace(err)
}
}

var cols map[string]*model.Column
oldValueDisabledAndRowIsDelete := !m.enableOldValue && row.Delete
cols, err = datum2Column(tableInfo, row.Row, !oldValueDisabledAndRowIsDelete)
if err != nil {
return nil, errors.Trace(err)
}
if oldValueDisabledAndRowIsDelete {
preCols = cols
cols = nil
}

var partitionID int64
if tableInfo.GetPartitionInfo() != nil {
partitionID = row.PhysicalTableID
}

event := &model.RowChangedEvent{
schemaName := tableInfo.TableName.Schema
tableName := tableInfo.TableName.Table
return &model.RowChangedEvent{
StartTs: row.StartTs,
CommitTs: row.CRTs,
RowID: row.RecordID,
TableInfoVersion: tableInfo.TableInfoVersion,
Table: &model.TableName{
Schema: tableInfo.TableName.Schema,
Table: tableInfo.TableName.Table,
Schema: schemaName,
Table: tableName,
Partition: partitionID,
},
IndieMarkCol: tableInfo.IndieMarkCol,
}

if !row.Delete {
for _, col := range tableInfo.Columns {
_, ok := values[col.Name.O]
if !ok && tableInfo.IsColWritable(col) {
column := &model.Column{
Type: col.Tp,
Value: getDefaultOrZeroValue(col),
Flag: transColumnFlag(col),
}
if tableInfo.IsColumnUnique(col.ID) {
whereHandle := true
column.WhereHandle = &whereHandle
}
values[col.Name.O] = column
}
}
}
event.Delete = row.Delete
event.Columns = values
event.Keys = genMultipleKeys(tableInfo.TableInfo, values, quotes.QuoteSchema(event.Table.Schema, event.Table.Table))
return event, nil
Delete: row.Delete,
Columns: cols,
PreColumns: preCols,
// FIXME(leoppro): Correctness of conflict detection with old values
Keys: genMultipleKeys(tableInfo.TableInfo, preCols, cols, quotes.QuoteSchema(schemaName, tableName)),
}, nil
}

func (m *mounterImpl) mountIndexKVEntry(tableInfo *model.TableInfo, idx *indexKVEntry) (*model.RowChangedEvent, error) {
// skip set index KV
if !idx.Delete {
if !idx.Delete || m.enableOldValue {
return nil, nil
}

Expand All @@ -450,14 +501,14 @@ func (m *mounterImpl) mountIndexKVEntry(tableInfo *model.TableInfo, idx *indexKV
return nil, errors.Trace(err)
}

values := make(map[string]*model.Column, len(idx.IndexValue))
preCols := make(map[string]*model.Column, len(idx.IndexValue))
for i, idxCol := range indexInfo.Columns {
value, err := formatColVal(idx.IndexValue[i], tableInfo.Columns[idxCol.Offset].Tp)
if err != nil {
return nil, errors.Trace(err)
}
whereHandle := true
values[idxCol.Name.O] = &model.Column{
preCols[idxCol.Name.O] = &model.Column{
Type: tableInfo.Columns[idxCol.Offset].Tp,
WhereHandle: &whereHandle,
Value: value,
Expand All @@ -474,8 +525,8 @@ func (m *mounterImpl) mountIndexKVEntry(tableInfo *model.TableInfo, idx *indexKV
},
IndieMarkCol: tableInfo.IndieMarkCol,
Delete: true,
Columns: values,
Keys: genMultipleKeys(tableInfo.TableInfo, values, quotes.QuoteSchema(tableInfo.TableName.Schema, tableInfo.TableName.Table)),
PreColumns: preCols,
Keys: genMultipleKeys(tableInfo.TableInfo, preCols, nil, quotes.QuoteSchema(tableInfo.TableName.Schema, tableInfo.TableName.Table)),
}, nil
}

Expand Down Expand Up @@ -553,43 +604,55 @@ func fetchHandleValue(tableInfo *model.TableInfo, recordID int64) (pkCoID int64,
return
}

func genMultipleKeys(ti *timodel.TableInfo, values map[string]*model.Column, table string) []string {
multipleKeys := make([]string, 0, len(ti.Indices)+1)
if ti.PKIsHandle {
if pk := ti.GetPkColInfo(); pk != nil && !pk.IsGenerated() {
cols := []*timodel.ColumnInfo{pk}
key := genKeyList(table, cols, values)
if len(key) > 0 { // ignore `null` value.
multipleKeys = append(multipleKeys, key)
} else {
log.L().Debug("ignore empty primary key", zap.String("table", table))
}
}
func genMultipleKeys(ti *timodel.TableInfo, preCols, cols map[string]*model.Column, table string) []string {
estimateLen := len(ti.Indices) + 1
if len(preCols) != 0 && len(cols) != 0 {
estimateLen *= 2
}

for _, indexCols := range ti.Indices {
if !indexCols.Unique {
continue
multipleKeys := make([]string, 0, estimateLen)
buildKeys := func(colValues map[string]*model.Column) {
if len(colValues) == 0 {
return
}
cols := getIndexColumns(ti.Columns, indexCols)
key := genKeyList(table, cols, values)
if len(key) > 0 { // ignore `null` value.
noGeneratedColumn := true
for _, col := range cols {
if col.IsGenerated() {
noGeneratedColumn = false
break
if ti.PKIsHandle {
if pk := ti.GetPkColInfo(); pk != nil && !pk.IsGenerated() {
cols := []*timodel.ColumnInfo{pk}

key := genKeyList(table, cols, colValues)
if len(key) > 0 { // ignore `null` value.
multipleKeys = append(multipleKeys, key)
} else {
log.L().Debug("ignore empty primary key", zap.String("table", table))
}
}
// If the index contains a generated column, we can't use this key to detect conflicts with other DMLs,
// because statements such as INSERT can't specify the generated value.
if noGeneratedColumn {
multipleKeys = append(multipleKeys, key)
}

for _, indexCols := range ti.Indices {
if !indexCols.Unique {
continue
}
cols := getIndexColumns(ti.Columns, indexCols)
key := genKeyList(table, cols, colValues)
if len(key) > 0 { // ignore `null` value.
noGeneratedColumn := true
for _, col := range cols {
if col.IsGenerated() {
noGeneratedColumn = false
break
}
}
// If the index contains a generated column, we can't use this key to detect conflicts with other DMLs,
// because statements such as INSERT can't specify the generated value.
if noGeneratedColumn {
multipleKeys = append(multipleKeys, key)
}
} else {
log.L().Debug("ignore empty index key", zap.String("table", table))
}
} else {
log.L().Debug("ignore empty index key", zap.String("table", table))
}
}
buildKeys(preCols)
buildKeys(cols)

if len(multipleKeys) == 0 {
// use table name as key if no key generated (no PK/UK),
Expand Down
Loading