Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rpc: added user agent config on ha archive pool #56

Merged
merged 10 commits into from
Feb 12, 2024
8 changes: 8 additions & 0 deletions cmd/soroban-rpc/internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type Config struct {
EventLedgerRetentionWindow uint32
FriendbotURL string
HistoryArchiveURLs []string
HistoryArchiveUserAgent string
IngestionTimeout time.Duration
LogFormat LogFormat
LogLevel logrus.Level
Expand Down Expand Up @@ -64,6 +65,13 @@ type Config struct {
flagset *pflag.FlagSet
}

func (cfg *Config) ExtendedUserAgent(extension string) string {
if cfg.HistoryArchiveUserAgent == "" {
return extension
}
return cfg.HistoryArchiveUserAgent + "/" + extension
}

func (cfg *Config) SetValues(lookupEnv func(string) (string, bool)) error {
// We start with the defaults
if err := cfg.loadDefaults(); err != nil {
Expand Down
8 changes: 8 additions & 0 deletions cmd/soroban-rpc/internal/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,14 @@ func TestConfigLoadDefaults(t *testing.T) {
assert.Equal(t, uint(runtime.NumCPU()), cfg.PreflightWorkerCount)
}

func TestConfigExtendedUserAgent(t *testing.T) {
cfg := Config{
HistoryArchiveUserAgent: "Test",
}
require.NoError(t, cfg.loadDefaults())
assert.Equal(t, "Test/123", cfg.ExtendedUserAgent("123"))
}

func TestConfigLoadFlagsDefaultValuesOverrideExisting(t *testing.T) {
// Set up a config with an existing non-default value
cfg := Config{
Expand Down
2 changes: 1 addition & 1 deletion cmd/soroban-rpc/internal/config/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func TestAllConfigFieldsMustHaveASingleOption(t *testing.T) {

// Allow us to explicitly exclude any fields on the Config struct, which are not going to have Options.
// e.g. "ConfigPath"
excluded := map[string]bool{}
excluded := map[string]bool{"HistoryArchiveUserAgent": true}

cfg := Config{}
cfgValue := reflect.ValueOf(cfg)
Expand Down
16 changes: 13 additions & 3 deletions cmd/soroban-rpc/internal/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/stellar/go/ingest/ledgerbackend"
supporthttp "github.com/stellar/go/support/http"
supportlog "github.com/stellar/go/support/log"
"github.com/stellar/go/support/storage"
"github.com/stellar/go/xdr"

"github.com/stellar/soroban-tools/cmd/soroban-rpc/internal"
Expand Down Expand Up @@ -121,7 +122,7 @@ func newCaptiveCore(cfg *config.Config, logger *supportlog.Entry) (*ledgerbacken
CheckpointFrequency: cfg.CheckpointFrequency,
Log: logger.WithField("subservice", "stellar-core"),
Toml: captiveCoreToml,
UserAgent: "captivecore",
UserAgent: cfg.ExtendedUserAgent("captivecore"),
UseDB: true,
}
return ledgerbackend.NewCaptive(captiveConfig)
Expand All @@ -144,12 +145,21 @@ func MustNew(cfg *config.Config) *Daemon {
if len(cfg.HistoryArchiveURLs) == 0 {
logger.Fatal("no history archives url were provided")
}
historyArchive, err := historyarchive.Connect(
cfg.HistoryArchiveURLs[0],

historyArchive, err := historyarchive.NewArchivePool(
cfg.HistoryArchiveURLs,
historyarchive.ArchiveOptions{
NetworkPassphrase: cfg.NetworkPassphrase,
CheckpointFrequency: cfg.CheckpointFrequency,
ConnectOptions: storage.ConnectOptions{
Context: context.Background(),
UserAgent: cfg.HistoryArchiveUserAgent},
CacheConfig: historyarchive.CacheOptions{
sreuland marked this conversation as resolved.
Show resolved Hide resolved
sreuland marked this conversation as resolved.
Show resolved Hide resolved
Cache: false,
},
},
)

if err != nil {
logger.WithError(err).Fatal("could not connect to history archive")
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/soroban-rpc/internal/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ type event struct {
diagnosticEventXDR []byte
txIndex uint32
eventIndex uint32
txHash *xdr.Hash // intentionally stored as a pointer to save memory (amortized as soon as there are two events in a transaction)
txHash *xdr.Hash // intentionally stored as a pointer to save memory (amortized as soon as there are two events in a transaction)
}

func (e event) cursor(ledgerSeq uint32) Cursor {
Expand Down
8 changes: 4 additions & 4 deletions cmd/soroban-rpc/internal/ingest/ledgerentry.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,10 @@ func (s *Service) ingestLedgerEntryChanges(ctx context.Context, reader ingest.Ch
results := changeStatsProcessor.GetResults()
for stat, value := range results.Map() {
stat = strings.Replace(stat, "stats_", "change_", 1)
s.ledgerStatsMetric.
s.metrics.ledgerStatsMetric.
With(prometheus.Labels{"type": stat}).Add(float64(value.(int64)))
}
s.ingestionDurationMetric.
s.metrics.ingestionDurationMetric.
With(prometheus.Labels{"type": "ledger_entries"}).Observe(time.Since(startTime).Seconds())
return ctx.Err()
}
Expand All @@ -66,10 +66,10 @@ func (s *Service) ingestTempLedgerEntryEvictions(
}

for evictionType, count := range counts {
s.ledgerStatsMetric.
s.metrics.ledgerStatsMetric.
With(prometheus.Labels{"type": evictionType}).Add(float64(count))
}
s.ingestionDurationMetric.
s.metrics.ingestionDurationMetric.
With(prometheus.Labels{"type": "evicted_temp_ledger_entries"}).Observe(time.Since(startTime).Seconds())
return ctx.Err()
}
Expand Down
57 changes: 33 additions & 24 deletions cmd/soroban-rpc/internal/ingest/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,19 +73,24 @@ func newService(cfg Config) *Service {
[]string{"type"},
)

cfg.Daemon.MetricsRegistry().MustRegister(ingestionDurationMetric, latestLedgerMetric, ledgerStatsMetric)
cfg.Daemon.MetricsRegistry().MustRegister(
ingestionDurationMetric,
latestLedgerMetric,
ledgerStatsMetric)

service := &Service{
logger: cfg.Logger,
db: cfg.DB,
eventStore: cfg.EventStore,
transactionStore: cfg.TransactionStore,
ledgerBackend: cfg.LedgerBackend,
networkPassPhrase: cfg.NetworkPassPhrase,
timeout: cfg.Timeout,
ingestionDurationMetric: ingestionDurationMetric,
latestLedgerMetric: latestLedgerMetric,
ledgerStatsMetric: ledgerStatsMetric,
logger: cfg.Logger,
db: cfg.DB,
eventStore: cfg.EventStore,
transactionStore: cfg.TransactionStore,
ledgerBackend: cfg.LedgerBackend,
networkPassPhrase: cfg.NetworkPassPhrase,
timeout: cfg.Timeout,
metrics: Metrics{
ingestionDurationMetric: ingestionDurationMetric,
latestLedgerMetric: latestLedgerMetric,
ledgerStatsMetric: ledgerStatsMetric,
},
}

return service
Expand Down Expand Up @@ -119,21 +124,25 @@ func startService(service *Service, cfg Config) {
})
}

type Service struct {
logger *log.Entry
db db.ReadWriter
eventStore *events.MemoryStore
transactionStore *transactions.MemoryStore
ledgerBackend backends.LedgerBackend
timeout time.Duration
networkPassPhrase string
done context.CancelFunc
wg sync.WaitGroup
type Metrics struct {
ingestionDurationMetric *prometheus.SummaryVec
latestLedgerMetric prometheus.Gauge
ledgerStatsMetric *prometheus.CounterVec
}

type Service struct {
logger *log.Entry
db db.ReadWriter
eventStore *events.MemoryStore
transactionStore *transactions.MemoryStore
ledgerBackend backends.LedgerBackend
timeout time.Duration
networkPassPhrase string
done context.CancelFunc
wg sync.WaitGroup
metrics Metrics
}

func (s *Service) Close() error {
s.done()
s.wg.Wait()
Expand Down Expand Up @@ -286,9 +295,9 @@ func (s *Service) ingest(ctx context.Context, sequence uint32) error {
}
s.logger.Debugf("Ingested ledger %d", sequence)

s.ingestionDurationMetric.
s.metrics.ingestionDurationMetric.
With(prometheus.Labels{"type": "total"}).Observe(time.Since(startTime).Seconds())
s.latestLedgerMetric.Set(float64(sequence))
s.metrics.latestLedgerMetric.Set(float64(sequence))
return nil
}

Expand All @@ -297,7 +306,7 @@ func (s *Service) ingestLedgerCloseMeta(tx db.WriteTx, ledgerCloseMeta xdr.Ledge
if err := tx.LedgerWriter().InsertLedger(ledgerCloseMeta); err != nil {
return err
}
s.ingestionDurationMetric.
s.metrics.ingestionDurationMetric.
With(prometheus.Labels{"type": "ledger_close_meta"}).Observe(time.Since(startTime).Seconds())

if err := s.eventStore.IngestEvents(ledgerCloseMeta); err != nil {
Expand Down
1 change: 1 addition & 0 deletions cmd/soroban-rpc/internal/ingest/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ func TestRetryRunningIngestion(t *testing.T) {
func TestIngestion(t *testing.T) {
mockDB := &MockDB{}
mockLedgerBackend := &ledgerbackend.MockDatabaseBackend{}

daemon := interfaces.MakeNoOpDeamon()
config := Config{
Logger: supportlog.New(),
Expand Down
2 changes: 1 addition & 1 deletion cmd/soroban-rpc/internal/test/simulate_transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1125,7 +1125,7 @@ func TestSimulateSystemEvent(t *testing.T) {
err = xdr.SafeUnmarshalBase64(response.TransactionData, &transactionData)
require.NoError(t, err)
assert.InDelta(t, 6856, uint32(transactionData.Resources.ReadBytes), 200)

// the resulting fee is derived from compute factors and a default padding is applied to instructions by preflight
// for test purposes, the most deterministic way to assert the resulting fee is expected value in test scope, is to capture
// the resulting fee from current preflight output and re-plug it in here, rather than try to re-implement the cost-model algo
Expand Down
1 change: 1 addition & 0 deletions cmd/soroban-rpc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ func main() {
fmt.Fprintln(os.Stderr, err)
os.Exit(1)
}
cfg.HistoryArchiveUserAgent = fmt.Sprintf("soroban-rpc/%s", config.Version)
daemon.MustNew(&cfg).Run()
},
}
Expand Down
Loading