diff --git a/services/horizon/internal/db2/history/effect_batch_insert_builder.go b/services/horizon/internal/db2/history/effect_batch_insert_builder.go index 8b2522cf9e..97b2042c3c 100644 --- a/services/horizon/internal/db2/history/effect_batch_insert_builder.go +++ b/services/horizon/internal/db2/history/effect_batch_insert_builder.go @@ -51,9 +51,10 @@ func (i *effectBatchInsertBuilder) Add( "history_account_id": accountID, "address_muxed": muxedAccount, "history_operation_id": operationID, - "\"order\"": order, + "order": order, "type": effectType, - "details": details, + // we need to convert to string in order to make the lib/pq's COPY escaping happy + "details": string(details), }) } diff --git a/services/horizon/internal/db2/history/liquidity_pools.go b/services/horizon/internal/db2/history/liquidity_pools.go index 94ad9bdbda..a06791fa83 100644 --- a/services/horizon/internal/db2/history/liquidity_pools.go +++ b/services/horizon/internal/db2/history/liquidity_pools.go @@ -36,7 +36,11 @@ type LiquidityPool struct { type LiquidityPoolAssetReserves []LiquidityPoolAssetReserve func (c LiquidityPoolAssetReserves) Value() (driver.Value, error) { - return json.Marshal(c) + b, err := json.Marshal(c) + if err != nil { + return nil, err + } + return string(b), err } func (c *LiquidityPoolAssetReserves) Scan(value interface{}) error { diff --git a/services/horizon/internal/db2/history/main.go b/services/horizon/internal/db2/history/main.go index 165e87af30..e192143c7b 100644 --- a/services/horizon/internal/db2/history/main.go +++ b/services/horizon/internal/db2/history/main.go @@ -366,7 +366,11 @@ type ExpAssetStatAccounts struct { } func (e ExpAssetStatAccounts) Value() (driver.Value, error) { - return json.Marshal(e) + b, err := json.Marshal(e) + if err != nil { + return nil, err + } + return string(b), nil } func (e *ExpAssetStatAccounts) Scan(src interface{}) error { @@ -403,7 +407,11 @@ type ExpAssetStatBalances struct { } func (e ExpAssetStatBalances) Value() (driver.Value, error) { - return json.Marshal(e) + b, err := json.Marshal(e) + if err != nil { + return nil, err + } + return string(b), nil } func (e *ExpAssetStatBalances) Scan(src interface{}) error { diff --git a/services/horizon/internal/db2/history/operation_batch_insert_builder.go b/services/horizon/internal/db2/history/operation_batch_insert_builder.go index a3baee8863..753d51bdf7 100644 --- a/services/horizon/internal/db2/history/operation_batch_insert_builder.go +++ b/services/horizon/internal/db2/history/operation_batch_insert_builder.go @@ -51,11 +51,12 @@ func (i *operationBatchInsertBuilder) Add( sourceAccountMuxed null.String, ) error { return i.builder.Row(ctx, map[string]interface{}{ - "id": id, - "transaction_id": transactionID, - "application_order": applicationOrder, - "type": operationType, - "details": details, + "id": id, + "transaction_id": transactionID, + "application_order": applicationOrder, + "type": operationType, + // we need to convert to string in order to make the lib/pq's COPY escaping happy + "details": string(details), "source_account": sourceAccount, "source_account_muxed": sourceAccountMuxed, }) diff --git a/services/horizon/internal/db2/history/trade_batch_insert_builder.go b/services/horizon/internal/db2/history/trade_batch_insert_builder.go index 3836e5397d..e63fe7d43c 100644 --- a/services/horizon/internal/db2/history/trade_batch_insert_builder.go +++ b/services/horizon/internal/db2/history/trade_batch_insert_builder.go @@ -14,7 +14,7 @@ import ( // rows into the history_trades table type InsertTrade struct { HistoryOperationID int64 `db:"history_operation_id"` - Order int32 `db:"\"order\""` + Order int32 `db:"order"` LedgerCloseTime time.Time `db:"ledger_closed_at"` CounterAssetID int64 `db:"counter_asset_id"` diff --git a/services/horizon/internal/db2/history/transaction_test.go b/services/horizon/internal/db2/history/transaction_test.go index ab34d817e3..0cf285ba8f 100644 --- a/services/horizon/internal/db2/history/transaction_test.go +++ b/services/horizon/internal/db2/history/transaction_test.go @@ -231,9 +231,7 @@ func TestInsertTransactionDoesNotAllowDuplicateIndex(t *testing.T) { tt.Assert.NoError(insertBuilder.Add(tt.Ctx, secondTransaction, sequence)) tt.Assert.EqualError( insertBuilder.Exec(tt.Ctx), - "error adding values while inserting to history_transactions: "+ - "exec failed: pq: duplicate key value violates unique constraint "+ - "\"hs_transaction_by_id\"", + "pq: duplicate key value violates unique constraint \"hs_transaction_by_id\"", ) ledger := Ledger{ diff --git a/support/db/batch_insert_builder.go b/support/db/batch_insert_builder.go index ee6427286d..8d3e564d40 100644 --- a/support/db/batch_insert_builder.go +++ b/support/db/batch_insert_builder.go @@ -6,7 +6,8 @@ import ( "reflect" "sort" - sq "github.com/Masterminds/squirrel" + "github.com/jmoiron/sqlx" + "github.com/lib/pq" "github.com/stellar/go/support/errors" ) @@ -15,15 +16,11 @@ import ( // It is NOT safe for concurrent use. type BatchInsertBuilder struct { Table *Table - // MaxBatchSize defines the maximum size of a batch. If this number is - // reached after calling Row() it will call Exec() immediately inserting - // all rows to a DB. - // Zero (default) will not add rows until explicitly calling Exec. + // TODO: now unused MaxBatchSize int // Suffix adds a sql expression to the end of the query (e.g. an ON CONFLICT clause) - Suffix string - + Suffix string columns []string rows [][]interface{} rowStructType reflect.Type @@ -36,7 +33,6 @@ type BatchInsertBuilder struct { func (b *BatchInsertBuilder) Row(ctx context.Context, row map[string]interface{}) error { if b.columns == nil { b.columns = make([]string, 0, len(row)) - b.rows = make([][]interface{}, 0) for column := range row { b.columns = append(b.columns, column) @@ -60,18 +56,12 @@ func (b *BatchInsertBuilder) Row(ctx context.Context, row map[string]interface{} b.rows = append(b.rows, rowSlice) - // Call Exec when MaxBatchSize is reached. - if len(b.rows) == b.MaxBatchSize { - return b.Exec(ctx) - } - return nil } func (b *BatchInsertBuilder) RowStruct(ctx context.Context, row interface{}) error { if b.columns == nil { b.columns = ColumnsForStruct(row) - b.rows = make([][]interface{}, 0) } rowType := reflect.TypeOf(row) @@ -89,54 +79,101 @@ func (b *BatchInsertBuilder) RowStruct(ctx context.Context, row interface{}) err for i, rval := range rvals { columnValues[i] = rval.Interface() } - b.rows = append(b.rows, columnValues) - - // Call Exec when MaxBatchSize is reached. - if len(b.rows) == b.MaxBatchSize { - return b.Exec(ctx) - } - return nil } -func (b *BatchInsertBuilder) insertSQL() sq.InsertBuilder { - insertStatement := sq.Insert(b.Table.Name).Columns(b.columns...) - if len(b.Suffix) > 0 { - return insertStatement.Suffix(b.Suffix) - } - return insertStatement -} - // Exec inserts rows in batches. In case of errors it's possible that some batches // were added so this should be run in a DB transaction for easy rollbacks. -func (b *BatchInsertBuilder) Exec(ctx context.Context) error { - sql := b.insertSQL() - paramsCount := 0 - - for _, row := range b.rows { - sql = sql.Values(row...) - paramsCount += len(row) - - if paramsCount > postgresQueryMaxParams-2*len(b.columns) { - _, err := b.Table.Session.Exec(ctx, sql) - if err != nil { - return errors.Wrap(err, fmt.Sprintf("error adding values while inserting to %s", b.Table.Name)) - } - paramsCount = 0 - sql = b.insertSQL() +func (b *BatchInsertBuilder) Exec(ctx context.Context) (err error) { + if len(b.rows) == 0 { + // Nothing to do + return nil + } + var ( + bookKeepTx bool + stmt *sqlx.Stmt + ) + + // cleanup + defer func() { + if stmt != nil { + stmt.Close() + } + if bookKeepTx && b.Table.Session.GetTx() != nil { + b.Table.Session.Rollback() + } + }() + + // Begin a transaction if it wasn't started externally + if b.Table.Session.GetTx() == nil { + if err = b.Table.Session.Begin(); err != nil { + return } + bookKeepTx = true } - // Insert last batch - if paramsCount > 0 { - _, err := b.Table.Session.Exec(ctx, sql) - if err != nil { - return errors.Wrap(err, fmt.Sprintf("error adding values while inserting to %s", b.Table.Name)) + // Ensure there is temporary table were to COPY the content + // and later merge into the final table (needed to support the insert suffix) + _, err = b.Table.Session.GetTx().ExecContext( + ctx, + fmt.Sprintf("CREATE TEMP TABLE IF NOT EXISTS tmp_%s (LIKE %s INCLUDING DEFAULTS) ON COMMIT DROP", b.Table.Name, b.Table.Name), + ) + if err != nil { + return + } + + // Start COPY + stmt, err = b.Table.Session.GetTx().PreparexContext(ctx, pq.CopyIn("tmp_"+b.Table.Name, b.columns...)) + if err != nil { + return + } + + // COPY values into temporary table + for _, r := range b.rows { + if _, err = stmt.ExecContext(ctx, r...); err != nil { + return } + + } + if _, err = stmt.ExecContext(ctx); err != nil { + // wrap up statement execution + return } - // Clear the rows so user can reuse it for batch inserting to a single table - b.rows = make([][]interface{}, 0) - return nil + err = stmt.Close() + // mark statement as closed + stmt = nil + if err != nil { + return + } + + // Merge temporary table with final table, using insertion Suffix + _, err = b.Table.Session.GetTx().ExecContext( + ctx, + fmt.Sprintf("INSERT INTO %s SELECT * FROM tmp_%s %s", b.Table.Name, b.Table.Name, b.Suffix), + ) + if err != nil { + return + } + + // Truncate temporary table + // TODO: we could avoid this if we have guarantees of Exec() only being called once + // per transaction + _, err = b.Table.Session.GetTx().ExecContext( + ctx, + fmt.Sprintf("TRUNCATE TABLE tmp_%s", b.Table.Name), + ) + if err != nil { + return + } + + if bookKeepTx { + err = b.Table.Session.Commit() + } + if err == nil { + // Clear the rows so user can reuse it for batch inserting to a single table + b.rows = make([][]interface{}, 0) + } + return } diff --git a/support/db/batch_insert_builder_test.go b/support/db/batch_insert_builder_test.go index e283e8bf57..b9c08099c4 100644 --- a/support/db/batch_insert_builder_test.go +++ b/support/db/batch_insert_builder_test.go @@ -141,8 +141,7 @@ func TestBatchInsertBuilder(t *testing.T) { err = insertBuilder.Exec(ctx) assert.EqualError( - t, err, "error adding values while inserting to people: exec failed: pq:"+ - " duplicate key value violates unique constraint \"people_pkey\"", + t, err, "pq: duplicate key value violates unique constraint \"people_pkey\"", ) insertBuilder.Suffix = "ON CONFLICT (name) DO NOTHING"