ingest: Return progress in CheckpointChangeReader (#3946)
Because the number of ledger entries can change quickly (e.g. from 9M to 11M in
two months), it is hard to estimate the progress of reading from buckets by
counting entries. Instead, progress is estimated from the number of compressed
bucket bytes read so far out of the total size of all buckets in the checkpoint.

The progress value can be slightly off because ledger entries are streamed in a
separate goroutine, so the reported value can be higher than the actual one when
many entries are still waiting in a buffer.
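As a rough illustration of how the new method is meant to be consumed, the sketch below polls Progress() while reading a checkpoint. The constructor call, its arguments, the archive URL, and the ledger sequence are assumptions for illustration only; this commit only adds the Progress() method itself.

package main

import (
    "context"
    "fmt"
    "io"
    "log"

    "github.com/stellar/go/ingest"
)

func main() {
    ctx := context.Background()

    // Assumed constructor and arguments; not part of this change.
    reader, err := ingest.NewCheckpointChangeReader(
        ctx,
        "https://history.stellar.org/prd/core-live/core_live_001", // example archive URL
        25091839, // example checkpoint ledger sequence
    )
    if err != nil {
        log.Fatal(err)
    }
    defer reader.Close()

    entries := 0
    for {
        _, err := reader.Read()
        if err == io.EOF {
            break
        }
        if err != nil {
            log.Fatal(err)
        }
        entries++
        if entries%50000 == 0 {
            // Progress() reports the percentage of compressed bucket bytes read so far.
            fmt.Printf("processed %d entries, ~%.2f%% of bucket data read\n", entries, reader.Progress())
        }
    }
}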
bartekn authored Sep 21, 2021
1 parent 69a0999 commit a613f85
Showing 8 changed files with 87 additions and 19 deletions.
8 changes: 7 additions & 1 deletion historyarchive/archive.go
@@ -9,7 +9,6 @@ import (
"context"
"encoding/json"
"fmt"
log "github.com/sirupsen/logrus"
"io"
"io/ioutil"
"net/url"
@@ -19,6 +18,8 @@ import (
"strings"
"sync"

log "github.com/sirupsen/logrus"

"github.com/stellar/go/support/errors"
"github.com/stellar/go/xdr"
)
@@ -68,6 +69,7 @@ type ArchiveInterface interface {
GetPathHAS(path string) (HistoryArchiveState, error)
PutPathHAS(path string, has HistoryArchiveState, opts *CommandOptions) error
BucketExists(bucket Hash) (bool, error)
BucketSize(bucket Hash) (int64, error)
CategoryCheckpointExists(cat string, chk uint32) (bool, error)
GetLedgerHeader(chk uint32) (xdr.LedgerHeaderHistoryEntry, error)
GetRootHAS() (HistoryArchiveState, error)
@@ -164,6 +166,10 @@ func (a *Archive) BucketExists(bucket Hash) (bool, error) {
return a.backend.Exists(BucketPath(bucket))
}

func (a *Archive) BucketSize(bucket Hash) (int64, error) {
return a.backend.Size(BucketPath(bucket))
}

func (a *Archive) CategoryCheckpointExists(cat string, chk uint32) (bool, error) {
return a.backend.Exists(CategoryCheckpointPath(cat, chk))
}
4 changes: 4 additions & 0 deletions historyarchive/archive_pool.go
@@ -76,6 +76,10 @@ func (pa ArchivePool) BucketExists(bucket Hash) (bool, error) {
return pa.GetAnyArchive().BucketExists(bucket)
}

func (pa ArchivePool) BucketSize(bucket Hash) (int64, error) {
return pa.GetAnyArchive().BucketSize(bucket)
}

func (pa ArchivePool) CategoryCheckpointExists(cat string, chk uint32) (bool, error) {
return pa.GetAnyArchive().CategoryCheckpointExists(cat, chk)
}
5 changes: 5 additions & 0 deletions historyarchive/mocks.go
@@ -30,6 +30,11 @@ func (m *MockArchive) BucketExists(bucket Hash) (bool, error) {
return a.Get(0).(bool), a.Error(1)
}

func (m *MockArchive) BucketSize(bucket Hash) (int64, error) {
a := m.Called(bucket)
return a.Get(0).(int64), a.Error(1)
}

func (m *MockArchive) CategoryCheckpointExists(cat string, chk uint32) (bool, error) {
a := m.Called(cat, chk)
return a.Get(0).(bool), a.Error(1)
34 changes: 25 additions & 9 deletions historyarchive/xdrstream.go
@@ -21,6 +21,7 @@ import (

type XdrStream struct {
buf bytes.Buffer
gzipReader *countReader
rdr *countReader
rdr2 io.ReadCloser
sha256Hash hash.Hash
@@ -64,14 +65,16 @@ func NewXdrStream(in io.ReadCloser) *XdrStream {
}

func NewXdrGzStream(in io.ReadCloser) (*XdrStream, error) {
- rdr, err := gzip.NewReader(bufReadCloser(in))
+ gzipCountReader := newCountReader(in)
+ rdr, err := gzip.NewReader(bufReadCloser(gzipCountReader))
if err != nil {
in.Close()
return nil, err
}

stream := NewXdrStream(rdr)
stream.rdr2 = in
+ stream.gzipReader = gzipCountReader
return stream, nil
}

@@ -123,21 +126,25 @@ func (x *XdrStream) Close() error {
}

func (x *XdrStream) closeReaders() error {
+ var err error
+
if x.rdr != nil {
- if err := x.rdr.Close(); err != nil {
- if x.rdr2 != nil {
- x.rdr2.Close()
- }
- return err
+ if err2 := x.rdr.Close(); err2 != nil {
+ err = err2
}
}
if x.rdr2 != nil {
- if err := x.rdr2.Close(); err != nil {
- return err
+ if err2 := x.rdr2.Close(); err2 != nil {
+ err = err2
}
}
+ if x.gzipReader != nil {
+ if err2 := x.gzipReader.Close(); err2 != nil {
+ err = err2
+ }
+ }

- return nil
+ return err
}

func (x *XdrStream) ReadOne(in interface{}) error {
@@ -185,6 +192,15 @@ func (x *XdrStream) BytesRead() int64 {
return x.rdr.bytesRead
}

// GzipBytesRead returns the number of gzip bytes read in the stream.
// Returns -1 if underlying reader is not gzipped.
func (x *XdrStream) GzipBytesRead() int64 {
if x.gzipReader == nil {
return -1
}
return x.gzipReader.bytesRead
}

// Discard removes n bytes from the stream
func (x *XdrStream) Discard(n int64) (int64, error) {
return io.CopyN(ioutil.Discard, x.rdr, n)
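The countReader used by NewXdrGzStream above is an existing helper and is not part of this diff. As a minimal sketch (the exact field and method names here are assumptions, not the repository's code), such a byte-counting reader wraps the raw, still-compressed input before it reaches the bufio/gzip layers, which is what lets GzipBytesRead report compressed bytes:

package historyarchive

import "io"

// countReader wraps an io.ReadCloser and records how many bytes have been
// read through it. Wrapping the compressed input before gzip decompression
// makes a "compressed bytes read" counter possible.
type countReader struct {
    io.ReadCloser
    bytesRead int64
}

func (c *countReader) Read(b []byte) (int, error) {
    n, err := c.ReadCloser.Read(b)
    c.bytesRead += int64(n)
    return n, err
}

func newCountReader(r io.ReadCloser) *countReader {
    return &countReader{ReadCloser: r}
}

Since the gzip reader pulls data through a buffered reader, such a counter can run a touch ahead of what has actually been decoded, which fits the commit's note that reported progress may be slightly higher than the true value.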
31 changes: 30 additions & 1 deletion ingest/checkpoint_change_reader.go
@@ -32,6 +32,10 @@ type CheckpointChangeReader struct {
closeOnce sync.Once
done chan bool

readBytesMutex sync.RWMutex
totalRead int64
totalSize int64

// This should be set to true in tests only
disableBucketListHashValidation bool
sleep func(time.Duration)
@@ -189,7 +193,7 @@ func (r *CheckpointChangeReader) streamBuckets() {
}
}

- for i, hash := range buckets {
+ for _, hash := range buckets {
exists, err := r.bucketExists(hash)
if err != nil {
r.readChan <- r.error(
@@ -205,6 +209,20 @@
return
}

size, err := r.archive.BucketSize(hash)
if err != nil {
r.readChan <- r.error(
errors.Wrapf(err, "error checking bucket size: %s", hash),
)
return
}

r.readBytesMutex.Lock()
r.totalSize += size
r.readBytesMutex.Unlock()
}

for i, hash := range buckets {
oldestBucket := i == len(buckets)-1
if shouldContinue := r.streamBucketContents(hash, oldestBucket); !shouldContinue {
break
@@ -223,6 +241,7 @@ func (r *CheckpointChangeReader) readBucketEntry(stream *historyarchive.XdrStrea
var entry xdr.BucketEntry
var err error
currentPosition := stream.BytesRead()
gzipCurrentPosition := stream.GzipBytesRead()

for attempts := 0; ; attempts++ {
if r.ctx.Err() != nil {
@@ -232,6 +251,9 @@
if err == nil {
err = stream.ReadOne(&entry)
if err == nil || err == io.EOF {
r.readBytesMutex.Lock()
r.totalRead += stream.GzipBytesRead() - gzipCurrentPosition
r.readBytesMutex.Unlock()
break
}
}
@@ -511,6 +533,13 @@ func (r *CheckpointChangeReader) close() {
close(r.done)
}

// Progress returns progress reading all buckets in percents.
func (r *CheckpointChangeReader) Progress() float64 {
r.readBytesMutex.RLock()
defer r.readBytesMutex.RUnlock()
return float64(r.totalRead) / float64(r.totalSize) * 100
}

// Close should be called when reading is finished.
func (r *CheckpointChangeReader) Close() error {
r.closeOnce.Do(r.close)
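Putting the new fields together, Progress() is simply the ratio of compressed bytes consumed to the total compressed size of all buckets in the checkpoint. A tiny worked example with invented numbers:

package main

import "fmt"

func main() {
    // Invented example values: totalSize is the sum of BucketSize() over all
    // buckets listed in the checkpoint HAS, and totalRead accumulates
    // GzipBytesRead() deltas while bucket entries are streamed.
    totalSize := int64(2_400_000_000) // ~2.4 GB of compressed buckets
    totalRead := int64(600_000_000)   // compressed bytes consumed so far

    progress := float64(totalRead) / float64(totalSize) * 100
    fmt.Printf("%.02f%%\n", progress) // prints 25.00%
}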
12 changes: 6 additions & 6 deletions ingest/checkpoint_change_reader_test.go
@@ -27,6 +27,7 @@ type SingleLedgerStateReaderTestSuite struct {
reader *CheckpointChangeReader
has historyarchive.HistoryArchiveState
mockBucketExistsCall *mock.Call
mockBucketSizeCall *mock.Call
}

func (s *SingleLedgerStateReaderTestSuite) SetupTest() {
@@ -46,6 +47,11 @@ func (s *SingleLedgerStateReaderTestSuite) SetupTest() {
On("BucketExists", mock.AnythingOfType("historyarchive.Hash")).
Return(true, nil).Times(21)

// BucketSize should be called 21 times (11 levels, last without `snap`)
s.mockBucketSizeCall = s.mockArchive.
On("BucketSize", mock.AnythingOfType("historyarchive.Hash")).
Return(int64(100), nil).Times(21)

s.mockArchive.
On("GetCheckpointManager").
Return(historyarchive.NewCheckpointManager(
@@ -228,9 +234,6 @@ func (s *SingleLedgerStateReaderTestSuite) TestMalformedProtocol11Bucket() {
On("GetXdrStreamForHash", <-nextBucket).
Return(curr1, nil).Once()

- // BucketExists will be called only once in this test due to an error
- s.mockBucketExistsCall.Once()

// Account entry
_, err := s.reader.Read()
s.Require().Nil(err)
@@ -254,9 +257,6 @@ func (s *SingleLedgerStateReaderTestSuite) TestMalformedProtocol11BucketNoMeta()
On("GetXdrStreamForHash", <-nextBucket).
Return(curr1, nil).Once()

- // BucketExists will be called only once in this test due to an error
- s.mockBucketExistsCall.Once()

// Init entry without meta
_, err := s.reader.Read()
s.Require().NotNil(err)
10 changes: 9 additions & 1 deletion services/horizon/internal/ingest/logging_change_reader.go
@@ -1,6 +1,7 @@
package ingest

import (
"fmt"
"runtime"

"github.com/stellar/go/ingest"
@@ -51,10 +52,17 @@ func (lcr *loggingChangeReader) Read() (ingest.Change, error) {
lcr.entryCount++

if lcr.entryCount%lcr.frequency == 0 {
logger := log.WithField("numEntries", lcr.entryCount).
logger := log.WithField("processed_entries", lcr.entryCount).
WithField("source", lcr.source).
WithField("sequence", lcr.sequence)

+ if reader, ok := lcr.ChangeReader.(*ingest.CheckpointChangeReader); ok {
+ logger = logger.WithField(
+ "progress",
+ fmt.Sprintf("%.02f%%", reader.Progress()),
+ )
+ }

if lcr.profile {
curHeap, sysHeap := getMemStats()
logger = logger.
2 changes: 1 addition & 1 deletion services/horizon/internal/ingest/processor_runner.go
@@ -18,7 +18,7 @@ const (
_ = iota
historyArchiveSource = ingestionSource(iota)
ledgerSource = ingestionSource(iota)
- logFrequency = 100000
+ logFrequency = 50000
)

type horizonChangeProcessor interface {
