Skip to content

Commit

Permalink
Signers for account demo
Browse files Browse the repository at this point in the history
  • Loading branch information
bartekn committed May 22, 2019
1 parent e690a0a commit c3753ec
Show file tree
Hide file tree
Showing 4 changed files with 245 additions and 6 deletions.
2 changes: 1 addition & 1 deletion exp/ingest/adapters/history_archive_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"github.com/stellar/go/support/historyarchive"
)

const msrBufferSize = 10
const msrBufferSize = 50000

// HistoryArchiveAdapter is an adapter for the historyarchive package to read from history archives
type HistoryArchiveAdapter struct {
Expand Down
12 changes: 7 additions & 5 deletions exp/ingest/pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,17 +104,19 @@ func (p *Pipeline) processStateNode(store *Store, node *PipelineNode, reader io.
go func() {
// Update stats
for {
readBuffer := reader.(*bufferedStateReadWriteCloser)
readBuffer, readBufferIsBufferedStateReadWriteCloser := reader.(*bufferedStateReadWriteCloser)
writeBuffer := writer

interval := time.Second

node.readsPerSecond = (readBuffer.readEntries - node.readEntries) * int(time.Second/interval)
node.writesPerSecond = (writeBuffer.wroteEntries - node.wroteEntries) * int(time.Second/interval)

node.wroteEntries = writeBuffer.wroteEntries
node.readEntries = readBuffer.readEntries
node.queuedEntries = readBuffer.QueuedEntries()

if readBufferIsBufferedStateReadWriteCloser {
node.readsPerSecond = (readBuffer.readEntries - node.readEntries) * int(time.Second/interval)
node.readEntries = readBuffer.readEntries
node.queuedEntries = readBuffer.QueuedEntries()
}

time.Sleep(interval)
}
Expand Down
142 changes: 142 additions & 0 deletions exp/tools/accounts-for-signer/filters.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
package main

import (
"fmt"
"sync"

"github.com/stellar/go/exp/ingest/io"
"github.com/stellar/go/exp/ingest/pipeline"
"github.com/stellar/go/xdr"
)

type SimpleProcessor struct {
sync.Mutex
callCount int
}

func (n *SimpleProcessor) IsConcurrent() bool {
return false
}

func (n *SimpleProcessor) RequiresInput() bool {
return true
}

func (n *SimpleProcessor) CallCount() int {
n.Lock()
defer n.Unlock()
n.callCount++
return n.callCount
}

type EntryTypeFilter struct {
SimpleProcessor

Type xdr.LedgerEntryType
}

func (p *EntryTypeFilter) ProcessState(store *pipeline.Store, r io.StateReader, w io.StateWriteCloser) error {
for {
entry, err := r.Read()
if err != nil {
if err == io.EOF {
break
} else {
return err
}
}

if entry.Data.Type == p.Type {
w.Write(entry)
}
}

w.Close()
return nil
}

func (p *EntryTypeFilter) Name() string {
return fmt.Sprintf("EntryTypeFilter (%s)", p.Type)
}

type AccountsForSignerProcessor struct {
SimpleProcessor

Signer string
}

func (p *AccountsForSignerProcessor) ProcessState(store *pipeline.Store, r io.StateReader, w io.StateWriteCloser) error {
for {
entry, err := r.Read()
if err != nil {
if err == io.EOF {
break
} else {
return err
}
}

if entry.Data.Type != xdr.LedgerEntryTypeAccount {
continue
}

for _, signer := range entry.Data.Account.Signers {
if signer.Key.Address() == p.Signer {
w.Write(entry)
break
}
}
}

w.Close()
return nil
}

func (p *AccountsForSignerProcessor) Name() string {
return "AccountsForSignerProcessor"
}

func (n *AccountsForSignerProcessor) IsConcurrent() bool {
return true
}

type PrintAllProcessor struct {
SimpleProcessor
}

func (p *PrintAllProcessor) ProcessState(store *pipeline.Store, r io.StateReader, w io.StateWriteCloser) error {
defer w.Close()

var accounts []string

entries := 0
for {
entry, err := r.Read()
if err != nil {
if err == io.EOF {
break
} else {
return err
}
}

entries++
switch entry.Data.Type {
case xdr.LedgerEntryTypeAccount:
accounts = append(accounts, entry.Data.Account.AccountId.Address())
default:
// Ignore for now
}
}

fmt.Printf("Found %d entries:\n", entries)
for _, account := range accounts {
fmt.Println(account)
}

return nil
}

func (p *PrintAllProcessor) Name() string {
return "PrintAllProcessor"
}
95 changes: 95 additions & 0 deletions exp/tools/accounts-for-signer/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package main

import (
"fmt"
"runtime"
"time"

"github.com/stellar/go/exp/ingest/adapters"
"github.com/stellar/go/exp/ingest/pipeline"
"github.com/stellar/go/support/historyarchive"
"github.com/stellar/go/xdr"
)

func main() {
archive, err := archive()
if err != nil {
panic(err)
}

historyAdapter := ingestadapters.MakeHistoryArchiveAdapter(archive)

seq, err := historyAdapter.GetLatestLedgerSequence()
if err != nil {
panic(err)
}

fmt.Printf("Getting data for ledger seq = %d\n", seq)

stateReader, err := historyAdapter.GetState(seq)
if err != nil {
panic(err)
}

p, err := buildPipeline()
if err != nil {
panic(err)
}

done := p.ProcessState(stateReader)
startTime := time.Now()
go printPipelineStats(p, startTime)
<-done
}

func archive() (*historyarchive.Archive, error) {
return historyarchive.Connect(
fmt.Sprintf("s3://history.stellar.org/prd/core-live/core_live_001/"),
historyarchive.ConnectOptions{
S3Region: "eu-west-1",
UnsignedRequests: true,
},
)
}

func buildPipeline() (*pipeline.Pipeline, error) {
p := &pipeline.Pipeline{}

p.AddStateProcessorTree(
// Passes accounts only
p.Node(&EntryTypeFilter{Type: xdr.LedgerEntryTypeAccount}).
Pipe(
// Finds accounts for a single signer
p.Node(&AccountsForSignerProcessor{Signer: "GBMALBYJT6A73SYQWOWVVCGSPUPJPBX4AFDJ7A63GG64QCNRCAFYWWEN"}).
Pipe(p.Node(&PrintAllProcessor{})),
),
)

return p, nil
}

func printPipelineStats(p *pipeline.Pipeline, startTime time.Time) {
for {
var m runtime.MemStats
runtime.ReadMemStats(&m)

fmt.Printf("Alloc = %v MiB", bToMb(m.Alloc))
fmt.Printf("\tHeapAlloc = %v MiB", bToMb(m.HeapAlloc))
fmt.Printf("\tSys = %v MiB", bToMb(m.Sys))
fmt.Printf("\tNumGC = %v", m.NumGC)
fmt.Printf("\tGoroutines = %v", runtime.NumGoroutine())
fmt.Printf("\tNumCPU = %v\n\n", runtime.NumCPU())

fmt.Printf("Duration: %s\n", time.Since(startTime))
fmt.Println("Pipeline status:")
p.PrintStatus()

fmt.Println("========================================")

time.Sleep(10 * time.Second)
}
}

func bToMb(b uint64) uint64 {
return b / 1024 / 1024
}

0 comments on commit c3753ec

Please sign in to comment.