Skip to content

Commit

Permalink
services/horizon/ingest: Batch Offers insert/updates (#3917)
Browse files Browse the repository at this point in the history
This commit adds a new `UpsertOffers` method (and generic `upsertRows`) and
modifies `OffersProcessor` to use it.

The performance of `OffersProcessor` degraded recently due to number of offers
in the network and batching updates should improve the performance of the
processor and ingestion overall.
  • Loading branch information
bartekn committed Sep 14, 2021
1 parent 2a5c9ac commit 56f3f88
Show file tree
Hide file tree
Showing 13 changed files with 268 additions and 249 deletions.
6 changes: 1 addition & 5 deletions services/horizon/internal/action_offers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,8 @@ func TestOfferActions_Show(t *testing.T) {
LastModifiedLedger: uint32(4),
}

batch := q.NewOffersBatchInsertBuilder(3)
err = batch.Add(ctx, eurOffer)
err = q.UpsertOffers(ctx, []history.Offer{eurOffer, usdOffer})
ht.Assert.NoError(err)
err = batch.Add(ctx, usdOffer)
ht.Assert.NoError(err)
ht.Assert.NoError(batch.Exec(ctx))

w := ht.Get("/offers")
if ht.Assert.Equal(200, w.Code) {
Expand Down
21 changes: 3 additions & 18 deletions services/horizon/internal/actions/offer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,12 +89,8 @@ func TestGetOfferByIDHandler(t *testing.T) {
}, 0, 0, 0, 0, 0)
tt.Assert.NoError(err)

batch := q.NewOffersBatchInsertBuilder(0)
err = batch.Add(tt.Ctx, eurOffer)
err = q.UpsertOffers(tt.Ctx, []history.Offer{eurOffer, usdOffer})
tt.Assert.NoError(err)
err = batch.Add(tt.Ctx, usdOffer)
tt.Assert.NoError(err)
tt.Assert.NoError(batch.Exec(tt.Ctx))

for _, testCase := range []struct {
name string
Expand Down Expand Up @@ -200,14 +196,8 @@ func TestGetOffersHandler(t *testing.T) {
}, 0, 0, 0, 0, 0)
tt.Assert.NoError(err)

batch := q.NewOffersBatchInsertBuilder(0)
err = batch.Add(tt.Ctx, eurOffer)
tt.Assert.NoError(err)
err = batch.Add(tt.Ctx, twoEurOffer)
err = q.UpsertOffers(tt.Ctx, []history.Offer{eurOffer, twoEurOffer, usdOffer})
tt.Assert.NoError(err)
err = batch.Add(tt.Ctx, usdOffer)
tt.Assert.NoError(err)
tt.Assert.NoError(batch.Exec(tt.Ctx))

t.Run("No filter", func(t *testing.T) {
records, err := handler.GetResourcePage(
Expand Down Expand Up @@ -477,13 +467,8 @@ func TestGetAccountOffersHandler(t *testing.T) {
q := &history.Q{tt.HorizonSession()}
handler := GetAccountOffersHandler{}

batch := q.NewOffersBatchInsertBuilder(0)
err := batch.Add(tt.Ctx, eurOffer)
err = batch.Add(tt.Ctx, twoEurOffer)
tt.Assert.NoError(err)
err = batch.Add(tt.Ctx, usdOffer)
err := q.UpsertOffers(tt.Ctx, []history.Offer{eurOffer, twoEurOffer, usdOffer})
tt.Assert.NoError(err)
tt.Assert.NoError(batch.Exec(tt.Ctx))

records, err := handler.GetResourcePage(
httptest.NewRecorder(),
Expand Down
7 changes: 1 addition & 6 deletions services/horizon/internal/actions/orderbook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -575,12 +575,7 @@ func TestOrderbookGetResource(t *testing.T) {
}

assert.NoError(t, q.TruncateTables(tt.Ctx, []string{"offers"}))

batch := q.NewOffersBatchInsertBuilder(0)
for _, offer := range offers {
assert.NoError(t, batch.Add(tt.Ctx, offer))
}
assert.NoError(t, batch.Exec(tt.Ctx))
assert.NoError(t, q.UpsertOffers(tt.Ctx, offers))

assert.NoError(t, q.BeginTx(&sql.TxOptions{
Isolation: sql.LevelRepeatableRead,
Expand Down
93 changes: 74 additions & 19 deletions services/horizon/internal/db2/history/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@ import (
"database/sql/driver"
"encoding/json"
"fmt"
"strings"
"sync"
"time"

sq "github.com/Masterminds/squirrel"
"github.com/guregu/null"
"github.com/jmoiron/sqlx"
"github.com/lib/pq"

"github.com/stellar/go/services/horizon/internal/db2"
"github.com/stellar/go/support/db"
Expand Down Expand Up @@ -573,6 +575,14 @@ type ManageOffer struct {
OfferID int64 `json:"offer_id"`
}

// upsertField is used in upsertRows function generating upsert query for
// different tables.
type upsertField struct {
name string
dbType string
objects []interface{}
}

// Offer is row of data from the `offers` table from horizon DB
type Offer struct {
SellerID string `db:"seller_id"`
Expand All @@ -591,16 +601,6 @@ type Offer struct {
Sponsor null.String `db:"sponsor"`
}

type OffersBatchInsertBuilder interface {
Add(ctx context.Context, offer Offer) error
Exec(ctx context.Context) error
}

// offersBatchInsertBuilder is a simple wrapper around db.BatchInsertBuilder
type offersBatchInsertBuilder struct {
builder db.BatchInsertBuilder
}

// OperationsQ is a helper struct to aid in configuring queries that loads
// slices of Operation structs.
type OperationsQ struct {
Expand Down Expand Up @@ -765,15 +765,6 @@ func (q *Q) NewAccountDataBatchInsertBuilder(maxBatchSize int) AccountDataBatchI
}
}

func (q *Q) NewOffersBatchInsertBuilder(maxBatchSize int) OffersBatchInsertBuilder {
return &offersBatchInsertBuilder{
builder: db.BatchInsertBuilder{
Table: q.GetTable("offers"),
MaxBatchSize: maxBatchSize,
},
}
}

func (q *Q) NewTrustLinesBatchInsertBuilder(maxBatchSize int) TrustLinesBatchInsertBuilder {
return &trustLinesBatchInsertBuilder{
builder: db.BatchInsertBuilder{
Expand Down Expand Up @@ -853,3 +844,67 @@ func (q *Q) DeleteRangeAll(ctx context.Context, start, end int64) error {
}
return nil
}

// upsertRows builds and executes an upsert query that allows very fast upserts
// to a given table. The final query is of form:
//
// WITH r AS
// (SELECT
// /* unnestPart */
// unnest(?::type1[]), /* field1 */
// unnest(?::type2[]), /* field2 */
// ...
// )
// INSERT INTO table (
// /* insertFieldsPart */
// field1,
// field2,
// ...
// )
// SELECT * from r
// ON CONFLICT (conflictField) DO UPDATE SET
// /* onConflictPart */
// field1 = excluded.field1,
// field2 = excluded.field2,
// ...
func (q *Q) upsertRows(ctx context.Context, table string, conflictField string, fields []upsertField) error {
unnestPart := make([]string, 0, len(fields))
insertFieldsPart := make([]string, 0, len(fields))
onConflictPart := make([]string, 0, len(fields))
pqArrays := make([]interface{}, 0, len(fields))

for _, field := range fields {
unnestPart = append(
unnestPart,
fmt.Sprintf("unnest(?::%s[]) /* %s */", field.dbType, field.name),
)
insertFieldsPart = append(
insertFieldsPart,
field.name,
)
onConflictPart = append(
onConflictPart,
fmt.Sprintf("%s = excluded.%s", field.name, field.name),
)
pqArrays = append(
pqArrays,
pq.Array(field.objects),
)
}

sql := `
WITH r AS
(SELECT ` + strings.Join(unnestPart, ",") + `)
INSERT INTO ` + table + `
(` + strings.Join(insertFieldsPart, ",") + `)
SELECT * from r
ON CONFLICT (` + conflictField + `) DO UPDATE SET
` + strings.Join(onConflictPart, ",")

_, err := q.ExecRaw(
context.WithValue(ctx, &db.QueryTypeContextKey, db.UpsertQueryType),
sql,
pqArrays...,
)
return err
}

This file was deleted.

12 changes: 4 additions & 8 deletions services/horizon/internal/db2/history/mock_q_offers.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package history

import (
"context"

"github.com/stretchr/testify/mock"
)

Expand Down Expand Up @@ -30,14 +31,9 @@ func (m *MockQOffers) CountOffers(ctx context.Context) (int, error) {
return a.Get(0).(int), a.Error(1)
}

func (m *MockQOffers) NewOffersBatchInsertBuilder(maxBatchSize int) OffersBatchInsertBuilder {
a := m.Called(maxBatchSize)
return a.Get(0).(OffersBatchInsertBuilder)
}

func (m *MockQOffers) UpdateOffer(ctx context.Context, row Offer) (int64, error) {
a := m.Called(ctx, row)
return a.Get(0).(int64), a.Error(1)
func (m *MockQOffers) UpsertOffers(ctx context.Context, rows []Offer) error {
a := m.Called(ctx, rows)
return a.Error(0)
}

func (m *MockQOffers) RemoveOffers(ctx context.Context, offerIDs []int64, lastModifiedLedger uint32) (int64, error) {
Expand Down
47 changes: 37 additions & 10 deletions services/horizon/internal/db2/history/offers.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@ type QOffers interface {
GetOffersByIDs(ctx context.Context, ids []int64) ([]Offer, error)
CountOffers(ctx context.Context) (int, error)
GetUpdatedOffers(ctx context.Context, newerThanSequence uint32) ([]Offer, error)
NewOffersBatchInsertBuilder(maxBatchSize int) OffersBatchInsertBuilder
UpdateOffer(ctx context.Context, offer Offer) (int64, error)
UpsertOffers(ctx context.Context, offers []Offer) error
RemoveOffers(ctx context.Context, offerIDs []int64, lastModifiedLedger uint32) (int64, error)
CompactOffers(ctx context.Context, cutOffSequence uint32) (int64, error)
}
Expand Down Expand Up @@ -96,15 +95,43 @@ func (q *Q) GetUpdatedOffers(ctx context.Context, newerThanSequence uint32) ([]O
return offers, err
}

// UpdateOffer updates a row in the offers table.
// Returns number of rows affected and error.
func (q *Q) UpdateOffer(ctx context.Context, offer Offer) (int64, error) {
updateBuilder := q.GetTable("offers").Update()
result, err := updateBuilder.SetStruct(offer, []string{}).Where("offer_id = ?", offer.OfferID).Exec(ctx)
if err != nil {
return 0, err
// UpsertOffers upserts a batch of offers in the offerss table.
// There's currently no limit of the number of offers this method can
// accept other than 2GB limit of the query string length what should be enough
// for each ledger with the current limits.
func (q *Q) UpsertOffers(ctx context.Context, offers []Offer) error {
var sellerID, sellingAsset, buyingAsset, offerID, amount, priceN, priceD,
price, flags, lastModifiedLedger, sponsor []interface{}

for _, offer := range offers {
sellerID = append(sellerID, offer.SellerID)
offerID = append(offerID, offer.OfferID)
sellingAsset = append(sellingAsset, offer.SellingAsset)
buyingAsset = append(buyingAsset, offer.BuyingAsset)
amount = append(amount, offer.Amount)
priceN = append(priceN, offer.Pricen)
priceD = append(priceD, offer.Priced)
price = append(price, offer.Price)
flags = append(flags, offer.Flags)
lastModifiedLedger = append(lastModifiedLedger, offer.LastModifiedLedger)
sponsor = append(sponsor, offer.Sponsor)
}
return result.RowsAffected()

upsertFields := []upsertField{
{"seller_id", "text", sellerID},
{"offer_id", "bigint", offerID},
{"selling_asset", "text", sellingAsset},
{"buying_asset", "text", buyingAsset},
{"amount", "bigint", amount},
{"pricen", "integer", priceN},
{"priced", "integer", priceD},
{"price", "double precision", price},
{"flags", "integer", flags},
{"last_modified_ledger", "integer", lastModifiedLedger},
{"sponsor", "text", sponsor},
}

return q.upsertRows(ctx, "offers", "offer_id", upsertFields)
}

// RemoveOffers marks rows in the offers table as deleted.
Expand Down

This file was deleted.

10 changes: 2 additions & 8 deletions services/horizon/internal/db2/history/offers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,7 @@ var (
)

func insertOffer(tt *test.T, q *Q, offer Offer) error {
batch := q.NewOffersBatchInsertBuilder(0)
err := batch.Add(tt.Ctx, offer)
if err != nil {
return err
}
return batch.Exec(tt.Ctx)
return q.UpsertOffers(tt.Ctx, []Offer{offer})
}

func TestGetOfferByID(t *testing.T) {
Expand Down Expand Up @@ -194,9 +189,8 @@ func TestUpdateOffer(t *testing.T) {
modifiedEurOffer := eurOffer
modifiedEurOffer.Amount -= 10

rowsAffected, err := q.UpdateOffer(tt.Ctx, modifiedEurOffer)
err = q.UpsertOffers(tt.Ctx, []Offer{modifiedEurOffer})
tt.Assert.NoError(err)
tt.Assert.Equal(int64(1), rowsAffected)

offers, err = q.GetAllOffers(tt.Ctx)
tt.Assert.NoError(err)
Expand Down
13 changes: 2 additions & 11 deletions services/horizon/internal/db2/history/orderbook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,12 +213,7 @@ func TestGetOrderBookSummary(t *testing.T) {
} {
t.Run(testCase.name, func(t *testing.T) {
assert.NoError(t, q.TruncateTables(tt.Ctx, []string{"offers"}))

batch := q.NewOffersBatchInsertBuilder(0)
for _, offer := range testCase.offers {
assert.NoError(t, batch.Add(tt.Ctx, offer))
}
assert.NoError(t, batch.Exec(tt.Ctx))
assert.NoError(t, q.UpsertOffers(tt.Ctx, testCase.offers))

assert.NoError(t, q.BeginTx(&sql.TxOptions{
Isolation: sql.LevelRepeatableRead,
Expand Down Expand Up @@ -260,11 +255,7 @@ func TestGetOrderBookSummaryExcludesRemovedOffers(t *testing.T) {
sellEurOffer,
}

batch := q.NewOffersBatchInsertBuilder(0)
for _, offer := range offers {
assert.NoError(t, batch.Add(tt.Ctx, offer))
}
assert.NoError(t, batch.Exec(tt.Ctx))
assert.NoError(t, q.UpsertOffers(tt.Ctx, offers))

assert.NoError(t, q.BeginTx(&sql.TxOptions{
Isolation: sql.LevelRepeatableRead,
Expand Down
Loading

0 comments on commit 56f3f88

Please sign in to comment.