Skip to content

Commit

Permalink
services/horizon/internal/db2/history: Remove exp_history_* tables (#…
Browse files Browse the repository at this point in the history
…2118)

* Add migration to remove exp_history_* tables
* Remove CheckExp functions in the processors
* Modify db ingestion functions to insert into history_ * tables instead of exp_history_* tables
  • Loading branch information
tamirms authored Jan 15, 2020
1 parent fd5decd commit 06b6a58
Show file tree
Hide file tree
Showing 52 changed files with 483 additions and 2,302 deletions.
18 changes: 14 additions & 4 deletions services/horizon/internal/db2/history/account.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,16 +55,26 @@ func (q *Q) AccountsByAddresses(dest interface{}, addresses []string) error {
return q.Select(dest, sql)
}

// CreateAccounts creates rows for addresses in history_accounts table and
// put. `ON CONFLICT` is required when running a distributed ingestion.
func (q *Q) CreateAccounts(dest interface{}, addresses []string) error {
// CreateAccounts creates rows in the history_accounts table for a given list of addresses.
// CreateAccounts returns a mapping of account address to its corresponding id in the history_accounts table
func (q *Q) CreateAccounts(addresses []string) (map[string]int64, error) {
var accounts []Account
sql := sq.Insert("history_accounts").Columns("address")
for _, address := range addresses {
sql = sql.Values(address)
}
sql = sql.Suffix("ON CONFLICT (address) DO UPDATE SET address=EXCLUDED.address RETURNING *")

return q.Select(dest, sql)
err := q.Select(&accounts, sql)
if err != nil {
return nil, err
}

addressToID := map[string]int64{}
for _, account := range accounts {
addressToID[account.Address] = account.ID
}
return addressToID, nil
}

// Return id for account. If account doesn't exist, it will be created and the new id returned.
Expand Down
49 changes: 49 additions & 0 deletions services/horizon/internal/db2/history/account_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,52 @@ func TestIsAuthImmutable(t *testing.T) {
account = AccountEntry{Flags: 0}
tt.False(account.IsAuthImmutable())
}

func assertAccountsContainAddresses(tt *test.T, accounts map[string]int64, addresses []string) {
tt.Assert.Len(accounts, len(addresses))
set := map[int64]bool{}
for _, address := range addresses {
accountID, ok := accounts[address]
tt.Assert.True(ok)
tt.Assert.False(set[accountID])
set[accountID] = true
}
}

func TestCreateAccounts(t *testing.T) {
tt := test.Start(t)
defer tt.Finish()
test.ResetHorizonDB(t, tt.HorizonDB)
q := &Q{tt.HorizonSession()}

addresses := []string{
"GAOQJGUAB7NI7K7I62ORBXMN3J4SSWQUQ7FOEPSDJ322W2HMCNWPHXFB",
"GBRPYHIL2CI3FNQ4BXLFMNDLFJUNPU2HY3ZMFSHONUCEOASW7QC7OX2H",
}
accounts, err := q.CreateAccounts(addresses)
tt.Assert.NoError(err)
tt.Assert.Len(accounts, 2)
assertAccountsContainAddresses(tt, accounts, addresses)

dupAccounts, err := q.CreateAccounts([]string{
"GAOQJGUAB7NI7K7I62ORBXMN3J4SSWQUQ7FOEPSDJ322W2HMCNWPHXFB",
"GBRPYHIL2CI3FNQ4BXLFMNDLFJUNPU2HY3ZMFSHONUCEOASW7QC7OX2H",
})
tt.Assert.NoError(err)
tt.Assert.Equal(accounts, dupAccounts)

addresses = []string{
"GAOQJGUAB7NI7K7I62ORBXMN3J4SSWQUQ7FOEPSDJ322W2HMCNWPHXFB",
"GBRPYHIL2CI3FNQ4BXLFMNDLFJUNPU2HY3ZMFSHONUCEOASW7QC7OX2H",
"GCYVFGI3SEQJGBNQQG7YCMFWEYOHK3XPVOVPA6C566PXWN4SN7LILZSM",
"GBYSBDAJZMHL5AMD7QXQ3JEP3Q4GLKADWIJURAAHQALNAWD6Z5XF2RAC",
}
accounts, err = q.CreateAccounts(addresses)
tt.Assert.NoError(err)
assertAccountsContainAddresses(tt, accounts, addresses)
for address, accountID := range dupAccounts {
id, ok := accounts[address]
tt.Assert.True(ok)
tt.Assert.Equal(id, accountID)
}
}
8 changes: 4 additions & 4 deletions services/horizon/internal/db2/history/asset.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,12 +86,12 @@ func (q *Q) GetCreateAssetID(
return
}

// CreateExpAssets creates rows in the exp_history_assets table for a given list of assets.
func (q *Q) CreateExpAssets(assets []xdr.Asset) (map[string]Asset, error) {
// CreateAssets creates rows in the history_assets table for a given list of assets.
func (q *Q) CreateAssets(assets []xdr.Asset) (map[string]Asset, error) {
searchStrings := make([]string, 0, len(assets))
assetToKey := map[[3]string]string{}

sql := sq.Insert("exp_history_assets").Columns("asset_type", "asset_code", "asset_issuer")
sql := sq.Insert("history_assets").Columns("asset_type", "asset_code", "asset_issuer")

for _, asset := range assets {
var assetType, assetCode, assetIssuer string
Expand All @@ -118,7 +118,7 @@ func (q *Q) CreateExpAssets(assets []xdr.Asset) (map[string]Asset, error) {
}

var rows []Asset
err = q.Select(&rows, sq.Select("*").From("exp_history_assets").Where(sq.Eq{
err = q.Select(&rows, sq.Select("*").From("history_assets").Where(sq.Eq{
"concat(asset_type, '/', asset_code, '/', asset_issuer)": searchStrings,
}))
if err != nil {
Expand Down
14 changes: 7 additions & 7 deletions services/horizon/internal/db2/history/asset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,18 @@ import (
"github.com/stellar/go/xdr"
)

func TestCreateExpAssetIDs(t *testing.T) {
func TestCreateAssets(t *testing.T) {
tt := test.Start(t)
defer tt.Finish()
test.ResetHorizonDB(t, tt.HorizonDB)

q := &Q{tt.HorizonSession()}

// CreateExpAssets creates new rows
// CreateAssets creates new rows
assets := []xdr.Asset{
nativeAsset, eurAsset,
}
assetMap, err := q.CreateExpAssets(assets)
assetMap, err := q.CreateAssets(assets)
tt.Assert.NoError(err)
tt.Assert.Len(assetMap, len(assets))

Expand All @@ -37,8 +37,8 @@ func TestCreateExpAssetIDs(t *testing.T) {
tt.Assert.Equal(row.Issuer, assetIssuer)
}

// CreateExpAssets handles duplicates
assetMap, err = q.CreateExpAssets([]xdr.Asset{
// CreateAssets handles duplicates
assetMap, err = q.CreateAssets([]xdr.Asset{
nativeAsset, nativeAsset, eurAsset, eurAsset,
nativeAsset, nativeAsset, eurAsset, eurAsset,
})
Expand All @@ -58,9 +58,9 @@ func TestCreateExpAssetIDs(t *testing.T) {
tt.Assert.Equal(row.Issuer, assetIssuer)
}

// CreateExpAssets handles duplicates and new rows
// CreateAssets handles duplicates and new rows
assets = append(assets, usdAsset)
assetMap, err = q.CreateExpAssets(assets)
assetMap, err = q.CreateAssets(assets)
tt.Assert.NoError(err)
tt.Assert.Len(assetMap, len(assets))

Expand Down
96 changes: 3 additions & 93 deletions services/horizon/internal/db2/history/effect.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,102 +216,12 @@ func (q *EffectsQ) orderBookFilter(a xdr.Asset, prefix string) {
q.sql = q.sql.Where(clause, typ, code, iss)
}

func (q *Q) findOperationEffects(
effectsTable,
accountTable string,
seq int32,
) ([]Effect, error) {
from := toid.ID{LedgerSequence: int32(seq)}.ToInt64()
to := toid.ID{LedgerSequence: int32(seq + 1)}.ToInt64()
effects := []Effect{}

sql := effectFields.
From(
fmt.Sprintf("%s heff", effectsTable),
).
Join(
fmt.Sprintf(
"%s hacc ON hacc.id = heff.history_account_id",
accountTable,
),
).
Where("heff.history_operation_id >= ? AND heff.history_operation_id <= ? ", from, to).
OrderBy(
"heff.history_operation_id asc, heff.order asc",
)

err := q.Select(&effects, sql)

return effects, err
}

// CheckExpOperationEffects checks that the effects in exp_history_effects for
// the given ledger matches the same effects as in history_effects
func (q *Q) CheckExpOperationEffects(seq int32) (bool, error) {
expEffects, err := q.findOperationEffects(
"exp_history_effects",
"exp_history_accounts",
seq,
)

if err != nil {
return false, errors.Wrapf(
err,
"could not load exp_history_effects for ledger: %v",
seq,
)
}

effects, err := q.findOperationEffects(
"history_effects",
"history_accounts",
seq,
)

if err != nil {
return false, errors.Wrapf(
err,
"could not load history_effects for ledger: %v",
seq,
)
}

// We only proceed with the comparison if we have data in both the
// legacy ingestion system and the experimental ingestion system.
// If there is no data in either the legacy ingestion system or the
// experimental ingestion system we skip the check.
if len(expEffects) == 0 || len(effects) == 0 {
return true, nil
}

if len(expEffects) != len(effects) {
return false, nil
}

for i, expEffect := range expEffects {
effect := effects[i]

// Make HistoryAccountID the same since we don't care about this value
expEffect.HistoryAccountID = 0
effect.HistoryAccountID = 0

if expEffect != effect {
return false, nil
}
}

return true, nil
}

// QEffects defines exp_history_effects related queries.
// QEffects defines history_effects related queries.
type QEffects interface {
NewEffectBatchInsertBuilder(maxBatchSize int) EffectBatchInsertBuilder
CreateExpAccounts(addresses []string) (map[string]int64, error)
CheckExpOperationEffects(seq int32) (bool, error)
CreateAccounts(addresses []string) (map[string]int64, error)
}

var effectFields = sq.Select("heff.*, hacc.address")

var selectEffect = effectFields.
var selectEffect = sq.Select("heff.*, hacc.address").
From("history_effects heff").
LeftJoin("history_accounts hacc ON hacc.id = heff.history_account_id")
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
)

// EffectBatchInsertBuilder is used to insert effects into the
// exp_history_effects table
// history_effects table
type EffectBatchInsertBuilder interface {
Add(
accountID int64,
Expand All @@ -26,7 +26,7 @@ type effectBatchInsertBuilder struct {
func (q *Q) NewEffectBatchInsertBuilder(maxBatchSize int) EffectBatchInsertBuilder {
return &effectBatchInsertBuilder{
builder: db.BatchInsertBuilder{
Table: q.GetTable("exp_history_effects"),
Table: q.GetTable("history_effects"),
MaxBatchSize: maxBatchSize,
},
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ func TestAddEffect(t *testing.T) {
q := &Q{tt.HorizonSession()}

address := "GBXGQJWVLWOYHFLVTKWV5FGHA3LNYY2JQKM7OAJAUEQFU6LPCSEFVXON"
accounIDs, err := q.CreateExpAccounts([]string{address})
accounIDs, err := q.CreateAccounts([]string{address})
tt.Assert.NoError(err)

builder := q.NewEffectBatchInsertBuilder(2)
Expand All @@ -38,14 +38,7 @@ func TestAddEffect(t *testing.T) {
tt.Assert.NoError(err)

effects := []Effect{}
err = q.Select(
&effects,
effectFields.
From("exp_history_effects heff").
LeftJoin("exp_history_accounts hacc ON hacc.id = heff.history_account_id"),
)

tt.Assert.NoError(err)
tt.Assert.NoError(q.Effects().Select(&effects))
tt.Assert.Len(effects, 1)

effect := effects[0]
Expand Down
Loading

0 comments on commit 06b6a58

Please sign in to comment.