sync_point (ticdc): add new config and change sync point table struct (
asddongmen authored Sep 7, 2022
1 parent 8a637c8 commit 6fba812
Showing 25 changed files with 302 additions and 188 deletions.
20 changes: 9 additions & 11 deletions cdc/api/v1/validator.go
@@ -132,17 +132,15 @@ func verifyCreateChangefeedConfig(

// init ChangefeedInfo
info := &model.ChangeFeedInfo{
UpstreamID: up.ID,
SinkURI: changefeedConfig.SinkURI,
CreateTime: time.Now(),
StartTs: changefeedConfig.StartTS,
TargetTs: changefeedConfig.TargetTS,
Config: replicaConfig,
Engine: sortEngine,
State: model.StateNormal,
SyncPointEnabled: false,
SyncPointInterval: 10 * time.Minute,
CreatorVersion: version.ReleaseVersion,
UpstreamID: up.ID,
SinkURI: changefeedConfig.SinkURI,
CreateTime: time.Now(),
StartTs: changefeedConfig.StartTS,
TargetTs: changefeedConfig.TargetTS,
Config: replicaConfig,
Engine: sortEngine,
State: model.StateNormal,
CreatorVersion: version.ReleaseVersion,
}
f, err := filter.NewFilter(replicaConfig, "")
if err != nil {
36 changes: 17 additions & 19 deletions cdc/api/v2/api_helpers.go
@@ -179,6 +179,7 @@ func (APIV2HelpersImpl) verifyCreateChangefeedConfig(

// fill replicaConfig
replicaCfg := cfg.ReplicaConfig.ToInternalReplicaConfig()

// verify replicaConfig
sinkURIParsed, err := url.Parse(cfg.SinkURI)
if err != nil {
@@ -241,19 +242,17 @@ func (APIV2HelpersImpl) verifyCreateChangefeedConfig(
}

return &model.ChangeFeedInfo{
UpstreamID: pdClient.GetClusterID(ctx),
Namespace: cfg.Namespace,
ID: cfg.ID,
SinkURI: cfg.SinkURI,
CreateTime: time.Now(),
StartTs: cfg.StartTs,
TargetTs: cfg.TargetTs,
Engine: cfg.Engine,
Config: replicaCfg,
State: model.StateNormal,
SyncPointEnabled: cfg.SyncPointEnabled,
SyncPointInterval: cfg.SyncPointInterval,
CreatorVersion: version.ReleaseVersion,
UpstreamID: pdClient.GetClusterID(ctx),
Namespace: cfg.Namespace,
ID: cfg.ID,
SinkURI: cfg.SinkURI,
CreateTime: time.Now(),
StartTs: cfg.StartTs,
TargetTs: cfg.TargetTs,
Engine: cfg.Engine,
Config: replicaCfg,
State: model.StateNormal,
CreatorVersion: version.ReleaseVersion,
}, nil
}

@@ -310,14 +309,13 @@ func (APIV2HelpersImpl) verifyUpdateChangefeedConfig(
newInfo.TargetTs = cfg.TargetTs
}

// verify syncPoint
newInfo.SyncPointEnabled = cfg.SyncPointEnabled
if cfg.SyncPointInterval != 0 {
newInfo.SyncPointInterval = cfg.SyncPointInterval
}

// verify replica config
if cfg.ReplicaConfig != nil {
newInfo.Config = cfg.ReplicaConfig.ToInternalReplicaConfig()
err = newInfo.Config.ValidateAndAdjust(nil)
if err != nil {
return nil, nil, err
}
}

f, err := filter.NewFilter(newInfo.Config, "")
10 changes: 5 additions & 5 deletions cdc/api/v2/api_helpers_test.go
@@ -115,8 +115,8 @@ func TestVerifyUpdateChangefeedConfig(t *testing.T) {
cfg.TargetTs = 10
cfg.Engine = model.SortInMemory
cfg.ReplicaConfig = ToAPIReplicaConfig(config.GetDefaultReplicaConfig())
cfg.SyncPointEnabled = true
cfg.SyncPointInterval = 10 * time.Second
cfg.ReplicaConfig.EnableSyncPoint = true
cfg.ReplicaConfig.SyncPointInterval = 30 * time.Second
cfg.PDAddrs = []string{"a", "b"}
cfg.CertPath = "p1"
cfg.CAPath = "p2"
@@ -131,9 +131,9 @@ func TestVerifyUpdateChangefeedConfig(t *testing.T) {
require.Equal(t, uint64(0), newCfInfo.StartTs)
require.Equal(t, uint64(10), newCfInfo.TargetTs)
require.Equal(t, model.SortInMemory, newCfInfo.Engine)
require.Equal(t, true, newCfInfo.SyncPointEnabled)
require.Equal(t, 10*time.Second, newCfInfo.SyncPointInterval)
require.Equal(t, config.GetDefaultReplicaConfig(), newCfInfo.Config)
require.Equal(t, true, newCfInfo.Config.EnableSyncPoint)
require.Equal(t, 30*time.Second, newCfInfo.Config.SyncPointInterval)
require.Equal(t, cfg.ReplicaConfig.ToInternalReplicaConfig(), newCfInfo.Config)
require.Equal(t, "a,b", newUpInfo.PDEndpoints)
require.Equal(t, "p1", newUpInfo.CertPath)
require.Equal(t, "p2", newUpInfo.CAPath)
29 changes: 13 additions & 16 deletions cdc/api/v2/changefeed.go
@@ -218,7 +218,6 @@ func (h *OpenAPIV2) updateChangefeed(c *gin.Context) {
}

updateCfConfig := &ChangefeedConfig{}
updateCfConfig.SyncPointEnabled = cfInfo.SyncPointEnabled
updateCfConfig.ReplicaConfig = ToAPIReplicaConfig(cfInfo.Config)
if err = c.BindJSON(updateCfConfig); err != nil {
_ = c.Error(cerror.WrapError(cerror.ErrAPIInvalidParam, err))
@@ -396,21 +395,19 @@ func toAPIModel(info *model.ChangeFeedInfo, maskSinkURI bool) *ChangeFeedInfo {
}

apiInfoModel := &ChangeFeedInfo{
UpstreamID: info.UpstreamID,
Namespace: info.Namespace,
ID: info.ID,
SinkURI: sinkURI,
CreateTime: info.CreateTime,
StartTs: info.StartTs,
TargetTs: info.TargetTs,
AdminJobType: info.AdminJobType,
Engine: info.Engine,
Config: ToAPIReplicaConfig(info.Config),
State: info.State,
Error: runningError,
SyncPointEnabled: info.SyncPointEnabled,
SyncPointInterval: info.SyncPointInterval,
CreatorVersion: info.CreatorVersion,
UpstreamID: info.UpstreamID,
Namespace: info.Namespace,
ID: info.ID,
SinkURI: sinkURI,
CreateTime: info.CreateTime,
StartTs: info.StartTs,
TargetTs: info.TargetTs,
AdminJobType: info.AdminJobType,
Engine: info.Engine,
Config: ToAPIReplicaConfig(info.Config),
State: info.State,
Error: runningError,
CreatorVersion: info.CreatorVersion,
}
return apiInfoModel
}
48 changes: 28 additions & 20 deletions cdc/api/v2/model.go
@@ -77,15 +77,13 @@ type PDConfig struct {

// ChangefeedConfig use by create changefeed api
type ChangefeedConfig struct {
Namespace string `json:"namespace"`
ID string `json:"changefeed_id"`
StartTs uint64 `json:"start_ts"`
TargetTs uint64 `json:"target_ts"`
SinkURI string `json:"sink_uri"`
Engine string `json:"engine"`
ReplicaConfig *ReplicaConfig `json:"replica_config"`
SyncPointEnabled bool `json:"sync_point_enabled"`
SyncPointInterval time.Duration `json:"sync_point_interval"`
Namespace string `json:"namespace"`
ID string `json:"changefeed_id"`
StartTs uint64 `json:"start_ts"`
TargetTs uint64 `json:"target_ts"`
SinkURI string `json:"sink_uri"`
Engine string `json:"engine"`
ReplicaConfig *ReplicaConfig `json:"replica_config"`
PDConfig
}

@@ -96,6 +94,9 @@ type ReplicaConfig struct {
ForceReplicate bool `json:"force_replicate"`
IgnoreIneligibleTable bool `json:"ignore_ineligible_table"`
CheckGCSafePoint bool `json:"check_gc_safe_point"`
EnableSyncPoint bool `json:"enable_sync_point"`
SyncPointInterval time.Duration `json:"sync_point_interval"`
SyncPointRetention time.Duration `json:"sync_point_retention"`
Filter *FilterConfig `json:"filter"`
Sink *SinkConfig `json:"sink"`
Consistent *ConsistentConfig `json:"consistent"`
@@ -108,6 +109,9 @@ func (c *ReplicaConfig) ToInternalReplicaConfig() *config.ReplicaConfig {
res.EnableOldValue = c.EnableOldValue
res.ForceReplicate = c.ForceReplicate
res.CheckGCSafePoint = c.CheckGCSafePoint
res.EnableSyncPoint = c.EnableSyncPoint
res.SyncPointInterval = c.SyncPointInterval
res.SyncPointRetention = c.SyncPointRetention

if c.Filter != nil {
var mySQLReplicationRules *filter.MySQLReplicationRules
@@ -193,6 +197,9 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig {
ForceReplicate: cloned.ForceReplicate,
IgnoreIneligibleTable: false,
CheckGCSafePoint: cloned.CheckGCSafePoint,
EnableSyncPoint: cloned.EnableSyncPoint,
SyncPointInterval: cloned.SyncPointInterval,
SyncPointRetention: cloned.SyncPointRetention,
}

if cloned.Filter != nil {
@@ -274,9 +281,12 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig {
// GetDefaultReplicaConfig returns a default ReplicaConfig
func GetDefaultReplicaConfig() *ReplicaConfig {
return &ReplicaConfig{
CaseSensitive: true,
EnableOldValue: true,
CheckGCSafePoint: true,
CaseSensitive: true,
EnableOldValue: true,
CheckGCSafePoint: true,
EnableSyncPoint: false,
SyncPointInterval: 10 * time.Second,
SyncPointRetention: 24 * time.Hour,
Filter: &FilterConfig{
Rules: []string{"*.*"},
},
@@ -436,14 +446,12 @@ type ChangeFeedInfo struct {
// The ChangeFeed will exits until sync to timestamp TargetTs
TargetTs uint64 `json:"target_ts,omitempty"`
// used for admin job notification, trigger watch event in capture
AdminJobType model.AdminJobType `json:"admin_job_type,omitempty"`
Engine string `json:"engine,omitempty"`
Config *ReplicaConfig `json:"config,omitempty"`
State model.FeedState `json:"state,omitempty"`
Error *RunningError `json:"error,omitempty"`
SyncPointEnabled bool `json:"sync_point_enabled,omitempty"`
SyncPointInterval time.Duration `json:"sync_point_interval,omitempty"`
CreatorVersion string `json:"creator_version,omitempty"`
AdminJobType model.AdminJobType `json:"admin_job_type,omitempty"`
Engine string `json:"engine,omitempty"`
Config *ReplicaConfig `json:"config,omitempty"`
State model.FeedState `json:"state,omitempty"`
Error *RunningError `json:"error,omitempty"`
CreatorVersion string `json:"creator_version,omitempty"`
}

// RunningError represents some running error from cdc components, such as processor.
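
For orientation: after this change the sync point settings are no longer top-level fields of the changefeed (SyncPointEnabled / SyncPointInterval disappear from both the API types and model.ChangeFeedInfo); they travel inside the replica config as enable_sync_point, sync_point_interval, and the new sync_point_retention. The snippet below is a minimal sketch of a v2 create-changefeed payload under the new layout; the local structs are trimmed mirrors of the types above, the changefeed ID and sink URI are invented, and time.Duration serializes as integer nanoseconds here unless the real API layer applies a custom encoding.

```go
// Sketch only: trimmed local mirrors of the v2 API types changed above,
// showing where the sync point settings now live. Real definitions are in
// cdc/api/v2/model.go; field sets here are deliberately incomplete.
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

type ReplicaConfig struct {
	EnableSyncPoint    bool          `json:"enable_sync_point"`
	SyncPointInterval  time.Duration `json:"sync_point_interval"`
	SyncPointRetention time.Duration `json:"sync_point_retention"`
}

type ChangefeedConfig struct {
	ID            string         `json:"changefeed_id"`
	SinkURI       string         `json:"sink_uri"`
	ReplicaConfig *ReplicaConfig `json:"replica_config"`
}

func main() {
	// Hypothetical changefeed: the ID and sink URI are placeholders.
	cfg := ChangefeedConfig{
		ID:      "cf-with-syncpoint",
		SinkURI: "mysql://root@127.0.0.1:3306/",
		ReplicaConfig: &ReplicaConfig{
			EnableSyncPoint:    true,
			SyncPointInterval:  30 * time.Second,
			SyncPointRetention: 24 * time.Hour,
		},
	}

	// time.Duration marshals as integer nanoseconds by default; the real API
	// layer may use a custom duration encoding.
	body, err := json.MarshalIndent(cfg, "", "  ")
	if err != nil {
		panic(err)
	}
	fmt.Println(string(body))
}
```
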
4 changes: 1 addition & 3 deletions cdc/model/changefeed.go
@@ -145,9 +145,7 @@ type ChangeFeedInfo struct {
State FeedState `json:"state"`
Error *RunningError `json:"error"`

SyncPointEnabled bool `json:"sync-point-enabled"`
SyncPointInterval time.Duration `json:"sync-point-interval"`
CreatorVersion string `json:"creator-version"`
CreatorVersion string `json:"creator-version"`
}

const changeFeedIDMaxLen = 128
8 changes: 5 additions & 3 deletions cdc/model/changefeed_test.go
@@ -145,9 +145,11 @@ func TestVerifyAndComplete(t *testing.T) {
SinkURI: "blackhole://",
StartTs: 417257993615179777,
Config: &config.ReplicaConfig{
CaseSensitive: true,
EnableOldValue: true,
CheckGCSafePoint: true,
CaseSensitive: true,
EnableOldValue: true,
CheckGCSafePoint: true,
SyncPointInterval: time.Minute * 10,
SyncPointRetention: time.Hour * 24,
},
}

4 changes: 2 additions & 2 deletions cdc/owner/changefeed.go
@@ -470,7 +470,7 @@ LOOP:
}

c.barriers = newBarriers()
if c.state.Info.SyncPointEnabled {
if c.state.Info.Config.EnableSyncPoint {
c.barriers.Update(syncPointBarrier, resolvedTs)
}
c.barriers.Update(ddlJobBarrier, ddlStartTs)
@@ -737,7 +737,7 @@ func (c *changefeed) handleBarrier(ctx cdcContext.Context) (uint64, error) {
if !blocked {
return barrierTs, nil
}
nextSyncPointTs := oracle.GoTimeToTS(oracle.GetTimeFromTS(barrierTs).Add(c.state.Info.SyncPointInterval))
nextSyncPointTs := oracle.GoTimeToTS(oracle.GetTimeFromTS(barrierTs).Add(c.state.Info.Config.SyncPointInterval))
if err := c.sink.emitSyncPoint(ctx, barrierTs); err != nil {
return 0, errors.Trace(err)
}
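
The handleBarrier change above only switches where the interval is read from (c.state.Info.Config.SyncPointInterval instead of the removed top-level field); the timestamp arithmetic itself is untouched. As a rough, self-contained illustration of that arithmetic, the sketch below assumes the usual TiDB TSO layout (physical milliseconds shifted left by 18 bits over a logical counter); the real code calls the oracle package rather than these simplified helpers.

```go
// Standalone sketch of the sync point barrier arithmetic in handleBarrier,
// assuming a TSO layout of physical milliseconds in the high bits and an
// 18-bit logical counter in the low bits. The real code uses
// oracle.GoTimeToTS / oracle.GetTimeFromTS instead of these helpers.
package main

import (
	"fmt"
	"time"
)

const physicalShiftBits = 18

// goTimeToTS mimics oracle.GoTimeToTS for a zero logical component.
func goTimeToTS(t time.Time) uint64 {
	return uint64(t.UnixMilli()) << physicalShiftBits
}

// getTimeFromTS mimics oracle.GetTimeFromTS, dropping the logical component.
func getTimeFromTS(ts uint64) time.Time {
	return time.UnixMilli(int64(ts >> physicalShiftBits))
}

func main() {
	// Hypothetical barrier timestamp and interval; the interval is now read
	// from info.Config.SyncPointInterval rather than a top-level field.
	barrierTs := goTimeToTS(time.Date(2022, 9, 7, 12, 0, 0, 0, time.UTC))
	syncPointInterval := 10 * time.Minute

	// Same shape as the changed line: the next sync point is the barrier time
	// plus the configured interval, re-encoded as a TSO.
	nextSyncPointTs := goTimeToTS(getTimeFromTS(barrierTs).Add(syncPointInterval))

	fmt.Println(getTimeFromTS(barrierTs).UTC(), "->", getTimeFromTS(nextSyncPointTs).UTC())
}
```
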
8 changes: 4 additions & 4 deletions cdc/owner/changefeed_test.go
@@ -423,8 +423,8 @@ func TestEmitCheckpointTs(t *testing.T) {

func TestSyncPoint(t *testing.T) {
ctx := cdcContext.NewBackendContext4Test(true)
ctx.ChangefeedVars().Info.SyncPointEnabled = true
ctx.ChangefeedVars().Info.SyncPointInterval = 1 * time.Second
ctx.ChangefeedVars().Info.Config.EnableSyncPoint = true
ctx.ChangefeedVars().Info.Config.SyncPointInterval = 1 * time.Second
cf, captures, tester := createChangefeed4Test(ctx, t)
defer cf.Close(ctx)

@@ -990,8 +990,8 @@ func TestBarrierAdvance(t *testing.T) {
for i := 0; i < 2; i++ {
ctx := cdcContext.NewBackendContext4Test(true)
if i == 1 {
ctx.ChangefeedVars().Info.SyncPointEnabled = true
ctx.ChangefeedVars().Info.SyncPointInterval = 100 * time.Second
ctx.ChangefeedVars().Info.Config.EnableSyncPoint = true
ctx.ChangefeedVars().Info.Config.SyncPointInterval = 100 * time.Second
}

cf, captures, tester := createChangefeed4Test(ctx, t)
10 changes: 5 additions & 5 deletions cdc/owner/ddl_sink.go
@@ -61,7 +61,7 @@ type DDLSink interface {

type ddlSinkImpl struct {
lastSyncPoint model.Ts
syncPointStore mysql.SyncpointStore
syncPointStore mysql.SyncPointStore

// It is used to record the checkpointTs and the names of the table at that time.
mu struct {
@@ -124,10 +124,10 @@ func ddlSinkInitializer(ctx cdcContext.Context, a *ddlSinkImpl, id model.ChangeF
a.sinkV2 = s
}

if !info.SyncPointEnabled {
if !info.Config.EnableSyncPoint {
return nil
}
syncPointStore, err := mysql.NewSyncpointStore(stdCtx, id, info.SinkURI)
syncPointStore, err := mysql.NewSyncPointStore(stdCtx, id, info.SinkURI, info.Config.SyncPointRetention)
if err != nil {
return errors.Trace(err)
}
@@ -136,7 +136,7 @@ func ddlSinkInitializer(ctx cdcContext.Context, a *ddlSinkImpl, id model.ChangeF
})
a.syncPointStore = syncPointStore

if err := a.syncPointStore.CreateSynctable(stdCtx); err != nil {
if err := a.syncPointStore.CreateSyncTable(stdCtx); err != nil {
return errors.Trace(err)
}
return nil
@@ -333,7 +333,7 @@ func (s *ddlSinkImpl) emitSyncPoint(ctx cdcContext.Context, checkpointTs uint64)
}
s.lastSyncPoint = checkpointTs
// TODO implement async sink syncPoint
return s.syncPointStore.SinkSyncpoint(ctx, ctx.ChangefeedVars().ID, checkpointTs)
return s.syncPointStore.SinkSyncPoint(ctx, ctx.ChangefeedVars().ID, checkpointTs)
}

func (s *ddlSinkImpl) close(ctx context.Context) (err error) {
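
The renames in this file (SyncpointStore to SyncPointStore, CreateSynctable to CreateSyncTable, SinkSyncpoint to SinkSyncPoint) and the extra retention argument to NewSyncPointStore imply an interface roughly like the sketch below. It is reconstructed from the call sites in this diff only: the actual definition lives in cdc/sink/mysql, and the Close method and the ChangeFeedID placeholder are assumptions added for illustration.

```go
// Sketch of the sync point store API implied by the call sites in this diff.
// The actual interface is defined in cdc/sink/mysql; Close and the
// ChangeFeedID placeholder are assumptions added for illustration.
package syncpoint

import (
	"context"
	"time"
)

// ChangeFeedID stands in for model.ChangeFeedID.
type ChangeFeedID struct {
	Namespace string
	ID        string
}

// SyncPointStore persists sync points in the downstream so that upstream and
// downstream snapshots can be lined up later.
type SyncPointStore interface {
	// CreateSyncTable ensures the sync point table exists downstream.
	CreateSyncTable(ctx context.Context) error
	// SinkSyncPoint records one sync point for the changefeed at checkpointTs.
	SinkSyncPoint(ctx context.Context, changefeedID ChangeFeedID, checkpointTs uint64) error
	// Close releases the underlying connection (assumed).
	Close() error
}

// NewSyncPointStore mirrors the constructor call in ddl_sink.go, which now
// also receives a retention duration used to expire old sync point rows.
func NewSyncPointStore(
	ctx context.Context,
	changefeedID ChangeFeedID,
	sinkURI string,
	retention time.Duration,
) (SyncPointStore, error) {
	// The real implementation dials the MySQL-compatible downstream; omitted here.
	return nil, nil
}
```
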
(Diffs for the remaining changed files were not loaded in this view.)
