Skip to content

Commit

Permalink
sink (ticdc): add ddl encoder for simple protocol (#10022)
Browse files Browse the repository at this point in the history
ref #9898, close #9901
  • Loading branch information
asddongmen authored Nov 13, 2023
1 parent db3a2dd commit fa64375
Show file tree
Hide file tree
Showing 5 changed files with 366 additions and 7 deletions.
1 change: 1 addition & 0 deletions cdc/model/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -689,6 +689,7 @@ type DDLEvent struct {
Done atomic.Bool `msg:"-"`
Charset string `msg:"-"`
Collate string `msg:"-"`
IsBootstrap bool `msg:"-"`
}

// FromJob fills the values with DDLEvent from DDL job
Expand Down
12 changes: 10 additions & 2 deletions pkg/sink/codec/simple/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,14 @@ func (d *decoder) NextRowChangedEvent() (*model.RowChangedEvent, error) {

// NextDDLEvent returns the next DDL event if exists
func (d *decoder) NextDDLEvent() (*model.DDLEvent, error) {
// TODO implement me
panic("implement me")
if d.msg.Type != DDLType && d.msg.Type != BootstrapType {
return nil, cerror.ErrCodecDecode.GenWithStack(
"not found ddl event message")
}
ddl, err := d.msg.TableSchema.ToDDLEvent(d.msg)
if err != nil {
return nil, err
}
d.msg = nil
return ddl, nil
}
13 changes: 11 additions & 2 deletions pkg/sink/codec/simple/encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,15 @@ func (e *encoder) EncodeCheckpointEvent(ts uint64) (*common.Message, error) {
//
//nolint:unused
func (e *encoder) EncodeDDLEvent(event *model.DDLEvent) (*common.Message, error) {
// TODO implement me
panic("implement me")
var message *message
if event.IsBootstrap {
message = newBootstrapMessage(event)
} else {
message = newDDLMessage(event)
}
value, err := json.Marshal(message)
if err != nil {
return nil, cerror.WrapError(cerror.ErrEncodeFailed, err)
}
return common.NewDDLMsg(config.ProtocolSimple, nil, value, event), nil
}
83 changes: 83 additions & 0 deletions pkg/sink/codec/simple/encoder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package simple
import (
"testing"

"github.com/pingcap/tiflow/cdc/entry"
"github.com/pingcap/tiflow/cdc/model"
"github.com/stretchr/testify/require"
)
Expand All @@ -42,3 +43,85 @@ func TestEncodeCheckpoint(t *testing.T) {
require.NoError(t, err)
require.Equal(t, uint64(checkpoint), ts)
}

func TestEncodeDDLEvent(t *testing.T) {
helper := entry.NewSchemaTestHelper(t)
defer helper.Close()

sql := `create table test.test(id int primary key, name varchar(255) not null,
age int, email varchar(255) not null, key idx_name(name), key idx_name_email(name, email))`
job := helper.DDL2Job(sql)
tableInfo := model.WrapTableInfo(1, "test", 1, job.BinlogInfo.TableInfo)
enc := NewBuilder().Build()
ddlEvent := &model.DDLEvent{
StartTs: 1,
CommitTs: 2,
TableInfo: tableInfo,
Query: sql,
Type: job.Type,
}

m, err := enc.EncodeDDLEvent(ddlEvent)
require.NoError(t, err)

dec := NewDecoder()
err = dec.AddKeyValue(m.Key, m.Value)
require.NoError(t, err)

messageType, hasNext, err := dec.HasNext()
require.NoError(t, err)
require.True(t, hasNext)
require.Equal(t, model.MessageTypeDDL, messageType)

event, err := dec.NextDDLEvent()
require.NoError(t, err)
require.Equal(t, ddlEvent.CommitTs, event.CommitTs)
// because we don't we don't set startTs in the encoded message,
// so the startTs is equal to commitTs
require.Equal(t, ddlEvent.CommitTs, event.StartTs)
require.Equal(t, ddlEvent.Query, event.Query)
require.Equal(t, len(ddlEvent.TableInfo.Columns), len(event.TableInfo.Columns))
require.Equal(t, len(ddlEvent.TableInfo.Indices), len(event.TableInfo.Indices))
}

func TestEncodeBootstrapEvent(t *testing.T) {
helper := entry.NewSchemaTestHelper(t)
defer helper.Close()

sql := `create table test.test(id int primary key, name varchar(255) not null,
age int, email varchar(255) not null, key idx_name(name), key idx_name_email(name, email))`
job := helper.DDL2Job(sql)
tableInfo := model.WrapTableInfo(1, "test", 1, job.BinlogInfo.TableInfo)
enc := NewBuilder().Build()
ddlEvent := &model.DDLEvent{
StartTs: 1,
CommitTs: 2,
TableInfo: tableInfo,
Query: sql,
Type: job.Type,
}
ddlEvent.IsBootstrap = true

m, err := enc.EncodeDDLEvent(ddlEvent)
require.NoError(t, err)

dec := NewDecoder()
err = dec.AddKeyValue(m.Key, m.Value)
require.NoError(t, err)

messageType, hasNext, err := dec.HasNext()
require.NoError(t, err)
require.True(t, hasNext)
require.Equal(t, model.MessageTypeDDL, messageType)

event, err := dec.NextDDLEvent()
require.NoError(t, err)
require.Equal(t, ddlEvent.CommitTs, event.CommitTs)
// because we don't we don't set startTs in the encoded message,
// so the startTs is equal to commitTs
require.Equal(t, ddlEvent.CommitTs, event.StartTs)
// Bootstrap event doesn't have query
require.Equal(t, "", event.Query)
require.Equal(t, len(ddlEvent.TableInfo.Columns), len(event.TableInfo.Columns))
require.Equal(t, len(ddlEvent.TableInfo.Indices), len(event.TableInfo.Indices))
}
Loading

0 comments on commit fa64375

Please sign in to comment.