Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve startup by eliminating unnecessary migration ranges #282

Merged
merged 8 commits into from
Sep 4, 2024
Merged
102 changes: 74 additions & 28 deletions cmd/soroban-rpc/internal/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,15 @@ import (
)

const (
prometheusNamespace = "soroban_rpc"
maxLedgerEntryWriteBatchSize = 150
defaultReadTimeout = 5 * time.Second
defaultShutdownGracePeriod = 10 * time.Second
inMemoryInitializationLedgerLogPeriod = 1_000_000
prometheusNamespace = "soroban_rpc"
maxLedgerEntryWriteBatchSize = 150
defaultReadTimeout = 5 * time.Second
defaultShutdownGracePeriod = 10 * time.Second

// Since our default retention window will be 7 days (7*17,280 ledgers),
// choose a random 5-digit prime to have irregular logging intervals at each
// halfish-day of processing
inMemoryInitializationLedgerLogPeriod = 10_099
)

type Daemon struct {
Expand Down Expand Up @@ -289,6 +293,29 @@ func MustNew(cfg *config.Config, logger *supportlog.Entry) *Daemon {

// mustInitializeStorage initializes the storage using what was on the DB
func (d *Daemon) mustInitializeStorage(cfg *config.Config) *feewindow.FeeWindows {
//
// There's some complex "ledger window math" here so we should clarify it
// beforehand.
//
// There are two windows in play here:
// - the ledger retention window, which describes the range of txmeta
// to keep relative to the latest "ledger tip" of the network
// - the fee stats window, which describes a *subset* of the prior
// ledger retention window on which to perform fee analysis
//
// If the fee window *exceeds* the retention window, this doesn't make any
// sense since it implies the user wants to store N amount of actual
// historical data and M > N amount of ledgers just for fee processing,
// which is nonsense from a performance standpoint. We prevent this:
maxFeeRetentionWindow := max(
cfg.ClassicFeeStatsLedgerRetentionWindow,
cfg.SorobanFeeStatsLedgerRetentionWindow)
if maxFeeRetentionWindow > cfg.HistoryRetentionWindow {
d.logger.Fatalf(
"Fee stat analysis window (%d) cannot exceed history retention window (%d).",
maxFeeRetentionWindow, cfg.HistoryRetentionWindow)
}

feeWindows := feewindow.NewFeeWindows(
cfg.ClassicFeeStatsLedgerRetentionWindow,
cfg.SorobanFeeStatsLedgerRetentionWindow,
Expand All @@ -299,27 +326,42 @@ func (d *Daemon) mustInitializeStorage(cfg *config.Config) *feewindow.FeeWindows
readTxMetaCtx, cancelReadTxMeta := context.WithTimeout(context.Background(), cfg.IngestionTimeout)
defer cancelReadTxMeta()

var initialSeq, currentSeq uint32
applicableRange, err := db.GetMigrationLedgerRange(readTxMetaCtx, d.db, cfg.HistoryRetentionWindow)
// To combine these windows, we launch as follows:
//
// 1. First, identify the ledger range for database migrations based on the
// ledger retention window. Since we don't do "partial" migrations (all or
// nothing), this represents the entire range of ledger metas we store.
//
retentionRange, err := db.GetMigrationLedgerRange(readTxMetaCtx, d.db, cfg.HistoryRetentionWindow)
if err != nil {
d.logger.WithError(err).Fatal("could not get ledger range for migration")
}

maxFeeRetentionWindow := max(cfg.ClassicFeeStatsLedgerRetentionWindow, cfg.SorobanFeeStatsLedgerRetentionWindow)
feeStatsRange, err := db.GetMigrationLedgerRange(readTxMetaCtx, d.db, maxFeeRetentionWindow)
dataMigrations, err := db.BuildMigrations(
readTxMetaCtx, d.logger, d.db, cfg.NetworkPassphrase, retentionRange)
if err != nil {
d.logger.WithError(err).Fatal("could not get ledger range for fee stats")
d.logger.WithError(err).Fatal("could not build migrations")
}

// Combine the ledger range for fees, events and transactions
ledgerSeqRange := feeStatsRange.Merge(applicableRange)

dataMigrations, err := db.BuildMigrations(readTxMetaCtx, d.logger, d.db, cfg.NetworkPassphrase, ledgerSeqRange)
// 2. Then, incorporate the fee analysis window. If there are migrations to
// do, this has no effect, since migration windows are larger than the fee
// window. In the absence of migrations, though, this means the ingestion
// range is just the fee stat range.
//
feeStatsRange, err := db.GetMigrationLedgerRange(readTxMetaCtx, d.db, maxFeeRetentionWindow)
if err != nil {
d.logger.WithError(err).Fatal("could not build migrations")
d.logger.WithError(err).Fatal("could not get ledger range for fee stats")
}

// Apply migration for events, transactions and fee stats
// Additionally, by treating the fee window *as if* it's a migration, we can
// make the interface here really clean.
dataMigrations.Append(feeWindows.AsMigration(feeStatsRange))
ledgerSeqRange := dataMigrations.ApplicableRange()
Shaptic marked this conversation as resolved.
Show resolved Hide resolved

//
// 3. Apply all migrations, including fee stat analysis.
//
var initialSeq, currentSeq uint32
err = db.NewLedgerReader(d.db).StreamLedgerRange(
readTxMetaCtx,
ledgerSeqRange.First,
Expand All @@ -328,32 +370,36 @@ func (d *Daemon) mustInitializeStorage(cfg *config.Config) *feewindow.FeeWindows
currentSeq = txMeta.LedgerSequence()
if initialSeq == 0 {
initialSeq = currentSeq
d.logger.WithField("seq", currentSeq).
Info("initializing in-memory store")
d.logger.
WithField("first", initialSeq).
WithField("last", ledgerSeqRange.Last).
Info("Initializing in-memory store")
Shaptic marked this conversation as resolved.
Show resolved Hide resolved
} else if (currentSeq-initialSeq)%inMemoryInitializationLedgerLogPeriod == 0 {
d.logger.WithField("seq", currentSeq).
Debug("still initializing in-memory store")
}

if err = feeWindows.IngestFees(txMeta); err != nil {
d.logger.WithError(err).Fatal("could not initialize fee stats")
d.logger.
WithField("seq", currentSeq).
WithField("last", ledgerSeqRange.Last).
Debug("Still initializing in-memory store")
}

if err := dataMigrations.Apply(readTxMetaCtx, txMeta); err != nil {
d.logger.WithError(err).Fatal("could not apply migration for ledger ", currentSeq)
}

return nil
})
if err != nil {
d.logger.WithError(err).Fatal("could not obtain txmeta cache from the database")
d.logger.WithError(err).Fatal("Could not obtain txmeta cache from the database")
}

if err := dataMigrations.Commit(readTxMetaCtx); err != nil {
d.logger.WithError(err).Fatal("could not commit data migrations")
d.logger.WithError(err).Fatal("Could not commit data migrations")
}

if currentSeq != 0 {
d.logger.WithField("seq", currentSeq).
Info("finished initializing in-memory store and applying DB data migrations")
d.logger.
WithField("first", retentionRange.First).
WithField("last", retentionRange.Last).
Info("Finished initializing in-memory store and applying DB data migrations")
}

return feeWindows
Expand Down
6 changes: 3 additions & 3 deletions cmd/soroban-rpc/internal/db/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,8 +311,8 @@ type eventTableMigration struct {
writer EventWriter
}

func (e *eventTableMigration) ApplicableRange() *LedgerSeqRange {
return &LedgerSeqRange{
func (e *eventTableMigration) ApplicableRange() LedgerSeqRange {
return LedgerSeqRange{
First: e.firstLedger,
Last: e.lastLedger,
}
Expand All @@ -326,7 +326,7 @@ func newEventTableMigration(
_ context.Context,
logger *log.Entry,
passphrase string,
ledgerSeqRange *LedgerSeqRange,
ledgerSeqRange LedgerSeqRange,
) migrationApplierFactory {
return migrationApplierFactoryF(func(db *DB) (MigrationApplier, error) {
migration := eventTableMigration{
Expand Down
76 changes: 50 additions & 26 deletions cmd/soroban-rpc/internal/db/migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,38 +19,40 @@ type LedgerSeqRange struct {
Last uint32
}

func (mlr *LedgerSeqRange) IsLedgerIncluded(ledgerSeq uint32) bool {
if mlr == nil {
return false
}
func (mlr LedgerSeqRange) IsLedgerIncluded(ledgerSeq uint32) bool {
return ledgerSeq >= mlr.First && ledgerSeq <= mlr.Last
}

func (mlr *LedgerSeqRange) Merge(other *LedgerSeqRange) *LedgerSeqRange {
if mlr == nil {
func (mlr LedgerSeqRange) Merge(other LedgerSeqRange) LedgerSeqRange {
if mlr.Empty() {
return other
}
if other == nil {
if other.Empty() {
return mlr
}

// TODO: using min/max can result in a much larger range than needed,
// as an optimization, we should probably use a sequence of ranges instead.
return &LedgerSeqRange{
return LedgerSeqRange{
First: min(mlr.First, other.First),
Last: max(mlr.Last, other.Last),
}
}

func (mlr LedgerSeqRange) Empty() bool {
return mlr.First == 0 && mlr.Last == 0
}

type MigrationApplier interface {
// ApplicableRange returns the closed ledger sequence interval,
// where Apply() should be called. A null result indicates the empty range
ApplicableRange() *LedgerSeqRange
// where Apply() should be called.
ApplicableRange() LedgerSeqRange
// Apply applies the migration on a ledger. It should never be applied
// in ledgers outside the ApplicableRange()
Apply(ctx context.Context, meta xdr.LedgerCloseMeta) error
}

type migrationApplierF func(context.Context, *log.Entry, string, *LedgerSeqRange) migrationApplierFactory
type migrationApplierF func(context.Context, *log.Entry, string, LedgerSeqRange) migrationApplierFactory

type migrationApplierFactory interface {
New(db *DB) (MigrationApplier, error)
Expand All @@ -72,8 +74,15 @@ type MultiMigration struct {
db *DB
}

func (mm MultiMigration) ApplicableRange() *LedgerSeqRange {
var result *LedgerSeqRange
func (mm *MultiMigration) Append(m Migration) {
r := m.ApplicableRange()
if !r.Empty() {
mm.migrations = append(mm.migrations, m)
}
}

func (mm MultiMigration) ApplicableRange() LedgerSeqRange {
var result LedgerSeqRange
for _, m := range mm.migrations {
result = m.ApplicableRange().Merge(result)
}
Expand Down Expand Up @@ -117,13 +126,18 @@ type guardedMigration struct {
}

func newGuardedDataMigration(
ctx context.Context, uniqueMigrationName string, logger *log.Entry, factory migrationApplierFactory, db *DB,
ctx context.Context, uniqueMigrationName string,
logger *log.Entry, factory migrationApplierFactory, db *DB,
) (Migration, error) {
metaKey := "Migration" + uniqueMigrationName + "Done"
previouslyMigrated, err := getMetaBool(ctx, db, metaKey)
if err != nil && !errors.Is(err, ErrEmptyDB) {
return nil, err
}
if previouslyMigrated {
//nolint:nilnil // a sentinel value here would be stupid
return nil, nil
}
applier, err := factory.New(db)
if err != nil {
return nil, err
Expand All @@ -145,15 +159,15 @@ func (g *guardedMigration) Apply(ctx context.Context, meta xdr.LedgerCloseMeta)
return nil
}
if !g.applyLogged {
g.logger.WithField("ledger", meta.LedgerSequence()).Info("applying migration")
g.logger.WithField("ledger", meta.LedgerSequence()).Info("Applying migration")
g.applyLogged = true
}
return g.migration.Apply(ctx, meta)
}

func (g *guardedMigration) ApplicableRange() *LedgerSeqRange {
func (g *guardedMigration) ApplicableRange() LedgerSeqRange {
if g.alreadyMigrated {
return nil
return LedgerSeqRange{}
}
return g.migration.ApplicableRange()
}
Expand All @@ -165,38 +179,41 @@ func (g *guardedMigration) Commit(ctx context.Context) error {
return setMetaBool(ctx, g.db, g.guardMetaKey, true)
}

func GetMigrationLedgerRange(ctx context.Context, db *DB, retentionWindow uint32) (*LedgerSeqRange, error) {
func GetMigrationLedgerRange(ctx context.Context, db *DB, retentionWindow uint32) (LedgerSeqRange, error) {
firstLedgerToMigrate := firstLedger
latestLedger, err := NewLedgerEntryReader(db).GetLatestLedgerSequence(ctx)
if err != nil && !errors.Is(err, ErrEmptyDB) {
return nil, fmt.Errorf("failed to get latest ledger sequence: %w", err)
return LedgerSeqRange{}, fmt.Errorf("failed to get latest ledger sequence: %w", err)
}
if latestLedger > retentionWindow {
firstLedgerToMigrate = latestLedger - retentionWindow
}
return &LedgerSeqRange{
return LedgerSeqRange{
First: firstLedgerToMigrate,
Last: latestLedger,
}, nil
}

func BuildMigrations(ctx context.Context, logger *log.Entry, db *DB, networkPassphrase string,
ledgerSeqRange *LedgerSeqRange,
func BuildMigrations(
ctx context.Context, logger *log.Entry, db *DB, networkPassphrase string,
ledgerSeqRange LedgerSeqRange,
) (MultiMigration, error) {
// Start a common db transaction for the entire migration duration
err := db.Begin(ctx)
if err != nil {
return MultiMigration{}, errors.Join(err, db.Rollback())
}

migrationNameToFunc := map[string]migrationApplierF{
//
// Add new DB migrations here:
//
currentMigrations := map[string]migrationApplierF{
transactionsMigrationName: newTransactionTableMigration,
eventsMigrationName: newEventTableMigration,
}

migrations := make([]Migration, 0, len(migrationNameToFunc))

for migrationName, migrationFunc := range migrationNameToFunc {
migrations := make([]Migration, 0, len(currentMigrations))
for migrationName, migrationFunc := range currentMigrations {
migrationLogger := logger.WithField("migration", migrationName)
factory := migrationFunc(
ctx,
Expand All @@ -210,8 +227,15 @@ func BuildMigrations(ctx context.Context, logger *log.Entry, db *DB, networkPass
return MultiMigration{}, errors.Join(err, fmt.Errorf(
"could not create guarded migration for %s", migrationName), db.Rollback())
}

if guardedM == nil {
Shaptic marked this conversation as resolved.
Show resolved Hide resolved
logger.Infof("Skipping completed migration %s", migrationName)
continue
}

migrations = append(migrations, guardedM)
}

return MultiMigration{
migrations: migrations,
db: db,
Expand Down
6 changes: 3 additions & 3 deletions cmd/soroban-rpc/internal/db/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,8 +255,8 @@ type transactionTableMigration struct {
writer TransactionWriter
}

func (t *transactionTableMigration) ApplicableRange() *LedgerSeqRange {
return &LedgerSeqRange{
func (t *transactionTableMigration) ApplicableRange() LedgerSeqRange {
return LedgerSeqRange{
First: t.firstLedger,
Last: t.lastLedger,
}
Expand All @@ -270,7 +270,7 @@ func newTransactionTableMigration(
ctx context.Context,
logger *log.Entry,
passphrase string,
ledgerSeqRange *LedgerSeqRange,
ledgerSeqRange LedgerSeqRange,
) migrationApplierFactory {
return migrationApplierFactoryF(func(db *DB) (MigrationApplier, error) {
// Truncate the table, since it may contain data, causing insert conflicts later on.
Expand Down
Loading
Loading