Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

services/horizon/internal/db2/history: Use FastBatchInsertBuilder to insert transactions into the history_transactions #4950

Merged
merged 2 commits into from
Jul 18, 2023

Conversation

tamirms
Copy link
Contributor

@tamirms tamirms commented Jul 10, 2023

PR Checklist

PR Structure

  • This PR has reasonably narrow scope (if not, break it down into smaller PRs).
  • This PR avoids mixing refactoring changes with feature changes (split into two PRs
    otherwise).
  • This PR's title starts with name of package that is most changed in the PR, ex.
    services/friendbot, or all or doc if the changes are broad or impact many
    packages.

Thoroughness

  • This PR adds tests for the most critical parts of the new functionality or fixes.
  • I've updated any docs (developer docs, .md
    files, etc... affected by this change). Take a look in the docs folder for a given service,
    like this one.

Release planning

  • I've updated the relevant CHANGELOG (here for Horizon) if
    needed with deprecations, added features, breaking changes, and DB schema changes.
  • I've decided if this PR requires a new major/minor version according to
    semver, or if it's mainly a patch change. The PR is targeted at the next
    release branch if it's not a patch change.

What

In #4916 we introduced a FastBatchInsertBuilder which can be used to quickly insert a large amount of rows into Postgres via the COPY command. In this PR, we use FastBatchInsertBuilder to insert transactions into the history_transactions table.

Why

As a part of #4908 , we want to update all the code which inserts data into the history tables to use the FastBatchInsertBuilder.

Known limitations

[N/A]

@tamirms tamirms force-pushed the transactions-batch branch from 386b660 to 4467f16 Compare July 11, 2023 19:35
@tamirms tamirms requested review from Shaptic and sreuland July 11, 2023 19:36
Table: q.GetTable("history_transactions"),
MaxBatchSize: maxBatchSize,
},
table: "history_transactions",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there a constant available to reference instead of hardcode table name here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there is no constant. I could create one but this would be the only place where it would get referenced.

}

// transactionBatchInsertBuilder is a simple wrapper around db.BatchInsertBuilder
type transactionBatchInsertBuilder struct {
encodingBuffer *xdr.EncodingBuffer
builder db.BatchInsertBuilder
table string
builder db.FastBatchInsertBuilder
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would it be worthwhile to store the query session instance in the builder instance state i.e. : q: &Q, then don't have to pass it on TransactionBatchInsertBuilder.Exec(ctx context.Context) error

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reason why I don't include the session in the struct is because I want to make it explicit that only the Exec() function is acting on the DB. The Add() function is only accumulating rows in-memory. Eventually, we will refactor the ingestion processors so that the processors are not constructed with a db session and only the Commit() function will take a db session.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, understood. just a thought exercise, in a way, does that force sql/db persistence aspect onto the ingest lifecycle interfaces, i.e. what if the builders or processors were talking to a no-sql source instead, if they maintain reference to their persistence layer internally through a struct reference, then the ingest interface methods like Exec/Commit can be storage type agnostic, with no parameters referencing specific backend types.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that's a good point. I'll keep that in mind when we implement #4909 . Perhaps it would be better to not have a session in the Commit() functions.

tt.Assert.NoError(q.Begin())
insertBuilder = q.NewTransactionBatchInsertBuilder()
tt.Assert.NoError(insertBuilder.Add(secondTransaction, sequence))
tt.Assert.Error(insertBuilder.Exec(tt.Ctx, q))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is the duplicate error message/content no longer present in the error returned from copy execution? is there any other unique indicators in that error to capture? asking more-so for debugging this type of output in logs at runtime, if there were duplicate tx's attempted on insert, would there a be logged error content stating that aspect at some other level if not captured in the error from fast batch builder?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed in 4c07d3b

@@ -210,8 +213,10 @@ func TestInsertTransactionDoesNotAllowDuplicateIndex(t *testing.T) {
test.ResetHorizonDB(t, tt.HorizonDB)
q := &Q{tt.HorizonSession()}

tt.Assert.NoError(q.Begin())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what's the significance of starting a db session transaction here rather than on ln 92, after the last db commit and before the same session gets used on liquidity pool query? Have noticed direct calls to q.commit() in tests using the builder also, just want to understand the nuances with the new fast builder requiring that if so?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the Exec() function needs to be invoked within a transaction:

https://github.com/tamirms/go/blob/4467f163dfdf8bd3ac5884e3377e8790c7815c50/support/db/fast_batch_insert_builder.go#L113

The reason is that the lib/pq implementation of COPY does not allow you to execute COPY outside of a transaction

@@ -168,7 +168,7 @@ func (s *ProcessorRunner) buildFilteredOutProcessor(ledger xdr.LedgerHeaderHisto
// when in online mode, the submission result processor must always run (regardless of filtering)
var p []horizonTransactionProcessor
if s.config.EnableIngestionFiltering {
txSubProc := processors.NewTransactionFilteredTmpProcessor(s.historyQ, uint32(ledger.Header.LedgerSeq))
txSubProc := processors.NewTransactionFilteredTmpProcessor(s.session, s.historyQ, uint32(ledger.Header.LedgerSeq))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what's the significance between ProcessorRunner.session and ProcessorRunner.historyQ, as they are both set to the same db.SessionInterface

could this be reverted to just pass ProcessorRunner.historyQ for obtaining reference/access to the same pooled db session instance?

Copy link
Contributor Author

@tamirms tamirms Jul 18, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The historyQ value implements both interfaces in https://github.com/tamirms/go/blob/transactions-batch/services/horizon/internal/ingest/processor_runner.go#L91-L92. Note that history.IngestionQ and db.SessionInterface are distinct interfaces

Eventually we will refactor the ingestion processors so that they don't need the db instance except during the Commit() phase (see #4909 ). When that PR is implemented we will remove the duplicate fields in ProcessorRunner and only have one db session field.

Copy link
Contributor

@sreuland sreuland left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I left a few comments, leave to your discretion on whether you feel they warrant changes or not. overall, looks good!

@tamirms tamirms merged commit af8161b into stellar:ingestion-next Jul 18, 2023
@tamirms tamirms deleted the transactions-batch branch July 18, 2023 12:20
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants