Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add loki_ingester_rf1_segment_age_seconds metric #13653

Merged
merged 5 commits into from
Jul 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions pkg/ingester-rf1/flush.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,12 +97,9 @@ func (i *Ingester) flush(l log.Logger, j int, it *wal.PendingSegment) error {

func (i *Ingester) flushSegment(ctx context.Context, j int, w *wal.SegmentWriter) error {
start := time.Now()
defer func() {
i.metrics.flushDuration.Observe(time.Since(start).Seconds())
w.ReportMetrics()
}()

i.metrics.flushesTotal.Add(1)
defer func() { i.metrics.flushDuration.Observe(time.Since(start).Seconds()) }()

buf := i.flushBuffers[j]
defer buf.Reset()
Expand All @@ -111,6 +108,9 @@ func (i *Ingester) flushSegment(ctx context.Context, j int, w *wal.SegmentWriter
return err
}

stats := wal.GetSegmentStats(w, time.Now())
wal.ReportSegmentStats(stats, i.metrics.segmentMetrics)

id := ulid.MustNew(ulid.Timestamp(time.Now()), rand.Reader).String()
if err := i.store.PutObject(ctx, fmt.Sprintf("loki-v2/wal/anon/"+id), buf); err != nil {
i.metrics.flushFailuresTotal.Inc()
Expand All @@ -121,7 +121,7 @@ func (i *Ingester) flushSegment(ctx context.Context, j int, w *wal.SegmentWriter
Block: w.Meta(id),
}); err != nil {
i.metrics.flushFailuresTotal.Inc()
return fmt.Errorf("metastore add block: %w", err)
return fmt.Errorf("failed to update metastore: %w", err)
}

return nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/ingester-rf1/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ func New(cfg Config, clientConfig client.Config,
MaxAge: cfg.MaxSegmentAge,
MaxSegments: int64(cfg.MaxSegments),
MaxSegmentSize: int64(cfg.MaxSegmentSize),
}, wal.NewMetrics(registerer))
}, wal.NewManagerMetrics(registerer))
if err != nil {
return nil, err
}
Expand Down
56 changes: 25 additions & 31 deletions pkg/ingester-rf1/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,37 @@ package ingesterrf1
import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

"github.com/grafana/loki/v3/pkg/storage/wal"
)

