Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

services/horizon/internal/ingest/processors: Refactor liquidity pools, trades, and claimable balances processors to support new ingestion data flow #5025

Merged
merged 4 commits into from
Aug 25, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 22 additions & 5 deletions services/horizon/internal/db2/history/account_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ const loaderLookupBatchSize = 50000

// Value implements the database/sql/driver Valuer interface.
func (a FutureAccountID) Value() (driver.Value, error) {
return a.loader.getNow(a.address), nil
return a.loader.GetNow(a.address), nil
}

// AccountLoader will map account addresses to their history
Expand Down Expand Up @@ -67,11 +67,11 @@ func (a *AccountLoader) GetFuture(address string) FutureAccountID {
}
}

// getNow returns the history account id for the given address.
// getNow should only be called on values which were registered by
// GetFuture() calls. Also, Exec() must be called before any getNow
// GetNow returns the history account id for the given address.
// GetNow should only be called on values which were registered by
// GetFuture() calls. Also, Exec() must be called before any GetNow
// call can succeed.
func (a *AccountLoader) getNow(address string) int64 {
func (a *AccountLoader) GetNow(address string) int64 {
if id, ok := a.ids[address]; !ok {
panic(fmt.Errorf("address %v not present", address))
} else {
Expand Down Expand Up @@ -190,3 +190,20 @@ func bulkInsert(ctx context.Context, q *Q, table string, conflictFields []string
)
return err
}

// AccountLoaderStub is a stub wrapper around AccountLoader which allows
// you to manually configure the mapping of addresses to history account ids
type AccountLoaderStub struct {
Loader *AccountLoader
}

// NewAccountLoaderStub returns a new AccountLoaderStub instance
func NewAccountLoaderStub() AccountLoaderStub {
return AccountLoaderStub{Loader: NewAccountLoader()}
}

// Insert updates the wrapped AccountLoader so that the given account
// address is mapped to the provided history account id
func (a AccountLoaderStub) Insert(address string, id int64) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just wanted to understand what's the reason for using a stub wrapper vs. adding Insert(address, id) on to existing AccountLoader?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the history ids should be determined by calling the Exec() method which queries the DB. However, I don't want the unit tests to interact with the db so I added an Insert() method to allow tests to manually specify history ids for accounts. The reason Insert() is not defined on the existing AccountLoader is that it's only supposed to be used by unit tests.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, which means AccountLoaderStub is only intended for test scope as well? can it be defined in account_loader_test.go instead to drive the test aspect from the code?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AccountLoaderStub is used by the trades processor test which is in a different package. If AccountLoaderStub were defined in account_loader_test.go it would not be visible to other packages

a.Loader.ids[address] = id
}
4 changes: 2 additions & 2 deletions services/horizon/internal/db2/history/account_loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func TestAccountLoader(t *testing.T) {
future := loader.GetFuture(address)
futures = append(futures, future)
assert.Panics(t, func() {
loader.getNow(address)
loader.GetNow(address)
})
assert.Panics(t, func() {
future.Value()
Expand All @@ -44,7 +44,7 @@ func TestAccountLoader(t *testing.T) {
q := &Q{session}
for i, address := range addresses {
future := futures[i]
id := loader.getNow(address)
id := loader.GetNow(address)
val, err := future.Value()
assert.NoError(t, err)
assert.Equal(t, id, val)
Expand Down
27 changes: 22 additions & 5 deletions services/horizon/internal/db2/history/asset_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type FutureAssetID struct {

// Value implements the database/sql/driver Valuer interface.
func (a FutureAssetID) Value() (driver.Value, error) {
return a.loader.getNow(a.asset), nil
return a.loader.GetNow(a.asset), nil
}

// AssetLoader will map assets to their history
Expand Down Expand Up @@ -67,11 +67,11 @@ func (a *AssetLoader) GetFuture(asset AssetKey) FutureAssetID {
}
}

// getNow returns the history asset id for the given asset.
// getNow should only be called on values which were registered by
// GetFuture() calls. Also, Exec() must be called before any getNow
// GetNow returns the history asset id for the given asset.
// GetNow should only be called on values which were registered by
// GetFuture() calls. Also, Exec() must be called before any GetNow
// call can succeed.
func (a *AssetLoader) getNow(asset AssetKey) int64 {
func (a *AssetLoader) GetNow(asset AssetKey) int64 {
if id, ok := a.ids[asset]; !ok {
panic(fmt.Errorf("asset %v not present", asset))
} else {
Expand Down Expand Up @@ -186,3 +186,20 @@ func (a *AssetLoader) Exec(ctx context.Context, session db.SessionInterface) err

return a.lookupKeys(ctx, q, keys)
}

// AssetLoaderStub is a stub wrapper around AssetLoader which allows
// you to manually configure the mapping of assets to history asset ids
type AssetLoaderStub struct {
Loader *AssetLoader
}

// NewAssetLoaderStub returns a new AssetLoaderStub instance
func NewAssetLoaderStub() AssetLoaderStub {
return AssetLoaderStub{Loader: NewAssetLoader()}
}

// Insert updates the wrapped AssetLoaderStub so that the given asset
// address is mapped to the provided history asset id
func (a AssetLoaderStub) Insert(asset AssetKey, id int64) {
a.Loader.ids[asset] = id
}
4 changes: 2 additions & 2 deletions services/horizon/internal/db2/history/asset_loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func TestAssetLoader(t *testing.T) {
future := loader.GetFuture(key)
futures = append(futures, future)
assert.Panics(t, func() {
loader.getNow(key)
loader.GetNow(key)
})
assert.Panics(t, func() {
future.Value()
Expand All @@ -58,7 +58,7 @@ func TestAssetLoader(t *testing.T) {
q := &Q{session}
for i, key := range keys {
future := futures[i]
internalID := loader.getNow(key)
internalID := loader.GetNow(key)
val, err := future.Value()
assert.NoError(t, err)
assert.Equal(t, internalID, val)
Expand Down
27 changes: 22 additions & 5 deletions services/horizon/internal/db2/history/liquidity_pool_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type FutureLiquidityPoolID struct {

// Value implements the database/sql/driver Valuer interface.
func (a FutureLiquidityPoolID) Value() (driver.Value, error) {
return a.loader.getNow(a.id), nil
return a.loader.GetNow(a.id), nil
}

// LiquidityPoolLoader will map liquidity pools to their internal
Expand Down Expand Up @@ -60,11 +60,11 @@ func (a *LiquidityPoolLoader) GetFuture(id string) FutureLiquidityPoolID {
}
}

// getNow returns the internal history id for the given liquidity pool.
// getNow should only be called on values which were registered by
// GetFuture() calls. Also, Exec() must be called before any getNow
// GetNow returns the internal history id for the given liquidity pool.
// GetNow should only be called on values which were registered by
// GetFuture() calls. Also, Exec() must be called before any GetNow
// call can succeed.
func (a *LiquidityPoolLoader) getNow(id string) int64 {
func (a *LiquidityPoolLoader) GetNow(id string) int64 {
if id, ok := a.ids[id]; !ok {
panic(fmt.Errorf("id %v not present", id))
} else {
Expand Down Expand Up @@ -141,3 +141,20 @@ func (a *LiquidityPoolLoader) Exec(ctx context.Context, session db.SessionInterf

return a.lookupKeys(ctx, q, ids)
}

// LiquidityPoolLoaderStub is a stub wrapper around LiquidityPoolLoader which allows
// you to manually configure the mapping of liquidity pools to history liquidity ppol ids
type LiquidityPoolLoaderStub struct {
Loader *LiquidityPoolLoader
}

// NewLiquidityPoolLoaderStub returns a new LiquidityPoolLoader instance
func NewLiquidityPoolLoaderStub() LiquidityPoolLoaderStub {
return LiquidityPoolLoaderStub{Loader: NewLiquidityPoolLoader()}
}

// Insert updates the wrapped LiquidityPoolLoader so that the given liquidity pool
// is mapped to the provided history liquidity pool id
func (a LiquidityPoolLoaderStub) Insert(lp string, id int64) {
a.Loader.ids[lp] = id
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func TestLiquidityPoolLoader(t *testing.T) {
future := loader.GetFuture(id)
futures = append(futures, future)
assert.Panics(t, func() {
loader.getNow(id)
loader.GetNow(id)
})
assert.Panics(t, func() {
future.Value()
Expand All @@ -47,7 +47,7 @@ func TestLiquidityPoolLoader(t *testing.T) {
q := &Q{session}
for i, id := range ids {
future := futures[i]
internalID := loader.getNow(id)
internalID := loader.GetNow(id)
val, err := future.Value()
assert.NoError(t, err)
assert.Equal(t, internalID, val)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,6 @@ func TestAddOperationParticipants(t *testing.T) {

op := ops[0]
tt.Assert.Equal(int64(240518172673), op.OperationID)
tt.Assert.Equal(accountLoader.getNow(address), op.AccountID)
tt.Assert.Equal(accountLoader.GetNow(address), op.AccountID)
}
}
2 changes: 1 addition & 1 deletion services/horizon/internal/db2/history/participants_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func TestTransactionParticipantsBatch(t *testing.T) {
{TransactionID: 2},
}
for i := range expected {
expected[i].AccountID = accountLoader.getNow(addresses[i])
expected[i].AccountID = accountLoader.GetNow(addresses[i])
}
tt.Assert.ElementsMatch(expected, participants)
}
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func (p *ClaimableBalancesTransactionProcessor) addOperationClaimableBalances(
return nil
}

func (p *ClaimableBalancesTransactionProcessor) Commit(ctx context.Context, session db.SessionInterface) error {
func (p *ClaimableBalancesTransactionProcessor) Flush(ctx context.Context, session db.SessionInterface) error {
err := p.txBatch.Exec(ctx, session)
if err != nil {
return err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func (s *ClaimableBalancesTransactionProcessorTestSuiteLedger) TestEmptyClaimabl
s.mockTransactionBatchInsertBuilder.On("Exec", s.ctx, s.mockSession).Return(nil).Once()
s.mockOperationBatchInsertBuilder.On("Exec", s.ctx, s.mockSession).Return(nil).Once()

s.Assert().NoError(s.processor.Commit(s.ctx, s.mockSession))
s.Assert().NoError(s.processor.Flush(s.ctx, s.mockSession))
}

func (s *ClaimableBalancesTransactionProcessorTestSuiteLedger) testOperationInserts(balanceID xdr.ClaimableBalanceId, body xdr.OperationBody, change xdr.LedgerEntryChange) {
Expand Down Expand Up @@ -138,7 +138,7 @@ func (s *ClaimableBalancesTransactionProcessorTestSuiteLedger) testOperationInse
// Process the transaction
err = s.processor.ProcessTransaction(s.lcm, txn)
s.Assert().NoError(err)
err = s.processor.Commit(s.ctx, s.mockSession)
err = s.processor.Flush(s.ctx, s.mockSession)
s.Assert().NoError(err)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func (p *LiquidityPoolsTransactionProcessor) addOperationLiquidityPools(sequence
return nil
}

func (p *LiquidityPoolsTransactionProcessor) Commit(ctx context.Context, session db.SessionInterface) error {
func (p *LiquidityPoolsTransactionProcessor) Flush(ctx context.Context, session db.SessionInterface) error {
if err := p.txBatch.Exec(ctx, session); err != nil {
return errors.Wrap(err, "Could not flush transaction liquidity pools to db")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func (s *LiquidityPoolsTransactionProcessorTestSuiteLedger) TestEmptyLiquidityPo
s.mockTransactionBatchInsertBuilder.On("Exec", s.ctx, s.mockSession).Return(nil).Once()
s.mockOperationBatchInsertBuilder.On("Exec", s.ctx, s.mockSession).Return(nil).Once()

err := s.processor.Commit(context.Background(), s.mockSession)
err := s.processor.Flush(context.Background(), s.mockSession)
s.Assert().NoError(err)
}

Expand Down Expand Up @@ -138,7 +138,7 @@ func (s *LiquidityPoolsTransactionProcessorTestSuiteLedger) testOperationInserts
// Process the transaction
err := s.processor.ProcessTransaction(s.lcm, txn)
s.Assert().NoError(err)
err = s.processor.Commit(s.ctx, s.mockSession)
err = s.processor.Flush(s.ctx, s.mockSession)
s.Assert().NoError(err)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ func (p *ParticipantsProcessor) ProcessTransaction(lcm xdr.LedgerCloseMeta, tran
return nil
}

func (p *ParticipantsProcessor) Commit(ctx context.Context, session db.SessionInterface) error {
func (p *ParticipantsProcessor) Flush(ctx context.Context, session db.SessionInterface) error {
if err := p.txBatch.Exec(ctx, session); err != nil {
return errors.Wrap(err, "Could not flush transaction participants to db")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func (s *ParticipantsProcessorTestSuiteLedger) TestEmptyParticipants() {
s.mockBatchInsertBuilder.On("Exec", s.ctx, s.mockSession).Return(nil).Once()
s.mockOperationsBatchInsertBuilder.On("Exec", s.ctx, s.mockSession).Return(nil).Once()

err := s.processor.Commit(s.ctx, s.mockSession)
err := s.processor.Flush(s.ctx, s.mockSession)
s.Assert().NoError(err)
}

Expand Down Expand Up @@ -190,7 +190,7 @@ func (s *ParticipantsProcessorTestSuiteLedger) TestFeeBumptransaction() {
s.mockOperationsBatchInsertBuilder.On("Exec", s.ctx, s.mockSession).Return(nil).Once()

s.Assert().NoError(s.processor.ProcessTransaction(s.lcm, feeBumpTx))
s.Assert().NoError(s.processor.Commit(s.ctx, s.mockSession))
s.Assert().NoError(s.processor.Flush(s.ctx, s.mockSession))
}

func (s *ParticipantsProcessorTestSuiteLedger) TestIngestParticipantsSucceeds() {
Expand All @@ -204,7 +204,7 @@ func (s *ParticipantsProcessorTestSuiteLedger) TestIngestParticipantsSucceeds()
err := s.processor.ProcessTransaction(s.lcm, tx)
s.Assert().NoError(err)
}
err := s.processor.Commit(s.ctx, s.mockSession)
err := s.processor.Flush(s.ctx, s.mockSession)
s.Assert().NoError(err)
}

Expand Down Expand Up @@ -240,7 +240,7 @@ func (s *ParticipantsProcessorTestSuiteLedger) TestBatchAddExecFails() {
err := s.processor.ProcessTransaction(s.lcm, tx)
s.Assert().NoError(err)
}
err := s.processor.Commit(s.ctx, s.mockSession)
err := s.processor.Flush(s.ctx, s.mockSession)
s.Assert().EqualError(err, "Could not flush transaction participants to db: transient error")
}

Expand All @@ -255,6 +255,6 @@ func (s *ParticipantsProcessorTestSuiteLedger) TestOperationBatchAddExecFails()
err := s.processor.ProcessTransaction(s.lcm, tx)
s.Assert().NoError(err)
}
err := s.processor.Commit(s.ctx, s.mockSession)
err := s.processor.Flush(s.ctx, s.mockSession)
s.Assert().EqualError(err, "Could not flush operation participants to db: transient error")
}
Loading