Skip to content

Commit

Permalink
sink/flowcontrol(ticdc): make tests run and refactor code (#5264)
Browse files Browse the repository at this point in the history
ref #5138
  • Loading branch information
Rustin170506 authored Apr 26, 2022
1 parent 83ecda9 commit a7bcd92
Show file tree
Hide file tree
Showing 4 changed files with 287 additions and 236 deletions.
170 changes: 31 additions & 139 deletions cdc/sink/flowcontrol/flow_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,144 +20,36 @@ import (
"github.com/edwingeng/deque"
"github.com/pingcap/errors"
"github.com/pingcap/log"
cerrors "github.com/pingcap/tiflow/pkg/errors"
"go.uber.org/zap"
)

// TableMemoryQuota is designed to curb the total memory consumption of processing
// the event streams in a table.
// A higher-level controller more suitable for direct use by the processor is TableFlowController.
type TableMemoryQuota struct {
Quota uint64 // should not be changed once intialized

IsAborted uint32

mu sync.Mutex
Consumed uint64

cond *sync.Cond
}

// NewTableMemoryQuota creates a new TableMemoryQuota
// quota: max advised memory consumption in bytes.
func NewTableMemoryQuota(quota uint64) *TableMemoryQuota {
ret := &TableMemoryQuota{
Quota: quota,
mu: sync.Mutex{},
Consumed: 0,
}

ret.cond = sync.NewCond(&ret.mu)
return ret
}

// ConsumeWithBlocking is called when a hard-limit is needed. The method will
// block until enough memory has been freed up by Release.
// blockCallBack will be called if the function will block.
// Should be used with care to prevent deadlock.
func (c *TableMemoryQuota) ConsumeWithBlocking(nBytes uint64, blockCallBack func() error) error {
if nBytes >= c.Quota {
return cerrors.ErrFlowControllerEventLargerThanQuota.GenWithStackByArgs(nBytes, c.Quota)
}

c.mu.Lock()
if c.Consumed+nBytes >= c.Quota {
c.mu.Unlock()
err := blockCallBack()
if err != nil {
return errors.Trace(err)
}
} else {
c.mu.Unlock()
}

c.mu.Lock()
defer c.mu.Unlock()

for {
if atomic.LoadUint32(&c.IsAborted) == 1 {
return cerrors.ErrFlowControllerAborted.GenWithStackByArgs()
}

if c.Consumed+nBytes < c.Quota {
break
}
c.cond.Wait()
}

c.Consumed += nBytes
return nil
}

// ForceConsume is called when blocking is not acceptable and the limit can be violated
// for the sake of avoid deadlock. It merely records the increased memory consumption.
func (c *TableMemoryQuota) ForceConsume(nBytes uint64) error {
c.mu.Lock()
defer c.mu.Unlock()

if atomic.LoadUint32(&c.IsAborted) == 1 {
return cerrors.ErrFlowControllerAborted.GenWithStackByArgs()
}

c.Consumed += nBytes
return nil
}

// Release is called when a chuck of memory is done being used.
func (c *TableMemoryQuota) Release(nBytes uint64) {
c.mu.Lock()

if c.Consumed < nBytes {
c.mu.Unlock()
log.Panic("TableMemoryQuota: releasing more than consumed, report a bug",
zap.Uint64("consumed", c.Consumed),
zap.Uint64("released", nBytes))
}

c.Consumed -= nBytes
if c.Consumed < c.Quota {
c.mu.Unlock()
c.cond.Signal()
return
}

c.mu.Unlock()
}

// Abort interrupts any ongoing ConsumeWithBlocking call
func (c *TableMemoryQuota) Abort() {
atomic.StoreUint32(&c.IsAborted, 1)
c.cond.Signal()
}

// GetConsumption returns the current memory consumption
func (c *TableMemoryQuota) GetConsumption() uint64 {
c.mu.Lock()
defer c.mu.Unlock()

return c.Consumed
}

// TableFlowController provides a convenient interface to control the memory consumption of a per table event stream
type TableFlowController struct {
memoryQuota *TableMemoryQuota
memoryQuota *tableMemoryQuota

mu sync.Mutex
queue deque.Deque
queueMu struct {
sync.Mutex
queue deque.Deque
}

lastCommitTs uint64
}

type commitTsSizeEntry struct {
CommitTs uint64
Size uint64
commitTs uint64
size uint64
}

// NewTableFlowController creates a new TableFlowController
func NewTableFlowController(quota uint64) *TableFlowController {
return &TableFlowController{
memoryQuota: NewTableMemoryQuota(quota),
queue: deque.NewDeque(),
memoryQuota: newTableMemoryQuota(quota),
queueMu: struct {
sync.Mutex
queue deque.Deque
}{
queue: deque.NewDeque(),
},
}
}

Expand All @@ -174,27 +66,27 @@ func (c *TableFlowController) Consume(commitTs uint64, size uint64, blockCallBac

if commitTs > lastCommitTs {
atomic.StoreUint64(&c.lastCommitTs, commitTs)
err := c.memoryQuota.ConsumeWithBlocking(size, blockCallBack)
err := c.memoryQuota.consumeWithBlocking(size, blockCallBack)
if err != nil {
return errors.Trace(err)
}
} else {
// Here commitTs == lastCommitTs, which means that we are not crossing
// a transaction boundary. In this situation, we use `ForceConsume` because
// a transaction boundary. In this situation, we use `forceConsume` because
// blocking the event stream mid-transaction is highly likely to cause
// a deadlock.
// TODO fix this in the future, after we figure out how to elegantly support large txns.
err := c.memoryQuota.ForceConsume(size)
err := c.memoryQuota.forceConsume(size)
if err != nil {
return errors.Trace(err)
}
}

c.mu.Lock()
defer c.mu.Unlock()
c.queue.PushBack(&commitTsSizeEntry{
CommitTs: commitTs,
Size: size,
c.queueMu.Lock()
defer c.queueMu.Unlock()
c.queueMu.queue.PushBack(&commitTsSizeEntry{
commitTs: commitTs,
size: size,
})

return nil
Expand All @@ -204,26 +96,26 @@ func (c *TableFlowController) Consume(commitTs uint64, size uint64, blockCallBac
func (c *TableFlowController) Release(resolvedTs uint64) {
var nBytesToRelease uint64

c.mu.Lock()
for c.queue.Len() > 0 {
if peeked := c.queue.Front().(*commitTsSizeEntry); peeked.CommitTs <= resolvedTs {
nBytesToRelease += peeked.Size
c.queue.PopFront()
c.queueMu.Lock()
for c.queueMu.queue.Len() > 0 {
if peeked := c.queueMu.queue.Front().(*commitTsSizeEntry); peeked.commitTs <= resolvedTs {
nBytesToRelease += peeked.size
c.queueMu.queue.PopFront()
} else {
break
}
}
c.mu.Unlock()
c.queueMu.Unlock()

c.memoryQuota.Release(nBytesToRelease)
c.memoryQuota.release(nBytesToRelease)
}

// Abort interrupts any ongoing Consume call
func (c *TableFlowController) Abort() {
c.memoryQuota.Abort()
c.memoryQuota.abort()
}

// GetConsumption returns the current memory consumption
func (c *TableFlowController) GetConsumption() uint64 {
return c.memoryQuota.GetConsumption()
return c.memoryQuota.getConsumption()
}
Loading

0 comments on commit a7bcd92

Please sign in to comment.