Skip to content

Commit

Permalink
This is an automated cherry-pick of #9281
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
CharlesCheung96 authored and ti-chi-bot committed Jun 29, 2023
1 parent 631ef8c commit 6fdc31b
Show file tree
Hide file tree
Showing 11 changed files with 266 additions and 37 deletions.
2 changes: 1 addition & 1 deletion cdc/processor/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func NewManager4Test(
changefeedEpoch uint64,
cfg *config.SchedulerConfig,
) *processor {
return newProcessor4Test(t, state, captureInfo, m.liveness, cfg)
return newProcessor4Test(t, state, captureInfo, m.liveness, cfg, false)
}
return m
}
Expand Down
13 changes: 13 additions & 0 deletions cdc/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,12 +110,17 @@ var _ scheduler.TableExecutor = (*processor)(nil)
// 2. Prepare phase for 2 phase scheduling, `isPrepare` should be true.
// 3. Replicating phase for 2 phase scheduling, `isPrepare` should be false
func (p *processor) AddTableSpan(
<<<<<<< HEAD
ctx context.Context, span tablepb.Span, startTs model.Ts, isPrepare bool,
=======
ctx context.Context, span tablepb.Span, checkpoint tablepb.Checkpoint, isPrepare bool,
>>>>>>> 7497ea66a8 (redo, processor(ticdc): set flushed resolvedTs when start table (#9281))
) (bool, error) {
if !p.checkReadyForMessages() {
return false, nil
}

startTs := checkpoint.CheckpointTs
if startTs == 0 {
log.Panic("table start ts must not be 0",
zap.String("captureID", p.captureInfo.ID),
Expand Down Expand Up @@ -145,6 +150,14 @@ func (p *processor) AddTableSpan(
// table is `prepared`, and a `isPrepare = false` request indicate that old table should
// be stopped on original capture already, it's safe to start replicating data now.
if !isPrepare {
<<<<<<< HEAD
=======
if p.redo.r.Enabled() {
// ResolvedTs is store in external storage when redo log is enabled, so we need to
// start table with ResolvedTs in redoDMLManager.
p.redo.r.StartTable(span, checkpoint.ResolvedTs)
}
>>>>>>> 7497ea66a8 (redo, processor(ticdc): set flushed resolvedTs when start table (#9281))
if err := p.sinkManager.r.StartTable(span, startTs); err != nil {
return false, errors.Trace(err)
}
Expand Down
Loading

0 comments on commit 6fdc31b

Please sign in to comment.