Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve startup by eliminating unnecessary migration ranges #282

Merged
merged 8 commits into from
Sep 4, 2024
Merged
107 changes: 82 additions & 25 deletions cmd/soroban-rpc/internal/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,15 @@ import (
)

const (
prometheusNamespace = "soroban_rpc"
maxLedgerEntryWriteBatchSize = 150
defaultReadTimeout = 5 * time.Second
defaultShutdownGracePeriod = 10 * time.Second
inMemoryInitializationLedgerLogPeriod = 1_000_000
prometheusNamespace = "soroban_rpc"
maxLedgerEntryWriteBatchSize = 150
defaultReadTimeout = 5 * time.Second
defaultShutdownGracePeriod = 10 * time.Second

// Since our default retention window will be 7 days (7*17,280 ledgers),
// choose a random 5-digit prime to have irregular logging intervals at each
// halfish-day of processing
inMemoryInitializationLedgerLogPeriod = 10_099
)

type Daemon struct {
Expand Down Expand Up @@ -289,6 +293,31 @@ func MustNew(cfg *config.Config, logger *supportlog.Entry) *Daemon {

// mustInitializeStorage initializes the storage using what was on the DB
func (d *Daemon) mustInitializeStorage(cfg *config.Config) *feewindow.FeeWindows {
//
// There is a lot of complex "ledger window math" here so it's worth
Shaptic marked this conversation as resolved.
Show resolved Hide resolved
// clarifying as a comment beforehand.
//
// There are two windows in play here:
// - the ledger retention window, which describes the range of txmeta
// to keep relative to the latest "ledger tip" of the network
// - the fee stats window, which describes a *subset* of the prior
// ledger retention window on which to perform fee analysis
//
// If the fee window *exceeds* the retention window, this doesn't make any
// sense since it implies the user wants to store N amount of actual
// historical data and M > N amount of ledgers just for fee processing,
// which is nonsense from a performance standpoint. So we should prevent
// this config on startup.
//
maxFeeRetentionWindow := max(
cfg.ClassicFeeStatsLedgerRetentionWindow,
cfg.SorobanFeeStatsLedgerRetentionWindow)
if maxFeeRetentionWindow > cfg.HistoryRetentionWindow {
d.logger.Fatalf(
"Fee stat analysis window (%d) cannot exceed history retention window (%d).",
maxFeeRetentionWindow, cfg.HistoryRetentionWindow)
}

feeWindows := feewindow.NewFeeWindows(
cfg.ClassicFeeStatsLedgerRetentionWindow,
cfg.SorobanFeeStatsLedgerRetentionWindow,
Expand All @@ -299,27 +328,48 @@ func (d *Daemon) mustInitializeStorage(cfg *config.Config) *feewindow.FeeWindows
readTxMetaCtx, cancelReadTxMeta := context.WithTimeout(context.Background(), cfg.IngestionTimeout)
defer cancelReadTxMeta()

var initialSeq, currentSeq uint32
applicableRange, err := db.GetMigrationLedgerRange(readTxMetaCtx, d.db, cfg.HistoryRetentionWindow)
//
// With that in mind, it means we should launch as follows:
//
// 1. Based on the ledger retention window, identify the ledger range that
// needs to migrated. We don't do "partial" migrations (a new migration to a
// migrated table would have a different "Migrated<Table>" meta key in the
// database), so this should represent the entire range of ledger meta we're
// storing.
//
retentionRange, err := db.GetMigrationLedgerRange(readTxMetaCtx, d.db, cfg.HistoryRetentionWindow)
if err != nil {
d.logger.WithError(err).Fatal("could not get ledger range for migration")
}

maxFeeRetentionWindow := max(cfg.ClassicFeeStatsLedgerRetentionWindow, cfg.SorobanFeeStatsLedgerRetentionWindow)
feeStatsRange, err := db.GetMigrationLedgerRange(readTxMetaCtx, d.db, maxFeeRetentionWindow)
//
// 2. However, if we have already performed migrations, we don't want to
// count those in the "to migrate" ledger range. Thus, db.BuildMigrations
// will return the *applicable* range for the incomplete set of migrations.
// That means **it may be empty** if all migrations have occurred.
//
dataMigrations, applicableRange, err := db.BuildMigrations(
readTxMetaCtx, d.logger, d.db, cfg.NetworkPassphrase, retentionRange)
if err != nil {
d.logger.WithError(err).Fatal("could not get ledger range for fee stats")
d.logger.WithError(err).Fatal("could not build migrations")
}

// Combine the ledger range for fees, events and transactions
ledgerSeqRange := feeStatsRange.Merge(applicableRange)

dataMigrations, err := db.BuildMigrations(readTxMetaCtx, d.logger, d.db, cfg.NetworkPassphrase, ledgerSeqRange)
//
// 4. Finally, we can incorporate the fee analysis window. If there are
// migrations to do, this will have no effect, since the migration window is
// larger than the fee window. If there were *no* migrations, though, this
// means the final range is only the fee stat analysis range.
//
feeStatsRange, err := db.GetMigrationLedgerRange(readTxMetaCtx, d.db, maxFeeRetentionWindow)
if err != nil {
d.logger.WithError(err).Fatal("could not build migrations")
d.logger.WithError(err).Fatal("could not get ledger range for fee stats")
}
ledgerSeqRange := feeStatsRange.Merge(&applicableRange)

// Apply migration for events, transactions and fee stats
//
// 5. Apply migration for events & transactions, and perform fee stat analysis.
//
var initialSeq, currentSeq uint32
err = db.NewLedgerReader(d.db).StreamLedgerRange(
readTxMetaCtx,
ledgerSeqRange.First,
Expand All @@ -328,32 +378,39 @@ func (d *Daemon) mustInitializeStorage(cfg *config.Config) *feewindow.FeeWindows
currentSeq = txMeta.LedgerSequence()
if initialSeq == 0 {
initialSeq = currentSeq
d.logger.WithField("seq", currentSeq).
Info("initializing in-memory store")
d.logger.
WithField("first", initialSeq).
WithField("last", ledgerSeqRange.Last).
Info("Initializing in-memory store")
Shaptic marked this conversation as resolved.
Show resolved Hide resolved
} else if (currentSeq-initialSeq)%inMemoryInitializationLedgerLogPeriod == 0 {
d.logger.WithField("seq", currentSeq).
Debug("still initializing in-memory store")
d.logger.
WithField("seq", currentSeq).
WithField("last", ledgerSeqRange.Last).
Debug("Still initializing in-memory store")
}

if err = feeWindows.IngestFees(txMeta); err != nil {
d.logger.WithError(err).Fatal("could not initialize fee stats")
if feeStatsRange.IsLedgerIncluded(currentSeq) { // skip irrelevant ledgers
Shaptic marked this conversation as resolved.
Show resolved Hide resolved
if err = feeWindows.IngestFees(txMeta); err != nil {
Shaptic marked this conversation as resolved.
Show resolved Hide resolved
d.logger.WithError(err).Fatal("could not initialize fee stats")
}
}

if err := dataMigrations.Apply(readTxMetaCtx, txMeta); err != nil {
d.logger.WithError(err).Fatal("could not apply migration for ledger ", currentSeq)
}

return nil
})
if err != nil {
d.logger.WithError(err).Fatal("could not obtain txmeta cache from the database")
d.logger.WithError(err).Fatal("Could not obtain txmeta cache from the database")
}
if err := dataMigrations.Commit(readTxMetaCtx); err != nil {
d.logger.WithError(err).Fatal("could not commit data migrations")
d.logger.WithError(err).Fatal("Could not commit data migrations")
}

if currentSeq != 0 {
d.logger.WithField("seq", currentSeq).
Info("finished initializing in-memory store and applying DB data migrations")
Info("Finished initializing in-memory store and applying DB data migrations")
}

return feeWindows
Expand Down
66 changes: 48 additions & 18 deletions cmd/soroban-rpc/internal/db/migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@ func (mlr *LedgerSeqRange) IsLedgerIncluded(ledgerSeq uint32) bool {
}

func (mlr *LedgerSeqRange) Merge(other *LedgerSeqRange) *LedgerSeqRange {
Shaptic marked this conversation as resolved.
Show resolved Hide resolved
if mlr == nil {
if mlr.Empty() {
return other
}
if other == nil {
if other.Empty() {
return mlr
}
// TODO: using min/max can result in a much larger range than needed,
Expand All @@ -41,6 +41,17 @@ func (mlr *LedgerSeqRange) Merge(other *LedgerSeqRange) *LedgerSeqRange {
}
}

func (mlr *LedgerSeqRange) MergeInPlace(other LedgerSeqRange) {
// TODO: using min/max can result in a much larger range than needed,
// as an optimization, we should probably use a sequence of ranges instead.
mlr.First = min(mlr.First, other.First)
mlr.Last = max(mlr.Last, other.Last)
}

func (mlr *LedgerSeqRange) Empty() bool {
return mlr == nil || (mlr.First == 0 && mlr.Last == 0)
}

type MigrationApplier interface {
// ApplicableRange returns the closed ledger sequence interval,
// where Apply() should be called. A null result indicates the empty range
Expand Down Expand Up @@ -117,13 +128,18 @@ type guardedMigration struct {
}

func newGuardedDataMigration(
ctx context.Context, uniqueMigrationName string, logger *log.Entry, factory migrationApplierFactory, db *DB,
ctx context.Context, uniqueMigrationName string,
logger *log.Entry, factory migrationApplierFactory, db *DB,
) (Migration, error) {
metaKey := "Migration" + uniqueMigrationName + "Done"
previouslyMigrated, err := getMetaBool(ctx, db, metaKey)
if err != nil && !errors.Is(err, ErrEmptyDB) {
return nil, err
}
if previouslyMigrated {
//nolint:nilnil // a sentinel value here would be stupid
return nil, nil
}
applier, err := factory.New(db)
if err != nil {
return nil, err
Expand All @@ -145,7 +161,7 @@ func (g *guardedMigration) Apply(ctx context.Context, meta xdr.LedgerCloseMeta)
return nil
}
if !g.applyLogged {
g.logger.WithField("ledger", meta.LedgerSequence()).Info("applying migration")
g.logger.WithField("ledger", meta.LedgerSequence()).Info("Applying migration")
g.applyLogged = true
}
return g.migration.Apply(ctx, meta)
Expand All @@ -165,55 +181,69 @@ func (g *guardedMigration) Commit(ctx context.Context) error {
return setMetaBool(ctx, g.db, g.guardMetaKey, true)
}

func GetMigrationLedgerRange(ctx context.Context, db *DB, retentionWindow uint32) (*LedgerSeqRange, error) {
func GetMigrationLedgerRange(ctx context.Context, db *DB, retentionWindow uint32) (LedgerSeqRange, error) {
firstLedgerToMigrate := firstLedger
latestLedger, err := NewLedgerEntryReader(db).GetLatestLedgerSequence(ctx)
if err != nil && !errors.Is(err, ErrEmptyDB) {
return nil, fmt.Errorf("failed to get latest ledger sequence: %w", err)
return LedgerSeqRange{}, fmt.Errorf("failed to get latest ledger sequence: %w", err)
}
if latestLedger > retentionWindow {
firstLedgerToMigrate = latestLedger - retentionWindow
}
return &LedgerSeqRange{
return LedgerSeqRange{
First: firstLedgerToMigrate,
Last: latestLedger,
}, nil
}

func BuildMigrations(ctx context.Context, logger *log.Entry, db *DB, networkPassphrase string,
ledgerSeqRange *LedgerSeqRange,
) (MultiMigration, error) {
func BuildMigrations(
ctx context.Context, logger *log.Entry, db *DB, networkPassphrase string,
ledgerSeqRange LedgerSeqRange,
) (MultiMigration, LedgerSeqRange, error) {
Shaptic marked this conversation as resolved.
Show resolved Hide resolved
// Track ranges for which migrations are actually necessary
applicableRange := LedgerSeqRange{}

// Start a common db transaction for the entire migration duration
err := db.Begin(ctx)
if err != nil {
return MultiMigration{}, errors.Join(err, db.Rollback())
return MultiMigration{}, applicableRange, errors.Join(err, db.Rollback())
}

migrationNameToFunc := map[string]migrationApplierF{
//
// Add new migrations here:
//
currentMigrations := map[string]migrationApplierF{
transactionsMigrationName: newTransactionTableMigration,
eventsMigrationName: newEventTableMigration,
}

migrations := make([]Migration, 0, len(migrationNameToFunc))

for migrationName, migrationFunc := range migrationNameToFunc {
migrations := make([]Migration, 0, len(currentMigrations))
for migrationName, migrationFunc := range currentMigrations {
migrationLogger := logger.WithField("migration", migrationName)
factory := migrationFunc(
ctx,
migrationLogger,
networkPassphrase,
ledgerSeqRange,
&ledgerSeqRange,
)

guardedM, err := newGuardedDataMigration(ctx, migrationName, migrationLogger, factory, db)
if err != nil {
return MultiMigration{}, errors.Join(err, fmt.Errorf(
return MultiMigration{}, applicableRange, errors.Join(err, fmt.Errorf(
"could not create guarded migration for %s", migrationName), db.Rollback())
}

if guardedM == nil {
Shaptic marked this conversation as resolved.
Show resolved Hide resolved
logger.Infof("Skipping completed migration %s", migrationName)
continue
}

applicableRange.MergeInPlace(ledgerSeqRange)
migrations = append(migrations, guardedM)
}

return MultiMigration{
migrations: migrations,
db: db,
}, nil
}, applicableRange, nil
}
4 changes: 2 additions & 2 deletions cmd/soroban-rpc/internal/feewindow/feewindow.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,9 +133,9 @@ type FeeWindows struct {
db *db.DB
}

func NewFeeWindows(classicRetention uint32, sorobanRetetion uint32, networkPassPhrase string, db *db.DB) *FeeWindows {
func NewFeeWindows(classicRetention uint32, sorobanRetention uint32, networkPassPhrase string, db *db.DB) *FeeWindows {
return &FeeWindows{
SorobanInclusionFeeWindow: NewFeeWindow(sorobanRetetion),
SorobanInclusionFeeWindow: NewFeeWindow(sorobanRetention),
ClassicFeeWindow: NewFeeWindow(classicRetention),
networkPassPhrase: networkPassPhrase,
db: db,
Expand Down
Loading