Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

soroban-rpc: Use in-memory event store in getEvents handler #385

Merged
merged 12 commits into from
Feb 2, 2023
1 change: 1 addition & 0 deletions cmd/soroban-rpc/internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,5 @@ type LocalConfig struct {
LedgerEntryStorageTimeout time.Duration
LedgerRetentionWindow int
CheckpointFrequency uint32
MaxEventsLimit uint
}
13 changes: 11 additions & 2 deletions cmd/soroban-rpc/internal/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/stellar/soroban-tools/cmd/soroban-rpc/internal"
"github.com/stellar/soroban-tools/cmd/soroban-rpc/internal/config"
"github.com/stellar/soroban-tools/cmd/soroban-rpc/internal/db"
"github.com/stellar/soroban-tools/cmd/soroban-rpc/internal/events"
"github.com/stellar/soroban-tools/cmd/soroban-rpc/internal/ingest"
"github.com/stellar/soroban-tools/cmd/soroban-rpc/internal/methods"
)
Expand Down Expand Up @@ -103,9 +104,16 @@ func MustNew(cfg config.LocalConfig) *Daemon {
logger.Fatalf("could not open database: %v", err)
}

ledgerRetentionWindow := uint32(cfg.LedgerRetentionWindow)
eventStore, err := events.NewMemoryStore(cfg.NetworkPassphrase, ledgerRetentionWindow)
if err != nil {
logger.Fatalf("could not create event store: %v", err)
}