type flushMetrics struct {
type ingesterMetrics struct {
autoForgetUnhealthyIngestersTotal prometheus.Counter
limiterEnabled prometheus.Gauge
// Shutdown marker for ingester scale down.
shutdownMarker prometheus.Gauge
flushesTotal prometheus.Counter
flushFailuresTotal prometheus.Counter
flushQueues prometheus.Gauge
flushDuration prometheus.Histogram
flushSizeBytes prometheus.Histogram
flushSize prometheus.Histogram
segmentMetrics *wal.SegmentMetrics
}

func newFlushMetrics(r prometheus.Registerer) *flushMetrics {
return &flushMetrics{
func newIngesterMetrics(r prometheus.Registerer) *ingesterMetrics {
return &ingesterMetrics{
autoForgetUnhealthyIngestersTotal: promauto.With(r).NewCounter(prometheus.CounterOpts{
Name: "loki_ingester_rf1_autoforget_unhealthy_ingesters_total",
Help: "Total number of ingesters automatically forgotten.",
}),
limiterEnabled: promauto.With(r).NewGauge(prometheus.GaugeOpts{
Name: "loki_ingester_rf1_limiter_enabled",
Help: "1 if the limiter is enabled, otherwise 0.",
}),
shutdownMarker: promauto.With(r).NewGauge(prometheus.GaugeOpts{
Name: "loki_ingester_rf1_shutdown_marker",
Help: "1 if prepare shutdown has been called, 0 otherwise.",
}),
flushesTotal: promauto.With(r).NewCounter(prometheus.CounterOpts{
Name: "loki_ingester_rf1_flushes_total",
Help: "The total number of flushes.",
Expand All @@ -33,37 +52,12 @@ func newFlushMetrics(r prometheus.Registerer) *flushMetrics {
Buckets: prometheus.ExponentialBuckets(0.001, 4, 8),
NativeHistogramBucketFactor: 1.1,
}),
flushSizeBytes: promauto.With(r).NewHistogram(prometheus.HistogramOpts{
flushSize: promauto.With(r).NewHistogram(prometheus.HistogramOpts{
Name: "loki_ingester_rf1_flush_size_bytes",
Help: "The flush size (as written to object storage).",
Buckets: prometheus.ExponentialBuckets(100, 10, 8),
NativeHistogramBucketFactor: 1.1,
}),
}
}

type ingesterMetrics struct {
autoForgetUnhealthyIngestersTotal prometheus.Counter
limiterEnabled prometheus.Gauge
// Shutdown marker for ingester scale down.
shutdownMarker prometheus.Gauge
*flushMetrics
}

func newIngesterMetrics(r prometheus.Registerer) *ingesterMetrics {
return &ingesterMetrics{
autoForgetUnhealthyIngestersTotal: promauto.With(r).NewCounter(prometheus.CounterOpts{
Name: "loki_ingester_rf1_autoforget_unhealthy_ingesters_total",
Help: "Total number of ingesters automatically forgotten.",
}),
limiterEnabled: promauto.With(r).NewGauge(prometheus.GaugeOpts{
Name: "loki_ingester_rf1_limiter_enabled",
Help: "1 if the limiter is enabled, otherwise 0.",
}),
shutdownMarker: promauto.With(r).NewGauge(prometheus.GaugeOpts{
Name: "loki_ingester_rf1_shutdown_marker",
Help: "1 if prepare shutdown has been called, 0 otherwise.",
}),
flushMetrics: newFlushMetrics(r),
segmentMetrics: wal.NewSegmentMetrics(r),
}
}
32 changes: 7 additions & 25 deletions pkg/storage/wal/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,6 @@ import (
"github.com/grafana/loki/v3/pkg/logproto"
)

const (
DefaultMaxAge = 500 * time.Millisecond
DefaultMaxSegments = 10
DefaultMaxSegmentSize = 8 * 1024 * 1024 // 8MB.
)

var (
// ErrClosed is returned when the WAL is closed. It is a permanent error
// as once closed, a WAL cannot be re-opened.
Expand Down Expand Up @@ -109,31 +103,24 @@ type Manager struct {
clock quartz.Clock
}

// segment is similar to PendingSegment, however it is an internal struct used
// in the available and pending lists. It contains a single-use result that is
// returned to callers appending to the WAL and a re-usable segment that is reset
// after each flush.
// segment is an internal struct used in the available and pending lists. It
// contains a single-use result that is returned to callers appending to the
// WAL and a re-usable segment that is reset after each flush.
type segment struct {
r *AppendResult
w *SegmentWriter

// moved is the time the segment was moved to the pending list. It is used
// to calculate the age of the segment. A segment is moved when it has
// exceeded the maximum age or the maximum size.
moved time.Time
}

// PendingSegment contains a result and the segment to be flushed.
type PendingSegment struct {
Result *AppendResult
Writer *SegmentWriter
Moved time.Time
}

func NewManager(cfg Config, metrics *Metrics) (*Manager, error) {
func NewManager(cfg Config, metrics *ManagerMetrics) (*Manager, error) {
m := Manager{
cfg: cfg,
metrics: metrics.ManagerMetrics,
metrics: metrics,
available: list.New(),
pending: list.New(),
clock: quartz.NewReal(),
Expand All @@ -142,7 +129,7 @@ func NewManager(cfg Config, metrics *Metrics) (*Manager, error) {
m.metrics.NumPending.Set(0)
m.metrics.NumFlushing.Set(0)
for i := int64(0); i < cfg.MaxSegments; i++ {
w, err := NewWalSegmentWriter(metrics.SegmentMetrics)
w, err := NewWalSegmentWriter()
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -205,11 +192,7 @@ func (m *Manager) NextPending() (*PendingSegment, error) {
m.pending.Remove(el)
m.metrics.NumPending.Dec()
m.metrics.NumFlushing.Inc()
return &PendingSegment{
Result: s.r,
Writer: s.w,
Moved: s.moved,
}, nil
return &PendingSegment{Result: s.r, Writer: s.w}, nil
}

// Put resets the segment and puts it back in the available list to accept
Expand All @@ -229,7 +212,6 @@ func (m *Manager) Put(s *PendingSegment) {
// move the element from the available list to the pending list and sets the
// relevant metrics.
func (m *Manager) move(el *list.Element, s *segment) {
s.moved = m.clock.Now()
m.pending.PushBack(s)
m.metrics.NumPending.Inc()
m.available.Remove(el)
Expand Down
28 changes: 14 additions & 14 deletions pkg/storage/wal/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func TestManager_Append(t *testing.T) {
MaxAge: 30 * time.Second,
MaxSegments: 1,
MaxSegmentSize: 1024, // 1KB
}, NewMetrics(nil))
}, NewManagerMetrics(nil))
require.NoError(t, err)

// Append some data.
Expand Down Expand Up @@ -59,7 +59,7 @@ func TestManager_AppendFailed(t *testing.T) {
MaxAge: 30 * time.Second,
MaxSegments: 1,
MaxSegmentSize: 1024, // 1KB
}, NewMetrics(nil))
}, NewManagerMetrics(nil))
require.NoError(t, err)

// Append some data.
Expand Down Expand Up @@ -92,7 +92,7 @@ func TestManager_AppendFailedWALClosed(t *testing.T) {
MaxAge: 30 * time.Second,
MaxSegments: 10,
MaxSegmentSize: 1024, // 1KB
}, NewMetrics(nil))
}, NewManagerMetrics(nil))
require.NoError(t, err)

// Append some data.
Expand Down Expand Up @@ -126,7 +126,7 @@ func TestManager_AppendFailedWALFull(t *testing.T) {
MaxAge: 30 * time.Second,
MaxSegments: 10,
MaxSegmentSize: 1024, // 1KB
}, NewMetrics(nil))
}, NewManagerMetrics(nil))
require.NoError(t, err)

// Should be able to write 100KB of data, 10KB per segment.
Expand Down Expand Up @@ -161,7 +161,7 @@ func TestManager_AppendMaxAgeExceeded(t *testing.T) {
MaxAge: 100 * time.Millisecond,
MaxSegments: 1,
MaxSegmentSize: 8 * 1024 * 1024, // 8MB
}, NewMetrics(nil))
}, NewManagerMetrics(nil))
require.NoError(t, err)

