Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

services/horizon/internal/db2/history: Implement account loader and future account ids #5015

Merged
merged 2 commits into from
Aug 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
193 changes: 193 additions & 0 deletions services/horizon/internal/db2/history/account_loader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
package history

import (
"context"
"database/sql/driver"
"fmt"
"sort"
"strings"

"github.com/lib/pq"

"github.com/stellar/go/support/db"
"github.com/stellar/go/support/errors"
)

// FutureAccountID represents a future history account.
// A FutureAccountID is created by an AccountLoader and
// the account id is available after calling Exec() on
// the AccountLoader.
type FutureAccountID struct {
address string
loader *AccountLoader
}

const loaderLookupBatchSize = 50000

// Value implements the database/sql/driver Valuer interface.
func (a FutureAccountID) Value() (driver.Value, error) {
return a.loader.GetNow(a.address), nil
}

// AccountLoader will map account addresses to their history
// account ids. If there is no existing mapping for a given address,
// the AccountLoader will insert into the history_accounts table to
// establish a mapping.
type AccountLoader struct {
sealed bool
set map[string]interface{}
ids map[string]int64
}

var errSealed = errors.New("cannot register more entries to loader after calling Exec()")

// NewAccountLoader will construct a new AccountLoader instance.
func NewAccountLoader() *AccountLoader {
return &AccountLoader{
sealed: false,
set: map[string]interface{}{},
ids: map[string]int64{},
}
}

// GetFuture registers the given account address into the loader and
// returns a FutureAccountID which will hold the history account id for
// the address after Exec() is called.
func (a *AccountLoader) GetFuture(address string) FutureAccountID {
if a.sealed {
panic(errSealed)
}

a.set[address] = nil
return FutureAccountID{
address: address,
loader: a,
}
}

// GetNow returns the history account id for the given address.
// GetNow should only be called on values which were registered by
// GetFuture() calls. Also, Exec() must be called before any GetNow
// call can succeed.
func (a *AccountLoader) GetNow(address string) int64 {
if id, ok := a.ids[address]; !ok {
panic(fmt.Errorf("address %v not present", address))
} else {
return id
}
}

func (a *AccountLoader) lookupKeys(ctx context.Context, q *Q, addresses []string) error {
for i := 0; i < len(addresses); i += loaderLookupBatchSize {
end := i + loaderLookupBatchSize
if end > len(addresses) {
end = len(addresses)
}

var accounts []Account
if err := q.AccountsByAddresses(ctx, &accounts, addresses[i:end]); err != nil {
return errors.Wrap(err, "could not select accounts")
}

for _, account := range accounts {
a.ids[account.Address] = account.ID
}
}
return nil
}

// Exec will look up all the history account ids for the addresses registered in the loader.
// If there are no history account ids for a given set of addresses, Exec will insert rows
// into the history_accounts table to establish a mapping between address and history account id.
func (a *AccountLoader) Exec(ctx context.Context, session db.SessionInterface) error {
a.sealed = true
if len(a.set) == 0 {
return nil
}
q := &Q{session}
addresses := make([]string, 0, len(a.set))
for address := range a.set {
addresses = append(addresses, address)
}
// sort entries before inserting rows to prevent deadlocks on acquiring a ShareLock
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there a prior issue that explains this aspect further, since ingestion is single threaded and all this happens in same db tx, trying to understand where it could happen?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ingestion is single threaded. however, it is possible to do parallel reingestion with multiple workers where each worker has a separate but concurrent transaction

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, then there is potential that same accounts could be processed from different ledger ranges of different worker threads at same time, how does sorting in that case further avoid db deadlock, I'm not suggesting to remove it, just to understand. It sticks out in the application code as performing unrelated complexity that would have expected to be mitigated at db level with tx repeatable read isolation?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's say there are two workers ingesting different ledger ranges. In both the ledger ranges the workers have to insert the same set of accounts into the history_accounts table. If the workers insert the accounts in the same order they will avoid a deadlock because the worker who wins the race will acquire the lock and the other worker will block until the transaction is complete. Consider the worst case scenario if worker 1 inserts accounts A, B, C and worker 2 inserts accounts C, B, A. Let's say worker 1 is faster so it inserts accounts A and B. Then worker 2 inserts account C. When worker 1 tries to insert account C there will be a deadlock because worker 2 already has a lock on that row.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think changing the transaction isolation level could avoid the deadlock

// https://github.com/stellar/go/issues/2370
sort.Strings(addresses)

if err := a.lookupKeys(ctx, q, addresses); err != nil {
return err
}

insert := 0
for _, address := range addresses {
if _, ok := a.ids[address]; ok {
Copy link
Contributor

@sreuland sreuland Aug 11, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you apply this filter that skips 'set' addresses which are already resolved in ids up in the loop that builds the query on line 100, to avoid it going through db i/o?

nvm, I don't think there could be a case where set initially overlaps with any keys in ids at the start of Exec.

continue
}
addresses[insert] = address
insert++
}
if insert == 0 {
return nil
}
addresses = addresses[:insert]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

neat, re-used the same array in place


err := bulkInsert(
ctx,
q,
"history_accounts",
[]string{"address"},
[]bulkInsertField{
{
name: "address",
dbType: "character varying(64)",
objects: addresses,
},
},
)
if err != nil {
return err
}

return a.lookupKeys(ctx, q, addresses)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should the AccountLoader.set be cleared at this point, since those 'future' requests have now been realized into AccountLoader.ids?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I intended that the account loader should only be used once and not be reused. I will add some code to enforce that intention.

}

