Skip to content

Commit

Permalink
stellar#5161: http archive requests include user agent and metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
sreuland committed Jan 17, 2024
1 parent 15324b7 commit 7942d01
Show file tree
Hide file tree
Showing 9 changed files with 138 additions and 1 deletion.
42 changes: 42 additions & 0 deletions historyarchive/archive.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,28 @@ type Ledger struct {
TransactionResult xdr.TransactionHistoryResultEntry
}

// all counters are uint32, golang will auto wrap them back to 0 if they overflow after addition.
type ArchiveStats struct {
Requests uint32
FileDownloads uint32
FileUploads uint32
BackendName string
}

func (as *ArchiveStats) incrementDownloads() {
as.FileDownloads++
as.incrementRequests()
}

func (as *ArchiveStats) incrementUploads() {
as.FileUploads++
as.incrementRequests()
}

func (as *ArchiveStats) incrementRequests() {
as.Requests++
}

type ArchiveBackend interface {
Exists(path string) (bool, error)
Size(path string) (int64, error)
Expand Down Expand Up @@ -87,6 +109,7 @@ type ArchiveInterface interface {
GetXdrStreamForHash(hash Hash) (*XdrStream, error)
GetXdrStream(pth string) (*XdrStream, error)
GetCheckpointManager() CheckpointManager
GetStats() []ArchiveStats
}

var _ ArchiveInterface = &Archive{}
Expand Down Expand Up @@ -115,6 +138,11 @@ type Archive struct {
checkpointManager CheckpointManager

backend ArchiveBackend
stats ArchiveStats
}

func (arch *Archive) GetStats() []ArchiveStats {
return []ArchiveStats{arch.stats}
}

func (arch *Archive) GetCheckpointManager() CheckpointManager {
Expand All @@ -124,6 +152,7 @@ func (arch *Archive) GetCheckpointManager() CheckpointManager {
func (a *Archive) GetPathHAS(path string) (HistoryArchiveState, error) {
var has HistoryArchiveState
rdr, err := a.backend.GetFile(path)
a.stats.incrementDownloads()
if err != nil {
return has, err
}
Expand All @@ -150,6 +179,7 @@ func (a *Archive) GetPathHAS(path string) (HistoryArchiveState, error) {

func (a *Archive) PutPathHAS(path string, has HistoryArchiveState, opts *CommandOptions) error {
exists, err := a.backend.Exists(path)
a.stats.incrementRequests()
if err != nil {
return err
}
Expand All @@ -161,19 +191,23 @@ func (a *Archive) PutPathHAS(path string, has HistoryArchiveState, opts *Command
if err != nil {
return err
}
a.stats.incrementUploads()
return a.backend.PutFile(path,
ioutil.NopCloser(bytes.NewReader(buf)))
}

func (a *Archive) BucketExists(bucket Hash) (bool, error) {
a.stats.incrementRequests()
return a.backend.Exists(BucketPath(bucket))
}

func (a *Archive) BucketSize(bucket Hash) (int64, error) {
a.stats.incrementRequests()
return a.backend.Size(BucketPath(bucket))
}

func (a *Archive) CategoryCheckpointExists(cat string, chk uint32) (bool, error) {
a.stats.incrementRequests()
return a.backend.Exists(CategoryCheckpointPath(cat, chk))
}

Expand Down Expand Up @@ -306,14 +340,17 @@ func (a *Archive) PutRootHAS(has HistoryArchiveState, opts *CommandOptions) erro
}

func (a *Archive) ListBucket(dp DirPrefix) (chan string, chan error) {
a.stats.incrementRequests()
return a.backend.ListFiles(path.Join("bucket", dp.Path()))
}

func (a *Archive) ListAllBuckets() (chan string, chan error) {
a.stats.incrementRequests()
return a.backend.ListFiles("bucket")
}

func (a *Archive) ListAllBucketHashes() (chan Hash, chan error) {
a.stats.incrementRequests()
sch, errs := a.backend.ListFiles("bucket")
ch := make(chan Hash)
rx := regexp.MustCompile("bucket" + hexPrefixPat + "bucket-([0-9a-f]{64})\\.xdr\\.gz$")
Expand All @@ -335,6 +372,7 @@ func (a *Archive) ListCategoryCheckpoints(cat string, pth string) (chan uint32,
rx := regexp.MustCompile(cat + hexPrefixPat + cat +
"-([0-9a-f]{8})\\." + regexp.QuoteMeta(ext) + "$")
sch, errs := a.backend.ListFiles(path.Join(cat, pth))
a.stats.incrementRequests()
ch := make(chan uint32)
errs = makeErrorPump(errs)

Expand Down Expand Up @@ -372,6 +410,7 @@ func (a *Archive) GetXdrStream(pth string) (*XdrStream, error) {
return nil, errors.New("File has non-.xdr.gz suffix: " + pth)
}
rdr, err := a.backend.GetFile(pth)
a.stats.incrementDownloads()
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -426,6 +465,9 @@ func Connect(u string, opts ConnectOptions) (*Archive, error) {
} else {
err = errors.New("unknown URL scheme: '" + parsed.Scheme + "'")
}

arch.stats = ArchiveStats{BackendName: parsed.String()}

return &arch, err
}

Expand Down
13 changes: 12 additions & 1 deletion historyarchive/archive_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ func NewArchivePool(archiveURLs []string, config ConnectOptions) (ArchivePool, e
NetworkPassphrase: config.NetworkPassphrase,
CheckpointFrequency: config.CheckpointFrequency,
Context: config.Context,
UserAgent: config.UserAgent,
},
)

Expand All @@ -55,8 +56,18 @@ func NewArchivePool(archiveURLs []string, config ConnectOptions) (ArchivePool, e
return validArchives, nil
}

func (pa ArchivePool) GetStats() []ArchiveStats {
stats := []ArchiveStats{}
for _, archive := range pa {
if len(archive.GetStats()) == 1 {
stats = append(stats, archive.GetStats()[0])
}
}
return stats
}

// Ensure the pool conforms to the ArchiveInterface
var _ ArchiveInterface = ArchivePool{}
var _ ArchiveInterface = &ArchivePool{}

// Below are the ArchiveInterface method implementations.

Expand Down
25 changes: 25 additions & 0 deletions historyarchive/archive_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
"io"
"io/ioutil"
"math/big"
"net/http"
"net/http/httptest"
"os"
"strings"
"testing"
Expand Down Expand Up @@ -176,6 +178,25 @@ func TestScan(t *testing.T) {
GetRandomPopulatedArchive().Scan(opts)
}

func TestConfiguresHttpUserAgent(t *testing.T) {
var userAgent string
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
userAgent = r.Header["User-Agent"][0]
w.WriteHeader(http.StatusOK)
}))
defer server.Close()

archive, err := Connect(server.URL, ConnectOptions{
UserAgent: "uatest",
})
assert.NoError(t, err)

ok, err := archive.BucketExists(EmptyXdrArrayHash())
assert.True(t, ok)
assert.NoError(t, err)
assert.Equal(t, userAgent, "uatest")
}

func TestScanSize(t *testing.T) {
defer cleanup()
opts := testOptions()
Expand Down Expand Up @@ -523,6 +544,8 @@ func assertXdrEquals(t *testing.T, a, b xdrEntry) {
func TestGetLedgers(t *testing.T) {
archive := GetTestMockArchive()
_, err := archive.GetLedgers(1000, 1002)
assert.Equal(t, uint32(1), archive.GetStats()[0].Requests)
assert.Equal(t, uint32(0), archive.GetStats()[0].FileDownloads)
assert.EqualError(t, err, "checkpoint 1023 is not published")

ledgerHeaders := []xdr.LedgerHeaderHistoryEntry{
Expand Down Expand Up @@ -610,6 +633,8 @@ func TestGetLedgers(t *testing.T) {
ledgers, err := archive.GetLedgers(1000, 1002)
assert.NoError(t, err)
assert.Len(t, ledgers, 3)
assert.Equal(t, uint32(7), archive.GetStats()[0].Requests) // it started at 1, incurred 6 requests total, 3 queries, 3 downloads
assert.Equal(t, uint32(3), archive.GetStats()[0].FileDownloads) // started 0, incurred 3 file downloads
for i, seq := range []uint32{1000, 1001, 1002} {
ledger := ledgers[seq]
assertXdrEquals(t, ledgerHeaders[i], ledger.Header)
Expand Down
5 changes: 5 additions & 0 deletions historyarchive/mocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,3 +103,8 @@ func (m *MockArchive) GetXdrStream(pth string) (*XdrStream, error) {
a := m.Called(pth)
return a.Get(0).(*XdrStream), a.Error(1)
}

func (m *MockArchive) GetStats() []ArchiveStats {
a := m.Called()
return a.Get(0).([]ArchiveStats)
}
28 changes: 28 additions & 0 deletions services/horizon/internal/ingest/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/prometheus/client_golang/prometheus"

"github.com/stellar/go/historyarchive"
"github.com/stellar/go/ingest"
"github.com/stellar/go/ingest/ledgerbackend"
"github.com/stellar/go/support/errors"
Expand Down Expand Up @@ -523,6 +524,13 @@ func (r resumeState) run(s *system) (transition, error) {
r.addLedgerStatsMetricFromMap(s, "trades", tradeStatsMap)
r.addProcessorDurationsMetricFromMap(s, stats.transactionDurations)

// since a single system instance is shared throughout all states,
// this will sweep up increments to history archive counters
// done elsewhere such as verifyState invocations since the same system
// instance is passed there and the additional usages of archives will just
// roll up and be reported here as part of resumeState transition
addHistoryArchiveStatsMetrics(s, s.historyAdapter.GetStats())

localLog := log.WithFields(logpkg.F{
"sequence": ingestLedger,
"duration": duration,
Expand Down Expand Up @@ -565,6 +573,26 @@ func (r resumeState) addProcessorDurationsMetricFromMap(s *system, m map[string]
}
}

func addHistoryArchiveStatsMetrics(s *system, stats []historyarchive.ArchiveStats) {
for _, historyServerStat := range stats {
s.Metrics().HistoryArchiveStatsCounter.
With(prometheus.Labels{
"source": historyServerStat.BackendName,
"type": "file_downloads"}).
Add(float64(historyServerStat.FileDownloads))
s.Metrics().HistoryArchiveStatsCounter.
With(prometheus.Labels{
"source": historyServerStat.BackendName,
"type": "file_uploads"}).
Add(float64(historyServerStat.FileUploads))
s.Metrics().HistoryArchiveStatsCounter.
With(prometheus.Labels{
"source": historyServerStat.BackendName,
"type": "requests"}).
Add(float64(historyServerStat.Requests))
}
}

type waitForCheckpointState struct{}

func (waitForCheckpointState) String() string {
Expand Down
5 changes: 5 additions & 0 deletions services/horizon/internal/ingest/history_archive_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type historyArchiveAdapterInterface interface {
GetLatestLedgerSequence() (uint32, error)
BucketListHash(sequence uint32) (xdr.Hash, error)
GetState(ctx context.Context, sequence uint32) (ingest.ChangeReader, error)
GetStats() []historyarchive.ArchiveStats
}

// newHistoryArchiveAdapter is a constructor to make a historyArchiveAdapter
Expand Down Expand Up @@ -71,3 +72,7 @@ func (haa *historyArchiveAdapter) GetState(ctx context.Context, sequence uint32)

return sr, nil
}

func (haa *historyArchiveAdapter) GetStats() []historyarchive.ArchiveStats {
return haa.archive.GetStats()
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ func (m *mockHistoryArchiveAdapter) GetState(ctx context.Context, sequence uint3
return args.Get(0).(ingest.ChangeReader), args.Error(1)
}

func (m *mockHistoryArchiveAdapter) GetStats() []historyarchive.ArchiveStats {
a := m.Called()
return a.Get(0).([]historyarchive.ArchiveStats)
}

func TestGetState_Read(t *testing.T) {
archive, e := getTestArchive()
if !assert.NoError(t, e) {
Expand Down
12 changes: 12 additions & 0 deletions services/horizon/internal/ingest/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,9 @@ type Metrics struct {

// ProcessorsRunDurationSummary exposes processors run durations.
ProcessorsRunDurationSummary *prometheus.SummaryVec

// ArchiveRequestCounter counts how many http requests are sent to history server
HistoryArchiveStatsCounter *prometheus.CounterVec
}

type System interface {
Expand Down Expand Up @@ -390,6 +393,14 @@ func (s *system) initMetrics() {
},
[]string{"name"},
)

s.metrics.HistoryArchiveStatsCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "horizon", Subsystem: "ingest", Name: "history_archive_stats_total",
Help: "counters of different history archive stats",
},
[]string{"source", "type"},
)
}

func (s *system) GetCurrentState() State {
Expand All @@ -415,6 +426,7 @@ func (s *system) RegisterMetrics(registry *prometheus.Registry) {
registry.MustRegister(s.metrics.ProcessorsRunDuration)
registry.MustRegister(s.metrics.ProcessorsRunDurationSummary)
registry.MustRegister(s.metrics.StateVerifyLedgerEntriesCount)
registry.MustRegister(s.metrics.HistoryArchiveStatsCounter)
s.ledgerBackend = ledgerbackend.WithMetrics(s.ledgerBackend, registry, "horizon")
}

Expand Down
4 changes: 4 additions & 0 deletions services/horizon/internal/ingest/resume_state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"

"github.com/stellar/go/historyarchive"
"github.com/stellar/go/ingest/ledgerbackend"
"github.com/stellar/go/support/errors"
"github.com/stellar/go/xdr"
Expand Down Expand Up @@ -260,6 +261,7 @@ func (s *ResumeTestTestSuite) mockSuccessfulIngestion() {
s.historyQ.On("GetLastLedgerIngest", s.ctx).Return(uint32(100), nil).Once()
s.historyQ.On("GetIngestVersion", s.ctx).Return(CurrentVersion, nil).Once()
s.historyQ.On("GetLatestHistoryLedger", s.ctx).Return(uint32(100), nil)
s.historyAdapter.On("GetStats").Return([]historyarchive.ArchiveStats{{}}).Once()

s.runner.On("RunAllProcessorsOnLedger", mock.AnythingOfType("xdr.LedgerCloseMeta")).
Run(func(args mock.Arguments) {
Expand Down Expand Up @@ -370,6 +372,7 @@ func (s *ResumeTestTestSuite) TestReapingObjectsDisabled() {

s.historyQ.On("GetExpStateInvalid", s.ctx).Return(false, nil).Once()
s.historyQ.On("RebuildTradeAggregationBuckets", s.ctx, uint32(101), uint32(101), 0).Return(nil).Once()
s.historyAdapter.On("GetStats").Return([]historyarchive.ArchiveStats{{}}).Once()
// Reap lookup tables not executed

next, err := resumeState{latestSuccessfullyProcessedLedger: 100}.run(s.system)
Expand Down Expand Up @@ -413,6 +416,7 @@ func (s *ResumeTestTestSuite) TestErrorReapingObjectsIgnored() {
s.historyQ.On("GetLastLedgerIngest", s.ctx).Return(uint32(100), nil).Once()
s.historyQ.On("ReapLookupTables", mock.AnythingOfType("*context.timerCtx"), mock.Anything).Return(nil, nil, errors.New("error reaping objects")).Once()
s.historyQ.On("Rollback").Return(nil).Once()
s.historyAdapter.On("GetStats").Return([]historyarchive.ArchiveStats{{}}).Once()

next, err := resumeState{latestSuccessfullyProcessedLedger: 100}.run(s.system)
s.Assert().NoError(err)
Expand Down

0 comments on commit 7942d01

Please sign in to comment.