From 21d016f3f7b01ca1e2f341417762b18c1a059e22 Mon Sep 17 00:00:00 2001 From: tamirms Date: Fri, 25 Aug 2023 19:31:50 +0100 Subject: [PATCH] services/horizon/internal/ingest/processors: Refactor liquidity pools, trades, and claimable balances processors to support new ingestion data flow (#5025) --- .../internal/db2/history/account_loader.go | 27 +- .../db2/history/account_loader_test.go | 6 +- .../internal/db2/history/asset_loader.go | 37 ++- .../internal/db2/history/asset_loader_test.go | 6 +- .../history/claimable_balance_loader_test.go | 2 + .../internal/db2/history/effect_test.go | 14 +- .../db2/history/history_claimable_balances.go | 12 +- .../db2/history/history_liquidity_pools.go | 13 +- .../db2/history/liquidity_pool_loader.go | 27 +- .../db2/history/liquidity_pool_loader_test.go | 6 +- .../mock_q_history_claimable_balances.go | 8 +- .../history/mock_q_history_liquidity_pools.go | 8 +- ...n_participant_batch_insert_builder_test.go | 2 +- .../internal/db2/history/operation_test.go | 18 +- .../internal/db2/history/participants_test.go | 2 +- .../internal/db2/history/transaction_test.go | 20 +- ...laimable_balances_transaction_processor.go | 189 +++-------- ...ble_balances_transaction_processor_test.go | 89 +++-- .../liquidity_pools_transaction_processor.go | 182 +++------- ...uidity_pools_transaction_processor_test.go | 82 +++-- .../processors/participants_processor.go | 2 +- .../processors/participants_processor_test.go | 10 +- .../ingest/processors/trades_processor.go | 102 +++--- .../processors/trades_processor_test.go | 311 +++++------------- 24 files changed, 420 insertions(+), 755 deletions(-) diff --git a/services/horizon/internal/db2/history/account_loader.go b/services/horizon/internal/db2/history/account_loader.go index 14bcfc5243..e9fd9bedea 100644 --- a/services/horizon/internal/db2/history/account_loader.go +++ b/services/horizon/internal/db2/history/account_loader.go @@ -28,7 +28,7 @@ const loaderLookupBatchSize = 50000 // Value implements the database/sql/driver Valuer interface. func (a FutureAccountID) Value() (driver.Value, error) { - return a.loader.getNow(a.address), nil + return a.loader.GetNow(a.address), nil } // AccountLoader will map account addresses to their history @@ -67,11 +67,11 @@ func (a *AccountLoader) GetFuture(address string) FutureAccountID { } } -// getNow returns the history account id for the given address. -// getNow should only be called on values which were registered by -// GetFuture() calls. Also, Exec() must be called before any getNow +// GetNow returns the history account id for the given address. +// GetNow should only be called on values which were registered by +// GetFuture() calls. Also, Exec() must be called before any GetNow // call can succeed. -func (a *AccountLoader) getNow(address string) int64 { +func (a *AccountLoader) GetNow(address string) int64 { if id, ok := a.ids[address]; !ok { panic(fmt.Errorf("address %v not present", address)) } else { @@ -190,3 +190,20 @@ func bulkInsert(ctx context.Context, q *Q, table string, conflictFields []string ) return err } + +// AccountLoaderStub is a stub wrapper around AccountLoader which allows +// you to manually configure the mapping of addresses to history account ids +type AccountLoaderStub struct { + Loader *AccountLoader +} + +// NewAccountLoaderStub returns a new AccountLoaderStub instance +func NewAccountLoaderStub() AccountLoaderStub { + return AccountLoaderStub{Loader: NewAccountLoader()} +} + +// Insert updates the wrapped AccountLoader so that the given account +// address is mapped to the provided history account id +func (a AccountLoaderStub) Insert(address string, id int64) { + a.Loader.ids[address] = id +} diff --git a/services/horizon/internal/db2/history/account_loader_test.go b/services/horizon/internal/db2/history/account_loader_test.go index 14d933c0ad..11047f3be2 100644 --- a/services/horizon/internal/db2/history/account_loader_test.go +++ b/services/horizon/internal/db2/history/account_loader_test.go @@ -27,11 +27,13 @@ func TestAccountLoader(t *testing.T) { future := loader.GetFuture(address) futures = append(futures, future) assert.Panics(t, func() { - loader.getNow(address) + loader.GetNow(address) }) assert.Panics(t, func() { future.Value() }) + duplicateFuture := loader.GetFuture(address) + assert.Equal(t, future, duplicateFuture) } assert.NoError(t, loader.Exec(context.Background(), session)) @@ -42,7 +44,7 @@ func TestAccountLoader(t *testing.T) { q := &Q{session} for i, address := range addresses { future := futures[i] - id := loader.getNow(address) + id := loader.GetNow(address) val, err := future.Value() assert.NoError(t, err) assert.Equal(t, id, val) diff --git a/services/horizon/internal/db2/history/asset_loader.go b/services/horizon/internal/db2/history/asset_loader.go index 326f1b68ba..6ef3d7a350 100644 --- a/services/horizon/internal/db2/history/asset_loader.go +++ b/services/horizon/internal/db2/history/asset_loader.go @@ -12,6 +12,7 @@ import ( "github.com/stellar/go/support/db" "github.com/stellar/go/support/errors" "github.com/stellar/go/support/ordered" + "github.com/stellar/go/xdr" ) type AssetKey struct { @@ -20,6 +21,15 @@ type AssetKey struct { Issuer string } +// AssetKeyFromXDR constructs an AssetKey from an xdr asset +func AssetKeyFromXDR(asset xdr.Asset) AssetKey { + return AssetKey{ + Type: xdr.AssetTypeToString[asset.Type], + Code: asset.GetCode(), + Issuer: asset.GetIssuer(), + } +} + // FutureAssetID represents a future history asset. // A FutureAssetID is created by an AssetLoader and // the asset id is available after calling Exec() on @@ -31,7 +41,7 @@ type FutureAssetID struct { // Value implements the database/sql/driver Valuer interface. func (a FutureAssetID) Value() (driver.Value, error) { - return a.loader.getNow(a.asset), nil + return a.loader.GetNow(a.asset), nil } // AssetLoader will map assets to their history @@ -67,11 +77,11 @@ func (a *AssetLoader) GetFuture(asset AssetKey) FutureAssetID { } } -// getNow returns the history asset id for the given asset. -// getNow should only be called on values which were registered by -// GetFuture() calls. Also, Exec() must be called before any getNow +// GetNow returns the history asset id for the given asset. +// GetNow should only be called on values which were registered by +// GetFuture() calls. Also, Exec() must be called before any GetNow // call can succeed. -func (a *AssetLoader) getNow(asset AssetKey) int64 { +func (a *AssetLoader) GetNow(asset AssetKey) int64 { if id, ok := a.ids[asset]; !ok { panic(fmt.Errorf("asset %v not present", asset)) } else { @@ -186,3 +196,20 @@ func (a *AssetLoader) Exec(ctx context.Context, session db.SessionInterface) err return a.lookupKeys(ctx, q, keys) } + +// AssetLoaderStub is a stub wrapper around AssetLoader which allows +// you to manually configure the mapping of assets to history asset ids +type AssetLoaderStub struct { + Loader *AssetLoader +} + +// NewAssetLoaderStub returns a new AssetLoaderStub instance +func NewAssetLoaderStub() AssetLoaderStub { + return AssetLoaderStub{Loader: NewAssetLoader()} +} + +// Insert updates the wrapped AssetLoaderStub so that the given asset +// address is mapped to the provided history asset id +func (a AssetLoaderStub) Insert(asset AssetKey, id int64) { + a.Loader.ids[asset] = id +} diff --git a/services/horizon/internal/db2/history/asset_loader_test.go b/services/horizon/internal/db2/history/asset_loader_test.go index f51b2e27ef..99f510266c 100644 --- a/services/horizon/internal/db2/history/asset_loader_test.go +++ b/services/horizon/internal/db2/history/asset_loader_test.go @@ -41,11 +41,13 @@ func TestAssetLoader(t *testing.T) { future := loader.GetFuture(key) futures = append(futures, future) assert.Panics(t, func() { - loader.getNow(key) + loader.GetNow(key) }) assert.Panics(t, func() { future.Value() }) + duplicateFuture := loader.GetFuture(key) + assert.Equal(t, future, duplicateFuture) } assert.NoError(t, loader.Exec(context.Background(), session)) @@ -56,7 +58,7 @@ func TestAssetLoader(t *testing.T) { q := &Q{session} for i, key := range keys { future := futures[i] - internalID := loader.getNow(key) + internalID := loader.GetNow(key) val, err := future.Value() assert.NoError(t, err) assert.Equal(t, internalID, val) diff --git a/services/horizon/internal/db2/history/claimable_balance_loader_test.go b/services/horizon/internal/db2/history/claimable_balance_loader_test.go index 183bdb3daa..b119daa674 100644 --- a/services/horizon/internal/db2/history/claimable_balance_loader_test.go +++ b/services/horizon/internal/db2/history/claimable_balance_loader_test.go @@ -38,6 +38,8 @@ func TestClaimableBalanceLoader(t *testing.T) { assert.Panics(t, func() { future.Value() }) + duplicateFuture := loader.GetFuture(id) + assert.Equal(t, future, duplicateFuture) } assert.NoError(t, loader.Exec(context.Background(), session)) diff --git a/services/horizon/internal/db2/history/effect_test.go b/services/horizon/internal/db2/history/effect_test.go index 06cfc2adcb..bf893a100b 100644 --- a/services/horizon/internal/db2/history/effect_test.go +++ b/services/horizon/internal/db2/history/effect_test.go @@ -6,6 +6,7 @@ import ( "testing" "github.com/guregu/null" + "github.com/stellar/go/protocols/horizon/effects" "github.com/stellar/go/services/horizon/internal/db2" "github.com/stellar/go/services/horizon/internal/test" @@ -47,17 +48,12 @@ func TestEffectsForLiquidityPool(t *testing.T) { // Insert Liquidity Pool history liquidityPoolID := "abcde" - toInternalID, err := q.CreateHistoryLiquidityPools(tt.Ctx, []string{liquidityPoolID}, 2) - tt.Assert.NoError(err) + lpLoader := NewLiquidityPoolLoader() operationBuilder := q.NewOperationLiquidityPoolBatchInsertBuilder() - tt.Assert.NoError(err) - internalID, ok := toInternalID[liquidityPoolID] - tt.Assert.True(ok) - err = operationBuilder.Add(opID, internalID) - tt.Assert.NoError(err) - err = operationBuilder.Exec(tt.Ctx, q) - tt.Assert.NoError(err) + tt.Assert.NoError(operationBuilder.Add(opID, lpLoader.GetFuture(liquidityPoolID))) + tt.Assert.NoError(lpLoader.Exec(tt.Ctx, q)) + tt.Assert.NoError(operationBuilder.Exec(tt.Ctx, q)) tt.Assert.NoError(q.Commit()) diff --git a/services/horizon/internal/db2/history/history_claimable_balances.go b/services/horizon/internal/db2/history/history_claimable_balances.go index 162c31a70b..5d2076f3fd 100644 --- a/services/horizon/internal/db2/history/history_claimable_balances.go +++ b/services/horizon/internal/db2/history/history_claimable_balances.go @@ -92,7 +92,7 @@ func (q *Q) ClaimableBalanceByID(ctx context.Context, id string) (dest HistoryCl } type OperationClaimableBalanceBatchInsertBuilder interface { - Add(operationID, internalID int64) error + Add(operationID int64, claimableBalance FutureClaimableBalanceID) error Exec(ctx context.Context, session db.SessionInterface) error } @@ -109,10 +109,10 @@ func (q *Q) NewOperationClaimableBalanceBatchInsertBuilder() OperationClaimableB } // Add adds a new operation claimable balance to the batch -func (i *operationClaimableBalanceBatchInsertBuilder) Add(operationID, internalID int64) error { +func (i *operationClaimableBalanceBatchInsertBuilder) Add(operationID int64, claimableBalance FutureClaimableBalanceID) error { return i.builder.Row(map[string]interface{}{ "history_operation_id": operationID, - "history_claimable_balance_id": internalID, + "history_claimable_balance_id": claimableBalance, }) } @@ -122,7 +122,7 @@ func (i *operationClaimableBalanceBatchInsertBuilder) Exec(ctx context.Context, } type TransactionClaimableBalanceBatchInsertBuilder interface { - Add(transactionID, internalID int64) error + Add(transactionID int64, claimableBalance FutureClaimableBalanceID) error Exec(ctx context.Context, session db.SessionInterface) error } @@ -139,10 +139,10 @@ func (q *Q) NewTransactionClaimableBalanceBatchInsertBuilder() TransactionClaima } // Add adds a new transaction claimable balance to the batch -func (i *transactionClaimableBalanceBatchInsertBuilder) Add(transactionID, internalID int64) error { +func (i *transactionClaimableBalanceBatchInsertBuilder) Add(transactionID int64, claimableBalance FutureClaimableBalanceID) error { return i.builder.Row(map[string]interface{}{ "history_transaction_id": transactionID, - "history_claimable_balance_id": internalID, + "history_claimable_balance_id": claimableBalance, }) } diff --git a/services/horizon/internal/db2/history/history_liquidity_pools.go b/services/horizon/internal/db2/history/history_liquidity_pools.go index 3953280aa9..bb13ac59f9 100644 --- a/services/horizon/internal/db2/history/history_liquidity_pools.go +++ b/services/horizon/internal/db2/history/history_liquidity_pools.go @@ -5,6 +5,7 @@ import ( "sort" sq "github.com/Masterminds/squirrel" + "github.com/stellar/go/support/db" "github.com/stellar/go/support/errors" ) @@ -101,7 +102,7 @@ func (q *Q) LiquidityPoolByID(ctx context.Context, poolID string) (dest HistoryL } type OperationLiquidityPoolBatchInsertBuilder interface { - Add(operationID, internalID int64) error + Add(operationID int64, lp FutureLiquidityPoolID) error Exec(ctx context.Context, session db.SessionInterface) error } @@ -118,10 +119,10 @@ func (q *Q) NewOperationLiquidityPoolBatchInsertBuilder() OperationLiquidityPool } // Add adds a new operation claimable balance to the batch -func (i *operationLiquidityPoolBatchInsertBuilder) Add(operationID, internalID int64) error { +func (i *operationLiquidityPoolBatchInsertBuilder) Add(operationID int64, lp FutureLiquidityPoolID) error { return i.builder.Row(map[string]interface{}{ "history_operation_id": operationID, - "history_liquidity_pool_id": internalID, + "history_liquidity_pool_id": lp, }) } @@ -131,7 +132,7 @@ func (i *operationLiquidityPoolBatchInsertBuilder) Exec(ctx context.Context, ses } type TransactionLiquidityPoolBatchInsertBuilder interface { - Add(transactionID, internalID int64) error + Add(transactionID int64, lp FutureLiquidityPoolID) error Exec(ctx context.Context, session db.SessionInterface) error } @@ -148,10 +149,10 @@ func (q *Q) NewTransactionLiquidityPoolBatchInsertBuilder() TransactionLiquidity } // Add adds a new transaction claimable balance to the batch -func (i *transactionLiquidityPoolBatchInsertBuilder) Add(transactionID, internalID int64) error { +func (i *transactionLiquidityPoolBatchInsertBuilder) Add(transactionID int64, lp FutureLiquidityPoolID) error { return i.builder.Row(map[string]interface{}{ "history_transaction_id": transactionID, - "history_liquidity_pool_id": internalID, + "history_liquidity_pool_id": lp, }) } diff --git a/services/horizon/internal/db2/history/liquidity_pool_loader.go b/services/horizon/internal/db2/history/liquidity_pool_loader.go index ac501dcfd3..7c2fe6fd4d 100644 --- a/services/horizon/internal/db2/history/liquidity_pool_loader.go +++ b/services/horizon/internal/db2/history/liquidity_pool_loader.go @@ -23,7 +23,7 @@ type FutureLiquidityPoolID struct { // Value implements the database/sql/driver Valuer interface. func (a FutureLiquidityPoolID) Value() (driver.Value, error) { - return a.loader.getNow(a.id), nil + return a.loader.GetNow(a.id), nil } // LiquidityPoolLoader will map liquidity pools to their internal @@ -60,11 +60,11 @@ func (a *LiquidityPoolLoader) GetFuture(id string) FutureLiquidityPoolID { } } -// getNow returns the internal history id for the given liquidity pool. -// getNow should only be called on values which were registered by -// GetFuture() calls. Also, Exec() must be called before any getNow +// GetNow returns the internal history id for the given liquidity pool. +// GetNow should only be called on values which were registered by +// GetFuture() calls. Also, Exec() must be called before any GetNow // call can succeed. -func (a *LiquidityPoolLoader) getNow(id string) int64 { +func (a *LiquidityPoolLoader) GetNow(id string) int64 { if id, ok := a.ids[id]; !ok { panic(fmt.Errorf("id %v not present", id)) } else { @@ -141,3 +141,20 @@ func (a *LiquidityPoolLoader) Exec(ctx context.Context, session db.SessionInterf return a.lookupKeys(ctx, q, ids) } + +// LiquidityPoolLoaderStub is a stub wrapper around LiquidityPoolLoader which allows +// you to manually configure the mapping of liquidity pools to history liquidity ppol ids +type LiquidityPoolLoaderStub struct { + Loader *LiquidityPoolLoader +} + +// NewLiquidityPoolLoaderStub returns a new LiquidityPoolLoader instance +func NewLiquidityPoolLoaderStub() LiquidityPoolLoaderStub { + return LiquidityPoolLoaderStub{Loader: NewLiquidityPoolLoader()} +} + +// Insert updates the wrapped LiquidityPoolLoader so that the given liquidity pool +// is mapped to the provided history liquidity pool id +func (a LiquidityPoolLoaderStub) Insert(lp string, id int64) { + a.Loader.ids[lp] = id +} diff --git a/services/horizon/internal/db2/history/liquidity_pool_loader_test.go b/services/horizon/internal/db2/history/liquidity_pool_loader_test.go index 00664ff3e3..e2b1e05beb 100644 --- a/services/horizon/internal/db2/history/liquidity_pool_loader_test.go +++ b/services/horizon/internal/db2/history/liquidity_pool_loader_test.go @@ -30,11 +30,13 @@ func TestLiquidityPoolLoader(t *testing.T) { future := loader.GetFuture(id) futures = append(futures, future) assert.Panics(t, func() { - loader.getNow(id) + loader.GetNow(id) }) assert.Panics(t, func() { future.Value() }) + duplicateFuture := loader.GetFuture(id) + assert.Equal(t, future, duplicateFuture) } assert.NoError(t, loader.Exec(context.Background(), session)) @@ -45,7 +47,7 @@ func TestLiquidityPoolLoader(t *testing.T) { q := &Q{session} for i, id := range ids { future := futures[i] - internalID := loader.getNow(id) + internalID := loader.GetNow(id) val, err := future.Value() assert.NoError(t, err) assert.Equal(t, internalID, val) diff --git a/services/horizon/internal/db2/history/mock_q_history_claimable_balances.go b/services/horizon/internal/db2/history/mock_q_history_claimable_balances.go index 6ce3926c91..6607456af2 100644 --- a/services/horizon/internal/db2/history/mock_q_history_claimable_balances.go +++ b/services/horizon/internal/db2/history/mock_q_history_claimable_balances.go @@ -29,8 +29,8 @@ type MockTransactionClaimableBalanceBatchInsertBuilder struct { mock.Mock } -func (m *MockTransactionClaimableBalanceBatchInsertBuilder) Add(transactionID, accountID int64) error { - a := m.Called(transactionID, accountID) +func (m *MockTransactionClaimableBalanceBatchInsertBuilder) Add(transactionID int64, claimableBalance FutureClaimableBalanceID) error { + a := m.Called(transactionID, claimableBalance) return a.Error(0) } @@ -51,8 +51,8 @@ type MockOperationClaimableBalanceBatchInsertBuilder struct { mock.Mock } -func (m *MockOperationClaimableBalanceBatchInsertBuilder) Add(transactionID, accountID int64) error { - a := m.Called(transactionID, accountID) +func (m *MockOperationClaimableBalanceBatchInsertBuilder) Add(operationID int64, claimableBalance FutureClaimableBalanceID) error { + a := m.Called(operationID, claimableBalance) return a.Error(0) } diff --git a/services/horizon/internal/db2/history/mock_q_history_liquidity_pools.go b/services/horizon/internal/db2/history/mock_q_history_liquidity_pools.go index 33f5c8a46b..bf000a22e9 100644 --- a/services/horizon/internal/db2/history/mock_q_history_liquidity_pools.go +++ b/services/horizon/internal/db2/history/mock_q_history_liquidity_pools.go @@ -29,8 +29,8 @@ type MockTransactionLiquidityPoolBatchInsertBuilder struct { mock.Mock } -func (m *MockTransactionLiquidityPoolBatchInsertBuilder) Add(transactionID, accountID int64) error { - a := m.Called(transactionID, accountID) +func (m *MockTransactionLiquidityPoolBatchInsertBuilder) Add(transactionID int64, lp FutureLiquidityPoolID) error { + a := m.Called(transactionID, lp) return a.Error(0) } @@ -51,8 +51,8 @@ type MockOperationLiquidityPoolBatchInsertBuilder struct { mock.Mock } -func (m *MockOperationLiquidityPoolBatchInsertBuilder) Add(transactionID, accountID int64) error { - a := m.Called(transactionID, accountID) +func (m *MockOperationLiquidityPoolBatchInsertBuilder) Add(operationID int64, lp FutureLiquidityPoolID) error { + a := m.Called(operationID, lp) return a.Error(0) } diff --git a/services/horizon/internal/db2/history/operation_participant_batch_insert_builder_test.go b/services/horizon/internal/db2/history/operation_participant_batch_insert_builder_test.go index 1bcd64cceb..fc2ca9c831 100644 --- a/services/horizon/internal/db2/history/operation_participant_batch_insert_builder_test.go +++ b/services/horizon/internal/db2/history/operation_participant_batch_insert_builder_test.go @@ -43,6 +43,6 @@ func TestAddOperationParticipants(t *testing.T) { op := ops[0] tt.Assert.Equal(int64(240518172673), op.OperationID) - tt.Assert.Equal(accountLoader.getNow(address), op.AccountID) + tt.Assert.Equal(accountLoader.GetNow(address), op.AccountID) } } diff --git a/services/horizon/internal/db2/history/operation_test.go b/services/horizon/internal/db2/history/operation_test.go index a996ac49b7..40c6e5262e 100644 --- a/services/horizon/internal/db2/history/operation_test.go +++ b/services/horizon/internal/db2/history/operation_test.go @@ -5,6 +5,7 @@ import ( sq "github.com/Masterminds/squirrel" "github.com/guregu/null" + "github.com/stellar/go/services/horizon/internal/db2" "github.com/stellar/go/services/horizon/internal/test" "github.com/stellar/go/toid" @@ -122,18 +123,13 @@ func TestOperationByLiquidityPool(t *testing.T) { // Insert Liquidity Pool history liquidityPoolID := "a2f38836a839de008cf1d782c81f45e1253cc5d3dad9110b872965484fec0a49" - toInternalID, err := q.CreateHistoryLiquidityPools(tt.Ctx, []string{liquidityPoolID}, 2) - tt.Assert.NoError(err) + lpLoader := NewLiquidityPoolLoader() + lpOperationBuilder := q.NewOperationLiquidityPoolBatchInsertBuilder() - tt.Assert.NoError(err) - internalID, ok := toInternalID[liquidityPoolID] - tt.Assert.True(ok) - err = lpOperationBuilder.Add(opID1, internalID) - tt.Assert.NoError(err) - err = lpOperationBuilder.Add(opID2, internalID) - tt.Assert.NoError(err) - err = lpOperationBuilder.Exec(tt.Ctx, q) - tt.Assert.NoError(err) + tt.Assert.NoError(lpOperationBuilder.Add(opID1, lpLoader.GetFuture(liquidityPoolID))) + tt.Assert.NoError(lpOperationBuilder.Add(opID2, lpLoader.GetFuture(liquidityPoolID))) + tt.Assert.NoError(lpLoader.Exec(tt.Ctx, q)) + tt.Assert.NoError(lpOperationBuilder.Exec(tt.Ctx, q)) tt.Assert.NoError(q.Commit()) diff --git a/services/horizon/internal/db2/history/participants_test.go b/services/horizon/internal/db2/history/participants_test.go index a8d87976ac..16671098bf 100644 --- a/services/horizon/internal/db2/history/participants_test.go +++ b/services/horizon/internal/db2/history/participants_test.go @@ -63,7 +63,7 @@ func TestTransactionParticipantsBatch(t *testing.T) { {TransactionID: 2}, } for i := range expected { - expected[i].AccountID = accountLoader.getNow(addresses[i]) + expected[i].AccountID = accountLoader.GetNow(addresses[i]) } tt.Assert.ElementsMatch(expected, participants) } diff --git a/services/horizon/internal/db2/history/transaction_test.go b/services/horizon/internal/db2/history/transaction_test.go index 5bc423006e..a145a13d43 100644 --- a/services/horizon/internal/db2/history/transaction_test.go +++ b/services/horizon/internal/db2/history/transaction_test.go @@ -8,6 +8,7 @@ import ( sq "github.com/Masterminds/squirrel" "github.com/guregu/null" + "github.com/stellar/go/xdr" "github.com/stellar/go/ingest" @@ -77,22 +78,17 @@ func TestTransactionByLiquidityPool(t *testing.T) { // Insert Liquidity Pool history liquidityPoolID := "a2f38836a839de008cf1d782c81f45e1253cc5d3dad9110b872965484fec0a49" - toInternalID, err := q.CreateHistoryLiquidityPools(tt.Ctx, []string{liquidityPoolID}, 2) - tt.Assert.NoError(err) + lpLoader := NewLiquidityPoolLoader() lpTransactionBuilder := q.NewTransactionLiquidityPoolBatchInsertBuilder() - tt.Assert.NoError(err) - internalID, ok := toInternalID[liquidityPoolID] - tt.Assert.True(ok) - err = lpTransactionBuilder.Add(txID, internalID) - tt.Assert.NoError(err) - err = lpTransactionBuilder.Exec(tt.Ctx, q) - tt.Assert.NoError(err) - + tt.Assert.NoError(lpTransactionBuilder.Add(txID, lpLoader.GetFuture(liquidityPoolID))) + tt.Assert.NoError(lpLoader.Exec(tt.Ctx, q)) + tt.Assert.NoError(lpTransactionBuilder.Exec(tt.Ctx, q)) tt.Assert.NoError(q.Commit()) var records []Transaction - err = q.Transactions().ForLiquidityPool(tt.Ctx, liquidityPoolID).Select(tt.Ctx, &records) - tt.Assert.NoError(err) + tt.Assert.NoError( + q.Transactions().ForLiquidityPool(tt.Ctx, liquidityPoolID).Select(tt.Ctx, &records), + ) tt.Assert.Len(records, 1) } diff --git a/services/horizon/internal/ingest/processors/claimable_balances_transaction_processor.go b/services/horizon/internal/ingest/processors/claimable_balances_transaction_processor.go index aecb25f88d..394d2e0f9b 100644 --- a/services/horizon/internal/ingest/processors/claimable_balances_transaction_processor.go +++ b/services/horizon/internal/ingest/processors/claimable_balances_transaction_processor.go @@ -5,56 +5,39 @@ import ( "github.com/stellar/go/ingest" "github.com/stellar/go/services/horizon/internal/db2/history" - set "github.com/stellar/go/support/collections/set" "github.com/stellar/go/support/db" "github.com/stellar/go/support/errors" "github.com/stellar/go/toid" "github.com/stellar/go/xdr" ) -type claimableBalance struct { - internalID int64 // Bigint auto-generated by postgres - transactionSet set.Set[int64] - operationSet set.Set[int64] -} - -func (b *claimableBalance) addTransactionID(id int64) { - if b.transactionSet == nil { - b.transactionSet = set.Set[int64]{} - } - b.transactionSet.Add(id) -} - -func (b *claimableBalance) addOperationID(id int64) { - if b.operationSet == nil { - b.operationSet = set.Set[int64]{} - } - b.operationSet.Add(id) -} - type ClaimableBalancesTransactionProcessor struct { - session db.SessionInterface - sequence uint32 - claimableBalanceSet map[string]claimableBalance - qClaimableBalances history.QHistoryClaimableBalances + cbLoader *history.ClaimableBalanceLoader + txBatch history.TransactionClaimableBalanceBatchInsertBuilder + opBatch history.OperationClaimableBalanceBatchInsertBuilder } -func NewClaimableBalancesTransactionProcessor(session db.SessionInterface, Q history.QHistoryClaimableBalances, sequence uint32) *ClaimableBalancesTransactionProcessor { +func NewClaimableBalancesTransactionProcessor( + cbLoader *history.ClaimableBalanceLoader, + txBatch history.TransactionClaimableBalanceBatchInsertBuilder, + opBatch history.OperationClaimableBalanceBatchInsertBuilder, +) *ClaimableBalancesTransactionProcessor { return &ClaimableBalancesTransactionProcessor{ - session: session, - qClaimableBalances: Q, - sequence: sequence, - claimableBalanceSet: map[string]claimableBalance{}, + cbLoader: cbLoader, + txBatch: txBatch, + opBatch: opBatch, } } -func (p *ClaimableBalancesTransactionProcessor) ProcessTransaction(ctx context.Context, transaction ingest.LedgerTransaction) error { - err := p.addTransactionClaimableBalances(p.claimableBalanceSet, p.sequence, transaction) +func (p *ClaimableBalancesTransactionProcessor) ProcessTransaction( + lcm xdr.LedgerCloseMeta, transaction ingest.LedgerTransaction, +) error { + err := p.addTransactionClaimableBalances(lcm.LedgerSequence(), transaction) if err != nil { return err } - err = p.addOperationClaimableBalances(p.claimableBalanceSet, p.sequence, transaction) + err = p.addOperationClaimableBalances(lcm.LedgerSequence(), transaction) if err != nil { return err } @@ -62,27 +45,25 @@ func (p *ClaimableBalancesTransactionProcessor) ProcessTransaction(ctx context.C return nil } -func (p *ClaimableBalancesTransactionProcessor) addTransactionClaimableBalances(cbSet map[string]claimableBalance, sequence uint32, transaction ingest.LedgerTransaction) error { +func (p *ClaimableBalancesTransactionProcessor) addTransactionClaimableBalances( + sequence uint32, transaction ingest.LedgerTransaction, +) error { transactionID := toid.New(int32(sequence), int32(transaction.Index), 0).ToInt64() - transactionClaimableBalances, err := claimableBalancesForTransaction( - sequence, - transaction, - ) + transactionClaimableBalances, err := claimableBalancesForTransaction(transaction) if err != nil { return errors.Wrap(err, "Could not determine claimable balances for transaction") } - for _, cb := range transactionClaimableBalances { - entry := cbSet[cb] - entry.addTransactionID(transactionID) - cbSet[cb] = entry + for _, cb := range dedupeStrings(transactionClaimableBalances) { + if err = p.txBatch.Add(transactionID, p.cbLoader.GetFuture(cb)); err != nil { + return err + } } return nil } func claimableBalancesForTransaction( - sequence uint32, transaction ingest.LedgerTransaction, ) ([]string, error) { changes, err := transaction.GetChanges() @@ -93,19 +74,7 @@ func claimableBalancesForTransaction( if err != nil { return nil, errors.Wrapf(err, "reading transaction %v claimable balances", transaction.Index) } - return dedupeClaimableBalances(cbs) -} - -func dedupeClaimableBalances(in []string) (out []string, err error) { - set := set.Set[string]{} - for _, id := range in { - set.Add(id) - } - - for id := range set { - out = append(out, id) - } - return + return cbs, nil } func claimableBalancesForChanges( @@ -139,26 +108,9 @@ func claimableBalancesForChanges( return cbs, nil } -func (p *ClaimableBalancesTransactionProcessor) addOperationClaimableBalances(cbSet map[string]claimableBalance, sequence uint32, transaction ingest.LedgerTransaction) error { - claimableBalances, err := claimableBalancesForOperations(transaction, sequence) - if err != nil { - return errors.Wrap(err, "could not determine operation claimable balances") - } - - for operationID, cbs := range claimableBalances { - for _, cb := range cbs { - entry := cbSet[cb] - entry.addOperationID(operationID) - cbSet[cb] = entry - } - } - - return nil -} - -func claimableBalancesForOperations(transaction ingest.LedgerTransaction, sequence uint32) (map[int64][]string, error) { - cbs := map[int64][]string{} - +func (p *ClaimableBalancesTransactionProcessor) addOperationClaimableBalances( + sequence uint32, transaction ingest.LedgerTransaction, +) error { for opi, op := range transaction.Envelope.Operations() { operation := transactionOperationWrapper{ index: uint32(opi), @@ -169,92 +121,33 @@ func claimableBalancesForOperations(transaction ingest.LedgerTransaction, sequen changes, err := transaction.GetOperationChanges(uint32(opi)) if err != nil { - return cbs, err - } - c, err := claimableBalancesForChanges(changes) - if err != nil { - return cbs, errors.Wrapf(err, "reading operation %v claimable balances", operation.ID()) - } - cbs[operation.ID()] = c - } - - return cbs, nil -} - -func (p *ClaimableBalancesTransactionProcessor) Commit(ctx context.Context) error { - if len(p.claimableBalanceSet) > 0 { - if err := p.loadClaimableBalanceIDs(ctx, p.claimableBalanceSet); err != nil { return err } - - if err := p.insertDBTransactionClaimableBalances(ctx, p.claimableBalanceSet); err != nil { - return err + cbs, err := claimableBalancesForChanges(changes) + if err != nil { + return errors.Wrapf(err, "reading operation %v claimable balances", operation.ID()) } - if err := p.insertDBOperationsClaimableBalances(ctx, p.claimableBalanceSet); err != nil { - return err + for _, cb := range dedupeStrings(cbs) { + if err = p.opBatch.Add(operation.ID(), p.cbLoader.GetFuture(cb)); err != nil { + return err + } } } return nil } -func (p *ClaimableBalancesTransactionProcessor) loadClaimableBalanceIDs(ctx context.Context, claimableBalanceSet map[string]claimableBalance) error { - ids := make([]string, 0, len(claimableBalanceSet)) - for id := range claimableBalanceSet { - ids = append(ids, id) - } - - toInternalID, err := p.qClaimableBalances.CreateHistoryClaimableBalances(ctx, ids, maxBatchSize) +func (p *ClaimableBalancesTransactionProcessor) Flush(ctx context.Context, session db.SessionInterface) error { + err := p.txBatch.Exec(ctx, session) if err != nil { - return errors.Wrap(err, "Could not create claimable balance ids") - } - - for _, id := range ids { - internalID, ok := toInternalID[id] - if !ok { - // TODO: Figure out the right way to convert the id to a string here. %v will be nonsense. - return errors.Errorf("no internal id found for claimable balance %v", id) - } - - cb := claimableBalanceSet[id] - cb.internalID = internalID - claimableBalanceSet[id] = cb - } - - return nil -} - -func (p ClaimableBalancesTransactionProcessor) insertDBTransactionClaimableBalances(ctx context.Context, claimableBalanceSet map[string]claimableBalance) error { - batch := p.qClaimableBalances.NewTransactionClaimableBalanceBatchInsertBuilder() - - for _, entry := range claimableBalanceSet { - for transactionID := range entry.transactionSet { - if err := batch.Add(transactionID, entry.internalID); err != nil { - return errors.Wrap(err, "could not insert transaction claimable balance in db") - } - } - } - - if err := batch.Exec(ctx, p.session); err != nil { - return errors.Wrap(err, "could not flush transaction claimable balances to db") + return err } - return nil -} -func (p ClaimableBalancesTransactionProcessor) insertDBOperationsClaimableBalances(ctx context.Context, claimableBalanceSet map[string]claimableBalance) error { - batch := p.qClaimableBalances.NewOperationClaimableBalanceBatchInsertBuilder() - - for _, entry := range claimableBalanceSet { - for operationID := range entry.operationSet { - if err := batch.Add(operationID, entry.internalID); err != nil { - return errors.Wrap(err, "could not insert operation claimable balance in db") - } - } + err = p.opBatch.Exec(ctx, session) + if err != nil { + return err } - if err := batch.Exec(ctx, p.session); err != nil { - return errors.Wrap(err, "could not flush operation claimable balances to db") - } return nil } diff --git a/services/horizon/internal/ingest/processors/claimable_balances_transaction_processor_test.go b/services/horizon/internal/ingest/processors/claimable_balances_transaction_processor_test.go index 9c771c7e8a..11ce54505a 100644 --- a/services/horizon/internal/ingest/processors/claimable_balances_transaction_processor_test.go +++ b/services/horizon/internal/ingest/processors/claimable_balances_transaction_processor_test.go @@ -6,7 +6,6 @@ import ( "context" "testing" - "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" "github.com/stellar/go/services/horizon/internal/db2/history" @@ -20,11 +19,11 @@ type ClaimableBalancesTransactionProcessorTestSuiteLedger struct { ctx context.Context processor *ClaimableBalancesTransactionProcessor mockSession *db.MockSession - mockQ *history.MockQHistoryClaimableBalances mockTransactionBatchInsertBuilder *history.MockTransactionClaimableBalanceBatchInsertBuilder mockOperationBatchInsertBuilder *history.MockOperationClaimableBalanceBatchInsertBuilder + cbLoader *history.ClaimableBalanceLoader - sequence uint32 + lcm xdr.LedgerCloseMeta } func TestClaimableBalancesTransactionProcessorTestSuiteLedger(t *testing.T) { @@ -33,41 +32,41 @@ func TestClaimableBalancesTransactionProcessorTestSuiteLedger(t *testing.T) { func (s *ClaimableBalancesTransactionProcessorTestSuiteLedger) SetupTest() { s.ctx = context.Background() - s.mockQ = &history.MockQHistoryClaimableBalances{} s.mockTransactionBatchInsertBuilder = &history.MockTransactionClaimableBalanceBatchInsertBuilder{} s.mockOperationBatchInsertBuilder = &history.MockOperationClaimableBalanceBatchInsertBuilder{} - s.sequence = 20 + sequence := uint32(20) + s.lcm = xdr.LedgerCloseMeta{ + V0: &xdr.LedgerCloseMetaV0{ + LedgerHeader: xdr.LedgerHeaderHistoryEntry{ + Header: xdr.LedgerHeader{ + LedgerSeq: xdr.Uint32(sequence), + }, + }, + }, + } + s.cbLoader = history.NewClaimableBalanceLoader() s.processor = NewClaimableBalancesTransactionProcessor( - s.mockSession, - s.mockQ, - s.sequence, + s.cbLoader, + s.mockTransactionBatchInsertBuilder, + s.mockOperationBatchInsertBuilder, ) } func (s *ClaimableBalancesTransactionProcessorTestSuiteLedger) TearDownTest() { - s.mockQ.AssertExpectations(s.T()) s.mockTransactionBatchInsertBuilder.AssertExpectations(s.T()) s.mockOperationBatchInsertBuilder.AssertExpectations(s.T()) } -func (s *ClaimableBalancesTransactionProcessorTestSuiteLedger) mockTransactionBatchAdd(transactionID, internalID int64, err error) { - s.mockTransactionBatchInsertBuilder.On("Add", transactionID, internalID).Return(err).Once() -} - -func (s *ClaimableBalancesTransactionProcessorTestSuiteLedger) mockOperationBatchAdd(operationID, internalID int64, err error) { - s.mockOperationBatchInsertBuilder.On("Add", operationID, internalID).Return(err).Once() -} - func (s *ClaimableBalancesTransactionProcessorTestSuiteLedger) TestEmptyClaimableBalances() { - // What is this expecting? Doesn't seem to assert anything meaningful... - err := s.processor.Commit(context.Background()) - s.Assert().NoError(err) + s.mockTransactionBatchInsertBuilder.On("Exec", s.ctx, s.mockSession).Return(nil).Once() + s.mockOperationBatchInsertBuilder.On("Exec", s.ctx, s.mockSession).Return(nil).Once() + + s.Assert().NoError(s.processor.Flush(s.ctx, s.mockSession)) } func (s *ClaimableBalancesTransactionProcessorTestSuiteLedger) testOperationInserts(balanceID xdr.ClaimableBalanceId, body xdr.OperationBody, change xdr.LedgerEntryChange) { // Setup the transaction - internalID := int64(1234) txn := createTransaction(true, 1) txn.Envelope.Operations()[0].Body = body txn.UnsafeMeta.V = 2 @@ -85,6 +84,20 @@ func (s *ClaimableBalancesTransactionProcessorTestSuiteLedger) testOperationInse }, }, change, + // add a duplicate change to test that the processor + // does not insert duplicate rows + { + Type: xdr.LedgerEntryChangeTypeLedgerEntryState, + State: &xdr.LedgerEntry{ + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeClaimableBalance, + ClaimableBalance: &xdr.ClaimableBalanceEntry{ + BalanceId: balanceID, + }, + }, + }, + }, + change, }}, } @@ -104,46 +117,28 @@ func (s *ClaimableBalancesTransactionProcessorTestSuiteLedger) testOperationInse }, } } - txnID := toid.New(int32(s.sequence), int32(txn.Index), 0).ToInt64() + txnID := toid.New(int32(s.lcm.LedgerSequence()), int32(txn.Index), 0).ToInt64() opID := (&transactionOperationWrapper{ index: uint32(0), transaction: txn, operation: txn.Envelope.Operations()[0], - ledgerSequence: s.sequence, + ledgerSequence: s.lcm.LedgerSequence(), }).ID() - hexID, _ := xdr.MarshalHex(balanceID) + hexID, err := xdr.MarshalHex(balanceID) + s.Assert().NoError(err) - // Setup a q - s.mockQ.On("CreateHistoryClaimableBalances", s.ctx, mock.AnythingOfType("[]string"), maxBatchSize). - Run(func(args mock.Arguments) { - arg := args.Get(1).([]string) - s.Assert().ElementsMatch( - []string{ - hexID, - }, - arg, - ) - }).Return(map[string]int64{ - hexID: internalID, - }, nil).Once() - - // Prepare to process transactions successfully - s.mockQ.On("NewTransactionClaimableBalanceBatchInsertBuilder"). - Return(s.mockTransactionBatchInsertBuilder).Once() - s.mockTransactionBatchAdd(txnID, internalID, nil) + s.mockTransactionBatchInsertBuilder.On("Add", txnID, s.cbLoader.GetFuture(hexID)).Return(nil).Once() s.mockTransactionBatchInsertBuilder.On("Exec", s.ctx, s.mockSession).Return(nil).Once() // Prepare to process operations successfully - s.mockQ.On("NewOperationClaimableBalanceBatchInsertBuilder"). - Return(s.mockOperationBatchInsertBuilder).Once() - s.mockOperationBatchAdd(opID, internalID, nil) + s.mockOperationBatchInsertBuilder.On("Add", opID, s.cbLoader.GetFuture(hexID)).Return(nil).Once() s.mockOperationBatchInsertBuilder.On("Exec", s.ctx, s.mockSession).Return(nil).Once() // Process the transaction - err := s.processor.ProcessTransaction(s.ctx, txn) + err = s.processor.ProcessTransaction(s.lcm, txn) s.Assert().NoError(err) - err = s.processor.Commit(s.ctx) + err = s.processor.Flush(s.ctx, s.mockSession) s.Assert().NoError(err) } diff --git a/services/horizon/internal/ingest/processors/liquidity_pools_transaction_processor.go b/services/horizon/internal/ingest/processors/liquidity_pools_transaction_processor.go index a55cb95934..0a38215f08 100644 --- a/services/horizon/internal/ingest/processors/liquidity_pools_transaction_processor.go +++ b/services/horizon/internal/ingest/processors/liquidity_pools_transaction_processor.go @@ -5,56 +5,38 @@ import ( "github.com/stellar/go/ingest" "github.com/stellar/go/services/horizon/internal/db2/history" - set "github.com/stellar/go/support/collections/set" + "github.com/stellar/go/support/collections/set" "github.com/stellar/go/support/db" "github.com/stellar/go/support/errors" "github.com/stellar/go/toid" "github.com/stellar/go/xdr" ) -type liquidityPool struct { - internalID int64 // Bigint auto-generated by postgres - transactionSet set.Set[int64] - operationSet set.Set[int64] -} - -func (b *liquidityPool) addTransactionID(id int64) { - if b.transactionSet == nil { - b.transactionSet = set.Set[int64]{} - } - b.transactionSet.Add(id) -} - -func (b *liquidityPool) addOperationID(id int64) { - if b.operationSet == nil { - b.operationSet = set.Set[int64]{} - } - b.operationSet.Add(id) -} - type LiquidityPoolsTransactionProcessor struct { - session db.SessionInterface - sequence uint32 - liquidityPoolSet map[string]liquidityPool - qLiquidityPools history.QHistoryLiquidityPools + lpLoader *history.LiquidityPoolLoader + txBatch history.TransactionLiquidityPoolBatchInsertBuilder + opBatch history.OperationLiquidityPoolBatchInsertBuilder } -func NewLiquidityPoolsTransactionProcessor(session db.SessionInterface, Q history.QHistoryLiquidityPools, sequence uint32) *LiquidityPoolsTransactionProcessor { +func NewLiquidityPoolsTransactionProcessor( + lpLoader *history.LiquidityPoolLoader, + txBatch history.TransactionLiquidityPoolBatchInsertBuilder, + opBatch history.OperationLiquidityPoolBatchInsertBuilder, +) *LiquidityPoolsTransactionProcessor { return &LiquidityPoolsTransactionProcessor{ - session: session, - qLiquidityPools: Q, - sequence: sequence, - liquidityPoolSet: map[string]liquidityPool{}, + lpLoader: lpLoader, + txBatch: txBatch, + opBatch: opBatch, } } -func (p *LiquidityPoolsTransactionProcessor) ProcessTransaction(ctx context.Context, transaction ingest.LedgerTransaction) error { - err := p.addTransactionLiquidityPools(p.liquidityPoolSet, p.sequence, transaction) +func (p *LiquidityPoolsTransactionProcessor) ProcessTransaction(lcm xdr.LedgerCloseMeta, transaction ingest.LedgerTransaction) error { + err := p.addTransactionLiquidityPools(lcm.LedgerSequence(), transaction) if err != nil { return err } - err = p.addOperationLiquidityPools(p.liquidityPoolSet, p.sequence, transaction) + err = p.addOperationLiquidityPools(lcm.LedgerSequence(), transaction) if err != nil { return err } @@ -62,29 +44,23 @@ func (p *LiquidityPoolsTransactionProcessor) ProcessTransaction(ctx context.Cont return nil } -func (p *LiquidityPoolsTransactionProcessor) addTransactionLiquidityPools(lpSet map[string]liquidityPool, sequence uint32, transaction ingest.LedgerTransaction) error { +func (p *LiquidityPoolsTransactionProcessor) addTransactionLiquidityPools(sequence uint32, transaction ingest.LedgerTransaction) error { transactionID := toid.New(int32(sequence), int32(transaction.Index), 0).ToInt64() - transactionLiquidityPools, err := liquidityPoolsForTransaction( - sequence, - transaction, - ) + lps, err := liquidityPoolsForTransaction(transaction) if err != nil { return errors.Wrap(err, "Could not determine liquidity pools for transaction") } - for _, lp := range transactionLiquidityPools { - entry := lpSet[lp] - entry.addTransactionID(transactionID) - lpSet[lp] = entry + for _, lp := range dedupeStrings(lps) { + if err = p.txBatch.Add(transactionID, p.lpLoader.GetFuture(lp)); err != nil { + return err + } } return nil } -func liquidityPoolsForTransaction( - sequence uint32, - transaction ingest.LedgerTransaction, -) ([]string, error) { +func liquidityPoolsForTransaction(transaction ingest.LedgerTransaction) ([]string, error) { changes, err := transaction.GetChanges() if err != nil { return nil, err @@ -93,19 +69,20 @@ func liquidityPoolsForTransaction( if err != nil { return nil, errors.Wrapf(err, "reading transaction %v liquidity pools", transaction.Index) } - return dedupeLiquidityPools(lps) + return lps, nil } -func dedupeLiquidityPools(in []string) (out []string, err error) { +func dedupeStrings(in []string) []string { set := set.Set[string]{} for _, id := range in { set.Add(id) } + out := make([]string, 0, len(in)) for id := range set { out = append(out, id) } - return + return out } func liquidityPoolsForChanges( @@ -135,26 +112,7 @@ func liquidityPoolsForChanges( return lps, nil } -func (p *LiquidityPoolsTransactionProcessor) addOperationLiquidityPools(lpSet map[string]liquidityPool, sequence uint32, transaction ingest.LedgerTransaction) error { - liquidityPools, err := liquidityPoolsForOperations(transaction, sequence) - if err != nil { - return errors.Wrap(err, "could not determine operation liquidity pools") - } - - for operationID, lps := range liquidityPools { - for _, lp := range lps { - entry := lpSet[lp] - entry.addOperationID(operationID) - lpSet[lp] = entry - } - } - - return nil -} - -func liquidityPoolsForOperations(transaction ingest.LedgerTransaction, sequence uint32) (map[int64][]string, error) { - lps := map[int64][]string{} - +func (p *LiquidityPoolsTransactionProcessor) addOperationLiquidityPools(sequence uint32, transaction ingest.LedgerTransaction) error { for opi, op := range transaction.Envelope.Operations() { operation := transactionOperationWrapper{ index: uint32(opi), @@ -165,91 +123,29 @@ func liquidityPoolsForOperations(transaction ingest.LedgerTransaction, sequence changes, err := transaction.GetOperationChanges(uint32(opi)) if err != nil { - return lps, err - } - c, err := liquidityPoolsForChanges(changes) - if err != nil { - return lps, errors.Wrapf(err, "reading operation %v liquidity pools", operation.ID()) - } - lps[operation.ID()] = c - } - - return lps, nil -} - -func (p *LiquidityPoolsTransactionProcessor) Commit(ctx context.Context) error { - if len(p.liquidityPoolSet) > 0 { - if err := p.loadLiquidityPoolIDs(ctx, p.liquidityPoolSet); err != nil { return err } - - if err := p.insertDBTransactionLiquidityPools(ctx, p.liquidityPoolSet); err != nil { - return err - } - - if err := p.insertDBOperationsLiquidityPools(ctx, p.liquidityPoolSet); err != nil { - return err - } - } - - return nil -} - -func (p *LiquidityPoolsTransactionProcessor) loadLiquidityPoolIDs(ctx context.Context, liquidityPoolSet map[string]liquidityPool) error { - ids := make([]string, 0, len(liquidityPoolSet)) - for id := range liquidityPoolSet { - ids = append(ids, id) - } - - toInternalID, err := p.qLiquidityPools.CreateHistoryLiquidityPools(ctx, ids, maxBatchSize) - if err != nil { - return errors.Wrap(err, "Could not create liquidity pool ids") - } - - for _, id := range ids { - internalID, ok := toInternalID[id] - if !ok { - return errors.Errorf("no internal id found for liquidity pool %s", id) + lps, err := liquidityPoolsForChanges(changes) + if err != nil { + return errors.Wrapf(err, "reading operation %v liquidity pools", operation.ID()) } - - lp := liquidityPoolSet[id] - lp.internalID = internalID - liquidityPoolSet[id] = lp - } - - return nil -} - -func (p LiquidityPoolsTransactionProcessor) insertDBTransactionLiquidityPools(ctx context.Context, liquidityPoolSet map[string]liquidityPool) error { - batch := p.qLiquidityPools.NewTransactionLiquidityPoolBatchInsertBuilder() - - for _, entry := range liquidityPoolSet { - for transactionID := range entry.transactionSet { - if err := batch.Add(transactionID, entry.internalID); err != nil { - return errors.Wrap(err, "could not insert transaction liquidity pool in db") + for _, lp := range dedupeStrings(lps) { + if err := p.opBatch.Add(operation.ID(), p.lpLoader.GetFuture(lp)); err != nil { + return err } } } - if err := batch.Exec(ctx, p.session); err != nil { - return errors.Wrap(err, "could not flush transaction liquidity pools to db") - } return nil } -func (p LiquidityPoolsTransactionProcessor) insertDBOperationsLiquidityPools(ctx context.Context, liquidityPoolSet map[string]liquidityPool) error { - batch := p.qLiquidityPools.NewOperationLiquidityPoolBatchInsertBuilder() - - for _, entry := range liquidityPoolSet { - for operationID := range entry.operationSet { - if err := batch.Add(operationID, entry.internalID); err != nil { - return errors.Wrap(err, "could not insert operation liquidity pool in db") - } - } +func (p *LiquidityPoolsTransactionProcessor) Flush(ctx context.Context, session db.SessionInterface) error { + if err := p.txBatch.Exec(ctx, session); err != nil { + return errors.Wrap(err, "Could not flush transaction liquidity pools to db") } - - if err := batch.Exec(ctx, p.session); err != nil { - return errors.Wrap(err, "could not flush operation liquidity pools to db") + if err := p.opBatch.Exec(ctx, session); err != nil { + return errors.Wrap(err, "Could not flush operation liquidity pools to db") } + return nil } diff --git a/services/horizon/internal/ingest/processors/liquidity_pools_transaction_processor_test.go b/services/horizon/internal/ingest/processors/liquidity_pools_transaction_processor_test.go index f86845e931..485d890dca 100644 --- a/services/horizon/internal/ingest/processors/liquidity_pools_transaction_processor_test.go +++ b/services/horizon/internal/ingest/processors/liquidity_pools_transaction_processor_test.go @@ -6,7 +6,6 @@ import ( "context" "testing" - "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" "github.com/stellar/go/services/horizon/internal/db2/history" @@ -20,11 +19,11 @@ type LiquidityPoolsTransactionProcessorTestSuiteLedger struct { ctx context.Context processor *LiquidityPoolsTransactionProcessor mockSession *db.MockSession - mockQ *history.MockQHistoryLiquidityPools + lpLoader *history.LiquidityPoolLoader mockTransactionBatchInsertBuilder *history.MockTransactionLiquidityPoolBatchInsertBuilder mockOperationBatchInsertBuilder *history.MockOperationLiquidityPoolBatchInsertBuilder - sequence uint32 + lcm xdr.LedgerCloseMeta } func TestLiquidityPoolsTransactionProcessorTestSuiteLedger(t *testing.T) { @@ -33,41 +32,42 @@ func TestLiquidityPoolsTransactionProcessorTestSuiteLedger(t *testing.T) { func (s *LiquidityPoolsTransactionProcessorTestSuiteLedger) SetupTest() { s.ctx = context.Background() - s.mockQ = &history.MockQHistoryLiquidityPools{} s.mockTransactionBatchInsertBuilder = &history.MockTransactionLiquidityPoolBatchInsertBuilder{} s.mockOperationBatchInsertBuilder = &history.MockOperationLiquidityPoolBatchInsertBuilder{} - s.sequence = 20 + sequence := uint32(20) + s.lcm = xdr.LedgerCloseMeta{ + V0: &xdr.LedgerCloseMetaV0{ + LedgerHeader: xdr.LedgerHeaderHistoryEntry{ + Header: xdr.LedgerHeader{ + LedgerSeq: xdr.Uint32(sequence), + }, + }, + }, + } + s.lpLoader = history.NewLiquidityPoolLoader() s.processor = NewLiquidityPoolsTransactionProcessor( - s.mockSession, - s.mockQ, - s.sequence, + s.lpLoader, + s.mockTransactionBatchInsertBuilder, + s.mockOperationBatchInsertBuilder, ) } func (s *LiquidityPoolsTransactionProcessorTestSuiteLedger) TearDownTest() { - s.mockQ.AssertExpectations(s.T()) s.mockTransactionBatchInsertBuilder.AssertExpectations(s.T()) s.mockOperationBatchInsertBuilder.AssertExpectations(s.T()) } -func (s *LiquidityPoolsTransactionProcessorTestSuiteLedger) mockTransactionBatchAdd(transactionID, internalID int64, err error) { - s.mockTransactionBatchInsertBuilder.On("Add", transactionID, internalID).Return(err).Once() -} - -func (s *LiquidityPoolsTransactionProcessorTestSuiteLedger) mockOperationBatchAdd(operationID, internalID int64, err error) { - s.mockOperationBatchInsertBuilder.On("Add", operationID, internalID).Return(err).Once() -} - func (s *LiquidityPoolsTransactionProcessorTestSuiteLedger) TestEmptyLiquidityPools() { - // What is this expecting? Doesn't seem to assert anything meaningful... - err := s.processor.Commit(context.Background()) + s.mockTransactionBatchInsertBuilder.On("Exec", s.ctx, s.mockSession).Return(nil).Once() + s.mockOperationBatchInsertBuilder.On("Exec", s.ctx, s.mockSession).Return(nil).Once() + + err := s.processor.Flush(context.Background(), s.mockSession) s.Assert().NoError(err) } func (s *LiquidityPoolsTransactionProcessorTestSuiteLedger) testOperationInserts(poolID xdr.PoolId, body xdr.OperationBody, change xdr.LedgerEntryChange) { // Setup the transaction - internalID := int64(1234) txn := createTransaction(true, 1) txn.Envelope.Operations()[0].Body = body txn.UnsafeMeta.V = 2 @@ -85,6 +85,20 @@ func (s *LiquidityPoolsTransactionProcessorTestSuiteLedger) testOperationInserts }, }, change, + // add a duplicate change to test that the processor + // does not insert duplicate rows + { + Type: xdr.LedgerEntryChangeTypeLedgerEntryState, + State: &xdr.LedgerEntry{ + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeLiquidityPool, + LiquidityPool: &xdr.LiquidityPoolEntry{ + LiquidityPoolId: poolID, + }, + }, + }, + }, + change, }}, } @@ -103,44 +117,28 @@ func (s *LiquidityPoolsTransactionProcessorTestSuiteLedger) testOperationInserts }, } } - txnID := toid.New(int32(s.sequence), int32(txn.Index), 0).ToInt64() + txnID := toid.New(int32(s.lcm.LedgerSequence()), int32(txn.Index), 0).ToInt64() opID := (&transactionOperationWrapper{ index: uint32(0), transaction: txn, operation: txn.Envelope.Operations()[0], - ledgerSequence: s.sequence, + ledgerSequence: s.lcm.LedgerSequence(), }).ID() hexID := PoolIDToString(poolID) - // Setup a q - s.mockQ.On("CreateHistoryLiquidityPools", s.ctx, mock.AnythingOfType("[]string"), maxBatchSize). - Run(func(args mock.Arguments) { - arg := args.Get(1).([]string) - s.Assert().ElementsMatch( - []string{hexID}, - arg, - ) - }).Return(map[string]int64{ - hexID: internalID, - }, nil).Once() - // Prepare to process transactions successfully - s.mockQ.On("NewTransactionLiquidityPoolBatchInsertBuilder"). - Return(s.mockTransactionBatchInsertBuilder).Once() - s.mockTransactionBatchAdd(txnID, internalID, nil) + s.mockTransactionBatchInsertBuilder.On("Add", txnID, s.lpLoader.GetFuture(hexID)).Return(nil).Once() s.mockTransactionBatchInsertBuilder.On("Exec", s.ctx, s.mockSession).Return(nil).Once() // Prepare to process operations successfully - s.mockQ.On("NewOperationLiquidityPoolBatchInsertBuilder"). - Return(s.mockOperationBatchInsertBuilder).Once() - s.mockOperationBatchAdd(opID, internalID, nil) + s.mockOperationBatchInsertBuilder.On("Add", opID, s.lpLoader.GetFuture(hexID)).Return(nil).Once() s.mockOperationBatchInsertBuilder.On("Exec", s.ctx, s.mockSession).Return(nil).Once() // Process the transaction - err := s.processor.ProcessTransaction(s.ctx, txn) + err := s.processor.ProcessTransaction(s.lcm, txn) s.Assert().NoError(err) - err = s.processor.Commit(s.ctx) + err = s.processor.Flush(s.ctx, s.mockSession) s.Assert().NoError(err) } diff --git a/services/horizon/internal/ingest/processors/participants_processor.go b/services/horizon/internal/ingest/processors/participants_processor.go index 45ffcea524..7d4ae7fe39 100644 --- a/services/horizon/internal/ingest/processors/participants_processor.go +++ b/services/horizon/internal/ingest/processors/participants_processor.go @@ -155,7 +155,7 @@ func (p *ParticipantsProcessor) ProcessTransaction(lcm xdr.LedgerCloseMeta, tran return nil } -func (p *ParticipantsProcessor) Commit(ctx context.Context, session db.SessionInterface) error { +func (p *ParticipantsProcessor) Flush(ctx context.Context, session db.SessionInterface) error { if err := p.txBatch.Exec(ctx, session); err != nil { return errors.Wrap(err, "Could not flush transaction participants to db") } diff --git a/services/horizon/internal/ingest/processors/participants_processor_test.go b/services/horizon/internal/ingest/processors/participants_processor_test.go index 28bc6a126c..2348b79eaf 100644 --- a/services/horizon/internal/ingest/processors/participants_processor_test.go +++ b/services/horizon/internal/ingest/processors/participants_processor_test.go @@ -145,7 +145,7 @@ func (s *ParticipantsProcessorTestSuiteLedger) TestEmptyParticipants() { s.mockBatchInsertBuilder.On("Exec", s.ctx, s.mockSession).Return(nil).Once() s.mockOperationsBatchInsertBuilder.On("Exec", s.ctx, s.mockSession).Return(nil).Once() - err := s.processor.Commit(s.ctx, s.mockSession) + err := s.processor.Flush(s.ctx, s.mockSession) s.Assert().NoError(err) } @@ -190,7 +190,7 @@ func (s *ParticipantsProcessorTestSuiteLedger) TestFeeBumptransaction() { s.mockOperationsBatchInsertBuilder.On("Exec", s.ctx, s.mockSession).Return(nil).Once() s.Assert().NoError(s.processor.ProcessTransaction(s.lcm, feeBumpTx)) - s.Assert().NoError(s.processor.Commit(s.ctx, s.mockSession)) + s.Assert().NoError(s.processor.Flush(s.ctx, s.mockSession)) } func (s *ParticipantsProcessorTestSuiteLedger) TestIngestParticipantsSucceeds() { @@ -204,7 +204,7 @@ func (s *ParticipantsProcessorTestSuiteLedger) TestIngestParticipantsSucceeds() err := s.processor.ProcessTransaction(s.lcm, tx) s.Assert().NoError(err) } - err := s.processor.Commit(s.ctx, s.mockSession) + err := s.processor.Flush(s.ctx, s.mockSession) s.Assert().NoError(err) } @@ -240,7 +240,7 @@ func (s *ParticipantsProcessorTestSuiteLedger) TestBatchAddExecFails() { err := s.processor.ProcessTransaction(s.lcm, tx) s.Assert().NoError(err) } - err := s.processor.Commit(s.ctx, s.mockSession) + err := s.processor.Flush(s.ctx, s.mockSession) s.Assert().EqualError(err, "Could not flush transaction participants to db: transient error") } @@ -255,6 +255,6 @@ func (s *ParticipantsProcessorTestSuiteLedger) TestOperationBatchAddExecFails() err := s.processor.ProcessTransaction(s.lcm, tx) s.Assert().NoError(err) } - err := s.processor.Commit(s.ctx, s.mockSession) + err := s.processor.Flush(s.ctx, s.mockSession) s.Assert().EqualError(err, "Could not flush operation participants to db: transient error") } diff --git a/services/horizon/internal/ingest/processors/trades_processor.go b/services/horizon/internal/ingest/processors/trades_processor.go index 10aed5d9ea..b1084c6e08 100644 --- a/services/horizon/internal/ingest/processors/trades_processor.go +++ b/services/horizon/internal/ingest/processors/trades_processor.go @@ -19,18 +19,25 @@ import ( // TradeProcessor operations processor type TradeProcessor struct { - session db.SessionInterface - tradesQ history.QTrades - ledger xdr.LedgerHeaderHistoryEntry - trades []ingestTrade - stats TradeStats + accountLoader *history.AccountLoader + lpLoader *history.LiquidityPoolLoader + assetLoader *history.AssetLoader + batch history.TradeBatchInsertBuilder + trades []ingestTrade + stats TradeStats } -func NewTradeProcessor(session db.SessionInterface, tradesQ history.QTrades, ledger xdr.LedgerHeaderHistoryEntry) *TradeProcessor { +func NewTradeProcessor( + accountLoader *history.AccountLoader, + lpLoader *history.LiquidityPoolLoader, + assetLoader *history.AssetLoader, + batch history.TradeBatchInsertBuilder, +) *TradeProcessor { return &TradeProcessor{ - session: session, - tradesQ: tradesQ, - ledger: ledger, + accountLoader: accountLoader, + lpLoader: lpLoader, + assetLoader: assetLoader, + batch: batch, } } @@ -48,79 +55,54 @@ func (stats *TradeStats) Map() map[string]interface{} { } // ProcessTransaction process the given transaction -func (p *TradeProcessor) ProcessTransaction(ctx context.Context, transaction ingest.LedgerTransaction) (err error) { +func (p *TradeProcessor) ProcessTransaction(lcm xdr.LedgerCloseMeta, transaction ingest.LedgerTransaction) (err error) { if !transaction.Result.Successful() { return nil } - trades, err := p.extractTrades(p.ledger, transaction) + trades, err := p.extractTrades(lcm.LedgerHeaderHistoryEntry(), transaction) if err != nil { return err } - p.trades = append(p.trades, trades...) - p.stats.count += int64(len(trades)) - return nil -} - -func (p *TradeProcessor) Commit(ctx context.Context) error { - if len(p.trades) == 0 { - return nil - } - - batch := p.tradesQ.NewTradeBatchInsertBuilder() - var poolIDs, accounts []string - var assets []xdr.Asset - for _, trade := range p.trades { + for _, trade := range trades { if trade.buyerAccount != "" { - accounts = append(accounts, trade.buyerAccount) + p.accountLoader.GetFuture(trade.buyerAccount) } if trade.sellerAccount != "" { - accounts = append(accounts, trade.sellerAccount) + p.accountLoader.GetFuture(trade.sellerAccount) } if trade.liquidityPoolID != "" { - poolIDs = append(poolIDs, trade.liquidityPoolID) + p.lpLoader.GetFuture(trade.liquidityPoolID) } - assets = append(assets, trade.boughtAsset) - assets = append(assets, trade.soldAsset) - } - - accountSet, err := p.tradesQ.CreateAccounts(ctx, accounts, maxBatchSize) - if err != nil { - return errors.Wrap(err, "Error creating account ids") + p.assetLoader.GetFuture(history.AssetKeyFromXDR(trade.boughtAsset)) + p.assetLoader.GetFuture(history.AssetKeyFromXDR(trade.soldAsset)) } - var assetMap map[string]history.Asset - assetMap, err = p.tradesQ.CreateAssets(ctx, assets, maxBatchSize) - if err != nil { - return errors.Wrap(err, "Error creating asset ids") - } + p.trades = append(p.trades, trades...) + p.stats.count += int64(len(trades)) + return nil +} - var poolMap map[string]int64 - poolMap, err = p.tradesQ.CreateHistoryLiquidityPools(ctx, poolIDs, maxBatchSize) - if err != nil { - return errors.Wrap(err, "Error creating pool ids") +func (p *TradeProcessor) Flush(ctx context.Context, session db.SessionInterface) error { + if len(p.trades) == 0 { + return nil } for _, trade := range p.trades { row := trade.row - if id, ok := accountSet[trade.sellerAccount]; ok { - row.BaseAccountID = null.IntFrom(id) - } else if len(trade.sellerAccount) > 0 { - return errors.Errorf("Could not find history account id for %s", trade.sellerAccount) + if trade.sellerAccount != "" { + row.BaseAccountID = null.IntFrom(p.accountLoader.GetNow(trade.sellerAccount)) } - if id, ok := accountSet[trade.buyerAccount]; ok { - row.CounterAccountID = null.IntFrom(id) - } else if len(trade.buyerAccount) > 0 { - return errors.Errorf("Could not find history account id for %s", trade.buyerAccount) + if trade.buyerAccount != "" { + row.CounterAccountID = null.IntFrom(p.accountLoader.GetNow(trade.buyerAccount)) } - if id, ok := poolMap[trade.liquidityPoolID]; ok { - row.BaseLiquidityPoolID = null.IntFrom(id) - } else if len(trade.liquidityPoolID) > 0 { - return errors.Errorf("Could not find history liquidity pool id for %s", trade.liquidityPoolID) + if trade.liquidityPoolID != "" { + row.BaseLiquidityPoolID = null.IntFrom(p.lpLoader.GetNow(trade.liquidityPoolID)) } - row.BaseAssetID = assetMap[trade.soldAsset.String()].ID - row.CounterAssetID = assetMap[trade.boughtAsset.String()].ID + + row.BaseAssetID = p.assetLoader.GetNow(history.AssetKeyFromXDR(trade.soldAsset)) + row.CounterAssetID = p.assetLoader.GetNow(history.AssetKeyFromXDR(trade.boughtAsset)) if row.BaseAssetID > row.CounterAssetID { row.BaseIsSeller = false @@ -136,12 +118,12 @@ func (p *TradeProcessor) Commit(ctx context.Context) error { } } - if err = batch.Add(row); err != nil { + if err := p.batch.Add(row); err != nil { return errors.Wrap(err, "Error adding trade to batch") } } - if err = batch.Exec(ctx, p.session); err != nil { + if err := p.batch.Exec(ctx, session); err != nil { return errors.Wrap(err, "Error flushing operation batch") } return nil diff --git a/services/horizon/internal/ingest/processors/trades_processor_test.go b/services/horizon/internal/ingest/processors/trades_processor_test.go index d0cf299d30..5b2a2f20e3 100644 --- a/services/horizon/internal/ingest/processors/trades_processor_test.go +++ b/services/horizon/internal/ingest/processors/trades_processor_test.go @@ -10,20 +10,23 @@ import ( "github.com/guregu/null" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/suite" + "github.com/stellar/go/ingest" "github.com/stellar/go/services/horizon/internal/db2/history" "github.com/stellar/go/support/db" "github.com/stellar/go/toid" "github.com/stellar/go/xdr" - "github.com/stretchr/testify/mock" - "github.com/stretchr/testify/suite" ) type TradeProcessorTestSuiteLedger struct { suite.Suite processor *TradeProcessor mockSession *db.MockSession - mockQ *history.MockQTrades + accountLoader history.AccountLoaderStub + lpLoader history.LiquidityPoolLoaderStub + assetLoader history.AssetLoaderStub mockBatchInsertBuilder *history.MockTradeBatchInsertBuilder unmuxedSourceAccount xdr.AccountId @@ -45,9 +48,10 @@ type TradeProcessorTestSuiteLedger struct { lpToID map[xdr.PoolId]int64 unmuxedAccountToID map[string]int64 - assetToID map[string]history.Asset + assetToID map[history.AssetKey]history.Asset txs []ingest.LedgerTransaction + lcm xdr.LedgerCloseMeta } func TestTradeProcessorTestSuiteLedger(t *testing.T) { @@ -55,7 +59,6 @@ func TestTradeProcessorTestSuiteLedger(t *testing.T) { } func (s *TradeProcessorTestSuiteLedger) SetupTest() { - s.mockQ = &history.MockQTrades{} s.mockBatchInsertBuilder = &history.MockTradeBatchInsertBuilder{} s.unmuxedSourceAccount = xdr.MustAddress("GAUJETIZVEP2NRYLUESJ3LS66NVCEGMON4UDCBCSBEVPIID773P2W6AY") @@ -165,7 +168,7 @@ func (s *TradeProcessorTestSuiteLedger) SetupTest() { s.unmuxedSourceAccount.Address(): 1000, s.unmuxedOpSourceAccount.Address(): 1001, } - s.assetToID = map[string]history.Asset{} + s.assetToID = map[history.AssetKey]history.Asset{} s.allTrades = []xdr.ClaimAtom{ s.strictReceiveTrade, s.strictSendTrade, @@ -190,43 +193,51 @@ func (s *TradeProcessorTestSuiteLedger) SetupTest() { s.sellPrices = append(s.sellPrices, xdr.Price{N: xdr.Int32(trade.AmountBought()), D: xdr.Int32(trade.AmountSold())}) } if i%2 == 0 { - s.assetToID[trade.AssetSold().String()] = history.Asset{ID: int64(10000 + i)} - s.assetToID[trade.AssetBought().String()] = history.Asset{ID: int64(100 + i)} + s.assetToID[history.AssetKeyFromXDR(trade.AssetSold())] = history.Asset{ID: int64(10000 + i)} + s.assetToID[history.AssetKeyFromXDR(trade.AssetBought())] = history.Asset{ID: int64(100 + i)} } else { - s.assetToID[trade.AssetSold().String()] = history.Asset{ID: int64(100 + i)} - s.assetToID[trade.AssetBought().String()] = history.Asset{ID: int64(10000 + i)} + s.assetToID[history.AssetKeyFromXDR(trade.AssetSold())] = history.Asset{ID: int64(100 + i)} + s.assetToID[history.AssetKeyFromXDR(trade.AssetBought())] = history.Asset{ID: int64(10000 + i)} } s.assets = append(s.assets, trade.AssetSold(), trade.AssetBought()) } - s.processor = NewTradeProcessor( - s.mockSession, - s.mockQ, - xdr.LedgerHeaderHistoryEntry{ - Header: xdr.LedgerHeader{ - LedgerSeq: 100, + s.lcm = xdr.LedgerCloseMeta{ + V0: &xdr.LedgerCloseMetaV0{ + LedgerHeader: xdr.LedgerHeaderHistoryEntry{ + Header: xdr.LedgerHeader{ + LedgerSeq: xdr.Uint32(100), + }, }, }, + } + + s.accountLoader = history.NewAccountLoaderStub() + s.assetLoader = history.NewAssetLoaderStub() + s.lpLoader = history.NewLiquidityPoolLoaderStub() + s.processor = NewTradeProcessor( + s.accountLoader.Loader, + s.lpLoader.Loader, + s.assetLoader.Loader, + s.mockBatchInsertBuilder, ) } func (s *TradeProcessorTestSuiteLedger) TearDownTest() { - s.mockQ.AssertExpectations(s.T()) s.mockBatchInsertBuilder.AssertExpectations(s.T()) } func (s *TradeProcessorTestSuiteLedger) TestIgnoreFailedTransactions() { ctx := context.Background() - err := s.processor.ProcessTransaction(ctx, createTransaction(false, 1)) + err := s.processor.ProcessTransaction(s.lcm, createTransaction(false, 1)) s.Assert().NoError(err) - err = s.processor.Commit(ctx) + err = s.processor.Flush(ctx, s.mockSession) s.Assert().NoError(err) } -func (s *TradeProcessorTestSuiteLedger) mockReadTradeTransactions( - ledger xdr.LedgerHeaderHistoryEntry, -) []history.InsertTrade { +func (s *TradeProcessorTestSuiteLedger) mockReadTradeTransactions() []history.InsertTrade { + ledger := s.lcm.LedgerHeaderHistoryEntry() closeTime := time.Unix(int64(ledger.Header.ScpValue.CloseTime), 0).UTC() inserts := []history.InsertTrade{ { @@ -235,11 +246,11 @@ func (s *TradeProcessorTestSuiteLedger) mockReadTradeTransactions( LedgerCloseTime: closeTime, BaseAmount: int64(s.strictReceiveTrade.AmountBought()), BaseAccountID: null.IntFrom(s.unmuxedAccountToID[s.unmuxedOpSourceAccount.Address()]), - BaseAssetID: s.assetToID[s.strictReceiveTrade.AssetBought().String()].ID, + BaseAssetID: s.assetToID[history.AssetKeyFromXDR(s.strictReceiveTrade.AssetBought())].ID, BaseOfferID: null.IntFrom(EncodeOfferId(uint64(toid.New(int32(ledger.Header.LedgerSeq), 1, 2).ToInt64()), TOIDType)), CounterAmount: int64(s.strictReceiveTrade.AmountSold()), CounterAccountID: null.IntFrom(s.unmuxedAccountToID[s.strictReceiveTrade.SellerId().Address()]), - CounterAssetID: s.assetToID[s.strictReceiveTrade.AssetSold().String()].ID, + CounterAssetID: s.assetToID[history.AssetKeyFromXDR(s.strictReceiveTrade.AssetSold())].ID, CounterOfferID: null.IntFrom(int64(s.strictReceiveTrade.OfferId())), BaseIsSeller: false, BaseIsExact: null.BoolFrom(false), @@ -253,11 +264,11 @@ func (s *TradeProcessorTestSuiteLedger) mockReadTradeTransactions( LedgerCloseTime: closeTime, CounterAmount: int64(s.strictSendTrade.AmountBought()), CounterAccountID: null.IntFrom(s.unmuxedAccountToID[s.unmuxedOpSourceAccount.Address()]), - CounterAssetID: s.assetToID[s.strictSendTrade.AssetBought().String()].ID, + CounterAssetID: s.assetToID[history.AssetKeyFromXDR(s.strictSendTrade.AssetBought())].ID, CounterOfferID: null.IntFrom(EncodeOfferId(uint64(toid.New(int32(ledger.Header.LedgerSeq), 1, 3).ToInt64()), TOIDType)), BaseAmount: int64(s.strictSendTrade.AmountSold()), BaseAccountID: null.IntFrom(s.unmuxedAccountToID[s.strictSendTrade.SellerId().Address()]), - BaseAssetID: s.assetToID[s.strictSendTrade.AssetSold().String()].ID, + BaseAssetID: s.assetToID[history.AssetKeyFromXDR(s.strictSendTrade.AssetSold())].ID, BaseIsSeller: true, BaseIsExact: null.BoolFrom(false), BaseOfferID: null.IntFrom(int64(s.strictSendTrade.OfferId())), @@ -272,10 +283,10 @@ func (s *TradeProcessorTestSuiteLedger) mockReadTradeTransactions( BaseOfferID: null.IntFrom(879136), BaseAmount: int64(s.buyOfferTrade.AmountBought()), BaseAccountID: null.IntFrom(s.unmuxedAccountToID[s.unmuxedOpSourceAccount.Address()]), - BaseAssetID: s.assetToID[s.buyOfferTrade.AssetBought().String()].ID, + BaseAssetID: s.assetToID[history.AssetKeyFromXDR(s.buyOfferTrade.AssetBought())].ID, CounterAmount: int64(s.buyOfferTrade.AmountSold()), CounterAccountID: null.IntFrom(s.unmuxedAccountToID[s.buyOfferTrade.SellerId().Address()]), - CounterAssetID: s.assetToID[s.buyOfferTrade.AssetSold().String()].ID, + CounterAssetID: s.assetToID[history.AssetKeyFromXDR(s.buyOfferTrade.AssetSold())].ID, BaseIsSeller: false, CounterOfferID: null.IntFrom(int64(s.buyOfferTrade.OfferId())), PriceN: int64(s.sellPrices[2].D), @@ -287,12 +298,12 @@ func (s *TradeProcessorTestSuiteLedger) mockReadTradeTransactions( Order: 2, LedgerCloseTime: closeTime, CounterAmount: int64(s.sellOfferTrade.AmountBought()), - CounterAssetID: s.assetToID[s.sellOfferTrade.AssetBought().String()].ID, + CounterAssetID: s.assetToID[history.AssetKeyFromXDR(s.sellOfferTrade.AssetBought())].ID, CounterAccountID: null.IntFrom(s.unmuxedAccountToID[s.unmuxedOpSourceAccount.Address()]), CounterOfferID: null.IntFrom(EncodeOfferId(uint64(toid.New(int32(ledger.Header.LedgerSeq), 1, 5).ToInt64()), TOIDType)), BaseAmount: int64(s.sellOfferTrade.AmountSold()), BaseAccountID: null.IntFrom(s.unmuxedAccountToID[s.sellOfferTrade.SellerId().Address()]), - BaseAssetID: s.assetToID[s.sellOfferTrade.AssetSold().String()].ID, + BaseAssetID: s.assetToID[history.AssetKeyFromXDR(s.sellOfferTrade.AssetSold())].ID, BaseIsSeller: true, BaseOfferID: null.IntFrom(int64(s.sellOfferTrade.OfferId())), PriceN: int64(s.sellPrices[3].N), @@ -304,12 +315,12 @@ func (s *TradeProcessorTestSuiteLedger) mockReadTradeTransactions( Order: 0, LedgerCloseTime: closeTime, BaseAmount: int64(s.passiveSellOfferTrade.AmountBought()), - BaseAssetID: s.assetToID[s.passiveSellOfferTrade.AssetBought().String()].ID, + BaseAssetID: s.assetToID[history.AssetKeyFromXDR(s.passiveSellOfferTrade.AssetBought())].ID, BaseAccountID: null.IntFrom(s.unmuxedAccountToID[s.unmuxedSourceAccount.Address()]), BaseOfferID: null.IntFrom(EncodeOfferId(uint64(toid.New(int32(ledger.Header.LedgerSeq), 1, 6).ToInt64()), TOIDType)), CounterAmount: int64(s.passiveSellOfferTrade.AmountSold()), CounterAccountID: null.IntFrom(s.unmuxedAccountToID[s.passiveSellOfferTrade.SellerId().Address()]), - CounterAssetID: s.assetToID[s.passiveSellOfferTrade.AssetSold().String()].ID, + CounterAssetID: s.assetToID[history.AssetKeyFromXDR(s.passiveSellOfferTrade.AssetSold())].ID, BaseIsSeller: false, CounterOfferID: null.IntFrom(int64(s.passiveSellOfferTrade.OfferId())), PriceN: int64(s.sellPrices[4].D), @@ -323,12 +334,12 @@ func (s *TradeProcessorTestSuiteLedger) mockReadTradeTransactions( LedgerCloseTime: closeTime, CounterAmount: int64(s.otherPassiveSellOfferTrade.AmountBought()), - CounterAssetID: s.assetToID[s.otherPassiveSellOfferTrade.AssetBought().String()].ID, + CounterAssetID: s.assetToID[history.AssetKeyFromXDR(s.otherPassiveSellOfferTrade.AssetBought())].ID, CounterAccountID: null.IntFrom(s.unmuxedAccountToID[s.unmuxedOpSourceAccount.Address()]), CounterOfferID: null.IntFrom(EncodeOfferId(uint64(toid.New(int32(ledger.Header.LedgerSeq), 1, 7).ToInt64()), TOIDType)), BaseAmount: int64(s.otherPassiveSellOfferTrade.AmountSold()), BaseAccountID: null.IntFrom(s.unmuxedAccountToID[s.otherPassiveSellOfferTrade.SellerId().Address()]), - BaseAssetID: s.assetToID[s.otherPassiveSellOfferTrade.AssetSold().String()].ID, + BaseAssetID: s.assetToID[history.AssetKeyFromXDR(s.otherPassiveSellOfferTrade.AssetSold())].ID, BaseIsSeller: true, BaseOfferID: null.IntFrom(int64(s.otherPassiveSellOfferTrade.OfferId())), PriceN: int64(s.sellPrices[5].N), @@ -340,12 +351,12 @@ func (s *TradeProcessorTestSuiteLedger) mockReadTradeTransactions( Order: 1, LedgerCloseTime: closeTime, BaseAmount: int64(s.strictReceiveTradeLP.AmountBought()), - BaseAssetID: s.assetToID[s.strictReceiveTradeLP.AssetBought().String()].ID, + BaseAssetID: s.assetToID[history.AssetKeyFromXDR(s.strictReceiveTradeLP.AssetBought())].ID, BaseAccountID: null.IntFrom(s.unmuxedAccountToID[s.unmuxedOpSourceAccount.Address()]), BaseOfferID: null.IntFrom(EncodeOfferId(uint64(toid.New(int32(ledger.Header.LedgerSeq), 1, 8).ToInt64()), TOIDType)), CounterAmount: int64(s.strictReceiveTradeLP.AmountSold()), CounterLiquidityPoolID: null.IntFrom(s.lpToID[s.strictReceiveTradeLP.MustLiquidityPool().LiquidityPoolId]), - CounterAssetID: s.assetToID[s.strictReceiveTradeLP.AssetSold().String()].ID, + CounterAssetID: s.assetToID[history.AssetKeyFromXDR(s.strictReceiveTradeLP.AssetSold())].ID, BaseIsSeller: false, BaseIsExact: null.BoolFrom(false), LiquidityPoolFee: null.IntFrom(int64(xdr.LiquidityPoolFeeV18)), @@ -359,12 +370,12 @@ func (s *TradeProcessorTestSuiteLedger) mockReadTradeTransactions( Order: 0, LedgerCloseTime: closeTime, CounterAmount: int64(s.strictSendTradeLP.AmountBought()), - CounterAssetID: s.assetToID[s.strictSendTradeLP.AssetBought().String()].ID, + CounterAssetID: s.assetToID[history.AssetKeyFromXDR(s.strictSendTradeLP.AssetBought())].ID, CounterAccountID: null.IntFrom(s.unmuxedAccountToID[s.unmuxedOpSourceAccount.Address()]), CounterOfferID: null.IntFrom(EncodeOfferId(uint64(toid.New(int32(ledger.Header.LedgerSeq), 1, 9).ToInt64()), TOIDType)), BaseAmount: int64(s.strictSendTradeLP.AmountSold()), BaseLiquidityPoolID: null.IntFrom(s.lpToID[s.strictSendTradeLP.MustLiquidityPool().LiquidityPoolId]), - BaseAssetID: s.assetToID[s.strictSendTradeLP.AssetSold().String()].ID, + BaseAssetID: s.assetToID[history.AssetKeyFromXDR(s.strictSendTradeLP.AssetSold())].ID, BaseIsSeller: true, BaseIsExact: null.BoolFrom(false), LiquidityPoolFee: null.IntFrom(int64(xdr.LiquidityPoolFeeV18)), @@ -712,243 +723,75 @@ func (s *TradeProcessorTestSuiteLedger) mockReadTradeTransactions( tx, } - s.mockQ.On("NewTradeBatchInsertBuilder"). - Return(s.mockBatchInsertBuilder).Once() - return inserts } -func mapKeysToList(set map[string]int64) []string { - keys := make([]string, 0, len(set)) - for key := range set { - keys = append(keys, key) +func (s *TradeProcessorTestSuiteLedger) stubLoaders() { + for key, id := range s.unmuxedAccountToID { + s.accountLoader.Insert(key, id) } - return keys -} - -func uniq(list []string) []string { - var deduped []string - set := map[string]bool{} - for _, s := range list { - if set[s] { - continue - } - deduped = append(deduped, s) - set[s] = true + for key, id := range s.assetToID { + s.assetLoader.Insert(key, id.ID) + } + for key, id := range s.lpToID { + s.lpLoader.Insert(PoolIDToString(key), id) } - return deduped } func (s *TradeProcessorTestSuiteLedger) TestIngestTradesSucceeds() { ctx := context.Background() - inserts := s.mockReadTradeTransactions(s.processor.ledger) + inserts := s.mockReadTradeTransactions() - s.mockCreateAccounts(ctx) - - s.mockCreateAssets(ctx) - - s.mockCreateHistoryLiquidityPools(ctx) + for _, tx := range s.txs { + err := s.processor.ProcessTransaction(s.lcm, tx) + s.Assert().NoError(err) + } for _, insert := range inserts { s.mockBatchInsertBuilder.On("Add", []history.InsertTrade{ insert, }).Return(nil).Once() } - s.mockBatchInsertBuilder.On("Exec", ctx, s.mockSession).Return(nil).Once() + s.stubLoaders() - for _, tx := range s.txs { - err := s.processor.ProcessTransaction(ctx, tx) - s.Assert().NoError(err) - } - - err := s.processor.Commit(ctx) + err := s.processor.Flush(ctx, s.mockSession) s.Assert().NoError(err) } -func (s *TradeProcessorTestSuiteLedger) mockCreateHistoryLiquidityPools(ctx context.Context) { - lpIDs, lpStrToID := s.extractLpIDs() - s.mockQ.On("CreateHistoryLiquidityPools", ctx, mock.AnythingOfType("[]string"), maxBatchSize). - Run(func(args mock.Arguments) { - arg := args.Get(1).([]string) - s.Assert().ElementsMatch( - lpIDs, - arg, - ) - }).Return(lpStrToID, nil).Once() -} - -func (s *TradeProcessorTestSuiteLedger) extractLpIDs() ([]string, map[string]int64) { - var lpIDs []string - lpStrToID := map[string]int64{} - for lpID, id := range s.lpToID { - lpIDStr := PoolIDToString(lpID) - lpIDs = append(lpIDs, lpIDStr) - lpStrToID[lpIDStr] = id - } - return lpIDs, lpStrToID -} - -func (s *TradeProcessorTestSuiteLedger) TestCreateAccountsError() { - ctx := context.Background() - s.mockReadTradeTransactions(s.processor.ledger) - - s.mockQ.On("CreateAccounts", ctx, mock.AnythingOfType("[]string"), maxBatchSize). - Run(func(args mock.Arguments) { - arg := args.Get(1).([]string) - s.Assert().ElementsMatch( - mapKeysToList(s.unmuxedAccountToID), - uniq(arg), - ) - }).Return(map[string]int64{}, fmt.Errorf("create accounts error")).Once() - - for _, tx := range s.txs { - err := s.processor.ProcessTransaction(ctx, tx) - s.Assert().NoError(err) - } - - err := s.processor.Commit(ctx) - - s.Assert().EqualError(err, "Error creating account ids: create accounts error") -} - -func (s *TradeProcessorTestSuiteLedger) TestCreateAssetsError() { - ctx := context.Background() - s.mockReadTradeTransactions(s.processor.ledger) - - s.mockCreateAccounts(ctx) - - s.mockQ.On("CreateAssets", ctx, mock.AnythingOfType("[]xdr.Asset"), maxBatchSize). - Run(func(args mock.Arguments) { - arg := args.Get(1).([]xdr.Asset) - s.Assert().ElementsMatch( - s.assets, - arg, - ) - }).Return(s.assetToID, fmt.Errorf("create assets error")).Once() - - for _, tx := range s.txs { - err := s.processor.ProcessTransaction(ctx, tx) - s.Assert().NoError(err) - } - - err := s.processor.Commit(ctx) - s.Assert().EqualError(err, "Error creating asset ids: create assets error") -} - -func (s *TradeProcessorTestSuiteLedger) TestCreateHistoryLiquidityPoolsError() { +func (s *TradeProcessorTestSuiteLedger) TestBatchAddError() { ctx := context.Background() - s.mockReadTradeTransactions(s.processor.ledger) - - s.mockCreateAccounts(ctx) - - s.mockCreateAssets(ctx) - - lpIDs, lpStrToID := s.extractLpIDs() - s.mockQ.On("CreateHistoryLiquidityPools", ctx, mock.AnythingOfType("[]string"), maxBatchSize). - Run(func(args mock.Arguments) { - arg := args.Get(1).([]string) - s.Assert().ElementsMatch( - lpIDs, - arg, - ) - }).Return(lpStrToID, fmt.Errorf("create liqudity pool id error")).Once() + s.mockReadTradeTransactions() for _, tx := range s.txs { - err := s.processor.ProcessTransaction(ctx, tx) + err := s.processor.ProcessTransaction(s.lcm, tx) s.Assert().NoError(err) } - err := s.processor.Commit(ctx) - s.Assert().EqualError(err, "Error creating pool ids: create liqudity pool id error") -} - -func (s *TradeProcessorTestSuiteLedger) mockCreateAssets(ctx context.Context) { - s.mockQ.On("CreateAssets", ctx, mock.AnythingOfType("[]xdr.Asset"), maxBatchSize). - Run(func(args mock.Arguments) { - arg := args.Get(1).([]xdr.Asset) - s.Assert().ElementsMatch( - s.assets, - arg, - ) - }).Return(s.assetToID, nil).Once() -} - -func (s *TradeProcessorTestSuiteLedger) mockCreateAccounts(ctx context.Context) { - s.mockQ.On("CreateAccounts", ctx, mock.AnythingOfType("[]string"), maxBatchSize). - Run(func(args mock.Arguments) { - arg := args.Get(1).([]string) - s.Assert().ElementsMatch( - mapKeysToList(s.unmuxedAccountToID), - uniq(arg), - ) - }).Return(s.unmuxedAccountToID, nil).Once() -} - -func (s *TradeProcessorTestSuiteLedger) TestBatchAddError() { - ctx := context.Background() - s.mockReadTradeTransactions(s.processor.ledger) - - s.mockCreateAccounts(ctx) - - s.mockCreateAssets(ctx) - - s.mockCreateHistoryLiquidityPools(ctx) - + s.stubLoaders() s.mockBatchInsertBuilder.On("Add", mock.AnythingOfType("[]history.InsertTrade")). Return(fmt.Errorf("batch add error")).Once() - for _, tx := range s.txs { - err := s.processor.ProcessTransaction(ctx, tx) - s.Assert().NoError(err) - } - - err := s.processor.Commit(ctx) + err := s.processor.Flush(ctx, s.mockSession) s.Assert().EqualError(err, "Error adding trade to batch: batch add error") } func (s *TradeProcessorTestSuiteLedger) TestBatchExecError() { ctx := context.Background() - insert := s.mockReadTradeTransactions(s.processor.ledger) - - s.mockCreateAccounts(ctx) - - s.mockCreateAssets(ctx) + insert := s.mockReadTradeTransactions() - s.mockCreateHistoryLiquidityPools(ctx) - - s.mockBatchInsertBuilder.On("Add", mock.AnythingOfType("[]history.InsertTrade")). - Return(nil).Times(len(insert)) - s.mockBatchInsertBuilder.On("Exec", ctx, s.mockSession).Return(fmt.Errorf("exec error")).Once() for _, tx := range s.txs { - err := s.processor.ProcessTransaction(ctx, tx) + err := s.processor.ProcessTransaction(s.lcm, tx) s.Assert().NoError(err) } - err := s.processor.Commit(ctx) - s.Assert().EqualError(err, "Error flushing operation batch: exec error") -} - -func (s *TradeProcessorTestSuiteLedger) TestIgnoreCheckIfSmallLedger() { - ctx := context.Background() - insert := s.mockReadTradeTransactions(s.processor.ledger) - - s.mockCreateAccounts(ctx) - - s.mockCreateAssets(ctx) - - s.mockCreateHistoryLiquidityPools(ctx) + s.stubLoaders() s.mockBatchInsertBuilder.On("Add", mock.AnythingOfType("[]history.InsertTrade")). Return(nil).Times(len(insert)) - s.mockBatchInsertBuilder.On("Exec", ctx, s.mockSession).Return(nil).Once() - - for _, tx := range s.txs { - err := s.processor.ProcessTransaction(ctx, tx) - s.Assert().NoError(err) - } + s.mockBatchInsertBuilder.On("Exec", ctx, s.mockSession).Return(fmt.Errorf("exec error")).Once() - err := s.processor.Commit(ctx) - s.Assert().NoError(err) + err := s.processor.Flush(ctx, s.mockSession) + s.Assert().EqualError(err, "Error flushing operation batch: exec error") } func TestTradeProcessor_ProcessTransaction_MuxedAccount(t *testing.T) { @@ -976,7 +819,7 @@ func TestTradeProcessor_RoundingSlippage_Big(t *testing.T) { s := &TradeProcessorTestSuiteLedger{} s.SetT(t) s.SetupTest() - s.mockReadTradeTransactions(s.processor.ledger) + s.mockReadTradeTransactions() assetDeposited := xdr.MustNewCreditAsset("MAD", s.unmuxedSourceAccount.Address()) assetDisbursed := xdr.MustNewCreditAsset("GRE", s.unmuxedSourceAccount.Address()) @@ -1008,7 +851,7 @@ func TestTradeProcessor_RoundingSlippage_Small(t *testing.T) { s := &TradeProcessorTestSuiteLedger{} s.SetT(t) s.SetupTest() - s.mockReadTradeTransactions(s.processor.ledger) + s.mockReadTradeTransactions() assetDeposited := xdr.MustNewCreditAsset("MAD", s.unmuxedSourceAccount.Address()) assetDisbursed := xdr.MustNewCreditAsset("GRE", s.unmuxedSourceAccount.Address())