Skip to content

Commit

Permalink
remove force consume
Browse files (browse the repository at this point in the history)
  • Loading branch information
CharlesCheung96 committed Jun 6, 2022
1 parent 939570a commit fe9c91f
Show file tree
Hide file tree
Showing 5 changed files with 179 additions and 42 deletions.
6 changes: 4 additions & 2 deletions cdc/processor/pipeline/table_actor.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ type tableActor struct {
// backend mounter
mounter entry.Mounter
// backend tableSink
tableSink sink.Sink
tableSink sink.Sink
redoLogEnabled bool

pullerNode *pullerNode
sortNode *sorterNode
Expand Down Expand Up @@ -103,6 +104,7 @@ func NewTableActor(cdcCtx cdcContext.Context,
tableName string,
replicaInfo *model.TableReplicaInfo,
sink sink.Sink,
redoLogEnabled bool,
targetTs model.Ts,
) (TablePipeline, error) {
config := cdcCtx.ChangefeedVars().Info.Config
Expand Down Expand Up @@ -279,7 +281,7 @@ func (t *tableActor) start(sdtTableContext context.Context) error {
zap.String("tableName", t.tableName),
zap.Uint64("quota", t.memoryQuota))

flowController := flowcontrol.NewTableFlowController(t.memoryQuota)
flowController := flowcontrol.NewTableFlowController(t.memoryQuota, t.redoLogEnabled)
sorterNode := newSorterNode(t.tableName, t.tableID,
t.replicaInfo.StartTs, flowController,
t.mounter, t.replicaConfig,
Expand Down
4 changes: 2 additions & 2 deletions cdc/processor/pipeline/table_actor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ func TestNewTableActor(t *testing.T) {
&model.TableReplicaInfo{
StartTs: 0,
MarkTableID: 1,
}, &mockSink{}, 10)
}, &mockSink{}, false, 10)
require.NotNil(t, tbl)
require.Nil(t, err)
require.NotPanics(t, func() {
Expand All @@ -375,7 +375,7 @@ func TestNewTableActor(t *testing.T) {
&model.TableReplicaInfo{
StartTs: 0,
MarkTableID: 1,
}, &mockSink{}, 10)
}, &mockSink{}, false, 10)
require.Nil(t, tbl)
require.NotNil(t, err)

Expand Down
1 change: 1 addition & 0 deletions cdc/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -819,6 +819,7 @@ func (p *processor) createTablePipelineImpl(
tableName,
replicaInfo,
s,
p.redoManager.Enabled(),
p.changefeed.Info.GetTargetTs())
if err != nil {
return nil, errors.Trace(err)
Expand Down
63 changes: 38 additions & 25 deletions cdc/sink/flowcontrol/flow_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,16 @@ import (
)

const (
maxRowsPerTxn = 1024
maxSizePerTxn = 1024 * 1024 /* 1MB */
batchSize = 100
defaultRowsPerTxn = 1024
defaultSizePerTxn = 1024 * 1024 /* 1MB */
defaultBatchSize = 100
)

// TableFlowController provides a convenient interface to control the memory consumption of a per-table event stream.
type TableFlowController struct {
memoryQuota *tableMemoryQuota
memoryQuota *tableMemoryQuota
redoLogEnabled bool
lastCommitTs uint64

queueMu struct {
sync.Mutex
Expand All @@ -44,29 +46,40 @@ type TableFlowController struct {
batchGroupCount uint64
batchID uint64

lastCommitTs uint64
batchSize uint64
maxRowsPerTxn uint64
maxSizePerTxn uint64
}

type txnSizeEntry struct {
// txn id
startTs uint64
commitTs uint64

batchID uint64
size uint64
rowCount uint64
batchID uint64
}

// NewTableFlowController creates a new TableFlowController
func NewTableFlowController(quota uint64) *TableFlowController {
func NewTableFlowController(quota uint64, redoLogEnabled bool) *TableFlowController {
maxSizePerTxn := uint64(defaultSizePerTxn)
if maxSizePerTxn > quota && !redoLogEnabled {
maxSizePerTxn = quota
}

return &TableFlowController{
memoryQuota: newTableMemoryQuota(quota),
memoryQuota: newTableMemoryQuota(quota),
redoLogEnabled: redoLogEnabled,
queueMu: struct {
sync.Mutex
queue deque.Deque
}{
queue: deque.NewDeque(),
},
batchSize: defaultBatchSize,
maxRowsPerTxn: defaultRowsPerTxn,
maxSizePerTxn: maxSizePerTxn,
}
}

Expand Down Expand Up @@ -95,19 +108,16 @@ func (c *TableFlowController) Consume(
zap.Uint64("lastCommitTs", c.lastCommitTs))
}

if commitTs > lastCommitTs {
err := c.memoryQuota.consumeWithBlocking(size, blockingCallBack)
if err != nil {
if commitTs == lastCommitTs && c.redoLogEnabled {
// Here commitTs == lastCommitTs, which means we are still inside the same
// transaction. The redo log does not support split transactions yet, so
// blocking mid-transaction could deadlock; use `forceConsume` instead.
// TODO: fix this after we figure out how to make redo log support split txn.
if err := c.memoryQuota.forceConsume(size); err != nil {
return errors.Trace(err)
}
} else {
// Here commitTs == lastCommitTs, which means that we are not crossing
// a transaction boundary. In this situation, we use `forceConsume` because
// blocking the event stream mid-transaction is highly likely to cause
// a deadlock.
// TODO fix this in the future, after we figure out how to elegantly support large txns.
err := c.memoryQuota.forceConsume(size)
if err != nil {
if err := c.memoryQuota.consumeWithBlocking(size, blockingCallBack); err != nil {
return errors.Trace(err)
}
}
Expand Down Expand Up @@ -150,29 +160,29 @@ func (c *TableFlowController) enqueueSingleMsg(
// 1. Processing a new txn with different commitTs.
if e = c.queueMu.queue.Back(); e == nil || lastCommitTs < commitTs {
atomic.StoreUint64(&c.lastCommitTs, commitTs)
c.resetBatch()
c.resetBatch(lastCommitTs, commitTs)
c.addEntry(msg, size)
return
}

// Processing txns with the same commitTs.
txnEntry := e.(*txnSizeEntry)
if txnEntry.commitTs != lastCommitTs {
log.Panic("got wrong commitTs from deque, report a bug",
log.Panic("got wrong commitTs from deque in flow control, report a bug",
zap.Uint64("lastCommitTs", c.lastCommitTs),
zap.Uint64("commitTsInDeque", txnEntry.commitTs))
}

// 2. Append row to current txn entry.
if txnEntry.batchID == c.batchID && txnEntry.startTs == msg.Row.StartTs &&
txnEntry.rowCount < maxRowsPerTxn && txnEntry.size < maxSizePerTxn {
txnEntry.rowCount < c.maxRowsPerTxn && txnEntry.size < c.maxSizePerTxn {
txnEntry.size += size
txnEntry.rowCount++
return
}

// 3. Split the txn or handle a new txn with the same commitTs.
if c.batchGroupCount+1 >= batchSize {
if c.batchGroupCount >= c.batchSize {
_ = callback()
}
c.addEntry(msg, size)
Expand All @@ -191,9 +201,12 @@ func (c *TableFlowController) addEntry(msg *model.PolymorphicEvent, size uint64)
msg.Row.SplitTxn = true
}

func (c *TableFlowController) resetBatch() {
// At least one batch for each txn.
c.batchID = 1
func (c *TableFlowController) resetBatch(lastCommitTs, commitTs uint64) {
if lastCommitTs < commitTs {
// First batch of a new txn: batch IDs restart at 1,
// so every txn has at least one batch.
c.batchID = 1
}
c.batchGroupCount = 0
}

Expand Down
Loading

0 comments on commit fe9c91f

Please sign in to comment.