Skip to content

Commit

Permalink
services/horizon/internal/db2/history: Mitigate deadlock when running…
Browse files Browse the repository at this point in the history
… reingest range (#2373)

* Remove unused db functions

* Sort assets and accounts before inserting into assets and accounts tables

* Use separate transaction for each ledger in reingest range


Running horizon db reingest range multiple times in parallel sometimes causes a deadlock in Postgres where multiple connections inserting into history_accounts are blocked trying to acquire a ShareLock.

ShareLocks are row locks that are taken on each row as it is written by the transaction. ShareLock deadlocks occurs when two separate transactions try to write the same rows in opposite order.

That is why assets and accounts are sorted before insertion.

Another change in this PR which should mitigate against deadlocks is keeping transactions short lived. Previously, we reingested the entire range in just one transaction. If you have multiple reingesting processes which are running in parallel over very long ranges that increases the chances there will be a deadlock.
  • Loading branch information
tamirms authored Mar 10, 2020
1 parent ce43434 commit 85727d7
Show file tree
Hide file tree
Showing 15 changed files with 323 additions and 431 deletions.
40 changes: 28 additions & 12 deletions services/horizon/internal/actions/account_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,11 @@ func TestAccountInfo(t *testing.T) {
Thresholds: thresholds,
Flags: 0,
}
_, err := q.InsertAccount(accountEntry, 4)
batch := q.NewAccountsBatchInsertBuilder(0)
err := batch.Add(accountEntry, 4)
assert.NoError(t, err)
assert.NoError(t, batch.Exec())

tt.Assert.NoError(err)

_, err = q.InsertTrustLine(xdr.TrustLineEntry{
Expand Down Expand Up @@ -239,7 +243,15 @@ func TestAccountInfo(t *testing.T) {

// even though horizon ingestion account differs from core account,
// no error is returned because they have different last modified ledgers
_, err = q.UpdateAccount(accountEntry, 5)
err = q.UpsertAccounts([]xdr.LedgerEntry{
xdr.LedgerEntry{
LastModifiedLedgerSeq: 5,
Data: xdr.LedgerEntryData{
Type: xdr.LedgerEntryTypeAccount,
Account: &accountEntry,
},
},
})
tt.Assert.NoError(err)

account, err = AccountInfo(
Expand Down Expand Up @@ -300,12 +312,14 @@ func TestGetAccountsHandlerPageResultsBySigner(t *testing.T) {
q := &history.Q{tt.HorizonSession()}
handler := &GetAccountsHandler{}

_, err := q.InsertAccount(account1, 1234)
tt.Assert.NoError(err)
_, err = q.InsertAccount(account2, 1234)
tt.Assert.NoError(err)
_, err = q.InsertAccount(account3, 1234)
tt.Assert.NoError(err)
batch := q.NewAccountsBatchInsertBuilder(0)
err := batch.Add(account1, 1234)
assert.NoError(t, err)
err = batch.Add(account2, 1234)
assert.NoError(t, err)
err = batch.Add(account3, 1234)
assert.NoError(t, err)
assert.NoError(t, batch.Exec())

for _, row := range accountSigners {
q.CreateAccountSigner(row.Account, row.Signer, row.Weight)
Expand Down Expand Up @@ -378,10 +392,12 @@ func TestGetAccountsHandlerPageResultsByAsset(t *testing.T) {
q := &history.Q{tt.HorizonSession()}
handler := &GetAccountsHandler{}

_, err := q.InsertAccount(account1, 1234)
tt.Assert.NoError(err)
_, err = q.InsertAccount(account2, 1234)
tt.Assert.NoError(err)
batch := q.NewAccountsBatchInsertBuilder(0)
err := batch.Add(account1, 1234)
assert.NoError(t, err)
err = batch.Add(account2, 1234)
assert.NoError(t, err)
assert.NoError(t, batch.Exec())

for _, row := range accountSigners {
_, err = q.CreateAccountSigner(row.Account, row.Signer, row.Weight)
Expand Down
5 changes: 3 additions & 2 deletions services/horizon/internal/actions/asset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,9 +219,10 @@ func TestAssetStats(t *testing.T) {
if err := accountEntry.AccountId.SetAddress(account.AccountID); err != nil {
t.Fatalf("unexpected error %v", err)
}
numChanged, err := q.InsertAccount(accountEntry, 3)
batch := q.NewAccountsBatchInsertBuilder(0)
err := batch.Add(accountEntry, 3)
tt.Assert.NoError(err)
tt.Assert.Equal(numChanged, int64(1))
tt.Assert.NoError(batch.Exec())
}

for _, testCase := range []struct {
Expand Down
43 changes: 5 additions & 38 deletions services/horizon/internal/db2/history/account.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package history

import (
"sort"

sq "github.com/Masterminds/squirrel"
"github.com/stellar/go/services/horizon/internal/db2"
"github.com/stellar/go/support/db"
"github.com/stellar/go/support/errors"
"github.com/stellar/go/xdr"
)

// Accounts provides a helper to filter rows from the `history_accounts` table
Expand All @@ -23,12 +24,6 @@ func (q *Q) AccountByAddress(dest interface{}, addy string) error {
return q.Get(dest, sql)
}

// AccountByID loads a row from `history_accounts`, by id
func (q *Q) AccountByID(dest interface{}, id int64) error {
sql := selectAccount.Limit(1).Where("ha.id = ?", id)
return q.Get(dest, sql)
}

// Page specifies the paging constraints for the query being built by `q`.
func (q *AccountsQ) Page(page db2.PageQuery) *AccountsQ {
if q.Err != nil {
Expand Down Expand Up @@ -66,6 +61,9 @@ func (q *Q) CreateAccounts(addresses []string, batchSize int) (map[string]int64,
Suffix: "ON CONFLICT (address) DO NOTHING",
}

// sort assets before inserting rows into history_assets to prevent deadlocks on acquiring a ShareLock
// https://github.com/stellar/go/issues/2370
sort.Strings(addresses)
for _, address := range addresses {
err := builder.Row(map[string]interface{}{
"address": address,
Expand Down Expand Up @@ -103,35 +101,4 @@ func (q *Q) CreateAccounts(addresses []string, batchSize int) (map[string]int64,
return addressToID, nil
}

// Return id for account. If account doesn't exist, it will be created and the new id returned.
// `ON CONFLICT` is required when running a distributed ingestion.
func (q *Q) GetCreateAccountID(
aid xdr.AccountId,
) (result int64, err error) {

var existing Account

err = q.AccountByAddress(&existing, aid.Address())

//account already exists, return id
if err == nil {
result = existing.ID
return
}

// unexpected error
if !q.NoRows(err) {
return
}

//insert account and return id
err = q.GetRaw(
&result,
`INSERT INTO history_accounts (address) VALUES (?) ON CONFLICT (address) DO UPDATE SET address=EXCLUDED.address RETURNING id`,
aid.Address(),
)

return
}

var selectAccount = sq.Select("ha.*").From("history_accounts ha")
36 changes: 36 additions & 0 deletions services/horizon/internal/db2/history/account_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package history

import (
"sort"
"testing"

"github.com/stellar/go/services/horizon/internal/test"
Expand Down Expand Up @@ -61,6 +62,41 @@ func assertAccountsContainAddresses(tt *test.T, accounts map[string]int64, addre
}
}

func TestCreateAccountsSortedOrder(t *testing.T) {
tt := test.Start(t)
defer tt.Finish()
test.ResetHorizonDB(t, tt.HorizonDB)
q := &Q{tt.HorizonSession()}

addresses := []string{
"GBRPYHIL2CI3FNQ4BXLFMNDLFJUNPU2HY3ZMFSHONUCEOASW7QC7OX2H",
"GCYVFGI3SEQJGBNQQG7YCMFWEYOHK3XPVOVPA6C566PXWN4SN7LILZSM",
"GBYSBDAJZMHL5AMD7QXQ3JEP3Q4GLKADWIJURAAHQALNAWD6Z5XF2RAC",
"GAOQJGUAB7NI7K7I62ORBXMN3J4SSWQUQ7FOEPSDJ322W2HMCNWPHXFB",
}
accounts, err := q.CreateAccounts(addresses, 1)
tt.Assert.NoError(err)

idToAddress := map[int64]string{}
sortedIDs := []int64{}
for address, id := range accounts {
idToAddress[id] = address
sortedIDs = append(sortedIDs, id)
}

sort.Slice(sortedIDs, func(i, j int) bool {
return sortedIDs[i] < sortedIDs[j]
})
sort.Strings(addresses)

values := []string{}
for _, id := range sortedIDs {
values = append(values, idToAddress[id])
}

tt.Assert.Equal(addresses, values)
}

func TestCreateAccounts(t *testing.T) {
tt := test.Start(t)
defer tt.Finish()
Expand Down
31 changes: 0 additions & 31 deletions services/horizon/internal/db2/history/accounts.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,37 +83,6 @@ func accountToMap(account xdr.AccountEntry, lastModifiedLedger xdr.Uint32) map[s
}
}

// InsertAccount creates a row in the accounts table.
// Returns number of rows affected and error.
func (q *Q) InsertAccount(account xdr.AccountEntry, lastModifiedLedger xdr.Uint32) (int64, error) {
m := accountToMap(account, lastModifiedLedger)

sql := sq.Insert("accounts").SetMap(m)
result, err := q.Exec(sql)
if err != nil {
return 0, err
}

return result.RowsAffected()
}

// UpdateAccount updates a row in the offers table.
// Returns number of rows affected and error.
func (q *Q) UpdateAccount(account xdr.AccountEntry, lastModifiedLedger xdr.Uint32) (int64, error) {
m := accountToMap(account, lastModifiedLedger)

accountID := m["account_id"]
delete(m, "account_id")

sql := sq.Update("accounts").SetMap(m).Where(sq.Eq{"account_id": accountID})
result, err := q.Exec(sql)
if err != nil {
return 0, err
}

return result.RowsAffected()
}

// UpsertAccounts upserts a batch of accounts in the accounts table.
// There's currently no limit of the number of accounts this method can
// accept other than 2GB limit of the query string length what should be enough
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,14 @@
package history

import "github.com/stellar/go/xdr"
import (
"github.com/stellar/go/support/db"
"github.com/stellar/go/xdr"
)

// accountsBatchInsertBuilder is a simple wrapper around db.BatchInsertBuilder
type accountsBatchInsertBuilder struct {
builder db.BatchInsertBuilder
}

func (i *accountsBatchInsertBuilder) Add(account xdr.AccountEntry, lastModifiedLedger xdr.Uint32) error {
return i.builder.Row(accountToMap(account, lastModifiedLedger))
Expand Down
Loading

0 comments on commit 85727d7

Please sign in to comment.