-
Notifications
You must be signed in to change notification settings - Fork 502
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Implement account loader and future account ids
- Loading branch information
Showing
2 changed files
with
234 additions
and
0 deletions.
There are no files selected for viewing
184 changes: 184 additions & 0 deletions
184
services/horizon/internal/db2/history/account_loader.go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,184 @@ | ||
package history | ||
|
||
import ( | ||
"context" | ||
"database/sql/driver" | ||
"fmt" | ||
"sort" | ||
"strings" | ||
|
||
"github.com/lib/pq" | ||
|
||
"github.com/stellar/go/support/db" | ||
"github.com/stellar/go/support/errors" | ||
) | ||
|
||
// FutureAccountID represents a future history account. | ||
// A FutureAccountID is created by an AccountLoader and | ||
// the account id is available after calling Exec() on | ||
// the AccountLoader. | ||
type FutureAccountID struct { | ||
address string | ||
loader AccountLoader | ||
} | ||
|
||
const loaderLookupBatchSize = 50000 | ||
|
||
// Value implements the database/sql/driver Valuer interface. | ||
func (a FutureAccountID) Value() (driver.Value, error) { | ||
return a.loader.GetNow(a.address) | ||
} | ||
|
||
// AccountLoader will map account addresses to their history | ||
// account ids. If there is no existing mapping for a given address, | ||
// the AccountLoader will insert into the history_accounts table to | ||
// establish a mapping. | ||
type AccountLoader struct { | ||
set map[string]interface{} | ||
ids map[string]int64 | ||
} | ||
|
||
// NewAccountLoader will construct a new AccountLoader instance. | ||
func NewAccountLoader() AccountLoader { | ||
return AccountLoader{ | ||
set: map[string]interface{}{}, | ||
ids: map[string]int64{}, | ||
} | ||
} | ||
|
||
// GetFuture registers the given account address into the loader and | ||
// returns a FutureAccountID which will hold the history account id for | ||
// the address after Exec() is called. | ||
func (a AccountLoader) GetFuture(address string) FutureAccountID { | ||
a.set[address] = nil | ||
return FutureAccountID{ | ||
address: address, | ||
loader: a, | ||
} | ||
} | ||
|
||
// GetNow returns the history account id for the given address. | ||
// GetNow should only be called on values which were registered by | ||
// GetFuture() calls. Also, Exec() must be called before any GetNow | ||
// call can succeed. | ||
func (a AccountLoader) GetNow(address string) (int64, error) { | ||
if id, ok := a.ids[address]; !ok { | ||
return 0, fmt.Errorf("address %v not present", address) | ||
} else { | ||
return id, nil | ||
} | ||
} | ||
|
||
func (a AccountLoader) lookupKeys(ctx context.Context, q *Q, addresses []string) error { | ||
for i := 0; i < len(addresses); i += loaderLookupBatchSize { | ||
end := i + loaderLookupBatchSize | ||
if end > len(addresses) { | ||
end = len(addresses) | ||
} | ||
|
||
var accounts []Account | ||
if err := q.AccountsByAddresses(ctx, &accounts, addresses[i:end]); err != nil { | ||
return errors.Wrap(err, "could not select accounts") | ||
} | ||
|
||
for _, account := range accounts { | ||
a.ids[account.Address] = account.ID | ||
} | ||
} | ||
return nil | ||
} | ||
|
||
// Exec will look up all the history account ids for the addresses registered in the loader. | ||
// If there are no history account ids for a given set of addresses, Exec will insert rows | ||
// into the history_accounts table to establish a mapping between address and history account id. | ||
func (a AccountLoader) Exec(ctx context.Context, session db.SessionInterface) error { | ||
if len(a.set) == 0 { | ||
return nil | ||
} | ||
q := &Q{session} | ||
addresses := make([]string, 0, len(a.set)) | ||
for address := range a.set { | ||
addresses = append(addresses, address) | ||
} | ||
// sort entries before inserting rows to prevent deadlocks on acquiring a ShareLock | ||
// https://github.com/stellar/go/issues/2370 | ||
sort.Strings(addresses) | ||
|
||
if err := a.lookupKeys(ctx, q, addresses); err != nil { | ||
return err | ||
} | ||
|
||
insert := 0 | ||
for _, address := range addresses { | ||
if _, ok := a.ids[address]; ok { | ||
continue | ||
} | ||
addresses[insert] = address | ||
insert++ | ||
} | ||
if insert == 0 { | ||
return nil | ||
} | ||
addresses = addresses[:insert] | ||
|
||
err := bulkInsert( | ||
ctx, | ||
q, | ||
"history_accounts", | ||
[]string{"address"}, | ||
[]bulkInsertField{ | ||
{ | ||
name: "address", | ||
dbType: "character varying(64)", | ||
objects: addresses, | ||
}, | ||
}, | ||
) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
return a.lookupKeys(ctx, q, addresses) | ||
} | ||
|
||
type bulkInsertField struct { | ||
name string | ||
dbType string | ||
objects []string | ||
} | ||
|
||
func bulkInsert(ctx context.Context, q *Q, table string, conflictFields []string, fields []bulkInsertField) error { | ||
unnestPart := make([]string, 0, len(fields)) | ||
insertFieldsPart := make([]string, 0, len(fields)) | ||
pqArrays := make([]interface{}, 0, len(fields)) | ||
|
||
for _, field := range fields { | ||
unnestPart = append( | ||
unnestPart, | ||
fmt.Sprintf("unnest(?::%s[]) /* %s */", field.dbType, field.name), | ||
) | ||
insertFieldsPart = append( | ||
insertFieldsPart, | ||
field.name, | ||
) | ||
pqArrays = append( | ||
pqArrays, | ||
pq.Array(field.objects), | ||
) | ||
} | ||
|
||
sql := ` | ||
WITH r AS | ||
(SELECT ` + strings.Join(unnestPart, ",") + `) | ||
INSERT INTO ` + table + ` | ||
(` + strings.Join(insertFieldsPart, ",") + `) | ||
SELECT * from r | ||
ON CONFLICT (` + strings.Join(conflictFields, ",") + `) DO NOTHING` | ||
|
||
_, err := q.ExecRaw( | ||
context.WithValue(ctx, &db.QueryTypeContextKey, db.UpsertQueryType), | ||
sql, | ||
pqArrays..., | ||
) | ||
return err | ||
} |
50 changes: 50 additions & 0 deletions
50
services/horizon/internal/db2/history/account_loader_test.go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,50 @@ | ||
package history | ||
|
||
import ( | ||
"context" | ||
"testing" | ||
|
||
"github.com/stretchr/testify/assert" | ||
|
||
"github.com/stellar/go/keypair" | ||
"github.com/stellar/go/services/horizon/internal/test" | ||
) | ||
|
||
func TestAccountLoader(t *testing.T) { | ||
tt := test.Start(t) | ||
defer tt.Finish() | ||
test.ResetHorizonDB(t, tt.HorizonDB) | ||
session := tt.HorizonSession() | ||
|
||
var addresses []string | ||
for i := 0; i < 100; i++ { | ||
addresses = append(addresses, keypair.MustRandom().Address()) | ||
} | ||
|
||
loader := NewAccountLoader() | ||
var futures []FutureAccountID | ||
for _, address := range addresses { | ||
future := loader.GetFuture(address) | ||
futures = append(futures, future) | ||
_, err := loader.GetNow(address) | ||
assert.Error(t, err) | ||
_, err = future.Value() | ||
assert.Error(t, err) | ||
} | ||
|
||
assert.NoError(t, loader.Exec(context.Background(), session)) | ||
|
||
q := &Q{session} | ||
for i, address := range addresses { | ||
future := futures[i] | ||
id, err := loader.GetNow(address) | ||
assert.NoError(t, err) | ||
val, err := future.Value() | ||
assert.NoError(t, err) | ||
assert.Equal(t, id, val) | ||
var account Account | ||
assert.NoError(t, q.AccountByAddress(context.Background(), &account, address)) | ||
assert.Equal(t, account.ID, id) | ||
assert.Equal(t, account.Address, address) | ||
} | ||
} |