Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

simple (ticdc): support send all tables bootstrap message at changefeed start #11239

Merged
merged 10 commits into from
Jun 20, 2024
9 changes: 9 additions & 0 deletions cdc/api/v2/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -497,6 +497,10 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig(
if c.Sink.SendBootstrapToAllPartition != nil {
res.Sink.SendBootstrapToAllPartition = util.AddressOf(*c.Sink.SendBootstrapToAllPartition)
}

if c.Sink.SendAllBootstrapAtStart != nil {
res.Sink.SendAllBootstrapAtStart = util.AddressOf(*c.Sink.SendAllBootstrapAtStart)
}
}
if c.Mounter != nil {
res.Mounter = &config.MounterConfig{
Expand Down Expand Up @@ -792,6 +796,10 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig {
res.Sink.SendBootstrapToAllPartition = util.AddressOf(*cloned.Sink.SendBootstrapToAllPartition)
}

if cloned.Sink.SendAllBootstrapAtStart != nil {
res.Sink.SendAllBootstrapAtStart = util.AddressOf(*cloned.Sink.SendAllBootstrapAtStart)
}

if cloned.Sink.DebeziumDisableSchema != nil {
res.Sink.DebeziumDisableSchema = util.AddressOf(*cloned.Sink.DebeziumDisableSchema)
}
Expand Down Expand Up @@ -957,6 +965,7 @@ type SinkConfig struct {
SendBootstrapIntervalInSec *int64 `json:"send_bootstrap_interval_in_sec,omitempty"`
SendBootstrapInMsgCount *int32 `json:"send_bootstrap_in_msg_count,omitempty"`
SendBootstrapToAllPartition *bool `json:"send_bootstrap_to_all_partition,omitempty"`
SendAllBootstrapAtStart *bool `json:"send-all-bootstrap-at-start,omitempty"`
DebeziumDisableSchema *bool `json:"debezium_disable_schema,omitempty"`
DebeziumConfig *DebeziumConfig `json:"debezium,omitempty"`
OpenProtocolConfig *OpenProtocolConfig `json:"open,omitempty"`
Expand Down
1 change: 1 addition & 0 deletions cdc/api/v2/model_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ var defaultAPIConfig = &ReplicaConfig{
SendBootstrapIntervalInSec: util.AddressOf(int64(120)),
SendBootstrapInMsgCount: util.AddressOf(int32(10000)),
SendBootstrapToAllPartition: util.AddressOf(true),
SendAllBootstrapAtStart: util.AddressOf(false),
DebeziumDisableSchema: util.AddressOf(false),
OpenProtocolConfig: &OpenProtocolConfig{OutputOldValue: true},
DebeziumConfig: &DebeziumConfig{OutputOldValue: true},
Expand Down
5 changes: 5 additions & 0 deletions cdc/entry/schema_test_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,3 +297,8 @@
s.domain.Close()
s.storage.Close() //nolint:errcheck
}

// SchemaStorage returns the schema storage
func (s *SchemaTestHelper) SchemaStorage() SchemaStorage {
return s.schemaStorage

Check warning on line 303 in cdc/entry/schema_test_helper.go

View check run for this annotation

Codecov / codecov/patch

cdc/entry/schema_test_helper.go#L302-L303

Added lines #L302 - L303 were not covered by tests
}
4 changes: 3 additions & 1 deletion cdc/owner/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -716,7 +716,9 @@ LOOP2:
c.schema,
c.redoDDLMgr,
c.redoMetaMgr,
util.GetOrZero(cfInfo.Config.BDRMode))
util.GetOrZero(cfInfo.Config.BDRMode),
cfInfo.Config.Sink,
)

// create scheduler
cfg := *c.cfg
Expand Down
11 changes: 9 additions & 2 deletions cdc/owner/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ type mockDDLSink struct {
// whether to record the DDL history, only for rename table
recordDDLHistory bool
// a slice of DDL history, only for rename table
ddlHistory []string
ddlHistory []*model.DDLEvent
mu struct {
sync.Mutex
checkpointTs model.Ts
Expand Down Expand Up @@ -117,7 +117,7 @@ func (m *mockDDLSink) emitDDLEvent(ctx context.Context, ddl *model.DDLEvent) (bo
}
}()
if m.recordDDLHistory {
m.ddlHistory = append(m.ddlHistory, ddl.Query)
m.ddlHistory = append(m.ddlHistory, ddl)
} else {
m.ddlHistory = nil
}
Expand Down Expand Up @@ -155,6 +155,13 @@ func (m *mockDDLSink) Barrier(ctx context.Context) error {
return nil
}

func (m *mockDDLSink) emitBootstarp(ctx context.Context, bootstrap *model.DDLEvent) error {
if m.recordDDLHistory {
m.ddlHistory = append(m.ddlHistory, bootstrap)
}
return nil
}

type mockScheduler struct {
currentTables []model.TableID
}
Expand Down
53 changes: 51 additions & 2 deletions cdc/owner/ddl_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
"github.com/pingcap/tiflow/cdc/puller"
"github.com/pingcap/tiflow/cdc/redo"
"github.com/pingcap/tiflow/cdc/scheduler/schedulepb"
"github.com/pingcap/tiflow/pkg/config"
"github.com/pingcap/tiflow/pkg/filter"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -130,6 +131,9 @@

BDRMode bool
ddlResolvedTs model.Ts

sinkConfig *config.SinkConfig
asddongmen marked this conversation as resolved.
Show resolved Hide resolved
bootstraped bool
}

func newDDLManager(
Expand All @@ -143,6 +147,7 @@
redoManager redo.DDLManager,
redoMetaManager redo.MetaManager,
bdrMode bool,
sinkConfig *config.SinkConfig,
) *ddlManager {
log.Info("owner create ddl manager",
zap.String("namespace", changefeedID.Namespace),
Expand All @@ -164,7 +169,44 @@
ddlResolvedTs: startTs,
BDRMode: bdrMode,
pendingDDLs: make(map[model.TableName][]*model.DDLEvent),
sinkConfig: sinkConfig,
}
}

func (m *ddlManager) checkAndSendBootstarpMsgs(ctx context.Context) (bool, error) {
asddongmen marked this conversation as resolved.
Show resolved Hide resolved
if !m.sinkConfig.ShouldSendAllBootstrapAtStart() || m.bootstraped {
return true, nil
}
start := time.Now()
defer func() {
log.Info("send bootstrap messages finished",
zap.Stringer("changefeed", m.changfeedID),
zap.Duration("cost", time.Since(start)))
}()
// Send bootstrap messages to downstream.
tableInfo, err := m.allTables(ctx)
if err != nil {
return false, errors.Trace(err)
}

Check warning on line 190 in cdc/owner/ddl_manager.go

View check run for this annotation

Codecov / codecov/patch

cdc/owner/ddl_manager.go#L189-L190

Added lines #L189 - L190 were not covered by tests
log.Info("start to send bootstrap messages",
zap.Stringer("changefeed", m.changfeedID),
zap.Int("tables", len(tableInfo)))

for _, table := range tableInfo {
if table.TableInfo.IsView() {
continue

Check warning on line 197 in cdc/owner/ddl_manager.go

View check run for this annotation

Codecov / codecov/patch

cdc/owner/ddl_manager.go#L197

Added line #L197 was not covered by tests
}
ddlEvent := &model.DDLEvent{
TableInfo: table,
IsBootstrap: true,
}
err := m.ddlSink.emitBootstarp(ctx, ddlEvent)
if err != nil {
return false, errors.Trace(err)
}

Check warning on line 206 in cdc/owner/ddl_manager.go

View check run for this annotation

Codecov / codecov/patch

cdc/owner/ddl_manager.go#L205-L206

Added lines #L205 - L206 were not covered by tests
}
m.bootstraped = true
return true, nil
}

// tick the ddlHandler, it does the following things:
Expand All @@ -183,6 +225,14 @@
m.justSentDDL = nil
m.checkpointTs = checkpointTs

ok, err := m.checkAndSendBootstarpMsgs(ctx)
asddongmen marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil, nil, errors.Trace(err)
}

