Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pump.Storage: fix possible panic if start ts is greater than max commit ts when pull binlog #758

Merged
merged 6 commits into from
Sep 30, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions pump/storage/sorter.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package storage

import (
"container/list"
"fmt"
"math/rand"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -92,6 +93,11 @@ type sortItem struct {
tp pb.BinlogType
}

// String implements fmt.Stringer
func (s *sortItem) String() string {
return fmt.Sprintf("{start: %d, commit: %d, tp: %s}", s.start, s.commit, s.tp)
}

type sorter struct {
maxTSItemCB func(item sortItem)
// if resolver return true, we can skip the P binlog, don't need to wait for the C binlog
Expand Down
28 changes: 21 additions & 7 deletions pump/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ func NewAppendWithResolver(dir string, options *Options, tiStore kv.Storage, tiL

append.handleSortItemQuit = append.handleSortItem(append.sortItems)
sorter := newSorter(func(item sortItem) {
log.Debug("sorter get item", zap.Reflect("item:", item))
log.Debug("sorter get item", zap.Stringer("item", &item))
append.sortItems <- item
})

Expand Down Expand Up @@ -246,7 +246,7 @@ func NewAppendWithResolver(dir string, options *Options, tiStore kv.Storage, tiL
}

func (a *Append) persistHandlePointer(item sortItem) error {
log.Debug("persist item", zap.Reflect("item", item))
log.Debug("persist item", zap.Stringer("item", &item))
tsKey := encodeTSKey(item.commit)
pointerData, err := a.metadata.Get(tsKey, nil)
if err != nil {
Expand Down Expand Up @@ -307,7 +307,7 @@ func (a *Append) handleSortItem(items <-chan sortItem) (quit chan struct{}) {
if toSave == nil {
toSave = time.After(handlePtrSaveInterval)
}
log.Debug("get sort item", zap.Reflect("item", item))
log.Debug("get sort item", zap.Stringer("item", &item))
case <-toSave:
err := a.persistHandlePointer(toSaveItem)
if err != nil {
Expand Down Expand Up @@ -841,7 +841,7 @@ func (a *Append) writeToSorter(reqs chan *request) {
defer a.wg.Done()

for req := range reqs {
log.Debug("write request to sorter", zap.Reflect("request", req))
log.Debug("write request to sorter", zap.Stringer("request", req))
var item sortItem
item.start = req.startTS
item.commit = req.commitTS
Expand Down Expand Up @@ -939,7 +939,8 @@ func (a *Append) writeToValueLog(reqs chan *request) chan *request {
if len(batch) == 0 {
return
}
log.Debug("write requests to value log", zap.Reflect("requests", batch))
br := batchRequest(batch)
log.Debug("write requests to value log", zap.Stringer("requests", &br))
beginTime := time.Now()
writeBinlogSizeHistogram.WithLabelValues("batch").Observe(float64(size))

Expand Down Expand Up @@ -1109,14 +1110,27 @@ func (a *Append) PullCommitBinlog(ctx context.Context, last int64) <-chan []byte
Limit: encodeTSKey(math.MaxInt64),
}

pLog := pkgutil.NewLog()
labelWrongRange := "wrong range"
pLog.Add(labelWrongRange, 10*time.Second)

go func() {
defer close(values)

for {
startTS := last + 1
limitTS := atomic.LoadInt64(&a.maxCommitTS) + 1
if startTS > limitTS {
Copy link
Contributor

@july2993 july2993 Sep 30, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just set limitTS to be startTS + 1 if startTS >= limitTS to by pass the bug
don't log warn log and sleep 10 second

and add some commend about the bug

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will it cause some bug if query some binlog is greater than the max commit ts?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add a comment

Copy link
Contributor

@july2993 july2993 Sep 30, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

correct:
set limitTS to be startTS if startTS > limitTS if the bug about goleveldb will only trigger when
start > limit

[startTS, limitTS = startTS) will get no any binlog

// if range's start is greater than limit, may cause panic, see https://github.com/syndtr/goleveldb/issues/224 for detail.
pLog.Print(labelWrongRange, func() {
log.Warn("last ts is greater than pump's max commit ts", zap.Int64("last ts", startTS-1), zap.Int64("max commit ts", limitTS-1))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is actually a normal case, so don't add warn log.

like the MaxCommitTS = 10 in pump, checkpointTS = 10 for drainer,
so check for if we having newer binlogs by [checkpointTS + 1, MaxCommitTS + 1)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in normal case startTS is equal to limitTS, but will not greater than it?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

look yes, but why will trigger this?

})
time.Sleep(time.Second)
continue
}

irange.Start = encodeTSKey(startTS)
irange.Limit = encodeTSKey(atomic.LoadInt64(&a.maxCommitTS) + 1)
irange.Limit = encodeTSKey(limitTS)
iter := a.metadata.NewIterator(irange, nil)

// log.Debugf("try to get range [%d,%d)", startTS, atomic.LoadInt64(&a.maxCommitTS)+1)
Expand Down Expand Up @@ -1377,7 +1391,7 @@ func (a *Append) writeBatchToKV(bufReqs []*request) error {
var batch leveldb.Batch
var lastPointer []byte
for _, req := range bufReqs {
log.Debug("write request to kv", zap.Reflect("request", req))
log.Debug("write request to kv", zap.Stringer("request", req))

pointer, err := req.valuePointer.MarshalBinary()
if err != nil {
Expand Down
15 changes: 15 additions & 0 deletions pump/storage/vlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,21 @@ func (r *request) String() string {
return fmt.Sprintf("{ts: %d, payload len: %d, valuePointer: %+v}", r.ts(), len(r.payload), r.valuePointer)
}

type batchRequest []*request

// String implements fmt.Stringer
func (b *batchRequest) String() string {
s := new(strings.Builder)
s.WriteString("{")
for _, r := range []*request(*b) {
s.WriteString(r.String())
WangXiangUSTC marked this conversation as resolved.
Show resolved Hide resolved
s.WriteByte(',')
}
s.WriteString("}")

return s.String()
}

type valuePointer struct {
Fid uint32
Offset int64
Expand Down