Use metrics namespace for more metrics #11025

Merged · 35 commits · Oct 30, 2023
Changes from 19 commits

Commits
e406484
Add metrics namespace
MichelHollands Oct 24, 2023
a63be83
Do not use underscore in metrics namespace param
MichelHollands Oct 24, 2023
04f1830
Do not use underscore in metrics namespace param
MichelHollands Oct 24, 2023
d88cbc3
Add option to specify metrics namespace in migrate
MichelHollands Oct 24, 2023
1bda0a9
Make cortex the default metrics namespace
MichelHollands Oct 24, 2023
ff2a058
Update changelog
MichelHollands Oct 24, 2023
619b5ee
Add docs update
MichelHollands Oct 24, 2023
f5e4808
Rename distributor field
MichelHollands Oct 24, 2023
c0480cc
Add temp image override in docker compose
MichelHollands Oct 24, 2023
0518a76
Merge branch 'main' into add_metrics_namespace_setting
MichelHollands Oct 25, 2023
065b45f
Rename some more metrics starting with cortex
MichelHollands Oct 25, 2023
5b59e91
Change setting to metrics namespace
MichelHollands Oct 25, 2023
9a4471e
Revert test image
MichelHollands Oct 25, 2023
f0eddd6
Remove change of metrics namespace to loki
MichelHollands Oct 25, 2023
f28d682
Replace cortex namespace with setting
MichelHollands Oct 25, 2023
f00b7cd
Merge branch 'main' into use_metrics_namespace_for_more_metrics
MichelHollands Oct 25, 2023
f0742ae
One more
MichelHollands Oct 25, 2023
c8d90f7
Update DefaultTenantrManagerFactory
MichelHollands Oct 25, 2023
d43074d
Fix test
MichelHollands Oct 25, 2023
539da55
Merge branch 'main' into use_metrics_namespace_for_more_metrics
MichelHollands Oct 25, 2023
74988f6
Update pkg/lokifrontend/frontend/v1/frontend.go
MichelHollands Oct 25, 2023
cd92d41
Update pkg/scheduler/scheduler.go
MichelHollands Oct 25, 2023
40e672c
Address review feedback
MichelHollands Oct 25, 2023
fc73abc
Merge branch 'main' into use_metrics_namespace_for_more_metrics
MichelHollands Oct 25, 2023
eaeca5b
Use loki as metrics namespace in tests
MichelHollands Oct 25, 2023
d7e7e2e
Merge branch 'use_metrics_namespace_for_more_metrics' of github.com:g…
MichelHollands Oct 25, 2023
a7193a4
Change around variable order
MichelHollands Oct 27, 2023
81c1193
Merge branch 'main' into use_metrics_namespace_for_more_metrics
MichelHollands Oct 27, 2023
ba1bb29
Use constants
MichelHollands Oct 27, 2023
937ffb8
Fix the import order
MichelHollands Oct 27, 2023
216be21
Fix tests and formatting
MichelHollands Oct 27, 2023
93a53b1
Merge branch 'main' into use_metrics_namespace_for_more_metrics
MichelHollands Oct 27, 2023
e702c1c
Merge branch 'main' into use_metrics_namespace_for_more_metrics
MichelHollands Oct 27, 2023
907fab4
Update comment for new setting
MichelHollands Oct 30, 2023
d9fbdb0
Merge branch 'main' into use_metrics_namespace_for_more_metrics
MichelHollands Oct 30, 2023
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -6,6 +6,7 @@

##### Enhancements

* [11003](https://github.com/grafana/loki/pull/11003) **MichelHollands**: Add the `metrics-namespace` flag to change the namespace of metrics currently using cortex as namespace.
* [10906](https://github.com/grafana/loki/pull/10906) **kavirajk**: Support Loki ruler to notify WAL writes to remote storage.
* [10613](https://github.com/grafana/loki/pull/10613) **ngc4579**: Helm: allow GrafanaAgent tolerations
* [10295](https://github.com/grafana/loki/pull/10295) **changhyuni**: Storage: remove signatureversionv2 from s3.
5 changes: 3 additions & 2 deletions cmd/migrate/main.go
@@ -48,6 +48,7 @@ func main() {
batch := flag.Int("batchLen", 500, "Specify how many chunks to read/write in one batch")
shardBy := flag.Duration("shardBy", 6*time.Hour, "Break down the total interval into shards of this size, making this too small can lead to syncing a lot of duplicate chunks")
parallel := flag.Int("parallel", 8, "How many parallel threads to process each shard")
metricsNamespace := flag.String("metrics.namespace", "cortex", "Namespace of the generated metrics")
flag.Parse()

go func() {
@@ -127,7 +128,7 @@ func main() {
// Create a new registerer to avoid registering duplicate metrics
prometheus.DefaultRegisterer = prometheus.NewRegistry()
clientMetrics := storage.NewClientMetrics()
s, err := storage.NewStore(sourceConfig.StorageConfig, sourceConfig.ChunkStoreConfig, sourceConfig.SchemaConfig, limits, clientMetrics, prometheus.DefaultRegisterer, util_log.Logger)
s, err := storage.NewStore(sourceConfig.StorageConfig, sourceConfig.ChunkStoreConfig, sourceConfig.SchemaConfig, limits, clientMetrics, prometheus.DefaultRegisterer, util_log.Logger, *metricsNamespace)
if err != nil {
log.Println("Failed to create source store:", err)
os.Exit(1)
@@ -136,7 +137,7 @@ func main() {
// Create a new registerer to avoid registering duplicate metrics
prometheus.DefaultRegisterer = prometheus.NewRegistry()

d, err := storage.NewStore(destConfig.StorageConfig, destConfig.ChunkStoreConfig, destConfig.SchemaConfig, limits, clientMetrics, prometheus.DefaultRegisterer, util_log.Logger)
d, err := storage.NewStore(destConfig.StorageConfig, destConfig.ChunkStoreConfig, destConfig.SchemaConfig, limits, clientMetrics, prometheus.DefaultRegisterer, util_log.Logger, *metricsNamespace)
if err != nil {
log.Println("Failed to create destination store:", err)
os.Exit(1)
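As a side note on the `cmd/migrate` change above: the namespace is read as an ordinary Go flag and the dereferenced value is threaded into the store constructors. A minimal sketch of that pattern — a standalone example, not the migrate tool itself:

```go
package main

import (
	"flag"
	"fmt"
)

func main() {
	// Mirrors the new flag added to cmd/migrate; the default stays "cortex"
	// so existing dashboards keep working unless the operator opts in.
	metricsNamespace := flag.String("metrics.namespace", "cortex", "Namespace of the generated metrics")
	flag.Parse()

	// The dereferenced value (*metricsNamespace) is what gets passed down to
	// constructors such as storage.NewStore in the real tool.
	fmt.Println("using metrics namespace:", *metricsNamespace)
}
```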
4 changes: 4 additions & 0 deletions docs/sources/configure/_index.md
@@ -224,6 +224,10 @@ Pass the `-config.expand-env` flag at the command line to enable this way of set
# will report 503 Service Unavailable status via /ready endpoint.
# CLI flag: -shutdown-delay
[shutdown_delay: <duration> | default = 0s]

# Namespace of the metrics.
# CLI flag: -metrics-namespace
[metrics_namespace: <string> | default = "cortex"]
```

### server
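The `metrics_namespace` setting documented above controls the prefix under which metrics are registered, with `cortex` remaining the default. A minimal, self-contained sketch of the mechanism — the metric name here is illustrative, not one of Loki's actual metrics:

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	// Would normally come from -metrics-namespace; the default stays "cortex".
	metricsNamespace := "loki"

	reg := prometheus.NewRegistry()
	g := prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: metricsNamespace, // joined to Name with "_"
		Name:      "request_queue_length",
		Help:      "Illustrative gauge.",
	})
	reg.MustRegister(g)
	g.Set(42)

	// Prints "loki_request_queue_length"; with the default namespace it
	// would be "cortex_request_queue_length".
	mfs, _ := reg.Gather()
	fmt.Println(mfs[0].GetName())
}
```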
2 changes: 1 addition & 1 deletion pkg/bloomgateway/bloomgateway.go
@@ -195,7 +195,7 @@ func New(cfg Config, schemaCfg config.SchemaConfig, storageCfg storage.Config, s
pendingTasks: makePendingTasks(pendingTasksInitialCap),
}

g.queueMetrics = queue.NewMetrics("bloom_gateway", reg)
g.queueMetrics = queue.NewMetrics("bloom_gateway", reg, "loki")
Review comment (Contributor):

I think we should have this "loki" string in a constant somewhere. Same for the "cortex" string.

Reply (MichelHollands, Contributor Author):

Constants were added. It does look kinda busy though so please have a look.

g.queue = queue.NewRequestQueue(maxTasksPerTenant, time.Minute, g.queueMetrics)
g.activeUsers = util.NewActiveUsersCleanupWithDefaultValues(g.queueMetrics.Cleanup)

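The review thread above asks for the "loki" and "cortex" strings to be pulled into constants, and a later commit ("Use constants") does so. A hypothetical sketch of what such a constants package could look like — the actual package path and identifier names used by the PR may differ:

```go
// Package constants is a hypothetical home for the metric namespace values.
package constants

const (
	// Loki is the namespace for Loki-native metrics.
	Loki = "loki"
	// Cortex is the historical default namespace, kept for compatibility.
	Cortex = "cortex"
)
```

Call sites such as `queue.NewMetrics("bloom_gateway", reg, "loki")` would then read `queue.NewMetrics("bloom_gateway", reg, constants.Loki)`.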
4 changes: 2 additions & 2 deletions pkg/bloomgateway/client.go
@@ -92,7 +92,7 @@ type GatewayClient struct {
ring ring.ReadRing
}

func NewGatewayClient(cfg ClientConfig, limits Limits, registerer prometheus.Registerer, logger log.Logger) (*GatewayClient, error) {
func NewGatewayClient(cfg ClientConfig, limits Limits, registerer prometheus.Registerer, logger log.Logger, metricsNamespace string) (*GatewayClient, error) {
latency := promauto.With(registerer).NewHistogramVec(prometheus.HistogramOpts{
Namespace: "loki",
Subsystem: "bloom_gateway",
@@ -118,7 +118,7 @@ func NewGatewayClient(cfg ClientConfig, limits Limits, registerer prometheus.Reg
cfg: cfg,
logger: logger,
limits: limits,
pool: clientpool.NewPool("bloom-gateway", cfg.PoolConfig, cfg.Ring, ringclient.PoolAddrFunc(poolFactory), logger),
pool: clientpool.NewPool("bloom-gateway", cfg.PoolConfig, cfg.Ring, ringclient.PoolAddrFunc(poolFactory), logger, metricsNamespace),
}

return c, nil
4 changes: 2 additions & 2 deletions pkg/bloomgateway/client_test.go
@@ -24,7 +24,7 @@ func TestBloomGatewayClient(t *testing.T) {
flagext.DefaultValues(&cfg)

t.Run("", func(t *testing.T) {
_, err := NewGatewayClient(cfg, l, reg, logger)
_, err := NewGatewayClient(cfg, l, reg, logger, "loki")
require.NoError(t, err)
})
}
@@ -40,7 +40,7 @@ func TestBloomGatewayClient_GroupStreamsByAddresses(t *testing.T) {
cfg := ClientConfig{}
flagext.DefaultValues(&cfg)

c, err := NewGatewayClient(cfg, l, reg, logger)
c, err := NewGatewayClient(cfg, l, reg, logger, "loki")
require.NoError(t, err)

testCases := []struct {
4 changes: 2 additions & 2 deletions pkg/compactor/compactor.go
@@ -198,7 +198,7 @@ type Limits interface {
DefaultLimits() *validation.Limits
}

func NewCompactor(cfg Config, objectStoreClients map[string]client.ObjectClient, schemaConfig config.SchemaConfig, limits Limits, r prometheus.Registerer) (*Compactor, error) {
func NewCompactor(cfg Config, objectStoreClients map[string]client.ObjectClient, schemaConfig config.SchemaConfig, limits Limits, r prometheus.Registerer, metricsNamespace string) (*Compactor, error) {
retentionEnabledStats.Set("false")
if cfg.RetentionEnabled {
retentionEnabledStats.Set("true")
@@ -241,7 +241,7 @@ func NewCompactor(cfg Config, objectStoreClients map[string]client.ObjectClient,
}

ringCfg := cfg.CompactorRing.ToRingConfig(ringReplicationFactor)
compactor.ring, err = ring.NewWithStoreClientAndStrategy(ringCfg, ringNameForServer, ringKey, ringStore, ring.NewIgnoreUnhealthyInstancesReplicationStrategy(), prometheus.WrapRegistererWithPrefix("cortex_", r), util_log.Logger)
compactor.ring, err = ring.NewWithStoreClientAndStrategy(ringCfg, ringNameForServer, ringKey, ringStore, ring.NewIgnoreUnhealthyInstancesReplicationStrategy(), prometheus.WrapRegistererWithPrefix(metricsNamespace+"_", r), util_log.Logger)
if err != nil {
return nil, errors.Wrap(err, "create ring client")
}
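For components that register many dskit metrics at once (rings, lifecyclers), the compactor change above swaps the hardcoded `prometheus.WrapRegistererWithPrefix("cortex_", r)` for the configurable namespace. A small sketch of how that wrapper behaves, using an illustrative `ring_members` gauge rather than the real dskit ring metrics:

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	metricsNamespace := "loki" // from configuration

	reg := prometheus.NewRegistry()
	// Every metric registered through the wrapped registerer gets the
	// "<namespace>_" prefix prepended to its name at registration time.
	wrapped := prometheus.WrapRegistererWithPrefix(metricsNamespace+"_", reg)

	members := prometheus.NewGauge(prometheus.GaugeOpts{
		Name: "ring_members",
		Help: "Illustrative ring membership gauge.",
	})
	wrapped.MustRegister(members)

	mfs, _ := reg.Gather()
	fmt.Println(mfs[0].GetName()) // exposed as "loki_ring_members"
}
```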
2 changes: 1 addition & 1 deletion pkg/compactor/compactor_test.go
@@ -49,7 +49,7 @@ func setupTestCompactor(t *testing.T, objectClients map[string]client.ObjectClie

c, err := NewCompactor(cfg, objectClients, config.SchemaConfig{
Configs: periodConfigs,
}, nil, nil)
}, nil, nil, "cortex")
require.NoError(t, err)

c.RegisterIndexCompactor("dummy", testIndexCompactor{})
16 changes: 7 additions & 9 deletions pkg/distributor/clientpool/ingester_client_pool.go
@@ -8,15 +8,8 @@ import (
"github.com/grafana/dskit/ring"
ring_client "github.com/grafana/dskit/ring/client"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)

var clients = promauto.NewGauge(prometheus.GaugeOpts{
Namespace: "cortex",
Name: "distributor_ingester_clients",
Help: "The current number of ingester clients.",
})

// PoolConfig is config for creating a Pool.
type PoolConfig struct {
ClientCleanupPeriod time.Duration `yaml:"client_cleanup_period"`
@@ -31,13 +24,18 @@ func (cfg *PoolConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.DurationVar(&cfg.RemoteTimeout, prefix+"remote-timeout", 1*time.Second, "Timeout for the health check.")
}

func NewPool(name string, cfg PoolConfig, ring ring.ReadRing, factory ring_client.PoolFactory, logger log.Logger) *ring_client.Pool {
func NewPool(name string, cfg PoolConfig, ring ring.ReadRing, factory ring_client.PoolFactory, logger log.Logger, metricsNamespace string) *ring_client.Pool {
clients := prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: metricsNamespace,
Name: "distributor_ingester_clients",
Help: "The current number of ingester clients.",
})

poolCfg := ring_client.PoolConfig{
CheckInterval: cfg.ClientCleanupPeriod,
HealthCheckEnabled: cfg.HealthCheckIngesters,
HealthCheckTimeout: cfg.RemoteTimeout,
}

// TODO(chaudum): Allow cofiguration of metric name by the caller.
return ring_client.NewPool(name, poolCfg, ring_client.NewRingServiceDiscovery(ring), factory, clients, logger)
}
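The clientpool change above also moves the `distributor_ingester_clients` gauge from a package-level `promauto` variable, whose namespace is fixed at package init before configuration is parsed, to a gauge built inside `NewPool`. A sketch of the contrast, under the assumption that the dskit pool only needs a `prometheus.Gauge` value; note that `prometheus.NewGauge` does not register the metric by itself, whereas the old `promauto.NewGauge` registered it with the default registerer:

```go
package clientpoolsketch

import (
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

// Before: namespace fixed at package init and auto-registered on the
// default registerer.
var clientsOld = promauto.NewGauge(prometheus.GaugeOpts{
	Namespace: "cortex",
	Name:      "distributor_ingester_clients",
	Help:      "The current number of ingester clients.",
})

// After: built when the pool is constructed, so the namespace can come from
// configuration. prometheus.NewGauge does not register the gauge anywhere;
// it is handed straight to the dskit pool.
func newClientsGauge(metricsNamespace string) prometheus.Gauge {
	return prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: metricsNamespace,
		Name:      "distributor_ingester_clients",
		Help:      "The current number of ingester clients.",
	})
}
```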
12 changes: 7 additions & 5 deletions pkg/distributor/distributor.go
@@ -133,6 +133,7 @@ func New(
ingestersRing ring.ReadRing,
overrides Limits,
registerer prometheus.Registerer,
metricsNamespace string,
) (*Distributor, error) {
factory := cfg.factory
if factory == nil {
@@ -172,7 +173,7 @@
tenantsRetention: retention.NewTenantsRetention(overrides),
ingestersRing: ingestersRing,
validator: validator,
pool: clientpool.NewPool("ingester", clientCfg.PoolConfig, ingestersRing, factory, util_log.Logger),
pool: clientpool.NewPool("ingester", clientCfg.PoolConfig, ingestersRing, factory, util_log.Logger, metricsNamespace),
labelCache: labelCache,
shardTracker: NewShardTracker(),
healthyInstancesCount: atomic.NewUint32(0),
@@ -203,7 +204,7 @@ func New(
if overrides.IngestionRateStrategy() == validation.GlobalIngestionRateStrategy {
d.rateLimitStrat = validation.GlobalIngestionRateStrategy

distributorsRing, distributorsLifecycler, err = newRingAndLifecycler(cfg.DistributorRing, d.healthyInstancesCount, util_log.Logger, registerer)
distributorsRing, distributorsLifecycler, err = newRingAndLifecycler(cfg.DistributorRing, d.healthyInstancesCount, util_log.Logger, registerer, metricsNamespace)
if err != nil {
return nil, err
}
@@ -231,6 +232,7 @@ func New(
ingestersRing,
ring_client.PoolAddrFunc(internalFactory),
util_log.Logger,
metricsNamespace,
),
overrides,
registerer,
@@ -731,7 +733,7 @@ func calculateShards(rate int64, pushSize, desiredRate int) int {
}

// newRingAndLifecycler creates a new distributor ring and lifecycler with all required lifecycler delegates
func newRingAndLifecycler(cfg RingConfig, instanceCount *atomic.Uint32, logger log.Logger, reg prometheus.Registerer) (*ring.Ring, *ring.BasicLifecycler, error) {
func newRingAndLifecycler(cfg RingConfig, instanceCount *atomic.Uint32, logger log.Logger, reg prometheus.Registerer, metricsNamespace string) (*ring.Ring, *ring.BasicLifecycler, error) {
kvStore, err := kv.NewClient(cfg.KVStore, ring.GetCodec(), kv.RegistererWithKVName(reg, "distributor-lifecycler"), logger)
if err != nil {
return nil, nil, errors.Wrap(err, "failed to initialize distributors' KV store")
@@ -748,12 +750,12 @@ func newRingAndLifecycler(cfg RingConfig, instanceCount *atomic.Uint32, logger l
delegate = ring.NewLeaveOnStoppingDelegate(delegate, logger)
delegate = ring.NewAutoForgetDelegate(ringAutoForgetUnhealthyPeriods*cfg.HeartbeatTimeout, delegate, logger)

distributorsLifecycler, err := ring.NewBasicLifecycler(lifecyclerCfg, "distributor", ringKey, kvStore, delegate, logger, prometheus.WrapRegistererWithPrefix("cortex_", reg))
distributorsLifecycler, err := ring.NewBasicLifecycler(lifecyclerCfg, "distributor", ringKey, kvStore, delegate, logger, prometheus.WrapRegistererWithPrefix(metricsNamespace+"_", reg))
if err != nil {
return nil, nil, errors.Wrap(err, "failed to initialize distributors' lifecycler")
}

distributorsRing, err := ring.New(cfg.ToRingConfig(), "distributor", ringKey, logger, prometheus.WrapRegistererWithPrefix("cortex_", reg))
distributorsRing, err := ring.New(cfg.ToRingConfig(), "distributor", ringKey, logger, prometheus.WrapRegistererWithPrefix(metricsNamespace+"_", reg))
if err != nil {
return nil, nil, errors.Wrap(err, "failed to initialize distributors' ring client")
}
2 changes: 1 addition & 1 deletion pkg/distributor/distributor_test.go
@@ -1159,7 +1159,7 @@ func prepare(t *testing.T, numDistributors, numIngesters int, limits *validation
overrides, err := validation.NewOverrides(*limits, nil)
require.NoError(t, err)

d, err := New(distributorConfig, clientConfig, runtime.DefaultTenantConfigs(), ingestersRing, overrides, prometheus.NewPedanticRegistry())
d, err := New(distributorConfig, clientConfig, runtime.DefaultTenantConfigs(), ingestersRing, overrides, prometheus.NewPedanticRegistry(), "cortex")
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), d))
distributors[i] = d
22 changes: 11 additions & 11 deletions pkg/ingester/checkpoint_test.go
@@ -68,7 +68,7 @@ func TestIngesterWAL(t *testing.T) {
}
}

i, err := New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{})
i, err := New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, "cortex")
require.NoError(t, err)
require.Nil(t, services.StartAndAwaitRunning(context.Background(), i))
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
@@ -111,7 +111,7 @@ func TestIngesterWAL(t *testing.T) {
expectCheckpoint(t, walDir, false, time.Second)

// restart the ingester
i, err = New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{})
i, err = New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, "cortex")
require.NoError(t, err)
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
require.Nil(t, services.StartAndAwaitRunning(context.Background(), i))
@@ -125,7 +125,7 @@ func TestIngesterWAL(t *testing.T) {
require.Nil(t, services.StopAndAwaitTerminated(context.Background(), i))

// restart the ingester
i, err = New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{})
i, err = New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, "cortex")
require.NoError(t, err)
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
require.Nil(t, services.StartAndAwaitRunning(context.Background(), i))
@@ -148,7 +148,7 @@ func TestIngesterWALIgnoresStreamLimits(t *testing.T) {
}
}

i, err := New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{})
i, err := New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, "cortex")
require.NoError(t, err)
require.Nil(t, services.StartAndAwaitRunning(context.Background(), i))
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
@@ -194,7 +194,7 @@ func TestIngesterWALIgnoresStreamLimits(t *testing.T) {
require.NoError(t, err)

// restart the ingester
i, err = New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{})
i, err = New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, "cortex")
require.NoError(t, err)
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
require.Nil(t, services.StartAndAwaitRunning(context.Background(), i))
@@ -251,7 +251,7 @@ func TestIngesterWALBackpressureSegments(t *testing.T) {
}
}

i, err := New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{})
i, err := New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, "cortex")
require.NoError(t, err)
require.Nil(t, services.StartAndAwaitRunning(context.Background(), i))
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
@@ -272,7 +272,7 @@ func TestIngesterWALBackpressureSegments(t *testing.T) {
expectCheckpoint(t, walDir, false, time.Second)

// restart the ingester, ensuring we replayed from WAL.
i, err = New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{})
i, err = New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, "cortex")
require.NoError(t, err)
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
require.Nil(t, services.StartAndAwaitRunning(context.Background(), i))
@@ -293,7 +293,7 @@ func TestIngesterWALBackpressureCheckpoint(t *testing.T) {
}
}

i, err := New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{})
i, err := New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, "cortex")
require.NoError(t, err)
require.Nil(t, services.StartAndAwaitRunning(context.Background(), i))
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
@@ -314,7 +314,7 @@ func TestIngesterWALBackpressureCheckpoint(t *testing.T) {
require.Nil(t, services.StopAndAwaitTerminated(context.Background(), i))

// restart the ingester, ensuring we can replay from the checkpoint as well.
i, err = New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{})
i, err = New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, "cortex")
require.NoError(t, err)
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
require.Nil(t, services.StartAndAwaitRunning(context.Background(), i))
@@ -589,7 +589,7 @@ func TestIngesterWALReplaysUnorderedToOrdered(t *testing.T) {
}
}

i, err := New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{})
i, err := New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, "cortex")
require.NoError(t, err)
require.Nil(t, services.StartAndAwaitRunning(context.Background(), i))
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
@@ -661,7 +661,7 @@ func TestIngesterWALReplaysUnorderedToOrdered(t *testing.T) {
require.NoError(t, err)

// restart the ingester
i, err = New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{})
i, err = New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, "cortex")
require.NoError(t, err)
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
require.Nil(t, services.StartAndAwaitRunning(context.Background(), i))
2 changes: 1 addition & 1 deletion pkg/ingester/flush.go
@@ -43,7 +43,7 @@ const (
func (i *Ingester) InitFlushQueues() {
i.flushQueuesDone.Add(i.cfg.ConcurrentFlushes)
for j := 0; j < i.cfg.ConcurrentFlushes; j++ {
i.flushQueues[j] = util.NewPriorityQueue(flushQueueLength)
i.flushQueues[j] = util.NewPriorityQueue(i.metrics.flushQueueLength)
go i.flushLoop(j)
}
}
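The `flush.go` change replaces the package-level `flushQueueLength` gauge with a field on the ingester's metrics struct, for the same reason as the clientpool change: a package-level metric cannot pick up a namespace that is only known after configuration is parsed. A rough sketch under the assumption that the field is a plain `prometheus.Gauge`; the struct, constructor, and help text are illustrative, not Loki's exact definitions:

```go
package ingestersketch

import (
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

// ingesterMetrics stands in for the metrics struct referenced as i.metrics.
type ingesterMetrics struct {
	flushQueueLength prometheus.Gauge
}

// newIngesterMetrics builds the gauge with whatever namespace was configured.
func newIngesterMetrics(r prometheus.Registerer, metricsNamespace string) *ingesterMetrics {
	return &ingesterMetrics{
		flushQueueLength: promauto.With(r).NewGauge(prometheus.GaugeOpts{
			Namespace: metricsNamespace,
			Name:      "ingester_flush_queue_length",
			Help:      "The total number of series pending in the flush queue.",
		}),
	}
}
```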
2 changes: 1 addition & 1 deletion pkg/ingester/flush_test.go
@@ -276,7 +276,7 @@ func newTestStore(t require.TestingT, cfg Config, walOverride WAL) (*testStore,
limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
require.NoError(t, err)

ing, err := New(cfg, client.Config{}, store, limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{})
ing, err := New(cfg, client.Config{}, store, limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, "cortex")
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), ing))