Check warning on line 231 in cdc/owner/ddl_manager.go

View check run for this annotation

Codecov / codecov/patch

cdc/owner/ddl_manager.go#L230-L231

Added lines #L230 - L231 were not covered by tests
if !ok {
return nil, nil, nil
}

Check warning on line 234 in cdc/owner/ddl_manager.go

View check run for this annotation

Codecov / codecov/patch

cdc/owner/ddl_manager.go#L233-L234

Added lines #L233 - L234 were not covered by tests

currentTables, err := m.allTables(ctx)
if err != nil {
return nil, nil, errors.Trace(err)
Expand Down Expand Up @@ -483,8 +533,7 @@
return barrier
}

// allTables returns all tables in the schema that
// less or equal than the checkpointTs.
// allTables returns all tables in the schema in current checkpointTs.
func (m *ddlManager) allTables(ctx context.Context) ([]*model.TableInfo, error) {
if m.tableInfoCache == nil {
ts := m.getSnapshotTs()
Expand Down
51 changes: 48 additions & 3 deletions cdc/owner/ddl_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,9 @@ func createDDLManagerForTest(t *testing.T) *ddlManager {
schema,
redo.NewDisabledDDLManager(),
redo.NewDisabledMetaManager(),
false)
false,
cfg.Sink,
)
return res
}