type bulkInsertField struct {
name string
dbType string
objects []string
}

func bulkInsert(ctx context.Context, q *Q, table string, conflictFields []string, fields []bulkInsertField) error {
unnestPart := make([]string, 0, len(fields))
insertFieldsPart := make([]string, 0, len(fields))
pqArrays := make([]interface{}, 0, len(fields))

for _, field := range fields {
unnestPart = append(
unnestPart,
fmt.Sprintf("unnest(?::%s[]) /* %s */", field.dbType, field.name),
)
insertFieldsPart = append(
insertFieldsPart,
field.name,
)
pqArrays = append(
pqArrays,
pq.Array(field.objects),
)
}

sql := `
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would be compelling at some point if we have time to consider evaluating sql code options that provide compile time type safety on sql statements, less embedded string fragments/concat, like gojet. it may not even be viable for our setup, but worth keeping in mind.

WITH r AS
(SELECT ` + strings.Join(unnestPart, ",") + `)
INSERT INTO ` + table + `
(` + strings.Join(insertFieldsPart, ",") + `)
SELECT * from r
ON CONFLICT (` + strings.Join(conflictFields, ",") + `) DO NOTHING`

_, err := q.ExecRaw(
context.WithValue(ctx, &db.QueryTypeContextKey, db.UpsertQueryType),
sql,
pqArrays...,
)
return err
}
54 changes: 54 additions & 0 deletions services/horizon/internal/db2/history/account_loader_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package history

import (
"context"
"testing"

"github.com/stretchr/testify/assert"

"github.com/stellar/go/keypair"
"github.com/stellar/go/services/horizon/internal/test"
)

func TestAccountLoader(t *testing.T) {
tt := test.Start(t)
defer tt.Finish()
test.ResetHorizonDB(t, tt.HorizonDB)
session := tt.HorizonSession()

var addresses []string
for i := 0; i < 100; i++ {
addresses = append(addresses, keypair.MustRandom().Address())
}

loader := NewAccountLoader()
var futures []FutureAccountID
for _, address := range addresses {
future := loader.GetFuture(address)
futures = append(futures, future)
assert.Panics(t, func() {
loader.GetNow(address)
})
assert.Panics(t, func() {
future.Value()

Check failure on line 33 in services/horizon/internal/db2/history/account_loader_test.go

View workflow job for this annotation

GitHub Actions / golangci

Error return value of `future.Value` is not checked (errcheck)
})
}

assert.NoError(t, loader.Exec(context.Background(), session))
assert.Panics(t, func() {
loader.GetFuture(keypair.MustRandom().Address())
})

q := &Q{session}
for i, address := range addresses {
future := futures[i]
id := loader.GetNow(address)
val, err := future.Value()
assert.NoError(t, err)
assert.Equal(t, id, val)
var account Account
assert.NoError(t, q.AccountByAddress(context.Background(), &account, address))
assert.Equal(t, account.ID, id)
assert.Equal(t, account.Address, address)
}
}
Loading