diff --git a/services/horizon/docker/docker-compose.yml b/services/horizon/docker/docker-compose.yml index 40bced6677..377e26b0b4 100644 --- a/services/horizon/docker/docker-compose.yml +++ b/services/horizon/docker/docker-compose.yml @@ -1,7 +1,7 @@ version: '3' services: horizon-postgres: - image: postgres:9.6.17-alpine + image: postgres:postgres:12-bullseye restart: on-failure environment: - POSTGRES_HOST_AUTH_METHOD=trust diff --git a/services/horizon/docker/verify-range/Dockerfile b/services/horizon/docker/verify-range/Dockerfile index 499f86881e..60fc6df745 100644 --- a/services/horizon/docker/verify-range/Dockerfile +++ b/services/horizon/docker/verify-range/Dockerfile @@ -1,14 +1,12 @@ FROM ubuntu:20.04 -MAINTAINER Bartek Nowotarski - ARG STELLAR_CORE_VERSION ENV STELLAR_CORE_VERSION=${STELLAR_CORE_VERSION:-*} # to remove tzdata interactive flow ENV DEBIAN_FRONTEND=noninteractive ADD dependencies / -RUN ["chmod", "+x", "dependencies"] +RUN ["chmod", "+x", "/dependencies"] RUN /dependencies ADD stellar-core.cfg / diff --git a/services/horizon/docker/verify-range/dependencies b/services/horizon/docker/verify-range/dependencies index fa622f9d2e..e17c6f4b5f 100644 --- a/services/horizon/docker/verify-range/dependencies +++ b/services/horizon/docker/verify-range/dependencies @@ -11,8 +11,8 @@ echo "deb https://apt.stellar.org $(lsb_release -cs) stable" | sudo tee -a /etc/ apt-get update apt-get install -y stellar-core=${STELLAR_CORE_VERSION} -wget -q https://dl.google.com/go/go1.18.linux-amd64.tar.gz -tar -C /usr/local -xzf go1.18.linux-amd64.tar.gz +wget -q https://dl.google.com/go/go1.20.linux-amd64.tar.gz +tar -C /usr/local -xzf go1.20.linux-amd64.tar.gz git clone https://github.com/stellar/go.git stellar-go cd stellar-go @@ -20,4 +20,4 @@ cd stellar-go # Below ensures we also fetch PR refs git config --add remote.origin.fetch "+refs/pull/*/head:refs/remotes/origin/pull/*" git fetch --force --quiet origin -/usr/local/go/bin/go build -v ./services/horizon +/usr/local/go/bin/go build -v ./services/horizon/. \ No newline at end of file diff --git a/services/horizon/internal/actions/transaction_test.go b/services/horizon/internal/actions/transaction_test.go index e029edef3a..b76cf1b0bf 100644 --- a/services/horizon/internal/actions/transaction_test.go +++ b/services/horizon/internal/actions/transaction_test.go @@ -149,6 +149,7 @@ func checkOuterHashResponse( } func TestFeeBumpTransactionPage(t *testing.T) { + tt := test.Start(t) defer tt.Finish() test.ResetHorizonDB(t, tt.HorizonDB) diff --git a/services/horizon/internal/db2/history/account_loader.go b/services/horizon/internal/db2/history/account_loader.go index e9fd9bedea..f3946b0448 100644 --- a/services/horizon/internal/db2/history/account_loader.go +++ b/services/horizon/internal/db2/history/account_loader.go @@ -28,7 +28,7 @@ const loaderLookupBatchSize = 50000 // Value implements the database/sql/driver Valuer interface. func (a FutureAccountID) Value() (driver.Value, error) { - return a.loader.GetNow(a.address), nil + return a.loader.GetNow(a.address) } // AccountLoader will map account addresses to their history @@ -71,11 +71,15 @@ func (a *AccountLoader) GetFuture(address string) FutureAccountID { // GetNow should only be called on values which were registered by // GetFuture() calls. Also, Exec() must be called before any GetNow // call can succeed. -func (a *AccountLoader) GetNow(address string) int64 { - if id, ok := a.ids[address]; !ok { - panic(fmt.Errorf("address %v not present", address)) +func (a *AccountLoader) GetNow(address string) (int64, error) { + if !a.sealed { + return 0, fmt.Errorf(`invalid account loader state, + Exec was not called yet to properly seal and resolve %v id`, address) + } + if internalID, ok := a.ids[address]; !ok { + return 0, fmt.Errorf(`account loader address %q was not found`, address) } else { - return id + return internalID, nil } } @@ -205,5 +209,6 @@ func NewAccountLoaderStub() AccountLoaderStub { // Insert updates the wrapped AccountLoader so that the given account // address is mapped to the provided history account id func (a AccountLoaderStub) Insert(address string, id int64) { + a.Loader.sealed = true a.Loader.ids[address] = id } diff --git a/services/horizon/internal/db2/history/account_loader_test.go b/services/horizon/internal/db2/history/account_loader_test.go index 11047f3be2..54d2c7a143 100644 --- a/services/horizon/internal/db2/history/account_loader_test.go +++ b/services/horizon/internal/db2/history/account_loader_test.go @@ -22,16 +22,11 @@ func TestAccountLoader(t *testing.T) { } loader := NewAccountLoader() - var futures []FutureAccountID for _, address := range addresses { future := loader.GetFuture(address) - futures = append(futures, future) - assert.Panics(t, func() { - loader.GetNow(address) - }) - assert.Panics(t, func() { - future.Value() - }) + _, err := future.Value() + assert.Error(t, err) + assert.Contains(t, err.Error(), `invalid account loader state,`) duplicateFuture := loader.GetFuture(address) assert.Equal(t, future, duplicateFuture) } @@ -42,15 +37,16 @@ func TestAccountLoader(t *testing.T) { }) q := &Q{session} - for i, address := range addresses { - future := futures[i] - id := loader.GetNow(address) - val, err := future.Value() + for _, address := range addresses { + internalId, err := loader.GetNow(address) assert.NoError(t, err) - assert.Equal(t, id, val) var account Account assert.NoError(t, q.AccountByAddress(context.Background(), &account, address)) - assert.Equal(t, account.ID, id) + assert.Equal(t, account.ID, internalId) assert.Equal(t, account.Address, address) } + + _, err := loader.GetNow("not present") + assert.Error(t, err) + assert.Contains(t, err.Error(), `was not found`) } diff --git a/services/horizon/internal/db2/history/asset_loader.go b/services/horizon/internal/db2/history/asset_loader.go index 6ef3d7a350..b5ee9a8326 100644 --- a/services/horizon/internal/db2/history/asset_loader.go +++ b/services/horizon/internal/db2/history/asset_loader.go @@ -5,6 +5,7 @@ import ( "database/sql/driver" "fmt" "sort" + "strings" sq "github.com/Masterminds/squirrel" @@ -21,11 +22,18 @@ type AssetKey struct { Issuer string } +func (key AssetKey) String() string { + if key.Type == xdr.AssetTypeToString[xdr.AssetTypeAssetTypeNative] { + return key.Type + } + return key.Type + "/" + key.Code + "/" + key.Issuer +} + // AssetKeyFromXDR constructs an AssetKey from an xdr asset func AssetKeyFromXDR(asset xdr.Asset) AssetKey { return AssetKey{ Type: xdr.AssetTypeToString[asset.Type], - Code: asset.GetCode(), + Code: strings.TrimRight(asset.GetCode(), "\x00"), Issuer: asset.GetIssuer(), } } @@ -41,7 +49,7 @@ type FutureAssetID struct { // Value implements the database/sql/driver Valuer interface. func (a FutureAssetID) Value() (driver.Value, error) { - return a.loader.GetNow(a.asset), nil + return a.loader.GetNow(a.asset) } // AssetLoader will map assets to their history @@ -81,11 +89,15 @@ func (a *AssetLoader) GetFuture(asset AssetKey) FutureAssetID { // GetNow should only be called on values which were registered by // GetFuture() calls. Also, Exec() must be called before any GetNow // call can succeed. -func (a *AssetLoader) GetNow(asset AssetKey) int64 { - if id, ok := a.ids[asset]; !ok { - panic(fmt.Errorf("asset %v not present", asset)) +func (a *AssetLoader) GetNow(asset AssetKey) (int64, error) { + if !a.sealed { + return 0, fmt.Errorf(`invalid asset loader state, + Exec was not called yet to properly seal and resolve %v id`, asset) + } + if internalID, ok := a.ids[asset]; !ok { + return 0, fmt.Errorf(`asset loader id %v was not found`, asset) } else { - return id + return internalID, nil } } @@ -137,6 +149,11 @@ func (a *AssetLoader) Exec(ctx context.Context, session db.SessionInterface) err assetTypes := make([]string, 0, len(a.set)-len(a.ids)) assetCodes := make([]string, 0, len(a.set)-len(a.ids)) assetIssuers := make([]string, 0, len(a.set)-len(a.ids)) + // sort entries before inserting rows to prevent deadlocks on acquiring a ShareLock + // https://github.com/stellar/go/issues/2370 + sort.Slice(keys, func(i, j int) bool { + return keys[i].String() < keys[j].String() + }) insert := 0 for _, key := range keys { if _, ok := a.ids[key]; ok { @@ -152,20 +169,6 @@ func (a *AssetLoader) Exec(ctx context.Context, session db.SessionInterface) err return nil } keys = keys[:insert] - // sort entries before inserting rows to prevent deadlocks on acquiring a ShareLock - // https://github.com/stellar/go/issues/2370 - sort.Slice(keys, func(i, j int) bool { - if keys[i].Type < keys[j].Type { - return true - } - if keys[i].Code < keys[j].Code { - return true - } - if keys[i].Issuer < keys[j].Issuer { - return true - } - return false - }) err := bulkInsert( ctx, @@ -211,5 +214,6 @@ func NewAssetLoaderStub() AssetLoaderStub { // Insert updates the wrapped AssetLoaderStub so that the given asset // address is mapped to the provided history asset id func (a AssetLoaderStub) Insert(asset AssetKey, id int64) { + a.Loader.sealed = true a.Loader.ids[asset] = id } diff --git a/services/horizon/internal/db2/history/asset_loader_test.go b/services/horizon/internal/db2/history/asset_loader_test.go index 99f510266c..d67163d764 100644 --- a/services/horizon/internal/db2/history/asset_loader_test.go +++ b/services/horizon/internal/db2/history/asset_loader_test.go @@ -12,6 +12,28 @@ import ( "github.com/stellar/go/xdr" ) +func TestAssetKeyToString(t *testing.T) { + num4key := AssetKey{ + Type: "credit_alphanum4", + Code: "USD", + Issuer: "A1B2C3", + } + + num12key := AssetKey{ + Type: "credit_alphanum12", + Code: "USDABC", + Issuer: "A1B2C3", + } + + nativekey := AssetKey{ + Type: "native", + } + + assert.Equal(t, num4key.String(), "credit_alphanum4/USD/A1B2C3") + assert.Equal(t, num12key.String(), "credit_alphanum12/USDABC/A1B2C3") + assert.Equal(t, nativekey.String(), "native") +} + func TestAssetLoader(t *testing.T) { tt := test.Start(t) defer tt.Finish() @@ -22,30 +44,34 @@ func TestAssetLoader(t *testing.T) { for i := 0; i < 100; i++ { var key AssetKey if i == 0 { - key.Type = "native" + key = AssetKeyFromXDR(xdr.Asset{Type: xdr.AssetTypeAssetTypeNative}) } else if i%2 == 0 { - key.Type = "credit_alphanum4" - key.Code = fmt.Sprintf("ab%d", i) - key.Issuer = keypair.MustRandom().Address() + code := [4]byte{0, 0, 0, 0} + copy(code[:], fmt.Sprintf("ab%d", i)) + key = AssetKeyFromXDR(xdr.Asset{ + Type: xdr.AssetTypeAssetTypeCreditAlphanum4, + AlphaNum4: &xdr.AlphaNum4{ + AssetCode: code, + Issuer: xdr.MustAddress(keypair.MustRandom().Address())}}) } else { - key.Type = "credit_alphanum12" - key.Code = fmt.Sprintf("abcdef%d", i) - key.Issuer = keypair.MustRandom().Address() + code := [12]byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0} + copy(code[:], fmt.Sprintf("abcdef%d", i)) + key = AssetKeyFromXDR(xdr.Asset{ + Type: xdr.AssetTypeAssetTypeCreditAlphanum12, + AlphaNum12: &xdr.AlphaNum12{ + AssetCode: code, + Issuer: xdr.MustAddress(keypair.MustRandom().Address())}}) + } keys = append(keys, key) } loader := NewAssetLoader() - var futures []FutureAssetID for _, key := range keys { future := loader.GetFuture(key) - futures = append(futures, future) - assert.Panics(t, func() { - loader.GetNow(key) - }) - assert.Panics(t, func() { - future.Value() - }) + _, err := future.Value() + assert.Error(t, err) + assert.Contains(t, err.Error(), `invalid asset loader state,`) duplicateFuture := loader.GetFuture(key) assert.Equal(t, future, duplicateFuture) } @@ -56,12 +82,9 @@ func TestAssetLoader(t *testing.T) { }) q := &Q{session} - for i, key := range keys { - future := futures[i] - internalID := loader.GetNow(key) - val, err := future.Value() + for _, key := range keys { + internalID, err := loader.GetNow(key) assert.NoError(t, err) - assert.Equal(t, internalID, val) var assetXDR xdr.Asset if key.Type == "native" { assetXDR = xdr.MustNewNativeAsset() @@ -72,4 +95,8 @@ func TestAssetLoader(t *testing.T) { assert.NoError(t, err) assert.Equal(t, assetID, internalID) } + + _, err := loader.GetNow(AssetKey{}) + assert.Error(t, err) + assert.Contains(t, err.Error(), `was not found`) } diff --git a/services/horizon/internal/db2/history/claimable_balance_loader.go b/services/horizon/internal/db2/history/claimable_balance_loader.go index a077eb683e..dd7dee4ea5 100644 --- a/services/horizon/internal/db2/history/claimable_balance_loader.go +++ b/services/horizon/internal/db2/history/claimable_balance_loader.go @@ -23,7 +23,7 @@ type FutureClaimableBalanceID struct { // Value implements the database/sql/driver Valuer interface. func (a FutureClaimableBalanceID) Value() (driver.Value, error) { - return a.loader.getNow(a.id), nil + return a.loader.getNow(a.id) } // ClaimableBalanceLoader will map claimable balance ids to their internal @@ -64,11 +64,15 @@ func (a *ClaimableBalanceLoader) GetFuture(id string) FutureClaimableBalanceID { // getNow should only be called on values which were registered by // GetFuture() calls. Also, Exec() must be called before any getNow // call can succeed. -func (a *ClaimableBalanceLoader) getNow(id string) int64 { +func (a *ClaimableBalanceLoader) getNow(id string) (int64, error) { + if !a.sealed { + return 0, fmt.Errorf(`invalid claimable balance loader state, + Exec was not called yet to properly seal and resolve %v id`, id) + } if internalID, ok := a.ids[id]; !ok { - panic(fmt.Errorf("id %v not present", id)) + return 0, fmt.Errorf(`claimable balance loader id %q was not found`, id) } else { - return internalID + return internalID, nil } } diff --git a/services/horizon/internal/db2/history/claimable_balance_loader_test.go b/services/horizon/internal/db2/history/claimable_balance_loader_test.go index b119daa674..4dd7324521 100644 --- a/services/horizon/internal/db2/history/claimable_balance_loader_test.go +++ b/services/horizon/internal/db2/history/claimable_balance_loader_test.go @@ -32,12 +32,9 @@ func TestClaimableBalanceLoader(t *testing.T) { for _, id := range ids { future := loader.GetFuture(id) futures = append(futures, future) - assert.Panics(t, func() { - loader.getNow(id) - }) - assert.Panics(t, func() { - future.Value() - }) + _, err := future.Value() + assert.Error(t, err) + assert.Contains(t, err.Error(), `invalid claimable balance loader state,`) duplicateFuture := loader.GetFuture(id) assert.Equal(t, future, duplicateFuture) } @@ -50,13 +47,16 @@ func TestClaimableBalanceLoader(t *testing.T) { q := &Q{session} for i, id := range ids { future := futures[i] - internalID := loader.getNow(id) - val, err := future.Value() + internalID, err := future.Value() assert.NoError(t, err) - assert.Equal(t, internalID, val) cb, err := q.ClaimableBalanceByID(context.Background(), id) assert.NoError(t, err) assert.Equal(t, cb.BalanceID, id) assert.Equal(t, cb.InternalID, internalID) } + + futureCb := &FutureClaimableBalanceID{id: "not-present", loader: loader} + _, err := futureCb.Value() + assert.Error(t, err) + assert.Contains(t, err.Error(), `was not found`) } diff --git a/services/horizon/internal/db2/history/fee_bump_scenario.go b/services/horizon/internal/db2/history/fee_bump_scenario.go index 5d155ac5e8..75dcc20d61 100644 --- a/services/horizon/internal/db2/history/fee_bump_scenario.go +++ b/services/horizon/internal/db2/history/fee_bump_scenario.go @@ -269,6 +269,7 @@ func FeeBumpScenario(tt *test.T, q *Q, successful bool) FeeBumpFixture { details, err := json.Marshal(map[string]string{ "bump_to": "98", }) + tt.Assert.NoError(err) tt.Assert.NoError(opBuilder.Add( @@ -296,9 +297,10 @@ func FeeBumpScenario(tt *test.T, q *Q, successful bool) FeeBumpFixture { EffectSequenceBumped, details, ) + tt.Assert.NoError(err) - tt.Assert.NoError(accountLoader.Exec(ctx, q)) - tt.Assert.NoError(effectBuilder.Exec(ctx, q)) + tt.Assert.NoError(accountLoader.Exec(ctx, q.SessionInterface)) + tt.Assert.NoError(effectBuilder.Exec(ctx, q.SessionInterface)) tt.Assert.NoError(q.Commit()) diff --git a/services/horizon/internal/db2/history/liquidity_pool_loader.go b/services/horizon/internal/db2/history/liquidity_pool_loader.go index 7c2fe6fd4d..cf89ae67b4 100644 --- a/services/horizon/internal/db2/history/liquidity_pool_loader.go +++ b/services/horizon/internal/db2/history/liquidity_pool_loader.go @@ -23,7 +23,7 @@ type FutureLiquidityPoolID struct { // Value implements the database/sql/driver Valuer interface. func (a FutureLiquidityPoolID) Value() (driver.Value, error) { - return a.loader.GetNow(a.id), nil + return a.loader.GetNow(a.id) } // LiquidityPoolLoader will map liquidity pools to their internal @@ -64,11 +64,15 @@ func (a *LiquidityPoolLoader) GetFuture(id string) FutureLiquidityPoolID { // GetNow should only be called on values which were registered by // GetFuture() calls. Also, Exec() must be called before any GetNow // call can succeed. -func (a *LiquidityPoolLoader) GetNow(id string) int64 { - if id, ok := a.ids[id]; !ok { - panic(fmt.Errorf("id %v not present", id)) +func (a *LiquidityPoolLoader) GetNow(id string) (int64, error) { + if !a.sealed { + return 0, fmt.Errorf(`invalid liquidity pool loader state, + Exec was not called yet to properly seal and resolve %v id`, id) + } + if internalID, ok := a.ids[id]; !ok { + return 0, fmt.Errorf(`liquidity pool loader id %q was not found`, id) } else { - return id + return internalID, nil } } @@ -156,5 +160,6 @@ func NewLiquidityPoolLoaderStub() LiquidityPoolLoaderStub { // Insert updates the wrapped LiquidityPoolLoader so that the given liquidity pool // is mapped to the provided history liquidity pool id func (a LiquidityPoolLoaderStub) Insert(lp string, id int64) { + a.Loader.sealed = true a.Loader.ids[lp] = id } diff --git a/services/horizon/internal/db2/history/liquidity_pool_loader_test.go b/services/horizon/internal/db2/history/liquidity_pool_loader_test.go index e2b1e05beb..6e5b4addf7 100644 --- a/services/horizon/internal/db2/history/liquidity_pool_loader_test.go +++ b/services/horizon/internal/db2/history/liquidity_pool_loader_test.go @@ -25,16 +25,11 @@ func TestLiquidityPoolLoader(t *testing.T) { } loader := NewLiquidityPoolLoader() - var futures []FutureLiquidityPoolID for _, id := range ids { future := loader.GetFuture(id) - futures = append(futures, future) - assert.Panics(t, func() { - loader.GetNow(id) - }) - assert.Panics(t, func() { - future.Value() - }) + _, err := future.Value() + assert.Error(t, err) + assert.Contains(t, err.Error(), `invalid liquidity pool loader state,`) duplicateFuture := loader.GetFuture(id) assert.Equal(t, future, duplicateFuture) } @@ -45,15 +40,16 @@ func TestLiquidityPoolLoader(t *testing.T) { }) q := &Q{session} - for i, id := range ids { - future := futures[i] - internalID := loader.GetNow(id) - val, err := future.Value() + for _, id := range ids { + internalID, err := loader.GetNow(id) assert.NoError(t, err) - assert.Equal(t, internalID, val) lp, err := q.LiquidityPoolByID(context.Background(), id) assert.NoError(t, err) assert.Equal(t, lp.PoolID, id) assert.Equal(t, lp.InternalID, internalID) } + + _, err := loader.GetNow("not present") + assert.Error(t, err) + assert.Contains(t, err.Error(), `was not found`) } diff --git a/services/horizon/internal/db2/history/operation_participant_batch_insert_builder_test.go b/services/horizon/internal/db2/history/operation_participant_batch_insert_builder_test.go index fc2ca9c831..508bfa22cf 100644 --- a/services/horizon/internal/db2/history/operation_participant_batch_insert_builder_test.go +++ b/services/horizon/internal/db2/history/operation_participant_batch_insert_builder_test.go @@ -43,6 +43,8 @@ func TestAddOperationParticipants(t *testing.T) { op := ops[0] tt.Assert.Equal(int64(240518172673), op.OperationID) - tt.Assert.Equal(accountLoader.GetNow(address), op.AccountID) + val, err := accountLoader.GetNow(address) + tt.Assert.NoError(err) + tt.Assert.Equal(val, op.AccountID) } } diff --git a/services/horizon/internal/db2/history/participants_test.go b/services/horizon/internal/db2/history/participants_test.go index 16671098bf..07f6d59c3e 100644 --- a/services/horizon/internal/db2/history/participants_test.go +++ b/services/horizon/internal/db2/history/participants_test.go @@ -63,7 +63,9 @@ func TestTransactionParticipantsBatch(t *testing.T) { {TransactionID: 2}, } for i := range expected { - expected[i].AccountID = accountLoader.GetNow(addresses[i]) + val, err := accountLoader.GetNow(addresses[i]) + tt.Assert.NoError(err) + expected[i].AccountID = val } tt.Assert.ElementsMatch(expected, participants) } diff --git a/services/horizon/internal/ingest/fsm.go b/services/horizon/internal/ingest/fsm.go index 38e3fe9ed7..284ea7127b 100644 --- a/services/horizon/internal/ingest/fsm.go +++ b/services/horizon/internal/ingest/fsm.go @@ -459,6 +459,7 @@ func (r resumeState) run(s *system) (transition, error) { // Update cursor if there's more than one ingesting instance: either // Captive-Core or DB ingestion connected to another Stellar-Core. + // remove now? if err = s.updateCursor(lastIngestedLedger); err != nil { // Don't return updateCursor error. log.WithError(err).Warn("error updating stellar-core cursor") @@ -524,6 +525,7 @@ func (r resumeState) run(s *system) (transition, error) { return retryResume(r), err } + //TODO remove now? stellar-core-db-url is removed if err = s.updateCursor(ingestLedger); err != nil { // Don't return updateCursor error. log.WithError(err).Warn("error updating stellar-core cursor") diff --git a/services/horizon/internal/ingest/group_processors.go b/services/horizon/internal/ingest/group_processors.go index 86622810b5..af486a35cf 100644 --- a/services/horizon/internal/ingest/group_processors.go +++ b/services/horizon/internal/ingest/group_processors.go @@ -7,7 +7,9 @@ import ( "github.com/stellar/go/ingest" "github.com/stellar/go/services/horizon/internal/ingest/processors" + "github.com/stellar/go/support/db" "github.com/stellar/go/support/errors" + "github.com/stellar/go/xdr" ) type processorsRunDurations map[string]time.Duration @@ -51,21 +53,23 @@ func (g groupChangeProcessors) Commit(ctx context.Context) error { } type groupTransactionProcessors struct { - processors []horizonTransactionProcessor + processors []horizonTransactionProcessor + lazyLoaders []horizonLazyLoader processorsRunDurations } -func newGroupTransactionProcessors(processors []horizonTransactionProcessor) *groupTransactionProcessors { +func newGroupTransactionProcessors(processors []horizonTransactionProcessor, lazyLoaders []horizonLazyLoader) *groupTransactionProcessors { return &groupTransactionProcessors{ processors: processors, processorsRunDurations: make(map[string]time.Duration), + lazyLoaders: lazyLoaders, } } -func (g groupTransactionProcessors) ProcessTransaction(ctx context.Context, tx ingest.LedgerTransaction) error { +func (g groupTransactionProcessors) ProcessTransaction(lcm xdr.LedgerCloseMeta, tx ingest.LedgerTransaction) error { for _, p := range g.processors { startTime := time.Now() - if err := p.ProcessTransaction(ctx, tx); err != nil { + if err := p.ProcessTransaction(lcm, tx); err != nil { return errors.Wrapf(err, "error in %T.ProcessTransaction", p) } g.AddRunDuration(fmt.Sprintf("%T", p), startTime) @@ -73,11 +77,21 @@ func (g groupTransactionProcessors) ProcessTransaction(ctx context.Context, tx i return nil } -func (g groupTransactionProcessors) Commit(ctx context.Context) error { +func (g groupTransactionProcessors) Flush(ctx context.Context, session db.SessionInterface) error { + // need to trigger all lazy loaders to now resolve their future placeholders + // with real db values first + for _, loader := range g.lazyLoaders { + if err := loader.Exec(ctx, session); err != nil { + return errors.Wrapf(err, "error during lazy loader resolution, %T.Exec", loader) + } + } + + // now flush each processor which may call loader.GetNow(), which + // required the prior loader.Exec() to have been called. for _, p := range g.processors { startTime := time.Now() - if err := p.Commit(ctx); err != nil { - return errors.Wrapf(err, "error in %T.Commit", p) + if err := p.Flush(ctx, session); err != nil { + return errors.Wrapf(err, "error in %T.Flush", p) } g.AddRunDuration(fmt.Sprintf("%T", p), startTime) } diff --git a/services/horizon/internal/ingest/group_processors_test.go b/services/horizon/internal/ingest/group_processors_test.go index 6848c24a66..73d4f56f3f 100644 --- a/services/horizon/internal/ingest/group_processors_test.go +++ b/services/horizon/internal/ingest/group_processors_test.go @@ -11,6 +11,8 @@ import ( "github.com/stretchr/testify/suite" "github.com/stellar/go/ingest" + "github.com/stellar/go/support/db" + "github.com/stellar/go/xdr" ) var _ horizonChangeProcessor = (*mockHorizonChangeProcessor)(nil) @@ -35,13 +37,13 @@ type mockHorizonTransactionProcessor struct { mock.Mock } -func (m *mockHorizonTransactionProcessor) ProcessTransaction(ctx context.Context, transaction ingest.LedgerTransaction) error { - args := m.Called(ctx, transaction) +func (m *mockHorizonTransactionProcessor) ProcessTransaction(lcm xdr.LedgerCloseMeta, transaction ingest.LedgerTransaction) error { + args := m.Called(lcm, transaction) return args.Error(0) } -func (m *mockHorizonTransactionProcessor) Commit(ctx context.Context) error { - args := m.Called(ctx) +func (m *mockHorizonTransactionProcessor) Flush(ctx context.Context, session db.SessionInterface) error { + args := m.Called(ctx, session) return args.Error(0) } @@ -124,6 +126,7 @@ type GroupTransactionProcessorsTestSuiteLedger struct { processors *groupTransactionProcessors processorA *mockHorizonTransactionProcessor processorB *mockHorizonTransactionProcessor + session db.SessionInterface } func TestGroupTransactionProcessorsTestSuiteLedger(t *testing.T) { @@ -137,7 +140,8 @@ func (s *GroupTransactionProcessorsTestSuiteLedger) SetupTest() { s.processors = newGroupTransactionProcessors([]horizonTransactionProcessor{ s.processorA, s.processorB, - }) + }, nil) + s.session = &db.MockSession{} } func (s *GroupTransactionProcessorsTestSuiteLedger) TearDownTest() { @@ -147,46 +151,48 @@ func (s *GroupTransactionProcessorsTestSuiteLedger) TearDownTest() { func (s *GroupTransactionProcessorsTestSuiteLedger) TestProcessTransactionFails() { transaction := ingest.LedgerTransaction{} + closeMeta := xdr.LedgerCloseMeta{} s.processorA. - On("ProcessTransaction", s.ctx, transaction). + On("ProcessTransaction", closeMeta, transaction). Return(errors.New("transient error")).Once() - err := s.processors.ProcessTransaction(s.ctx, transaction) + err := s.processors.ProcessTransaction(closeMeta, transaction) s.Assert().Error(err) s.Assert().EqualError(err, "error in *ingest.mockHorizonTransactionProcessor.ProcessTransaction: transient error") } func (s *GroupTransactionProcessorsTestSuiteLedger) TestProcessTransactionSucceeds() { transaction := ingest.LedgerTransaction{} + closeMeta := xdr.LedgerCloseMeta{} s.processorA. - On("ProcessTransaction", s.ctx, transaction). + On("ProcessTransaction", closeMeta, transaction). Return(nil).Once() s.processorB. - On("ProcessTransaction", s.ctx, transaction). + On("ProcessTransaction", closeMeta, transaction). Return(nil).Once() - err := s.processors.ProcessTransaction(s.ctx, transaction) + err := s.processors.ProcessTransaction(closeMeta, transaction) s.Assert().NoError(err) } -func (s *GroupTransactionProcessorsTestSuiteLedger) TestCommitFails() { +func (s *GroupTransactionProcessorsTestSuiteLedger) TestFlushFails() { s.processorA. - On("Commit", s.ctx). + On("Flush", s.ctx, s.session). Return(errors.New("transient error")).Once() - err := s.processors.Commit(s.ctx) + err := s.processors.Flush(s.ctx, s.session) s.Assert().Error(err) - s.Assert().EqualError(err, "error in *ingest.mockHorizonTransactionProcessor.Commit: transient error") + s.Assert().EqualError(err, "error in *ingest.mockHorizonTransactionProcessor.Flush: transient error") } -func (s *GroupTransactionProcessorsTestSuiteLedger) TestCommitSucceeds() { +func (s *GroupTransactionProcessorsTestSuiteLedger) TestFlushSucceeds() { s.processorA. - On("Commit", s.ctx). + On("Flush", s.ctx, s.session). Return(nil).Once() s.processorB. - On("Commit", s.ctx). + On("Flush", s.ctx, s.session). Return(nil).Once() - err := s.processors.Commit(s.ctx) + err := s.processors.Flush(s.ctx, s.session) s.Assert().NoError(err) } diff --git a/services/horizon/internal/ingest/processor_runner.go b/services/horizon/internal/ingest/processor_runner.go index 481a4e7d52..f66b10c1e3 100644 --- a/services/horizon/internal/ingest/processor_runner.go +++ b/services/horizon/internal/ingest/processor_runner.go @@ -32,7 +32,10 @@ type horizonChangeProcessor interface { type horizonTransactionProcessor interface { processors.LedgerTransactionProcessor - Commit(context.Context) error +} + +type horizonLazyLoader interface { + Exec(ctx context.Context, session db.SessionInterface) error } type statsChangeProcessor struct { @@ -47,10 +50,6 @@ type statsLedgerTransactionProcessor struct { *processors.StatsLedgerTransactionProcessor } -func (statsLedgerTransactionProcessor) Commit(ctx context.Context) error { - return nil -} - type ledgerStats struct { changeStats ingest.StatsChangeProcessorResults changeDurations processorsRunDurations @@ -135,24 +134,36 @@ func buildChangeProcessor( func (s *ProcessorRunner) buildTransactionProcessor( ledgerTransactionStats *processors.StatsLedgerTransactionProcessor, tradeProcessor *processors.TradeProcessor, - ledger xdr.LedgerHeaderHistoryEntry, + ledgersProcessor *processors.LedgersProcessor, ) *groupTransactionProcessors { + accountLoader := history.NewAccountLoader() + assetLoader := history.NewAssetLoader() + lpLoader := history.NewLiquidityPoolLoader() + cbLoader := history.NewClaimableBalanceLoader() + + lazyLoaders := []horizonLazyLoader{accountLoader, assetLoader, lpLoader, cbLoader} + statsLedgerTransactionProcessor := &statsLedgerTransactionProcessor{ StatsLedgerTransactionProcessor: ledgerTransactionStats, } - *tradeProcessor = *processors.NewTradeProcessor(s.session, s.historyQ, ledger) - sequence := uint32(ledger.Header.LedgerSeq) - return newGroupTransactionProcessors([]horizonTransactionProcessor{ + *tradeProcessor = *processors.NewTradeProcessor(accountLoader, + lpLoader, assetLoader, s.historyQ.NewTradeBatchInsertBuilder()) + + processors := []horizonTransactionProcessor{ statsLedgerTransactionProcessor, - processors.NewEffectProcessor(s.session, s.historyQ, sequence), - processors.NewLedgerProcessor(s.session, s.historyQ, ledger, CurrentVersion), - processors.NewOperationProcessor(s.session, s.historyQ, sequence), + processors.NewEffectProcessor(accountLoader, s.historyQ.NewEffectBatchInsertBuilder()), + ledgersProcessor, + processors.NewOperationProcessor(s.historyQ.NewOperationBatchInsertBuilder()), tradeProcessor, - processors.NewParticipantsProcessor(s.session, s.historyQ, sequence), - processors.NewTransactionProcessor(s.session, s.historyQ, sequence), - processors.NewClaimableBalancesTransactionProcessor(s.session, s.historyQ, sequence), - processors.NewLiquidityPoolsTransactionProcessor(s.session, s.historyQ, sequence), - }) + processors.NewParticipantsProcessor(accountLoader, + s.historyQ.NewTransactionParticipantsBatchInsertBuilder(), s.historyQ.NewOperationParticipantBatchInsertBuilder()), + processors.NewTransactionProcessor(s.historyQ.NewTransactionBatchInsertBuilder()), + processors.NewClaimableBalancesTransactionProcessor(cbLoader, + s.historyQ.NewTransactionClaimableBalanceBatchInsertBuilder(), s.historyQ.NewOperationClaimableBalanceBatchInsertBuilder()), + processors.NewLiquidityPoolsTransactionProcessor(lpLoader, + s.historyQ.NewTransactionLiquidityPoolBatchInsertBuilder(), s.historyQ.NewOperationLiquidityPoolBatchInsertBuilder())} + + return newGroupTransactionProcessors(processors, lazyLoaders) } func (s *ProcessorRunner) buildTransactionFilterer() *groupTransactionFilterers { @@ -164,15 +175,15 @@ func (s *ProcessorRunner) buildTransactionFilterer() *groupTransactionFilterers return newGroupTransactionFilterers(f) } -func (s *ProcessorRunner) buildFilteredOutProcessor(ledger xdr.LedgerHeaderHistoryEntry) *groupTransactionProcessors { +func (s *ProcessorRunner) buildFilteredOutProcessor() *groupTransactionProcessors { // when in online mode, the submission result processor must always run (regardless of filtering) var p []horizonTransactionProcessor if s.config.EnableIngestionFiltering { - txSubProc := processors.NewTransactionFilteredTmpProcessor(s.session, s.historyQ, uint32(ledger.Header.LedgerSeq)) + txSubProc := processors.NewTransactionFilteredTmpProcessor(s.historyQ.NewTransactionFilteredTmpBatchInsertBuilder()) p = append(p, txSubProc) } - return newGroupTransactionProcessors(p) + return newGroupTransactionProcessors(p, nil) } // checkIfProtocolVersionSupported checks if this Horizon version supports the @@ -311,26 +322,31 @@ func (s *ProcessorRunner) RunTransactionProcessorsOnLedger(ledger xdr.LedgerClos transactionReader *ingest.LedgerTransactionReader ) + if err = s.checkIfProtocolVersionSupported(ledger.ProtocolVersion()); err != nil { + err = errors.Wrap(err, "Error while checking for supported protocol version") + return + } + + // ensure capture of the ledger to history regardless of whether it has transactions. + ledgersProcessor := processors.NewLedgerProcessor(s.historyQ.NewLedgerBatchInsertBuilder(), CurrentVersion) + ledgersProcessor.ProcessLedger(ledger) + transactionReader, err = ingest.NewLedgerTransactionReaderFromLedgerCloseMeta(s.config.NetworkPassphrase, ledger) if err != nil { err = errors.Wrap(err, "Error creating ledger reader") return } - if err = s.checkIfProtocolVersionSupported(ledger.ProtocolVersion()); err != nil { - err = errors.Wrap(err, "Error while checking for supported protocol version") - return - } - header := transactionReader.GetHeader() groupTransactionFilterers := s.buildTransactionFilterer() - groupFilteredOutProcessors := s.buildFilteredOutProcessor(header) + groupFilteredOutProcessors := s.buildFilteredOutProcessor() groupTransactionProcessors := s.buildTransactionProcessor( - &ledgerTransactionStats, &tradeProcessor, header) + &ledgerTransactionStats, &tradeProcessor, ledgersProcessor) err = processors.StreamLedgerTransactions(s.ctx, groupTransactionFilterers, groupFilteredOutProcessors, groupTransactionProcessors, transactionReader, + ledger, ) if err != nil { err = errors.Wrap(err, "Error streaming changes from ledger") @@ -338,9 +354,9 @@ func (s *ProcessorRunner) RunTransactionProcessorsOnLedger(ledger xdr.LedgerClos } if s.config.EnableIngestionFiltering { - err = groupFilteredOutProcessors.Commit(s.ctx) + err = groupFilteredOutProcessors.Flush(s.ctx, s.session) if err != nil { - err = errors.Wrap(err, "Error committing filtered changes from processor") + err = errors.Wrap(err, "Error flushing temp filtered tx from processor") return } if time.Since(s.lastTransactionsTmpGC) > transactionsFilteredTmpGCPeriod { @@ -348,9 +364,9 @@ func (s *ProcessorRunner) RunTransactionProcessorsOnLedger(ledger xdr.LedgerClos } } - err = groupTransactionProcessors.Commit(s.ctx) + err = groupTransactionProcessors.Flush(s.ctx, s.session) if err != nil { - err = errors.Wrap(err, "Error committing changes from processor") + err = errors.Wrap(err, "Error flushing changes from processor") return } @@ -390,9 +406,6 @@ func (s *ProcessorRunner) RunAllProcessorsOnLedger(ledger xdr.LedgerCloseMeta) ( stats.transactionStats, stats.transactionDurations, stats.tradeStats, err = s.RunTransactionProcessorsOnLedger(ledger) - if err != nil { - return - } return } diff --git a/services/horizon/internal/ingest/processor_runner_test.go b/services/horizon/internal/ingest/processor_runner_test.go index c01ee53730..46796807fc 100644 --- a/services/horizon/internal/ingest/processor_runner_test.go +++ b/services/horizon/internal/ingest/processor_runner_test.go @@ -234,17 +234,31 @@ func TestProcessorRunnerBuildChangeProcessor(t *testing.T) { func TestProcessorRunnerBuildTransactionProcessor(t *testing.T) { ctx := context.Background() - maxBatchSize := 100000 q := &mockDBQ{} defer mock.AssertExpectationsForObjects(t, q) - q.MockQOperations.On("NewOperationBatchInsertBuilder"). - Return(&history.MockOperationsBatchInsertBuilder{}).Twice() // Twice = with/without failed q.MockQTransactions.On("NewTransactionBatchInsertBuilder"). - Return(&history.MockTransactionsBatchInsertBuilder{}).Twice() - q.MockQClaimableBalances.On("NewClaimableBalanceClaimantBatchInsertBuilder", maxBatchSize). - Return(&history.MockClaimableBalanceClaimantBatchInsertBuilder{}).Twice() + Return(&history.MockTransactionsBatchInsertBuilder{}) + q.On("NewTradeBatchInsertBuilder").Return(&history.MockTradeBatchInsertBuilder{}) + q.MockQLedgers.On("NewLedgerBatchInsertBuilder"). + Return(&history.MockLedgersBatchInsertBuilder{}) + q.MockQEffects.On("NewEffectBatchInsertBuilder"). + Return(&history.MockEffectBatchInsertBuilder{}) + q.MockQOperations.On("NewOperationBatchInsertBuilder"). + Return(&history.MockOperationsBatchInsertBuilder{}) + q.On("NewTransactionParticipantsBatchInsertBuilder"). + Return(&history.MockTransactionParticipantsBatchInsertBuilder{}) + q.On("NewOperationParticipantBatchInsertBuilder"). + Return(&history.MockOperationParticipantBatchInsertBuilder{}) + q.MockQHistoryClaimableBalances.On("NewTransactionClaimableBalanceBatchInsertBuilder"). + Return(&history.MockTransactionClaimableBalanceBatchInsertBuilder{}) + q.MockQHistoryClaimableBalances.On("NewOperationClaimableBalanceBatchInsertBuilder"). + Return(&history.MockOperationClaimableBalanceBatchInsertBuilder{}) + q.MockQHistoryLiquidityPools.On("NewTransactionLiquidityPoolBatchInsertBuilder"). + Return(&history.MockTransactionLiquidityPoolBatchInsertBuilder{}) + q.MockQHistoryLiquidityPools.On("NewOperationLiquidityPoolBatchInsertBuilder"). + Return(&history.MockOperationLiquidityPoolBatchInsertBuilder{}) runner := ProcessorRunner{ ctx: ctx, @@ -254,17 +268,19 @@ func TestProcessorRunnerBuildTransactionProcessor(t *testing.T) { stats := &processors.StatsLedgerTransactionProcessor{} trades := &processors.TradeProcessor{} - ledger := xdr.LedgerHeaderHistoryEntry{} - processor := runner.buildTransactionProcessor(stats, trades, ledger) - assert.IsType(t, &groupTransactionProcessors{}, processor) + ledgersProcessor := &processors.LedgersProcessor{} + + processor := runner.buildTransactionProcessor(stats, trades, ledgersProcessor) + assert.IsType(t, &groupTransactionProcessors{}, processor) assert.IsType(t, &statsLedgerTransactionProcessor{}, processor.processors[0]) assert.IsType(t, &processors.EffectProcessor{}, processor.processors[1]) assert.IsType(t, &processors.LedgersProcessor{}, processor.processors[2]) assert.IsType(t, &processors.OperationProcessor{}, processor.processors[3]) assert.IsType(t, &processors.TradeProcessor{}, processor.processors[4]) assert.IsType(t, &processors.ParticipantsProcessor{}, processor.processors[5]) - assert.IsType(t, &processors.TransactionProcessor{}, processor.processors[6]) + assert.IsType(t, &processors.ClaimableBalancesTransactionProcessor{}, processor.processors[7]) + assert.IsType(t, &processors.LiquidityPoolsTransactionProcessor{}, processor.processors[8]) } func TestProcessorRunnerWithFilterEnabled(t *testing.T) { @@ -291,33 +307,17 @@ func TestProcessorRunnerWithFilterEnabled(t *testing.T) { } // Batches - mockAccountSignersBatchInsertBuilder := &history.MockAccountSignersBatchInsertBuilder{} - defer mock.AssertExpectationsForObjects(t, mockAccountSignersBatchInsertBuilder) - q.MockQSigners.On("NewAccountSignersBatchInsertBuilder", maxBatchSize). - Return(mockAccountSignersBatchInsertBuilder).Once() - - mockOperationsBatchInsertBuilder := &history.MockOperationsBatchInsertBuilder{} - defer mock.AssertExpectationsForObjects(t, mockOperationsBatchInsertBuilder) - mockOperationsBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil).Once() - q.MockQOperations.On("NewOperationBatchInsertBuilder"). - Return(mockOperationsBatchInsertBuilder).Twice() - - mockTransactionsBatchInsertBuilder := &history.MockTransactionsBatchInsertBuilder{} - defer mock.AssertExpectationsForObjects(t, mockTransactionsBatchInsertBuilder) - mockTransactionsBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil).Twice() - - q.MockQTransactions.On("NewTransactionBatchInsertBuilder"). - Return(mockTransactionsBatchInsertBuilder) - + mockTransactionsFilteredTmpBatchInsertBuilder := &history.MockTransactionsBatchInsertBuilder{} + defer mock.AssertExpectationsForObjects(t, mockTransactionsFilteredTmpBatchInsertBuilder) + mockTransactionsFilteredTmpBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil).Once() q.MockQTransactions.On("NewTransactionFilteredTmpBatchInsertBuilder"). - Return(mockTransactionsBatchInsertBuilder) - - q.MockQClaimableBalances.On("NewClaimableBalanceClaimantBatchInsertBuilder", maxBatchSize). - Return(&history.MockClaimableBalanceClaimantBatchInsertBuilder{}).Once() + Return(mockTransactionsFilteredTmpBatchInsertBuilder) q.On("DeleteTransactionsFilteredTmpOlderThan", ctx, mock.AnythingOfType("uint64")). Return(int64(0), nil) + defer mock.AssertExpectationsForObjects(t, mockBatchBuilders(q, mockSession, ctx, maxBatchSize)...) + mockBatchInsertBuilder := &history.MockLedgersBatchInsertBuilder{} q.MockQLedgers.On("NewLedgerBatchInsertBuilder").Return(mockBatchInsertBuilder) mockBatchInsertBuilder.On( @@ -364,25 +364,7 @@ func TestProcessorRunnerRunAllProcessorsOnLedger(t *testing.T) { } // Batches - mockAccountSignersBatchInsertBuilder := &history.MockAccountSignersBatchInsertBuilder{} - defer mock.AssertExpectationsForObjects(t, mockAccountSignersBatchInsertBuilder) - q.MockQSigners.On("NewAccountSignersBatchInsertBuilder", maxBatchSize). - Return(mockAccountSignersBatchInsertBuilder).Once() - - mockOperationsBatchInsertBuilder := &history.MockOperationsBatchInsertBuilder{} - defer mock.AssertExpectationsForObjects(t, mockOperationsBatchInsertBuilder) - mockOperationsBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil).Once() - q.MockQOperations.On("NewOperationBatchInsertBuilder"). - Return(mockOperationsBatchInsertBuilder).Twice() - - mockTransactionsBatchInsertBuilder := &history.MockTransactionsBatchInsertBuilder{} - defer mock.AssertExpectationsForObjects(t, mockTransactionsBatchInsertBuilder) - mockTransactionsBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil).Once() - q.MockQTransactions.On("NewTransactionBatchInsertBuilder"). - Return(mockTransactionsBatchInsertBuilder).Twice() - - q.MockQClaimableBalances.On("NewClaimableBalanceClaimantBatchInsertBuilder", maxBatchSize). - Return(&history.MockClaimableBalanceClaimantBatchInsertBuilder{}).Once() + defer mock.AssertExpectationsForObjects(t, mockBatchBuilders(q, mockSession, ctx, maxBatchSize)...) mockBatchInsertBuilder := &history.MockLedgersBatchInsertBuilder{} q.MockQLedgers.On("NewLedgerBatchInsertBuilder").Return(mockBatchInsertBuilder) @@ -429,21 +411,21 @@ func TestProcessorRunnerRunAllProcessorsOnLedgerProtocolVersionNotSupported(t *t } // Batches + mockTransactionsBatchInsertBuilder := &history.MockTransactionsBatchInsertBuilder{} + q.MockQTransactions.On("NewTransactionBatchInsertBuilder", maxBatchSize). + Return(mockTransactionsBatchInsertBuilder).Twice() mockAccountSignersBatchInsertBuilder := &history.MockAccountSignersBatchInsertBuilder{} - defer mock.AssertExpectationsForObjects(t, mockAccountSignersBatchInsertBuilder) q.MockQSigners.On("NewAccountSignersBatchInsertBuilder", maxBatchSize). Return(mockAccountSignersBatchInsertBuilder).Once() mockOperationsBatchInsertBuilder := &history.MockOperationsBatchInsertBuilder{} - defer mock.AssertExpectationsForObjects(t, mockOperationsBatchInsertBuilder) - q.MockQOperations.On("NewOperationBatchInsertBuilder", maxBatchSize). + q.MockQOperations.On("NewOperationBatchInsertBuilder"). Return(mockOperationsBatchInsertBuilder).Twice() - mockTransactionsBatchInsertBuilder := &history.MockTransactionsBatchInsertBuilder{} - defer mock.AssertExpectationsForObjects(t, mockTransactionsBatchInsertBuilder) - q.MockQTransactions.On("NewTransactionBatchInsertBuilder", maxBatchSize). - Return(mockTransactionsBatchInsertBuilder).Twice() + defer mock.AssertExpectationsForObjects(t, mockTransactionsBatchInsertBuilder, + mockAccountSignersBatchInsertBuilder, + mockOperationsBatchInsertBuilder) runner := ProcessorRunner{ ctx: ctx, @@ -460,3 +442,63 @@ func TestProcessorRunnerRunAllProcessorsOnLedgerProtocolVersionNotSupported(t *t ), ) } + +func mockBatchBuilders(q *mockDBQ, mockSession *db.MockSession, ctx context.Context, maxBatchSize int) []interface{} { + mockTransactionsBatchInsertBuilder := &history.MockTransactionsBatchInsertBuilder{} + mockTransactionsBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil).Once() + q.MockQTransactions.On("NewTransactionBatchInsertBuilder"). + Return(mockTransactionsBatchInsertBuilder) + + mockAccountSignersBatchInsertBuilder := &history.MockAccountSignersBatchInsertBuilder{} + q.MockQSigners.On("NewAccountSignersBatchInsertBuilder", maxBatchSize). + Return(mockAccountSignersBatchInsertBuilder).Once() + + mockOperationsBatchInsertBuilder := &history.MockOperationsBatchInsertBuilder{} + mockOperationsBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil).Once() + q.MockQOperations.On("NewOperationBatchInsertBuilder"). + Return(mockOperationsBatchInsertBuilder).Twice() + + mockEffectBatchInsertBuilder := &history.MockEffectBatchInsertBuilder{} + mockEffectBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil).Once() + q.MockQEffects.On("NewEffectBatchInsertBuilder"). + Return(mockEffectBatchInsertBuilder) + + mockTransactionsParticipantsBatchInsertBuilder := &history.MockTransactionParticipantsBatchInsertBuilder{} + mockTransactionsParticipantsBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil) + q.On("NewTransactionParticipantsBatchInsertBuilder"). + Return(mockTransactionsParticipantsBatchInsertBuilder) + + mockOperationParticipantBatchInsertBuilder := &history.MockOperationParticipantBatchInsertBuilder{} + mockOperationParticipantBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil) + q.On("NewOperationParticipantBatchInsertBuilder"). + Return(mockOperationParticipantBatchInsertBuilder) + + mockTransactionClaimableBalanceBatchInsertBuilder := &history.MockTransactionClaimableBalanceBatchInsertBuilder{} + mockTransactionClaimableBalanceBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil) + q.MockQHistoryClaimableBalances.On("NewTransactionClaimableBalanceBatchInsertBuilder"). + Return(mockTransactionClaimableBalanceBatchInsertBuilder) + + mockOperationClaimableBalanceBatchInsertBuilder := &history.MockOperationClaimableBalanceBatchInsertBuilder{} + mockOperationClaimableBalanceBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil) + q.MockQHistoryClaimableBalances.On("NewOperationClaimableBalanceBatchInsertBuilder"). + Return(mockOperationClaimableBalanceBatchInsertBuilder) + + mockTransactionLiquidityPoolBatchInsertBuilder := &history.MockTransactionLiquidityPoolBatchInsertBuilder{} + mockTransactionLiquidityPoolBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil) + q.MockQHistoryLiquidityPools.On("NewTransactionLiquidityPoolBatchInsertBuilder"). + Return(mockTransactionLiquidityPoolBatchInsertBuilder) + + mockOperationLiquidityPoolBatchInsertBuilder := &history.MockOperationLiquidityPoolBatchInsertBuilder{} + mockOperationLiquidityPoolBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil) + q.MockQHistoryLiquidityPools.On("NewOperationLiquidityPoolBatchInsertBuilder"). + Return(mockOperationLiquidityPoolBatchInsertBuilder) + + q.MockQClaimableBalances.On("NewClaimableBalanceClaimantBatchInsertBuilder", maxBatchSize). + Return(&history.MockClaimableBalanceClaimantBatchInsertBuilder{}).Once() + + q.On("NewTradeBatchInsertBuilder").Return(&history.MockTradeBatchInsertBuilder{}) + + return []interface{}{mockAccountSignersBatchInsertBuilder, + mockOperationsBatchInsertBuilder, + mockTransactionsBatchInsertBuilder} +} diff --git a/services/horizon/internal/ingest/processors/change_processors.go b/services/horizon/internal/ingest/processors/change_processors.go index 2e5b126d8f..ee9eb127f1 100644 --- a/services/horizon/internal/ingest/processors/change_processors.go +++ b/services/horizon/internal/ingest/processors/change_processors.go @@ -8,63 +8,6 @@ import ( "github.com/stellar/go/support/errors" ) -type ChangeProcessor interface { - ProcessChange(ctx context.Context, change ingest.Change) error -} - -type LedgerTransactionProcessor interface { - ProcessTransaction(ctx context.Context, transaction ingest.LedgerTransaction) error -} - -type LedgerTransactionFilterer interface { - FilterTransaction(ctx context.Context, transaction ingest.LedgerTransaction) (bool, error) -} - -func StreamLedgerTransactions( - ctx context.Context, - txFilterer LedgerTransactionFilterer, - filteredTxProcessor LedgerTransactionProcessor, - txProcessor LedgerTransactionProcessor, - reader *ingest.LedgerTransactionReader, -) error { - for { - tx, err := reader.Read() - if err == io.EOF { - return nil - } - if err != nil { - return errors.Wrap(err, "could not read transaction") - } - include, err := txFilterer.FilterTransaction(ctx, tx) - if err != nil { - return errors.Wrapf( - err, - "could not filter transaction %v", - tx.Index, - ) - } - if !include { - if err = filteredTxProcessor.ProcessTransaction(ctx, tx); err != nil { - return errors.Wrapf( - err, - "could not process transaction %v", - tx.Index, - ) - } - log.Debugf("Filters did not find match on transaction, dropping this tx with hash %v", tx.Result.TransactionHash.HexString()) - continue - } - - if err = txProcessor.ProcessTransaction(ctx, tx); err != nil { - return errors.Wrapf( - err, - "could not process transaction %v", - tx.Index, - ) - } - } -} - func StreamChanges( ctx context.Context, changeProcessor ChangeProcessor, diff --git a/services/horizon/internal/ingest/processors/ledgers_processor.go b/services/horizon/internal/ingest/processors/ledgers_processor.go index 1f14cc5518..942a5f8522 100644 --- a/services/horizon/internal/ingest/processors/ledgers_processor.go +++ b/services/horizon/internal/ingest/processors/ledgers_processor.go @@ -32,14 +32,18 @@ func NewLedgerProcessor(batch history.LedgerBatchInsertBuilder, ingestVersion in } } -func (p *LedgersProcessor) ProcessTransaction(lcm xdr.LedgerCloseMeta, transaction ingest.LedgerTransaction) error { +func (p *LedgersProcessor) ProcessLedger(lcm xdr.LedgerCloseMeta) *ledgerInfo { sequence := lcm.LedgerSequence() entry, ok := p.ledgers[sequence] if !ok { entry = &ledgerInfo{header: lcm.LedgerHeaderHistoryEntry()} p.ledgers[sequence] = entry } + return entry +} +func (p *LedgersProcessor) ProcessTransaction(lcm xdr.LedgerCloseMeta, transaction ingest.LedgerTransaction) error { + entry := p.ProcessLedger(lcm) opCount := len(transaction.Envelope.Operations()) entry.txSetOpCount += opCount if transaction.Result.Successful() { diff --git a/services/horizon/internal/ingest/processors/main.go b/services/horizon/internal/ingest/processors/main.go index 5088dd97aa..94f83f3fa9 100644 --- a/services/horizon/internal/ingest/processors/main.go +++ b/services/horizon/internal/ingest/processors/main.go @@ -1,7 +1,13 @@ package processors import ( + "context" + "io" + "github.com/guregu/null" + "github.com/stellar/go/ingest" + "github.com/stellar/go/support/db" + "github.com/stellar/go/support/errors" logpkg "github.com/stellar/go/support/log" "github.com/stellar/go/xdr" ) @@ -10,6 +16,65 @@ var log = logpkg.DefaultLogger.WithField("service", "ingest") const maxBatchSize = 100000 +type ChangeProcessor interface { + ProcessChange(ctx context.Context, change ingest.Change) error +} + +type LedgerTransactionProcessor interface { + ProcessTransaction(lcm xdr.LedgerCloseMeta, transaction ingest.LedgerTransaction) error + Flush(ctx context.Context, session db.SessionInterface) error +} + +type LedgerTransactionFilterer interface { + FilterTransaction(ctx context.Context, transaction ingest.LedgerTransaction) (bool, error) +} + +func StreamLedgerTransactions( + ctx context.Context, + txFilterer LedgerTransactionFilterer, + filteredTxProcessor LedgerTransactionProcessor, + txProcessor LedgerTransactionProcessor, + reader *ingest.LedgerTransactionReader, + ledger xdr.LedgerCloseMeta, +) error { + for { + tx, err := reader.Read() + if err == io.EOF { + return nil + } + if err != nil { + return errors.Wrap(err, "could not read transaction") + } + include, err := txFilterer.FilterTransaction(ctx, tx) + if err != nil { + return errors.Wrapf( + err, + "could not filter transaction %v", + tx.Index, + ) + } + if !include { + if err = filteredTxProcessor.ProcessTransaction(ledger, tx); err != nil { + return errors.Wrapf( + err, + "could not process transaction %v", + tx.Index, + ) + } + log.Debugf("Filters did not find match on transaction, dropping this tx with hash %v", tx.Result.TransactionHash.HexString()) + continue + } + + if err = txProcessor.ProcessTransaction(ledger, tx); err != nil { + return errors.Wrapf( + err, + "could not process transaction %v", + tx.Index, + ) + } + } +} + func ledgerEntrySponsorToNullString(entry xdr.LedgerEntry) null.String { sponsoringID := entry.SponsoringID() diff --git a/services/horizon/internal/ingest/processors/stats_ledger_transaction_processor.go b/services/horizon/internal/ingest/processors/stats_ledger_transaction_processor.go index b9585d4802..26118b11c4 100644 --- a/services/horizon/internal/ingest/processors/stats_ledger_transaction_processor.go +++ b/services/horizon/internal/ingest/processors/stats_ledger_transaction_processor.go @@ -5,6 +5,7 @@ import ( "fmt" "github.com/stellar/go/ingest" + "github.com/stellar/go/support/db" "github.com/stellar/go/xdr" ) @@ -51,7 +52,11 @@ type StatsLedgerTransactionProcessorResults struct { OperationsLiquidityPoolWithdraw int64 } -func (p *StatsLedgerTransactionProcessor) ProcessTransaction(ctx context.Context, transaction ingest.LedgerTransaction) error { +func (p *StatsLedgerTransactionProcessor) Flush(ctx context.Context, session db.SessionInterface) error { + return nil +} + +func (p *StatsLedgerTransactionProcessor) ProcessTransaction(lcm xdr.LedgerCloseMeta, transaction ingest.LedgerTransaction) error { p.results.Transactions++ ops := int64(len(transaction.Envelope.Operations())) p.results.Operations += ops diff --git a/services/horizon/internal/ingest/processors/stats_ledger_transaction_processor_test.go b/services/horizon/internal/ingest/processors/stats_ledger_transaction_processor_test.go index f2bc2a5040..c7fc6d7967 100644 --- a/services/horizon/internal/ingest/processors/stats_ledger_transaction_processor_test.go +++ b/services/horizon/internal/ingest/processors/stats_ledger_transaction_processor_test.go @@ -1,7 +1,6 @@ package processors import ( - "context" "testing" "github.com/stretchr/testify/assert" @@ -23,12 +22,14 @@ func TestStatsLedgerTransactionProcessoAllOpTypesCovered(t *testing.T) { }, }, } + lcm := xdr.LedgerCloseMeta{} + for typ, s := range xdr.OperationTypeToStringMap { tx := txTemplate txTemplate.Envelope.V1.Tx.Operations[0].Body.Type = xdr.OperationType(typ) f := func() { var p StatsLedgerTransactionProcessor - p.ProcessTransaction(context.Background(), tx) + p.ProcessTransaction(lcm, tx) } assert.NotPanics(t, f, s) } @@ -38,16 +39,17 @@ func TestStatsLedgerTransactionProcessoAllOpTypesCovered(t *testing.T) { txTemplate.Envelope.V1.Tx.Operations[0].Body.Type = 20000 f := func() { var p StatsLedgerTransactionProcessor - p.ProcessTransaction(context.Background(), tx) + p.ProcessTransaction(lcm, tx) } assert.Panics(t, f) } func TestStatsLedgerTransactionProcessor(t *testing.T) { processor := &StatsLedgerTransactionProcessor{} + lcm := xdr.LedgerCloseMeta{} // Successful - assert.NoError(t, processor.ProcessTransaction(context.Background(), ingest.LedgerTransaction{ + assert.NoError(t, processor.ProcessTransaction(lcm, ingest.LedgerTransaction{ Result: xdr.TransactionResultPair{ Result: xdr.TransactionResult{ Result: xdr.TransactionResultResult{ @@ -88,7 +90,7 @@ func TestStatsLedgerTransactionProcessor(t *testing.T) { })) // Failed - assert.NoError(t, processor.ProcessTransaction(context.Background(), ingest.LedgerTransaction{ + assert.NoError(t, processor.ProcessTransaction(lcm, ingest.LedgerTransaction{ Result: xdr.TransactionResultPair{ Result: xdr.TransactionResult{ Result: xdr.TransactionResultResult{ diff --git a/services/horizon/internal/ingest/processors/trades_processor.go b/services/horizon/internal/ingest/processors/trades_processor.go index b1084c6e08..d5ee89f51e 100644 --- a/services/horizon/internal/ingest/processors/trades_processor.go +++ b/services/horizon/internal/ingest/processors/trades_processor.go @@ -92,17 +92,38 @@ func (p *TradeProcessor) Flush(ctx context.Context, session db.SessionInterface) for _, trade := range p.trades { row := trade.row if trade.sellerAccount != "" { - row.BaseAccountID = null.IntFrom(p.accountLoader.GetNow(trade.sellerAccount)) + val, err := p.accountLoader.GetNow(trade.sellerAccount) + if err != nil { + return err + } + row.BaseAccountID = null.IntFrom(val) } if trade.buyerAccount != "" { - row.CounterAccountID = null.IntFrom(p.accountLoader.GetNow(trade.buyerAccount)) + val, err := p.accountLoader.GetNow(trade.buyerAccount) + if err != nil { + return err + } + row.CounterAccountID = null.IntFrom(val) } if trade.liquidityPoolID != "" { - row.BaseLiquidityPoolID = null.IntFrom(p.lpLoader.GetNow(trade.liquidityPoolID)) + val, err := p.lpLoader.GetNow(trade.liquidityPoolID) + if err != nil { + return err + } + row.BaseLiquidityPoolID = null.IntFrom(val) + } + + val, err := p.assetLoader.GetNow(history.AssetKeyFromXDR(trade.soldAsset)) + if err != nil { + return err } + row.BaseAssetID = val - row.BaseAssetID = p.assetLoader.GetNow(history.AssetKeyFromXDR(trade.soldAsset)) - row.CounterAssetID = p.assetLoader.GetNow(history.AssetKeyFromXDR(trade.boughtAsset)) + val, err = p.assetLoader.GetNow(history.AssetKeyFromXDR(trade.boughtAsset)) + if err != nil { + return err + } + row.CounterAssetID = val if row.BaseAssetID > row.CounterAssetID { row.BaseIsSeller = false diff --git a/services/horizon/internal/integration/transaction_preconditions_test.go b/services/horizon/internal/integration/transaction_preconditions_test.go index 44a6baac82..94c0a09c35 100644 --- a/services/horizon/internal/integration/transaction_preconditions_test.go +++ b/services/horizon/internal/integration/transaction_preconditions_test.go @@ -5,89 +5,18 @@ import ( "encoding/base64" "math" "strconv" - "sync" "testing" "time" sdk "github.com/stellar/go/clients/horizonclient" "github.com/stellar/go/keypair" "github.com/stellar/go/network" - "github.com/stellar/go/protocols/horizon" "github.com/stellar/go/services/horizon/internal/test/integration" "github.com/stellar/go/txnbuild" "github.com/stellar/go/xdr" "github.com/stretchr/testify/assert" ) -func TestTransactionPreconditionsMinSeq(t *testing.T) { - tt := assert.New(t) - itest := integration.NewTest(t, integration.Config{}) - if itest.GetEffectiveProtocolVersion() < 19 { - t.Skip("Can't run with protocol < 19") - } - master := itest.Master() - masterAccount := itest.MasterAccount() - currentAccountSeq, err := masterAccount.GetSequenceNumber() - tt.NoError(err) - - // Ensure that the minSequence of the transaction is enough - // but the sequence isn't - txParams := buildTXParams(master, masterAccount, currentAccountSeq+100) - - // this errors because the tx.seqNum is more than +1 from sourceAccoubnt.seqNum - _, err = itest.SubmitTransaction(master, txParams) - tt.Error(err) - - // Now the transaction should be submitted without problems - txParams.Preconditions.MinSequenceNumber = ¤tAccountSeq - tx := itest.MustSubmitTransaction(master, txParams) - - txHistory, err := itest.Client().TransactionDetail(tx.Hash) - assert.NoError(t, err) - assert.Equal(t, txHistory.Preconditions.MinAccountSequence, strconv.FormatInt(*txParams.Preconditions.MinSequenceNumber, 10)) - - // Test the transaction submission queue by sending transactions out of order - // and making sure they are all executed properly - masterAccount = itest.MasterAccount() - currentAccountSeq, err = masterAccount.GetSequenceNumber() - tt.NoError(err) - - seqs := []struct { - minSeq int64 - seq int64 - }{ - {0, currentAccountSeq + 9}, // sent first, executed second - {0, currentAccountSeq + 10}, // sent second, executed third - {currentAccountSeq, currentAccountSeq + 8}, // sent third, executed first - } - - // Send the transactions in parallel since otherwise they are admitted sequentially - var results []horizon.Transaction - var resultsMx sync.Mutex - var wg sync.WaitGroup - wg.Add(len(seqs)) - for _, s := range seqs { - sLocal := s - go func() { - params := buildTXParams(master, masterAccount, sLocal.seq) - if sLocal.minSeq > 0 { - params.Preconditions.MinSequenceNumber = &sLocal.minSeq - } - result := itest.MustSubmitTransaction(master, params) - resultsMx.Lock() - results = append(results, result) - resultsMx.Unlock() - wg.Done() - }() - // Space out requests to ensure the queue receives the transactions - // in the planned order - time.Sleep(time.Millisecond * 50) - } - wg.Wait() - - tt.Len(results, len(seqs)) -} - func TestTransactionPreconditionsTimeBounds(t *testing.T) { tt := assert.New(t) itest := integration.NewTest(t, integration.Config{}) diff --git a/services/horizon/internal/integration/txsub_test.go b/services/horizon/internal/integration/txsub_test.go index 60b8717b18..069aa8be1b 100644 --- a/services/horizon/internal/integration/txsub_test.go +++ b/services/horizon/internal/integration/txsub_test.go @@ -14,11 +14,11 @@ func TestTxsub(t *testing.T) { itest := integration.NewTest(t, integration.Config{}) master := itest.Master() - // Sanity check: create 20 accounts and submit 2 txs from each of them as - // a source at the same time. Then check if the results are correct. t.Run("Sanity", func(t *testing.T) { + // simplify this to one tx per account, to align with core capabilities of one + // tx per account per ledger. testAccounts := 20 - subsPerAccont := 2 + subsPerAccont := 1 keys, accounts := itest.CreateAccounts(testAccounts, "1000") var wg sync.WaitGroup diff --git a/support/db/batch_insert_builder_test.go b/support/db/batch_insert_builder_test.go index e283e8bf57..e0d28e145d 100644 --- a/support/db/batch_insert_builder_test.go +++ b/support/db/batch_insert_builder_test.go @@ -13,6 +13,7 @@ import ( type hungerRow struct { Name string `db:"name"` HungerLevel string `db:"hunger_level"` + JsonValue []byte `db:"json_value"` } type invalidHungerRow struct { diff --git a/support/db/fast_batch_insert_builder_test.go b/support/db/fast_batch_insert_builder_test.go index c31f502735..bfd2f8407b 100644 --- a/support/db/fast_batch_insert_builder_test.go +++ b/support/db/fast_batch_insert_builder_test.go @@ -4,6 +4,7 @@ import ( "context" "testing" + "github.com/guregu/null" "github.com/stretchr/testify/assert" "github.com/stellar/go/support/db/dbtest" @@ -21,6 +22,7 @@ func TestFastBatchInsertBuilder(t *testing.T) { insertBuilder.Row(map[string]interface{}{ "name": "bubba", "hunger_level": "1", + "json_value": []byte(`{"bump_to": "97"}`), }), ) @@ -28,13 +30,14 @@ func TestFastBatchInsertBuilder(t *testing.T) { insertBuilder.Row(map[string]interface{}{ "name": "bubba", }), - "invalid number of columns (expected=2, actual=1)", + "invalid number of columns (expected=3, actual=1)", ) assert.EqualError(t, insertBuilder.Row(map[string]interface{}{ - "name": "bubba", - "city": "London", + "name": "bubba", + "city": "London", + "json_value": []byte(`{"bump_to": "98"}`), }), "column \"hunger_level\" does not exist", ) @@ -43,6 +46,7 @@ func TestFastBatchInsertBuilder(t *testing.T) { insertBuilder.RowStruct(hungerRow{ Name: "bubba2", HungerLevel: "9", + JsonValue: []byte(`{"bump_to": "98"}`), }), ) @@ -74,8 +78,8 @@ func TestFastBatchInsertBuilder(t *testing.T) { t, found, []person{ - {Name: "bubba", HungerLevel: "1"}, - {Name: "bubba2", HungerLevel: "9"}, + {Name: "bubba", HungerLevel: "1", JsonValue: null.NewString(`{"bump_to": "97"}`, true)}, + {Name: "bubba2", HungerLevel: "9", JsonValue: null.NewString(`{"bump_to": "98"}`, true)}, }, ) @@ -116,8 +120,8 @@ func TestFastBatchInsertBuilder(t *testing.T) { t, found, []person{ - {Name: "bubba", HungerLevel: "1"}, - {Name: "bubba2", HungerLevel: "9"}, + {Name: "bubba", HungerLevel: "1", JsonValue: null.NewString(`{"bump_to": "97"}`, true)}, + {Name: "bubba2", HungerLevel: "9", JsonValue: null.NewString(`{"bump_to": "98"}`, true)}, }, ) assert.NoError(t, sess.Rollback()) diff --git a/support/db/internal_test.go b/support/db/internal_test.go index 8ce0370a92..3e1a06dabc 100644 --- a/support/db/internal_test.go +++ b/support/db/internal_test.go @@ -7,6 +7,7 @@ const testSchema = ` CREATE TABLE IF NOT EXISTS people ( name character varying NOT NULL, hunger_level integer NOT NULL, + json_value jsonb, PRIMARY KEY (name) ); DELETE FROM people; diff --git a/support/db/main_test.go b/support/db/main_test.go index 68724d197d..301b533aa4 100644 --- a/support/db/main_test.go +++ b/support/db/main_test.go @@ -4,15 +4,16 @@ import ( "testing" "time" + "github.com/guregu/null" "github.com/stellar/go/support/db/dbtest" "github.com/stretchr/testify/assert" ) type person struct { - Name string `db:"name"` - HungerLevel string `db:"hunger_level"` - - SomethingIgnored int `db:"-"` + Name string `db:"name"` + HungerLevel string `db:"hunger_level"` + JsonValue null.String `db:"json_value"` + SomethingIgnored int `db:"-"` } func TestGetTable(t *testing.T) {