Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

codec(ticdc): adjust decoder interface to make it reusable #8863

Merged
merged 8 commits into from
Apr 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 18 additions & 13 deletions cmd/kafka-consumer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -512,21 +512,26 @@ func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim saram
panic("sink should initialized")
}

var (
decoder codec.RowEventDecoder
err error
)
switch c.protocol {
case config.ProtocolOpen, config.ProtocolDefault:
decoder = open.NewBatchDecoder()
case config.ProtocolCanalJSON:
decoder = canal.NewBatchDecoder(c.enableTiDBExtension, "")
default:
log.Panic("Protocol not supported", zap.Any("Protocol", c.protocol))
}
if err != nil {
return errors.Trace(err)
}

eventGroups := make(map[int64]*eventsGroup)
for message := range claim.Messages() {
var (
decoder codec.RowEventDecoder
err error
)
switch c.protocol {
case config.ProtocolOpen, config.ProtocolDefault:
decoder, err = open.NewBatchDecoder(message.Key, message.Value)
case config.ProtocolCanalJSON:
decoder = canal.NewBatchDecoder(message.Value, c.enableTiDBExtension, "")
default:
log.Panic("Protocol not supported", zap.Any("Protocol", c.protocol))
}
if err != nil {
if err := decoder.AddKeyValue(message.Key, message.Value); err != nil {
log.Error("add key value to the decoder failed", zap.Error(err))
return errors.Trace(err)
}

Expand Down
6 changes: 5 additions & 1 deletion cmd/storage-consumer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,11 @@ func (c *consumer) emitDMLEvents(
case config.ProtocolCanalJSON:
// Always enable tidb extension for canal-json protocol
// because we need to get the commit ts from the extension field.
decoder = canal.NewBatchDecoder(content, true, c.codecCfg.Terminator)
decoder = canal.NewBatchDecoder(true, c.codecCfg.Terminator)
err := decoder.AddKeyValue(nil, content)
if err != nil {
return errors.Trace(err)
}
}

cnt := 0
Expand Down
28 changes: 14 additions & 14 deletions pkg/sink/codec/avro/avro.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,11 @@ type BatchEncoder struct {

// Options is used to initialize the encoder, control the encoding behavior.
// Fields are exported so callers outside this package can configure the
// encoder directly.
type Options struct {
	// EnableTiDBExtension — presumably toggles emitting TiDB-specific
	// extension fields with the payload; confirm against the encoder.
	EnableTiDBExtension bool
	// EnableRowChecksum — presumably toggles row-level checksum output.
	EnableRowChecksum bool

	// DecimalHandlingMode selects how DECIMAL columns are encoded.
	DecimalHandlingMode string
	// BigintUnsignedHandlingMode selects how BIGINT UNSIGNED columns are encoded.
	BigintUnsignedHandlingMode string
}

type avroEncodeInput struct {
Expand Down Expand Up @@ -194,8 +194,8 @@ func (a *BatchEncoder) avroEncode(
colInfos: e.ColInfos,
}

enableTiDBExtension = a.enableTiDBExtension
enableRowLevelChecksum = a.enableRowChecksum
enableTiDBExtension = a.EnableTiDBExtension
enableRowLevelChecksum = a.EnableRowChecksum
schemaManager = a.valueSchemaManager
if e.IsInsert() {
operation = insertOperation
Expand All @@ -220,8 +220,8 @@ func (a *BatchEncoder) avroEncode(
input,
enableTiDBExtension,
enableRowLevelChecksum,
a.decimalHandlingMode,
a.bigintUnsignedHandlingMode,
a.DecimalHandlingMode,
a.BigintUnsignedHandlingMode,
)
if err != nil {
log.Error("AvroEventBatchEncoder: generating schema failed", zap.Error(err))
Expand All @@ -245,8 +245,8 @@ func (a *BatchEncoder) avroEncode(
e.CommitTs,
operation,
enableTiDBExtension,
a.decimalHandlingMode,
a.bigintUnsignedHandlingMode,
a.DecimalHandlingMode,
a.BigintUnsignedHandlingMode,
)
if err != nil {
log.Error("AvroEventBatchEncoder: converting to native failed", zap.Error(err))
Expand Down Expand Up @@ -924,10 +924,10 @@ func (b *batchEncoderBuilder) Build() codec.RowEventEncoder {
valueSchemaManager: b.valueSchemaManager,
result: make([]*common.Message, 0, 1),
Options: &Options{
enableTiDBExtension: b.config.EnableTiDBExtension,
enableRowChecksum: b.config.EnableRowChecksum,
decimalHandlingMode: b.config.AvroDecimalHandlingMode,
bigintUnsignedHandlingMode: b.config.AvroBigintUnsignedHandlingMode,
EnableTiDBExtension: b.config.EnableTiDBExtension,
EnableRowChecksum: b.config.EnableRowChecksum,
DecimalHandlingMode: b.config.AvroDecimalHandlingMode,
BigintUnsignedHandlingMode: b.config.AvroBigintUnsignedHandlingMode,
},
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/sink/codec/avro/avro_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,9 @@ func setupEncoderAndSchemaRegistry(
keySchemaManager: keyManager,
result: make([]*common.Message, 0, 1),
Options: &Options{
enableTiDBExtension: enableTiDBExtension,
decimalHandlingMode: decimalHandlingMode,
bigintUnsignedHandlingMode: bigintUnsignedHandlingMode,
EnableTiDBExtension: enableTiDBExtension,
DecimalHandlingMode: decimalHandlingMode,
BigintUnsignedHandlingMode: bigintUnsignedHandlingMode,
},
}, nil
}
Expand Down
7 changes: 4 additions & 3 deletions pkg/sink/codec/builder/codec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,9 +277,9 @@ func BenchmarkProtobuf2Encoding(b *testing.B) {
func BenchmarkCraftDecoding(b *testing.B) {
allocator := craft.NewSliceAllocator(128)
for i := 0; i < b.N; i++ {
decoder := craft.NewBatchDecoderWithAllocator(allocator)
for _, message := range codecCraftEncodedRowChanges {
if decoder, err := craft.NewBatchDecoderWithAllocator(
message.Value, allocator); err != nil {
if err := decoder.AddKeyValue(message.Key, message.Value); err != nil {
panic(err)
} else {
for {
Expand All @@ -299,7 +299,8 @@ func BenchmarkCraftDecoding(b *testing.B) {
func BenchmarkJsonDecoding(b *testing.B) {
for i := 0; i < b.N; i++ {
for _, message := range codecJSONEncodedRowChanges {
if decoder, err := open.NewBatchDecoder(message.Key, message.Value); err != nil {
decoder := open.NewBatchDecoder()
if err := decoder.AddKeyValue(message.Key, message.Value); err != nil {
panic(err)
} else {
for {
Expand Down
10 changes: 7 additions & 3 deletions pkg/sink/codec/canal/canal_json_decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,22 @@ type batchDecoder struct {
}

// NewBatchDecoder return a decoder for canal-json
func NewBatchDecoder(data []byte,
func NewBatchDecoder(
enableTiDBExtension bool,
terminator string,
) codec.RowEventDecoder {
return &batchDecoder{
data: data,
msg: nil,
enableTiDBExtension: enableTiDBExtension,
terminator: terminator,
}
}

// AddKeyValue implements the RowEventDecoder interface.
// The canal-json format is value-only, so the key is ignored and the raw
// message value is stashed for later consumption by HasNext.
// NOTE(review): any previously buffered, not-yet-consumed data is silently
// overwritten here — confirm callers always drain the decoder first.
func (b *batchDecoder) AddKeyValue(_, value []byte) error {
	b.data = value
	return nil
}

// HasNext implements the RowEventDecoder interface
func (b *batchDecoder) HasNext() (model.MessageType, bool, error) {
var (
Expand Down
13 changes: 10 additions & 3 deletions pkg/sink/codec/canal/canal_json_decoder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@ func TestNewCanalJSONBatchDecoder4RowMessage(t *testing.T) {
msg := messages[0]

for _, decodeEnable := range []bool{false, true} {
decoder := NewBatchDecoder(msg.Value, decodeEnable, "")
decoder := NewBatchDecoder(decodeEnable, "")
err := decoder.AddKeyValue(msg.Key, msg.Value)
require.NoError(t, err)

ty, hasNext, err := decoder.HasNext()
require.Nil(t, err)
Expand Down Expand Up @@ -95,7 +97,9 @@ func TestNewCanalJSONBatchDecoder4DDLMessage(t *testing.T) {
require.NotNil(t, result)

for _, decodeEnable := range []bool{false, true} {
decoder := NewBatchDecoder(result.Value, decodeEnable, "")
decoder := NewBatchDecoder(decodeEnable, "")
err := decoder.AddKeyValue(nil, result.Value)
require.NoError(t, err)

ty, hasNext, err := decoder.HasNext()
require.Nil(t, err)
Expand Down Expand Up @@ -130,7 +134,10 @@ func TestCanalJSONBatchDecoderWithTerminator(t *testing.T) {
encodedValue := `{"id":0,"database":"test","table":"employee","pkNames":["id"],"isDdl":false,"type":"INSERT","es":1668067205238,"ts":1668067206650,"sql":"","sqlType":{"FirstName":12,"HireDate":91,"LastName":12,"OfficeLocation":12,"id":4},"mysqlType":{"FirstName":"varchar","HireDate":"date","LastName":"varchar","OfficeLocation":"varchar","id":"int"},"data":[{"FirstName":"Bob","HireDate":"2014-06-04","LastName":"Smith","OfficeLocation":"New York","id":"101"}],"old":null}
{"id":0,"database":"test","table":"employee","pkNames":["id"],"isDdl":false,"type":"UPDATE","es":1668067229137,"ts":1668067230720,"sql":"","sqlType":{"FirstName":12,"HireDate":91,"LastName":12,"OfficeLocation":12,"id":4},"mysqlType":{"FirstName":"varchar","HireDate":"date","LastName":"varchar","OfficeLocation":"varchar","id":"int"},"data":[{"FirstName":"Bob","HireDate":"2015-10-08","LastName":"Smith","OfficeLocation":"Los Angeles","id":"101"}],"old":[{"FirstName":"Bob","HireDate":"2014-06-04","LastName":"Smith","OfficeLocation":"New York","id":"101"}]}
{"id":0,"database":"test","table":"employee","pkNames":["id"],"isDdl":false,"type":"DELETE","es":1668067230388,"ts":1668067231725,"sql":"","sqlType":{"FirstName":12,"HireDate":91,"LastName":12,"OfficeLocation":12,"id":4},"mysqlType":{"FirstName":"varchar","HireDate":"date","LastName":"varchar","OfficeLocation":"varchar","id":"int"},"data":[{"FirstName":"Bob","HireDate":"2015-10-08","LastName":"Smith","OfficeLocation":"Los Angeles","id":"101"}],"old":null}`
decoder := NewBatchDecoder([]byte(encodedValue), false, "\n")
decoder := NewBatchDecoder(false, "\n")
err := decoder.AddKeyValue(nil, []byte(encodedValue))
require.NoError(t, err)

cnt := 0
for {
tp, hasNext, err := decoder.HasNext()
Expand Down
5 changes: 4 additions & 1 deletion pkg/sink/codec/canal/canal_json_row_event_encoder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,10 @@ func TestEncodeCheckpointEvent(t *testing.T) {
}

require.NotNil(t, msg)
decoder := NewBatchDecoder(msg.Value, enable, "")
decoder := NewBatchDecoder(enable, "")

err = decoder.AddKeyValue(msg.Key, msg.Value)
require.NoError(t, err)

ty, hasNext, err := decoder.HasNext()
require.Nil(t, err)
Expand Down
30 changes: 18 additions & 12 deletions pkg/sink/codec/craft/craft_decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,27 +118,33 @@ func (b *batchDecoder) NextDDLEvent() (*model.DDLEvent, error) {
return event, nil
}

// newBatchDecoder creates a new batchDecoder backed by a default-sized
// slice allocator and eagerly loads the given encoded bytes into it.
func newBatchDecoder(bits []byte) (codec.RowEventDecoder, error) {
	decoder := NewBatchDecoderWithAllocator(NewSliceAllocator(64))
	err := decoder.AddKeyValue(nil, bits)
	return decoder, err
}

// NewBatchDecoderWithAllocator creates a new batchDecoder with given allocator.
func NewBatchDecoderWithAllocator(
bits []byte, allocator *SliceAllocator,
) (codec.RowEventDecoder, error) {
decoder, err := NewMessageDecoder(bits, allocator)
allocator *SliceAllocator,
) codec.RowEventDecoder {
return &batchDecoder{
allocator: allocator,
}
}

// AddKeyValue implements the RowEventDecoder interface.
// The craft protocol carries the whole batch in the value; the key is
// unused. The value is parsed eagerly so the message headers are ready
// before the first HasNext call.
func (b *batchDecoder) AddKeyValue(_, value []byte) error {
	decoder, err := NewMessageDecoder(value, b.allocator)
	if err != nil {
		return errors.Trace(err)
	}
	headers, err := decoder.Headers()
	if err != nil {
		return errors.Trace(err)
	}
	b.decoder = decoder
	b.headers = headers

	return nil
}
5 changes: 5 additions & 0 deletions pkg/sink/codec/csv/csv_decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,11 @@ func NewBatchDecoder(ctx context.Context,
}, nil
}

// AddKeyValue implements the RowEventDecoder interface.
// Deliberately a no-op: the CSV decoder appears to receive its payload at
// construction time (see NewBatchDecoder above) — confirm with callers.
// This method exists only to satisfy the interface.
func (b *batchDecoder) AddKeyValue(_, _ []byte) error {
	return nil
}

// HasNext implements the RowEventDecoder interface.
func (b *batchDecoder) HasNext() (model.MessageType, bool, error) {
err := b.parser.ReadRow()
Expand Down
5 changes: 5 additions & 0 deletions pkg/sink/codec/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@ import "github.com/pingcap/tiflow/cdc/model"
// RowEventDecoder is an abstraction for events decoder
// this interface is only for testing now
type RowEventDecoder interface {
// AddKeyValue add the received key and values to the decoder,
// should be called before `HasNext`
// decoder decode the key and value into the event format.
AddKeyValue(key, value []byte) error

// HasNext returns
// 1. the type of the next event
// 2. a bool if the next event is exist
Expand Down
13 changes: 10 additions & 3 deletions pkg/sink/codec/internal/batch_tester.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package internal

import (
Expand Down Expand Up @@ -288,8 +289,10 @@ func (s *BatchTester) TestBatchCodec(
res := encoder.Build()
require.Len(t, res, 1)
require.Equal(t, len(cs), res[0].GetRowsCount())

decoder, err := newDecoder(res[0].Key, res[0].Value)
require.Nil(t, err)
require.NoError(t, err)

checkRowDecoder(decoder, cs)
}
}
Expand All @@ -299,8 +302,10 @@ func (s *BatchTester) TestBatchCodec(
msg, err := encoder.EncodeDDLEvent(ddl)
require.Nil(t, err)
require.NotNil(t, msg)

decoder, err := newDecoder(msg.Key, msg.Value)
require.Nil(t, err)
require.NoError(t, err)

checkDDLDecoder(decoder, cs[i:i+1])

}
Expand All @@ -312,8 +317,10 @@ func (s *BatchTester) TestBatchCodec(
msg, err := encoder.EncodeCheckpointEvent(ts)
require.Nil(t, err)
require.NotNil(t, msg)

decoder, err := newDecoder(msg.Key, msg.Value)
require.Nil(t, err)
require.NoError(t, err)

checkTSDecoder(decoder, cs[i:i+1])
}
}
Expand Down
50 changes: 37 additions & 13 deletions pkg/sink/codec/open/open_protocol_decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,20 +224,44 @@ func (b *BatchDecoder) decodeNextKey() error {
}

// NewBatchDecoder creates a new BatchDecoder.
func NewBatchDecoder(key []byte, value []byte) (codec.RowEventDecoder, error) {
func NewBatchDecoder() codec.RowEventDecoder {
return &BatchDecoder{}

}

// AddKeyValue implements the RowEventDecoder interface.
// It accepts one key/value pair at a time: the previously added pair must
// have been fully consumed (keyBytes/valueBytes drained) before the next
// one is added. The key begins with an 8-byte big-endian protocol version
// which is validated and stripped here.
func (b *BatchDecoder) AddKeyValue(key, value []byte) error {
	// Refuse to overwrite data that has not been consumed yet.
	if len(b.keyBytes) != 0 || len(b.valueBytes) != 0 {
		return cerror.ErrOpenProtocolCodecInvalidData.
			GenWithStack("decoder key and value not nil")
	}
	// Guard the version read: key[:8] below would panic on a short key.
	if len(key) < 8 {
		return cerror.ErrOpenProtocolCodecInvalidData.
			GenWithStack("key too short, no version header found")
	}
	version := binary.BigEndian.Uint64(key[:8])
	key = key[8:]
	if version != codec.BatchVersion1 {
		return cerror.ErrOpenProtocolCodecInvalidData.
			GenWithStack("unexpected key format version")
	}

	b.keyBytes = key
	b.valueBytes = value

	return nil
}

// AddKeyValue implements the RowEventDecoder interface
func (b *BatchMixedDecoder) AddKeyValue(key, value []byte) error {
if key != nil || value != nil {
return cerror.ErrOpenProtocolCodecInvalidData.
GenWithStack("decoder key and value not nil")
}
version := binary.BigEndian.Uint64(key[:8])
key = key[8:]
if version != codec.BatchVersion1 {
return nil, cerror.ErrOpenProtocolCodecInvalidData.GenWithStack("unexpected key format version")
}
// if only decode one byte slice, we choose MixedDecoder
if len(key) > 0 && len(value) == 0 {
return &BatchMixedDecoder{
mixedBytes: key,
}, nil
}
return &BatchDecoder{
keyBytes: key,
valueBytes: value,
}, nil
return cerror.ErrOpenProtocolCodecInvalidData.
GenWithStack("unexpected key format version")
}

b.mixedBytes = key
return nil
}
Loading