diff --git a/services/horizon/internal/actions/account_test.go b/services/horizon/internal/actions/account_test.go index b12b75af0c..a16d216240 100644 --- a/services/horizon/internal/actions/account_test.go +++ b/services/horizon/internal/actions/account_test.go @@ -179,7 +179,11 @@ func TestAccountInfo(t *testing.T) { Thresholds: thresholds, Flags: 0, } - _, err := q.InsertAccount(accountEntry, 4) + batch := q.NewAccountsBatchInsertBuilder(0) + err := batch.Add(accountEntry, 4) + assert.NoError(t, err) + assert.NoError(t, batch.Exec()) + tt.Assert.NoError(err) _, err = q.InsertTrustLine(xdr.TrustLineEntry{ @@ -239,7 +243,15 @@ func TestAccountInfo(t *testing.T) { // even though horizon ingestion account differs from core account, // no error is returned because they have different last modified ledgers - _, err = q.UpdateAccount(accountEntry, 5) + err = q.UpsertAccounts([]xdr.LedgerEntry{ + xdr.LedgerEntry{ + LastModifiedLedgerSeq: 5, + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeAccount, + Account: &accountEntry, + }, + }, + }) tt.Assert.NoError(err) account, err = AccountInfo( @@ -300,12 +312,14 @@ func TestGetAccountsHandlerPageResultsBySigner(t *testing.T) { q := &history.Q{tt.HorizonSession()} handler := &GetAccountsHandler{} - _, err := q.InsertAccount(account1, 1234) - tt.Assert.NoError(err) - _, err = q.InsertAccount(account2, 1234) - tt.Assert.NoError(err) - _, err = q.InsertAccount(account3, 1234) - tt.Assert.NoError(err) + batch := q.NewAccountsBatchInsertBuilder(0) + err := batch.Add(account1, 1234) + assert.NoError(t, err) + err = batch.Add(account2, 1234) + assert.NoError(t, err) + err = batch.Add(account3, 1234) + assert.NoError(t, err) + assert.NoError(t, batch.Exec()) for _, row := range accountSigners { q.CreateAccountSigner(row.Account, row.Signer, row.Weight) @@ -378,10 +392,12 @@ func TestGetAccountsHandlerPageResultsByAsset(t *testing.T) { q := &history.Q{tt.HorizonSession()} handler := &GetAccountsHandler{} - _, err := q.InsertAccount(account1, 1234) - tt.Assert.NoError(err) - _, err = q.InsertAccount(account2, 1234) - tt.Assert.NoError(err) + batch := q.NewAccountsBatchInsertBuilder(0) + err := batch.Add(account1, 1234) + assert.NoError(t, err) + err = batch.Add(account2, 1234) + assert.NoError(t, err) + assert.NoError(t, batch.Exec()) for _, row := range accountSigners { _, err = q.CreateAccountSigner(row.Account, row.Signer, row.Weight) diff --git a/services/horizon/internal/actions/asset_test.go b/services/horizon/internal/actions/asset_test.go index 25985a2dc3..d52d20e04d 100644 --- a/services/horizon/internal/actions/asset_test.go +++ b/services/horizon/internal/actions/asset_test.go @@ -219,9 +219,10 @@ func TestAssetStats(t *testing.T) { if err := accountEntry.AccountId.SetAddress(account.AccountID); err != nil { t.Fatalf("unexpected error %v", err) } - numChanged, err := q.InsertAccount(accountEntry, 3) + batch := q.NewAccountsBatchInsertBuilder(0) + err := batch.Add(accountEntry, 3) tt.Assert.NoError(err) - tt.Assert.Equal(numChanged, int64(1)) + tt.Assert.NoError(batch.Exec()) } for _, testCase := range []struct { diff --git a/services/horizon/internal/db2/history/account.go b/services/horizon/internal/db2/history/account.go index 85571542c3..145f5ef0a4 100644 --- a/services/horizon/internal/db2/history/account.go +++ b/services/horizon/internal/db2/history/account.go @@ -1,11 +1,12 @@ package history import ( + "sort" + sq "github.com/Masterminds/squirrel" "github.com/stellar/go/services/horizon/internal/db2" "github.com/stellar/go/support/db" "github.com/stellar/go/support/errors" - "github.com/stellar/go/xdr" ) // Accounts provides a helper to filter rows from the `history_accounts` table @@ -23,12 +24,6 @@ func (q *Q) AccountByAddress(dest interface{}, addy string) error { return q.Get(dest, sql) } -// AccountByID loads a row from `history_accounts`, by id -func (q *Q) AccountByID(dest interface{}, id int64) error { - sql := selectAccount.Limit(1).Where("ha.id = ?", id) - return q.Get(dest, sql) -} - // Page specifies the paging constraints for the query being built by `q`. func (q *AccountsQ) Page(page db2.PageQuery) *AccountsQ { if q.Err != nil { @@ -66,6 +61,9 @@ func (q *Q) CreateAccounts(addresses []string, batchSize int) (map[string]int64, Suffix: "ON CONFLICT (address) DO NOTHING", } + // sort assets before inserting rows into history_assets to prevent deadlocks on acquiring a ShareLock + // https://github.com/stellar/go/issues/2370 + sort.Strings(addresses) for _, address := range addresses { err := builder.Row(map[string]interface{}{ "address": address, @@ -103,35 +101,4 @@ func (q *Q) CreateAccounts(addresses []string, batchSize int) (map[string]int64, return addressToID, nil } -// Return id for account. If account doesn't exist, it will be created and the new id returned. -// `ON CONFLICT` is required when running a distributed ingestion. -func (q *Q) GetCreateAccountID( - aid xdr.AccountId, -) (result int64, err error) { - - var existing Account - - err = q.AccountByAddress(&existing, aid.Address()) - - //account already exists, return id - if err == nil { - result = existing.ID - return - } - - // unexpected error - if !q.NoRows(err) { - return - } - - //insert account and return id - err = q.GetRaw( - &result, - `INSERT INTO history_accounts (address) VALUES (?) ON CONFLICT (address) DO UPDATE SET address=EXCLUDED.address RETURNING id`, - aid.Address(), - ) - - return -} - var selectAccount = sq.Select("ha.*").From("history_accounts ha") diff --git a/services/horizon/internal/db2/history/account_test.go b/services/horizon/internal/db2/history/account_test.go index 48ecd2c6e2..d59d863507 100644 --- a/services/horizon/internal/db2/history/account_test.go +++ b/services/horizon/internal/db2/history/account_test.go @@ -1,6 +1,7 @@ package history import ( + "sort" "testing" "github.com/stellar/go/services/horizon/internal/test" @@ -61,6 +62,41 @@ func assertAccountsContainAddresses(tt *test.T, accounts map[string]int64, addre } } +func TestCreateAccountsSortedOrder(t *testing.T) { + tt := test.Start(t) + defer tt.Finish() + test.ResetHorizonDB(t, tt.HorizonDB) + q := &Q{tt.HorizonSession()} + + addresses := []string{ + "GBRPYHIL2CI3FNQ4BXLFMNDLFJUNPU2HY3ZMFSHONUCEOASW7QC7OX2H", + "GCYVFGI3SEQJGBNQQG7YCMFWEYOHK3XPVOVPA6C566PXWN4SN7LILZSM", + "GBYSBDAJZMHL5AMD7QXQ3JEP3Q4GLKADWIJURAAHQALNAWD6Z5XF2RAC", + "GAOQJGUAB7NI7K7I62ORBXMN3J4SSWQUQ7FOEPSDJ322W2HMCNWPHXFB", + } + accounts, err := q.CreateAccounts(addresses, 1) + tt.Assert.NoError(err) + + idToAddress := map[int64]string{} + sortedIDs := []int64{} + for address, id := range accounts { + idToAddress[id] = address + sortedIDs = append(sortedIDs, id) + } + + sort.Slice(sortedIDs, func(i, j int) bool { + return sortedIDs[i] < sortedIDs[j] + }) + sort.Strings(addresses) + + values := []string{} + for _, id := range sortedIDs { + values = append(values, idToAddress[id]) + } + + tt.Assert.Equal(addresses, values) +} + func TestCreateAccounts(t *testing.T) { tt := test.Start(t) defer tt.Finish() diff --git a/services/horizon/internal/db2/history/accounts.go b/services/horizon/internal/db2/history/accounts.go index 90604e8525..e2c2b2b625 100644 --- a/services/horizon/internal/db2/history/accounts.go +++ b/services/horizon/internal/db2/history/accounts.go @@ -83,37 +83,6 @@ func accountToMap(account xdr.AccountEntry, lastModifiedLedger xdr.Uint32) map[s } } -// InsertAccount creates a row in the accounts table. -// Returns number of rows affected and error. -func (q *Q) InsertAccount(account xdr.AccountEntry, lastModifiedLedger xdr.Uint32) (int64, error) { - m := accountToMap(account, lastModifiedLedger) - - sql := sq.Insert("accounts").SetMap(m) - result, err := q.Exec(sql) - if err != nil { - return 0, err - } - - return result.RowsAffected() -} - -// UpdateAccount updates a row in the offers table. -// Returns number of rows affected and error. -func (q *Q) UpdateAccount(account xdr.AccountEntry, lastModifiedLedger xdr.Uint32) (int64, error) { - m := accountToMap(account, lastModifiedLedger) - - accountID := m["account_id"] - delete(m, "account_id") - - sql := sq.Update("accounts").SetMap(m).Where(sq.Eq{"account_id": accountID}) - result, err := q.Exec(sql) - if err != nil { - return 0, err - } - - return result.RowsAffected() -} - // UpsertAccounts upserts a batch of accounts in the accounts table. // There's currently no limit of the number of accounts this method can // accept other than 2GB limit of the query string length what should be enough diff --git a/services/horizon/internal/db2/history/accounts_batch_insert_builder.go b/services/horizon/internal/db2/history/accounts_batch_insert_builder.go index 9b2bcca263..32b0175f63 100644 --- a/services/horizon/internal/db2/history/accounts_batch_insert_builder.go +++ b/services/horizon/internal/db2/history/accounts_batch_insert_builder.go @@ -1,6 +1,14 @@ package history -import "github.com/stellar/go/xdr" +import ( + "github.com/stellar/go/support/db" + "github.com/stellar/go/xdr" +) + +// accountsBatchInsertBuilder is a simple wrapper around db.BatchInsertBuilder +type accountsBatchInsertBuilder struct { + builder db.BatchInsertBuilder +} func (i *accountsBatchInsertBuilder) Add(account xdr.AccountEntry, lastModifiedLedger xdr.Uint32) error { return i.builder.Row(accountToMap(account, lastModifiedLedger)) diff --git a/services/horizon/internal/db2/history/accounts_test.go b/services/horizon/internal/db2/history/accounts_test.go index 05aac0a77f..b629f1382b 100644 --- a/services/horizon/internal/db2/history/accounts_test.go +++ b/services/horizon/internal/db2/history/accounts_test.go @@ -78,13 +78,12 @@ func TestInsertAccount(t *testing.T) { test.ResetHorizonDB(t, tt.HorizonDB) q := &Q{tt.HorizonSession()} - rows, err := q.InsertAccount(account1, 1234) + batch := q.NewAccountsBatchInsertBuilder(0) + err := batch.Add(account1, 1234) assert.NoError(t, err) - assert.Equal(t, int64(1), rows) - - rows, err = q.InsertAccount(account2, 1235) + err = batch.Add(account2, 1235) assert.NoError(t, err) - assert.Equal(t, int64(1), rows) + assert.NoError(t, batch.Exec()) accounts, err := q.GetAccountsByIDs([]string{ "GAOQJGUAB7NI7K7I62ORBXMN3J4SSWQUQ7FOEPSDJ322W2HMCNWPHXFB", @@ -108,65 +107,6 @@ func TestInsertAccount(t *testing.T) { assert.Equal(t, int64(4), accounts[0].SellingLiabilities) } -func TestUpdateAccount(t *testing.T) { - tt := test.Start(t) - defer tt.Finish() - test.ResetHorizonDB(t, tt.HorizonDB) - q := &Q{tt.HorizonSession()} - - rows, err := q.InsertAccount(account1, 1234) - assert.NoError(t, err) - assert.Equal(t, int64(1), rows) - - modifiedAccount := account1 - modifiedAccount.Balance = 32847893 - - rows, err = q.UpdateAccount(modifiedAccount, 1235) - assert.NoError(t, err) - assert.Equal(t, int64(1), rows) - - keys := []string{ - "GAOQJGUAB7NI7K7I62ORBXMN3J4SSWQUQ7FOEPSDJ322W2HMCNWPHXFB", - "GCT2NQM5KJJEF55NPMY444C6M6CA7T33HRNCMA6ZFBIIXKNCRO6J25K7", - } - accounts, err := q.GetAccountsByIDs(keys) - assert.NoError(t, err) - assert.Len(t, accounts, 1) - - expectedBinary, err := modifiedAccount.MarshalBinary() - assert.NoError(t, err) - - dbEntry := xdr.AccountEntry{ - AccountId: xdr.MustAddress(accounts[0].AccountID), - Balance: xdr.Int64(accounts[0].Balance), - SeqNum: xdr.SequenceNumber(accounts[0].SequenceNumber), - NumSubEntries: xdr.Uint32(accounts[0].NumSubEntries), - InflationDest: &inflationDest, - Flags: xdr.Uint32(accounts[0].Flags), - HomeDomain: xdr.String32(accounts[0].HomeDomain), - Thresholds: xdr.Thresholds{ - accounts[0].MasterWeight, - accounts[0].ThresholdLow, - accounts[0].ThresholdMedium, - accounts[0].ThresholdHigh, - }, - Ext: xdr.AccountEntryExt{ - V: 1, - V1: &xdr.AccountEntryV1{ - Liabilities: xdr.Liabilities{ - Buying: xdr.Int64(accounts[0].BuyingLiabilities), - Selling: xdr.Int64(accounts[0].SellingLiabilities), - }, - }, - }, - } - - actualBinary, err := dbEntry.MarshalBinary() - assert.NoError(t, err) - assert.Equal(t, expectedBinary, actualBinary) - assert.Equal(t, uint32(1235), accounts[0].LastModifiedLedger) -} - func TestUpsertAccount(t *testing.T) { tt := test.Start(t) defer tt.Finish() @@ -293,10 +233,12 @@ func TestRemoveAccount(t *testing.T) { test.ResetHorizonDB(t, tt.HorizonDB) q := &Q{tt.HorizonSession()} - rows, err := q.InsertAccount(account1, 1234) + batch := q.NewAccountsBatchInsertBuilder(0) + err := batch.Add(account1, 1234) assert.NoError(t, err) - assert.Equal(t, int64(1), rows) + assert.NoError(t, batch.Exec()) + var rows int64 rows, err = q.RemoveAccount("GAOQJGUAB7NI7K7I62ORBXMN3J4SSWQUQ7FOEPSDJ322W2HMCNWPHXFB") assert.NoError(t, err) assert.Equal(t, int64(1), rows) @@ -320,10 +262,12 @@ func TestAccountsForAsset(t *testing.T) { eurTrustLine.AccountId = account1.AccountId usdTrustLine.AccountId = account2.AccountId - _, err := q.InsertAccount(account1, 1234) - tt.Assert.NoError(err) - _, err = q.InsertAccount(account2, 1235) - tt.Assert.NoError(err) + batch := q.NewAccountsBatchInsertBuilder(0) + err := batch.Add(account1, 1234) + assert.NoError(t, err) + err = batch.Add(account2, 1235) + assert.NoError(t, err) + assert.NoError(t, batch.Exec()) _, err = q.InsertTrustLine(eurTrustLine, 1234) tt.Assert.NoError(err) @@ -371,12 +315,14 @@ func TestAccountEntriesForSigner(t *testing.T) { eurTrustLine.AccountId = account1.AccountId usdTrustLine.AccountId = account2.AccountId - _, err := q.InsertAccount(account1, 1234) - tt.Assert.NoError(err) - _, err = q.InsertAccount(account2, 1235) - tt.Assert.NoError(err) - _, err = q.InsertAccount(account3, 1235) - tt.Assert.NoError(err) + batch := q.NewAccountsBatchInsertBuilder(0) + err := batch.Add(account1, 1234) + assert.NoError(t, err) + err = batch.Add(account2, 1235) + assert.NoError(t, err) + err = batch.Add(account3, 1235) + assert.NoError(t, err) + assert.NoError(t, batch.Exec()) _, err = q.InsertTrustLine(eurTrustLine, 1234) tt.Assert.NoError(err) @@ -454,8 +400,10 @@ func TestGetAccountByID(t *testing.T) { test.ResetHorizonDB(t, tt.HorizonDB) q := &Q{tt.HorizonSession()} - _, err := q.InsertAccount(account1, 1234) - tt.Assert.NoError(err) + batch := q.NewAccountsBatchInsertBuilder(0) + err := batch.Add(account1, 1234) + assert.NoError(t, err) + assert.NoError(t, batch.Exec()) resultAccount, err := q.GetAccountByID(account1.AccountId.Address()) assert.NoError(t, err) diff --git a/services/horizon/internal/db2/history/asset.go b/services/horizon/internal/db2/history/asset.go index 65991d07d4..09bd34da50 100644 --- a/services/horizon/internal/db2/history/asset.go +++ b/services/horizon/internal/db2/history/asset.go @@ -1,35 +1,15 @@ package history import ( + "sort" + sq "github.com/Masterminds/squirrel" "github.com/stellar/go/support/db" "github.com/stellar/go/support/errors" "github.com/stellar/go/xdr" ) -func (q *Q) GetAssetByID(dest interface{}, id int64) (err error) { - sql := sq.Select("id", "asset_type", "asset_code", "asset_issuer").From("history_assets").Limit(1).Where(sq.Eq{"id": id}) - err = q.Get(dest, sql) - return -} - -// GetAssetIDs fetches the ids for many Assets at once -func (q *Q) GetAssetIDs(assets []xdr.Asset) ([]int64, error) { - list := make([]string, 0, len(assets)) - for _, asset := range assets { - list = append(list, asset.String()) - } - - sql := sq.Select("id").From("history_assets").Where(sq.Eq{ - "concat(asset_type, '/', asset_code, '/', asset_issuer)": list, - }) - - var ids []int64 - err := q.Select(&ids, sql) - return ids, err -} - -// GetAssetID fetches the id for an Asset. If fetching multiple values, look at GetAssetIDs +// GetAssetID fetches the id for an Asset func (q *Q) GetAssetID(asset xdr.Asset) (id int64, err error) { var ( assetType string @@ -51,42 +31,6 @@ func (q *Q) GetAssetID(asset xdr.Asset) (id int64, err error) { return } -// Get asset row id. If asset is first seen, it will be inserted and the new id returned. -func (q *Q) GetCreateAssetID( - asset xdr.Asset, -) (result int64, err error) { - - result, err = q.GetAssetID(asset) - - //asset exists, return id - if err == nil { - return - } - - //unexpected error - if !q.NoRows(err) { - return - } - - //insert asset and return id - var ( - assetType string - assetCode string - assetIssuer string - ) - - err = asset.Extract(&assetType, &assetCode, &assetIssuer) - if err != nil { - return - } - - err = q.GetRaw(&result, - `INSERT INTO history_assets (asset_type, asset_code, asset_issuer) VALUES (?,?,?) RETURNING id`, - assetType, assetCode, assetIssuer) - - return -} - // CreateAssets creates rows in the history_assets table for a given list of assets. func (q *Q) CreateAssets(assets []xdr.Asset, batchSize int) (map[string]Asset, error) { searchStrings := make([]string, 0, len(assets)) @@ -98,6 +42,11 @@ func (q *Q) CreateAssets(assets []xdr.Asset, batchSize int) (map[string]Asset, e Suffix: "ON CONFLICT (asset_code, asset_type, asset_issuer) DO NOTHING", } + // sort assets before inserting rows into history_assets to prevent deadlocks on acquiring a ShareLock + // https://github.com/stellar/go/issues/2370 + sort.Slice(assets, func(i, j int) bool { + return assets[i].String() < assets[j].String() + }) for _, asset := range assets { var assetType, assetCode, assetIssuer string err := asset.Extract(&assetType, &assetCode, &assetIssuer) diff --git a/services/horizon/internal/db2/history/asset_test.go b/services/horizon/internal/db2/history/asset_test.go index 122da37b3c..c345c43805 100644 --- a/services/horizon/internal/db2/history/asset_test.go +++ b/services/horizon/internal/db2/history/asset_test.go @@ -1,12 +1,55 @@ package history import ( + "sort" "testing" "github.com/stellar/go/services/horizon/internal/test" "github.com/stellar/go/xdr" ) +func TestCreateAssetsSortedOrder(t *testing.T) { + tt := test.Start(t) + defer tt.Finish() + test.ResetHorizonDB(t, tt.HorizonDB) + + q := &Q{tt.HorizonSession()} + assets := []xdr.Asset{ + usdAsset, nativeAsset, eurAsset, + xdr.MustNewCreditAsset("CNY", issuer.Address()), + } + assetMap, err := q.CreateAssets( + assets, + 2, + ) + tt.Assert.NoError(err) + + idsToAsset := map[int64]string{} + sortedIDs := []int64{} + for assetString, asset := range assetMap { + idsToAsset[asset.ID] = assetString + sortedIDs = append(sortedIDs, asset.ID) + } + + sort.Slice(assets, func(i, j int) bool { + return assets[i].String() < assets[j].String() + }) + sort.Slice(sortedIDs, func(i, j int) bool { + return sortedIDs[i] < sortedIDs[j] + }) + + var assetStrings []string + for _, asset := range assets { + assetStrings = append(assetStrings, asset.String()) + } + + var values []string + for _, id := range sortedIDs { + values = append(values, idsToAsset[id]) + } + tt.Assert.Equal(assetStrings, values) +} + func TestCreateAssets(t *testing.T) { tt := test.Start(t) defer tt.Finish() diff --git a/services/horizon/internal/db2/history/main.go b/services/horizon/internal/db2/history/main.go index 26599b83eb..e9452cde53 100644 --- a/services/horizon/internal/db2/history/main.go +++ b/services/horizon/internal/db2/history/main.go @@ -149,11 +149,6 @@ type AccountsBatchInsertBuilder interface { Exec() error } -// accountsBatchInsertBuilder is a simple wrapper around db.BatchInsertBuilder -type accountsBatchInsertBuilder struct { - builder db.BatchInsertBuilder -} - type IngestionQ interface { QAccounts QAssetStats @@ -193,8 +188,6 @@ type IngestionQ interface { type QAccounts interface { NewAccountsBatchInsertBuilder(maxBatchSize int) AccountsBatchInsertBuilder GetAccountsByIDs(ids []string) ([]AccountEntry, error) - InsertAccount(account xdr.AccountEntry, lastModifiedLedger xdr.Uint32) (int64, error) - UpdateAccount(account xdr.AccountEntry, lastModifiedLedger xdr.Uint32) (int64, error) UpsertAccounts(accounts []xdr.LedgerEntry) error RemoveAccount(accountID string) (int64, error) } diff --git a/services/horizon/internal/db2/history/trade.go b/services/horizon/internal/db2/history/trade.go index 42bd7f0b3c..6b46a88b89 100644 --- a/services/horizon/internal/db2/history/trade.go +++ b/services/horizon/internal/db2/history/trade.go @@ -7,8 +7,6 @@ import ( sq "github.com/Masterminds/squirrel" "github.com/stellar/go/services/horizon/internal/db2" "github.com/stellar/go/services/horizon/internal/toid" - "github.com/stellar/go/support/errors" - supportTime "github.com/stellar/go/support/time" "github.com/stellar/go/xdr" ) @@ -217,114 +215,6 @@ var selectReverseTradeFields = sq.Select( "htrd.price_n as price_d", ) -var tradesInsert = sq.Insert("history_trades").Columns( - "history_operation_id", - "\"order\"", - "ledger_closed_at", - "offer_id", - "base_offer_id", - "base_account_id", - "base_asset_id", - "base_amount", - "counter_offer_id", - "counter_account_id", - "counter_asset_id", - "counter_amount", - "base_is_seller", - "price_n", - "price_d", -) - -// Trade records a trade into the history_trades table -func (q *Q) InsertTrade( - opid int64, - order int32, - buyer xdr.AccountId, - buyOfferExists bool, - buyOffer xdr.OfferEntry, - trade xdr.ClaimOfferAtom, - sellPrice xdr.Price, - ledgerClosedAt supportTime.Millis, -) error { - sellerAccountId, err := q.GetCreateAccountID(trade.SellerId) - if err != nil { - return errors.Wrap(err, "failed to load seller account id") - } - - buyerAccountId, err := q.GetCreateAccountID(buyer) - if err != nil { - return errors.Wrap(err, "failed to load buyer account id") - } - - soldAssetId, err := q.GetCreateAssetID(trade.AssetSold) - if err != nil { - return errors.Wrap(err, "failed to get sold asset id") - } - - boughtAssetId, err := q.GetCreateAssetID(trade.AssetBought) - if err != nil { - return errors.Wrap(err, "failed to get bought asset id") - } - - sellOfferId := EncodeOfferId(uint64(trade.OfferId), CoreOfferIDType) - - // if the buy offer exists, encode the stellar core generated id as the offer id - // if not, encode the toid as the offer id - var buyOfferId int64 - if buyOfferExists { - buyOfferId = EncodeOfferId(uint64(buyOffer.OfferId), CoreOfferIDType) - } else { - buyOfferId = EncodeOfferId(uint64(opid), TOIDType) - } - - orderPreserved, baseAssetId, counterAssetId := getCanonicalAssetOrder(soldAssetId, boughtAssetId) - - var baseAccountId, counterAccountId int64 - var baseAmount, counterAmount xdr.Int64 - var baseOfferId, counterOfferId int64 - - if orderPreserved { - baseAccountId = sellerAccountId - baseAmount = trade.AmountSold - counterAccountId = buyerAccountId - counterAmount = trade.AmountBought - baseOfferId = sellOfferId - counterOfferId = buyOfferId - } else { - baseAccountId = buyerAccountId - baseAmount = trade.AmountBought - counterAccountId = sellerAccountId - counterAmount = trade.AmountSold - baseOfferId = buyOfferId - counterOfferId = sellOfferId - sellPrice.Invert() - } - - sql := tradesInsert.Values( - opid, - order, - ledgerClosedAt.ToTime(), - trade.OfferId, - baseOfferId, - baseAccountId, - baseAssetId, - baseAmount, - counterOfferId, - counterAccountId, - counterAssetId, - counterAmount, - orderPreserved, - sellPrice.N, - sellPrice.D, - ) - - _, err = q.Exec(sql) - if err != nil { - return errors.Wrap(err, "failed to exec sql") - } - return nil -} - func getCanonicalAssetOrder(assetId1 int64, assetId2 int64) (orderPreserved bool, baseAssetId int64, counterAssetId int64) { if assetId1 < assetId2 { return true, assetId1, assetId2 diff --git a/services/horizon/internal/db2/history/trade_test.go b/services/horizon/internal/db2/history/trade_test.go index 994ff41a5e..16784e0569 100644 --- a/services/horizon/internal/db2/history/trade_test.go +++ b/services/horizon/internal/db2/history/trade_test.go @@ -313,28 +313,3 @@ func TestBatchInsertTrade(t *testing.T) { ) } } - -func createTradeRows( - tt *test.T, q *Q, - idToAccount map[int64]xdr.AccountId, - idToAsset map[int64]xdr.Asset, - entries ...InsertTrade, -) { - for _, entry := range entries { - entry.Trade.SellerId = idToAccount[entry.SellerAccountID] - entry.Trade.AssetSold = idToAsset[entry.SoldAssetID] - entry.Trade.AssetBought = idToAsset[entry.BoughtAssetID] - - err := q.InsertTrade( - entry.HistoryOperationID, - entry.Order, - idToAccount[entry.BuyerAccountID], - entry.BuyOfferExists, - xdr.OfferEntry{OfferId: xdr.Int64(entry.BuyOfferID)}, - entry.Trade, - entry.SellPrice, - supportTime.MillisFromSeconds(entry.LedgerCloseTime.Unix()), - ) - tt.Assert.NoError(err) - } -} diff --git a/services/horizon/internal/expingest/fsm.go b/services/horizon/internal/expingest/fsm.go index 3fa932044b..6626d6496a 100644 --- a/services/horizon/internal/expingest/fsm.go +++ b/services/horizon/internal/expingest/fsm.go @@ -535,8 +535,10 @@ func (h historyRangeState) run(s *System) (transition, error) { return start(), nil } - if err = ingestHistoryRange(s, h.fromLedger, h.toLedger); err != nil { - return start(), err + for cur := h.fromLedger; cur <= h.toLedger; cur++ { + if err = runTransactionProcessorsOnLedger(s, cur); err != nil { + return start(), err + } } if err = s.historyQ.Commit(); err != nil { @@ -546,34 +548,32 @@ func (h historyRangeState) run(s *System) (transition, error) { return start(), nil } -func ingestHistoryRange(s *System, from, to uint32) error { - for cur := from; cur <= to; cur++ { - log.WithFields(logpkg.F{ - "sequence": cur, +func runTransactionProcessorsOnLedger(s *System, ledger uint32) error { + log.WithFields(logpkg.F{ + "sequence": ledger, + "state": false, + "ledger": true, + "graph": false, + "commit": false, + }).Info("Processing ledger") + startTime := time.Now() + + ledgerTransactionStats, err := s.runner.RunTransactionProcessorsOnLedger(ledger) + if err != nil { + return errors.Wrap(err, fmt.Sprintf("error processing ledger sequence=%d", ledger)) + } + + log. + WithFields(ledgerTransactionStats.Map()). + WithFields(logpkg.F{ + "sequence": ledger, + "duration": time.Since(startTime).Seconds(), "state": false, "ledger": true, "graph": false, "commit": false, - }).Info("Processing ledger") - startTime := time.Now() - - ledgerTransactionStats, err := s.runner.RunTransactionProcessorsOnLedger(cur) - if err != nil { - return errors.Wrap(err, fmt.Sprintf("error processing ledger sequence=%d", cur)) - } - - log. - WithFields(ledgerTransactionStats.Map()). - WithFields(logpkg.F{ - "sequence": cur, - "duration": time.Since(startTime).Seconds(), - "state": false, - "ledger": true, - "graph": false, - "commit": false, - }). - Info("Processed ledger") - } + }). + Info("Processed ledger") return nil } @@ -592,6 +592,34 @@ func (h reingestHistoryRangeState) String() string { ) } +func (h reingestHistoryRangeState) ingestRange(s *System, fromLedger, toLedger uint32) error { + if s.historyQ.GetTx() == nil { + return errors.New("expected transaction to be present") + } + + // Clear history data before ingesting - used in `reingest range` command. + start, end, err := toid.LedgerRangeInclusive( + int32(fromLedger), + int32(toLedger), + ) + if err != nil { + return errors.Wrap(err, "Invalid range") + } + + err = s.historyQ.DeleteRangeAll(start, end) + if err != nil { + return errors.Wrap(err, "error in DeleteRangeAll") + } + + for cur := fromLedger; cur <= toLedger; cur++ { + if err = runTransactionProcessorsOnLedger(s, cur); err != nil { + return err + } + } + + return nil +} + // reingestHistoryRangeState is used as a command to reingest historical data func (h reingestHistoryRangeState) run(s *System) (transition, error) { if h.fromLedger == 0 || h.toLedger == 0 || @@ -599,16 +627,24 @@ func (h reingestHistoryRangeState) run(s *System) (transition, error) { return stop(), errors.Errorf("invalid range: [%d, %d]", h.fromLedger, h.toLedger) } - if err := s.historyQ.Begin(); err != nil { - return stop(), errors.Wrap(err, "Error starting a transaction") - } - defer s.historyQ.Rollback() - if h.force { + if err := s.historyQ.Begin(); err != nil { + return stop(), errors.Wrap(err, "Error starting a transaction") + } + defer s.historyQ.Rollback() + // acquire distributed lock so no one else can perform ingestion operations. if _, err := s.historyQ.GetLastLedgerExpIngest(); err != nil { return stop(), errors.Wrap(err, getLastIngestedErrMsg) } + + if err := h.ingestRange(s, h.fromLedger, h.toLedger); err != nil { + return stop(), err + } + + if err := s.historyQ.Commit(); err != nil { + return stop(), errors.Wrap(err, commitErrMsg) + } } else { lastIngestedLedger, err := s.historyQ.GetLastLedgerExpIngestNonBlocking() if err != nil { @@ -618,28 +654,30 @@ func (h reingestHistoryRangeState) run(s *System) (transition, error) { if lastIngestedLedger > 0 && h.toLedger >= lastIngestedLedger { return stop(), ErrReingestRangeConflict } - } - - // Clear history data before ingesting - used in `reingest range` command. - start, end, err := toid.LedgerRangeInclusive( - int32(h.fromLedger), - int32(h.toLedger), - ) - if err != nil { - return stop(), errors.Wrap(err, "Invalid range") - } - err = s.historyQ.DeleteRangeAll(start, end) - if err != nil { - return stop(), errors.Wrap(err, "error in DeleteRangeAll") - } - - if err = ingestHistoryRange(s, h.fromLedger, h.toLedger); err != nil { - return stop(), err - } - - if err := s.historyQ.Commit(); err != nil { - return stop(), errors.Wrap(err, commitErrMsg) + for cur := h.fromLedger; cur <= h.toLedger; cur++ { + err := func(ledger uint32) error { + if err := s.historyQ.Begin(); err != nil { + return errors.Wrap(err, "Error starting a transaction") + } + defer s.historyQ.Rollback() + + // ingest each ledger in a separate transaction to prevent deadlocks + // when acquiring ShareLocks from multiple parallel reingest range processes + if err := h.ingestRange(s, ledger, ledger); err != nil { + return err + } + + if err := s.historyQ.Commit(); err != nil { + return errors.Wrap(err, commitErrMsg) + } + + return nil + }(cur) + if err != nil { + return stop(), err + } + } } return stop(), nil diff --git a/services/horizon/internal/expingest/ingest_history_range_state_test.go b/services/horizon/internal/expingest/ingest_history_range_state_test.go index e9c279d1dd..f9ce94338f 100644 --- a/services/horizon/internal/expingest/ingest_history_range_state_test.go +++ b/services/horizon/internal/expingest/ingest_history_range_state_test.go @@ -4,6 +4,7 @@ import ( "context" "testing" + "github.com/jmoiron/sqlx" "github.com/stellar/go/exp/ingest/adapters" "github.com/stellar/go/exp/ingest/io" "github.com/stellar/go/services/horizon/internal/toid" @@ -228,6 +229,8 @@ func (s *ReingestHistoryRangeStateTestSuite) TestBeginReturnsError() { // Recreate mock in this single test to remove Rollback assertion. *s.historyQ = mockDBQ{} s.historyQ.On("GetTx").Return(nil) + s.historyQ.On("GetLastLedgerExpIngestNonBlocking").Return(uint32(0), nil).Once() + s.historyQ.On("Begin").Return(errors.New("my error")).Once() err := s.system.ReingestRange(100, 200, false) @@ -235,6 +238,9 @@ func (s *ReingestHistoryRangeStateTestSuite) TestBeginReturnsError() { } func (s *ReingestHistoryRangeStateTestSuite) TestGetLastLedgerExpIngestNonBlockingError() { + *s.historyQ = mockDBQ{} + s.historyQ.On("GetTx").Return(nil).Once() + s.historyQ.On("GetLastLedgerExpIngestNonBlocking").Return(uint32(0), errors.New("my error")).Once() err := s.system.ReingestRange(100, 200, false) @@ -242,6 +248,9 @@ func (s *ReingestHistoryRangeStateTestSuite) TestGetLastLedgerExpIngestNonBlocki } func (s *ReingestHistoryRangeStateTestSuite) TestReingestRangeOverlaps() { + *s.historyQ = mockDBQ{} + s.historyQ.On("GetTx").Return(nil).Once() + s.historyQ.On("GetLastLedgerExpIngestNonBlocking").Return(uint32(190), nil).Once() err := s.system.ReingestRange(100, 200, false) @@ -249,6 +258,9 @@ func (s *ReingestHistoryRangeStateTestSuite) TestReingestRangeOverlaps() { } func (s *ReingestHistoryRangeStateTestSuite) TestReingestRangeOverlapsAtEnd() { + *s.historyQ = mockDBQ{} + s.historyQ.On("GetTx").Return(nil).Once() + s.historyQ.On("GetLastLedgerExpIngestNonBlocking").Return(uint32(200), nil).Once() err := s.system.ReingestRange(100, 200, false) @@ -256,67 +268,87 @@ func (s *ReingestHistoryRangeStateTestSuite) TestReingestRangeOverlapsAtEnd() { } func (s *ReingestHistoryRangeStateTestSuite) TestClearHistoryFails() { + *s.historyQ = mockDBQ{} + s.historyQ.On("GetTx").Return(nil).Once() s.historyQ.On("GetLastLedgerExpIngestNonBlocking").Return(uint32(0), nil).Once() + s.historyQ.On("Begin").Return(nil).Once() + s.historyQ.On("GetTx").Return(&sqlx.Tx{}).Once() toidFrom := toid.New(100, 0, 0) - toidTo := toid.New(201, 0, 0) + toidTo := toid.New(101, 0, 0) s.historyQ.On( "DeleteRangeAll", toidFrom.ToInt64(), toidTo.ToInt64(), ).Return(errors.New("my error")).Once() + s.historyQ.On("Rollback").Return(nil).Once() + err := s.system.ReingestRange(100, 200, false) s.Assert().EqualError(err, "error in DeleteRangeAll: my error") } func (s *ReingestHistoryRangeStateTestSuite) TestRunTransactionProcessorsOnLedgerReturnsError() { + *s.historyQ = mockDBQ{} + s.historyQ.On("GetTx").Return(nil).Once() s.historyQ.On("GetLastLedgerExpIngestNonBlocking").Return(uint32(0), nil).Once() + s.historyQ.On("Begin").Return(nil).Once() + s.historyQ.On("GetTx").Return(&sqlx.Tx{}).Once() toidFrom := toid.New(100, 0, 0) - toidTo := toid.New(201, 0, 0) + toidTo := toid.New(101, 0, 0) s.historyQ.On( "DeleteRangeAll", toidFrom.ToInt64(), toidTo.ToInt64(), ).Return(nil).Once() s.runner.On("RunTransactionProcessorsOnLedger", uint32(100)). Return(io.StatsLedgerTransactionProcessorResults{}, errors.New("my error")).Once() + s.historyQ.On("Rollback").Return(nil).Once() err := s.system.ReingestRange(100, 200, false) s.Assert().EqualError(err, "error processing ledger sequence=100: my error") } func (s *ReingestHistoryRangeStateTestSuite) TestCommitFails() { + *s.historyQ = mockDBQ{} + s.historyQ.On("GetTx").Return(nil).Once() s.historyQ.On("GetLastLedgerExpIngestNonBlocking").Return(uint32(0), nil).Once() + s.historyQ.On("Begin").Return(nil).Once() + s.historyQ.On("GetTx").Return(&sqlx.Tx{}).Once() toidFrom := toid.New(100, 0, 0) - toidTo := toid.New(201, 0, 0) + toidTo := toid.New(101, 0, 0) s.historyQ.On( "DeleteRangeAll", toidFrom.ToInt64(), toidTo.ToInt64(), ).Return(nil).Once() - for i := 100; i <= 200; i++ { - s.runner.On("RunTransactionProcessorsOnLedger", uint32(i)).Return(io.StatsLedgerTransactionProcessorResults{}, nil).Once() - } + s.runner.On("RunTransactionProcessorsOnLedger", uint32(100)).Return(io.StatsLedgerTransactionProcessorResults{}, nil).Once() s.historyQ.On("Commit").Return(errors.New("my error")).Once() + s.historyQ.On("Rollback").Return(nil).Once() err := s.system.ReingestRange(100, 200, false) s.Assert().EqualError(err, "Error committing db transaction: my error") } func (s *ReingestHistoryRangeStateTestSuite) TestSuccess() { + *s.historyQ = mockDBQ{} + s.historyQ.On("GetTx").Return(nil).Once() s.historyQ.On("GetLastLedgerExpIngestNonBlocking").Return(uint32(0), nil).Once() - toidFrom := toid.New(100, 0, 0) - toidTo := toid.New(201, 0, 0) - s.historyQ.On( - "DeleteRangeAll", toidFrom.ToInt64(), toidTo.ToInt64(), - ).Return(nil).Once() + for i := uint32(100); i <= uint32(200); i++ { + s.historyQ.On("Begin").Return(nil).Once() + s.historyQ.On("GetTx").Return(&sqlx.Tx{}).Once() - for i := 100; i <= 200; i++ { - s.runner.On("RunTransactionProcessorsOnLedger", uint32(i)).Return(io.StatsLedgerTransactionProcessorResults{}, nil).Once() - } + toidFrom := toid.New(int32(i), 0, 0) + toidTo := toid.New(int32(i+1), 0, 0) + s.historyQ.On( + "DeleteRangeAll", toidFrom.ToInt64(), toidTo.ToInt64(), + ).Return(nil).Once() - s.historyQ.On("Commit").Return(nil).Once() + s.runner.On("RunTransactionProcessorsOnLedger", i).Return(io.StatsLedgerTransactionProcessorResults{}, nil).Once() + + s.historyQ.On("Commit").Return(nil).Once() + s.historyQ.On("Rollback").Return(nil).Once() + } err := s.system.ReingestRange(100, 200, false) s.Assert().NoError(err) @@ -325,6 +357,8 @@ func (s *ReingestHistoryRangeStateTestSuite) TestSuccess() { func (s *ReingestHistoryRangeStateTestSuite) TestSuccessOneLedger() { s.historyQ.On("GetLastLedgerExpIngestNonBlocking").Return(uint32(0), nil).Once() + s.historyQ.On("GetTx").Return(&sqlx.Tx{}).Once() + toidFrom := toid.New(100, 0, 0) toidTo := toid.New(101, 0, 0) s.historyQ.On( @@ -348,6 +382,8 @@ func (s *ReingestHistoryRangeStateTestSuite) TestGetLastLedgerExpIngestError() { func (s *ReingestHistoryRangeStateTestSuite) TestReingestRangeForce() { s.historyQ.On("GetLastLedgerExpIngest").Return(uint32(190), nil).Once() + s.historyQ.On("GetTx").Return(&sqlx.Tx{}).Once() + toidFrom := toid.New(100, 0, 0) toidTo := toid.New(201, 0, 0) s.historyQ.On( diff --git a/services/horizon/internal/test/trades/main.go b/services/horizon/internal/test/trades/main.go index 7e3b6f2108..5a07624024 100644 --- a/services/horizon/internal/test/trades/main.go +++ b/services/horizon/internal/test/trades/main.go @@ -49,7 +49,30 @@ func IngestTestTrade( D: xdr.Int32(amountSold), } - return q.InsertTrade(opCounter, 0, buyer, false, xdr.OfferEntry{}, trade, price, timestamp) + accounts, err := q.CreateAccounts([]string{seller.Address(), buyer.Address()}, 2) + if err != nil { + return err + } + assets, err := q.CreateAssets([]xdr.Asset{assetBought, assetSold}, 2) + if err != nil { + return err + } + + batch := q.NewTradeBatchInsertBuilder(0) + batch.Add(history.InsertTrade{ + HistoryOperationID: opCounter, + Order: 0, + BuyOfferExists: false, + BuyOfferID: 0, + BoughtAssetID: assets[assetBought.String()].ID, + SoldAssetID: assets[assetSold.String()].ID, + LedgerCloseTime: timestamp.ToTime(), + SellPrice: price, + Trade: trade, + BuyerAccountID: accounts[buyer.Address()], + SellerAccountID: accounts[seller.Address()], + }) + return batch.Exec() } //PopulateTestTrades generates and ingests trades between two assets according to given parameters