Skip to content

Commit

Permalink
services/horizon/internal/ingest/processors: Refactor liquidity pools…
Browse files Browse the repository at this point in the history
…, trades, and claimable balances processors to support new ingestion data flow (#5025)
  • Loading branch information
tamirms authored Aug 25, 2023
1 parent 461e5a1 commit 21d016f
Show file tree
Hide file tree
Showing 24 changed files with 420 additions and 755 deletions.
27 changes: 22 additions & 5 deletions services/horizon/internal/db2/history/account_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ const loaderLookupBatchSize = 50000

// Value implements the database/sql/driver Valuer interface.
func (a FutureAccountID) Value() (driver.Value, error) {
return a.loader.getNow(a.address), nil
return a.loader.GetNow(a.address), nil
}

// AccountLoader will map account addresses to their history
Expand Down Expand Up @@ -67,11 +67,11 @@ func (a *AccountLoader) GetFuture(address string) FutureAccountID {
}
}

// getNow returns the history account id for the given address.
// getNow should only be called on values which were registered by
// GetFuture() calls. Also, Exec() must be called before any getNow
// GetNow returns the history account id for the given address.
// GetNow should only be called on values which were registered by
// GetFuture() calls. Also, Exec() must be called before any GetNow
// call can succeed.
func (a *AccountLoader) getNow(address string) int64 {
func (a *AccountLoader) GetNow(address string) int64 {
if id, ok := a.ids[address]; !ok {
panic(fmt.Errorf("address %v not present", address))
} else {
Expand Down Expand Up @@ -190,3 +190,20 @@ func bulkInsert(ctx context.Context, q *Q, table string, conflictFields []string
)
return err
}

// AccountLoaderStub is a stub wrapper around AccountLoader which allows
// you to manually configure the mapping of addresses to history account ids
type AccountLoaderStub struct {
Loader *AccountLoader
}

// NewAccountLoaderStub returns a new AccountLoaderStub instance
func NewAccountLoaderStub() AccountLoaderStub {
return AccountLoaderStub{Loader: NewAccountLoader()}
}

// Insert updates the wrapped AccountLoader so that the given account
// address is mapped to the provided history account id
func (a AccountLoaderStub) Insert(address string, id int64) {
a.Loader.ids[address] = id
}
6 changes: 4 additions & 2 deletions services/horizon/internal/db2/history/account_loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,13 @@ func TestAccountLoader(t *testing.T) {
future := loader.GetFuture(address)
futures = append(futures, future)
assert.Panics(t, func() {
loader.getNow(address)
loader.GetNow(address)
})
assert.Panics(t, func() {
future.Value()
})
duplicateFuture := loader.GetFuture(address)
assert.Equal(t, future, duplicateFuture)
}

assert.NoError(t, loader.Exec(context.Background(), session))
Expand All @@ -42,7 +44,7 @@ func TestAccountLoader(t *testing.T) {
q := &Q{session}
for i, address := range addresses {
future := futures[i]
id := loader.getNow(address)
id := loader.GetNow(address)
val, err := future.Value()
assert.NoError(t, err)
assert.Equal(t, id, val)
Expand Down
37 changes: 32 additions & 5 deletions services/horizon/internal/db2/history/asset_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/stellar/go/support/db"
"github.com/stellar/go/support/errors"
"github.com/stellar/go/support/ordered"
"github.com/stellar/go/xdr"
)

type AssetKey struct {
Expand All @@ -20,6 +21,15 @@ type AssetKey struct {
Issuer string
}

// AssetKeyFromXDR constructs an AssetKey from an xdr asset
func AssetKeyFromXDR(asset xdr.Asset) AssetKey {
return AssetKey{
Type: xdr.AssetTypeToString[asset.Type],
Code: asset.GetCode(),
Issuer: asset.GetIssuer(),
}
}

// FutureAssetID represents a future history asset.
// A FutureAssetID is created by an AssetLoader and
// the asset id is available after calling Exec() on
Expand All @@ -31,7 +41,7 @@ type FutureAssetID struct {

// Value implements the database/sql/driver Valuer interface.
func (a FutureAssetID) Value() (driver.Value, error) {
return a.loader.getNow(a.asset), nil
return a.loader.GetNow(a.asset), nil
}

// AssetLoader will map assets to their history
Expand Down Expand Up @@ -67,11 +77,11 @@ func (a *AssetLoader) GetFuture(asset AssetKey) FutureAssetID {
}
}

// getNow returns the history asset id for the given asset.
// getNow should only be called on values which were registered by
// GetFuture() calls. Also, Exec() must be called before any getNow
// GetNow returns the history asset id for the given asset.
// GetNow should only be called on values which were registered by
// GetFuture() calls. Also, Exec() must be called before any GetNow
// call can succeed.
func (a *AssetLoader) getNow(asset AssetKey) int64 {
func (a *AssetLoader) GetNow(asset AssetKey) int64 {
if id, ok := a.ids[asset]; !ok {
panic(fmt.Errorf("asset %v not present", asset))
} else {
Expand Down Expand Up @@ -186,3 +196,20 @@ func (a *AssetLoader) Exec(ctx context.Context, session db.SessionInterface) err

return a.lookupKeys(ctx, q, keys)
}

// AssetLoaderStub is a stub wrapper around AssetLoader which allows
// you to manually configure the mapping of assets to history asset ids
type AssetLoaderStub struct {
Loader *AssetLoader
}

// NewAssetLoaderStub returns a new AssetLoaderStub instance
func NewAssetLoaderStub() AssetLoaderStub {
return AssetLoaderStub{Loader: NewAssetLoader()}
}

// Insert updates the wrapped AssetLoaderStub so that the given asset
// address is mapped to the provided history asset id
func (a AssetLoaderStub) Insert(asset AssetKey, id int64) {
a.Loader.ids[asset] = id
}
6 changes: 4 additions & 2 deletions services/horizon/internal/db2/history/asset_loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,13 @@ func TestAssetLoader(t *testing.T) {
future := loader.GetFuture(key)
futures = append(futures, future)
assert.Panics(t, func() {
loader.getNow(key)
loader.GetNow(key)
})
assert.Panics(t, func() {
future.Value()
})
duplicateFuture := loader.GetFuture(key)
assert.Equal(t, future, duplicateFuture)
}

assert.NoError(t, loader.Exec(context.Background(), session))
Expand All @@ -56,7 +58,7 @@ func TestAssetLoader(t *testing.T) {
q := &Q{session}
for i, key := range keys {
future := futures[i]
internalID := loader.getNow(key)
internalID := loader.GetNow(key)
val, err := future.Value()
assert.NoError(t, err)
assert.Equal(t, internalID, val)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ func TestClaimableBalanceLoader(t *testing.T) {
assert.Panics(t, func() {
future.Value()
})
duplicateFuture := loader.GetFuture(id)
assert.Equal(t, future, duplicateFuture)
}

assert.NoError(t, loader.Exec(context.Background(), session))
Expand Down
14 changes: 5 additions & 9 deletions services/horizon/internal/db2/history/effect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"testing"

"github.com/guregu/null"

"github.com/stellar/go/protocols/horizon/effects"
"github.com/stellar/go/services/horizon/internal/db2"
"github.com/stellar/go/services/horizon/internal/test"
Expand Down Expand Up @@ -47,17 +48,12 @@ func TestEffectsForLiquidityPool(t *testing.T) {

// Insert Liquidity Pool history
liquidityPoolID := "abcde"
toInternalID, err := q.CreateHistoryLiquidityPools(tt.Ctx, []string{liquidityPoolID}, 2)
tt.Assert.NoError(err)
lpLoader := NewLiquidityPoolLoader()

operationBuilder := q.NewOperationLiquidityPoolBatchInsertBuilder()
tt.Assert.NoError(err)
internalID, ok := toInternalID[liquidityPoolID]
tt.Assert.True(ok)
err = operationBuilder.Add(opID, internalID)
tt.Assert.NoError(err)
err = operationBuilder.Exec(tt.Ctx, q)
tt.Assert.NoError(err)
tt.Assert.NoError(operationBuilder.Add(opID, lpLoader.GetFuture(liquidityPoolID)))
tt.Assert.NoError(lpLoader.Exec(tt.Ctx, q))
tt.Assert.NoError(operationBuilder.Exec(tt.Ctx, q))

tt.Assert.NoError(q.Commit())

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func (q *Q) ClaimableBalanceByID(ctx context.Context, id string) (dest HistoryCl
}

type OperationClaimableBalanceBatchInsertBuilder interface {
Add(operationID, internalID int64) error
Add(operationID int64, claimableBalance FutureClaimableBalanceID) error
Exec(ctx context.Context, session db.SessionInterface) error
}

Expand All @@ -109,10 +109,10 @@ func (q *Q) NewOperationClaimableBalanceBatchInsertBuilder() OperationClaimableB
}

// Add adds a new operation claimable balance to the batch
func (i *operationClaimableBalanceBatchInsertBuilder) Add(operationID, internalID int64) error {
func (i *operationClaimableBalanceBatchInsertBuilder) Add(operationID int64, claimableBalance FutureClaimableBalanceID) error {
return i.builder.Row(map[string]interface{}{
"history_operation_id": operationID,
"history_claimable_balance_id": internalID,
"history_claimable_balance_id": claimableBalance,
})
}

Expand All @@ -122,7 +122,7 @@ func (i *operationClaimableBalanceBatchInsertBuilder) Exec(ctx context.Context,
}

type TransactionClaimableBalanceBatchInsertBuilder interface {
Add(transactionID, internalID int64) error
Add(transactionID int64, claimableBalance FutureClaimableBalanceID) error
Exec(ctx context.Context, session db.SessionInterface) error
}

Expand All @@ -139,10 +139,10 @@ func (q *Q) NewTransactionClaimableBalanceBatchInsertBuilder() TransactionClaima
}

// Add adds a new transaction claimable balance to the batch
func (i *transactionClaimableBalanceBatchInsertBuilder) Add(transactionID, internalID int64) error {
func (i *transactionClaimableBalanceBatchInsertBuilder) Add(transactionID int64, claimableBalance FutureClaimableBalanceID) error {
return i.builder.Row(map[string]interface{}{
"history_transaction_id": transactionID,
"history_claimable_balance_id": internalID,
"history_claimable_balance_id": claimableBalance,
})
}

Expand Down
13 changes: 7 additions & 6 deletions services/horizon/internal/db2/history/history_liquidity_pools.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"sort"

sq "github.com/Masterminds/squirrel"

"github.com/stellar/go/support/db"
"github.com/stellar/go/support/errors"
)
Expand Down Expand Up @@ -101,7 +102,7 @@ func (q *Q) LiquidityPoolByID(ctx context.Context, poolID string) (dest HistoryL
}

type OperationLiquidityPoolBatchInsertBuilder interface {
Add(operationID, internalID int64) error
Add(operationID int64, lp FutureLiquidityPoolID) error
Exec(ctx context.Context, session db.SessionInterface) error
}

Expand All @@ -118,10 +119,10 @@ func (q *Q) NewOperationLiquidityPoolBatchInsertBuilder() OperationLiquidityPool
}

// Add adds a new operation claimable balance to the batch
func (i *operationLiquidityPoolBatchInsertBuilder) Add(operationID, internalID int64) error {
func (i *operationLiquidityPoolBatchInsertBuilder) Add(operationID int64, lp FutureLiquidityPoolID) error {
return i.builder.Row(map[string]interface{}{
"history_operation_id": operationID,
"history_liquidity_pool_id": internalID,
"history_liquidity_pool_id": lp,
})
}

Expand All @@ -131,7 +132,7 @@ func (i *operationLiquidityPoolBatchInsertBuilder) Exec(ctx context.Context, ses
}

type TransactionLiquidityPoolBatchInsertBuilder interface {
Add(transactionID, internalID int64) error
Add(transactionID int64, lp FutureLiquidityPoolID) error
Exec(ctx context.Context, session db.SessionInterface) error
}

Expand All @@ -148,10 +149,10 @@ func (q *Q) NewTransactionLiquidityPoolBatchInsertBuilder() TransactionLiquidity
}

// Add adds a new transaction claimable balance to the batch
func (i *transactionLiquidityPoolBatchInsertBuilder) Add(transactionID, internalID int64) error {
func (i *transactionLiquidityPoolBatchInsertBuilder) Add(transactionID int64, lp FutureLiquidityPoolID) error {
return i.builder.Row(map[string]interface{}{
"history_transaction_id": transactionID,
"history_liquidity_pool_id": internalID,
"history_liquidity_pool_id": lp,
})
}

Expand Down
27 changes: 22 additions & 5 deletions services/horizon/internal/db2/history/liquidity_pool_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type FutureLiquidityPoolID struct {

// Value implements the database/sql/driver Valuer interface.
func (a FutureLiquidityPoolID) Value() (driver.Value, error) {
return a.loader.getNow(a.id), nil
return a.loader.GetNow(a.id), nil
}

// LiquidityPoolLoader will map liquidity pools to their internal
Expand Down Expand Up @@ -60,11 +60,11 @@ func (a *LiquidityPoolLoader) GetFuture(id string) FutureLiquidityPoolID {
}
}

// getNow returns the internal history id for the given liquidity pool.
// getNow should only be called on values which were registered by
// GetFuture() calls. Also, Exec() must be called before any getNow
// GetNow returns the internal history id for the given liquidity pool.
// GetNow should only be called on values which were registered by
// GetFuture() calls. Also, Exec() must be called before any GetNow
// call can succeed.
func (a *LiquidityPoolLoader) getNow(id string) int64 {
func (a *LiquidityPoolLoader) GetNow(id string) int64 {
if id, ok := a.ids[id]; !ok {
panic(fmt.Errorf("id %v not present", id))
} else {
Expand Down Expand Up @@ -141,3 +141,20 @@ func (a *LiquidityPoolLoader) Exec(ctx context.Context, session db.SessionInterf

return a.lookupKeys(ctx, q, ids)
}

// LiquidityPoolLoaderStub is a stub wrapper around LiquidityPoolLoader which allows
// you to manually configure the mapping of liquidity pools to history liquidity ppol ids
type LiquidityPoolLoaderStub struct {
Loader *LiquidityPoolLoader
}

// NewLiquidityPoolLoaderStub returns a new LiquidityPoolLoader instance
func NewLiquidityPoolLoaderStub() LiquidityPoolLoaderStub {
return LiquidityPoolLoaderStub{Loader: NewLiquidityPoolLoader()}
}

// Insert updates the wrapped LiquidityPoolLoader so that the given liquidity pool
// is mapped to the provided history liquidity pool id
func (a LiquidityPoolLoaderStub) Insert(lp string, id int64) {
a.Loader.ids[lp] = id
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,13 @@ func TestLiquidityPoolLoader(t *testing.T) {
future := loader.GetFuture(id)
futures = append(futures, future)
assert.Panics(t, func() {
loader.getNow(id)
loader.GetNow(id)
})
assert.Panics(t, func() {
future.Value()
})
duplicateFuture := loader.GetFuture(id)
assert.Equal(t, future, duplicateFuture)
}

assert.NoError(t, loader.Exec(context.Background(), session))
Expand All @@ -45,7 +47,7 @@ func TestLiquidityPoolLoader(t *testing.T) {
q := &Q{session}
for i, id := range ids {
future := futures[i]
internalID := loader.getNow(id)
internalID := loader.GetNow(id)
val, err := future.Value()
assert.NoError(t, err)
assert.Equal(t, internalID, val)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ type MockTransactionClaimableBalanceBatchInsertBuilder struct {
mock.Mock
}

func (m *MockTransactionClaimableBalanceBatchInsertBuilder) Add(transactionID, accountID int64) error {
a := m.Called(transactionID, accountID)
func (m *MockTransactionClaimableBalanceBatchInsertBuilder) Add(transactionID int64, claimableBalance FutureClaimableBalanceID) error {
a := m.Called(transactionID, claimableBalance)
return a.Error(0)
}

Expand All @@ -51,8 +51,8 @@ type MockOperationClaimableBalanceBatchInsertBuilder struct {
mock.Mock
}

func (m *MockOperationClaimableBalanceBatchInsertBuilder) Add(transactionID, accountID int64) error {
a := m.Called(transactionID, accountID)
func (m *MockOperationClaimableBalanceBatchInsertBuilder) Add(operationID int64, claimableBalance FutureClaimableBalanceID) error {
a := m.Called(operationID, claimableBalance)
return a.Error(0)
}

Expand Down
Loading

0 comments on commit 21d016f

Please sign in to comment.