From 952b079d0cb7edfc1c217eeeb02e1ebfb36fe5c7 Mon Sep 17 00:00:00 2001
From: Bartek Nowotarski
Date: Thu, 9 Sep 2021 21:19:21 +0200
Subject: [PATCH 01/16] services/horizon: Fix verify-image issues (#3902)

The commit message of e011c91 was long enough to make the `git log -1` command
enable scrolling. This was causing `test_verify_range_docker_image` job failures
because it got stuck at the `git log -1` command waiting for user action.

I also noticed that the Golang version in the image does not match the one we
use in other places. I changed it to Go 1.17.
---
 services/horizon/docker/verify-range/dependencies | 4 ++--
 services/horizon/docker/verify-range/start | 2 +-
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/services/horizon/docker/verify-range/dependencies b/services/horizon/docker/verify-range/dependencies
index 2bd5784f5c..3afb8485ce 100644
--- a/services/horizon/docker/verify-range/dependencies
+++ b/services/horizon/docker/verify-range/dependencies
@@ -10,8 +10,8 @@ echo "deb https://apt.stellar.org $(lsb_release -cs) stable" | sudo tee -a /etc/
 apt-get update
 apt-get install -y stellar-core=${STELLAR_CORE_VERSION}
 
-wget https://dl.google.com/go/go1.16.5.linux-amd64.tar.gz
-tar -C /usr/local -xzf go1.16.5.linux-amd64.tar.gz
+wget https://dl.google.com/go/go1.17.linux-amd64.tar.gz
+tar -C /usr/local -xzf go1.17.linux-amd64.tar.gz
 
 # configure postgres
 service postgresql start
diff --git a/services/horizon/docker/verify-range/start b/services/horizon/docker/verify-range/start
index 5ec4880a4d..5cf340b6c2 100644
--- a/services/horizon/docker/verify-range/start
+++ b/services/horizon/docker/verify-range/start
@@ -67,7 +67,7 @@ git pull origin
 if [ ! -z "$BRANCH" ]; then
   git checkout $BRANCH
 fi
-git log -1
+git log -1 --pretty=oneline
 
 function alter_tables_unlogged() {
   # UNLOGGED for performance reasons (order is important because some tables reference others)
From fec4a550faefa78969573e18e5cb485462bfb2d3 Mon Sep 17 00:00:00 2001
From: Jacek Nykis
Date: Fri, 10 Sep 2021 13:36:04 +0100
Subject: [PATCH 02/16] Update docker image build timestamp format

The Go time parser is not compatible with the format rendered by GNU date,
even though both claim to be RFC3339. This change replaces the space separator
between date and time with "T", which allows Go to parse it. It also switches
from `--utc` to `-u`, which is supported by both GNU and BSD date commands.
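To illustrate the parsing mismatch, here is a minimal, standalone Go sketch (not part of the patch); the timestamp literals are just example values shaped like the output of the two `date` invocations:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// Shaped like `date --utc --rfc-3339=seconds` output: a space separates date and time.
	gnuStyle := "2021-09-10 13:36:04+00:00"
	// Shaped like `date -u +%FT%TZ` output: "T" separator and a trailing "Z".
	isoStyle := "2021-09-10T13:36:04Z"

	// Go's time.RFC3339 layout ("2006-01-02T15:04:05Z07:00") requires the "T",
	// so the first string fails to parse while the second succeeds.
	if _, err := time.Parse(time.RFC3339, gnuStyle); err != nil {
		fmt.Println("space-separated timestamp rejected:", err)
	}
	if ts, err := time.Parse(time.RFC3339, isoStyle); err == nil {
		fmt.Println("T-separated timestamp parsed:", ts)
	}
}
```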
--- exp/services/recoverysigner/Makefile | 2 +- exp/services/webauth/Makefile | 2 +- services/friendbot/Makefile | 2 +- services/horizon/docker/Makefile | 2 +- services/keystore/Makefile | 2 +- services/regulated-assets-approval-server/Makefile | 2 +- services/ticker/Makefile | 2 +- 7 files changed, 7 insertions(+), 7 deletions(-) diff --git a/exp/services/recoverysigner/Makefile b/exp/services/recoverysigner/Makefile index d563bab2b6..f5e357d356 100644 --- a/exp/services/recoverysigner/Makefile +++ b/exp/services/recoverysigner/Makefile @@ -4,7 +4,7 @@ SUDO := $(shell docker version >/dev/null 2>&1 || echo "sudo") # If TAG is not provided set default value TAG ?= stellar/recoverysigner:$(shell git rev-parse --short HEAD)$(and $(shell git status -s),-dirty-$(shell id -u -n)) # https://github.com/opencontainers/image-spec/blob/master/annotations.md -BUILD_DATE := $(shell date --utc --rfc-3339=seconds) +BUILD_DATE := $(shell date -u +%FT%TZ) docker-build: cd ../../../ && \ diff --git a/exp/services/webauth/Makefile b/exp/services/webauth/Makefile index 367c5bf9c4..f29f06d53c 100644 --- a/exp/services/webauth/Makefile +++ b/exp/services/webauth/Makefile @@ -4,7 +4,7 @@ SUDO := $(shell docker version >/dev/null 2>&1 || echo "sudo") # If TAG is not provided set default value TAG ?= stellar/webauth:$(shell git rev-parse --short HEAD)$(and $(shell git status -s),-dirty-$(shell id -u -n)) # https://github.com/opencontainers/image-spec/blob/master/annotations.md -BUILD_DATE := $(shell date --utc --rfc-3339=seconds) +BUILD_DATE := $(shell date -u +%FT%TZ) docker-build: cd ../../../ && \ diff --git a/services/friendbot/Makefile b/services/friendbot/Makefile index b8527efa03..49ab73a5cd 100644 --- a/services/friendbot/Makefile +++ b/services/friendbot/Makefile @@ -4,7 +4,7 @@ SUDO := $(shell docker version >/dev/null 2>&1 || echo "sudo") # If TAG is not provided set default value TAG ?= stellar/friendbot:$(shell git rev-parse --short HEAD)$(and $(shell git status -s),-dirty-$(shell id -u -n)) # https://github.com/opencontainers/image-spec/blob/master/annotations.md -BUILD_DATE := $(shell date --utc --rfc-3339=seconds) +BUILD_DATE := $(shell date -u +%FT%TZ) docker-build: cd ../../ && \ diff --git a/services/horizon/docker/Makefile b/services/horizon/docker/Makefile index 0b5966055e..2ae7ffd816 100644 --- a/services/horizon/docker/Makefile +++ b/services/horizon/docker/Makefile @@ -1,7 +1,7 @@ SUDO := $(shell docker version >/dev/null 2>&1 || echo "sudo") # https://github.com/opencontainers/image-spec/blob/master/annotations.md -BUILD_DATE := $(shell date --utc --rfc-3339=seconds) +BUILD_DATE := $(shell date -u +%FT%TZ) TAG ?= stellar/stellar-horizon:$(VERSION) diff --git a/services/keystore/Makefile b/services/keystore/Makefile index 293f3455c1..734cecc4dd 100644 --- a/services/keystore/Makefile +++ b/services/keystore/Makefile @@ -4,7 +4,7 @@ SUDO := $(shell docker version >/dev/null 2>&1 || echo "sudo") # If TAG is not provided set default value TAG ?= stellar/keystore:$(shell git rev-parse --short HEAD)$(and $(shell git status -s),-dirty-$(shell id -u -n)) # https://github.com/opencontainers/image-spec/blob/master/annotations.md -BUILD_DATE := $(shell date --utc --rfc-3339=seconds) +BUILD_DATE := $(shell date -utc +%FT%TZ) docker-build: cd ../../ && \ diff --git a/services/regulated-assets-approval-server/Makefile b/services/regulated-assets-approval-server/Makefile index cf7cdbbd8d..a9e83306c6 100644 --- a/services/regulated-assets-approval-server/Makefile +++ 
b/services/regulated-assets-approval-server/Makefile @@ -4,7 +4,7 @@ SUDO := $(shell docker version >/dev/null 2>&1 || echo "sudo") # If TAG is not provided set default value TAG ?= stellar/regulated-assets-approval-server:$(shell git rev-parse --short HEAD)$(and $(shell git status -s),-dirty-$(shell id -u -n)) # https://github.com/opencontainers/image-spec/blob/master/annotations.md -BUILD_DATE := $(shell date --utc --rfc-3339=seconds) +BUILD_DATE := $(shell date -u +%FT%TZ) docker-build: cd ../../ && \ diff --git a/services/ticker/Makefile b/services/ticker/Makefile index 5755699508..eb2ef29bff 100644 --- a/services/ticker/Makefile +++ b/services/ticker/Makefile @@ -4,7 +4,7 @@ SUDO := $(shell docker version >/dev/null 2>&1 || echo "sudo") # If TAG is not provided set default value TAG ?= stellar/ticker:$(shell git rev-parse --short HEAD)$(and $(shell git status -s),-dirty-$(shell id -u -n)) # https://github.com/opencontainers/image-spec/blob/master/annotations.md -BUILD_DATE := $(shell date --utc --rfc-3339=seconds) +BUILD_DATE := $(shell date -u +%FT%TZ) docker-build: cd ../../ && \ From 0c23e9d780d79d65373833105aaa063f6f612b91 Mon Sep 17 00:00:00 2001 From: Leigh McCulloch <351529+leighmcculloch@users.noreply.github.com> Date: Mon, 13 Sep 2021 06:59:51 -0700 Subject: [PATCH 03/16] .vscode: fix tasks file (#3908) Format the VSCode tasks file. When I open the stellar/go repo locally in the latest version of VSCode it displays a warning saying the file is "dirty". It appears the file is not formatted to how VSCode prefers it to be formatted. I don't really know what this file does or is for, but supposedly it was added because somebody uses it. This change resolves the warning. --- .vscode/tasks.json | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/.vscode/tasks.json b/.vscode/tasks.json index 0cebe53c3d..b6291051e5 100644 --- a/.vscode/tasks.json +++ b/.vscode/tasks.json @@ -9,9 +9,15 @@ "options": { "cwd": "${workspaceRoot}" }, - "tasks": [{ - "taskName": "test all", - "isWatching": false, - "args": ["test", "./..."] - }] -} \ No newline at end of file + "tasks": [ + { + "label": "test all", + "type": "shell", + "args": [ + "test", + "./..." + ], + "problemMatcher": [] + } + ] +} From 8f30b82208e36d89bfe1c35051a750ceb72bed44 Mon Sep 17 00:00:00 2001 From: Alfonso Acosta Date: Tue, 14 Sep 2021 11:12:37 +0200 Subject: [PATCH 04/16] Avoid docker-compose side-effects during integration tests cleanup (#3912) Using a generic `docker-compose down` command for stopping the Horizon integration test containers causes every docker-compose container to be brought down (including other containers, like the Horizon database container which can be started using docker-compose). This change makes sure we only destroy the containers brought up during integration testing. 
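The difference between the two cleanup strategies can be sketched in a small standalone Go program; `runCompose` below is only a stand-in for the test suite's `runComposeCommand` helper, kept self-contained for illustration:

```go
package main

import (
	"log"
	"os"
	"os/exec"
)

// runCompose shells out to docker-compose. It is a stand-in for the
// integration test helper, included so this sketch runs on its own.
func runCompose(args ...string) error {
	cmd := exec.Command("docker-compose", args...)
	cmd.Stdout, cmd.Stderr = os.Stdout, os.Stderr
	return cmd.Run()
}

func main() {
	// Old cleanup: tears down every container, network and volume in the
	// compose project, including ones the tests never started:
	//
	//     runCompose("down", "-v", "--remove-orphans")
	//
	// New cleanup: stop and remove only the services the tests brought up.
	for _, svc := range []string{"core", "core-postgres"} {
		if err := runCompose("rm", "-fvs", svc); err != nil {
			log.Printf("cleanup of %s failed: %v", svc, err)
		}
	}
}
```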
---
 services/horizon/internal/test/integration/integration.go | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/services/horizon/internal/test/integration/integration.go b/services/horizon/internal/test/integration/integration.go
index 54361c5847..aab4d6162c 100644
--- a/services/horizon/internal/test/integration/integration.go
+++ b/services/horizon/internal/test/integration/integration.go
@@ -204,7 +204,8 @@ func (i *Test) prepareShutdownHandlers() {
 			if i.app != nil {
 				i.app.Close()
 			}
-			i.runComposeCommand("down", "-v", "--remove-orphans")
+			i.runComposeCommand("rm", "-fvs", "core")
+			i.runComposeCommand("rm", "-fvs", "core-postgres")
 		},
 		i.environment.Restore,
 	)
From b5b4ed7e80eed4d3a302f2b8ec561de69a70b1bf Mon Sep 17 00:00:00 2001
From: Jacek Nykis
Date: Tue, 14 Sep 2021 11:33:20 +0100
Subject: [PATCH 05/16] Fix error in one Makefile

---
 services/keystore/Makefile | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/services/keystore/Makefile b/services/keystore/Makefile
index 734cecc4dd..d091c04836 100644
--- a/services/keystore/Makefile
+++ b/services/keystore/Makefile
@@ -4,7 +4,7 @@ SUDO := $(shell docker version >/dev/null 2>&1 || echo "sudo")
 # If TAG is not provided set default value
 TAG ?= stellar/keystore:$(shell git rev-parse --short HEAD)$(and $(shell git status -s),-dirty-$(shell id -u -n))
 # https://github.com/opencontainers/image-spec/blob/master/annotations.md
-BUILD_DATE := $(shell date -utc +%FT%TZ)
+BUILD_DATE := $(shell date -u +%FT%TZ)
 
 docker-build:
 	cd ../../ && \
From 873275324dfa825a349db96f662cf2a9575ebebc Mon Sep 17 00:00:00 2001
From: Bartek Nowotarski
Date: Tue, 14 Sep 2021 22:14:08 +0200
Subject: [PATCH 06/16] services/horizon/ingest: Batch Offers insert/updates (#3917)

This commit adds a new `UpsertOffers` method (and a generic `upsertRows`) and
modifies `OffersProcessor` to use it. The performance of `OffersProcessor`
degraded recently due to the number of offers in the network; batching updates
should improve the performance of the processor and ingestion overall.
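To make the batching approach concrete, below is a trimmed-down, standalone Go sketch of how the generic `upsertRows` helper assembles its query, adapted from the diff that follows; the real method additionally binds each column as an array parameter via `pq.Array` and executes the statement through the session's `ExecRaw`, which is omitted here:

```go
package main

import (
	"fmt"
	"strings"
)

// upsertField mirrors the shape used by the generic helper: a column name,
// its Postgres type, and one value per upserted row.
type upsertField struct {
	name    string
	dbType  string
	objects []interface{}
}

// buildUpsertSQL renders the WITH r AS (SELECT unnest(...)) ... ON CONFLICT
// statement described in the patch. A single statement covers any number of
// rows because every column is passed as one array parameter.
func buildUpsertSQL(table, conflictField string, fields []upsertField) string {
	unnestPart := make([]string, 0, len(fields))
	insertFieldsPart := make([]string, 0, len(fields))
	onConflictPart := make([]string, 0, len(fields))
	for _, f := range fields {
		unnestPart = append(unnestPart, fmt.Sprintf("unnest(?::%s[]) /* %s */", f.dbType, f.name))
		insertFieldsPart = append(insertFieldsPart, f.name)
		onConflictPart = append(onConflictPart, fmt.Sprintf("%s = excluded.%s", f.name, f.name))
	}
	return `WITH r AS (SELECT ` + strings.Join(unnestPart, ",") + `)
INSERT INTO ` + table + ` (` + strings.Join(insertFieldsPart, ",") + `)
SELECT * from r
ON CONFLICT (` + conflictField + `) DO UPDATE SET ` + strings.Join(onConflictPart, ",")
}

func main() {
	// Two columns and two rows, just to show the shape of the generated SQL;
	// the offers table uses the full column list shown in the diff.
	fields := []upsertField{
		{"offer_id", "bigint", []interface{}{int64(1), int64(2)}},
		{"amount", "bigint", []interface{}{int64(100), int64(200)}},
	}
	fmt.Println(buildUpsertSQL("offers", "offer_id", fields))
}
```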
--- .../horizon/internal/action_offers_test.go | 6 +- .../horizon/internal/actions/offer_test.go | 21 +- .../internal/actions/orderbook_test.go | 7 +- services/horizon/internal/db2/history/main.go | 93 +++++++-- .../mock_offers_batch_insert_builder.go | 20 -- .../internal/db2/history/mock_q_offers.go | 12 +- .../horizon/internal/db2/history/offers.go | 47 ++++- .../history/offers_batch_insert_builder.go | 14 -- .../internal/db2/history/offers_test.go | 10 +- .../internal/db2/history/orderbook_test.go | 13 +- .../internal/ingest/processor_runner_test.go | 30 --- .../ingest/processors/offers_processor.go | 54 ++--- .../processors/offers_processor_test.go | 190 ++++++++++++------ 13 files changed, 268 insertions(+), 249 deletions(-) delete mode 100644 services/horizon/internal/db2/history/mock_offers_batch_insert_builder.go delete mode 100644 services/horizon/internal/db2/history/offers_batch_insert_builder.go diff --git a/services/horizon/internal/action_offers_test.go b/services/horizon/internal/action_offers_test.go index 737afe2fac..13458db9fe 100644 --- a/services/horizon/internal/action_offers_test.go +++ b/services/horizon/internal/action_offers_test.go @@ -68,12 +68,8 @@ func TestOfferActions_Show(t *testing.T) { LastModifiedLedger: uint32(4), } - batch := q.NewOffersBatchInsertBuilder(3) - err = batch.Add(ctx, eurOffer) + err = q.UpsertOffers(ctx, []history.Offer{eurOffer, usdOffer}) ht.Assert.NoError(err) - err = batch.Add(ctx, usdOffer) - ht.Assert.NoError(err) - ht.Assert.NoError(batch.Exec(ctx)) w := ht.Get("/offers") if ht.Assert.Equal(200, w.Code) { diff --git a/services/horizon/internal/actions/offer_test.go b/services/horizon/internal/actions/offer_test.go index 781265aaf0..108af3236d 100644 --- a/services/horizon/internal/actions/offer_test.go +++ b/services/horizon/internal/actions/offer_test.go @@ -89,12 +89,8 @@ func TestGetOfferByIDHandler(t *testing.T) { }, 0, 0, 0, 0, 0) tt.Assert.NoError(err) - batch := q.NewOffersBatchInsertBuilder(0) - err = batch.Add(tt.Ctx, eurOffer) + err = q.UpsertOffers(tt.Ctx, []history.Offer{eurOffer, usdOffer}) tt.Assert.NoError(err) - err = batch.Add(tt.Ctx, usdOffer) - tt.Assert.NoError(err) - tt.Assert.NoError(batch.Exec(tt.Ctx)) for _, testCase := range []struct { name string @@ -200,14 +196,8 @@ func TestGetOffersHandler(t *testing.T) { }, 0, 0, 0, 0, 0) tt.Assert.NoError(err) - batch := q.NewOffersBatchInsertBuilder(0) - err = batch.Add(tt.Ctx, eurOffer) - tt.Assert.NoError(err) - err = batch.Add(tt.Ctx, twoEurOffer) + err = q.UpsertOffers(tt.Ctx, []history.Offer{eurOffer, twoEurOffer, usdOffer}) tt.Assert.NoError(err) - err = batch.Add(tt.Ctx, usdOffer) - tt.Assert.NoError(err) - tt.Assert.NoError(batch.Exec(tt.Ctx)) t.Run("No filter", func(t *testing.T) { records, err := handler.GetResourcePage( @@ -477,13 +467,8 @@ func TestGetAccountOffersHandler(t *testing.T) { q := &history.Q{tt.HorizonSession()} handler := GetAccountOffersHandler{} - batch := q.NewOffersBatchInsertBuilder(0) - err := batch.Add(tt.Ctx, eurOffer) - err = batch.Add(tt.Ctx, twoEurOffer) - tt.Assert.NoError(err) - err = batch.Add(tt.Ctx, usdOffer) + err := q.UpsertOffers(tt.Ctx, []history.Offer{eurOffer, twoEurOffer, usdOffer}) tt.Assert.NoError(err) - tt.Assert.NoError(batch.Exec(tt.Ctx)) records, err := handler.GetResourcePage( httptest.NewRecorder(), diff --git a/services/horizon/internal/actions/orderbook_test.go b/services/horizon/internal/actions/orderbook_test.go index eef810d606..b73191361a 100644 --- a/services/horizon/internal/actions/orderbook_test.go +++ 
b/services/horizon/internal/actions/orderbook_test.go @@ -575,12 +575,7 @@ func TestOrderbookGetResource(t *testing.T) { } assert.NoError(t, q.TruncateTables(tt.Ctx, []string{"offers"})) - - batch := q.NewOffersBatchInsertBuilder(0) - for _, offer := range offers { - assert.NoError(t, batch.Add(tt.Ctx, offer)) - } - assert.NoError(t, batch.Exec(tt.Ctx)) + assert.NoError(t, q.UpsertOffers(tt.Ctx, offers)) assert.NoError(t, q.BeginTx(&sql.TxOptions{ Isolation: sql.LevelRepeatableRead, diff --git a/services/horizon/internal/db2/history/main.go b/services/horizon/internal/db2/history/main.go index 0ef6a958ae..4ec9aa36d9 100644 --- a/services/horizon/internal/db2/history/main.go +++ b/services/horizon/internal/db2/history/main.go @@ -8,12 +8,14 @@ import ( "database/sql/driver" "encoding/json" "fmt" + "strings" "sync" "time" sq "github.com/Masterminds/squirrel" "github.com/guregu/null" "github.com/jmoiron/sqlx" + "github.com/lib/pq" "github.com/stellar/go/services/horizon/internal/db2" "github.com/stellar/go/support/db" @@ -573,6 +575,14 @@ type ManageOffer struct { OfferID int64 `json:"offer_id"` } +// upsertField is used in upsertRows function generating upsert query for +// different tables. +type upsertField struct { + name string + dbType string + objects []interface{} +} + // Offer is row of data from the `offers` table from horizon DB type Offer struct { SellerID string `db:"seller_id"` @@ -591,16 +601,6 @@ type Offer struct { Sponsor null.String `db:"sponsor"` } -type OffersBatchInsertBuilder interface { - Add(ctx context.Context, offer Offer) error - Exec(ctx context.Context) error -} - -// offersBatchInsertBuilder is a simple wrapper around db.BatchInsertBuilder -type offersBatchInsertBuilder struct { - builder db.BatchInsertBuilder -} - // OperationsQ is a helper struct to aid in configuring queries that loads // slices of Operation structs. type OperationsQ struct { @@ -765,15 +765,6 @@ func (q *Q) NewAccountDataBatchInsertBuilder(maxBatchSize int) AccountDataBatchI } } -func (q *Q) NewOffersBatchInsertBuilder(maxBatchSize int) OffersBatchInsertBuilder { - return &offersBatchInsertBuilder{ - builder: db.BatchInsertBuilder{ - Table: q.GetTable("offers"), - MaxBatchSize: maxBatchSize, - }, - } -} - func (q *Q) NewTrustLinesBatchInsertBuilder(maxBatchSize int) TrustLinesBatchInsertBuilder { return &trustLinesBatchInsertBuilder{ builder: db.BatchInsertBuilder{ @@ -853,3 +844,67 @@ func (q *Q) DeleteRangeAll(ctx context.Context, start, end int64) error { } return nil } + +// upsertRows builds and executes an upsert query that allows very fast upserts +// to a given table. The final query is of form: +// +// WITH r AS +// (SELECT +// /* unnestPart */ +// unnest(?::type1[]), /* field1 */ +// unnest(?::type2[]), /* field2 */ +// ... +// ) +// INSERT INTO table ( +// /* insertFieldsPart */ +// field1, +// field2, +// ... +// ) +// SELECT * from r +// ON CONFLICT (conflictField) DO UPDATE SET +// /* onConflictPart */ +// field1 = excluded.field1, +// field2 = excluded.field2, +// ... 
+func (q *Q) upsertRows(ctx context.Context, table string, conflictField string, fields []upsertField) error { + unnestPart := make([]string, 0, len(fields)) + insertFieldsPart := make([]string, 0, len(fields)) + onConflictPart := make([]string, 0, len(fields)) + pqArrays := make([]interface{}, 0, len(fields)) + + for _, field := range fields { + unnestPart = append( + unnestPart, + fmt.Sprintf("unnest(?::%s[]) /* %s */", field.dbType, field.name), + ) + insertFieldsPart = append( + insertFieldsPart, + field.name, + ) + onConflictPart = append( + onConflictPart, + fmt.Sprintf("%s = excluded.%s", field.name, field.name), + ) + pqArrays = append( + pqArrays, + pq.Array(field.objects), + ) + } + + sql := ` + WITH r AS + (SELECT ` + strings.Join(unnestPart, ",") + `) + INSERT INTO ` + table + ` + (` + strings.Join(insertFieldsPart, ",") + `) + SELECT * from r + ON CONFLICT (` + conflictField + `) DO UPDATE SET + ` + strings.Join(onConflictPart, ",") + + _, err := q.ExecRaw( + context.WithValue(ctx, &db.QueryTypeContextKey, db.UpsertQueryType), + sql, + pqArrays..., + ) + return err +} diff --git a/services/horizon/internal/db2/history/mock_offers_batch_insert_builder.go b/services/horizon/internal/db2/history/mock_offers_batch_insert_builder.go deleted file mode 100644 index 75e88f3263..0000000000 --- a/services/horizon/internal/db2/history/mock_offers_batch_insert_builder.go +++ /dev/null @@ -1,20 +0,0 @@ -package history - -import ( - "context" - "github.com/stretchr/testify/mock" -) - -type MockOffersBatchInsertBuilder struct { - mock.Mock -} - -func (m *MockOffersBatchInsertBuilder) Add(ctx context.Context, row Offer) error { - a := m.Called(ctx, row) - return a.Error(0) -} - -func (m *MockOffersBatchInsertBuilder) Exec(ctx context.Context) error { - a := m.Called(ctx) - return a.Error(0) -} diff --git a/services/horizon/internal/db2/history/mock_q_offers.go b/services/horizon/internal/db2/history/mock_q_offers.go index 0b23720b7f..9cbf673fc5 100644 --- a/services/horizon/internal/db2/history/mock_q_offers.go +++ b/services/horizon/internal/db2/history/mock_q_offers.go @@ -2,6 +2,7 @@ package history import ( "context" + "github.com/stretchr/testify/mock" ) @@ -30,14 +31,9 @@ func (m *MockQOffers) CountOffers(ctx context.Context) (int, error) { return a.Get(0).(int), a.Error(1) } -func (m *MockQOffers) NewOffersBatchInsertBuilder(maxBatchSize int) OffersBatchInsertBuilder { - a := m.Called(maxBatchSize) - return a.Get(0).(OffersBatchInsertBuilder) -} - -func (m *MockQOffers) UpdateOffer(ctx context.Context, row Offer) (int64, error) { - a := m.Called(ctx, row) - return a.Get(0).(int64), a.Error(1) +func (m *MockQOffers) UpsertOffers(ctx context.Context, rows []Offer) error { + a := m.Called(ctx, rows) + return a.Error(0) } func (m *MockQOffers) RemoveOffers(ctx context.Context, offerIDs []int64, lastModifiedLedger uint32) (int64, error) { diff --git a/services/horizon/internal/db2/history/offers.go b/services/horizon/internal/db2/history/offers.go index fe37ba91c9..5b875b72cf 100644 --- a/services/horizon/internal/db2/history/offers.go +++ b/services/horizon/internal/db2/history/offers.go @@ -14,8 +14,7 @@ type QOffers interface { GetOffersByIDs(ctx context.Context, ids []int64) ([]Offer, error) CountOffers(ctx context.Context) (int, error) GetUpdatedOffers(ctx context.Context, newerThanSequence uint32) ([]Offer, error) - NewOffersBatchInsertBuilder(maxBatchSize int) OffersBatchInsertBuilder - UpdateOffer(ctx context.Context, offer Offer) (int64, error) + UpsertOffers(ctx context.Context, 
offers []Offer) error
 	RemoveOffers(ctx context.Context, offerIDs []int64, lastModifiedLedger uint32) (int64, error)
 	CompactOffers(ctx context.Context, cutOffSequence uint32) (int64, error)
 }
@@ -96,15 +95,43 @@ func (q *Q) GetUpdatedOffers(ctx context.Context, newerThanSequence uint32) ([]O
 	return offers, err
 }
 
-// UpdateOffer updates a row in the offers table.
-// Returns number of rows affected and error.
-func (q *Q) UpdateOffer(ctx context.Context, offer Offer) (int64, error) {
-	updateBuilder := q.GetTable("offers").Update()
-	result, err := updateBuilder.SetStruct(offer, []string{}).Where("offer_id = ?", offer.OfferID).Exec(ctx)
-	if err != nil {
-		return 0, err
+// UpsertOffers upserts a batch of offers in the offers table.
+// There's currently no limit on the number of offers this method can
+// accept other than the 2GB limit on query string length, which should be
+// enough for each ledger with the current limits.
+func (q *Q) UpsertOffers(ctx context.Context, offers []Offer) error {
+	var sellerID, sellingAsset, buyingAsset, offerID, amount, priceN, priceD,
+		price, flags, lastModifiedLedger, sponsor []interface{}
+
+	for _, offer := range offers {
+		sellerID = append(sellerID, offer.SellerID)
+		offerID = append(offerID, offer.OfferID)
+		sellingAsset = append(sellingAsset, offer.SellingAsset)
+		buyingAsset = append(buyingAsset, offer.BuyingAsset)
+		amount = append(amount, offer.Amount)
+		priceN = append(priceN, offer.Pricen)
+		priceD = append(priceD, offer.Priced)
+		price = append(price, offer.Price)
+		flags = append(flags, offer.Flags)
+		lastModifiedLedger = append(lastModifiedLedger, offer.LastModifiedLedger)
+		sponsor = append(sponsor, offer.Sponsor)
 	}
-	return result.RowsAffected()
+
+	upsertFields := []upsertField{
+		{"seller_id", "text", sellerID},
+		{"offer_id", "bigint", offerID},
+		{"selling_asset", "text", sellingAsset},
+		{"buying_asset", "text", buyingAsset},
+		{"amount", "bigint", amount},
+		{"pricen", "integer", priceN},
+		{"priced", "integer", priceD},
+		{"price", "double precision", price},
+		{"flags", "integer", flags},
+		{"last_modified_ledger", "integer", lastModifiedLedger},
+		{"sponsor", "text", sponsor},
+	}
+
+	return q.upsertRows(ctx, "offers", "offer_id", upsertFields)
 }
 
 // RemoveOffers marks rows in the offers table as deleted.
diff --git a/services/horizon/internal/db2/history/offers_batch_insert_builder.go b/services/horizon/internal/db2/history/offers_batch_insert_builder.go
deleted file mode 100644
index c1d3b628a9..0000000000
--- a/services/horizon/internal/db2/history/offers_batch_insert_builder.go
+++ /dev/null
@@ -1,14 +0,0 @@
-package history
-
-import (
-	"context"
-)
-
-// Add adds a new offer entry to the batch.
-func (i *offersBatchInsertBuilder) Add(ctx context.Context, offer Offer) error { - return i.builder.RowStruct(ctx, offer) -} - -func (i *offersBatchInsertBuilder) Exec(ctx context.Context) error { - return i.builder.Exec(ctx) -} diff --git a/services/horizon/internal/db2/history/offers_test.go b/services/horizon/internal/db2/history/offers_test.go index 2218ffc71a..61f228e33a 100644 --- a/services/horizon/internal/db2/history/offers_test.go +++ b/services/horizon/internal/db2/history/offers_test.go @@ -64,12 +64,7 @@ var ( ) func insertOffer(tt *test.T, q *Q, offer Offer) error { - batch := q.NewOffersBatchInsertBuilder(0) - err := batch.Add(tt.Ctx, offer) - if err != nil { - return err - } - return batch.Exec(tt.Ctx) + return q.UpsertOffers(tt.Ctx, []Offer{offer}) } func TestGetOfferByID(t *testing.T) { @@ -194,9 +189,8 @@ func TestUpdateOffer(t *testing.T) { modifiedEurOffer := eurOffer modifiedEurOffer.Amount -= 10 - rowsAffected, err := q.UpdateOffer(tt.Ctx, modifiedEurOffer) + err = q.UpsertOffers(tt.Ctx, []Offer{modifiedEurOffer}) tt.Assert.NoError(err) - tt.Assert.Equal(int64(1), rowsAffected) offers, err = q.GetAllOffers(tt.Ctx) tt.Assert.NoError(err) diff --git a/services/horizon/internal/db2/history/orderbook_test.go b/services/horizon/internal/db2/history/orderbook_test.go index 30b27f4a64..84b3213b69 100644 --- a/services/horizon/internal/db2/history/orderbook_test.go +++ b/services/horizon/internal/db2/history/orderbook_test.go @@ -213,12 +213,7 @@ func TestGetOrderBookSummary(t *testing.T) { } { t.Run(testCase.name, func(t *testing.T) { assert.NoError(t, q.TruncateTables(tt.Ctx, []string{"offers"})) - - batch := q.NewOffersBatchInsertBuilder(0) - for _, offer := range testCase.offers { - assert.NoError(t, batch.Add(tt.Ctx, offer)) - } - assert.NoError(t, batch.Exec(tt.Ctx)) + assert.NoError(t, q.UpsertOffers(tt.Ctx, testCase.offers)) assert.NoError(t, q.BeginTx(&sql.TxOptions{ Isolation: sql.LevelRepeatableRead, @@ -260,11 +255,7 @@ func TestGetOrderBookSummaryExcludesRemovedOffers(t *testing.T) { sellEurOffer, } - batch := q.NewOffersBatchInsertBuilder(0) - for _, offer := range offers { - assert.NoError(t, batch.Add(tt.Ctx, offer)) - } - assert.NoError(t, batch.Exec(tt.Ctx)) + assert.NoError(t, q.UpsertOffers(tt.Ctx, offers)) assert.NoError(t, q.BeginTx(&sql.TxOptions{ Isolation: sql.LevelRepeatableRead, diff --git a/services/horizon/internal/ingest/processor_runner_test.go b/services/horizon/internal/ingest/processor_runner_test.go index 48ee414237..a2c4975bd4 100644 --- a/services/horizon/internal/ingest/processor_runner_test.go +++ b/services/horizon/internal/ingest/processor_runner_test.go @@ -24,12 +24,6 @@ func TestProcessorRunnerRunHistoryArchiveIngestionGenesis(t *testing.T) { q := &mockDBQ{} // Batches - mockOffersBatchInsertBuilder := &history.MockOffersBatchInsertBuilder{} - defer mock.AssertExpectationsForObjects(t, mockOffersBatchInsertBuilder) - mockOffersBatchInsertBuilder.On("Exec", ctx).Return(nil).Once() - q.MockQOffers.On("NewOffersBatchInsertBuilder", maxBatchSize). 
- Return(mockOffersBatchInsertBuilder).Once() - mockAccountDataBatchInsertBuilder := &history.MockAccountDataBatchInsertBuilder{} defer mock.AssertExpectationsForObjects(t, mockAccountDataBatchInsertBuilder) mockAccountDataBatchInsertBuilder.On("Exec", ctx).Return(nil).Once() @@ -113,12 +107,6 @@ func TestProcessorRunnerRunHistoryArchiveIngestionHistoryArchive(t *testing.T) { ).Once() // Batches - mockOffersBatchInsertBuilder := &history.MockOffersBatchInsertBuilder{} - defer mock.AssertExpectationsForObjects(t, mockOffersBatchInsertBuilder) - mockOffersBatchInsertBuilder.On("Exec", ctx).Return(nil).Once() - q.MockQOffers.On("NewOffersBatchInsertBuilder", maxBatchSize). - Return(mockOffersBatchInsertBuilder).Once() - mockAccountDataBatchInsertBuilder := &history.MockAccountDataBatchInsertBuilder{} defer mock.AssertExpectationsForObjects(t, mockAccountDataBatchInsertBuilder) mockAccountDataBatchInsertBuilder.On("Exec", ctx).Return(nil).Once() @@ -185,11 +173,6 @@ func TestProcessorRunnerRunHistoryArchiveIngestionProtocolVersionNotSupported(t defer mock.AssertExpectationsForObjects(t, historyAdapter) // Batches - mockOffersBatchInsertBuilder := &history.MockOffersBatchInsertBuilder{} - defer mock.AssertExpectationsForObjects(t, mockOffersBatchInsertBuilder) - q.MockQOffers.On("NewOffersBatchInsertBuilder", maxBatchSize). - Return(mockOffersBatchInsertBuilder).Once() - mockAccountDataBatchInsertBuilder := &history.MockAccountDataBatchInsertBuilder{} defer mock.AssertExpectationsForObjects(t, mockAccountDataBatchInsertBuilder) q.MockQData.On("NewAccountDataBatchInsertBuilder", maxBatchSize). @@ -227,8 +210,6 @@ func TestProcessorRunnerBuildChangeProcessor(t *testing.T) { defer mock.AssertExpectationsForObjects(t, q) // Twice = checking ledgerSource and historyArchiveSource - q.MockQOffers.On("NewOffersBatchInsertBuilder", maxBatchSize). - Return(&history.MockOffersBatchInsertBuilder{}).Twice() q.MockQData.On("NewAccountDataBatchInsertBuilder", maxBatchSize). Return(&history.MockAccountDataBatchInsertBuilder{}).Twice() q.MockQSigners.On("NewAccountSignersBatchInsertBuilder", maxBatchSize). @@ -330,12 +311,6 @@ func TestProcessorRunnerRunAllProcessorsOnLedger(t *testing.T) { } // Batches - mockOffersBatchInsertBuilder := &history.MockOffersBatchInsertBuilder{} - defer mock.AssertExpectationsForObjects(t, mockOffersBatchInsertBuilder) - mockOffersBatchInsertBuilder.On("Exec", ctx).Return(nil).Once() - q.MockQOffers.On("NewOffersBatchInsertBuilder", maxBatchSize). - Return(mockOffersBatchInsertBuilder).Once() - mockAccountDataBatchInsertBuilder := &history.MockAccountDataBatchInsertBuilder{} defer mock.AssertExpectationsForObjects(t, mockAccountDataBatchInsertBuilder) mockAccountDataBatchInsertBuilder.On("Exec", ctx).Return(nil).Once() @@ -400,11 +375,6 @@ func TestProcessorRunnerRunAllProcessorsOnLedgerProtocolVersionNotSupported(t *t } // Batches - mockOffersBatchInsertBuilder := &history.MockOffersBatchInsertBuilder{} - defer mock.AssertExpectationsForObjects(t, mockOffersBatchInsertBuilder) - q.MockQOffers.On("NewOffersBatchInsertBuilder", maxBatchSize). - Return(mockOffersBatchInsertBuilder).Once() - mockAccountDataBatchInsertBuilder := &history.MockAccountDataBatchInsertBuilder{} defer mock.AssertExpectationsForObjects(t, mockAccountDataBatchInsertBuilder) q.MockQData.On("NewAccountDataBatchInsertBuilder", maxBatchSize). 
diff --git a/services/horizon/internal/ingest/processors/offers_processor.go b/services/horizon/internal/ingest/processors/offers_processor.go index 5a99d0a24c..d09610e025 100644 --- a/services/horizon/internal/ingest/processors/offers_processor.go +++ b/services/horizon/internal/ingest/processors/offers_processor.go @@ -18,7 +18,6 @@ type OffersProcessor struct { sequence uint32 cache *ingest.ChangeCompactor - insertBatch history.OffersBatchInsertBuilder removeBatch []int64 } @@ -30,7 +29,6 @@ func NewOffersProcessor(offersQ history.QOffers, sequence uint32) *OffersProcess func (p *OffersProcessor) reset() { p.cache = ingest.NewChangeCompactor() - p.insertBatch = p.offersQ.NewOffersBatchInsertBuilder(maxBatchSize) p.removeBatch = []int64{} } @@ -71,59 +69,43 @@ func (p *OffersProcessor) ledgerEntryToRow(entry *xdr.LedgerEntry) history.Offer } func (p *OffersProcessor) flushCache(ctx context.Context) error { + batchUpsertOffers := []history.Offer{} changes := p.cache.GetChanges() for _, change := range changes { - var rowsAffected int64 - var err error - var action string - var offerID xdr.Int64 - switch { - case change.Pre == nil && change.Post != nil: - // Created - action = "inserting" + case change.Post != nil: + // Created and updated row := p.ledgerEntryToRow(change.Post) - err = p.insertBatch.Add(ctx, row) - rowsAffected = 1 // We don't track this when batch inserting + batchUpsertOffers = append(batchUpsertOffers, row) case change.Pre != nil && change.Post == nil: // Removed - action = "removing" offer := change.Pre.Data.MustOffer() p.removeBatch = append(p.removeBatch, int64(offer.OfferId)) - rowsAffected = 1 // We don't track this when batch removing default: - // Updated - action = "updating" - offer := change.Post.Data.MustOffer() - offerID = offer.OfferId - row := p.ledgerEntryToRow(change.Post) - rowsAffected, err = p.offersQ.UpdateOffer(ctx, row) + return errors.New("Invalid io.Change: change.Pre == nil && change.Post == nil") } + } + if len(batchUpsertOffers) > 0 { + err := p.offersQ.UpsertOffers(ctx, batchUpsertOffers) if err != nil { - return err - } - - if rowsAffected != 1 { - return ingest.NewStateError(errors.Errorf( - "%d rows affected when %s offer %d", - rowsAffected, - action, - offerID, - )) + return errors.Wrap(err, "errors in UpsertOffers") } } - err := p.insertBatch.Exec(ctx) - if err != nil { - return errors.Wrap(err, "error executing batch") - } - if len(p.removeBatch) > 0 { - _, err = p.offersQ.RemoveOffers(ctx, p.removeBatch, p.sequence) + rowsAffected, err := p.offersQ.RemoveOffers(ctx, p.removeBatch, p.sequence) if err != nil { return errors.Wrap(err, "error in RemoveOffers") } + + if rowsAffected != int64(len(p.removeBatch)) { + return ingest.NewStateError(errors.Errorf( + "%d rows affected when removing %d offers", + rowsAffected, + len(p.removeBatch), + )) + } } return nil diff --git a/services/horizon/internal/ingest/processors/offers_processor_test.go b/services/horizon/internal/ingest/processors/offers_processor_test.go index c49bbaf26a..86eb67941d 100644 --- a/services/horizon/internal/ingest/processors/offers_processor_test.go +++ b/services/horizon/internal/ingest/processors/offers_processor_test.go @@ -59,33 +59,25 @@ func TestOffersProcessorTestSuiteState(t *testing.T) { type OffersProcessorTestSuiteState struct { suite.Suite - ctx context.Context - processor *OffersProcessor - mockQ *history.MockQOffers - mockBatchInsertBuilder *history.MockOffersBatchInsertBuilder - sequence uint32 + ctx context.Context + processor *OffersProcessor + mockQ 
*history.MockQOffers + sequence uint32 } func (s *OffersProcessorTestSuiteState) SetupTest() { s.ctx = context.Background() s.mockQ = &history.MockQOffers{} - s.mockBatchInsertBuilder = &history.MockOffersBatchInsertBuilder{} - - s.mockQ. - On("NewOffersBatchInsertBuilder", maxBatchSize). - Return(s.mockBatchInsertBuilder).Once() s.sequence = 456 s.processor = NewOffersProcessor(s.mockQ, s.sequence) } func (s *OffersProcessorTestSuiteState) TearDownTest() { - s.mockBatchInsertBuilder.On("Exec", s.ctx).Return(nil).Once() s.mockQ.On("CompactOffers", s.ctx, s.sequence-100).Return(int64(0), nil).Once() s.Assert().NoError(s.processor.Commit(s.ctx)) s.mockQ.AssertExpectations(s.T()) - s.mockBatchInsertBuilder.AssertExpectations(s.T()) } func (s *OffersProcessorTestSuiteState) TestCreateOffer() { @@ -103,13 +95,15 @@ func (s *OffersProcessorTestSuiteState) TestCreateOffer() { LastModifiedLedgerSeq: lastModifiedLedgerSeq, } - s.mockBatchInsertBuilder.On("Add", s.ctx, history.Offer{ - SellerID: "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML", - OfferID: 1, - Pricen: int32(1), - Priced: int32(2), - Price: float64(0.5), - LastModifiedLedger: uint32(lastModifiedLedgerSeq), + s.mockQ.On("UpsertOffers", s.ctx, []history.Offer{ + { + SellerID: "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML", + OfferID: 1, + Pricen: int32(1), + Priced: int32(2), + Price: float64(0.5), + LastModifiedLedger: uint32(lastModifiedLedgerSeq), + }, }).Return(nil).Once() err := s.processor.ProcessChange(s.ctx, ingest.Change{ @@ -126,21 +120,15 @@ func TestOffersProcessorTestSuiteLedger(t *testing.T) { type OffersProcessorTestSuiteLedger struct { suite.Suite - ctx context.Context - processor *OffersProcessor - mockQ *history.MockQOffers - mockBatchInsertBuilder *history.MockOffersBatchInsertBuilder - sequence uint32 + ctx context.Context + processor *OffersProcessor + mockQ *history.MockQOffers + sequence uint32 } func (s *OffersProcessorTestSuiteLedger) SetupTest() { s.ctx = context.Background() s.mockQ = &history.MockQOffers{} - s.mockBatchInsertBuilder = &history.MockOffersBatchInsertBuilder{} - - s.mockQ. - On("NewOffersBatchInsertBuilder", maxBatchSize). 
- Return(s.mockBatchInsertBuilder).Once() s.sequence = 456 s.processor = NewOffersProcessor(s.mockQ, s.sequence) @@ -148,7 +136,6 @@ func (s *OffersProcessorTestSuiteLedger) SetupTest() { func (s *OffersProcessorTestSuiteLedger) TearDownTest() { s.mockQ.AssertExpectations(s.T()) - s.mockBatchInsertBuilder.AssertExpectations(s.T()) } func (s *OffersProcessorTestSuiteLedger) setupInsertOffer() { @@ -217,16 +204,16 @@ func (s *OffersProcessorTestSuiteLedger) setupInsertOffer() { s.Assert().NoError(err) // We use LedgerEntryChangesCache so all changes are squashed - s.mockBatchInsertBuilder.On("Add", s.ctx, history.Offer{ - SellerID: "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML", - OfferID: 2, - Pricen: int32(1), - Priced: int32(6), - Price: float64(1) / float64(6), - LastModifiedLedger: uint32(lastModifiedLedgerSeq), + s.mockQ.On("UpsertOffers", s.ctx, []history.Offer{ + { + SellerID: "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML", + OfferID: 2, + Pricen: int32(1), + Priced: int32(6), + Price: float64(1) / float64(6), + LastModifiedLedger: uint32(lastModifiedLedgerSeq), + }, }).Return(nil).Once() - - s.mockBatchInsertBuilder.On("Exec", s.ctx).Return(nil).Once() } func (s *OffersProcessorTestSuiteLedger) TestInsertOffer() { @@ -254,7 +241,7 @@ func (s *OffersProcessorTestSuiteLedger) TestCompactionError() { s.Assert().EqualError(s.processor.Commit(s.ctx), "could not compact offers: compaction error") } -func (s *OffersProcessorTestSuiteLedger) TestUpdateOfferNoRowsAffected() { +func (s *OffersProcessorTestSuiteLedger) TestUpsertManyOffers() { lastModifiedLedgerSeq := xdr.Uint32(1234) offer := xdr.OfferEntry{ @@ -268,6 +255,12 @@ func (s *OffersProcessorTestSuiteLedger) TestUpdateOfferNoRowsAffected() { Price: xdr.Price{1, 6}, } + anotherOffer := xdr.OfferEntry{ + SellerId: xdr.MustAddress("GDMUVYVYPYZYBDXNJWKFT3X2GCZCICTL3GSVP6AWBGB4ZZG7ZRDA746P"), + OfferId: xdr.Int64(3), + Price: xdr.Price{2, 3}, + } + updatedEntry := xdr.LedgerEntry{ LastModifiedLedgerSeq: lastModifiedLedgerSeq, Data: xdr.LedgerEntryData{ @@ -289,19 +282,46 @@ func (s *OffersProcessorTestSuiteLedger) TestUpdateOfferNoRowsAffected() { }) s.Assert().NoError(err) - s.mockQ.On("UpdateOffer", s.ctx, history.Offer{ - SellerID: "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML", - OfferID: 2, - Pricen: int32(1), - Priced: int32(6), - Price: float64(1) / float64(6), - LastModifiedLedger: uint32(lastModifiedLedgerSeq), - }).Return(int64(0), nil).Once() + err = s.processor.ProcessChange(s.ctx, ingest.Change{ + Type: xdr.LedgerEntryTypeOffer, + Pre: nil, + Post: &xdr.LedgerEntry{ + LastModifiedLedgerSeq: lastModifiedLedgerSeq, + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeOffer, + Offer: &anotherOffer, + }, + }, + }) + s.Assert().NoError(err) - err = s.processor.Commit(s.ctx) - s.Assert().Error(err) - s.Assert().IsType(ingest.StateError{}, errors.Cause(err)) - s.Assert().EqualError(err, "error flushing cache: 0 rows affected when updating offer 2") + s.mockQ.On("UpsertOffers", s.ctx, mock.Anything).Run(func(args mock.Arguments) { + // To fix order issue due to using ChangeCompactor + offers := args.Get(1).([]history.Offer) + s.Assert().ElementsMatch( + offers, + []history.Offer{ + { + SellerID: "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML", + OfferID: 2, + Pricen: int32(1), + Priced: int32(6), + Price: float64(1) / float64(6), + LastModifiedLedger: uint32(lastModifiedLedgerSeq), + }, + { + SellerID: "GDMUVYVYPYZYBDXNJWKFT3X2GCZCICTL3GSVP6AWBGB4ZZG7ZRDA746P", + OfferID: 3, + 
Pricen: int32(2), + Priced: int32(3), + Price: float64(2) / float64(3), + LastModifiedLedger: uint32(lastModifiedLedgerSeq), + }, + }, + ) + }).Return(nil).Once() + s.mockQ.On("CompactOffers", s.ctx, s.sequence-100).Return(int64(0), nil).Once() + s.Assert().NoError(s.processor.Commit(s.ctx)) } func (s *OffersProcessorTestSuiteLedger) TestRemoveOffer() { @@ -322,8 +342,6 @@ func (s *OffersProcessorTestSuiteLedger) TestRemoveOffer() { s.Assert().NoError(err) s.mockQ.On("RemoveOffers", s.ctx, []int64{3}, s.sequence).Return(int64(1), nil).Once() - - s.mockBatchInsertBuilder.On("Exec", s.ctx).Return(nil).Once() s.mockQ.On("CompactOffers", s.ctx, s.sequence-100).Return(int64(0), nil).Once() s.Assert().NoError(s.processor.Commit(s.ctx)) } @@ -377,16 +395,17 @@ func (s *OffersProcessorTestSuiteLedger) TestProcessUpgradeChange() { s.Assert().NoError(err) // We use LedgerEntryChangesCache so all changes are squashed - s.mockBatchInsertBuilder.On("Add", s.ctx, history.Offer{ - SellerID: "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML", - OfferID: 2, - Pricen: int32(1), - Priced: int32(6), - Price: float64(1) / float64(6), - LastModifiedLedger: uint32(lastModifiedLedgerSeq), + s.mockQ.On("UpsertOffers", s.ctx, []history.Offer{ + { + SellerID: "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML", + OfferID: 2, + Pricen: int32(1), + Priced: int32(6), + Price: float64(1) / float64(6), + LastModifiedLedger: uint32(lastModifiedLedgerSeq), + }, }).Return(nil).Once() - s.mockBatchInsertBuilder.On("Exec", s.ctx).Return(nil).Once() s.mockQ.On("CompactOffers", s.ctx, s.sequence-100).Return(int64(0), nil).Once() s.Assert().NoError(s.processor.Commit(s.ctx)) } @@ -424,14 +443,57 @@ func (s *OffersProcessorTestSuiteLedger) TestRemoveMultipleOffers() { }) s.Assert().NoError(err) - s.mockBatchInsertBuilder.On("Exec", s.ctx).Return(nil).Once() s.mockQ.On("CompactOffers", s.ctx, s.sequence-100).Return(int64(0), nil).Once() s.mockQ.On("RemoveOffers", s.ctx, mock.Anything, s.sequence).Run(func(args mock.Arguments) { // To fix order issue due to using ChangeCompactor ids := args.Get(1).([]int64) s.Assert().ElementsMatch(ids, []int64{3, 4}) - }).Return(int64(0), nil).Once() + }).Return(int64(2), nil).Once() err = s.processor.Commit(s.ctx) s.Assert().NoError(err) } + +func (s *OffersProcessorTestSuiteLedger) TestRemoveMultipleOffersRowsAffectedCheck() { + err := s.processor.ProcessChange(s.ctx, ingest.Change{ + Type: xdr.LedgerEntryTypeOffer, + Pre: &xdr.LedgerEntry{ + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeOffer, + Offer: &xdr.OfferEntry{ + SellerId: xdr.MustAddress("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), + OfferId: xdr.Int64(3), + Price: xdr.Price{3, 1}, + }, + }, + }, + Post: nil, + }) + s.Assert().NoError(err) + + err = s.processor.ProcessChange(s.ctx, ingest.Change{ + Type: xdr.LedgerEntryTypeOffer, + Pre: &xdr.LedgerEntry{ + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeOffer, + Offer: &xdr.OfferEntry{ + SellerId: xdr.MustAddress("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), + OfferId: xdr.Int64(4), + Price: xdr.Price{3, 1}, + }, + }, + }, + Post: nil, + }) + s.Assert().NoError(err) + + s.mockQ.On("RemoveOffers", s.ctx, mock.Anything, s.sequence).Run(func(args mock.Arguments) { + // To fix order issue due to using ChangeCompactor + ids := args.Get(1).([]int64) + s.Assert().ElementsMatch(ids, []int64{3, 4}) + }).Return(int64(0), nil).Once() + + err = s.processor.Commit(s.ctx) + s.Assert().IsType(ingest.StateError{}, errors.Cause(err)) + 
s.Assert().EqualError(err, "error flushing cache: 0 rows affected when removing 2 offers") +} From 9a7c6b3e7696ad2e3033e4bd54b6c15eee051c60 Mon Sep 17 00:00:00 2001 From: Jake Urban Date: Thu, 12 Aug 2021 13:26:58 -0700 Subject: [PATCH 07/16] Added 5 second grace period to ReadChallengeTx MinTime constraint --- txnbuild/transaction.go | 4 +- txnbuild/transaction_test.go | 73 ++++++++++++++++++++++++++++++++++++ 2 files changed, 76 insertions(+), 1 deletion(-) diff --git a/txnbuild/transaction.go b/txnbuild/transaction.go index 35777e767a..942d81237d 100644 --- a/txnbuild/transaction.go +++ b/txnbuild/transaction.go @@ -1098,8 +1098,10 @@ func ReadChallengeTx(challengeTx, serverAccountID, network, webAuthDomain string if tx.Timebounds().MaxTime == TimeoutInfinite { return tx, clientAccountID, matchedHomeDomain, errors.New("transaction requires non-infinite timebounds") } + // Apply a grace period to the challenge MinTime to account for clock drift between the server and client + var gracePeriod int64 = 5 // seconds currentTime := time.Now().UTC().Unix() - if currentTime < tx.Timebounds().MinTime || currentTime > tx.Timebounds().MaxTime { + if currentTime + gracePeriod < tx.Timebounds().MinTime || currentTime > tx.Timebounds().MaxTime { return tx, clientAccountID, matchedHomeDomain, errors.Errorf("transaction is not within range of the specified timebounds (currentTime=%d, MinTime=%d, MaxTime=%d)", currentTime, tx.Timebounds().MinTime, tx.Timebounds().MaxTime) } diff --git a/txnbuild/transaction_test.go b/txnbuild/transaction_test.go index f1b6274ec8..bea6408839 100644 --- a/txnbuild/transaction_test.go +++ b/txnbuild/transaction_test.go @@ -2023,6 +2023,79 @@ func TestReadChallengeTx_invalidTimeboundsOutsideRange(t *testing.T) { assert.Regexp(t, "transaction is not within range of the specified timebounds", err.Error()) } +func TestReadChallengeTx_validTimeboundsWithGracePeriod(t *testing.T) { + serverKP := newKeypair0() + clientKP := newKeypair1() + txSource := NewSimpleAccount(serverKP.Address(), -1) + op := ManageData{ + SourceAccount: clientKP.Address(), + Name: "testanchor.stellar.org auth", + Value: []byte(base64.StdEncoding.EncodeToString(make([]byte, 48))), + } + webAuthDomainOp := ManageData{ + SourceAccount: serverKP.Address(), + Name: "web_auth_domain", + Value: []byte("testwebauth.stellar.org"), + } + unixNow := time.Now().UTC().Unix() + tx, err := NewTransaction( + TransactionParams{ + SourceAccount: &txSource, + IncrementSequenceNum: true, + Operations: []Operation{&op, &webAuthDomainOp}, + BaseFee: MinBaseFee, + Timebounds: NewTimebounds(unixNow + 4, unixNow + 60), + }, + ) + assert.NoError(t, err) + + tx, err = tx.Sign(network.TestNetworkPassphrase, serverKP) + assert.NoError(t, err) + tx64, err := tx.Base64() + require.NoError(t, err) + readTx, readClientAccountID, _, err := ReadChallengeTx(tx64, serverKP.Address(), network.TestNetworkPassphrase, "testwebauth.stellar.org", []string{"testanchor.stellar.org"}) + assert.Equal(t, tx, readTx) + assert.Equal(t, clientKP.Address(), readClientAccountID) + assert.NoError(t, err) +} + +func TestReadChallengeTx_invalidTimeboundsWithGracePeriod(t *testing.T) { + serverKP := newKeypair0() + clientKP := newKeypair1() + txSource := NewSimpleAccount(serverKP.Address(), -1) + op := ManageData{ + SourceAccount: clientKP.Address(), + Name: "testanchor.stellar.org auth", + Value: []byte(base64.StdEncoding.EncodeToString(make([]byte, 48))), + } + webAuthDomainOp := ManageData{ + SourceAccount: serverKP.Address(), + Name: "web_auth_domain", + 
Value: []byte("testwebauth.stellar.org"), + } + unixNow := time.Now().UTC().Unix() + tx, err := NewTransaction( + TransactionParams{ + SourceAccount: &txSource, + IncrementSequenceNum: true, + Operations: []Operation{&op, &webAuthDomainOp}, + BaseFee: MinBaseFee, + Timebounds: NewTimebounds(unixNow + 10, unixNow + 60), + }, + ) + assert.NoError(t, err) + + tx, err = tx.Sign(network.TestNetworkPassphrase, serverKP) + assert.NoError(t, err) + tx64, err := tx.Base64() + require.NoError(t, err) + readTx, readClientAccountID, _, err := ReadChallengeTx(tx64, serverKP.Address(), network.TestNetworkPassphrase, "testwebauth.stellar.org", []string{"testanchor.stellar.org"}) + assert.Equal(t, tx, readTx) + assert.Equal(t, "", readClientAccountID) + assert.Error(t, err) + assert.Regexp(t, "transaction is not within range of the specified timebounds", err.Error()) +} + func TestReadChallengeTx_invalidOperationWrongType(t *testing.T) { serverKP := newKeypair0() clientKP := newKeypair1() From 84db266c72c73d6b08f89d183d7be3ed10b0b669 Mon Sep 17 00:00:00 2001 From: Jake Urban Date: Thu, 12 Aug 2021 14:08:00 -0700 Subject: [PATCH 08/16] correct grace period from 5 seconds to 5 minutes --- txnbuild/CHANGELOG.md | 1 + txnbuild/transaction.go | 2 +- txnbuild/transaction_test.go | 4 ++-- 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/txnbuild/CHANGELOG.md b/txnbuild/CHANGELOG.md index 3c89431c46..bbadc08d24 100644 --- a/txnbuild/CHANGELOG.md +++ b/txnbuild/CHANGELOG.md @@ -8,6 +8,7 @@ file. This project adheres to [Semantic Versioning](http://semver.org/). * GenericTransaction, Transaction, and FeeBumpTransaction now implement encoding.TextMarshaler and encoding.TextUnmarshaler. +* Adds 5-minute grace period to `transaction.ReadChallengeTx`'s minimum time bound constraint. 
([#3824](https://github.com/stellar/go/pull/3824)) ## [v7.1.1](https://github.com/stellar/go/releases/tag/horizonclient-v7.1.1) - 2021-06-25 diff --git a/txnbuild/transaction.go b/txnbuild/transaction.go index 942d81237d..7c6c44bcb1 100644 --- a/txnbuild/transaction.go +++ b/txnbuild/transaction.go @@ -1099,7 +1099,7 @@ func ReadChallengeTx(challengeTx, serverAccountID, network, webAuthDomain string return tx, clientAccountID, matchedHomeDomain, errors.New("transaction requires non-infinite timebounds") } // Apply a grace period to the challenge MinTime to account for clock drift between the server and client - var gracePeriod int64 = 5 // seconds + var gracePeriod int64 = 5 * 60 // seconds currentTime := time.Now().UTC().Unix() if currentTime + gracePeriod < tx.Timebounds().MinTime || currentTime > tx.Timebounds().MaxTime { return tx, clientAccountID, matchedHomeDomain, errors.Errorf("transaction is not within range of the specified timebounds (currentTime=%d, MinTime=%d, MaxTime=%d)", diff --git a/txnbuild/transaction_test.go b/txnbuild/transaction_test.go index bea6408839..7cf0f69962 100644 --- a/txnbuild/transaction_test.go +++ b/txnbuild/transaction_test.go @@ -2044,7 +2044,7 @@ func TestReadChallengeTx_validTimeboundsWithGracePeriod(t *testing.T) { IncrementSequenceNum: true, Operations: []Operation{&op, &webAuthDomainOp}, BaseFee: MinBaseFee, - Timebounds: NewTimebounds(unixNow + 4, unixNow + 60), + Timebounds: NewTimebounds(unixNow + 5 * 59, unixNow + 60 * 60), }, ) assert.NoError(t, err) @@ -2080,7 +2080,7 @@ func TestReadChallengeTx_invalidTimeboundsWithGracePeriod(t *testing.T) { IncrementSequenceNum: true, Operations: []Operation{&op, &webAuthDomainOp}, BaseFee: MinBaseFee, - Timebounds: NewTimebounds(unixNow + 10, unixNow + 60), + Timebounds: NewTimebounds(unixNow + 5 * 61, unixNow + 60 * 60), }, ) assert.NoError(t, err) From 5b2b40bd69f85a6e387b6f7ee76c1699d22b431e Mon Sep 17 00:00:00 2001 From: Jake Urban Date: Thu, 12 Aug 2021 14:22:04 -0700 Subject: [PATCH 09/16] fix formatting --- txnbuild/transaction.go | 2 +- txnbuild/transaction_test.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/txnbuild/transaction.go b/txnbuild/transaction.go index 7c6c44bcb1..1199ca5367 100644 --- a/txnbuild/transaction.go +++ b/txnbuild/transaction.go @@ -1101,7 +1101,7 @@ func ReadChallengeTx(challengeTx, serverAccountID, network, webAuthDomain string // Apply a grace period to the challenge MinTime to account for clock drift between the server and client var gracePeriod int64 = 5 * 60 // seconds currentTime := time.Now().UTC().Unix() - if currentTime + gracePeriod < tx.Timebounds().MinTime || currentTime > tx.Timebounds().MaxTime { + if currentTime+gracePeriod < tx.Timebounds().MinTime || currentTime > tx.Timebounds().MaxTime { return tx, clientAccountID, matchedHomeDomain, errors.Errorf("transaction is not within range of the specified timebounds (currentTime=%d, MinTime=%d, MaxTime=%d)", currentTime, tx.Timebounds().MinTime, tx.Timebounds().MaxTime) } diff --git a/txnbuild/transaction_test.go b/txnbuild/transaction_test.go index 7cf0f69962..116706b34a 100644 --- a/txnbuild/transaction_test.go +++ b/txnbuild/transaction_test.go @@ -2044,7 +2044,7 @@ func TestReadChallengeTx_validTimeboundsWithGracePeriod(t *testing.T) { IncrementSequenceNum: true, Operations: []Operation{&op, &webAuthDomainOp}, BaseFee: MinBaseFee, - Timebounds: NewTimebounds(unixNow + 5 * 59, unixNow + 60 * 60), + Timebounds: NewTimebounds(unixNow+5*59, unixNow+60*60), }, ) assert.NoError(t, err) @@ 
-2080,7 +2080,7 @@ func TestReadChallengeTx_invalidTimeboundsWithGracePeriod(t *testing.T) { IncrementSequenceNum: true, Operations: []Operation{&op, &webAuthDomainOp}, BaseFee: MinBaseFee, - Timebounds: NewTimebounds(unixNow + 5 * 61, unixNow + 60 * 60), + Timebounds: NewTimebounds(unixNow+5*61, unixNow+60*60), }, ) assert.NoError(t, err) From 37d273cc3758c8b950831e0944f87a8fdc475955 Mon Sep 17 00:00:00 2001 From: Alfonso Acosta Date: Thu, 16 Sep 2021 16:29:47 +0200 Subject: [PATCH 10/16] Add additional test cases for parallel reingestion (#3916) --- services/horizon/internal/ingest/parallel_test.go | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/services/horizon/internal/ingest/parallel_test.go b/services/horizon/internal/ingest/parallel_test.go index 7df5e09aed..7d9747330d 100644 --- a/services/horizon/internal/ingest/parallel_test.go +++ b/services/horizon/internal/ingest/parallel_test.go @@ -20,6 +20,7 @@ func TestCalculateParallelLedgerBatchSize(t *testing.T) { assert.Equal(t, uint32(64), calculateParallelLedgerBatchSize(64, 256, 4)) assert.Equal(t, uint32(64), calculateParallelLedgerBatchSize(64, 32, 4)) assert.Equal(t, uint32(64), calculateParallelLedgerBatchSize(2, 256, 4)) + assert.Equal(t, uint32(64), calculateParallelLedgerBatchSize(20096, 64, 1)) } type sorteableRanges []ledgerRange @@ -62,6 +63,18 @@ func TestParallelReingestRange(t *testing.T) { } assert.Equal(t, expected, rangesCalled) + rangesCalled = nil + system, err = newParallelSystems(config, 1, factory) + assert.NoError(t, err) + err = system.ReingestRange(0, 1024, 64) + expected = sorteableRanges{ + {from: 0, to: 63}, {from: 64, to: 127}, {from: 128, to: 191}, {from: 192, to: 255}, {from: 256, to: 319}, + {from: 320, to: 383}, {from: 384, to: 447}, {from: 448, to: 511}, {from: 512, to: 575}, {from: 576, to: 639}, + {from: 640, to: 703}, {from: 704, to: 767}, {from: 768, to: 831}, {from: 832, to: 895}, {from: 896, to: 959}, + {from: 960, to: 1023}, + } + assert.NoError(t, err) + assert.Equal(t, expected, rangesCalled) } func TestParallelReingestRangeError(t *testing.T) { From e0465e35dab76990700e11850ff0d63253aebb96 Mon Sep 17 00:00:00 2001 From: Bartek Nowotarski Date: Thu, 16 Sep 2021 16:30:28 +0200 Subject: [PATCH 11/16] `check_release_hash` improvements (#3925) * Run `apt-get` commands separately. Commands were previously joined by `&&` operator which returned 0 code in case any of the commands failure. * Hide hashes if they match. * Add unstable repo. 
--- .../scripts/check_release_hash/Dockerfile | 1 + .../scripts/check_release_hash/check.sh | 57 ++++++++++++++----- 2 files changed, 44 insertions(+), 14 deletions(-) diff --git a/services/horizon/internal/scripts/check_release_hash/Dockerfile b/services/horizon/internal/scripts/check_release_hash/Dockerfile index 44cdbcfe1b..f05ca6260e 100644 --- a/services/horizon/internal/scripts/check_release_hash/Dockerfile +++ b/services/horizon/internal/scripts/check_release_hash/Dockerfile @@ -9,6 +9,7 @@ ENV DEBIAN_FRONTEND=noninteractive RUN apt-get update && apt-get install -y --no-install-recommends ca-certificates curl wget gnupg apt-utils git zip unzip apt-transport-https ca-certificates RUN wget -qO - https://apt.stellar.org/SDF.asc | APT_KEY_DONT_WARN_ON_DANGEROUS_USAGE=true apt-key add - RUN echo "deb https://apt.stellar.org xenial stable" >/etc/apt/sources.list.d/SDF.list +RUN echo "deb https://apt.stellar.org xenial testing" >/etc/apt/sources.list.d/SDF-testing.list RUN git clone https://github.com/stellar/go.git /go/src/github.com/stellar/go # Fetch dependencies and prebuild binaries. Not necessary but will make check faster. diff --git a/services/horizon/internal/scripts/check_release_hash/check.sh b/services/horizon/internal/scripts/check_release_hash/check.sh index 95208f2a9a..b1377d9810 100644 --- a/services/horizon/internal/scripts/check_release_hash/check.sh +++ b/services/horizon/internal/scripts/check_release_hash/check.sh @@ -1,7 +1,9 @@ #!/bin/bash set -e -apt-get clean && apt-get update && apt-get install -y stellar-horizon=$PACKAGE_VERSION +apt-get clean +apt-get update +apt-get install -y stellar-horizon=$PACKAGE_VERSION mkdir released cd released @@ -23,30 +25,57 @@ git checkout $TAG # -keep: artifact directories are not removed after packaging CIRCLE_TAG=$TAG go run -v ./support/scripts/build_release_artifacts -keep -suffixes=(darwin-amd64 linux-amd64 linux-arm windows-amd64) +echo "RESULTS" +echo "=======" +echo "" +echo "compiled version" +./dist/$TAG-linux-amd64/horizon version + +echo "github releases version" +./released/$TAG-linux-amd64/horizon version + +echo "debian package version" +stellar-horizon version +echo "" + +suffixes=(darwin-amd64 linux-amd64 linux-arm windows-amd64) for S in "${suffixes[@]}" do - echo $TAG-$S + released="" + dist="" + msg="" if [ -f "./released/$TAG-$S.tar.gz" ]; then - shasum -a 256 ./released/$TAG-$S.tar.gz - shasum -a 256 ./released/$TAG-$S/horizon + released=($(shasum -a 256 ./released/$TAG-$S/horizon)) else # windows - shasum -a 256 ./released/$TAG-$S.zip - shasum -a 256 ./released/$TAG-$S/horizon.exe + released=($(shasum -a 256 ./released/$TAG-$S/horizon.exe)) fi if [ -f "./dist/$TAG-$S.tar.gz" ]; then - shasum -a 256 ./dist/$TAG-$S.tar.gz - shasum -a 256 ./dist/$TAG-$S/horizon + dist=($(shasum -a 256 ./dist/$TAG-$S/horizon)) else # windows - shasum -a 256 ./dist/$TAG-$S.zip - shasum -a 256 ./dist/$TAG-$S/horizon.exe + dist=($(shasum -a 256 ./dist/$TAG-$S/horizon.exe)) + fi + + if [ $S == "linux-amd64" ]; then + path=$(which stellar-horizon) + debian=($(shasum -a 256 $path)) + + if [[ "$released" == "$dist" && "$dist" == "$debian" ]]; then + msg="$TAG-$S ok" + else + msg="$TAG-$S NO MATCH! github=$released compile=$dist debian=$debian" + fi + else + if [ "$released" == "$dist" ]; then + msg="$TAG-$S ok" + else + msg="$TAG-$S NO MATCH! 
github=$released compile=$dist" + fi fi -done -echo "debian package" -shasum -a 256 $(which stellar-horizon) \ No newline at end of file + echo $msg +done \ No newline at end of file From 2eeb698be2e9d8dafe02a74a87a5a73cb550a391 Mon Sep 17 00:00:00 2001 From: Leigh McCulloch <351529+leighmcculloch@users.noreply.github.com> Date: Thu, 16 Sep 2021 10:42:39 -0700 Subject: [PATCH 12/16] keypair: add test coverage for hint functions (#3922) Add test coverage for the Hint functions of the keypair.FromAddress and keypair.Full types. Those functions aren't tested, and I'm iterating on the keypair code in #3914 that involves changing the Hint functions, so I'd like test coverage present before hand. --- keypair/from_address_test.go | 5 +++++ keypair/full_test.go | 6 ++++++ 2 files changed, 11 insertions(+) diff --git a/keypair/from_address_test.go b/keypair/from_address_test.go index 1800f03829..e6ce4cd052 100644 --- a/keypair/from_address_test.go +++ b/keypair/from_address_test.go @@ -10,6 +10,11 @@ import ( "github.com/stretchr/testify/assert" ) +func TestFromAddress_Hint(t *testing.T) { + kp := MustParseAddress("GAYUB4KATGTUZEGUMJEOZDPPWM4MQLHCIKC4T55YSXHN234WI6BJMIY2") + assert.Equal(t, [4]byte{0x96, 0x47, 0x82, 0x96}, kp.Hint()) +} + func TestFromAddress_Equal(t *testing.T) { // A nil FromAddress. var kp0 *FromAddress diff --git a/keypair/full_test.go b/keypair/full_test.go index 8d2913e0cf..bc0adcd9e9 100644 --- a/keypair/full_test.go +++ b/keypair/full_test.go @@ -10,6 +10,12 @@ import ( "github.com/stretchr/testify/assert" ) +func TestFull_Hint(t *testing.T) { + kp := MustParseFull("SBFGFF27Y64ZUGFAIG5AMJGQODZZKV2YQKAVUUN4HNE24XZXD2OEUVUP") + assert.Equal(t, [4]byte{0x96, 0x47, 0x82, 0x96}, kp.Hint()) + assert.Equal(t, [4]byte{0x96, 0x47, 0x82, 0x96}, kp.FromAddress().Hint()) +} + func TestFull_Equal(t *testing.T) { // A nil Full. var kp0 *Full From 2b122329770f5a9433b54e10aa3b0069d71f27c0 Mon Sep 17 00:00:00 2001 From: Leigh McCulloch <351529+leighmcculloch@users.noreply.github.com> Date: Thu, 16 Sep 2021 16:01:04 -0700 Subject: [PATCH 13/16] crc16: add tests (#3924) Add tests to the crc16 package. We don't have any. I was digging around in the code for some thing else and wanted to make a change to the crc logic and make sure I don't break it. 
--- crc16/main_test.go | 20 ++++++++++++++++++++ go.list | 4 ++-- go.mod | 3 ++- go.sum | 7 +++++-- 4 files changed, 29 insertions(+), 5 deletions(-) create mode 100644 crc16/main_test.go diff --git a/crc16/main_test.go b/crc16/main_test.go new file mode 100644 index 0000000000..a1314a9fa1 --- /dev/null +++ b/crc16/main_test.go @@ -0,0 +1,20 @@ +package crc16 + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestChecksum(t *testing.T) { + result := Checksum([]byte{0x12, 0x34, 0x56, 0x78, 0x90}) + assert.Equal(t, []byte{0xe6, 0x48}, result) +} + +func TestValidate(t *testing.T) { + err := Validate([]byte{0x12, 0x34, 0x56, 0x78, 0x90}, []byte{0xe6, 0x48}) + assert.NoError(t, err) + + err = Validate([]byte{0x12, 0x34, 0x56, 0x78, 0x90}, []byte{0xe7, 0x48}) + assert.ErrorIs(t, err, ErrInvalidChecksum) +} diff --git a/go.list b/go.list index a692515e77..12bc77f5f0 100644 --- a/go.list +++ b/go.list @@ -72,8 +72,8 @@ github.com/spf13/viper v0.0.0-20150621231900-db7ff930a189 github.com/stellar/go github.com/stellar/go-xdr v0.0.0-20201028102745-f80a23dac78a github.com/stellar/throttled v2.2.3-0.20190823235211-89d75816f59d+incompatible -github.com/stretchr/objx v0.1.1 -github.com/stretchr/testify v1.6.1 +github.com/stretchr/objx v0.3.0 +github.com/stretchr/testify v1.7.0 github.com/tyler-smith/go-bip39 v0.0.0-20180618194314-52158e4697b8 github.com/valyala/bytebufferpool v1.0.0 github.com/valyala/fasthttp v0.0.0-20170109085056-0a7f0a797cd6 diff --git a/go.mod b/go.mod index 9d9084f1be..15db0c82f5 100644 --- a/go.mod +++ b/go.mod @@ -62,7 +62,8 @@ require ( github.com/spf13/viper v0.0.0-20150621231900-db7ff930a189 github.com/stellar/go-xdr v0.0.0-20201028102745-f80a23dac78a github.com/stellar/throttled v2.2.3-0.20190823235211-89d75816f59d+incompatible - github.com/stretchr/testify v1.6.1 + github.com/stretchr/objx v0.3.0 // indirect + github.com/stretchr/testify v1.7.0 github.com/tyler-smith/go-bip39 v0.0.0-20180618194314-52158e4697b8 github.com/valyala/bytebufferpool v1.0.0 // indirect github.com/valyala/fasthttp v0.0.0-20170109085056-0a7f0a797cd6 // indirect diff --git a/go.sum b/go.sum index abc984b5da..9481c87597 100644 --- a/go.sum +++ b/go.sum @@ -323,13 +323,16 @@ github.com/stellar/go-xdr v0.0.0-20201028102745-f80a23dac78a/go.mod h1:yoxyU/M8n github.com/stellar/throttled v2.2.3-0.20190823235211-89d75816f59d+incompatible h1:jMXXAcz6xTarGDQ4VtVbtERogcmDQw4RaE85Cr9CgoQ= github.com/stellar/throttled v2.2.3-0.20190823235211-89d75816f59d+incompatible/go.mod h1:7CJ23pXirXBJq45DqvO6clzTEGM/l1SfKrgrzLry8b4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/objx v0.1.1 h1:2vfRuCMp5sSVIDSqO8oNnWJq7mPa6KVP3iPIwFBuy8A= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.3.0 h1:NGXK3lHquSN08v5vWalVI/L8XU9hdzE/G6xsrze47As= +github.com/stretchr/objx v0.3.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= -github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify 
v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/tyler-smith/go-bip39 v0.0.0-20180618194314-52158e4697b8 h1:g3yQGZK+G6dfF/mw/SOwsTMzUVkpT4hB8pHxpbTXkKw= github.com/tyler-smith/go-bip39 v0.0.0-20180618194314-52158e4697b8/go.mod h1:sJ5fKU0s6JVwZjjcUEX2zFOnvq0ASQ2K9Zr6cf67kNs= github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= From 1acc65cb0bac51ae30175875fd03d8914b2c8d95 Mon Sep 17 00:00:00 2001 From: Leigh McCulloch <351529+leighmcculloch@users.noreply.github.com> Date: Fri, 17 Sep 2021 06:58:48 -0700 Subject: [PATCH 14/16] crc16: move crc16 inside strkey/internal (#3932) Move the `crc16` package to `strkey/internal/crc16`. The `crc16` package implements a single form of crc16, the form specifically used by [strkeys](https://stellar.org/protocol/sep-23). It is only used by the `strkey` package and it is not really something that needs to be exported for others to use in other contexts. The `crc16` package is not officially released in any particular package. --- {crc16 => strkey/internal/crc16}/main.go | 0 {crc16 => strkey/internal/crc16}/main_test.go | 0 strkey/main.go | 2 +- 3 files changed, 1 insertion(+), 1 deletion(-) rename {crc16 => strkey/internal/crc16}/main.go (100%) rename {crc16 => strkey/internal/crc16}/main_test.go (100%) diff --git a/crc16/main.go b/strkey/internal/crc16/main.go similarity index 100% rename from crc16/main.go rename to strkey/internal/crc16/main.go diff --git a/crc16/main_test.go b/strkey/internal/crc16/main_test.go similarity index 100% rename from crc16/main_test.go rename to strkey/internal/crc16/main_test.go diff --git a/strkey/main.go b/strkey/main.go index 6facce6e2c..ee6ece4d8c 100644 --- a/strkey/main.go +++ b/strkey/main.go @@ -5,7 +5,7 @@ import ( "encoding/base32" "encoding/binary" - "github.com/stellar/go/crc16" + "github.com/stellar/go/strkey/internal/crc16" "github.com/stellar/go/support/errors" ) From 2be7469ff153eb44e51f5a8b9b0fad9456384d18 Mon Sep 17 00:00:00 2001 From: tamirms Date: Fri, 17 Sep 2021 19:47:12 +0100 Subject: [PATCH 15/16] Speed up path finding dfs by avoiding edges leading to visited nodes (#3933) --- exp/orderbook/dfs.go | 33 +++++++++++++++++++-------------- exp/orderbook/graph.go | 2 ++ 2 files changed, 21 insertions(+), 14 deletions(-) diff --git a/exp/orderbook/dfs.go b/exp/orderbook/dfs.go index 7dc32d560e..c128e6abe8 100644 --- a/exp/orderbook/dfs.go +++ b/exp/orderbook/dfs.go @@ -59,11 +59,21 @@ type searchState interface { ) (xdr.Asset, xdr.Int64, error) } +func contains(list []string, want string) bool { + for i := 0; i < len(list); i++ { + if list[i] == want { + return true + } + } + return false +} + func dfs( ctx context.Context, state searchState, maxPathLength int, - visited []xdr.Asset, + visitedAssets []xdr.Asset, + visitedAssetStrings []string, remainingTerminalNodes int, currentAssetString string, currentAsset xdr.Asset, @@ -73,31 +83,25 @@ func dfs( if err := ctx.Err(); err != nil { return err } - if currentAssetAmount <= 0 { - return nil - } - for _, asset := range visited { - if asset.Equals(currentAsset) { - return nil - } - } - updatedVisitedList := append(visited, currentAsset) + updatedVisitedAssets := append(visitedAssets, currentAsset) + updatedVisitedStrings := append(visitedAssetStrings, currentAssetString) + if state.isTerminalNode(currentAssetString, currentAssetAmount) { state.appendToPaths( - updatedVisitedList, + updatedVisitedAssets, 
currentAssetString, currentAssetAmount, ) remainingTerminalNodes-- } // abort search if we've visited all destination nodes or if we've exceeded maxPathLength - if remainingTerminalNodes == 0 || len(updatedVisitedList) > maxPathLength { + if remainingTerminalNodes == 0 || len(updatedVisitedStrings) > maxPathLength { return nil } for nextAssetString, offers := range state.edges(currentAssetString) { - if len(offers) == 0 { + if len(offers) == 0 || contains(visitedAssetStrings, nextAssetString) { continue } @@ -113,7 +117,8 @@ func dfs( ctx, state, maxPathLength, - updatedVisitedList, + updatedVisitedAssets, + updatedVisitedStrings, remainingTerminalNodes, nextAssetString, nextAsset, diff --git a/exp/orderbook/graph.go b/exp/orderbook/graph.go index d41b184656..5fe24f18cb 100644 --- a/exp/orderbook/graph.go +++ b/exp/orderbook/graph.go @@ -329,6 +329,7 @@ func (graph *OrderBookGraph) FindPaths( searchState, maxPathLength, []xdr.Asset{}, + []string{}, len(sourceAssets), destinationAssetString, destinationAsset, @@ -380,6 +381,7 @@ func (graph *OrderBookGraph) FindFixedPaths( searchState, maxPathLength, []xdr.Asset{}, + []string{}, len(destinationAssets), sourceAsset.String(), sourceAsset, From 0599e8167019c13d433563dc76a0172cb53dd42d Mon Sep 17 00:00:00 2001 From: Bartek Nowotarski Date: Mon, 20 Sep 2021 13:55:08 +0200 Subject: [PATCH 16/16] services/horizon: Change `ProcessorsRunDuration` metric type from counter to summary (#3940) Add a new metric `ProcessorsRunDurationSummary`/`processor_run_duration_seconds` to replace existing `ProcessorsRunDuration`(`processor_run_duration_seconds_total`. The old metric is now deprecated. The `ProcessorsRunDuration` is a counter. While it allows estimating and comparing actual duration of processors it's impossible to calculate average per ledger run duration because number of events are not counted. --- services/horizon/internal/ingest/fsm.go | 2 ++ services/horizon/internal/ingest/main.go | 12 ++++++++++++ services/horizon/internal/init.go | 1 + 3 files changed, 15 insertions(+) diff --git a/services/horizon/internal/ingest/fsm.go b/services/horizon/internal/ingest/fsm.go index 6c87f932dd..d610d7f5c7 100644 --- a/services/horizon/internal/ingest/fsm.go +++ b/services/horizon/internal/ingest/fsm.go @@ -523,6 +523,8 @@ func (r resumeState) addProcessorDurationsMetricFromMap(s *system, m map[string] processorName = strings.Replace(processorName, "*", "", -1) s.Metrics().ProcessorsRunDuration. With(prometheus.Labels{"name": processorName}).Add(value.Seconds()) + s.Metrics().ProcessorsRunDurationSummary. + With(prometheus.Labels{"name": processorName}).Observe(value.Seconds()) } } diff --git a/services/horizon/internal/ingest/main.go b/services/horizon/internal/ingest/main.go index d5c0827351..6d77ea2f26 100644 --- a/services/horizon/internal/ingest/main.go +++ b/services/horizon/internal/ingest/main.go @@ -129,8 +129,12 @@ type Metrics struct { LedgerStatsCounter *prometheus.CounterVec // ProcessorsRunDuration exposes processors run durations. + // Deprecated in favour of: ProcessorsRunDurationSummary. ProcessorsRunDuration *prometheus.CounterVec + // ProcessorsRunDurationSummary exposes processors run durations. + ProcessorsRunDurationSummary *prometheus.SummaryVec + // CaptiveStellarCoreSynced exposes synced status of Captive Stellar-Core. // 1 if sync, 0 if not synced, -1 if unable to connect or HTTP server disabled. 
CaptiveStellarCoreSynced prometheus.GaugeFunc @@ -327,6 +331,14 @@ func (s *system) initMetrics() { []string{"name"}, ) + s.metrics.ProcessorsRunDurationSummary = prometheus.NewSummaryVec( + prometheus.SummaryOpts{ + Namespace: "horizon", Subsystem: "ingest", Name: "processor_run_duration_seconds", + Help: "run durations of ingestion processors, sliding window = 10m", + }, + []string{"name"}, + ) + s.metrics.CaptiveStellarCoreSynced = prometheus.NewGaugeFunc( prometheus.GaugeOpts{ Namespace: "horizon", Subsystem: "ingest", Name: "captive_stellar_core_synced", diff --git a/services/horizon/internal/init.go b/services/horizon/internal/init.go index 04755a0807..1e7c798a0c 100644 --- a/services/horizon/internal/init.go +++ b/services/horizon/internal/init.go @@ -263,6 +263,7 @@ func initIngestMetrics(app *App) { app.prometheusRegistry.MustRegister(app.ingester.Metrics().StateInvalidGauge) app.prometheusRegistry.MustRegister(app.ingester.Metrics().LedgerStatsCounter) app.prometheusRegistry.MustRegister(app.ingester.Metrics().ProcessorsRunDuration) + app.prometheusRegistry.MustRegister(app.ingester.Metrics().ProcessorsRunDurationSummary) app.prometheusRegistry.MustRegister(app.ingester.Metrics().CaptiveStellarCoreSynced) app.prometheusRegistry.MustRegister(app.ingester.Metrics().CaptiveCoreSupportedProtocolVersion) }
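For illustration, a minimal standalone sketch (not part of the patch) of why the summary type is the better fit: a Prometheus summary exports both a _sum and a _count series, so average per-run duration can be derived, whereas the deprecated counter only exports an accumulated total. The "demo" namespace, the "ExampleProcessor" label and the :2112 listen address are placeholders, not Horizon's actual wiring.

    // Average run duration over a window can then be queried in PromQL, e.g.:
    //
    //   rate(demo_processor_run_duration_seconds_sum[5m])
    //     / rate(demo_processor_run_duration_seconds_count[5m])
    //
    // A plain counter only yields the *_total series, with no event count.
    package main

    import (
    	"net/http"
    	"time"

    	"github.com/prometheus/client_golang/prometheus"
    	"github.com/prometheus/client_golang/prometheus/promhttp"
    )

    func main() {
    	runDuration := prometheus.NewSummaryVec(
    		prometheus.SummaryOpts{
    			Namespace: "demo", Name: "processor_run_duration_seconds",
    			Help:   "run durations of processors, sliding window = 10m",
    			MaxAge: 10 * time.Minute,
    		},
    		[]string{"name"},
    	)
    	reg := prometheus.NewRegistry()
    	reg.MustRegister(runDuration)

    	// Simulate one processor run and record how long it took.
    	start := time.Now()
    	time.Sleep(50 * time.Millisecond)
    	runDuration.With(prometheus.Labels{"name": "ExampleProcessor"}).Observe(time.Since(start).Seconds())

    	http.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))
    	_ = http.ListenAndServe(":2112", nil)
    }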