feat: add loki_ingester_rf1_segment_age_seconds metric
This commit adds a new metric, loki_ingester_rf1_segment_age_seconds.
It also cleans up the code used to report segment metrics and adds a
new SegmentStats struct for retrieving data from a SegmentWriter.
grobinson-grafana committed Jul 24, 2024
1 parent 4f534d7 commit f29a3f3
Showing 8 changed files with 246 additions and 124 deletions.
10 changes: 5 additions & 5 deletions pkg/ingester-rf1/flush.go
@@ -97,12 +97,9 @@ func (i *Ingester) flush(l log.Logger, j int, it *wal.PendingSegment) error {

func (i *Ingester) flushSegment(ctx context.Context, j int, w *wal.SegmentWriter) error {
start := time.Now()
defer func() {
i.metrics.flushDuration.Observe(time.Since(start).Seconds())
w.ReportMetrics()
}()

i.metrics.flushesTotal.Add(1)
defer i.metrics.flushDuration.Observe(time.Since(start).Seconds())

buf := i.flushBuffers[j]
defer buf.Reset()
@@ -111,6 +108,9 @@ func (i *Ingester) flushSegment(ctx context.Context, j int, w *wal.SegmentWriter
return err
}

stats := wal.GetSegmentStats(w, time.Now())
wal.ReportSegmentStats(stats, i.metrics.segmentMetrics)

id := ulid.MustNew(ulid.Timestamp(time.Now()), rand.Reader).String()
if err := i.store.PutObject(ctx, fmt.Sprintf("loki-v2/wal/anon/"+id), buf); err != nil {
i.metrics.flushFailuresTotal.Inc()
@@ -121,7 +121,7 @@ func (i *Ingester) flushSegment(ctx context.Context, j int, w *wal.SegmentWriter
Block: w.Meta(id),
}); err != nil {
i.metrics.flushFailuresTotal.Inc()
return fmt.Errorf("metastore add block: %w", err)
return fmt.Errorf("failed to update metastore: %w", err)
}

return nil
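The two calls added to flushSegment above, wal.GetSegmentStats and wal.ReportSegmentStats, are defined in pkg/storage/wal together with the SegmentStats struct from the commit message; that file is among the 8 changed files but is not expanded in this view. Below is a minimal sketch of how the pair could fit together, assuming the SegmentWriter accessors and SegmentMetrics fields it uses (the committed implementation may differ); a matching sketch of SegmentMetrics appears after the metrics.go diff further down.

// Sketch only: the real definitions live in pkg/storage/wal and are not
// shown in this excerpt. Names other than SegmentStats, GetSegmentStats,
// ReportSegmentStats, SegmentWriter and SegmentMetrics are assumptions.
package wal

import "time"

// SegmentStats carries the data reported for a segment when it is flushed.
type SegmentStats struct {
	Age  time.Duration // how long the segment accepted writes before the flush
	Size int64         // bytes written to the segment
}

// GetSegmentStats reads the stats from a SegmentWriter; firstAppend and
// InputSize are assumed accessors on SegmentWriter.
func GetSegmentStats(w *SegmentWriter, now time.Time) SegmentStats {
	return SegmentStats{
		Age:  now.Sub(w.firstAppend),
		Size: w.InputSize(),
	}
}

// ReportSegmentStats translates the stats into Prometheus observations,
// including the new loki_ingester_rf1_segment_age_seconds histogram.
func ReportSegmentStats(s SegmentStats, m *SegmentMetrics) {
	m.age.Observe(s.Age.Seconds())
	m.size.Observe(float64(s.Size))
}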
55 changes: 24 additions & 31 deletions pkg/ingester-rf1/metrics.go
@@ -1,20 +1,38 @@
package ingesterrf1

import (
"github.com/grafana/loki/v3/pkg/storage/wal"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)

type flushMetrics struct {
type ingesterMetrics struct {
autoForgetUnhealthyIngestersTotal prometheus.Counter
limiterEnabled prometheus.Gauge
// Shutdown marker for ingester scale down.
shutdownMarker prometheus.Gauge
flushesTotal prometheus.Counter
flushFailuresTotal prometheus.Counter
flushQueues prometheus.Gauge
flushDuration prometheus.Histogram
flushSizeBytes prometheus.Histogram
flushSize prometheus.Histogram
segmentMetrics *wal.SegmentMetrics
}

func newFlushMetrics(r prometheus.Registerer) *flushMetrics {
return &flushMetrics{
func newIngesterMetrics(r prometheus.Registerer) *ingesterMetrics {
return &ingesterMetrics{
autoForgetUnhealthyIngestersTotal: promauto.With(r).NewCounter(prometheus.CounterOpts{
Name: "loki_ingester_rf1_autoforget_unhealthy_ingesters_total",
Help: "Total number of ingesters automatically forgotten.",
}),
limiterEnabled: promauto.With(r).NewGauge(prometheus.GaugeOpts{
Name: "loki_ingester_rf1_limiter_enabled",
Help: "1 if the limiter is enabled, otherwise 0.",
}),
shutdownMarker: promauto.With(r).NewGauge(prometheus.GaugeOpts{
Name: "loki_ingester_rf1_shutdown_marker",
Help: "1 if prepare shutdown has been called, 0 otherwise.",
}),
flushesTotal: promauto.With(r).NewCounter(prometheus.CounterOpts{
Name: "loki_ingester_rf1_flushes_total",
Help: "The total number of flushes.",
@@ -33,37 +51,12 @@ func newFlushMetrics(r prometheus.Registerer) *flushMetrics {
Buckets: prometheus.ExponentialBuckets(0.001, 4, 8),
NativeHistogramBucketFactor: 1.1,
}),
flushSizeBytes: promauto.With(r).NewHistogram(prometheus.HistogramOpts{
flushSize: promauto.With(r).NewHistogram(prometheus.HistogramOpts{
Name: "loki_ingester_rf1_flush_size_bytes",
Help: "The flush size (as written to object storage).",
Buckets: prometheus.ExponentialBuckets(100, 10, 8),
NativeHistogramBucketFactor: 1.1,
}),
}
}

type ingesterMetrics struct {
autoForgetUnhealthyIngestersTotal prometheus.Counter
limiterEnabled prometheus.Gauge
// Shutdown marker for ingester scale down.
shutdownMarker prometheus.Gauge
*flushMetrics
}

func newIngesterMetrics(r prometheus.Registerer) *ingesterMetrics {
return &ingesterMetrics{
autoForgetUnhealthyIngestersTotal: promauto.With(r).NewCounter(prometheus.CounterOpts{
Name: "loki_ingester_rf1_autoforget_unhealthy_ingesters_total",
Help: "Total number of ingesters automatically forgotten.",
}),
limiterEnabled: promauto.With(r).NewGauge(prometheus.GaugeOpts{
Name: "loki_ingester_rf1_limiter_enabled",
Help: "1 if the limiter is enabled, otherwise 0.",
}),
shutdownMarker: promauto.With(r).NewGauge(prometheus.GaugeOpts{
Name: "loki_ingester_rf1_shutdown_marker",
Help: "1 if prepare shutdown has been called, 0 otherwise.",
}),
flushMetrics: newFlushMetrics(r),
segmentMetrics: wal.NewSegmentMetrics(r),
}
}
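ingesterMetrics now holds a *wal.SegmentMetrics created by wal.NewSegmentMetrics(r), which is where the new segment histograms, including loki_ingester_rf1_segment_age_seconds from the commit title, are registered. A hedged sketch of that constructor follows; the size histogram, help strings, and bucket choices are assumptions rather than the committed code.

// Sketch of wal.NewSegmentMetrics. Only the metric name in the commit title
// is taken from the source; everything else here is assumed.
package wal

import (
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

type SegmentMetrics struct {
	age  prometheus.Histogram
	size prometheus.Histogram
}

func NewSegmentMetrics(r prometheus.Registerer) *SegmentMetrics {
	return &SegmentMetrics{
		age: promauto.With(r).NewHistogram(prometheus.HistogramOpts{
			Name:    "loki_ingester_rf1_segment_age_seconds",
			Help:    "The age of a segment at flush time.",
			Buckets: prometheus.ExponentialBuckets(0.001, 4, 8), // assumed
		}),
		size: promauto.With(r).NewHistogram(prometheus.HistogramOpts{
			Name:    "loki_ingester_rf1_segment_size_bytes", // assumed name
			Help:    "The size of a segment at flush time.",
			Buckets: prometheus.ExponentialBuckets(100, 10, 8), // assumed
		}),
	}
}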
32 changes: 7 additions & 25 deletions pkg/storage/wal/manager.go
@@ -12,12 +12,6 @@ import (
"github.com/grafana/loki/v3/pkg/logproto"
)

const (
DefaultMaxAge = 500 * time.Millisecond
DefaultMaxSegments = 10
DefaultMaxSegmentSize = 8 * 1024 * 1024 // 8MB.
)

var (
// ErrClosed is returned when the WAL is closed. It is a permanent error
// as once closed, a WAL cannot be re-opened.
@@ -109,31 +103,24 @@ type Manager struct {
clock quartz.Clock
}

// segment is similar to PendingSegment, however it is an internal struct used
// in the available and pending lists. It contains a single-use result that is
// returned to callers appending to the WAL and a re-usable segment that is reset
// after each flush.
// segment is an internal struct used in the available and pending lists. It
// contains a single-use result that is returned to callers appending to the
// WAL and a re-usable segment that is reset after each flush.
type segment struct {
r *AppendResult
w *SegmentWriter

// moved is the time the segment was moved to the pending list. It is used
// to calculate the age of the segment. A segment is moved when it has
// exceeded the maximum age or the maximum size.
moved time.Time
}

// PendingSegment contains a result and the segment to be flushed.
type PendingSegment struct {
Result *AppendResult
Writer *SegmentWriter
Moved time.Time
}

func NewManager(cfg Config, metrics *Metrics) (*Manager, error) {
func NewManager(cfg Config, metrics *ManagerMetrics) (*Manager, error) {
m := Manager{
cfg: cfg,
metrics: metrics.ManagerMetrics,
metrics: metrics,
available: list.New(),
pending: list.New(),
clock: quartz.NewReal(),
@@ -142,7 +129,7 @@ func NewManager(cfg Config, metrics *Metrics) (*Manager, error) {
m.metrics.NumPending.Set(0)
m.metrics.NumFlushing.Set(0)
for i := int64(0); i < cfg.MaxSegments; i++ {
w, err := NewWalSegmentWriter(metrics.SegmentMetrics)
w, err := NewWalSegmentWriter()
if err != nil {
return nil, err
}
@@ -205,11 +192,7 @@ func (m *Manager) NextPending() (*PendingSegment, error) {
m.pending.Remove(el)
m.metrics.NumPending.Dec()
m.metrics.NumFlushing.Inc()
return &PendingSegment{
Result: s.r,
Writer: s.w,
Moved: s.moved,
}, nil
return &PendingSegment{Result: s.r, Writer: s.w}, nil
}

// Put resets the segment and puts it back in the available list to accept
@@ -229,7 +212,6 @@ func (m *Manager) Put(s *PendingSegment) {
// move the element from the available list to the pending list and sets the
// relevant metrics.
func (m *Manager) move(el *list.Element, s *segment) {
s.moved = m.clock.Now()
m.pending.PushBack(s)
m.metrics.NumPending.Inc()
m.available.Remove(el)
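NewManager now receives a *ManagerMetrics directly rather than unwrapping it from Metrics, and segment writers are created without SegmentMetrics. Based on the call sites in this hunk (NumPending, NumFlushing), ManagerMetrics plausibly looks like the sketch below; the NumAvailable gauge, the constructor, and the metric names are assumptions.

// Sketch of ManagerMetrics inferred from the call sites above; metric names
// and the NumAvailable gauge are assumptions.
package wal

import (
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

type ManagerMetrics struct {
	NumAvailable prometheus.Gauge // segments ready to accept appends
	NumPending   prometheus.Gauge // segments waiting to be flushed
	NumFlushing  prometheus.Gauge // segments currently being flushed
}

func NewManagerMetrics(r prometheus.Registerer) *ManagerMetrics {
	return &ManagerMetrics{
		NumAvailable: promauto.With(r).NewGauge(prometheus.GaugeOpts{
			Name: "loki_ingester_rf1_wal_segments_available", // assumed name
			Help: "The number of segments available for writes.",
		}),
		NumPending: promauto.With(r).NewGauge(prometheus.GaugeOpts{
			Name: "loki_ingester_rf1_wal_segments_pending", // assumed name
			Help: "The number of segments waiting to be flushed.",
		}),
		NumFlushing: promauto.With(r).NewGauge(prometheus.GaugeOpts{
			Name: "loki_ingester_rf1_wal_segments_flushing", // assumed name
			Help: "The number of segments currently being flushed.",
		}),
	}
}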
132 changes: 132 additions & 0 deletions pkg/storage/wal/manager_bench_test.go
@@ -0,0 +1,132 @@
package wal

import (
"github.com/pkg/errors"
"strconv"
"strings"
"sync"
"testing"
"time"

"github.com/grafana/loki/v3/pkg/logproto"

"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/require"
)

func BenchmarkManager_Append(t *testing.B) {
lbs := labels.Labels{{
Name: "foo",
Value: "bar",
}}

for i := 0; i < t.N; i++ {
t.StopTimer()
m, err := NewManager(Config{
MaxSegmentSize: 8 * 1024 * 1024, // 8MB
MaxSegments: 64, // 512MB
}, NewMetrics(nil))
require.NoError(t, err)
t.StartTimer()
for {
// Write as much data as possible.
entries := []*logproto.Entry{{
Timestamp: time.Now(),
Line: strings.Repeat("b", 1024),
}}
_, err = m.Append(AppendRequest{
TenantID: "1",
Labels: lbs,
LabelsStr: lbs.String(),
Entries: entries,
})
if errors.Is(err, ErrFull) {
break
}
require.NoError(t, err)
}
}
}

func BenchmarkManager_Append_Parallel(t *testing.B) {
lbs := labels.Labels{{
Name: "foo",
Value: "bar",
}}

for i := 0; i < t.N; i++ {
t.StopTimer()
m, err := NewManager(Config{
MaxSegmentSize: 8 * 1024 * 1024, // 8MB
MaxSegments: 64, // 512MB
}, NewMetrics(nil))
require.NoError(t, err)
wg := sync.WaitGroup{}
t.StartTimer()
for i := 0; i < 100; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for {
// Write as much data as possible.
entries := []*logproto.Entry{{
Timestamp: time.Now(),
Line: strings.Repeat("b", 1024),
}}
_, appendErr := m.Append(AppendRequest{
TenantID: "1",
Labels: lbs,
LabelsStr: lbs.String(),
Entries: entries,
})
if errors.Is(appendErr, ErrFull) {
break
}
require.NoError(t, appendErr)
}
}()
}
wg.Wait()
}
}

func BenchmarkManager_Append_Parallel_MultipleStreams(t *testing.B) {
for i := 0; i < t.N; i++ {
t.StopTimer()
m, err := NewManager(Config{
MaxSegmentSize: 8 * 1024 * 1024, // 8MB
MaxSegments: 64, // 512MB
}, NewMetrics(nil))
require.NoError(t, err)
wg := sync.WaitGroup{}
t.StartTimer()
for i := 0; i < 100; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
for {
lbs := labels.Labels{{
Name: "foo",
Value: strconv.Itoa(i),
}}
// Write as much data as possible.
entries := []*logproto.Entry{{
Timestamp: time.Now(),
Line: strings.Repeat("b", 1024),
}}
_, appendErr := m.Append(AppendRequest{
TenantID: "1",
Labels: lbs,
LabelsStr: lbs.String(),
Entries: entries,
})
if errors.Is(appendErr, ErrFull) {
break
}
require.NoError(t, appendErr)
}
}(i)
}
wg.Wait()
}
}