From 2b42d1ec439dbe42c510f789b9f6ca980b13e1f6 Mon Sep 17 00:00:00 2001 From: tamirms Date: Fri, 27 Jan 2023 09:15:08 +0000 Subject: [PATCH 1/6] Add in-memory event store backed by circular buffer --- cmd/soroban-rpc/internal/events/events.go | 274 ++++++++++ .../internal/events/events_test.go | 478 ++++++++++++++++++ go.mod | 2 +- go.sum | 4 +- 4 files changed, 755 insertions(+), 3 deletions(-) create mode 100644 cmd/soroban-rpc/internal/events/events.go create mode 100644 cmd/soroban-rpc/internal/events/events_test.go diff --git a/cmd/soroban-rpc/internal/events/events.go b/cmd/soroban-rpc/internal/events/events.go new file mode 100644 index 000000000..cef4e2327 --- /dev/null +++ b/cmd/soroban-rpc/internal/events/events.go @@ -0,0 +1,274 @@ +package events + +import ( + "fmt" + "io" + "math" + "sort" + "sync" + + "github.com/stellar/go/ingest" + "github.com/stellar/go/xdr" +) + +// Cursor represents the position of an event within the sequence of all +// Soroban events. Soroban events are sorted by ledger sequence, transaction +// index, operation index, and event index. +type Cursor struct { + // Ledger is the sequence of the ledger which emitted the event. + Ledger uint32 + // Tx is the index of the transaction within the ledger which emitted the event. + Tx uint32 + // Op is the index of the operation within the transaction which emitted the event. + Op uint32 + // Event is the index of the event within in the operation which emitted the event. + Event uint32 +} + +func cmp(a, b uint32) int { + if a < b { + return -1 + } + if a > b { + return 1 + } + return 0 +} + +// Cmp compares two cursors. +// 0 is returned if the c is equal to other. +// 1 is returned if c is greater than other. +// -1 is returned if c is less than other. +func (c Cursor) Cmp(other Cursor) int { + if c.Ledger == other.Ledger { + if c.Tx == other.Tx { + if c.Op == other.Op { + return cmp(c.Event, other.Event) + } + return cmp(c.Op, other.Op) + } + return cmp(c.Tx, other.Tx) + } + return cmp(c.Ledger, other.Ledger) +} + +var ( + // MinCursor is the smallest possible cursor + MinCursor = Cursor{} + // MaxCursor is the largest possible cursor + MaxCursor = Cursor{ + Ledger: math.MaxUint32, + Tx: math.MaxUint32, + Op: math.MaxUint32, + Event: math.MaxUint32, + } +) + +type bucket struct { + ledgerSeq uint32 + events []event +} + +type event struct { + contents xdr.ContractEvent + txIndex uint32 + opIndex uint32 + eventIndex uint32 +} + +func (e event) cursor(ledger uint32) Cursor { + return Cursor{ + Ledger: ledger, + Tx: e.txIndex, + Op: e.opIndex, + Event: e.eventIndex, + } +} + +// MemoryStore is an in-memory store of soroban events. +type MemoryStore struct { + lock sync.RWMutex + // buckets is a circular buffer where each cell represents + // all events occurring within a specific ledger. + buckets []bucket + // length is equal to the number of ledgers contained within + // the circular buffer. + length uint32 + // start is the index of the head in the circular buffer. + start uint32 +} + +// NewMemoryStore creates a new MemoryStore populated by the given ledgers. +// retentionWindow defines a retention window in ledgers. All events occurring +// within the retention window will be included in the store. +func NewMemoryStore(retentionWindow uint32) (*MemoryStore, error) { + if retentionWindow == 0 { + return nil, fmt.Errorf("retention window must be positive") + } + return &MemoryStore{ + buckets: make([]bucket, retentionWindow), + }, nil +} + +// Range defines an interval in the sequence of all Soroban events. +type Range struct { + // Start defines the start of the range. + // Start is included in the range. + Start Cursor + // ClampStart indicates whether Start should be clamped to + // the earliest ledger available if Start is too low. + ClampStart bool + // End defines the end of the range. + // End is excluded from the range. + End Cursor + // ClampEnd indicates whether End should be clamped to + // the latest ledger available if End is too high. + ClampEnd bool +} + +// Scan applies f on all the events occurring in the given range. +// The events are processed in sorted ascending Cursor order. +// If f returns false, the scan terminates early (f will not be applied on +// remaining events in the range). Note that a read lock is held for the +// entire duration of the Scan function so f be written in a way +// to minimize latency. +func (m *MemoryStore) Scan(eventRange Range, f func(Cursor, xdr.ContractEvent) bool) error { + m.lock.RLock() + defer m.lock.RUnlock() + + if err := m.validateRange(&eventRange); err != nil { + return err + } + + curLedger := eventRange.Start.Ledger + minLedger := m.buckets[m.start].ledgerSeq + i := ((curLedger - minLedger) + m.start) % uint32(len(m.buckets)) + events := seek(m.buckets[i].events, eventRange.Start) + for { + for _, event := range events { + cur := event.cursor(curLedger) + if eventRange.End.Cmp(cur) <= 0 { + return nil + } + if !f(cur, event.contents) { + return nil + } + } + i = (i + 1) % uint32(len(m.buckets)) + curLedger++ + if m.buckets[i].ledgerSeq != curLedger { + return nil + } + events = m.buckets[i].events + } +} + +// validateRange checks if the range falls within the bounds +// of the events in the memory store. +// validateRange should be called with the read lock. +func (m *MemoryStore) validateRange(eventRange *Range) error { + if m.length == 0 { + return fmt.Errorf("event store is empty") + } + + min := Cursor{Ledger: m.buckets[m.start].ledgerSeq} + if eventRange.Start.Cmp(min) < 0 { + if eventRange.ClampStart { + eventRange.Start = min + } else { + return fmt.Errorf("start is before oldest ledger") + } + } + + max := Cursor{Ledger: min.Ledger + m.length} + if eventRange.End.Cmp(max) > 0 { + if eventRange.ClampEnd { + eventRange.End = max + } else { + return fmt.Errorf("end is after latest ledger") + } + } + + if eventRange.Start.Cmp(eventRange.End) >= 0 { + return fmt.Errorf("start is not before end") + } + + return nil +} + +// seek returns the subset of all events which occur +// at a point greater than or equal to the given cursor. +// events must be sorted in ascending order. +func seek(events []event, cursor Cursor) []event { + i := sort.Search(len(events), func(i int) bool { + event := events[i] + return cursor.Cmp(event.cursor(cursor.Ledger)) <= 0 + }) + return events[i:] +} + +// IngestEvents adds new events from the given ledger into the store. +// As a side effect, events which fall outside the retention window are +// removed from the store. +func (m *MemoryStore) IngestEvents(txReader *ingest.LedgerTransactionReader) error { + events, err := readEvents(txReader) + if err != nil { + return err + } + ledgerSequence := txReader.GetSequence() + return m.append(ledgerSequence, events) +} + +func readEvents(txReader *ingest.LedgerTransactionReader) ([]event, error) { + var events []event + for { + tx, err := txReader.Read() + if err == io.EOF { + break + } + if err != nil { + return nil, err + } + + for i := range tx.Envelope.Operations() { + opIndex := uint32(i) + opEvents, err := tx.GetOperationEvents(opIndex) + if err != nil { + return nil, err + } + for eventIndex, opEvent := range opEvents { + events = append(events, event{ + contents: opEvent, + txIndex: tx.Index, + opIndex: opIndex, + eventIndex: uint32(eventIndex), + }) + } + } + } + return events, nil +} + +// append adds new events to the circular buffer. +func (m *MemoryStore) append(sequence uint32, events []event) error { + m.lock.Lock() + defer m.lock.Unlock() + + expectedLedgerSequence := m.buckets[m.start].ledgerSeq + m.length + if m.length > 0 && expectedLedgerSequence != sequence { + return fmt.Errorf("events not contiguous: expected ledger sequence %v but received %v", expectedLedgerSequence, sequence) + } + + index := (m.start + m.length) % uint32(len(m.buckets)) + m.buckets[index] = bucket{ + ledgerSeq: sequence, + events: events, + } + if m.length < uint32(len(m.buckets)) { + m.length++ + } else { + m.start++ + } + + return nil +} diff --git a/cmd/soroban-rpc/internal/events/events_test.go b/cmd/soroban-rpc/internal/events/events_test.go new file mode 100644 index 000000000..f2c43e268 --- /dev/null +++ b/cmd/soroban-rpc/internal/events/events_test.go @@ -0,0 +1,478 @@ +package events + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/stellar/go/xdr" +) + +var ( + ledger5Events = []event{ + newEvent(1, 0, 0, 100), + newEvent(1, 0, 1, 200), + newEvent(2, 0, 0, 300), + newEvent(2, 1, 0, 400), + } + ledger6Events []event = nil + ledger7Events = []event{ + newEvent(1, 0, 0, 500), + } + ledger8Events = []event{ + newEvent(1, 0, 0, 600), + newEvent(2, 0, 0, 700), + newEvent(2, 0, 1, 800), + newEvent(2, 0, 2, 900), + newEvent(2, 1, 0, 1000), + } +) + +func newEvent(txIndex, opIndex, eventIndex, val uint32) event { + v := xdr.Uint32(val) + return event{ + contents: xdr.ContractEvent{ + Type: xdr.ContractEventTypeSystem, + Body: xdr.ContractEventBody{ + V: 0, + V0: &xdr.ContractEventV0{ + Data: xdr.ScVal{ + Type: xdr.ScValTypeScvU32, + U32: &v, + }, + }, + }, + }, + txIndex: txIndex, + opIndex: opIndex, + eventIndex: eventIndex, + } +} + +func mustMarshal(e xdr.ContractEvent) string { + result, err := xdr.MarshalBase64(e) + if err != nil { + panic(err) + } + return result +} + +func (e event) equals(other event) bool { + return e.txIndex == other.txIndex && + e.opIndex == other.opIndex && + e.eventIndex == other.eventIndex && + mustMarshal(e.contents) == mustMarshal(other.contents) +} + +func eventsAreEqual(t *testing.T, a, b []event) { + require.Equal(t, len(a), len(b)) + for i := range a { + require.True(t, a[i].equals(b[i])) + } +} + +func TestCursorCmp(t *testing.T) { + for _, testCase := range []struct { + a Cursor + b Cursor + expected int + }{ + {MinCursor, MaxCursor, -1}, + {MinCursor, MinCursor, 0}, + {MaxCursor, MaxCursor, 0}, + { + Cursor{Ledger: 1, Tx: 2, Op: 3, Event: 4}, + Cursor{Ledger: 1, Tx: 2, Op: 3, Event: 4}, + 0, + }, + { + Cursor{Ledger: 5, Tx: 2, Op: 3, Event: 4}, + Cursor{Ledger: 7, Tx: 2, Op: 3, Event: 4}, + -1, + }, + { + Cursor{Ledger: 5, Tx: 2, Op: 3, Event: 4}, + Cursor{Ledger: 5, Tx: 7, Op: 3, Event: 4}, + -1, + }, + { + Cursor{Ledger: 5, Tx: 2, Op: 3, Event: 4}, + Cursor{Ledger: 5, Tx: 2, Op: 7, Event: 4}, + -1, + }, + { + Cursor{Ledger: 5, Tx: 2, Op: 3, Event: 4}, + Cursor{Ledger: 5, Tx: 2, Op: 3, Event: 7}, + -1, + }, + } { + a := testCase.a + b := testCase.b + expected := testCase.expected + + if got := a.Cmp(b); got != expected { + t.Fatalf("expected (%v).Cmp(%v) to be %v but got %v", a, b, expected, got) + } + a, b = b, a + expected *= -1 + if got := a.Cmp(b); got != expected { + t.Fatalf("expected (%v).Cmp(%v) to be %v but got %v", a, b, expected, got) + } + } +} + +func TestAppend(t *testing.T) { + m, err := NewMemoryStore(3) + require.NoError(t, err) + + require.NoError(t, m.append(5, ledger5Events)) + require.Equal(t, uint32(5), m.buckets[m.start].ledgerSeq) + eventsAreEqual(t, ledger5Events, m.buckets[m.start].events) + require.Equal(t, uint32(1), m.length) + + require.EqualError( + t, m.append(10, ledger5Events), + "events not contiguous: expected ledger sequence 6 but received 10", + ) + require.EqualError( + t, m.append(4, ledger5Events), + "events not contiguous: expected ledger sequence 6 but received 4", + ) + require.EqualError( + t, m.append(5, nil), + "events not contiguous: expected ledger sequence 6 but received 5", + ) + require.Equal(t, ledger5Events, m.buckets[m.start].events) + require.Equal(t, uint32(1), m.length) + + require.NoError(t, m.append(6, ledger6Events)) + eventsAreEqual(t, ledger5Events, m.buckets[m.start].events) + eventsAreEqual(t, ledger6Events, m.buckets[(m.start+1)%uint32(len(m.buckets))].events) + require.Equal(t, uint32(2), m.length) + + require.EqualError( + t, m.append(10, ledger5Events), + "events not contiguous: expected ledger sequence 7 but received 10", + ) + require.EqualError( + t, m.append(5, ledger5Events), + "events not contiguous: expected ledger sequence 7 but received 5", + ) + require.EqualError( + t, m.append(6, nil), + "events not contiguous: expected ledger sequence 7 but received 6", + ) + + require.NoError(t, m.append(7, ledger7Events)) + eventsAreEqual(t, ledger5Events, m.buckets[m.start].events) + eventsAreEqual(t, ledger6Events, m.buckets[(m.start+1)%uint32(len(m.buckets))].events) + eventsAreEqual(t, ledger7Events, m.buckets[(m.start+2)%uint32(len(m.buckets))].events) + require.Equal(t, uint32(3), m.length) + + ledger8Events := []event{ + newEvent(1, 0, 0, 600), + } + require.NoError(t, m.append(8, ledger8Events)) + eventsAreEqual(t, ledger6Events, m.buckets[m.start].events) + eventsAreEqual(t, ledger7Events, m.buckets[(m.start+1)%uint32(len(m.buckets))].events) + eventsAreEqual(t, ledger8Events, m.buckets[(m.start+2)%uint32(len(m.buckets))].events) + require.Equal(t, uint32(3), m.length) + + ledger9Events := []event{ + newEvent(1, 0, 0, 700), + } + require.NoError(t, m.append(9, ledger9Events)) + eventsAreEqual(t, ledger7Events, m.buckets[m.start].events) + eventsAreEqual(t, ledger8Events, m.buckets[(m.start+1)%uint32(len(m.buckets))].events) + eventsAreEqual(t, ledger9Events, m.buckets[(m.start+2)%uint32(len(m.buckets))].events) + require.Equal(t, uint32(3), m.length) +} + +func TestScanRangeValidation(t *testing.T) { + m, err := NewMemoryStore(4) + require.NoError(t, err) + assertNoCalls := func(cursor Cursor, contractEvent xdr.ContractEvent) bool { + t.Fatalf("unexpected call") + return true + } + err = m.Scan(Range{ + Start: MinCursor, + ClampStart: true, + End: MaxCursor, + ClampEnd: true, + }, assertNoCalls) + require.EqualError(t, err, "event store is empty") + + m = createStore(t) + + for _, testCase := range []struct { + input Range + err string + }{ + { + Range{ + Start: MinCursor, + ClampStart: false, + End: MaxCursor, + ClampEnd: true, + }, + "start is before oldest ledger", + }, + { + Range{ + Start: Cursor{Ledger: 4}, + ClampStart: false, + End: MaxCursor, + ClampEnd: true, + }, + "start is before oldest ledger", + }, + { + Range{ + Start: MinCursor, + ClampStart: true, + End: MaxCursor, + ClampEnd: false, + }, + "end is after latest ledger", + }, + { + Range{ + Start: Cursor{Ledger: 5}, + ClampStart: true, + End: Cursor{Ledger: 10}, + ClampEnd: false, + }, + "end is after latest ledger", + }, + { + Range{ + Start: Cursor{Ledger: 10}, + ClampStart: true, + End: Cursor{Ledger: 3}, + ClampEnd: true, + }, + "start is not before end", + }, + { + Range{ + Start: Cursor{Ledger: 10}, + ClampStart: false, + End: Cursor{Ledger: 3}, + ClampEnd: false, + }, + "start is not before end", + }, + { + Range{ + Start: Cursor{Ledger: 9}, + ClampStart: false, + End: Cursor{Ledger: 10}, + ClampEnd: true, + }, + "start is not before end", + }, + { + Range{ + Start: Cursor{Ledger: 9}, + ClampStart: false, + End: Cursor{Ledger: 10}, + ClampEnd: false, + }, + "end is after latest ledger", + }, + { + Range{ + Start: Cursor{Ledger: 2}, + ClampStart: true, + End: Cursor{Ledger: 3}, + ClampEnd: false, + }, + "start is not before end", + }, + { + Range{ + Start: Cursor{Ledger: 2}, + ClampStart: false, + End: Cursor{Ledger: 3}, + ClampEnd: false, + }, + "start is before oldest ledger", + }, + { + Range{ + Start: Cursor{Ledger: 6}, + ClampStart: false, + End: Cursor{Ledger: 6}, + ClampEnd: false, + }, + "start is not before end", + }, + } { + err := m.Scan(testCase.input, assertNoCalls) + require.EqualError(t, err, testCase.err, testCase.input) + } +} + +func createStore(t *testing.T) *MemoryStore { + m, err := NewMemoryStore(4) + require.NoError(t, err) + + require.NoError(t, m.append(5, ledger5Events)) + require.NoError(t, m.append(6, nil)) + require.NoError(t, m.append(7, ledger7Events)) + require.NoError(t, m.append(8, ledger8Events)) + + return m +} + +func concat(slices ...[]event) []event { + var result []event + for _, slice := range slices { + result = append(result, slice...) + } + return result +} + +func TestScan(t *testing.T) { + m := createStore(t) + + genEquivalentInputs := func(input Range) []Range { + results := []Range{input} + if !input.ClampStart { + rangeCopy := input + rangeCopy.ClampStart = true + results = append(results, rangeCopy) + } + if !input.ClampEnd { + rangeCopy := input + rangeCopy.ClampEnd = true + results = append(results, rangeCopy) + } + if !input.ClampStart && !input.ClampEnd { + rangeCopy := input + rangeCopy.ClampStart = true + rangeCopy.ClampEnd = true + results = append(results, rangeCopy) + } + return results + } + + for _, testCase := range []struct { + input Range + expected []event + }{ + { + Range{ + Start: MinCursor, + ClampStart: true, + End: MaxCursor, + ClampEnd: true, + }, + concat(ledger5Events, ledger6Events, ledger7Events, ledger8Events), + }, + { + Range{ + Start: Cursor{Ledger: 5}, + ClampStart: false, + End: Cursor{Ledger: 9}, + ClampEnd: false, + }, + concat(ledger5Events, ledger6Events, ledger7Events, ledger8Events), + }, + { + Range{ + Start: Cursor{Ledger: 5, Tx: 1, Op: 2}, + ClampStart: false, + End: Cursor{Ledger: 9}, + ClampEnd: false, + }, + concat(ledger5Events[2:], ledger6Events, ledger7Events, ledger8Events), + }, + { + Range{ + Start: Cursor{Ledger: 5, Tx: 3}, + ClampStart: false, + End: MaxCursor, + ClampEnd: true, + }, + concat(ledger6Events, ledger7Events, ledger8Events), + }, + { + Range{ + Start: Cursor{Ledger: 6}, + ClampStart: false, + End: MaxCursor, + ClampEnd: true, + }, + concat(ledger7Events, ledger8Events), + }, + { + Range{ + Start: Cursor{Ledger: 6, Tx: 1}, + ClampStart: false, + End: MaxCursor, + ClampEnd: true, + }, + concat(ledger7Events, ledger8Events), + }, + { + Range{ + Start: Cursor{Ledger: 8, Tx: 2, Op: 1, Event: 0}, + ClampStart: false, + End: MaxCursor, + ClampEnd: true, + }, + ledger8Events[len(ledger8Events)-1:], + }, + { + Range{ + Start: Cursor{Ledger: 8, Tx: 2, Op: 1, Event: 0}, + ClampStart: false, + End: Cursor{Ledger: 9}, + ClampEnd: false, + }, + ledger8Events[len(ledger8Events)-1:], + }, + { + Range{ + Start: Cursor{Ledger: 5}, + ClampStart: false, + End: Cursor{Ledger: 7}, + ClampEnd: false, + }, + concat(ledger5Events, ledger6Events), + }, + { + Range{ + Start: Cursor{Ledger: 5, Tx: 1, Op: 2}, + ClampStart: false, + End: Cursor{Ledger: 8, Tx: 1, Op: 4}, + ClampEnd: false, + }, + concat(ledger5Events[2:], ledger6Events, ledger7Events, ledger8Events[:1]), + }, + } { + for _, input := range genEquivalentInputs(testCase.input) { + var events []event + iterateAll := true + f := func(cursor Cursor, contractEvent xdr.ContractEvent) bool { + events = append(events, event{ + contents: contractEvent, + txIndex: cursor.Tx, + opIndex: cursor.Op, + eventIndex: cursor.Event, + }) + return iterateAll + } + require.NoError(t, m.Scan(input, f)) + eventsAreEqual(t, testCase.expected, events) + if len(events) > 0 { + events = nil + iterateAll = false + require.NoError(t, m.Scan(input, f)) + eventsAreEqual(t, []event{testCase.expected[0]}, events) + } + } + } +} diff --git a/go.mod b/go.mod index 38b83dd4e..f573b612e 100644 --- a/go.mod +++ b/go.mod @@ -64,7 +64,7 @@ require ( github.com/spf13/cast v0.0.0-20150508191742-4d07383ffe94 // indirect github.com/spf13/jwalterweatherman v0.0.0-20141219030609-3d60171a6431 // indirect github.com/spf13/pflag v0.0.0-20161005214240-4bd69631f475 // indirect - github.com/stellar/go v0.0.0-20230125220950-1c33c875f743 + github.com/stellar/go v0.0.0-20230126104422-77cf2ca9d550 github.com/stellar/go-xdr v0.0.0-20211103144802-8017fc4bdfee // indirect github.com/stretchr/objx v0.4.0 // indirect github.com/xanzy/ssh-agent v0.3.0 // indirect diff --git a/go.sum b/go.sum index d997d1c2f..763c2e841 100644 --- a/go.sum +++ b/go.sum @@ -170,8 +170,8 @@ github.com/spf13/viper v0.0.0-20150621231900-db7ff930a189 h1:fvB1AFbBd6SfI9Rd0oo github.com/spf13/viper v0.0.0-20150621231900-db7ff930a189/go.mod h1:A8kyI5cUJhb8N+3pkfONlcEcZbueH6nhAm0Fq7SrnBM= github.com/stellar/go v0.0.0-20230120150719-164910adff87 h1:G9TC6GyEn+sISTg5hxrcw/CD2NOKyWslvdBfnZMH1Z4= github.com/stellar/go v0.0.0-20230120150719-164910adff87/go.mod h1:QXwuKmYVvqQZlByv0EeNb0Rgog9AP+eMmARcdt3h2rI= -github.com/stellar/go v0.0.0-20230125220950-1c33c875f743 h1:BSLNPq+cmFQYcFYi3o0bUwXemWgk3iPQgYrrvoCKH4s= -github.com/stellar/go v0.0.0-20230125220950-1c33c875f743/go.mod h1:QXwuKmYVvqQZlByv0EeNb0Rgog9AP+eMmARcdt3h2rI= +github.com/stellar/go v0.0.0-20230126104422-77cf2ca9d550 h1:GTz5CZnj0dhXin0zyTt9jZixS6X0B/aDcNn7bkKt9IU= +github.com/stellar/go v0.0.0-20230126104422-77cf2ca9d550/go.mod h1:QXwuKmYVvqQZlByv0EeNb0Rgog9AP+eMmARcdt3h2rI= github.com/stellar/go-xdr v0.0.0-20211103144802-8017fc4bdfee h1:fbVs0xmXpBvVS4GBeiRmAE3Le70ofAqFMch1GTiq/e8= github.com/stellar/go-xdr v0.0.0-20211103144802-8017fc4bdfee/go.mod h1:yoxyU/M8nl9LKeWIoBrbDPQ7Cy+4jxRcWcOayZ4BMps= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= From f7e5992421dd0298a5c969e3971e4310a3b1fed3 Mon Sep 17 00:00:00 2001 From: tamirms Date: Fri, 27 Jan 2023 09:30:14 +0000 Subject: [PATCH 2/6] update comments --- cmd/soroban-rpc/internal/events/events.go | 35 ++++++++++++----------- 1 file changed, 19 insertions(+), 16 deletions(-) diff --git a/cmd/soroban-rpc/internal/events/events.go b/cmd/soroban-rpc/internal/events/events.go index cef4e2327..af24aa84b 100644 --- a/cmd/soroban-rpc/internal/events/events.go +++ b/cmd/soroban-rpc/internal/events/events.go @@ -11,9 +11,10 @@ import ( "github.com/stellar/go/xdr" ) -// Cursor represents the position of an event within the sequence of all -// Soroban events. Soroban events are sorted by ledger sequence, transaction -// index, operation index, and event index. +// Cursor represents the position of a Soroban event. +// Soroban events are sorted in ascending order by +// ledger sequence, transaction index, operation index, +// and event index. type Cursor struct { // Ledger is the sequence of the ledger which emitted the event. Ledger uint32 @@ -98,9 +99,13 @@ type MemoryStore struct { start uint32 } -// NewMemoryStore creates a new MemoryStore populated by the given ledgers. -// retentionWindow defines a retention window in ledgers. All events occurring -// within the retention window will be included in the store. +// NewMemoryStore creates a new MemoryStore. +// The retention window is in units of ledgers. +// All events occurring in the following ledger range +// [ latestLedger - retentionWindow, latestLedger ] +// will be included in the MemoryStore. If the MemoryStore +// is full, any events from new ledgers will evict +// older entries outside the retention window. func NewMemoryStore(retentionWindow uint32) (*MemoryStore, error) { if retentionWindow == 0 { return nil, fmt.Errorf("retention window must be positive") @@ -110,19 +115,17 @@ func NewMemoryStore(retentionWindow uint32) (*MemoryStore, error) { }, nil } -// Range defines an interval in the sequence of all Soroban events. +// Range defines a [Start, End) interval of Soroban events. type Range struct { - // Start defines the start of the range. - // Start is included in the range. + // Start defines the (inclusive) start of the range. Start Cursor - // ClampStart indicates whether Start should be clamped to - // the earliest ledger available if Start is too low. + // ClampStart indicates whether Start should be clamped up + // to the earliest ledger available if Start is too low. ClampStart bool - // End defines the end of the range. - // End is excluded from the range. + // End defines the (exclusive) end of the range. End Cursor - // ClampEnd indicates whether End should be clamped to - // the latest ledger available if End is too high. + // ClampEnd indicates whether End should be clamped down + // to the latest ledger available if End is too high. ClampEnd bool } @@ -130,7 +133,7 @@ type Range struct { // The events are processed in sorted ascending Cursor order. // If f returns false, the scan terminates early (f will not be applied on // remaining events in the range). Note that a read lock is held for the -// entire duration of the Scan function so f be written in a way +// entire duration of the Scan function so f should be written in a way // to minimize latency. func (m *MemoryStore) Scan(eventRange Range, f func(Cursor, xdr.ContractEvent) bool) error { m.lock.RLock() From 6f961dd809a5cb307959ef4f270f56a4062492dc Mon Sep 17 00:00:00 2001 From: tamirms Date: Fri, 27 Jan 2023 10:13:54 +0000 Subject: [PATCH 3/6] Update cmd/soroban-rpc/internal/events/events.go Co-authored-by: Paul Bellamy --- cmd/soroban-rpc/internal/events/events.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/soroban-rpc/internal/events/events.go b/cmd/soroban-rpc/internal/events/events.go index af24aa84b..56095fcad 100644 --- a/cmd/soroban-rpc/internal/events/events.go +++ b/cmd/soroban-rpc/internal/events/events.go @@ -205,7 +205,7 @@ func (m *MemoryStore) validateRange(eventRange *Range) error { func seek(events []event, cursor Cursor) []event { i := sort.Search(len(events), func(i int) bool { event := events[i] - return cursor.Cmp(event.cursor(cursor.Ledger)) <= 0 + return cursor.Cmp(events[i].cursor(cursor.Ledger)) <= 0 }) return events[i:] } From e8957d0e22cb170523abdca7a54f31cfa93a1879 Mon Sep 17 00:00:00 2001 From: tamirms Date: Fri, 27 Jan 2023 10:36:13 +0000 Subject: [PATCH 4/6] fix lint error --- cmd/soroban-rpc/internal/events/events.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/cmd/soroban-rpc/internal/events/events.go b/cmd/soroban-rpc/internal/events/events.go index 56095fcad..357b6a160 100644 --- a/cmd/soroban-rpc/internal/events/events.go +++ b/cmd/soroban-rpc/internal/events/events.go @@ -203,11 +203,10 @@ func (m *MemoryStore) validateRange(eventRange *Range) error { // at a point greater than or equal to the given cursor. // events must be sorted in ascending order. func seek(events []event, cursor Cursor) []event { - i := sort.Search(len(events), func(i int) bool { - event := events[i] + j := sort.Search(len(events), func(i int) bool { return cursor.Cmp(events[i].cursor(cursor.Ledger)) <= 0 }) - return events[i:] + return events[j:] } // IngestEvents adds new events from the given ledger into the store. From f4136154847604afc4323fbb5a231b4eca8655af Mon Sep 17 00:00:00 2001 From: tamirms Date: Fri, 27 Jan 2023 10:42:36 +0000 Subject: [PATCH 5/6] add comments to TestAppend --- cmd/soroban-rpc/internal/events/events_test.go | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/cmd/soroban-rpc/internal/events/events_test.go b/cmd/soroban-rpc/internal/events/events_test.go index f2c43e268..7915de565 100644 --- a/cmd/soroban-rpc/internal/events/events_test.go +++ b/cmd/soroban-rpc/internal/events/events_test.go @@ -26,6 +26,9 @@ var ( newEvent(2, 0, 2, 900), newEvent(2, 1, 0, 1000), } + ledger9Events = []event{ + newEvent(1, 0, 0, 1100), + } ) func newEvent(txIndex, opIndex, eventIndex, val uint32) event { @@ -125,11 +128,13 @@ func TestAppend(t *testing.T) { m, err := NewMemoryStore(3) require.NoError(t, err) + // test appending first bucket of events require.NoError(t, m.append(5, ledger5Events)) require.Equal(t, uint32(5), m.buckets[m.start].ledgerSeq) eventsAreEqual(t, ledger5Events, m.buckets[m.start].events) require.Equal(t, uint32(1), m.length) + // the next bucket of events must follow the previous bucket (ledger 5) require.EqualError( t, m.append(10, ledger5Events), "events not contiguous: expected ledger sequence 6 but received 10", @@ -142,14 +147,17 @@ func TestAppend(t *testing.T) { t, m.append(5, nil), "events not contiguous: expected ledger sequence 6 but received 5", ) + // check that none of the calls above modified our buckets require.Equal(t, ledger5Events, m.buckets[m.start].events) require.Equal(t, uint32(1), m.length) + // append ledger 6 events, now we have two buckets filled require.NoError(t, m.append(6, ledger6Events)) eventsAreEqual(t, ledger5Events, m.buckets[m.start].events) eventsAreEqual(t, ledger6Events, m.buckets[(m.start+1)%uint32(len(m.buckets))].events) require.Equal(t, uint32(2), m.length) + // the next bucket of events must follow the previous bucket (ledger 6) require.EqualError( t, m.append(10, ledger5Events), "events not contiguous: expected ledger sequence 7 but received 10", @@ -163,24 +171,21 @@ func TestAppend(t *testing.T) { "events not contiguous: expected ledger sequence 7 but received 6", ) + // append ledger 7 events, now we have all three buckets filled require.NoError(t, m.append(7, ledger7Events)) eventsAreEqual(t, ledger5Events, m.buckets[m.start].events) eventsAreEqual(t, ledger6Events, m.buckets[(m.start+1)%uint32(len(m.buckets))].events) eventsAreEqual(t, ledger7Events, m.buckets[(m.start+2)%uint32(len(m.buckets))].events) require.Equal(t, uint32(3), m.length) - ledger8Events := []event{ - newEvent(1, 0, 0, 600), - } + // append ledger 8 events, but all buckets are full, so we need to evict ledger 5 require.NoError(t, m.append(8, ledger8Events)) eventsAreEqual(t, ledger6Events, m.buckets[m.start].events) eventsAreEqual(t, ledger7Events, m.buckets[(m.start+1)%uint32(len(m.buckets))].events) eventsAreEqual(t, ledger8Events, m.buckets[(m.start+2)%uint32(len(m.buckets))].events) require.Equal(t, uint32(3), m.length) - ledger9Events := []event{ - newEvent(1, 0, 0, 700), - } + // append ledger 9 events, but all buckets are full, so we need to evict ledger 6 require.NoError(t, m.append(9, ledger9Events)) eventsAreEqual(t, ledger7Events, m.buckets[m.start].events) eventsAreEqual(t, ledger8Events, m.buckets[(m.start+1)%uint32(len(m.buckets))].events) From 835da8403e64e97bc77fcb16d53b1b0a6da8f158 Mon Sep 17 00:00:00 2001 From: tamirms Date: Fri, 27 Jan 2023 17:21:59 +0000 Subject: [PATCH 6/6] address code review feedback --- cmd/soroban-rpc/internal/events/events.go | 57 ++++++++++--------- .../internal/events/events_test.go | 12 ++-- 2 files changed, 35 insertions(+), 34 deletions(-) diff --git a/cmd/soroban-rpc/internal/events/events.go b/cmd/soroban-rpc/internal/events/events.go index 357b6a160..2b288ec52 100644 --- a/cmd/soroban-rpc/internal/events/events.go +++ b/cmd/soroban-rpc/internal/events/events.go @@ -1,6 +1,7 @@ package events import ( + "errors" "fmt" "io" "math" @@ -77,9 +78,9 @@ type event struct { eventIndex uint32 } -func (e event) cursor(ledger uint32) Cursor { +func (e event) cursor(ledgerSeq uint32) Cursor { return Cursor{ - Ledger: ledger, + Ledger: ledgerSeq, Tx: e.txIndex, Op: e.opIndex, Event: e.eventIndex, @@ -92,9 +93,6 @@ type MemoryStore struct { // buckets is a circular buffer where each cell represents // all events occurring within a specific ledger. buckets []bucket - // length is equal to the number of ledgers contained within - // the circular buffer. - length uint32 // start is the index of the head in the circular buffer. start uint32 } @@ -108,10 +106,10 @@ type MemoryStore struct { // older entries outside the retention window. func NewMemoryStore(retentionWindow uint32) (*MemoryStore, error) { if retentionWindow == 0 { - return nil, fmt.Errorf("retention window must be positive") + return nil, errors.New("retention window must be positive") } return &MemoryStore{ - buckets: make([]bucket, retentionWindow), + buckets: make([]bucket, 0, retentionWindow), }, nil } @@ -147,7 +145,7 @@ func (m *MemoryStore) Scan(eventRange Range, f func(Cursor, xdr.ContractEvent) b minLedger := m.buckets[m.start].ledgerSeq i := ((curLedger - minLedger) + m.start) % uint32(len(m.buckets)) events := seek(m.buckets[i].events, eventRange.Start) - for { + for ; curLedger == m.buckets[i].ledgerSeq; curLedger++ { for _, event := range events { cur := event.cursor(curLedger) if eventRange.End.Cmp(cur) <= 0 { @@ -158,20 +156,17 @@ func (m *MemoryStore) Scan(eventRange Range, f func(Cursor, xdr.ContractEvent) b } } i = (i + 1) % uint32(len(m.buckets)) - curLedger++ - if m.buckets[i].ledgerSeq != curLedger { - return nil - } events = m.buckets[i].events } + return nil } // validateRange checks if the range falls within the bounds // of the events in the memory store. // validateRange should be called with the read lock. func (m *MemoryStore) validateRange(eventRange *Range) error { - if m.length == 0 { - return fmt.Errorf("event store is empty") + if len(m.buckets) == 0 { + return errors.New("event store is empty") } min := Cursor{Ledger: m.buckets[m.start].ledgerSeq} @@ -179,21 +174,21 @@ func (m *MemoryStore) validateRange(eventRange *Range) error { if eventRange.ClampStart { eventRange.Start = min } else { - return fmt.Errorf("start is before oldest ledger") + return errors.New("start is before oldest ledger") } } - max := Cursor{Ledger: min.Ledger + m.length} + max := Cursor{Ledger: min.Ledger + uint32(len(m.buckets))} if eventRange.End.Cmp(max) > 0 { if eventRange.ClampEnd { eventRange.End = max } else { - return fmt.Errorf("end is after latest ledger") + return errors.New("end is after latest ledger") } } if eventRange.Start.Cmp(eventRange.End) >= 0 { - return fmt.Errorf("start is not before end") + return errors.New("start is not before end") } return nil @@ -256,19 +251,25 @@ func (m *MemoryStore) append(sequence uint32, events []event) error { m.lock.Lock() defer m.lock.Unlock() - expectedLedgerSequence := m.buckets[m.start].ledgerSeq + m.length - if m.length > 0 && expectedLedgerSequence != sequence { - return fmt.Errorf("events not contiguous: expected ledger sequence %v but received %v", expectedLedgerSequence, sequence) + length := uint32(len(m.buckets)) + if length > 0 { + expectedLedgerSequence := m.buckets[m.start].ledgerSeq + length + if expectedLedgerSequence != sequence { + return fmt.Errorf("events not contiguous: expected ledger sequence %v but received %v", expectedLedgerSequence, sequence) + } } - index := (m.start + m.length) % uint32(len(m.buckets)) - m.buckets[index] = bucket{ - ledgerSeq: sequence, - events: events, - } - if m.length < uint32(len(m.buckets)) { - m.length++ + if length < uint32(cap(m.buckets)) { + m.buckets = append(m.buckets, bucket{ + ledgerSeq: sequence, + events: events, + }) } else { + index := (m.start + length) % uint32(len(m.buckets)) + m.buckets[index] = bucket{ + ledgerSeq: sequence, + events: events, + } m.start++ } diff --git a/cmd/soroban-rpc/internal/events/events_test.go b/cmd/soroban-rpc/internal/events/events_test.go index 7915de565..015398973 100644 --- a/cmd/soroban-rpc/internal/events/events_test.go +++ b/cmd/soroban-rpc/internal/events/events_test.go @@ -132,7 +132,7 @@ func TestAppend(t *testing.T) { require.NoError(t, m.append(5, ledger5Events)) require.Equal(t, uint32(5), m.buckets[m.start].ledgerSeq) eventsAreEqual(t, ledger5Events, m.buckets[m.start].events) - require.Equal(t, uint32(1), m.length) + require.Equal(t, 1, len(m.buckets)) // the next bucket of events must follow the previous bucket (ledger 5) require.EqualError( @@ -149,13 +149,13 @@ func TestAppend(t *testing.T) { ) // check that none of the calls above modified our buckets require.Equal(t, ledger5Events, m.buckets[m.start].events) - require.Equal(t, uint32(1), m.length) + require.Equal(t, 1, len(m.buckets)) // append ledger 6 events, now we have two buckets filled require.NoError(t, m.append(6, ledger6Events)) eventsAreEqual(t, ledger5Events, m.buckets[m.start].events) eventsAreEqual(t, ledger6Events, m.buckets[(m.start+1)%uint32(len(m.buckets))].events) - require.Equal(t, uint32(2), m.length) + require.Equal(t, 2, len(m.buckets)) // the next bucket of events must follow the previous bucket (ledger 6) require.EqualError( @@ -176,21 +176,21 @@ func TestAppend(t *testing.T) { eventsAreEqual(t, ledger5Events, m.buckets[m.start].events) eventsAreEqual(t, ledger6Events, m.buckets[(m.start+1)%uint32(len(m.buckets))].events) eventsAreEqual(t, ledger7Events, m.buckets[(m.start+2)%uint32(len(m.buckets))].events) - require.Equal(t, uint32(3), m.length) + require.Equal(t, 3, len(m.buckets)) // append ledger 8 events, but all buckets are full, so we need to evict ledger 5 require.NoError(t, m.append(8, ledger8Events)) eventsAreEqual(t, ledger6Events, m.buckets[m.start].events) eventsAreEqual(t, ledger7Events, m.buckets[(m.start+1)%uint32(len(m.buckets))].events) eventsAreEqual(t, ledger8Events, m.buckets[(m.start+2)%uint32(len(m.buckets))].events) - require.Equal(t, uint32(3), m.length) + require.Equal(t, 3, len(m.buckets)) // append ledger 9 events, but all buckets are full, so we need to evict ledger 6 require.NoError(t, m.append(9, ledger9Events)) eventsAreEqual(t, ledger7Events, m.buckets[m.start].events) eventsAreEqual(t, ledger8Events, m.buckets[(m.start+1)%uint32(len(m.buckets))].events) eventsAreEqual(t, ledger9Events, m.buckets[(m.start+2)%uint32(len(m.buckets))].events) - require.Equal(t, uint32(3), m.length) + require.Equal(t, 3, len(m.buckets)) } func TestScanRangeValidation(t *testing.T) {