Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

exp/ingest: Ingest pipeline prototype #1264

Merged
merged 22 commits into from
May 29, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions exp/ingest/adapters/history_archive_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"github.com/stellar/go/support/historyarchive"
)

const msrBufferSize = 10
const msrBufferSize = 50000

// HistoryArchiveAdapter is an adapter for the historyarchive package to read from history archives
type HistoryArchiveAdapter struct {
Expand All @@ -30,7 +30,7 @@ func (haa *HistoryArchiveAdapter) GetLatestLedgerSequence() (uint32, error) {
}

// GetState returns a reader with the state of the ledger at the provided sequence number
func (haa *HistoryArchiveAdapter) GetState(sequence uint32) (io.StateReader, error) {
func (haa *HistoryArchiveAdapter) GetState(sequence uint32) (io.StateReadCloser, error) {
if !haa.archive.CategoryCheckpointExists("history", sequence) {
return nil, fmt.Errorf("history checkpoint does not exist for ledger %d", sequence)
}
Expand Down
5 changes: 3 additions & 2 deletions exp/ingest/adapters/history_archive_adapter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"log"
"testing"

"github.com/stellar/go/exp/ingest/io"
"github.com/stellar/go/support/historyarchive"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -56,11 +57,11 @@ func TestGetState_Read(t *testing.T) {
return
}

ok, le, e := sr.Read()
le, e := sr.Read()
if !assert.NoError(t, e) {
return
}
assert.Equal(t, ok, true)
assert.NotEqual(t, e, io.EOF)

log.Printf("%v\n", le)
if !assert.NotNil(t, le) {
Expand Down
35 changes: 35 additions & 0 deletions exp/ingest/io/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package io

import (
"io"

"github.com/stellar/go/xdr"
)

var EOF = io.EOF
var ErrClosedPipe = io.ErrClosedPipe

// StateReadCloser interface placeholder
type StateReadCloser interface {
GetSequence() uint32
// Read should return next ledger entry. If there are no more
// entries it should return `EOF` error.
Read() (xdr.LedgerEntry, error)
// Close should be called when reading is finished. This is especially
// helpful when there are still some entries available so reader can stop
// streaming them.
Close() error
}

// StateWriteCloser interface placeholder
type StateWriteCloser interface {
// Write is used to pass ledger entry to the next processor. It can return
// `ErrClosedPipe` when the pipe between processors has been closed meaning
// that next processor does not need more data. In such situation the current
// processor can terminate as sending more entries to a `StateWriteCloser`
// does not make sense (will not be read).
Write(xdr.LedgerEntry) error
// Close should be called when there are no more entries
// to write.
Close() error
}
216 changes: 124 additions & 92 deletions exp/ingest/io/memory_state_reader.go
Original file line number Diff line number Diff line change
@@ -1,36 +1,38 @@
package io

import (
"crypto/sha256"
"encoding/base64"
"fmt"
"io"
"regexp"
"sync"

"github.com/stellar/go/support/errors"
"github.com/stellar/go/support/historyarchive"
"github.com/stellar/go/xdr"
)

var bucketRegex = regexp.MustCompile(`(bucket/[0-9a-z]{2}/[0-9a-z]{2}/[0-9a-z]{2}/bucket-[0-9a-z]+\.xdr\.gz)`)

// readResult is the result of reading a bucket value
type readResult struct {
entry xdr.LedgerEntry
e error
}

// MemoryStateReader is an in-memory streaming implementation that reads HistoryArchiveState
// MemoryStateReader is an in-memory streaming implementation that reads ledger entries
// from buckets for a given HistoryArchiveState.
// MemoryStateReader hides internal structure of buckets from the user so entries returned
// by `Read()` are exactly the ledger entries present at the given ledger.
type MemoryStateReader struct {
has *historyarchive.HistoryArchiveState
archive *historyarchive.Archive
sequence uint32
active bool
readChan chan readResult
once *sync.Once
has *historyarchive.HistoryArchiveState
archive *historyarchive.Archive
sequence uint32
readChan chan readResult
streamOnce sync.Once
closeOnce sync.Once
done chan bool
}

// enforce MemoryStateReader to implement StateReader
var _ StateReader = &MemoryStateReader{}
// enforce MemoryStateReader to implement StateReadCloser
var _ StateReadCloser = &MemoryStateReader{}

// MakeMemoryStateReader is a factory method for MemoryStateReader
func MakeMemoryStateReader(archive *historyarchive.Archive, sequence uint32, bufferSize uint16) (*MemoryStateReader, error) {
Expand All @@ -40,145 +42,175 @@ func MakeMemoryStateReader(archive *historyarchive.Archive, sequence uint32, buf
}

return &MemoryStateReader{
has: &has,
archive: archive,
sequence: sequence,
active: false,
readChan: make(chan readResult, bufferSize),
once: &sync.Once{},
has: &has,
archive: archive,
sequence: sequence,
readChan: make(chan readResult, bufferSize),
streamOnce: sync.Once{},
closeOnce: sync.Once{},
done: make(chan bool),
}, nil
}

func getBucketPath(r *regexp.Regexp, s string) (string, error) {
matches := r.FindStringSubmatch(s)
if len(matches) != 2 {
return "", fmt.Errorf("regex string submatch needs full match and one more subgroup, i.e. length should be 2 but was %d", len(matches))
}
return matches[1], nil
}

func (msr *MemoryStateReader) bufferNext() {
// streamBuckets is internal method that streams buckets from the given HAS.
//
// Buckets should be processed from oldest to newest, `snap` and then `curr` at
// each level. The correct value of ledger entry is the latest seen `LIVEENTRY`
// except the case when there's a `DEADENTRY` later which removes the entry.
//
// We can implement trivial algorithm (processing from oldest to newest buckets)
// but it requires to keep map of all entries in memory and stream what's left
// when all buckets are processed.
//
// However, we can modify this algorithm to work from newest to oldest ledgers:
//
// 1. For each `LIVEENTRY` we check if we've seen it before (`seen` map) or
// if we've seen `DEADENTRY` for it (`removed` map). If both conditions are
// false, we write that bucket entry to the stream and mark it as `seen`.
// 2. For each `DEADENTRY` we keep track of removed bucket entries in
// `removed` map.
//
// In such algorithm we just need to keep 2 maps with `bool` values that require
// much less memory space. The memory requirements will be lowered when CAP-0020
// is live. Finally, we can require `ingest/pipeline.StateProcessor` to return
// entry types it needs so that `MemoryStateReader` will only stream entry types
// required by a given pipeline.
func (msr *MemoryStateReader) streamBuckets() {
defer close(msr.readChan)
defer msr.closeOnce.Do(msr.close)

// iterate from newest to oldest bucket and track keys already seen
removed := map[string]bool{}
seen := map[string]bool{}
for _, hash := range msr.has.Buckets() {
if !msr.archive.BucketExists(hash) {
msr.readChan <- readResult{xdr.LedgerEntry{}, fmt.Errorf("bucket hash does not exist: %s", hash)}
return
}

// read bucket detail
filepathChan, errChan := msr.archive.ListBucket(historyarchive.HashPrefix(hash))
var buckets []string
for i := 0; i < len(msr.has.CurrentBuckets); i++ {
b := msr.has.CurrentBuckets[i]
buckets = append(buckets, b.Curr, b.Snap)
}

// read from channels
var filepath string
var e error
var ok bool
select {
case fp, okb := <-filepathChan:
// example filepath: prd/core-testnet/core_testnet_001/bucket/be/3c/bf/bucket-be3cbfc2d7e4272c01a1a22084573a04dad96bf77aa7fc2be4ce2dec8777b4f9.xdr.gz
filepath, e, ok = fp, nil, okb
case err, okb := <-errChan:
filepath, e, ok = "", err, okb
// TODO do we need to do anything special if e is nil here?
}
if !ok {
// move on to next bucket when this bucket is fully consumed or empty
continue
for _, hashString := range buckets {
hash, err := historyarchive.DecodeHash(hashString)
if err != nil {
msr.readChan <- readResult{xdr.LedgerEntry{}, errors.Wrap(err, "Error decoding bucket hash")}
return
}

// process values
if e != nil {
msr.readChan <- readResult{xdr.LedgerEntry{}, fmt.Errorf("received error on errChan when listing buckets for hash '%s': %s", hash, e)}
return
if hash.IsZero() {
continue
}

bucketPath, e := getBucketPath(bucketRegex, filepath)
if e != nil {
msr.readChan <- readResult{xdr.LedgerEntry{}, fmt.Errorf("cannot get bucket path for filepath '%s' with hash '%s': %s", filepath, hash, e)}
if !msr.archive.BucketExists(hash) {
msr.readChan <- readResult{xdr.LedgerEntry{}, fmt.Errorf("bucket hash does not exist: %s", hash)}
return
}

var shouldContinue bool
shouldContinue = msr.streamBucketContents(bucketPath, hash, seen)
shouldContinue = msr.streamBucketContents(hash, seen, removed)
if !shouldContinue {
return
break
}
}
}

// streamBucketContents pushes value onto the read channel, returning false when the channel needs to be closed otherwise true
func (msr *MemoryStateReader) streamBucketContents(
bucketPath string,
hash historyarchive.Hash,
seen map[string]bool,
removed map[string]bool,
) bool {
rdr, e := msr.archive.GetXdrStream(bucketPath)
rdr, e := msr.archive.GetXdrStreamForHash(hash)
if e != nil {
msr.readChan <- readResult{xdr.LedgerEntry{}, fmt.Errorf("cannot get xdr stream for bucketPath '%s': %s", bucketPath, e)}
msr.readChan <- readResult{xdr.LedgerEntry{}, fmt.Errorf("cannot get xdr stream for hash '%s': %s", hash.String(), e)}
return false
}
defer rdr.Close()

n := 0
n := -1
for {
n++

var entry xdr.BucketEntry
if e = rdr.ReadOne(&entry); e != nil {
if e == io.EOF {
// proceed to the next bucket hash
return true
}
msr.readChan <- readResult{xdr.LedgerEntry{}, fmt.Errorf("Error on XDR record %d of bucketPath '%s': %s", n, bucketPath, e)}
msr.readChan <- readResult{xdr.LedgerEntry{}, fmt.Errorf("Error on XDR record %d of hash '%s': %s", n, hash.String(), e)}
return false
}

liveEntry, ok := entry.GetLiveEntry()
if ok {
// ignore entry if we've seen it previously
key := liveEntry.LedgerKey()
keyBytes, e := key.MarshalBinary()
if e != nil {
msr.readChan <- readResult{xdr.LedgerEntry{}, fmt.Errorf("Error marshaling XDR record %d of bucketPath '%s': %s", n, bucketPath, e)}
return false
}
shasum := fmt.Sprintf("%x", sha256.Sum256(keyBytes))
var key xdr.LedgerKey

switch entry.Type {
case xdr.BucketEntryTypeLiveentry:
liveEntry := entry.MustLiveEntry()
key = liveEntry.LedgerKey()
case xdr.BucketEntryTypeDeadentry:
key = entry.MustDeadEntry()
default:
panic(fmt.Sprintf("Shouldn't happen in protocol <=10: BucketEntryType=%d", entry.Type))
}

keyBytes, e := key.MarshalBinary()
if e != nil {
msr.readChan <- readResult{xdr.LedgerEntry{}, fmt.Errorf("Error marshaling XDR record %d of hash '%s': %s", n, hash.String(), e)}
return false
}

h := base64.StdEncoding.EncodeToString(keyBytes)

if seen[shasum] {
n++
continue
switch entry.Type {
case xdr.BucketEntryTypeLiveentry:
if !seen[h] && !removed[h] {
msr.readChan <- readResult{entry.MustLiveEntry(), nil}
seen[h] = true
}
seen[shasum] = true
case xdr.BucketEntryTypeDeadentry:
removed[h] = true
}

// since readChan is a buffered channel we block here until one item is consumed on the dequeue side.
// this is our intended behavior, which ensures we only buffer exactly bufferSize results in the channel.
msr.readChan <- readResult{liveEntry, nil}
select {
case <-msr.done:
// Close() called: stop processing buckets.
return false
default:
continue
}
// we can ignore dead entries because we're only ever concerned with the first live entry values
n++
}

panic("Shouldn't happen")
}

// GetSequence impl.
func (msr *MemoryStateReader) GetSequence() uint32 {
return msr.sequence
}

// Read returns a new ledger entry on each call, returning false when the stream ends
func (msr *MemoryStateReader) Read() (bool, xdr.LedgerEntry, error) {
msr.once.Do(func() {
go msr.bufferNext()
// Read returns a new ledger entry on each call, returning io.EOF when the stream ends.
func (msr *MemoryStateReader) Read() (xdr.LedgerEntry, error) {
msr.streamOnce.Do(func() {
go msr.streamBuckets()
})

// blocking call. anytime we consume from this channel, the background goroutine will stream in the next value
result, ok := <-msr.readChan
if !ok {
// when channel is closed then return false with empty values
return false, xdr.LedgerEntry{}, nil
// when channel is closed then return io.EOF
return xdr.LedgerEntry{}, EOF
}

if result.e != nil {
return true, xdr.LedgerEntry{}, fmt.Errorf("error while reading from background channel: %s", result.e)
return xdr.LedgerEntry{}, errors.Wrap(result.e, "Error while reading from buckets")
}
return true, result.entry, nil
return result.entry, nil
}

func (msr *MemoryStateReader) close() {
close(msr.done)
}

// Close should be called when reading is finished.
func (msr *MemoryStateReader) Close() error {
msr.closeOnce.Do(msr.close)
return nil
}
Loading