Skip to content

Commit

Permalink
calculate archived asset contract stats
Browse files Browse the repository at this point in the history
  • Loading branch information
tamirms committed Nov 6, 2023
1 parent 5549f2b commit f32583f
Show file tree
Hide file tree
Showing 26 changed files with 2,660 additions and 1,097 deletions.
2 changes: 2 additions & 0 deletions protocols/horizon/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,12 +173,14 @@ type AssetStat struct {
NumClaimableBalances int32 `json:"num_claimable_balances"`
NumLiquidityPools int32 `json:"num_liquidity_pools"`
NumContracts int32 `json:"num_contracts"`
NumArchivedContracts int32 `json:"num_archived_contracts"`
// Action needed in release: horizon-v3.0.0: deprecated field
Amount string `json:"amount"`
Accounts AssetStatAccounts `json:"accounts"`
ClaimableBalancesAmount string `json:"claimable_balances_amount"`
LiquidityPoolsAmount string `json:"liquidity_pools_amount"`
ContractsAmount string `json:"contracts_amount"`
ArchivedContractsAmount string `json:"archived_contracts_amount"`
Balances AssetStatBalances `json:"balances"`
Flags AccountFlags `json:"flags"`
}
Expand Down
5 changes: 3 additions & 2 deletions services/horizon/docker/captive-core-integration-tests.cfg
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
PEER_PORT=11725
ARTIFICIALLY_ACCELERATE_TIME_FOR_TESTING=true
ENABLE_SOROBAN_DIAGNOSTIC_EVENTS=true
TESTING_SOROBAN_HIGH_LIMIT_OVERRIDE=true

UNSAFE_QUORUM=true
FAILURE_SAFETY=0

ENABLE_SOROBAN_DIAGNOSTIC_EVENTS=true
TESTING_SOROBAN_HIGH_LIMIT_OVERRIDE=true

[[VALIDATORS]]
NAME="local_core"
HOME_DOMAIN="core.local"
Expand Down
5 changes: 3 additions & 2 deletions services/horizon/docker/stellar-core-integration-tests.cfg
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
ARTIFICIALLY_ACCELERATE_TIME_FOR_TESTING=true
TESTING_SOROBAN_HIGH_LIMIT_OVERRIDE=true

NETWORK_PASSPHRASE="Standalone Network ; February 2017"

Expand All @@ -15,11 +14,13 @@ FAILURE_SAFETY=0

DATABASE="postgresql://user=postgres password=mysecretpassword host=core-postgres port=5641 dbname=stellar"

TESTING_SOROBAN_HIGH_LIMIT_OVERRIDE=true

[QUORUM_SET]
THRESHOLD_PERCENT=100
VALIDATORS=["GD5KD2KEZJIGTC63IGW6UMUSMVUVG5IHG64HUTFWCHVZH2N2IBOQN7PS"]

