Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rpc: added user agent config on ha archive pool #56

Merged
merged 10 commits into from
Feb 12, 2024
8 changes: 8 additions & 0 deletions cmd/soroban-rpc/internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type Config struct {
EventLedgerRetentionWindow uint32
FriendbotURL string
HistoryArchiveURLs []string
HistoryArchiveUserAgent string
IngestionTimeout time.Duration
LogFormat LogFormat
LogLevel logrus.Level
Expand Down Expand Up @@ -64,6 +65,13 @@ type Config struct {
flagset *pflag.FlagSet
}

func (cfg *Config) ExtendedUserAgent(extension string) string {
if cfg.HistoryArchiveUserAgent == "" {
return extension
}
return cfg.HistoryArchiveUserAgent + "/" + extension
}

func (cfg *Config) SetValues(lookupEnv func(string) (string, bool)) error {
// We start with the defaults
if err := cfg.loadDefaults(); err != nil {
Expand Down
8 changes: 8 additions & 0 deletions cmd/soroban-rpc/internal/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,14 @@ func TestConfigLoadDefaults(t *testing.T) {
assert.Equal(t, uint(runtime.NumCPU()), cfg.PreflightWorkerCount)
}

func TestConfigExtendedUserAgent(t *testing.T) {
cfg := Config{
HistoryArchiveUserAgent: "Test",
}
require.NoError(t, cfg.loadDefaults())
assert.Equal(t, "Test/123", cfg.ExtendedUserAgent("123"))
}

func TestConfigLoadFlagsDefaultValuesOverrideExisting(t *testing.T) {
// Set up a config with an existing non-default value
cfg := Config{
Expand Down
2 changes: 1 addition & 1 deletion cmd/soroban-rpc/internal/config/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func TestAllConfigFieldsMustHaveASingleOption(t *testing.T) {

// Allow us to explicitly exclude any fields on the Config struct, which are not going to have Options.
// e.g. "ConfigPath"
excluded := map[string]bool{}
excluded := map[string]bool{"HistoryArchiveUserAgent": true}

cfg := Config{}
cfgValue := reflect.ValueOf(cfg)
Expand Down
13 changes: 10 additions & 3 deletions cmd/soroban-rpc/internal/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/stellar/go/ingest/ledgerbackend"
supporthttp "github.com/stellar/go/support/http"
supportlog "github.com/stellar/go/support/log"
"github.com/stellar/go/support/storage"
"github.com/stellar/go/xdr"

"github.com/stellar/soroban-tools/cmd/soroban-rpc/internal"
Expand Down Expand Up @@ -121,7 +122,7 @@ func newCaptiveCore(cfg *config.Config, logger *supportlog.Entry) (*ledgerbacken
CheckpointFrequency: cfg.CheckpointFrequency,
Log: logger.WithField("subservice", "stellar-core"),
Toml: captiveCoreToml,
UserAgent: "captivecore",
UserAgent: cfg.ExtendedUserAgent("captivecore"),
UseDB: true,
}
return ledgerbackend.NewCaptive(captiveConfig)
Expand All @@ -144,12 +145,18 @@ func MustNew(cfg *config.Config) *Daemon {
if len(cfg.HistoryArchiveURLs) == 0 {
logger.Fatal("no history archives url were provided")
}
historyArchive, err := historyarchive.Connect(
cfg.HistoryArchiveURLs[0],

historyArchive, err := historyarchive.NewArchivePool(
cfg.HistoryArchiveURLs,
historyarchive.ArchiveOptions{
NetworkPassphrase: cfg.NetworkPassphrase,
CheckpointFrequency: cfg.CheckpointFrequency,
ConnectOptions: storage.ConnectOptions{
Context: context.Background(),
UserAgent: cfg.HistoryArchiveUserAgent},
},
)

if err != nil {
logger.WithError(err).Fatal("could not connect to history archive")
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/soroban-rpc/internal/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ type event struct {
diagnosticEventXDR []byte
txIndex uint32
eventIndex uint32
txHash *xdr.Hash // intentionally stored as a pointer to save memory (amortized as soon as there are two events in a transaction)
txHash *xdr.Hash // intentionally stored as a pointer to save memory (amortized as soon as there are two events in a transaction)
}

func (e event) cursor(ledgerSeq uint32) Cursor {
Expand Down
8 changes: 4 additions & 4 deletions cmd/soroban-rpc/internal/ingest/ledgerentry.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,10 @@ func (s *Service) ingestLedgerEntryChanges(ctx context.Context, reader ingest.Ch
results := changeStatsProcessor.GetResults()
for stat, value := range results.Map() {
stat = strings.Replace(stat, "stats_", "change_", 1)
s.ledgerStatsMetric.
s.metrics.ledgerStatsMetric.
With(prometheus.Labels{"type": stat}).Add(float64(value.(int64)))
}
s.ingestionDurationMetric.
s.metrics.ingestionDurationMetric.
With(prometheus.Labels{"type": "ledger_entries"}).Observe(time.Since(startTime).Seconds())
return ctx.Err()
}
Expand All @@ -66,10 +66,10 @@ func (s *Service) ingestTempLedgerEntryEvictions(
}

for evictionType, count := range counts {
s.ledgerStatsMetric.
s.metrics.ledgerStatsMetric.
With(prometheus.Labels{"type": evictionType}).Add(float64(count))
}
s.ingestionDurationMetric.
s.metrics.ingestionDurationMetric.
With(prometheus.Labels{"type": "evicted_temp_ledger_entries"}).Observe(time.Since(startTime).Seconds())
return ctx.Err()
}
Expand Down
57 changes: 33 additions & 24 deletions cmd/soroban-rpc/internal/ingest/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,19 +73,24 @@ func newService(cfg Config) *Service {
[]string{"type"},
)

cfg.Daemon.MetricsRegistry().MustRegister(ingestionDurationMetric, latestLedgerMetric, ledgerStatsMetric)
cfg.Daemon.MetricsRegistry().MustRegister(
ingestionDurationMetric,
latestLedgerMetric,
ledgerStatsMetric)

service := &Service{
logger: cfg.Logger,
db: cfg.DB,
eventStore: cfg.EventStore,
transactionStore: cfg.TransactionStore,
ledgerBackend: cfg.LedgerBackend,
networkPassPhrase: cfg.NetworkPassPhrase,
timeout: cfg.Timeout,
ingestionDurationMetric: ingestionDurationMetric,
latestLedgerMetric: latestLedgerMetric,
ledgerStatsMetric: ledgerStatsMetric,
logger: cfg.Logger,
db: cfg.DB,
eventStore: cfg.EventStore,
transactionStore: cfg.TransactionStore,
ledgerBackend: cfg.LedgerBackend,
networkPassPhrase: cfg.NetworkPassPhrase,
timeout: cfg.Timeout,
metrics: Metrics{
ingestionDurationMetric: ingestionDurationMetric,
latestLedgerMetric: latestLedgerMetric,
ledgerStatsMetric: ledgerStatsMetric,
},
}

return service
Expand Down Expand Up @@ -119,21 +124,25 @@ func startService(service *Service, cfg Config) {
})
}

type Service struct {
logger *log.Entry
db db.ReadWriter
eventStore *events.MemoryStore
transactionStore *transactions.MemoryStore
ledgerBackend backends.LedgerBackend
timeout time.Duration
networkPassPhrase string
done context.CancelFunc
wg sync.WaitGroup
type Metrics struct {
ingestionDurationMetric *prometheus.SummaryVec
latestLedgerMetric prometheus.Gauge
ledgerStatsMetric *prometheus.CounterVec
}

type Service struct {
logger *log.Entry
db db.ReadWriter
eventStore *events.MemoryStore
transactionStore *transactions.MemoryStore
ledgerBackend backends.LedgerBackend
timeout time.Duration
networkPassPhrase string
done context.CancelFunc
wg sync.WaitGroup
metrics Metrics
}

func (s *Service) Close() error {
s.done()
s.wg.Wait()
Expand Down Expand Up @@ -286,9 +295,9 @@ func (s *Service) ingest(ctx context.Context, sequence uint32) error {
}
s.logger.Debugf("Ingested ledger %d", sequence)

s.ingestionDurationMetric.
s.metrics.ingestionDurationMetric.
With(prometheus.Labels{"type": "total"}).Observe(time.Since(startTime).Seconds())
s.latestLedgerMetric.Set(float64(sequence))
s.metrics.latestLedgerMetric.Set(float64(sequence))
return nil
}

Expand All @@ -297,7 +306,7 @@ func (s *Service) ingestLedgerCloseMeta(tx db.WriteTx, ledgerCloseMeta xdr.Ledge
if err := tx.LedgerWriter().InsertLedger(ledgerCloseMeta); err != nil {
return err
}
s.ingestionDurationMetric.
s.metrics.ingestionDurationMetric.
With(prometheus.Labels{"type": "ledger_close_meta"}).Observe(time.Since(startTime).Seconds())

if err := s.eventStore.IngestEvents(ledgerCloseMeta); err != nil {
Expand Down
1 change: 1 addition & 0 deletions cmd/soroban-rpc/internal/ingest/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ func TestRetryRunningIngestion(t *testing.T) {
func TestIngestion(t *testing.T) {
mockDB := &MockDB{}
mockLedgerBackend := &ledgerbackend.MockDatabaseBackend{}

daemon := interfaces.MakeNoOpDeamon()
config := Config{
Logger: supportlog.New(),
Expand Down
25 changes: 25 additions & 0 deletions cmd/soroban-rpc/internal/test/archive_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package test

import (
"net/http"
"sync"
"testing"

"github.com/stretchr/testify/assert"
)

func TestArchiveUserAgent(t *testing.T) {
userAgents := sync.Map{}
cfg := &Test{
historyArchiveProxyCallback: func(r *http.Request) {
userAgents.Store(r.Header["User-Agent"][0], "")
},
}
NewTest(t, cfg)
sreuland marked this conversation as resolved.
Show resolved Hide resolved

_, ok := userAgents.Load("testing")
assert.True(t, ok, "rpc service should set user agent for history archives")

_, ok = userAgents.Load("testing/captivecore")
sreuland marked this conversation as resolved.
Show resolved Hide resolved
assert.True(t, ok, "rpc captive core should set user agent for history archives")
}
2 changes: 1 addition & 1 deletion cmd/soroban-rpc/internal/test/cli_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ func getCLIDefaultAccount(t *testing.T) string {
}

func NewCLITest(t *testing.T) *Test {
test := NewTest(t)
test := NewTest(t, nil)
fundAccount(t, test, getCLIDefaultAccount(t), "1000000")
return test
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/soroban-rpc/internal/test/cors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
// Specifically, when we include an Origin header in the request, a soroban-rpc should response
// with a corresponding Access-Control-Allow-Origin.
func TestCORS(t *testing.T) {
test := NewTest(t)
test := NewTest(t, nil)

request, err := http.NewRequest("POST", test.sorobanRPCURL(), bytes.NewBufferString("{\"jsonrpc\": \"2.0\", \"id\": 1, \"method\": \"getHealth\"}"))
require.NoError(t, err)
Expand Down
6 changes: 3 additions & 3 deletions cmd/soroban-rpc/internal/test/get_ledger_entries_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
)

func TestGetLedgerEntriesNotFound(t *testing.T) {
test := NewTest(t)
test := NewTest(t, nil)

ch := jhttp.NewChannel(test.sorobanRPCURL(), nil)
client := jrpc2.NewClient(ch, nil)
Expand Down Expand Up @@ -56,7 +56,7 @@ func TestGetLedgerEntriesNotFound(t *testing.T) {
}

func TestGetLedgerEntriesInvalidParams(t *testing.T) {
test := NewTest(t)
test := NewTest(t, nil)

ch := jhttp.NewChannel(test.sorobanRPCURL(), nil)
client := jrpc2.NewClient(ch, nil)
Expand All @@ -74,7 +74,7 @@ func TestGetLedgerEntriesInvalidParams(t *testing.T) {
}

func TestGetLedgerEntriesSucceeds(t *testing.T) {
test := NewTest(t)
test := NewTest(t, nil)

ch := jhttp.NewChannel(test.sorobanRPCURL(), nil)
client := jrpc2.NewClient(ch, nil)
Expand Down
6 changes: 3 additions & 3 deletions cmd/soroban-rpc/internal/test/get_ledger_entry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
)

func TestGetLedgerEntryNotFound(t *testing.T) {
test := NewTest(t)
test := NewTest(t, nil)

ch := jhttp.NewChannel(test.sorobanRPCURL(), nil)
client := jrpc2.NewClient(ch, nil)
Expand Down Expand Up @@ -51,7 +51,7 @@ func TestGetLedgerEntryNotFound(t *testing.T) {
}

func TestGetLedgerEntryInvalidParams(t *testing.T) {
test := NewTest(t)
test := NewTest(t, nil)

ch := jhttp.NewChannel(test.sorobanRPCURL(), nil)
client := jrpc2.NewClient(ch, nil)
Expand All @@ -67,7 +67,7 @@ func TestGetLedgerEntryInvalidParams(t *testing.T) {
}

func TestGetLedgerEntrySucceeds(t *testing.T) {
test := NewTest(t)
test := NewTest(t, nil)

ch := jhttp.NewChannel(test.sorobanRPCURL(), nil)
client := jrpc2.NewClient(ch, nil)
Expand Down
2 changes: 1 addition & 1 deletion cmd/soroban-rpc/internal/test/get_network_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
)

func TestGetNetworkSucceeds(t *testing.T) {
test := NewTest(t)
test := NewTest(t, nil)

ch := jhttp.NewChannel(test.sorobanRPCURL(), nil)
client := jrpc2.NewClient(ch, nil)
Expand Down
2 changes: 1 addition & 1 deletion cmd/soroban-rpc/internal/test/health_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
)

func TestHealth(t *testing.T) {
test := NewTest(t)
test := NewTest(t, nil)

ch := jhttp.NewChannel(test.sorobanRPCURL(), nil)
client := jrpc2.NewClient(ch, nil)
Expand Down
Loading
Loading