sorter: Stabilize Unified Sorter #1210

Merged 46 commits on Dec 24, 2020
Changes from 41 commits
Commits (46):
c588f9b
basic implementation of WorkerPool
liuzix Dec 15, 2020
b6dd64e
more implementation
liuzix Dec 16, 2020
3b0f216
more tests
liuzix Dec 16, 2020
1361920
fix errors.toml
liuzix Dec 16, 2020
2431bff
updated Unified Sorter
liuzix Dec 16, 2020
a42ffd5
change default sorter to Unified
liuzix Dec 16, 2020
a0757c5
fix goroutine leak
liuzix Dec 16, 2020
20b8763
fix bug
liuzix Dec 16, 2020
0a3044a
fix deadlock
liuzix Dec 16, 2020
fd03c77
fix
liuzix Dec 16, 2020
d5f2c1c
fix
liuzix Dec 16, 2020
20f7896
add debug panic
liuzix Dec 16, 2020
11e13a6
avoid race detector bug
liuzix Dec 17, 2020
c643e9c
fix unit test
liuzix Dec 17, 2020
98f2abf
async pool
liuzix Dec 17, 2020
31ab7ca
fix async pool
liuzix Dec 17, 2020
3868d41
fix asyncPool bug
liuzix Dec 17, 2020
3942738
optimize
liuzix Dec 17, 2020
d252c2f
fix kafka integration test
liuzix Dec 17, 2020
1c2dc5f
remove debug logs
liuzix Dec 18, 2020
113816b
exclude tests
liuzix Dec 18, 2020
a875a7c
exclude tests
liuzix Dec 18, 2020
f7f27b1
optimize
liuzix Dec 18, 2020
c36fef8
fix cleanup
liuzix Dec 18, 2020
dd2b7e4
fix integration test
liuzix Dec 18, 2020
8e00de3
Merge branch 'master' of github.com:pingcap/ticdc into zixiong-unifie…
liuzix Dec 18, 2020
b5513d4
fix integration test
liuzix Dec 18, 2020
9523591
reduce debug log
liuzix Dec 18, 2020
b845ee2
fix sort-dir in kafka_messages case
liuzix Dec 18, 2020
94b6cc1
WIP
liuzix Dec 21, 2020
c8f5d4a
add merged contexts
liuzix Dec 21, 2020
09d1253
fix imports
liuzix Dec 21, 2020
d5ea5a2
fix cancel
liuzix Dec 21, 2020
35cf992
reduce workers
liuzix Dec 21, 2020
dddf707
add configuration for workerpool size
liuzix Dec 21, 2020
83f8d51
modify integration test script
liuzix Dec 21, 2020
18d5f01
address comments
liuzix Dec 21, 2020
a53bbb5
address comments
liuzix Dec 22, 2020
938e15f
Merge branch 'master' into zixiong-unified-sorter-stable
liuzix Dec 23, 2020
78d6c7e
fix
liuzix Dec 23, 2020
049e69f
Merge branch 'zixiong-unified-sorter-stable' of github.com:liuzix/tic…
liuzix Dec 23, 2020
31c046c
fix s3 integration test
liuzix Dec 23, 2020
6679ff6
fix temp file
liuzix Dec 23, 2020
d8448f1
better file clean-up
liuzix Dec 23, 2020
ba910b8
add unit test for backend_pool
liuzix Dec 23, 2020
e1412e3
fix unit test
liuzix Dec 23, 2020
4 changes: 2 additions & 2 deletions Makefile
@@ -29,11 +29,11 @@ endif
ARCH := "`uname -s`"
LINUX := "Linux"
MAC := "Darwin"
PACKAGE_LIST := go list ./...| grep -vE 'vendor|proto|ticdc\/tests|integration'
PACKAGE_LIST := go list ./...| grep -vE 'vendor|proto|ticdc\/tests|integration|testing_utils'
PACKAGES := $$($(PACKAGE_LIST))
PACKAGE_DIRECTORIES := $(PACKAGE_LIST) | sed 's|github.com/pingcap/$(PROJECT)/||'
FILES := $$(find . -name '*.go' -type f | grep -vE 'vendor|kv_gen|proto')
TEST_FILES := $$(find . -name '*_test.go' -type f | grep -vE 'vendor|kv_gen|integration')
TEST_FILES := $$(find . -name '*_test.go' -type f | grep -vE 'vendor|kv_gen|integration|testing_utils')
CDC_PKG := github.com/pingcap/ticdc
FAILPOINT_DIR := $$(for p in $(PACKAGES); do echo $${p\#"github.com/pingcap/$(PROJECT)/"}|grep -v "github.com/pingcap/$(PROJECT)"; done)
FAILPOINT := bin/failpoint-ctl
2 changes: 1 addition & 1 deletion cdc/model/changefeed.go
@@ -176,7 +176,7 @@ func (info *ChangeFeedInfo) Unmarshal(data []byte) error {
func (info *ChangeFeedInfo) VerifyAndFix() error {
defaultConfig := config.GetDefaultReplicaConfig()
if info.Engine == "" {
info.Engine = SortInMemory
info.Engine = SortUnified
Review comment (Contributor): Will this default value be cherry-picked to 4.0?

}
if info.Config.Filter == nil {
info.Config.Filter = defaultConfig.Filter
2 changes: 1 addition & 1 deletion cdc/model/changefeed_test.go
@@ -180,7 +180,7 @@ func (s *configSuite) TestVerifyAndFix(c *check.C) {

err := info.VerifyAndFix()
c.Assert(err, check.IsNil)
c.Assert(info.Engine, check.Equals, SortInMemory)
c.Assert(info.Engine, check.Equals, SortUnified)

marshalConfig1, err := info.Config.Marshal()
c.Assert(err, check.IsNil)
4 changes: 3 additions & 1 deletion cdc/puller/sorter/backend_pool.go
@@ -133,7 +133,9 @@ func (p *backEndPool) alloc(ctx context.Context) (backEnd, error) {
}

fname := fmt.Sprintf("%s/sort-%d-%d", p.dir, os.Getpid(), atomic.AddUint64(&p.fileNameCounter, 1))
log.Debug("Unified Sorter: trying to create file backEnd", zap.String("filename", fname))
log.Debug("Unified Sorter: trying to create file backEnd",
zap.String("filename", fname),
zap.String("table", tableNameFromCtx(ctx)))

ret, err := newFileBackEnd(fname, &msgPackGenSerde{})
if err != nil {
2 changes: 1 addition & 1 deletion cdc/puller/sorter/file_backend.go
@@ -61,7 +61,7 @@ func newFileBackEnd(fileName string, serde serializerDeserializer) (*fileBackEnd
}

func (f *fileBackEnd) reader() (backEndReader, error) {
fd, err := os.OpenFile(f.fileName, os.O_RDONLY, 0o644)
fd, err := os.OpenFile(f.fileName, os.O_RDWR, 0o644)
if err != nil {
return nil, errors.Trace(err)
}
258 changes: 147 additions & 111 deletions cdc/puller/sorter/heap_sorter.go
@@ -16,14 +16,17 @@ package sorter
import (
"container/heap"
"context"
"sync"
"sync/atomic"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/pingcap/ticdc/cdc/model"
"github.com/pingcap/ticdc/pkg/config"
"github.com/pingcap/ticdc/pkg/util"
"github.com/pingcap/ticdc/pkg/workerpool"
"go.uber.org/zap"
)

@@ -50,6 +53,9 @@ type heapSorter struct {
inputCh chan *model.PolymorphicEvent
outputCh chan *flushTask
heap sortHeap

poolHandle workerpool.EventHandle
internalState *heapSorterInternalState
}

func newHeapSorter(id int, out chan *flushTask) *heapSorter {
@@ -61,30 +67,43 @@ func newHeapSorter(id int, out chan *flushTask) *heapSorter {
}
}

// flush should only be called within the main loop in run().
// flush should only be called in the same goroutine where the heap is being written to.
func (h *heapSorter) flush(ctx context.Context, maxResolvedTs uint64) error {
captureAddr := util.CaptureAddrFromCtx(ctx)
changefeedID := util.ChangefeedIDFromCtx(ctx)
_, tableName := util.TableIDFromCtx(ctx)
sorterFlushCountHistogram.WithLabelValues(captureAddr, changefeedID, tableName).Observe(float64(h.heap.Len()))

isEmptyFlush := h.heap.Len() == 0
if isEmptyFlush {
return nil
}
var (
backEnd backEnd
lowerBound uint64
)

if h.heap.Len() > 0 {
lowerBound = h.heap[0].entry.CRTs
} else {
return nil
}

// We check whether the heap contains only one entry and that entry is a ResolvedEvent.
// As an optimization, when that is the case, we clear the heap and send an empty flush.
// Sending an empty flush saves CPU and potentially IO.
// When a table is mostly idle or near-idle, most flushes would contain a single ResolvedEvent,
// so this optimization greatly improves performance when (1) the total number of tables is large,
// and (2) most tables do not have many events.
if h.heap.Len() == 1 && h.heap[0].entry.RawKV.OpType == model.OpTypeResolved {
h.heap.Pop()
}

isEmptyFlush := h.heap.Len() == 0
var finishCh chan error
if !isEmptyFlush {
var err error
backEnd, err = pool.alloc(ctx)
if err != nil {
return errors.Trace(err)
}

lowerBound = h.heap[0].entry.CRTs
finishCh = make(chan error, 1)
}

task := &flushTask{
@@ -93,7 +112,7 @@ func (h *heapSorter) flush(ctx context.Context, maxResolvedTs uint64) error {
backend: backEnd,
tsLowerBound: lowerBound,
maxResolvedTs: maxResolvedTs,
finished: make(chan error, 2),
finished: finishCh,
}
h.taskCounter++

@@ -113,73 +132,75 @@ func (h *heapSorter) flush(ctx context.Context, maxResolvedTs uint64) error {
return nil
}
}
failpoint.Inject("sorterDebug", func() {
log.Debug("Unified Sorter new flushTask",
zap.String("table", tableNameFromCtx(ctx)),
zap.Int("heap-id", task.heapSorterID),
zap.Uint64("resolvedTs", task.maxResolvedTs))
})

log.Debug("Unified Sorter new flushTask",
zap.String("table", tableNameFromCtx(ctx)),
zap.Int("heap-id", task.heapSorterID),
zap.Uint64("resolvedTs", task.maxResolvedTs))

go func() {
if isEmptyFlush {
return
}
if !isEmptyFlush {
backEndFinal := backEnd
writer, err := backEnd.writer()
if err != nil {
if backEndFinal != nil {
_ = task.dealloc()
err := heapSorterIOPool.Go(ctx, func() {
writer, err := backEnd.writer()
if err != nil {
if backEndFinal != nil {
_ = task.dealloc()
}
task.finished <- errors.Trace(err)
return
}
task.finished <- errors.Trace(err)
return
}

defer func() {
// handle errors (or aborts) gracefully to prevent resource leaking (especially FD's)
if writer != nil {
_ = writer.flushAndClose()
}
if backEndFinal != nil {
_ = task.dealloc()
}
close(task.finished)
}()
defer func() {
// handle errors (or aborts) gracefully to prevent resource leaking (especially FD's)
if writer != nil {
_ = writer.flushAndClose()
}
if backEndFinal != nil {
_ = task.dealloc()
}
close(task.finished)
}()

for oldHeap.Len() > 0 {
select {
case <-ctx.Done():
task.finished <- ctx.Err()
default:
for oldHeap.Len() > 0 {
event := heap.Pop(&oldHeap).(*sortItem).entry
err := writer.writeNext(event)
if err != nil {
task.finished <- errors.Trace(err)
return
}
}

event := heap.Pop(&oldHeap).(*sortItem).entry
err := writer.writeNext(event)
dataSize := writer.dataSize()
atomic.StoreInt64(&task.dataSize, int64(dataSize))
eventCount := writer.writtenCount()

writer1 := writer
writer = nil
err = writer1.flushAndClose()
if err != nil {
task.finished <- errors.Trace(err)
return
}
}

dataSize := writer.dataSize()
atomic.StoreInt64(&task.dataSize, int64(dataSize))
eventCount := writer.writtenCount()
backEndFinal = nil

writer1 := writer
writer = nil
err = writer1.flushAndClose()
failpoint.Inject("sorterDebug", func() {
log.Debug("Unified Sorter flushTask finished",
zap.Int("heap-id", task.heapSorterID),
zap.String("table", tableNameFromCtx(ctx)),
zap.Uint64("resolvedTs", task.maxResolvedTs),
zap.Uint64("data-size", dataSize),
zap.Int("size", eventCount))
})

task.finished <- nil // DO NOT access `task` beyond this point in this function
})
if err != nil {
task.finished <- errors.Trace(err)
return
close(task.finished)
return errors.Trace(err)
}

backEndFinal = nil
task.finished <- nil // DO NOT access `task` beyond this point in this function
log.Debug("Unified Sorter flushTask finished",
zap.Int("heap-id", task.heapSorterID),
zap.String("table", tableNameFromCtx(ctx)),
zap.Uint64("resolvedTs", task.maxResolvedTs),
zap.Uint64("data-size", dataSize),
zap.Int("size", eventCount))
}()
}

select {
case <-ctx.Done():
@@ -189,63 +210,78 @@ func (h *heapSorter) flush(ctx context.Context, maxResolvedTs uint64) error {
return nil
}

func (h *heapSorter) run(ctx context.Context) error {
var (
maxResolved uint64
heapSizeBytesEstimate int64
rateCounter int
)
var (
heapSorterPool workerpool.WorkerPool
heapSorterIOPool workerpool.AsyncPool
poolOnce sync.Once
)

rateTicker := time.NewTicker(1 * time.Second)
defer rateTicker.Stop()

flushTicker := time.NewTicker(5 * time.Second)
defer flushTicker.Stop()

sorterConfig := config.GetSorterConfig()
for {
select {
case <-ctx.Done():
return ctx.Err()
case event := <-h.inputCh:
heap.Push(&h.heap, &sortItem{entry: event})
isResolvedEvent := event.RawKV != nil && event.RawKV.OpType == model.OpTypeResolved

if isResolvedEvent {
if event.RawKV.CRTs < maxResolved {
log.Panic("ResolvedTs regression, bug?", zap.Uint64("event-resolvedTs", event.RawKV.CRTs),
zap.Uint64("max-resolvedTs", maxResolved))
}
maxResolved = event.RawKV.CRTs
}
type heapSorterInternalState struct {
maxResolved uint64
heapSizeBytesEstimate int64
rateCounter int
sorterConfig *config.SorterConfig
timerMultiplier int
}

if event.RawKV.CRTs < maxResolved {
log.Panic("Bad input to sorter", zap.Uint64("cur-ts", event.RawKV.CRTs), zap.Uint64("maxResolved", maxResolved))
func (h *heapSorter) init(ctx context.Context, onError func(err error)) {
state := &heapSorterInternalState{
sorterConfig: config.GetSorterConfig(),
}

poolHandle := heapSorterPool.RegisterEvent(func(ctx context.Context, eventI interface{}) error {
event := eventI.(*model.PolymorphicEvent)
heap.Push(&h.heap, &sortItem{entry: event})
isResolvedEvent := event.RawKV != nil && event.RawKV.OpType == model.OpTypeResolved

if isResolvedEvent {
if event.RawKV.CRTs < state.maxResolved {
log.Panic("ResolvedTs regression, bug?", zap.Uint64("event-resolvedTs", event.RawKV.CRTs),
zap.Uint64("max-resolvedTs", state.maxResolved))
}
state.maxResolved = event.RawKV.CRTs
}

// 5 * 8 is for the 5 fields in PolymorphicEvent
heapSizeBytesEstimate += event.RawKV.ApproximateSize() + 40
needFlush := heapSizeBytesEstimate >= int64(sorterConfig.ChunkSizeLimit) ||
(isResolvedEvent && rateCounter < flushRateLimitPerSecond)
if event.RawKV.CRTs < state.maxResolved {
log.Panic("Bad input to sorter", zap.Uint64("cur-ts", event.RawKV.CRTs), zap.Uint64("maxResolved", state.maxResolved))
}

if needFlush {
rateCounter++
err := h.flush(ctx, maxResolved)
if err != nil {
return errors.Trace(err)
}
heapSizeBytesEstimate = 0
// 5 * 8 is for the 5 fields in PolymorphicEvent
state.heapSizeBytesEstimate += event.RawKV.ApproximateSize() + 40
needFlush := state.heapSizeBytesEstimate >= int64(state.sorterConfig.ChunkSizeLimit) ||
(isResolvedEvent && state.rateCounter < flushRateLimitPerSecond)

if needFlush {
state.rateCounter++
err := h.flush(ctx, state.maxResolved)
if err != nil {
return errors.Trace(err)
}
case <-flushTicker.C:
if rateCounter < flushRateLimitPerSecond {
err := h.flush(ctx, maxResolved)
if err != nil {
return errors.Trace(err)
}
heapSizeBytesEstimate = 0
state.heapSizeBytesEstimate = 0
}

return nil
}).SetTimer(ctx, 1*time.Second, func(ctx context.Context) error {
state.rateCounter = 0
state.timerMultiplier = (state.timerMultiplier + 1) % 5
Review comment (Contributor): Why add the timerMultiplier here?

Reply (Contributor Author): I wanted to trigger a flush periodically even if no resolved event is passed down from TiKV, mainly to break up incremental scans. But the 1-second period used for rate limiting was too short, which led to performance degradation when the incremental scan was huge, so I chose a flush period of 5 seconds (see the standalone sketch after this file's diff).

if state.timerMultiplier == 0 && state.rateCounter < flushRateLimitPerSecond {
err := h.flush(ctx, state.maxResolved)
if err != nil {
return errors.Trace(err)
}
case <-rateTicker.C:
rateCounter = 0
state.heapSizeBytesEstimate = 0
}
}
return nil
}).OnExit(onError)

h.poolHandle = poolHandle
h.internalState = state
}

func lazyInitPool() {
poolOnce.Do(func() {
sorterConfig := config.GetSorterConfig()
heapSorterPool = workerpool.NewDefaultWorkerPool(sorterConfig.NumWorkerPoolGoroutine)
heapSorterIOPool = workerpool.NewDefaultAsyncPool(sorterConfig.NumWorkerPoolGoroutine * 2)
})
}
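
To make the timerMultiplier reply above concrete, here is a minimal standalone sketch, separate from the PR's code: the names runPeriodicFlusher and flush are hypothetical and the flushRateLimitPerSecond value is illustrative. It only shows how a 1-second timer combined with a modulo-5 multiplier yields an effective 5-second periodic flush, while the same tick also resets the per-second rate limiter.

package main

import (
	"context"
	"fmt"
	"time"
)

const flushRateLimitPerSecond = 10 // illustrative value, not taken from the PR

// runPeriodicFlusher is a hypothetical helper: it mimics the structure of the
// SetTimer callback above using a plain 1-second ticker.
func runPeriodicFlusher(ctx context.Context, flush func() error) error {
	var (
		rateCounter     int // reset every second; incremented by the (omitted) event path
		timerMultiplier int
	)
	ticker := time.NewTicker(1 * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
			rateCounter = 0                             // the 1-second tick resets the rate limiter
			timerMultiplier = (timerMultiplier + 1) % 5 // every 5th tick forces a flush
			if timerMultiplier == 0 && rateCounter < flushRateLimitPerSecond {
				if err := flush(); err != nil {
					return err
				}
			}
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 12*time.Second)
	defer cancel()
	// With a 12-second budget, the flush callback fires twice: at roughly 5s and 10s.
	_ = runPeriodicFlusher(ctx, func() error {
		fmt.Println("periodic flush at", time.Now().Format(time.RFC3339))
		return nil
	})
}

In the diff above, the same pattern sits inside the SetTimer callback, so a near-idle table still flushes roughly every 5 seconds without paying the cost of a flush every second.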