From 8161b929060659069a7c860e9c5ea8708bc378f0 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Fri, 26 Jun 2020 10:45:20 -0400 Subject: [PATCH] Update to latest cortex. (#2266) * Update to latest cortex. Signed-off-by: Cyril Tovena * update go.sum Signed-off-by: Cyril Tovena --- go.mod | 2 +- go.sum | 10 +- .../cortex/pkg/chunk/chunk_store.go | 4 +- .../cortex/pkg/chunk/series_store.go | 4 +- .../cortex/pkg/distributor/distributor.go | 10 +- .../cortex/pkg/ingester/ingester_v2.go | 5 +- .../cortexproject/cortex/pkg/ruler/ruler.go | 3 + .../cortex/pkg/storage/tsdb/config.go | 2 +- .../cortex/pkg/storegateway/bucket_stores.go | 17 ++- .../cortex/tools/querytee/proxy.go | 16 +++ .../cortex/tools/querytee/proxy_metrics.go | 3 +- .../thanos-io/thanos/pkg/shipper/shipper.go | 122 +++++++++--------- .../thanos-io/thanos/pkg/store/bucket.go | 42 +++--- vendor/modules.txt | 4 +- 14 files changed, 142 insertions(+), 102 deletions(-) diff --git a/go.mod b/go.mod index 40f12e2506d9..eb28f56fde25 100644 --- a/go.mod +++ b/go.mod @@ -10,7 +10,7 @@ require ( github.com/containerd/containerd v1.3.2 // indirect github.com/containerd/fifo v0.0.0-20190226154929-a9fb20d87448 // indirect github.com/coreos/go-systemd v0.0.0-20191104093116-d3cd4ed1dbcf - github.com/cortexproject/cortex v1.1.1-0.20200625134921-0ea2318b0174 + github.com/cortexproject/cortex v1.1.1-0.20200626122052-962b37ad100a github.com/davecgh/go-spew v1.1.1 github.com/docker/distribution v2.7.1+incompatible // indirect github.com/docker/docker v0.7.3-0.20190817195342-4760db040282 diff --git a/go.sum b/go.sum index 48ab85eb11f6..0c2d8d7b0f08 100644 --- a/go.sum +++ b/go.sum @@ -232,8 +232,8 @@ github.com/coreos/pkg v0.0.0-20160727233714-3ac0863d7acf/go.mod h1:E3G3o1h8I7cfc github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f h1:lBNOc5arjvs8E5mO2tbpBpLoyyu8B6e44T7hJy6potg= github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA= github.com/cortexproject/cortex 
v0.6.1-0.20200228110116-92ab6cbe0995/go.mod h1:3Xa3DjJxtpXqxcMGdk850lcIRb81M0fyY1MQ6udY134= -github.com/cortexproject/cortex v1.1.1-0.20200625134921-0ea2318b0174 h1:b9GU5db5TuOz9pry6Ed4Nyu/sWuxGjDe1dQrzJtUzlU= -github.com/cortexproject/cortex v1.1.1-0.20200625134921-0ea2318b0174/go.mod h1:oE48OnEVmpMi0prqknK/DnNCqIBAXZ6NIebKB5KHtYE= +github.com/cortexproject/cortex v1.1.1-0.20200626122052-962b37ad100a h1:E1IniiPCOF9AAxXj1n5+9oWygYtQId8UF93ghK1yJrM= +github.com/cortexproject/cortex v1.1.1-0.20200626122052-962b37ad100a/go.mod h1:++KnbcT+1Gz9Nmgfbe6vzRRRsJtm8skMF5aP5+RIhDM= github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY= github.com/cznic/b v0.0.0-20180115125044-35e9bbe41f07/go.mod h1:URriBxXwVq5ijiJ12C7iIZqlA69nTlI+LgI6/pwftG8= @@ -1036,7 +1036,7 @@ github.com/prometheus/prometheus v0.0.0-20190818123050-43acd0e2e93f/go.mod h1:rM github.com/prometheus/prometheus v1.8.2-0.20200107122003-4708915ac6ef/go.mod h1:7U90zPoLkWjEIQcy/rweQla82OCTUzxVHE51G3OhJbI= github.com/prometheus/prometheus v1.8.2-0.20200213233353-b90be6f32a33 h1:HBYrMJj5iosUjUkAK9L5GO+5eEQXbcrzdjkqY9HV5W4= github.com/prometheus/prometheus v1.8.2-0.20200213233353-b90be6f32a33/go.mod h1:fkIPPkuZnkXyopYHmXPxf9rgiPkVgZCN8w9o8+UgBlY= -github.com/prometheus/prometheus v1.8.2-0.20200609165731-66dfb951c4ca/go.mod h1:CwaXafRa0mm72de2GQWtfQxjGytbSKIGivWxQvjpRZs= +github.com/prometheus/prometheus v1.8.2-0.20200619100132-74207c04655e/go.mod h1:QV6T0PPQi5UFmqcLBJw3JiyIR8r1O7KEv9qlVw4VV40= github.com/prometheus/prometheus v1.8.2-0.20200622142935-153f859b7499 h1:q+yGm39CmSV1S7oxCz36nlvx9ugRoEodwuHusgJw+iU= github.com/prometheus/prometheus v1.8.2-0.20200622142935-153f859b7499/go.mod h1:QV6T0PPQi5UFmqcLBJw3JiyIR8r1O7KEv9qlVw4VV40= github.com/rafaeljusto/redigomock v0.0.0-20190202135759-257e089e14a1 h1:+kGqA4dNN5hn7WwvKdzHl0rdN5AEkbNZd0VjRltAiZg= @@ -1128,8 +1128,8 @@ 
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81P github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/thanos-io/thanos v0.8.1-0.20200109203923-552ffa4c1a0d/go.mod h1:usT/TxtJQ7DzinTt+G9kinDQmRS5sxwu0unVKZ9vdcw= -github.com/thanos-io/thanos v0.12.3-0.20200618165043-6c513e5f5c5f h1:UnMVEOejh6tWKUag5tuC0WjKfKmGwJ2+ky0MV4KM52I= -github.com/thanos-io/thanos v0.12.3-0.20200618165043-6c513e5f5c5f/go.mod h1:CPqrM/ibNtlraee0to4dSRiTs+KLI1c3agMS2lmJpz0= +github.com/thanos-io/thanos v0.13.1-0.20200625180332-f078faed1b96 h1:McsluZ8fXVwGbdXsZ20uZNGukmPycDU9m6df64S2bqQ= +github.com/thanos-io/thanos v0.13.1-0.20200625180332-f078faed1b96/go.mod h1:VuNcGvUE0u57S1XXqYhf0dQzUO3wUnw2B5IKsju+1z4= github.com/tidwall/pretty v0.0.0-20180105212114-65a9db5fad51/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= github.com/tidwall/pretty v1.0.0 h1:HsD+QiTn7sK6flMKIvNmpqz1qrpP3Ps6jOKIKMooyg4= github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/chunk_store.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/chunk_store.go index 41eef9c41be5..4363696cf0df 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/chunk_store.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/chunk_store.go @@ -424,11 +424,13 @@ func (c *store) lookupChunksByMetricName(ctx context.Context, userID string, fro // Receive chunkSets from all matchers var chunkIDs []string var lastErr error + var initialized bool for i := 0; i < len(matchers); i++ { select { case incoming := <-incomingChunkIDs: - if chunkIDs == nil { + if !initialized { chunkIDs = incoming + initialized = true } else { chunkIDs = intersectStrings(chunkIDs, incoming) } diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/series_store.go 
b/vendor/github.com/cortexproject/cortex/pkg/chunk/series_store.go index a1e062f9770c..4792ca1ae2e0 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/series_store.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/series_store.go @@ -305,12 +305,14 @@ func (c *seriesStore) lookupSeriesByMetricNameMatchers(ctx context.Context, from var lastErr error var cardinalityExceededErrors int var cardinalityExceededError CardinalityExceededError + var initialized bool for i := 0; i < len(matchers); i++ { select { case incoming := <-incomingIDs: preIntersectionCount += len(incoming) - if ids == nil { + if !initialized { ids = incoming + initialized = true } else { ids = intersectStrings(ids, incoming) } diff --git a/vendor/github.com/cortexproject/cortex/pkg/distributor/distributor.go b/vendor/github.com/cortexproject/cortex/pkg/distributor/distributor.go index 165887c7fbd8..c26642fc29b0 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/distributor/distributor.go +++ b/vendor/github.com/cortexproject/cortex/pkg/distributor/distributor.go @@ -154,6 +154,10 @@ type Config struct { // for testing ingesterClientFactory ring_client.PoolFactory `yaml:"-"` + + // when true the distributor does not validate labels at ingest time, Cortex doesn't directly use + // this (and should never use it) but this feature is used by other projects built on top of it + SkipLabelValidation bool `yaml:"-"` } // RegisterFlags adds the flags required to config this to the given FlagSet @@ -332,8 +336,10 @@ func (d *Distributor) checkSample(ctx context.Context, userID, cluster, replica // Returns the validated series with it's labels/samples, and any error. 
func (d *Distributor) validateSeries(ts ingester_client.PreallocTimeseries, userID string) (client.PreallocTimeseries, error) { labelsHistogram.Observe(float64(len(ts.Labels))) - if err := validation.ValidateLabels(d.limits, userID, ts.Labels); err != nil { - return emptyPreallocSeries, err + if !d.cfg.SkipLabelValidation { + if err := validation.ValidateLabels(d.limits, userID, ts.Labels); err != nil { + return emptyPreallocSeries, err + } } metricName, _ := extract.MetricNameFromLabelAdapters(ts.Labels) diff --git a/vendor/github.com/cortexproject/cortex/pkg/ingester/ingester_v2.go b/vendor/github.com/cortexproject/cortex/pkg/ingester/ingester_v2.go index 02a7531efbba..610cd79dee9b 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/ingester/ingester_v2.go +++ b/vendor/github.com/cortexproject/cortex/pkg/ingester/ingester_v2.go @@ -898,7 +898,10 @@ func (i *Ingester) createTSDB(userID string) (*userTSDB, error) { tsdbPromReg, udir, cortex_tsdb.NewUserBucketClient(userID, i.TSDBState.bucket), - func() labels.Labels { return l }, metadata.ReceiveSource) + func() labels.Labels { return l }, + metadata.ReceiveSource, + true, // Allow out of order uploads. It's fine in Cortex's context. 
+ ) } i.TSDBState.tsdbMetrics.setRegistryForUser(userID, tsdbPromReg) diff --git a/vendor/github.com/cortexproject/cortex/pkg/ruler/ruler.go b/vendor/github.com/cortexproject/cortex/pkg/ruler/ruler.go index 6bc0fb3ee60e..341be3a36ea8 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/ruler/ruler.go +++ b/vendor/github.com/cortexproject/cortex/pkg/ruler/ruler.go @@ -330,8 +330,11 @@ func (r *Ruler) getOrCreateNotifier(userID string) (*notifier.Manager, error) { return n.notifier, nil } + reg := prometheus.WrapRegistererWith(prometheus.Labels{"user": userID}, r.registry) + reg = prometheus.WrapRegistererWithPrefix("cortex_", reg) n = newRulerNotifier(&notifier.Options{ QueueCapacity: r.cfg.NotificationQueueCapacity, + Registerer: reg, Do: func(ctx context.Context, client *http.Client, req *http.Request) (*http.Response, error) { // Note: The passed-in context comes from the Prometheus notifier // and does *not* contain the userID. So it needs to be added to the context diff --git a/vendor/github.com/cortexproject/cortex/pkg/storage/tsdb/config.go b/vendor/github.com/cortexproject/cortex/pkg/storage/tsdb/config.go index d9e9f65084b4..f0beff5ad9cc 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/storage/tsdb/config.go +++ b/vendor/github.com/cortexproject/cortex/pkg/storage/tsdb/config.go @@ -207,7 +207,7 @@ func (cfg *BucketStoreConfig) RegisterFlags(f *flag.FlagSet) { f.DurationVar(&cfg.SyncInterval, "experimental.tsdb.bucket-store.sync-interval", 5*time.Minute, "How frequently scan the bucket to look for changes (new blocks shipped by ingesters and blocks removed by retention or compaction). 
0 disables it.") f.Uint64Var(&cfg.MaxChunkPoolBytes, "experimental.tsdb.bucket-store.max-chunk-pool-bytes", uint64(2*units.Gibibyte), "Max size - in bytes - of a per-tenant chunk pool, used to reduce memory allocations.") f.Uint64Var(&cfg.MaxSampleCount, "experimental.tsdb.bucket-store.max-sample-count", 0, "Max number of samples per query when loading series from the long-term storage. 0 disables the limit.") - f.IntVar(&cfg.MaxConcurrent, "experimental.tsdb.bucket-store.max-concurrent", 20, "Max number of concurrent queries to execute against the long-term storage on a per-tenant basis.") + f.IntVar(&cfg.MaxConcurrent, "experimental.tsdb.bucket-store.max-concurrent", 100, "Max number of concurrent queries to execute against the long-term storage. The limit is shared across all tenants.") f.IntVar(&cfg.TenantSyncConcurrency, "experimental.tsdb.bucket-store.tenant-sync-concurrency", 10, "Maximum number of concurrent tenants synching blocks.") f.IntVar(&cfg.BlockSyncConcurrency, "experimental.tsdb.bucket-store.block-sync-concurrency", 20, "Maximum number of concurrent blocks synching per tenant.") f.IntVar(&cfg.MetaSyncConcurrency, "experimental.tsdb.bucket-store.meta-sync-concurrency", 20, "Number of Go routines to use when syncing block meta files from object storage per tenant.") diff --git a/vendor/github.com/cortexproject/cortex/pkg/storegateway/bucket_stores.go b/vendor/github.com/cortexproject/cortex/pkg/storegateway/bucket_stores.go index 820c9518021d..b480c41a70a2 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/storegateway/bucket_stores.go +++ b/vendor/github.com/cortexproject/cortex/pkg/storegateway/bucket_stores.go @@ -18,6 +18,7 @@ import ( "github.com/thanos-io/thanos/pkg/block" thanos_metadata "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/extprom" + "github.com/thanos-io/thanos/pkg/gate" "github.com/thanos-io/thanos/pkg/objstore" "github.com/thanos-io/thanos/pkg/store" storecache 
"github.com/thanos-io/thanos/pkg/store/cache" @@ -43,6 +44,9 @@ type BucketStores struct { // Index cache shared across all tenants. indexCache storecache.IndexCache + // Gate used to limit query concurrency across all tenants. + queryGate gate.Gate + // Keeps a bucket store for each tenant. storesMu sync.RWMutex stores map[string]*store.BucketStore @@ -59,6 +63,14 @@ func NewBucketStores(cfg tsdb.Config, filters []block.MetadataFilter, bucketClie return nil, errors.Wrapf(err, "create caching bucket") } + // The number of concurrent queries against the tenants BucketStores are limited. + queryGateReg := extprom.WrapRegistererWithPrefix("cortex_bucket_stores_", reg) + queryGate := gate.NewKeeper(queryGateReg).NewGate(cfg.BucketStore.MaxConcurrent) + promauto.With(reg).NewGauge(prometheus.GaugeOpts{ + Name: "cortex_bucket_stores_gate_queries_concurrent_max", + Help: "Number of maximum concurrent queries allowed.", + }).Set(float64(cfg.BucketStore.MaxConcurrent)) + u := &BucketStores{ logger: logger, cfg: cfg, @@ -68,6 +80,7 @@ func NewBucketStores(cfg tsdb.Config, filters []block.MetadataFilter, bucketClie logLevel: logLevel, bucketStoreMetrics: NewBucketStoreMetrics(), metaFetcherMetrics: NewMetadataFetcherMetrics(), + queryGate: queryGate, syncTimes: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{ Name: "cortex_bucket_stores_blocks_sync_seconds", Help: "The total time it takes to perform a sync stores", @@ -268,9 +281,9 @@ func (u *BucketStores) getOrCreateStore(userID string) (*store.BucketStore, erro fetcher, filepath.Join(u.cfg.BucketStore.SyncDir, userID), u.indexCache, - uint64(u.cfg.BucketStore.MaxChunkPoolBytes), + u.queryGate, + u.cfg.BucketStore.MaxChunkPoolBytes, u.cfg.BucketStore.MaxSampleCount, - u.cfg.BucketStore.MaxConcurrent, u.logLevel.String() == "debug", // Turn on debug logging, if the log level is set to debug u.cfg.BucketStore.BlockSyncConcurrency, nil, // Do not limit timerange. 
diff --git a/vendor/github.com/cortexproject/cortex/tools/querytee/proxy.go b/vendor/github.com/cortexproject/cortex/tools/querytee/proxy.go index 7d2802e2cf56..8a5b0a458b23 100644 --- a/vendor/github.com/cortexproject/cortex/tools/querytee/proxy.go +++ b/vendor/github.com/cortexproject/cortex/tools/querytee/proxy.go @@ -107,6 +107,21 @@ func NewProxy(cfg ProxyConfig, logger log.Logger, routes []Route, registerer pro return nil, errMinBackends } + // If the preferred backend is configured, then it must exists among the actual backends. + if cfg.PreferredBackend != "" { + exists := false + for _, b := range p.backends { + if b.preferred { + exists = true + break + } + } + + if !exists { + return nil, fmt.Errorf("the preferred backend (hostname) has not been found among the list of configured backends") + } + } + if cfg.CompareResponses && len(p.backends) != 2 { return nil, fmt.Errorf("when enabling comparison of results number of backends should be 2 exactly") } @@ -159,6 +174,7 @@ func (p *Proxy) Start() error { } }() + level.Info(p.logger).Log("msg", "The proxy is up and running.") return nil } diff --git a/vendor/github.com/cortexproject/cortex/tools/querytee/proxy_metrics.go b/vendor/github.com/cortexproject/cortex/tools/querytee/proxy_metrics.go index 5fda10bbfbe3..ea126d12a659 100644 --- a/vendor/github.com/cortexproject/cortex/tools/querytee/proxy_metrics.go +++ b/vendor/github.com/cortexproject/cortex/tools/querytee/proxy_metrics.go @@ -3,7 +3,6 @@ package querytee import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" - "github.com/weaveworks/common/instrument" ) const ( @@ -23,7 +22,7 @@ func NewProxyMetrics(registerer prometheus.Registerer) *ProxyMetrics { Namespace: "cortex_querytee", Name: "request_duration_seconds", Help: "Time (in seconds) spent serving HTTP requests.", - Buckets: instrument.DefBuckets, + Buckets: []float64{.005, .01, .025, .05, .1, .25, .5, 0.75, 1, 1.5, 2, 3, 4, 5, 10, 25, 
50, 100}, }, []string{"backend", "method", "route", "status_code"}), responsesTotal: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{ Namespace: "cortex_querytee", diff --git a/vendor/github.com/thanos-io/thanos/pkg/shipper/shipper.go b/vendor/github.com/thanos-io/thanos/pkg/shipper/shipper.go index e3f1bec55493..5c961f4459fc 100644 --- a/vendor/github.com/thanos-io/thanos/pkg/shipper/shipper.go +++ b/vendor/github.com/thanos-io/thanos/pkg/shipper/shipper.go @@ -72,13 +72,15 @@ func newMetrics(reg prometheus.Registerer, uploadCompacted bool) *metrics { // Shipper watches a directory for matching files and directories and uploads // them to a remote data store. type Shipper struct { - logger log.Logger - dir string - metrics *metrics - bucket objstore.Bucket - labels func() labels.Labels - source metadata.SourceType - uploadCompacted bool + logger log.Logger + dir string + metrics *metrics + bucket objstore.Bucket + labels func() labels.Labels + source metadata.SourceType + + uploadCompacted bool + allowOutOfOrderUploads bool } // New creates a new shipper that detects new TSDB blocks in dir and uploads them @@ -90,6 +92,7 @@ func New( bucket objstore.Bucket, lbls func() labels.Labels, source metadata.SourceType, + allowOutOfOrderUploads bool, ) *Shipper { if logger == nil { logger = log.NewNopLogger() @@ -99,12 +102,13 @@ func New( } return &Shipper{ - logger: logger, - dir: dir, - bucket: bucket, - labels: lbls, - metrics: newMetrics(r, false), - source: source, + logger: logger, + dir: dir, + bucket: bucket, + labels: lbls, + metrics: newMetrics(r, false), + source: source, + allowOutOfOrderUploads: allowOutOfOrderUploads, } } @@ -118,6 +122,7 @@ func NewWithCompacted( bucket objstore.Bucket, lbls func() labels.Labels, source metadata.SourceType, + allowOutOfOrderUploads bool, ) *Shipper { if logger == nil { logger = log.NewNopLogger() @@ -127,13 +132,14 @@ func NewWithCompacted( } return &Shipper{ - logger: logger, - dir: dir, - bucket: bucket, - 
labels: lbls, - metrics: newMetrics(r, true), - source: source, - uploadCompacted: true, + logger: logger, + dir: dir, + bucket: bucket, + labels: lbls, + metrics: newMetrics(r, true), + source: source, + uploadCompacted: true, + allowOutOfOrderUploads: allowOutOfOrderUploads, } } @@ -153,23 +159,23 @@ func (s *Shipper) Timestamps() (minTime, maxSyncTime int64, err error) { minTime = math.MaxInt64 maxSyncTime = math.MinInt64 - if err := s.iterBlockMetas(func(m *metadata.Meta) error { + metas, err := s.blockMetasFromOldest() + if err != nil { + return 0, 0, err + } + for _, m := range metas { if m.MinTime < minTime { minTime = m.MinTime } if _, ok := hasUploaded[m.ULID]; ok && m.MaxTime > maxSyncTime { maxSyncTime = m.MaxTime } - return nil - }); err != nil { - return 0, 0, errors.Wrap(err, "iter Block metas for timestamp") } if minTime == math.MaxInt64 { // No block yet found. We cannot assume any min block size so propagate 0 minTime. minTime = 0 } - return minTime, maxSyncTime, nil } @@ -272,72 +278,78 @@ func (s *Shipper) Sync(ctx context.Context) (uploaded int, err error) { checker = newLazyOverlapChecker(s.logger, s.bucket, s.labels) uploadErrs int ) - // Sync non compacted blocks first. - if err := s.iterBlockMetas(func(m *metadata.Meta) error { + + metas, err := s.blockMetasFromOldest() + if err != nil { + return 0, err + } + for _, m := range metas { // Do not sync a block if we already uploaded or ignored it. If it's no longer found in the bucket, // it was generally removed by the compaction process. if _, uploaded := hasUploaded[m.ULID]; uploaded { meta.Uploaded = append(meta.Uploaded, m.ULID) - return nil + continue } if m.Stats.NumSamples == 0 { // Ignore empty blocks. level.Debug(s.logger).Log("msg", "ignoring empty block", "block", m.ULID) - return nil + continue } // We only ship of the first compacted block level as normal flow. 
if m.Compaction.Level > 1 { if !s.uploadCompacted { - return nil + continue } } // Check against bucket if the meta file for this block exists. ok, err := s.bucket.Exists(ctx, path.Join(m.ULID.String(), block.MetaFilename)) if err != nil { - return errors.Wrap(err, "check exists") + return 0, errors.Wrap(err, "check exists") } if ok { - return nil + continue } if m.Compaction.Level > 1 { if err := checker.IsOverlapping(ctx, m.BlockMeta); err != nil { + if !s.allowOutOfOrderUploads { + return 0, errors.Errorf("Found overlap or error during sync, cannot upload compacted block, details: %v", err) + } level.Error(s.logger).Log("msg", "found overlap or error during sync, cannot upload compacted block", "err", err) uploadErrs++ - return nil + continue } } if err := s.upload(ctx, m); err != nil { - level.Error(s.logger).Log("msg", "shipping failed", "block", m.ULID, "err", err) + if !s.allowOutOfOrderUploads { + return 0, errors.Wrapf(err, "upload %v", m.ULID) + } + // No error returned, just log line. This is because we want other blocks to be uploaded even // though this one failed. It will be retried on second Sync iteration. 
+ level.Error(s.logger).Log("msg", "shipping failed", "block", m.ULID, "err", err) uploadErrs++ - return nil + continue } meta.Uploaded = append(meta.Uploaded, m.ULID) - uploaded++ s.metrics.uploads.Inc() - return nil - }); err != nil { - s.metrics.dirSyncFailures.Inc() - return uploaded, errors.Wrap(err, "iter local block metas") } - if err := WriteMetaFile(s.logger, s.dir, meta); err != nil { level.Warn(s.logger).Log("msg", "updating meta file failed", "err", err) } s.metrics.dirSyncs.Inc() - if uploadErrs > 0 { s.metrics.uploadFailures.Add(float64(uploadErrs)) return uploaded, errors.Errorf("failed to sync %v blocks", uploadErrs) - } else if s.uploadCompacted { + } + + if s.uploadCompacted { s.metrics.uploadedCompacted.Set(1) } return uploaded, nil @@ -380,15 +392,12 @@ func (s *Shipper) upload(ctx context.Context, meta *metadata.Meta) error { return block.Upload(ctx, s.logger, s.bucket, updir) } -// iterBlockMetas calls f with the block meta for each block found in dir -// sorted by minTime asc. It logs an error and continues if it cannot access a -// meta.json file. -// If f returns an error, the function returns with the same error. -func (s *Shipper) iterBlockMetas(f func(m *metadata.Meta) error) error { - var metas []*metadata.Meta +// blockMetasFromOldest returns the block meta of each block found in dir +// sorted by minTime asc. 
+func (s *Shipper) blockMetasFromOldest() (metas []*metadata.Meta, _ error) { fis, err := ioutil.ReadDir(s.dir) if err != nil { - return errors.Wrap(err, "read dir") + return nil, errors.Wrap(err, "read dir") } names := make([]string, 0, len(fis)) for _, fi := range fis { @@ -402,28 +411,21 @@ func (s *Shipper) iterBlockMetas(f func(m *metadata.Meta) error) error { fi, err := os.Stat(dir) if err != nil { - level.Warn(s.logger).Log("msg", "open file failed", "err", err) - continue + return nil, errors.Wrapf(err, "stat block %v", dir) } if !fi.IsDir() { continue } m, err := metadata.Read(dir) if err != nil { - level.Warn(s.logger).Log("msg", "reading meta file failed", "err", err) - continue + return nil, errors.Wrapf(err, "read metadata for block %v", dir) } metas = append(metas, m) } sort.Slice(metas, func(i, j int) bool { return metas[i].BlockMeta.MinTime < metas[j].BlockMeta.MinTime }) - for _, m := range metas { - if err := f(m); err != nil { - return err - } - } - return nil + return metas, nil } func hardlinkBlock(src, dst string) error { diff --git a/vendor/github.com/thanos-io/thanos/pkg/store/bucket.go b/vendor/github.com/thanos-io/thanos/pkg/store/bucket.go index e5793d78c005..33e9d6891801 100644 --- a/vendor/github.com/thanos-io/thanos/pkg/store/bucket.go +++ b/vendor/github.com/thanos-io/thanos/pkg/store/bucket.go @@ -40,7 +40,6 @@ import ( "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/compact/downsample" "github.com/thanos-io/thanos/pkg/component" - "github.com/thanos-io/thanos/pkg/extprom" "github.com/thanos-io/thanos/pkg/gate" "github.com/thanos-io/thanos/pkg/model" "github.com/thanos-io/thanos/pkg/objstore" @@ -101,7 +100,6 @@ type bucketStoreMetrics struct { resultSeriesCount prometheus.Summary chunkSizeBytes prometheus.Histogram queriesDropped prometheus.Counter - queriesLimit prometheus.Gauge seriesRefetches prometheus.Counter cachedPostingsCompressions *prometheus.CounterVec @@ -184,10 +182,6 @@ func 
newBucketStoreMetrics(reg prometheus.Registerer) *bucketStoreMetrics { Name: "thanos_bucket_store_queries_dropped_total", Help: "Number of queries that were dropped due to the sample limit.", }) - m.queriesLimit = promauto.With(reg).NewGauge(prometheus.GaugeOpts{ - Name: "thanos_bucket_store_queries_concurrent_max", - Help: "Number of maximum concurrent queries.", - }) m.seriesRefetches = promauto.With(reg).NewCounter(prometheus.CounterOpts{ Name: "thanos_bucket_store_series_refetches_total", Help: fmt.Sprintf("Total number of cases where %v bytes was not enough was to fetch series from index, resulting in refetch.", maxSeriesSize), @@ -273,9 +267,9 @@ func NewBucketStore( fetcher block.MetadataFetcher, dir string, indexCache storecache.IndexCache, + queryGate gate.Gate, maxChunkPoolBytes uint64, maxSampleCount uint64, - maxConcurrent int, debugLogging bool, blockSyncConcurrency int, filterConfig *FilterConfig, @@ -288,10 +282,6 @@ func NewBucketStore( logger = log.NewNopLogger() } - if maxConcurrent < 0 { - return nil, errors.Errorf("max concurrency value cannot be lower than 0 (got %v)", maxConcurrent) - } - chunkPool, err := pool.NewBucketedBytesPool(maxChunkSize, 50e6, 2, maxChunkPoolBytes) if err != nil { return nil, errors.Wrap(err, "create chunk pool") @@ -310,7 +300,7 @@ func NewBucketStore( debugLogging: debugLogging, blockSyncConcurrency: blockSyncConcurrency, filterConfig: filterConfig, - queryGate: gate.NewKeeper(extprom.WrapRegistererWithPrefix("thanos_bucket_store_series_", reg)).NewGate(maxConcurrent), + queryGate: queryGate, samplesLimiter: NewLimiter(maxSampleCount, metrics.queriesDropped), partitioner: gapBasedPartitioner{maxGapSize: partitionerMaxGapSize}, enableCompatibilityLabel: enableCompatibilityLabel, @@ -324,8 +314,6 @@ func NewBucketStore( return nil, errors.Wrap(err, "create dir") } - s.metrics.queriesLimit.Set(float64(maxConcurrent)) - return s, nil } @@ -844,14 +832,16 @@ func debugFoundBlockSetOverview(logger log.Logger, mint, maxt, 
maxResolutionMill // Series implements the storepb.StoreServer interface. func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_SeriesServer) (err error) { - tracing.DoInSpan(srv.Context(), "store_query_gate_ismyturn", func(ctx context.Context) { - err = s.queryGate.Start(srv.Context()) - }) - if err != nil { - return errors.Wrapf(err, "failed to wait for turn") - } + if s.queryGate != nil { + tracing.DoInSpan(srv.Context(), "store_query_gate_ismyturn", func(ctx context.Context) { + err = s.queryGate.Start(srv.Context()) + }) + if err != nil { + return errors.Wrapf(err, "failed to wait for turn") + } - defer s.queryGate.Done() + defer s.queryGate.Done() + } matchers, err := promclient.TranslateMatchers(req.Matchers) if err != nil { @@ -1329,11 +1319,15 @@ func (b *bucketBlock) readIndexRange(ctx context.Context, off, length int64) ([] } defer runutil.CloseWithLogOnErr(b.logger, r, "readIndexRange close range reader") - c, err := ioutil.ReadAll(r) - if err != nil { + // Preallocate the buffer with the exact size so we don't waste allocations + // while progressively growing an initial small buffer. The buffer capacity + // is increased by MinRead to avoid extra allocations due to how ReadFrom() + // internally works. 
+ buf := bytes.NewBuffer(make([]byte, 0, length+bytes.MinRead)) + if _, err := buf.ReadFrom(r); err != nil { return nil, errors.Wrap(err, "read range") } - return c, nil + return buf.Bytes(), nil } func (b *bucketBlock) readChunkRange(ctx context.Context, seq int, off, length int64) (*[]byte, error) { diff --git a/vendor/modules.txt b/vendor/modules.txt index 9232c015cfdd..e24c7cecde06 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -149,7 +149,7 @@ github.com/coreos/go-systemd/journal github.com/coreos/go-systemd/sdjournal # github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f github.com/coreos/pkg/capnslog -# github.com/cortexproject/cortex v1.1.1-0.20200625134921-0ea2318b0174 +# github.com/cortexproject/cortex v1.1.1-0.20200626122052-962b37ad100a ## explicit github.com/cortexproject/cortex/pkg/alertmanager github.com/cortexproject/cortex/pkg/alertmanager/alerts @@ -769,7 +769,7 @@ github.com/stretchr/objx github.com/stretchr/testify/assert github.com/stretchr/testify/mock github.com/stretchr/testify/require -# github.com/thanos-io/thanos v0.12.3-0.20200618165043-6c513e5f5c5f +# github.com/thanos-io/thanos v0.13.1-0.20200625180332-f078faed1b96 github.com/thanos-io/thanos/pkg/block github.com/thanos-io/thanos/pkg/block/indexheader github.com/thanos-io/thanos/pkg/block/metadata