diff --git a/services/horizon/internal/db2/history/account.go b/services/horizon/internal/db2/history/account.go index bc53b0bd7f..f9ff2d1117 100644 --- a/services/horizon/internal/db2/history/account.go +++ b/services/horizon/internal/db2/history/account.go @@ -47,6 +47,26 @@ func (q *AccountsQ) Select(dest interface{}) error { return q.Err } +// AccountsByAddresses loads a rows from `history_accounts`, by addresses +func (q *Q) AccountsByAddresses(dest interface{}, addresses []string) error { + sql := selectAccount.Where(map[string]interface{}{ + "ha.address": addresses, // ha.address IN (...) + }) + return q.Select(dest, sql) +} + +// CreateAccounts creates rows for addresses in history_accounts table and +// put +func (q *Q) CreateAccounts(dest interface{}, addresses []string) error { + sql := sq.Insert("history_accounts").Columns("address") + for _, address := range addresses { + sql = sql.Values(address) + } + sql = sql.Suffix("RETURNING *") + + return q.Select(dest, sql) +} + // Return id for account. If account doesn't exist, it will be created and the new id returned. func (q *Q) GetCreateAccountID( aid xdr.AccountId, diff --git a/services/horizon/internal/ingest/effect_ingestion.go b/services/horizon/internal/ingest/effect_ingestion.go index ce19959d64..2e106796f6 100644 --- a/services/horizon/internal/ingest/effect_ingestion.go +++ b/services/horizon/internal/ingest/effect_ingestion.go @@ -13,14 +13,8 @@ func (ei *EffectIngestion) Add(aid xdr.AccountId, typ history.EffectType, detail } ei.added++ - var haid int64 - haid, ei.err = ei.Dest.GetCreateAccountID(aid) - if ei.err != nil { - return false - } - - ei.err = ei.Dest.Effect(haid, ei.OperationID, ei.added, typ, details) + ei.err = ei.Dest.Effect(aid.Address(), ei.OperationID, ei.added, typ, details) if ei.err != nil { return false } diff --git a/services/horizon/internal/ingest/ingestion.go b/services/horizon/internal/ingest/ingestion.go index 2884d065ea..1b87cddfbd 100644 --- a/services/horizon/internal/ingest/ingestion.go +++ b/services/horizon/internal/ingest/ingestion.go @@ -68,53 +68,20 @@ func (ingest *Ingestion) Close() error { } // Effect adds a new row into the `history_effects` table. -func (ingest *Ingestion) Effect(aid int64, opid int64, order int, typ history.EffectType, details interface{}) error { +func (ingest *Ingestion) Effect(address string, opid int64, order int, typ history.EffectType, details interface{}) error { djson, err := json.Marshal(details) if err != nil { return err } - ingest.effects = ingest.effects.Values(aid, opid, order, typ, djson) - ingest.effectsQueryParams += 5 - err = ingest.flushQueriesIfNeeded() - if err != nil { - return err - } - - return nil -} - -// flushQueriesIfNeeded flush queries when number of params exceed 60k params. -// PostgreSQL supports up to 65535 parameters -func (ingest *Ingestion) flushQueriesIfNeeded() error { - var err error - - if ingest.operationsQueryParams > 60000 { - _, err = ingest.DB.Exec(ingest.operations) - if err != nil { - return err - } - ingest.operationsQueryParams = 0 - ingest.createOperationsInsertBuilder() - } - - if ingest.operationParticipantsQueryParams > 60000 { - _, err = ingest.DB.Exec(ingest.operation_participants) - if err != nil { - return err - } - ingest.operationParticipantsQueryParams = 0 - ingest.createOperationParticipantsInsertBuilder() - } - - if ingest.effectsQueryParams > 60000 { - _, err = ingest.DB.Exec(ingest.effects) - if err != nil { - return err - } - ingest.effectsQueryParams = 0 - ingest.createEffectsInsertBuilder() + effect := &effectRow{ + Address: address, + OperationID: opid, + Order: order, + Type: typ, + Details: djson, } + ingest.rowsToInsert = append(ingest.rowsToInsert, effect) return nil } @@ -122,26 +89,47 @@ func (ingest *Ingestion) flushQueriesIfNeeded() error { // Flush writes the currently buffered rows to the db, and if successful // starts a new transaction. func (ingest *Ingestion) Flush() error { - var err error + // Update IDs for accounts + err := ingest.UpdateAccountIDs() + if err != nil { + return errors.Wrap(err, "Error while updating account ids") + } + + // Inserts + paramsCount := map[TableName]int{} + for _, row := range ingest.rowsToInsert { + tableName := row.GetTableName() + params := row.GetParams() - if ingest.operationsQueryParams > 0 { - _, err = ingest.DB.Exec(ingest.operations) - if err != nil { - return err + if _, ok := ingest.builders[tableName]; !ok { + return errors.Errorf("%s insert builder does not exist", tableName) } - } - if ingest.operationParticipantsQueryParams > 0 { - _, err = ingest.DB.Exec(ingest.operation_participants) - if err != nil { - return err + ingest.builders[tableName] = ingest.builders[tableName].Values(params...) + paramsCount[tableName] += len(params) + + // PostgreSQL supports up to 65535 parameters. + if paramsCount[tableName] > 65000 { + fmt.Println("flushing", tableName) + _, err = ingest.DB.Exec(ingest.builders[tableName]) + if err != nil { + return err + } + paramsCount[tableName] = 0 + ingest.createInsertBuilderByTableName(tableName) } } - if ingest.effectsQueryParams > 0 { - _, err = ingest.DB.Exec(ingest.effects) - if err != nil { - return err + fmt.Println("paramsCount", paramsCount) + + // Exec the rest + for tableName, params := range paramsCount { + if params > 0 { + fmt.Println("last flushing", tableName) + _, err = ingest.DB.Exec(ingest.builders[tableName]) + if err != nil { + return err + } } } @@ -205,36 +193,86 @@ func (ingest *Ingestion) Operation( return err } - if typ == xdr.OperationTypeAccountMerge { - delete(ingest.accountIDMapping, source) + operation := operationRow{ + ID: id, + TxID: txid, + Order: order, + Source: source.Address(), + Type: typ, + Details: djson, } + ingest.rowsToInsert = append(ingest.rowsToInsert, operation) + return nil +} - ingest.operations = ingest.operations.Values(id, txid, order, source.Address(), typ, djson) - ingest.operationsQueryParams += 6 - err = ingest.flushQueriesIfNeeded() +// UpdateAccountIDs updates IDs of the accounts before inserting +// objects into DB. +func (ingest *Ingestion) UpdateAccountIDs() error { + accounts := map[string]int64{} + addresses := []string{} + + // Collect addresses to fetch + for _, row := range ingest.rowsToInsert { + for _, address := range row.GetAddresses() { + if _, exists := accounts[address]; !exists { + addresses = append(addresses, address) + } + accounts[address] = 0 + } + } + + if len(addresses) == 0 { + return nil + } + + fmt.Println("addresses to load", len(addresses)) + + // Get IDs and update map + q := history.Q{Session: ingest.DB} + dbAccounts := make([]history.Account, 0, len(addresses)) + err := q.AccountsByAddresses(&dbAccounts, addresses) if err != nil { return err } - return nil -} + fmt.Println("accounts loaded", len(dbAccounts)) -// GetCreateAccountID works like history.Q.GetCreateAccountID but is caching results. -// NOTE - what if account gets merged? -// NOTE - how much the cache it can grow? -func (ingest *Ingestion) GetCreateAccountID(aid xdr.AccountId) (int64, error) { - if ingest.accountIDMapping[aid] != 0 { - return ingest.accountIDMapping[aid], nil + for _, row := range dbAccounts { + accounts[row.Address] = row.ID } - q := history.Q{Session: ingest.DB} - haid, err := q.GetCreateAccountID(aid) + // Insert non-existent addresses and update map + addresses = []string{} + for address, id := range accounts { + if id == 0 { + addresses = append(addresses, address) + } + } + + if len(addresses) == 0 { + return nil + } + + fmt.Println("accounts to insert", len(addresses)) + + dbAccounts = make([]history.Account, 0, len(addresses)) + err = q.CreateAccounts(&dbAccounts, addresses) if err != nil { - return 0, err + return err + } + + fmt.Println("accounts inserted", len(dbAccounts)) + + for _, row := range dbAccounts { + accounts[row.Address] = row.ID } - ingest.accountIDMapping[aid] = haid - return haid, nil + // Update IDs in objects + for _, row := range ingest.rowsToInsert { + row.UpdateAccountIDs(accounts) + } + + return nil } // OperationParticipants ingests the provided accounts `aids` as participants of @@ -242,17 +280,11 @@ func (ingest *Ingestion) GetCreateAccountID(aid xdr.AccountId) (int64, error) { // `history_operation_participants` table. func (ingest *Ingestion) OperationParticipants(op int64, aids []xdr.AccountId) error { for _, aid := range aids { - haid, err := ingest.GetCreateAccountID(aid) - if err != nil { - return err - } - ingest.operation_participants = ingest.operation_participants.Values(op, haid) - - ingest.operationParticipantsQueryParams += 2 - err = ingest.flushQueriesIfNeeded() - if err != nil { - return err + operationParticipant := &operationParticipantRow{ + OperationID: op, + Address: aid.Address(), } + ingest.rowsToInsert = append(ingest.rowsToInsert, operationParticipant) } return nil @@ -271,13 +303,10 @@ func (ingest *Ingestion) Start() (err error) { return } - ingest.accountIDMapping = make(map[xdr.AccountId]int64) - + // We need to recreate builders and clear `rowsToInsert` because `Ingestion` + // object can be used to ingest more than one ledger. ingest.createInsertBuilders() - ingest.effectsQueryParams = 0 - ingest.operationsQueryParams = 0 - ingest.operationParticipantsQueryParams = 0 - + ingest.rowsToInsert = []row{} return } @@ -318,15 +347,9 @@ func (ingest *Ingestion) Trade( ) error { q := history.Q{Session: ingest.DB} - sellerAccountId, err := ingest.GetCreateAccountID(trade.SellerId) - if err != nil { - return errors.Wrap(err, "failed to load seller account id") - } + sellerAddress := trade.SellerId.Address() + buyerAddress := buyer.Address() - buyerAccountId, err := ingest.GetCreateAccountID(buyer) - if err != nil { - return errors.Wrap(err, "failed to load buyer account id") - } soldAssetId, err := q.GetCreateAssetID(trade.AssetSold) if err != nil { return errors.Wrap(err, "failed to get sold asset id") @@ -337,36 +360,33 @@ func (ingest *Ingestion) Trade( return errors.Wrap(err, "failed to get bought asset id") } var baseAssetId, counterAssetId int64 - var baseAccountId, counterAccountId int64 + var baseAddress, counterAddress string var baseAmount, counterAmount xdr.Int64 //map seller and buyer to base and counter based on ordering of ids if soldAssetId < boughtAssetId { - baseAccountId, baseAssetId, baseAmount, counterAccountId, counterAssetId, counterAmount = - sellerAccountId, soldAssetId, trade.AmountSold, buyerAccountId, boughtAssetId, trade.AmountBought + baseAddress, baseAssetId, baseAmount, counterAddress, counterAssetId, counterAmount = + sellerAddress, soldAssetId, trade.AmountSold, buyerAddress, boughtAssetId, trade.AmountBought } else { - baseAccountId, baseAssetId, baseAmount, counterAccountId, counterAssetId, counterAmount = - buyerAccountId, boughtAssetId, trade.AmountBought, sellerAccountId, soldAssetId, trade.AmountSold - } - - sql := ingest.trades.Values( - opid, - order, - time.Unix(ledgerClosedAt, 0).UTC(), - trade.OfferId, - baseAccountId, - baseAssetId, - baseAmount, - counterAccountId, - counterAssetId, - counterAmount, - soldAssetId < boughtAssetId, - ) - _, err = ingest.DB.Exec(sql) - if err != nil { - return errors.Wrap(err, "failed to exec sql") + baseAddress, baseAssetId, baseAmount, counterAddress, counterAssetId, counterAmount = + buyerAddress, boughtAssetId, trade.AmountBought, sellerAddress, soldAssetId, trade.AmountSold } + tradeR := &tradeRow{ + OperationID: opid, + Order: order, + LedgerCloseAt: time.Unix(ledgerClosedAt, 0).UTC(), + OfferID: trade.OfferId, + BaseAssetID: baseAssetId, + BaseAmount: baseAmount, + CounterAssetID: counterAssetId, + CounterAmount: counterAmount, + BaseIsSeller: soldAssetId < boughtAssetId, + + BaseAddress: baseAddress, + CounterAddress: counterAddress, + } + ingest.rowsToInsert = append(ingest.rowsToInsert, tradeR) return nil } @@ -391,26 +411,19 @@ func (ingest *Ingestion) Transaction( // transaction with id `tx`, creating a new row in the // `history_transaction_participants` table. func (ingest *Ingestion) TransactionParticipants(tx int64, aids []xdr.AccountId) error { - sql := ingest.transaction_participants - for _, aid := range aids { - haid, err := ingest.GetCreateAccountID(aid) - if err != nil { - return err + transactionParticipant := &transactionParticipantRow{ + TransactionID: tx, + Address: aid.Address(), } - sql = sql.Values(tx, haid) - } - - _, err := ingest.DB.Exec(sql) - if err != nil { - return err + ingest.rowsToInsert = append(ingest.rowsToInsert, transactionParticipant) } return nil } func (ingest *Ingestion) createOperationsInsertBuilder() { - ingest.operations = sq.Insert("history_operations").Columns( + ingest.builders[OperationsTableName] = sq.Insert(string(OperationsTableName)).Columns( "id", "transaction_id", "application_order", @@ -421,14 +434,21 @@ func (ingest *Ingestion) createOperationsInsertBuilder() { } func (ingest *Ingestion) createOperationParticipantsInsertBuilder() { - ingest.operation_participants = sq.Insert("history_operation_participants").Columns( + ingest.builders[OperationParticipantsTableName] = sq.Insert(string(OperationParticipantsTableName)).Columns( "history_operation_id", "history_account_id", ) } +func (ingest *Ingestion) createTransactionParticipantsInsertBuilder() { + ingest.builders[TransactionParticipantsTableName] = sq.Insert(string(TransactionParticipantsTableName)).Columns( + "history_transaction_id", + "history_account_id", + ) +} + func (ingest *Ingestion) createEffectsInsertBuilder() { - ingest.effects = sq.Insert("history_effects").Columns( + ingest.builders[EffectsTableName] = sq.Insert(string(EffectsTableName)).Columns( "history_account_id", "history_operation_id", "\"order\"", @@ -437,7 +457,42 @@ func (ingest *Ingestion) createEffectsInsertBuilder() { ) } +func (ingest *Ingestion) createTradesInsertBuilder() { + ingest.builders[TradesTableName] = sq.Insert(string(TradesTableName)).Columns( + "history_operation_id", + "\"order\"", + "ledger_closed_at", + "offer_id", + "base_account_id", + "base_asset_id", + "base_amount", + "counter_account_id", + "counter_asset_id", + "counter_amount", + "base_is_seller", + ) +} + +func (ingest *Ingestion) createInsertBuilderByTableName(name TableName) { + switch name { + case OperationsTableName: + ingest.createOperationsInsertBuilder() + case EffectsTableName: + ingest.createEffectsInsertBuilder() + case TradesTableName: + ingest.createTradesInsertBuilder() + case OperationParticipantsTableName: + ingest.createOperationParticipantsInsertBuilder() + case TransactionParticipantsTableName: + ingest.createTransactionParticipantsInsertBuilder() + default: + panic("Invalid table name") + } +} + func (ingest *Ingestion) createInsertBuilders() { + ingest.builders = make(map[TableName]sq.InsertBuilder) + ingest.ledgers = sq.Insert("history_ledgers").Columns( "importer_version", "id", @@ -458,10 +513,6 @@ func (ingest *Ingestion) createInsertBuilders() { "ledger_header", ) - ingest.accounts = sq.Insert("history_accounts").Columns( - "address", - ) - ingest.transactions = sq.Insert("history_transactions").Columns( "id", "transaction_hash", @@ -483,28 +534,11 @@ func (ingest *Ingestion) createInsertBuilders() { "updated_at", ) - ingest.transaction_participants = sq.Insert("history_transaction_participants").Columns( - "history_transaction_id", - "history_account_id", - ) - ingest.createOperationsInsertBuilder() ingest.createOperationParticipantsInsertBuilder() + ingest.createTransactionParticipantsInsertBuilder() ingest.createEffectsInsertBuilder() - - ingest.trades = sq.Insert("history_trades").Columns( - "history_operation_id", - "\"order\"", - "ledger_closed_at", - "offer_id", - "base_account_id", - "base_asset_id", - "base_amount", - "counter_account_id", - "counter_asset_id", - "counter_amount", - "base_is_seller", - ) + ingest.createTradesInsertBuilder() ingest.assetStats = sq.Insert("asset_stats").Columns( "id", diff --git a/services/horizon/internal/ingest/main.go b/services/horizon/internal/ingest/main.go index 50329a012f..2091e8c99f 100644 --- a/services/horizon/internal/ingest/main.go +++ b/services/horizon/internal/ingest/main.go @@ -5,10 +5,12 @@ package ingest import ( "sync" + "time" sq "github.com/Masterminds/squirrel" metrics "github.com/rcrowley/go-metrics" "github.com/stellar/go/services/horizon/internal/db2/core" + "github.com/stellar/go/services/horizon/internal/db2/history" "github.com/stellar/go/support/db" "github.com/stellar/go/xdr" ) @@ -110,27 +112,97 @@ type IngesterMetrics struct { // AssetsModified tracks all the assets modified during a cycle of ingestion type AssetsModified map[string]xdr.Asset +type TableName string + +const ( + OperationsTableName TableName = "history_operations" + EffectsTableName TableName = "history_effects" + TransactionParticipantsTableName TableName = "history_transaction_participants" + TradesTableName TableName = "trades" + OperationParticipantsTableName TableName = "history_operation_participants" +) + +// row should be implemented by objects added to DB during ingestion. +type row interface { + // GetParams returns fields to be added to DB. Objects can contain + // more helper fields that are not added to DB. + GetParams() []interface{} + // UpdateAccountIDs updates fields with account IDs by using provided + // address => id mapping. + UpdateAccountIDs(accounts map[string]int64) + // GetAddresses returns a list of addresses to find corresponding IDs + GetAddresses() []string + GetTableName() TableName +} + +type effectRow struct { + AccountID int64 + OperationID int64 + Order int + Type history.EffectType + Details []byte + + Address string +} + +type operationRow struct { + ID int64 + TxID int64 + Order int32 + Source string + Type xdr.OperationType + Details []byte +} + +type operationParticipantRow struct { + OperationID int64 + AccountID int64 + + Address string +} + +type tradeRow struct { + OperationID int64 + Order int32 + LedgerCloseAt time.Time + OfferID xdr.Uint64 + BaseAccountID int64 + BaseAssetID int64 + BaseAmount xdr.Int64 + CounterAccountID int64 + CounterAssetID int64 + CounterAmount xdr.Int64 + BaseIsSeller bool + + BaseAddress string + CounterAddress string +} + +type transactionParticipantRow struct { + TransactionID int64 + AccountID int64 + + Address string +} + // Ingestion receives write requests from a Session type Ingestion struct { // DB is the sql connection to be used for writing any rows into the horizon // database. DB *db.Session + builders map[TableName]sq.InsertBuilder + ledgers sq.InsertBuilder transactions sq.InsertBuilder transaction_participants sq.InsertBuilder operations sq.InsertBuilder operation_participants sq.InsertBuilder effects sq.InsertBuilder - accounts sq.InsertBuilder trades sq.InsertBuilder assetStats sq.InsertBuilder - effectsQueryParams int - operationsQueryParams int - operationParticipantsQueryParams int - - accountIDMapping map[xdr.AccountId]int64 + rowsToInsert []row } // Session represents a single attempt at ingesting data into the history diff --git a/services/horizon/internal/ingest/rows.go b/services/horizon/internal/ingest/rows.go new file mode 100644 index 0000000000..4a43f1c4ef --- /dev/null +++ b/services/horizon/internal/ingest/rows.go @@ -0,0 +1,113 @@ +package ingest + +func (r effectRow) GetParams() []interface{} { + return []interface{}{ + r.AccountID, + r.OperationID, + r.Order, + r.Type, + r.Details, + } +} + +func (r *effectRow) UpdateAccountIDs(accounts map[string]int64) { + r.AccountID = accounts[r.Address] +} + +func (r effectRow) GetAddresses() []string { + return []string{r.Address} +} + +func (r effectRow) GetTableName() TableName { + return EffectsTableName +} + +func (r operationRow) GetParams() []interface{} { + return []interface{}{ + r.ID, + r.TxID, + r.Order, + r.Source, + r.Type, + r.Details, + } +} + +func (r operationRow) UpdateAccountIDs(accounts map[string]int64) { + return +} + +func (r operationRow) GetAddresses() []string { + return nil +} + +func (r operationRow) GetTableName() TableName { + return OperationsTableName +} + +func (r operationParticipantRow) GetParams() []interface{} { + return []interface{}{ + r.OperationID, + r.AccountID, + } +} + +func (r *operationParticipantRow) UpdateAccountIDs(accounts map[string]int64) { + r.AccountID = accounts[r.Address] +} + +func (r operationParticipantRow) GetAddresses() []string { + return []string{r.Address} +} + +func (r operationParticipantRow) GetTableName() TableName { + return OperationParticipantsTableName +} + +func (r tradeRow) GetParams() []interface{} { + return []interface{}{ + r.OperationID, + r.Order, + r.LedgerCloseAt, + r.OfferID, + r.BaseAccountID, + r.BaseAssetID, + r.BaseAmount, + r.CounterAccountID, + r.CounterAssetID, + r.CounterAmount, + r.BaseIsSeller, + } +} + +func (r *tradeRow) UpdateAccountIDs(accounts map[string]int64) { + r.BaseAccountID = accounts[r.BaseAddress] + r.CounterAccountID = accounts[r.CounterAddress] +} + +func (r tradeRow) GetAddresses() []string { + return []string{r.BaseAddress, r.CounterAddress} +} + +func (r tradeRow) GetTableName() TableName { + return TradesTableName +} + +func (r transactionParticipantRow) GetParams() []interface{} { + return []interface{}{ + r.TransactionID, + r.AccountID, + } +} + +func (r *transactionParticipantRow) UpdateAccountIDs(accounts map[string]int64) { + r.AccountID = accounts[r.Address] +} + +func (r transactionParticipantRow) GetAddresses() []string { + return []string{r.Address} +} + +func (r transactionParticipantRow) GetTableName() TableName { + return TransactionParticipantsTableName +} diff --git a/services/horizon/internal/ingest/session.go b/services/horizon/internal/ingest/session.go index 725993fbcc..037e6dbbfe 100644 --- a/services/horizon/internal/ingest/session.go +++ b/services/horizon/internal/ingest/session.go @@ -27,6 +27,13 @@ func (is *Session) Run() { return } + start2 := time.Now() + + defer func() { + elapsed := time.Now().Sub(start2) + fmt.Println("Session.Run", elapsed) + }() + is.Err = is.Ingestion.Start() if is.Err != nil { return @@ -96,6 +103,13 @@ func (is *Session) effectFlagDetails(flagDetails map[string]bool, flagPtr *xdr.U } func (is *Session) flush() { + start2 := time.Now() + + defer func() { + elapsed := time.Now().Sub(start2) + fmt.Println("Session.flush", elapsed) + }() + if is.Err != nil { return } @@ -315,13 +329,6 @@ func (is *Session) ingestEffects() { // ingestLedger ingests the current ledger func (is *Session) ingestLedger() { - start2 := time.Now() - - defer func() { - elapsed := time.Now().Sub(start2) - fmt.Println("ingestLedger", elapsed) - }() - if is.Err != nil { return } @@ -539,13 +546,6 @@ func (is *Session) tradeDetails(buyer, seller xdr.AccountId, claim xdr.ClaimOffe } func (is *Session) ingestTransaction() { - start2 := time.Now() - - defer func() { - elapsed := time.Now().Sub(start2) - fmt.Println("ingestTransaction", elapsed) - }() - if is.Err != nil { return }