Skip to content

Commit

Permalink
This is an automated cherry-pick of #9281
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
CharlesCheung96 authored and ti-chi-bot committed Jun 29, 2023
1 parent 1677113 commit 71866fe
Show file tree
Hide file tree
Showing 11 changed files with 501 additions and 13 deletions.
4 changes: 4 additions & 0 deletions cdc/processor/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,11 @@ func NewManager4Test(
liveness *model.Liveness,
changefeedEpoch uint64,
) *processor {
<<<<<<< HEAD
return newProcessor4Test(t, state, captureInfo, createTablePipeline, m.liveness)
=======
return newProcessor4Test(t, state, captureInfo, m.liveness, cfg, false)
>>>>>>> 7497ea66a8 (redo, processor(ticdc): set flushed resolvedTs when start table (#9281))
}
return m
}
Expand Down
16 changes: 16 additions & 0 deletions cdc/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,13 +120,19 @@ func (p *processor) checkReadyForMessages() bool {
// 1. `Create Table`, a new table dispatched to the processor, `isPrepare` should be false
// 2. Prepare phase for 2 phase scheduling, `isPrepare` should be true.
// 3. Replicating phase for 2 phase scheduling, `isPrepare` should be false
<<<<<<< HEAD
func (p *processor) AddTable(
ctx context.Context, tableID model.TableID, startTs model.Ts, isPrepare bool,
=======
func (p *processor) AddTableSpan(
ctx context.Context, span tablepb.Span, checkpoint tablepb.Checkpoint, isPrepare bool,
>>>>>>> 7497ea66a8 (redo, processor(ticdc): set flushed resolvedTs when start table (#9281))
) (bool, error) {
if !p.checkReadyForMessages() {
return false, nil
}

startTs := checkpoint.CheckpointTs
if startTs == 0 {
log.Panic("table start ts must not be 0",
zap.String("captureID", p.captureInfo.ID),
Expand Down Expand Up @@ -166,12 +172,22 @@ func (p *processor) AddTable(
// table is `prepared`, and a `isPrepare = false` request indicate that old table should
// be stopped on original capture already, it's safe to start replicating data now.
if !isPrepare {
<<<<<<< HEAD
if p.pullBasedSinking {
if err := p.sinkManager.StartTable(tableID, startTs); err != nil {
return false, errors.Trace(err)
}
} else {
p.tables[tableID].Start(startTs)
=======
if p.redo.r.Enabled() {
// ResolvedTs is store in external storage when redo log is enabled, so we need to
// start table with ResolvedTs in redoDMLManager.
p.redo.r.StartTable(span, checkpoint.ResolvedTs)
}
if err := p.sinkManager.r.StartTable(span, startTs); err != nil {
return false, errors.Trace(err)
>>>>>>> 7497ea66a8 (redo, processor(ticdc): set flushed resolvedTs when start table (#9281))
}
}
return true, nil
Expand Down
Loading

0 comments on commit 71866fe

Please sign in to comment.