From 3196ade050bfa3ac300dab37fadaa89b2bc08cc4 Mon Sep 17 00:00:00 2001 From: George Kudrayvtsev Date: Mon, 15 Apr 2024 14:13:38 -0700 Subject: [PATCH] Add common interface between in-memory and on-disk transaction storage --- cmd/soroban-rpc/internal/ingest/service.go | 7 +-- cmd/soroban-rpc/internal/jsonrpc.go | 2 +- .../internal/transactions/database.go | 45 +++++++++++++++++++ .../internal/transactions/interface.go | 15 +++++++ .../{transactions.go => memory.go} | 4 +- 5 files changed, 68 insertions(+), 5 deletions(-) create mode 100644 cmd/soroban-rpc/internal/transactions/database.go create mode 100644 cmd/soroban-rpc/internal/transactions/interface.go rename cmd/soroban-rpc/internal/transactions/{transactions.go => memory.go} (99%) diff --git a/cmd/soroban-rpc/internal/ingest/service.go b/cmd/soroban-rpc/internal/ingest/service.go index 931abfe8..2abaf17b 100644 --- a/cmd/soroban-rpc/internal/ingest/service.go +++ b/cmd/soroban-rpc/internal/ingest/service.go @@ -34,7 +34,7 @@ type Config struct { Logger *log.Entry DB db.ReadWriter EventStore *events.MemoryStore - TransactionStore *transactions.MemoryStore + TransactionStore transactions.TransactionStore NetworkPassPhrase string Archive historyarchive.ArchiveInterface LedgerBackend backends.LedgerBackend @@ -134,7 +134,7 @@ type Service struct { logger *log.Entry db db.ReadWriter eventStore *events.MemoryStore - transactionStore *transactions.MemoryStore + transactionStore transactions.TransactionStore ledgerBackend backends.LedgerBackend timeout time.Duration networkPassPhrase string @@ -308,7 +308,8 @@ func (s *Service) ingestLedgerCloseMeta(tx db.WriteTx, ledgerCloseMeta xdr.Ledge return err } s.metrics.ingestionDurationMetric. - With(prometheus.Labels{"type": "ledger_close_meta"}).Observe(time.Since(startTime).Seconds()) + With(prometheus.Labels{"type": "ledger_close_meta"}). + Observe(time.Since(startTime).Seconds()) if err := s.eventStore.IngestEvents(ledgerCloseMeta); err != nil { return err diff --git a/cmd/soroban-rpc/internal/jsonrpc.go b/cmd/soroban-rpc/internal/jsonrpc.go index b0ceb372..1fb5d678 100644 --- a/cmd/soroban-rpc/internal/jsonrpc.go +++ b/cmd/soroban-rpc/internal/jsonrpc.go @@ -47,7 +47,7 @@ func (h Handler) Close() { type HandlerParams struct { EventStore *events.MemoryStore - TransactionStore *transactions.MemoryStore + TransactionStore transactions.TransactionStore LedgerEntryReader db.LedgerEntryReader LedgerReader db.LedgerReader Logger *log.Entry diff --git a/cmd/soroban-rpc/internal/transactions/database.go b/cmd/soroban-rpc/internal/transactions/database.go new file mode 100644 index 00000000..40994127 --- /dev/null +++ b/cmd/soroban-rpc/internal/transactions/database.go @@ -0,0 +1,45 @@ +package transactions + +import ( + "sync" + + "github.com/prometheus/client_golang/prometheus" + "github.com/stellar/go/xdr" + + "github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/daemon/interfaces" + "github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/ledgerbucketwindow" +) + +// DatabaseStore is an on-disk (sqlite) store of Stellar transactions. +type DatabaseStore struct { + // passphrase is an immutable string containing the Stellar network + // passphrase and accessing it does not need to be protected by the lock + passphrase string + + lock sync.RWMutex + transactions map[xdr.Hash]transaction + transactionsByLedger *ledgerbucketwindow.LedgerBucketWindow[[]xdr.Hash] + transactionDurationMetric *prometheus.SummaryVec + transactionCountMetric prometheus.Summary +} + +func NewDatabaseStore(daemon interfaces.Daemon, networkPassphrase string, retentionWindow uint32) TransactionStore { + return NewMemoryStore(daemon, networkPassphrase, retentionWindow) +} + +// func (m *DatabaseStore) IngestTransactions(ledgerCloseMeta xdr.LedgerCloseMeta) error { +// // startTime := time.Now() +// return nil +// } + +// // GetLedgerRange returns the first and latest ledger available in the store. +// func (m *DatabaseStore) GetLedgerRange() ledgerbucketwindow.LedgerRange { +// m.lock.RLock() +// defer m.lock.RUnlock() +// return m.transactionsByLedger.GetLedgerRange() +// } + +// // GetTransaction obtains a transaction from the store and whether it's present and the current store range +// func (m *DatabaseStore) GetTransaction(hash xdr.Hash) (Transaction, bool, ledgerbucketwindow.LedgerRange) { +// return Transaction{}, false, ledgerbucketwindow.LedgerRange{} +// } diff --git a/cmd/soroban-rpc/internal/transactions/interface.go b/cmd/soroban-rpc/internal/transactions/interface.go new file mode 100644 index 00000000..65fc6f13 --- /dev/null +++ b/cmd/soroban-rpc/internal/transactions/interface.go @@ -0,0 +1,15 @@ +package transactions + +import ( + "github.com/stellar/go/xdr" + + "github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/ledgerbucketwindow" +) + +// TransactionStore lets you ingest (write) and query (read) transactions from +// an abstract backend storage (i.e. via in-memory or sqlite). +type TransactionStore interface { + IngestTransactions(ledgerCloseMeta xdr.LedgerCloseMeta) error + GetLedgerRange() ledgerbucketwindow.LedgerRange + GetTransaction(hash xdr.Hash) (Transaction, bool, ledgerbucketwindow.LedgerRange) +} diff --git a/cmd/soroban-rpc/internal/transactions/transactions.go b/cmd/soroban-rpc/internal/transactions/memory.go similarity index 99% rename from cmd/soroban-rpc/internal/transactions/transactions.go rename to cmd/soroban-rpc/internal/transactions/memory.go index e5abe55c..3a312363 100644 --- a/cmd/soroban-rpc/internal/transactions/transactions.go +++ b/cmd/soroban-rpc/internal/transactions/memory.go @@ -43,7 +43,7 @@ type MemoryStore struct { // will be included in the MemoryStore. If the MemoryStore // is full, any transactions from new ledgers will evict // older entries outside the retention window. -func NewMemoryStore(daemon interfaces.Daemon, networkPassphrase string, retentionWindow uint32) *MemoryStore { +func NewMemoryStore(daemon interfaces.Daemon, networkPassphrase string, retentionWindow uint32) TransactionStore { window := ledgerbucketwindow.NewLedgerBucketWindow[[]xdr.Hash](retentionWindow) // transactionDurationMetric is a metric for measuring latency of transaction store operations @@ -75,6 +75,7 @@ func NewMemoryStore(daemon interfaces.Daemon, networkPassphrase string, retentio // removed from the store. func (m *MemoryStore) IngestTransactions(ledgerCloseMeta xdr.LedgerCloseMeta) error { startTime := time.Now() + reader, err := ingest.NewLedgerTransactionReaderFromLedgerCloseMeta(m.networkPassphrase, ledgerCloseMeta) if err != nil { return err @@ -135,6 +136,7 @@ func (m *MemoryStore) IngestTransactions(ledgerCloseMeta xdr.LedgerCloseMeta) er for hash, tx := range hashMap { m.transactions[hash] = tx } + m.transactionDurationMetric.With(prometheus.Labels{"operation": "ingest"}).Observe(time.Since(startTime).Seconds()) m.transactionCountMetric.Observe(float64(txCount)) return nil