diff --git a/pump/storage/sorter.go b/pump/storage/sorter.go index f830e4447..7b712a4e2 100644 --- a/pump/storage/sorter.go +++ b/pump/storage/sorter.go @@ -15,6 +15,7 @@ package storage import ( "container/list" + "fmt" "math/rand" "sync" "sync/atomic" @@ -92,6 +93,11 @@ type sortItem struct { tp pb.BinlogType } +// String implements fmt.Stringer +func (s *sortItem) String() string { + return fmt.Sprintf("{start: %d, commit: %d, tp: %s}", s.start, s.commit, s.tp) +} + type sorter struct { maxTSItemCB func(item sortItem) // if resolver return true, we can skip the P binlog, don't need to wait for the C binlog diff --git a/pump/storage/storage.go b/pump/storage/storage.go index d541c990f..80f1b5884 100644 --- a/pump/storage/storage.go +++ b/pump/storage/storage.go @@ -190,7 +190,7 @@ func NewAppendWithResolver(dir string, options *Options, tiStore kv.Storage, tiL append.handleSortItemQuit = append.handleSortItem(append.sortItems) sorter := newSorter(func(item sortItem) { - log.Debug("sorter get item", zap.Reflect("item:", item)) + log.Debug("sorter get item", zap.Stringer("item", &item)) append.sortItems <- item }) @@ -246,7 +246,7 @@ func NewAppendWithResolver(dir string, options *Options, tiStore kv.Storage, tiL } func (a *Append) persistHandlePointer(item sortItem) error { - log.Debug("persist item", zap.Reflect("item", item)) + log.Debug("persist item", zap.Stringer("item", &item)) tsKey := encodeTSKey(item.commit) pointerData, err := a.metadata.Get(tsKey, nil) if err != nil { @@ -307,7 +307,7 @@ func (a *Append) handleSortItem(items <-chan sortItem) (quit chan struct{}) { if toSave == nil { toSave = time.After(handlePtrSaveInterval) } - log.Debug("get sort item", zap.Reflect("item", item)) + log.Debug("get sort item", zap.Stringer("item", &item)) case <-toSave: err := a.persistHandlePointer(toSaveItem) if err != nil { @@ -841,7 +841,7 @@ func (a *Append) writeToSorter(reqs chan *request) { defer a.wg.Done() for req := range reqs { - log.Debug("write request to sorter", zap.Reflect("request", req)) + log.Debug("write request to sorter", zap.Stringer("request", req)) var item sortItem item.start = req.startTS item.commit = req.commitTS @@ -939,7 +939,8 @@ func (a *Append) writeToValueLog(reqs chan *request) chan *request { if len(batch) == 0 { return } - log.Debug("write requests to value log", zap.Reflect("requests", batch)) + br := batchRequest(batch) + log.Debug("write requests to value log", zap.Stringer("requests", &br)) beginTime := time.Now() writeBinlogSizeHistogram.WithLabelValues("batch").Observe(float64(size)) @@ -1109,14 +1110,27 @@ func (a *Append) PullCommitBinlog(ctx context.Context, last int64) <-chan []byte Limit: encodeTSKey(math.MaxInt64), } + pLog := pkgutil.NewLog() + labelWrongRange := "wrong range" + pLog.Add(labelWrongRange, 10*time.Second) + go func() { defer close(values) for { startTS := last + 1 + limitTS := atomic.LoadInt64(&a.maxCommitTS) + 1 + if startTS > limitTS { + // if range's start is greater than limit, may cause panic, see https://github.com/syndtr/goleveldb/issues/224 for detail. + pLog.Print(labelWrongRange, func() { + log.Warn("last ts is greater than pump's max commit ts", zap.Int64("last ts", startTS-1), zap.Int64("max commit ts", limitTS-1)) + }) + time.Sleep(time.Second) + continue + } irange.Start = encodeTSKey(startTS) - irange.Limit = encodeTSKey(atomic.LoadInt64(&a.maxCommitTS) + 1) + irange.Limit = encodeTSKey(limitTS) iter := a.metadata.NewIterator(irange, nil) // log.Debugf("try to get range [%d,%d)", startTS, atomic.LoadInt64(&a.maxCommitTS)+1) @@ -1377,7 +1391,7 @@ func (a *Append) writeBatchToKV(bufReqs []*request) error { var batch leveldb.Batch var lastPointer []byte for _, req := range bufReqs { - log.Debug("write request to kv", zap.Reflect("request", req)) + log.Debug("write request to kv", zap.Stringer("request", req)) pointer, err := req.valuePointer.MarshalBinary() if err != nil { diff --git a/pump/storage/vlog.go b/pump/storage/vlog.go index 39748e843..9d4d32902 100644 --- a/pump/storage/vlog.go +++ b/pump/storage/vlog.go @@ -120,6 +120,21 @@ func (r *request) String() string { return fmt.Sprintf("{ts: %d, payload len: %d, valuePointer: %+v}", r.ts(), len(r.payload), r.valuePointer) } +type batchRequest []*request + +// String implements fmt.Stringer +func (b *batchRequest) String() string { + s := new(strings.Builder) + s.WriteString("{") + for _, r := range []*request(*b) { + s.WriteString(r.String()) + s.WriteByte(',') + } + s.WriteString("}") + + return s.String() +} + type valuePointer struct { Fid uint32 Offset int64