ingestService, err := ingest.NewService(ingest.Config{
Logger: logger,
DB: db.NewReadWriter(dbConn, maxLedgerEntryWriteBatchSize, uint32(cfg.LedgerRetentionWindow)),
DB: db.NewReadWriter(dbConn, maxLedgerEntryWriteBatchSize, ledgerRetentionWindow),
EventStore: eventStore,
NetworkPassPhrase: cfg.NetworkPassphrase,
Archive: historyArchive,
LedgerBackend: core,
Expand Down Expand Up @@ -134,13 +142,14 @@ func MustNew(cfg config.LocalConfig) *Daemon {

handler, err := internal.NewJSONRPCHandler(internal.HandlerParams{
AccountStore: methods.AccountStore{Client: hc},
EventStore: methods.EventStore{Client: hc},
EventStore: eventStore,
FriendbotURL: cfg.FriendbotURL,
NetworkPassphrase: cfg.NetworkPassphrase,
Logger: logger,
TransactionProxy: transactionProxy,
CoreClient: &stellarcore.Client{URL: cfg.StellarCoreURL},
LedgerEntryReader: db.NewLedgerEntryReader(dbConn),
MaxEventsLimit: cfg.MaxEventsLimit,
tamirms marked this conversation as resolved.
Show resolved Hide resolved
})
if err != nil {
logger.Fatalf("could not create handler: %v", err)
Expand Down
101 changes: 101 additions & 0 deletions cmd/soroban-rpc/internal/events/cursor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package events

import (
"fmt"
"math"
"strconv"
"strings"

"github.com/stellar/go/toid"
)

// Cursor represents the position of a Soroban event.
// Soroban events are sorted in ascending order by
// ledger sequence, transaction index, operation index,
// and event index.
type Cursor struct {
// Ledger is the sequence of the ledger which emitted the event.
Ledger uint32
// Tx is the index of the transaction within the ledger which emitted the event.
Tx uint32
// Op is the index of the operation within the transaction which emitted the event.
Op uint32
// Event is the index of the event within in the operation which emitted the event.
Event uint32
}

// String returns a string representation of this cursor
func (c Cursor) String() string {
return fmt.Sprintf(
"%019d-%010d",
toid.New(int32(c.Ledger), int32(c.Tx), int32(c.Op)).ToInt64(),
c.Event,
)
}

// ParseCursor parses the given string and returns the corresponding cursor
func ParseCursor(input string) (Cursor, error) {
parts := strings.SplitN(input, "-", 2)
if len(parts) != 2 {
return Cursor{}, fmt.Errorf("invalid event id %s", input)
}

// Parse the first part (toid)
idInt, err := strconv.ParseInt(parts[0], 10, 64) //lint:ignore gomnd
if err != nil {
return Cursor{}, fmt.Errorf("invalid event id %s: %w", input, err)
}
parsed := toid.Parse(idInt)

// Parse the second part (event order)
eventOrder, err := strconv.ParseInt(parts[1], 10, 64) //lint:ignore gomnd
if err != nil {
return Cursor{}, fmt.Errorf("invalid event id %s: %w", input, err)
}

return Cursor{
Ledger: uint32(parsed.LedgerSequence),
Tx: uint32(parsed.TransactionOrder),
Op: uint32(parsed.OperationOrder),
Event: uint32(eventOrder),
}, nil
}

func cmp(a, b uint32) int {
if a < b {
return -1
}
if a > b {
return 1
}
return 0
}

// Cmp compares two cursors.
// 0 is returned if the c is equal to other.
// 1 is returned if c is greater than other.
// -1 is returned if c is less than other.
func (c Cursor) Cmp(other Cursor) int {
if c.Ledger == other.Ledger {
if c.Tx == other.Tx {
if c.Op == other.Op {
return cmp(c.Event, other.Event)
}
return cmp(c.Op, other.Op)
}
return cmp(c.Tx, other.Tx)
}
return cmp(c.Ledger, other.Ledger)
}

var (
// MinCursor is the smallest possible cursor
MinCursor = Cursor{}
// MaxCursor is the largest possible cursor
MaxCursor = Cursor{
Ledger: math.MaxUint32,
Tx: math.MaxUint32,
Op: math.MaxUint32,
Event: math.MaxUint32,
}
)
85 changes: 85 additions & 0 deletions cmd/soroban-rpc/internal/events/cursor_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package events

import (
"math"
"testing"

"github.com/stretchr/testify/assert"
)

func TestParseCursor(t *testing.T) {
for _, cursor := range []Cursor{
{
Ledger: math.MaxInt32,
Tx: 1048575,
Op: 4095,
Event: math.MaxInt32,
},
{
Ledger: 0,
Tx: 0,
Op: 0,
Event: 0,
},
{
Ledger: 123,
Tx: 10,
Op: 5,
Event: 1,
},
} {
parsed, err := ParseCursor(cursor.String())
assert.NoError(t, err)
assert.Equal(t, cursor, parsed)
}
}

func TestCursorCmp(t *testing.T) {
for _, testCase := range []struct {
a Cursor
b Cursor
expected int
}{
{MinCursor, MaxCursor, -1},
{MinCursor, MinCursor, 0},
{MaxCursor, MaxCursor, 0},
{
Cursor{Ledger: 1, Tx: 2, Op: 3, Event: 4},
Cursor{Ledger: 1, Tx: 2, Op: 3, Event: 4},
0,
},
{
Cursor{Ledger: 5, Tx: 2, Op: 3, Event: 4},
Cursor{Ledger: 7, Tx: 2, Op: 3, Event: 4},
-1,
},
{
Cursor{Ledger: 5, Tx: 2, Op: 3, Event: 4},
Cursor{Ledger: 5, Tx: 7, Op: 3, Event: 4},
-1,
},
{
Cursor{Ledger: 5, Tx: 2, Op: 3, Event: 4},
Cursor{Ledger: 5, Tx: 2, Op: 7, Event: 4},
-1,
},
{
Cursor{Ledger: 5, Tx: 2, Op: 3, Event: 4},
Cursor{Ledger: 5, Tx: 2, Op: 3, Event: 7},
-1,
},
} {
a := testCase.a
b := testCase.b
expected := testCase.expected

if got := a.Cmp(b); got != expected {
t.Fatalf("expected (%v).Cmp(%v) to be %v but got %v", a, b, expected, got)
}
a, b = b, a
expected *= -1
if got := a.Cmp(b); got != expected {
t.Fatalf("expected (%v).Cmp(%v) to be %v but got %v", a, b, expected, got)
}
}
}
Loading