Add latest uploaded ledger metric (stellar#5322)
tamirms authored May 17, 2024
1 parent 7d6f7e7 commit afd526d
Showing 3 changed files with 133 additions and 73 deletions.
14 changes: 11 additions & 3 deletions exp/services/ledgerexporter/internal/app.go
@@ -25,6 +25,15 @@ import (
 const (
 	adminServerReadTimeout     = 5 * time.Second
 	adminServerShutdownTimeout = time.Second * 5
+	// TODO: make this timeout configurable
+	uploadShutdownTimeout = 10 * time.Second
+	// We expect the queue size to rarely exceed 1 or 2 because upload
+	// speeds are expected to be much faster than the rate at which
+	// captive core emits ledgers. However, configuring a higher capacity
+	// than we expect to need is useful because, if we observe a large
+	// queue size in our metrics, that is an indication that uploads to
+	// the data store have degraded.
+	uploadQueueCapacity = 128
 )

 var (
@@ -116,8 +125,7 @@ func (a *App) init(ctx context.Context) error {
 		return err
 	}

-	// TODO: make queue size configurable instead of hard coding it to 1
-	queue := NewUploadQueue(1, registry)
+	queue := NewUploadQueue(uploadQueueCapacity, registry)
 	if a.exportManager, err = NewExportManager(a.config.LedgerBatchConfig, a.ledgerBackend, queue, registry); err != nil {
 		return err
 	}
@@ -190,7 +198,7 @@ func (a *App) Run() {
 	go func() {
 		defer wg.Done()

-		err := a.uploader.Run(ctx)
+		err := a.uploader.Run(ctx, uploadShutdownTimeout)
 		if err != nil && !errors.Is(err, context.Canceled) {
 			logger.WithError(err).Error("Error executing Uploader")
 			cancel()
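The capacity comment above is the interesting part of this file: the larger buffer exists for observability, since a queue-depth metric can only signal degraded uploads if the buffer is allowed to grow past 1 or 2. As a rough sketch of that idea (the real UploadQueue implementation is not part of this diff, so the type, metric name, and methods below are assumptions), a bounded queue can export its depth as a Prometheus gauge:

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

// boundedQueue is an illustrative stand-in for the exporter's UploadQueue:
// a buffered channel whose current depth is exported as a Prometheus gauge.
type boundedQueue struct {
	ch    chan string
	depth prometheus.Gauge
}

func newBoundedQueue(capacity int, registry *prometheus.Registry) boundedQueue {
	depth := prometheus.NewGauge(prometheus.GaugeOpts{
		// hypothetical metric name; the real queue metric is registered
		// inside NewUploadQueue, which this diff does not show
		Namespace: "ledger_exporter", Subsystem: "uploader", Name: "queue_depth",
		Help:      "number of objects waiting to be uploaded",
	})
	registry.MustRegister(depth)
	return boundedQueue{ch: make(chan string, capacity), depth: depth}
}

// enqueue blocks once the buffer is full, applying backpressure to the
// producer; a persistently high depth gauge therefore means uploads are
// falling behind ledger production.
func (q boundedQueue) enqueue(v string) {
	q.ch <- v
	q.depth.Set(float64(len(q.ch)))
}

func main() {
	registry := prometheus.NewRegistry()
	q := newBoundedQueue(128, registry)
	q.enqueue("ledger-batch-0001")
	fmt.Println("queued objects:", len(q.ch))
}
```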
45 changes: 33 additions & 12 deletions exp/services/ledgerexporter/internal/uploader.go
@@ -19,6 +19,7 @@ type Uploader struct {
 	queue                UploadQueue
 	uploadDurationMetric *prometheus.SummaryVec
 	objectSizeMetrics    *prometheus.SummaryVec
+	latestLedgerMetric   prometheus.Gauge
 }

 // NewUploader constructs a new Uploader instance
@@ -43,12 +44,17 @@ func NewUploader(
 		},
 		[]string{"ledgers", "already_exists", "compression"},
 	)
-	prometheusRegistry.MustRegister(uploadDurationMetric, objectSizeMetrics)
+	latestLedgerMetric := prometheus.NewGauge(prometheus.GaugeOpts{
+		Namespace: "ledger_exporter", Subsystem: "uploader", Name: "latest_ledger",
+		Help:      "sequence number of the latest ledger uploaded",
+	})
+	prometheusRegistry.MustRegister(uploadDurationMetric, objectSizeMetrics, latestLedgerMetric)
 	return Uploader{
 		dataStore:            destination,
 		queue:                queue,
 		uploadDurationMetric: uploadDurationMetric,
 		objectSizeMetrics:    objectSizeMetrics,
+		latestLedgerMetric:   latestLedgerMetric,
 	}
 }
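For context on the pattern used here: client_golang gauges are constructed once, registered once, and then simply overwritten with Set on each observation, which is exactly what a latest-sequence-number metric wants. A minimal standalone sketch of that lifecycle, reusing the commit's metric name (the fresh registry and the sample value are illustrative):

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	// A fresh registry standing in for the exporter's; in the real code
	// the gauge is registered on the registry passed into NewUploader.
	registry := prometheus.NewRegistry()

	latestLedger := prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "ledger_exporter", Subsystem: "uploader", Name: "latest_ledger",
		Help:      "sequence number of the latest ledger uploaded",
	})
	registry.MustRegister(latestLedger)

	// Gauges overwrite rather than accumulate, so each successful upload
	// can simply record the end sequence of the archive it contained.
	latestLedger.Set(51234567) // sample value, not from the commit

	families, err := registry.Gather()
	if err != nil {
		panic(err)
	}
	// Prints: ledger_exporter_uploader_latest_ledger
	fmt.Println(families[0].GetName())
}
```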

@@ -93,6 +99,8 @@ func (u Uploader) Upload(ctx context.Context, metaArchive *LedgerMetaArchive) error {
 	if err != nil {
 		return errors.Wrapf(err, "error uploading %s", metaArchive.ObjectKey)
 	}
+
+	logger.Infof("Uploaded %s successfully", metaArchive.ObjectKey)
 	alreadyExists := strconv.FormatBool(!ok)

 	u.uploadDurationMetric.With(prometheus.Labels{
@@ -109,22 +117,36 @@ func (u Uploader) Upload(ctx context.Context, metaArchive *LedgerMetaArchive) error {
 		"ledgers":        numLedgers,
 		"already_exists": alreadyExists,
 	}).Observe(float64(writerTo.totalCompressed))
+	u.latestLedgerMetric.Set(float64(metaArchive.Data.EndSequence))
 	return nil
 }

-// TODO: make it configurable
-var uploaderShutdownWaitTime = 10 * time.Second
-
 // Run starts the uploader, continuously listening for LedgerMetaArchive objects to upload.
-func (u Uploader) Run(ctx context.Context) error {
+func (u Uploader) Run(ctx context.Context, shutdownDelayTime time.Duration) error {
 	uploadCtx, cancel := context.WithCancel(context.Background())
 	defer cancel()

 	go func() {
-		<-ctx.Done()
-		logger.Info("Context done, waiting for remaining uploads to complete...")
-		// wait for a few seconds to upload remaining objects from metaArchiveCh
-		<-time.After(uploaderShutdownWaitTime)
-		logger.Info("Timeout reached, canceling remaining uploads...")
-		cancel()
+		select {
+		case <-uploadCtx.Done():
+			// if uploadCtx is cancelled that means we have exited Run()
+			// and therefore there are no remaining uploads
+			return
+		case <-ctx.Done():
+			logger.Info("Received shutdown signal, waiting for remaining uploads to complete...")
+		}
+
+		select {
+		case <-time.After(shutdownDelayTime):
+			// wait for some time to upload remaining objects from
+			// the upload queue
+			logger.Info("Timeout reached, canceling remaining uploads...")
+			cancel()
+		case <-uploadCtx.Done():
+			// if uploadCtx is cancelled that means we have exited Run()
+			// and therefore there are no remaining uploads
+			return
+		}
 	}()

 	for {
@@ -141,6 +163,5 @@ func (u Uploader) Run(ctx context.Context) error {
 		if err = u.Upload(uploadCtx, metaObject); err != nil {
 			return err
 		}
-		logger.Infof("Uploaded %s successfully", metaObject.ObjectKey)
 	}
 }
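The reworked goroutine in Run is a general two-phase graceful-shutdown pattern: wait for either a shutdown signal or natural completion, then grant in-flight work a bounded grace period before force-cancelling it. A distilled, self-contained sketch of the same pattern (the function and worker names are illustrative, not from the exporter):

```go
package main

import (
	"context"
	"log"
	"time"
)

// runWithGracePeriod runs work until ctx is cancelled, then grants it
// gracePeriod to drain before cancelling the context the work runs under.
func runWithGracePeriod(ctx context.Context, gracePeriod time.Duration, work func(context.Context) error) error {
	workCtx, cancel := context.WithCancel(context.Background())
	defer cancel()

	go func() {
		select {
		case <-workCtx.Done():
			// work already returned; nothing left to wind down
			return
		case <-ctx.Done():
			log.Println("shutdown signal received, draining remaining work...")
		}

		select {
		case <-time.After(gracePeriod):
			log.Println("grace period elapsed, cancelling remaining work")
			cancel()
		case <-workCtx.Done():
			// work drained within the grace period
		}
	}()

	return work(workCtx)
}

func main() {
	// simulate an external shutdown signal arriving after one second
	ctx, stop := context.WithTimeout(context.Background(), time.Second)
	defer stop()

	err := runWithGracePeriod(ctx, 2*time.Second, func(c context.Context) error {
		<-c.Done() // stand-in for the upload loop draining its queue
		return c.Err()
	})
	log.Println("worker exited:", err)
}
```

Keeping the grace timer out of the hot path and watching uploadCtx in both selects means the goroutine exits promptly whether Run finishes first or the timeout fires first, avoiding the unconditional wait in the old code.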
[diff for the third changed file did not load]
