Skip to content

Commit

Permalink
Improve startup by eliminating unnecessary migration ranges (#282)
Browse files Browse the repository at this point in the history
Speed up startup time in "already migrated" cases by eliminating unnecessary
ledger range traversals.

Specifically, this includes the following changes:

 - Fee stats windows cannot exceed the history retention window, as this doesn't
   make sense.

 - Migrations that are already applied are not added to the list of
   "multi-migrations".

 - Fee stats window building conforms to the migration interface to simplify
   code.

 - LedgerSeqRange has been refactored to always use a value reference.

As a result, if all migrations have occurred, the traversal only occurs over the fee window.
  • Loading branch information
Shaptic authored Sep 4, 2024
1 parent b668361 commit e110732
Show file tree
Hide file tree
Showing 5 changed files with 165 additions and 62 deletions.
102 changes: 74 additions & 28 deletions cmd/soroban-rpc/internal/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,15 @@ import (
)

const (
prometheusNamespace = "soroban_rpc"
maxLedgerEntryWriteBatchSize = 150
defaultReadTimeout = 5 * time.Second
defaultShutdownGracePeriod = 10 * time.Second
inMemoryInitializationLedgerLogPeriod = 1_000_000
prometheusNamespace = "soroban_rpc"
maxLedgerEntryWriteBatchSize = 150
defaultReadTimeout = 5 * time.Second
defaultShutdownGracePeriod = 10 * time.Second

// Since our default retention window will be 7 days (7*17,280 ledgers),
// choose a random 5-digit prime to have irregular logging intervals at each
// halfish-day of processing
inMemoryInitializationLedgerLogPeriod = 10_099
)

type Daemon struct {
Expand Down Expand Up @@ -289,6 +293,29 @@ func MustNew(cfg *config.Config, logger *supportlog.Entry) *Daemon {

// mustInitializeStorage initializes the storage using what was on the DB
func (d *Daemon) mustInitializeStorage(cfg *config.Config) *feewindow.FeeWindows {
//
// There's some complex "ledger window math" here so we should clarify it
// beforehand.
//
// There are two windows in play here:
// - the ledger retention window, which describes the range of txmeta
// to keep relative to the latest "ledger tip" of the network
// - the fee stats window, which describes a *subset* of the prior
// ledger retention window on which to perform fee analysis
//
// If the fee window *exceeds* the retention window, this doesn't make any
// sense since it implies the user wants to store N amount of actual
// historical data and M > N amount of ledgers just for fee processing,
// which is nonsense from a performance standpoint. We prevent this:
maxFeeRetentionWindow := max(
cfg.ClassicFeeStatsLedgerRetentionWindow,
cfg.SorobanFeeStatsLedgerRetentionWindow)
if maxFeeRetentionWindow > cfg.HistoryRetentionWindow {
d.logger.Fatalf(
"Fee stat analysis window (%d) cannot exceed history retention window (%d).",
maxFeeRetentionWindow, cfg.HistoryRetentionWindow)
}

feeWindows := feewindow.NewFeeWindows(
cfg.ClassicFeeStatsLedgerRetentionWindow,
cfg.SorobanFeeStatsLedgerRetentionWindow,
Expand All @@ -299,27 +326,42 @@ func (d *Daemon) mustInitializeStorage(cfg *config.Config) *feewindow.FeeWindows
readTxMetaCtx, cancelReadTxMeta := context.WithTimeout(context.Background(), cfg.IngestionTimeout)
defer cancelReadTxMeta()

var initialSeq, currentSeq uint32
applicableRange, err := db.GetMigrationLedgerRange(readTxMetaCtx, d.db, cfg.HistoryRetentionWindow)
// To combine these windows, we launch as follows:
//
// 1. First, identify the ledger range for database migrations based on the
// ledger retention window. Since we don't do "partial" migrations (all or
// nothing), this represents the entire range of ledger metas we store.
//
retentionRange, err := db.GetMigrationLedgerRange(readTxMetaCtx, d.db, cfg.HistoryRetentionWindow)
if err != nil {
d.logger.WithError(err).Fatal("could not get ledger range for migration")
}

maxFeeRetentionWindow := max(cfg.ClassicFeeStatsLedgerRetentionWindow, cfg.SorobanFeeStatsLedgerRetentionWindow)
feeStatsRange, err := db.GetMigrationLedgerRange(readTxMetaCtx, d.db, maxFeeRetentionWindow)
dataMigrations, err := db.BuildMigrations(
readTxMetaCtx, d.logger, d.db, cfg.NetworkPassphrase, retentionRange)
if err != nil {
d.logger.WithError(err).Fatal("could not get ledger range for fee stats")
d.logger.WithError(err).Fatal("could not build migrations")
}

// Combine the ledger range for fees, events and transactions
ledgerSeqRange := feeStatsRange.Merge(applicableRange)

dataMigrations, err := db.BuildMigrations(readTxMetaCtx, d.logger, d.db, cfg.NetworkPassphrase, ledgerSeqRange)
// 2. Then, incorporate the fee analysis window. If there are migrations to
// do, this has no effect, since migration windows are larger than the fee
// window. In the absence of migrations, though, this means the ingestion
// range is just the fee stat range.
//
feeStatsRange, err := db.GetMigrationLedgerRange(readTxMetaCtx, d.db, maxFeeRetentionWindow)
if err != nil {
d.logger.WithError(err).Fatal("could not build migrations")
d.logger.WithError(err).Fatal("could not get ledger range for fee stats")
}

// Apply migration for events, transactions and fee stats
// Additionally, by treating the fee window *as if* it's a migration, we can
// make the interface here really clean.
dataMigrations.Append(feeWindows.AsMigration(feeStatsRange))
ledgerSeqRange := dataMigrations.ApplicableRange()

//
// 3. Apply all migrations, including fee stat analysis.
//
var initialSeq, currentSeq uint32
err = db.NewLedgerReader(d.db).StreamLedgerRange(
readTxMetaCtx,
ledgerSeqRange.First,
Expand All @@ -328,32 +370,36 @@ func (d *Daemon) mustInitializeStorage(cfg *config.Config) *feewindow.FeeWindows
currentSeq = txMeta.LedgerSequence()
if initialSeq == 0 {
initialSeq = currentSeq
d.logger.WithField("seq", currentSeq).
Info("initializing in-memory store")
d.logger.
WithField("first", initialSeq).
WithField("last", ledgerSeqRange.Last).
Info("Initializing in-memory store")
} else if (currentSeq-initialSeq)%inMemoryInitializationLedgerLogPeriod == 0 {
d.logger.WithField("seq", currentSeq).
Debug("still initializing in-memory store")
}

if err = feeWindows.IngestFees(txMeta); err != nil {
d.logger.WithError(err).Fatal("could not initialize fee stats")
d.logger.
WithField("seq", currentSeq).
WithField("last", ledgerSeqRange.Last).
Debug("Still initializing in-memory store")
}

if err := dataMigrations.Apply(readTxMetaCtx, txMeta); err != nil {
d.logger.WithError(err).Fatal("could not apply migration for ledger ", currentSeq)
}

return nil
})
if err != nil {
d.logger.WithError(err).Fatal("could not obtain txmeta cache from the database")
d.logger.WithError(err).Fatal("Could not obtain txmeta cache from the database")
}

if err := dataMigrations.Commit(readTxMetaCtx); err != nil {
d.logger.WithError(err).Fatal("could not commit data migrations")
d.logger.WithError(err).Fatal("Could not commit data migrations")
}

if currentSeq != 0 {
d.logger.WithField("seq", currentSeq).
Info("finished initializing in-memory store and applying DB data migrations")
d.logger.
WithField("first", retentionRange.First).
WithField("last", retentionRange.Last).
Info("Finished initializing in-memory store and applying DB data migrations")
}

return feeWindows
Expand Down
6 changes: 3 additions & 3 deletions cmd/soroban-rpc/internal/db/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,8 +311,8 @@ type eventTableMigration struct {
writer EventWriter
}

func (e *eventTableMigration) ApplicableRange() *LedgerSeqRange {
return &LedgerSeqRange{
func (e *eventTableMigration) ApplicableRange() LedgerSeqRange {
return LedgerSeqRange{
First: e.firstLedger,
Last: e.lastLedger,
}
Expand All @@ -326,7 +326,7 @@ func newEventTableMigration(
_ context.Context,
logger *log.Entry,
passphrase string,
ledgerSeqRange *LedgerSeqRange,
ledgerSeqRange LedgerSeqRange,
) migrationApplierFactory {
return migrationApplierFactoryF(func(db *DB) (MigrationApplier, error) {
migration := eventTableMigration{
Expand Down
76 changes: 50 additions & 26 deletions cmd/soroban-rpc/internal/db/migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,38 +19,40 @@ type LedgerSeqRange struct {
Last uint32
}

func (mlr *LedgerSeqRange) IsLedgerIncluded(ledgerSeq uint32) bool {
if mlr == nil {
return false
}
func (mlr LedgerSeqRange) IsLedgerIncluded(ledgerSeq uint32) bool {
return ledgerSeq >= mlr.First && ledgerSeq <= mlr.Last
}

func (mlr *LedgerSeqRange) Merge(other *LedgerSeqRange) *LedgerSeqRange {
if mlr == nil {
func (mlr LedgerSeqRange) Merge(other LedgerSeqRange) LedgerSeqRange {
if mlr.Empty() {
return other
}
if other == nil {
if other.Empty() {
return mlr
}

// TODO: using min/max can result in a much larger range than needed,
// as an optimization, we should probably use a sequence of ranges instead.
return &LedgerSeqRange{
return LedgerSeqRange{
First: min(mlr.First, other.First),
Last: max(mlr.Last, other.Last),
}
}

func (mlr LedgerSeqRange) Empty() bool {
return mlr.First == 0 && mlr.Last == 0
}

type MigrationApplier interface {
// ApplicableRange returns the closed ledger sequence interval,
// where Apply() should be called. A null result indicates the empty range
ApplicableRange() *LedgerSeqRange
// where Apply() should be called.
ApplicableRange() LedgerSeqRange
// Apply applies the migration on a ledger. It should never be applied
// in ledgers outside the ApplicableRange()
Apply(ctx context.Context, meta xdr.LedgerCloseMeta) error
}

type migrationApplierF func(context.Context, *log.Entry, string, *LedgerSeqRange) migrationApplierFactory
type migrationApplierF func(context.Context, *log.Entry, string, LedgerSeqRange) migrationApplierFactory

type migrationApplierFactory interface {
New(db *DB) (MigrationApplier, error)
Expand All @@ -72,8 +74,15 @@ type MultiMigration struct {
db *DB
}

func (mm MultiMigration) ApplicableRange() *LedgerSeqRange {
var result *LedgerSeqRange
func (mm *MultiMigration) Append(m Migration) {
r := m.ApplicableRange()
if !r.Empty() {
mm.migrations = append(mm.migrations, m)
}
}

func (mm MultiMigration) ApplicableRange() LedgerSeqRange {
var result LedgerSeqRange
for _, m := range mm.migrations {
result = m.ApplicableRange().Merge(result)
}
Expand Down Expand Up @@ -117,13 +126,18 @@ type guardedMigration struct {
}

func newGuardedDataMigration(
ctx context.Context, uniqueMigrationName string, logger *log.Entry, factory migrationApplierFactory, db *DB,
ctx context.Context, uniqueMigrationName string,
logger *log.Entry, factory migrationApplierFactory, db *DB,
) (Migration, error) {
metaKey := "Migration" + uniqueMigrationName + "Done"
previouslyMigrated, err := getMetaBool(ctx, db, metaKey)
if err != nil && !errors.Is(err, ErrEmptyDB) {
return nil, err
}
if previouslyMigrated {
//nolint:nilnil // a sentinel value here would be stupid
return nil, nil
}
applier, err := factory.New(db)
if err != nil {
return nil, err
Expand All @@ -145,15 +159,15 @@ func (g *guardedMigration) Apply(ctx context.Context, meta xdr.LedgerCloseMeta)
return nil
}
if !g.applyLogged {
g.logger.WithField("ledger", meta.LedgerSequence()).Info("applying migration")
g.logger.WithField("ledger", meta.LedgerSequence()).Info("Applying migration")
g.applyLogged = true
}
return g.migration.Apply(ctx, meta)
}

func (g *guardedMigration) ApplicableRange() *LedgerSeqRange {
func (g *guardedMigration) ApplicableRange() LedgerSeqRange {
if g.alreadyMigrated {
return nil
return LedgerSeqRange{}
}
return g.migration.ApplicableRange()
}
Expand All @@ -165,38 +179,41 @@ func (g *guardedMigration) Commit(ctx context.Context) error {
return setMetaBool(ctx, g.db, g.guardMetaKey, true)
}

func GetMigrationLedgerRange(ctx context.Context, db *DB, retentionWindow uint32) (*LedgerSeqRange, error) {
func GetMigrationLedgerRange(ctx context.Context, db *DB, retentionWindow uint32) (LedgerSeqRange, error) {
firstLedgerToMigrate := firstLedger
latestLedger, err := NewLedgerEntryReader(db).GetLatestLedgerSequence(ctx)
if err != nil && !errors.Is(err, ErrEmptyDB) {
return nil, fmt.Errorf("failed to get latest ledger sequence: %w", err)
return LedgerSeqRange{}, fmt.Errorf("failed to get latest ledger sequence: %w", err)
}
if latestLedger > retentionWindow {
firstLedgerToMigrate = latestLedger - retentionWindow
}
return &LedgerSeqRange{
return LedgerSeqRange{
First: firstLedgerToMigrate,
Last: latestLedger,
}, nil
}

func BuildMigrations(ctx context.Context, logger *log.Entry, db *DB, networkPassphrase string,
ledgerSeqRange *LedgerSeqRange,
func BuildMigrations(
ctx context.Context, logger *log.Entry, db *DB, networkPassphrase string,
ledgerSeqRange LedgerSeqRange,
) (MultiMigration, error) {
// Start a common db transaction for the entire migration duration
err := db.Begin(ctx)
if err != nil {
return MultiMigration{}, errors.Join(err, db.Rollback())
}

migrationNameToFunc := map[string]migrationApplierF{
//
// Add new DB migrations here:
//
currentMigrations := map[string]migrationApplierF{
transactionsMigrationName: newTransactionTableMigration,
eventsMigrationName: newEventTableMigration,
}

migrations := make([]Migration, 0, len(migrationNameToFunc))

for migrationName, migrationFunc := range migrationNameToFunc {
migrations := make([]Migration, 0, len(currentMigrations))
for migrationName, migrationFunc := range currentMigrations {
migrationLogger := logger.WithField("migration", migrationName)
factory := migrationFunc(
ctx,
Expand All @@ -210,8 +227,15 @@ func BuildMigrations(ctx context.Context, logger *log.Entry, db *DB, networkPass
return MultiMigration{}, errors.Join(err, fmt.Errorf(
"could not create guarded migration for %s", migrationName), db.Rollback())
}

if guardedM == nil {
logger.Infof("Skipping completed migration %s", migrationName)
continue
}

migrations = append(migrations, guardedM)
}

return MultiMigration{
migrations: migrations,
db: db,
Expand Down
6 changes: 3 additions & 3 deletions cmd/soroban-rpc/internal/db/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,8 +255,8 @@ type transactionTableMigration struct {
writer TransactionWriter
}

func (t *transactionTableMigration) ApplicableRange() *LedgerSeqRange {
return &LedgerSeqRange{
func (t *transactionTableMigration) ApplicableRange() LedgerSeqRange {
return LedgerSeqRange{
First: t.firstLedger,
Last: t.lastLedger,
}
Expand All @@ -270,7 +270,7 @@ func newTransactionTableMigration(
ctx context.Context,
logger *log.Entry,
passphrase string,
ledgerSeqRange *LedgerSeqRange,
ledgerSeqRange LedgerSeqRange,
) migrationApplierFactory {
return migrationApplierFactoryF(func(db *DB) (MigrationApplier, error) {
// Truncate the table, since it may contain data, causing insert conflicts later on.
Expand Down
Loading

0 comments on commit e110732

Please sign in to comment.