Skip to content

Commit

Permalink
Add initial per tenant query and chunk metrics AND RENAME SOME EXISTI…
Browse files Browse the repository at this point in the history
…NG METRICS (grafana#2463)

* Add initial per tenant query metric

More will be added once the following is merged:
prometheus/prometheus#6890

Signed-off-by: Goutham Veeramachaneni <[email protected]>

* Add per tenant chunks stored and fetched metrics

Signed-off-by: Goutham Veeramachaneni <[email protected]>

* Address feedback

Signed-off-by: Goutham Veeramachaneni <[email protected]>
  • Loading branch information
gouthamve authored Apr 17, 2020
1 parent 4a5cbd2 commit c79e198
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 2 deletions.
7 changes: 6 additions & 1 deletion storage/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/go-kit/kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"

"github.com/cortexproject/cortex/pkg/chunk"
"github.com/cortexproject/cortex/pkg/chunk/aws"
Expand Down Expand Up @@ -100,7 +101,9 @@ func (cfg *Config) Validate() error {
}

// NewStore makes the storage clients based on the configuration.
func NewStore(cfg Config, storeCfg chunk.StoreConfig, schemaCfg chunk.SchemaConfig, limits StoreLimits) (chunk.Store, error) {
func NewStore(cfg Config, storeCfg chunk.StoreConfig, schemaCfg chunk.SchemaConfig, limits StoreLimits, reg prometheus.Registerer) (chunk.Store, error) {
chunkMetrics := newChunkClientMetrics(reg)

indexReadCache, err := cache.New(cfg.IndexQueriesCacheConfig)
if err != nil {
return nil, err
Expand Down Expand Up @@ -146,6 +149,8 @@ func NewStore(cfg Config, storeCfg chunk.StoreConfig, schemaCfg chunk.SchemaConf
return nil, errors.Wrap(err, "error creating object client")
}

chunks = newMetricsChunkClient(chunks, chunkMetrics)

err = stores.AddPeriod(storeCfg, s, index, chunks, limits, chunksCache, writeDedupeCache)
if err != nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion storage/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func TestFactoryStop(t *testing.T) {
limits, err := validation.NewOverrides(defaults, nil)
require.NoError(t, err)

store, err := NewStore(cfg, storeConfig, schemaConfig, limits)
store, err := NewStore(cfg, storeConfig, schemaConfig, limits, nil)
require.NoError(t, err)

store.Stop()
Expand Down
110 changes: 110 additions & 0 deletions storage/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package storage

import (
"context"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

"github.com/cortexproject/cortex/pkg/chunk"
)

// takes a chunk client and exposes metrics for its operations.
type metricsChunkClient struct {
client chunk.Client

metrics chunkClientMetrics
}

func newMetricsChunkClient(client chunk.Client, metrics chunkClientMetrics) metricsChunkClient {
return metricsChunkClient{
client: client,
metrics: metrics,
}
}

type chunkClientMetrics struct {
chunksPutPerUser *prometheus.CounterVec
chunksSizePutPerUser *prometheus.CounterVec
chunksFetchedPerUser *prometheus.CounterVec
chunksSizeFetchedPerUser *prometheus.CounterVec
}

func newChunkClientMetrics(reg prometheus.Registerer) chunkClientMetrics {
return chunkClientMetrics{
chunksPutPerUser: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Namespace: "cortex",
Name: "chunk_store_stored_chunks_total",
Help: "Total stored chunks per user.",
}, []string{"user"}),
chunksSizePutPerUser: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Namespace: "cortex",
Name: "chunk_store_stored_chunk_bytes_total",
Help: "Total bytes stored in chunks per user.",
}, []string{"user"}),
chunksFetchedPerUser: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Namespace: "cortex",
Name: "chunk_store_fetched_chunks_total",
Help: "Total fetched chunks per user.",
}, []string{"user"}),
chunksSizeFetchedPerUser: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Namespace: "cortex",
Name: "chunk_store_fetched_chunk_bytes_total",
Help: "Total bytes fetched in chunks per user.",
}, []string{"user"}),
}
}

func (c metricsChunkClient) Stop() {
c.client.Stop()
}

func (c metricsChunkClient) PutChunks(ctx context.Context, chunks []chunk.Chunk) error {
if err := c.client.PutChunks(ctx, chunks); err != nil {
return err
}

// For PutChunks, we explicitly encode the userID in the chunk and don't use context.
userSizes := map[string]int{}
userCounts := map[string]int{}
for _, c := range chunks {
userSizes[c.UserID] += c.Data.Size()
userCounts[c.UserID]++
}
for user, size := range userSizes {
c.metrics.chunksSizePutPerUser.WithLabelValues(user).Add(float64(size))
}
for user, num := range userCounts {
c.metrics.chunksPutPerUser.WithLabelValues(user).Add(float64(num))
}

return nil
}

func (c metricsChunkClient) GetChunks(ctx context.Context, chunks []chunk.Chunk) ([]chunk.Chunk, error) {
chks, err := c.client.GetChunks(ctx, chunks)
if err != nil {
return chks, err
}

// For GetChunks, userID is the chunk and we don't need to use context.
// For now, we just load one user chunks at once, but the interface lets us do it for multiple users.
userSizes := map[string]int{}
userCounts := map[string]int{}
for _, c := range chks {
userSizes[c.UserID] += c.Data.Size()
userCounts[c.UserID]++
}
for user, size := range userSizes {
c.metrics.chunksSizeFetchedPerUser.WithLabelValues(user).Add(float64(size))
}
for user, num := range userCounts {
c.metrics.chunksFetchedPerUser.WithLabelValues(user).Add(float64(num))
}

return chks, nil
}

func (c metricsChunkClient) DeleteChunk(ctx context.Context, chunkID string) error {
return c.client.DeleteChunk(ctx, chunkID)
}

0 comments on commit c79e198

Please sign in to comment.