From a613f85a5654a7d9a6efd01678dd19af72acd5a1 Mon Sep 17 00:00:00 2001 From: Bartek Nowotarski Date: Tue, 21 Sep 2021 21:00:11 +0200 Subject: [PATCH] ingest: Return progress in CheckpointChangeReader (#3946) Because number of ledger entries can change quickly (ex. 9-11M in 2 months) it's hard to estimate the progress of reading from buckets. The progress value can be slightly off because ledger entries are streamed in another go routine (so the reported value can be higher than actual value if there are many entries in a buffer). --- historyarchive/archive.go | 8 ++++- historyarchive/archive_pool.go | 4 +++ historyarchive/mocks.go | 5 +++ historyarchive/xdrstream.go | 34 ++++++++++++++----- ingest/checkpoint_change_reader.go | 31 ++++++++++++++++- ingest/checkpoint_change_reader_test.go | 12 +++---- .../internal/ingest/logging_change_reader.go | 10 +++++- .../internal/ingest/processor_runner.go | 2 +- 8 files changed, 87 insertions(+), 19 deletions(-) diff --git a/historyarchive/archive.go b/historyarchive/archive.go index 2332308855..9ddfb0f37a 100644 --- a/historyarchive/archive.go +++ b/historyarchive/archive.go @@ -9,7 +9,6 @@ import ( "context" "encoding/json" "fmt" - log "github.com/sirupsen/logrus" "io" "io/ioutil" "net/url" @@ -19,6 +18,8 @@ import ( "strings" "sync" + log "github.com/sirupsen/logrus" + "github.com/stellar/go/support/errors" "github.com/stellar/go/xdr" ) @@ -68,6 +69,7 @@ type ArchiveInterface interface { GetPathHAS(path string) (HistoryArchiveState, error) PutPathHAS(path string, has HistoryArchiveState, opts *CommandOptions) error BucketExists(bucket Hash) (bool, error) + BucketSize(bucket Hash) (int64, error) CategoryCheckpointExists(cat string, chk uint32) (bool, error) GetLedgerHeader(chk uint32) (xdr.LedgerHeaderHistoryEntry, error) GetRootHAS() (HistoryArchiveState, error) @@ -164,6 +166,10 @@ func (a *Archive) BucketExists(bucket Hash) (bool, error) { return a.backend.Exists(BucketPath(bucket)) } +func (a *Archive) BucketSize(bucket Hash) (int64, error) { + return a.backend.Size(BucketPath(bucket)) +} + func (a *Archive) CategoryCheckpointExists(cat string, chk uint32) (bool, error) { return a.backend.Exists(CategoryCheckpointPath(cat, chk)) } diff --git a/historyarchive/archive_pool.go b/historyarchive/archive_pool.go index be6f11c948..590988e483 100644 --- a/historyarchive/archive_pool.go +++ b/historyarchive/archive_pool.go @@ -76,6 +76,10 @@ func (pa ArchivePool) BucketExists(bucket Hash) (bool, error) { return pa.GetAnyArchive().BucketExists(bucket) } +func (pa ArchivePool) BucketSize(bucket Hash) (int64, error) { + return pa.GetAnyArchive().BucketSize(bucket) +} + func (pa ArchivePool) CategoryCheckpointExists(cat string, chk uint32) (bool, error) { return pa.GetAnyArchive().CategoryCheckpointExists(cat, chk) } diff --git a/historyarchive/mocks.go b/historyarchive/mocks.go index 26c69606aa..3952211cd3 100644 --- a/historyarchive/mocks.go +++ b/historyarchive/mocks.go @@ -30,6 +30,11 @@ func (m *MockArchive) BucketExists(bucket Hash) (bool, error) { return a.Get(0).(bool), a.Error(1) } +func (m *MockArchive) BucketSize(bucket Hash) (int64, error) { + a := m.Called(bucket) + return a.Get(0).(int64), a.Error(1) +} + func (m *MockArchive) CategoryCheckpointExists(cat string, chk uint32) (bool, error) { a := m.Called(cat, chk) return a.Get(0).(bool), a.Error(1) diff --git a/historyarchive/xdrstream.go b/historyarchive/xdrstream.go index 60647c9c8b..0681886668 100644 --- a/historyarchive/xdrstream.go +++ b/historyarchive/xdrstream.go @@ -21,6 +21,7 @@ import ( type XdrStream struct { buf bytes.Buffer + gzipReader *countReader rdr *countReader rdr2 io.ReadCloser sha256Hash hash.Hash @@ -64,7 +65,8 @@ func NewXdrStream(in io.ReadCloser) *XdrStream { } func NewXdrGzStream(in io.ReadCloser) (*XdrStream, error) { - rdr, err := gzip.NewReader(bufReadCloser(in)) + gzipCountReader := newCountReader(in) + rdr, err := gzip.NewReader(bufReadCloser(gzipCountReader)) if err != nil { in.Close() return nil, err @@ -72,6 +74,7 @@ func NewXdrGzStream(in io.ReadCloser) (*XdrStream, error) { stream := NewXdrStream(rdr) stream.rdr2 = in + stream.gzipReader = gzipCountReader return stream, nil } @@ -123,21 +126,25 @@ func (x *XdrStream) Close() error { } func (x *XdrStream) closeReaders() error { + var err error + if x.rdr != nil { - if err := x.rdr.Close(); err != nil { - if x.rdr2 != nil { - x.rdr2.Close() - } - return err + if err2 := x.rdr.Close(); err2 != nil { + err = err2 } } if x.rdr2 != nil { - if err := x.rdr2.Close(); err != nil { - return err + if err2 := x.rdr2.Close(); err2 != nil { + err = err2 + } + } + if x.gzipReader != nil { + if err2 := x.gzipReader.Close(); err2 != nil { + err = err2 } } - return nil + return err } func (x *XdrStream) ReadOne(in interface{}) error { @@ -185,6 +192,15 @@ func (x *XdrStream) BytesRead() int64 { return x.rdr.bytesRead } +// GzipBytesRead returns the number of gzip bytes read in the stream. +// Returns -1 if underlying reader is not gzipped. +func (x *XdrStream) GzipBytesRead() int64 { + if x.gzipReader == nil { + return -1 + } + return x.gzipReader.bytesRead +} + // Discard removes n bytes from the stream func (x *XdrStream) Discard(n int64) (int64, error) { return io.CopyN(ioutil.Discard, x.rdr, n) diff --git a/ingest/checkpoint_change_reader.go b/ingest/checkpoint_change_reader.go index bb20f63e01..2b6b23973e 100644 --- a/ingest/checkpoint_change_reader.go +++ b/ingest/checkpoint_change_reader.go @@ -32,6 +32,10 @@ type CheckpointChangeReader struct { closeOnce sync.Once done chan bool + readBytesMutex sync.RWMutex + totalRead int64 + totalSize int64 + // This should be set to true in tests only disableBucketListHashValidation bool sleep func(time.Duration) @@ -189,7 +193,7 @@ func (r *CheckpointChangeReader) streamBuckets() { } } - for i, hash := range buckets { + for _, hash := range buckets { exists, err := r.bucketExists(hash) if err != nil { r.readChan <- r.error( @@ -205,6 +209,20 @@ func (r *CheckpointChangeReader) streamBuckets() { return } + size, err := r.archive.BucketSize(hash) + if err != nil { + r.readChan <- r.error( + errors.Wrapf(err, "error checking bucket size: %s", hash), + ) + return + } + + r.readBytesMutex.Lock() + r.totalSize += size + r.readBytesMutex.Unlock() + } + + for i, hash := range buckets { oldestBucket := i == len(buckets)-1 if shouldContinue := r.streamBucketContents(hash, oldestBucket); !shouldContinue { break @@ -223,6 +241,7 @@ func (r *CheckpointChangeReader) readBucketEntry(stream *historyarchive.XdrStrea var entry xdr.BucketEntry var err error currentPosition := stream.BytesRead() + gzipCurrentPosition := stream.GzipBytesRead() for attempts := 0; ; attempts++ { if r.ctx.Err() != nil { @@ -232,6 +251,9 @@ func (r *CheckpointChangeReader) readBucketEntry(stream *historyarchive.XdrStrea if err == nil { err = stream.ReadOne(&entry) if err == nil || err == io.EOF { + r.readBytesMutex.Lock() + r.totalRead += stream.GzipBytesRead() - gzipCurrentPosition + r.readBytesMutex.Unlock() break } } @@ -511,6 +533,13 @@ func (r *CheckpointChangeReader) close() { close(r.done) } +// Progress returns progress reading all buckets in percents. +func (r *CheckpointChangeReader) Progress() float64 { + r.readBytesMutex.RLock() + defer r.readBytesMutex.RUnlock() + return float64(r.totalRead) / float64(r.totalSize) * 100 +} + // Close should be called when reading is finished. func (r *CheckpointChangeReader) Close() error { r.closeOnce.Do(r.close) diff --git a/ingest/checkpoint_change_reader_test.go b/ingest/checkpoint_change_reader_test.go index b7dc738c5d..6f580a9e0e 100644 --- a/ingest/checkpoint_change_reader_test.go +++ b/ingest/checkpoint_change_reader_test.go @@ -27,6 +27,7 @@ type SingleLedgerStateReaderTestSuite struct { reader *CheckpointChangeReader has historyarchive.HistoryArchiveState mockBucketExistsCall *mock.Call + mockBucketSizeCall *mock.Call } func (s *SingleLedgerStateReaderTestSuite) SetupTest() { @@ -46,6 +47,11 @@ func (s *SingleLedgerStateReaderTestSuite) SetupTest() { On("BucketExists", mock.AnythingOfType("historyarchive.Hash")). Return(true, nil).Times(21) + // BucketSize should be called 21 times (11 levels, last without `snap`) + s.mockBucketSizeCall = s.mockArchive. + On("BucketSize", mock.AnythingOfType("historyarchive.Hash")). + Return(int64(100), nil).Times(21) + s.mockArchive. On("GetCheckpointManager"). Return(historyarchive.NewCheckpointManager( @@ -228,9 +234,6 @@ func (s *SingleLedgerStateReaderTestSuite) TestMalformedProtocol11Bucket() { On("GetXdrStreamForHash", <-nextBucket). Return(curr1, nil).Once() - // BucketExists will be called only once in this test due to an error - s.mockBucketExistsCall.Once() - // Account entry _, err := s.reader.Read() s.Require().Nil(err) @@ -254,9 +257,6 @@ func (s *SingleLedgerStateReaderTestSuite) TestMalformedProtocol11BucketNoMeta() On("GetXdrStreamForHash", <-nextBucket). Return(curr1, nil).Once() - // BucketExists will be called only once in this test due to an error - s.mockBucketExistsCall.Once() - // Init entry without meta _, err := s.reader.Read() s.Require().NotNil(err) diff --git a/services/horizon/internal/ingest/logging_change_reader.go b/services/horizon/internal/ingest/logging_change_reader.go index 185c224eff..a91a269f27 100644 --- a/services/horizon/internal/ingest/logging_change_reader.go +++ b/services/horizon/internal/ingest/logging_change_reader.go @@ -1,6 +1,7 @@ package ingest import ( + "fmt" "runtime" "github.com/stellar/go/ingest" @@ -51,10 +52,17 @@ func (lcr *loggingChangeReader) Read() (ingest.Change, error) { lcr.entryCount++ if lcr.entryCount%lcr.frequency == 0 { - logger := log.WithField("numEntries", lcr.entryCount). + logger := log.WithField("processed_entries", lcr.entryCount). WithField("source", lcr.source). WithField("sequence", lcr.sequence) + if reader, ok := lcr.ChangeReader.(*ingest.CheckpointChangeReader); ok { + logger = logger.WithField( + "progress", + fmt.Sprintf("%.02f%%", reader.Progress()), + ) + } + if lcr.profile { curHeap, sysHeap := getMemStats() logger = logger. diff --git a/services/horizon/internal/ingest/processor_runner.go b/services/horizon/internal/ingest/processor_runner.go index 298af1244d..0e195d072d 100644 --- a/services/horizon/internal/ingest/processor_runner.go +++ b/services/horizon/internal/ingest/processor_runner.go @@ -18,7 +18,7 @@ const ( _ = iota historyArchiveSource = ingestionSource(iota) ledgerSource = ingestionSource(iota) - logFrequency = 100000 + logFrequency = 50000 ) type horizonChangeProcessor interface {