// Create a mock clock.
Expand Down Expand Up @@ -208,7 +208,7 @@ func TestManager_AppendMaxSizeExceeded(t *testing.T) {
MaxAge: 30 * time.Second,
MaxSegments: 1,
MaxSegmentSize: 1024, // 1KB
}, NewMetrics(nil))
}, NewManagerMetrics(nil))
require.NoError(t, err)

// Append 512B of data.
Expand Down Expand Up @@ -250,7 +250,7 @@ func TestManager_NextPending(t *testing.T) {
MaxAge: 30 * time.Second,
MaxSegments: 1,
MaxSegmentSize: 1024, // 1KB
}, NewMetrics(nil))
}, NewManagerMetrics(nil))
require.NoError(t, err)

// There should be no segments waiting to be flushed as no data has been
Expand Down Expand Up @@ -286,7 +286,7 @@ func TestManager_NextPendingAge(t *testing.T) {
MaxAge: 100 * time.Millisecond,
MaxSegments: 1,
MaxSegmentSize: 1024, // 1KB
}, NewMetrics(nil))
}, NewManagerMetrics(nil))
require.NoError(t, err)

// Create a mock clock.
Expand All @@ -311,7 +311,7 @@ func TestManager_NextPendingAge(t *testing.T) {
s, err := m.NextPending()
require.NoError(t, err)
require.NotNil(t, s)
require.Equal(t, 100*time.Millisecond, s.Writer.Age(s.Moved))
require.Equal(t, 100*time.Millisecond, s.Writer.Age(clock.Now()))
m.Put(s)

// Append 1KB of data using two separate append requests, 1ms apart.
Expand Down Expand Up @@ -342,15 +342,15 @@ func TestManager_NextPendingAge(t *testing.T) {
s, err = m.NextPending()
require.NoError(t, err)
require.NotNil(t, s)
require.Equal(t, time.Millisecond, s.Writer.Age(s.Moved))
require.Equal(t, time.Millisecond, s.Writer.Age(clock.Now()))
}

func TestManager_NextPendingMaxAgeExceeded(t *testing.T) {
m, err := NewManager(Config{
MaxAge: 100 * time.Millisecond,
MaxSegments: 1,
MaxSegmentSize: 1024, // 1KB
}, NewMetrics(nil))
}, NewManagerMetrics(nil))
require.NoError(t, err)

// Create a mock clock.
Expand Down Expand Up @@ -392,7 +392,7 @@ func TestManager_NextPendingWALClosed(t *testing.T) {
MaxAge: 30 * time.Second,
MaxSegments: 1,
MaxSegmentSize: 1024, // 1KB
}, NewMetrics(nil))
}, NewManagerMetrics(nil))
require.NoError(t, err)

// Append some data.
Expand Down Expand Up @@ -435,7 +435,7 @@ func TestManager_Put(t *testing.T) {
MaxAge: 30 * time.Second,
MaxSegments: 1,
MaxSegmentSize: 1024, // 1KB
}, NewMetrics(nil))
}, NewManagerMetrics(nil))
require.NoError(t, err)

// There should be 1 available and 0 pending segments.
Expand Down Expand Up @@ -482,7 +482,7 @@ func TestManager_Metrics(t *testing.T) {
m, err := NewManager(Config{
MaxSegments: 1,
MaxSegmentSize: 1024, // 1KB
}, NewMetrics(r))
}, NewManagerMetrics(r))
require.NoError(t, err)

metricNames := []string{
Expand Down
Loading
Loading