Added -store.max-chunks-per-query limit support to blocks storage (#2852)

* Removed max-sample-count support and added -store.max-chunks-per-query limit support to blocks storage

Signed-off-by: Marco Pracucci <[email protected]>

* Added PR number to CHANGELOG

Signed-off-by: Marco Pracucci <[email protected]>

* Use atomic.LoadInt32() when reading back numChunks

Signed-off-by: Marco Pracucci <[email protected]>

* Use Uber atomic (safer and cleaner) instead of sync/atomic

Signed-off-by: Marco Pracucci <[email protected]>
pracucci authored Jul 14, 2020
1 parent 6267964 commit 2b41aa3
Showing 9 changed files with 177 additions and 32 deletions.
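The last two commit messages above swap the shared chunk counter from Go's `sync/atomic` (read back with `atomic.LoadInt32()`) to `go.uber.org/atomic`. As a rough illustration of the difference — not the PR's actual code, and with made-up variable names — the same counter written both ways looks like this:

```go
package main

import (
	"fmt"
	stdatomic "sync/atomic"

	"go.uber.org/atomic"
)

func main() {
	// With the standard library, the counter is a plain int32 and every
	// access must remember to go through the atomic functions.
	var stdCount int32
	stdatomic.AddInt32(&stdCount, 5)
	fmt.Println(stdatomic.LoadInt32(&stdCount))

	// With go.uber.org/atomic the type itself is atomic, so its methods are
	// the only way to touch the value - there is no way to accidentally
	// read or write it non-atomically.
	uberCount := atomic.NewInt32(0)
	uberCount.Add(5)
	fmt.Println(uberCount.Load())
}
```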
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -10,6 +10,7 @@
* [CHANGE] Metric `cortex_ingester_flush_reasons` has been renamed to `cortex_ingester_series_flushed_total`, and is now incremented during flush, not when series is enqueued for flushing. #2802
* [CHANGE] Experimental Delete Series: Metric `cortex_purger_oldest_pending_delete_request_age_seconds` would track age of delete requests since they are over their cancellation period instead of their creation time. #2806
* [CHANGE] Experimental TSDB: the store-gateway service is required in a Cortex cluster running with the experimental blocks storage. Removed the `-experimental.tsdb.store-gateway-enabled` CLI flag and `store_gateway_enabled` YAML config option. The store-gateway is now always enabled when the storage engine is `tsdb`. #2822
* [CHANGE] Experimental TSDB: removed support for `-experimental.tsdb.bucket-store.max-sample-count` flag because the implementation was flawed. To limit the number of samples/chunks processed by a single query you can set `-store.query-chunk-limit`, which is now supported by the blocks storage too. #2852
* [CHANGE] Ingester: Chunks flushed via /flush stay in memory until retention period is reached. This affects `cortex_ingester_memory_chunks` metric. #2778
* [CHANGE] Querier: the error message returned when the query time range exceeds `-store.max-query-length` has changed from `invalid query, length > limit (X > Y)` to `the query time range exceeds the limit (query length: X, limit: Y)`. #2826
* [CHANGE] KV: The `role` label which was a label of `multi` KV store client only has been added to metrics of every KV store client. If KV store client is not `multi`, then the value of `role` label is `primary`. #2837
@@ -38,6 +39,7 @@
* [ENHANCEMENT] Experimental TSDB: Added `-experimental.tsdb.head-compaction-idle-timeout` option to force compaction of data in memory into a block. #2803
* [ENHANCEMENT] Experimental TSDB: Added support for flushing blocks via `/flush`, `/shutdown` (previously these only worked for chunks storage) and by using `-experimental.tsdb.flush-blocks-on-shutdown` option. #2794
* [ENHANCEMENT] Experimental TSDB: Added support to enforce max query time range length via `-store.max-query-length`. #2826
* [ENHANCEMENT] Experimental TSDB: Added support to limit the max number of chunks that can be fetched from the long-term storage while executing a query. The limit is configurable via `-store.query-chunk-limit`. #2852
* [ENHANCEMENT] Ingester: Added new metric `cortex_ingester_flush_series_in_progress` that reports number of ongoing flush-series operations. Useful when calling `/flush` handler: if `cortex_ingester_flush_queue_length + cortex_ingester_flush_series_in_progress` is 0, all flushes are finished. #2778
* [ENHANCEMENT] Memberlist members can join cluster via SRV records. #2788
* [ENHANCEMENT] Added configuration options for chunks s3 client. #2831
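The CHANGELOG entries above remove the per-bucket-store `-experimental.tsdb.bucket-store.max-sample-count` flag and point operators at `-store.query-chunk-limit`, which the blocks storage now honours too. As a minimal sketch of how such a limit is typically registered on a per-tenant limits struct (the `Limits` type and `RegisterFlags` method here are illustrative stand-ins, not code from this commit; the flag name and default come from the config reference below), registration looks roughly like this:

```go
package main

import (
	"flag"
	"fmt"
)

// Limits is a trimmed-down stand-in for Cortex's per-tenant limits struct.
type Limits struct {
	MaxChunksPerQuery int `yaml:"max_chunks_per_query"`
}

// RegisterFlags binds the limit to its CLI flag.
func (l *Limits) RegisterFlags(f *flag.FlagSet) {
	f.IntVar(&l.MaxChunksPerQuery, "store.query-chunk-limit", 2000000,
		"Maximum number of chunks that can be fetched in a single query.")
}

func main() {
	var l Limits
	fs := flag.NewFlagSet("example", flag.ExitOnError)
	l.RegisterFlags(fs)
	_ = fs.Parse([]string{"-store.query-chunk-limit=500000"})
	fmt.Println(l.MaxChunksPerQuery) // 500000
}
```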
7 changes: 1 addition & 6 deletions docs/configuration/config-file-reference.md
@@ -2669,7 +2669,7 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s
[max_global_metadata_per_metric: <int> | default = 0]
# Maximum number of chunks that can be fetched in a single query. This limit is
# ignored when running the Cortex blocks storage.
# enforced when fetching chunks from the long-term storage.
# CLI flag: -store.query-chunk-limit
[max_chunks_per_query: <int> | default = 2000000]
@@ -2968,11 +2968,6 @@ bucket_store:
# CLI flag: -experimental.tsdb.bucket-store.max-chunk-pool-bytes
[max_chunk_pool_bytes: <int> | default = 2147483648]
# Max number of samples per query when loading series from the long-term
# storage. 0 disables the limit.
# CLI flag: -experimental.tsdb.bucket-store.max-sample-count
[max_sample_count: <int> | default = 0]
# Max number of concurrent queries to execute against the long-term storage.
# The limit is shared across all tenants.
# CLI flag: -experimental.tsdb.bucket-store.max-concurrent
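The config reference above documents `max_chunks_per_query` as a per-tenant limit that is now enforced when fetching chunks from the long-term storage. The querier only depends on the small `BlocksStoreLimits` interface introduced later in this commit; a minimal, hypothetical provider (in Cortex the real one is `validation.Overrides`, backed by the runtime-reloadable limits config) could look like the sketch below, with `staticLimits` being an invented name for illustration:

```go
package main

import "fmt"

// BlocksStoreLimits mirrors the interface added in blocks_store_queryable.go.
type BlocksStoreLimits interface {
	MaxChunksPerQuery(userID string) int
}

// staticLimits is a toy provider: a global default plus optional
// per-tenant overrides.
type staticLimits struct {
	defaultLimit int
	perTenant    map[string]int
}

func (l *staticLimits) MaxChunksPerQuery(userID string) int {
	if v, ok := l.perTenant[userID]; ok {
		return v
	}
	return l.defaultLimit
}

func main() {
	var limits BlocksStoreLimits = &staticLimits{
		defaultLimit: 2000000,
		perTenant:    map[string]int{"team-a": 100000},
	}
	fmt.Println(limits.MaxChunksPerQuery("team-a")) // 100000
	fmt.Println(limits.MaxChunksPerQuery("team-b")) // 2000000
}
```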
5 changes: 0 additions & 5 deletions docs/operations/blocks-storage.md
@@ -214,11 +214,6 @@ tsdb:
# CLI flag: -experimental.tsdb.bucket-store.max-chunk-pool-bytes
[max_chunk_pool_bytes: <int> | default = 2147483648]
# Max number of samples per query when loading series from the long-term
# storage. 0 disables the limit.
# CLI flag: -experimental.tsdb.bucket-store.max-sample-count
[max_sample_count: <int> | default = 0]
# Max number of concurrent queries to execute against the long-term storage.
# The limit is shared across all tenants.
# CLI flag: -experimental.tsdb.bucket-store.max-concurrent
10 changes: 5 additions & 5 deletions pkg/cortex/modules.go
@@ -205,7 +205,7 @@ func (t *Cortex) initStoreQueryables() (services.Service, error) {
var servs []services.Service

//nolint:golint // I prefer this form over removing 'else', because it allows q to have smaller scope.
if q, err := initQueryableForEngine(t.Cfg.Storage.Engine, t.Cfg, t.Store, prometheus.DefaultRegisterer); err != nil {
if q, err := initQueryableForEngine(t.Cfg.Storage.Engine, t.Cfg, t.Store, t.Overrides, prometheus.DefaultRegisterer); err != nil {
return nil, fmt.Errorf("failed to initialize querier for engine '%s': %v", t.Cfg.Storage.Engine, err)
} else {
t.StoreQueryables = append(t.StoreQueryables, querier.UseAlwaysQueryable(q))
@@ -219,7 +219,7 @@ func (t *Cortex) initStoreQueryables() (services.Service, error) {
return nil, fmt.Errorf("second store engine used by querier '%s' must be different than primary engine '%s'", t.Cfg.Querier.SecondStoreEngine, t.Cfg.Storage.Engine)
}

sq, err := initQueryableForEngine(t.Cfg.Querier.SecondStoreEngine, t.Cfg, t.Store, prometheus.DefaultRegisterer)
sq, err := initQueryableForEngine(t.Cfg.Querier.SecondStoreEngine, t.Cfg, t.Store, t.Overrides, prometheus.DefaultRegisterer)
if err != nil {
return nil, fmt.Errorf("failed to initialize querier for engine '%s': %v", t.Cfg.Querier.SecondStoreEngine, err)
}
@@ -245,7 +245,7 @@ func (t *Cortex) initStoreQueryables() (services.Service, error) {
}
}

func initQueryableForEngine(engine string, cfg Config, chunkStore chunk.Store, reg prometheus.Registerer) (prom_storage.Queryable, error) {
func initQueryableForEngine(engine string, cfg Config, chunkStore chunk.Store, limits *validation.Overrides, reg prometheus.Registerer) (prom_storage.Queryable, error) {
switch engine {
case storage.StorageEngineChunks:
if chunkStore == nil {
@@ -260,7 +260,7 @@ func initQueryableForEngine(engine string, cfg Config, chunkStore chunk.Store, r
cfg.Querier.StoreGatewayAddresses = fmt.Sprintf("127.0.0.1:%d", cfg.Server.GRPCListenPort)
}

return querier.NewBlocksStoreQueryableFromConfig(cfg.Querier, cfg.StoreGateway, cfg.TSDB, util.Logger, reg)
return querier.NewBlocksStoreQueryableFromConfig(cfg.Querier, cfg.StoreGateway, cfg.TSDB, limits, util.Logger, reg)

default:
return nil, fmt.Errorf("unknown storage engine '%s'", engine)
@@ -588,7 +588,7 @@ func (t *Cortex) setupModuleManager() error {
Ingester: {Overrides, Store, API, RuntimeConfig, MemberlistKV},
Flusher: {Store, API},
Querier: {Overrides, Distributor, Store, Ring, API, StoreQueryable},
StoreQueryable: {Store},
StoreQueryable: {Overrides, Store},
QueryFrontend: {API, Overrides, DeleteRequestsStore},
TableManager: {API},
Ruler: {Overrides, Distributor, Store, StoreQueryable},
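modules.go now threads `t.Overrides` into the blocks-store queryable and makes the `StoreQueryable` module depend on `Overrides`, so the limits are built before the querier needs them. Passing `t.Overrides` compiles because `*validation.Overrides` satisfies the new `BlocksStoreLimits` interface. A common way to make that requirement explicit — shown here as a sketch with stand-in types, not something this commit adds — is a compile-time assertion:

```go
package main

import "fmt"

// BlocksStoreLimits is the narrow interface the querier consumes
// (its real definition lives in pkg/querier/blocks_store_queryable.go).
type BlocksStoreLimits interface {
	MaxChunksPerQuery(userID string) int
}

// Overrides stands in for Cortex's validation.Overrides type.
type Overrides struct{ limit int }

func (o *Overrides) MaxChunksPerQuery(userID string) int { return o.limit }

// Compile-time assertion that *Overrides satisfies BlocksStoreLimits; this is
// the property that lets modules.go hand t.Overrides straight to the querier.
var _ BlocksStoreLimits = (*Overrides)(nil)

func main() {
	var limits BlocksStoreLimits = &Overrides{limit: 2000000}
	fmt.Println(limits.MaxChunksPerQuery("tenant-1")) // 2000000
}
```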
74 changes: 64 additions & 10 deletions pkg/querier/blocks_store_queryable.go
@@ -24,6 +24,7 @@ import (
"github.com/thanos-io/thanos/pkg/store/hintspb"
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/weaveworks/common/user"
"go.uber.org/atomic"
"golang.org/x/sync/errgroup"
grpc_metadata "google.golang.org/grpc/metadata"

@@ -46,7 +47,8 @@
)

var (
errNoStoreGatewayAddress = errors.New("no store-gateway address configured")
errNoStoreGatewayAddress = errors.New("no store-gateway address configured")
errMaxChunksPerQueryLimit = "the query hit the max number of chunks limit while fetching chunks for %s (limit: %d)"
)

// BlocksStoreSet is the interface used to get the clients to query series on a set of blocks.
@@ -78,6 +80,11 @@ type BlocksStoreClient interface {
RemoteAddress() string
}

// BlocksStoreLimits is the interface that should be implemented by the limits provider.
type BlocksStoreLimits interface {
MaxChunksPerQuery(userID string) int
}

type blocksStoreQueryableMetrics struct {
storesHit prometheus.Histogram
refetches prometheus.Histogram
@@ -111,13 +118,14 @@ type BlocksStoreQueryable struct {
logger log.Logger
queryStoreAfter time.Duration
metrics *blocksStoreQueryableMetrics
limits BlocksStoreLimits

// Subservices manager.
subservices *services.Manager
subservicesWatcher *services.FailureWatcher
}

func NewBlocksStoreQueryable(stores BlocksStoreSet, finder BlocksFinder, consistency *BlocksConsistencyChecker, queryStoreAfter time.Duration, logger log.Logger, reg prometheus.Registerer) (*BlocksStoreQueryable, error) {
func NewBlocksStoreQueryable(stores BlocksStoreSet, finder BlocksFinder, consistency *BlocksConsistencyChecker, limits BlocksStoreLimits, queryStoreAfter time.Duration, logger log.Logger, reg prometheus.Registerer) (*BlocksStoreQueryable, error) {
util.WarnExperimentalUse("Blocks storage engine")

manager, err := services.NewManager(stores, finder)
@@ -134,14 +142,15 @@ func NewBlocksStoreQueryable(stores BlocksStoreSet, finder BlocksFinder, consist
subservices: manager,
subservicesWatcher: services.NewFailureWatcher(),
metrics: newBlocksStoreQueryableMetrics(reg),
limits: limits,
}

q.Service = services.NewBasicService(q.starting, q.running, q.stopping)

return q, nil
}

func NewBlocksStoreQueryableFromConfig(querierCfg Config, gatewayCfg storegateway.Config, storageCfg cortex_tsdb.Config, logger log.Logger, reg prometheus.Registerer) (*BlocksStoreQueryable, error) {
func NewBlocksStoreQueryableFromConfig(querierCfg Config, gatewayCfg storegateway.Config, storageCfg cortex_tsdb.Config, limits BlocksStoreLimits, logger log.Logger, reg prometheus.Registerer) (*BlocksStoreQueryable, error) {
var stores BlocksStoreSet

bucketClient, err := cortex_tsdb.NewBucketClient(context.Background(), storageCfg, "querier", logger, reg)
Expand Down Expand Up @@ -209,7 +218,7 @@ func NewBlocksStoreQueryableFromConfig(querierCfg Config, gatewayCfg storegatewa
reg,
)

return NewBlocksStoreQueryable(stores, scanner, consistency, querierCfg.QueryStoreAfter, logger, reg)
return NewBlocksStoreQueryable(stores, scanner, consistency, limits, querierCfg.QueryStoreAfter, logger, reg)
}

func (q *BlocksStoreQueryable) starting(ctx context.Context) error {
@@ -256,6 +265,7 @@ func (q *BlocksStoreQueryable) Querier(ctx context.Context, mint, maxt int64) (s
finder: q.finder,
stores: q.stores,
metrics: q.metrics,
limits: q.limits,
consistency: q.consistency,
logger: q.logger,
queryStoreAfter: q.queryStoreAfter,
@@ -270,6 +280,7 @@ type blocksStoreQuerier struct {
stores BlocksStoreSet
metrics *blocksStoreQueryableMetrics
consistency *BlocksConsistencyChecker
limits BlocksStoreLimits
logger log.Logger

// If set, the querier manipulates the max time to not be greater than
@@ -357,6 +368,9 @@ func (q *blocksStoreQuerier) selectSorted(sp *storage.SelectHints, matchers ...*
resSeriesSets = []storage.SeriesSet(nil)
resWarnings = storage.Warnings(nil)
resQueriedBlocks = []ulid.ULID(nil)

maxChunksLimit = q.limits.MaxChunksPerQuery(q.userID)
leftChunksLimit = maxChunksLimit
)

for attempt := 1; attempt <= maxFetchSeriesAttempts; attempt++ {
@@ -377,7 +391,7 @@ func (q *blocksStoreQuerier) selectSorted(sp *storage.SelectHints, matchers ...*

// Fetch series from stores. If an error occur we do not retry because retries
// are only meant to cover missing blocks.
seriesSets, queriedBlocks, warnings, err := q.fetchSeriesFromStores(spanCtx, clients, minT, maxT, convertedMatchers)
seriesSets, queriedBlocks, warnings, numChunks, err := q.fetchSeriesFromStores(spanCtx, clients, minT, maxT, matchers, convertedMatchers, maxChunksLimit, leftChunksLimit)
if err != nil {
return storage.ErrSeriesSet(err)
}
@@ -387,6 +401,12 @@ func (q *blocksStoreQuerier) selectSorted(sp *storage.SelectHints, matchers ...*
resWarnings = append(resWarnings, warnings...)
resQueriedBlocks = append(resQueriedBlocks, queriedBlocks...)

// Given a single block is guaranteed to not be queried twice, we can safely decrease the number of
// chunks we can still read before hitting the limit (max == 0 means disabled).
if maxChunksLimit > 0 {
leftChunksLimit -= numChunks
}

// Update the map of blocks we attempted to query.
for client, blockIDs := range clients {
touchedStores[client.RemoteAddress()] = struct{}{}
@@ -425,15 +445,19 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(
clients map[BlocksStoreClient][]ulid.ULID,
minT int64,
maxT int64,
matchers []storepb.LabelMatcher,
) ([]storage.SeriesSet, []ulid.ULID, storage.Warnings, error) {
matchers []*labels.Matcher,
convertedMatchers []storepb.LabelMatcher,
maxChunksLimit int,
leftChunksLimit int,
) ([]storage.SeriesSet, []ulid.ULID, storage.Warnings, int, error) {
var (
reqCtx = grpc_metadata.AppendToOutgoingContext(ctx, cortex_tsdb.TenantIDExternalLabel, q.userID)
g, gCtx = errgroup.WithContext(reqCtx)
mtx = sync.Mutex{}
seriesSets = []storage.SeriesSet(nil)
warnings = storage.Warnings(nil)
queriedBlocks = []ulid.ULID(nil)
numChunks = atomic.NewInt32(0)
spanLog = spanlogger.FromContext(ctx)
)

@@ -444,7 +468,7 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(
blockIDs := blockIDs

g.Go(func() error {
req, err := createSeriesRequest(minT, maxT, matchers, blockIDs)
req, err := createSeriesRequest(minT, maxT, convertedMatchers, blockIDs)
if err != nil {
return errors.Wrapf(err, "failed to create series request")
}
@@ -459,6 +483,12 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(
myQueriedBlocks := []ulid.ULID(nil)

for {
// Ensure the context hasn't been canceled in the meanwhile (eg. an error occurred
// in another goroutine).
if gCtx.Err() != nil {
return gCtx.Err()
}

resp, err := stream.Recv()
if err == io.EOF {
break
@@ -470,6 +500,14 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(
// Response may either contain series, warning or hints.
if s := resp.GetSeries(); s != nil {
mySeries = append(mySeries, s)

// Ensure the max number of chunks limit hasn't been reached (max == 0 means disabled).
if maxChunksLimit > 0 {
actual := numChunks.Add(int32(len(s.Chunks)))
if actual > int32(leftChunksLimit) {
return fmt.Errorf(errMaxChunksPerQueryLimit, convertMatchersToString(matchers), maxChunksLimit)
}
}
}

if w := resp.GetWarning(); w != "" {
@@ -511,10 +549,10 @@

// Wait until all client requests complete.
if err := g.Wait(); err != nil {
return nil, nil, nil, err
return nil, nil, nil, 0, err
}

return seriesSets, queriedBlocks, warnings, nil
return seriesSets, queriedBlocks, warnings, int(numChunks.Load()), nil
}

func createSeriesRequest(minT, maxT int64, matchers []storepb.LabelMatcher, blockIDs []ulid.ULID) (*storepb.SeriesRequest, error) {
@@ -577,3 +615,19 @@ func countSeriesBytes(series []*storepb.Series) (count uint64) {

return count
}

func convertMatchersToString(matchers []*labels.Matcher) string {
out := strings.Builder{}
out.WriteRune('{')

for idx, m := range matchers {
if idx > 0 {
out.WriteRune(',')
}

out.WriteString(m.String())
}

out.WriteRune('}')
return out.String()
}
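The heart of the change is the accounting in `fetchSeriesFromStores`: every store-gateway stream adds the number of chunks in each received series to one shared atomic counter, and the query fails as soon as that counter exceeds the chunks still allowed for the current attempt (`leftChunksLimit`, which `selectSorted` shrinks between retries; a limit of 0 disables the check). The standalone sketch below reproduces that bookkeeping with fake series so the control flow can be read in isolation — the `fakeSeries` type, `fetchFromStores` helper, and simplified error message are illustrative, not part of the commit:

```go
package main

import (
	"fmt"

	"go.uber.org/atomic"
	"golang.org/x/sync/errgroup"
)

// fakeSeries stands in for a storepb.Series received from a store-gateway.
type fakeSeries struct{ chunks int }

// fetchFromStores mimics the per-attempt accounting in fetchSeriesFromStores:
// each concurrent "stream" adds its chunk counts to a single atomic counter
// and fails the whole group once the remaining budget is exceeded.
func fetchFromStores(streams [][]fakeSeries, maxChunksLimit, leftChunksLimit int) (int, error) {
	g := errgroup.Group{}
	numChunks := atomic.NewInt32(0)

	for _, stream := range streams {
		stream := stream
		g.Go(func() error {
			for _, s := range stream {
				// max == 0 means the limit is disabled.
				if maxChunksLimit > 0 {
					actual := numChunks.Add(int32(s.chunks))
					if actual > int32(leftChunksLimit) {
						return fmt.Errorf("the query hit the max number of chunks limit (limit: %d)", maxChunksLimit)
					}
				}
			}
			return nil
		})
	}

	if err := g.Wait(); err != nil {
		return 0, err
	}
	return int(numChunks.Load()), nil
}

func main() {
	streams := [][]fakeSeries{{{chunks: 400}, {chunks: 300}}, {{chunks: 500}}}

	// First attempt: the remaining budget still equals the configured limit.
	n, err := fetchFromStores(streams, 2000, 2000)
	fmt.Println(n, err) // 1200 <nil>

	// A later retry attempt, after selectSorted has already reduced the
	// budget by the chunks fetched so far (leftChunksLimit -= numChunks).
	_, err = fetchFromStores(streams, 2000, 1000)
	fmt.Println(err) // limit error
}
```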