Skip to content

Commit

Permalink
rpc: added user agent config on ha archive pool (#56)
Browse files Browse the repository at this point in the history
  • Loading branch information
sreuland authored Feb 12, 2024
1 parent a8d8412 commit e960236
Show file tree
Hide file tree
Showing 20 changed files with 153 additions and 64 deletions.
8 changes: 8 additions & 0 deletions cmd/soroban-rpc/internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type Config struct {
EventLedgerRetentionWindow uint32
FriendbotURL string
HistoryArchiveURLs []string
HistoryArchiveUserAgent string
IngestionTimeout time.Duration
LogFormat LogFormat
LogLevel logrus.Level
Expand Down Expand Up @@ -64,6 +65,13 @@ type Config struct {
flagset *pflag.FlagSet
}

func (cfg *Config) ExtendedUserAgent(extension string) string {
if cfg.HistoryArchiveUserAgent == "" {
return extension
}
return cfg.HistoryArchiveUserAgent + "/" + extension
}

func (cfg *Config) SetValues(lookupEnv func(string) (string, bool)) error {
// We start with the defaults
if err := cfg.loadDefaults(); err != nil {
Expand Down
8 changes: 8 additions & 0 deletions cmd/soroban-rpc/internal/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,14 @@ func TestConfigLoadDefaults(t *testing.T) {
assert.Equal(t, uint(runtime.NumCPU()), cfg.PreflightWorkerCount)
}

func TestConfigExtendedUserAgent(t *testing.T) {
cfg := Config{
HistoryArchiveUserAgent: "Test",
}
require.NoError(t, cfg.loadDefaults())
assert.Equal(t, "Test/123", cfg.ExtendedUserAgent("123"))
}

func TestConfigLoadFlagsDefaultValuesOverrideExisting(t *testing.T) {
// Set up a config with an existing non-default value
cfg := Config{
Expand Down
2 changes: 1 addition & 1 deletion cmd/soroban-rpc/internal/config/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func TestAllConfigFieldsMustHaveASingleOption(t *testing.T) {

// Allow us to explicitly exclude any fields on the Config struct, which are not going to have Options.
// e.g. "ConfigPath"
excluded := map[string]bool{}
excluded := map[string]bool{"HistoryArchiveUserAgent": true}

cfg := Config{}
cfgValue := reflect.ValueOf(cfg)
Expand Down
13 changes: 10 additions & 3 deletions cmd/soroban-rpc/internal/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/stellar/go/ingest/ledgerbackend"
supporthttp "github.com/stellar/go/support/http"
supportlog "github.com/stellar/go/support/log"
"github.com/stellar/go/support/storage"
"github.com/stellar/go/xdr"

"github.com/stellar/soroban-tools/cmd/soroban-rpc/internal"
Expand Down Expand Up @@ -121,7 +122,7 @@ func newCaptiveCore(cfg *config.Config, logger *supportlog.Entry) (*ledgerbacken
CheckpointFrequency: cfg.CheckpointFrequency,
Log: logger.WithField("subservice", "stellar-core"),
Toml: captiveCoreToml,
UserAgent: "captivecore",
UserAgent: cfg.ExtendedUserAgent("captivecore"),
UseDB: true,
}
return ledgerbackend.NewCaptive(captiveConfig)
Expand All @@ -144,12 +145,18 @@ func MustNew(cfg *config.Config) *Daemon {
if len(cfg.HistoryArchiveURLs) == 0 {
logger.Fatal("no history archives url were provided")
}
historyArchive, err := historyarchive.Connect(
cfg.HistoryArchiveURLs[0],

historyArchive, err := historyarchive.NewArchivePool(
cfg.HistoryArchiveURLs,
historyarchive.ArchiveOptions{
NetworkPassphrase: cfg.NetworkPassphrase,
CheckpointFrequency: cfg.CheckpointFrequency,
ConnectOptions: storage.ConnectOptions{
Context: context.Background(),
UserAgent: cfg.HistoryArchiveUserAgent},
},
)

if err != nil {
logger.WithError(err).Fatal("could not connect to history archive")
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/soroban-rpc/internal/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ type event struct {
diagnosticEventXDR []byte
txIndex uint32
eventIndex uint32
txHash *xdr.Hash // intentionally stored as a pointer to save memory (amortized as soon as there are two events in a transaction)
txHash *xdr.Hash // intentionally stored as a pointer to save memory (amortized as soon as there are two events in a transaction)
}

func (e event) cursor(ledgerSeq uint32) Cursor {
Expand Down
8 changes: 4 additions & 4 deletions cmd/soroban-rpc/internal/ingest/ledgerentry.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,10 @@ func (s *Service) ingestLedgerEntryChanges(ctx context.Context, reader ingest.Ch
results := changeStatsProcessor.GetResults()
for stat, value := range results.Map() {
stat = strings.Replace(stat, "stats_", "change_", 1)
s.ledgerStatsMetric.
s.metrics.ledgerStatsMetric.
With(prometheus.Labels{"type": stat}).Add(float64(value.(int64)))
}
s.ingestionDurationMetric.
s.metrics.ingestionDurationMetric.
With(prometheus.Labels{"type": "ledger_entries"}).Observe(time.Since(startTime).Seconds())
return ctx.Err()
}
Expand All @@ -66,10 +66,10 @@ func (s *Service) ingestTempLedgerEntryEvictions(
}

for evictionType, count := range counts {
s.ledgerStatsMetric.
s.metrics.ledgerStatsMetric.
With(prometheus.Labels{"type": evictionType}).Add(float64(count))
}
s.ingestionDurationMetric.
s.metrics.ingestionDurationMetric.
With(prometheus.Labels{"type": "evicted_temp_ledger_entries"}).Observe(time.Since(startTime).Seconds())
return ctx.Err()
}
Expand Down
57 changes: 33 additions & 24 deletions cmd/soroban-rpc/internal/ingest/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,19 +73,24 @@ func newService(cfg Config) *Service {
[]string{"type"},
)

cfg.Daemon.MetricsRegistry().MustRegister(ingestionDurationMetric, latestLedgerMetric, ledgerStatsMetric)
cfg.Daemon.MetricsRegistry().MustRegister(
ingestionDurationMetric,
latestLedgerMetric,
ledgerStatsMetric)

service := &Service{
logger: cfg.Logger,
db: cfg.DB,
eventStore: cfg.EventStore,
transactionStore: cfg.TransactionStore,
ledgerBackend: cfg.LedgerBackend,
networkPassPhrase: cfg.NetworkPassPhrase,
timeout: cfg.Timeout,
ingestionDurationMetric: ingestionDurationMetric,
latestLedgerMetric: latestLedgerMetric,
ledgerStatsMetric: ledgerStatsMetric,
logger: cfg.Logger,
db: cfg.DB,
eventStore: cfg.EventStore,
transactionStore: cfg.TransactionStore,
ledgerBackend: cfg.LedgerBackend,
networkPassPhrase: cfg.NetworkPassPhrase,
timeout: cfg.Timeout,
metrics: Metrics{
ingestionDurationMetric: ingestionDurationMetric,
latestLedgerMetric: latestLedgerMetric,
ledgerStatsMetric: ledgerStatsMetric,
},
}

return service
Expand Down Expand Up @@ -119,21 +124,25 @@ func startService(service *Service, cfg Config) {
})
}

type Service struct {
logger *log.Entry
db db.ReadWriter
eventStore *events.MemoryStore
transactionStore *transactions.MemoryStore
ledgerBackend backends.LedgerBackend
timeout time.Duration
networkPassPhrase string
done context.CancelFunc
wg sync.WaitGroup
type Metrics struct {
ingestionDurationMetric *prometheus.SummaryVec
latestLedgerMetric prometheus.Gauge
ledgerStatsMetric *prometheus.CounterVec
}

type Service struct {
logger *log.Entry
db db.ReadWriter
eventStore *events.MemoryStore
transactionStore *transactions.MemoryStore
ledgerBackend backends.LedgerBackend
timeout time.Duration
networkPassPhrase string
done context.CancelFunc
wg sync.WaitGroup
metrics Metrics
}

func (s *Service) Close() error {
s.done()
s.wg.Wait()
Expand Down Expand Up @@ -286,9 +295,9 @@ func (s *Service) ingest(ctx context.Context, sequence uint32) error {
}
s.logger.Debugf("Ingested ledger %d", sequence)

s.ingestionDurationMetric.
s.metrics.ingestionDurationMetric.
With(prometheus.Labels{"type": "total"}).Observe(time.Since(startTime).Seconds())
s.latestLedgerMetric.Set(float64(sequence))
s.metrics.latestLedgerMetric.Set(float64(sequence))
return nil
}

Expand All @@ -297,7 +306,7 @@ func (s *Service) ingestLedgerCloseMeta(tx db.WriteTx, ledgerCloseMeta xdr.Ledge
if err := tx.LedgerWriter().InsertLedger(ledgerCloseMeta); err != nil {
return err
}
s.ingestionDurationMetric.
s.metrics.ingestionDurationMetric.
With(prometheus.Labels{"type": "ledger_close_meta"}).Observe(time.Since(startTime).Seconds())

if err := s.eventStore.IngestEvents(ledgerCloseMeta); err != nil {
Expand Down
1 change: 1 addition & 0 deletions cmd/soroban-rpc/internal/ingest/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ func TestRetryRunningIngestion(t *testing.T) {
func TestIngestion(t *testing.T) {
mockDB := &MockDB{}
mockLedgerBackend := &ledgerbackend.MockDatabaseBackend{}

daemon := interfaces.MakeNoOpDeamon()
config := Config{
Logger: supportlog.New(),
Expand Down
25 changes: 25 additions & 0 deletions cmd/soroban-rpc/internal/test/archive_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package test

import (
"net/http"
"sync"
"testing"

"github.com/stretchr/testify/assert"
)

func TestArchiveUserAgent(t *testing.T) {
userAgents := sync.Map{}
cfg := &TestConfig{
historyArchiveProxyCallback: func(r *http.Request) {
userAgents.Store(r.Header["User-Agent"][0], "")
},
}
NewTest(t, cfg)

_, ok := userAgents.Load("testing")
assert.True(t, ok, "rpc service should set user agent for history archives")

_, ok = userAgents.Load("testing/captivecore")
assert.True(t, ok, "rpc captive core should set user agent for history archives")
}
2 changes: 1 addition & 1 deletion cmd/soroban-rpc/internal/test/cli_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ func getCLIDefaultAccount(t *testing.T) string {
}

func NewCLITest(t *testing.T) *Test {
test := NewTest(t)
test := NewTest(t, nil)
fundAccount(t, test, getCLIDefaultAccount(t), "1000000")
return test
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/soroban-rpc/internal/test/cors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
// Specifically, when we include an Origin header in the request, a soroban-rpc should response
// with a corresponding Access-Control-Allow-Origin.
func TestCORS(t *testing.T) {
test := NewTest(t)
test := NewTest(t, nil)

request, err := http.NewRequest("POST", test.sorobanRPCURL(), bytes.NewBufferString("{\"jsonrpc\": \"2.0\", \"id\": 1, \"method\": \"getHealth\"}"))
require.NoError(t, err)
Expand Down
6 changes: 3 additions & 3 deletions cmd/soroban-rpc/internal/test/get_ledger_entries_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
)

func TestGetLedgerEntriesNotFound(t *testing.T) {
test := NewTest(t)
test := NewTest(t, nil)

ch := jhttp.NewChannel(test.sorobanRPCURL(), nil)
client := jrpc2.NewClient(ch, nil)
Expand Down Expand Up @@ -56,7 +56,7 @@ func TestGetLedgerEntriesNotFound(t *testing.T) {
}

func TestGetLedgerEntriesInvalidParams(t *testing.T) {
test := NewTest(t)
test := NewTest(t, nil)

ch := jhttp.NewChannel(test.sorobanRPCURL(), nil)
client := jrpc2.NewClient(ch, nil)
Expand All @@ -74,7 +74,7 @@ func TestGetLedgerEntriesInvalidParams(t *testing.T) {
}

func TestGetLedgerEntriesSucceeds(t *testing.T) {
test := NewTest(t)
test := NewTest(t, nil)

ch := jhttp.NewChannel(test.sorobanRPCURL(), nil)
client := jrpc2.NewClient(ch, nil)
Expand Down
6 changes: 3 additions & 3 deletions cmd/soroban-rpc/internal/test/get_ledger_entry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
)

func TestGetLedgerEntryNotFound(t *testing.T) {
test := NewTest(t)
test := NewTest(t, nil)

ch := jhttp.NewChannel(test.sorobanRPCURL(), nil)
client := jrpc2.NewClient(ch, nil)
Expand Down Expand Up @@ -51,7 +51,7 @@ func TestGetLedgerEntryNotFound(t *testing.T) {
}

func TestGetLedgerEntryInvalidParams(t *testing.T) {
test := NewTest(t)
test := NewTest(t, nil)

ch := jhttp.NewChannel(test.sorobanRPCURL(), nil)
client := jrpc2.NewClient(ch, nil)
Expand All @@ -67,7 +67,7 @@ func TestGetLedgerEntryInvalidParams(t *testing.T) {
}

func TestGetLedgerEntrySucceeds(t *testing.T) {
test := NewTest(t)
test := NewTest(t, nil)

ch := jhttp.NewChannel(test.sorobanRPCURL(), nil)
client := jrpc2.NewClient(ch, nil)
Expand Down
2 changes: 1 addition & 1 deletion cmd/soroban-rpc/internal/test/get_network_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
)

func TestGetNetworkSucceeds(t *testing.T) {
test := NewTest(t)
test := NewTest(t, nil)

ch := jhttp.NewChannel(test.sorobanRPCURL(), nil)
client := jrpc2.NewClient(ch, nil)
Expand Down
2 changes: 1 addition & 1 deletion cmd/soroban-rpc/internal/test/health_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
)

func TestHealth(t *testing.T) {
test := NewTest(t)
test := NewTest(t, nil)

ch := jhttp.NewChannel(test.sorobanRPCURL(), nil)
client := jrpc2.NewClient(ch, nil)
Expand Down
Loading

0 comments on commit e960236

Please sign in to comment.