diff --git a/services/horizon/internal/db2/history/account_loader.go b/services/horizon/internal/db2/history/account_loader.go new file mode 100644 index 0000000000..22481a2c8f --- /dev/null +++ b/services/horizon/internal/db2/history/account_loader.go @@ -0,0 +1,193 @@ +package history + +import ( + "context" + "database/sql/driver" + "fmt" + "sort" + "strings" + + "github.com/lib/pq" + + "github.com/stellar/go/support/db" + "github.com/stellar/go/support/errors" +) + +// FutureAccountID represents a future history account. +// A FutureAccountID is created by an AccountLoader and +// the account id is available after calling Exec() on +// the AccountLoader. +type FutureAccountID struct { + address string + loader *AccountLoader +} + +const loaderLookupBatchSize = 50000 + +// Value implements the database/sql/driver Valuer interface. +func (a FutureAccountID) Value() (driver.Value, error) { + return a.loader.GetNow(a.address), nil +} + +// AccountLoader will map account addresses to their history +// account ids. If there is no existing mapping for a given address, +// the AccountLoader will insert into the history_accounts table to +// establish a mapping. +type AccountLoader struct { + sealed bool + set map[string]interface{} + ids map[string]int64 +} + +var errSealed = errors.New("cannot register more entries to loader after calling Exec()") + +// NewAccountLoader will construct a new AccountLoader instance. +func NewAccountLoader() *AccountLoader { + return &AccountLoader{ + sealed: false, + set: map[string]interface{}{}, + ids: map[string]int64{}, + } +} + +// GetFuture registers the given account address into the loader and +// returns a FutureAccountID which will hold the history account id for +// the address after Exec() is called. +func (a *AccountLoader) GetFuture(address string) FutureAccountID { + if a.sealed { + panic(errSealed) + } + + a.set[address] = nil + return FutureAccountID{ + address: address, + loader: a, + } +} + +// GetNow returns the history account id for the given address. +// GetNow should only be called on values which were registered by +// GetFuture() calls. Also, Exec() must be called before any GetNow +// call can succeed. +func (a *AccountLoader) GetNow(address string) int64 { + if id, ok := a.ids[address]; !ok { + panic(fmt.Errorf("address %v not present", address)) + } else { + return id + } +} + +func (a *AccountLoader) lookupKeys(ctx context.Context, q *Q, addresses []string) error { + for i := 0; i < len(addresses); i += loaderLookupBatchSize { + end := i + loaderLookupBatchSize + if end > len(addresses) { + end = len(addresses) + } + + var accounts []Account + if err := q.AccountsByAddresses(ctx, &accounts, addresses[i:end]); err != nil { + return errors.Wrap(err, "could not select accounts") + } + + for _, account := range accounts { + a.ids[account.Address] = account.ID + } + } + return nil +} + +// Exec will look up all the history account ids for the addresses registered in the loader. +// If there are no history account ids for a given set of addresses, Exec will insert rows +// into the history_accounts table to establish a mapping between address and history account id. +func (a *AccountLoader) Exec(ctx context.Context, session db.SessionInterface) error { + a.sealed = true + if len(a.set) == 0 { + return nil + } + q := &Q{session} + addresses := make([]string, 0, len(a.set)) + for address := range a.set { + addresses = append(addresses, address) + } + // sort entries before inserting rows to prevent deadlocks on acquiring a ShareLock + // https://github.com/stellar/go/issues/2370 + sort.Strings(addresses) + + if err := a.lookupKeys(ctx, q, addresses); err != nil { + return err + } + + insert := 0 + for _, address := range addresses { + if _, ok := a.ids[address]; ok { + continue + } + addresses[insert] = address + insert++ + } + if insert == 0 { + return nil + } + addresses = addresses[:insert] + + err := bulkInsert( + ctx, + q, + "history_accounts", + []string{"address"}, + []bulkInsertField{ + { + name: "address", + dbType: "character varying(64)", + objects: addresses, + }, + }, + ) + if err != nil { + return err + } + + return a.lookupKeys(ctx, q, addresses) +} + +type bulkInsertField struct { + name string + dbType string + objects []string +} + +func bulkInsert(ctx context.Context, q *Q, table string, conflictFields []string, fields []bulkInsertField) error { + unnestPart := make([]string, 0, len(fields)) + insertFieldsPart := make([]string, 0, len(fields)) + pqArrays := make([]interface{}, 0, len(fields)) + + for _, field := range fields { + unnestPart = append( + unnestPart, + fmt.Sprintf("unnest(?::%s[]) /* %s */", field.dbType, field.name), + ) + insertFieldsPart = append( + insertFieldsPart, + field.name, + ) + pqArrays = append( + pqArrays, + pq.Array(field.objects), + ) + } + + sql := ` + WITH r AS + (SELECT ` + strings.Join(unnestPart, ",") + `) + INSERT INTO ` + table + ` + (` + strings.Join(insertFieldsPart, ",") + `) + SELECT * from r + ON CONFLICT (` + strings.Join(conflictFields, ",") + `) DO NOTHING` + + _, err := q.ExecRaw( + context.WithValue(ctx, &db.QueryTypeContextKey, db.UpsertQueryType), + sql, + pqArrays..., + ) + return err +} diff --git a/services/horizon/internal/db2/history/account_loader_test.go b/services/horizon/internal/db2/history/account_loader_test.go new file mode 100644 index 0000000000..785a68f118 --- /dev/null +++ b/services/horizon/internal/db2/history/account_loader_test.go @@ -0,0 +1,54 @@ +package history + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/stellar/go/keypair" + "github.com/stellar/go/services/horizon/internal/test" +) + +func TestAccountLoader(t *testing.T) { + tt := test.Start(t) + defer tt.Finish() + test.ResetHorizonDB(t, tt.HorizonDB) + session := tt.HorizonSession() + + var addresses []string + for i := 0; i < 100; i++ { + addresses = append(addresses, keypair.MustRandom().Address()) + } + + loader := NewAccountLoader() + var futures []FutureAccountID + for _, address := range addresses { + future := loader.GetFuture(address) + futures = append(futures, future) + assert.Panics(t, func() { + loader.GetNow(address) + }) + assert.Panics(t, func() { + future.Value() + }) + } + + assert.NoError(t, loader.Exec(context.Background(), session)) + assert.Panics(t, func() { + loader.GetFuture(keypair.MustRandom().Address()) + }) + + q := &Q{session} + for i, address := range addresses { + future := futures[i] + id := loader.GetNow(address) + val, err := future.Value() + assert.NoError(t, err) + assert.Equal(t, id, val) + var account Account + assert.NoError(t, q.AccountByAddress(context.Background(), &account, address)) + assert.Equal(t, account.ID, id) + assert.Equal(t, account.Address, address) + } +}