From 728af9cfb1487b7759f66c5db117d731afbb19f8 Mon Sep 17 00:00:00 2001 From: WangXiangUSTC Date: Mon, 30 Sep 2019 13:46:27 +0800 Subject: [PATCH 1/5] don't search leveldb when last ts is greater than pump's max commit ts --- pump/storage/storage.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/pump/storage/storage.go b/pump/storage/storage.go index d541c990f..157ee5305 100644 --- a/pump/storage/storage.go +++ b/pump/storage/storage.go @@ -1114,9 +1114,15 @@ func (a *Append) PullCommitBinlog(ctx context.Context, last int64) <-chan []byte for { startTS := last + 1 + limitTS := atomic.LoadInt64(&a.maxCommitTS) + 1 + if startTS > limitTS { + log.Warn("last ts is greater than pump's max commit ts", zap.Int64("last ts", startTS-1), zap.Int64("max commit ts", limitTS-1)) + time.Sleep(time.Second * 10) + continue + } irange.Start = encodeTSKey(startTS) - irange.Limit = encodeTSKey(atomic.LoadInt64(&a.maxCommitTS) + 1) + irange.Limit = encodeTSKey(limitTS) iter := a.metadata.NewIterator(irange, nil) // log.Debugf("try to get range [%d,%d)", startTS, atomic.LoadInt64(&a.maxCommitTS)+1) From d45b01e9a4f7c20e970cd176dfaafcc0c4c9968f Mon Sep 17 00:00:00 2001 From: WangXiangUSTC Date: Mon, 30 Sep 2019 14:34:17 +0800 Subject: [PATCH 2/5] fix log --- pump/storage/sorter.go | 6 ++++++ pump/storage/storage.go | 13 +++++++------ pump/storage/vlog.go | 14 ++++++++++++++ 3 files changed, 27 insertions(+), 6 deletions(-) diff --git a/pump/storage/sorter.go b/pump/storage/sorter.go index f830e4447..2403c15aa 100644 --- a/pump/storage/sorter.go +++ b/pump/storage/sorter.go @@ -15,6 +15,7 @@ package storage import ( "container/list" + "fmt" "math/rand" "sync" "sync/atomic" @@ -92,6 +93,11 @@ type sortItem struct { tp pb.BinlogType } +// String implements fmt.Stringer +func (s *sortItem) String() string { + return fmt.Sprintf("{start: %d, commit: %d, tp: %v}", s.start, s.commit, s.tp) +} + type sorter struct { maxTSItemCB func(item sortItem) // if resolver return true, we can skip the P binlog, don't need to wait for the C binlog diff --git a/pump/storage/storage.go b/pump/storage/storage.go index 157ee5305..3e22f2d56 100644 --- a/pump/storage/storage.go +++ b/pump/storage/storage.go @@ -190,7 +190,7 @@ func NewAppendWithResolver(dir string, options *Options, tiStore kv.Storage, tiL append.handleSortItemQuit = append.handleSortItem(append.sortItems) sorter := newSorter(func(item sortItem) { - log.Debug("sorter get item", zap.Reflect("item:", item)) + log.Debug("sorter get item", zap.Stringer("item", &item)) append.sortItems <- item }) @@ -246,7 +246,7 @@ func NewAppendWithResolver(dir string, options *Options, tiStore kv.Storage, tiL } func (a *Append) persistHandlePointer(item sortItem) error { - log.Debug("persist item", zap.Reflect("item", item)) + log.Debug("persist item", zap.Stringer("item", &item)) tsKey := encodeTSKey(item.commit) pointerData, err := a.metadata.Get(tsKey, nil) if err != nil { @@ -307,7 +307,7 @@ func (a *Append) handleSortItem(items <-chan sortItem) (quit chan struct{}) { if toSave == nil { toSave = time.After(handlePtrSaveInterval) } - log.Debug("get sort item", zap.Reflect("item", item)) + log.Debug("get sort item", zap.Stringer("item", &item)) case <-toSave: err := a.persistHandlePointer(toSaveItem) if err != nil { @@ -841,7 +841,7 @@ func (a *Append) writeToSorter(reqs chan *request) { defer a.wg.Done() for req := range reqs { - log.Debug("write request to sorter", zap.Reflect("request", req)) + log.Debug("write request to sorter", zap.Stringer("request", req)) var item sortItem item.start = req.startTS item.commit = req.commitTS @@ -939,7 +939,8 @@ func (a *Append) writeToValueLog(reqs chan *request) chan *request { if len(batch) == 0 { return } - log.Debug("write requests to value log", zap.Reflect("requests", batch)) + br := batchRequest(batch) + log.Debug("write requests to value log", zap.Stringer("requests", &br)) beginTime := time.Now() writeBinlogSizeHistogram.WithLabelValues("batch").Observe(float64(size)) @@ -1383,7 +1384,7 @@ func (a *Append) writeBatchToKV(bufReqs []*request) error { var batch leveldb.Batch var lastPointer []byte for _, req := range bufReqs { - log.Debug("write request to kv", zap.Reflect("request", req)) + log.Debug("write request to kv", zap.Stringer("request", req)) pointer, err := req.valuePointer.MarshalBinary() if err != nil { diff --git a/pump/storage/vlog.go b/pump/storage/vlog.go index 39748e843..bdbbccfcb 100644 --- a/pump/storage/vlog.go +++ b/pump/storage/vlog.go @@ -120,6 +120,20 @@ func (r *request) String() string { return fmt.Sprintf("{ts: %d, payload len: %d, valuePointer: %+v}", r.ts(), len(r.payload), r.valuePointer) } +type batchRequest []*request + +// String implements fmt.Stringer +func (b *batchRequest) String() string { + s := new(strings.Builder) + s.WriteString("{") + for _, r := range []*request(*b) { + s.WriteString(r.String()) + } + s.WriteString("}") + + return s.String() +} + type valuePointer struct { Fid uint32 Offset int64 From c48eb256b31a0b61b538fbf9fea8cecb10c4027c Mon Sep 17 00:00:00 2001 From: WangXiangUSTC Date: Mon, 30 Sep 2019 14:57:11 +0800 Subject: [PATCH 3/5] Update pump/storage/sorter.go Co-Authored-By: Ian --- pump/storage/sorter.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pump/storage/sorter.go b/pump/storage/sorter.go index 2403c15aa..7b712a4e2 100644 --- a/pump/storage/sorter.go +++ b/pump/storage/sorter.go @@ -95,7 +95,7 @@ type sortItem struct { // String implements fmt.Stringer func (s *sortItem) String() string { - return fmt.Sprintf("{start: %d, commit: %d, tp: %v}", s.start, s.commit, s.tp) + return fmt.Sprintf("{start: %d, commit: %d, tp: %s}", s.start, s.commit, s.tp) } type sorter struct { From a289573d7c7a6c09c4c821495d8133b1bad3e280 Mon Sep 17 00:00:00 2001 From: WangXiangUSTC Date: Mon, 30 Sep 2019 15:21:41 +0800 Subject: [PATCH 4/5] address comment --- pump/storage/vlog.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pump/storage/vlog.go b/pump/storage/vlog.go index bdbbccfcb..9d4d32902 100644 --- a/pump/storage/vlog.go +++ b/pump/storage/vlog.go @@ -128,6 +128,7 @@ func (b *batchRequest) String() string { s.WriteString("{") for _, r := range []*request(*b) { s.WriteString(r.String()) + s.WriteByte(',') } s.WriteString("}") From c3b595f23e9615ddc5df545fb7165cc08d89ef33 Mon Sep 17 00:00:00 2001 From: WangXiangUSTC Date: Mon, 30 Sep 2019 16:06:48 +0800 Subject: [PATCH 5/5] reduce sleep time and add comment --- pump/storage/storage.go | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/pump/storage/storage.go b/pump/storage/storage.go index 3e22f2d56..80f1b5884 100644 --- a/pump/storage/storage.go +++ b/pump/storage/storage.go @@ -1110,6 +1110,10 @@ func (a *Append) PullCommitBinlog(ctx context.Context, last int64) <-chan []byte Limit: encodeTSKey(math.MaxInt64), } + pLog := pkgutil.NewLog() + labelWrongRange := "wrong range" + pLog.Add(labelWrongRange, 10*time.Second) + go func() { defer close(values) @@ -1117,8 +1121,11 @@ func (a *Append) PullCommitBinlog(ctx context.Context, last int64) <-chan []byte startTS := last + 1 limitTS := atomic.LoadInt64(&a.maxCommitTS) + 1 if startTS > limitTS { - log.Warn("last ts is greater than pump's max commit ts", zap.Int64("last ts", startTS-1), zap.Int64("max commit ts", limitTS-1)) - time.Sleep(time.Second * 10) + // if range's start is greater than limit, may cause panic, see https://github.com/syndtr/goleveldb/issues/224 for detail. + pLog.Print(labelWrongRange, func() { + log.Warn("last ts is greater than pump's max commit ts", zap.Int64("last ts", startTS-1), zap.Int64("max commit ts", limitTS-1)) + }) + time.Sleep(time.Second) continue }