Skip to content

Commit

Permalink
services/horizon/internal/db2/history: Implement account loader and f…
Browse files Browse the repository at this point in the history
…uture account ids (#5015)
  • Loading branch information
tamirms authored Aug 14, 2023
1 parent 88f19e4 commit a630fcb
Show file tree
Hide file tree
Showing 2 changed files with 247 additions and 0 deletions.
193 changes: 193 additions & 0 deletions services/horizon/internal/db2/history/account_loader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
package history

import (
"context"
"database/sql/driver"
"fmt"
"sort"
"strings"

"github.com/lib/pq"

"github.com/stellar/go/support/db"
"github.com/stellar/go/support/errors"
)

// FutureAccountID represents a future history account.
// A FutureAccountID is created by an AccountLoader and
// the account id is available after calling Exec() on
// the AccountLoader.
type FutureAccountID struct {
address string
loader *AccountLoader
}

const loaderLookupBatchSize = 50000

// Value implements the database/sql/driver Valuer interface.
func (a FutureAccountID) Value() (driver.Value, error) {
return a.loader.GetNow(a.address), nil
}

// AccountLoader will map account addresses to their history
// account ids. If there is no existing mapping for a given address,
// the AccountLoader will insert into the history_accounts table to
// establish a mapping.
type AccountLoader struct {
sealed bool
set map[string]interface{}
ids map[string]int64
}

var errSealed = errors.New("cannot register more entries to loader after calling Exec()")

// NewAccountLoader will construct a new AccountLoader instance.
func NewAccountLoader() *AccountLoader {
return &AccountLoader{
sealed: false,
set: map[string]interface{}{},
ids: map[string]int64{},
}
}

// GetFuture registers the given account address into the loader and
// returns a FutureAccountID which will hold the history account id for
// the address after Exec() is called.
func (a *AccountLoader) GetFuture(address string) FutureAccountID {
if a.sealed {
panic(errSealed)
}

a.set[address] = nil
return FutureAccountID{
address: address,
loader: a,
}
}

// GetNow returns the history account id for the given address.
// GetNow should only be called on values which were registered by
// GetFuture() calls. Also, Exec() must be called before any GetNow
// call can succeed.
func (a *AccountLoader) GetNow(address string) int64 {
if id, ok := a.ids[address]; !ok {
panic(fmt.Errorf("address %v not present", address))
} else {
return id
}
}

func (a *AccountLoader) lookupKeys(ctx context.Context, q *Q, addresses []string) error {
for i := 0; i < len(addresses); i += loaderLookupBatchSize {
end := i + loaderLookupBatchSize
if end > len(addresses) {
end = len(addresses)
}

var accounts []Account
if err := q.AccountsByAddresses(ctx, &accounts, addresses[i:end]); err != nil {
return errors.Wrap(err, "could not select accounts")
}

for _, account := range accounts {
a.ids[account.Address] = account.ID
}
}
return nil
}

// Exec will look up all the history account ids for the addresses registered in the loader.
// If there are no history account ids for a given set of addresses, Exec will insert rows
// into the history_accounts table to establish a mapping between address and history account id.
func (a *AccountLoader) Exec(ctx context.Context, session db.SessionInterface) error {
a.sealed = true
if len(a.set) == 0 {
return nil
}
q := &Q{session}
addresses := make([]string, 0, len(a.set))
for address := range a.set {
addresses = append(addresses, address)
}
// sort entries before inserting rows to prevent deadlocks on acquiring a ShareLock
// https://github.com/stellar/go/issues/2370
sort.Strings(addresses)

if err := a.lookupKeys(ctx, q, addresses); err != nil {
return err
}

insert := 0
for _, address := range addresses {
if _, ok := a.ids[address]; ok {
continue
}
addresses[insert] = address
insert++
}
if insert == 0 {
return nil
}
addresses = addresses[:insert]

err := bulkInsert(
ctx,
q,
"history_accounts",
[]string{"address"},
[]bulkInsertField{
{
name: "address",
dbType: "character varying(64)",
objects: addresses,
},
},
)
if err != nil {
return err
}

return a.lookupKeys(ctx, q, addresses)
}

type bulkInsertField struct {
name string
dbType string
objects []string
}

func bulkInsert(ctx context.Context, q *Q, table string, conflictFields []string, fields []bulkInsertField) error {
unnestPart := make([]string, 0, len(fields))
insertFieldsPart := make([]string, 0, len(fields))
pqArrays := make([]interface{}, 0, len(fields))

for _, field := range fields {
unnestPart = append(
unnestPart,
fmt.Sprintf("unnest(?::%s[]) /* %s */", field.dbType, field.name),
)
insertFieldsPart = append(
insertFieldsPart,
field.name,
)
pqArrays = append(
pqArrays,
pq.Array(field.objects),
)
}

sql := `
WITH r AS
(SELECT ` + strings.Join(unnestPart, ",") + `)
INSERT INTO ` + table + `
(` + strings.Join(insertFieldsPart, ",") + `)
SELECT * from r
ON CONFLICT (` + strings.Join(conflictFields, ",") + `) DO NOTHING`

_, err := q.ExecRaw(
context.WithValue(ctx, &db.QueryTypeContextKey, db.UpsertQueryType),
sql,
pqArrays...,
)
return err
}
54 changes: 54 additions & 0 deletions services/horizon/internal/db2/history/account_loader_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package history

import (
"context"
"testing"

"github.com/stretchr/testify/assert"

"github.com/stellar/go/keypair"
"github.com/stellar/go/services/horizon/internal/test"
)

func TestAccountLoader(t *testing.T) {
tt := test.Start(t)
defer tt.Finish()
test.ResetHorizonDB(t, tt.HorizonDB)
session := tt.HorizonSession()

var addresses []string
for i := 0; i < 100; i++ {
addresses = append(addresses, keypair.MustRandom().Address())
}

loader := NewAccountLoader()
var futures []FutureAccountID
for _, address := range addresses {
future := loader.GetFuture(address)
futures = append(futures, future)
assert.Panics(t, func() {
loader.GetNow(address)
})
assert.Panics(t, func() {
future.Value()
})
}

assert.NoError(t, loader.Exec(context.Background(), session))
assert.Panics(t, func() {
loader.GetFuture(keypair.MustRandom().Address())
})

q := &Q{session}
for i, address := range addresses {
future := futures[i]
id := loader.GetNow(address)
val, err := future.Value()
assert.NoError(t, err)
assert.Equal(t, id, val)
var account Account
assert.NoError(t, q.AccountByAddress(context.Background(), &account, address))
assert.Equal(t, account.ID, id)
assert.Equal(t, account.Address, address)
}
}

0 comments on commit a630fcb

Please sign in to comment.