Skip to content

Commit

Permalink
codec(ticdc): adjust decoder interface to make it reusable (#8863)
Browse files Browse the repository at this point in the history
close #8861
  • Loading branch information
3AceShowHand authored Apr 27, 2023
1 parent ed5a5f5 commit fc2dc70
Show file tree
Hide file tree
Showing 14 changed files with 150 additions and 72 deletions.
31 changes: 18 additions & 13 deletions cmd/kafka-consumer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -512,21 +512,26 @@ func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim saram
panic("sink should initialized")
}

var (
decoder codec.RowEventDecoder
err error
)
switch c.protocol {
case config.ProtocolOpen, config.ProtocolDefault:
decoder = open.NewBatchDecoder()
case config.ProtocolCanalJSON:
decoder = canal.NewBatchDecoder(c.enableTiDBExtension, "")
default:
log.Panic("Protocol not supported", zap.Any("Protocol", c.protocol))
}
if err != nil {
return errors.Trace(err)
}

eventGroups := make(map[int64]*eventsGroup)
for message := range claim.Messages() {
var (
decoder codec.RowEventDecoder
err error
)
switch c.protocol {
case config.ProtocolOpen, config.ProtocolDefault:
decoder, err = open.NewBatchDecoder(message.Key, message.Value)
case config.ProtocolCanalJSON:
decoder = canal.NewBatchDecoder(message.Value, c.enableTiDBExtension, "")
default:
log.Panic("Protocol not supported", zap.Any("Protocol", c.protocol))
}
if err != nil {
if err := decoder.AddKeyValue(message.Key, message.Value); err != nil {
log.Error("add key value to the decoder failed", zap.Error(err))
return errors.Trace(err)
}

Expand Down
6 changes: 5 additions & 1 deletion cmd/storage-consumer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,11 @@ func (c *consumer) emitDMLEvents(
case config.ProtocolCanalJSON:
// Always enable tidb extension for canal-json protocol
// because we need to get the commit ts from the extension field.
decoder = canal.NewBatchDecoder(content, true, c.codecCfg.Terminator)
decoder = canal.NewBatchDecoder(true, c.codecCfg.Terminator)
err := decoder.AddKeyValue(nil, content)
if err != nil {
return errors.Trace(err)
}
}

cnt := 0
Expand Down
28 changes: 14 additions & 14 deletions pkg/sink/codec/avro/avro.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,11 @@ type BatchEncoder struct {

// Options is used to initialize the encoder, control the encoding behavior.
type Options struct {
enableTiDBExtension bool
enableRowChecksum bool
EnableTiDBExtension bool
EnableRowChecksum bool

decimalHandlingMode string
bigintUnsignedHandlingMode string
DecimalHandlingMode string
BigintUnsignedHandlingMode string
}

type avroEncodeInput struct {
Expand Down Expand Up @@ -194,8 +194,8 @@ func (a *BatchEncoder) avroEncode(
colInfos: e.ColInfos,
}

enableTiDBExtension = a.enableTiDBExtension
enableRowLevelChecksum = a.enableRowChecksum
enableTiDBExtension = a.EnableTiDBExtension
enableRowLevelChecksum = a.EnableRowChecksum
schemaManager = a.valueSchemaManager
if e.IsInsert() {
operation = insertOperation
Expand All @@ -220,8 +220,8 @@ func (a *BatchEncoder) avroEncode(
input,
enableTiDBExtension,
enableRowLevelChecksum,
a.decimalHandlingMode,
a.bigintUnsignedHandlingMode,
a.DecimalHandlingMode,
a.BigintUnsignedHandlingMode,
)
if err != nil {
log.Error("AvroEventBatchEncoder: generating schema failed", zap.Error(err))
Expand All @@ -245,8 +245,8 @@ func (a *BatchEncoder) avroEncode(
e.CommitTs,
operation,
enableTiDBExtension,
a.decimalHandlingMode,
a.bigintUnsignedHandlingMode,
a.DecimalHandlingMode,
a.BigintUnsignedHandlingMode,
)
if err != nil {
log.Error("AvroEventBatchEncoder: converting to native failed", zap.Error(err))
Expand Down Expand Up @@ -924,10 +924,10 @@ func (b *batchEncoderBuilder) Build() codec.RowEventEncoder {
valueSchemaManager: b.valueSchemaManager,
result: make([]*common.Message, 0, 1),
Options: &Options{
enableTiDBExtension: b.config.EnableTiDBExtension,
enableRowChecksum: b.config.EnableRowChecksum,
decimalHandlingMode: b.config.AvroDecimalHandlingMode,
bigintUnsignedHandlingMode: b.config.AvroBigintUnsignedHandlingMode,
EnableTiDBExtension: b.config.EnableTiDBExtension,
EnableRowChecksum: b.config.EnableRowChecksum,
DecimalHandlingMode: b.config.AvroDecimalHandlingMode,
BigintUnsignedHandlingMode: b.config.AvroBigintUnsignedHandlingMode,
},
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/sink/codec/avro/avro_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,9 @@ func setupEncoderAndSchemaRegistry(
keySchemaManager: keyManager,
result: make([]*common.Message, 0, 1),
Options: &Options{
enableTiDBExtension: enableTiDBExtension,
decimalHandlingMode: decimalHandlingMode,
bigintUnsignedHandlingMode: bigintUnsignedHandlingMode,
EnableTiDBExtension: enableTiDBExtension,
DecimalHandlingMode: decimalHandlingMode,
BigintUnsignedHandlingMode: bigintUnsignedHandlingMode,
},
}, nil
}
Expand Down
7 changes: 4 additions & 3 deletions pkg/sink/codec/builder/codec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,9 +277,9 @@ func BenchmarkProtobuf2Encoding(b *testing.B) {
func BenchmarkCraftDecoding(b *testing.B) {
allocator := craft.NewSliceAllocator(128)
for i := 0; i < b.N; i++ {
decoder := craft.NewBatchDecoderWithAllocator(allocator)
for _, message := range codecCraftEncodedRowChanges {
if decoder, err := craft.NewBatchDecoderWithAllocator(
message.Value, allocator); err != nil {
if err := decoder.AddKeyValue(message.Key, message.Value); err != nil {
panic(err)
} else {
for {
Expand All @@ -299,7 +299,8 @@ func BenchmarkCraftDecoding(b *testing.B) {
func BenchmarkJsonDecoding(b *testing.B) {
for i := 0; i < b.N; i++ {
for _, message := range codecJSONEncodedRowChanges {
if decoder, err := open.NewBatchDecoder(message.Key, message.Value); err != nil {
decoder := open.NewBatchDecoder()
if err := decoder.AddKeyValue(message.Key, message.Value); err != nil {
panic(err)
} else {
for {
Expand Down
10 changes: 7 additions & 3 deletions pkg/sink/codec/canal/canal_json_decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,22 @@ type batchDecoder struct {
}

// NewBatchDecoder return a decoder for canal-json
func NewBatchDecoder(data []byte,
func NewBatchDecoder(
enableTiDBExtension bool,
terminator string,
) codec.RowEventDecoder {
return &batchDecoder{
data: data,
msg: nil,
enableTiDBExtension: enableTiDBExtension,
terminator: terminator,
}
}

// AddKeyValue implements the RowEventDecoder interface
func (b *batchDecoder) AddKeyValue(_, value []byte) error {
b.data = value
return nil
}

// HasNext implements the RowEventDecoder interface
func (b *batchDecoder) HasNext() (model.MessageType, bool, error) {
var (
Expand Down
13 changes: 10 additions & 3 deletions pkg/sink/codec/canal/canal_json_decoder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@ func TestNewCanalJSONBatchDecoder4RowMessage(t *testing.T) {
msg := messages[0]

for _, decodeEnable := range []bool{false, true} {
decoder := NewBatchDecoder(msg.Value, decodeEnable, "")
decoder := NewBatchDecoder(decodeEnable, "")
err := decoder.AddKeyValue(msg.Key, msg.Value)
require.NoError(t, err)

ty, hasNext, err := decoder.HasNext()
require.Nil(t, err)
Expand Down Expand Up @@ -95,7 +97,9 @@ func TestNewCanalJSONBatchDecoder4DDLMessage(t *testing.T) {
require.NotNil(t, result)

for _, decodeEnable := range []bool{false, true} {
decoder := NewBatchDecoder(result.Value, decodeEnable, "")
decoder := NewBatchDecoder(decodeEnable, "")
err := decoder.AddKeyValue(nil, result.Value)
require.NoError(t, err)

ty, hasNext, err := decoder.HasNext()
require.Nil(t, err)
Expand Down Expand Up @@ -130,7 +134,10 @@ func TestCanalJSONBatchDecoderWithTerminator(t *testing.T) {
encodedValue := `{"id":0,"database":"test","table":"employee","pkNames":["id"],"isDdl":false,"type":"INSERT","es":1668067205238,"ts":1668067206650,"sql":"","sqlType":{"FirstName":12,"HireDate":91,"LastName":12,"OfficeLocation":12,"id":4},"mysqlType":{"FirstName":"varchar","HireDate":"date","LastName":"varchar","OfficeLocation":"varchar","id":"int"},"data":[{"FirstName":"Bob","HireDate":"2014-06-04","LastName":"Smith","OfficeLocation":"New York","id":"101"}],"old":null}
{"id":0,"database":"test","table":"employee","pkNames":["id"],"isDdl":false,"type":"UPDATE","es":1668067229137,"ts":1668067230720,"sql":"","sqlType":{"FirstName":12,"HireDate":91,"LastName":12,"OfficeLocation":12,"id":4},"mysqlType":{"FirstName":"varchar","HireDate":"date","LastName":"varchar","OfficeLocation":"varchar","id":"int"},"data":[{"FirstName":"Bob","HireDate":"2015-10-08","LastName":"Smith","OfficeLocation":"Los Angeles","id":"101"}],"old":[{"FirstName":"Bob","HireDate":"2014-06-04","LastName":"Smith","OfficeLocation":"New York","id":"101"}]}
{"id":0,"database":"test","table":"employee","pkNames":["id"],"isDdl":false,"type":"DELETE","es":1668067230388,"ts":1668067231725,"sql":"","sqlType":{"FirstName":12,"HireDate":91,"LastName":12,"OfficeLocation":12,"id":4},"mysqlType":{"FirstName":"varchar","HireDate":"date","LastName":"varchar","OfficeLocation":"varchar","id":"int"},"data":[{"FirstName":"Bob","HireDate":"2015-10-08","LastName":"Smith","OfficeLocation":"Los Angeles","id":"101"}],"old":null}`
decoder := NewBatchDecoder([]byte(encodedValue), false, "\n")
decoder := NewBatchDecoder(false, "\n")
err := decoder.AddKeyValue(nil, []byte(encodedValue))
require.NoError(t, err)

cnt := 0
for {
tp, hasNext, err := decoder.HasNext()
Expand Down
5 changes: 4 additions & 1 deletion pkg/sink/codec/canal/canal_json_row_event_encoder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,10 @@ func TestEncodeCheckpointEvent(t *testing.T) {
}

require.NotNil(t, msg)
decoder := NewBatchDecoder(msg.Value, enable, "")
decoder := NewBatchDecoder(enable, "")

err = decoder.AddKeyValue(msg.Key, msg.Value)
require.NoError(t, err)

ty, hasNext, err := decoder.HasNext()
require.Nil(t, err)
Expand Down
30 changes: 18 additions & 12 deletions pkg/sink/codec/craft/craft_decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,27 +118,33 @@ func (b *batchDecoder) NextDDLEvent() (*model.DDLEvent, error) {
return event, nil
}

// newBatchDecoder creates a new batchDecoder backed by a default-sized
// allocator and seeds it with the given encoded bytes.
func newBatchDecoder(bits []byte) (codec.RowEventDecoder, error) {
	decoder := NewBatchDecoderWithAllocator(NewSliceAllocator(64))
	err := decoder.AddKeyValue(nil, bits)
	return decoder, err
}

// NewBatchDecoderWithAllocator creates a new batchDecoder with given allocator.
func NewBatchDecoderWithAllocator(
bits []byte, allocator *SliceAllocator,
) (codec.RowEventDecoder, error) {
decoder, err := NewMessageDecoder(bits, allocator)
allocator *SliceAllocator,
) codec.RowEventDecoder {
return &batchDecoder{
allocator: allocator,
}
}

// AddKeyValue implements the RowEventDecoder interface.
// The craft format encodes everything in the value; the key is ignored.
// It parses the message headers eagerly so that a malformed payload is
// reported here rather than during iteration.
func (b *batchDecoder) AddKeyValue(_, value []byte) error {
	decoder, err := NewMessageDecoder(value, b.allocator)
	if err != nil {
		return errors.Trace(err)
	}
	headers, err := decoder.Headers()
	if err != nil {
		return errors.Trace(err)
	}
	b.decoder = decoder
	b.headers = headers

	return nil
}
5 changes: 5 additions & 0 deletions pkg/sink/codec/csv/csv_decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,11 @@ func NewBatchDecoder(ctx context.Context,
}, nil
}

// AddKeyValue implements the RowEventDecoder interface.
// The csv decoder pulls rows from its parser (see HasNext, which calls
// b.parser.ReadRow) rather than from per-message key/value pairs, so this
// method is intentionally a no-op.
func (b *batchDecoder) AddKeyValue(_, _ []byte) error {
	return nil
}

// HasNext implements the RowEventDecoder interface.
func (b *batchDecoder) HasNext() (model.MessageType, bool, error) {
err := b.parser.ReadRow()
Expand Down
5 changes: 5 additions & 0 deletions pkg/sink/codec/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@ import "github.com/pingcap/tiflow/cdc/model"
// RowEventDecoder is an abstraction for events decoder
// this interface is only for testing now
type RowEventDecoder interface {
	// AddKeyValue adds the received key and value to the decoder.
	// It should be called before `HasNext`; the decoder then decodes
	// the key and value into the event format.
	AddKeyValue(key, value []byte) error

	// HasNext returns
	// 1. the type of the next event
	// 2. a bool indicating whether the next event exists
Expand Down
13 changes: 10 additions & 3 deletions pkg/sink/codec/internal/batch_tester.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package internal

import (
Expand Down Expand Up @@ -288,8 +289,10 @@ func (s *BatchTester) TestBatchCodec(
res := encoder.Build()
require.Len(t, res, 1)
require.Equal(t, len(cs), res[0].GetRowsCount())

decoder, err := newDecoder(res[0].Key, res[0].Value)
require.Nil(t, err)
require.NoError(t, err)

checkRowDecoder(decoder, cs)
}
}
Expand All @@ -299,8 +302,10 @@ func (s *BatchTester) TestBatchCodec(
msg, err := encoder.EncodeDDLEvent(ddl)
require.Nil(t, err)
require.NotNil(t, msg)

decoder, err := newDecoder(msg.Key, msg.Value)
require.Nil(t, err)
require.NoError(t, err)

checkDDLDecoder(decoder, cs[i:i+1])

}
Expand All @@ -312,8 +317,10 @@ func (s *BatchTester) TestBatchCodec(
msg, err := encoder.EncodeCheckpointEvent(ts)
require.Nil(t, err)
require.NotNil(t, msg)

decoder, err := newDecoder(msg.Key, msg.Value)
require.Nil(t, err)
require.NoError(t, err)

checkTSDecoder(decoder, cs[i:i+1])
}
}
Expand Down
50 changes: 37 additions & 13 deletions pkg/sink/codec/open/open_protocol_decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,20 +224,44 @@ func (b *BatchDecoder) decodeNextKey() error {
}

// NewBatchDecoder creates a new BatchDecoder.
func NewBatchDecoder(key []byte, value []byte) (codec.RowEventDecoder, error) {
func NewBatchDecoder() codec.RowEventDecoder {
return &BatchDecoder{}

}

// AddKeyValue implements the RowEventDecoder interface.
// The key must start with an 8-byte big-endian batch-version header,
// which is validated and stripped before the remainder is stored.
// It is an error to call this while previously added data is unconsumed.
func (b *BatchDecoder) AddKeyValue(key, value []byte) error {
	if len(b.keyBytes) != 0 || len(b.valueBytes) != 0 {
		return cerror.ErrOpenProtocolCodecInvalidData.
			GenWithStack("decoder key and value not nil")
	}
	// Guard before slicing: a key shorter than the 8-byte version header
	// would make key[:8] panic.
	if len(key) < 8 {
		return cerror.ErrOpenProtocolCodecInvalidData.
			GenWithStack("key too short, no version header")
	}
	version := binary.BigEndian.Uint64(key[:8])
	if version != codec.BatchVersion1 {
		return cerror.ErrOpenProtocolCodecInvalidData.
			GenWithStack("unexpected key format version")
	}

	b.keyBytes = key[8:]
	b.valueBytes = value

	return nil
}

// AddKeyValue implements the RowEventDecoder interface
func (b *BatchMixedDecoder) AddKeyValue(key, value []byte) error {
if key != nil || value != nil {
return cerror.ErrOpenProtocolCodecInvalidData.
GenWithStack("decoder key and value not nil")
}
version := binary.BigEndian.Uint64(key[:8])
key = key[8:]
if version != codec.BatchVersion1 {
return nil, cerror.ErrOpenProtocolCodecInvalidData.GenWithStack("unexpected key format version")
}
// if only decode one byte slice, we choose MixedDecoder
if len(key) > 0 && len(value) == 0 {
return &BatchMixedDecoder{
mixedBytes: key,
}, nil
}
return &BatchDecoder{
keyBytes: key,
valueBytes: value,
}, nil
return cerror.ErrOpenProtocolCodecInvalidData.
GenWithStack("unexpected key format version")
}

b.mixedBytes = key
return nil
}
Loading

0 comments on commit fc2dc70

Please sign in to comment.