Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

services/horizon/ingest: Batch Offers insert/updates #3917

Merged
merged 5 commits into from
Sep 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 1 addition & 5 deletions services/horizon/internal/action_offers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,8 @@ func TestOfferActions_Show(t *testing.T) {
LastModifiedLedger: uint32(4),
}

batch := q.NewOffersBatchInsertBuilder(3)
err = batch.Add(ctx, eurOffer)
err = q.UpsertOffers(ctx, []history.Offer{eurOffer, usdOffer})
ht.Assert.NoError(err)
err = batch.Add(ctx, usdOffer)
ht.Assert.NoError(err)
ht.Assert.NoError(batch.Exec(ctx))

w := ht.Get("/offers")
if ht.Assert.Equal(200, w.Code) {
Expand Down
21 changes: 3 additions & 18 deletions services/horizon/internal/actions/offer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,12 +89,8 @@ func TestGetOfferByIDHandler(t *testing.T) {
}, 0, 0, 0, 0, 0)
tt.Assert.NoError(err)

batch := q.NewOffersBatchInsertBuilder(0)
err = batch.Add(tt.Ctx, eurOffer)
err = q.UpsertOffers(tt.Ctx, []history.Offer{eurOffer, usdOffer})
tt.Assert.NoError(err)
err = batch.Add(tt.Ctx, usdOffer)
tt.Assert.NoError(err)
tt.Assert.NoError(batch.Exec(tt.Ctx))

for _, testCase := range []struct {
name string
Expand Down Expand Up @@ -200,14 +196,8 @@ func TestGetOffersHandler(t *testing.T) {
}, 0, 0, 0, 0, 0)
tt.Assert.NoError(err)

batch := q.NewOffersBatchInsertBuilder(0)
err = batch.Add(tt.Ctx, eurOffer)
tt.Assert.NoError(err)
err = batch.Add(tt.Ctx, twoEurOffer)
err = q.UpsertOffers(tt.Ctx, []history.Offer{eurOffer, twoEurOffer, usdOffer})
tt.Assert.NoError(err)
err = batch.Add(tt.Ctx, usdOffer)
tt.Assert.NoError(err)
tt.Assert.NoError(batch.Exec(tt.Ctx))