[HISTORY.vs]
get="cp history/vs/{0} {1}"
put="cp {0} history/vs/{1}"
mkdir="mkdir -p history/vs/{0}"
mkdir="mkdir -p history/vs/{0}"
5 changes: 5 additions & 0 deletions services/horizon/internal/actions/asset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ func TestAssetStats(t *testing.T) {
Amount: "0.0000001",
NumAccounts: usdAssetStat.NumAccounts,
ContractsAmount: "0.0000000",
ArchivedContractsAmount: "0.0000000",
Asset: base.Asset{
Type: "credit_alphanum4",
Code: usdAssetStat.AssetCode,
Expand Down Expand Up @@ -204,6 +205,7 @@ func TestAssetStats(t *testing.T) {
ClaimableBalancesAmount: "0.0000000",
LiquidityPoolsAmount: "0.0000000",
ContractsAmount: "0.0000000",
ArchivedContractsAmount: "0.0000000",
Amount: "0.0000023",
NumAccounts: etherAssetStat.NumAccounts,
Asset: base.Asset{
Expand Down Expand Up @@ -251,6 +253,7 @@ func TestAssetStats(t *testing.T) {
LiquidityPoolsAmount: "0.0000000",
Amount: "0.0000001",
ContractsAmount: "0.0000000",
ArchivedContractsAmount: "0.0000000",
NumAccounts: otherUSDAssetStat.NumAccounts,
Asset: base.Asset{
Type: "credit_alphanum4",
Expand Down Expand Up @@ -299,6 +302,7 @@ func TestAssetStats(t *testing.T) {
LiquidityPoolsAmount: "0.0000000",
Amount: "0.0000111",
ContractsAmount: "0.0000000",
ArchivedContractsAmount: "0.0000000",
NumAccounts: eurAssetStat.NumAccounts,
Asset: base.Asset{
Type: "credit_alphanum4",
Expand Down Expand Up @@ -476,6 +480,7 @@ func TestAssetStatsIssuerDoesNotExist(t *testing.T) {
LiquidityPoolsAmount: "0.0000000",
Amount: "0.0000001",
ContractsAmount: "0.0000000",
ArchivedContractsAmount: "0.0000000",
NumAccounts: usdAssetStat.NumAccounts,
Asset: base.Asset{
Type: "credit_alphanum4",
Expand Down
190 changes: 171 additions & 19 deletions services/horizon/internal/db2/history/asset_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,20 @@ func assetStatToPrimaryKeyMap(assetStat ExpAssetStat) map[string]interface{} {
}
}

// ContractStatRow represents a row in the contract_asset_stats table
type ContractStatRow struct {
ContractID []byte `db:"contract_id"`
Stat ContractStat `db:"stat"`
// ContractID is the contract id of the stellar asset contract
ContractID []byte `db:"contract_id"`
// Stat is a json blob containing statistics on the contract holders
// this asset
Stat ContractStat `db:"stat"`
}

// InsertAssetStats a set of asset stats into the exp_asset_stats
func (q *Q) InsertAssetStats(ctx context.Context, assetStats []ExpAssetStat, batchSize int) error {
func (q *Q) InsertAssetStats(ctx context.Context, assetStats []ExpAssetStat) error {
builder := &db.BatchInsertBuilder{
Table: q.GetTable("exp_asset_stats"),
MaxBatchSize: batchSize,
MaxBatchSize: maxUpdateBatchSize,
}

for _, assetStat := range assetStats {
Expand All @@ -59,10 +63,11 @@ func (q *Q) InsertAssetStats(ctx context.Context, assetStats []ExpAssetStat, bat
return nil
}

func (q *Q) InsertAssetContractStats(ctx context.Context, rows []ContractStatRow, batchSize int) error {
// InsertAssetContractStats inserts the given list of rows into the contract_asset_stats table
func (q *Q) InsertAssetContractStats(ctx context.Context, rows []ContractStatRow) error {
builder := &db.BatchInsertBuilder{
Table: q.GetTable("contract_asset_stats"),
MaxBatchSize: batchSize,
MaxBatchSize: maxUpdateBatchSize,
}

for _, row := range rows {
Expand All @@ -78,6 +83,159 @@ func (q *Q) InsertAssetContractStats(ctx context.Context, rows []ContractStatRow
return nil
}

// ContractAssetBalance represents a row in the contract_asset_balances table
type ContractAssetBalance struct {
// KeyHash is a hash of the contract balance's ledger entry key
KeyHash []byte `db:"key_hash"`
// ContractID is the contract id of the stellar asset contract
ContractID []byte `db:"asset_contract_id"`
// Amount is the amount held by the contract
Amount string `db:"amount"`
// ExpirationLedger is the latest ledger for which this contract balance
// ledger entry is active
ExpirationLedger uint32 `db:"expiration_ledger"`
}

// InsertContractAssetBalances will insert the given list of rows into the contract_asset_balances table
func (q *Q) InsertContractAssetBalances(ctx context.Context, rows []ContractAssetBalance) error {
if len(rows) == 0 {
return nil
}
builder := &db.BatchInsertBuilder{
Table: q.GetTable("contract_asset_balances"),
MaxBatchSize: maxUpdateBatchSize,
}

for _, row := range rows {
if err := builder.RowStruct(ctx, row); err != nil {
return errors.Wrap(err, "could not insert asset assetStat row")
}
}

if err := builder.Exec(ctx); err != nil {
return errors.Wrap(err, "could not exec asset assetStats insert builder")
}

return nil
}

const maxUpdateBatchSize = 100000

// UpdateContractAssetBalanceAmounts will update the expiration ledgers for the given list of keys
// (if they exist in the db).
func (q *Q) UpdateContractAssetBalanceAmounts(ctx context.Context, keys []xdr.Hash, amounts []string) error {

Check failure on line 126 in services/horizon/internal/db2/history/asset_stats.go

View workflow job for this annotation

GitHub Actions / golangci

126-158 lines are duplicate of `services/horizon/internal/db2/history/asset_stats.go:162-196` (dupl)
if len(keys) == 0 {
return nil
}
for len(keys) > 0 {
var args []interface{}
var values []string

for i := 0; len(keys) > 0 && (i+1)*2 < maxUpdateBatchSize; i++ {
args = append(args, keys[0][:], amounts[0])
values = append(values, "(cast(? as bytea), cast(? as numeric))")
keys = keys[1:]
amounts = amounts[1:]
}

sql := fmt.Sprintf(`
UPDATE contract_asset_balances
SET
amount = myvalues.amount
FROM (
VALUES
%s
) AS myvalues (key_hash, amount)
WHERE contract_asset_balances.key_hash = myvalues.key_hash
`, strings.Join(values, ","))

_, err := q.ExecRaw(ctx, sql, args...)
if err != nil {
return err
}
}
return nil
}

// UpdateContractAssetBalanceExpirations will update the expiration ledgers for the given list of keys
// (if they exist in the db).
func (q *Q) UpdateContractAssetBalanceExpirations(ctx context.Context, keys []xdr.Hash, expirationLedgers []uint32) error {

Check failure on line 162 in services/horizon/internal/db2/history/asset_stats.go

View workflow job for this annotation

GitHub Actions / golangci

162-196 lines are duplicate of `services/horizon/internal/db2/history/asset_stats.go:126-158` (dupl)
if len(keys) == 0 {
return nil
}

for len(keys) > 0 {
var args []interface{}
var values []string

for i := 0; len(keys) > 0 && (i+1)*2 < maxUpdateBatchSize; i++ {
args = append(args, keys[0][:], expirationLedgers[0])
values = append(values, "(cast(? as bytea), cast(? as integer))")
keys = keys[1:]
expirationLedgers = expirationLedgers[1:]
}

sql := fmt.Sprintf(`
UPDATE contract_asset_balances
SET
expiration_ledger = myvalues.expiration
FROM (
VALUES
%s
) AS myvalues (key_hash, expiration)
WHERE contract_asset_balances.key_hash = myvalues.key_hash
`, strings.Join(values, ","))

_, err := q.ExecRaw(ctx, sql, args...)
if err != nil {
return err
}
}

return nil
}

// GetContractAssetBalancesExpiringAt returns all contract asset balances which are active
// at `ledger` and expired at `ledger+1`
func (q *Q) GetContractAssetBalancesExpiringAt(ctx context.Context, ledger uint32) ([]ContractAssetBalance, error) {
sql := sq.Select("contract_asset_balances.*").From("contract_asset_balances").
Where(map[string]interface{}{"expiration_ledger": ledger})
var balances []ContractAssetBalance
err := q.Select(ctx, &balances, sql)
return balances, err
}

// GetContractAssetBalances fetches all contract_asset_balances rows for the
// given list of key hashes.
func (q *Q) GetContractAssetBalances(ctx context.Context, keys []xdr.Hash) ([]ContractAssetBalance, error) {
keyBytes := make([][]byte, len(keys))
for i := range keys {
keyBytes[i] = keys[i][:]
}
sql := sq.Select("contract_asset_balances.*").From("contract_asset_balances").
Where(map[string]interface{}{"key_hash": keyBytes})
var balances []ContractAssetBalance
err := q.Select(ctx, &balances, sql)
return balances, err
}

// RemoveContractAssetBalances removes rows from the contract_asset_balances table
func (q *Q) RemoveContractAssetBalances(ctx context.Context, keys []xdr.Hash) error {
if len(keys) == 0 {
return nil
}
keyBytes := make([][]byte, len(keys))
for i := range keys {
keyBytes[i] = keys[i][:]
}

_, err := q.Exec(ctx, sq.Delete("contract_asset_balances").
Where(map[string]interface{}{
"key_hash": keyBytes,
}))
return err
}

// InsertAssetStat a single asset assetStat row into the exp_asset_stats
// Returns number of rows affected and error.
func (q *Q) InsertAssetStat(ctx context.Context, assetStat ExpAssetStat) (int64, error) {
Expand All @@ -90,6 +248,7 @@ func (q *Q) InsertAssetStat(ctx context.Context, assetStat ExpAssetStat) (int64,
return result.RowsAffected()
}

// InsertAssetContractStat inserts a row into the contract_asset_stats table
func (q *Q) InsertAssetContractStat(ctx context.Context, row ContractStatRow) (int64, error) {
sql := sq.Insert("contract_asset_stats").SetMap(map[string]interface{}{
"contract_id": row.ContractID,
Expand Down Expand Up @@ -117,6 +276,8 @@ func (q *Q) UpdateAssetStat(ctx context.Context, assetStat ExpAssetStat) (int64,
return result.RowsAffected()
}

// UpdateAssetContractStat updates a row in the contract_asset_stats table.
// Returns number of rows afected and error.
func (q *Q) UpdateAssetContractStat(ctx context.Context, row ContractStatRow) (int64, error) {
sql := sq.Update("contract_asset_stats").Set("stat", row.Stat).
Where("contract_id = ?", row.ContractID)
Expand Down Expand Up @@ -144,6 +305,7 @@ func (q *Q) RemoveAssetStat(ctx context.Context, assetType xdr.AssetType, assetC
return result.RowsAffected()
}

// RemoveAssetContractStat removes a row in the contract_asset_stats table.
func (q *Q) RemoveAssetContractStat(ctx context.Context, contractID []byte) (int64, error) {
sql := sq.Delete("contract_asset_stats").
Where("contract_id = ?", contractID)
Expand Down Expand Up @@ -175,25 +337,15 @@ func (q *Q) GetAssetContractStat(ctx context.Context, contractID []byte) (Contra
return assetStat, err
}

func (q *Q) GetAssetStatByContract(ctx context.Context, contractID [32]byte) (ExpAssetStat, error) {
// GetAssetStatByContract returns the row in the exp_asset_stats table corresponding
// to the given contract id
func (q *Q) GetAssetStatByContract(ctx context.Context, contractID xdr.Hash) (ExpAssetStat, error) {
sql := selectAssetStats.Where("contract_id = ?", contractID[:])
var assetStat ExpAssetStat
err := q.Get(ctx, &assetStat, sql)
return assetStat, err
}

func (q *Q) GetAssetStatByContracts(ctx context.Context, contractIDs [][32]byte) ([]ExpAssetStat, error) {
contractIDBytes := make([][]byte, len(contractIDs))
for i := range contractIDs {
contractIDBytes[i] = contractIDs[i][:]
}
sql := selectAssetStats.Where(map[string]interface{}{"contract_id": contractIDBytes})

var assetStats []ExpAssetStat
err := q.Select(ctx, &assetStats, sql)
return assetStats, err
}

func parseAssetStatsCursor(cursor string) (string, string, error) {
parts := strings.SplitN(cursor, "_", 3)
if len(parts) != 3 {
Expand Down
Loading

0 comments on commit f32583f

Please sign in to comment.