exp/ingest: Ingest pipeline prototype (#1264)
bartekn authored May 29, 2019
1 parent 53f58a5 commit cca9f0a
Showing 20 changed files with 1,387 additions and 144 deletions. (Only the first four files are included below.)
exp/ingest/adapters/history_archive_adapter.go (2 additions, 2 deletions)
@@ -7,7 +7,7 @@ import (
 	"github.com/stellar/go/support/historyarchive"
 )
 
-const msrBufferSize = 10
+const msrBufferSize = 50000
 
 // HistoryArchiveAdapter is an adapter for the historyarchive package to read from history archives
 type HistoryArchiveAdapter struct {
@@ -30,7 +30,7 @@ func (haa *HistoryArchiveAdapter) GetLatestLedgerSequence() (uint32, error) {
 }
 
 // GetState returns a reader with the state of the ledger at the provided sequence number
-func (haa *HistoryArchiveAdapter) GetState(sequence uint32) (io.StateReader, error) {
+func (haa *HistoryArchiveAdapter) GetState(sequence uint32) (io.StateReadCloser, error) {
 	if !haa.archive.CategoryCheckpointExists("history", sequence) {
 		return nil, fmt.Errorf("history checkpoint does not exist for ledger %d", sequence)
 	}
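Usage note (illustrative, not part of this commit): with GetState now returning io.StateReadCloser, callers are expected to drain the reader until EOF and close it when done. A minimal sketch, assuming an adapter value built around an already-connected historyarchive.Archive:

package main

import (
	"log"

	"github.com/stellar/go/exp/ingest/adapters"
	ingestio "github.com/stellar/go/exp/ingest/io"
)

// dumpState drains the state reader for the latest checkpoint ledger.
func dumpState(haa *adapters.HistoryArchiveAdapter) error {
	sequence, err := haa.GetLatestLedgerSequence()
	if err != nil {
		return err
	}

	sr, err := haa.GetState(sequence)
	if err != nil {
		return err
	}
	// Close tells the reader to stop streaming even if entries remain.
	defer sr.Close()

	for {
		entry, err := sr.Read()
		if err == ingestio.EOF {
			return nil // state fully consumed
		}
		if err != nil {
			return err
		}
		log.Printf("entry type: %v", entry.Data.Type)
	}
}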
exp/ingest/adapters/history_archive_adapter_test.go (3 additions, 2 deletions)
@@ -5,6 +5,7 @@ import (
 	"log"
 	"testing"
 
+	"github.com/stellar/go/exp/ingest/io"
 	"github.com/stellar/go/support/historyarchive"
 
 	"github.com/stretchr/testify/assert"
@@ -56,11 +57,11 @@ func TestGetState_Read(t *testing.T) {
 		return
 	}
 
-	ok, le, e := sr.Read()
+	le, e := sr.Read()
 	if !assert.NoError(t, e) {
 		return
 	}
-	assert.Equal(t, ok, true)
+	assert.NotEqual(t, e, io.EOF)
 
 	log.Printf("%v\n", le)
 	if !assert.NotNil(t, le) {
exp/ingest/io/main.go (35 additions, 0 deletions)
@@ -0,0 +1,35 @@
+package io
+
+import (
+	"io"
+
+	"github.com/stellar/go/xdr"
+)
+
+var EOF = io.EOF
+var ErrClosedPipe = io.ErrClosedPipe
+
+// StateReadCloser interface placeholder
+type StateReadCloser interface {
+	GetSequence() uint32
+	// Read should return next ledger entry. If there are no more
+	// entries it should return `EOF` error.
+	Read() (xdr.LedgerEntry, error)
+	// Close should be called when reading is finished. This is especially
+	// helpful when there are still some entries available so reader can stop
+	// streaming them.
+	Close() error
+}
+
+// StateWriteCloser interface placeholder
+type StateWriteCloser interface {
+	// Write is used to pass ledger entry to the next processor. It can return
+	// `ErrClosedPipe` when the pipe between processors has been closed meaning
+	// that next processor does not need more data. In such situation the current
+	// processor can terminate as sending more entries to a `StateWriteCloser`
+	// does not make sense (will not be read).
+	Write(xdr.LedgerEntry) error
+	// Close should be called when there are no more entries
+	// to write.
+	Close() error
+}
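These two contracts imply the standard shape of a pipeline stage: read until EOF, write until ErrClosedPipe, and close both ends when finished. A sketch of a pass-through processor written only against the interfaces above (the processor itself is an illustration, not part of this commit):

package processors

import (
	ingestio "github.com/stellar/go/exp/ingest/io"
	"github.com/stellar/go/support/errors"
)

// passThrough copies every ledger entry from r to w, honoring the EOF and
// ErrClosedPipe semantics documented on StateReadCloser and StateWriteCloser.
func passThrough(r ingestio.StateReadCloser, w ingestio.StateWriteCloser) error {
	defer r.Close() // lets the reader stop streaming early
	defer w.Close() // signals downstream that no more entries are coming

	for {
		entry, err := r.Read()
		if err == ingestio.EOF {
			return nil // upstream exhausted
		}
		if err != nil {
			return errors.Wrap(err, "error reading entry")
		}

		if err := w.Write(entry); err != nil {
			if err == ingestio.ErrClosedPipe {
				return nil // downstream closed the pipe: stop producing
			}
			return errors.Wrap(err, "error writing entry")
		}
	}
}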
exp/ingest/io/memory_state_reader.go (124 additions, 92 deletions)
@@ -1,36 +1,38 @@
 package io
 
 import (
-	"crypto/sha256"
+	"encoding/base64"
 	"fmt"
 	"io"
-	"regexp"
 	"sync"
 
+	"github.com/stellar/go/support/errors"
 	"github.com/stellar/go/support/historyarchive"
 	"github.com/stellar/go/xdr"
 )
 
-var bucketRegex = regexp.MustCompile(`(bucket/[0-9a-z]{2}/[0-9a-z]{2}/[0-9a-z]{2}/bucket-[0-9a-z]+\.xdr\.gz)`)
-
 // readResult is the result of reading a bucket value
 type readResult struct {
 	entry xdr.LedgerEntry
 	e     error
 }
 
-// MemoryStateReader is an in-memory streaming implementation that reads HistoryArchiveState
+// MemoryStateReader is an in-memory streaming implementation that reads ledger entries
+// from buckets for a given HistoryArchiveState.
+// MemoryStateReader hides internal structure of buckets from the user so entries returned
+// by `Read()` are exactly the ledger entries present at the given ledger.
 type MemoryStateReader struct {
-	has      *historyarchive.HistoryArchiveState
-	archive  *historyarchive.Archive
-	sequence uint32
-	active   bool
-	readChan chan readResult
-	once     *sync.Once
+	has        *historyarchive.HistoryArchiveState
+	archive    *historyarchive.Archive
+	sequence   uint32
+	readChan   chan readResult
+	streamOnce sync.Once
+	closeOnce  sync.Once
+	done       chan bool
 }
 
-// enforce MemoryStateReader to implement StateReader
-var _ StateReader = &MemoryStateReader{}
+// enforce MemoryStateReader to implement StateReadCloser
+var _ StateReadCloser = &MemoryStateReader{}
 
 // MakeMemoryStateReader is a factory method for MemoryStateReader
 func MakeMemoryStateReader(archive *historyarchive.Archive, sequence uint32, bufferSize uint16) (*MemoryStateReader, error) {
@@ -40,145 +42,175 @@ func MakeMemoryStateReader(archive *historyarchive.Archive, sequence uint32, bufferSize uint16) (*MemoryStateReader, error) {
 	}
 
 	return &MemoryStateReader{
-		has:      &has,
-		archive:  archive,
-		sequence: sequence,
-		active:   false,
-		readChan: make(chan readResult, bufferSize),
-		once:     &sync.Once{},
+		has:        &has,
+		archive:    archive,
+		sequence:   sequence,
+		readChan:   make(chan readResult, bufferSize),
+		streamOnce: sync.Once{},
+		closeOnce:  sync.Once{},
+		done:       make(chan bool),
 	}, nil
 }
 
-func getBucketPath(r *regexp.Regexp, s string) (string, error) {
-	matches := r.FindStringSubmatch(s)
-	if len(matches) != 2 {
-		return "", fmt.Errorf("regex string submatch needs full match and one more subgroup, i.e. length should be 2 but was %d", len(matches))
-	}
-	return matches[1], nil
-}
-
-func (msr *MemoryStateReader) bufferNext() {
+// streamBuckets is internal method that streams buckets from the given HAS.
+//
+// Buckets should be processed from oldest to newest, `snap` and then `curr` at
+// each level. The correct value of ledger entry is the latest seen `LIVEENTRY`
+// except the case when there's a `DEADENTRY` later which removes the entry.
+//
+// We can implement trivial algorithm (processing from oldest to newest buckets)
+// but it requires to keep map of all entries in memory and stream what's left
+// when all buckets are processed.
+//
+// However, we can modify this algorithm to work from newest to oldest ledgers:
+//
+//   1. For each `LIVEENTRY` we check if we've seen it before (`seen` map) or
+//      if we've seen `DEADENTRY` for it (`removed` map). If both conditions are
+//      false, we write that bucket entry to the stream and mark it as `seen`.
+//   2. For each `DEADENTRY` we keep track of removed bucket entries in
+//      `removed` map.
+//
+// In such algorithm we just need to keep 2 maps with `bool` values that require
+// much less memory space. The memory requirements will be lowered when CAP-0020
+// is live. Finally, we can require `ingest/pipeline.StateProcessor` to return
+// entry types it needs so that `MemoryStateReader` will only stream entry types
+// required by a given pipeline.
+func (msr *MemoryStateReader) streamBuckets() {
 	defer close(msr.readChan)
+	defer msr.closeOnce.Do(msr.close)
 
 	// iterate from newest to oldest bucket and track keys already seen
+	removed := map[string]bool{}
 	seen := map[string]bool{}
-	for _, hash := range msr.has.Buckets() {
-		if !msr.archive.BucketExists(hash) {
-			msr.readChan <- readResult{xdr.LedgerEntry{}, fmt.Errorf("bucket hash does not exist: %s", hash)}
-			return
-		}
 
-		// read bucket detail
-		filepathChan, errChan := msr.archive.ListBucket(historyarchive.HashPrefix(hash))
+	var buckets []string
+	for i := 0; i < len(msr.has.CurrentBuckets); i++ {
+		b := msr.has.CurrentBuckets[i]
+		buckets = append(buckets, b.Curr, b.Snap)
+	}
 
-		// read from channels
-		var filepath string
-		var e error
-		var ok bool
-		select {
-		case fp, okb := <-filepathChan:
-			// example filepath: prd/core-testnet/core_testnet_001/bucket/be/3c/bf/bucket-be3cbfc2d7e4272c01a1a22084573a04dad96bf77aa7fc2be4ce2dec8777b4f9.xdr.gz
-			filepath, e, ok = fp, nil, okb
-		case err, okb := <-errChan:
-			filepath, e, ok = "", err, okb
-			// TODO do we need to do anything special if e is nil here?
-		}
-		if !ok {
-			// move on to next bucket when this bucket is fully consumed or empty
-			continue
+	for _, hashString := range buckets {
+		hash, err := historyarchive.DecodeHash(hashString)
+		if err != nil {
+			msr.readChan <- readResult{xdr.LedgerEntry{}, errors.Wrap(err, "Error decoding bucket hash")}
+			return
 		}
 
-		// process values
-		if e != nil {
-			msr.readChan <- readResult{xdr.LedgerEntry{}, fmt.Errorf("received error on errChan when listing buckets for hash '%s': %s", hash, e)}
-			return
+		if hash.IsZero() {
+			continue
 		}
 
-		bucketPath, e := getBucketPath(bucketRegex, filepath)
-		if e != nil {
-			msr.readChan <- readResult{xdr.LedgerEntry{}, fmt.Errorf("cannot get bucket path for filepath '%s' with hash '%s': %s", filepath, hash, e)}
+		if !msr.archive.BucketExists(hash) {
+			msr.readChan <- readResult{xdr.LedgerEntry{}, fmt.Errorf("bucket hash does not exist: %s", hash)}
 			return
 		}
 
 		var shouldContinue bool
-		shouldContinue = msr.streamBucketContents(bucketPath, hash, seen)
+		shouldContinue = msr.streamBucketContents(hash, seen, removed)
 		if !shouldContinue {
-			return
+			break
 		}
 	}
 }
 
 // streamBucketContents pushes value onto the read channel, returning false when the channel needs to be closed otherwise true
 func (msr *MemoryStateReader) streamBucketContents(
-	bucketPath string,
+	hash historyarchive.Hash,
 	seen map[string]bool,
+	removed map[string]bool,
 ) bool {
-	rdr, e := msr.archive.GetXdrStream(bucketPath)
+	rdr, e := msr.archive.GetXdrStreamForHash(hash)
 	if e != nil {
-		msr.readChan <- readResult{xdr.LedgerEntry{}, fmt.Errorf("cannot get xdr stream for bucketPath '%s': %s", bucketPath, e)}
+		msr.readChan <- readResult{xdr.LedgerEntry{}, fmt.Errorf("cannot get xdr stream for hash '%s': %s", hash.String(), e)}
 		return false
 	}
 	defer rdr.Close()
 
-	n := 0
+	n := -1
 	for {
+		n++
+
 		var entry xdr.BucketEntry
 		if e = rdr.ReadOne(&entry); e != nil {
 			if e == io.EOF {
 				// proceed to the next bucket hash
 				return true
 			}
-			msr.readChan <- readResult{xdr.LedgerEntry{}, fmt.Errorf("Error on XDR record %d of bucketPath '%s': %s", n, bucketPath, e)}
+			msr.readChan <- readResult{xdr.LedgerEntry{}, fmt.Errorf("Error on XDR record %d of hash '%s': %s", n, hash.String(), e)}
 			return false
 		}
 
-		liveEntry, ok := entry.GetLiveEntry()
-		if ok {
-			// ignore entry if we've seen it previously
-			key := liveEntry.LedgerKey()
-			keyBytes, e := key.MarshalBinary()
-			if e != nil {
-				msr.readChan <- readResult{xdr.LedgerEntry{}, fmt.Errorf("Error marshaling XDR record %d of bucketPath '%s': %s", n, bucketPath, e)}
-				return false
-			}
-			shasum := fmt.Sprintf("%x", sha256.Sum256(keyBytes))
+		var key xdr.LedgerKey
+
+		switch entry.Type {
+		case xdr.BucketEntryTypeLiveentry:
+			liveEntry := entry.MustLiveEntry()
+			key = liveEntry.LedgerKey()
+		case xdr.BucketEntryTypeDeadentry:
+			key = entry.MustDeadEntry()
+		default:
+			panic(fmt.Sprintf("Shouldn't happen in protocol <=10: BucketEntryType=%d", entry.Type))
+		}
+
+		keyBytes, e := key.MarshalBinary()
+		if e != nil {
+			msr.readChan <- readResult{xdr.LedgerEntry{}, fmt.Errorf("Error marshaling XDR record %d of hash '%s': %s", n, hash.String(), e)}
+			return false
+		}
+
+		h := base64.StdEncoding.EncodeToString(keyBytes)
 
-			if seen[shasum] {
-				n++
-				continue
+		switch entry.Type {
+		case xdr.BucketEntryTypeLiveentry:
+			if !seen[h] && !removed[h] {
+				msr.readChan <- readResult{entry.MustLiveEntry(), nil}
+				seen[h] = true
 			}
-			seen[shasum] = true
+		case xdr.BucketEntryTypeDeadentry:
+			removed[h] = true
+		}
 
-			// since readChan is a buffered channel we block here until one item is consumed on the dequeue side.
-			// this is our intended behavior, which ensures we only buffer exactly bufferSize results in the channel.
-			msr.readChan <- readResult{liveEntry, nil}
+		select {
+		case <-msr.done:
+			// Close() called: stop processing buckets.
+			return false
+		default:
+			continue
 		}
-		// we can ignore dead entries because we're only ever concerned with the first live entry values
-		n++
 	}
 
 	panic("Shouldn't happen")
 }
 
 // GetSequence impl.
 func (msr *MemoryStateReader) GetSequence() uint32 {
 	return msr.sequence
 }
 
-// Read returns a new ledger entry on each call, returning false when the stream ends
-func (msr *MemoryStateReader) Read() (bool, xdr.LedgerEntry, error) {
-	msr.once.Do(func() {
-		go msr.bufferNext()
+// Read returns a new ledger entry on each call, returning io.EOF when the stream ends.
+func (msr *MemoryStateReader) Read() (xdr.LedgerEntry, error) {
+	msr.streamOnce.Do(func() {
+		go msr.streamBuckets()
 	})
 
 	// blocking call. anytime we consume from this channel, the background goroutine will stream in the next value
 	result, ok := <-msr.readChan
 	if !ok {
-		// when channel is closed then return false with empty values
-		return false, xdr.LedgerEntry{}, nil
+		// when channel is closed then return io.EOF
+		return xdr.LedgerEntry{}, EOF
 	}
 
 	if result.e != nil {
-		return true, xdr.LedgerEntry{}, fmt.Errorf("error while reading from background channel: %s", result.e)
+		return xdr.LedgerEntry{}, errors.Wrap(result.e, "Error while reading from buckets")
 	}
-	return true, result.entry, nil
+	return result.entry, nil
 }
+
+func (msr *MemoryStateReader) close() {
+	close(msr.done)
+}
+
+// Close should be called when reading is finished.
+func (msr *MemoryStateReader) Close() error {
+	msr.closeOnce.Do(msr.close)
+	return nil
+}
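The streamBuckets comment above spells out the newest-to-oldest algorithm; stripped of the archive plumbing, the two-map dedup rule it describes reduces to the following sketch (simplified stand-in types, not code from this commit):

package example

// bucketEntry is a simplified stand-in for xdr.BucketEntry: Key identifies a
// ledger entry and Dead marks a DEADENTRY tombstone.
type bucketEntry struct {
	Key  string
	Dead bool
}

// latestLive scans entries ordered newest to oldest and keeps the most recent
// live version of each key, unless a newer DEADENTRY already removed it, which
// is the same rule streamBuckets applies while walking curr/snap buckets.
func latestLive(newestToOldest []bucketEntry) []bucketEntry {
	seen := map[string]bool{}
	removed := map[string]bool{}

	var out []bucketEntry
	for _, e := range newestToOldest {
		if e.Dead {
			removed[e.Key] = true // tombstone shadows all older live versions
			continue
		}
		if !seen[e.Key] && !removed[e.Key] {
			out = append(out, e) // first hit is the newest live version
			seen[e.Key] = true
		}
	}
	return out
}

Both maps hold only booleans, which is what keeps the memory footprint small compared to buffering whole entries in the oldest-to-newest variant.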
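The done channel plus the two sync.Once fields give Close well-defined semantics: it may be called at any time, triggers at most one close of done, and the select in streamBucketContents lets the background goroutine notice and stop. A consumer that only needs a prefix of the state can therefore terminate early; a sketch against the StateReadCloser interface (the helper is hypothetical, not part of this commit):

package example

import (
	ingestio "github.com/stellar/go/exp/ingest/io"
	"github.com/stellar/go/xdr"
)

// readPrefix reads at most limit entries, then closes the reader so the
// streaming goroutine stops instead of blocking on the buffered channel.
func readPrefix(sr ingestio.StateReadCloser, limit int) ([]xdr.LedgerEntry, error) {
	defer sr.Close()

	var entries []xdr.LedgerEntry
	for len(entries) < limit {
		entry, err := sr.Read()
		if err == ingestio.EOF {
			break // the state had fewer entries than the limit
		}
		if err != nil {
			return nil, err
		}
		entries = append(entries, entry)
	}
	return entries, nil
}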
