Skip to content

Commit

Permalink
services/ticker: cache tomls during scraping (#4286)
Browse files Browse the repository at this point in the history
Cache TOMLs during scraping to some degree. Caching is local to each routine that is scraping and not shared across those routines, so it is still possible to get duplicate requests. Cache usage is logged.

The asset scraper may end up repeatedly scraping the same issuer's TOML many times if the issuer has issued many assets. This is not great since any scraper should attempt to respect the resources of hosts as much as possible.

The cache is in-memory and per routine because the cache is only required temporarily, and to introduce an external cache, such as redis, would be overkill.

Cache usage is logged so that it can be inspected and understood.
  • Loading branch information
leighmcculloch authored Mar 16, 2022
1 parent 3d935af commit e5837ff
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 18 deletions.
43 changes: 30 additions & 13 deletions services/ticker/internal/scraper/asset_scraper.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,8 @@ func decodeTOMLIssuer(tomlData string) (issuer TOMLIssuer, err error) {
return
}

// fetchTOMLData fetches the TOML data for a given hProtocol.AssetStat
func fetchTOMLData(asset hProtocol.AssetStat) (data string, err error) {
tomlURL := asset.Links.Toml.Href

// fetchTOMLData fetches the TOML data from the URL.
func fetchTOMLData(tomlURL string) (data string, err error) {
if tomlURL == "" {
err = errors.New("Asset does not have a TOML URL")
return
Expand Down Expand Up @@ -214,19 +212,30 @@ func makeFinalAsset(
}

// processAsset merges data from an AssetStat with data retrieved from its corresponding TOML file
func processAsset(asset hProtocol.AssetStat, shouldValidateTOML bool) (FinalAsset, error) {
func (c *ScraperConfig) processAsset(asset hProtocol.AssetStat, tomlCache map[string]TOMLIssuer, shouldValidateTOML bool) (FinalAsset, error) {
var errors []error
var issuer TOMLIssuer

if shouldValidateTOML {
tomlData, err := fetchTOMLData(asset)
if err != nil {
errors = append(errors, err)
}
tomlURL := asset.Links.Toml.Href

var ok bool
issuer, ok = tomlCache[tomlURL]
if ok {
c.Logger.Infof("Using cached TOML for asset %s:%s", asset.Asset.Code, asset.Asset.Issuer)
} else {
c.Logger.Infof("Fetching TOML for asset %s:%s", asset.Asset.Code, asset.Asset.Issuer)
tomlData, err := fetchTOMLData(tomlURL)
if err != nil {
errors = append(errors, err)
}

issuer, err = decodeTOMLIssuer(tomlData)
if err != nil {
errors = append(errors, err)
issuer, err = decodeTOMLIssuer(tomlData)
if err != nil {
errors = append(errors, err)
}

tomlCache[tomlURL] = issuer
}
}

Expand Down Expand Up @@ -255,9 +264,16 @@ func (c *ScraperConfig) parallelProcessAssets(assets []hProtocol.AssetStat, para
end = numAssets
}

// Each routine running concurrently has a separate cache of TOMLs
// loaded. A single shared cache would be better, but this is a
// tradeoff for simplicity because a shared map mutated with HTTP
// lookups would have a significant amount of contention.
tomlCache := map[string]TOMLIssuer{}

for j := start; j < end; j++ {
if !shouldDiscardAsset(assets[j], shouldValidateTOML) {
finalAsset, err := processAsset(assets[j], shouldValidateTOML)
c.Logger.Infof("Processing asset %s:%s", assets[j].Asset.Code, assets[j].Asset.Issuer)
finalAsset, err := c.processAsset(assets[j], tomlCache, shouldValidateTOML)
if err != nil {
mutex.Lock()
numTrash++
Expand All @@ -266,6 +282,7 @@ func (c *ScraperConfig) parallelProcessAssets(assets []hProtocol.AssetStat, para
}
assetQueue <- finalAsset
} else {
c.Logger.Infof("Discarding asset %s:%s", assets[j].Asset.Code, assets[j].Asset.Issuer)
mutex.Lock()
numTrash++
mutex.Unlock()
Expand Down
48 changes: 43 additions & 5 deletions services/ticker/internal/scraper/asset_scraper_test.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
package scraper

import (
"fmt"
"net/http"
"net/http/httptest"
"net/url"
"testing"

hProtocol "github.com/stellar/go/protocols/horizon"
"github.com/stellar/go/support/errors"
"github.com/stellar/go/support/render/hal"
"github.com/stellar/go/support/log"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestShouldDiscardAsset(t *testing.T) {
Expand Down Expand Up @@ -134,10 +138,7 @@ func TestIsDomainVerified(t *testing.T) {

func TestIgnoreInvalidTOMLUrls(t *testing.T) {
invalidURL := "https:// there is something wrong here.com/stellar.toml"
assetStat := hProtocol.AssetStat{}
assetStat.Links.Toml = hal.Link{Href: invalidURL}

_, err := fetchTOMLData(assetStat)
_, err := fetchTOMLData(invalidURL)

urlErr, ok := errors.Cause(err).(*url.Error)
if !ok {
Expand All @@ -147,3 +148,40 @@ func TestIgnoreInvalidTOMLUrls(t *testing.T) {
assert.Equal(t, "https:// there is something wrong here.com/stellar.toml", urlErr.URL)
assert.EqualError(t, urlErr.Err, `invalid character " " in host name`)
}

// TestProcessAsset_notCached verifies that processAsset fetches and decodes
// the issuer TOML over HTTP when the cache holds no entry for the asset's TOML
// URL, and that the decoded issuer is stored in the cache afterwards.
func TestProcessAsset_notCached(t *testing.T) {
	scraper := &ScraperConfig{Logger: log.DefaultLogger}

	// Local server standing in for the issuer's TOML host.
	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprint(w, `SIGNING_KEY="not cached signing key"`)
	}))
	// Shut the server down when the test ends so its listener and serving
	// goroutines are not leaked into other tests.
	defer server.Close()

	asset := hProtocol.AssetStat{
		Amount:      "123901.0129310",
		NumAccounts: 100,
	}
	asset.Code = "SOMETHINGVALID"
	asset.Links.Toml.Href = server.URL

	// Start with an empty cache so the TOML has to be fetched.
	tomlCache := map[string]TOMLIssuer{}
	finalAsset, err := scraper.processAsset(asset, tomlCache, true)
	require.NoError(t, err)
	assert.NotZero(t, finalAsset)
	assert.Equal(t, "not cached signing key", finalAsset.IssuerDetails.SigningKey)

	// The fetched issuer must now be cached under the TOML URL.
	require.Contains(t, tomlCache, server.URL)
	assert.Equal(t, "not cached signing key", tomlCache[server.URL].SigningKey)
}

// TestProcessAsset_cached checks that, when the cache already holds an entry
// for the asset's TOML URL, the issuer details on the resulting asset come
// from that cached entry.
func TestProcessAsset_cached(t *testing.T) {
	const (
		cachedURL = "url"
		cachedKey = "signing key"
	)

	// Asset whose TOML link points at the pre-populated cache key.
	var stat hProtocol.AssetStat
	stat.Amount = "123901.0129310"
	stat.NumAccounts = 100
	stat.Code = "SOMETHINGVALID"
	stat.Links.Toml.Href = cachedURL

	// Seed the cache with an issuer for that URL.
	cache := make(map[string]TOMLIssuer, 1)
	cache[cachedURL] = TOMLIssuer{SigningKey: cachedKey}

	cfg := &ScraperConfig{Logger: log.DefaultLogger}
	got, err := cfg.processAsset(stat, cache, true)

	require.NoError(t, err)
	assert.NotZero(t, got)
	assert.Equal(t, cachedKey, got.IssuerDetails.SigningKey)
}

0 comments on commit e5837ff

Please sign in to comment.