Skip to content

Commit

Permalink
Clean up the migration even more
Browse files Browse the repository at this point in the history
  • Loading branch information
2opremio committed Jun 11, 2024
1 parent ce0fe5e commit 61dff8b
Showing 1 changed file with 57 additions and 45 deletions.
102 changes: 57 additions & 45 deletions cmd/soroban-rpc/internal/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ func MustNew(cfg *config.Config) *Daemon {
}

// mustInitializeStorage initializes the storage using what was on the DB
// TODO: This function is horrendous, cleanup once we remove the in-memory storage
// TODO: clean up once we remove the in-memory storage
func (d *Daemon) mustInitializeStorage(cfg *config.Config) (*feewindow.FeeWindows, *events.MemoryStore) {
eventStore := events.NewMemoryStore(
d,
Expand All @@ -290,50 +290,11 @@ func (d *Daemon) mustInitializeStorage(cfg *config.Config) (*feewindow.FeeWindow
defer cancelReadTxMeta()
var initialSeq uint32
var currentSeq uint32
// We should do the migration somewhere else
migrationSession := d.db.Clone()
if err := migrationSession.Begin(readTxMetaCtx); err != nil {
d.logger.WithError(err).Fatal("could not start migration session")
}
migrationFunc := func(txmeta xdr.LedgerCloseMeta) error { return nil }
migrationDoneFunc := func() {}
val, err := db.GetMetaBool(readTxMetaCtx, migrationSession, transactionsTableMigrationDoneMetaKey)
if err == db.ErrEmptyDB || val == false {
d.logger.Info("migrating transaction to new backend")
writer := db.NewTransactionWriter(d.logger, migrationSession, cfg.NetworkPassphrase)
latestLedger, err := db.NewLedgerEntryReader(d.db).GetLatestLedgerSequence(readTxMetaCtx)
if err != nil || err != db.ErrEmptyDB {
d.logger.WithError(err).Fatal("cannot read latest ledger")
}
firstLedgerToMigrate := uint32(2)
if latestLedger > cfg.TransactionLedgerRetentionWindow {
firstLedgerToMigrate = latestLedger - cfg.TransactionLedgerRetentionWindow
}
migrationFunc = func(txmeta xdr.LedgerCloseMeta) error {
if txmeta.LedgerSequence() < firstLedgerToMigrate {
return nil
}
return writer.InsertTransactions(txmeta)
}
migrationDoneFunc = func() {
err := db.SetMetaBool(readTxMetaCtx, migrationSession, transactionsTableMigrationDoneMetaKey)
if err != nil {
d.logger.WithError(err).WithField("key", transactionsTableMigrationDoneMetaKey).Fatal("could not set metadata")
migrationSession.Rollback()
return
}
// TODO: rollback wherever necessary
if err = migrationSession.Commit(); err != nil {
d.logger.WithError(err).Error("could not commit migration session")
}
}
} else if err != nil {
d.logger.WithError(err).WithField("key", transactionsTableMigrationDoneMetaKey).Fatal("could not get metadata")
}
migration, migrationDone := d.newTxMigration(readTxMetaCtx, cfg)
// NOTE: We could optimize this to avoid unnecessary ingestion calls
// (the range of txmetas can be larger than the individual store retention windows)
// but it's probably not worth the pain.
err = db.NewLedgerReader(d.db).StreamAllLedgers(readTxMetaCtx, func(txmeta xdr.LedgerCloseMeta) error {
err := db.NewLedgerReader(d.db).StreamAllLedgers(readTxMetaCtx, func(txmeta xdr.LedgerCloseMeta) error {
currentSeq = txmeta.LedgerSequence()
if initialSeq == 0 {
initialSeq = currentSeq
Expand All @@ -351,7 +312,7 @@ func (d *Daemon) mustInitializeStorage(cfg *config.Config) (*feewindow.FeeWindow
if err := feewindows.IngestFees(txmeta); err != nil {
d.logger.WithError(err).Fatal("could not initialize fee stats")
}
if err := migrationFunc(txmeta); err != nil {
if err := migration(txmeta); err != nil {
// TODO: we should only migrate the transaction range
d.logger.WithError(err).Fatal("could not run migration")
}
Expand All @@ -360,17 +321,68 @@ func (d *Daemon) mustInitializeStorage(cfg *config.Config) (*feewindow.FeeWindow
if err != nil {
d.logger.WithError(err).Fatal("could not obtain txmeta cache from the database")
}
migrationDoneFunc()
migrationDone()

if currentSeq != 0 {
d.logger.WithFields(supportlog.F{
"seq": currentSeq,
}).Info("finished initializing in-memory store")
}

return feewindows, eventStore
}

// TODO: We should probably implement the migrations somewhere else
type migrationFunc func(txmeta xdr.LedgerCloseMeta) error
type migrationDoneFunc func()

func (d *Daemon) newTxMigration(ctx context.Context, cfg *config.Config) (migrationFunc, migrationDoneFunc) {
migrationSession := d.db.Clone()
if err := migrationSession.Begin(ctx); err != nil {
d.logger.WithError(err).Fatal("could not start migration session")
}
migration := func(txmeta xdr.LedgerCloseMeta) error { return nil }
migrationDone := func() {}
previouslyMigrated, err := db.GetMetaBool(ctx, migrationSession, transactionsTableMigrationDoneMetaKey)
if err != nil {
if !errors.Is(err, db.ErrEmptyDB) {
d.logger.WithError(err).WithField("key", transactionsTableMigrationDoneMetaKey).Fatal("could not get metadata")
}
} else if previouslyMigrated {
migrationSession.Rollback()
return migration, migrationDone
}

d.logger.Info("migrating transactions to new backend")
writer := db.NewTransactionWriter(d.logger, migrationSession, cfg.NetworkPassphrase)
latestLedger, err := db.NewLedgerEntryReader(d.db).GetLatestLedgerSequence(ctx)
if err != nil || err != db.ErrEmptyDB {
d.logger.WithError(err).Fatal("cannot read latest ledger")
}
firstLedgerToMigrate := uint32(2)
if latestLedger > cfg.TransactionLedgerRetentionWindow {
firstLedgerToMigrate = latestLedger - cfg.TransactionLedgerRetentionWindow
}
migration = func(txmeta xdr.LedgerCloseMeta) error {
if txmeta.LedgerSequence() < firstLedgerToMigrate {
return nil
}
return writer.InsertTransactions(txmeta)
}
migrationDone = func() {
err := db.SetMetaBool(ctx, migrationSession, transactionsTableMigrationDoneMetaKey)
if err != nil {
d.logger.WithError(err).WithField("key", transactionsTableMigrationDoneMetaKey).Fatal("could not set metadata")
migrationSession.Rollback()
return
}
if err = migrationSession.Commit(); err != nil {
d.logger.WithError(err).Error("could not commit migration session")
}
}
return migration, migrationDone
}

func (d *Daemon) Run() {
d.logger.WithFields(supportlog.F{
"addr": d.server.Addr,
Expand Down

0 comments on commit 61dff8b

Please sign in to comment.