Skip to content

Commit

Permalink
codec(ticdc): simple protocol decoder support cache message and retur…
Browse files Browse the repository at this point in the history
…n table info delayed events (#10677)

close #10678
  • Loading branch information
3AceShowHand authored Mar 1, 2024
1 parent dfda907 commit 1bf1c5b
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 50 deletions.
32 changes: 27 additions & 5 deletions cmd/kafka-consumer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -592,7 +592,7 @@ func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim saram

eventGroups := make(map[int64]*eventsGroup)
for message := range claim.Messages() {
if err := decoder.AddKeyValue(message.Key, message.Value); err != nil {
if err = decoder.AddKeyValue(message.Key, message.Value); err != nil {
log.Error("add key value to the decoder failed", zap.Error(err))
return cerror.Trace(err)
}
Expand Down Expand Up @@ -629,9 +629,31 @@ func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim saram
zap.ByteString("value", message.Value),
zap.Error(err))
}
if partition == 0 {
// the Query maybe empty if using simple protocol, it's comes from `bootstrap` event.
if partition == 0 && ddl.Query != "" {
c.appendDDL(ddl)
}

if simple, ok := decoder.(*simple.Decoder); ok {
cachedEvents := simple.GetCachedEvents()
for _, row := range cachedEvents {
var partitionID int64
if row.TableInfo.IsPartitionTable() {
partitionID = row.PhysicalTableID
}
tableID := c.fakeTableIDGenerator.
generateFakeTableID(row.TableInfo.GetSchemaName(), row.TableInfo.GetTableName(), partitionID)
row.TableInfo.TableName.TableID = tableID

group, ok := eventGroups[tableID]
if !ok {
group = newEventsGroup()
eventGroups[tableID] = group
}
group.Append(row)
}
}

// todo: mark the offset after the DDL is fully synced to the downstream mysql.
session.MarkMessage(message, "")
case model.MessageTypeRow:
Expand Down Expand Up @@ -755,7 +777,7 @@ func (c *Consumer) appendDDL(ddl *model.DDLEvent) {
log.Warn("DDL CommitTs < maxCommitTsDDL.CommitTs",
zap.Uint64("commitTs", ddl.CommitTs),
zap.Uint64("maxCommitTs", c.ddlWithMaxCommitTs.CommitTs),
zap.Any("DDL", ddl))
zap.String("DDL", ddl.Query))
return
}

Expand All @@ -764,12 +786,12 @@ func (c *Consumer) appendDDL(ddl *model.DDLEvent) {
// the current DDL and the DDL with max CommitTs.
if ddl == c.ddlWithMaxCommitTs {
log.Info("ignore redundant DDL, the DDL is equal to ddlWithMaxCommitTs",
zap.Any("DDL", ddl))
zap.Uint64("commitTs", ddl.CommitTs), zap.String("DDL", ddl.Query))
return
}

c.ddlList = append(c.ddlList, ddl)
log.Info("DDL event received", zap.Any("DDL", ddl))
log.Info("DDL event received", zap.Uint64("commitTs", ddl.CommitTs), zap.String("DDL", ddl.Query))
c.ddlWithMaxCommitTs = ddl
}

Expand Down
103 changes: 58 additions & 45 deletions pkg/sink/codec/simple/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@
package simple