t.Run("No filter", func(t *testing.T) {
records, err := handler.GetResourcePage(
Expand Down Expand Up @@ -477,13 +467,8 @@ func TestGetAccountOffersHandler(t *testing.T) {
q := &history.Q{tt.HorizonSession()}
handler := GetAccountOffersHandler{}

batch := q.NewOffersBatchInsertBuilder(0)
err := batch.Add(tt.Ctx, eurOffer)
err = batch.Add(tt.Ctx, twoEurOffer)
tt.Assert.NoError(err)
err = batch.Add(tt.Ctx, usdOffer)
err := q.UpsertOffers(tt.Ctx, []history.Offer{eurOffer, twoEurOffer, usdOffer})
tt.Assert.NoError(err)
tt.Assert.NoError(batch.Exec(tt.Ctx))

records, err := handler.GetResourcePage(
httptest.NewRecorder(),
Expand Down
7 changes: 1 addition & 6 deletions services/horizon/internal/actions/orderbook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -575,12 +575,7 @@ func TestOrderbookGetResource(t *testing.T) {
}

assert.NoError(t, q.TruncateTables(tt.Ctx, []string{"offers"}))

batch := q.NewOffersBatchInsertBuilder(0)
for _, offer := range offers {
assert.NoError(t, batch.Add(tt.Ctx, offer))
}
assert.NoError(t, batch.Exec(tt.Ctx))
assert.NoError(t, q.UpsertOffers(tt.Ctx, offers))

assert.NoError(t, q.BeginTx(&sql.TxOptions{
Isolation: sql.LevelRepeatableRead,
Expand Down
93 changes: 74 additions & 19 deletions services/horizon/internal/db2/history/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@ import (
"database/sql/driver"
"encoding/json"
"fmt"
"strings"
"sync"
"time"

sq "github.com/Masterminds/squirrel"
"github.com/guregu/null"
"github.com/jmoiron/sqlx"
"github.com/lib/pq"

"github.com/stellar/go/services/horizon/internal/db2"
"github.com/stellar/go/support/db"
Expand Down Expand Up @@ -573,6 +575,14 @@ type ManageOffer struct {
OfferID int64 `json:"offer_id"`
}

// upsertField is used in upsertRows function generating upsert query for
// different tables.
type upsertField struct {
name string
dbType string
objects []interface{}
}

// Offer is row of data from the `offers` table from horizon DB
type Offer struct {
SellerID string `db:"seller_id"`
Expand All @@ -591,16 +601,6 @@ type Offer struct {
Sponsor null.String `db:"sponsor"`
}

type OffersBatchInsertBuilder interface {
Add(ctx context.Context, offer Offer) error
Exec(ctx context.Context) error
}

// offersBatchInsertBuilder is a simple wrapper around db.BatchInsertBuilder
type offersBatchInsertBuilder struct {
builder db.BatchInsertBuilder
}

// OperationsQ is a helper struct to aid in configuring queries that loads
// slices of Operation structs.
type OperationsQ struct {
Expand Down Expand Up @@ -765,15 +765,6 @@ func (q *Q) NewAccountDataBatchInsertBuilder(maxBatchSize int) AccountDataBatchI
}
}

func (q *Q) NewOffersBatchInsertBuilder(maxBatchSize int) OffersBatchInsertBuilder {
return &offersBatchInsertBuilder{
builder: db.BatchInsertBuilder{
Table: q.GetTable("offers"),
MaxBatchSize: maxBatchSize,
},
}
}

func (q *Q) NewTrustLinesBatchInsertBuilder(maxBatchSize int) TrustLinesBatchInsertBuilder {
return &trustLinesBatchInsertBuilder{
builder: db.BatchInsertBuilder{
Expand Down Expand Up @@ -853,3 +844,67 @@ func (q *Q) DeleteRangeAll(ctx context.Context, start, end int64) error {
}
return nil
}

// upsertRows builds and executes an upsert query that allows very fast upserts
// to a given table. The final query is of form:
//
// WITH r AS
// (SELECT
// /* unnestPart */
// unnest(?::type1[]), /* field1 */
// unnest(?::type2[]), /* field2 */
// ...
// )
// INSERT INTO table (
// /* insertFieldsPart */
// field1,
// field2,
// ...
// )
// SELECT * from r
// ON CONFLICT (conflictField) DO UPDATE SET
// /* onConflictPart */
// field1 = excluded.field1,
// field2 = excluded.field2,
// ...
func (q *Q) upsertRows(ctx context.Context, table string, conflictField string, fields []upsertField) error {
unnestPart := make([]string, 0, len(fields))
insertFieldsPart := make([]string, 0, len(fields))
onConflictPart := make([]string, 0, len(fields))
pqArrays := make([]interface{}, 0, len(fields))

for _, field := range fields {
unnestPart = append(
unnestPart,
fmt.Sprintf("unnest(?::%s[]) /* %s */", field.dbType, field.name),
)
insertFieldsPart = append(
insertFieldsPart,
field.name,
)
onConflictPart = append(
onConflictPart,
fmt.Sprintf("%s = excluded.%s", field.name, field.name),
)
pqArrays = append(
pqArrays,
pq.Array(field.objects),
)
}

sql := `
WITH r AS
(SELECT ` + strings.Join(unnestPart, ",") + `)
INSERT INTO ` + table + `
(` + strings.Join(insertFieldsPart, ",") + `)
SELECT * from r
ON CONFLICT (` + conflictField + `) DO UPDATE SET
` + strings.Join(onConflictPart, ",")

_, err := q.ExecRaw(
context.WithValue(ctx, &db.QueryTypeContextKey, db.UpsertQueryType),
sql,
pqArrays...,
)
return err
}

This file was deleted.

12 changes: 4 additions & 8 deletions services/horizon/internal/db2/history/mock_q_offers.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package history

import (
"context"

"github.com/stretchr/testify/mock"
)

Expand Down Expand Up @@ -30,14 +31,9 @@ func (m *MockQOffers) CountOffers(ctx context.Context) (int, error) {
return a.Get(0).(int), a.Error(1)
}

func (m *MockQOffers) NewOffersBatchInsertBuilder(maxBatchSize int) OffersBatchInsertBuilder {
a := m.Called(maxBatchSize)
return a.Get(0).(OffersBatchInsertBuilder)
}

func (m *MockQOffers) UpdateOffer(ctx context.Context, row Offer) (int64, error) {
a := m.Called(ctx, row)
return a.Get(0).(int64), a.Error(1)
func (m *MockQOffers) UpsertOffers(ctx context.Context, rows []Offer) error {
a := m.Called(ctx, rows)
return a.Error(0)
}

func (m *MockQOffers) RemoveOffers(ctx context.Context, offerIDs []int64, lastModifiedLedger uint32) (int64, error) {
Expand Down
47 changes: 37 additions & 10 deletions services/horizon/internal/db2/history/offers.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@ type QOffers interface {
GetOffersByIDs(ctx context.Context, ids []int64) ([]Offer, error)
CountOffers(ctx context.Context) (int, error)
GetUpdatedOffers(ctx context.Context, newerThanSequence uint32) ([]Offer, error)
NewOffersBatchInsertBuilder(maxBatchSize int) OffersBatchInsertBuilder
UpdateOffer(ctx context.Context, offer Offer) (int64, error)
UpsertOffers(ctx context.Context, offers []Offer) error
RemoveOffers(ctx context.Context, offerIDs []int64, lastModifiedLedger uint32) (int64, error)
Copy link
Contributor

@2opremio 2opremio Sep 22, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

RemoveOffers can be replaced by UpsertOffers (since a removal is implemented by a tombstone which be set with UpserOffers).

This would save us (up to) a query in each offers processor iteration.

CC @bartekn

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd like to keep it separated for the reasons explained in #3944 (comment).

CompactOffers(ctx context.Context, cutOffSequence uint32) (int64, error)
}
Expand Down Expand Up @@ -96,15 +95,43 @@ func (q *Q) GetUpdatedOffers(ctx context.Context, newerThanSequence uint32) ([]O
return offers, err
}

// UpdateOffer updates a row in the offers table.
// Returns number of rows affected and error.
func (q *Q) UpdateOffer(ctx context.Context, offer Offer) (int64, error) {
updateBuilder := q.GetTable("offers").Update()
result, err := updateBuilder.SetStruct(offer, []string{}).Where("offer_id = ?", offer.OfferID).Exec(ctx)
if err != nil {
return 0, err
// UpsertOffers upserts a batch of offers in the offerss table.
// There's currently no limit of the number of offers this method can
// accept other than 2GB limit of the query string length what should be enough
// for each ledger with the current limits.
func (q *Q) UpsertOffers(ctx context.Context, offers []Offer) error {
var sellerID, sellingAsset, buyingAsset, offerID, amount, priceN, priceD,
price, flags, lastModifiedLedger, sponsor []interface{}

for _, offer := range offers {
sellerID = append(sellerID, offer.SellerID)
offerID = append(offerID, offer.OfferID)
sellingAsset = append(sellingAsset, offer.SellingAsset)
buyingAsset = append(buyingAsset, offer.BuyingAsset)
amount = append(amount, offer.Amount)
priceN = append(priceN, offer.Pricen)
priceD = append(priceD, offer.Priced)
price = append(price, offer.Price)
flags = append(flags, offer.Flags)
lastModifiedLedger = append(lastModifiedLedger, offer.LastModifiedLedger)
sponsor = append(sponsor, offer.Sponsor)
}
return result.RowsAffected()

upsertFields := []upsertField{
{"seller_id", "text", sellerID},
{"offer_id", "bigint", offerID},
{"selling_asset", "text", sellingAsset},
{"buying_asset", "text", buyingAsset},
{"amount", "bigint", amount},
{"pricen", "integer", priceN},
{"priced", "integer", priceD},
{"price", "double precision", price},
{"flags", "integer", flags},
{"last_modified_ledger", "integer", lastModifiedLedger},
{"sponsor", "text", sponsor},
}

return q.upsertRows(ctx, "offers", "offer_id", upsertFields)
}

// RemoveOffers marks rows in the offers table as deleted.
Expand Down

This file was deleted.

10 changes: 2 additions & 8 deletions services/horizon/internal/db2/history/offers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,7 @@ var (
)

func insertOffer(tt *test.T, q *Q, offer Offer) error {
batch := q.NewOffersBatchInsertBuilder(0)
err := batch.Add(tt.Ctx, offer)
if err != nil {
return err
}
return batch.Exec(tt.Ctx)
return q.UpsertOffers(tt.Ctx, []Offer{offer})
}

func TestGetOfferByID(t *testing.T) {
Expand Down Expand Up @@ -194,9 +189,8 @@ func TestUpdateOffer(t *testing.T) {
modifiedEurOffer := eurOffer
modifiedEurOffer.Amount -= 10

rowsAffected, err := q.UpdateOffer(tt.Ctx, modifiedEurOffer)
err = q.UpsertOffers(tt.Ctx, []Offer{modifiedEurOffer})
tt.Assert.NoError(err)
tt.Assert.Equal(int64(1), rowsAffected)

offers, err = q.GetAllOffers(tt.Ctx)
tt.Assert.NoError(err)
Expand Down
13 changes: 2 additions & 11 deletions services/horizon/internal/db2/history/orderbook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,12 +213,7 @@ func TestGetOrderBookSummary(t *testing.T) {
} {
t.Run(testCase.name, func(t *testing.T) {
assert.NoError(t, q.TruncateTables(tt.Ctx, []string{"offers"}))

batch := q.NewOffersBatchInsertBuilder(0)
for _, offer := range testCase.offers {
assert.NoError(t, batch.Add(tt.Ctx, offer))
}
assert.NoError(t, batch.Exec(tt.Ctx))
assert.NoError(t, q.UpsertOffers(tt.Ctx, testCase.offers))

assert.NoError(t, q.BeginTx(&sql.TxOptions{
Isolation: sql.LevelRepeatableRead,
Expand Down Expand Up @@ -260,11 +255,7 @@ func TestGetOrderBookSummaryExcludesRemovedOffers(t *testing.T) {
sellEurOffer,
}

batch := q.NewOffersBatchInsertBuilder(0)
for _, offer := range offers {
assert.NoError(t, batch.Add(tt.Ctx, offer))
}
assert.NoError(t, batch.Exec(tt.Ctx))
assert.NoError(t, q.UpsertOffers(tt.Ctx, offers))

assert.NoError(t, q.BeginTx(&sql.TxOptions{
Isolation: sql.LevelRepeatableRead,
Expand Down
Loading