diff --git a/cdc/sink/codec/craft.go b/cdc/sink/codec/craft.go
new file mode 100644
index 00000000000..ae638127215
--- /dev/null
+++ b/cdc/sink/codec/craft.go
@@ -0,0 +1,301 @@
+package codec
+
+import (
+	"math"
+	"strconv"
+
+	"github.com/pingcap/errors"
+	"github.com/pingcap/ticdc/cdc/model"
+	cerror "github.com/pingcap/ticdc/pkg/errors"
+)
+
+// (string/bytes) array layout
+// n bytes array of elements' size, format: uvarint array
+// n bytes elements, format: bits
+//
+// varint/uvarint array layout
+// n bytes elements, format: varint / uvarint
+//
+// delta varint/uvarint array layout
+// n bytes base number
+// n bytes offsets, format: varint/uvarint
+//
+// string/bytes layout
+// n bytes varint length
+// n bytes payload
+//
+// float layout, standard protobuf float
+// double layout, standard protobuf double
+// varint layout, standard protobuf varint
+// uvarint layout, standard protobuf uvarint
+//
+// Message layout
+// n bytes version, format: uvarint
+// n bytes number of pairs, format: uvarint
+// n bytes keys
+// n bytes values
+// n bytes size tables
+//
+// Keys layout
+// n bytes array of commit ts, format: delta uvarint array
+// n bytes array of type, format: delta uvarint array
+// n bytes array of row id, format: delta varint array
+// n bytes array of partition id, format: delta varint array, -1 means field is not set
+// n bytes array of schema, format: nullable string array
+// n bytes array of table, format: nullable string array
+//
+// Row changed layout
+// n bytes multiple column groups
+//
+// Column group layout
+// 1 byte column group type: 1: new values, 2: old values, 3: delete values
+// n bytes number of columns, format: uvarint
+// n bytes array of name, format: string array
+// n bytes array of type, format: uvarint array
+// n bytes array of flag, format: uvarint array
+// n bytes array of value, format: nullable bytes array
+//
+// DDL layout
+// n bytes type, format: uvarint
+// n bytes query, format: string
+//
+// Size tables layout
+// n bytes table to store the size of serialized keys
+// n bytes table to store sizes of serialized values
+// n bytes tables to store sizes of serialized column groups
+// n bytes size of serialized size tables, format: reversed uvarint
+//
+// Size table layout
+// n bytes number of elements, format: uvarint
+// n bytes array of sizes, format: delta uvarint array
+//
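+// Illustrative example (values assumed, for exposition only): a message that
+// carries a single resolved-ts event is laid out as
+//
+//	uvarint(1)        version
+//	uvarint(1)        number of pairs
+//	[keys]            one entry per keys array: ts, type = resolved, row id = -1, ...
+//	[values]          empty payload for a resolved event
+//	[size tables]     sizes of the keys block and of the single (empty) value
+//	reversed uvarint  size of the size tables, decoded backwards from the tail
+//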
+const (
+	// CraftVersion1 represents the version of craft format
+	CraftVersion1 uint64 = 1
+
+	// craftMaxMessageBytes sets the default value for max-message-bytes
+	craftMaxMessageBytes int = 64 * 1024 * 1024 // 64M
+	// craftMaxBatchSize sets the default value for max-batch-size
+	craftMaxBatchSize int = 4096
+	// craftDefaultBufferCapacity is the default capacity of encoder buffers
+	craftDefaultBufferCapacity = 512
+
+	// Column group types
+	craftColumnGroupTypeDelete = 0x3
+	craftColumnGroupTypeOld    = 0x2
+	craftColumnGroupTypeNew    = 0x1
+
+	// Size tables index
+	craftKeySizeTableIndex              = 0
+	craftValueSizeTableIndex            = 1
+	craftColumnGroupSizeTableStartIndex = 2
+)
+
+// CraftEventBatchEncoder encodes events in the craft format into MQ messages.
+type CraftEventBatchEncoder struct {
+	rowChangedBuffer *craftRowChangedEventBuffer
+	messageBuf       []*MQMessage
+
+	// configs
+	maxMessageSize int
+	maxBatchSize   int
+}
+
+// EncodeCheckpointEvent implements the EventBatchEncoder interface
+func (e *CraftEventBatchEncoder) EncodeCheckpointEvent(ts uint64) (*MQMessage, error) {
+	return newResolvedMQMessage(ProtocolCraft, nil, newCraftResolvedEventEncoder(ts).encode(), ts), nil
+}
+
+func (e *CraftEventBatchEncoder) flush() {
+	keys := e.rowChangedBuffer.getKeys()
+	ts := keys.getTs(0)
+	schema := keys.getSchema(0)
+	table := keys.getTable(0)
+	e.messageBuf = append(e.messageBuf, NewMQMessage(ProtocolCraft, nil, e.rowChangedBuffer.encode(), ts, model.MqMessageTypeRow, &schema, &table))
+}
+
+// AppendRowChangedEvent implements the EventBatchEncoder interface
+func (e *CraftEventBatchEncoder) AppendRowChangedEvent(ev *model.RowChangedEvent) (EncoderResult, error) {
+	rows, size := e.rowChangedBuffer.appendRowChangedEvent(ev)
+	if size > e.maxMessageSize || rows >= e.maxBatchSize {
+		e.flush()
+	}
+	return EncoderNoOperation, nil
+}
+
+// AppendResolvedEvent is a no-op; it implements the EventBatchEncoder interface
+func (e *CraftEventBatchEncoder) AppendResolvedEvent(ts uint64) (EncoderResult, error) {
+	return EncoderNoOperation, nil
+}
+
+// EncodeDDLEvent implements the EventBatchEncoder interface
+func (e *CraftEventBatchEncoder) EncodeDDLEvent(ev *model.DDLEvent) (*MQMessage, error) {
+	return newDDLMQMessage(ProtocolCraft, nil, newCraftDDLEventEncoder(ev).encode(), ev), nil
+}
+
+// Build implements the EventBatchEncoder interface
+func (e *CraftEventBatchEncoder) Build() []*MQMessage {
+	if e.rowChangedBuffer.size() > 0 {
+		// flush buffered data to message buffer
+		e.flush()
+	}
+	ret := e.messageBuf
+	e.messageBuf = make([]*MQMessage, 0)
+	return ret
+}
+
+// MixedBuild implements the EventBatchEncoder interface
+func (e *CraftEventBatchEncoder) MixedBuild(withVersion bool) []byte {
+	panic("Only JsonEncoder supports mixed build")
+}
+
+// Size implements the EventBatchEncoder interface
+func (e *CraftEventBatchEncoder) Size() int {
+	return e.rowChangedBuffer.size()
+}
+
+// Reset implements the EventBatchEncoder interface
+func (e *CraftEventBatchEncoder) Reset() {
+	e.rowChangedBuffer.reset()
+}
+
+// SetParams reads the relevant parameters for the craft protocol
+func (e *CraftEventBatchEncoder) SetParams(params map[string]string) error {
+	var err error
+	if maxMessageBytes, ok := params["max-message-bytes"]; ok {
+		e.maxMessageSize, err = strconv.Atoi(maxMessageBytes)
+		if err != nil {
+			return cerror.ErrSinkInvalidConfig.Wrap(err)
+		}
+	} else {
+		e.maxMessageSize = DefaultMaxMessageBytes
+	}
+
+	if e.maxMessageSize <= 0 || e.maxMessageSize > math.MaxInt32 {
+		return cerror.ErrSinkInvalidConfig.Wrap(errors.Errorf("invalid max-message-bytes %d", e.maxMessageSize))
+	}
+
+	if maxBatchSize, ok := params["max-batch-size"]; ok {
+		e.maxBatchSize, err = strconv.Atoi(maxBatchSize)
+		if err != nil {
+			return cerror.ErrSinkInvalidConfig.Wrap(err)
+		}
+	} else {
+		e.maxBatchSize = DefaultMaxBatchSize
+	}
+
+	if e.maxBatchSize <= 0 || e.maxBatchSize > math.MaxUint16 {
+		return cerror.ErrSinkInvalidConfig.Wrap(errors.Errorf("invalid max-batch-size %d", e.maxBatchSize))
+	}
+	return nil
+}
+
+// NewCraftEventBatchEncoder creates a new CraftEventBatchEncoder.
+func NewCraftEventBatchEncoder() EventBatchEncoder {
+	return &CraftEventBatchEncoder{
+		rowChangedBuffer: &craftRowChangedEventBuffer{
+			keys: &craftColumnarKeys{},
+		},
+	}
+}
+
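+// A minimal usage sketch (illustrative only; event construction and error
+// handling are elided):
+//
+//	encoder := NewCraftEventBatchEncoder()
+//	_ = encoder.SetParams(map[string]string{"max-message-bytes": "8192"})
+//	_, _ = encoder.AppendRowChangedEvent(event)
+//	for _, message := range encoder.Build() {
+//		// hand message.Value to the MQ producer
+//	}
+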
+// CraftEventBatchDecoder decodes the bytes of a batch into the original messages.
+type CraftEventBatchDecoder struct {
+	keys    *craftColumnarKeys
+	decoder *craftMessageDecoder
+	index   int
+}
+
+// HasNext implements the EventBatchDecoder interface
+func (b *CraftEventBatchDecoder) HasNext() (model.MqMessageType, bool, error) {
+	if b.index >= b.keys.count {
+		return model.MqMessageTypeUnknown, false, nil
+	}
+	return b.keys.getType(b.index), true, nil
+}
+
+// NextResolvedEvent implements the EventBatchDecoder interface
+func (b *CraftEventBatchDecoder) NextResolvedEvent() (uint64, error) {
+	ty, hasNext, err := b.HasNext()
+	if err != nil {
+		return 0, errors.Trace(err)
+	}
+	if !hasNext || ty != model.MqMessageTypeResolved {
+		return 0, cerror.ErrCraftCodecInvalidData.GenWithStack("not found resolved event message")
+	}
+	ts := b.keys.getTs(b.index)
+	b.index++
+	return ts, nil
+}
+
+// NextRowChangedEvent implements the EventBatchDecoder interface
+func (b *CraftEventBatchDecoder) NextRowChangedEvent() (*model.RowChangedEvent, error) {
+	ty, hasNext, err := b.HasNext()
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+	if !hasNext || ty != model.MqMessageTypeRow {
+		return nil, cerror.ErrCraftCodecInvalidData.GenWithStack("not found row changed event message")
+	}
+	oldValue, newValue, err := b.decoder.decodeRowChangedEvent(b.index)
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+	ev := &model.RowChangedEvent{}
+	if oldValue != nil {
+		if ev.PreColumns, err = oldValue.toModel(); err != nil {
+			return nil, errors.Trace(err)
+		}
+	}
+	if newValue != nil {
+		if ev.Columns, err = newValue.toModel(); err != nil {
+			return nil, errors.Trace(err)
+		}
+	}
+	ev.CommitTs = b.keys.getTs(b.index)
+	ev.Table = &model.TableName{
+		Schema: b.keys.getSchema(b.index),
+		Table:  b.keys.getTable(b.index),
+	}
+	partition := b.keys.getPartition(b.index)
+	if partition >= 0 {
+		ev.Table.TableID = partition
+		ev.Table.IsPartition = true
+	}
+	b.index++
+	return ev, nil
+}
+
+// NextDDLEvent implements the EventBatchDecoder interface
+func (b *CraftEventBatchDecoder) NextDDLEvent() (*model.DDLEvent, error) {
+	ty, hasNext, err := b.HasNext()
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+	if !hasNext || ty != model.MqMessageTypeDDL {
+		return nil, cerror.ErrCraftCodecInvalidData.GenWithStack("not found ddl event message")
+	}
+	ddlType, query, err := b.decoder.decodeDDLEvent(b.index)
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+	event := &model.DDLEvent{
+		CommitTs: b.keys.getTs(b.index),
+		Query:    query,
+		Type:     ddlType,
+		TableInfo: &model.SimpleTableInfo{
+			Schema: b.keys.getSchema(b.index),
+			Table:  b.keys.getTable(b.index),
+		},
+	}
+	b.index++
+	return event, nil
+}
+
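+// Decoding mirrors encoding (illustrative only; error handling elided):
+//
+//	decoder, _ := NewCraftEventBatchDecoder(message.Value)
+//	for {
+//		ty, hasNext, _ := decoder.HasNext()
+//		if !hasNext {
+//			break
+//		}
+//		// dispatch on ty to NextRowChangedEvent / NextDDLEvent / NextResolvedEvent
+//	}
+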
+// NewCraftEventBatchDecoder creates a new CraftEventBatchDecoder.
+func NewCraftEventBatchDecoder(bits []byte) (EventBatchDecoder, error) {
+	decoder, err := newCraftMessageDecoder(bits)
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+	keys, err := decoder.decodeKeys()
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+
+	return &CraftEventBatchDecoder{
+		keys:    keys,
+		decoder: decoder,
+	}, nil
+}
diff --git a/cdc/sink/codec/craft_decoder.go b/cdc/sink/codec/craft_decoder.go
new file mode 100644
index 00000000000..71c8083d48f
--- /dev/null
+++ b/cdc/sink/codec/craft_decoder.go
@@ -0,0 +1,475 @@
+package codec
+
+import (
+	"encoding/binary"
+	"math"
+
+	"github.com/pingcap/errors"
+	pmodel "github.com/pingcap/parser/model"
+	"github.com/pingcap/parser/mysql"
+	"github.com/pingcap/ticdc/cdc/model"
+	cerror "github.com/pingcap/ticdc/pkg/errors"
+)
+
+/// Primitive type decoders
+func decodeUint8(bits []byte) ([]byte, byte, error) {
+	if len(bits) < 1 {
+		return bits, 0, cerror.ErrCraftCodecInvalidData.GenWithStack("buffer underflow")
+	}
+	return bits[1:], bits[0], nil
+}
+
+func decodeVarint(bits []byte) ([]byte, int64, error) {
+	x, rd := binary.Varint(bits)
+	if rd < 0 {
+		return bits, 0, cerror.ErrCraftCodecInvalidData.GenWithStack("invalid varint data")
+	}
+	return bits[rd:], x, nil
+}
+
+func decodeUvarint(bits []byte) ([]byte, uint64, error) {
+	x, rd := binary.Uvarint(bits)
+	if rd < 0 {
+		return bits, 0, cerror.ErrCraftCodecInvalidData.GenWithStack("invalid uvarint data")
+	}
+	return bits[rd:], x, nil
+}
+
+// decodeUvarintReversed reads a uvarint whose bytes were appended in reverse
+// order, scanning backwards from the tail of the buffer.
+func decodeUvarintReversed(bits []byte) (int, uint64, error) {
+	l := len(bits) - 1
+	var x uint64
+	var s uint
+	i := 0
+	for l >= 0 {
+		b := bits[l]
+		if b < 0x80 {
+			if i >= binary.MaxVarintLen64 || i == binary.MaxVarintLen64-1 && b > 1 {
+				return 0, 0, cerror.ErrCraftCodecInvalidData.GenWithStack("invalid reversed uvarint data")
+			}
+			return i + 1, x | uint64(b)<<s, nil
+		}
+		x |= uint64(b&0x7f) << s
+		s += 7
+		i++
+		l--
+	}
+	return 0, 0, cerror.ErrCraftCodecInvalidData.GenWithStack("invalid reversed uvarint data")
+}
+
+func decodeUvarintReversedLength(bits []byte) (int, int, error) {
+	nb, x, err := decodeUvarintReversed(bits)
+	if err != nil {
+		return 0, 0, errors.Trace(err)
+	}
+	if x > math.MaxInt32 {
+		return 0, 0, cerror.ErrCraftCodecInvalidData.GenWithStack("length %d is greater than max limit of %d", x, math.MaxInt32)
+	}
+	return nb, int(x), err
+}
+
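+// For example (illustrative): the regular uvarint encoding of 300 is
+// 0xAC 0x02, so encodeUvarintReversed appends 0x02 0xAC instead, and
+// decodeUvarintReversed([]byte{..., 0x02, 0xAC}) scans backwards from the
+// tail and returns (2, 300, nil).
+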
+func decodeUvarint32(bits []byte) ([]byte, int32, error) {
+	newBits, x, err := decodeUvarint(bits)
+	if err != nil {
+		return bits, 0, errors.Trace(err)
+	}
+	if x > math.MaxInt32 {
+		return bits, 0, cerror.ErrCraftCodecInvalidData.GenWithStack("length %d is greater than max limit of %d", x, math.MaxInt32)
+	}
+	return newBits, int32(x), nil
+}
+
+func decodeVarint32(bits []byte) ([]byte, int32, error) {
+	newBits, x, err := decodeVarint(bits)
+	if err != nil {
+		return bits, 0, errors.Trace(err)
+	}
+	if x > math.MaxInt32 {
+		return bits, 0, cerror.ErrCraftCodecInvalidData.GenWithStack("length %d is greater than max limit of %d", x, math.MaxInt32)
+	}
+	return newBits, int32(x), nil
+}
+
+func decodeUvarintLength(bits []byte) ([]byte, int, error) {
+	bits, x, err := decodeUvarint32(bits)
+	return bits, int(x), err
+}
+
+func decodeVarintLength(bits []byte) ([]byte, int, error) {
+	bits, x, err := decodeVarint32(bits)
+	return bits, int(x), err
+}
+
+func decodeFloat64(bits []byte) ([]byte, float64, error) {
+	if len(bits) < 8 {
+		return bits, 0, cerror.ErrCraftCodecInvalidData.GenWithStack("buffer underflow")
+	}
+	x := binary.BigEndian.Uint64(bits)
+	return bits[8:], math.Float64frombits(x), nil
+}
+
+func decodeBytes(bits []byte) ([]byte, []byte, error) {
+	newBits, l, err := decodeUvarintLength(bits)
+	if err != nil {
+		return bits, nil, errors.Trace(err)
+	}
+	if len(newBits) < l {
+		return bits, nil, cerror.ErrCraftCodecInvalidData.GenWithStack("buffer underflow")
+	}
+	return newBits[l:], newBits[:l], nil
+}
+
+func decodeString(bits []byte) ([]byte, string, error) {
+	bits, bytes, err := decodeBytes(bits)
+	if err != nil {
+		return bits, "", errors.Trace(err)
+	}
+	return bits, string(bytes), nil
+}
+
+/// Chunk decoders
+func decodeStringChunk(bits []byte, size int) ([]byte, []string, error) {
+	newBits, data, err := decodeBytesChunk(bits, size)
+	if err != nil {
+		return bits, nil, errors.Trace(err)
+	}
+	result := make([]string, size)
+	for i, d := range data {
+		result[i] = string(d)
+	}
+	return newBits, result, nil
+}
+
+func decodeNullableStringChunk(bits []byte, size int) ([]byte, []*string, error) {
+	newBits, data, err := decodeNullableBytesChunk(bits, size)
+	if err != nil {
+		return bits, nil, errors.Trace(err)
+	}
+	result := make([]*string, size)
+	for i, d := range data {
+		if d != nil {
+			s := string(d)
+			result[i] = &s
+		}
+	}
+	return newBits, result, nil
+}
+
+func decodeBytesChunk(bits []byte, size int) ([]byte, [][]byte, error) {
+	return doDecodeBytesChunk(bits, size, decodeUvarintLength)
+}
+
+func doDecodeBytesChunk(bits []byte, size int, lengthDecoder func([]byte) ([]byte, int, error)) ([]byte, [][]byte, error) {
+	larray := make([]int, size)
+	newBits := bits
+	var bl int
+	var err error
+	for i := 0; i < size; i++ {
+		newBits, bl, err = lengthDecoder(newBits)
+		if err != nil {
+			return bits, nil, errors.Trace(err)
+		}
+		larray[i] = bl
+	}
+
+	data := make([][]byte, size)
+	for i := 0; i < size; i++ {
+		if larray[i] != -1 {
+			data[i] = newBits[:larray[i]]
+			newBits = newBits[larray[i]:]
+		}
+	}
+	return newBits, data, nil
+}
+
+func decodeNullableBytesChunk(bits []byte, size int) ([]byte, [][]byte, error) {
+	return doDecodeBytesChunk(bits, size, decodeVarintLength)
+}
+
+func decodeVarintChunk(bits []byte, size int) ([]byte, []int64, error) {
+	array := make([]int64, size)
+	newBits := bits
+	var i64 int64
+	var err error
+	for i := 0; i < size; i++ {
+		newBits, i64, err = decodeVarint(newBits)
+		if err != nil {
+			return bits, nil, errors.Trace(err)
+		}
+		array[i] = i64
+	}
+	return newBits, array, nil
+}
+
+func decodeUvarintChunk(bits []byte, size int) ([]byte, []uint64, error) {
+	array := make([]uint64, size)
+	newBits := bits
+	var u64 uint64
+	var err error
+	for i := 0; i < size; i++ {
+		newBits, u64, err = decodeUvarint(newBits)
+		if err != nil {
+			return bits, nil, errors.Trace(err)
+		}
+		array[i] = u64
+	}
+	return newBits, array, nil
+}
+
+func decodeDeltaVarintChunk(bits []byte, size int) ([]byte, []int64, error) {
+	array := make([]int64, size)
+	newBits := bits
+	var err error
+	newBits, array[0], err = decodeVarint(newBits)
+	if err != nil {
+		return bits, nil, errors.Trace(err)
+	}
+	for i := 1; i < size; i++ {
+		newBits, array[i], err = decodeVarint(newBits)
+		if err != nil {
+			return bits, nil, errors.Trace(err)
+		}
+		array[i] = array[i-1] + array[i]
+	}
+	return newBits, array, nil
+}
+
+func decodeDeltaUvarintChunk(bits []byte, size int) ([]byte, []uint64, error) {
+	array := make([]uint64, size)
+	newBits := bits
+	var err error
+	newBits, array[0], err = decodeUvarint(newBits)
+	if err != nil {
+		return bits, nil, errors.Trace(err)
+	}
+	for i := 1; i < size; i++ {
+		newBits, array[i], err = decodeUvarint(newBits)
+		if err != nil {
+			return bits, nil, errors.Trace(err)
+		}
+		array[i] = array[i-1] + array[i]
+	}
+	return newBits, array, nil
+}
+
+// size tables are always at the end of serialized data, so there are no unread bytes to return
+func decodeSizeTables(bits []byte) (int, [][]uint64, error) {
+	nb, size, err := decodeUvarintReversedLength(bits)
+	if err != nil {
+		return 0, nil, errors.Trace(err)
+	}
+	sizeOffset := len(bits) - nb
+	tablesOffset := sizeOffset - size
+	tables := bits[tablesOffset:sizeOffset]
+
+	tableSize := size + nb
+	var table []uint64
+	result := make([][]uint64, 0, 1)
+	for len(tables) > 0 {
+		tables, size, err = decodeUvarintLength(tables)
+		if err != nil {
+			return 0, nil, errors.Trace(err)
+		}
+		tables, table, err = decodeDeltaUvarintChunk(tables, size)
+		if err != nil {
+			return 0, nil, errors.Trace(err)
+		}
+		result = append(result, table)
+	}
+
+	return tableSize, result, nil
+}
+
+/// TiDB types decoder
+func decodeTiDBType(ty byte, flag model.ColumnFlagType, bits []byte) (interface{}, error) {
+	if bits == nil {
+		return nil, nil
+	}
+	switch ty {
+	case mysql.TypeDate, mysql.TypeDatetime, mysql.TypeNewDate, mysql.TypeTimestamp, mysql.TypeDuration, mysql.TypeJSON, mysql.TypeNewDecimal:
+		// value type for these mysql types is string
+		return string(bits), nil
+	case mysql.TypeEnum, mysql.TypeSet, mysql.TypeBit:
+		// value type for these mysql types is uint64
+		_, u64, err := decodeUvarint(bits)
+		return u64, err
+	case mysql.TypeString, mysql.TypeVarString, mysql.TypeVarchar,
+		mysql.TypeTinyBlob, mysql.TypeMediumBlob, mysql.TypeLongBlob, mysql.TypeBlob:
+		// value type for these mysql types is []byte
+		return bits, nil
+	case mysql.TypeFloat, mysql.TypeDouble:
+		// value type for these mysql types is float64
+		_, f64, err := decodeFloat64(bits)
+		return f64, err
+	case mysql.TypeTiny, mysql.TypeShort, mysql.TypeLong, mysql.TypeLonglong, mysql.TypeInt24, mysql.TypeYear:
+		// value type for these mysql types is int64 or uint64 depending on the flags
+		if flag.IsUnsigned() {
+			_, u64, err := decodeUvarint(bits)
+			return u64, err
+		}
+		_, i64, err := decodeVarint(bits)
+		return i64, err
+	case mysql.TypeUnspecified:
+		fallthrough
+	case mysql.TypeNull:
+		fallthrough
+	case mysql.TypeGeometry:
+		return nil, nil
+	}
+	return nil, nil
+}
+
+// Message decoder
+type craftMessageDecoder struct {
+	bits            []byte
+	sizeTables      [][]uint64
+	valuesSizeTable []uint64
+}
+
+func newCraftMessageDecoder(bits []byte) (*craftMessageDecoder, error) {
+	bits, version, err := decodeUvarint(bits)
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+	if version < CraftVersion1 {
+		return nil, cerror.ErrCraftCodecInvalidData.GenWithStack("unexpected craft version")
+	}
+	sizeTablesSize, sizeTables, err := decodeSizeTables(bits)
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+	return &craftMessageDecoder{
+		bits:            bits[:len(bits)-sizeTablesSize],
+		sizeTables:      sizeTables,
+		valuesSizeTable: sizeTables[craftValueSizeTableIndex],
+	}, nil
+}
+
+func (d *craftMessageDecoder) decodeKeys() (*craftColumnarKeys, error) {
+	var pairs, keysSize int
+	var err error
+	d.bits, pairs, err = decodeUvarintLength(d.bits)
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+	keysSize = int(d.sizeTables[craftKeySizeTableIndex][0])
+	var keys *craftColumnarKeys
+	keys, err = decodeCraftColumnarKeys(d.bits[:keysSize], pairs)
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+	// skip keys
+	d.bits = d.bits[keysSize:]
+	return keys, nil
+}
+
+func (d *craftMessageDecoder) valueBits(index int) []byte {
+	start := 0
+	if index > 0 {
+		start = int(d.valuesSizeTable[index-1])
+	}
+	return d.bits[start:int(d.valuesSizeTable[index])]
+}
+
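+// Illustrative shape of d.sizeTables for a message carrying two row changed
+// events (sizes assumed, for exposition only):
+//
+//	sizeTables[0] = {size of encoded keys}                  // craftKeySizeTableIndex
+//	sizeTables[1] = {end offsets of value 0 and value 1}    // craftValueSizeTableIndex
+//	sizeTables[2] = {column group sizes of value 0}         // craftColumnGroupSizeTableStartIndex + 0
+//	sizeTables[3] = {column group sizes of value 1}         // craftColumnGroupSizeTableStartIndex + 1
+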
+func (d *craftMessageDecoder) decodeDDLEvent(index int) (pmodel.ActionType, string, error) {
+	bits, ty, err := decodeUvarint(d.valueBits(index))
+	if err != nil {
+		return pmodel.ActionNone, "", errors.Trace(err)
+	}
+	_, query, err := decodeString(bits)
+	return pmodel.ActionType(ty), query, err
+}
+
+func decodeCraftColumnarColumnGroup(bits []byte) (*craftColumnarColumnGroup, error) {
+	var numColumns int
+	bits, ty, err := decodeUint8(bits)
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+	bits, numColumns, err = decodeUvarintLength(bits)
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+	var names, values [][]byte
+	var types, flags []uint64
+	bits, names, err = decodeBytesChunk(bits, numColumns)
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+	bits, types, err = decodeUvarintChunk(bits, numColumns)
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+	bits, flags, err = decodeUvarintChunk(bits, numColumns)
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+	bits, values, err = decodeBytesChunk(bits, numColumns)
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+	return &craftColumnarColumnGroup{
+		ty:     ty,
+		names:  names,
+		types:  types,
+		flags:  flags,
+		values: values,
+	}, nil
+}
+
+func (d *craftMessageDecoder) decodeRowChangedEvent(index int) (preColumns, columns *craftColumnarColumnGroup, err error) {
+	bits := d.valueBits(index)
+	columnGroupSizeTable := d.sizeTables[craftColumnGroupSizeTableStartIndex+index]
+	columnGroupIndex := 0
+	for len(bits) > 0 {
+		columnGroupSize := columnGroupSizeTable[columnGroupIndex]
+		columnGroup, err := decodeCraftColumnarColumnGroup(bits[:columnGroupSize])
+		bits = bits[columnGroupSize:]
+		columnGroupIndex++
+		if err != nil {
+			return nil, nil, errors.Trace(err)
+		}
+		switch columnGroup.ty {
+		case craftColumnGroupTypeDelete:
+			fallthrough
+		case craftColumnGroupTypeOld:
+			preColumns = columnGroup
+		case craftColumnGroupTypeNew:
+			columns = columnGroup
+		}
+	}
+	return preColumns, columns, nil
+}
+
+// Events decoder
+func decodeRowChangedEvent(keys *craftColumnarKeys, keyIndex int, sizeTables [][]uint64, bits []byte) (*model.RowChangedEvent, error) {
+	ev := &model.RowChangedEvent{}
+	ev.CommitTs = keys.getTs(keyIndex)
+	ev.Table = &model.TableName{
+		Schema: keys.getSchema(keyIndex),
+		Table:  keys.getTable(keyIndex),
+	}
+	partition := keys.getPartition(keyIndex)
+	if partition >= 0 {
+		ev.Table.TableID = partition
+		ev.Table.IsPartition = true
+	}
+	var columnGroup *craftColumnarColumnGroup
+	var err error
+	columnGroupSizeTable := sizeTables[keyIndex]
+	columnGroupIndex := 0
+	for len(bits) > 0 {
+		columnGroupSize := columnGroupSizeTable[columnGroupIndex]
+		columnGroup, err = decodeCraftColumnarColumnGroup(bits[:columnGroupSize])
+		bits = bits[columnGroupSize:]
+		columnGroupIndex++
+		if err != nil {
+			return nil, errors.Trace(err)
+		}
+		switch columnGroup.ty {
+		case craftColumnGroupTypeDelete:
+			fallthrough
+		case craftColumnGroupTypeOld:
+			ev.PreColumns, err = columnGroup.toModel()
+		case craftColumnGroupTypeNew:
+			ev.Columns, err = columnGroup.toModel()
+		}
+		if err != nil {
+			return nil, errors.Trace(err)
+		}
+	}
+	return ev, nil
+}
diff --git a/cdc/sink/codec/craft_encoder.go b/cdc/sink/codec/craft_encoder.go
new file mode 100644
index 00000000000..e1a376ac084
--- /dev/null
+++ b/cdc/sink/codec/craft_encoder.go
@@ -0,0 +1,286 @@
+package codec
+
+import (
+	"encoding/binary"
+	"math"
+
+	"github.com/pingcap/parser/mysql"
+	"github.com/pingcap/ticdc/cdc/model"
+)
+
+/// Primitive type encoders
+func encodeFloat64(bits []byte, data float64) []byte {
+	buf := make([]byte, 8)
+	binary.BigEndian.PutUint64(buf, math.Float64bits(data))
+	if bits == nil {
+		return buf
+	}
+	return append(bits, buf...)
+}
+
+func encodeVarint(bits []byte, data int64) []byte {
+	buf := make([]byte, binary.MaxVarintLen64)
+	l := 
binary.PutVarint(buf, data) + buf = buf[:l] + if bits == nil { + return buf + } else { + return append(bits, buf...) + } +} + +func encodeUvarint(bits []byte, data uint64) []byte { + buf := make([]byte, binary.MaxVarintLen64) + l := binary.PutUvarint(buf, data) + buf = buf[:l] + if bits == nil { + return buf + } else { + return append(bits, buf...) + } +} + +func encodeUvarintReversed(bits []byte, data uint64) ([]byte, int) { + buf := make([]byte, binary.MaxVarintLen64) + i := 0 + for data >= 0x80 { + buf[i] = byte(data) | 0x80 + data >>= 7 + i++ + } + buf[i] = byte(data) + for bi := i; bi >= 0; bi-- { + bits = append(bits, buf[bi]) + } + return bits, i + 1 +} + +func encodeBytes(bits []byte, data []byte) []byte { + l := len(data) + if bits == nil { + bits = make([]byte, 0, binary.MaxVarintLen64+len(data)) + } + bits = encodeUvarint(bits, uint64(l)) + return append(bits, data...) +} + +func encodeString(bits []byte, data string) []byte { + return encodeBytes(bits, []byte(data)) +} + +/// Chunk encoders +func encodeStringChunk(bits []byte, data []string) []byte { + for _, s := range data { + bits = encodeUvarint(bits, uint64(len(s))) + } + for _, s := range data { + bits = append(bits, []byte(s)...) + } + return bits +} + +func encodeNullableStringChunk(bits []byte, data []*string) []byte { + for _, s := range data { + var l int64 = -1 + if s != nil { + l = int64(len(*s)) + } + bits = encodeVarint(bits, l) + } + for _, s := range data { + if s != nil { + bits = append(bits, []byte(*s)...) + } + } + return bits +} + +func encodeBytesChunk(bits []byte, data [][]byte) []byte { + for _, b := range data { + bits = encodeUvarint(bits, uint64(len(b))) + } + for _, b := range data { + bits = append(bits, b...) + } + return bits +} + +func encodeNullableBytesChunk(bits []byte, data [][]byte) []byte { + for _, b := range data { + var l int64 = -1 + if b != nil { + l = int64(len(b)) + } + bits = encodeVarint(bits, l) + } + for _, b := range data { + if b != nil { + bits = append(bits, b...) 
+		}
+	}
+	return bits
+}
+
+func encodeVarintChunk(bits []byte, data []int64) []byte {
+	for _, v := range data {
+		bits = encodeVarint(bits, v)
+	}
+	return bits
+}
+
+func encodeUvarintChunk(bits []byte, data []uint64) []byte {
+	for _, v := range data {
+		bits = encodeUvarint(bits, v)
+	}
+	return bits
+}
+
+func encodeDeltaVarintChunk(bits []byte, data []int64) []byte {
+	last := data[0]
+	bits = encodeVarint(bits, last)
+	for _, v := range data[1:] {
+		bits = encodeVarint(bits, v-last)
+		last = v
+	}
+	return bits
+}
+
+func encodeDeltaUvarintChunk(bits []byte, data []uint64) []byte {
+	last := data[0]
+	bits = encodeUvarint(bits, last)
+	for _, v := range data[1:] {
+		bits = encodeUvarint(bits, v-last)
+		last = v
+	}
+	return bits
+}
+
+func encodeSizeTables(bits []byte, tables [][]uint64) []byte {
+	size := len(bits)
+	for _, table := range tables {
+		bits = encodeUvarint(bits, uint64(len(table)))
+		bits = encodeDeltaUvarintChunk(bits, table)
+	}
+	bits, _ = encodeUvarintReversed(bits, uint64(len(bits)-size))
+	return bits
+}
+
+/// TiDB types encoder
+func encodeTiDBType(ty byte, flag model.ColumnFlagType, value interface{}) []byte {
+	if value == nil {
+		return nil
+	}
+	switch ty {
+	case mysql.TypeDate, mysql.TypeDatetime, mysql.TypeNewDate, mysql.TypeTimestamp, mysql.TypeDuration, mysql.TypeJSON, mysql.TypeNewDecimal:
+		// value type for these mysql types is string
+		return []byte(value.(string))
+	case mysql.TypeEnum, mysql.TypeSet, mysql.TypeBit:
+		// value type for these mysql types is uint64
+		return encodeUvarint(nil, value.(uint64))
+	case mysql.TypeString, mysql.TypeVarString, mysql.TypeVarchar,
+		mysql.TypeTinyBlob, mysql.TypeMediumBlob, mysql.TypeLongBlob, mysql.TypeBlob:
+		// value type for these mysql types is []byte
+		return value.([]byte)
+	case mysql.TypeFloat, mysql.TypeDouble:
+		// value type for these mysql types is float64
+		return encodeFloat64(nil, value.(float64))
+	case mysql.TypeTiny, mysql.TypeShort, mysql.TypeLong, mysql.TypeLonglong, mysql.TypeInt24, mysql.TypeYear:
+		// value type for these mysql types is int64 or uint64 depending on the flags
+		if flag.IsUnsigned() {
+			return encodeUvarint(nil, value.(uint64))
+		}
+		return encodeVarint(nil, value.(int64))
+	case mysql.TypeUnspecified:
+		fallthrough
+	case mysql.TypeNull:
+		fallthrough
+	case mysql.TypeGeometry:
+		return nil
+	}
+	return nil
+}
+
+/// Message encoder
+type craftMessageEncoder struct {
+	bits              []byte
+	sizeTables        [][]uint64
+	valuesStartOffset int
+	valuesSizes       []uint64
+	valuesSizesIndex  int
+}
+
+func newCraftMessageEncoder() *craftMessageEncoder {
+	return &craftMessageEncoder{
+		bits: encodeUvarint(make([]byte, 0, craftDefaultBufferCapacity), CraftVersion1),
+	}
+}
+
+func (e *craftMessageEncoder) encodeValueSize() *craftMessageEncoder {
+	e.valuesSizes[e.valuesSizesIndex] = uint64(len(e.bits) - e.valuesStartOffset)
+	e.valuesSizesIndex++
+	return e
+}
+
+func (e *craftMessageEncoder) encodeUvarint(u64 uint64) *craftMessageEncoder {
+	e.bits = encodeUvarint(e.bits, u64)
+	return e
+}
+
+func (e *craftMessageEncoder) encodeString(s string) *craftMessageEncoder {
+	e.bits = encodeString(e.bits, s)
+	return e
+}
+
+func (e *craftMessageEncoder) encodeKeys(keys *craftColumnarKeys) *craftMessageEncoder {
+	e.bits = encodeUvarint(e.bits, uint64(keys.count))
+	oldSize := len(e.bits)
+	e.valuesSizes = make([]uint64, keys.count)
+	e.bits = keys.encode(e.bits)
+	e.valuesStartOffset = len(e.bits)
+	e.sizeTables = append(e.sizeTables, []uint64{uint64(len(e.bits) - oldSize)}, 
e.valuesSizes) + return e +} + +func (e *craftMessageEncoder) encode() []byte { + return encodeSizeTables(e.bits, e.sizeTables) +} + +func (e *craftMessageEncoder) encodeRowChangeEvents(events []craftRowChangedEvent) *craftMessageEncoder { + sizeTables := e.sizeTables + for _, event := range events { + columnGroupSizeTable := make([]uint64, len(event)) + for gi, group := range event { + oldSize := len(e.bits) + e.bits = group.encode(e.bits) + columnGroupSizeTable[gi] = uint64(len(e.bits) - oldSize) + } + sizeTables = append(sizeTables, columnGroupSizeTable) + e.encodeValueSize() + } + e.sizeTables = sizeTables + return e +} + +func newCraftResolvedEventEncoder(ts uint64) *craftMessageEncoder { + return newCraftMessageEncoder().encodeKeys(&craftColumnarKeys{ + ts: []uint64{uint64(ts)}, + ty: []uint64{uint64(model.MqMessageTypeResolved)}, + rowID: []int64{int64(-1)}, + partition: []int64{int64(-1)}, + schema: [][]byte{nil}, + table: [][]byte{nil}, + count: 1, + }).encodeValueSize() +} + +func newCraftDDLEventEncoder(ev *model.DDLEvent) *craftMessageEncoder { + ty := uint64(ev.Type) + query := ev.Query + return newCraftMessageEncoder().encodeKeys(&craftColumnarKeys{ + ts: []uint64{uint64(ev.CommitTs)}, + ty: []uint64{uint64(model.MqMessageTypeDDL)}, + rowID: []int64{int64(-1)}, + partition: []int64{int64(-1)}, + schema: [][]byte{[]byte(ev.TableInfo.Schema)}, + table: [][]byte{[]byte(ev.TableInfo.Table)}, + count: 1, + }).encodeUvarint(ty).encodeString(query).encodeValueSize() +} diff --git a/cdc/sink/codec/craft_model.go b/cdc/sink/codec/craft_model.go new file mode 100644 index 00000000000..50e48f9832b --- /dev/null +++ b/cdc/sink/codec/craft_model.go @@ -0,0 +1,303 @@ +package codec + +import ( + "github.com/pingcap/errors" + "github.com/pingcap/ticdc/cdc/model" +) + +/// Utility functions for buffer allocation +func newBufferSize(oldSize int) int { + var newSize int + if oldSize > 128 { + newSize = oldSize + 128 + } else { + if oldSize > 0 { + newSize = oldSize * 2 + } else { + newSize = 8 + } + } + return newSize +} + +func newUint64Buffers(eachSize int) ([]uint64, []uint64) { + buffer := make([]uint64, eachSize*2) + return buffer[:eachSize], buffer[eachSize:] +} + +func resizeUint64Buffer(buffer1, buffer2 []uint64) ([]uint64, []uint64) { + newBuffer1, newBuffer2 := newUint64Buffers(newBufferSize(len(buffer1))) + copy(newBuffer1, buffer1) + copy(newBuffer2, buffer2) + return newBuffer1, newBuffer2 +} + +func newInt64Buffers(eachSize int) ([]int64, []int64) { + buffer := make([]int64, eachSize*2) + return buffer[:eachSize], buffer[eachSize:] +} + +func resizeInt64Buffer(buffer1, buffer2 []int64) ([]int64, []int64) { + newBuffer1, newBuffer2 := newInt64Buffers(newBufferSize(len(buffer1))) + copy(newBuffer1, buffer1) + copy(newBuffer2, buffer2) + return newBuffer1, newBuffer2 +} + +func newBytesBuffers(eachSize int) ([][]byte, [][]byte) { + buffer := make([][]byte, eachSize*2) + return buffer[:eachSize], buffer[eachSize:] +} + +func resizeBytesBuffer(buffer1, buffer2 [][]byte) ([][]byte, [][]byte) { + newBuffer1, newBuffer2 := newBytesBuffers(newBufferSize(len(buffer1))) + copy(newBuffer1, buffer1) + copy(newBuffer2, buffer2) + return newBuffer1, newBuffer2 +} + +/// Keys in columnar layout +type craftColumnarKeys struct { + ts []uint64 + ty []uint64 + rowID []int64 + partition []int64 + schema [][]byte + table [][]byte + + count int +} + +func (k *craftColumnarKeys) encode(bits []byte) []byte { + bits = encodeDeltaUvarintChunk(bits, k.ts[:k.count]) + bits = encodeDeltaUvarintChunk(bits, 
k.ty[:k.count]) + bits = encodeDeltaVarintChunk(bits, k.rowID[:k.count]) + bits = encodeDeltaVarintChunk(bits, k.partition[:k.count]) + bits = encodeNullableBytesChunk(bits, k.schema[:k.count]) + bits = encodeNullableBytesChunk(bits, k.table[:k.count]) + return bits +} + +func (k *craftColumnarKeys) appendKey(ts, ty uint64, rowID, partition int64, schema, table []byte) int { + idx := k.count + if idx+1 > len(k.ty) { + k.ts, k.ty = resizeUint64Buffer(k.ts, k.ty) + k.rowID, k.partition = resizeInt64Buffer(k.rowID, k.partition) + k.schema, k.table = resizeBytesBuffer(k.schema, k.table) + } + k.ts[idx] = ts + k.ty[idx] = ty + k.rowID[idx] = rowID + k.partition[idx] = partition + k.schema[idx] = schema + k.table[idx] = table + k.count++ + + return 32 + len(schema) + len(table) /* 4 64-bits integers and two bytes array */ +} + +func (k *craftColumnarKeys) reset() { + k.count = 0 +} + +func (k *craftColumnarKeys) getType(index int) model.MqMessageType { + return model.MqMessageType(k.ty[index]) +} + +func (k *craftColumnarKeys) getTs(index int) uint64 { + return k.ts[index] +} + +func (k *craftColumnarKeys) getPartition(index int) int64 { + return k.partition[index] +} + +func (k *craftColumnarKeys) getSchema(index int) string { + return string(k.schema[index]) +} + +func (k *craftColumnarKeys) getTable(index int) string { + return string(k.table[index]) +} + +func decodeCraftColumnarKeys(bits []byte, numKeys int) (*craftColumnarKeys, error) { + var ts, ty []uint64 + var rowID, partition []int64 + var schema, table [][]byte + var err error + if bits, ts, err = decodeDeltaUvarintChunk(bits, numKeys); err != nil { + return nil, errors.Trace(err) + } + if bits, ty, err = decodeDeltaUvarintChunk(bits, numKeys); err != nil { + return nil, errors.Trace(err) + } + if bits, rowID, err = decodeDeltaVarintChunk(bits, numKeys); err != nil { + return nil, errors.Trace(err) + } + if bits, partition, err = decodeDeltaVarintChunk(bits, numKeys); err != nil { + return nil, errors.Trace(err) + } + if bits, schema, err = decodeNullableBytesChunk(bits, numKeys); err != nil { + return nil, errors.Trace(err) + } + if bits, table, err = decodeNullableBytesChunk(bits, numKeys); err != nil { + return nil, errors.Trace(err) + } + return &craftColumnarKeys{ + ts: ts, + ty: ty, + rowID: rowID, + partition: partition, + schema: schema, + table: table, + count: numKeys, + }, nil +} + +/// Column group in columnar layout +type craftColumnarColumnGroup struct { + ty byte + names [][]byte + types []uint64 + flags []uint64 + values [][]byte +} + +func (g *craftColumnarColumnGroup) encode(bits []byte) []byte { + bits = append(bits, g.ty) + bits = encodeUvarint(bits, uint64(len(g.names))) + bits = encodeBytesChunk(bits, g.names) + bits = encodeUvarintChunk(bits, g.types) + bits = encodeUvarintChunk(bits, g.flags) + bits = encodeBytesChunk(bits, g.values) + return bits +} + +func (g *craftColumnarColumnGroup) toModel() ([]*model.Column, error) { + columns := make([]*model.Column, len(g.names)) + for i, name := range g.names { + ty := byte(g.types[i]) + flag := model.ColumnFlagType(g.flags[i]) + value, err := decodeTiDBType(ty, flag, g.values[i]) + if err != nil { + return nil, errors.Trace(err) + } + columns[i] = &model.Column{ + Name: string(name), + Type: ty, + Flag: flag, + Value: value, + } + } + return columns, nil +} + +func newCraftColumnarColumnGroup(ty byte, columns []*model.Column) (int, *craftColumnarColumnGroup) { + var names [][]byte + var values [][]byte + var types []uint64 + var flags []uint64 + estimatedSize := 0 + 
for _, col := range columns {
+		if col == nil {
+			continue
+		}
+		name := []byte(col.Name)
+		names = append(names, name)
+		types = append(types, uint64(col.Type))
+		flags = append(flags, uint64(col.Flag))
+		value := encodeTiDBType(col.Type, col.Flag, col.Value)
+		values = append(values, value)
+		estimatedSize += len(name) + len(value) + 8 /* two byte arrays and two 64-bit integers */
+	}
+	if len(names) > 0 {
+		return estimatedSize, &craftColumnarColumnGroup{
+			ty:     ty,
+			names:  names,
+			types:  types,
+			flags:  flags,
+			values: values,
+		}
+	}
+	return estimatedSize, nil
+}
+
+/// Row changed message is basically an array of column groups
+type craftRowChangedEvent = []*craftColumnarColumnGroup
+
+func newCraftRowChangedMessage(ev *model.RowChangedEvent) (int, craftRowChangedEvent) {
+	var groups []*craftColumnarColumnGroup
+	estimatedSize := 0
+	if ev.IsDelete() {
+		if size, group := newCraftColumnarColumnGroup(craftColumnGroupTypeDelete, ev.PreColumns); group != nil {
+			groups = append(groups, group)
+			estimatedSize += size
+		}
+	} else {
+		if size, group := newCraftColumnarColumnGroup(craftColumnGroupTypeNew, ev.Columns); group != nil {
+			groups = append(groups, group)
+			estimatedSize += size
+		}
+		if size, group := newCraftColumnarColumnGroup(craftColumnGroupTypeOld, ev.PreColumns); group != nil {
+			groups = append(groups, group)
+			estimatedSize += size
+		}
+	}
+	return estimatedSize, craftRowChangedEvent(groups)
+}
+
+/// A buffer to save row changed events in batch
+type craftRowChangedEventBuffer struct {
+	keys *craftColumnarKeys
+
+	events        []craftRowChangedEvent
+	eventsCount   int
+	estimatedSize int
+}
+
+func (b *craftRowChangedEventBuffer) encode() []byte {
+	bits := newCraftMessageEncoder().encodeKeys(b.keys).encodeRowChangeEvents(b.events[:b.eventsCount]).encode()
+	b.reset()
+	return bits
+}
+
+func (b *craftRowChangedEventBuffer) appendRowChangedEvent(ev *model.RowChangedEvent) (rows, size int) {
+	var partition int64 = -1
+	if ev.Table.IsPartition {
+		partition = ev.Table.TableID
+	}
+
+	b.estimatedSize += b.keys.appendKey(
+		ev.CommitTs,
+		uint64(model.MqMessageTypeRow),
+		ev.RowID,
+		partition,
+		[]byte(ev.Table.Schema),
+		[]byte(ev.Table.Table),
+	)
+	if b.eventsCount+1 > len(b.events) {
+		newSize := newBufferSize(b.eventsCount)
+		events := make([]craftRowChangedEvent, newSize)
+		copy(events, b.events)
+		b.events = events
+	}
+	size, message := newCraftRowChangedMessage(ev)
+	b.events[b.eventsCount] = message
+	b.eventsCount++
+	b.estimatedSize += size
+	return b.eventsCount, b.estimatedSize
+}
+
+func (b *craftRowChangedEventBuffer) reset() {
+	b.keys.reset()
+	b.eventsCount = 0
+	b.estimatedSize = 0
+}
+
+func (b *craftRowChangedEventBuffer) size() int {
+	return b.estimatedSize
+}
+
+func (b *craftRowChangedEventBuffer) getKeys() *craftColumnarKeys {
+	return b.keys
+}
diff --git a/cdc/sink/codec/craft_test.go b/cdc/sink/codec/craft_test.go
new file mode 100644
index 00000000000..e511d89b6a4
--- /dev/null
+++ b/cdc/sink/codec/craft_test.go
@@ -0,0 +1,297 @@
+// Copyright 2021 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+ +package codec + +import ( + "math" + "math/rand" + "strconv" + + "github.com/pingcap/check" + "github.com/pingcap/parser/mysql" + "github.com/pingcap/ticdc/cdc/model" + "github.com/pingcap/ticdc/pkg/util/testleak" +) + +type craftBatchSuite struct { + rowCases [][]*model.RowChangedEvent + ddlCases [][]*model.DDLEvent + resolvedTsCases [][]uint64 +} + +var _ = check.Suite(&craftBatchSuite{ + rowCases: codecRowCases, + ddlCases: codecDDLCases, + resolvedTsCases: codecResolvedTSCases, +}) + +func (s *craftBatchSuite) testBatchCodec(c *check.C, newEncoder func() EventBatchEncoder, newDecoder func(value []byte) (EventBatchDecoder, error)) { + checkRowDecoder := func(decoder EventBatchDecoder, cs []*model.RowChangedEvent) { + index := 0 + for { + tp, hasNext, err := decoder.HasNext() + c.Assert(err, check.IsNil) + if !hasNext { + break + } + c.Assert(tp, check.Equals, model.MqMessageTypeRow) + row, err := decoder.NextRowChangedEvent() + c.Assert(err, check.IsNil) + c.Assert(row, check.DeepEquals, cs[index]) + index++ + } + } + checkDDLDecoder := func(decoder EventBatchDecoder, cs []*model.DDLEvent) { + index := 0 + for { + tp, hasNext, err := decoder.HasNext() + c.Assert(err, check.IsNil) + if !hasNext { + break + } + c.Assert(tp, check.Equals, model.MqMessageTypeDDL) + ddl, err := decoder.NextDDLEvent() + c.Assert(err, check.IsNil) + c.Assert(ddl, check.DeepEquals, cs[index]) + index++ + } + } + checkTSDecoder := func(decoder EventBatchDecoder, cs []uint64) { + index := 0 + for { + tp, hasNext, err := decoder.HasNext() + c.Assert(err, check.IsNil) + if !hasNext { + break + } + c.Assert(tp, check.Equals, model.MqMessageTypeResolved) + ts, err := decoder.NextResolvedEvent() + c.Assert(err, check.IsNil) + c.Assert(ts, check.DeepEquals, cs[index]) + index++ + } + } + + for _, cs := range s.rowCases { + encoder := newEncoder() + err := encoder.SetParams(map[string]string{"max-message-bytes": "8192", "max-batch-size": "64"}) + c.Assert(err, check.IsNil) + + events := 0 + for _, row := range cs { + op, err := encoder.AppendRowChangedEvent(row) + events++ + c.Assert(err, check.IsNil) + c.Assert(op, check.Equals, EncoderNoOperation) + } + // test normal decode + if len(cs) > 0 { + res := encoder.Build() + c.Assert(res, check.HasLen, 1) + decoder, err := newDecoder(res[0].Value) + c.Assert(err, check.IsNil) + checkRowDecoder(decoder, cs) + } + } + + for _, cs := range s.ddlCases { + encoder := newEncoder() + err := encoder.SetParams(map[string]string{"max-message-bytes": "8192", "max-batch-size": "64"}) + c.Assert(err, check.IsNil) + + for i, ddl := range cs { + msg, err := encoder.EncodeDDLEvent(ddl) + c.Assert(err, check.IsNil) + c.Assert(msg, check.NotNil) + decoder, err := newDecoder(msg.Value) + c.Assert(err, check.IsNil) + checkDDLDecoder(decoder, cs[i:i+1]) + } + } + + for _, cs := range s.resolvedTsCases { + encoder := newEncoder() + err := encoder.SetParams(map[string]string{"max-message-bytes": "8192", "max-batch-size": "64"}) + c.Assert(err, check.IsNil) + + for i, ts := range cs { + msg, err := encoder.EncodeCheckpointEvent(ts) + c.Assert(err, check.IsNil) + c.Assert(msg, check.NotNil) + decoder, err := newDecoder(msg.Value) + c.Assert(err, check.IsNil) + checkTSDecoder(decoder, cs[i:i+1]) + } + } +} + +func (s *craftBatchSuite) TestParamsEdgeCases(c *check.C) { + defer testleak.AfterTest(c)() + encoder := NewCraftEventBatchEncoder().(*CraftEventBatchEncoder) + err := encoder.SetParams(map[string]string{}) + c.Assert(err, check.IsNil) + c.Assert(encoder.maxBatchSize, check.Equals, 4096) + 
c.Assert(encoder.maxMessageSize, check.Equals, 64*1024*1024) + + err = encoder.SetParams(map[string]string{"max-message-bytes": "0"}) + c.Assert(err, check.ErrorMatches, ".*invalid.*") + + err = encoder.SetParams(map[string]string{"max-message-bytes": "-1"}) + c.Assert(err, check.ErrorMatches, ".*invalid.*") + + err = encoder.SetParams(map[string]string{"max-message-bytes": strconv.Itoa(math.MaxInt32)}) + c.Assert(err, check.IsNil) + c.Assert(encoder.maxBatchSize, check.Equals, 4096) + c.Assert(encoder.maxMessageSize, check.Equals, math.MaxInt32) + + err = encoder.SetParams(map[string]string{"max-message-bytes": strconv.Itoa(math.MaxUint32)}) + c.Assert(err, check.NotNil) + + err = encoder.SetParams(map[string]string{"max-batch-size": "0"}) + c.Assert(err, check.ErrorMatches, ".*invalid.*") + + err = encoder.SetParams(map[string]string{"max-batch-size": "-1"}) + c.Assert(err, check.ErrorMatches, ".*invalid.*") + + err = encoder.SetParams(map[string]string{"max-batch-size": strconv.Itoa(math.MaxUint16)}) + c.Assert(err, check.IsNil) + c.Assert(encoder.maxBatchSize, check.Equals, int(math.MaxUint16)) + c.Assert(encoder.maxMessageSize, check.Equals, 64*1024*1024) + + err = encoder.SetParams(map[string]string{"max-batch-size": strconv.Itoa(math.MaxInt32)}) + c.Assert(err, check.NotNil) + + err = encoder.SetParams(map[string]string{"max-batch-size": strconv.Itoa(math.MaxUint32)}) + c.Assert(err, check.NotNil) +} + +func (s *craftBatchSuite) TestMaxMessageBytes(c *check.C) { + defer testleak.AfterTest(c)() + encoder := NewCraftEventBatchEncoder() + err := encoder.SetParams(map[string]string{"max-message-bytes": "256"}) + c.Check(err, check.IsNil) + + testEvent := &model.RowChangedEvent{ + CommitTs: 1, + Table: &model.TableName{Schema: "a", Table: "b"}, + Columns: []*model.Column{{Name: "col1", Type: mysql.TypeVarchar, Value: []byte("aa")}}, + } + + for i := 0; i < 10000; i++ { + r, err := encoder.AppendRowChangedEvent(testEvent) + c.Check(r, check.Equals, EncoderNoOperation) + c.Check(err, check.IsNil) + } + + messages := encoder.Build() + for _, msg := range messages { + c.Assert(msg.Length(), check.LessEqual, 256) + } +} + +func (s *craftBatchSuite) TestMaxBatchSize(c *check.C) { + defer testleak.AfterTest(c)() + encoder := NewCraftEventBatchEncoder() + err := encoder.SetParams(map[string]string{"max-batch-size": "64"}) + c.Check(err, check.IsNil) + + testEvent := &model.RowChangedEvent{ + CommitTs: 1, + Table: &model.TableName{Schema: "a", Table: "b"}, + Columns: []*model.Column{{Name: "col1", Type: mysql.TypeVarchar, Value: []byte("aa")}}, + } + + for i := 0; i < 10000; i++ { + r, err := encoder.AppendRowChangedEvent(testEvent) + c.Check(r, check.Equals, EncoderNoOperation) + c.Check(err, check.IsNil) + } + + messages := encoder.Build() + sum := 0 + for _, msg := range messages { + decoder, err := NewCraftEventBatchDecoder(msg.Value) + c.Check(err, check.IsNil) + count := 0 + for { + t, hasNext, err := decoder.HasNext() + c.Check(err, check.IsNil) + if !hasNext { + break + } + + c.Check(t, check.Equals, model.MqMessageTypeRow) + _, err = decoder.NextRowChangedEvent() + c.Check(err, check.IsNil) + count++ + } + c.Check(count, check.LessEqual, 64) + sum += count + } + c.Check(sum, check.Equals, 10000) +} + +func (s *craftBatchSuite) TestDefaultEventBatchCodec(c *check.C) { + defer testleak.AfterTest(c)() + s.testBatchCodec(c, func() EventBatchEncoder { + encoder := NewCraftEventBatchEncoder() + return encoder + }, NewCraftEventBatchDecoder) +} + +var _ = check.Suite(&craftCodecSuite{}) + +type 
craftCodecSuite struct{} + +func (s *craftCodecSuite) TestSizeTable(c *check.C) { + defer testleak.AfterTest(c)() + tables := [][]uint64{ + []uint64{ + 1, 3, 5, 7, 9, + }, + []uint64{ + 2, 4, 6, 8, 10, + }, + } + bits := make([]byte, 16) + rand.Read(bits) + bits = encodeSizeTables(bits, tables) + + size, decoded, err := decodeSizeTables(bits) + c.Check(err, check.IsNil) + c.Check(decoded, check.DeepEquals, tables) + c.Check(size, check.Equals, len(bits)-16) +} + +func (s *craftCodecSuite) TestUvarintReverse(c *check.C) { + defer testleak.AfterTest(c)() + + var i uint64 = 0 + + for i < 0x8000000000000000 { + bits := make([]byte, 16) + rand.Read(bits) + bits, bytes1 := encodeUvarintReversed(bits, i) + bytes2, u64, err := decodeUvarintReversed(bits) + c.Check(err, check.IsNil) + c.Check(u64, check.Equals, i) + c.Check(bytes1, check.Equals, len(bits)-16) + c.Check(bytes1, check.Equals, bytes2) + if i == 0 { + i = 1 + } else { + i <<= 1 + } + } +} diff --git a/cdc/sink/codec/interface.go b/cdc/sink/codec/interface.go index f63b8e82ca3..b70397264f2 100644 --- a/cdc/sink/codec/interface.go +++ b/cdc/sink/codec/interface.go @@ -143,6 +143,7 @@ const ( ProtocolAvro ProtocolMaxwell ProtocolCanalJSON + ProtocolCraft ) // FromString converts the protocol from string to Protocol enum type @@ -158,6 +159,8 @@ func (p *Protocol) FromString(protocol string) { *p = ProtocolMaxwell case "canal-json": *p = ProtocolCanalJSON + case "craft": + *p = ProtocolCraft default: *p = ProtocolDefault log.Warn("can't support codec protocol, using default protocol", zap.String("protocol", protocol)) @@ -177,6 +180,8 @@ func NewEventBatchEncoder(p Protocol) func() EventBatchEncoder { return NewMaxwellEventBatchEncoder case ProtocolCanalJSON: return NewCanalFlatEventBatchEncoder + case ProtocolCraft: + return NewCraftEventBatchEncoder default: log.Warn("unknown codec protocol value of EventBatchEncoder", zap.Int("protocol_value", int(p))) return NewJSONEventBatchEncoder diff --git a/cdc/sink/codec/json.go b/cdc/sink/codec/json.go index e7a851609d3..ba5e6330b61 100644 --- a/cdc/sink/codec/json.go +++ b/cdc/sink/codec/json.go @@ -535,27 +535,27 @@ func (d *JSONEventBatchEncoder) SetParams(params map[string]string) error { if maxMessageBytes, ok := params["max-message-bytes"]; ok { d.maxKafkaMessageSize, err = strconv.Atoi(maxMessageBytes) if err != nil { - return cerror.ErrKafkaInvalidConfig.Wrap(err) + return cerror.ErrSinkInvalidConfig.Wrap(err) } } else { d.maxKafkaMessageSize = DefaultMaxMessageBytes } if d.maxKafkaMessageSize <= 0 { - return cerror.ErrKafkaInvalidConfig.Wrap(errors.Errorf("invalid max-message-bytes %d", d.maxKafkaMessageSize)) + return cerror.ErrSinkInvalidConfig.Wrap(errors.Errorf("invalid max-message-bytes %d", d.maxKafkaMessageSize)) } if maxBatchSize, ok := params["max-batch-size"]; ok { d.maxBatchSize, err = strconv.Atoi(maxBatchSize) if err != nil { - return cerror.ErrKafkaInvalidConfig.Wrap(err) + return cerror.ErrSinkInvalidConfig.Wrap(err) } } else { d.maxBatchSize = DefaultMaxBatchSize } if d.maxBatchSize <= 0 { - return cerror.ErrKafkaInvalidConfig.Wrap(errors.Errorf("invalid max-batch-size %d", d.maxBatchSize)) + return cerror.ErrSinkInvalidConfig.Wrap(errors.Errorf("invalid max-batch-size %d", d.maxBatchSize)) } return nil } diff --git a/cdc/sink/codec/json_test.go b/cdc/sink/codec/json_test.go index 6b3d19f49c5..f2d6db245d8 100644 --- a/cdc/sink/codec/json_test.go +++ b/cdc/sink/codec/json_test.go @@ -33,57 +33,9 @@ type batchSuite struct { } var _ = check.Suite(&batchSuite{ - rowCases: 
[][]*model.RowChangedEvent{{{ - CommitTs: 1, - Table: &model.TableName{Schema: "a", Table: "b"}, - Columns: []*model.Column{{Name: "col1", Type: 1, Value: "aa"}}, - }}, {{ - CommitTs: 1, - Table: &model.TableName{Schema: "a", Table: "b"}, - Columns: []*model.Column{{Name: "col1", Type: 1, Value: "aa"}}, - }, { - CommitTs: 2, - Table: &model.TableName{Schema: "a", Table: "b"}, - Columns: []*model.Column{{Name: "col1", Type: 1, Value: "bb"}}, - }, { - CommitTs: 3, - Table: &model.TableName{Schema: "a", Table: "b"}, - Columns: []*model.Column{{Name: "col1", Type: 1, Value: "bb"}}, - }, { - CommitTs: 4, - Table: &model.TableName{Schema: "a", Table: "c", TableID: 6, IsPartition: true}, - Columns: []*model.Column{{Name: "col1", Type: 1, Value: "cc"}}, - }}, {}}, - ddlCases: [][]*model.DDLEvent{{{ - CommitTs: 1, - TableInfo: &model.SimpleTableInfo{ - Schema: "a", Table: "b", - }, - Query: "create table a", - Type: 1, - }}, {{ - CommitTs: 1, - TableInfo: &model.SimpleTableInfo{ - Schema: "a", Table: "b", - }, - Query: "create table a", - Type: 1, - }, { - CommitTs: 2, - TableInfo: &model.SimpleTableInfo{ - Schema: "a", Table: "b", - }, - Query: "create table b", - Type: 2, - }, { - CommitTs: 3, - TableInfo: &model.SimpleTableInfo{ - Schema: "a", Table: "b", - }, - Query: "create table c", - Type: 3, - }}, {}}, - resolvedTsCases: [][]uint64{{1}, {1, 2, 3}, {}}, + rowCases: codecRowCases, + ddlCases: codecDDLCases, + resolvedTsCases: codecResolvedTSCases, }) func (s *batchSuite) testBatchCodec(c *check.C, newEncoder func() EventBatchEncoder, newDecoder func(key []byte, value []byte) (EventBatchDecoder, error)) { diff --git a/pkg/errors/errors.go b/pkg/errors/errors.go index acdb6da8661..881842b047e 100644 --- a/pkg/errors/errors.go +++ b/pkg/errors/errors.go @@ -105,6 +105,8 @@ var ( ErrCanalDecodeFailed = errors.Normalize("canal decode failed", errors.RFCCodeText("CDC:ErrCanalDecodeFailed")) ErrCanalEncodeFailed = errors.Normalize("canal encode failed", errors.RFCCodeText("CDC:ErrCanalEncodeFailed")) ErrOldValueNotEnabled = errors.Normalize("old value is not enabled", errors.RFCCodeText("CDC:ErrOldValueNotEnabled")) + ErrSinkInvalidConfig = errors.Normalize("sink config invalid", errors.RFCCodeText("CDC:ErrSinkInvalidConfig")) + ErrCraftCodecInvalidData = errors.Normalize("craft codec invalid data", errors.RFCCodeText("CDC:ErrCraftCodecInvalidData")) // utilities related errors ErrToTLSConfigFailed = errors.Normalize("generate tls config failed", errors.RFCCodeText("CDC:ErrToTLSConfigFailed"))
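For reference, a minimal sketch (illustrative only; the surrounding sink setup is elided) of how the new protocol value is selected through the interface.go changes above:

	var protocol Protocol
	protocol.FromString("craft")                 // unknown names fall back to ProtocolDefault
	newEncoder := NewEventBatchEncoder(protocol) // yields NewCraftEventBatchEncoder for ProtocolCraft
	encoder := newEncoder()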