Expand Down Expand Up @@ -246,9 +248,9 @@ func TestExecRenameTablesDDL(t *testing.T) {
}
require.Len(t, mockDDLSink.ddlHistory, 2)
require.Equal(t, "RENAME TABLE `test1`.`tb1` TO `test2`.`tb10`",
mockDDLSink.ddlHistory[0])
mockDDLSink.ddlHistory[0].Query)
require.Equal(t, "RENAME TABLE `test2`.`tb2` TO `test1`.`tb20`",
mockDDLSink.ddlHistory[1])
mockDDLSink.ddlHistory[1].Query)

// mock all rename table statements have been done
mockDDLSink.resetDDLDone = false
Expand Down Expand Up @@ -459,3 +461,46 @@ func TestIsGlobalDDL(t *testing.T) {
require.Equal(t, c.ret, isGlobalDDL(c.ddl))
}
}

func TestCheckAndSendBootstarpMsgs(t *testing.T) {
asddongmen marked this conversation as resolved.
Show resolved Hide resolved
helper := entry.NewSchemaTestHelper(t)
defer helper.Close()
ddl1 := helper.DDL2Event("create table test.tb1(id int primary key)")
ddl2 := helper.DDL2Event("create table test.tb2(id int primary key)")

ctx := context.Background()
dm := createDDLManagerForTest(t)
dm.schema = helper.SchemaStorage()
sendBootstrapIntervalInSec := int64(1)
sendBootstrapInMsgCount := int32(2)
protocol := "simple"
dm.sinkConfig.SendBootstrapIntervalInSec = &sendBootstrapIntervalInSec
dm.sinkConfig.SendBootstrapInMsgCount = &sendBootstrapInMsgCount
dm.sinkConfig.Protocol = &protocol
dm.startTs, dm.checkpointTs = ddl2.CommitTs, ddl2.CommitTs

mockDDLSink := dm.ddlSink.(*mockDDLSink)
mockDDLSink.recordDDLHistory = true

// do not send all bootstrap messages
sendAllBootstrapAtStart := false
dm.sinkConfig.SendAllBootstrapAtStart = &sendAllBootstrapAtStart
send, err := dm.checkAndSendBootstarpMsgs(ctx)
asddongmen marked this conversation as resolved.
Show resolved Hide resolved
require.Nil(t, err)
require.True(t, send)
require.False(t, dm.bootstraped)
require.Equal(t, 0, len(mockDDLSink.ddlHistory))

// send all bootstrap messages -> tb1 and tb2
sendAllBootstrapAtStart = true
dm.sinkConfig.SendAllBootstrapAtStart = &sendAllBootstrapAtStart
send, err = dm.checkAndSendBootstarpMsgs(ctx)
asddongmen marked this conversation as resolved.
Show resolved Hide resolved
require.Nil(t, err)
require.True(t, send)
require.True(t, dm.bootstraped)
require.Equal(t, 2, len(mockDDLSink.ddlHistory))
require.True(t, mockDDLSink.ddlHistory[0].IsBootstrap)
require.True(t, mockDDLSink.ddlHistory[1].IsBootstrap)
require.Equal(t, ddl1.TableInfo.TableName, mockDDLSink.ddlHistory[0].TableInfo.TableName)
require.Equal(t, ddl2.TableInfo.TableName, mockDDLSink.ddlHistory[1].TableInfo.TableName)
}
12 changes: 8 additions & 4 deletions cdc/owner/ddl_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
// the caller of this function can call again and again until a true returned
emitDDLEvent(ctx context.Context, ddl *model.DDLEvent) (bool, error)
emitSyncPoint(ctx context.Context, checkpointTs uint64) error
emitBootstarp(ctx context.Context, bootstrap *model.DDLEvent) error
asddongmen marked this conversation as resolved.
Show resolved Hide resolved
// close the ddlsink, cancel running goroutine.
close(ctx context.Context) error
}
Expand Down Expand Up @@ -121,10 +122,6 @@
return errors.Trace(err)
}
a.sink = s