import (
"container/list"
"context"
"database/sql"
"path/filepath"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/log"
Expand All @@ -30,7 +30,8 @@ import (
"go.uber.org/zap"
)

type decoder struct {
// Decoder implement the RowEventDecoder interface
type Decoder struct {
config *common.Config

marshaller marshaller
Expand All @@ -41,10 +42,15 @@ type decoder struct {
value []byte
msg *message
memo TableInfoProvider

// cachedMessages is used to store the messages which does not have received corresponding table info yet.
cachedMessages *list.List
// CachedRowChangedEvents are events just decoded from the cachedMessages
CachedRowChangedEvents []*model.RowChangedEvent
}

// NewDecoder returns a new decoder
func NewDecoder(ctx context.Context, config *common.Config, db *sql.DB) (*decoder, error) {
// NewDecoder returns a new Decoder
func NewDecoder(ctx context.Context, config *common.Config, db *sql.DB) (*Decoder, error) {
var (
externalStorage storage.ExternalStorage
err error
Expand All @@ -67,22 +73,23 @@ func NewDecoder(ctx context.Context, config *common.Config, db *sql.DB) (*decode
return nil, errors.Trace(err)
}

return &decoder{
return &Decoder{
config: config,
marshaller: m,

storage: externalStorage,
upstreamTiDB: db,

memo: newMemoryTableInfoProvider(),
memo: newMemoryTableInfoProvider(),
cachedMessages: list.New(),
}, nil
}

// AddKeyValue add the received key and values to the decoder,
func (d *decoder) AddKeyValue(_, value []byte) error {
// AddKeyValue add the received key and values to the Decoder,
func (d *Decoder) AddKeyValue(_, value []byte) error {
if d.value != nil {
return cerror.ErrDecodeFailed.GenWithStack(
"decoder value already exists, not consumed yet")
"Decoder value already exists, not consumed yet")
}
value, err := common.Decompress(d.config.LargeMessageHandle.LargeMessageHandleCompression, value)
if err != nil {
Expand All @@ -93,7 +100,7 @@ func (d *decoder) AddKeyValue(_, value []byte) error {
}

// HasNext returns whether there is any event need to be consumed
func (d *decoder) HasNext() (model.MessageType, bool, error) {
func (d *Decoder) HasNext() (model.MessageType, bool, error) {
if d.value == nil {
return model.MessageTypeUnknown, false, nil
}
Expand All @@ -118,7 +125,7 @@ func (d *decoder) HasNext() (model.MessageType, bool, error) {
}

// NextResolvedEvent returns the next resolved event if exists
func (d *decoder) NextResolvedEvent() (uint64, error) {
func (d *Decoder) NextResolvedEvent() (uint64, error) {
if d.msg.Type != MessageTypeWatermark {
return 0, cerror.ErrCodecDecode.GenWithStack(
"not found resolved event message")
Expand All @@ -131,7 +138,7 @@ func (d *decoder) NextResolvedEvent() (uint64, error) {
}

// NextRowChangedEvent returns the next row changed event if exists
func (d *decoder) NextRowChangedEvent() (*model.RowChangedEvent, error) {
func (d *Decoder) NextRowChangedEvent() (*model.RowChangedEvent, error) {
if d.msg == nil || (d.msg.Data == nil && d.msg.Old == nil) {
return nil, cerror.ErrCodecDecode.GenWithStack(
"invalid row changed event message")
Expand All @@ -147,9 +154,14 @@ func (d *decoder) NextRowChangedEvent() (*model.RowChangedEvent, error) {

tableInfo := d.memo.Read(d.msg.Schema, d.msg.Table, d.msg.SchemaVersion)
if tableInfo == nil {
return nil, cerror.ErrCodecDecode.GenWithStack(
"cannot found the table info, schema: %s, table: %s, version: %d",
d.msg.Schema, d.msg.Table, d.msg.SchemaVersion)
log.Warn("table info not found for the event, "+
"the consumer should cache this event temporarily, and update the tableInfo after it's received",
zap.String("schema", d.msg.Schema),
zap.String("table", d.msg.Table),
zap.Uint64("version", d.msg.SchemaVersion))
d.cachedMessages.PushBack(d.msg)
d.msg = nil
return nil, nil
}

event, err := buildRowChangedEvent(d.msg, tableInfo, d.config.EnableRowChecksum)
Expand All @@ -161,7 +173,7 @@ func (d *decoder) NextRowChangedEvent() (*model.RowChangedEvent, error) {
return event, nil
}

func (d *decoder) assembleClaimCheckRowChangedEvent(claimCheckLocation string) (*model.RowChangedEvent, error) {
func (d *Decoder) assembleClaimCheckRowChangedEvent(claimCheckLocation string) (*model.RowChangedEvent, error) {
_, claimCheckFileName := filepath.Split(claimCheckLocation)
data, err := d.storage.ReadFile(context.Background(), claimCheckFileName)
if err != nil {
Expand All @@ -186,7 +198,7 @@ func (d *decoder) assembleClaimCheckRowChangedEvent(claimCheckLocation string) (
return d.NextRowChangedEvent()
}

func (d *decoder) assembleHandleKeyOnlyRowChangedEvent(m *message) (*model.RowChangedEvent, error) {
func (d *Decoder) assembleHandleKeyOnlyRowChangedEvent(m *message) (*model.RowChangedEvent, error) {
tableInfo := d.memo.Read(m.Schema, m.Table, m.SchemaVersion)
if tableInfo == nil {
return nil, cerror.ErrCodecDecode.GenWithStack(
Expand Down Expand Up @@ -256,7 +268,7 @@ func (d *decoder) assembleHandleKeyOnlyRowChangedEvent(m *message) (*model.RowCh
return d.NextRowChangedEvent()
}

func (d *decoder) buildData(
func (d *Decoder) buildData(
holder *common.ColumnsHolder, fieldTypeMap map[string]*types.FieldType,
) (map[string]interface{}, error) {
columnsCount := holder.Length()
Expand All @@ -278,7 +290,7 @@ func (d *decoder) buildData(
}

// NextDDLEvent returns the next DDL event if exists
func (d *decoder) NextDDLEvent() (*model.DDLEvent, error) {
func (d *Decoder) NextDDLEvent() (*model.DDLEvent, error) {
if d.msg == nil {
return nil, cerror.ErrCodecDecode.GenWithStack(
"no message found when decode DDL event")
Expand All @@ -292,9 +304,32 @@ func (d *decoder) NextDDLEvent() (*model.DDLEvent, error) {
d.memo.Write(ddl.TableInfo)
d.memo.Write(ddl.PreTableInfo)

for ele := d.cachedMessages.Front(); ele != nil; {
d.msg = ele.Value.(*message)
event, err := d.NextRowChangedEvent()
if err != nil {
return nil, err
}
if event == nil {
ele = ele.Next()
continue
}
d.CachedRowChangedEvents = append(d.CachedRowChangedEvents, event)

next := ele.Next()
d.cachedMessages.Remove(ele)
ele = next
}
return ddl, nil
}

// GetCachedEvents returns the cached events
func (d *Decoder) GetCachedEvents() []*model.RowChangedEvent {
result := d.CachedRowChangedEvents
d.CachedRowChangedEvents = nil
return result
}

// TableInfoProvider is used to store and read table info
// It works like a schema cache when consuming simple protocol messages
// It will store multiple versions of table info for a table
Expand Down Expand Up @@ -349,33 +384,11 @@ func (m *memoryTableInfoProvider) Read(schema, table string, version uint64) *mo
version: version,
}

// Note(dongmen): Since the decoder is only use in unit test for now,
// we don't need to consider the performance
// Just use a ticker to check if the table info is stored every second.
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
defer cancel()

for {
entry, ok := m.memo[key]
if ok {
return entry
}
select {
case <-ticker.C:
entry, ok = m.memo[key]
if ok {
return entry
}
case <-ctx.Done():
log.Panic("table info read timeout",
zap.String("schema", schema),
zap.String("table", table),
zap.Uint64("version", version))
return nil
}
entry, ok := m.memo[key]
if ok {
return entry
}
return nil
}

type tableSchemaKey struct {
Expand Down

0 comments on commit 1bf1c5b

Please sign in to comment.