Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

services/horizon/internal/ingest/processors: Refactor liquidity pools, trades, and claimable balances processors to support new ingestion data flow #5025

Merged
merged 4 commits into from
Aug 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 22 additions & 5 deletions services/horizon/internal/db2/history/account_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ const loaderLookupBatchSize = 50000

// Value implements the database/sql/driver Valuer interface.
func (a FutureAccountID) Value() (driver.Value, error) {
return a.loader.getNow(a.address), nil
return a.loader.GetNow(a.address), nil
}

// AccountLoader will map account addresses to their history
Expand Down Expand Up @@ -67,11 +67,11 @@ func (a *AccountLoader) GetFuture(address string) FutureAccountID {
}
}

// getNow returns the history account id for the given address.
// getNow should only be called on values which were registered by
// GetFuture() calls. Also, Exec() must be called before any getNow
// GetNow returns the history account id for the given address.
// GetNow should only be called on values which were registered by
// GetFuture() calls. Also, Exec() must be called before any GetNow
// call can succeed.
func (a *AccountLoader) getNow(address string) int64 {
func (a *AccountLoader) GetNow(address string) int64 {
if id, ok := a.ids[address]; !ok {
panic(fmt.Errorf("address %v not present", address))
} else {
Expand Down Expand Up @@ -190,3 +190,20 @@ func bulkInsert(ctx context.Context, q *Q, table string, conflictFields []string
)
return err
}

// AccountLoaderStub is a stub wrapper around AccountLoader which allows
// you to manually configure the mapping of addresses to history account ids
type AccountLoaderStub struct {
Loader *AccountLoader
}

// NewAccountLoaderStub returns a new AccountLoaderStub instance
func NewAccountLoaderStub() AccountLoaderStub {
return AccountLoaderStub{Loader: NewAccountLoader()}
}

// Insert updates the wrapped AccountLoader so that the given account
// address is mapped to the provided history account id
func (a AccountLoaderStub) Insert(address string, id int64) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just wanted to understand what's the reason for using a stub wrapper vs. adding Insert(address, id) on to existing AccountLoader?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the history ids should be determined by calling the Exec() method which queries the DB. However, I don't want the unit tests to interact with the db so I added an Insert() method to allow tests to manually specify history ids for accounts. The reason Insert() is not defined on the existing AccountLoader is that it's only supposed to be used by unit tests.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, which means AccountLoaderStub is only intended for test scope as well? can it be defined in account_loader_test.go instead to drive the test aspect from the code?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AccountLoaderStub is used by the trades processor test which is in a different package. If AccountLoaderStub were defined in account_loader_test.go it would not be visible to other packages

a.Loader.ids[address] = id
}
6 changes: 4 additions & 2 deletions services/horizon/internal/db2/history/account_loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,13 @@ func TestAccountLoader(t *testing.T) {
future := loader.GetFuture(address)
futures = append(futures, future)
assert.Panics(t, func() {
loader.getNow(address)
loader.GetNow(address)
})
assert.Panics(t, func() {
future.Value()
})
duplicateFuture := loader.GetFuture(address)
assert.Equal(t, future, duplicateFuture)
}

assert.NoError(t, loader.Exec(context.Background(), session))
Expand All @@ -42,7 +44,7 @@ func TestAccountLoader(t *testing.T) {
q := &Q{session}
for i, address := range addresses {
future := futures[i]
id := loader.getNow(address)
id := loader.GetNow(address)
val, err := future.Value()
assert.NoError(t, err)
assert.Equal(t, id, val)
Expand Down
37 changes: 32 additions & 5 deletions services/horizon/internal/db2/history/asset_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/stellar/go/support/db"
"github.com/stellar/go/support/errors"
"github.com/stellar/go/support/ordered"
"github.com/stellar/go/xdr"
)

type AssetKey struct {
Expand All @@ -20,6 +21,15 @@ type AssetKey struct {
Issuer string
}

// AssetKeyFromXDR constructs an AssetKey from an xdr asset
func AssetKeyFromXDR(asset xdr.Asset) AssetKey {
return AssetKey{
Type: xdr.AssetTypeToString[asset.Type],
Code: asset.GetCode(),
Issuer: asset.GetIssuer(),
}
}

// FutureAssetID represents a future history asset.
// A FutureAssetID is created by an AssetLoader and
// the asset id is available after calling Exec() on
Expand All @@ -31,7 +41,7 @@ type FutureAssetID struct {

// Value implements the database/sql/driver Valuer interface.
func (a FutureAssetID) Value() (driver.Value, error) {
return a.loader.getNow(a.asset), nil
return a.loader.GetNow(a.asset), nil
}

// AssetLoader will map assets to their history
Expand Down Expand Up @@ -67,11 +77,11 @@ func (a *AssetLoader) GetFuture(asset AssetKey) FutureAssetID {
}
}

// getNow returns the history asset id for the given asset.
// getNow should only be called on values which were registered by
// GetFuture() calls. Also, Exec() must be called before any getNow
// GetNow returns the history asset id for the given asset.
// GetNow should only be called on values which were registered by
// GetFuture() calls. Also, Exec() must be called before any GetNow
// call can succeed.
func (a *AssetLoader) getNow(asset AssetKey) int64 {
func (a *AssetLoader) GetNow(asset AssetKey) int64 {
if id, ok := a.ids[asset]; !ok {
panic(fmt.Errorf("asset %v not present", asset))
} else {
Expand Down Expand Up @@ -186,3 +196,20 @@ func (a *AssetLoader) Exec(ctx context.Context, session db.SessionInterface) err

return a.lookupKeys(ctx, q, keys)
}

// AssetLoaderStub is a stub wrapper around AssetLoader which allows
// you to manually configure the mapping of assets to history asset ids
type AssetLoaderStub struct {
Loader *AssetLoader
}

// NewAssetLoaderStub returns a new AssetLoaderStub instance
func NewAssetLoaderStub() AssetLoaderStub {
return AssetLoaderStub{Loader: NewAssetLoader()}
}

// Insert updates the wrapped AssetLoaderStub so that the given asset
// address is mapped to the provided history asset id
func (a AssetLoaderStub) Insert(asset AssetKey, id int64) {
a.Loader.ids[asset] = id
}
6 changes: 4 additions & 2 deletions services/horizon/internal/db2/history/asset_loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,13 @@ func TestAssetLoader(t *testing.T) {
future := loader.GetFuture(key)
futures = append(futures, future)
assert.Panics(t, func() {
loader.getNow(key)
loader.GetNow(key)
})
assert.Panics(t, func() {
future.Value()
})
duplicateFuture := loader.GetFuture(key)
assert.Equal(t, future, duplicateFuture)
}

assert.NoError(t, loader.Exec(context.Background(), session))
Expand All @@ -56,7 +58,7 @@ func TestAssetLoader(t *testing.T) {
q := &Q{session}
for i, key := range keys {
future := futures[i]
internalID := loader.getNow(key)
internalID := loader.GetNow(key)
val, err := future.Value()
assert.NoError(t, err)
assert.Equal(t, internalID, val)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ func TestClaimableBalanceLoader(t *testing.T) {
assert.Panics(t, func() {
future.Value()
})
duplicateFuture := loader.GetFuture(id)
assert.Equal(t, future, duplicateFuture)
}

assert.NoError(t, loader.Exec(context.Background(), session))
Expand Down
14 changes: 5 additions & 9 deletions services/horizon/internal/db2/history/effect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"testing"

"github.com/guregu/null"

"github.com/stellar/go/protocols/horizon/effects"
"github.com/stellar/go/services/horizon/internal/db2"
"github.com/stellar/go/services/horizon/internal/test"
Expand Down Expand Up @@ -47,17 +48,12 @@ func TestEffectsForLiquidityPool(t *testing.T) {

// Insert Liquidity Pool history
liquidityPoolID := "abcde"
toInternalID, err := q.CreateHistoryLiquidityPools(tt.Ctx, []string{liquidityPoolID}, 2)
tt.Assert.NoError(err)
lpLoader := NewLiquidityPoolLoader()

operationBuilder := q.NewOperationLiquidityPoolBatchInsertBuilder()
tt.Assert.NoError(err)
internalID, ok := toInternalID[liquidityPoolID]
tt.Assert.True(ok)
err = operationBuilder.Add(opID, internalID)
tt.Assert.NoError(err)
err = operationBuilder.Exec(tt.Ctx, q)
tt.Assert.NoError(err)
tt.Assert.NoError(operationBuilder.Add(opID, lpLoader.GetFuture(liquidityPoolID)))
tt.Assert.NoError(lpLoader.Exec(tt.Ctx, q))
tt.Assert.NoError(operationBuilder.Exec(tt.Ctx, q))

tt.Assert.NoError(q.Commit())

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func (q *Q) ClaimableBalanceByID(ctx context.Context, id string) (dest HistoryCl
}

type OperationClaimableBalanceBatchInsertBuilder interface {
Add(operationID, internalID int64) error
Add(operationID int64, claimableBalance FutureClaimableBalanceID) error
Exec(ctx context.Context, session db.SessionInterface) error
}

Expand All @@ -109,10 +109,10 @@ func (q *Q) NewOperationClaimableBalanceBatchInsertBuilder() OperationClaimableB
}

// Add adds a new operation claimable balance to the batch
func (i *operationClaimableBalanceBatchInsertBuilder) Add(operationID, internalID int64) error {
func (i *operationClaimableBalanceBatchInsertBuilder) Add(operationID int64, claimableBalance FutureClaimableBalanceID) error {
return i.builder.Row(map[string]interface{}{
"history_operation_id": operationID,
"history_claimable_balance_id": internalID,
"history_claimable_balance_id": claimableBalance,
})
}

Expand All @@ -122,7 +122,7 @@ func (i *operationClaimableBalanceBatchInsertBuilder) Exec(ctx context.Context,
}

type TransactionClaimableBalanceBatchInsertBuilder interface {
Add(transactionID, internalID int64) error
Add(transactionID int64, claimableBalance FutureClaimableBalanceID) error
Exec(ctx context.Context, session db.SessionInterface) error
}

Expand All @@ -139,10 +139,10 @@ func (q *Q) NewTransactionClaimableBalanceBatchInsertBuilder() TransactionClaima
}

// Add adds a new transaction claimable balance to the batch
func (i *transactionClaimableBalanceBatchInsertBuilder) Add(transactionID, internalID int64) error {
func (i *transactionClaimableBalanceBatchInsertBuilder) Add(transactionID int64, claimableBalance FutureClaimableBalanceID) error {
return i.builder.Row(map[string]interface{}{
"history_transaction_id": transactionID,
"history_claimable_balance_id": internalID,
"history_claimable_balance_id": claimableBalance,
})
}

Expand Down
13 changes: 7 additions & 6 deletions services/horizon/internal/db2/history/history_liquidity_pools.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"sort"

sq "github.com/Masterminds/squirrel"

"github.com/stellar/go/support/db"
"github.com/stellar/go/support/errors"
)
Expand Down Expand Up @@ -101,7 +102,7 @@ func (q *Q) LiquidityPoolByID(ctx context.Context, poolID string) (dest HistoryL
}

type OperationLiquidityPoolBatchInsertBuilder interface {
Add(operationID, internalID int64) error
Add(operationID int64, lp FutureLiquidityPoolID) error
Exec(ctx context.Context, session db.SessionInterface) error
}

Expand All @@ -118,10 +119,10 @@ func (q *Q) NewOperationLiquidityPoolBatchInsertBuilder() OperationLiquidityPool
}

// Add adds a new operation claimable balance to the batch
func (i *operationLiquidityPoolBatchInsertBuilder) Add(operationID, internalID int64) error {
func (i *operationLiquidityPoolBatchInsertBuilder) Add(operationID int64, lp FutureLiquidityPoolID) error {
return i.builder.Row(map[string]interface{}{
"history_operation_id": operationID,
"history_liquidity_pool_id": internalID,
"history_liquidity_pool_id": lp,
})
}

Expand All @@ -131,7 +132,7 @@ func (i *operationLiquidityPoolBatchInsertBuilder) Exec(ctx context.Context, ses
}

type TransactionLiquidityPoolBatchInsertBuilder interface {
Add(transactionID, internalID int64) error
Add(transactionID int64, lp FutureLiquidityPoolID) error
Exec(ctx context.Context, session db.SessionInterface) error
}

Expand All @@ -148,10 +149,10 @@ func (q *Q) NewTransactionLiquidityPoolBatchInsertBuilder() TransactionLiquidity
}

// Add adds a new transaction claimable balance to the batch
func (i *transactionLiquidityPoolBatchInsertBuilder) Add(transactionID, internalID int64) error {
func (i *transactionLiquidityPoolBatchInsertBuilder) Add(transactionID int64, lp FutureLiquidityPoolID) error {
return i.builder.Row(map[string]interface{}{
"history_transaction_id": transactionID,
"history_liquidity_pool_id": internalID,
"history_liquidity_pool_id": lp,
})
}

Expand Down
27 changes: 22 additions & 5 deletions services/horizon/internal/db2/history/liquidity_pool_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type FutureLiquidityPoolID struct {

// Value implements the database/sql/driver Valuer interface.
func (a FutureLiquidityPoolID) Value() (driver.Value, error) {
return a.loader.getNow(a.id), nil
return a.loader.GetNow(a.id), nil
}

// LiquidityPoolLoader will map liquidity pools to their internal
Expand Down Expand Up @@ -60,11 +60,11 @@ func (a *LiquidityPoolLoader) GetFuture(id string) FutureLiquidityPoolID {
}
}

// getNow returns the internal history id for the given liquidity pool.
// getNow should only be called on values which were registered by
// GetFuture() calls. Also, Exec() must be called before any getNow
// GetNow returns the internal history id for the given liquidity pool.
// GetNow should only be called on values which were registered by
// GetFuture() calls. Also, Exec() must be called before any GetNow
// call can succeed.
func (a *LiquidityPoolLoader) getNow(id string) int64 {
func (a *LiquidityPoolLoader) GetNow(id string) int64 {
if id, ok := a.ids[id]; !ok {
panic(fmt.Errorf("id %v not present", id))
} else {
Expand Down Expand Up @@ -141,3 +141,20 @@ func (a *LiquidityPoolLoader) Exec(ctx context.Context, session db.SessionInterf

return a.lookupKeys(ctx, q, ids)
}

// LiquidityPoolLoaderStub is a stub wrapper around LiquidityPoolLoader which allows
// you to manually configure the mapping of liquidity pools to history liquidity ppol ids
type LiquidityPoolLoaderStub struct {
Loader *LiquidityPoolLoader
}

// NewLiquidityPoolLoaderStub returns a new LiquidityPoolLoader instance
func NewLiquidityPoolLoaderStub() LiquidityPoolLoaderStub {
return LiquidityPoolLoaderStub{Loader: NewLiquidityPoolLoader()}
}

// Insert updates the wrapped LiquidityPoolLoader so that the given liquidity pool
// is mapped to the provided history liquidity pool id
func (a LiquidityPoolLoaderStub) Insert(lp string, id int64) {
a.Loader.ids[lp] = id
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,13 @@ func TestLiquidityPoolLoader(t *testing.T) {
future := loader.GetFuture(id)
futures = append(futures, future)
assert.Panics(t, func() {
loader.getNow(id)
loader.GetNow(id)
})
assert.Panics(t, func() {
future.Value()
})
duplicateFuture := loader.GetFuture(id)
assert.Equal(t, future, duplicateFuture)
}

assert.NoError(t, loader.Exec(context.Background(), session))
Expand All @@ -45,7 +47,7 @@ func TestLiquidityPoolLoader(t *testing.T) {
q := &Q{session}
for i, id := range ids {
future := futures[i]
internalID := loader.getNow(id)
internalID := loader.GetNow(id)
val, err := future.Value()
assert.NoError(t, err)
assert.Equal(t, internalID, val)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ type MockTransactionClaimableBalanceBatchInsertBuilder struct {
mock.Mock
}

func (m *MockTransactionClaimableBalanceBatchInsertBuilder) Add(transactionID, accountID int64) error {
a := m.Called(transactionID, accountID)
func (m *MockTransactionClaimableBalanceBatchInsertBuilder) Add(transactionID int64, claimableBalance FutureClaimableBalanceID) error {
a := m.Called(transactionID, claimableBalance)
return a.Error(0)
}

Expand All @@ -51,8 +51,8 @@ type MockOperationClaimableBalanceBatchInsertBuilder struct {
mock.Mock
}

func (m *MockOperationClaimableBalanceBatchInsertBuilder) Add(transactionID, accountID int64) error {
a := m.Called(transactionID, accountID)
func (m *MockOperationClaimableBalanceBatchInsertBuilder) Add(operationID int64, claimableBalance FutureClaimableBalanceID) error {
a := m.Called(operationID, claimableBalance)
return a.Error(0)
}

Expand Down
Loading