Skip to content

Commit

Permalink
services/horizon, clients/horizonclient: Allow filtering ingested tra…
Browse files Browse the repository at this point in the history
…nsactions by account or asset. (#4277)

Co-authored-by: Alfonso Acosta <[email protected]>
Co-authored-by: Bartek Nowotarski <[email protected]>
Co-authored-by: George <[email protected]>
  • Loading branch information
4 people authored May 12, 2022
1 parent 429ecee commit 14ebb04
Show file tree
Hide file tree
Showing 64 changed files with 3,086 additions and 299 deletions.
116 changes: 116 additions & 0 deletions clients/horizonclient/admin_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
package horizonclient

import (
"bytes"
"context"
"encoding/json"
"fmt"
"net/http"
"net/url"
"time"

hProtocol "github.com/stellar/go/protocols/horizon"
"github.com/stellar/go/support/errors"
)

// port - the horizon admin port, zero value defaults to 4200
// host - the host interface name that horizon has bound admin web service, zero value defaults to 'localhost'
// timeout - the length of time for the http client to wait on responses from admin web service
func NewAdminClient(port uint16, host string, timeout time.Duration) (*AdminClient, error) {
baseURL, err := getAdminBaseURL(port, host)
if err != nil {
return nil, err
}
if timeout == 0 {
timeout = HorizonTimeout
}

return &AdminClient{
baseURL: baseURL,
http: http.DefaultClient,
horizonTimeout: timeout,
}, nil
}

func getAdminBaseURL(port uint16, host string) (string, error) {
baseURL, err := url.Parse("http://localhost")
if err != nil {
return "", err
}
adminPort := uint16(4200)
if port > 0 {
adminPort = port
}
adminHost := baseURL.Hostname()
if len(host) > 0 {
adminHost = host
}
baseURL.Host = fmt.Sprintf("%s:%d", adminHost, adminPort)
return baseURL.String(), nil
}

func (c *AdminClient) sendGetRequest(requestURL string, a interface{}) error {
req, err := http.NewRequest("GET", requestURL, nil)
if err != nil {
return errors.Wrap(err, "error creating Admin HTTP request")
}
return c.sendHTTPRequest(req, a)
}

func (c *AdminClient) sendHTTPRequest(req *http.Request, a interface{}) error {
ctx, cancel := context.WithTimeout(context.Background(), c.horizonTimeout)
defer cancel()

if resp, err := c.http.Do(req.WithContext(ctx)); err != nil {
return err
} else {
return decodeResponse(resp, a, req.URL.String(), nil)
}
}

func (c *AdminClient) getIngestionFiltersURL(filter string) string {
return fmt.Sprintf("%s/ingestion/filters/%s", c.baseURL, filter)
}

func (c *AdminClient) GetIngestionAssetFilter() (hProtocol.AssetFilterConfig, error) {
var filter hProtocol.AssetFilterConfig
err := c.sendGetRequest(c.getIngestionFiltersURL("asset"), &filter)
return filter, err
}

func (c *AdminClient) GetIngestionAccountFilter() (hProtocol.AccountFilterConfig, error) {
var filter hProtocol.AccountFilterConfig
err := c.sendGetRequest(c.getIngestionFiltersURL("account"), &filter)
return filter, err
}

func (c *AdminClient) SetIngestionAssetFilter(filter hProtocol.AssetFilterConfig) error {
buf := bytes.NewBuffer(nil)
err := json.NewEncoder(buf).Encode(filter)
if err != nil {
return err
}
req, err := http.NewRequest(http.MethodPut, c.getIngestionFiltersURL("asset"), buf)
if err != nil {
return errors.Wrap(err, "error creating HTTP request")
}
req.Header.Add("Content-Type", "application/json")
return c.sendHTTPRequest(req, nil)
}

func (c *AdminClient) SetIngestionAccountFilter(filter hProtocol.AccountFilterConfig) error {
buf := bytes.NewBuffer(nil)
err := json.NewEncoder(buf).Encode(filter)
if err != nil {
return err
}
req, err := http.NewRequest(http.MethodPut, c.getIngestionFiltersURL("account"), buf)
if err != nil {
return errors.Wrap(err, "error creating HTTP request")
}
req.Header.Add("Content-Type", "application/json")
return c.sendHTTPRequest(req, nil)
}

// ensure that the horizon admin client implements AdminClientInterface
var _ AdminClientInterface = &AdminClient{}
24 changes: 24 additions & 0 deletions clients/horizonclient/admin_client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package horizonclient

import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestDefaultAdminHostPort(t *testing.T) {
horizonAdminClient, err := NewAdminClient(0, "", 0)

fullAdminURL := horizonAdminClient.getIngestionFiltersURL("test")
require.NoError(t, err)
assert.Equal(t, "http://localhost:4200/ingestion/filters/test", fullAdminURL)
}

func TestOverrideAdminHostPort(t *testing.T) {
horizonAdminClient, err := NewAdminClient(1234, "127.0.0.1", 0)

fullAdminURL := horizonAdminClient.getIngestionFiltersURL("test")
require.NoError(t, err)
assert.Equal(t, "http://127.0.0.1:1234/ingestion/filters/test", fullAdminURL)
}
5 changes: 4 additions & 1 deletion clients/horizonclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func (c *Client) sendHTTPRequest(req *http.Request, a interface{}) error {
if resp, err := c.HTTP.Do(req.WithContext(ctx)); err != nil {
return err
} else {
return decodeResponse(resp, &a, c)
return decodeResponse(resp, a, c.HorizonURL, c.clock)
}
}

Expand Down Expand Up @@ -270,6 +270,9 @@ func (c *Client) setDefaultClient() {
// fixHorizonURL strips all slashes(/) at the end of HorizonURL if any, then adds a single slash
func (c *Client) fixHorizonURL() string {
c.fixHorizonURLOnce.Do(func() {
// TODO: we shouldn't happily edit data provided by the user,
// better store it in an internal variable or, even better,
// just parse it every time (what if the url changes during the life of the client?).
c.HorizonURL = strings.TrimRight(c.HorizonURL, "/") + "/"
})
return c.HorizonURL
Expand Down
18 changes: 11 additions & 7 deletions clients/horizonclient/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,24 @@ import (
"strings"
"time"

"github.com/stellar/go/support/clock"
"github.com/stellar/go/support/errors"
)

// decodeResponse decodes the response from a request to a horizon server
func decodeResponse(resp *http.Response, object interface{}, hc *Client) (err error) {
func decodeResponse(resp *http.Response, object interface{}, horizonUrl string, clock *clock.Clock) (err error) {
defer resp.Body.Close()
if object == nil {
// Nothing to decode
return nil
}
decoder := json.NewDecoder(resp.Body)

u, err := url.Parse(hc.HorizonURL)
u, err := url.Parse(horizonUrl)
if err != nil {
return errors.Errorf("unable to parse the provided horizon url: %s", hc.HorizonURL)
return errors.Errorf("unable to parse the provided horizon url: %s", horizonUrl)
}
setCurrentServerTime(u.Hostname(), resp.Header["Date"], hc)
setCurrentServerTime(u.Hostname(), resp.Header["Date"], clock)

if !(resp.StatusCode >= 200 && resp.StatusCode < 300) {
horizonError := &Error{
Expand All @@ -32,7 +37,6 @@ func decodeResponse(resp *http.Response, object interface{}, hc *Client) (err er
}
return horizonError
}

err = decoder.Decode(&object)
if err != nil {
return errors.Wrap(err, "error decoding response")
Expand Down Expand Up @@ -120,7 +124,7 @@ func addQueryParams(params ...interface{}) string {
}

// setCurrentServerTime saves the current time returned by a horizon server
func setCurrentServerTime(host string, serverDate []string, hc *Client) {
func setCurrentServerTime(host string, serverDate []string, clock *clock.Clock) {
if len(serverDate) == 0 {
return
}
Expand All @@ -129,7 +133,7 @@ func setCurrentServerTime(host string, serverDate []string, hc *Client) {
return
}
serverTimeMapMutex.Lock()
ServerTimeMap[host] = ServerTimeRecord{ServerTime: st.UTC().Unix(), LocalTimeRecorded: hc.clock.Now().UTC().Unix()}
ServerTimeMap[host] = ServerTimeRecord{ServerTime: st.UTC().Unix(), LocalTimeRecorded: clock.Now().UTC().Unix()}
serverTimeMapMutex.Unlock()
}

Expand Down
18 changes: 18 additions & 0 deletions clients/horizonclient/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,11 +149,29 @@ type Client struct {
clock *clock.Clock
}

type AdminClient struct {
// fully qualified url for the admin web service
baseURL string

// HTTP client to make requests with
http HTTP

// max client wait time for response
horizonTimeout time.Duration
}

// SubmitTxOpts represents the submit transaction options
type SubmitTxOpts struct {
SkipMemoRequiredCheck bool
}

type AdminClientInterface interface {
GetIngestionAccountFilter() (hProtocol.AccountFilterConfig, error)
GetIngestionAssetFilter() (hProtocol.AssetFilterConfig, error)
SetIngestionAccountFilter(hProtocol.AccountFilterConfig) error
SetIngestionAssetFilter(hProtocol.AssetFilterConfig) error
}

// ClientInterface contains methods implemented by the horizon client
type ClientInterface interface {
Accounts(request AccountsRequest) (hProtocol.AccountsPage, error)
Expand Down
27 changes: 27 additions & 0 deletions clients/horizonclient/mocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ type MockClient struct {
mock.Mock
}

type MockAdminClient struct {
mock.Mock
}

// Accounts is a mocking method
func (m *MockClient) Accounts(request AccountsRequest) (hProtocol.AccountsPage, error) {
a := m.Called(request)
Expand Down Expand Up @@ -349,5 +353,28 @@ func (m *MockClient) PrevLiquidityPoolsPage(page hProtocol.LiquidityPoolsPage) (
return a.Get(0).(hProtocol.LiquidityPoolsPage), a.Error(1)
}

func (m *MockAdminClient) GetIngestionAccountFilter() (hProtocol.AccountFilterConfig, error) {
a := m.Called()
return a.Get(0).(hProtocol.AccountFilterConfig), a.Error(1)
}

func (m *MockAdminClient) GetIngestionAssetFilter() (hProtocol.AssetFilterConfig, error) {
a := m.Called()
return a.Get(0).(hProtocol.AssetFilterConfig), a.Error(1)
}

func (m *MockAdminClient) SetIngestionAccountFilter(resource hProtocol.AccountFilterConfig) error {
a := m.Called(resource)
return a.Error(0)
}

func (m *MockAdminClient) SetIngestionAssetFilter(resource hProtocol.AssetFilterConfig) error {
a := m.Called(resource)
return a.Error(0)
}

// ensure that the MockClient implements ClientInterface
var _ ClientInterface = &MockClient{}

// ensure that the MockClient implements ClientInterface
var _ AdminClientInterface = &MockAdminClient{}
52 changes: 52 additions & 0 deletions protocols/horizon/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -843,3 +843,55 @@ type LiquidityPoolReserve struct {
Asset string `json:"asset"`
Amount string `json:"amount"`
}

type AssetFilterConfig struct {
Whitelist []string `json:"whitelist"`
Enabled *bool `json:"enabled"`
LastModified int64 `json:"last_modified,omitempty"`
}

type AccountFilterConfig struct {
Whitelist []string `json:"whitelist"`
Enabled *bool `json:"enabled"`
LastModified int64 `json:"last_modified,omitempty"`
}

func (f *AccountFilterConfig) UnmarshalJSON(data []byte) error {
type accountFilterConfig AccountFilterConfig
var config = accountFilterConfig{}

if err := json.Unmarshal(data, &config); err != nil {
return err
}

if config.Whitelist == nil {
return errors.New("missing required whitelist")
}

if config.Enabled == nil {
return errors.New("missing required enabled")
}

*f = AccountFilterConfig(config)
return nil
}

func (f *AssetFilterConfig) UnmarshalJSON(data []byte) error {
type assetFilterConfig AssetFilterConfig
var config = assetFilterConfig{}

if err := json.Unmarshal(data, &config); err != nil {
return err
}

if config.Whitelist == nil {
return errors.New("missing required whitelist")
}

if config.Enabled == nil {
return errors.New("missing required enabled")
}

*f = AssetFilterConfig(config)
return nil
}
13 changes: 13 additions & 0 deletions services/horizon/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,19 @@ This is the final release after the [release candidate](v2.17.0-release-candidat

- Timebounds within the `preconditions` object are strings containing int64 UNIX timestamps in seconds rather than formatted date-times (which was a bug) ([4361](https://github.com/stellar/go/pull/4361)).

* New Ingestion Filters Feature: Provide the ability to select which ledger transactions are accepted at ingestion time to be stored on horizon's historical databse.

Define filter rules through Admin API and the historical ingestion process will check the rules and only persist the ledger transactions that pass the filter rules. Initially, two filters and corresponding rules are possible:

* 'whitelist by account id' ([4221](https://github.com/stellar/go/issues/4221))
* 'whitelist by canonical asset id' ([4222](https://github.com/stellar/go/issues/4222))

The filters and their configuration are optional features and must be enabled with horizon command line parameters `admin-port=4200` and `enable-ingestion-filtering=true`

Once set, filter configurations and their rules are initially empty and the filters are disabled by default. To enable filters, update the configuration settings, refer to the Admin API Docs which are published on the Admin Port at http://localhost:<admin_port>/, follow details and examples for endpoints:
* `/ingestion/filters/account`
* `/ingestion/filters/asset.`

## V2.17.0 Release Candidate

**Upgrading to this version from <= v2.8.3 will trigger a state rebuild. During this process (which will take at least 10 minutes), Horizon will not ingest new ledgers.**
Expand Down
2 changes: 2 additions & 0 deletions services/horizon/cmd/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,7 @@ func runDBReingestRange(ledgerRanges []history.LedgerRange, reingestForce bool,
NetworkPassphrase: config.NetworkPassphrase,
HistoryArchiveURL: config.HistoryArchiveURLs[0],
CheckpointFrequency: config.CheckpointFrequency,
ReingestEnabled: true,
MaxReingestRetries: int(retries),
ReingestRetryBackoffSeconds: int(retryBackoffSeconds),
EnableCaptiveCore: config.EnableCaptiveCoreIngestion,
Expand All @@ -395,6 +396,7 @@ func runDBReingestRange(ledgerRanges []history.LedgerRange, reingestForce bool,
StellarCoreCursor: config.CursorName,
StellarCoreURL: config.StellarCoreURL,
RoundingSlippageFilter: config.RoundingSlippageFilter,
EnableIngestionFiltering: config.EnableIngestionFiltering,
}

if ingestConfig.HistorySession, err = db.Open("postgres", config.DatabaseURL); err != nil {
Expand Down
Loading

0 comments on commit 14ebb04

Please sign in to comment.