Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

soroban-rpc: Use in-memory event store in getEvents handler #385

Merged
merged 12 commits into from
Feb 2, 2023
12 changes: 10 additions & 2 deletions cmd/soroban-rpc/internal/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/stellar/soroban-tools/cmd/soroban-rpc/internal"
"github.com/stellar/soroban-tools/cmd/soroban-rpc/internal/config"
"github.com/stellar/soroban-tools/cmd/soroban-rpc/internal/db"
"github.com/stellar/soroban-tools/cmd/soroban-rpc/internal/events"
"github.com/stellar/soroban-tools/cmd/soroban-rpc/internal/ingest"
"github.com/stellar/soroban-tools/cmd/soroban-rpc/internal/methods"
)
Expand Down Expand Up @@ -103,9 +104,16 @@ func MustNew(cfg config.LocalConfig) *Daemon {
logger.Fatalf("could not open database: %v", err)
}

ledgerRetentionWindow := uint32(cfg.LedgerRetentionWindow)
eventStore, err := events.NewMemoryStore(ledgerRetentionWindow)
if err != nil {
logger.Fatalf("could not create event store: %v", err)
}

ingestService, err := ingest.NewService(ingest.Config{
Logger: logger,
DB: db.NewReadWriter(dbConn, maxLedgerEntryWriteBatchSize, uint32(cfg.LedgerRetentionWindow)),
DB: db.NewReadWriter(dbConn, maxLedgerEntryWriteBatchSize, ledgerRetentionWindow),
EventStore: eventStore,
NetworkPassPhrase: cfg.NetworkPassphrase,
Archive: historyArchive,
LedgerBackend: core,
Expand Down Expand Up @@ -134,7 +142,7 @@ func MustNew(cfg config.LocalConfig) *Daemon {

handler, err := internal.NewJSONRPCHandler(internal.HandlerParams{
AccountStore: methods.AccountStore{Client: hc},
EventStore: methods.EventStore{Client: hc},
EventStore: eventStore,
FriendbotURL: cfg.FriendbotURL,
NetworkPassphrase: cfg.NetworkPassphrase,
Logger: logger,
Expand Down
101 changes: 101 additions & 0 deletions cmd/soroban-rpc/internal/events/cursor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package events

import (
"fmt"
"math"
"strconv"
"strings"

"github.com/stellar/go/toid"
)

// Cursor represents the position of a Soroban event.
// Soroban events are sorted in ascending order by
// ledger sequence, transaction index, operation index,
// and event index.
type Cursor struct {
// Ledger is the sequence of the ledger which emitted the event.
Ledger uint32
// Tx is the index of the transaction within the ledger which emitted the event.
Tx uint32
// Op is the index of the operation within the transaction which emitted the event.
Op uint32
// Event is the index of the event within in the operation which emitted the event.
Event uint32
}

// String returns a string representation of this cursor
func (c Cursor) String() string {
return fmt.Sprintf(
"%019d-%010d",
toid.New(int32(c.Ledger), int32(c.Tx), int32(c.Op)).ToInt64(),
c.Event,
)
}

// ParseCursor parses the given string and returns the corresponding cursor
func ParseCursor(input string) (Cursor, error) {
parts := strings.SplitN(input, "-", 2)
if len(parts) != 2 {
return Cursor{}, fmt.Errorf("invalid event id %s", input)
}

// Parse the first part (toid)
idInt, err := strconv.ParseInt(parts[0], 10, 64) //lint:ignore gomnd
if err != nil {
return Cursor{}, fmt.Errorf("invalid event id %s: %w", input, err)
}
parsed := toid.Parse(idInt)

// Parse the second part (event order)
eventOrder, err := strconv.ParseInt(parts[1], 10, 64) //lint:ignore gomnd
if err != nil {
return Cursor{}, fmt.Errorf("invalid event id %s: %w", input, err)
}

return Cursor{
Ledger: uint32(parsed.LedgerSequence),
Tx: uint32(parsed.TransactionOrder),
Op: uint32(parsed.OperationOrder),
Event: uint32(eventOrder),
}, nil
}

func cmp(a, b uint32) int {
if a < b {
return -1
}
if a > b {
return 1
}
return 0
}

// Cmp compares two cursors.
// 0 is returned if the c is equal to other.
// 1 is returned if c is greater than other.
// -1 is returned if c is less than other.
func (c Cursor) Cmp(other Cursor) int {
if c.Ledger == other.Ledger {
if c.Tx == other.Tx {
if c.Op == other.Op {
return cmp(c.Event, other.Event)
}
return cmp(c.Op, other.Op)
}
return cmp(c.Tx, other.Tx)
}
return cmp(c.Ledger, other.Ledger)
}

var (
// MinCursor is the smallest possible cursor
MinCursor = Cursor{}
// MaxCursor is the largest possible cursor
MaxCursor = Cursor{
Ledger: math.MaxUint32,
Tx: math.MaxUint32,
Op: math.MaxUint32,
Event: math.MaxUint32,
}
)
84 changes: 84 additions & 0 deletions cmd/soroban-rpc/internal/events/cursor_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package events

import (
"github.com/stretchr/testify/assert"
tamirms marked this conversation as resolved.
Show resolved Hide resolved
"math"
"testing"
)

func TestParseCursor(t *testing.T) {
for _, cursor := range []Cursor{
{
Ledger: math.MaxInt32,
Tx: 1048575,
Op: 4095,
Event: math.MaxInt32,
},
{
Ledger: 0,
Tx: 0,
Op: 0,
Event: 0,
},
{
Ledger: 123,
Tx: 10,
Op: 5,
Event: 1,
},
} {
parsed, err := ParseCursor(cursor.String())
assert.NoError(t, err)
assert.Equal(t, cursor, parsed)
}
}

func TestCursorCmp(t *testing.T) {
for _, testCase := range []struct {
a Cursor
b Cursor
expected int
}{
{MinCursor, MaxCursor, -1},
{MinCursor, MinCursor, 0},
{MaxCursor, MaxCursor, 0},
{
Cursor{Ledger: 1, Tx: 2, Op: 3, Event: 4},
Cursor{Ledger: 1, Tx: 2, Op: 3, Event: 4},
0,
},
{
Cursor{Ledger: 5, Tx: 2, Op: 3, Event: 4},
Cursor{Ledger: 7, Tx: 2, Op: 3, Event: 4},
-1,
},
{
Cursor{Ledger: 5, Tx: 2, Op: 3, Event: 4},
Cursor{Ledger: 5, Tx: 7, Op: 3, Event: 4},
-1,
},
{
Cursor{Ledger: 5, Tx: 2, Op: 3, Event: 4},
Cursor{Ledger: 5, Tx: 2, Op: 7, Event: 4},
-1,
},
{
Cursor{Ledger: 5, Tx: 2, Op: 3, Event: 4},
Cursor{Ledger: 5, Tx: 2, Op: 3, Event: 7},
-1,
},
} {
a := testCase.a
b := testCase.b
expected := testCase.expected

if got := a.Cmp(b); got != expected {
t.Fatalf("expected (%v).Cmp(%v) to be %v but got %v", a, b, expected, got)
}
a, b = b, a
expected *= -1
if got := a.Cmp(b); got != expected {
t.Fatalf("expected (%v).Cmp(%v) to be %v but got %v", a, b, expected, got)
}
}
}
98 changes: 28 additions & 70 deletions cmd/soroban-rpc/internal/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,72 +3,18 @@ package events
import (
tamirms marked this conversation as resolved.
Show resolved Hide resolved
"errors"
"fmt"
"github.com/stellar/go/ingest"
"io"
"math"
"sort"
"sync"

"github.com/stellar/go/ingest"
"github.com/stellar/go/xdr"
)

// Cursor represents the position of a Soroban event.
// Soroban events are sorted in ascending order by
// ledger sequence, transaction index, operation index,
// and event index.
type Cursor struct {
// Ledger is the sequence of the ledger which emitted the event.
Ledger uint32
// Tx is the index of the transaction within the ledger which emitted the event.
Tx uint32
// Op is the index of the operation within the transaction which emitted the event.
Op uint32
// Event is the index of the event within in the operation which emitted the event.
Event uint32
}

func cmp(a, b uint32) int {
if a < b {
return -1
}
if a > b {
return 1
}
return 0
}

// Cmp compares two cursors.
// 0 is returned if the c is equal to other.
// 1 is returned if c is greater than other.
// -1 is returned if c is less than other.
func (c Cursor) Cmp(other Cursor) int {
if c.Ledger == other.Ledger {
if c.Tx == other.Tx {
if c.Op == other.Op {
return cmp(c.Event, other.Event)
}
return cmp(c.Op, other.Op)
}
return cmp(c.Tx, other.Tx)
}
return cmp(c.Ledger, other.Ledger)
}

var (
// MinCursor is the smallest possible cursor
MinCursor = Cursor{}
// MaxCursor is the largest possible cursor
MaxCursor = Cursor{
Ledger: math.MaxUint32,
Tx: math.MaxUint32,
Op: math.MaxUint32,
Event: math.MaxUint32,
}
)

type bucket struct {
ledgerSeq uint32
events []event
ledgerSeq uint32
ledgerCloseTimestamp int64
events []event
}

type event struct {
Expand All @@ -89,6 +35,7 @@ func (e event) cursor(ledgerSeq uint32) Cursor {

// MemoryStore is an in-memory store of soroban events.
type MemoryStore struct {
// lock protects the mutable fields below
lock sync.RWMutex
// buckets is a circular buffer where each cell represents
// all events occurring within a specific ledger.
Expand Down Expand Up @@ -133,7 +80,7 @@ type Range struct {
// remaining events in the range). Note that a read lock is held for the
// entire duration of the Scan function so f should be written in a way
// to minimize latency.
func (m *MemoryStore) Scan(eventRange Range, f func(Cursor, xdr.ContractEvent) bool) error {
func (m *MemoryStore) Scan(eventRange Range, f func(Cursor, int64, xdr.ContractEvent) bool) error {
m.lock.RLock()
defer m.lock.RUnlock()

Expand All @@ -146,12 +93,13 @@ func (m *MemoryStore) Scan(eventRange Range, f func(Cursor, xdr.ContractEvent) b
i := ((curLedger - minLedger) + m.start) % uint32(len(m.buckets))
events := seek(m.buckets[i].events, eventRange.Start)
for ; curLedger == m.buckets[i].ledgerSeq; curLedger++ {
timestamp := m.buckets[i].ledgerCloseTimestamp
for _, event := range events {
cur := event.cursor(curLedger)
if eventRange.End.Cmp(cur) <= 0 {
return nil
}
if !f(cur, event.contents) {
if !f(cur, timestamp, event.contents) {
return nil
}
}
Expand Down Expand Up @@ -207,16 +155,21 @@ func seek(events []event, cursor Cursor) []event {
// IngestEvents adds new events from the given ledger into the store.
// As a side effect, events which fall outside the retention window are
// removed from the store.
func (m *MemoryStore) IngestEvents(txReader *ingest.LedgerTransactionReader) error {
events, err := readEvents(txReader)
func (m *MemoryStore) IngestEvents(networkPassphrase string, ledgerCloseMeta xdr.LedgerCloseMeta) error {
tamirms marked this conversation as resolved.
Show resolved Hide resolved
events, err := readEvents(networkPassphrase, ledgerCloseMeta)
if err != nil {
return err
}
ledgerSequence := txReader.GetSequence()
return m.append(ledgerSequence, events)
ledgerSequence := ledgerCloseMeta.LedgerSequence()
ledgerCloseTime := int64(ledgerCloseMeta.LedgerHeaderHistoryEntry().Header.ScpValue.CloseTime)
return m.append(ledgerSequence, ledgerCloseTime, events)
}

func readEvents(txReader *ingest.LedgerTransactionReader) ([]event, error) {
func readEvents(networkPassphrase string, ledgerCloseMeta xdr.LedgerCloseMeta) ([]event, error) {
txReader, err := ingest.NewLedgerTransactionReaderFromLedgerCloseMeta(networkPassphrase, ledgerCloseMeta)
if err != nil {
return nil, err
}
var events []event
for {
tx, err := txReader.Read()
Expand All @@ -243,11 +196,14 @@ func readEvents(txReader *ingest.LedgerTransactionReader) ([]event, error) {
}
}
}
if err := txReader.Close(); err != nil {
tamirms marked this conversation as resolved.
Show resolved Hide resolved
return nil, err
}
return events, nil
}

// append adds new events to the circular buffer.
func (m *MemoryStore) append(sequence uint32, events []event) error {
func (m *MemoryStore) append(sequence uint32, ledgerCloseTimestamp int64, events []event) error {
m.lock.Lock()
defer m.lock.Unlock()

Expand All @@ -261,14 +217,16 @@ func (m *MemoryStore) append(sequence uint32, events []event) error {

if length < uint32(cap(m.buckets)) {
m.buckets = append(m.buckets, bucket{
ledgerSeq: sequence,
events: events,
ledgerCloseTimestamp: ledgerCloseTimestamp,
tamirms marked this conversation as resolved.
Show resolved Hide resolved
ledgerSeq: sequence,
events: events,
})
} else {
index := (m.start + length) % uint32(len(m.buckets))
m.buckets[index] = bucket{
ledgerSeq: sequence,
events: events,
ledgerCloseTimestamp: ledgerCloseTimestamp,
ledgerSeq: sequence,
events: events,
}
m.start++
}
Expand Down
Loading