if !util.GetOrZero(a.info.Config.EnableSyncPoint) {
return nil
}
return nil
}

Expand Down Expand Up @@ -472,3 +469,10 @@

return result, nil
}

func (s *ddlSinkImpl) emitBootstarp(ctx context.Context, bootstrap *model.DDLEvent) error {
if err := s.makeSinkReady(ctx); err != nil {
return errors.Trace(err)
}
return s.sink.WriteDDLEvent(ctx, bootstrap)

Check warning on line 477 in cdc/owner/ddl_sink.go

View check run for this annotation

Codecov / codecov/patch

cdc/owner/ddl_sink.go#L473-L477

Added lines #L473 - L477 were not covered by tests
}
2 changes: 2 additions & 0 deletions pkg/cmd/util/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ func TestAndWriteExampleReplicaTOML(t *testing.T) {
SendBootstrapIntervalInSec: util.AddressOf(int64(120)),
SendBootstrapInMsgCount: util.AddressOf(int32(10000)),
SendBootstrapToAllPartition: util.AddressOf(true),
SendAllBootstrapAtStart: util.AddressOf(false),
DebeziumDisableSchema: util.AddressOf(false),
OpenProtocol: &config.OpenProtocolConfig{OutputOldValue: true},
Debezium: &config.DebeziumConfig{OutputOldValue: true},
Expand Down Expand Up @@ -253,6 +254,7 @@ func TestAndWriteStorageSinkTOML(t *testing.T) {
SendBootstrapIntervalInSec: util.AddressOf(int64(120)),
SendBootstrapInMsgCount: util.AddressOf(int32(10000)),
SendBootstrapToAllPartition: util.AddressOf(true),
SendAllBootstrapAtStart: util.AddressOf(false),
DebeziumDisableSchema: util.AddressOf(false),
OpenProtocol: &config.OpenProtocolConfig{OutputOldValue: true},
Debezium: &config.DebeziumConfig{OutputOldValue: true},
Expand Down
3 changes: 3 additions & 0 deletions pkg/config/config_test_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ const (
"send-bootstrap-interval-in-sec": 120,
"send-bootstrap-in-msg-count": 10000,
"send-bootstrap-to-all-partition": true,
"send-all-bootstrap-at-start": false,
"debezium-disable-schema": false,
"open": {
"output-old-value": true
Expand Down Expand Up @@ -337,6 +338,7 @@ const (
"send-bootstrap-interval-in-sec": 120,
"send-bootstrap-in-msg-count": 10000,
"send-bootstrap-to-all-partition": true,
"send-all-bootstrap-at-start": false,
"debezium-disable-schema": false,
"open": {
"output-old-value": true
Expand Down Expand Up @@ -511,6 +513,7 @@ const (
"send-bootstrap-interval-in-sec": 120,
"send-bootstrap-in-msg-count": 10000,
"send-bootstrap-to-all-partition": true,
"send-all-bootstrap-at-start": false,
"debezium-disable-schema": false,
"open": {
"output-old-value": true
Expand Down
1 change: 1 addition & 0 deletions pkg/config/replica_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ var defaultReplicaConfig = &ReplicaConfig{
SendBootstrapIntervalInSec: util.AddressOf(DefaultSendBootstrapIntervalInSec),
SendBootstrapInMsgCount: util.AddressOf(DefaultSendBootstrapInMsgCount),
SendBootstrapToAllPartition: util.AddressOf(DefaultSendBootstrapToAllPartition),
SendAllBootstrapAtStart: util.AddressOf(DefaultSendAllBootstrapAtStart),
DebeziumDisableSchema: util.AddressOf(false),
OpenProtocol: &OpenProtocolConfig{OutputOldValue: true},
Debezium: &DebeziumConfig{OutputOldValue: true},
Expand Down
16 changes: 15 additions & 1 deletion pkg/config/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,9 @@
// DefaultSendBootstrapToAllPartition is the default value of
// whether to send bootstrap message to all partitions.
DefaultSendBootstrapToAllPartition = true
// DefaultSendAllBootstrapAtStart is the default value of whether
// to send all tables bootstrap message at changefeed start.
DefaultSendAllBootstrapAtStart = false

// DefaultMaxReconnectToPulsarBroker is the default max reconnect times to pulsar broker.
// The pulsar client uses an exponential backoff with jitter to reconnect to the broker.
Expand Down Expand Up @@ -188,7 +191,8 @@
// If set to false, bootstrap message will only be sent to the first partition of each topic.
// Default value is true.
SendBootstrapToAllPartition *bool `toml:"send-bootstrap-to-all-partition" json:"send-bootstrap-to-all-partition,omitempty"`

// SendAllBootstrapAtStart determines whether to send all tables bootstrap message at changefeed start.
SendAllBootstrapAtStart *bool `toml:"send-all-bootstrap-at-start" json:"send-all-bootstrap-at-start,omitempty"`
// Debezium only. Whether schema should be excluded in the output.
DebeziumDisableSchema *bool `toml:"debezium-disable-schema" json:"debezium-disable-schema,omitempty"`

Expand Down Expand Up @@ -227,6 +231,16 @@
util.GetOrZero(s.SendBootstrapInMsgCount) > 0
}

// ShouldSendAllBootstrapAtStart returns whether the should send all bootstrap message at changefeed start.
func (s *SinkConfig) ShouldSendAllBootstrapAtStart() bool {
if s == nil {
return false
}

Check warning on line 238 in pkg/config/sink.go

View check run for this annotation

Codecov / codecov/patch

pkg/config/sink.go#L237-L238

Added lines #L237 - L238 were not covered by tests
should := s.ShouldSendBootstrapMsg() && util.GetOrZero(s.SendAllBootstrapAtStart)
log.Info("should send all bootstrap at start", zap.Bool("should", should))
return should
}

// CSVConfig defines a series of configuration items for csv codec.
type CSVConfig struct {
// delimiter between fields, it can be 1 character or at most 2 characters
Expand Down
Loading
Loading