diff --git a/pkg/sink/codec/canal/canal_json_row_event_encoder.go b/pkg/sink/codec/canal/canal_json_row_event_encoder.go
new file mode 100644
index 00000000000..45fc98e99cd
--- /dev/null
+++ b/pkg/sink/codec/canal/canal_json_row_event_encoder.go
@@ -0,0 +1,419 @@
+// Copyright 2022 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package canal
+
+import (
+	"context"
+	"time"
+
+	"github.com/goccy/go-json"
+	"github.com/mailru/easyjson/jwriter"
+	"github.com/pingcap/errors"
+	"github.com/pingcap/log"
+	"github.com/pingcap/tiflow/cdc/model"
+	"github.com/pingcap/tiflow/pkg/config"
+	cerror "github.com/pingcap/tiflow/pkg/errors"
+	"github.com/pingcap/tiflow/pkg/sink/codec"
+	"github.com/pingcap/tiflow/pkg/sink/codec/common"
+	"go.uber.org/zap"
+)
+
+func newJSONMessageForDML(
+	builder *canalEntryBuilder,
+	enableTiDBExtension bool,
+	e *model.RowChangedEvent,
+	onlyOutputUpdatedColumns bool,
+) ([]byte, error) {
+	isDelete := e.IsDelete()
+	mysqlTypeMap := make(map[string]string, len(e.Columns))
+
+	filling := func(columns []*model.Column, out *jwriter.Writer,
+		onlyOutputUpdatedColumn bool,
+		newColumnMap map[string]*model.Column,
+	) error {
+		if len(columns) == 0 {
+			out.RawString("null")
+			return nil
+		}
+		out.RawByte('[')
+		out.RawByte('{')
+		isFirst := true
+		for _, col := range columns {
+			if col != nil {
+				// the column is unchanged, do not output it
+				if onlyOutputUpdatedColumn && shouldIgnoreColumn(col, newColumnMap) {
+					continue
+				}
+				if isFirst {
+					isFirst = false
+				} else {
+					out.RawByte(',')
+				}
+				mysqlType := getMySQLType(col)
+				javaType, err := getJavaSQLType(col, mysqlType)
+				if err != nil {
+					return cerror.WrapError(cerror.ErrCanalEncodeFailed, err)
+				}
+				value, err := builder.formatValue(col.Value, javaType)
+				if err != nil {
+					return cerror.WrapError(cerror.ErrCanalEncodeFailed, err)
+				}
+				out.String(col.Name)
+				out.RawByte(':')
+				if col.Value == nil {
+					out.RawString("null")
+				} else {
+					out.String(value)
+				}
+			}
+		}
+		out.RawByte('}')
+		out.RawByte(']')
+		return nil
+	}
+
+	out := &jwriter.Writer{}
+	out.RawByte('{')
+	{
+		const prefix string = ",\"id\":"
+		out.RawString(prefix[1:])
+		out.Int64(0) // ignored by both Canal Adapter and Flink
+	}
+	{
+		const prefix string = ",\"database\":"
+		out.RawString(prefix)
+		out.String(e.Table.Schema)
+	}
+	{
+		const prefix string = ",\"table\":"
+		out.RawString(prefix)
+		out.String(e.Table.Table)
+	}
+	{
+		const prefix string = ",\"pkNames\":"
+		out.RawString(prefix)
+		pkNames := e.PrimaryKeyColumnNames()
+		if pkNames == nil {
+			out.RawString("null")
+		} else {
+			out.RawByte('[')
+			for i, name := range pkNames {
+				if i > 0 {
+					out.RawByte(',')
+				}
+				out.String(name)
+			}
+			out.RawByte(']')
+		}
+	}
+	{
+		const prefix string = ",\"isDdl\":"
+		out.RawString(prefix)
+		out.Bool(false)
+	}
+	{
+		const prefix string = ",\"type\":"
+		out.RawString(prefix)
+		out.String(eventTypeString(e))
+	}
+	{
+		const prefix string = ",\"es\":"
+		out.RawString(prefix)
+		out.Int64(convertToCanalTs(e.CommitTs))
+	}
+	{
+		const prefix string = ",\"ts\":"
+		out.RawString(prefix)
+		out.Int64(time.Now().UnixMilli()) // ignored by both Canal Adapter and Flink
+	}
+	{
+		const prefix string = ",\"sql\":"
+		out.RawString(prefix)
+		out.String("")
+	}
+	{
+		columns := e.PreColumns
+		if !isDelete {
+			columns = e.Columns
+		}
+		const prefix string = ",\"sqlType\":"
+		out.RawString(prefix)
+		emptyColumn := true
+		for _, col := range columns {
+			if col != nil {
+				if emptyColumn {
+					out.RawByte('{')
+					emptyColumn = false
+				} else {
+					out.RawByte(',')
+				}
+				mysqlType := getMySQLType(col)
+				javaType, err := getJavaSQLType(col, mysqlType)
+				if err != nil {
+					return nil, cerror.WrapError(cerror.ErrCanalEncodeFailed, err)
+				}
+				out.String(col.Name)
+				out.RawByte(':')
+				out.Int32(int32(javaType))
+				mysqlTypeMap[col.Name] = mysqlType
+			}
+		}
+		if emptyColumn {
+			out.RawString(`null`)
+		} else {
+			out.RawByte('}')
+		}
+	}
+	{
+		const prefix string = ",\"mysqlType\":"
+		out.RawString(prefix)
+		if mysqlTypeMap == nil {
+			out.RawString(`null`)
+		} else {
+			out.RawByte('{')
+			isFirst := true
+			for typeKey, typeValue := range mysqlTypeMap {
+				if isFirst {
+					isFirst = false
+				} else {
+					out.RawByte(',')
+				}
+				out.String(typeKey)
+				out.RawByte(':')
+				out.String(typeValue)
+			}
+			out.RawByte('}')
+		}
+	}
+
+	if e.IsDelete() {
+		out.RawString(",\"old\":null")
+		out.RawString(",\"data\":")
+		if err := filling(e.PreColumns, out, false, nil); err != nil {
+			return nil, err
+		}
+	} else if e.IsInsert() {
+		out.RawString(",\"old\":null")
+		out.RawString(",\"data\":")
+		if err := filling(e.Columns, out, false, nil); err != nil {
+			return nil, err
+		}
+	} else if e.IsUpdate() {
+		var newColsMap map[string]*model.Column
+		if onlyOutputUpdatedColumns {
+			newColsMap = make(map[string]*model.Column, len(e.Columns))
+			for _, col := range e.Columns {
+				newColsMap[col.Name] = col
+			}
+		}
+		out.RawString(",\"old\":")
+		if err := filling(e.PreColumns, out, onlyOutputUpdatedColumns, newColsMap); err != nil {
+			return nil, err
+		}
+		out.RawString(",\"data\":")
+		if err := filling(e.Columns, out, false, nil); err != nil {
+			return nil, err
+		}
+	} else {
+		log.Panic("unreachable event type", zap.Any("event", e))
+	}
+
+	if enableTiDBExtension {
+		const prefix string = ",\"_tidb\":"
+		out.RawString(prefix)
+		out.RawByte('{')
+		out.RawString("\"commitTs\":")
+		out.Uint64(e.CommitTs)
+		out.RawByte('}')
+	}
+	out.RawByte('}')
+
+	return out.BuildBytes()
+}
+
+func eventTypeString(e *model.RowChangedEvent) string {
+	if e.IsDelete() {
+		return "DELETE"
+	}
+	if len(e.PreColumns) == 0 {
+		return "INSERT"
+	}
+	return "UPDATE"
+}
+
+// JSONRowEventEncoder encodes row events in the canal-json format
+type JSONRowEventEncoder struct {
+	builder *canalEntryBuilder
+
+	// When it is true, canal-json would generate TiDB extension information,
+	// which, at the moment, only includes the `tidbWaterMarkType` and `_tidb` fields.
+	enableTiDBExtension bool
+	maxMessageBytes     int
+	messages            []*common.Message
+
+	onlyOutputUpdatedColumns bool
+}
+
+// newJSONRowEventEncoder creates a new JSONRowEventEncoder
+func newJSONRowEventEncoder(config *common.Config) codec.RowEventEncoder {
+	encoder := &JSONRowEventEncoder{
+		builder:                  newCanalEntryBuilder(),
+		enableTiDBExtension:      config.EnableTiDBExtension,
+		onlyOutputUpdatedColumns: config.OnlyOutputUpdatedColumns,
+		messages:                 make([]*common.Message, 0, 1),
+		maxMessageBytes:          config.MaxMessageBytes,
+	}
+	return encoder
+}
+
+func (c *JSONRowEventEncoder) newJSONMessageForDDL(e *model.DDLEvent) canalJSONMessageInterface {
+	msg := &JSONMessage{
+		ID:            0, // ignored by both Canal Adapter and Flink
+		Schema:        e.TableInfo.TableName.Schema,
+		Table:         e.TableInfo.TableName.Table,
+		IsDDL:         true,
+		EventType:     convertDdlEventType(e).String(),
+		ExecutionTime: convertToCanalTs(e.CommitTs),
+		BuildTime:     time.Now().UnixMilli(), // message build time in milliseconds
+		Query:         e.Query,
+	}
+
+	if !c.enableTiDBExtension {
+		return msg
+	}
+
+	return &canalJSONMessageWithTiDBExtension{
+		JSONMessage: msg,
+		Extensions:  &tidbExtension{CommitTs: e.CommitTs},
+	}
+}
+
+func (c *JSONRowEventEncoder) newJSONMessage4CheckpointEvent(
+	ts uint64,
+) *canalJSONMessageWithTiDBExtension {
+	return &canalJSONMessageWithTiDBExtension{
+		JSONMessage: &JSONMessage{
+			ID:            0,
+			IsDDL:         false,
+			EventType:     tidbWaterMarkType,
+			ExecutionTime: convertToCanalTs(ts),
+			BuildTime:     time.Now().UnixNano() / int64(time.Millisecond), // converts to milliseconds
+		},
+		Extensions: &tidbExtension{WatermarkTs: ts},
+	}
+}
+
+// EncodeCheckpointEvent implements the RowEventEncoder interface
+func (c *JSONRowEventEncoder) EncodeCheckpointEvent(ts uint64) (*common.Message, error) {
+	if !c.enableTiDBExtension {
+		return nil, nil
+	}
+
+	msg := c.newJSONMessage4CheckpointEvent(ts)
+	value, err := json.Marshal(msg)
+	if err != nil {
+		return nil, cerror.WrapError(cerror.ErrCanalEncodeFailed, err)
+	}
+	return common.NewResolvedMsg(config.ProtocolCanalJSON, nil, value, ts), nil
+}
+
+// AppendRowChangedEvent implements the RowEventEncoder interface
+func (c *JSONRowEventEncoder) AppendRowChangedEvent(
+	_ context.Context,
+	_ string,
+	e *model.RowChangedEvent,
+	callback func(),
+) error {
+	value, err := newJSONMessageForDML(c.builder,
+		c.enableTiDBExtension, e, c.onlyOutputUpdatedColumns)
+	if err != nil {
+		return errors.Trace(err)
+	}
+
+	length := len(value) + common.MaxRecordOverhead
+	// if a single message is longer than max-message-bytes, do not send it
+	if length > c.maxMessageBytes {
+		log.Warn("Single message is too large for canal-json",
+			zap.Int("maxMessageBytes", c.maxMessageBytes),
+			zap.Int("length", length),
+			zap.Any("table", e.Table))
+		return cerror.ErrMessageTooLarge.GenWithStackByArgs()
+	}
+	m := &common.Message{
+		Key:      nil,
+		Value:    value,
+		Ts:       e.CommitTs,
+		Schema:   &e.Table.Schema,
+		Table:    &e.Table.Table,
+		Type:     model.MessageTypeRow,
+		Protocol: config.ProtocolCanalJSON,
+		Callback: callback,
+	}
+	m.IncRowsCount()
+
+	c.messages = append(c.messages, m)
+	return nil
+}
+
+// Build implements the RowEventEncoder interface
+func (c *JSONRowEventEncoder) Build() []*common.Message {
+	if len(c.messages) == 0 {
+		return nil
+	}
+
+	result := c.messages
+	c.messages = nil
+	return result
+}
+
+// EncodeDDLEvent encodes DDL events
+func (c *JSONRowEventEncoder) EncodeDDLEvent(e *model.DDLEvent) (*common.Message, error) {
+	message := c.newJSONMessageForDDL(e)
+	value, err := json.Marshal(message)
+	if err != nil {
+		return nil, cerror.WrapError(cerror.ErrCanalEncodeFailed, err)
+	}
+	return common.NewDDLMsg(config.ProtocolCanalJSON, nil, value, e), nil
+}
+
+type jsonRowEventEncoderBuilder struct {
+	config *common.Config
+}
+
+// NewJSONRowEventEncoderBuilder creates a canal-json RowEventEncoderBuilder.
+func NewJSONRowEventEncoderBuilder(config *common.Config) codec.RowEventEncoderBuilder {
+	return &jsonRowEventEncoderBuilder{config: config}
+}
+
+// Build a `JSONRowEventEncoder`
+func (b *jsonRowEventEncoderBuilder) Build() codec.RowEventEncoder {
+	return newJSONRowEventEncoder(b.config)
+}
+
+func shouldIgnoreColumn(col *model.Column,
+	newColumnMap map[string]*model.Column,
+) bool {
+	newCol, ok := newColumnMap[col.Name]
+	if ok && newCol != nil {
+		// do not ignore the column if its sql type changed
+		if newCol.Type != col.Type {
+			return false
+		}
+		// ignore the column if its value is unchanged
+		if codec.IsColumnValueEqual(newCol.Value, col.Value) {
+			return true
+		}
+	}
+	return false
+}
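A minimal sketch of how the row encoder above is driven, assuming only the `common.Config` fields this diff reads; the event literal is illustrative (real events produced by TiCDC carry the full column type metadata that `getMySQLType`/`getJavaSQLType` depend on), and the values are made up:

	package main

	import (
		"context"

		"github.com/pingcap/tiflow/cdc/model"
		"github.com/pingcap/tiflow/pkg/sink/codec/canal"
		"github.com/pingcap/tiflow/pkg/sink/codec/common"
	)

	func main() {
		// Illustrative config; only fields referenced by this diff are set.
		cfg := &common.Config{
			MaxMessageBytes:     1024 * 1024,
			EnableTiDBExtension: true,
		}
		encoder := canal.NewJSONRowEventEncoderBuilder(cfg).Build()

		// Illustrative event with a single column; PreColumns is nil,
		// so it encodes as an INSERT.
		event := &model.RowChangedEvent{
			CommitTs: 42,
			Table:    &model.TableName{Schema: "test", Table: "t"},
			Columns:  []*model.Column{{Name: "id", Value: int64(1)}},
		}
		// A row larger than max-message-bytes surfaces here as ErrMessageTooLarge.
		if err := encoder.AppendRowChangedEvent(context.Background(), "", event, nil); err != nil {
			panic(err)
		}
		for _, msg := range encoder.Build() {
			_ = msg.Value // one canal-json document per appended row
		}
	}
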
diff --git a/pkg/sink/codec/canal/canal_json_txn_event_encoder.go b/pkg/sink/codec/canal/canal_json_txn_event_encoder.go
new file mode 100644
index 00000000000..cdc55c1eec2
--- /dev/null
+++ b/pkg/sink/codec/canal/canal_json_txn_event_encoder.go
@@ -0,0 +1,127 @@
+// Copyright 2023 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package canal
+
+import (
+	"bytes"
+
+	"github.com/pingcap/errors"
+	"github.com/pingcap/log"
+	"github.com/pingcap/tiflow/cdc/model"
+	"github.com/pingcap/tiflow/pkg/config"
+	cerror "github.com/pingcap/tiflow/pkg/errors"
+	"github.com/pingcap/tiflow/pkg/sink/codec"
+	"github.com/pingcap/tiflow/pkg/sink/codec/common"
+	"go.uber.org/zap"
+)
+
+// JSONTxnEventEncoder encodes txn events in the canal-json format
+type JSONTxnEventEncoder struct {
+	builder *canalEntryBuilder
+
+	// When it is true, canal-json would generate TiDB extension information,
+	// which, at the moment, only includes the `tidbWaterMarkType` and `_tidb` fields.
+	enableTiDBExtension bool
+
+	onlyOutputUpdatedColumns bool
+	// terminator is the symbol separating two lines
+	terminator      []byte
+	maxMessageBytes int
+	valueBuf        *bytes.Buffer
+	batchSize       int
+	callback        func()
+
+	// Store some fields of the txn event.
+	txnCommitTs uint64
+	txnSchema   *string
+	txnTable    *string
+}
+
+// AppendTxnEvent appends a txn event to the encoder.
+func (j *JSONTxnEventEncoder) AppendTxnEvent(
+	txn *model.SingleTableTxn,
+	callback func(),
+) error {
+	for _, row := range txn.Rows {
+		value, err := newJSONMessageForDML(j.builder,
+			j.enableTiDBExtension, row, j.onlyOutputUpdatedColumns)
+		if err != nil {
+			return errors.Trace(err)
+		}
+		length := len(value) + common.MaxRecordOverhead
+		// if a single message is longer than max-message-bytes, do not send it
+		if length > j.maxMessageBytes {
+			log.Warn("Single message is too large for canal-json",
+				zap.Int("maxMessageBytes", j.maxMessageBytes),
+				zap.Int("length", length),
+				zap.Any("table", row.Table))
+			return cerror.ErrMessageTooLarge.GenWithStackByArgs()
+		}
+		j.valueBuf.Write(value)
+		j.valueBuf.Write(j.terminator)
+		j.batchSize++
+	}
+	j.callback = callback
+	j.txnCommitTs = txn.CommitTs
+	j.txnSchema = &txn.Table.Schema
+	j.txnTable = &txn.Table.Table
+	return nil
+}
+
+// Build builds a message from the encoder and resets the encoder.
+func (j *JSONTxnEventEncoder) Build() []*common.Message {
+	if j.batchSize == 0 {
+		return nil
+	}
+
+	ret := common.NewMsg(config.ProtocolCanalJSON, nil,
+		j.valueBuf.Bytes(), j.txnCommitTs, model.MessageTypeRow, j.txnSchema, j.txnTable)
+	ret.SetRowsCount(j.batchSize)
+	ret.Callback = j.callback
+	j.valueBuf.Reset()
+	j.callback = nil
+	j.batchSize = 0
+	j.txnCommitTs = 0
+	j.txnSchema = nil
+	j.txnTable = nil
+
+	return []*common.Message{ret}
+}
+
+// newJSONTxnEventEncoder creates a new JSONTxnEventEncoder
+func newJSONTxnEventEncoder(config *common.Config) codec.TxnEventEncoder {
+	encoder := &JSONTxnEventEncoder{
+		builder:                  newCanalEntryBuilder(),
+		enableTiDBExtension:      config.EnableTiDBExtension,
+		onlyOutputUpdatedColumns: config.OnlyOutputUpdatedColumns,
+		valueBuf:                 &bytes.Buffer{},
+		terminator:               []byte(config.Terminator),
+		maxMessageBytes:          config.MaxMessageBytes,
+	}
+	return encoder
+}
+
+type jsonTxnEventEncoderBuilder struct {
+	config *common.Config
+}
+
+// NewJSONTxnEventEncoderBuilder creates a jsonTxnEventEncoderBuilder.
+func NewJSONTxnEventEncoderBuilder(config *common.Config) codec.TxnEventEncoderBuilder {
+	return &jsonTxnEventEncoderBuilder{config: config}
+}
+
+// Build a `JSONTxnEventEncoder`
+func (b *jsonTxnEventEncoderBuilder) Build() codec.TxnEventEncoder {
+	return newJSONTxnEventEncoder(b.config)
+}
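Likewise, a hedged sketch of the txn encoder's contract as implemented above: every row of a `model.SingleTableTxn` becomes one canal-json document followed by the configured terminator, all concatenated into a single message. The txn literal is illustrative and leaves `Rows` empty, in which case `Build` returns nil:

	package main

	import (
		"github.com/pingcap/tiflow/cdc/model"
		"github.com/pingcap/tiflow/pkg/sink/codec/canal"
		"github.com/pingcap/tiflow/pkg/sink/codec/common"
	)

	func main() {
		cfg := &common.Config{
			MaxMessageBytes: 1024 * 1024,
			Terminator:      "\r\n", // written after every row document
		}
		encoder := canal.NewJSONTxnEventEncoderBuilder(cfg).Build()

		// Illustrative transaction; Rows is elided here, so nothing is buffered.
		txn := &model.SingleTableTxn{
			Table:    &model.TableName{Schema: "test", Table: "t"},
			CommitTs: 42,
		}
		if err := encoder.AppendTxnEvent(txn, nil); err != nil {
			panic(err)
		}
		msgs := encoder.Build() // at most one message per txn, nil when empty
		_ = msgs
	}
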
diff --git a/pkg/sink/codec/open/open_protocol_encoder.go b/pkg/sink/codec/open/open_protocol_encoder.go
new file mode 100644
index 00000000000..be5e9ef4f92
--- /dev/null
+++ b/pkg/sink/codec/open/open_protocol_encoder.go
@@ -0,0 +1,212 @@
+// Copyright 2022 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package open
+
+import (
+	"bytes"
+	"context"
+	"encoding/binary"
+
+	"github.com/pingcap/errors"
+	"github.com/pingcap/log"
+	"github.com/pingcap/tiflow/cdc/model"
+	"github.com/pingcap/tiflow/pkg/config"
+	cerror "github.com/pingcap/tiflow/pkg/errors"
+	"github.com/pingcap/tiflow/pkg/sink/codec"
+	"github.com/pingcap/tiflow/pkg/sink/codec/common"
+	"go.uber.org/zap"
+)
+
+// BatchEncoder encodes events into batched open-protocol messages.
+type BatchEncoder struct {
+	messageBuf   []*common.Message
+	callbackBuff []func()
+	curBatchSize int
+
+	// configs
+	MaxMessageBytes          int
+	MaxBatchSize             int
+	OnlyOutputUpdatedColumns bool
+}
+
+// AppendRowChangedEvent implements the RowEventEncoder interface
+func (d *BatchEncoder) AppendRowChangedEvent(
+	_ context.Context,
+	_ string,
+	e *model.RowChangedEvent,
+	callback func(),
+) error {
+	keyMsg, valueMsg := rowChangeToMsg(e)
+	key, err := keyMsg.Encode()
+	if err != nil {
+		return errors.Trace(err)
+	}
+	value, err := valueMsg.encode(d.OnlyOutputUpdatedColumns)
+	if err != nil {
+		return errors.Trace(err)
+	}
+
+	var keyLenByte [8]byte
+	binary.BigEndian.PutUint64(keyLenByte[:], uint64(len(key)))
+	var valueLenByte [8]byte
+	binary.BigEndian.PutUint64(valueLenByte[:], uint64(len(value)))
+
+	// if a single message is longer than max-message-bytes, do not send it.
+	// 16 is the length of `keyLenByte` and `valueLenByte`, 8 is the length of `versionHead`
+	length := len(key) + len(value) + common.MaxRecordOverhead + 16 + 8
+	if length > d.MaxMessageBytes {
+		log.Warn("Single message is too large for open-protocol",
+			zap.Int("maxMessageBytes", d.MaxMessageBytes),
+			zap.Int("length", length),
+			zap.Any("table", e.Table),
+			zap.Any("key", key))
+		return cerror.ErrMessageTooLarge.GenWithStackByArgs()
+	}
+
+	if len(d.messageBuf) == 0 ||
+		d.curBatchSize >= d.MaxBatchSize ||
+		d.messageBuf[len(d.messageBuf)-1].Length()+len(key)+len(value)+16 > d.MaxMessageBytes {
+		// Before we create a new message, we should handle the previous callbacks.
+		d.tryBuildCallback()
+		versionHead := make([]byte, 8)
+		binary.BigEndian.PutUint64(versionHead, codec.BatchVersion1)
+		msg := common.NewMsg(config.ProtocolOpen, versionHead, nil,
+			0, model.MessageTypeRow, nil, nil)
+		d.messageBuf = append(d.messageBuf, msg)
+		d.curBatchSize = 0
+	}
+
+	message := d.messageBuf[len(d.messageBuf)-1]
+	message.Key = append(message.Key, keyLenByte[:]...)
+	message.Key = append(message.Key, key...)
+	message.Value = append(message.Value, valueLenByte[:]...)
+	message.Value = append(message.Value, value...)
+	message.Ts = e.CommitTs
+	message.Schema = &e.Table.Schema
+	message.Table = &e.Table.Table
+	message.IncRowsCount()
+
+	if callback != nil {
+		d.callbackBuff = append(d.callbackBuff, callback)
+	}
+
+	d.curBatchSize++
+	return nil
+}
+
+// EncodeDDLEvent implements the RowEventEncoder interface
+func (d *BatchEncoder) EncodeDDLEvent(e *model.DDLEvent) (*common.Message, error) {
+	keyMsg, valueMsg := ddlEventToMsg(e)
+	key, err := keyMsg.Encode()
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+	value, err := valueMsg.encode()
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+
+	var keyLenByte [8]byte
+	binary.BigEndian.PutUint64(keyLenByte[:], uint64(len(key)))
+	var valueLenByte [8]byte
+	binary.BigEndian.PutUint64(valueLenByte[:], uint64(len(value)))
+
+	keyBuf := new(bytes.Buffer)
+	var versionByte [8]byte
+	binary.BigEndian.PutUint64(versionByte[:], codec.BatchVersion1)
+	keyBuf.Write(versionByte[:])
+	keyBuf.Write(keyLenByte[:])
+	keyBuf.Write(key)
+
+	valueBuf := new(bytes.Buffer)
+	valueBuf.Write(valueLenByte[:])
+	valueBuf.Write(value)
+
+	ret := common.NewDDLMsg(config.ProtocolOpen, keyBuf.Bytes(), valueBuf.Bytes(), e)
+	return ret, nil
+}
+
+// EncodeCheckpointEvent implements the RowEventEncoder interface
+func (d *BatchEncoder) EncodeCheckpointEvent(ts uint64) (*common.Message, error) {
+	keyMsg := newResolvedMessage(ts)
+	key, err := keyMsg.Encode()
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+
+	var keyLenByte [8]byte
+	binary.BigEndian.PutUint64(keyLenByte[:], uint64(len(key)))
+	var valueLenByte [8]byte
+	binary.BigEndian.PutUint64(valueLenByte[:], 0)
+
+	keyBuf := new(bytes.Buffer)
+	var versionByte [8]byte
+	binary.BigEndian.PutUint64(versionByte[:], codec.BatchVersion1)
+	keyBuf.Write(versionByte[:])
+	keyBuf.Write(keyLenByte[:])
+	keyBuf.Write(key)
+
+	valueBuf := new(bytes.Buffer)
+	valueBuf.Write(valueLenByte[:])
+
+	ret := common.NewResolvedMsg(config.ProtocolOpen, keyBuf.Bytes(), valueBuf.Bytes(), ts)
+	return ret, nil
+}
+
+// Build implements the RowEventEncoder interface
+func (d *BatchEncoder) Build() (messages []*common.Message) {
+	d.tryBuildCallback()
+	ret := d.messageBuf
+	d.messageBuf = make([]*common.Message, 0)
+	return ret
+}
+
+// tryBuildCallback collects all pending callbacks into the last message's callback.
+func (d *BatchEncoder) tryBuildCallback() {
+	if len(d.messageBuf) != 0 && len(d.callbackBuff) != 0 {
+		lastMsg := d.messageBuf[len(d.messageBuf)-1]
+		callbacks := d.callbackBuff
+		lastMsg.Callback = func() {
+			for _, cb := range callbacks {
+				cb()
+			}
+		}
+		d.callbackBuff = make([]func(), 0)
+	}
+}
+
+type batchEncoderBuilder struct {
+	config *common.Config
+}
+
+// Build a BatchEncoder
+func (b *batchEncoderBuilder) Build() codec.RowEventEncoder {
+	encoder := NewBatchEncoder()
+	encoder.(*BatchEncoder).MaxMessageBytes = b.config.MaxMessageBytes
+	encoder.(*BatchEncoder).MaxBatchSize = b.config.MaxBatchSize
+	encoder.(*BatchEncoder).OnlyOutputUpdatedColumns = b.config.OnlyOutputUpdatedColumns
+
+	return encoder
+}
+
+// NewBatchEncoderBuilder creates an open-protocol batchEncoderBuilder.
+func NewBatchEncoderBuilder(config *common.Config) codec.RowEventEncoderBuilder {
+	return &batchEncoderBuilder{config: config}
+}
+
+// NewBatchEncoder creates a new BatchEncoder.
+func NewBatchEncoder() codec.RowEventEncoder {
+	batch := &BatchEncoder{}
+	return batch
+}
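Finally, a consumer-side sketch of the framing that `BatchEncoder.AppendRowChangedEvent` produces: the key stream carries an 8-byte big-endian version header followed by repeated [8-byte length | key] frames, and the value stream carries repeated [8-byte length | value] frames. The payload strings are placeholders and the version constant is a stand-in for `codec.BatchVersion1` (assumed to be 1 here):

	package main

	import (
		"encoding/binary"
		"fmt"
	)

	// frame prefixes a payload with its 8-byte big-endian length, mirroring
	// how the encoder writes keyLenByte/valueLenByte.
	func frame(payload []byte) []byte {
		var l [8]byte
		binary.BigEndian.PutUint64(l[:], uint64(len(payload)))
		return append(l[:], payload...)
	}

	func main() {
		// Build a synthetic two-row batch with placeholder payloads.
		var version [8]byte
		binary.BigEndian.PutUint64(version[:], 1) // stand-in for codec.BatchVersion1
		key := append(version[:], frame([]byte("key-1"))...)
		key = append(key, frame([]byte("key-2"))...)
		value := append(frame([]byte("value-1")), frame([]byte("value-2"))...)

		// Decode: strip the version header, then walk the paired frames.
		fmt.Println("batch version:", binary.BigEndian.Uint64(key[:8]))
		key = key[8:]
		for len(key) >= 8 && len(value) >= 8 {
			kLen := binary.BigEndian.Uint64(key[:8])
			vLen := binary.BigEndian.Uint64(value[:8])
			fmt.Printf("key=%s value=%s\n", key[8:8+kLen], value[8:8+vLen])
			key, value = key[8+kLen:], value[8+vLen:]
		}
	}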