diff --git a/services/horizon/internal/db2/history/effect_batch_insert_builder.go b/services/horizon/internal/db2/history/effect_batch_insert_builder.go index e3e5896e7f..bd9aa0687a 100644 --- a/services/horizon/internal/db2/history/effect_batch_insert_builder.go +++ b/services/horizon/internal/db2/history/effect_batch_insert_builder.go @@ -4,6 +4,7 @@ import ( "context" "github.com/guregu/null" + "github.com/stellar/go/support/db" ) @@ -11,7 +12,7 @@ import ( // history_effects table type EffectBatchInsertBuilder interface { Add( - accountID int64, + accountID FutureAccountID, muxedAccount null.String, operationID int64, order uint32, @@ -37,7 +38,7 @@ func (q *Q) NewEffectBatchInsertBuilder() EffectBatchInsertBuilder { // Add adds a effect to the batch func (i *effectBatchInsertBuilder) Add( - accountID int64, + accountID FutureAccountID, muxedAccount null.String, operationID int64, order uint32, diff --git a/services/horizon/internal/db2/history/effect_batch_insert_builder_test.go b/services/horizon/internal/db2/history/effect_batch_insert_builder_test.go index 78988db2b4..cef5f3745d 100644 --- a/services/horizon/internal/db2/history/effect_batch_insert_builder_test.go +++ b/services/horizon/internal/db2/history/effect_batch_insert_builder_test.go @@ -5,6 +5,7 @@ import ( "testing" "github.com/guregu/null" + "github.com/stellar/go/services/horizon/internal/test" "github.com/stellar/go/toid" ) @@ -18,8 +19,7 @@ func TestAddEffect(t *testing.T) { address := "GAQAA5L65LSYH7CQ3VTJ7F3HHLGCL3DSLAR2Y47263D56MNNGHSQSTVY" muxedAddres := "MAQAA5L65LSYH7CQ3VTJ7F3HHLGCL3DSLAR2Y47263D56MNNGHSQSAAAAAAAAAAE2LP26" - accounIDs, err := q.CreateAccounts(tt.Ctx, []string{address}, 1) - tt.Assert.NoError(err) + accountLoader := NewAccountLoader() builder := q.NewEffectBatchInsertBuilder() sequence := int32(56) @@ -29,7 +29,7 @@ func TestAddEffect(t *testing.T) { }) err = builder.Add( - accounIDs[address], + accountLoader.GetFuture(address), null.StringFrom(muxedAddres), toid.New(sequence, 1, 1).ToInt64(), 1, @@ -38,6 +38,7 @@ func TestAddEffect(t *testing.T) { ) tt.Assert.NoError(err) + tt.Assert.NoError(accountLoader.Exec(tt.Ctx, q)) tt.Assert.NoError(builder.Exec(tt.Ctx, q)) tt.Assert.NoError(q.Commit()) diff --git a/services/horizon/internal/db2/history/effect_test.go b/services/horizon/internal/db2/history/effect_test.go index bf893a100b..96d68208cb 100644 --- a/services/horizon/internal/db2/history/effect_test.go +++ b/services/horizon/internal/db2/history/effect_test.go @@ -23,8 +23,7 @@ func TestEffectsForLiquidityPool(t *testing.T) { // Insert Effect address := "GAQAA5L65LSYH7CQ3VTJ7F3HHLGCL3DSLAR2Y47263D56MNNGHSQSTVY" muxedAddres := "MAQAA5L65LSYH7CQ3VTJ7F3HHLGCL3DSLAR2Y47263D56MNNGHSQSAAAAAAAAAAE2LP26" - accountIDs, err := q.CreateAccounts(tt.Ctx, []string{address}, 1) - tt.Assert.NoError(err) + accountLoader := NewAccountLoader() builder := q.NewEffectBatchInsertBuilder() sequence := int32(56) @@ -32,19 +31,19 @@ func TestEffectsForLiquidityPool(t *testing.T) { "amount": "1000.0000000", "asset_type": "native", }) + tt.Assert.NoError(err) opID := toid.New(sequence, 1, 1).ToInt64() - err = builder.Add( - accountIDs[address], + tt.Assert.NoError(builder.Add( + accountLoader.GetFuture(address), null.StringFrom(muxedAddres), opID, 1, 3, details, - ) - tt.Assert.NoError(err) + )) - err = builder.Exec(tt.Ctx, q) - tt.Assert.NoError(err) + tt.Assert.NoError(accountLoader.Exec(tt.Ctx, q)) + tt.Assert.NoError(builder.Exec(tt.Ctx, q)) // Insert Liquidity Pool history liquidityPoolID := "abcde" @@ -79,8 +78,7 @@ func TestEffectsForTrustlinesSponsorshipEmptyAssetType(t *testing.T) { address := "GAQAA5L65LSYH7CQ3VTJ7F3HHLGCL3DSLAR2Y47263D56MNNGHSQSTVY" muxedAddres := "MAQAA5L65LSYH7CQ3VTJ7F3HHLGCL3DSLAR2Y47263D56MNNGHSQSAAAAAAAAAAE2LP26" - accountIDs, err := q.CreateAccounts(tt.Ctx, []string{address}, 1) - tt.Assert.NoError(err) + accountLoader := NewAccountLoader() builder := q.NewEffectBatchInsertBuilder() sequence := int32(56) @@ -142,27 +140,24 @@ func TestEffectsForTrustlinesSponsorshipEmptyAssetType(t *testing.T) { for i, test := range tests { var bytes []byte - bytes, err = json.Marshal(test.details) + bytes, err := json.Marshal(test.details) tt.Require.NoError(err) - err = builder.Add( - accountIDs[address], + tt.Require.NoError(builder.Add( + accountLoader.GetFuture(address), null.StringFrom(muxedAddres), opID, uint32(i), test.effectType, bytes, - ) - tt.Require.NoError(err) + )) } - - err = builder.Exec(tt.Ctx, q) - tt.Require.NoError(err) + tt.Require.NoError(accountLoader.Exec(tt.Ctx, q)) + tt.Require.NoError(builder.Exec(tt.Ctx, q)) tt.Assert.NoError(q.Commit()) var results []Effect - err = q.Effects().Select(tt.Ctx, &results) - tt.Require.NoError(err) + tt.Require.NoError(q.Effects().Select(tt.Ctx, &results)) tt.Require.Len(results, len(tests)) for i, test := range tests { diff --git a/services/horizon/internal/db2/history/fee_bump_scenario.go b/services/horizon/internal/db2/history/fee_bump_scenario.go index 95fc7ac1b4..5d155ac5e8 100644 --- a/services/horizon/internal/db2/history/fee_bump_scenario.go +++ b/services/horizon/internal/db2/history/fee_bump_scenario.go @@ -10,12 +10,13 @@ import ( sq "github.com/Masterminds/squirrel" "github.com/guregu/null" + "github.com/stretchr/testify/assert" + "github.com/stellar/go/ingest" "github.com/stellar/go/network" "github.com/stellar/go/services/horizon/internal/test" "github.com/stellar/go/toid" "github.com/stellar/go/xdr" - "github.com/stretchr/testify/assert" ) func ledgerToMap(ledger Ledger) map[string]interface{} { @@ -285,11 +286,10 @@ func FeeBumpScenario(tt *test.T, q *Q, successful bool) FeeBumpFixture { details, err = json.Marshal(map[string]interface{}{"new_seq": 98}) tt.Assert.NoError(err) - accounIDs, err := q.CreateAccounts(ctx, []string{account.Address()}, 1) - tt.Assert.NoError(err) + accountLoader := NewAccountLoader() err = effectBuilder.Add( - accounIDs[account.Address()], + accountLoader.GetFuture(account.Address()), null.String{}, toid.New(fixture.Ledger.Sequence, 1, 1).ToInt64(), 1, @@ -297,6 +297,7 @@ func FeeBumpScenario(tt *test.T, q *Q, successful bool) FeeBumpFixture { details, ) tt.Assert.NoError(err) + tt.Assert.NoError(accountLoader.Exec(ctx, q)) tt.Assert.NoError(effectBuilder.Exec(ctx, q)) tt.Assert.NoError(q.Commit()) diff --git a/services/horizon/internal/db2/history/mock_effect_batch_insert_builder.go b/services/horizon/internal/db2/history/mock_effect_batch_insert_builder.go index f97e4f5a0d..35168a775a 100644 --- a/services/horizon/internal/db2/history/mock_effect_batch_insert_builder.go +++ b/services/horizon/internal/db2/history/mock_effect_batch_insert_builder.go @@ -16,7 +16,7 @@ type MockEffectBatchInsertBuilder struct { // Add mock func (m *MockEffectBatchInsertBuilder) Add( - accountID int64, + accountID FutureAccountID, muxedAccount null.String, operationID int64, order uint32, diff --git a/services/horizon/internal/ingest/processors/effects_processor.go b/services/horizon/internal/ingest/processors/effects_processor.go index f7703016e3..6382461ef6 100644 --- a/services/horizon/internal/ingest/processors/effects_processor.go +++ b/services/horizon/internal/ingest/processors/effects_processor.go @@ -10,6 +10,7 @@ import ( "strconv" "github.com/guregu/null" + "github.com/stellar/go/amount" "github.com/stellar/go/ingest" "github.com/stellar/go/keypair" @@ -22,148 +23,51 @@ import ( // EffectProcessor process effects type EffectProcessor struct { - effects []effect - session db.SessionInterface - effectsQ history.QEffects - sequence uint32 + accountLoader *history.AccountLoader + batch history.EffectBatchInsertBuilder } -func NewEffectProcessor(session db.SessionInterface, effectsQ history.QEffects, sequence uint32) *EffectProcessor { +func NewEffectProcessor( + accountLoader *history.AccountLoader, + batch history.EffectBatchInsertBuilder, +) *EffectProcessor { return &EffectProcessor{ - session: session, - effectsQ: effectsQ, - sequence: sequence, + accountLoader: accountLoader, + batch: batch, } } -func (p *EffectProcessor) loadAccountIDs(ctx context.Context, accountSet map[string]int64) error { - addresses := make([]string, 0, len(accountSet)) - for address := range accountSet { - addresses = append(addresses, address) - } - - addressToID, err := p.effectsQ.CreateAccounts(ctx, addresses, maxBatchSize) - if err != nil { - return errors.Wrap(err, "Could not create account ids") - } - - for _, address := range addresses { - id, ok := addressToID[address] - if !ok { - return errors.Errorf("no id found for account address %s", address) - } - - accountSet[address] = id +func (p *EffectProcessor) ProcessTransaction( + lcm xdr.LedgerCloseMeta, transaction ingest.LedgerTransaction, +) error { + // Failed transactions don't have operation effects + if !transaction.Result.Successful() { + return nil } - return nil -} - -func operationsEffects(transaction ingest.LedgerTransaction, sequence uint32) ([]effect, error) { - effects := []effect{} - for opi, op := range transaction.Envelope.Operations() { operation := transactionOperationWrapper{ index: uint32(opi), transaction: transaction, operation: op, - ledgerSequence: sequence, + ledgerSequence: lcm.LedgerSequence(), } - - p, err := operation.effects() - if err != nil { - return effects, errors.Wrapf(err, "reading operation %v effects", operation.ID()) + if err := operation.ingestEffects(p.accountLoader, p.batch); err != nil { + return errors.Wrapf(err, "reading operation %v effects", operation.ID()) } - effects = append(effects, p...) } - return effects, nil -} - -func (p *EffectProcessor) insertDBOperationsEffects(ctx context.Context, effects []effect, accountSet map[string]int64) error { - batch := p.effectsQ.NewEffectBatchInsertBuilder() - - for _, effect := range effects { - accountID, found := accountSet[effect.address] - - if !found { - return errors.Errorf("Error finding history_account_id for address %v", effect.address) - } - - var detailsJSON []byte - detailsJSON, err := json.Marshal(effect.details) - - if err != nil { - return errors.Wrapf(err, "Error marshaling details for operation effect %v", effect.operationID) - } - - if err := batch.Add( - accountID, - effect.addressMuxed, - effect.operationID, - effect.order, - effect.effectType, - detailsJSON, - ); err != nil { - return errors.Wrap(err, "could not insert operation effect in db") - } - } - - if err := batch.Exec(ctx, p.session); err != nil { - return errors.Wrap(err, "could not flush operation effects to db") - } return nil } -func (p *EffectProcessor) ProcessTransaction(ctx context.Context, transaction ingest.LedgerTransaction) (err error) { - // Failed transactions don't have operation effects - if !transaction.Result.Successful() { - return nil - } - - var effectsForTx []effect - effectsForTx, err = operationsEffects(transaction, p.sequence) - if err != nil { - return err - } - p.effects = append(p.effects, effectsForTx...) - - return nil +func (p *EffectProcessor) Flush(ctx context.Context, session db.SessionInterface) (err error) { + return p.batch.Exec(ctx, session) } -func (p *EffectProcessor) Commit(ctx context.Context) (err error) { - if len(p.effects) > 0 { - accountSet := map[string]int64{} - - for _, effect := range p.effects { - accountSet[effect.address] = 0 - } - - if err = p.loadAccountIDs(ctx, accountSet); err != nil { - return err - } - - if err = p.insertDBOperationsEffects(ctx, p.effects, accountSet); err != nil { - return err - } - } - - return err -} - -type effect struct { - address string - addressMuxed null.String - operationID int64 - details map[string]interface{} - effectType history.EffectType - order uint32 -} - -// Effects returns the operation effects -func (operation *transactionOperationWrapper) effects() ([]effect, error) { +// ingestEffects adds effects from the operation to the given EffectBatchInsertBuilder +func (operation *transactionOperationWrapper) ingestEffects(accountLoader *history.AccountLoader, batch history.EffectBatchInsertBuilder) error { if !operation.transaction.Result.Successful() { - return []effect{}, nil + return nil } var ( op = operation.operation @@ -172,19 +76,21 @@ func (operation *transactionOperationWrapper) effects() ([]effect, error) { changes, err := operation.transaction.GetOperationChanges(operation.index) if err != nil { - return nil, err + return err } wrapper := &effectsWrapper{ - effects: []effect{}, - operation: operation, + accountLoader: accountLoader, + batch: batch, + order: 1, + operation: operation, } switch operation.OperationType() { case xdr.OperationTypeCreateAccount: - wrapper.addAccountCreatedEffects() + err = wrapper.addAccountCreatedEffects() case xdr.OperationTypePayment: - wrapper.addPaymentEffects() + err = wrapper.addPaymentEffects() case xdr.OperationTypePathPaymentStrictReceive: err = wrapper.pathPaymentStrictReceiveEffects() case xdr.OperationTypePathPaymentStrictSend: @@ -196,15 +102,15 @@ func (operation *transactionOperationWrapper) effects() ([]effect, error) { case xdr.OperationTypeCreatePassiveSellOffer: err = wrapper.addCreatePassiveSellOfferEffect() case xdr.OperationTypeSetOptions: - wrapper.addSetOptionsEffects() + err = wrapper.addSetOptionsEffects() case xdr.OperationTypeChangeTrust: err = wrapper.addChangeTrustEffects() case xdr.OperationTypeAllowTrust: err = wrapper.addAllowTrustEffects() case xdr.OperationTypeAccountMerge: - wrapper.addAccountMergeEffects() + err = wrapper.addAccountMergeEffects() case xdr.OperationTypeInflation: - wrapper.addInflationEffects() + err = wrapper.addInflationEffects() case xdr.OperationTypeManageData: err = wrapper.addManageDataEffects() case xdr.OperationTypeBumpSequence: @@ -226,10 +132,10 @@ func (operation *transactionOperationWrapper) effects() ([]effect, error) { case xdr.OperationTypeLiquidityPoolWithdraw: err = wrapper.addLiquidityPoolWithdrawEffect() default: - return nil, fmt.Errorf("Unknown operation type: %s", op.Body.Type) + err = fmt.Errorf("Unknown operation type: %s", op.Body.Type) } if err != nil { - return nil, err + return err } // Effects generated for multiple operations. Keep the effect categories @@ -238,48 +144,63 @@ func (operation *transactionOperationWrapper) effects() ([]effect, error) { // Sponsorships for _, change := range changes { - if err = wrapper.addLedgerEntrySponsorshipEffects(change); err != nil { - return nil, err + if err := wrapper.addLedgerEntrySponsorshipEffects(change); err != nil { + return err + } + if err := wrapper.addSignerSponsorshipEffects(change); err != nil { + return err } - wrapper.addSignerSponsorshipEffects(change) } // Liquidity pools for _, change := range changes { // Effects caused by ChangeTrust (creation), AllowTrust and SetTrustlineFlags (removal through revocation) - wrapper.addLedgerEntryLiquidityPoolEffects(change) + if err := wrapper.addLedgerEntryLiquidityPoolEffects(change); err != nil { + return err + } } - return wrapper.effects, nil + return nil } type effectsWrapper struct { - effects []effect - operation *transactionOperationWrapper + accountLoader *history.AccountLoader + batch history.EffectBatchInsertBuilder + order uint32 + operation *transactionOperationWrapper } -func (e *effectsWrapper) add(address string, addressMuxed null.String, effectType history.EffectType, details map[string]interface{}) { - e.effects = append(e.effects, effect{ - address: address, - addressMuxed: addressMuxed, - operationID: e.operation.ID(), - effectType: effectType, - order: uint32(len(e.effects) + 1), - details: details, - }) +func (e *effectsWrapper) add(address string, addressMuxed null.String, effectType history.EffectType, details map[string]interface{}) error { + detailsJSON, err := json.Marshal(details) + if err != nil { + return errors.Wrapf(err, "Error marshaling details for operation effect %v", e.operation.ID()) + } + + if err := e.batch.Add( + e.accountLoader.GetFuture(address), + addressMuxed, + e.operation.ID(), + e.order, + effectType, + detailsJSON, + ); err != nil { + return errors.Wrap(err, "could not insert operation effect in db") + } + e.order++ + return nil } -func (e *effectsWrapper) addUnmuxed(address *xdr.AccountId, effectType history.EffectType, details map[string]interface{}) { - e.add(address.Address(), null.String{}, effectType, details) +func (e *effectsWrapper) addUnmuxed(address *xdr.AccountId, effectType history.EffectType, details map[string]interface{}) error { + return e.add(address.Address(), null.String{}, effectType, details) } -func (e *effectsWrapper) addMuxed(address *xdr.MuxedAccount, effectType history.EffectType, details map[string]interface{}) { +func (e *effectsWrapper) addMuxed(address *xdr.MuxedAccount, effectType history.EffectType, details map[string]interface{}) error { var addressMuxed null.String if address.Type == xdr.CryptoKeyTypeKeyTypeMuxedEd25519 { addressMuxed = null.StringFrom(address.Address()) } accID := address.ToAccountId() - e.add(accID.Address(), addressMuxed, effectType, details) + return e.add(accID.Address(), addressMuxed, effectType, details) } var sponsoringEffectsTable = map[xdr.LedgerEntryType]struct { @@ -310,9 +231,9 @@ var sponsoringEffectsTable = map[xdr.LedgerEntryType]struct { // entries because we don't generate creation effects for them. } -func (e *effectsWrapper) addSignerSponsorshipEffects(change ingest.Change) { +func (e *effectsWrapper) addSignerSponsorshipEffects(change ingest.Change) error { if change.Type != xdr.LedgerEntryTypeAccount { - return + return nil } preSigners := map[string]xdr.AccountId{} @@ -350,12 +271,16 @@ func (e *effectsWrapper) addSignerSponsorshipEffects(change ingest.Change) { details["sponsor"] = post.Address() details["signer"] = signer srcAccount := change.Post.Data.MustAccount().AccountId - e.addUnmuxed(&srcAccount, history.EffectSignerSponsorshipCreated, details) + if err := e.addUnmuxed(&srcAccount, history.EffectSignerSponsorshipCreated, details); err != nil { + return err + } case !foundPost && foundPre: details["former_sponsor"] = pre.Address() details["signer"] = signer srcAccount := change.Pre.Data.MustAccount().AccountId - e.addUnmuxed(&srcAccount, history.EffectSignerSponsorshipRemoved, details) + if err := e.addUnmuxed(&srcAccount, history.EffectSignerSponsorshipRemoved, details); err != nil { + return err + } case foundPre && foundPost: formerSponsor := pre.Address() newSponsor := post.Address() @@ -367,9 +292,12 @@ func (e *effectsWrapper) addSignerSponsorshipEffects(change ingest.Change) { details["new_sponsor"] = newSponsor details["signer"] = signer srcAccount := change.Post.Data.MustAccount().AccountId - e.addUnmuxed(&srcAccount, history.EffectSignerSponsorshipUpdated, details) + if err := e.addUnmuxed(&srcAccount, history.EffectSignerSponsorshipUpdated, details); err != nil { + return err + } } } + return nil } func (e *effectsWrapper) addLedgerEntrySponsorshipEffects(change ingest.Change) error { @@ -447,9 +375,13 @@ func (e *effectsWrapper) addLedgerEntrySponsorshipEffects(change ingest.Change) } if accountID != nil { - e.addUnmuxed(accountID, effectType, details) + if err := e.addUnmuxed(accountID, effectType, details); err != nil { + return err + } } else { - e.addMuxed(muxedAccount, effectType, details) + if err := e.addMuxed(muxedAccount, effectType, details); err != nil { + return err + } } return nil @@ -477,55 +409,64 @@ func (e *effectsWrapper) addLedgerEntryLiquidityPoolEffects(change ingest.Change default: return nil } - e.addMuxed( + return e.addMuxed( e.operation.SourceAccount(), effectType, details, ) - - return nil } -func (e *effectsWrapper) addAccountCreatedEffects() { +func (e *effectsWrapper) addAccountCreatedEffects() error { op := e.operation.operation.Body.MustCreateAccountOp() - e.addUnmuxed( + if err := e.addUnmuxed( &op.Destination, history.EffectAccountCreated, map[string]interface{}{ "starting_balance": amount.String(op.StartingBalance), }, - ) - e.addMuxed( + ); err != nil { + return err + } + if err := e.addMuxed( e.operation.SourceAccount(), history.EffectAccountDebited, map[string]interface{}{ "asset_type": "native", "amount": amount.String(op.StartingBalance), }, - ) - e.addUnmuxed( + ); err != nil { + return err + } + if err := e.addUnmuxed( &op.Destination, history.EffectSignerCreated, map[string]interface{}{ "public_key": op.Destination.Address(), "weight": keypair.DefaultSignerWeight, }, - ) + ); err != nil { + return err + } + return nil } -func (e *effectsWrapper) addPaymentEffects() { +func (e *effectsWrapper) addPaymentEffects() error { op := e.operation.operation.Body.MustPaymentOp() details := map[string]interface{}{"amount": amount.String(op.Amount)} - addAssetDetails(details, op.Asset, "") + if err := addAssetDetails(details, op.Asset, ""); err != nil { + return err + } - e.addMuxed( + if err := e.addMuxed( &op.Destination, history.EffectAccountCredited, details, - ) - e.addMuxed( + ); err != nil { + return err + } + return e.addMuxed( e.operation.SourceAccount(), history.EffectAccountDebited, details, @@ -538,23 +479,31 @@ func (e *effectsWrapper) pathPaymentStrictReceiveEffects() error { source := e.operation.SourceAccount() details := map[string]interface{}{"amount": amount.String(op.DestAmount)} - addAssetDetails(details, op.DestAsset, "") + if err := addAssetDetails(details, op.DestAsset, ""); err != nil { + return err + } - e.addMuxed( + if err := e.addMuxed( &op.Destination, history.EffectAccountCredited, details, - ) + ); err != nil { + return err + } result := e.operation.OperationResult().MustPathPaymentStrictReceiveResult() details = map[string]interface{}{"amount": amount.String(result.SendAmount())} - addAssetDetails(details, op.SendAsset, "") + if err := addAssetDetails(details, op.SendAsset, ""); err != nil { + return err + } - e.addMuxed( + if err := e.addMuxed( source, history.EffectAccountDebited, details, - ) + ); err != nil { + return err + } return e.addIngestTradeEffects(*source, resultSuccess.Offers) } @@ -566,12 +515,20 @@ func (e *effectsWrapper) addPathPaymentStrictSendEffects() error { result := e.operation.OperationResult().MustPathPaymentStrictSendResult() details := map[string]interface{}{"amount": amount.String(result.DestAmount())} - addAssetDetails(details, op.DestAsset, "") - e.addMuxed(&op.Destination, history.EffectAccountCredited, details) + if err := addAssetDetails(details, op.DestAsset, ""); err != nil { + return err + } + if err := e.addMuxed(&op.Destination, history.EffectAccountCredited, details); err != nil { + return err + } details = map[string]interface{}{"amount": amount.String(op.SendAmount)} - addAssetDetails(details, op.SendAsset, "") - e.addMuxed(source, history.EffectAccountDebited, details) + if err := addAssetDetails(details, op.SendAsset, ""); err != nil { + return err + } + if err := e.addMuxed(source, history.EffectAccountDebited, details); err != nil { + return err + } return e.addIngestTradeEffects(*source, resultSuccess.Offers) } @@ -610,11 +567,13 @@ func (e *effectsWrapper) addSetOptionsEffects() error { op := e.operation.operation.Body.MustSetOptionsOp() if op.HomeDomain != nil { - e.addMuxed(source, history.EffectAccountHomeDomainUpdated, + if err := e.addMuxed(source, history.EffectAccountHomeDomainUpdated, map[string]interface{}{ "home_domain": string(*op.HomeDomain), }, - ) + ); err != nil { + return err + } } thresholdDetails := map[string]interface{}{} @@ -632,7 +591,9 @@ func (e *effectsWrapper) addSetOptionsEffects() error { } if len(thresholdDetails) > 0 { - e.addMuxed(source, history.EffectAccountThresholdsUpdated, thresholdDetails) + if err := e.addMuxed(source, history.EffectAccountThresholdsUpdated, thresholdDetails); err != nil { + return err + } } flagDetails := map[string]interface{}{} @@ -644,15 +605,19 @@ func (e *effectsWrapper) addSetOptionsEffects() error { } if len(flagDetails) > 0 { - e.addMuxed(source, history.EffectAccountFlagsUpdated, flagDetails) + if err := e.addMuxed(source, history.EffectAccountFlagsUpdated, flagDetails); err != nil { + return err + } } if op.InflationDest != nil { - e.addMuxed(source, history.EffectAccountInflationDestinationUpdated, + if err := e.addMuxed(source, history.EffectAccountInflationDestinationUpdated, map[string]interface{}{ "inflation_destination": op.InflationDest.Address(), }, - ) + ); err != nil { + return err + } } changes, err := e.operation.transaction.GetOperationChanges(e.operation.index) if err != nil { @@ -675,7 +640,7 @@ func (e *effectsWrapper) addSetOptionsEffects() error { continue } - beforeSortedSigners := []string{} + var beforeSortedSigners []string for signer := range before { beforeSortedSigners = append(beforeSortedSigners, signer) } @@ -684,21 +649,25 @@ func (e *effectsWrapper) addSetOptionsEffects() error { for _, addy := range beforeSortedSigners { weight, ok := after[addy] if !ok { - e.addMuxed(source, history.EffectSignerRemoved, map[string]interface{}{ + if err := e.addMuxed(source, history.EffectSignerRemoved, map[string]interface{}{ "public_key": addy, - }) + }); err != nil { + return err + } continue } if weight != before[addy] { - e.addMuxed(source, history.EffectSignerUpdated, map[string]interface{}{ + if err := e.addMuxed(source, history.EffectSignerUpdated, map[string]interface{}{ "public_key": addy, "weight": weight, - }) + }); err != nil { + return err + } } } - afterSortedSigners := []string{} + var afterSortedSigners []string for signer := range after { afterSortedSigners = append(afterSortedSigners, signer) } @@ -713,10 +682,12 @@ func (e *effectsWrapper) addSetOptionsEffects() error { continue } - e.addMuxed(source, history.EffectSignerCreated, map[string]interface{}{ + if err := e.addMuxed(source, history.EffectSignerCreated, map[string]interface{}{ "public_key": addy, "weight": weight, - }) + }); err != nil { + return err + } } } return nil @@ -772,10 +743,14 @@ func (e *effectsWrapper) addChangeTrustEffects() error { return err } } else { - addAssetDetails(details, op.Line.ToAsset(), "") + if err := addAssetDetails(details, op.Line.ToAsset(), ""); err != nil { + return err + } } - e.addMuxed(source, effect, details) + if err := e.addMuxed(source, effect, details); err != nil { + return err + } break } @@ -789,33 +764,47 @@ func (e *effectsWrapper) addAllowTrustEffects() error { details := map[string]interface{}{ "trustor": op.Trustor.Address(), } - addAssetDetails(details, asset, "") + if err := addAssetDetails(details, asset, ""); err != nil { + return err + } switch { case xdr.TrustLineFlags(op.Authorize).IsAuthorized(): - e.addMuxed(source, history.EffectTrustlineAuthorized, details) + if err := e.addMuxed(source, history.EffectTrustlineAuthorized, details); err != nil { + return err + } // Forward compatibility setFlags := xdr.Uint32(xdr.TrustLineFlagsAuthorizedFlag) - e.addTrustLineFlagsEffect(source, &op.Trustor, asset, &setFlags, nil) + if err := e.addTrustLineFlagsEffect(source, &op.Trustor, asset, &setFlags, nil); err != nil { + return err + } case xdr.TrustLineFlags(op.Authorize).IsAuthorizedToMaintainLiabilitiesFlag(): - e.addMuxed( + if err := e.addMuxed( source, history.EffectTrustlineAuthorizedToMaintainLiabilities, details, - ) + ); err != nil { + return err + } // Forward compatibility setFlags := xdr.Uint32(xdr.TrustLineFlagsAuthorizedToMaintainLiabilitiesFlag) - e.addTrustLineFlagsEffect(source, &op.Trustor, asset, &setFlags, nil) + if err := e.addTrustLineFlagsEffect(source, &op.Trustor, asset, &setFlags, nil); err != nil { + return err + } default: - e.addMuxed(source, history.EffectTrustlineDeauthorized, details) + if err := e.addMuxed(source, history.EffectTrustlineDeauthorized, details); err != nil { + return err + } // Forward compatibility, show both as cleared clearFlags := xdr.Uint32(xdr.TrustLineFlagsAuthorizedFlag | xdr.TrustLineFlagsAuthorizedToMaintainLiabilitiesFlag) - e.addTrustLineFlagsEffect(source, &op.Trustor, asset, nil, &clearFlags) + if err := e.addTrustLineFlagsEffect(source, &op.Trustor, asset, nil, &clearFlags); err != nil { + return err + } } return e.addLiquidityPoolRevokedEffect() } -func (e *effectsWrapper) addAccountMergeEffects() { +func (e *effectsWrapper) addAccountMergeEffects() error { source := e.operation.SourceAccount() dest := e.operation.operation.Body.MustDestination() @@ -825,21 +814,31 @@ func (e *effectsWrapper) addAccountMergeEffects() { "asset_type": "native", } - e.addMuxed(source, history.EffectAccountDebited, details) - e.addMuxed(&dest, history.EffectAccountCredited, details) - e.addMuxed(source, history.EffectAccountRemoved, map[string]interface{}{}) + if err := e.addMuxed(source, history.EffectAccountDebited, details); err != nil { + return err + } + if err := e.addMuxed(&dest, history.EffectAccountCredited, details); err != nil { + return err + } + if err := e.addMuxed(source, history.EffectAccountRemoved, map[string]interface{}{}); err != nil { + return err + } + return nil } -func (e *effectsWrapper) addInflationEffects() { +func (e *effectsWrapper) addInflationEffects() error { payouts := e.operation.OperationResult().MustInflationResult().MustPayouts() for _, payout := range payouts { - e.addUnmuxed(&payout.Destination, history.EffectAccountCredited, + if err := e.addUnmuxed(&payout.Destination, history.EffectAccountCredited, map[string]interface{}{ "amount": amount.String(payout.Amount), "asset_type": "native", }, - ) + ); err != nil { + return err + } } + return nil } func (e *effectsWrapper) addManageDataEffects() error { @@ -879,8 +878,7 @@ func (e *effectsWrapper) addManageDataEffects() error { break } - e.addMuxed(source, effect, details) - return nil + return e.addMuxed(source, effect, details) } func (e *effectsWrapper) addBumpSequenceEffects() error { @@ -903,7 +901,9 @@ func (e *effectsWrapper) addBumpSequenceEffects() error { if beforeAccount.SeqNum != afterAccount.SeqNum { details := map[string]interface{}{"new_seq": afterAccount.SeqNum} - e.addMuxed(source, history.EffectSequenceBumped, details) + if err := e.addMuxed(source, history.EffectSequenceBumped, details); err != nil { + return err + } } break } @@ -926,7 +926,9 @@ func (e *effectsWrapper) addCreateClaimableBalanceEffects(changes []ingest.Chang continue } cb = change.Post.Data.ClaimableBalance - e.addClaimableBalanceEntryCreatedEffects(source, cb) + if err := e.addClaimableBalanceEntryCreatedEffects(source, cb); err != nil { + return err + } break } if cb == nil { @@ -936,14 +938,14 @@ func (e *effectsWrapper) addCreateClaimableBalanceEffects(changes []ingest.Chang details := map[string]interface{}{ "amount": amount.String(cb.Amount), } - addAssetDetails(details, cb.Asset, "") - e.addMuxed( + if err := addAssetDetails(details, cb.Asset, ""); err != nil { + return err + } + return e.addMuxed( source, history.EffectAccountDebited, details, ) - - return nil } func (e *effectsWrapper) addClaimableBalanceEntryCreatedEffects(source *xdr.MuxedAccount, cb *xdr.ClaimableBalanceEntry) error { @@ -957,11 +959,13 @@ func (e *effectsWrapper) addClaimableBalanceEntryCreatedEffects(source *xdr.Muxe "asset": cb.Asset.StringCanonical(), } setClaimableBalanceFlagDetails(details, cb.Flags()) - e.addMuxed( + if err := e.addMuxed( source, history.EffectClaimableBalanceCreated, details, - ) + ); err != nil { + return err + } // EffectClaimableBalanceClaimantCreated can be generated by // `create_claimable_balance` operation but also by `liquidity_pool_withdraw` // operation causing a revocation. @@ -977,7 +981,7 @@ func (e *effectsWrapper) addClaimableBalanceEntryCreatedEffects(source *xdr.Muxe } for _, c := range claimants { cv0 := c.MustV0() - e.addUnmuxed( + if err := e.addUnmuxed( &cv0.Destination, history.EffectClaimableBalanceClaimantCreated, map[string]interface{}{ @@ -986,9 +990,11 @@ func (e *effectsWrapper) addClaimableBalanceEntryCreatedEffects(source *xdr.Muxe "predicate": cv0.Predicate, "asset": cb.Asset.StringCanonical(), }, - ) + ); err != nil { + return err + } } - return err + return nil } func (e *effectsWrapper) addClaimClaimableBalanceEffects(changes []ingest.Change) error { @@ -1031,23 +1037,25 @@ func (e *effectsWrapper) addClaimClaimableBalanceEffects(changes []ingest.Change } setClaimableBalanceFlagDetails(details, cBalance.Flags()) source := e.operation.SourceAccount() - e.addMuxed( + if err := e.addMuxed( source, history.EffectClaimableBalanceClaimed, details, - ) + ); err != nil { + return err + } details = map[string]interface{}{ "amount": amount.String(cBalance.Amount), } - addAssetDetails(details, cBalance.Asset, "") - e.addMuxed( + if err := addAssetDetails(details, cBalance.Asset, ""); err != nil { + return err + } + return e.addMuxed( source, history.EffectAccountCredited, details, ) - - return nil } func (e *effectsWrapper) addIngestTradeEffects(buyer xdr.MuxedAccount, claims []xdr.ClaimAtom) error { @@ -1061,23 +1069,30 @@ func (e *effectsWrapper) addIngestTradeEffects(buyer xdr.MuxedAccount, claims [] return err } default: - e.addClaimTradeEffects(buyer, claim) + if err := e.addClaimTradeEffects(buyer, claim); err != nil { + return err + } } } return nil } -func (e *effectsWrapper) addClaimTradeEffects(buyer xdr.MuxedAccount, claim xdr.ClaimAtom) { +func (e *effectsWrapper) addClaimTradeEffects(buyer xdr.MuxedAccount, claim xdr.ClaimAtom) error { seller := claim.SellerId() - bd, sd := tradeDetails(buyer, seller, claim) + bd, sd, err := tradeDetails(buyer, seller, claim) + if err != nil { + return err + } - e.addMuxed( + if err := e.addMuxed( &buyer, history.EffectTrade, bd, - ) + ); err != nil { + return err + } - e.addUnmuxed( + return e.addUnmuxed( &seller, history.EffectTrade, sd, @@ -1100,8 +1115,7 @@ func (e *effectsWrapper) addClaimLiquidityPoolTradeEffect(claim xdr.ClaimAtom) e "amount": amount.String(claim.LiquidityPool.AmountBought), }, } - e.addMuxed(e.operation.SourceAccount(), history.EffectLiquidityPoolTrade, details) - return nil + return e.addMuxed(e.operation.SourceAccount(), history.EffectLiquidityPoolTrade, details) } func (e *effectsWrapper) addClawbackEffects() error { @@ -1110,20 +1124,26 @@ func (e *effectsWrapper) addClawbackEffects() error { "amount": amount.String(op.Amount), } source := e.operation.SourceAccount() - addAssetDetails(details, op.Asset, "") + if err := addAssetDetails(details, op.Asset, ""); err != nil { + return err + } // The funds will be burned, but even with that, we generated an account credited effect - e.addMuxed( + if err := e.addMuxed( source, history.EffectAccountCredited, details, - ) + ); err != nil { + return err + } - e.addMuxed( + if err := e.addMuxed( &op.From, history.EffectAccountDebited, details, - ) + ); err != nil { + return err + } return nil } @@ -1138,23 +1158,29 @@ func (e *effectsWrapper) addClawbackClaimableBalanceEffects(changes []ingest.Cha "balance_id": balanceId, } source := e.operation.SourceAccount() - e.addMuxed( + if err := e.addMuxed( source, history.EffectClaimableBalanceClawedBack, details, - ) + ); err != nil { + return err + } // Generate the account credited effect (although the funds will be burned) for the asset issuer for _, c := range changes { if c.Type == xdr.LedgerEntryTypeClaimableBalance && c.Post == nil && c.Pre != nil { cb := c.Pre.Data.ClaimableBalance details = map[string]interface{}{"amount": amount.String(cb.Amount)} - addAssetDetails(details, cb.Asset, "") - e.addMuxed( + if err := addAssetDetails(details, cb.Asset, ""); err != nil { + return err + } + if err := e.addMuxed( source, history.EffectAccountCredited, details, - ) + ); err != nil { + return err + } break } } @@ -1165,7 +1191,9 @@ func (e *effectsWrapper) addClawbackClaimableBalanceEffects(changes []ingest.Cha func (e *effectsWrapper) addSetTrustLineFlagsEffects() error { source := e.operation.SourceAccount() op := e.operation.operation.Body.MustSetTrustLineFlagsOp() - e.addTrustLineFlagsEffect(source, &op.Trustor, op.Asset, &op.SetFlags, &op.ClearFlags) + if err := e.addTrustLineFlagsEffect(source, &op.Trustor, op.Asset, &op.SetFlags, &op.ClearFlags); err != nil { + return err + } return e.addLiquidityPoolRevokedEffect() } @@ -1174,11 +1202,13 @@ func (e *effectsWrapper) addTrustLineFlagsEffect( trustor *xdr.AccountId, asset xdr.Asset, setFlags *xdr.Uint32, - clearFlags *xdr.Uint32) { + clearFlags *xdr.Uint32) error { details := map[string]interface{}{ "trustor": trustor.Address(), } - addAssetDetails(details, asset, "") + if err := addAssetDetails(details, asset, ""); err != nil { + return err + } var flagDetailsAdded bool if setFlags != nil { @@ -1191,8 +1221,11 @@ func (e *effectsWrapper) addTrustLineFlagsEffect( } if flagDetailsAdded { - e.addMuxed(account, history.EffectTrustlineFlagsUpdated, details) + if err := e.addMuxed(account, history.EffectTrustlineFlagsUpdated, details); err != nil { + return err + } } + return nil } func setTrustLineFlagDetails(flagDetails map[string]interface{}, flags xdr.TrustLineFlags, setValue bool) { @@ -1278,8 +1311,8 @@ func (e *effectsWrapper) addLiquidityPoolRevokedEffect() error { "reserves_revoked": reservesRevoked, "shares_revoked": amount.String(-delta.TotalPoolShares), } - e.addMuxed(source, history.EffectLiquidityPoolRevoked, details) - return nil + + return e.addMuxed(source, history.EffectLiquidityPoolRevoked, details) } func setAuthFlagDetails(flagDetails map[string]interface{}, flags xdr.AccountFlags, setValue bool) { @@ -1297,15 +1330,19 @@ func setAuthFlagDetails(flagDetails map[string]interface{}, flags xdr.AccountFla } } -func tradeDetails(buyer xdr.MuxedAccount, seller xdr.AccountId, claim xdr.ClaimAtom) (bd map[string]interface{}, sd map[string]interface{}) { +func tradeDetails(buyer xdr.MuxedAccount, seller xdr.AccountId, claim xdr.ClaimAtom) (bd map[string]interface{}, sd map[string]interface{}, err error) { bd = map[string]interface{}{ "offer_id": claim.OfferId(), "seller": seller.Address(), "bought_amount": amount.String(claim.AmountSold()), "sold_amount": amount.String(claim.AmountBought()), } - addAssetDetails(bd, claim.AssetSold(), "bought_") - addAssetDetails(bd, claim.AssetBought(), "sold_") + if err = addAssetDetails(bd, claim.AssetSold(), "bought_"); err != nil { + return + } + if err = addAssetDetails(bd, claim.AssetBought(), "sold_"); err != nil { + return + } sd = map[string]interface{}{ "offer_id": claim.OfferId(), @@ -1313,9 +1350,12 @@ func tradeDetails(buyer xdr.MuxedAccount, seller xdr.AccountId, claim xdr.ClaimA "sold_amount": amount.String(claim.AmountSold()), } addAccountAndMuxedAccountDetails(sd, buyer, "seller") - addAssetDetails(sd, claim.AssetBought(), "bought_") - addAssetDetails(sd, claim.AssetSold(), "sold_") - + if err = addAssetDetails(sd, claim.AssetBought(), "bought_"); err != nil { + return + } + if err = addAssetDetails(sd, claim.AssetSold(), "sold_"); err != nil { + return + } return } @@ -1359,8 +1399,8 @@ func (e *effectsWrapper) addLiquidityPoolDepositEffect() error { }, "shares_received": amount.String(delta.TotalPoolShares), } - e.addMuxed(e.operation.SourceAccount(), history.EffectLiquidityPoolDeposited, details) - return nil + + return e.addMuxed(e.operation.SourceAccount(), history.EffectLiquidityPoolDeposited, details) } func (e *effectsWrapper) addLiquidityPoolWithdrawEffect() error { @@ -1383,6 +1423,6 @@ func (e *effectsWrapper) addLiquidityPoolWithdrawEffect() error { }, "shares_redeemed": amount.String(-delta.TotalPoolShares), } - e.addMuxed(e.operation.SourceAccount(), history.EffectLiquidityPoolWithdrew, details) - return nil + + return e.addMuxed(e.operation.SourceAccount(), history.EffectLiquidityPoolWithdrew, details) } diff --git a/services/horizon/internal/ingest/processors/effects_processor_test.go b/services/horizon/internal/ingest/processors/effects_processor_test.go index 4293fb5b3b..3afd169c30 100644 --- a/services/horizon/internal/ingest/processors/effects_processor_test.go +++ b/services/horizon/internal/ingest/processors/effects_processor_test.go @@ -5,15 +5,15 @@ package processors import ( "context" "encoding/hex" + "encoding/json" "testing" "github.com/guregu/null" - "github.com/stellar/go/protocols/horizon/base" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" "github.com/stellar/go/ingest" + "github.com/stellar/go/protocols/horizon/base" "github.com/stellar/go/services/horizon/internal/db2/history" . "github.com/stellar/go/services/horizon/internal/test/transactions" "github.com/stellar/go/support/db" @@ -27,9 +27,10 @@ type EffectsProcessorTestSuiteLedger struct { ctx context.Context processor *EffectProcessor mockSession *db.MockSession - mockQ *history.MockQEffects + accountLoader *history.AccountLoader mockBatchInsertBuilder *history.MockEffectBatchInsertBuilder + lcm xdr.LedgerCloseMeta firstTx ingest.LedgerTransaction secondTx ingest.LedgerTransaction thirdTx ingest.LedgerTransaction @@ -38,7 +39,6 @@ type EffectsProcessorTestSuiteLedger struct { secondTxID int64 thirdTxID int64 failedTxID int64 - sequence uint32 addresses []string addressToID map[string]int64 txs []ingest.LedgerTransaction @@ -50,11 +50,18 @@ func TestEffectsProcessorTestSuiteLedger(t *testing.T) { func (s *EffectsProcessorTestSuiteLedger) SetupTest() { s.ctx = context.Background() - s.mockQ = &history.MockQEffects{} + s.accountLoader = history.NewAccountLoader() s.mockBatchInsertBuilder = &history.MockEffectBatchInsertBuilder{} - s.sequence = uint32(20) - + s.lcm = xdr.LedgerCloseMeta{ + V0: &xdr.LedgerCloseMetaV0{ + LedgerHeader: xdr.LedgerHeaderHistoryEntry{ + Header: xdr.LedgerHeader{ + LedgerSeq: xdr.Uint32(20), + }, + }, + }, + } s.addresses = []string{ "GANFZDRBCNTUXIODCJEYMACPMCSZEVE4WZGZ3CZDZ3P2SXK4KH75IK6Y", "GBRPYHIL2CI3FNQ4BXLFMNDLFJUNPU2HY3ZMFSHONUCEOASW7QC7OX2H", @@ -72,7 +79,7 @@ func (s *EffectsProcessorTestSuiteLedger) SetupTest() { Hash: "829d53f2dceebe10af8007564b0aefde819b95734ad431df84270651e7ed8a90", }, ) - s.firstTxID = toid.New(int32(s.sequence), 1, 0).ToInt64() + s.firstTxID = toid.New(int32(s.lcm.LedgerSequence()), 1, 0).ToInt64() s.secondTx = BuildLedgerTransaction( s.Suite.T(), @@ -86,7 +93,7 @@ func (s *EffectsProcessorTestSuiteLedger) SetupTest() { }, ) - s.secondTxID = toid.New(int32(s.sequence), 2, 0).ToInt64() + s.secondTxID = toid.New(int32(s.lcm.LedgerSequence()), 2, 0).ToInt64() s.thirdTx = BuildLedgerTransaction( s.Suite.T(), @@ -99,7 +106,7 @@ func (s *EffectsProcessorTestSuiteLedger) SetupTest() { Hash: "2a805712c6d10f9e74bb0ccf54ae92a2b4b1e586451fe8133a2433816f6b567c", }, ) - s.thirdTxID = toid.New(int32(s.sequence), 3, 0).ToInt64() + s.thirdTxID = toid.New(int32(s.lcm.LedgerSequence()), 3, 0).ToInt64() s.failedTx = BuildLedgerTransaction( s.Suite.T(), @@ -112,7 +119,7 @@ func (s *EffectsProcessorTestSuiteLedger) SetupTest() { Hash: "24206737a02f7f855c46e367418e38c223f897792c76bbfb948e1b0dbd695f8b", }, ) - s.failedTxID = toid.New(int32(s.sequence), 4, 0).ToInt64() + s.failedTxID = toid.New(int32(s.lcm.LedgerSequence()), 4, 0).ToInt64() s.addressToID = map[string]int64{ s.addresses[0]: 2, @@ -121,9 +128,8 @@ func (s *EffectsProcessorTestSuiteLedger) SetupTest() { } s.processor = NewEffectProcessor( - s.mockSession, - s.mockQ, - 20, + s.accountLoader, + s.mockBatchInsertBuilder, ) s.txs = []ingest.LedgerTransaction{ @@ -134,42 +140,42 @@ func (s *EffectsProcessorTestSuiteLedger) SetupTest() { } func (s *EffectsProcessorTestSuiteLedger) TearDownTest() { - s.mockQ.AssertExpectations(s.T()) + s.mockBatchInsertBuilder.AssertExpectations(s.T()) } func (s *EffectsProcessorTestSuiteLedger) mockSuccessfulEffectBatchAdds() { s.mockBatchInsertBuilder.On( "Add", - s.addressToID[s.addresses[2]], + s.accountLoader.GetFuture(s.addresses[2]), null.String{}, - toid.New(int32(s.sequence), 1, 1).ToInt64(), + toid.New(int32(s.lcm.LedgerSequence()), 1, 1).ToInt64(), uint32(1), history.EffectSequenceBumped, []byte("{\"new_seq\":300000000000}"), ).Return(nil).Once() s.mockBatchInsertBuilder.On( "Add", - s.addressToID[s.addresses[2]], + s.accountLoader.GetFuture(s.addresses[2]), null.String{}, - toid.New(int32(s.sequence), 2, 1).ToInt64(), + toid.New(int32(s.lcm.LedgerSequence()), 2, 1).ToInt64(), uint32(1), history.EffectAccountCreated, []byte("{\"starting_balance\":\"1000.0000000\"}"), ).Return(nil).Once() s.mockBatchInsertBuilder.On( "Add", - s.addressToID[s.addresses[1]], + s.accountLoader.GetFuture(s.addresses[1]), null.String{}, - toid.New(int32(s.sequence), 2, 1).ToInt64(), + toid.New(int32(s.lcm.LedgerSequence()), 2, 1).ToInt64(), uint32(2), history.EffectAccountDebited, []byte("{\"amount\":\"1000.0000000\",\"asset_type\":\"native\"}"), ).Return(nil).Once() s.mockBatchInsertBuilder.On( "Add", - s.addressToID[s.addresses[2]], + s.accountLoader.GetFuture(s.addresses[2]), null.String{}, - toid.New(int32(s.sequence), 2, 1).ToInt64(), + toid.New(int32(s.lcm.LedgerSequence()), 2, 1).ToInt64(), uint32(3), history.EffectSignerCreated, []byte("{\"public_key\":\"GCQZP3IU7XU6EJ63JZXKCQOYT2RNXN3HB5CNHENNUEUHSMA4VUJJJSEN\",\"weight\":1}"), @@ -177,9 +183,9 @@ func (s *EffectsProcessorTestSuiteLedger) mockSuccessfulEffectBatchAdds() { s.mockBatchInsertBuilder.On( "Add", - s.addressToID[s.addresses[0]], + s.accountLoader.GetFuture(s.addresses[0]), null.String{}, - toid.New(int32(s.sequence), 3, 1).ToInt64(), + toid.New(int32(s.lcm.LedgerSequence()), 3, 1).ToInt64(), uint32(1), history.EffectAccountCredited, []byte("{\"amount\":\"10.0000000\",\"asset_type\":\"native\"}"), @@ -187,81 +193,45 @@ func (s *EffectsProcessorTestSuiteLedger) mockSuccessfulEffectBatchAdds() { s.mockBatchInsertBuilder.On( "Add", - s.addressToID[s.addresses[0]], + s.accountLoader.GetFuture(s.addresses[0]), null.String{}, - toid.New(int32(s.sequence), 3, 1).ToInt64(), + toid.New(int32(s.lcm.LedgerSequence()), 3, 1).ToInt64(), uint32(2), history.EffectAccountDebited, []byte("{\"amount\":\"10.0000000\",\"asset_type\":\"native\"}"), ).Return(nil).Once() } -func (s *EffectsProcessorTestSuiteLedger) mockSuccessfulCreateAccounts() { - s.mockQ.On( - "CreateAccounts", - s.ctx, - mock.AnythingOfType("[]string"), - maxBatchSize, - ).Run(func(args mock.Arguments) { - arg := args.Get(1).([]string) - s.Assert().ElementsMatch(s.addresses, arg) - }).Return(s.addressToID, nil).Once() -} - func (s *EffectsProcessorTestSuiteLedger) TestEmptyEffects() { - err := s.processor.Commit(context.Background()) - s.Assert().NoError(err) + s.mockBatchInsertBuilder.On("Exec", s.ctx, s.mockSession).Return(nil).Once() + s.Assert().NoError(s.processor.Flush(s.ctx, s.mockSession)) } func (s *EffectsProcessorTestSuiteLedger) TestIngestEffectsSucceeds() { - s.mockSuccessfulCreateAccounts() - s.mockQ.On("NewEffectBatchInsertBuilder"). - Return(s.mockBatchInsertBuilder).Once() - s.mockSuccessfulEffectBatchAdds() - - s.mockBatchInsertBuilder.On("Exec", s.ctx, s.mockSession).Return(nil).Once() - for _, tx := range s.txs { - err := s.processor.ProcessTransaction(s.ctx, tx) - s.Assert().NoError(err) + s.Assert().NoError(s.processor.ProcessTransaction(s.lcm, tx)) } - err := s.processor.Commit(s.ctx) - s.Assert().NoError(err) -} - -func (s *EffectsProcessorTestSuiteLedger) TestCreateAccountsFails() { - s.mockQ.On("CreateAccounts", s.ctx, mock.AnythingOfType("[]string"), maxBatchSize). - Return(s.addressToID, errors.New("transient error")).Once() - for _, tx := range s.txs { - err := s.processor.ProcessTransaction(s.ctx, tx) - s.Assert().NoError(err) - } - err := s.processor.Commit(s.ctx) - s.Assert().EqualError(err, "Could not create account ids: transient error") + s.mockBatchInsertBuilder.On("Exec", s.ctx, s.mockSession).Return(nil).Once() + s.Assert().NoError(s.processor.Flush(s.ctx, s.mockSession)) } func (s *EffectsProcessorTestSuiteLedger) TestBatchAddFails() { - s.mockSuccessfulCreateAccounts() - s.mockQ.On("NewEffectBatchInsertBuilder"). - Return(s.mockBatchInsertBuilder).Once() - s.mockBatchInsertBuilder.On( "Add", - s.addressToID[s.addresses[2]], + s.accountLoader.GetFuture(s.addresses[2]), null.String{}, - toid.New(int32(s.sequence), 1, 1).ToInt64(), + toid.New(int32(s.lcm.LedgerSequence()), 1, 1).ToInt64(), uint32(1), history.EffectSequenceBumped, []byte("{\"new_seq\":300000000000}"), ).Return(errors.New("transient error")).Once() - for _, tx := range s.txs { - err := s.processor.ProcessTransaction(s.ctx, tx) - s.Assert().NoError(err) - } - err := s.processor.Commit(s.ctx) - s.Assert().EqualError(err, "could not insert operation effect in db: transient error") + + s.Assert().EqualError( + s.processor.ProcessTransaction(s.lcm, s.txs[0]), + "reading operation 85899350017 effects: could not insert operation effect in db: transient error", + ) } func getRevokeSponsorshipMeta(t *testing.T) (string, []effect) { @@ -462,7 +432,7 @@ func TestEffectsCoversAllOperationTypes(t *testing.T) { } assert.True(t, err2 != nil || err == nil, s) }() - _, err = operation.effects() + err = operation.ingestEffects(history.NewAccountLoader(), &history.MockEffectBatchInsertBuilder{}) }() } @@ -484,7 +454,7 @@ func TestEffectsCoversAllOperationTypes(t *testing.T) { ledgerSequence: 1, } // calling effects should error due to the unknown operation - _, err := operation.effects() + err := operation.ingestEffects(history.NewAccountLoader(), &history.MockEffectBatchInsertBuilder{}) assert.Contains(t, err.Error(), "Unknown operation type") } @@ -1546,7 +1516,6 @@ func TestOperationEffects(t *testing.T) { } for _, tc := range testCases { t.Run(tc.desc, func(t *testing.T) { - tt := assert.New(t) transaction := BuildLedgerTransaction( t, TestTransaction{ @@ -1566,15 +1535,12 @@ func TestOperationEffects(t *testing.T) { ledgerSequence: tc.sequence, } - effects, err := operation.effects() - tt.NoError(err) - tt.Equal(tc.expected, effects) + assertIngestEffects(t, operation, tc.expected) }) } } func TestOperationEffectsSetOptionsSignersOrder(t *testing.T) { - tt := assert.New(t) transaction := ingest.LedgerTransaction{ UnsafeMeta: createTransactionMeta([]xdr.OperationMeta{ { @@ -1656,8 +1622,6 @@ func TestOperationEffectsSetOptionsSignersOrder(t *testing.T) { ledgerSequence: 46, } - effects, err := operation.effects() - tt.NoError(err) expected := []effect{ { address: "GCBBDQLCTNASZJ3MTKAOYEOWRGSHDFAJVI7VPZUOP7KXNHYR3HP2BUKV", @@ -1700,12 +1664,11 @@ func TestOperationEffectsSetOptionsSignersOrder(t *testing.T) { order: uint32(4), }, } - tt.Equal(expected, effects) + assertIngestEffects(t, operation, expected) } // Regression for https://github.com/stellar/go/issues/2136 func TestOperationEffectsSetOptionsSignersNoUpdated(t *testing.T) { - tt := assert.New(t) transaction := ingest.LedgerTransaction{ UnsafeMeta: createTransactionMeta([]xdr.OperationMeta{ { @@ -1787,8 +1750,6 @@ func TestOperationEffectsSetOptionsSignersNoUpdated(t *testing.T) { ledgerSequence: 46, } - effects, err := operation.effects() - tt.NoError(err) expected := []effect{ { address: "GCBBDQLCTNASZJ3MTKAOYEOWRGSHDFAJVI7VPZUOP7KXNHYR3HP2BUKV", @@ -1820,11 +1781,10 @@ func TestOperationEffectsSetOptionsSignersNoUpdated(t *testing.T) { order: uint32(3), }, } - tt.Equal(expected, effects) + assertIngestEffects(t, operation, expected) } func TestOperationRegressionAccountTrustItself(t *testing.T) { - tt := assert.New(t) // NOTE: when an account trusts itself, the transaction is successful but // no ledger entries are actually modified. transaction := ingest.LedgerTransaction{ @@ -1853,9 +1813,7 @@ func TestOperationRegressionAccountTrustItself(t *testing.T) { ledgerSequence: 46, } - effects, err := operation.effects() - tt.NoError(err) - tt.Equal([]effect{}, effects) + assertIngestEffects(t, operation, []effect{}) } func TestOperationEffectsAllowTrustAuthorizedToMaintainLiabilities(t *testing.T) { @@ -1889,9 +1847,6 @@ func TestOperationEffectsAllowTrustAuthorizedToMaintainLiabilities(t *testing.T) ledgerSequence: 1, } - effects, err := operation.effects() - tt.NoError(err) - expected := []effect{ { address: "GDRW375MAYR46ODGF2WGANQC2RRZL7O246DYHHCGWTV2RE7IHE2QUQLD", @@ -1919,11 +1874,10 @@ func TestOperationEffectsAllowTrustAuthorizedToMaintainLiabilities(t *testing.T) order: uint32(2), }, } - tt.Equal(expected, effects) + assertIngestEffects(t, operation, expected) } func TestOperationEffectsClawback(t *testing.T) { - tt := assert.New(t) aid := xdr.MustAddress("GDRW375MAYR46ODGF2WGANQC2RRZL7O246DYHHCGWTV2RE7IHE2QUQLD") source := aid.ToMuxedAccount() op := xdr.Operation{ @@ -1950,9 +1904,6 @@ func TestOperationEffectsClawback(t *testing.T) { ledgerSequence: 1, } - effects, err := operation.effects() - tt.NoError(err) - expected := []effect{ { address: "GDRW375MAYR46ODGF2WGANQC2RRZL7O246DYHHCGWTV2RE7IHE2QUQLD", @@ -1979,11 +1930,10 @@ func TestOperationEffectsClawback(t *testing.T) { order: uint32(2), }, } - tt.Equal(expected, effects) + assertIngestEffects(t, operation, expected) } func TestOperationEffectsClawbackClaimableBalance(t *testing.T) { - tt := assert.New(t) aid := xdr.MustAddress("GDRW375MAYR46ODGF2WGANQC2RRZL7O246DYHHCGWTV2RE7IHE2QUQLD") source := aid.ToMuxedAccount() var balanceID xdr.ClaimableBalanceId @@ -2010,9 +1960,6 @@ func TestOperationEffectsClawbackClaimableBalance(t *testing.T) { ledgerSequence: 1, } - effects, err := operation.effects() - tt.NoError(err) - expected := []effect{ { address: "GDRW375MAYR46ODGF2WGANQC2RRZL7O246DYHHCGWTV2RE7IHE2QUQLD", @@ -2024,11 +1971,10 @@ func TestOperationEffectsClawbackClaimableBalance(t *testing.T) { order: uint32(1), }, } - tt.Equal(expected, effects) + assertIngestEffects(t, operation, expected) } func TestOperationEffectsSetTrustLineFlags(t *testing.T) { - tt := assert.New(t) aid := xdr.MustAddress("GDRW375MAYR46ODGF2WGANQC2RRZL7O246DYHHCGWTV2RE7IHE2QUQLD") source := aid.ToMuxedAccount() trustor := xdr.MustAddress("GAUJETIZVEP2NRYLUESJ3LS66NVCEGMON4UDCBCSBEVPIID773P2W6AY") @@ -2059,9 +2005,6 @@ func TestOperationEffectsSetTrustLineFlags(t *testing.T) { ledgerSequence: 1, } - effects, err := operation.effects() - tt.NoError(err) - expected := []effect{ { address: "GDRW375MAYR46ODGF2WGANQC2RRZL7O246DYHHCGWTV2RE7IHE2QUQLD", @@ -2079,7 +2022,7 @@ func TestOperationEffectsSetTrustLineFlags(t *testing.T) { order: uint32(1), }, } - tt.Equal(expected, effects) + assertIngestEffects(t, operation, expected) } type CreateClaimableBalanceEffectsTestSuite struct { @@ -2328,9 +2271,7 @@ func (s *CreateClaimableBalanceEffectsTestSuite) TestEffects() { ledgerSequence: 1, } - effects, err := operation.effects() - s.Assert().NoError(err) - s.Assert().Equal(tc.expected, effects) + assertIngestEffects(t, operation, tc.expected) }) } } @@ -2588,13 +2529,42 @@ func (s *ClaimClaimableBalanceEffectsTestSuite) TestEffects() { ledgerSequence: 1, } - effects, err := operation.effects() - s.Assert().NoError(err) - s.Assert().Equal(tc.expected, effects) + assertIngestEffects(t, operation, tc.expected) }) } } +type effect struct { + address string + addressMuxed null.String + operationID int64 + details map[string]interface{} + effectType history.EffectType + order uint32 +} + +func assertIngestEffects(t *testing.T, operation transactionOperationWrapper, expected []effect) { + accountLoader := history.NewAccountLoader() + mockBatchInsertBuilder := &history.MockEffectBatchInsertBuilder{} + + for _, expectedEffect := range expected { + detailsJSON, err := json.Marshal(expectedEffect.details) + assert.NoError(t, err) + mockBatchInsertBuilder.On( + "Add", + accountLoader.GetFuture(expectedEffect.address), + expectedEffect.addressMuxed, + expectedEffect.operationID, + expectedEffect.order, + expectedEffect.effectType, + detailsJSON, + ).Return(nil).Once() + } + + assert.NoError(t, operation.ingestEffects(accountLoader, mockBatchInsertBuilder)) + mockBatchInsertBuilder.AssertExpectations(t) +} + func TestClaimClaimableBalanceEffectsTestSuite(t *testing.T) { suite.Run(t, new(ClaimClaimableBalanceEffectsTestSuite)) } @@ -2811,10 +2781,7 @@ func TestTrustlineSponsorshipEffects(t *testing.T) { ledgerSequence: 1, } - effects, err := operation.effects() - assert.NoError(t, err) - assert.Equal(t, expected, effects) - + assertIngestEffects(t, operation, expected) } func TestLiquidityPoolEffects(t *testing.T) { @@ -3445,9 +3412,7 @@ func TestLiquidityPoolEffects(t *testing.T) { ledgerSequence: 1, } - effects, err := operation.effects() - assert.NoError(t, err) - assert.Equal(t, tc.expected, effects) + assertIngestEffects(t, operation, tc.expected) }) } diff --git a/services/horizon/internal/ingest/processors/operations_processor.go b/services/horizon/internal/ingest/processors/operations_processor.go index d22ba2cd6d..5aefdbe0d1 100644 --- a/services/horizon/internal/ingest/processors/operations_processor.go +++ b/services/horizon/internal/ingest/processors/operations_processor.go @@ -294,7 +294,9 @@ func (operation *transactionOperationWrapper) Details() (map[string]interface{}, addAccountAndMuxedAccountDetails(details, *source, "from") addAccountAndMuxedAccountDetails(details, op.Destination, "to") details["amount"] = amount.String(op.Amount) - addAssetDetails(details, op.Asset, "") + if err := addAssetDetails(details, op.Asset, ""); err != nil { + return nil, err + } case xdr.OperationTypePathPaymentStrictReceive: op := operation.operation.Body.MustPathPaymentStrictReceiveOp() addAccountAndMuxedAccountDetails(details, *source, "from") @@ -303,8 +305,12 @@ func (operation *transactionOperationWrapper) Details() (map[string]interface{}, details["amount"] = amount.String(op.DestAmount) details["source_amount"] = amount.String(0) details["source_max"] = amount.String(op.SendMax) - addAssetDetails(details, op.DestAsset, "") - addAssetDetails(details, op.SendAsset, "source_") + if err := addAssetDetails(details, op.DestAsset, ""); err != nil { + return nil, err + } + if err := addAssetDetails(details, op.SendAsset, "source_"); err != nil { + return nil, err + } if operation.transaction.Result.Successful() { result := operation.OperationResult().MustPathPaymentStrictReceiveResult() @@ -314,7 +320,9 @@ func (operation *transactionOperationWrapper) Details() (map[string]interface{}, var path = make([]map[string]interface{}, len(op.Path)) for i := range op.Path { path[i] = make(map[string]interface{}) - addAssetDetails(path[i], op.Path[i], "") + if err := addAssetDetails(path[i], op.Path[i], ""); err != nil { + return nil, err + } } details["path"] = path @@ -326,8 +334,12 @@ func (operation *transactionOperationWrapper) Details() (map[string]interface{}, details["amount"] = amount.String(0) details["source_amount"] = amount.String(op.SendAmount) details["destination_min"] = amount.String(op.DestMin) - addAssetDetails(details, op.DestAsset, "") - addAssetDetails(details, op.SendAsset, "source_") + if err := addAssetDetails(details, op.DestAsset, ""); err != nil { + return nil, err + } + if err := addAssetDetails(details, op.SendAsset, "source_"); err != nil { + return nil, err + } if operation.transaction.Result.Successful() { result := operation.OperationResult().MustPathPaymentStrictSendResult() @@ -337,7 +349,9 @@ func (operation *transactionOperationWrapper) Details() (map[string]interface{}, var path = make([]map[string]interface{}, len(op.Path)) for i := range op.Path { path[i] = make(map[string]interface{}) - addAssetDetails(path[i], op.Path[i], "") + if err := addAssetDetails(path[i], op.Path[i], ""); err != nil { + return nil, err + } } details["path"] = path case xdr.OperationTypeManageBuyOffer: @@ -349,8 +363,12 @@ func (operation *transactionOperationWrapper) Details() (map[string]interface{}, "n": op.Price.N, "d": op.Price.D, } - addAssetDetails(details, op.Buying, "buying_") - addAssetDetails(details, op.Selling, "selling_") + if err := addAssetDetails(details, op.Buying, "buying_"); err != nil { + return nil, err + } + if err := addAssetDetails(details, op.Selling, "selling_"); err != nil { + return nil, err + } case xdr.OperationTypeManageSellOffer: op := operation.operation.Body.MustManageSellOfferOp() details["offer_id"] = op.OfferId @@ -360,8 +378,12 @@ func (operation *transactionOperationWrapper) Details() (map[string]interface{}, "n": op.Price.N, "d": op.Price.D, } - addAssetDetails(details, op.Buying, "buying_") - addAssetDetails(details, op.Selling, "selling_") + if err := addAssetDetails(details, op.Buying, "buying_"); err != nil { + return nil, err + } + if err := addAssetDetails(details, op.Selling, "selling_"); err != nil { + return nil, err + } case xdr.OperationTypeCreatePassiveSellOffer: op := operation.operation.Body.MustCreatePassiveSellOfferOp() details["amount"] = amount.String(op.Amount) @@ -370,8 +392,12 @@ func (operation *transactionOperationWrapper) Details() (map[string]interface{}, "n": op.Price.N, "d": op.Price.D, } - addAssetDetails(details, op.Buying, "buying_") - addAssetDetails(details, op.Selling, "selling_") + if err := addAssetDetails(details, op.Buying, "buying_"); err != nil { + return nil, err + } + if err := addAssetDetails(details, op.Selling, "selling_"); err != nil { + return nil, err + } case xdr.OperationTypeSetOptions: op := operation.operation.Body.MustSetOptionsOp() @@ -418,14 +444,18 @@ func (operation *transactionOperationWrapper) Details() (map[string]interface{}, return nil, err } } else { - addAssetDetails(details, op.Line.ToAsset(), "") + if err := addAssetDetails(details, op.Line.ToAsset(), ""); err != nil { + return nil, err + } details["trustee"] = details["asset_issuer"] } addAccountAndMuxedAccountDetails(details, *source, "trustor") details["limit"] = amount.String(op.Limit) case xdr.OperationTypeAllowTrust: op := operation.operation.Body.MustAllowTrustOp() - addAssetDetails(details, op.Asset.ToAsset(source.ToAccountId()), "") + if err := addAssetDetails(details, op.Asset.ToAsset(source.ToAccountId()), ""); err != nil { + return nil, err + } addAccountAndMuxedAccountDetails(details, *source, "trustee") details["trustor"] = op.Trustor.Address() details["authorize"] = xdr.TrustLineFlags(op.Authorize).IsAuthorized() @@ -496,7 +526,9 @@ func (operation *transactionOperationWrapper) Details() (map[string]interface{}, } case xdr.OperationTypeClawback: op := operation.operation.Body.MustClawbackOp() - addAssetDetails(details, op.Asset, "") + if err := addAssetDetails(details, op.Asset, ""); err != nil { + return nil, err + } addAccountAndMuxedAccountDetails(details, op.From, "from") details["amount"] = amount.String(op.Amount) case xdr.OperationTypeClawbackClaimableBalance: @@ -509,7 +541,9 @@ func (operation *transactionOperationWrapper) Details() (map[string]interface{}, case xdr.OperationTypeSetTrustLineFlags: op := operation.operation.Body.MustSetTrustLineFlagsOp() details["trustor"] = op.Trustor.Address() - addAssetDetails(details, op.Asset, "") + if err := addAssetDetails(details, op.Asset, ""); err != nil { + return nil, err + } if op.SetFlags > 0 { addTrustLineFlagDetails(details, xdr.TrustLineFlags(op.SetFlags), "set") }