Skip to content

Commit

Permalink
services/horizon: Add a rate limit for path finding requests. (#4310)
Browse files Browse the repository at this point in the history
Add a per second rate limit for path finding requests backed by https://pkg.go.dev/golang.org/x/time/rate
  • Loading branch information
tamirms authored Mar 28, 2022
1 parent e2a78e0 commit 463d2b9
Show file tree
Hide file tree
Showing 12 changed files with 260 additions and 12 deletions.
1 change: 1 addition & 0 deletions go.list
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ golang.org/x/oauth2 v0.0.0-20210628180205-a41e5a781914
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1
golang.org/x/text v0.3.6
golang.org/x/time v0.0.0-20220224211638-0e9765cccd65
google.golang.org/api v0.50.0
google.golang.org/genproto v0.0.0-20210624195500-8bfb893ecb84
google.golang.org/grpc v1.38.0
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ require (
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c // indirect
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1 // indirect
golang.org/x/text v0.3.6 // indirect
golang.org/x/time v0.0.0-20220224211638-0e9765cccd65
golang.org/x/tools v0.1.4 // indirect
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
google.golang.org/appengine v1.6.7 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -546,6 +546,8 @@ golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20220224211638-0e9765cccd65 h1:M73Iuj3xbbb9Uk1DYhzydthsj6oOd6l9bpuFcNoUvTs=
golang.org/x/time v0.0.0-20220224211638-0e9765cccd65/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=
Expand Down
28 changes: 18 additions & 10 deletions services/horizon/internal/actions/path.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,11 +164,15 @@ func (handler FindPathsHandler) GetResource(w HeaderWriter, r *http.Request) (in
if len(query.SourceAssets) > 0 {
var lastIngestedLedger uint32
records, lastIngestedLedger, err = handler.PathFinder.Find(ctx, query, handler.MaxPathLength)
if err == simplepath.ErrEmptyInMemoryOrderBook {
err = horizonProblem.StillIngesting
}
if err != nil {
return nil, err
switch err {
case simplepath.ErrEmptyInMemoryOrderBook:
return nil, horizonProblem.StillIngesting
case paths.ErrRateLimitExceeded:
return nil, horizonProblem.ServerOverCapacity
default:
if err != nil {
return nil, err
}
}

if handler.SetLastLedgerHeader {
Expand Down Expand Up @@ -338,11 +342,15 @@ func (handler FindFixedPathsHandler) GetResource(w HeaderWriter, r *http.Request
destinationAssets,
handler.MaxPathLength,
)
if err == simplepath.ErrEmptyInMemoryOrderBook {
err = horizonProblem.StillIngesting
}
if err != nil {
return nil, err
switch err {
case simplepath.ErrEmptyInMemoryOrderBook:
return nil, horizonProblem.StillIngesting
case paths.ErrRateLimitExceeded:
return nil, horizonProblem.ServerOverCapacity
default:
if err != nil {
return nil, err
}
}

if handler.SetLastLedgerHeader {
Expand Down
56 changes: 56 additions & 0 deletions services/horizon/internal/actions_path_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,62 @@ func mockPathFindingClient(
return test.NewRequestHelper(router)
}

func TestPathActionsLimitExceeded(t *testing.T) {
tt := test.Start(t)
defer tt.Finish()
test.ResetHorizonDB(t, tt.HorizonDB)

assertions := &test.Assertions{tt.Assert}
finder := paths.MockFinder{}
finder.On("Find", mock.Anything, mock.Anything, uint(3)).
Return([]paths.Path{}, uint32(0), paths.ErrRateLimitExceeded).Times(2)
finder.On("FindFixedPaths", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).
Return([]paths.Path{}, uint32(0), paths.ErrRateLimitExceeded).Times(1)

rh := mockPathFindingClient(
tt,
&finder,
2,
tt.HorizonSession(),
)

var q = make(url.Values)

q.Add(
"source_assets",
"native",
)
q.Add(
"destination_asset_issuer",
"GDSBCQO34HWPGUGQSP3QBFEXVTSR2PW46UIGTHVWGWJGQKH3AFNHXHXN",
)
q.Add("destination_asset_type", "credit_alphanum4")
q.Add("destination_asset_code", "EUR")
q.Add("destination_amount", "10")

for _, uri := range []string{"/paths", "/paths/strict-receive"} {
w := rh.Get(uri + "?" + q.Encode())
assertions.Equal(horizonProblem.ServerOverCapacity.Status, w.Code)
assertions.Problem(w.Body, horizonProblem.ServerOverCapacity)
assertions.Equal("", w.Header().Get(actions.LastLedgerHeaderName))
}

q = make(url.Values)

q.Add("destination_assets", "native")
q.Add("source_asset_issuer", "GDSBCQO34HWPGUGQSP3QBFEXVTSR2PW46UIGTHVWGWJGQKH3AFNHXHXN")
q.Add("source_asset_type", "credit_alphanum4")
q.Add("source_asset_code", "EUR")
q.Add("source_amount", "10")

w := rh.Get("/paths/strict-send" + "?" + q.Encode())
assertions.Equal(horizonProblem.ServerOverCapacity.Status, w.Code)
assertions.Problem(w.Body, horizonProblem.ServerOverCapacity)
assertions.Equal("", w.Header().Get(actions.LastLedgerHeaderName))

finder.AssertExpectations(t)
}

func TestPathActionsStillIngesting(t *testing.T) {
tt := test.Start(t)
defer tt.Finish()
Expand Down
5 changes: 5 additions & 0 deletions services/horizon/internal/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,11 @@ func (a *App) Config() Config {
return a.config
}

// Paths returns the paths.Finder instance used by horizon
func (a *App) Paths() paths.Finder {
return a.paths
}

// UpdateCoreLedgerState triggers a refresh of Stellar-Core ledger state.
// This is done separately from Horizon ledger state update to prevent issues
// in case Stellar-Core query timeout.
Expand Down
7 changes: 6 additions & 1 deletion services/horizon/internal/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,12 @@ type Config struct {
MaxPathLength uint
// MaxAssetsPerPathRequest is the maximum number of assets considered for `/paths/strict-send` and `/paths/strict-recieve`
MaxAssetsPerPathRequest int
DisablePoolPathFinding bool
// DisablePoolPathFinding configures horizon to run path finding without including liquidity pools
// in the path finding search.
DisablePoolPathFinding bool
// MaxPathFindingRequests is the maximum number of path finding requests horizon will allow
// in a 1-second period. A value of 0 disables the limit.
MaxPathFindingRequests uint

NetworkPassphrase string
SentryDSN string
Expand Down
9 changes: 9 additions & 0 deletions services/horizon/internal/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,15 @@ func Flags() (*Config, support.ConfigOptions) {
Required: false,
Usage: "excludes liquidity pools from consideration in the `/paths` endpoint",
},
&support.ConfigOption{
Name: "max-path-finding-requests",
ConfigKey: &config.MaxPathFindingRequests,
OptType: types.Uint,
FlagDefault: uint(0),
Required: false,
Usage: "The maximum number of path finding requests per second horizon will allow." +
" A value of zero (the default) disables the limit.",
},
&support.ConfigOption{
Name: "network-passphrase",
ConfigKey: &config.NetworkPassphrase,
Expand Down
7 changes: 6 additions & 1 deletion services/horizon/internal/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package horizon

import (
"context"
"github.com/stellar/go/services/horizon/internal/paths"
"net/http"
"runtime"

Expand Down Expand Up @@ -116,7 +117,11 @@ func initPathFinder(app *App) {
orderBookGraph,
)

app.paths = simplepath.NewInMemoryFinder(orderBookGraph, !app.config.DisablePoolPathFinding)
var finder paths.Finder = simplepath.NewInMemoryFinder(orderBookGraph, !app.config.DisablePoolPathFinding)
if app.config.MaxPathFindingRequests != 0 {
finder = paths.NewRateLimitedFinder(finder, app.config.MaxPathFindingRequests)
}
app.paths = finder
}

// initSentry initialized the default sentry client with the configured DSN
Expand Down
26 changes: 26 additions & 0 deletions services/horizon/internal/integration/parameters_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
package integration

import (
"github.com/stellar/go/services/horizon/internal/paths"
"github.com/stellar/go/services/horizon/internal/simplepath"
"io/ioutil"
"os"
"os/exec"
Expand Down Expand Up @@ -164,6 +166,30 @@ func TestMaxAssetsForPathRequests(t *testing.T) {
})
}

func TestMaxPathFindingRequests(t *testing.T) {
t.Run("default", func(t *testing.T) {
test := NewParameterTest(t, map[string]string{})
err := test.StartHorizon()
assert.NoError(t, err)
test.WaitForHorizon()
assert.Equal(t, test.Horizon().Config().MaxPathFindingRequests, uint(0))
_, ok := test.Horizon().Paths().(simplepath.InMemoryFinder)
assert.True(t, ok)
test.Shutdown()
})
t.Run("set to 5", func(t *testing.T) {
test := NewParameterTest(t, map[string]string{"max-path-finding-requests": "5"})
err := test.StartHorizon()
assert.NoError(t, err)
test.WaitForHorizon()
assert.Equal(t, test.Horizon().Config().MaxPathFindingRequests, uint(5))
finder, ok := test.Horizon().Paths().(*paths.RateLimitedFinder)
assert.True(t, ok)
assert.Equal(t, finder.Limit(), 5)
test.Shutdown()
})
}

// Pattern taken from testify issue:
// https://github.com/stretchr/testify/issues/858#issuecomment-600491003
//
Expand Down
59 changes: 59 additions & 0 deletions services/horizon/internal/paths/ratelimit.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package paths

import (
"context"

"golang.org/x/time/rate"

"github.com/stellar/go/support/errors"
"github.com/stellar/go/xdr"
)

var (
// ErrRateLimitExceeded indicates that the Finder is not able to fulfill the request due to rate limits.
ErrRateLimitExceeded = errors.New("Rate limit exceeded")
)

// RateLimitedFinder is a Finder implementation which limits the number of path finding requests.
type RateLimitedFinder struct {
finder Finder
limiter *rate.Limiter
}

// NewRateLimitedFinder constructs a new RateLimitedFinder which enforces a per
// second limit on path finding requests.
func NewRateLimitedFinder(finder Finder, limit uint) *RateLimitedFinder {
return &RateLimitedFinder{
finder: finder,
limiter: rate.NewLimiter(rate.Limit(limit), int(limit)),
}
}

// Limit returns the per second limit of path finding requests.
func (f *RateLimitedFinder) Limit() int {
return f.limiter.Burst()
}

// Find implements the Finder interface and returns ErrRateLimitExceeded if the
// RateLimitedFinder is unable to complete the request due to rate limits.
func (f *RateLimitedFinder) Find(ctx context.Context, q Query, maxLength uint) ([]Path, uint32, error) {
if !f.limiter.Allow() {
return nil, 0, ErrRateLimitExceeded
}
return f.finder.Find(ctx, q, maxLength)
}

// FindFixedPaths implements the Finder interface and returns ErrRateLimitExceeded if the
// RateLimitedFinder is unable to complete the request due to rate limits.
func (f *RateLimitedFinder) FindFixedPaths(
ctx context.Context,
sourceAsset xdr.Asset,
amountToSpend xdr.Int64,
destinationAssets []xdr.Asset,
maxLength uint,
) ([]Path, uint32, error) {
if !f.limiter.Allow() {
return nil, 0, ErrRateLimitExceeded
}
return f.finder.FindFixedPaths(ctx, sourceAsset, amountToSpend, destinationAssets, maxLength)
}
71 changes: 71 additions & 0 deletions services/horizon/internal/paths/ratelimit_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package paths

import (
"context"
"strconv"
"sync"
"testing"

"github.com/stellar/go/xdr"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
)

func TestRateLimitedFinder(t *testing.T) {
for _, limit := range []int{0, 1, 5} {
t.Run("Limit of "+strconv.Itoa(limit), func(t *testing.T) {
totalCalls := limit + 4
errorChan := make(chan error, totalCalls)
find := func(finder Finder) {
_, _, err := finder.Find(context.Background(), Query{}, 1)
errorChan <- err
}
findFixedPaths := func(finder Finder) {
_, _, err := finder.FindFixedPaths(
context.Background(),
xdr.MustNewNativeAsset(),
10,
nil,
0,
)
errorChan <- err
}

wg := &sync.WaitGroup{}
mockFinder := &MockFinder{}
mockFinder.On("Find", mock.Anything, mock.Anything, mock.Anything).
Return([]Path{}, uint32(0), nil).Maybe().Times(limit).
Run(func(args mock.Arguments) {
wg.Done()
wg.Wait()
})
mockFinder.On("FindFixedPaths", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).
Return([]Path{}, uint32(0), nil).Maybe().Times(limit).
Run(func(args mock.Arguments) {
wg.Done()
wg.Wait()
})

for _, f := range []func(Finder){find, findFixedPaths} {
wg.Add(totalCalls)
rateLimitedFinder := NewRateLimitedFinder(mockFinder, uint(limit))
assert.Equal(t, limit, rateLimitedFinder.Limit())
for i := 0; i < totalCalls; i++ {
go f(rateLimitedFinder)
}

requestsExceedingLimit := totalCalls - limit
for i := 0; i < requestsExceedingLimit; i++ {
err := <-errorChan
assert.Equal(t, ErrRateLimitExceeded, err)
}

wg.Add(-requestsExceedingLimit)
for i := 0; i < limit; i++ {
assert.NoError(t, <-errorChan)
}
}
mockFinder.AssertExpectations(t)
})
}
}

0 comments on commit 463d2b9

Please sign in to comment.