diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go index ac27cf0c27..e722959325 100644 --- a/cmd/thanos/store.go +++ b/cmd/thanos/store.go @@ -8,9 +8,10 @@ import ( "fmt" "time" + "github.com/alecthomas/units" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" - grpc_logging "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/logging" + grpclogging "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/logging" "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/tags" "github.com/oklog/run" "github.com/opentracing/opentracing-go" @@ -18,6 +19,8 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/route" + commonmodel "github.com/prometheus/common/model" + blocksAPI "github.com/thanos-io/thanos/pkg/api/blocks" "github.com/thanos-io/thanos/pkg/block" "github.com/thanos-io/thanos/pkg/block/metadata" @@ -40,99 +43,144 @@ import ( "github.com/thanos-io/thanos/pkg/ui" ) -// registerStore registers a store command. -func registerStore(app *extkingpin.App) { - cmd := app.Command(component.Store.String(), "Store node giving access to blocks in a bucket provider. Now supported GCS, S3, Azure, Swift, Tencent COS and Aliyun OSS.") +type storeConfig struct { + indexCacheConfigs extflag.PathOrContent + objStoreConfig extflag.PathOrContent + dataDir string + grpcConfig grpcConfig + httpConfig httpConfig + indexCacheSizeBytes units.Base2Bytes + chunkPoolSize units.Base2Bytes + maxSampleCount uint64 + maxTouchedSeriesCount uint64 + maxConcurrency int + component component.StoreAPI + debugLogging bool + syncInterval time.Duration + blockSyncConcurrency int + blockMetaFetchConcurrency int + filterConf *store.FilterConfig + selectorRelabelConf extflag.PathOrContent + advertiseCompatibilityLabel bool + consistencyDelay commonmodel.Duration + ignoreDeletionMarksDelay commonmodel.Duration + webConfig webConfig + postingOffsetsInMemSampling int + cachingBucketConfig extflag.PathOrContent + reqLogConfig *extflag.PathOrContent + lazyIndexReaderEnabled bool + lazyIndexReaderIdleTimeout time.Duration +} - httpBindAddr, httpGracePeriod := extkingpin.RegisterHTTPFlags(cmd) - grpcBindAddr, grpcGracePeriod, grpcCert, grpcKey, grpcClientCA := extkingpin.RegisterGRPCFlags(cmd) +func (sc *storeConfig) registerFlag(cmd extkingpin.FlagClause) { + sc.httpConfig = *sc.httpConfig.registerFlag(cmd) + sc.grpcConfig = *sc.grpcConfig.registerFlag(cmd) - dataDir := cmd.Flag("data-dir", "Local data directory used for caching purposes (index-header, in-mem cache items and meta.jsons). If removed, no data will be lost, just store will have to rebuild the cache. NOTE: Putting raw blocks here will not cause the store to read them. For such use cases use Prometheus + sidecar."). - Default("./data").String() + cmd.Flag("data-dir", "Local data directory used for caching purposes (index-header, in-mem cache items and meta.jsons). If removed, no data will be lost, just store will have to rebuild the cache. NOTE: Putting raw blocks here will not cause the store to read them. For such use cases use Prometheus + sidecar."). + Default("./data").StringVar(&sc.dataDir) - indexCacheSize := cmd.Flag("index-cache-size", "Maximum size of items held in the in-memory index cache. Ignored if --index-cache.config or --index-cache.config-file option is specified."). - Default("250MB").Bytes() + cmd.Flag("index-cache-size", "Maximum size of items held in the in-memory index cache. Ignored if --index-cache.config or --index-cache.config-file option is specified."). + Default("250MB").BytesVar(&sc.indexCacheSizeBytes) - indexCacheConfig := extflag.RegisterPathOrContent(cmd, "index-cache.config", + sc.indexCacheConfigs = *extflag.RegisterPathOrContent(cmd, "index-cache.config", "YAML file that contains index cache configuration. See format details: https://thanos.io/tip/components/store.md/#index-cache", false) - cachingBucketConfig := extflag.RegisterPathOrContent(extflag.HiddenCmdClause(cmd), "store.caching-bucket.config", + sc.cachingBucketConfig = *extflag.RegisterPathOrContent(extflag.HiddenCmdClause(cmd), "store.caching-bucket.config", "YAML that contains configuration for caching bucket. Experimental feature, with high risk of changes. See format details: https://thanos.io/tip/components/store.md/#caching-bucket", false) - chunkPoolSize := cmd.Flag("chunk-pool-size", "Maximum size of concurrently allocatable bytes reserved strictly to reuse for chunks in memory."). - Default("2GB").Bytes() + cmd.Flag("chunk-pool-size", "Maximum size of concurrently allocatable bytes reserved strictly to reuse for chunks in memory."). + Default("2GB").BytesVar(&sc.chunkPoolSize) - maxSampleCount := cmd.Flag("store.grpc.series-sample-limit", + cmd.Flag("store.grpc.series-sample-limit", "Maximum amount of samples returned via a single Series call. The Series call fails if this limit is exceeded. 0 means no limit. NOTE: For efficiency the limit is internally implemented as 'chunks limit' considering each chunk contains 120 samples (it's the max number of samples each chunk can contain), so the actual number of samples might be lower, even though the maximum could be hit."). - Default("0").Uint() - maxTouchedSeriesCount := cmd.Flag("store.grpc.touched-series-limit", + Default("0").Uint64Var(&sc.maxSampleCount) + + cmd.Flag("store.grpc.touched-series-limit", "Maximum amount of touched series returned via a single Series call. The Series call fails if this limit is exceeded. 0 means no limit."). - Default("0").Uint() + Default("0").Uint64Var(&sc.maxTouchedSeriesCount) + + cmd.Flag("store.grpc.series-max-concurrency", "Maximum number of concurrent Series calls.").Default("20").IntVar(&sc.maxConcurrency) - maxConcurrent := cmd.Flag("store.grpc.series-max-concurrency", "Maximum number of concurrent Series calls.").Default("20").Int() + sc.component = component.Store - objStoreConfig := extkingpin.RegisterCommonObjStoreFlags(cmd, "", true) + sc.objStoreConfig = *extkingpin.RegisterCommonObjStoreFlags(cmd, "", true) - syncInterval := cmd.Flag("sync-block-duration", "Repeat interval for syncing the blocks between local and remote view."). - Default("3m").Duration() + cmd.Flag("sync-block-duration", "Repeat interval for syncing the blocks between local and remote view."). + Default("3m").DurationVar(&sc.syncInterval) - blockSyncConcurrency := cmd.Flag("block-sync-concurrency", "Number of goroutines to use when constructing index-cache.json blocks from object storage."). - Default("20").Int() + cmd.Flag("block-sync-concurrency", "Number of goroutines to use when constructing index-cache.json blocks from object storage."). + Default("20").IntVar(&sc.blockSyncConcurrency) - blockMetaFetchConcurrency := cmd.Flag("block-meta-fetch-concurrency", "Number of goroutines to use when fetching block metadata from object storage."). - Default("32").Int() + cmd.Flag("block-meta-fetch-concurrency", "Number of goroutines to use when fetching block metadata from object storage."). + Default("32").IntVar(&sc.blockMetaFetchConcurrency) - minTime := model.TimeOrDuration(cmd.Flag("min-time", "Start of time range limit to serve. Thanos Store will serve only metrics, which happened later than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1d or 2h45m. Valid duration units are ms, s, m, h, d, w, y."). - Default("0000-01-01T00:00:00Z")) + sc.filterConf = &store.FilterConfig{} - maxTime := model.TimeOrDuration(cmd.Flag("max-time", "End of time range limit to serve. Thanos Store will serve only blocks, which happened earlier than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1d or 2h45m. Valid duration units are ms, s, m, h, d, w, y."). - Default("9999-12-31T23:59:59Z")) + cmd.Flag("min-time", "Start of time range limit to serve. Thanos Store will serve only metrics, which happened later than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1d or 2h45m. Valid duration units are ms, s, m, h, d, w, y."). + Default("0000-01-01T00:00:00Z").SetValue(&sc.filterConf.MinTime) - advertiseCompatibilityLabel := cmd.Flag("debug.advertise-compatibility-label", "If true, Store Gateway in addition to other labels, will advertise special \"@thanos_compatibility_store_type=store\" label set. This makes store Gateway compatible with Querier before 0.8.0"). - Hidden().Default("true").Bool() + cmd.Flag("max-time", "End of time range limit to serve. Thanos Store will serve only blocks, which happened earlier than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1d or 2h45m. Valid duration units are ms, s, m, h, d, w, y."). + Default("9999-12-31T23:59:59Z").SetValue(&sc.filterConf.MaxTime) - selectorRelabelConf := extkingpin.RegisterSelectorRelabelFlags(cmd) + cmd.Flag("debug.advertise-compatibility-label", "If true, Store Gateway in addition to other labels, will advertise special \"@thanos_compatibility_store_type=store\" label set. This makes store Gateway compatible with Querier before 0.8.0"). + Hidden().Default("true").BoolVar(&sc.advertiseCompatibilityLabel) - postingOffsetsInMemSampling := cmd.Flag("store.index-header-posting-offsets-in-mem-sampling", "Controls what is the ratio of postings offsets store will hold in memory. "+ + sc.selectorRelabelConf = *extkingpin.RegisterSelectorRelabelFlags(cmd) + + cmd.Flag("store.index-header-posting-offsets-in-mem-sampling", "Controls what is the ratio of postings offsets store will hold in memory. "+ "Larger value will keep less offsets, which will increase CPU cycles needed for query touching those postings. It's meant for setups that want low baseline memory pressure and where less traffic is expected. "+ "On the contrary, smaller value will increase baseline memory usage, but improve latency slightly. 1 will keep all in memory. Default value is the same as in Prometheus which gives a good balance."). - Hidden().Default(fmt.Sprintf("%v", store.DefaultPostingOffsetInMemorySampling)).Int() + Hidden().Default(fmt.Sprintf("%v", store.DefaultPostingOffsetInMemorySampling)).IntVar(&sc.postingOffsetsInMemSampling) - consistencyDelay := extkingpin.ModelDuration(cmd.Flag("consistency-delay", "Minimum age of all blocks before they are being read. Set it to safe value (e.g 30m) if your object storage is eventually consistent. GCS and S3 are (roughly) strongly consistent."). - Default("0s")) + cmd.Flag("consistency-delay", "Minimum age of all blocks before they are being read. Set it to safe value (e.g 30m) if your object storage is eventually consistent. GCS and S3 are (roughly) strongly consistent."). + Default("0s").SetValue(&sc.consistencyDelay) - ignoreDeletionMarksDelay := extkingpin.ModelDuration(cmd.Flag("ignore-deletion-marks-delay", "Duration after which the blocks marked for deletion will be filtered out while fetching blocks. "+ + cmd.Flag("ignore-deletion-marks-delay", "Duration after which the blocks marked for deletion will be filtered out while fetching blocks. "+ "The idea of ignore-deletion-marks-delay is to ignore blocks that are marked for deletion with some delay. This ensures store can still serve blocks that are meant to be deleted but do not have a replacement yet. "+ "If delete-delay duration is provided to compactor or bucket verify component, it will upload deletion-mark.json file to mark after what duration the block should be deleted rather than deleting the block straight away. "+ "If delete-delay is non-zero for compactor or bucket verify component, ignore-deletion-marks-delay should be set to (delete-delay)/2 so that blocks marked for deletion are filtered out while fetching blocks before being deleted from bucket. "+ "Default is 24h, half of the default value for --delete-delay on compactor."). - Default("24h")) + Default("24h").SetValue(&sc.ignoreDeletionMarksDelay) + + cmd.Flag("store.enable-index-header-lazy-reader", "If true, Store Gateway will lazy memory map index-header only once the block is required by a query."). + Default("false").BoolVar(&sc.lazyIndexReaderEnabled) - lazyIndexReaderEnabled := cmd.Flag("store.enable-index-header-lazy-reader", "If true, Store Gateway will lazy memory map index-header only once the block is required by a query."). - Default("false").Bool() + cmd.Flag("store.index-header-lazy-reader-idle-timeout", "If index-header lazy reader is enabled and this idle timeout setting is > 0, memory map-ed index-headers will be automatically released after 'idle timeout' inactivity."). + Hidden().Default("5m").DurationVar(&sc.lazyIndexReaderIdleTimeout) - lazyIndexReaderIdleTimeout := cmd.Flag("store.index-header-lazy-reader-idle-timeout", "If index-header lazy reader is enabled and this idle timeout setting is > 0, memory map-ed index-headers will be automatically released after 'idle timeout' inactivity."). - Hidden().Default("5m").Duration() + cmd.Flag("web.external-prefix", "Static prefix for all HTML links and redirect URLs in the bucket web UI interface. Actual endpoints are still served on / or the web.route-prefix. This allows thanos bucket web UI to be served behind a reverse proxy that strips a URL sub-path."). + Default("").StringVar(&sc.webConfig.externalPrefix) + + cmd.Flag("web.prefix-header", "Name of HTTP request header used for dynamic prefixing of UI links and redirects. This option is ignored if web.external-prefix argument is set. Security risk: enable this option only if a reverse proxy in front of thanos is resetting the header. The --web.prefix-header=X-Forwarded-Prefix option can be useful, for example, if Thanos UI is served via Traefik reverse proxy with PathPrefixStrip option enabled, which sends the stripped prefix value in X-Forwarded-Prefix header. This allows thanos UI to be served on a sub-path."). + Default("").StringVar(&sc.webConfig.prefixHeaderName) + + cmd.Flag("web.disable-cors", "Whether to disable CORS headers to be set by Thanos. By default Thanos sets CORS headers to be allowed by all."). + Default("false").BoolVar(&sc.webConfig.disableCORS) + + sc.reqLogConfig = extkingpin.RegisterRequestLoggingFlags(cmd) +} + +// registerStore registers a store command. +func registerStore(app *extkingpin.App) { + cmd := app.Command(component.Store.String(), "Store node giving access to blocks in a bucket provider. Now supported GCS, S3, Azure, Swift, Tencent COS and Aliyun OSS.") - webExternalPrefix := cmd.Flag("web.external-prefix", "Static prefix for all HTML links and redirect URLs in the bucket web UI interface. Actual endpoints are still served on / or the web.route-prefix. This allows thanos bucket web UI to be served behind a reverse proxy that strips a URL sub-path.").Default("").String() - webPrefixHeaderName := cmd.Flag("web.prefix-header", "Name of HTTP request header used for dynamic prefixing of UI links and redirects. This option is ignored if web.external-prefix argument is set. Security risk: enable this option only if a reverse proxy in front of thanos is resetting the header. The --web.prefix-header=X-Forwarded-Prefix option can be useful, for example, if Thanos UI is served via Traefik reverse proxy with PathPrefixStrip option enabled, which sends the stripped prefix value in X-Forwarded-Prefix header. This allows thanos UI to be served on a sub-path.").Default("").String() - webDisableCORS := cmd.Flag("web.disable-cors", "Whether to disable CORS headers to be set by Thanos. By default Thanos sets CORS headers to be allowed by all.").Default("false").Bool() - reqLogConfig := extkingpin.RegisterRequestLoggingFlags(cmd) + conf := &storeConfig{} + conf.registerFlag(cmd) cmd.Setup(func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ <-chan struct{}, debugLogging bool) error { - if minTime.PrometheusTimestamp() > maxTime.PrometheusTimestamp() { + if conf.filterConf.MinTime.PrometheusTimestamp() > conf.filterConf.MaxTime.PrometheusTimestamp() { return errors.Errorf("invalid argument: --min-time '%s' can't be greater than --max-time '%s'", - minTime, maxTime) + conf.filterConf.MinTime, conf.filterConf.MaxTime) } - httpLogOpts, err := logging.ParseHTTPOptions("", reqLogConfig) + httpLogOpts, err := logging.ParseHTTPOptions("", conf.reqLogConfig) if err != nil { return errors.Wrap(err, "error while parsing config for request logging") } - tagOpts, grpcLogOpts, err := logging.ParsegRPCOptions("", reqLogConfig) + tagOpts, grpcLogOpts, err := logging.ParsegRPCOptions("", conf.reqLogConfig) if err != nil { return errors.Wrap(err, "error while parsing config for request logging") } @@ -144,42 +192,8 @@ func registerStore(app *extkingpin.App) { httpLogOpts, grpcLogOpts, tagOpts, - indexCacheConfig, - objStoreConfig, - *dataDir, - *grpcBindAddr, - time.Duration(*grpcGracePeriod), - *grpcCert, - *grpcKey, - *grpcClientCA, - *httpBindAddr, - time.Duration(*httpGracePeriod), - uint64(*indexCacheSize), - uint64(*chunkPoolSize), - uint64(*maxSampleCount), - uint64(*maxTouchedSeriesCount), - *maxConcurrent, - component.Store, - debugLogging, - *syncInterval, - *blockSyncConcurrency, - *blockMetaFetchConcurrency, - &store.FilterConfig{ - MinTime: *minTime, - MaxTime: *maxTime, - }, - selectorRelabelConf, - *advertiseCompatibilityLabel, - time.Duration(*consistencyDelay), - time.Duration(*ignoreDeletionMarksDelay), - *webExternalPrefix, - *webPrefixHeaderName, - *webDisableCORS, - *postingOffsetsInMemSampling, - cachingBucketConfig, + *conf, getFlagsMap(cmd.Flags()), - *lazyIndexReaderEnabled, - *lazyIndexReaderIdleTimeout, ) }) } @@ -191,46 +205,22 @@ func runStore( reg *prometheus.Registry, tracer opentracing.Tracer, httpLogOpts []logging.Option, - grpcLogOpts []grpc_logging.Option, + grpcLogOpts []grpclogging.Option, tagOpts []tags.Option, - indexCacheConfig *extflag.PathOrContent, - objStoreConfig *extflag.PathOrContent, - dataDir string, - grpcBindAddr string, - grpcGracePeriod time.Duration, - grpcCert, grpcKey, grpcClientCA, httpBindAddr string, - httpGracePeriod time.Duration, - indexCacheSizeBytes, chunkPoolSizeBytes, maxSampleCount, maxSeriesCount uint64, - maxConcurrency int, - component component.Component, - verbose bool, - syncInterval time.Duration, - blockSyncConcurrency int, - metaFetchConcurrency int, - filterConf *store.FilterConfig, - selectorRelabelConf *extflag.PathOrContent, - advertiseCompatibilityLabel bool, - consistencyDelay time.Duration, - ignoreDeletionMarksDelay time.Duration, - externalPrefix, prefixHeader string, - disableCORS bool, - postingOffsetsInMemSampling int, - cachingBucketConfig *extflag.PathOrContent, + conf storeConfig, flagsMap map[string]string, - lazyIndexReaderEnabled bool, - lazyIndexReaderIdleTimeout time.Duration, ) error { grpcProbe := prober.NewGRPC() httpProbe := prober.NewHTTP() statusProber := prober.Combine( httpProbe, grpcProbe, - prober.NewInstrumentation(component, logger, extprom.WrapRegistererWithPrefix("thanos_", reg)), + prober.NewInstrumentation(conf.component, logger, extprom.WrapRegistererWithPrefix("thanos_", reg)), ) - srv := httpserver.New(logger, reg, component, httpProbe, - httpserver.WithListen(httpBindAddr), - httpserver.WithGracePeriod(httpGracePeriod), + srv := httpserver.New(logger, reg, conf.component, httpProbe, + httpserver.WithListen(conf.httpConfig.bindAddress), + httpserver.WithGracePeriod(time.Duration(conf.httpConfig.gracePeriod)), ) g.Add(func() error { @@ -244,17 +234,17 @@ func runStore( srv.Shutdown(err) }) - confContentYaml, err := objStoreConfig.Content() + confContentYaml, err := conf.objStoreConfig.Content() if err != nil { return err } - bkt, err := client.NewBucket(logger, confContentYaml, reg, component.String()) + bkt, err := client.NewBucket(logger, confContentYaml, reg, conf.component.String()) if err != nil { return errors.Wrap(err, "create bucket client") } - cachingBucketConfigYaml, err := cachingBucketConfig.Content() + cachingBucketConfigYaml, err := conf.cachingBucketConfig.Content() if err != nil { return errors.Wrap(err, "get caching bucket configuration") } @@ -265,7 +255,7 @@ func runStore( } } - relabelContentYaml, err := selectorRelabelConf.Content() + relabelContentYaml, err := conf.selectorRelabelConf.Content() if err != nil { return errors.Wrap(err, "get content of relabel configuration") } @@ -275,7 +265,7 @@ func runStore( return err } - indexCacheContentYaml, err := indexCacheConfig.Content() + indexCacheContentYaml, err := conf.indexCacheConfigs.Content() if err != nil { return errors.Wrap(err, "get content of index cache configuration") } @@ -294,7 +284,7 @@ func runStore( indexCache, err = storecache.NewIndexCache(logger, indexCacheContentYaml, reg) } else { indexCache, err = storecache.NewInMemoryIndexCacheWithConfig(logger, reg, storecache.InMemoryIndexCacheConfig{ - MaxSize: model.Bytes(indexCacheSizeBytes), + MaxSize: model.Bytes(conf.indexCacheSizeBytes), MaxItemSize: storecache.DefaultInMemoryIndexCacheConfig.MaxItemSize, }) } @@ -302,12 +292,12 @@ func runStore( return errors.Wrap(err, "create index cache") } - ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, bkt, ignoreDeletionMarksDelay, metaFetchConcurrency) - metaFetcher, err := block.NewMetaFetcher(logger, metaFetchConcurrency, bkt, dataDir, extprom.WrapRegistererWithPrefix("thanos_", reg), + ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, bkt, time.Duration(conf.ignoreDeletionMarksDelay), conf.blockMetaFetchConcurrency) + metaFetcher, err := block.NewMetaFetcher(logger, conf.blockMetaFetchConcurrency, bkt, conf.dataDir, extprom.WrapRegistererWithPrefix("thanos_", reg), []block.MetadataFilter{ - block.NewTimePartitionMetaFilter(filterConf.MinTime, filterConf.MaxTime), + block.NewTimePartitionMetaFilter(conf.filterConf.MinTime, conf.filterConf.MaxTime), block.NewLabelShardedMetaFilter(relabelConfig), - block.NewConsistencyDelayMetaFilter(logger, consistencyDelay, extprom.WrapRegistererWithPrefix("thanos_", reg)), + block.NewConsistencyDelayMetaFilter(logger, time.Duration(conf.consistencyDelay), extprom.WrapRegistererWithPrefix("thanos_", reg)), ignoreDeletionMarkFilter, block.NewDeduplicateFilter(), }, nil) @@ -316,13 +306,13 @@ func runStore( } // Limit the concurrency on queries against the Thanos store. - if maxConcurrency < 0 { - return errors.Errorf("max concurrency value cannot be lower than 0 (got %v)", maxConcurrency) + if conf.maxConcurrency < 0 { + return errors.Errorf("max concurrency value cannot be lower than 0 (got %v)", conf.maxConcurrency) } - queriesGate := gate.New(extprom.WrapRegistererWithPrefix("thanos_bucket_store_series_", reg), maxConcurrency) + queriesGate := gate.New(extprom.WrapRegistererWithPrefix("thanos_bucket_store_series_", reg), int(conf.maxConcurrency)) - chunkPool, err := store.NewDefaultChunkBytesPool(chunkPoolSizeBytes) + chunkPool, err := store.NewDefaultChunkBytesPool(uint64(conf.chunkPoolSize)) if err != nil { return errors.Wrap(err, "create chunk pool") } @@ -333,26 +323,26 @@ func runStore( store.WithIndexCache(indexCache), store.WithQueryGate(queriesGate), store.WithChunkPool(chunkPool), - store.WithFilterConfig(filterConf), + store.WithFilterConfig(conf.filterConf), } - if verbose { + if conf.debugLogging { options = append(options, store.WithDebugLogging()) } bs, err := store.NewBucketStore( bkt, metaFetcher, - dataDir, - store.NewChunksLimiterFactory(maxSampleCount/store.MaxSamplesPerChunk), // The samples limit is an approximation based on the max number of samples per chunk. - store.NewSeriesLimiterFactory(maxSeriesCount), + conf.dataDir, + store.NewChunksLimiterFactory(conf.maxSampleCount/store.MaxSamplesPerChunk), // The samples limit is an approximation based on the max number of samples per chunk. + store.NewSeriesLimiterFactory(conf.maxTouchedSeriesCount), store.NewGapBasedPartitioner(store.PartitionerMaxGapSize), - blockSyncConcurrency, - advertiseCompatibilityLabel, - postingOffsetsInMemSampling, + conf.blockSyncConcurrency, + conf.advertiseCompatibilityLabel, + conf.postingOffsetsInMemSampling, false, - lazyIndexReaderEnabled, - lazyIndexReaderIdleTimeout, + conf.lazyIndexReaderEnabled, + conf.lazyIndexReaderIdleTimeout, options..., ) if err != nil { @@ -375,7 +365,7 @@ func runStore( level.Info(logger).Log("msg", "bucket store ready", "init_duration", time.Since(begin).String()) close(bucketStoreReady) - err := runutil.Repeat(syncInterval, ctx.Done(), func() error { + err := runutil.Repeat(conf.syncInterval, ctx.Done(), func() error { if err := bs.SyncBlocks(ctx); err != nil { level.Warn(logger).Log("msg", "syncing blocks failed", "err", err) } @@ -390,15 +380,15 @@ func runStore( } // Start query (proxy) gRPC StoreAPI. { - tlsCfg, err := tls.NewServerConfig(log.With(logger, "protocol", "gRPC"), grpcCert, grpcKey, grpcClientCA) + tlsCfg, err := tls.NewServerConfig(log.With(logger, "protocol", "gRPC"), conf.grpcConfig.tlsSrvCert, conf.grpcConfig.tlsSrvKey, conf.grpcConfig.tlsSrvClientCA) if err != nil { return errors.Wrap(err, "setup gRPC server") } - s := grpcserver.New(logger, reg, tracer, grpcLogOpts, tagOpts, component, grpcProbe, + s := grpcserver.New(logger, reg, tracer, grpcLogOpts, tagOpts, conf.component, grpcProbe, grpcserver.WithServer(store.RegisterStoreServer(bs)), - grpcserver.WithListen(grpcBindAddr), - grpcserver.WithGracePeriod(grpcGracePeriod), + grpcserver.WithListen(conf.grpcConfig.bindAddress), + grpcserver.WithGracePeriod(time.Duration(conf.grpcConfig.gracePeriod)), grpcserver.WithTLSConfig(tlsCfg), ) @@ -416,12 +406,12 @@ func runStore( r := route.New() ins := extpromhttp.NewInstrumentationMiddleware(reg, nil) - compactorView := ui.NewBucketUI(logger, "", externalPrefix, prefixHeader, "/loaded", component) + compactorView := ui.NewBucketUI(logger, "", conf.webConfig.externalPrefix, conf.webConfig.prefixHeaderName, "/loaded", conf.component) compactorView.Register(r, true, ins) // Configure Request Logging for HTTP calls. logMiddleware := logging.NewHTTPServerMiddleware(logger, httpLogOpts...) - api := blocksAPI.NewBlocksAPI(logger, disableCORS, "", flagsMap) + api := blocksAPI.NewBlocksAPI(logger, conf.webConfig.disableCORS, "", flagsMap) api.Register(r.WithPrefix("/api/v1"), tracer, logger, ins, logMiddleware) metaFetcher.UpdateOnChange(func(blocks []metadata.Meta, err error) {