Byte to LedgerEntry

bartekn committed May 10, 2019
1 parent 18ea946 commit f8c0993

Showing 8 changed files with 328 additions and 181 deletions.
25 changes: 25 additions & 0 deletions ingest/io/main.go
@@ -0,0 +1,25 @@
+package io
+
+import (
+	"io"
+
+	"github.com/stellar/go/xdr"
+)
+
+var EOF = io.EOF
+
+// StateReader interface placeholder
+type StateReader interface {
+	GetSequence() uint32
+	// Read should return the next ledger entry. If there are no more
+	// entries it should return the `io.EOF` error.
+	Read() (xdr.LedgerEntry, error)
+}
+
+// StateWriteCloser interface placeholder
+type StateWriteCloser interface {
+	Write(xdr.LedgerEntry) error
+	// Close should be called when there are no more entries
+	// to write.
+	Close() error
+}
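
For illustration only (not part of the commit), here is a minimal sketch of how these interfaces can be satisfied and consumed; `sliceStateReader` and `drain` are hypothetical names introduced for this example:

	package example

	import (
		ingestio "github.com/stellar/go/ingest/io"
		"github.com/stellar/go/xdr"
	)

	// sliceStateReader is a hypothetical in-memory io.StateReader that serves
	// entries from a slice and returns EOF once they are exhausted.
	type sliceStateReader struct {
		sequence uint32
		entries  []xdr.LedgerEntry
		pos      int
	}

	func (r *sliceStateReader) GetSequence() uint32 { return r.sequence }

	func (r *sliceStateReader) Read() (xdr.LedgerEntry, error) {
		if r.pos >= len(r.entries) {
			// Contract: signal end of stream with io.EOF (re-exported as EOF above).
			return xdr.LedgerEntry{}, ingestio.EOF
		}
		entry := r.entries[r.pos]
		r.pos++
		return entry, nil
	}

	// drain shows the consumption loop callers of Read are expected to write.
	func drain(r ingestio.StateReader) ([]xdr.LedgerEntry, error) {
		var entries []xdr.LedgerEntry
		for {
			entry, err := r.Read()
			if err == ingestio.EOF {
				return entries, nil
			}
			if err != nil {
				return nil, err
			}
			entries = append(entries, entry)
		}
	}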
10 changes: 5 additions & 5 deletions ingest/io/memory_state_reader.go
@@ -158,20 +158,20 @@ func (msr *MemoryStateReader) GetSequence() uint32 {
 }
 
 // Read returns a new ledger entry on each call, returning false when the stream ends
-func (msr *MemoryStateReader) Read() (bool, xdr.LedgerEntry, error) {
+func (msr *MemoryStateReader) Read() (xdr.LedgerEntry, error) {
 	if !msr.active {
-		return false, xdr.LedgerEntry{}, fmt.Errorf("memory state reader not active, need to call BufferReads() before calling Read()")
+		return xdr.LedgerEntry{}, fmt.Errorf("memory state reader not active, need to call BufferReads() before calling Read()")
 	}
 
 	// blocking call. anytime we consume from this channel, the background goroutine will stream in the next value
 	result, ok := <-msr.readChan
 	if !ok {
 		// when channel is closed then return false with empty values
-		return false, xdr.LedgerEntry{}, nil
+		return xdr.LedgerEntry{}, nil
 	}
 
 	if result.e != nil {
-		return true, xdr.LedgerEntry{}, fmt.Errorf("error while reading from background channel: %s", result.e)
+		return xdr.LedgerEntry{}, fmt.Errorf("error while reading from background channel: %s", result.e)
 	}
-	return true, result.entry, nil
+	return result.entry, nil
 }
9 changes: 0 additions & 9 deletions ingest/io/state_reader.go

This file was deleted.

63 changes: 21 additions & 42 deletions ingest/pipeline/buffered_state_read_write_closer.go
@@ -1,63 +1,42 @@
 package pipeline
 
 import (
-	"io"
+	"github.com/stellar/go/xdr"
+	"github.com/stellar/go/ingest/io"
 )
 
-const bufferSize = 4
+const bufferSize = 2000
 
-func (b *BufferedStateReadWriteCloser) init() {
-	b.buffer = make(chan byte, bufferSize)
-	b.closed = make(chan bool)
+func (b *bufferedStateReadWriteCloser) init() {
+	b.buffer = make(chan xdr.LedgerEntry, bufferSize)
 }
 
-func (b *BufferedStateReadWriteCloser) Read(p []byte) (n int, err error) {
-	b.initOnce.Do(b.init)
-
-	// This is to make sure to drain channel first if b.closed is ready.
-	// TODO move `case` contents to another method
-	select {
-	case rb := <-b.buffer:
-		p[0] = rb
-		return 1, nil
-	default:
-	}
+func (b *bufferedStateReadWriteCloser) GetSequence() uint32 {
+	return 0
+}
 
-	select {
-	case rb := <-b.buffer:
-		p[0] = rb
-		return 1, nil
-	case <-b.closed:
-		return 0, io.EOF
-	}
-}
+func (b *bufferedStateReadWriteCloser) Read() (xdr.LedgerEntry, error) {
+	b.initOnce.Do(b.init)
+
+	entry, more := <-b.buffer
+	if more {
+		return entry, nil
+	} else {
+		return xdr.LedgerEntry{}, io.EOF
+	}
+}
 
-func (b *BufferedStateReadWriteCloser) Write(p []byte) (n int, err error) {
+func (b *bufferedStateReadWriteCloser) Write(entry xdr.LedgerEntry) error {
 	b.initOnce.Do(b.init)
-	b.buffer <- p[0]
-	return 1, nil
+	b.buffer <- entry
+	return nil
 }
 
-func (b *BufferedStateReadWriteCloser) Close() error {
+func (b *bufferedStateReadWriteCloser) Close() error {
 	b.initOnce.Do(b.init)
-	b.closed <- true
-	close(b.closed)
+	close(b.buffer)
 	return nil
 }
 
-func (b *BufferedStateReadWriteCloser) WriteCloseString(s string) {
-	b.initOnce.Do(b.init)
-
-	for _, rb := range s {
-		_, err := b.Write([]byte{byte(rb)})
-		if err != nil {
-			panic(err)
-		}
-	}
-
-	err := b.Close()
-	if err != nil {
-		panic(err)
-	}
-}
+var _ io.StateReader = &bufferedStateReadWriteCloser{}
+var _ io.StateWriteCloser = &bufferedStateReadWriteCloser{}
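
A sketch of how this channel-backed pipe behaves (illustrative in-package code, not part of the commit; `pipeDemo` is a hypothetical helper): a producer goroutine writes entries and calls `Close`, and the consumer reads until `io.EOF`.

	package pipeline

	import (
		"github.com/stellar/go/ingest/io"
		"github.com/stellar/go/xdr"
	)

	// pipeDemo pushes n empty entries through a bufferedStateReadWriteCloser
	// and counts how many come out the other side.
	func pipeDemo(n int) (int, error) {
		pipe := &bufferedStateReadWriteCloser{}

		go func() {
			for i := 0; i < n; i++ {
				pipe.Write(xdr.LedgerEntry{}) // blocks once bufferSize entries are in flight
			}
			pipe.Close() // closes the channel so readers see io.EOF
		}()

		count := 0
		for {
			_, err := pipe.Read()
			if err == io.EOF {
				return count, nil
			}
			if err != nil {
				return count, err
			}
			count++
		}
	}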
48 changes: 31 additions & 17 deletions ingest/pipeline/main.go
@@ -1,47 +1,52 @@
 package pipeline
 
 import (
-	"io"
 	"sync"
-)
 
-// Proof of concept types
-type (
-	StateReader = io.Reader
-	StateWriteCloser = io.WriteCloser
+	"github.com/stellar/go/ingest/io"
+	"github.com/stellar/go/xdr"
 )
 
-type BufferedStateReadWriteCloser struct {
+type bufferedStateReadWriteCloser struct {
 	initOnce sync.Once
-	closed   chan bool
-	buffer   chan byte
+	// closed chan bool
+	buffer chan xdr.LedgerEntry
 }
 
+type multiWriteCloser struct {
+	writers []io.StateWriteCloser
+
+	mutex      sync.Mutex
+	closeAfter int
+}
+
 type Pipeline struct {
 	rootStateProcessor *PipelineNode
 	done               bool
 }
 
-type multiWriteCloser struct {
-	writers []StateWriteCloser
-}
-
 type PipelineNode struct {
 	Processor StateProcessor
 	Children  []*PipelineNode
 }
 
 // StateProcessor defines methods required by state processing pipeline.
 type StateProcessor interface {
-	// ProcessState ...
-	ProcessState(store *Store, reader StateReader, writeCloser StateWriteCloser) (err error)
-	// IsConcurent defines if processing pipeline should start a single instance
+	// ProcessState is the main method of `StateProcessor`. It receives an `io.StateReader`
+	// that contains objects passed down the pipeline from the previous processor. Writes to
+	// `io.StateWriteCloser` will be passed to the next processor. WARNING! `ProcessState`
+	// should **always** call `Close()` on `io.StateWriteCloser` when no more objects will be
+	// written.
+	// Data required by following processors (like aggregated data) should be saved in
+	// `Store`. Read the `Store` godoc to understand how to use it.
+	ProcessState(store *Store, reader io.StateReader, writeCloser io.StateWriteCloser) (err error)
+	// IsConcurrent defines if processing pipeline should start a single instance
 	// of the processor or multiple instances. Multiple instances will read
 	// from the same StateReader and write to the same StateWriter.
 	// Example: you can calculate number of asset holders in a single processor but
 	// you can also start multiple processors that sum asset holders in a shared
 	// variable to calculate it faster.
-	IsConcurent() bool
+	IsConcurrent() bool
+	// RequiresInput defines if processor requires input data (StateReader). If not,
+	// it will receive an empty reader, its parent processor will write to "void" and
+	// writes to `writer` will go to "void".

@@ -52,6 +57,15 @@ type StateProcessor interface {
 	Name() string
 }
 
+// Store allows storing data connected to pipeline execution.
+// It exposes `Lock()` and `Unlock()` methods that must be called
+// when accessing the `Store` for both `Put` and `Get` calls.
+//
+// Example (incrementing a number):
+//   s.Lock()
+//   v := s.Get("value")
+//   s.Put("value", v.(int)+1)
+//   s.Unlock()
 type Store struct {
 	sync.Mutex
 	initOnce sync.Once
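
To make the `StateProcessor` and `Store` contracts concrete, here is a hypothetical implementation sketch (not part of the commit); `CountingProcessor` and the `"entries"` key are invented for illustration, and the `RequiresInput() bool` signature is assumed from its doc comment:

	package pipeline

	import "github.com/stellar/go/ingest/io"

	// CountingProcessor forwards every entry unchanged and tallies how many
	// passed through into the shared Store.
	type CountingProcessor struct{}

	func (p *CountingProcessor) ProcessState(store *Store, r io.StateReader, w io.StateWriteCloser) error {
		// Per the interface contract, always Close the writer when done.
		defer w.Close()

		count := 0
		for {
			entry, err := r.Read()
			if err == io.EOF {
				break
			}
			if err != nil {
				return err
			}
			if err := w.Write(entry); err != nil {
				return err
			}
			count++
		}

		// Store access must be wrapped in Lock/Unlock, as its godoc requires.
		store.Lock()
		store.Put("entries", count)
		store.Unlock()
		return nil
	}

	func (p *CountingProcessor) IsConcurrent() bool  { return false }
	func (p *CountingProcessor) RequiresInput() bool { return true }
	func (p *CountingProcessor) Name() string        { return "CountingProcessor" }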
31 changes: 14 additions & 17 deletions ingest/pipeline/multi_write_closer.go
@@ -1,31 +1,28 @@
 package pipeline
 
 import (
-	"io"
-
 	"github.com/stellar/go/support/errors"
+	"github.com/stellar/go/xdr"
 )
 
-func (t *multiWriteCloser) Write(p []byte) (n int, err error) {
+func (t *multiWriteCloser) Write(entry xdr.LedgerEntry) error {
 	for _, w := range t.writers {
-		// BufferedStateReadWriteCloser supports writing only one byte
-		// at a time so loop over more bytes
-		for _, rb := range p {
-			n, err = w.Write([]byte{rb})
-			if err != nil {
-				return
-			}
-
-			if n != 1 {
-				err = errors.Wrap(io.ErrShortWrite, "multiWriteCloser")
-				return
-			}
-		}
+		err := w.Write(entry)
+		if err != nil {
+			return err
+		}
 	}
-	return len(p), nil
+	return nil
 }
 
 func (m *multiWriteCloser) Close() error {
+	m.mutex.Lock()
+	defer m.mutex.Unlock()
+
+	m.closeAfter--
+	if m.closeAfter > 0 {
+		return nil
+	}
+
 	for _, w := range m.writers {
 		err := w.Close()
 		if err != nil {
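
The new `closeAfter` counter implements a reference-counted close: when several concurrent instances of one processor share this writer, each calls `Close()`, and only the last call propagates to the underlying writers. A minimal in-package sketch of the pattern (`demoCloseAfter` is a hypothetical helper, not from the commit):

	package pipeline

	import "github.com/stellar/go/ingest/io"

	// demoCloseAfter shows the reference-counted close: with closeAfter == 2,
	// the downstream writer is closed only by the second Close call.
	func demoCloseAfter() {
		var downstream io.StateWriteCloser = &bufferedStateReadWriteCloser{}
		mwc := &multiWriteCloser{
			writers:    []io.StateWriteCloser{downstream},
			closeAfter: 2,
		}

		mwc.Close() // decrements closeAfter to 1; downstream stays open
		mwc.Close() // decrements to 0; downstream.Close() now runs
	}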
43 changes: 28 additions & 15 deletions ingest/pipeline/pipeline.go
@@ -2,6 +2,8 @@ package pipeline
 
 import (
 	"sync"
+
+	"github.com/stellar/go/ingest/io"
 )
 
 func (p *Pipeline) Node(processor StateProcessor) *PipelineNode {

@@ -14,35 +16,46 @@ func (p *Pipeline) AddStateProcessorTree(rootProcessor *PipelineNode) {
 	p.rootStateProcessor = rootProcessor
 }
 
-func (p *Pipeline) ProcessState(reader StateReader) (done chan error) {
+func (p *Pipeline) ProcessState(reader io.StateReader) (done chan error) {
 	return p.processStateNode(&Store{}, p.rootStateProcessor, reader)
 }
 
-func (p *Pipeline) processStateNode(store *Store, node *PipelineNode, reader StateReader) chan error {
-	outputs := make([]StateWriteCloser, len(node.Children))
+func (p *Pipeline) processStateNode(store *Store, node *PipelineNode, reader io.StateReader) chan error {
+	outputs := make([]io.StateWriteCloser, len(node.Children))
 
 	for i := range outputs {
-		outputs[i] = &BufferedStateReadWriteCloser{}
+		outputs[i] = &bufferedStateReadWriteCloser{}
 	}
 
-	writer := &multiWriteCloser{writers: outputs}
-
 	var wg sync.WaitGroup
-	wg.Add(1)
 
-	go func(reader StateReader, writer StateWriteCloser) {
-		defer wg.Done()
-		err := node.Processor.ProcessState(store, reader, writer)
-		if err != nil {
-			panic(err)
-		}
-	}(reader, writer)
+	jobs := 1
+	if node.Processor.IsConcurrent() {
+		jobs = 10
+	}
+
+	writer := &multiWriteCloser{
+		writers:    outputs,
+		closeAfter: jobs,
+	}
+
+	for i := 1; i <= jobs; i++ {
+		wg.Add(1)
+		go func(reader io.StateReader, writer io.StateWriteCloser) {
+			defer wg.Done()
+			err := node.Processor.ProcessState(store, reader, writer)
+			if err != nil {
+				// TODO return to pipeline error channel
+				panic(err)
+			}
+		}(reader, writer)
+	}
 
 	for i, child := range node.Children {
 		wg.Add(1)
 		go func(i int, child *PipelineNode) {
 			defer wg.Done()
-			done := p.processStateNode(store, child, outputs[i].(*BufferedStateReadWriteCloser))
+			done := p.processStateNode(store, child, outputs[i].(*bufferedStateReadWriteCloser))
 			<-done
 		}(i, child)
 	}
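
Putting the pieces together, a hypothetical end-to-end wiring (not from the commit) that feeds a reader through a two-node tree using the `CountingProcessor` sketched earlier; the completion semantics of the returned channel are assumed from the signature:

	package pipeline

	import "github.com/stellar/go/ingest/io"

	// buildAndRun wires a root processor with a single child and waits for
	// the pipeline to finish processing the reader.
	func buildAndRun(reader io.StateReader) {
		root := &PipelineNode{
			Processor: &CountingProcessor{},
			Children: []*PipelineNode{
				{Processor: &CountingProcessor{}},
			},
		}

		p := &Pipeline{}
		p.AddStateProcessorTree(root)

		// ProcessState starts every node's goroutines; receiving from the
		// returned channel is assumed to block until the tree completes.
		done := p.ProcessState(reader)
		<-done
	}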