diff --git a/ingest/io/main.go b/ingest/io/main.go new file mode 100644 index 0000000000..e34e8b40fd --- /dev/null +++ b/ingest/io/main.go @@ -0,0 +1,25 @@ +package io + +import ( + "io" + + "github.com/stellar/go/xdr" +) + +var EOF = io.EOF + +// StateReader interface placeholder +type StateReader interface { + GetSequence() uint32 + // Read should return next ledger entry. If there are no more + // entries it should return `io.EOF` error. + Read() (xdr.LedgerEntry, error) +} + +// StateWriteCloser interface placeholder +type StateWriteCloser interface { + Write(xdr.LedgerEntry) error + // Close should be called when there are no more entries + // to write. + Close() error +} diff --git a/ingest/io/memory_state_reader.go b/ingest/io/memory_state_reader.go index a862f7dbdd..ba43eb18a3 100644 --- a/ingest/io/memory_state_reader.go +++ b/ingest/io/memory_state_reader.go @@ -158,20 +158,20 @@ func (msr *MemoryStateReader) GetSequence() uint32 { } // Read returns a new ledger entry on each call, returning false when the stream ends -func (msr *MemoryStateReader) Read() (bool, xdr.LedgerEntry, error) { +func (msr *MemoryStateReader) Read() (xdr.LedgerEntry, error) { if !msr.active { - return false, xdr.LedgerEntry{}, fmt.Errorf("memory state reader not active, need to call BufferReads() before calling Read()") + return xdr.LedgerEntry{}, fmt.Errorf("memory state reader not active, need to call BufferReads() before calling Read()") } // blocking call. anytime we consume from this channel, the background goroutine will stream in the next value result, ok := <-msr.readChan if !ok { // when channel is closed then return false with empty values - return false, xdr.LedgerEntry{}, nil + return xdr.LedgerEntry{}, nil } if result.e != nil { - return true, xdr.LedgerEntry{}, fmt.Errorf("error while reading from background channel: %s", result.e) + return xdr.LedgerEntry{}, fmt.Errorf("error while reading from background channel: %s", result.e) } - return true, result.entry, nil + return result.entry, nil } diff --git a/ingest/io/state_reader.go b/ingest/io/state_reader.go deleted file mode 100644 index 74f24b1055..0000000000 --- a/ingest/io/state_reader.go +++ /dev/null @@ -1,9 +0,0 @@ -package io - -import "github.com/stellar/go/xdr" - -// StateReader interface placeholder -type StateReader interface { - GetSequence() uint32 - Read() (bool, xdr.LedgerEntry, error) -} diff --git a/ingest/pipeline/buffered_state_read_write_closer.go b/ingest/pipeline/buffered_state_read_write_closer.go index 27d1db37d6..d2a5112173 100644 --- a/ingest/pipeline/buffered_state_read_write_closer.go +++ b/ingest/pipeline/buffered_state_read_write_closer.go @@ -1,63 +1,42 @@ package pipeline import ( - "io" + "github.com/stellar/go/xdr" + "github.com/stellar/go/ingest/io" ) -const bufferSize = 4 +const bufferSize = 2000 -func (b *BufferedStateReadWriteCloser) init() { - b.buffer = make(chan byte, bufferSize) - b.closed = make(chan bool) +func (b *bufferedStateReadWriteCloser) init() { + b.buffer = make(chan xdr.LedgerEntry, bufferSize) } -func (b *BufferedStateReadWriteCloser) Read(p []byte) (n int, err error) { - b.initOnce.Do(b.init) +func (b *bufferedStateReadWriteCloser) GetSequence() uint32 { + return 0 +} - // This is to make sure to drain channel first if b.closed is ready. - // TODO move `case` contents to another method - select { - case rb := <-b.buffer: - p[0] = rb - return 1, nil - default: - } +func (b *bufferedStateReadWriteCloser) Read() (xdr.LedgerEntry, error) { + b.initOnce.Do(b.init) - select { - case rb := <-b.buffer: - p[0] = rb - return 1, nil - case <-b.closed: - return 0, io.EOF + entry, more := <-b.buffer + if more { + return entry, nil + } else { + return xdr.LedgerEntry{}, io.EOF } } -func (b *BufferedStateReadWriteCloser) Write(p []byte) (n int, err error) { +func (b *bufferedStateReadWriteCloser) Write(entry xdr.LedgerEntry) error { b.initOnce.Do(b.init) - b.buffer <- p[0] - return 1, nil + b.buffer <- entry + return nil } -func (b *BufferedStateReadWriteCloser) Close() error { +func (b *bufferedStateReadWriteCloser) Close() error { b.initOnce.Do(b.init) - b.closed <- true - close(b.closed) close(b.buffer) return nil } -func (b *BufferedStateReadWriteCloser) WriteCloseString(s string) { - b.initOnce.Do(b.init) - - for _, rb := range s { - _, err := b.Write([]byte{byte(rb)}) - if err != nil { - panic(err) - } - } - - err := b.Close() - if err != nil { - panic(err) - } -} +var _ io.StateReader = &bufferedStateReadWriteCloser{} +var _ io.StateWriteCloser = &bufferedStateReadWriteCloser{} diff --git a/ingest/pipeline/main.go b/ingest/pipeline/main.go index 68c398e951..3c04ea87bf 100644 --- a/ingest/pipeline/main.go +++ b/ingest/pipeline/main.go @@ -1,20 +1,23 @@ package pipeline import ( - "io" "sync" -) -// Proof of concept types -type ( - StateReader = io.Reader - StateWriteCloser = io.WriteCloser + "github.com/stellar/go/ingest/io" + "github.com/stellar/go/xdr" ) -type BufferedStateReadWriteCloser struct { +type bufferedStateReadWriteCloser struct { initOnce sync.Once - closed chan bool - buffer chan byte + // closed chan bool + buffer chan xdr.LedgerEntry +} + +type multiWriteCloser struct { + writers []io.StateWriteCloser + + mutex sync.Mutex + closeAfter int } type Pipeline struct { @@ -22,10 +25,6 @@ type Pipeline struct { done bool } -type multiWriteCloser struct { - writers []StateWriteCloser -} - type PipelineNode struct { Processor StateProcessor Children []*PipelineNode @@ -33,15 +32,21 @@ type PipelineNode struct { // StateProcessor defines methods required by state processing pipeline. type StateProcessor interface { - // ProcessState ... - ProcessState(store *Store, reader StateReader, writeCloser StateWriteCloser) (err error) - // IsConcurent defines if processing pipeline should start a single instance + // ProcessState is a main method of `StateProcessor`. It receives `io.StateReader` + // that contains object passed down the pipeline from the previous procesor. Writes to + // `io.StateWriteCloser` will be passed to the next processor. WARNING! `ProcessState` + // should **always** call `Close()` on `io.StateWriteCloser` when no more object will be + // written. + // Data required by following processors (like aggregated data) should be saved in + // `Store`. Read `Store` godoc to understand how to use it. + ProcessState(store *Store, reader io.StateReader, writeCloser io.StateWriteCloser) (err error) + // IsConcurrent defines if processing pipeline should start a single instance // of the processor or multiple instances. Multiple instances will read // from the same StateReader and write to the same StateWriter. // Example: you can calculate number of asset holders in a single processor but // you can also start multiple processors that sum asset holders in a shared // variable to calculate it faster. - IsConcurent() bool + IsConcurrent() bool // RequiresInput defines if processor requires input data (StateReader). If not, // it will receive empty reader, it's parent process will write to "void" and // writes to `writer` will go to "void". @@ -52,6 +57,15 @@ type StateProcessor interface { Name() string } +// Store allows storing data connected to pipeline execution. +// It exposes `Lock()` and `Unlock()` methods that must be called +// when accessing the `Store` for both `Put` and `Get` calls. +// +// Example (incrementing a number): +// s.Lock() +// v := s.Get("value") +// s.Put("value", v.(int)+1) +// s.Unlock() type Store struct { sync.Mutex initOnce sync.Once diff --git a/ingest/pipeline/multi_write_closer.go b/ingest/pipeline/multi_write_closer.go index 1aa19e88a4..07e5db87c0 100644 --- a/ingest/pipeline/multi_write_closer.go +++ b/ingest/pipeline/multi_write_closer.go @@ -1,31 +1,28 @@ package pipeline import ( - "io" - - "github.com/stellar/go/support/errors" + "github.com/stellar/go/xdr" ) -func (t *multiWriteCloser) Write(p []byte) (n int, err error) { +func (t *multiWriteCloser) Write(entry xdr.LedgerEntry) error { for _, w := range t.writers { - // BufferedStateReadWriteCloser supports writing only one byte - // at a time so loop over more bytes - for _, rb := range p { - n, err = w.Write([]byte{rb}) - if err != nil { - return - } - - if n != 1 { - err = errors.Wrap(io.ErrShortWrite, "multiWriteCloser") - return - } + err := w.Write(entry) + if err != nil { + return err } } - return len(p), nil + return nil } func (m *multiWriteCloser) Close() error { + m.mutex.Lock() + defer m.mutex.Unlock() + + m.closeAfter-- + if m.closeAfter > 0 { + return nil + } + for _, w := range m.writers { err := w.Close() if err != nil { diff --git a/ingest/pipeline/pipeline.go b/ingest/pipeline/pipeline.go index d2c5cf69e4..69f1aeb5b2 100644 --- a/ingest/pipeline/pipeline.go +++ b/ingest/pipeline/pipeline.go @@ -2,6 +2,8 @@ package pipeline import ( "sync" + + "github.com/stellar/go/ingest/io" ) func (p *Pipeline) Node(processor StateProcessor) *PipelineNode { @@ -14,35 +16,46 @@ func (p *Pipeline) AddStateProcessorTree(rootProcessor *PipelineNode) { p.rootStateProcessor = rootProcessor } -func (p *Pipeline) ProcessState(reader StateReader) (done chan error) { +func (p *Pipeline) ProcessState(reader io.StateReader) (done chan error) { return p.processStateNode(&Store{}, p.rootStateProcessor, reader) } -func (p *Pipeline) processStateNode(store *Store, node *PipelineNode, reader StateReader) chan error { - outputs := make([]StateWriteCloser, len(node.Children)) +func (p *Pipeline) processStateNode(store *Store, node *PipelineNode, reader io.StateReader) chan error { + outputs := make([]io.StateWriteCloser, len(node.Children)) for i := range outputs { - outputs[i] = &BufferedStateReadWriteCloser{} + outputs[i] = &bufferedStateReadWriteCloser{} } - writer := &multiWriteCloser{writers: outputs} - var wg sync.WaitGroup - wg.Add(1) + + jobs := 1 + if node.Processor.IsConcurrent() { + jobs = 10 + } + + writer := &multiWriteCloser{ + writers: outputs, + closeAfter: jobs, + } - go func(reader StateReader, writer StateWriteCloser) { - defer wg.Done() - err := node.Processor.ProcessState(store, reader, writer) - if err != nil { - panic(err) - } - }(reader, writer) + for i := 1; i <= jobs; i++ { + wg.Add(1) + go func(reader io.StateReader, writer io.StateWriteCloser) { + defer wg.Done() + err := node.Processor.ProcessState(store, reader, writer) + if err != nil { + // TODO return to pipeline error channel + panic(err) + } + }(reader, writer) + } for i, child := range node.Children { wg.Add(1) go func(i int, child *PipelineNode) { defer wg.Done() - done := p.processStateNode(store, child, outputs[i].(*BufferedStateReadWriteCloser)) + done := p.processStateNode(store, child, outputs[i].(*bufferedStateReadWriteCloser)) <-done }(i, child) } diff --git a/ingest/pipeline/pipeline_test.go b/ingest/pipeline/pipeline_test.go index eaaa7503b8..030f6a8f95 100644 --- a/ingest/pipeline/pipeline_test.go +++ b/ingest/pipeline/pipeline_test.go @@ -2,55 +2,150 @@ package pipeline import ( "fmt" - "io" - "io/ioutil" - "strings" "testing" "time" + "sync" + "strings" + + "github.com/stellar/go/ingest/io" + "github.com/stellar/go/xdr" + "github.com/stellar/go/keypair" ) +func AccountLedgerEntry() xdr.LedgerEntry { + random, err := keypair.Random() + if err != nil { + panic(err) + } + + id := xdr.AccountId{} + id.SetAddress(random.Address()) + + return xdr.LedgerEntry{ + LastModifiedLedgerSeq: 0, + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeAccount, + Account: &xdr.AccountEntry{ + AccountId: id, + }, + }, + } +} + +func TrustLineLedgerEntry() xdr.LedgerEntry { + random, err := keypair.Random() + if err != nil { + panic(err) + } + + id := xdr.AccountId{} + id.SetAddress(random.Address()) + + return xdr.LedgerEntry{ + LastModifiedLedgerSeq: 0, + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeTrustline, + TrustLine: &xdr.TrustLineEntry{ + AccountId: id, + }, + }, + } +} + +func TestStore(t *testing.T) { + var s Store + + s.Lock() + s.Put("value", 0) + s.Unlock() + + s.Lock() + v := s.Get("value") + s.Put("value", v.(int)+1) + s.Unlock() + + fmt.Println(s.Get("value")) +} + func TestBuffer(t *testing.T) { - buffer := &BufferedStateReadWriteCloser{} + buffer := &bufferedStateReadWriteCloser{} + + var wg sync.WaitGroup + wg.Add(2) go func() { - read, err := ioutil.ReadAll(buffer) - if err != nil { - panic(err) + defer wg.Done() + for { + entry, err := buffer.Read() + if err != nil { + if err == io.EOF { + break + } else { + panic(err) + } + } + fmt.Println("Read", entry.Data.Account.AccountId.Address()) + time.Sleep(4*time.Second) + } + }() + + go func() { + defer wg.Done() + for i := 0; i < 20; i++ { + buffer.Write(AccountLedgerEntry()) + fmt.Println("Wrote") + time.Sleep(time.Second) } - fmt.Println(read) + buffer.Close() }() - buffer.WriteCloseString("test") - time.Sleep(time.Second) + wg.Wait() } -func TestAbc(t *testing.T) { +func TestPipeline(t *testing.T) { pipeline := &Pipeline{} passthroughProcessor := &PassthroughProcessor{} - uppercaseProcessor := &UppercaseProcessor{} - lowercaseProcessor := &LowercaseProcessor{} + accountsOnlyFilter := &AccountsOnlyFilter{} printProcessor := &PrintProcessor{} pipeline.AddStateProcessorTree( pipeline.Node(passthroughProcessor). Pipe( - pipeline.Node(lowercaseProcessor). - Pipe(pipeline.Node(printProcessor)), - pipeline.Node(uppercaseProcessor). - Pipe(pipeline.Node(printProcessor)), - ), + pipeline.Node(accountsOnlyFilter). + Pipe( + pipeline.Node(&CountPrefixProcessor{Prefix: "GA"}). + Pipe(pipeline.Node(printProcessor)), + pipeline.Node(&CountPrefixProcessor{Prefix: "GB"}). + Pipe(pipeline.Node(printProcessor)), + pipeline.Node(&CountPrefixProcessor{Prefix: "GC"}). + Pipe(pipeline.Node(printProcessor)), + pipeline.Node(&CountPrefixProcessor{Prefix: "GD"}). + Pipe(pipeline.Node(printProcessor)), + ), + ), ) - buffer := &BufferedStateReadWriteCloser{} - go buffer.WriteCloseString("testTEST") + buffer := &bufferedStateReadWriteCloser{} + + go func() { + for i := 0; i < 10000; i++ { + buffer.Write(AccountLedgerEntry()) + buffer.Write(TrustLineLedgerEntry()) + } + buffer.Close() + }() + done := pipeline.ProcessState(buffer) <-done } -type SimpleProcessor struct{} +type SimpleProcessor struct{ + sync.Mutex + callCount int +} -func (n *SimpleProcessor) IsConcurent() bool { +func (n *SimpleProcessor) IsConcurrent() bool { return false } @@ -58,14 +153,29 @@ func (n *SimpleProcessor) RequiresInput() bool { return true } +func (n *SimpleProcessor) CallCount() int { + n.Lock() + defer n.Unlock() + n.callCount++ + return n.callCount +} + type PassthroughProcessor struct { SimpleProcessor } -func (p *PassthroughProcessor) ProcessState(store *Store, r StateReader, w StateWriteCloser) error { - _, err := io.Copy(w, r) - if err != nil { - return err +func (p *PassthroughProcessor) ProcessState(store *Store, r io.StateReader, w io.StateWriteCloser) error { + for { + entry, err := r.Read() + if err != nil { + if err == io.EOF { + break + } else { + return err + } + } + + w.Write(entry) } w.Close() @@ -76,93 +186,111 @@ func (p *PassthroughProcessor) Name() string { return "PassthroughProcessor" } -type UppercaseProcessor struct { +type AccountsOnlyFilter struct { SimpleProcessor } -func (p *UppercaseProcessor) ProcessState(store *Store, r StateReader, w StateWriteCloser) error { - defer w.Close() - - lettersCount := make(map[byte]int) +func (p *AccountsOnlyFilter) ProcessState(store *Store, r io.StateReader, w io.StateWriteCloser) error { for { - b := make([]byte, 1) - rn, rerr := r.Read(b) - if rn == 1 { - lettersCount[b[0]]++ - - newLetter := b[0] - if b[0] >= 97 && b[0] <= 122 { - newLetter -= 32 - } - - _, werr := w.Write([]byte{newLetter}) - if werr != nil { - return werr - } - } - - if rerr != nil { - if rerr == io.EOF { + entry, err := r.Read() + if err != nil { + if err == io.EOF { break } else { - return rerr + return err } } - } - store.Lock() - store.Put("letterCount", lettersCount) - store.Unlock() + if entry.Data.Type == xdr.LedgerEntryTypeAccount { + w.Write(entry) + } + } + w.Close() return nil } -func (p *UppercaseProcessor) Name() string { - return "UppercaseProcessor" +func (p *AccountsOnlyFilter) Name() string { + return "AccountsOnlyFilter" } -type LowercaseProcessor struct { +type CountPrefixProcessor struct { SimpleProcessor + Prefix string } -func (p *LowercaseProcessor) ProcessState(store *Store, r StateReader, w StateWriteCloser) error { - // This will read all into memory. See UppercaseProcessor for streaming - // example. - read, err := ioutil.ReadAll(r) - if err != nil { - return err - } +func (p *CountPrefixProcessor) ProcessState(store *Store, r io.StateReader, w io.StateWriteCloser) error { + // Close writer when we're done + defer w.Close() - n := strings.ToLower(string(read)) + count := 0 - defer w.Close() - _, err = fmt.Fprint(w, n) - if err != nil { - return err + for { + entry, err := r.Read() + if err != nil { + if err == io.EOF { + break + } else { + return err + } + } + + address := entry.Data.Account.AccountId.Address() + + if strings.HasPrefix(address, p.Prefix) { + count++ + } } + store.Lock() + prevCount := store.Get("count"+p.Prefix) + if prevCount != nil { + count += prevCount.(int) + } + store.Put("count"+p.Prefix, count) + store.Unlock() + return nil } -func (p *LowercaseProcessor) Name() string { - return "LowercaseProcessor" +func (p *CountPrefixProcessor) IsConcurrent() bool { + return true +} + +func (p *CountPrefixProcessor) Name() string { + return "CountPrefixProcessor" } type PrintProcessor struct { SimpleProcessor } -func (p *PrintProcessor) ProcessState(store *Store, r StateReader, w StateWriteCloser) error { +func (p *PrintProcessor) ProcessState(store *Store, r io.StateReader, w io.StateWriteCloser) error { defer w.Close() - read, err := ioutil.ReadAll(r) - if err != nil { - return err + // This should be a helper function or a method on `io.StateReader`. + for { + _, err := r.Read() + if err != nil { + if err == io.EOF { + break + } else { + return err + } + } + } + + if p.CallCount() != 4 { + return nil } store.Lock() - fmt.Println(string(read), store.Get("letterCount")) + fmt.Println("countGA", store.Get("countGA")) + fmt.Println("countGB", store.Get("countGB")) + fmt.Println("countGC", store.Get("countGC")) + fmt.Println("countGD", store.Get("countGD")) store.Unlock() + return nil }