
Commit 5e64994: Pipeline example

bartekn committed May 10, 2019
1 parent f8c0993 · commit 5e64994
Showing 5 changed files with 312 additions and 39 deletions.
ingest/pipeline/buffered_state_read_write_closer.go (11 changes: 9 additions & 2 deletions)
@@ -1,11 +1,11 @@
 package pipeline
 
 import (
-    "github.com/stellar/go/xdr"
     "github.com/stellar/go/ingest/io"
+    "github.com/stellar/go/xdr"
 )
 
-const bufferSize = 2000
+const bufferSize = 50000
 
 func (b *bufferedStateReadWriteCloser) init() {
     b.buffer = make(chan xdr.LedgerEntry, bufferSize)
@@ -20,6 +20,7 @@ func (b *bufferedStateReadWriteCloser) Read() (xdr.LedgerEntry, error) {
 
     entry, more := <-b.buffer
     if more {
+        b.readEntries++
         return entry, nil
     } else {
         return xdr.LedgerEntry{}, io.EOF
@@ -29,9 +30,15 @@ func (b *bufferedStateReadWriteCloser) Read() (xdr.LedgerEntry, error) {
 func (b *bufferedStateReadWriteCloser) Write(entry xdr.LedgerEntry) error {
     b.initOnce.Do(b.init)
     b.buffer <- entry
+    b.wroteEntries++
     return nil
 }
 
+func (b *bufferedStateReadWriteCloser) QueuedEntries() int {
+    b.initOnce.Do(b.init)
+    return len(b.buffer)
+}
+
 func (b *bufferedStateReadWriteCloser) Close() error {
     b.initOnce.Do(b.init)
     close(b.buffer)
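
The new QueuedEntries method simply returns len(b.buffer), leaning on the fact that a Go buffered channel knows its own backlog. The snippet below is not part of the commit; it is a minimal sketch of the property the method relies on: len reports how many items are queued, cap reports the buffer size.

package main

import "fmt"

func main() {
    // A buffered channel tracks its own backlog: len() reports how many
    // items are currently queued, cap() the buffer size. This is all that
    // a helper like QueuedEntries needs to expose.
    buffer := make(chan int, 5)
    buffer <- 1
    buffer <- 2
    fmt.Println(len(buffer), cap(buffer)) // prints: 2 5
}

Because len is read while other goroutines may still be writing, the value is only a snapshot, which is enough for the status report that consumes it.
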
ingest/pipeline/main.go (19 changes: 16 additions & 3 deletions)
@@ -2,22 +2,26 @@ package pipeline
 
 import (
     "sync"
+    "time"
 
     "github.com/stellar/go/ingest/io"
     "github.com/stellar/go/xdr"
 )
 
 type bufferedStateReadWriteCloser struct {
     initOnce sync.Once
-    // closed chan bool
     buffer   chan xdr.LedgerEntry
+
+    readEntries  int
+    wroteEntries int
 }
 
 type multiWriteCloser struct {
     writers []io.StateWriteCloser
 
-    mutex      sync.Mutex
-    closeAfter int
+    mutex        sync.Mutex
+    closeAfter   int
+    wroteEntries int
 }
 
 type Pipeline struct {
@@ -28,6 +32,14 @@ type Pipeline struct {
 type PipelineNode struct {
     Processor StateProcessor
     Children  []*PipelineNode
+
+    duration        time.Duration
+    jobs            int
+    readEntries     int
+    readsPerSecond  int
+    queuedEntries   int
+    wroteEntries    int
+    writesPerSecond int
 }
 
 // StateProcessor defines methods required by state processing pipeline.
@@ -52,6 +64,7 @@ type StateProcessor interface {
     // writes to `writer` will go to "void".
     // This is useful for processors responsible for saving aggregated data that don't
     // need state objects.
+    // TODO!
     RequiresInput() bool
     // Returns processor name. Helpful for errors, debugging and reports.
     Name() string
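
The interface comment above says that, in the case it describes, writes to `writer` go to "void". The commit does not show how that is implemented, so the sketch below only illustrates the idea with a hypothetical discard writer; the names (entry, voidWriter) are stand-ins, not part of the stellar/go API.

package main

import "fmt"

// entry stands in for xdr.LedgerEntry; voidWriter is a hypothetical writer
// that accepts every entry and drops it, which is one way writes can
// "go to void" when a processor's output is not needed downstream.
type entry struct{ id int }

type voidWriter struct{ dropped int }

func (v *voidWriter) Write(e entry) error {
    v.dropped++ // count and discard
    return nil
}

func (v *voidWriter) Close() error { return nil }

func main() {
    w := &voidWriter{}
    for i := 0; i < 3; i++ {
        _ = w.Write(entry{id: i})
    }
    _ = w.Close()
    fmt.Println("entries discarded:", w.dropped) // prints: entries discarded: 3
}
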
ingest/pipeline/multi_write_closer.go (32 changes: 29 additions & 3 deletions)
@@ -1,16 +1,42 @@
 package pipeline
 
 import (
+    "sync"
+
     "github.com/stellar/go/ingest/io"
     "github.com/stellar/go/xdr"
 )
 
-func (t *multiWriteCloser) Write(entry xdr.LedgerEntry) error {
-    for _, w := range t.writers {
-        err := w.Write(entry)
+func (m *multiWriteCloser) Write(entry xdr.LedgerEntry) error {
+    m.mutex.Lock()
+    m.wroteEntries++
+    m.mutex.Unlock()
+
+    var wg sync.WaitGroup
+    results := make(chan error, len(m.writers))
+
+    for _, w := range m.writers {
+        wg.Add(1)
+        go func(w io.StateWriteCloser) {
+            defer wg.Done()
+            err := w.Write(entry)
+            if err != nil {
+                results <- err
+            } else {
+                results <- nil
+            }
+        }(w)
+    }
+
+    wg.Wait()
+
+    for range m.writers {
+        err := <-results
         if err != nil {
             return err
         }
     }
 
     return nil
 }
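
The rewritten Write above fans the same entry out to every writer in its own goroutine, waits for all of them, and then surfaces the first error from a buffered results channel. Below is a self-contained sketch of that fan-out-and-collect pattern, using plain functions in place of io.StateWriteCloser so it runs on its own; it mirrors the structure of the diff rather than reproducing it.

package main

import (
    "fmt"
    "sync"
)

// fanOut writes the same value to every writer concurrently: one goroutine
// per writer, an error channel buffered to len(writers) so no goroutine
// ever blocks on send, then a drain loop that returns the first error.
func fanOut(value int, writers []func(int) error) error {
    var wg sync.WaitGroup
    results := make(chan error, len(writers))

    for _, w := range writers {
        wg.Add(1)
        go func(w func(int) error) {
            defer wg.Done()
            results <- w(value)
        }(w)
    }

    wg.Wait()

    for range writers {
        if err := <-results; err != nil {
            return err
        }
    }
    return nil
}

func main() {
    writers := []func(int) error{
        func(v int) error { fmt.Println("writer A got", v); return nil },
        func(v int) error { fmt.Println("writer B got", v); return nil },
    }
    if err := fanOut(42, writers); err != nil {
        fmt.Println("write failed:", err)
    }
}

Returning on the first error after wg.Wait() keeps the code simple at the cost of discarding any remaining errors, which matches what the diff does.
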
ingest/pipeline/pipeline.go (75 changes: 72 additions & 3 deletions)
@@ -1,7 +1,10 @@
 package pipeline
 
 import (
+    "fmt"
+    "strings"
     "sync"
+    "time"
 
     "github.com/stellar/go/ingest/io"
 )
@@ -12,6 +15,50 @@ func (p *Pipeline) Node(processor StateProcessor) *PipelineNode {
     }
 }
 
+func (p *Pipeline) PrintStatus() {
+    p.printNodeStatus(p.rootStateProcessor, 0)
+}
+
+func (p *Pipeline) printNodeStatus(node *PipelineNode, level int) {
+    fmt.Print(strings.Repeat(" ", level))
+
+    var wrRatio = float32(0)
+    if node.readEntries > 0 {
+        wrRatio = float32(node.wroteEntries) / float32(node.readEntries)
+    }
+
+    icon := ""
+    if node.queuedEntries > bufferSize/10*9 {
+        icon = "⚠️ "
+    }
+
+    fmt.Printf(
+        "└ %s%s read=%d (queued=%d rps=%d) wrote=%d (w/r ratio = %1.5f) concurrent=%t jobs=%d\n",
+        icon,
+        node.Processor.Name(),
+        node.readEntries,
+        node.queuedEntries,
+        node.readsPerSecond,
+        node.wroteEntries,
+        wrRatio,
+        node.Processor.IsConcurrent(),
+        node.jobs,
+    )
+
+    if node.jobs > 1 {
+        fmt.Print(strings.Repeat(" ", level))
+        fmt.Print(" ")
+        for i := 0; i < node.jobs; i++ {
+            fmt.Print("• ")
+        }
+        fmt.Println("")
+    }
+
+    for _, child := range node.Children {
+        p.printNodeStatus(child, level+1)
+    }
+}
+
 func (p *Pipeline) AddStateProcessorTree(rootProcessor *PipelineNode) {
     p.rootStateProcessor = rootProcessor
 }
@@ -28,21 +75,24 @@ func (p *Pipeline) processStateNode(store *Store, node *PipelineNode, reader io.
     }
 
     var wg sync.WaitGroup
 
     jobs := 1
     if node.Processor.IsConcurrent() {
-        jobs = 10
+        jobs = 20
     }
 
+    node.jobs = jobs
+
     writer := &multiWriteCloser{
-        writers: outputs,
+        writers:    outputs,
         closeAfter: jobs,
     }
 
     for i := 1; i <= jobs; i++ {
         wg.Add(1)
         go func(reader io.StateReader, writer io.StateWriteCloser) {
             defer wg.Done()
+
             err := node.Processor.ProcessState(store, reader, writer)
             if err != nil {
                 // TODO return to pipeline error channel
@@ -51,6 +101,25 @@ func (p *Pipeline) processStateNode(store *Store, node *PipelineNode, reader io.
         }(reader, writer)
     }
 
+    go func() {
+        // Update stats
+        for {
+            readBuffer := reader.(*bufferedStateReadWriteCloser)
+            writeBuffer := writer
+
+            interval := time.Second
+
+            node.readsPerSecond = (readBuffer.readEntries - node.readEntries) * int(time.Second/interval)
+            node.writesPerSecond = (writeBuffer.wroteEntries - node.wroteEntries) * int(time.Second/interval)
+
+            node.wroteEntries = writeBuffer.wroteEntries
+            node.readEntries = readBuffer.readEntries
+            node.queuedEntries = readBuffer.QueuedEntries()
+
+            time.Sleep(interval)
+        }
+    }()
+
     for i, child := range node.Children {
         wg.Add(1)
         go func(i int, child *PipelineNode) {
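
The stats goroutine added at the end of processStateNode samples the read and write counters once per interval and turns the delta into a per-second rate, which PrintStatus then reports. The standalone sketch below shows the same delta-based sampling; the counter and the simulated worker are illustrative only, and it uses an atomic counter where the diff uses plain ints.

package main

import (
    "fmt"
    "sync/atomic"
    "time"
)

func main() {
    var counter int64 // stands in for readEntries / wroteEntries

    // Simulated worker bumping the counter in the background.
    go func() {
        for {
            atomic.AddInt64(&counter, 1)
            time.Sleep(time.Millisecond)
        }
    }()

    // Same idea as the stats goroutine: remember the previous value and
    // scale the delta by time.Second/interval to get a per-second rate.
    interval := time.Second
    var last int64
    for i := 0; i < 3; i++ {
        time.Sleep(interval)
        current := atomic.LoadInt64(&counter)
        perSecond := (current - last) * int64(time.Second/interval)
        last = current
        fmt.Printf("~%d entries/s\n", perSecond)
    }
}

In the diff itself some of these counters are plain ints that are updated by worker goroutines and read by the stats goroutine without locking, so the printed numbers are best-effort approximations rather than exact totals.
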