Refactor ingestion data-flow #4909

Closed
8 tasks done
tamirms opened this issue Jun 15, 2023 · 5 comments · Fixed by #5101
Labels
horizon performance issues aimed at improving performance


tamirms commented Jun 15, 2023

In https://docs.google.com/document/d/1YETNALx5EzqZDNSVWzTfaK5Ogw84PsBlrt64nOr-Njg/edit?usp=sharing we were able to speed up ingestion by refactoring the ingestion data-flow. Currently the Horizon processors for the history tables are coupled to a single ledger. We need to break this coupling and allow a Horizon processor instance to be used on batches of ledgers.

Also, we need to refactor how the DB is used in a Horizon processor. Currently, we periodically insert batches of rows to the DB and flush all remaining rows to the DB at the end of the ingestion round. We should instead accumulate all the rows in memory and only use the DB at the very end of the ingestion round to flush all the rows using COPY statements.

Note that these changes only need to be applied on the Horizon processors for the history tables. We do not need to modify the Horizon processors for the state tables.

In the spike branch the interface for the Horizon history processors was changed to:

type horizonTransactionProcessor interface {
	ProcessTransaction(xdr.LedgerCloseMeta, ingest.LedgerTransaction) error
	Commit(ctx context.Context, session db.SessionInterface) error
}

Note how ProcessTransaction() now takes an xdr.LedgerCloseMeta parameter, which allows the processor to be used on transactions spanning multiple ledgers. Also, ProcessTransaction() no longer has a context parameter because we don't expect to have any DB operations in ProcessTransaction(). Instead, ProcessTransaction() will only accumulate rows for the history tables in memory. The Commit() function is the only part of the processor which should have access to the DB, and it will be used to flush the in-memory rows to the DB.

Some of the history tables rely on lookup tables to obtain integer ids for accounts, assets, claimable balances, and liquidity pools. In this case the data-flow will be slightly more complex. In the spike branch, the code for keeping track of all the id lookups was encapsulated into a "loader" component. Whenever the processor encountered an account string in ProcessTransaction(), the processor would register the account string in the loader component. The resulting data-flow looked like:

	accountLoader := history.NewAccountLoader()
	cbLoader := history.NewClaimableBalanceLoader()
	lpLoader := history.NewLiquidityPoolLoader()
	assetLoader := history.NewAssetLoader()
	processors := buildTransactionProcessors(
		s.historyQ,
		accountLoader,
		cbLoader,
		lpLoader,
		assetLoader,
	)

	// apply all the ledgers in the batch on the processors
	for _, ledger := range ledgers {
		if err = s.runner.ApplyProcessorsOnLedger(processors, ledger); err != nil {
			return err
		}
	}

	// use the loaders to lookup all the accounts, assets, claimable balances, and liquidity pools registered
	// by the processors
	err = func() error {
		if err := s.historyQ.Begin(); err != nil {
			return errors.Wrap(err, "Error starting a transaction")
		}
		defer s.historyQ.Rollback()

		if err := accountLoader.Exec(s.ctx, s.historyQ); err != nil {
			return err
		}
		if err := cbLoader.Exec(s.ctx, s.historyQ); err != nil {
			return err
		}
		if err := lpLoader.Exec(s.ctx, s.historyQ); err != nil {
			return err
		}
		if err := assetLoader.Exec(s.ctx, s.historyQ); err != nil {
			return err
		}
		if err := s.historyQ.Commit(); err != nil {
			return errors.Wrap(err, commitErrMsg)
		}
		return nil
	}()
	if err != nil {
		return err
	}

	// flush the rows to the db, the processors will be able to obtain the integer ids from the loaders
	if err := s.historyQ.Begin(); err != nil {
		return errors.Wrap(err, "Error starting a transaction")
	}
	defer s.historyQ.Rollback()
	if err := processors.Commit(s.ctx, s.historyQ); err != nil {
		return err
	}
	if err := s.historyQ.Commit(); err != nil {
		return errors.Wrap(err, commitErrMsg)
	}

This refactoring will need to be implemented on the following Horizon ingestion processors:

@sreuland

I have begun working on the sub-task to finish integrating the new processor interface into the tx/commit flow of the processor runner.

sreuland added a commit to sreuland/go that referenced this issue Oct 16, 2023
sreuland added a commit to sreuland/go that referenced this issue Oct 17, 2023
sreuland added a commit to sreuland/go that referenced this issue Oct 19, 2023
sreuland added a commit to sreuland/go that referenced this issue Oct 20, 2023
sreuland added a commit to sreuland/go that referenced this issue Oct 20, 2023
sreuland added a commit to sreuland/go that referenced this issue Oct 20, 2023
sreuland added a commit to sreuland/go that referenced this issue Oct 23, 2023
sreuland added a commit to sreuland/go that referenced this issue Oct 24, 2023
…asset texts, trade processor depends on ordered id's for buyer/seller delineation
sreuland added a commit to sreuland/go that referenced this issue Oct 24, 2023
@sreuland

I think before closing this ticket, the criteria also include porting the latest mainline of horizon back to the ingestion-next feature branch, asserting that all tests pass, and then deploying the build to staging infrastructure and testing full history ingestion on pubnet.

sreuland added a commit to sreuland/go that referenced this issue Oct 31, 2023
@tamirms

tamirms commented Nov 1, 2023

@sreuland once #5096 is merged, I think we should merge the ingestion-next branch into master.

We can deploy to staging and run the verify-range jobs on full history during the testing phase of the release process. Running verify-range will take more than 1 week and I'd prefer to avoid having the master branch diverge from ingestion-next (thus creating more merge conflicts) during that time.

@sreuland

sreuland commented Nov 1, 2023

> @sreuland once #5096 is merged, I think we should merge the ingestion-next branch into master.
>
> We can deploy to staging and run the verify-range jobs on full history during the testing phase of the release process. Running verify-range will take more than 1 week and I'd prefer to avoid having the master branch diverge from ingestion-next (thus creating more merge conflicts) during that time.

@tamirms got it: no staging performance tests after merging ingestion-next to master. Instead, @urvisavla and I will proceed with the remaining feature dev tickets for ingestion perf (#5098, #5099), get those into master, then do release prep for horizon 2.28.0 and run the staging verify-range on a larger-than-usual range for performance insight as part of the release. Sounds good, as that reduces the verify-range effort to one occurrence.

@tamirms

tamirms commented Nov 1, 2023

@sreuland yes, that's right
