Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sink/flowcontrol(ticdc): make tests run and refactor code #5264

Merged
merged 6 commits into from
Apr 26, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
156 changes: 24 additions & 132 deletions cdc/sink/flowcontrol/flow_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,144 +20,36 @@ import (
"github.com/edwingeng/deque"
"github.com/pingcap/errors"
"github.com/pingcap/log"
cerrors "github.com/pingcap/tiflow/pkg/errors"
"go.uber.org/zap"
)

// TableMemoryQuota is designed to curb the total memory consumption of processing
// the event streams in a table.
// A higher-level controller more suitable for direct use by the processor is TableFlowController.
type TableMemoryQuota struct {
Quota uint64 // should not be changed once intialized

IsAborted uint32

mu sync.Mutex
Consumed uint64

cond *sync.Cond
}

// NewTableMemoryQuota creates a new TableMemoryQuota
// quota: max advised memory consumption in bytes.
func NewTableMemoryQuota(quota uint64) *TableMemoryQuota {
ret := &TableMemoryQuota{
Quota: quota,
mu: sync.Mutex{},
Consumed: 0,
}

ret.cond = sync.NewCond(&ret.mu)
return ret
}

// ConsumeWithBlocking is called when a hard-limit is needed. The method will
// block until enough memory has been freed up by Release.
// blockCallBack will be called if the function will block.
// Should be used with care to prevent deadlock.
func (c *TableMemoryQuota) ConsumeWithBlocking(nBytes uint64, blockCallBack func() error) error {
if nBytes >= c.Quota {
return cerrors.ErrFlowControllerEventLargerThanQuota.GenWithStackByArgs(nBytes, c.Quota)
}

c.mu.Lock()
if c.Consumed+nBytes >= c.Quota {
c.mu.Unlock()
err := blockCallBack()
if err != nil {
return errors.Trace(err)
}
} else {
c.mu.Unlock()
}

c.mu.Lock()
defer c.mu.Unlock()

for {
if atomic.LoadUint32(&c.IsAborted) == 1 {
return cerrors.ErrFlowControllerAborted.GenWithStackByArgs()
}

if c.Consumed+nBytes < c.Quota {
break
}
c.cond.Wait()
}

c.Consumed += nBytes
return nil
}

// ForceConsume is called when blocking is not acceptable and the limit can be violated
// for the sake of avoid deadlock. It merely records the increased memory consumption.
func (c *TableMemoryQuota) ForceConsume(nBytes uint64) error {
c.mu.Lock()
defer c.mu.Unlock()

if atomic.LoadUint32(&c.IsAborted) == 1 {
return cerrors.ErrFlowControllerAborted.GenWithStackByArgs()
}

c.Consumed += nBytes
return nil
}

// Release is called when a chuck of memory is done being used.
func (c *TableMemoryQuota) Release(nBytes uint64) {
c.mu.Lock()

if c.Consumed < nBytes {
c.mu.Unlock()
log.Panic("TableMemoryQuota: releasing more than consumed, report a bug",
zap.Uint64("consumed", c.Consumed),
zap.Uint64("released", nBytes))
}

c.Consumed -= nBytes
if c.Consumed < c.Quota {
c.mu.Unlock()
c.cond.Signal()
return
}

c.mu.Unlock()
}

// Abort interrupts any ongoing ConsumeWithBlocking call
func (c *TableMemoryQuota) Abort() {
atomic.StoreUint32(&c.IsAborted, 1)
c.cond.Signal()
}

// GetConsumption returns the current memory consumption
func (c *TableMemoryQuota) GetConsumption() uint64 {
c.mu.Lock()
defer c.mu.Unlock()

return c.Consumed
}

// TableFlowController provides a convenient interface to control the memory consumption of a per table event stream
type TableFlowController struct {
memoryQuota *TableMemoryQuota
memoryQuota *tableMemoryQuota

mu sync.Mutex
queue deque.Deque
queueMu struct {
sync.Mutex
queue deque.Deque
}

lastCommitTs uint64
}

type commitTsSizeEntry struct {
CommitTs uint64
Size uint64
commitTs uint64
size uint64
}

// NewTableFlowController creates a new TableFlowController
func NewTableFlowController(quota uint64) *TableFlowController {
return &TableFlowController{
memoryQuota: NewTableMemoryQuota(quota),
queue: deque.NewDeque(),
queueMu: struct {
sync.Mutex
queue deque.Deque
}{
queue: deque.NewDeque(),
},
}
}

Expand Down Expand Up @@ -190,11 +82,11 @@ func (c *TableFlowController) Consume(commitTs uint64, size uint64, blockCallBac
}
}

c.mu.Lock()
defer c.mu.Unlock()
c.queue.PushBack(&commitTsSizeEntry{
CommitTs: commitTs,
Size: size,
c.queueMu.Lock()
defer c.queueMu.Unlock()
c.queueMu.queue.PushBack(&commitTsSizeEntry{
commitTs: commitTs,
size: size,
})

return nil
Expand All @@ -204,16 +96,16 @@ func (c *TableFlowController) Consume(commitTs uint64, size uint64, blockCallBac
func (c *TableFlowController) Release(resolvedTs uint64) {
var nBytesToRelease uint64

c.mu.Lock()
for c.queue.Len() > 0 {
if peeked := c.queue.Front().(*commitTsSizeEntry); peeked.CommitTs <= resolvedTs {
nBytesToRelease += peeked.Size
c.queue.PopFront()
c.queueMu.Lock()
for c.queueMu.queue.Len() > 0 {
if peeked := c.queueMu.queue.Front().(*commitTsSizeEntry); peeked.commitTs <= resolvedTs {
nBytesToRelease += peeked.size
c.queueMu.queue.PopFront()
} else {
break
}
}
c.mu.Unlock()
c.queueMu.Unlock()

c.memoryQuota.Release(nBytesToRelease)
}
Expand Down
Loading