Skip to content

Commit

Permalink
Merge branch 'master' into fix_4778_openapi_stuck
Browse files Browse the repository at this point in the history
  • Loading branch information
CharlesCheung96 authored Mar 9, 2022
2 parents d10a6a3 + f085477 commit a776844
Show file tree
Hide file tree
Showing 12 changed files with 973 additions and 240 deletions.
16 changes: 16 additions & 0 deletions cdc/model/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,22 @@ func (r *RowChangedEvent) HandleKeyColumns() []*Column {
return pkeyCols
}

// WithHandlePrimaryFlag set `HandleKeyFlag` and `PrimaryKeyFlag`
func (r *RowChangedEvent) WithHandlePrimaryFlag(colNames map[string]struct{}) {
for _, col := range r.Columns {
if _, ok := colNames[col.Name]; ok {
col.Flag.SetIsHandleKey()
col.Flag.SetIsPrimaryKey()
}
}
for _, col := range r.PreColumns {
if _, ok := colNames[col.Name]; ok {
col.Flag.SetIsHandleKey()
col.Flag.SetIsPrimaryKey()
}
}
}

// ApproximateBytes returns approximate bytes in memory consumed by the event.
func (r *RowChangedEvent) ApproximateBytes() int {
const sizeOfRowEvent = int(unsafe.Sizeof(*r))
Expand Down
148 changes: 107 additions & 41 deletions cdc/sink/codec/canal_flat.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/log"
timodel "github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/types"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/config"
Expand Down Expand Up @@ -77,6 +78,9 @@ type canalFlatMessageInterface interface {
getData() map[string]interface{}
getMySQLType() map[string]string
getJavaSQLType() map[string]int32
mqMessageType() model.MqMessageType
eventType() canal.EventType
pkNameSet() map[string]struct{}
}

// adapted from https://github.com/alibaba/canal/blob/b54bea5e3337c9597c427a53071d214ff04628d1/protocol/src/main/java/com/alibaba/otter/canal/protocol/FlatMessage.java#L1
Expand Down Expand Up @@ -148,6 +152,30 @@ func (c *canalFlatMessage) getJavaSQLType() map[string]int32 {
return c.SQLType
}

func (c *canalFlatMessage) mqMessageType() model.MqMessageType {
if c.IsDDL {
return model.MqMessageTypeDDL
}

if c.EventType == tidbWaterMarkType {
return model.MqMessageTypeResolved
}

return model.MqMessageTypeRow
}

func (c *canalFlatMessage) eventType() canal.EventType {
return canal.EventType(canal.EventType_value[c.EventType])
}

func (c *canalFlatMessage) pkNameSet() map[string]struct{} {
result := make(map[string]struct{}, len(c.PKNames))
for _, item := range c.PKNames {
result[item] = struct{}{}
}
return result
}

type tidbExtension struct {
CommitTs uint64 `json:"commitTs,omitempty"`
WatermarkTs uint64 `json:"watermarkTs,omitempty"`
Expand Down Expand Up @@ -358,11 +386,12 @@ func (c *CanalFlatEventBatchEncoder) Size() int {
// CanalFlatEventBatchDecoder decodes the byte into the original message.
type CanalFlatEventBatchDecoder struct {
data []byte
msg *MQMessage
msg canalFlatMessageInterface
enableTiDBExtension bool
}

func newCanalFlatEventBatchDecoder(data []byte, enableTiDBExtension bool) EventBatchDecoder {
// NewCanalFlatEventBatchDecoder return a decoder for canal-json
func NewCanalFlatEventBatchDecoder(data []byte, enableTiDBExtension bool) EventBatchDecoder {
return &CanalFlatEventBatchDecoder{
data: data,
msg: nil,
Expand All @@ -375,71 +404,69 @@ func (b *CanalFlatEventBatchDecoder) HasNext() (model.MqMessageType, bool, error
if len(b.data) == 0 {
return model.MqMessageTypeUnknown, false, nil
}
msg := &MQMessage{}
var msg canalFlatMessageInterface = &canalFlatMessage{}
if b.enableTiDBExtension {
msg = &canalFlatMessageWithTiDBExtension{
canalFlatMessage: &canalFlatMessage{},
Extensions: &tidbExtension{},
}
}
if err := json.Unmarshal(b.data, msg); err != nil {
log.Error("canal-json decoder unmarshal data failed",
zap.Error(err), zap.ByteString("data", b.data))
return model.MqMessageTypeUnknown, false, err
}
b.msg = msg
b.data = nil
if b.msg.Type == model.MqMessageTypeUnknown {
return model.MqMessageTypeUnknown, false, nil
}
return b.msg.Type, true, nil

return b.msg.mqMessageType(), true, nil
}

// NextRowChangedEvent implements the EventBatchDecoder interface
// `HasNext` should be called before this.
func (b *CanalFlatEventBatchDecoder) NextRowChangedEvent() (*model.RowChangedEvent, error) {
if b.msg == nil || b.msg.Type != model.MqMessageTypeRow {
return nil, cerrors.ErrCanalDecodeFailed.GenWithStack("not found row changed event message")
}

var data canalFlatMessageInterface = &canalFlatMessage{}
if b.enableTiDBExtension {
data = &canalFlatMessageWithTiDBExtension{canalFlatMessage: &canalFlatMessage{}, Extensions: &tidbExtension{}}
if b.msg == nil || b.msg.mqMessageType() != model.MqMessageTypeRow {
return nil, cerrors.ErrCanalDecodeFailed.
GenWithStack("not found row changed event message")
}

if err := json.Unmarshal(b.msg.Value, data); err != nil {
return nil, errors.Trace(err)
result, err := canalFlatMessage2RowChangedEvent(b.msg)
if err != nil {
return nil, err
}
b.msg = nil
return canalFlatMessage2RowChangedEvent(data)
return result, nil
}

// NextDDLEvent implements the EventBatchDecoder interface
// `HasNext` should be called before this.
func (b *CanalFlatEventBatchDecoder) NextDDLEvent() (*model.DDLEvent, error) {
if b.msg == nil || b.msg.Type != model.MqMessageTypeDDL {
return nil, cerrors.ErrCanalDecodeFailed.GenWithStack("not found ddl event message")
if b.msg == nil || b.msg.mqMessageType() != model.MqMessageTypeDDL {
return nil, cerrors.ErrCanalDecodeFailed.
GenWithStack("not found ddl event message")
}

var data canalFlatMessageInterface = &canalFlatMessage{}
if b.enableTiDBExtension {
data = &canalFlatMessageWithTiDBExtension{canalFlatMessage: &canalFlatMessage{}, Extensions: &tidbExtension{}}
}

if err := json.Unmarshal(b.msg.Value, data); err != nil {
return nil, errors.Trace(err)
}
result := canalFlatMessage2DDLEvent(b.msg)
b.msg = nil
return canalFlatMessage2DDLEvent(data), nil
return result, nil
}

// NextResolvedEvent implements the EventBatchDecoder interface
// `HasNext` should be called before this.
func (b *CanalFlatEventBatchDecoder) NextResolvedEvent() (uint64, error) {
if b.msg == nil || b.msg.Type != model.MqMessageTypeResolved {
return 0, cerrors.ErrCanalDecodeFailed.GenWithStack("not found resolved event message")
if b.msg == nil || b.msg.mqMessageType() != model.MqMessageTypeResolved {
return 0, cerrors.ErrCanalDecodeFailed.
GenWithStack("not found resolved event message")
}

message := &canalFlatMessageWithTiDBExtension{
canalFlatMessage: &canalFlatMessage{},
}
if err := json.Unmarshal(b.msg.Value, message); err != nil {
return 0, errors.Trace(err)
withExtensionEvent, ok := b.msg.(*canalFlatMessageWithTiDBExtension)
if !ok {
log.Error("canal-json resolved event message should have tidb extension, but not found",
zap.Any("msg", b.msg))
return 0, cerrors.ErrCanalDecodeFailed.
GenWithStack("MqMessageTypeResolved tidb extension not found")
}
b.msg = nil
return message.Extensions.WatermarkTs, nil
return withExtensionEvent.Extensions.WatermarkTs, nil
}

func canalFlatMessage2RowChangedEvent(flatMessage canalFlatMessageInterface) (*model.RowChangedEvent, error) {
Expand All @@ -450,15 +477,37 @@ func canalFlatMessage2RowChangedEvent(flatMessage canalFlatMessageInterface) (*m
Table: *flatMessage.getTable(),
}

mysqlType := flatMessage.getMySQLType()
javaSQLType := flatMessage.getJavaSQLType()

var err error
result.Columns, err = canalFlatJSONColumnMap2SinkColumns(flatMessage.getData(), flatMessage.getMySQLType(), flatMessage.getJavaSQLType())
if flatMessage.eventType() == canal.EventType_DELETE {
// for `DELETE` event, `data` contain the old data, set it as the `PreColumns`
result.PreColumns, err = canalFlatJSONColumnMap2SinkColumns(
flatMessage.getData(), mysqlType, javaSQLType)
// canal-json encoder does not encode `Flag` information into the result,
// we have to set the `Flag` to make it can be handled by MySQL Sink.
// see https://github.com/pingcap/tiflow/blob/7bfce98/cdc/sink/mysql.go#L869-L888
result.WithHandlePrimaryFlag(flatMessage.pkNameSet())
return result, err
}

// for `INSERT` and `UPDATE`, `data` contain fresh data, set it as the `Columns`
result.Columns, err = canalFlatJSONColumnMap2SinkColumns(flatMessage.getData(),
mysqlType, javaSQLType)
if err != nil {
return nil, err
}
result.PreColumns, err = canalFlatJSONColumnMap2SinkColumns(flatMessage.getOld(), flatMessage.getMySQLType(), flatMessage.getJavaSQLType())
if err != nil {
return nil, err

// for `UPDATE`, `old` contain old data, set it as the `PreColumns`
if flatMessage.eventType() == canal.EventType_UPDATE {
result.PreColumns, err = canalFlatJSONColumnMap2SinkColumns(flatMessage.getOld(),
mysqlType, javaSQLType)
if err != nil {
return nil, err
}
}
result.WithHandlePrimaryFlag(flatMessage.pkNameSet())

return result, nil
}
Expand Down Expand Up @@ -504,5 +553,22 @@ func canalFlatMessage2DDLEvent(flatDDL canalFlatMessageInterface) *model.DDLEven
// we lost DDL type from canal flat json format, only got the DDL SQL.
result.Query = flatDDL.getQuery()

// hack the DDL Type to be compatible with MySQL sink's logic
// see https://github.com/pingcap/tiflow/blob/0578db337d/cdc/sink/mysql.go#L362-L370
result.Type = getDDLActionType(result.Query)
return result
}

// return DDL ActionType by the prefix
// see https://github.com/pingcap/tidb/blob/6dbf2de2f/parser/model/ddl.go#L101-L102
func getDDLActionType(query string) timodel.ActionType {
query = strings.ToLower(query)
if strings.HasPrefix(query, "create schema") || strings.HasPrefix(query, "create database") {
return timodel.ActionCreateSchema
}
if strings.HasPrefix(query, "drop schema") || strings.HasPrefix(query, "drop database") {
return timodel.ActionDropSchema
}

return timodel.ActionNone
}
40 changes: 14 additions & 26 deletions cdc/sink/codec/canal_flat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,13 +114,13 @@ func (s *canalFlatSuite) TestNewCanalFlatMessage4DML(c *check.C) {
obtainedValue, ok := obtainedDataMap[item.column.Name]
c.Assert(ok, check.IsTrue)
if !item.column.Flag.IsBinary() {
c.Assert(obtainedValue, check.Equals, item.expectedValue)
c.Assert(obtainedValue, check.Equals, item.expectedEncodedValue)
continue
}

// for `Column.Value` is nil, which mean's it is nullable, set the value to `""`
if obtainedValue == nil {
c.Assert(item.expectedValue, check.Equals, "")
c.Assert(item.expectedEncodedValue, check.Equals, "")
continue
}

Expand All @@ -131,7 +131,7 @@ func (s *canalFlatSuite) TestNewCanalFlatMessage4DML(c *check.C) {
continue
}

c.Assert(obtainedValue, check.Equals, item.expectedValue)
c.Assert(obtainedValue, check.Equals, item.expectedEncodedValue)
}

message, err = encoder.newFlatMessageForDML(testCaseUpdate)
Expand Down Expand Up @@ -165,7 +165,7 @@ func (s *canalFlatSuite) TestNewCanalFlatMessage4DML(c *check.C) {
func (s *canalFlatSuite) TestNewCanalFlatEventBatchDecoder4RowMessage(c *check.C) {
defer testleak.AfterTest(c)()

expectedDecodedValues := collectDecodeValueByColumns(testColumnsTable)
expectedDecodedValue := collectExpectedDecodedValue(testColumnsTable)
for _, encodeEnable := range []bool{false, true} {
encoder := &CanalFlatEventBatchEncoder{builder: NewCanalEntryBuilder(), enableTiDBExtension: encodeEnable}
c.Assert(encoder, check.NotNil)
Expand All @@ -175,12 +175,10 @@ func (s *canalFlatSuite) TestNewCanalFlatEventBatchDecoder4RowMessage(c *check.C

mqMessages := encoder.Build()
c.Assert(len(mqMessages), check.Equals, 1)

rawBytes, err := json.Marshal(mqMessages[0])
c.Assert(err, check.IsNil)
msg := mqMessages[0]

for _, decodeEnable := range []bool{false, true} {
decoder := newCanalFlatEventBatchDecoder(rawBytes, decodeEnable)
decoder := NewCanalFlatEventBatchDecoder(msg.Value, decodeEnable)

ty, hasNext, err := decoder.HasNext()
c.Assert(err, check.IsNil)
Expand All @@ -198,13 +196,9 @@ func (s *canalFlatSuite) TestNewCanalFlatEventBatchDecoder4RowMessage(c *check.C
}

for _, col := range consumed.Columns {
expected, ok := expectedDecodedValues[col.Name]
expected, ok := expectedDecodedValue[col.Name]
c.Assert(ok, check.IsTrue)
if col.Value == nil {
c.Assert(expected, check.Equals, "")
} else {
c.Assert(col.Value, check.Equals, expected)
}
c.Assert(col.Value, check.Equals, expected)

for _, item := range testCaseInsert.Columns {
if item.Name == col.Name {
Expand Down Expand Up @@ -264,11 +258,8 @@ func (s *canalFlatSuite) TestNewCanalFlatEventBatchDecoder4DDLMessage(c *check.C
c.Assert(err, check.IsNil)
c.Assert(result, check.NotNil)

rawBytes, err := json.Marshal(result)
c.Assert(err, check.IsNil)

for _, decodeEnable := range []bool{false, true} {
decoder := newCanalFlatEventBatchDecoder(rawBytes, decodeEnable)
decoder := NewCanalFlatEventBatchDecoder(result.Value, decodeEnable)

ty, hasNext, err := decoder.HasNext()
c.Assert(err, check.IsNil)
Expand Down Expand Up @@ -339,17 +330,14 @@ func (s *canalFlatSuite) TestEncodeCheckpointEvent(c *check.C) {

msg, err := encoder.EncodeCheckpointEvent(watermark)
c.Assert(err, check.IsNil)
if enable {
c.Assert(msg, check.NotNil)
} else {

if !enable {
c.Assert(msg, check.IsNil)
continue
}

rawBytes, err := json.Marshal(msg)
c.Assert(err, check.IsNil)
c.Assert(rawBytes, check.NotNil)

decoder := newCanalFlatEventBatchDecoder(rawBytes, enable)
c.Assert(msg, check.NotNil)
decoder := NewCanalFlatEventBatchDecoder(msg.Value, enable)

ty, hasNext, err := decoder.HasNext()
c.Assert(err, check.IsNil)
Expand Down
Loading

0 comments on commit a776844

Please sign in to comment.