Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

services/horizon: Add a rate limit for path finding requests. #4310

Merged
merged 2 commits into from
Mar 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.list
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ golang.org/x/oauth2 v0.0.0-20210628180205-a41e5a781914
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1
golang.org/x/text v0.3.6
golang.org/x/time v0.0.0-20220224211638-0e9765cccd65
google.golang.org/api v0.50.0
google.golang.org/genproto v0.0.0-20210624195500-8bfb893ecb84
google.golang.org/grpc v1.38.0
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ require (
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c // indirect
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1 // indirect
golang.org/x/text v0.3.6 // indirect
golang.org/x/time v0.0.0-20220224211638-0e9765cccd65
golang.org/x/tools v0.1.4 // indirect
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
google.golang.org/appengine v1.6.7 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -546,6 +546,8 @@ golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20220224211638-0e9765cccd65 h1:M73Iuj3xbbb9Uk1DYhzydthsj6oOd6l9bpuFcNoUvTs=
golang.org/x/time v0.0.0-20220224211638-0e9765cccd65/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=
Expand Down
28 changes: 18 additions & 10 deletions services/horizon/internal/actions/path.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,11 +164,15 @@ func (handler FindPathsHandler) GetResource(w HeaderWriter, r *http.Request) (in
if len(query.SourceAssets) > 0 {
var lastIngestedLedger uint32
records, lastIngestedLedger, err = handler.PathFinder.Find(ctx, query, handler.MaxPathLength)
if err == simplepath.ErrEmptyInMemoryOrderBook {
err = horizonProblem.StillIngesting
}
if err != nil {
return nil, err
switch err {
case simplepath.ErrEmptyInMemoryOrderBook:
return nil, horizonProblem.StillIngesting
case paths.ErrRateLimitExceeded:
return nil, horizonProblem.ServerOverCapacity
default:
if err != nil {
return nil, err
}
}

if handler.SetLastLedgerHeader {
Expand Down Expand Up @@ -338,11 +342,15 @@ func (handler FindFixedPathsHandler) GetResource(w HeaderWriter, r *http.Request
destinationAssets,
handler.MaxPathLength,
)
if err == simplepath.ErrEmptyInMemoryOrderBook {
err = horizonProblem.StillIngesting
}
if err != nil {
return nil, err
switch err {
case simplepath.ErrEmptyInMemoryOrderBook:
return nil, horizonProblem.StillIngesting
case paths.ErrRateLimitExceeded:
return nil, horizonProblem.ServerOverCapacity
default:
if err != nil {
return nil, err
}
}

if handler.SetLastLedgerHeader {
Expand Down
56 changes: 56 additions & 0 deletions services/horizon/internal/actions_path_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,62 @@ func mockPathFindingClient(
return test.NewRequestHelper(router)
}

func TestPathActionsLimitExceeded(t *testing.T) {
tt := test.Start(t)
defer tt.Finish()
test.ResetHorizonDB(t, tt.HorizonDB)

assertions := &test.Assertions{tt.Assert}
finder := paths.MockFinder{}
finder.On("Find", mock.Anything, mock.Anything, uint(3)).
Return([]paths.Path{}, uint32(0), paths.ErrRateLimitExceeded).Times(2)
finder.On("FindFixedPaths", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).
Return([]paths.Path{}, uint32(0), paths.ErrRateLimitExceeded).Times(1)

rh := mockPathFindingClient(
tt,
&finder,
2,
tt.HorizonSession(),
)

var q = make(url.Values)

q.Add(
"source_assets",
"native",
)
q.Add(
"destination_asset_issuer",
"GDSBCQO34HWPGUGQSP3QBFEXVTSR2PW46UIGTHVWGWJGQKH3AFNHXHXN",
)
q.Add("destination_asset_type", "credit_alphanum4")
q.Add("destination_asset_code", "EUR")
q.Add("destination_amount", "10")

for _, uri := range []string{"/paths", "/paths/strict-receive"} {
w := rh.Get(uri + "?" + q.Encode())
assertions.Equal(horizonProblem.ServerOverCapacity.Status, w.Code)
assertions.Problem(w.Body, horizonProblem.ServerOverCapacity)
assertions.Equal("", w.Header().Get(actions.LastLedgerHeaderName))
}

q = make(url.Values)

q.Add("destination_assets", "native")
q.Add("source_asset_issuer", "GDSBCQO34HWPGUGQSP3QBFEXVTSR2PW46UIGTHVWGWJGQKH3AFNHXHXN")
q.Add("source_asset_type", "credit_alphanum4")
q.Add("source_asset_code", "EUR")
q.Add("source_amount", "10")

w := rh.Get("/paths/strict-send" + "?" + q.Encode())
assertions.Equal(horizonProblem.ServerOverCapacity.Status, w.Code)
assertions.Problem(w.Body, horizonProblem.ServerOverCapacity)
assertions.Equal("", w.Header().Get(actions.LastLedgerHeaderName))

finder.AssertExpectations(t)
}

func TestPathActionsStillIngesting(t *testing.T) {
tt := test.Start(t)
defer tt.Finish()
Expand Down
5 changes: 5 additions & 0 deletions services/horizon/internal/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,11 @@ func (a *App) Config() Config {
return a.config
}

// Paths returns the paths.Finder instance used by horizon
func (a *App) Paths() paths.Finder {
return a.paths
}

// UpdateCoreLedgerState triggers a refresh of Stellar-Core ledger state.
// This is done separately from Horizon ledger state update to prevent issues
// in case Stellar-Core query timeout.
Expand Down
7 changes: 6 additions & 1 deletion services/horizon/internal/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,12 @@ type Config struct {
MaxPathLength uint
// MaxAssetsPerPathRequest is the maximum number of assets considered for `/paths/strict-send` and `/paths/strict-recieve`
MaxAssetsPerPathRequest int
DisablePoolPathFinding bool
// DisablePoolPathFinding configures horizon to run path finding without including liquidity pools
// in the path finding search.
DisablePoolPathFinding bool
// MaxPathFindingRequests is the maximum number of path finding requests horizon will allow
// in a 1-second period. A value of 0 disables the limit.
MaxPathFindingRequests uint

NetworkPassphrase string
SentryDSN string
Expand Down
9 changes: 9 additions & 0 deletions services/horizon/internal/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,15 @@ func Flags() (*Config, support.ConfigOptions) {
Required: false,
Usage: "excludes liquidity pools from consideration in the `/paths` endpoint",
},
&support.ConfigOption{
Name: "max-path-finding-requests",
ConfigKey: &config.MaxPathFindingRequests,
OptType: types.Uint,
FlagDefault: uint(0),
Required: false,
Usage: "The maximum number of path finding requests per second horizon will allow." +
" A value of zero (the default) disables the limit.",
},
&support.ConfigOption{
Name: "network-passphrase",
ConfigKey: &config.NetworkPassphrase,
Expand Down
7 changes: 6 additions & 1 deletion services/horizon/internal/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package horizon

import (
"context"
"github.com/stellar/go/services/horizon/internal/paths"
"net/http"
"runtime"

Expand Down Expand Up @@ -116,7 +117,11 @@ func initPathFinder(app *App) {
orderBookGraph,
)

app.paths = simplepath.NewInMemoryFinder(orderBookGraph, !app.config.DisablePoolPathFinding)
var finder paths.Finder = simplepath.NewInMemoryFinder(orderBookGraph, !app.config.DisablePoolPathFinding)
if app.config.MaxPathFindingRequests != 0 {
finder = paths.NewRateLimitedFinder(finder, app.config.MaxPathFindingRequests)
}
app.paths = finder
}

// initSentry initialized the default sentry client with the configured DSN
Expand Down
26 changes: 26 additions & 0 deletions services/horizon/internal/integration/parameters_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
package integration

import (
"github.com/stellar/go/services/horizon/internal/paths"
"github.com/stellar/go/services/horizon/internal/simplepath"
"io/ioutil"
"os"
"os/exec"
Expand Down Expand Up @@ -164,6 +166,30 @@ func TestMaxAssetsForPathRequests(t *testing.T) {
})
}

func TestMaxPathFindingRequests(t *testing.T) {
t.Run("default", func(t *testing.T) {
test := NewParameterTest(t, map[string]string{})
err := test.StartHorizon()
assert.NoError(t, err)
test.WaitForHorizon()
assert.Equal(t, test.Horizon().Config().MaxPathFindingRequests, uint(0))
_, ok := test.Horizon().Paths().(simplepath.InMemoryFinder)
assert.True(t, ok)
test.Shutdown()
})
t.Run("set to 5", func(t *testing.T) {
test := NewParameterTest(t, map[string]string{"max-path-finding-requests": "5"})
err := test.StartHorizon()
assert.NoError(t, err)
test.WaitForHorizon()
assert.Equal(t, test.Horizon().Config().MaxPathFindingRequests, uint(5))
finder, ok := test.Horizon().Paths().(*paths.RateLimitedFinder)
assert.True(t, ok)
assert.Equal(t, finder.Limit(), 5)
test.Shutdown()
})
}

// Pattern taken from testify issue:
// https://github.com/stretchr/testify/issues/858#issuecomment-600491003
//
Expand Down
59 changes: 59 additions & 0 deletions services/horizon/internal/paths/ratelimit.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package paths

import (
"context"

"golang.org/x/time/rate"

"github.com/stellar/go/support/errors"
"github.com/stellar/go/xdr"
)

var (
// ErrRateLimitExceeded indicates that the Finder is not able to fulfill the request due to rate limits.
ErrRateLimitExceeded = errors.New("Rate limit exceeded")
)

// RateLimitedFinder is a Finder implementation which limits the number of path finding requests.
type RateLimitedFinder struct {
finder Finder
limiter *rate.Limiter
}

// NewRateLimitedFinder constructs a new RateLimitedFinder which enforces a per
// second limit on path finding requests.
func NewRateLimitedFinder(finder Finder, limit uint) *RateLimitedFinder {
return &RateLimitedFinder{
finder: finder,
limiter: rate.NewLimiter(rate.Limit(limit), int(limit)),
}
}

// Limit returns the per second limit of path finding requests.
func (f *RateLimitedFinder) Limit() int {
return f.limiter.Burst()
}

// Find implements the Finder interface and returns ErrRateLimitExceeded if the
// RateLimitedFinder is unable to complete the request due to rate limits.
func (f *RateLimitedFinder) Find(ctx context.Context, q Query, maxLength uint) ([]Path, uint32, error) {
if !f.limiter.Allow() {
return nil, 0, ErrRateLimitExceeded
}
return f.finder.Find(ctx, q, maxLength)
}

// FindFixedPaths implements the Finder interface and returns ErrRateLimitExceeded if the
// RateLimitedFinder is unable to complete the request due to rate limits.
func (f *RateLimitedFinder) FindFixedPaths(
ctx context.Context,
sourceAsset xdr.Asset,
amountToSpend xdr.Int64,
destinationAssets []xdr.Asset,
maxLength uint,
) ([]Path, uint32, error) {
if !f.limiter.Allow() {
return nil, 0, ErrRateLimitExceeded
}
return f.finder.FindFixedPaths(ctx, sourceAsset, amountToSpend, destinationAssets, maxLength)
}
71 changes: 71 additions & 0 deletions services/horizon/internal/paths/ratelimit_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package paths

import (
"context"
"strconv"
"sync"
"testing"

"github.com/stellar/go/xdr"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
)

func TestRateLimitedFinder(t *testing.T) {
for _, limit := range []int{0, 1, 5} {
t.Run("Limit of "+strconv.Itoa(limit), func(t *testing.T) {
totalCalls := limit + 4
errorChan := make(chan error, totalCalls)
find := func(finder Finder) {
_, _, err := finder.Find(context.Background(), Query{}, 1)
errorChan <- err
}
findFixedPaths := func(finder Finder) {
_, _, err := finder.FindFixedPaths(
context.Background(),
xdr.MustNewNativeAsset(),
10,
nil,
0,
)
errorChan <- err
}

wg := &sync.WaitGroup{}
mockFinder := &MockFinder{}
mockFinder.On("Find", mock.Anything, mock.Anything, mock.Anything).
Return([]Path{}, uint32(0), nil).Maybe().Times(limit).
Run(func(args mock.Arguments) {
wg.Done()
wg.Wait()
})
mockFinder.On("FindFixedPaths", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).
Return([]Path{}, uint32(0), nil).Maybe().Times(limit).
Run(func(args mock.Arguments) {
wg.Done()
wg.Wait()
})

for _, f := range []func(Finder){find, findFixedPaths} {
wg.Add(totalCalls)
rateLimitedFinder := NewRateLimitedFinder(mockFinder, uint(limit))
assert.Equal(t, limit, rateLimitedFinder.Limit())
for i := 0; i < totalCalls; i++ {
go f(rateLimitedFinder)
}

requestsExceedingLimit := totalCalls - limit
for i := 0; i < requestsExceedingLimit; i++ {
err := <-errorChan
assert.Equal(t, ErrRateLimitExceeded, err)
}

wg.Add(-requestsExceedingLimit)
for i := 0; i < limit; i++ {
assert.NoError(t, <-errorChan)
}
}
mockFinder.AssertExpectations(t)
})
}
}