diff --git a/exp/ingest/adapters/history_archive_adapter.go b/exp/ingest/adapters/history_archive_adapter.go index db0265c22f..884fe8b188 100644 --- a/exp/ingest/adapters/history_archive_adapter.go +++ b/exp/ingest/adapters/history_archive_adapter.go @@ -7,7 +7,7 @@ import ( "github.com/stellar/go/support/historyarchive" ) -const msrBufferSize = 10 +const msrBufferSize = 50000 // HistoryArchiveAdapter is an adapter for the historyarchive package to read from history archives type HistoryArchiveAdapter struct { @@ -30,7 +30,7 @@ func (haa *HistoryArchiveAdapter) GetLatestLedgerSequence() (uint32, error) { } // GetState returns a reader with the state of the ledger at the provided sequence number -func (haa *HistoryArchiveAdapter) GetState(sequence uint32) (io.StateReader, error) { +func (haa *HistoryArchiveAdapter) GetState(sequence uint32) (io.StateReadCloser, error) { if !haa.archive.CategoryCheckpointExists("history", sequence) { return nil, fmt.Errorf("history checkpoint does not exist for ledger %d", sequence) } diff --git a/exp/ingest/adapters/history_archive_adapter_test.go b/exp/ingest/adapters/history_archive_adapter_test.go index f3996fc826..8dc4376a8a 100644 --- a/exp/ingest/adapters/history_archive_adapter_test.go +++ b/exp/ingest/adapters/history_archive_adapter_test.go @@ -5,6 +5,7 @@ import ( "log" "testing" + "github.com/stellar/go/exp/ingest/io" "github.com/stellar/go/support/historyarchive" "github.com/stretchr/testify/assert" @@ -56,11 +57,11 @@ func TestGetState_Read(t *testing.T) { return } - ok, le, e := sr.Read() + le, e := sr.Read() if !assert.NoError(t, e) { return } - assert.Equal(t, ok, true) + assert.NotEqual(t, e, io.EOF) log.Printf("%v\n", le) if !assert.NotNil(t, le) { diff --git a/exp/ingest/io/main.go b/exp/ingest/io/main.go new file mode 100644 index 0000000000..5e3c418170 --- /dev/null +++ b/exp/ingest/io/main.go @@ -0,0 +1,35 @@ +package io + +import ( + "io" + + "github.com/stellar/go/xdr" +) + +var EOF = io.EOF +var ErrClosedPipe = io.ErrClosedPipe + +// StateReadCloser interface placeholder +type StateReadCloser interface { + GetSequence() uint32 + // Read should return next ledger entry. If there are no more + // entries it should return `EOF` error. + Read() (xdr.LedgerEntry, error) + // Close should be called when reading is finished. This is especially + // helpful when there are still some entries available so reader can stop + // streaming them. + Close() error +} + +// StateWriteCloser interface placeholder +type StateWriteCloser interface { + // Write is used to pass ledger entry to the next processor. It can return + // `ErrClosedPipe` when the pipe between processors has been closed meaning + // that next processor does not need more data. In such situation the current + // processor can terminate as sending more entries to a `StateWriteCloser` + // does not make sense (will not be read). + Write(xdr.LedgerEntry) error + // Close should be called when there are no more entries + // to write. 
+ Close() error +} diff --git a/exp/ingest/io/memory_state_reader.go b/exp/ingest/io/memory_state_reader.go index 674629a59b..b2c5cd5652 100644 --- a/exp/ingest/io/memory_state_reader.go +++ b/exp/ingest/io/memory_state_reader.go @@ -1,36 +1,38 @@ package io import ( - "crypto/sha256" + "encoding/base64" "fmt" "io" - "regexp" "sync" + "github.com/stellar/go/support/errors" "github.com/stellar/go/support/historyarchive" "github.com/stellar/go/xdr" ) -var bucketRegex = regexp.MustCompile(`(bucket/[0-9a-z]{2}/[0-9a-z]{2}/[0-9a-z]{2}/bucket-[0-9a-z]+\.xdr\.gz)`) - // readResult is the result of reading a bucket value type readResult struct { entry xdr.LedgerEntry e error } -// MemoryStateReader is an in-memory streaming implementation that reads HistoryArchiveState +// MemoryStateReader is an in-memory streaming implementation that reads ledger entries +// from buckets for a given HistoryArchiveState. +// MemoryStateReader hides the internal structure of buckets from the user, so entries returned +// by `Read()` are exactly the ledger entries present at the given ledger. type MemoryStateReader struct { - has *historyarchive.HistoryArchiveState - archive *historyarchive.Archive - sequence uint32 - active bool - readChan chan readResult - once *sync.Once + has *historyarchive.HistoryArchiveState + archive *historyarchive.Archive + sequence uint32 + readChan chan readResult + streamOnce sync.Once + closeOnce sync.Once + done chan bool } -// enforce MemoryStateReader to implement StateReader -var _ StateReader = &MemoryStateReader{} +// enforce MemoryStateReader to implement StateReadCloser +var _ StateReadCloser = &MemoryStateReader{} // MakeMemoryStateReader is a factory method for MemoryStateReader func MakeMemoryStateReader(archive *historyarchive.Archive, sequence uint32, bufferSize uint16) (*MemoryStateReader, error) { @@ -40,123 +42,143 @@ func MakeMemoryStateReader(archive *historyarchive.Archive, sequence uint32, buf } return &MemoryStateReader{ - has: &has, - archive: archive, - sequence: sequence, - active: false, - readChan: make(chan readResult, bufferSize), - once: &sync.Once{}, + has: &has, + archive: archive, + sequence: sequence, + readChan: make(chan readResult, bufferSize), + streamOnce: sync.Once{}, + closeOnce: sync.Once{}, + done: make(chan bool), }, nil } -func getBucketPath(r *regexp.Regexp, s string) (string, error) { - matches := r.FindStringSubmatch(s) - if len(matches) != 2 { - return "", fmt.Errorf("regex string submatch needs full match and one more subgroup, i.e. length should be 2 but was %d", len(matches)) - } - return matches[1], nil -} - -func (msr *MemoryStateReader) bufferNext() { +// streamBuckets is an internal method that streams buckets from the given HAS. +// +// Buckets should be processed from oldest to newest, `snap` and then `curr` at +// each level. The correct value of a ledger entry is the latest `LIVEENTRY` seen, +// unless there's a later `DEADENTRY` which removes the entry. +// +// We could implement a trivial algorithm (processing buckets from oldest to newest), +// but it would require keeping a map of all entries in memory and streaming what's left +// when all buckets are processed. +// +// However, we can modify this algorithm to work from newest to oldest buckets: +// +// 1. For each `LIVEENTRY` we check if we've seen it before (`seen` map) or +// if we've seen a `DEADENTRY` for it (`removed` map). If both conditions are +// false, we write that bucket entry to the stream and mark it as `seen`. +// 2.
For each `DEADENTRY` we keep track of removed bucket entries in +// `removed` map. +// +// In such algorithm we just need to keep 2 maps with `bool` values that require +// much less memory space. The memory requirements will be lowered when CAP-0020 +// is live. Finally, we can require `ingest/pipeline.StateProcessor` to return +// entry types it needs so that `MemoryStateReader` will only stream entry types +// required by a given pipeline. +func (msr *MemoryStateReader) streamBuckets() { defer close(msr.readChan) + defer msr.closeOnce.Do(msr.close) - // iterate from newest to oldest bucket and track keys already seen + removed := map[string]bool{} seen := map[string]bool{} - for _, hash := range msr.has.Buckets() { - if !msr.archive.BucketExists(hash) { - msr.readChan <- readResult{xdr.LedgerEntry{}, fmt.Errorf("bucket hash does not exist: %s", hash)} - return - } - // read bucket detail - filepathChan, errChan := msr.archive.ListBucket(historyarchive.HashPrefix(hash)) + var buckets []string + for i := 0; i < len(msr.has.CurrentBuckets); i++ { + b := msr.has.CurrentBuckets[i] + buckets = append(buckets, b.Curr, b.Snap) + } - // read from channels - var filepath string - var e error - var ok bool - select { - case fp, okb := <-filepathChan: - // example filepath: prd/core-testnet/core_testnet_001/bucket/be/3c/bf/bucket-be3cbfc2d7e4272c01a1a22084573a04dad96bf77aa7fc2be4ce2dec8777b4f9.xdr.gz - filepath, e, ok = fp, nil, okb - case err, okb := <-errChan: - filepath, e, ok = "", err, okb - // TODO do we need to do anything special if e is nil here? - } - if !ok { - // move on to next bucket when this bucket is fully consumed or empty - continue + for _, hashString := range buckets { + hash, err := historyarchive.DecodeHash(hashString) + if err != nil { + msr.readChan <- readResult{xdr.LedgerEntry{}, errors.Wrap(err, "Error decoding bucket hash")} + return } - // process values - if e != nil { - msr.readChan <- readResult{xdr.LedgerEntry{}, fmt.Errorf("received error on errChan when listing buckets for hash '%s': %s", hash, e)} - return + if hash.IsZero() { + continue } - bucketPath, e := getBucketPath(bucketRegex, filepath) - if e != nil { - msr.readChan <- readResult{xdr.LedgerEntry{}, fmt.Errorf("cannot get bucket path for filepath '%s' with hash '%s': %s", filepath, hash, e)} + if !msr.archive.BucketExists(hash) { + msr.readChan <- readResult{xdr.LedgerEntry{}, fmt.Errorf("bucket hash does not exist: %s", hash)} return } var shouldContinue bool - shouldContinue = msr.streamBucketContents(bucketPath, hash, seen) + shouldContinue = msr.streamBucketContents(hash, seen, removed) if !shouldContinue { - return + break } } } // streamBucketContents pushes value onto the read channel, returning false when the channel needs to be closed otherwise true func (msr *MemoryStateReader) streamBucketContents( - bucketPath string, hash historyarchive.Hash, seen map[string]bool, + removed map[string]bool, ) bool { - rdr, e := msr.archive.GetXdrStream(bucketPath) + rdr, e := msr.archive.GetXdrStreamForHash(hash) if e != nil { - msr.readChan <- readResult{xdr.LedgerEntry{}, fmt.Errorf("cannot get xdr stream for bucketPath '%s': %s", bucketPath, e)} + msr.readChan <- readResult{xdr.LedgerEntry{}, fmt.Errorf("cannot get xdr stream for hash '%s': %s", hash.String(), e)} return false } defer rdr.Close() - n := 0 + n := -1 for { + n++ + var entry xdr.BucketEntry if e = rdr.ReadOne(&entry); e != nil { if e == io.EOF { // proceed to the next bucket hash return true } - msr.readChan <- readResult{xdr.LedgerEntry{}, 
fmt.Errorf("Error on XDR record %d of bucketPath '%s': %s", n, bucketPath, e)} + msr.readChan <- readResult{xdr.LedgerEntry{}, fmt.Errorf("Error on XDR record %d of hash '%s': %s", n, hash.String(), e)} return false } - liveEntry, ok := entry.GetLiveEntry() - if ok { - // ignore entry if we've seen it previously - key := liveEntry.LedgerKey() - keyBytes, e := key.MarshalBinary() - if e != nil { - msr.readChan <- readResult{xdr.LedgerEntry{}, fmt.Errorf("Error marshaling XDR record %d of bucketPath '%s': %s", n, bucketPath, e)} - return false - } - shasum := fmt.Sprintf("%x", sha256.Sum256(keyBytes)) + var key xdr.LedgerKey + + switch entry.Type { + case xdr.BucketEntryTypeLiveentry: + liveEntry := entry.MustLiveEntry() + key = liveEntry.LedgerKey() + case xdr.BucketEntryTypeDeadentry: + key = entry.MustDeadEntry() + default: + panic(fmt.Sprintf("Shouldn't happen in protocol <=10: BucketEntryType=%d", entry.Type)) + } + + keyBytes, e := key.MarshalBinary() + if e != nil { + msr.readChan <- readResult{xdr.LedgerEntry{}, fmt.Errorf("Error marshaling XDR record %d of hash '%s': %s", n, hash.String(), e)} + return false + } + + h := base64.StdEncoding.EncodeToString(keyBytes) - if seen[shasum] { - n++ - continue + switch entry.Type { + case xdr.BucketEntryTypeLiveentry: + if !seen[h] && !removed[h] { + msr.readChan <- readResult{entry.MustLiveEntry(), nil} + seen[h] = true } - seen[shasum] = true + case xdr.BucketEntryTypeDeadentry: + removed[h] = true + } - // since readChan is a buffered channel we block here until one item is consumed on the dequeue side. - // this is our intended behavior, which ensures we only buffer exactly bufferSize results in the channel. - msr.readChan <- readResult{liveEntry, nil} + select { + case <-msr.done: + // Close() called: stop processing buckets. + return false + default: + continue } - // we can ignore dead entries because we're only ever concerned with the first live entry values - n++ } + + panic("Shouldn't happen") } // GetSequence impl. @@ -164,21 +186,31 @@ func (msr *MemoryStateReader) GetSequence() uint32 { return msr.sequence } -// Read returns a new ledger entry on each call, returning false when the stream ends -func (msr *MemoryStateReader) Read() (bool, xdr.LedgerEntry, error) { - msr.once.Do(func() { - go msr.bufferNext() +// Read returns a new ledger entry on each call, returning io.EOF when the stream ends. +func (msr *MemoryStateReader) Read() (xdr.LedgerEntry, error) { + msr.streamOnce.Do(func() { + go msr.streamBuckets() }) // blocking call. anytime we consume from this channel, the background goroutine will stream in the next value result, ok := <-msr.readChan if !ok { - // when channel is closed then return false with empty values - return false, xdr.LedgerEntry{}, nil + // when channel is closed then return io.EOF + return xdr.LedgerEntry{}, EOF } if result.e != nil { - return true, xdr.LedgerEntry{}, fmt.Errorf("error while reading from background channel: %s", result.e) + return xdr.LedgerEntry{}, errors.Wrap(result.e, "Error while reading from buckets") } - return true, result.entry, nil + return result.entry, nil +} + +func (msr *MemoryStateReader) close() { + close(msr.done) +} + +// Close should be called when reading is finished. 
+func (msr *MemoryStateReader) Close() error { + msr.closeOnce.Do(msr.close) + return nil } diff --git a/exp/ingest/io/memory_state_reader_test.go b/exp/ingest/io/memory_state_reader_test.go deleted file mode 100644 index 9d81769de6..0000000000 --- a/exp/ingest/io/memory_state_reader_test.go +++ /dev/null @@ -1,19 +0,0 @@ -package io - -import ( - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestBucketRegex(t *testing.T) { - const bucketFullPath = "prd/core-live/core-live-001/bucket/88/af/31/bucket-88af31f4c51afe5ea75861642359376feb623de5bec4354fa56ab752aeec8f36.xdr.gz" - const bucketPath = "bucket/88/af/31/bucket-88af31f4c51afe5ea75861642359376feb623de5bec4354fa56ab752aeec8f36.xdr.gz" - - bp, e := getBucketPath(bucketRegex, bucketFullPath) - if !assert.NoError(t, e) { - return - } - - assert.Equal(t, bucketPath, bp) -} diff --git a/exp/ingest/io/state_reader.go b/exp/ingest/io/state_reader.go deleted file mode 100644 index 74f24b1055..0000000000 --- a/exp/ingest/io/state_reader.go +++ /dev/null @@ -1,9 +0,0 @@ -package io - -import "github.com/stellar/go/xdr" - -// StateReader interface placeholder -type StateReader interface { - GetSequence() uint32 - Read() (bool, xdr.LedgerEntry, error) -} diff --git a/exp/ingest/main.go b/exp/ingest/main.go new file mode 100644 index 0000000000..92d3407538 --- /dev/null +++ b/exp/ingest/main.go @@ -0,0 +1,9 @@ +package ingest + +type System struct { + // +} + +type Session interface { + Run() error +} diff --git a/exp/ingest/pipeline/buffered_state_read_write_closer.go b/exp/ingest/pipeline/buffered_state_read_write_closer.go new file mode 100644 index 0000000000..a28d61222e --- /dev/null +++ b/exp/ingest/pipeline/buffered_state_read_write_closer.go @@ -0,0 +1,75 @@ +package pipeline + +import ( + "github.com/stellar/go/exp/ingest/io" + "github.com/stellar/go/xdr" +) + +const bufferSize = 50000 + +func (b *bufferedStateReadWriteCloser) init() { + b.buffer = make(chan xdr.LedgerEntry, bufferSize) +} + +func (b *bufferedStateReadWriteCloser) close() { + b.writeCloseMutex.Lock() + defer b.writeCloseMutex.Unlock() + + close(b.buffer) + b.closed = true +} + +func (b *bufferedStateReadWriteCloser) GetSequence() uint32 { + return 0 +} + +func (b *bufferedStateReadWriteCloser) Read() (xdr.LedgerEntry, error) { + b.initOnce.Do(b.init) + + entry, more := <-b.buffer + if more { + b.readEntriesMutex.Lock() + b.readEntries++ + b.readEntriesMutex.Unlock() + return entry, nil + } else { + return xdr.LedgerEntry{}, io.EOF + } +} + +func (b *bufferedStateReadWriteCloser) Write(entry xdr.LedgerEntry) error { + b.initOnce.Do(b.init) + + b.writeCloseMutex.Lock() + defer b.writeCloseMutex.Unlock() + + if b.closed { + return io.ErrClosedPipe + } + + b.buffer <- entry + b.wroteEntries++ + return nil +} + +func (b *bufferedStateReadWriteCloser) QueuedEntries() int { + b.initOnce.Do(b.init) + return len(b.buffer) +} + +// Close can be called in `StateWriteCloser` and `StateReadCloser` context. +// +// In `StateReadCloser` it means that no more values will be read so writer can +// stop writing to a buffer (`io.ErrClosedPipe` will be returned for calls to +// `Write()`). +// +// In `StateWriteCloser` it means that no more values will be written so reader +// should start returning `io.EOF` error after returning all queued values. 
+func (b *bufferedStateReadWriteCloser) Close() error { + b.initOnce.Do(b.init) + b.closeOnce.Do(b.close) + return nil +} + +var _ io.StateReadCloser = &bufferedStateReadWriteCloser{} +var _ io.StateWriteCloser = &bufferedStateReadWriteCloser{} diff --git a/exp/ingest/pipeline/main.go b/exp/ingest/pipeline/main.go new file mode 100644 index 0000000000..6cd369f07a --- /dev/null +++ b/exp/ingest/pipeline/main.go @@ -0,0 +1,109 @@ +package pipeline + +import ( + "sync" + "time" + + "github.com/stellar/go/exp/ingest/io" + "github.com/stellar/go/xdr" +) + +type bufferedStateReadWriteCloser struct { + initOnce sync.Once + + // readEntriesMutex protects readEntries variable + readEntriesMutex sync.Mutex + readEntries int + + // writeCloseMutex protects from writing to a closed buffer + // and wroteEntries variable + writeCloseMutex sync.Mutex + wroteEntries int + + // closeOnce protects from closing buffer twice + closeOnce sync.Once + buffer chan xdr.LedgerEntry + closed bool +} + +type multiWriteCloser struct { + writers []io.StateWriteCloser + + mutex sync.Mutex + closeAfter int + wroteEntries int +} + +type Pipeline struct { + rootStateProcessor *PipelineNode + done bool +} + +type PipelineNode struct { + Processor StateProcessor + Children []*PipelineNode + + duration time.Duration + jobs int + readEntries int + readsPerSecond int + queuedEntries int + wroteEntries int + writesPerSecond int +} + +// StateProcessor defines methods required by the state processing pipeline. +type StateProcessor interface { + // ProcessState is the main method of `StateProcessor`. It receives an `io.StateReadCloser` + // that contains objects passed down the pipeline from the previous processor. Writes to + // `io.StateWriteCloser` will be passed to the next processor. WARNING! `ProcessState` + // should **always** call `Close()` on `io.StateWriteCloser` when no more objects will be + // written and `Close()` on `io.StateReadCloser` when reading is finished. + // Data required by the following processors (like aggregated data) should be saved in + // `Store`. Read the `Store` godoc to understand how to use it. + ProcessState(store *Store, readCloser io.StateReadCloser, writeCloser io.StateWriteCloser) (err error) + // IsConcurrent defines if the processing pipeline should start a single instance + // of the processor or multiple instances. Multiple instances will read + // from the same StateReader and write to the same StateWriter. + // Example: the processor can insert entries into a DB in a single job, but it + // will probably be faster with multiple DB writers (especially when you want + // to do some data conversions before inserting). + IsConcurrent() bool + // RequiresInput defines if the processor requires input data (StateReader). If not, + // it will receive an empty reader, its parent processor will write to "void", and + // writes to `writer` will go to "void". + // This is useful for processors responsible for saving aggregated data that don't + // need state objects. + // TODO! + RequiresInput() bool + // Name returns the processor name. Helpful for errors, debugging and reports. + Name() string +} + +// Store allows storing data connected to pipeline execution. +// It exposes `Lock()` and `Unlock()` methods that must be called +// when accessing the `Store` for both `Put` and `Get` calls.
+// +// Example (incrementing a number): +// s.Lock() +// v := s.Get("value") +// s.Put("value", v.(int)+1) +// s.Unlock() +type Store struct { + sync.Mutex + initOnce sync.Once + values map[string]interface{} +} + +// ReduceStateProcessor forwards the final state produced by applying all the +// ledger entries to the writer. +// Let's say that there are 3 ledger entries: +// - Create account A (XLM balance = 20) +// - Update XLM balance of A to 5 +// - Update XLM balance of A to 15 +// Running ReduceStateProcessor will add a single ledger entry: +// - Create account A (XLM balance = 15) +// to the writer. +type ReduceStateProcessor struct { + // +} diff --git a/exp/ingest/pipeline/multi_write_closer.go b/exp/ingest/pipeline/multi_write_closer.go new file mode 100644 index 0000000000..3f3450d163 --- /dev/null +++ b/exp/ingest/pipeline/multi_write_closer.go @@ -0,0 +1,65 @@ +package pipeline + +import ( + "github.com/stellar/go/exp/ingest/io" + "github.com/stellar/go/support/errors" + "github.com/stellar/go/xdr" +) + +func (m *multiWriteCloser) Write(entry xdr.LedgerEntry) error { + m.mutex.Lock() + m.wroteEntries++ + m.mutex.Unlock() + + results := make(chan error, len(m.writers)) + + for _, w := range m.writers { + go func(w io.StateWriteCloser) { + // We can keep sending entries even when io.ErrClosedPipe is returned + // as bufferedStateReadWriteCloser will ignore them (won't add them to + // the channel). + results <- w.Write(entry) + }(w) + } + + countClosedPipes := 0 + for range m.writers { + err := <-results + if err != nil { + if err == io.ErrClosedPipe { + countClosedPipes++ + } else { + return err + } + } + } + + // When all pipes are closed return `io.ErrClosedPipe` because there are no + // active readers anymore. + if countClosedPipes == len(m.writers) { + return io.ErrClosedPipe + } + + return nil +} + +func (m *multiWriteCloser) Close() error { + m.mutex.Lock() + defer m.mutex.Unlock() + + m.closeAfter-- + if m.closeAfter > 0 { + return nil + } else if m.closeAfter < 0 { + return errors.New("Close() called more times than closeAfter") + } + + for _, w := range m.writers { + err := w.Close() + if err != nil { + return err + } + } + + return nil +} diff --git a/exp/ingest/pipeline/pipeline.go b/exp/ingest/pipeline/pipeline.go new file mode 100644 index 0000000000..81c038cf2d --- /dev/null +++ b/exp/ingest/pipeline/pipeline.go @@ -0,0 +1,143 @@ +package pipeline + +import ( + "fmt" + "strings" + "sync" + "time" + + "github.com/stellar/go/exp/ingest/io" +) + +func (p *Pipeline) Node(processor StateProcessor) *PipelineNode { + return &PipelineNode{ + Processor: processor, + } +} + +func (p *Pipeline) PrintStatus() { + p.printNodeStatus(p.rootStateProcessor, 0) +} + +func (p *Pipeline) printNodeStatus(node *PipelineNode, level int) { + fmt.Print(strings.Repeat(" ", level)) + + var wrRatio = float32(0) + if node.readEntries > 0 { + wrRatio = float32(node.wroteEntries) / float32(node.readEntries) + } + + icon := "" + if node.queuedEntries > bufferSize/10*9 { + icon = "⚠️ " + } + + fmt.Printf( + "└ %s%s read=%d (queued=%d rps=%d) wrote=%d (w/r ratio = %1.5f) concurrent=%t jobs=%d\n", + icon, + node.Processor.Name(), + node.readEntries, + node.queuedEntries, + node.readsPerSecond, + node.wroteEntries, + wrRatio, + node.Processor.IsConcurrent(), + node.jobs, + ) + + if node.jobs > 1 { + fmt.Print(strings.Repeat(" ", level)) + fmt.Print(" ") + for i := 0; i < node.jobs; i++ { + fmt.Print("• ") + } + fmt.Println("") + } + + for _, child := range node.Children { + p.printNodeStatus(child,
level+1) + } +} + +func (p *Pipeline) AddStateProcessorTree(rootProcessor *PipelineNode) { + p.rootStateProcessor = rootProcessor +} + +func (p *Pipeline) ProcessState(readCloser io.StateReadCloser) (done chan error) { + return p.processStateNode(&Store{}, p.rootStateProcessor, readCloser) +} + +func (p *Pipeline) processStateNode(store *Store, node *PipelineNode, readCloser io.StateReadCloser) chan error { + outputs := make([]io.StateWriteCloser, len(node.Children)) + + for i := range outputs { + outputs[i] = &bufferedStateReadWriteCloser{} + } + + var wg sync.WaitGroup + + jobs := 1 + if node.Processor.IsConcurrent() { + jobs = 20 + } + + node.jobs = jobs + + writeCloser := &multiWriteCloser{ + writers: outputs, + closeAfter: jobs, + } + + for i := 1; i <= jobs; i++ { + wg.Add(1) + go func() { + defer wg.Done() + + err := node.Processor.ProcessState(store, readCloser, writeCloser) + if err != nil { + // TODO return to pipeline error channel + panic(err) + } + }() + } + + go func() { + // Update stats + for { + // This is not thread-safe: check if Mutex slows it down a lot... + readBuffer, readBufferIsBufferedStateReadWriteCloser := readCloser.(*bufferedStateReadWriteCloser) + writeBuffer := writeCloser + + interval := time.Second + + node.writesPerSecond = (writeBuffer.wroteEntries - node.wroteEntries) * int(time.Second/interval) + node.wroteEntries = writeBuffer.wroteEntries + + if readBufferIsBufferedStateReadWriteCloser { + node.readsPerSecond = (readBuffer.readEntries - node.readEntries) * int(time.Second/interval) + node.readEntries = readBuffer.readEntries + node.queuedEntries = readBuffer.QueuedEntries() + } + + time.Sleep(interval) + } + }() + + for i, child := range node.Children { + wg.Add(1) + go func(i int, child *PipelineNode) { + defer wg.Done() + done := p.processStateNode(store, child, outputs[i].(*bufferedStateReadWriteCloser)) + <-done + }(i, child) + } + + done := make(chan error) + + go func() { + wg.Wait() + done <- nil + }() + + return done +} diff --git a/exp/ingest/pipeline/pipeline_node.go b/exp/ingest/pipeline/pipeline_node.go new file mode 100644 index 0000000000..4ca4ed089d --- /dev/null +++ b/exp/ingest/pipeline/pipeline_node.go @@ -0,0 +1,6 @@ +package pipeline + +func (p *PipelineNode) Pipe(children ...*PipelineNode) *PipelineNode { + p.Children = children + return p +} diff --git a/exp/ingest/pipeline/pipeline_test.go b/exp/ingest/pipeline/pipeline_test.go new file mode 100644 index 0000000000..a18607ae7f --- /dev/null +++ b/exp/ingest/pipeline/pipeline_test.go @@ -0,0 +1,495 @@ +package pipeline + +import ( + "fmt" + "math/rand" + "runtime" + "strings" + "sync" + "testing" + "time" + + "github.com/stellar/go/exp/ingest/io" + "github.com/stellar/go/keypair" + "github.com/stellar/go/xdr" + "github.com/stretchr/testify/assert" +) + +func randomAccountId() xdr.AccountId { + random, err := keypair.Random() + if err != nil { + panic(err) + } + + id := xdr.AccountId{} + id.SetAddress(random.Address()) + return id +} + +func randomSignerKey() xdr.SignerKey { + random, err := keypair.Random() + if err != nil { + panic(err) + } + + id := xdr.SignerKey{} + err = id.SetAddress(random.Address()) + if err != nil { + panic(err) + } + + return id +} + +func AccountLedgerEntry() xdr.LedgerEntry { + specialSigner := xdr.SignerKey{} + err := specialSigner.SetAddress("GCS26OX27PF67V22YYCTBLW3A4PBFAL723QG3X3FQYEL56FXX2C7RX5G") + if err != nil { + panic(err) + } + + signer := specialSigner + if rand.Int()%100 >= 1 /* % */ { + signer = randomSignerKey() + } + + return 
xdr.LedgerEntry{ + LastModifiedLedgerSeq: 0, + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeAccount, + Account: &xdr.AccountEntry{ + AccountId: randomAccountId(), + Signers: []xdr.Signer{ + xdr.Signer{ + Key: signer, + Weight: 1, + }, + }, + }, + }, + } +} + +func TrustLineLedgerEntry() xdr.LedgerEntry { + random, err := keypair.Random() + if err != nil { + panic(err) + } + + id := xdr.AccountId{} + id.SetAddress(random.Address()) + + return xdr.LedgerEntry{ + LastModifiedLedgerSeq: 0, + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeTrustline, + TrustLine: &xdr.TrustLineEntry{ + AccountId: id, + }, + }, + } +} + +func TestStore(t *testing.T) { + var s Store + + s.Lock() + s.Put("value", 0) + s.Unlock() + + s.Lock() + v := s.Get("value") + s.Put("value", v.(int)+1) + s.Unlock() + + assert.Equal(t, 1, s.Get("value")) +} + +func TestBuffer(t *testing.T) { + buffer := &bufferedStateReadWriteCloser{} + write := 20 + read := 0 + + var wg sync.WaitGroup + wg.Add(2) + + go func() { + defer wg.Done() + for { + _, err := buffer.Read() + if err != nil { + if err == io.EOF { + break + } else { + panic(err) + } + } + read++ + } + }() + + go func() { + defer wg.Done() + for i := 0; i < write; i++ { + buffer.Write(AccountLedgerEntry()) + } + buffer.Close() + }() + + wg.Wait() + + assert.Equal(t, 20, read) +} + +func ExamplePipeline(t *testing.T) { + pipeline := &Pipeline{} + + passthroughProcessor := &PassthroughProcessor{} + accountsOnlyFilter := &EntryTypeFilter{Type: xdr.LedgerEntryTypeAccount} + trustLinesOnlyFilter := &EntryTypeFilter{Type: xdr.LedgerEntryTypeTrustline} + printCountersProcessor := &PrintCountersProcessor{} + printAllProcessor := &PrintAllProcessor{} + + pipeline.AddStateProcessorTree( + pipeline.Node(passthroughProcessor). + Pipe( + // Passes accounts only + pipeline.Node(accountsOnlyFilter). + Pipe( + // Finds accounts for a single signer + pipeline.Node(&AccountsForSignerProcessor{Signer: "GCS26OX27PF67V22YYCTBLW3A4PBFAL723QG3X3FQYEL56FXX2C7RX5G"}). + Pipe(pipeline.Node(printAllProcessor)), + + // Counts accounts with prefix GA/GB/GC/GD and stores results in a store + pipeline.Node(&CountPrefixProcessor{Prefix: "GA"}). + Pipe(pipeline.Node(printCountersProcessor)), + pipeline.Node(&CountPrefixProcessor{Prefix: "GB"}). + Pipe(pipeline.Node(printCountersProcessor)), + pipeline.Node(&CountPrefixProcessor{Prefix: "GC"}). + Pipe(pipeline.Node(printCountersProcessor)), + pipeline.Node(&CountPrefixProcessor{Prefix: "GD"}). + Pipe(pipeline.Node(printCountersProcessor)), + ), + // Passes trust lines only + pipeline.Node(trustLinesOnlyFilter). 
+ Pipe(pipeline.Node(printAllProcessor)), + ), + ) + + buffer := &bufferedStateReadWriteCloser{} + + go func() { + for i := 0; i < 1000000; i++ { + buffer.Write(AccountLedgerEntry()) + buffer.Write(TrustLineLedgerEntry()) + } + buffer.Close() + }() + + done := pipeline.ProcessState(buffer) + startTime := time.Now() + + go func() { + for { + fmt.Print("\033[H\033[2J") + + var m runtime.MemStats + runtime.ReadMemStats(&m) + + fmt.Printf("Alloc = %v MiB", bToMb(m.Alloc)) + fmt.Printf("\tHeapAlloc = %v MiB", bToMb(m.HeapAlloc)) + fmt.Printf("\tSys = %v MiB", bToMb(m.Sys)) + fmt.Printf("\tNumGC = %v", m.NumGC) + fmt.Printf("\tGoroutines = %v", runtime.NumGoroutine()) + fmt.Printf("\tNumCPU = %v\n\n", runtime.NumCPU()) + + fmt.Printf("Duration: %s\n\n", time.Since(startTime)) + + pipeline.PrintStatus() + + time.Sleep(500 * time.Millisecond) + } + }() + + <-done + time.Sleep(2 * time.Second) + pipeline.PrintStatus() +} + +func bToMb(b uint64) uint64 { + return b / 1024 / 1024 +} + +type SimpleProcessor struct { + sync.Mutex + callCount int +} + +func (n *SimpleProcessor) IsConcurrent() bool { + return false +} + +func (n *SimpleProcessor) RequiresInput() bool { + return true +} + +func (n *SimpleProcessor) IncrementAndReturnCallCount() int { + n.Lock() + defer n.Unlock() + n.callCount++ + return n.callCount +} + +type PassthroughProcessor struct { + SimpleProcessor +} + +func (p *PassthroughProcessor) ProcessState(store *Store, r io.StateReadCloser, w io.StateWriteCloser) error { + defer w.Close() + defer r.Close() + + for { + entry, err := r.Read() + if err != nil { + if err == io.EOF { + break + } else { + return err + } + } + + err = w.Write(entry) + if err != nil { + if err == io.ErrClosedPipe { + // Reader does not need more data + return nil + } + return err + } + } + + return nil +} + +func (p *PassthroughProcessor) Name() string { + return "PassthroughProcessor" +} + +func (n *PassthroughProcessor) IsConcurrent() bool { + return true +} + +type EntryTypeFilter struct { + SimpleProcessor + + Type xdr.LedgerEntryType +} + +func (p *EntryTypeFilter) ProcessState(store *Store, r io.StateReadCloser, w io.StateWriteCloser) error { + defer w.Close() + defer r.Close() + + for { + entry, err := r.Read() + if err != nil { + if err == io.EOF { + break + } else { + return err + } + } + + if entry.Data.Type == p.Type { + err := w.Write(entry) + if err != nil { + if err == io.ErrClosedPipe { + // Reader does not need more data + return nil + } + return err + } + } + } + + return nil +} + +func (p *EntryTypeFilter) Name() string { + return fmt.Sprintf("EntryTypeFilter (%s)", p.Type) +} + +type AccountsForSignerProcessor struct { + SimpleProcessor + + Signer string +} + +func (p *AccountsForSignerProcessor) ProcessState(store *Store, r io.StateReadCloser, w io.StateWriteCloser) error { + defer w.Close() + defer r.Close() + + for { + entry, err := r.Read() + if err != nil { + if err == io.EOF { + break + } else { + return err + } + } + + if entry.Data.Type != xdr.LedgerEntryTypeAccount { + continue + } + + for _, signer := range entry.Data.Account.Signers { + if signer.Key.Address() == p.Signer { + err := w.Write(entry) + if err != nil { + if err == io.ErrClosedPipe { + // Reader does not need more data + return nil + } + return err + } + break + } + } + } + + return nil +} + +func (p *AccountsForSignerProcessor) Name() string { + return "AccountsForSignerProcessor" +} + +type CountPrefixProcessor struct { + SimpleProcessor + Prefix string +} + +func (p *CountPrefixProcessor) ProcessState(store *Store, r 
io.StateReadCloser, w io.StateWriteCloser) error { + defer w.Close() + defer r.Close() + + count := 0 + + for { + entry, err := r.Read() + if err != nil { + if err == io.EOF { + break + } else { + return err + } + } + + address := entry.Data.Account.AccountId.Address() + + if strings.HasPrefix(address, p.Prefix) { + err := w.Write(entry) + if err != nil { + if err == io.ErrClosedPipe { + // Reader does not need more data + return nil + } + return err + } + count++ + } + + if p.Prefix == "GA" { + // Make it slower to test full buffer + // time.Sleep(50 * time.Millisecond) + } + } + + store.Lock() + prevCount := store.Get("count" + p.Prefix) + if prevCount != nil { + count += prevCount.(int) + } + store.Put("count"+p.Prefix, count) + store.Unlock() + + return nil +} + +func (p *CountPrefixProcessor) IsConcurrent() bool { + return true +} + +func (p *CountPrefixProcessor) Name() string { + return fmt.Sprintf("CountPrefixProcessor (%s)", p.Prefix) +} + +type PrintCountersProcessor struct { + SimpleProcessor +} + +func (p *PrintCountersProcessor) ProcessState(store *Store, r io.StateReadCloser, w io.StateWriteCloser) error { + defer w.Close() + defer r.Close() + + // TODO, we should use context with cancel and value to check when pipeline is done. + for { + _, err := r.Read() + if err != nil { + if err == io.EOF { + break + } else { + return err + } + } + } + + if p.IncrementAndReturnCallCount() != 4 { + return nil + } + + store.Lock() + fmt.Println("countGA", store.Get("countGA")) + fmt.Println("countGB", store.Get("countGB")) + fmt.Println("countGC", store.Get("countGC")) + fmt.Println("countGD", store.Get("countGD")) + store.Unlock() + + return nil +} + +func (p *PrintCountersProcessor) Name() string { + return "PrintCountersProcessor" +} + +type PrintAllProcessor struct { + SimpleProcessor +} + +func (p *PrintAllProcessor) ProcessState(store *Store, r io.StateReadCloser, w io.StateWriteCloser) error { + defer w.Close() + defer r.Close() + + entries := 0 + for { + _, err := r.Read() + if err != nil { + if err == io.EOF { + break + } else { + return err + } + } + + entries++ + // fmt.Printf("%+v\n", entry) + } + + fmt.Printf("Found %d entries\n", entries) + + return nil +} + +func (p *PrintAllProcessor) Name() string { + return "PrintAllProcessor" +} diff --git a/exp/ingest/pipeline/store.go b/exp/ingest/pipeline/store.go new file mode 100644 index 0000000000..6a294f3fab --- /dev/null +++ b/exp/ingest/pipeline/store.go @@ -0,0 +1,15 @@ +package pipeline + +func (s *Store) init() { + s.values = make(map[string]interface{}) +} + +func (s *Store) Put(name string, value interface{}) { + s.initOnce.Do(s.init) + s.values[name] = value +} + +func (s *Store) Get(name string) interface{} { + s.initOnce.Do(s.init) + return s.values[name] +} diff --git a/exp/ingest/system_test.go b/exp/ingest/system_test.go new file mode 100644 index 0000000000..0f1c8d85ec --- /dev/null +++ b/exp/ingest/system_test.go @@ -0,0 +1,3 @@ +package ingest + +// diff --git a/exp/tools/accounts-for-signer/filters.go b/exp/tools/accounts-for-signer/filters.go new file mode 100644 index 0000000000..78ac73fc86 --- /dev/null +++ b/exp/tools/accounts-for-signer/filters.go @@ -0,0 +1,173 @@ +package main + +import ( + "fmt" + "os" + "sync" + + "github.com/stellar/go/exp/ingest/io" + "github.com/stellar/go/exp/ingest/pipeline" + "github.com/stellar/go/xdr" +) + +type SimpleProcessor struct { + sync.Mutex + callCount int +} + +func (n *SimpleProcessor) IsConcurrent() bool { + return false +} + +func (n *SimpleProcessor) RequiresInput() 
bool { + return true +} + +func (n *SimpleProcessor) IncrementAndReturnCallCount() int { + n.Lock() + defer n.Unlock() + n.callCount++ + return n.callCount +} + +type EntryTypeFilter struct { + SimpleProcessor + + Type xdr.LedgerEntryType +} + +func (p *EntryTypeFilter) ProcessState(store *pipeline.Store, r io.StateReadCloser, w io.StateWriteCloser) error { + defer r.Close() + defer w.Close() + + for { + entry, err := r.Read() + if err != nil { + if err == io.EOF { + break + } else { + return err + } + } + + if entry.Data.Type == p.Type { + err := w.Write(entry) + if err != nil { + if err == io.ErrClosedPipe { + // Reader does not need more data + return nil + } + return err + } + } + } + + return nil +} + +func (p *EntryTypeFilter) Name() string { + return fmt.Sprintf("EntryTypeFilter (%s)", p.Type) +} + +type AccountsForSignerProcessor struct { + SimpleProcessor + + Signer string +} + +func (p *AccountsForSignerProcessor) ProcessState(store *pipeline.Store, r io.StateReadCloser, w io.StateWriteCloser) error { + defer r.Close() + defer w.Close() + + for { + entry, err := r.Read() + if err != nil { + if err == io.EOF { + break + } else { + return err + } + } + + if entry.Data.Type != xdr.LedgerEntryTypeAccount { + continue + } + + for _, signer := range entry.Data.Account.Signers { + if signer.Key.Address() == p.Signer { + err := w.Write(entry) + if err != nil { + if err == io.ErrClosedPipe { + // Reader does not need more data + return nil + } + return err + } + break + } + } + } + + return nil +} + +func (p *AccountsForSignerProcessor) Name() string { + return "AccountsForSignerProcessor" +} + +func (n *AccountsForSignerProcessor) IsConcurrent() bool { + return true +} + +type PrintAllProcessor struct { + SimpleProcessor + Filename string +} + +func (p *PrintAllProcessor) ProcessState(store *pipeline.Store, r io.StateReadCloser, w io.StateWriteCloser) error { + defer r.Close() + defer w.Close() + + f, err := os.Create(p.Filename) + if err != nil { + return err + } + + defer f.Close() + + foundEntries := 0 + for { + entry, err := r.Read() + if err != nil { + if err == io.EOF { + break + } else { + return err + } + } + + switch entry.Data.Type { + case xdr.LedgerEntryTypeAccount: + fmt.Fprintf( + f, + "%s,%d,%d\n", + entry.Data.Account.AccountId.Address(), + entry.Data.Account.Balance, + entry.Data.Account.SeqNum, + ) + foundEntries++ + if foundEntries == 3 { + // We only want a few entries... 
+ return nil + } + default: + // Ignore for now + } + } + + return nil +} + +func (p *PrintAllProcessor) Name() string { + return "PrintAllProcessor" +} diff --git a/exp/tools/accounts-for-signer/main.go b/exp/tools/accounts-for-signer/main.go new file mode 100644 index 0000000000..9a87eed9f6 --- /dev/null +++ b/exp/tools/accounts-for-signer/main.go @@ -0,0 +1,97 @@ +package main + +import ( + "fmt" + "runtime" + "time" + + "github.com/stellar/go/exp/ingest/adapters" + "github.com/stellar/go/exp/ingest/pipeline" + "github.com/stellar/go/support/historyarchive" + "github.com/stellar/go/xdr" +) + +func main() { + archive, err := archive() + if err != nil { + panic(err) + } + + historyAdapter := ingestadapters.MakeHistoryArchiveAdapter(archive) + + seq, err := historyAdapter.GetLatestLedgerSequence() + if err != nil { + panic(err) + } + + // seq := uint32(23991935) + + fmt.Printf("Getting data for ledger seq = %d\n", seq) + + stateReader, err := historyAdapter.GetState(seq) + if err != nil { + panic(err) + } + + p, err := buildPipeline() + if err != nil { + panic(err) + } + + done := p.ProcessState(stateReader) + startTime := time.Now() + go printPipelineStats(p, startTime) + <-done +} + +func archive() (*historyarchive.Archive, error) { + return historyarchive.Connect( + fmt.Sprintf("s3://history.stellar.org/prd/core-live/core_live_001/"), + historyarchive.ConnectOptions{ + S3Region: "eu-west-1", + UnsignedRequests: true, + }, + ) +} + +func buildPipeline() (*pipeline.Pipeline, error) { + p := &pipeline.Pipeline{} + + p.AddStateProcessorTree( + // Passes accounts only + p.Node(&EntryTypeFilter{Type: xdr.LedgerEntryTypeAccount}). + Pipe( + // Finds accounts for a single signer + p.Node(&AccountsForSignerProcessor{Signer: "GBMALBYJT6A73SYQWOWVVCGSPUPJPBX4AFDJ7A63GG64QCNRCAFYWWEN"}). 
+ Pipe(p.Node(&PrintAllProcessor{Filename: "./accounts_for_signer.txt"})), + ), + ) + + return p, nil +} + +func printPipelineStats(p *pipeline.Pipeline, startTime time.Time) { + for { + var m runtime.MemStats + runtime.ReadMemStats(&m) + + fmt.Printf("Alloc = %v MiB", bToMb(m.Alloc)) + fmt.Printf("\tHeapAlloc = %v MiB", bToMb(m.HeapAlloc)) + fmt.Printf("\tSys = %v MiB", bToMb(m.Sys)) + fmt.Printf("\tNumGC = %v", m.NumGC) + fmt.Printf("\tGoroutines = %v", runtime.NumGoroutine()) + fmt.Printf("\tNumCPU = %v\n\n", runtime.NumCPU()) + + fmt.Printf("Duration: %s\n", time.Since(startTime)) + fmt.Println("Pipeline status:") + p.PrintStatus() + + fmt.Println("========================================") + + time.Sleep(10 * time.Second) + } +} + +func bToMb(b uint64) uint64 { + return b / 1024 / 1024 +} diff --git a/support/historyarchive/json.go b/support/historyarchive/json.go index 8680b4bdd8..37fb7bb6be 100644 --- a/support/historyarchive/json.go +++ b/support/historyarchive/json.go @@ -17,11 +17,6 @@ import ( ) func DumpXdrAsJson(args []string) error { - var lhe xdr.LedgerHeaderHistoryEntry - var the xdr.TransactionHistoryEntry - var thre xdr.TransactionHistoryResultEntry - var bke xdr.BucketEntry - var scp xdr.ScpHistoryEntry var tmp interface{} var rdr io.ReadCloser var err error @@ -40,22 +35,29 @@ func DumpXdrAsJson(args []string) error { } base := path.Base(arg) - if strings.HasPrefix(base, "bucket") { - tmp = &bke - } else if strings.HasPrefix(base, "ledger") { - tmp = &lhe - } else if strings.HasPrefix(base, "transactions") { - tmp = &the - } else if strings.HasPrefix(base, "results") { - tmp = &thre - } else if strings.HasPrefix(base, "scp") { - tmp = &scp - } else { - return fmt.Errorf("Error: unrecognized XDR file type %s", base) - } xr := NewXdrStream(rdr) n := 0 for { + var lhe xdr.LedgerHeaderHistoryEntry + var the xdr.TransactionHistoryEntry + var thre xdr.TransactionHistoryResultEntry + var bke xdr.BucketEntry + var scp xdr.ScpHistoryEntry + + if strings.HasPrefix(base, "bucket") { + tmp = &bke + } else if strings.HasPrefix(base, "ledger") { + tmp = &lhe + } else if strings.HasPrefix(base, "transactions") { + tmp = &the + } else if strings.HasPrefix(base, "results") { + tmp = &thre + } else if strings.HasPrefix(base, "scp") { + tmp = &scp + } else { + return fmt.Errorf("Error: unrecognized XDR file type %s", base) + } + if err = xr.ReadOne(&tmp); err != nil { if err == io.EOF { break diff --git a/support/historyarchive/xdrstream.go b/support/historyarchive/xdrstream.go index 5849f1f27a..35d8393f24 100644 --- a/support/historyarchive/xdrstream.go +++ b/support/historyarchive/xdrstream.go @@ -36,6 +36,16 @@ func NewXdrGzStream(in io.ReadCloser) (*XdrStream, error) { return &XdrStream{rdr: bufReadCloser(rdr), rdr2: in}, nil } +func (a *Archive) GetXdrStreamForHash(hash Hash) (*XdrStream, error) { + path := fmt.Sprintf( + "bucket/%s/bucket-%s.xdr.gz", + HashPrefix(hash).Path(), + hash.String(), + ) + + return a.GetXdrStream(path) +} + func (a *Archive) GetXdrStream(pth string) (*XdrStream, error) { if !strings.HasSuffix(pth, ".xdr.gz") { return nil, errors.New("File has non-.xdr.gz suffix: " + pth) diff --git a/tools/archive-reader/archive_reader.go b/tools/archive-reader/archive_reader.go index 981f716871..dd1f99ed71 100644 --- a/tools/archive-reader/archive_reader.go +++ b/tools/archive-reader/archive_reader.go @@ -6,6 +6,7 @@ import ( "log" "github.com/stellar/go/exp/ingest/adapters" + "github.com/stellar/go/exp/ingest/io" "github.com/stellar/go/support/historyarchive" ) @@ -34,11 
+35,11 @@ func main() { var i uint64 = 0 var count uint64 = 0 for { - ok, le, e := sr.Read() + le, e := sr.Read() if e != nil { panic(e) } - if !ok { + if e == io.EOF { log.Printf("total seen %d entries of which %d were accounts", i, count) return }
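Note on the reader API change above: `StateReadCloser.Read()` now returns `(xdr.LedgerEntry, error)` and signals the end of the stream with `io.EOF` instead of a boolean, so callers should check for `EOF` before treating the error as fatal and call `Close()` if they stop reading early. Below is a minimal sketch of that consumption pattern, reusing the archive URL and adapter wiring from `exp/tools/accounts-for-signer/main.go`; the `drainState` helper and the entry counting are illustrative only and not part of this change.

package main

import (
	"fmt"
	"log"

	ingestadapters "github.com/stellar/go/exp/ingest/adapters"
	"github.com/stellar/go/exp/ingest/io"
	"github.com/stellar/go/support/historyarchive"
)

// drainState is a hypothetical helper showing the EOF-based contract of
// io.StateReadCloser: call Read() until it returns io.EOF, and always Close()
// the reader when done (including when stopping early).
func drainState(sr io.StateReadCloser) (int, error) {
	defer sr.Close()

	entries := 0
	for {
		_, err := sr.Read()
		if err != nil {
			if err == io.EOF {
				// End of the state stream, not a failure.
				return entries, nil
			}
			return entries, err
		}
		entries++
	}
}

func main() {
	archive, err := historyarchive.Connect(
		"s3://history.stellar.org/prd/core-live/core_live_001/",
		historyarchive.ConnectOptions{S3Region: "eu-west-1", UnsignedRequests: true},
	)
	if err != nil {
		log.Fatal(err)
	}

	historyAdapter := ingestadapters.MakeHistoryArchiveAdapter(archive)

	seq, err := historyAdapter.GetLatestLedgerSequence()
	if err != nil {
		log.Fatal(err)
	}

	stateReader, err := historyAdapter.GetState(seq)
	if err != nil {
		log.Fatal(err)
	}

	entries, err := drainState(stateReader)
	if err != nil {
		log.Fatal(err)
	}

	fmt.Printf("ledger %d contains %d ledger entries\n", seq, entries)
}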