diff --git a/.vscode/tasks.json b/.vscode/tasks.json index 0cebe53c3d..b6291051e5 100644 --- a/.vscode/tasks.json +++ b/.vscode/tasks.json @@ -9,9 +9,15 @@ "options": { "cwd": "${workspaceRoot}" }, - "tasks": [{ - "taskName": "test all", - "isWatching": false, - "args": ["test", "./..."] - }] -} \ No newline at end of file + "tasks": [ + { + "label": "test all", + "type": "shell", + "args": [ + "test", + "./..." + ], + "problemMatcher": [] + } + ] +} diff --git a/exp/orderbook/dfs.go b/exp/orderbook/dfs.go index 7dc32d560e..c128e6abe8 100644 --- a/exp/orderbook/dfs.go +++ b/exp/orderbook/dfs.go @@ -59,11 +59,21 @@ type searchState interface { ) (xdr.Asset, xdr.Int64, error) } +func contains(list []string, want string) bool { + for i := 0; i < len(list); i++ { + if list[i] == want { + return true + } + } + return false +} + func dfs( ctx context.Context, state searchState, maxPathLength int, - visited []xdr.Asset, + visitedAssets []xdr.Asset, + visitedAssetStrings []string, remainingTerminalNodes int, currentAssetString string, currentAsset xdr.Asset, @@ -73,31 +83,25 @@ func dfs( if err := ctx.Err(); err != nil { return err } - if currentAssetAmount <= 0 { - return nil - } - for _, asset := range visited { - if asset.Equals(currentAsset) { - return nil - } - } - updatedVisitedList := append(visited, currentAsset) + updatedVisitedAssets := append(visitedAssets, currentAsset) + updatedVisitedStrings := append(visitedAssetStrings, currentAssetString) + if state.isTerminalNode(currentAssetString, currentAssetAmount) { state.appendToPaths( - updatedVisitedList, + updatedVisitedAssets, currentAssetString, currentAssetAmount, ) remainingTerminalNodes-- } // abort search if we've visited all destination nodes or if we've exceeded maxPathLength - if remainingTerminalNodes == 0 || len(updatedVisitedList) > maxPathLength { + if remainingTerminalNodes == 0 || len(updatedVisitedStrings) > maxPathLength { return nil } for nextAssetString, offers := range state.edges(currentAssetString) { - if len(offers) == 0 { + if len(offers) == 0 || contains(visitedAssetStrings, nextAssetString) { continue } @@ -113,7 +117,8 @@ func dfs( ctx, state, maxPathLength, - updatedVisitedList, + updatedVisitedAssets, + updatedVisitedStrings, remainingTerminalNodes, nextAssetString, nextAsset, diff --git a/exp/orderbook/graph.go b/exp/orderbook/graph.go index 918b9e0b96..c2224549e7 100644 --- a/exp/orderbook/graph.go +++ b/exp/orderbook/graph.go @@ -365,6 +365,7 @@ func (graph *OrderBookGraph) FindPaths( searchState, maxPathLength, []xdr.Asset{}, + []string{}, len(sourceAssets), destinationAssetString, destinationAsset, @@ -416,6 +417,7 @@ func (graph *OrderBookGraph) FindFixedPaths( searchState, maxPathLength, []xdr.Asset{}, + []string{}, len(destinationAssets), sourceAsset.String(), sourceAsset, diff --git a/exp/services/recoverysigner/Makefile b/exp/services/recoverysigner/Makefile index d563bab2b6..f5e357d356 100644 --- a/exp/services/recoverysigner/Makefile +++ b/exp/services/recoverysigner/Makefile @@ -4,7 +4,7 @@ SUDO := $(shell docker version >/dev/null 2>&1 || echo "sudo") # If TAG is not provided set default value TAG ?= stellar/recoverysigner:$(shell git rev-parse --short HEAD)$(and $(shell git status -s),-dirty-$(shell id -u -n)) # https://github.com/opencontainers/image-spec/blob/master/annotations.md -BUILD_DATE := $(shell date --utc --rfc-3339=seconds) +BUILD_DATE := $(shell date -u +%FT%TZ) docker-build: cd ../../../ && \ diff --git a/exp/services/webauth/Makefile b/exp/services/webauth/Makefile index 367c5bf9c4..f29f06d53c 100644 --- a/exp/services/webauth/Makefile +++ b/exp/services/webauth/Makefile @@ -4,7 +4,7 @@ SUDO := $(shell docker version >/dev/null 2>&1 || echo "sudo") # If TAG is not provided set default value TAG ?= stellar/webauth:$(shell git rev-parse --short HEAD)$(and $(shell git status -s),-dirty-$(shell id -u -n)) # https://github.com/opencontainers/image-spec/blob/master/annotations.md -BUILD_DATE := $(shell date --utc --rfc-3339=seconds) +BUILD_DATE := $(shell date -u +%FT%TZ) docker-build: cd ../../../ && \ diff --git a/go.list b/go.list index 697d6430b5..141720055c 100644 --- a/go.list +++ b/go.list @@ -72,8 +72,8 @@ github.com/spf13/viper v0.0.0-20150621231900-db7ff930a189 github.com/stellar/go github.com/stellar/go-xdr v0.0.0-20201028102745-f80a23dac78a github.com/stellar/throttled v2.2.3-0.20190823235211-89d75816f59d+incompatible -github.com/stretchr/objx v0.1.1 -github.com/stretchr/testify v1.6.1 +github.com/stretchr/objx v0.3.0 +github.com/stretchr/testify v1.7.0 github.com/tyler-smith/go-bip39 v0.0.0-20180618194314-52158e4697b8 github.com/valyala/bytebufferpool v1.0.0 github.com/valyala/fasthttp v0.0.0-20170109085056-0a7f0a797cd6 diff --git a/go.mod b/go.mod index ec51e84a3a..5062466bea 100644 --- a/go.mod +++ b/go.mod @@ -62,7 +62,8 @@ require ( github.com/spf13/viper v0.0.0-20150621231900-db7ff930a189 github.com/stellar/go-xdr v0.0.0-20201028102745-f80a23dac78a github.com/stellar/throttled v2.2.3-0.20190823235211-89d75816f59d+incompatible - github.com/stretchr/testify v1.6.1 + github.com/stretchr/objx v0.3.0 // indirect + github.com/stretchr/testify v1.7.0 github.com/tyler-smith/go-bip39 v0.0.0-20180618194314-52158e4697b8 github.com/valyala/bytebufferpool v1.0.0 // indirect github.com/valyala/fasthttp v0.0.0-20170109085056-0a7f0a797cd6 // indirect diff --git a/go.sum b/go.sum index 0b7b5c233a..87ee939acd 100644 --- a/go.sum +++ b/go.sum @@ -323,13 +323,16 @@ github.com/stellar/go-xdr v0.0.0-20201028102745-f80a23dac78a/go.mod h1:yoxyU/M8n github.com/stellar/throttled v2.2.3-0.20190823235211-89d75816f59d+incompatible h1:jMXXAcz6xTarGDQ4VtVbtERogcmDQw4RaE85Cr9CgoQ= github.com/stellar/throttled v2.2.3-0.20190823235211-89d75816f59d+incompatible/go.mod h1:7CJ23pXirXBJq45DqvO6clzTEGM/l1SfKrgrzLry8b4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/objx v0.1.1 h1:2vfRuCMp5sSVIDSqO8oNnWJq7mPa6KVP3iPIwFBuy8A= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.3.0 h1:NGXK3lHquSN08v5vWalVI/L8XU9hdzE/G6xsrze47As= +github.com/stretchr/objx v0.3.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= -github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/tyler-smith/go-bip39 v0.0.0-20180618194314-52158e4697b8 h1:g3yQGZK+G6dfF/mw/SOwsTMzUVkpT4hB8pHxpbTXkKw= github.com/tyler-smith/go-bip39 v0.0.0-20180618194314-52158e4697b8/go.mod h1:sJ5fKU0s6JVwZjjcUEX2zFOnvq0ASQ2K9Zr6cf67kNs= github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= diff --git a/keypair/from_address_test.go b/keypair/from_address_test.go index 1800f03829..e6ce4cd052 100644 --- a/keypair/from_address_test.go +++ b/keypair/from_address_test.go @@ -10,6 +10,11 @@ import ( "github.com/stretchr/testify/assert" ) +func TestFromAddress_Hint(t *testing.T) { + kp := MustParseAddress("GAYUB4KATGTUZEGUMJEOZDPPWM4MQLHCIKC4T55YSXHN234WI6BJMIY2") + assert.Equal(t, [4]byte{0x96, 0x47, 0x82, 0x96}, kp.Hint()) +} + func TestFromAddress_Equal(t *testing.T) { // A nil FromAddress. var kp0 *FromAddress diff --git a/keypair/full_test.go b/keypair/full_test.go index 8d2913e0cf..bc0adcd9e9 100644 --- a/keypair/full_test.go +++ b/keypair/full_test.go @@ -10,6 +10,12 @@ import ( "github.com/stretchr/testify/assert" ) +func TestFull_Hint(t *testing.T) { + kp := MustParseFull("SBFGFF27Y64ZUGFAIG5AMJGQODZZKV2YQKAVUUN4HNE24XZXD2OEUVUP") + assert.Equal(t, [4]byte{0x96, 0x47, 0x82, 0x96}, kp.Hint()) + assert.Equal(t, [4]byte{0x96, 0x47, 0x82, 0x96}, kp.FromAddress().Hint()) +} + func TestFull_Equal(t *testing.T) { // A nil Full. var kp0 *Full diff --git a/services/friendbot/Makefile b/services/friendbot/Makefile index b8527efa03..49ab73a5cd 100644 --- a/services/friendbot/Makefile +++ b/services/friendbot/Makefile @@ -4,7 +4,7 @@ SUDO := $(shell docker version >/dev/null 2>&1 || echo "sudo") # If TAG is not provided set default value TAG ?= stellar/friendbot:$(shell git rev-parse --short HEAD)$(and $(shell git status -s),-dirty-$(shell id -u -n)) # https://github.com/opencontainers/image-spec/blob/master/annotations.md -BUILD_DATE := $(shell date --utc --rfc-3339=seconds) +BUILD_DATE := $(shell date -u +%FT%TZ) docker-build: cd ../../ && \ diff --git a/services/horizon/docker/Makefile b/services/horizon/docker/Makefile index 0b5966055e..2ae7ffd816 100644 --- a/services/horizon/docker/Makefile +++ b/services/horizon/docker/Makefile @@ -1,7 +1,7 @@ SUDO := $(shell docker version >/dev/null 2>&1 || echo "sudo") # https://github.com/opencontainers/image-spec/blob/master/annotations.md -BUILD_DATE := $(shell date --utc --rfc-3339=seconds) +BUILD_DATE := $(shell date -u +%FT%TZ) TAG ?= stellar/stellar-horizon:$(VERSION) diff --git a/services/horizon/docker/verify-range/dependencies b/services/horizon/docker/verify-range/dependencies index 2bd5784f5c..3afb8485ce 100644 --- a/services/horizon/docker/verify-range/dependencies +++ b/services/horizon/docker/verify-range/dependencies @@ -10,8 +10,8 @@ echo "deb https://apt.stellar.org $(lsb_release -cs) stable" | sudo tee -a /etc/ apt-get update apt-get install -y stellar-core=${STELLAR_CORE_VERSION} -wget https://dl.google.com/go/go1.16.5.linux-amd64.tar.gz -tar -C /usr/local -xzf go1.16.5.linux-amd64.tar.gz +wget https://dl.google.com/go/go1.17.linux-amd64.tar.gz +tar -C /usr/local -xzf go1.17.linux-amd64.tar.gz # configure postgres service postgresql start diff --git a/services/horizon/docker/verify-range/start b/services/horizon/docker/verify-range/start index 5ec4880a4d..5cf340b6c2 100644 --- a/services/horizon/docker/verify-range/start +++ b/services/horizon/docker/verify-range/start @@ -67,7 +67,7 @@ git pull origin if [ ! -z "$BRANCH" ]; then git checkout $BRANCH fi -git log -1 +git log -1 --pretty=oneline function alter_tables_unlogged() { # UNLOGGED for performance reasons (order is important because some tables reference others) diff --git a/services/horizon/internal/action_offers_test.go b/services/horizon/internal/action_offers_test.go index 737afe2fac..13458db9fe 100644 --- a/services/horizon/internal/action_offers_test.go +++ b/services/horizon/internal/action_offers_test.go @@ -68,12 +68,8 @@ func TestOfferActions_Show(t *testing.T) { LastModifiedLedger: uint32(4), } - batch := q.NewOffersBatchInsertBuilder(3) - err = batch.Add(ctx, eurOffer) + err = q.UpsertOffers(ctx, []history.Offer{eurOffer, usdOffer}) ht.Assert.NoError(err) - err = batch.Add(ctx, usdOffer) - ht.Assert.NoError(err) - ht.Assert.NoError(batch.Exec(ctx)) w := ht.Get("/offers") if ht.Assert.Equal(200, w.Code) { diff --git a/services/horizon/internal/actions/offer_test.go b/services/horizon/internal/actions/offer_test.go index 781265aaf0..108af3236d 100644 --- a/services/horizon/internal/actions/offer_test.go +++ b/services/horizon/internal/actions/offer_test.go @@ -89,12 +89,8 @@ func TestGetOfferByIDHandler(t *testing.T) { }, 0, 0, 0, 0, 0) tt.Assert.NoError(err) - batch := q.NewOffersBatchInsertBuilder(0) - err = batch.Add(tt.Ctx, eurOffer) + err = q.UpsertOffers(tt.Ctx, []history.Offer{eurOffer, usdOffer}) tt.Assert.NoError(err) - err = batch.Add(tt.Ctx, usdOffer) - tt.Assert.NoError(err) - tt.Assert.NoError(batch.Exec(tt.Ctx)) for _, testCase := range []struct { name string @@ -200,14 +196,8 @@ func TestGetOffersHandler(t *testing.T) { }, 0, 0, 0, 0, 0) tt.Assert.NoError(err) - batch := q.NewOffersBatchInsertBuilder(0) - err = batch.Add(tt.Ctx, eurOffer) - tt.Assert.NoError(err) - err = batch.Add(tt.Ctx, twoEurOffer) + err = q.UpsertOffers(tt.Ctx, []history.Offer{eurOffer, twoEurOffer, usdOffer}) tt.Assert.NoError(err) - err = batch.Add(tt.Ctx, usdOffer) - tt.Assert.NoError(err) - tt.Assert.NoError(batch.Exec(tt.Ctx)) t.Run("No filter", func(t *testing.T) { records, err := handler.GetResourcePage( @@ -477,13 +467,8 @@ func TestGetAccountOffersHandler(t *testing.T) { q := &history.Q{tt.HorizonSession()} handler := GetAccountOffersHandler{} - batch := q.NewOffersBatchInsertBuilder(0) - err := batch.Add(tt.Ctx, eurOffer) - err = batch.Add(tt.Ctx, twoEurOffer) - tt.Assert.NoError(err) - err = batch.Add(tt.Ctx, usdOffer) + err := q.UpsertOffers(tt.Ctx, []history.Offer{eurOffer, twoEurOffer, usdOffer}) tt.Assert.NoError(err) - tt.Assert.NoError(batch.Exec(tt.Ctx)) records, err := handler.GetResourcePage( httptest.NewRecorder(), diff --git a/services/horizon/internal/actions/orderbook_test.go b/services/horizon/internal/actions/orderbook_test.go index eef810d606..b73191361a 100644 --- a/services/horizon/internal/actions/orderbook_test.go +++ b/services/horizon/internal/actions/orderbook_test.go @@ -575,12 +575,7 @@ func TestOrderbookGetResource(t *testing.T) { } assert.NoError(t, q.TruncateTables(tt.Ctx, []string{"offers"})) - - batch := q.NewOffersBatchInsertBuilder(0) - for _, offer := range offers { - assert.NoError(t, batch.Add(tt.Ctx, offer)) - } - assert.NoError(t, batch.Exec(tt.Ctx)) + assert.NoError(t, q.UpsertOffers(tt.Ctx, offers)) assert.NoError(t, q.BeginTx(&sql.TxOptions{ Isolation: sql.LevelRepeatableRead, diff --git a/services/horizon/internal/db2/history/main.go b/services/horizon/internal/db2/history/main.go index 3f48c10814..d1347c7249 100644 --- a/services/horizon/internal/db2/history/main.go +++ b/services/horizon/internal/db2/history/main.go @@ -8,12 +8,14 @@ import ( "database/sql/driver" "encoding/json" "fmt" + "strings" "sync" "time" sq "github.com/Masterminds/squirrel" "github.com/guregu/null" "github.com/jmoiron/sqlx" + "github.com/lib/pq" "github.com/stellar/go/services/horizon/internal/db2" "github.com/stellar/go/support/db" @@ -620,6 +622,14 @@ type ManageOffer struct { OfferID int64 `json:"offer_id"` } +// upsertField is used in upsertRows function generating upsert query for +// different tables. +type upsertField struct { + name string + dbType string + objects []interface{} +} + // Offer is row of data from the `offers` table from horizon DB type Offer struct { SellerID string `db:"seller_id"` @@ -638,16 +648,6 @@ type Offer struct { Sponsor null.String `db:"sponsor"` } -type OffersBatchInsertBuilder interface { - Add(ctx context.Context, offer Offer) error - Exec(ctx context.Context) error -} - -// offersBatchInsertBuilder is a simple wrapper around db.BatchInsertBuilder -type offersBatchInsertBuilder struct { - builder db.BatchInsertBuilder -} - // OperationsQ is a helper struct to aid in configuring queries that loads // slices of Operation structs. type OperationsQ struct { @@ -785,15 +785,6 @@ func (q *Q) NewAccountDataBatchInsertBuilder(maxBatchSize int) AccountDataBatchI } } -func (q *Q) NewOffersBatchInsertBuilder(maxBatchSize int) OffersBatchInsertBuilder { - return &offersBatchInsertBuilder{ - builder: db.BatchInsertBuilder{ - Table: q.GetTable("offers"), - MaxBatchSize: maxBatchSize, - }, - } -} - // ElderLedger loads the oldest ledger known to the history database func (q *Q) ElderLedger(ctx context.Context, dest interface{}) error { return q.GetRaw(ctx, dest, `SELECT COALESCE(MIN(sequence), 0) FROM history_ledgers`) @@ -864,3 +855,67 @@ func (q *Q) DeleteRangeAll(ctx context.Context, start, end int64) error { } return nil } + +// upsertRows builds and executes an upsert query that allows very fast upserts +// to a given table. The final query is of form: +// +// WITH r AS +// (SELECT +// /* unnestPart */ +// unnest(?::type1[]), /* field1 */ +// unnest(?::type2[]), /* field2 */ +// ... +// ) +// INSERT INTO table ( +// /* insertFieldsPart */ +// field1, +// field2, +// ... +// ) +// SELECT * from r +// ON CONFLICT (conflictField) DO UPDATE SET +// /* onConflictPart */ +// field1 = excluded.field1, +// field2 = excluded.field2, +// ... +func (q *Q) upsertRows(ctx context.Context, table string, conflictField string, fields []upsertField) error { + unnestPart := make([]string, 0, len(fields)) + insertFieldsPart := make([]string, 0, len(fields)) + onConflictPart := make([]string, 0, len(fields)) + pqArrays := make([]interface{}, 0, len(fields)) + + for _, field := range fields { + unnestPart = append( + unnestPart, + fmt.Sprintf("unnest(?::%s[]) /* %s */", field.dbType, field.name), + ) + insertFieldsPart = append( + insertFieldsPart, + field.name, + ) + onConflictPart = append( + onConflictPart, + fmt.Sprintf("%s = excluded.%s", field.name, field.name), + ) + pqArrays = append( + pqArrays, + pq.Array(field.objects), + ) + } + + sql := ` + WITH r AS + (SELECT ` + strings.Join(unnestPart, ",") + `) + INSERT INTO ` + table + ` + (` + strings.Join(insertFieldsPart, ",") + `) + SELECT * from r + ON CONFLICT (` + conflictField + `) DO UPDATE SET + ` + strings.Join(onConflictPart, ",") + + _, err := q.ExecRaw( + context.WithValue(ctx, &db.QueryTypeContextKey, db.UpsertQueryType), + sql, + pqArrays..., + ) + return err +} diff --git a/services/horizon/internal/db2/history/mock_offers_batch_insert_builder.go b/services/horizon/internal/db2/history/mock_offers_batch_insert_builder.go deleted file mode 100644 index 75e88f3263..0000000000 --- a/services/horizon/internal/db2/history/mock_offers_batch_insert_builder.go +++ /dev/null @@ -1,20 +0,0 @@ -package history - -import ( - "context" - "github.com/stretchr/testify/mock" -) - -type MockOffersBatchInsertBuilder struct { - mock.Mock -} - -func (m *MockOffersBatchInsertBuilder) Add(ctx context.Context, row Offer) error { - a := m.Called(ctx, row) - return a.Error(0) -} - -func (m *MockOffersBatchInsertBuilder) Exec(ctx context.Context) error { - a := m.Called(ctx) - return a.Error(0) -} diff --git a/services/horizon/internal/db2/history/mock_q_offers.go b/services/horizon/internal/db2/history/mock_q_offers.go index 0b23720b7f..9cbf673fc5 100644 --- a/services/horizon/internal/db2/history/mock_q_offers.go +++ b/services/horizon/internal/db2/history/mock_q_offers.go @@ -2,6 +2,7 @@ package history import ( "context" + "github.com/stretchr/testify/mock" ) @@ -30,14 +31,9 @@ func (m *MockQOffers) CountOffers(ctx context.Context) (int, error) { return a.Get(0).(int), a.Error(1) } -func (m *MockQOffers) NewOffersBatchInsertBuilder(maxBatchSize int) OffersBatchInsertBuilder { - a := m.Called(maxBatchSize) - return a.Get(0).(OffersBatchInsertBuilder) -} - -func (m *MockQOffers) UpdateOffer(ctx context.Context, row Offer) (int64, error) { - a := m.Called(ctx, row) - return a.Get(0).(int64), a.Error(1) +func (m *MockQOffers) UpsertOffers(ctx context.Context, rows []Offer) error { + a := m.Called(ctx, rows) + return a.Error(0) } func (m *MockQOffers) RemoveOffers(ctx context.Context, offerIDs []int64, lastModifiedLedger uint32) (int64, error) { diff --git a/services/horizon/internal/db2/history/offers.go b/services/horizon/internal/db2/history/offers.go index fe37ba91c9..5b875b72cf 100644 --- a/services/horizon/internal/db2/history/offers.go +++ b/services/horizon/internal/db2/history/offers.go @@ -14,8 +14,7 @@ type QOffers interface { GetOffersByIDs(ctx context.Context, ids []int64) ([]Offer, error) CountOffers(ctx context.Context) (int, error) GetUpdatedOffers(ctx context.Context, newerThanSequence uint32) ([]Offer, error) - NewOffersBatchInsertBuilder(maxBatchSize int) OffersBatchInsertBuilder - UpdateOffer(ctx context.Context, offer Offer) (int64, error) + UpsertOffers(ctx context.Context, offers []Offer) error RemoveOffers(ctx context.Context, offerIDs []int64, lastModifiedLedger uint32) (int64, error) CompactOffers(ctx context.Context, cutOffSequence uint32) (int64, error) } @@ -96,15 +95,43 @@ func (q *Q) GetUpdatedOffers(ctx context.Context, newerThanSequence uint32) ([]O return offers, err } -// UpdateOffer updates a row in the offers table. -// Returns number of rows affected and error. -func (q *Q) UpdateOffer(ctx context.Context, offer Offer) (int64, error) { - updateBuilder := q.GetTable("offers").Update() - result, err := updateBuilder.SetStruct(offer, []string{}).Where("offer_id = ?", offer.OfferID).Exec(ctx) - if err != nil { - return 0, err +// UpsertOffers upserts a batch of offers in the offerss table. +// There's currently no limit of the number of offers this method can +// accept other than 2GB limit of the query string length what should be enough +// for each ledger with the current limits. +func (q *Q) UpsertOffers(ctx context.Context, offers []Offer) error { + var sellerID, sellingAsset, buyingAsset, offerID, amount, priceN, priceD, + price, flags, lastModifiedLedger, sponsor []interface{} + + for _, offer := range offers { + sellerID = append(sellerID, offer.SellerID) + offerID = append(offerID, offer.OfferID) + sellingAsset = append(sellingAsset, offer.SellingAsset) + buyingAsset = append(buyingAsset, offer.BuyingAsset) + amount = append(amount, offer.Amount) + priceN = append(priceN, offer.Pricen) + priceD = append(priceD, offer.Priced) + price = append(price, offer.Price) + flags = append(flags, offer.Flags) + lastModifiedLedger = append(lastModifiedLedger, offer.LastModifiedLedger) + sponsor = append(sponsor, offer.Sponsor) } - return result.RowsAffected() + + upsertFields := []upsertField{ + {"seller_id", "text", sellerID}, + {"offer_id", "bigint", offerID}, + {"selling_asset", "text", sellingAsset}, + {"buying_asset", "text", buyingAsset}, + {"amount", "bigint", amount}, + {"pricen", "integer", priceN}, + {"priced", "integer", priceD}, + {"price", "double precision", price}, + {"flags", "integer", flags}, + {"last_modified_ledger", "integer", lastModifiedLedger}, + {"sponsor", "text", sponsor}, + } + + return q.upsertRows(ctx, "offers", "offer_id", upsertFields) } // RemoveOffers marks rows in the offers table as deleted. diff --git a/services/horizon/internal/db2/history/offers_batch_insert_builder.go b/services/horizon/internal/db2/history/offers_batch_insert_builder.go deleted file mode 100644 index c1d3b628a9..0000000000 --- a/services/horizon/internal/db2/history/offers_batch_insert_builder.go +++ /dev/null @@ -1,14 +0,0 @@ -package history - -import ( - "context" -) - -// Add adds a new offer entry to the batch. -func (i *offersBatchInsertBuilder) Add(ctx context.Context, offer Offer) error { - return i.builder.RowStruct(ctx, offer) -} - -func (i *offersBatchInsertBuilder) Exec(ctx context.Context) error { - return i.builder.Exec(ctx) -} diff --git a/services/horizon/internal/db2/history/offers_test.go b/services/horizon/internal/db2/history/offers_test.go index 2218ffc71a..61f228e33a 100644 --- a/services/horizon/internal/db2/history/offers_test.go +++ b/services/horizon/internal/db2/history/offers_test.go @@ -64,12 +64,7 @@ var ( ) func insertOffer(tt *test.T, q *Q, offer Offer) error { - batch := q.NewOffersBatchInsertBuilder(0) - err := batch.Add(tt.Ctx, offer) - if err != nil { - return err - } - return batch.Exec(tt.Ctx) + return q.UpsertOffers(tt.Ctx, []Offer{offer}) } func TestGetOfferByID(t *testing.T) { @@ -194,9 +189,8 @@ func TestUpdateOffer(t *testing.T) { modifiedEurOffer := eurOffer modifiedEurOffer.Amount -= 10 - rowsAffected, err := q.UpdateOffer(tt.Ctx, modifiedEurOffer) + err = q.UpsertOffers(tt.Ctx, []Offer{modifiedEurOffer}) tt.Assert.NoError(err) - tt.Assert.Equal(int64(1), rowsAffected) offers, err = q.GetAllOffers(tt.Ctx) tt.Assert.NoError(err) diff --git a/services/horizon/internal/db2/history/orderbook_test.go b/services/horizon/internal/db2/history/orderbook_test.go index 30b27f4a64..84b3213b69 100644 --- a/services/horizon/internal/db2/history/orderbook_test.go +++ b/services/horizon/internal/db2/history/orderbook_test.go @@ -213,12 +213,7 @@ func TestGetOrderBookSummary(t *testing.T) { } { t.Run(testCase.name, func(t *testing.T) { assert.NoError(t, q.TruncateTables(tt.Ctx, []string{"offers"})) - - batch := q.NewOffersBatchInsertBuilder(0) - for _, offer := range testCase.offers { - assert.NoError(t, batch.Add(tt.Ctx, offer)) - } - assert.NoError(t, batch.Exec(tt.Ctx)) + assert.NoError(t, q.UpsertOffers(tt.Ctx, testCase.offers)) assert.NoError(t, q.BeginTx(&sql.TxOptions{ Isolation: sql.LevelRepeatableRead, @@ -260,11 +255,7 @@ func TestGetOrderBookSummaryExcludesRemovedOffers(t *testing.T) { sellEurOffer, } - batch := q.NewOffersBatchInsertBuilder(0) - for _, offer := range offers { - assert.NoError(t, batch.Add(tt.Ctx, offer)) - } - assert.NoError(t, batch.Exec(tt.Ctx)) + assert.NoError(t, q.UpsertOffers(tt.Ctx, offers)) assert.NoError(t, q.BeginTx(&sql.TxOptions{ Isolation: sql.LevelRepeatableRead, diff --git a/services/horizon/internal/ingest/fsm.go b/services/horizon/internal/ingest/fsm.go index 6c87f932dd..d610d7f5c7 100644 --- a/services/horizon/internal/ingest/fsm.go +++ b/services/horizon/internal/ingest/fsm.go @@ -523,6 +523,8 @@ func (r resumeState) addProcessorDurationsMetricFromMap(s *system, m map[string] processorName = strings.Replace(processorName, "*", "", -1) s.Metrics().ProcessorsRunDuration. With(prometheus.Labels{"name": processorName}).Add(value.Seconds()) + s.Metrics().ProcessorsRunDurationSummary. + With(prometheus.Labels{"name": processorName}).Observe(value.Seconds()) } } diff --git a/services/horizon/internal/ingest/main.go b/services/horizon/internal/ingest/main.go index c7fa9f3e3f..60fb5438a1 100644 --- a/services/horizon/internal/ingest/main.go +++ b/services/horizon/internal/ingest/main.go @@ -129,8 +129,12 @@ type Metrics struct { LedgerStatsCounter *prometheus.CounterVec // ProcessorsRunDuration exposes processors run durations. + // Deprecated in favour of: ProcessorsRunDurationSummary. ProcessorsRunDuration *prometheus.CounterVec + // ProcessorsRunDurationSummary exposes processors run durations. + ProcessorsRunDurationSummary *prometheus.SummaryVec + // CaptiveStellarCoreSynced exposes synced status of Captive Stellar-Core. // 1 if sync, 0 if not synced, -1 if unable to connect or HTTP server disabled. CaptiveStellarCoreSynced prometheus.GaugeFunc @@ -327,6 +331,14 @@ func (s *system) initMetrics() { []string{"name"}, ) + s.metrics.ProcessorsRunDurationSummary = prometheus.NewSummaryVec( + prometheus.SummaryOpts{ + Namespace: "horizon", Subsystem: "ingest", Name: "processor_run_duration_seconds", + Help: "run durations of ingestion processors, sliding window = 10m", + }, + []string{"name"}, + ) + s.metrics.CaptiveStellarCoreSynced = prometheus.NewGaugeFunc( prometheus.GaugeOpts{ Namespace: "horizon", Subsystem: "ingest", Name: "captive_stellar_core_synced", diff --git a/services/horizon/internal/ingest/parallel_test.go b/services/horizon/internal/ingest/parallel_test.go index 7df5e09aed..7d9747330d 100644 --- a/services/horizon/internal/ingest/parallel_test.go +++ b/services/horizon/internal/ingest/parallel_test.go @@ -20,6 +20,7 @@ func TestCalculateParallelLedgerBatchSize(t *testing.T) { assert.Equal(t, uint32(64), calculateParallelLedgerBatchSize(64, 256, 4)) assert.Equal(t, uint32(64), calculateParallelLedgerBatchSize(64, 32, 4)) assert.Equal(t, uint32(64), calculateParallelLedgerBatchSize(2, 256, 4)) + assert.Equal(t, uint32(64), calculateParallelLedgerBatchSize(20096, 64, 1)) } type sorteableRanges []ledgerRange @@ -62,6 +63,18 @@ func TestParallelReingestRange(t *testing.T) { } assert.Equal(t, expected, rangesCalled) + rangesCalled = nil + system, err = newParallelSystems(config, 1, factory) + assert.NoError(t, err) + err = system.ReingestRange(0, 1024, 64) + expected = sorteableRanges{ + {from: 0, to: 63}, {from: 64, to: 127}, {from: 128, to: 191}, {from: 192, to: 255}, {from: 256, to: 319}, + {from: 320, to: 383}, {from: 384, to: 447}, {from: 448, to: 511}, {from: 512, to: 575}, {from: 576, to: 639}, + {from: 640, to: 703}, {from: 704, to: 767}, {from: 768, to: 831}, {from: 832, to: 895}, {from: 896, to: 959}, + {from: 960, to: 1023}, + } + assert.NoError(t, err) + assert.Equal(t, expected, rangesCalled) } func TestParallelReingestRangeError(t *testing.T) { diff --git a/services/horizon/internal/ingest/processor_runner_test.go b/services/horizon/internal/ingest/processor_runner_test.go index c36d98c442..549732caca 100644 --- a/services/horizon/internal/ingest/processor_runner_test.go +++ b/services/horizon/internal/ingest/processor_runner_test.go @@ -24,12 +24,6 @@ func TestProcessorRunnerRunHistoryArchiveIngestionGenesis(t *testing.T) { q := &mockDBQ{} // Batches - mockOffersBatchInsertBuilder := &history.MockOffersBatchInsertBuilder{} - defer mock.AssertExpectationsForObjects(t, mockOffersBatchInsertBuilder) - mockOffersBatchInsertBuilder.On("Exec", ctx).Return(nil).Once() - q.MockQOffers.On("NewOffersBatchInsertBuilder", maxBatchSize). - Return(mockOffersBatchInsertBuilder).Once() - mockAccountDataBatchInsertBuilder := &history.MockAccountDataBatchInsertBuilder{} defer mock.AssertExpectationsForObjects(t, mockAccountDataBatchInsertBuilder) mockAccountDataBatchInsertBuilder.On("Exec", ctx).Return(nil).Once() @@ -119,12 +113,6 @@ func TestProcessorRunnerRunHistoryArchiveIngestionHistoryArchive(t *testing.T) { ).Once() // Batches - mockOffersBatchInsertBuilder := &history.MockOffersBatchInsertBuilder{} - defer mock.AssertExpectationsForObjects(t, mockOffersBatchInsertBuilder) - mockOffersBatchInsertBuilder.On("Exec", ctx).Return(nil).Once() - q.MockQOffers.On("NewOffersBatchInsertBuilder", maxBatchSize). - Return(mockOffersBatchInsertBuilder).Once() - mockAccountDataBatchInsertBuilder := &history.MockAccountDataBatchInsertBuilder{} defer mock.AssertExpectationsForObjects(t, mockAccountDataBatchInsertBuilder) mockAccountDataBatchInsertBuilder.On("Exec", ctx).Return(nil).Once() @@ -197,11 +185,6 @@ func TestProcessorRunnerRunHistoryArchiveIngestionProtocolVersionNotSupported(t defer mock.AssertExpectationsForObjects(t, historyAdapter) // Batches - mockOffersBatchInsertBuilder := &history.MockOffersBatchInsertBuilder{} - defer mock.AssertExpectationsForObjects(t, mockOffersBatchInsertBuilder) - q.MockQOffers.On("NewOffersBatchInsertBuilder", maxBatchSize). - Return(mockOffersBatchInsertBuilder).Once() - mockAccountDataBatchInsertBuilder := &history.MockAccountDataBatchInsertBuilder{} defer mock.AssertExpectationsForObjects(t, mockAccountDataBatchInsertBuilder) q.MockQData.On("NewAccountDataBatchInsertBuilder", maxBatchSize). @@ -239,8 +222,6 @@ func TestProcessorRunnerBuildChangeProcessor(t *testing.T) { defer mock.AssertExpectationsForObjects(t, q) // Twice = checking ledgerSource and historyArchiveSource - q.MockQOffers.On("NewOffersBatchInsertBuilder", maxBatchSize). - Return(&history.MockOffersBatchInsertBuilder{}).Twice() q.MockQData.On("NewAccountDataBatchInsertBuilder", maxBatchSize). Return(&history.MockAccountDataBatchInsertBuilder{}).Twice() q.MockQSigners.On("NewAccountSignersBatchInsertBuilder", maxBatchSize). @@ -342,12 +323,6 @@ func TestProcessorRunnerRunAllProcessorsOnLedger(t *testing.T) { } // Batches - mockOffersBatchInsertBuilder := &history.MockOffersBatchInsertBuilder{} - defer mock.AssertExpectationsForObjects(t, mockOffersBatchInsertBuilder) - mockOffersBatchInsertBuilder.On("Exec", ctx).Return(nil).Once() - q.MockQOffers.On("NewOffersBatchInsertBuilder", maxBatchSize). - Return(mockOffersBatchInsertBuilder).Once() - mockAccountDataBatchInsertBuilder := &history.MockAccountDataBatchInsertBuilder{} defer mock.AssertExpectationsForObjects(t, mockAccountDataBatchInsertBuilder) mockAccountDataBatchInsertBuilder.On("Exec", ctx).Return(nil).Once() @@ -418,11 +393,6 @@ func TestProcessorRunnerRunAllProcessorsOnLedgerProtocolVersionNotSupported(t *t } // Batches - mockOffersBatchInsertBuilder := &history.MockOffersBatchInsertBuilder{} - defer mock.AssertExpectationsForObjects(t, mockOffersBatchInsertBuilder) - q.MockQOffers.On("NewOffersBatchInsertBuilder", maxBatchSize). - Return(mockOffersBatchInsertBuilder).Once() - mockAccountDataBatchInsertBuilder := &history.MockAccountDataBatchInsertBuilder{} defer mock.AssertExpectationsForObjects(t, mockAccountDataBatchInsertBuilder) q.MockQData.On("NewAccountDataBatchInsertBuilder", maxBatchSize). diff --git a/services/horizon/internal/ingest/processors/offers_processor.go b/services/horizon/internal/ingest/processors/offers_processor.go index 150080f442..72ee493c59 100644 --- a/services/horizon/internal/ingest/processors/offers_processor.go +++ b/services/horizon/internal/ingest/processors/offers_processor.go @@ -18,7 +18,6 @@ type OffersProcessor struct { sequence uint32 cache *ingest.ChangeCompactor - insertBatch history.OffersBatchInsertBuilder removeBatch []int64 } @@ -30,7 +29,6 @@ func NewOffersProcessor(offersQ history.QOffers, sequence uint32) *OffersProcess func (p *OffersProcessor) reset() { p.cache = ingest.NewChangeCompactor() - p.insertBatch = p.offersQ.NewOffersBatchInsertBuilder(maxBatchSize) p.removeBatch = []int64{} } @@ -71,59 +69,43 @@ func (p *OffersProcessor) ledgerEntryToRow(entry *xdr.LedgerEntry) history.Offer } func (p *OffersProcessor) flushCache(ctx context.Context) error { + batchUpsertOffers := []history.Offer{} changes := p.cache.GetChanges() for _, change := range changes { - var rowsAffected int64 - var err error - var action string - var offerID xdr.Int64 - switch { - case change.Pre == nil && change.Post != nil: - // Created - action = "inserting" + case change.Post != nil: + // Created and updated row := p.ledgerEntryToRow(change.Post) - err = p.insertBatch.Add(ctx, row) - rowsAffected = 1 // We don't track this when batch inserting + batchUpsertOffers = append(batchUpsertOffers, row) case change.Pre != nil && change.Post == nil: // Removed - action = "removing" offer := change.Pre.Data.MustOffer() p.removeBatch = append(p.removeBatch, int64(offer.OfferId)) - rowsAffected = 1 // We don't track this when batch removing default: - // Updated - action = "updating" - offer := change.Post.Data.MustOffer() - offerID = offer.OfferId - row := p.ledgerEntryToRow(change.Post) - rowsAffected, err = p.offersQ.UpdateOffer(ctx, row) + return errors.New("Invalid io.Change: change.Pre == nil && change.Post == nil") } + } + if len(batchUpsertOffers) > 0 { + err := p.offersQ.UpsertOffers(ctx, batchUpsertOffers) if err != nil { - return err - } - - if rowsAffected != 1 { - return ingest.NewStateError(errors.Errorf( - "%d rows affected when %s offer %d", - rowsAffected, - action, - offerID, - )) + return errors.Wrap(err, "errors in UpsertOffers") } } - err := p.insertBatch.Exec(ctx) - if err != nil { - return errors.Wrap(err, "error executing batch") - } - if len(p.removeBatch) > 0 { - _, err = p.offersQ.RemoveOffers(ctx, p.removeBatch, p.sequence) + rowsAffected, err := p.offersQ.RemoveOffers(ctx, p.removeBatch, p.sequence) if err != nil { return errors.Wrap(err, "error in RemoveOffers") } + + if rowsAffected != int64(len(p.removeBatch)) { + return ingest.NewStateError(errors.Errorf( + "%d rows affected when removing %d offers", + rowsAffected, + len(p.removeBatch), + )) + } } return nil diff --git a/services/horizon/internal/ingest/processors/offers_processor_test.go b/services/horizon/internal/ingest/processors/offers_processor_test.go index 1b1e621350..ca2e106c5e 100644 --- a/services/horizon/internal/ingest/processors/offers_processor_test.go +++ b/services/horizon/internal/ingest/processors/offers_processor_test.go @@ -21,33 +21,25 @@ func TestOffersProcessorTestSuiteState(t *testing.T) { type OffersProcessorTestSuiteState struct { suite.Suite - ctx context.Context - processor *OffersProcessor - mockQ *history.MockQOffers - mockBatchInsertBuilder *history.MockOffersBatchInsertBuilder - sequence uint32 + ctx context.Context + processor *OffersProcessor + mockQ *history.MockQOffers + sequence uint32 } func (s *OffersProcessorTestSuiteState) SetupTest() { s.ctx = context.Background() s.mockQ = &history.MockQOffers{} - s.mockBatchInsertBuilder = &history.MockOffersBatchInsertBuilder{} - - s.mockQ. - On("NewOffersBatchInsertBuilder", maxBatchSize). - Return(s.mockBatchInsertBuilder).Once() s.sequence = 456 s.processor = NewOffersProcessor(s.mockQ, s.sequence) } func (s *OffersProcessorTestSuiteState) TearDownTest() { - s.mockBatchInsertBuilder.On("Exec", s.ctx).Return(nil).Once() s.mockQ.On("CompactOffers", s.ctx, s.sequence-100).Return(int64(0), nil).Once() s.Assert().NoError(s.processor.Commit(s.ctx)) s.mockQ.AssertExpectations(s.T()) - s.mockBatchInsertBuilder.AssertExpectations(s.T()) } func (s *OffersProcessorTestSuiteState) TestCreateOffer() { @@ -65,13 +57,15 @@ func (s *OffersProcessorTestSuiteState) TestCreateOffer() { LastModifiedLedgerSeq: lastModifiedLedgerSeq, } - s.mockBatchInsertBuilder.On("Add", s.ctx, history.Offer{ - SellerID: "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML", - OfferID: 1, - Pricen: int32(1), - Priced: int32(2), - Price: float64(0.5), - LastModifiedLedger: uint32(lastModifiedLedgerSeq), + s.mockQ.On("UpsertOffers", s.ctx, []history.Offer{ + { + SellerID: "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML", + OfferID: 1, + Pricen: int32(1), + Priced: int32(2), + Price: float64(0.5), + LastModifiedLedger: uint32(lastModifiedLedgerSeq), + }, }).Return(nil).Once() err := s.processor.ProcessChange(s.ctx, ingest.Change{ @@ -88,21 +82,15 @@ func TestOffersProcessorTestSuiteLedger(t *testing.T) { type OffersProcessorTestSuiteLedger struct { suite.Suite - ctx context.Context - processor *OffersProcessor - mockQ *history.MockQOffers - mockBatchInsertBuilder *history.MockOffersBatchInsertBuilder - sequence uint32 + ctx context.Context + processor *OffersProcessor + mockQ *history.MockQOffers + sequence uint32 } func (s *OffersProcessorTestSuiteLedger) SetupTest() { s.ctx = context.Background() s.mockQ = &history.MockQOffers{} - s.mockBatchInsertBuilder = &history.MockOffersBatchInsertBuilder{} - - s.mockQ. - On("NewOffersBatchInsertBuilder", maxBatchSize). - Return(s.mockBatchInsertBuilder).Once() s.sequence = 456 s.processor = NewOffersProcessor(s.mockQ, s.sequence) @@ -110,7 +98,6 @@ func (s *OffersProcessorTestSuiteLedger) SetupTest() { func (s *OffersProcessorTestSuiteLedger) TearDownTest() { s.mockQ.AssertExpectations(s.T()) - s.mockBatchInsertBuilder.AssertExpectations(s.T()) } func (s *OffersProcessorTestSuiteLedger) setupInsertOffer() { @@ -179,16 +166,16 @@ func (s *OffersProcessorTestSuiteLedger) setupInsertOffer() { s.Assert().NoError(err) // We use LedgerEntryChangesCache so all changes are squashed - s.mockBatchInsertBuilder.On("Add", s.ctx, history.Offer{ - SellerID: "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML", - OfferID: 2, - Pricen: int32(1), - Priced: int32(6), - Price: float64(1) / float64(6), - LastModifiedLedger: uint32(lastModifiedLedgerSeq), + s.mockQ.On("UpsertOffers", s.ctx, []history.Offer{ + { + SellerID: "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML", + OfferID: 2, + Pricen: int32(1), + Priced: int32(6), + Price: float64(1) / float64(6), + LastModifiedLedger: uint32(lastModifiedLedgerSeq), + }, }).Return(nil).Once() - - s.mockBatchInsertBuilder.On("Exec", s.ctx).Return(nil).Once() } func (s *OffersProcessorTestSuiteLedger) TestInsertOffer() { @@ -216,7 +203,7 @@ func (s *OffersProcessorTestSuiteLedger) TestCompactionError() { s.Assert().EqualError(s.processor.Commit(s.ctx), "could not compact offers: compaction error") } -func (s *OffersProcessorTestSuiteLedger) TestUpdateOfferNoRowsAffected() { +func (s *OffersProcessorTestSuiteLedger) TestUpsertManyOffers() { lastModifiedLedgerSeq := xdr.Uint32(1234) offer := xdr.OfferEntry{ @@ -230,6 +217,12 @@ func (s *OffersProcessorTestSuiteLedger) TestUpdateOfferNoRowsAffected() { Price: xdr.Price{1, 6}, } + anotherOffer := xdr.OfferEntry{ + SellerId: xdr.MustAddress("GDMUVYVYPYZYBDXNJWKFT3X2GCZCICTL3GSVP6AWBGB4ZZG7ZRDA746P"), + OfferId: xdr.Int64(3), + Price: xdr.Price{2, 3}, + } + updatedEntry := xdr.LedgerEntry{ LastModifiedLedgerSeq: lastModifiedLedgerSeq, Data: xdr.LedgerEntryData{ @@ -251,19 +244,46 @@ func (s *OffersProcessorTestSuiteLedger) TestUpdateOfferNoRowsAffected() { }) s.Assert().NoError(err) - s.mockQ.On("UpdateOffer", s.ctx, history.Offer{ - SellerID: "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML", - OfferID: 2, - Pricen: int32(1), - Priced: int32(6), - Price: float64(1) / float64(6), - LastModifiedLedger: uint32(lastModifiedLedgerSeq), - }).Return(int64(0), nil).Once() + err = s.processor.ProcessChange(s.ctx, ingest.Change{ + Type: xdr.LedgerEntryTypeOffer, + Pre: nil, + Post: &xdr.LedgerEntry{ + LastModifiedLedgerSeq: lastModifiedLedgerSeq, + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeOffer, + Offer: &anotherOffer, + }, + }, + }) + s.Assert().NoError(err) - err = s.processor.Commit(s.ctx) - s.Assert().Error(err) - s.Assert().IsType(ingest.StateError{}, errors.Cause(err)) - s.Assert().EqualError(err, "error flushing cache: 0 rows affected when updating offer 2") + s.mockQ.On("UpsertOffers", s.ctx, mock.Anything).Run(func(args mock.Arguments) { + // To fix order issue due to using ChangeCompactor + offers := args.Get(1).([]history.Offer) + s.Assert().ElementsMatch( + offers, + []history.Offer{ + { + SellerID: "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML", + OfferID: 2, + Pricen: int32(1), + Priced: int32(6), + Price: float64(1) / float64(6), + LastModifiedLedger: uint32(lastModifiedLedgerSeq), + }, + { + SellerID: "GDMUVYVYPYZYBDXNJWKFT3X2GCZCICTL3GSVP6AWBGB4ZZG7ZRDA746P", + OfferID: 3, + Pricen: int32(2), + Priced: int32(3), + Price: float64(2) / float64(3), + LastModifiedLedger: uint32(lastModifiedLedgerSeq), + }, + }, + ) + }).Return(nil).Once() + s.mockQ.On("CompactOffers", s.ctx, s.sequence-100).Return(int64(0), nil).Once() + s.Assert().NoError(s.processor.Commit(s.ctx)) } func (s *OffersProcessorTestSuiteLedger) TestRemoveOffer() { @@ -284,8 +304,6 @@ func (s *OffersProcessorTestSuiteLedger) TestRemoveOffer() { s.Assert().NoError(err) s.mockQ.On("RemoveOffers", s.ctx, []int64{3}, s.sequence).Return(int64(1), nil).Once() - - s.mockBatchInsertBuilder.On("Exec", s.ctx).Return(nil).Once() s.mockQ.On("CompactOffers", s.ctx, s.sequence-100).Return(int64(0), nil).Once() s.Assert().NoError(s.processor.Commit(s.ctx)) } @@ -339,16 +357,17 @@ func (s *OffersProcessorTestSuiteLedger) TestProcessUpgradeChange() { s.Assert().NoError(err) // We use LedgerEntryChangesCache so all changes are squashed - s.mockBatchInsertBuilder.On("Add", s.ctx, history.Offer{ - SellerID: "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML", - OfferID: 2, - Pricen: int32(1), - Priced: int32(6), - Price: float64(1) / float64(6), - LastModifiedLedger: uint32(lastModifiedLedgerSeq), + s.mockQ.On("UpsertOffers", s.ctx, []history.Offer{ + { + SellerID: "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML", + OfferID: 2, + Pricen: int32(1), + Priced: int32(6), + Price: float64(1) / float64(6), + LastModifiedLedger: uint32(lastModifiedLedgerSeq), + }, }).Return(nil).Once() - s.mockBatchInsertBuilder.On("Exec", s.ctx).Return(nil).Once() s.mockQ.On("CompactOffers", s.ctx, s.sequence-100).Return(int64(0), nil).Once() s.Assert().NoError(s.processor.Commit(s.ctx)) } @@ -386,14 +405,57 @@ func (s *OffersProcessorTestSuiteLedger) TestRemoveMultipleOffers() { }) s.Assert().NoError(err) - s.mockBatchInsertBuilder.On("Exec", s.ctx).Return(nil).Once() s.mockQ.On("CompactOffers", s.ctx, s.sequence-100).Return(int64(0), nil).Once() s.mockQ.On("RemoveOffers", s.ctx, mock.Anything, s.sequence).Run(func(args mock.Arguments) { // To fix order issue due to using ChangeCompactor ids := args.Get(1).([]int64) s.Assert().ElementsMatch(ids, []int64{3, 4}) - }).Return(int64(0), nil).Once() + }).Return(int64(2), nil).Once() err = s.processor.Commit(s.ctx) s.Assert().NoError(err) } + +func (s *OffersProcessorTestSuiteLedger) TestRemoveMultipleOffersRowsAffectedCheck() { + err := s.processor.ProcessChange(s.ctx, ingest.Change{ + Type: xdr.LedgerEntryTypeOffer, + Pre: &xdr.LedgerEntry{ + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeOffer, + Offer: &xdr.OfferEntry{ + SellerId: xdr.MustAddress("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), + OfferId: xdr.Int64(3), + Price: xdr.Price{3, 1}, + }, + }, + }, + Post: nil, + }) + s.Assert().NoError(err) + + err = s.processor.ProcessChange(s.ctx, ingest.Change{ + Type: xdr.LedgerEntryTypeOffer, + Pre: &xdr.LedgerEntry{ + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeOffer, + Offer: &xdr.OfferEntry{ + SellerId: xdr.MustAddress("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), + OfferId: xdr.Int64(4), + Price: xdr.Price{3, 1}, + }, + }, + }, + Post: nil, + }) + s.Assert().NoError(err) + + s.mockQ.On("RemoveOffers", s.ctx, mock.Anything, s.sequence).Run(func(args mock.Arguments) { + // To fix order issue due to using ChangeCompactor + ids := args.Get(1).([]int64) + s.Assert().ElementsMatch(ids, []int64{3, 4}) + }).Return(int64(0), nil).Once() + + err = s.processor.Commit(s.ctx) + s.Assert().IsType(ingest.StateError{}, errors.Cause(err)) + s.Assert().EqualError(err, "error flushing cache: 0 rows affected when removing 2 offers") +} diff --git a/services/horizon/internal/init.go b/services/horizon/internal/init.go index 04755a0807..1e7c798a0c 100644 --- a/services/horizon/internal/init.go +++ b/services/horizon/internal/init.go @@ -263,6 +263,7 @@ func initIngestMetrics(app *App) { app.prometheusRegistry.MustRegister(app.ingester.Metrics().StateInvalidGauge) app.prometheusRegistry.MustRegister(app.ingester.Metrics().LedgerStatsCounter) app.prometheusRegistry.MustRegister(app.ingester.Metrics().ProcessorsRunDuration) + app.prometheusRegistry.MustRegister(app.ingester.Metrics().ProcessorsRunDurationSummary) app.prometheusRegistry.MustRegister(app.ingester.Metrics().CaptiveStellarCoreSynced) app.prometheusRegistry.MustRegister(app.ingester.Metrics().CaptiveCoreSupportedProtocolVersion) } diff --git a/services/horizon/internal/scripts/check_release_hash/Dockerfile b/services/horizon/internal/scripts/check_release_hash/Dockerfile index 44cdbcfe1b..f05ca6260e 100644 --- a/services/horizon/internal/scripts/check_release_hash/Dockerfile +++ b/services/horizon/internal/scripts/check_release_hash/Dockerfile @@ -9,6 +9,7 @@ ENV DEBIAN_FRONTEND=noninteractive RUN apt-get update && apt-get install -y --no-install-recommends ca-certificates curl wget gnupg apt-utils git zip unzip apt-transport-https ca-certificates RUN wget -qO - https://apt.stellar.org/SDF.asc | APT_KEY_DONT_WARN_ON_DANGEROUS_USAGE=true apt-key add - RUN echo "deb https://apt.stellar.org xenial stable" >/etc/apt/sources.list.d/SDF.list +RUN echo "deb https://apt.stellar.org xenial testing" >/etc/apt/sources.list.d/SDF-testing.list RUN git clone https://github.com/stellar/go.git /go/src/github.com/stellar/go # Fetch dependencies and prebuild binaries. Not necessary but will make check faster. diff --git a/services/horizon/internal/scripts/check_release_hash/check.sh b/services/horizon/internal/scripts/check_release_hash/check.sh index 95208f2a9a..b1377d9810 100644 --- a/services/horizon/internal/scripts/check_release_hash/check.sh +++ b/services/horizon/internal/scripts/check_release_hash/check.sh @@ -1,7 +1,9 @@ #!/bin/bash set -e -apt-get clean && apt-get update && apt-get install -y stellar-horizon=$PACKAGE_VERSION +apt-get clean +apt-get update +apt-get install -y stellar-horizon=$PACKAGE_VERSION mkdir released cd released @@ -23,30 +25,57 @@ git checkout $TAG # -keep: artifact directories are not removed after packaging CIRCLE_TAG=$TAG go run -v ./support/scripts/build_release_artifacts -keep -suffixes=(darwin-amd64 linux-amd64 linux-arm windows-amd64) +echo "RESULTS" +echo "=======" +echo "" +echo "compiled version" +./dist/$TAG-linux-amd64/horizon version + +echo "github releases version" +./released/$TAG-linux-amd64/horizon version + +echo "debian package version" +stellar-horizon version +echo "" + +suffixes=(darwin-amd64 linux-amd64 linux-arm windows-amd64) for S in "${suffixes[@]}" do - echo $TAG-$S + released="" + dist="" + msg="" if [ -f "./released/$TAG-$S.tar.gz" ]; then - shasum -a 256 ./released/$TAG-$S.tar.gz - shasum -a 256 ./released/$TAG-$S/horizon + released=($(shasum -a 256 ./released/$TAG-$S/horizon)) else # windows - shasum -a 256 ./released/$TAG-$S.zip - shasum -a 256 ./released/$TAG-$S/horizon.exe + released=($(shasum -a 256 ./released/$TAG-$S/horizon.exe)) fi if [ -f "./dist/$TAG-$S.tar.gz" ]; then - shasum -a 256 ./dist/$TAG-$S.tar.gz - shasum -a 256 ./dist/$TAG-$S/horizon + dist=($(shasum -a 256 ./dist/$TAG-$S/horizon)) else # windows - shasum -a 256 ./dist/$TAG-$S.zip - shasum -a 256 ./dist/$TAG-$S/horizon.exe + dist=($(shasum -a 256 ./dist/$TAG-$S/horizon.exe)) + fi + + if [ $S == "linux-amd64" ]; then + path=$(which stellar-horizon) + debian=($(shasum -a 256 $path)) + + if [[ "$released" == "$dist" && "$dist" == "$debian" ]]; then + msg="$TAG-$S ok" + else + msg="$TAG-$S NO MATCH! github=$released compile=$dist debian=$debian" + fi + else + if [ "$released" == "$dist" ]; then + msg="$TAG-$S ok" + else + msg="$TAG-$S NO MATCH! github=$released compile=$dist" + fi fi -done -echo "debian package" -shasum -a 256 $(which stellar-horizon) \ No newline at end of file + echo $msg +done \ No newline at end of file diff --git a/services/horizon/internal/test/integration/integration.go b/services/horizon/internal/test/integration/integration.go index 8b720873c7..05ace65a49 100644 --- a/services/horizon/internal/test/integration/integration.go +++ b/services/horizon/internal/test/integration/integration.go @@ -204,7 +204,8 @@ func (i *Test) prepareShutdownHandlers() { if i.app != nil { i.app.Close() } - i.runComposeCommand("down", "-v", "--remove-orphans") + i.runComposeCommand("rm", "-fvs", "core") + i.runComposeCommand("rm", "-fvs", "core-postgres") }, i.environment.Restore, ) diff --git a/services/keystore/Makefile b/services/keystore/Makefile index 293f3455c1..d091c04836 100644 --- a/services/keystore/Makefile +++ b/services/keystore/Makefile @@ -4,7 +4,7 @@ SUDO := $(shell docker version >/dev/null 2>&1 || echo "sudo") # If TAG is not provided set default value TAG ?= stellar/keystore:$(shell git rev-parse --short HEAD)$(and $(shell git status -s),-dirty-$(shell id -u -n)) # https://github.com/opencontainers/image-spec/blob/master/annotations.md -BUILD_DATE := $(shell date --utc --rfc-3339=seconds) +BUILD_DATE := $(shell date -u +%FT%TZ) docker-build: cd ../../ && \ diff --git a/services/regulated-assets-approval-server/Makefile b/services/regulated-assets-approval-server/Makefile index cf7cdbbd8d..a9e83306c6 100644 --- a/services/regulated-assets-approval-server/Makefile +++ b/services/regulated-assets-approval-server/Makefile @@ -4,7 +4,7 @@ SUDO := $(shell docker version >/dev/null 2>&1 || echo "sudo") # If TAG is not provided set default value TAG ?= stellar/regulated-assets-approval-server:$(shell git rev-parse --short HEAD)$(and $(shell git status -s),-dirty-$(shell id -u -n)) # https://github.com/opencontainers/image-spec/blob/master/annotations.md -BUILD_DATE := $(shell date --utc --rfc-3339=seconds) +BUILD_DATE := $(shell date -u +%FT%TZ) docker-build: cd ../../ && \ diff --git a/services/ticker/Makefile b/services/ticker/Makefile index 5755699508..eb2ef29bff 100644 --- a/services/ticker/Makefile +++ b/services/ticker/Makefile @@ -4,7 +4,7 @@ SUDO := $(shell docker version >/dev/null 2>&1 || echo "sudo") # If TAG is not provided set default value TAG ?= stellar/ticker:$(shell git rev-parse --short HEAD)$(and $(shell git status -s),-dirty-$(shell id -u -n)) # https://github.com/opencontainers/image-spec/blob/master/annotations.md -BUILD_DATE := $(shell date --utc --rfc-3339=seconds) +BUILD_DATE := $(shell date -u +%FT%TZ) docker-build: cd ../../ && \ diff --git a/crc16/main.go b/strkey/internal/crc16/main.go similarity index 100% rename from crc16/main.go rename to strkey/internal/crc16/main.go diff --git a/strkey/internal/crc16/main_test.go b/strkey/internal/crc16/main_test.go new file mode 100644 index 0000000000..a1314a9fa1 --- /dev/null +++ b/strkey/internal/crc16/main_test.go @@ -0,0 +1,20 @@ +package crc16 + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestChecksum(t *testing.T) { + result := Checksum([]byte{0x12, 0x34, 0x56, 0x78, 0x90}) + assert.Equal(t, []byte{0xe6, 0x48}, result) +} + +func TestValidate(t *testing.T) { + err := Validate([]byte{0x12, 0x34, 0x56, 0x78, 0x90}, []byte{0xe6, 0x48}) + assert.NoError(t, err) + + err = Validate([]byte{0x12, 0x34, 0x56, 0x78, 0x90}, []byte{0xe7, 0x48}) + assert.ErrorIs(t, err, ErrInvalidChecksum) +} diff --git a/strkey/main.go b/strkey/main.go index 6facce6e2c..ee6ece4d8c 100644 --- a/strkey/main.go +++ b/strkey/main.go @@ -5,7 +5,7 @@ import ( "encoding/base32" "encoding/binary" - "github.com/stellar/go/crc16" + "github.com/stellar/go/strkey/internal/crc16" "github.com/stellar/go/support/errors" ) diff --git a/txnbuild/CHANGELOG.md b/txnbuild/CHANGELOG.md index 3c89431c46..bbadc08d24 100644 --- a/txnbuild/CHANGELOG.md +++ b/txnbuild/CHANGELOG.md @@ -8,6 +8,7 @@ file. This project adheres to [Semantic Versioning](http://semver.org/). * GenericTransaction, Transaction, and FeeBumpTransaction now implement encoding.TextMarshaler and encoding.TextUnmarshaler. +* Adds 5-minute grace period to `transaction.ReadChallengeTx`'s minimum time bound constraint. ([#3824](https://github.com/stellar/go/pull/3824)) ## [v7.1.1](https://github.com/stellar/go/releases/tag/horizonclient-v7.1.1) - 2021-06-25 diff --git a/txnbuild/transaction.go b/txnbuild/transaction.go index 35777e767a..1199ca5367 100644 --- a/txnbuild/transaction.go +++ b/txnbuild/transaction.go @@ -1098,8 +1098,10 @@ func ReadChallengeTx(challengeTx, serverAccountID, network, webAuthDomain string if tx.Timebounds().MaxTime == TimeoutInfinite { return tx, clientAccountID, matchedHomeDomain, errors.New("transaction requires non-infinite timebounds") } + // Apply a grace period to the challenge MinTime to account for clock drift between the server and client + var gracePeriod int64 = 5 * 60 // seconds currentTime := time.Now().UTC().Unix() - if currentTime < tx.Timebounds().MinTime || currentTime > tx.Timebounds().MaxTime { + if currentTime+gracePeriod < tx.Timebounds().MinTime || currentTime > tx.Timebounds().MaxTime { return tx, clientAccountID, matchedHomeDomain, errors.Errorf("transaction is not within range of the specified timebounds (currentTime=%d, MinTime=%d, MaxTime=%d)", currentTime, tx.Timebounds().MinTime, tx.Timebounds().MaxTime) } diff --git a/txnbuild/transaction_test.go b/txnbuild/transaction_test.go index 78c35ddc18..0aa5040732 100644 --- a/txnbuild/transaction_test.go +++ b/txnbuild/transaction_test.go @@ -2024,6 +2024,79 @@ func TestReadChallengeTx_invalidTimeboundsOutsideRange(t *testing.T) { assert.Regexp(t, "transaction is not within range of the specified timebounds", err.Error()) } +func TestReadChallengeTx_validTimeboundsWithGracePeriod(t *testing.T) { + serverKP := newKeypair0() + clientKP := newKeypair1() + txSource := NewSimpleAccount(serverKP.Address(), -1) + op := ManageData{ + SourceAccount: clientKP.Address(), + Name: "testanchor.stellar.org auth", + Value: []byte(base64.StdEncoding.EncodeToString(make([]byte, 48))), + } + webAuthDomainOp := ManageData{ + SourceAccount: serverKP.Address(), + Name: "web_auth_domain", + Value: []byte("testwebauth.stellar.org"), + } + unixNow := time.Now().UTC().Unix() + tx, err := NewTransaction( + TransactionParams{ + SourceAccount: &txSource, + IncrementSequenceNum: true, + Operations: []Operation{&op, &webAuthDomainOp}, + BaseFee: MinBaseFee, + Timebounds: NewTimebounds(unixNow+5*59, unixNow+60*60), + }, + ) + assert.NoError(t, err) + + tx, err = tx.Sign(network.TestNetworkPassphrase, serverKP) + assert.NoError(t, err) + tx64, err := tx.Base64() + require.NoError(t, err) + readTx, readClientAccountID, _, err := ReadChallengeTx(tx64, serverKP.Address(), network.TestNetworkPassphrase, "testwebauth.stellar.org", []string{"testanchor.stellar.org"}) + assert.Equal(t, tx, readTx) + assert.Equal(t, clientKP.Address(), readClientAccountID) + assert.NoError(t, err) +} + +func TestReadChallengeTx_invalidTimeboundsWithGracePeriod(t *testing.T) { + serverKP := newKeypair0() + clientKP := newKeypair1() + txSource := NewSimpleAccount(serverKP.Address(), -1) + op := ManageData{ + SourceAccount: clientKP.Address(), + Name: "testanchor.stellar.org auth", + Value: []byte(base64.StdEncoding.EncodeToString(make([]byte, 48))), + } + webAuthDomainOp := ManageData{ + SourceAccount: serverKP.Address(), + Name: "web_auth_domain", + Value: []byte("testwebauth.stellar.org"), + } + unixNow := time.Now().UTC().Unix() + tx, err := NewTransaction( + TransactionParams{ + SourceAccount: &txSource, + IncrementSequenceNum: true, + Operations: []Operation{&op, &webAuthDomainOp}, + BaseFee: MinBaseFee, + Timebounds: NewTimebounds(unixNow+5*61, unixNow+60*60), + }, + ) + assert.NoError(t, err) + + tx, err = tx.Sign(network.TestNetworkPassphrase, serverKP) + assert.NoError(t, err) + tx64, err := tx.Base64() + require.NoError(t, err) + readTx, readClientAccountID, _, err := ReadChallengeTx(tx64, serverKP.Address(), network.TestNetworkPassphrase, "testwebauth.stellar.org", []string{"testanchor.stellar.org"}) + assert.Equal(t, tx, readTx) + assert.Equal(t, "", readClientAccountID) + assert.Error(t, err) + assert.Regexp(t, "transaction is not within range of the specified timebounds", err.Error()) +} + func TestReadChallengeTx_invalidOperationWrongType(t *testing.T) { serverKP := newKeypair0() clientKP := newKeypair1()