From 4169170cfa9674edc29b012a5550c2dee77c2507 Mon Sep 17 00:00:00 2001 From: Xiang Dai <764524258@qq.com> Date: Mon, 3 Feb 2020 12:38:04 +0800 Subject: [PATCH 1/5] scripts/genflagdocs.sh: Generate downsample flag Signed-off-by: Xiang Dai <764524258@qq.com> --- docs/components/downsample.md | 67 +++++++++++++++++++++++++++++++++++ scripts/genflagdocs.sh | 2 +- 2 files changed, 68 insertions(+), 1 deletion(-) create mode 100644 docs/components/downsample.md diff --git a/docs/components/downsample.md b/docs/components/downsample.md new file mode 100644 index 0000000000..7be66d1c6b --- /dev/null +++ b/docs/components/downsample.md @@ -0,0 +1,67 @@ +--- +title: Downsample +type: docs +menu: components +--- + +# Downsample + +The downsample component of Thanos implements the downsample API on top of historical data in an object storage bucket. It continuously downsamples blocks in an object store bucket as a service. + +```bash +$ thanos downsample \ + --data-dir "/local/state/data/dir" \ + --objstore.config-file "bucket.yml" +``` + +The content of `bucket.yml`: + +```yaml +type: GCS +config: + bucket: example-bucket +``` + +## Flags + +[embedmd]:# (flags/downsample.txt $) +```$ +usage: thanos downsample [] + +continuously downsamples blocks in an object store bucket + +Flags: + -h, --help Show context-sensitive help (also try --help-long and + --help-man). + --version Show application version. + --log.level=info Log filtering level. + --log.format=logfmt Log format to use. + --tracing.config-file= + Path to YAML file with tracing configuration. See format + details: https://thanos.io/tracing.md/#configuration + --tracing.config= + Alternative to 'tracing.config-file' flag (lower priority). + Content of YAML file with tracing configuration. See format + details: https://thanos.io/tracing.md/#configuration + --http-address="0.0.0.0:10902" + Listen host:port for HTTP endpoints. + --http-grace-period=2m Time to wait after an interrupt received for HTTP Server. + --data-dir="./data" Data directory in which to cache blocks and process + downsamplings. + --objstore.config-file= + Path to YAML file that contains object store configuration. + See format details: + https://thanos.io/storage.md/#configuration + --objstore.config= + Alternative to 'objstore.config-file' flag (lower priority). + Content of YAML file that contains object store + configuration. See format details: + https://thanos.io/storage.md/#configuration + +## Probes + +- Thanos downsample exposes two endpoints for probing. + - `/-/healthy` starts as soon as initial setup completed. + - `/-/ready` starts after all the bootstrapping completed (e.g object store bucket connection) and ready to serve traffic. + +> NOTE: Metric endpoint starts immediately so, make sure you set up readiness probe on designated HTTP `/-/ready` path. diff --git a/scripts/genflagdocs.sh b/scripts/genflagdocs.sh index 9583934f42..471ca7fb95 100755 --- a/scripts/genflagdocs.sh +++ b/scripts/genflagdocs.sh @@ -34,7 +34,7 @@ fi CHECK=${1:-} -commands=("compact" "query" "rule" "sidecar" "store" "bucket" "check") +commands=("compact" "query" "rule" "sidecar" "store" "downsample" "bucket" "check") for x in "${commands[@]}"; do ./thanos "${x}" --help &> "docs/components/flags/${x}.txt" From f880d14b6fe620442acbaabdffb45c1eb1df42b7 Mon Sep 17 00:00:00 2001 From: Xiang Dai <764524258@qq.com> Date: Mon, 3 Feb 2020 12:47:43 +0800 Subject: [PATCH 2/5] Document downsample component Signed-off-by: Xiang Dai <764524258@qq.com> --- docs/components/downsample.md | 45 +++++++++++++++++++---------------- 1 file changed, 25 insertions(+), 20 deletions(-) diff --git a/docs/components/downsample.md b/docs/components/downsample.md index 7be66d1c6b..2d0f9c22f1 100644 --- a/docs/components/downsample.md +++ b/docs/components/downsample.md @@ -31,32 +31,37 @@ usage: thanos downsample [] continuously downsamples blocks in an object store bucket Flags: - -h, --help Show context-sensitive help (also try --help-long and - --help-man). + -h, --help Show context-sensitive help (also try --help-long + and --help-man). --version Show application version. --log.level=info Log filtering level. --log.format=logfmt Log format to use. - --tracing.config-file= - Path to YAML file with tracing configuration. See format - details: https://thanos.io/tracing.md/#configuration - --tracing.config= - Alternative to 'tracing.config-file' flag (lower priority). - Content of YAML file with tracing configuration. See format - details: https://thanos.io/tracing.md/#configuration - --http-address="0.0.0.0:10902" + --tracing.config-file= + Path to YAML file with tracing configuration. See + format details: + https://thanos.io/tracing.md/#configuration + --tracing.config= + Alternative to 'tracing.config-file' flag (lower + priority). Content of YAML file with tracing + configuration. See format details: + https://thanos.io/tracing.md/#configuration + --http-address="0.0.0.0:10902" Listen host:port for HTTP endpoints. - --http-grace-period=2m Time to wait after an interrupt received for HTTP Server. - --data-dir="./data" Data directory in which to cache blocks and process - downsamplings. - --objstore.config-file= - Path to YAML file that contains object store configuration. - See format details: - https://thanos.io/storage.md/#configuration - --objstore.config= - Alternative to 'objstore.config-file' flag (lower priority). - Content of YAML file that contains object store + --http-grace-period=2m Time to wait after an interrupt received for HTTP + Server. + --data-dir="./data" Data directory in which to cache blocks and + process downsamplings. + --objstore.config-file= + Path to YAML file that contains object store configuration. See format details: https://thanos.io/storage.md/#configuration + --objstore.config= + Alternative to 'objstore.config-file' flag (lower + priority). Content of YAML file that contains + object store configuration. See format details: + https://thanos.io/storage.md/#configuration + +``` ## Probes From 6a28ac327ff6d8097942d950c62584eafc0db98e Mon Sep 17 00:00:00 2001 From: Xiang Dai <764524258@qq.com> Date: Tue, 10 Mar 2020 17:26:31 +0800 Subject: [PATCH 3/5] Move downsample as bucket sub-command Signed-off-by: Xiang Dai <764524258@qq.com> --- cmd/thanos/bucket.go | 17 ++ pkg/downsample/downsample.go | 300 +++++++++++++++++++++++++++++++++++ 2 files changed, 317 insertions(+) create mode 100644 pkg/downsample/downsample.go diff --git a/cmd/thanos/bucket.go b/cmd/thanos/bucket.go index d4ed513cce..f2388b84e0 100644 --- a/cmd/thanos/bucket.go +++ b/cmd/thanos/bucket.go @@ -29,6 +29,7 @@ import ( "github.com/thanos-io/thanos/pkg/compact" "github.com/thanos-io/thanos/pkg/compact/downsample" "github.com/thanos-io/thanos/pkg/component" + downsamplepkg "github.com/thanos-io/thanos/pkg/downsample" "github.com/thanos-io/thanos/pkg/extflag" "github.com/thanos-io/thanos/pkg/extprom" extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http" @@ -73,6 +74,7 @@ func registerBucket(m map[string]setupFunc, app *kingpin.Application, name strin registerBucketInspect(m, cmd, name, objStoreConfig) registerBucketWeb(m, cmd, name, objStoreConfig) registerBucketReplicate(m, cmd, name, objStoreConfig) + registerBucketDownsample(m, cmd, name, objStoreConfig) } func registerBucketVerify(m map[string]setupFunc, root *kingpin.CmdClause, name string, objStoreConfig *extflag.PathOrContent) { @@ -424,6 +426,21 @@ func registerBucketReplicate(m map[string]setupFunc, root *kingpin.CmdClause, na } +func registerBucketDownsample(m map[string]setupFunc, root *kingpin.CmdClause, name string, objStoreConfig *extflag.PathOrContent) { + + comp := component.Downsample + cmd := root.Command(comp.String(), "continuously downsamples blocks in an object store bucket") + + httpAddr, httpGracePeriod := regHTTPFlags(cmd) + + dataDir := cmd.Flag("data-dir", "Data directory in which to cache blocks and process downsamplings."). + Default("./data").String() + + m[name+" "+comp.String()] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ <-chan struct{}, _ bool) error { + return downsamplepkg.RunDownsample(g, logger, reg, *httpAddr, time.Duration(*httpGracePeriod), *dataDir, objStoreConfig, comp) + } +} + // refresh metadata from remote storage periodically and update UI. func refresh(ctx context.Context, logger log.Logger, bucketUI *ui.Bucket, duration time.Duration, timeout time.Duration, name string, reg *prometheus.Registry, objStoreConfig *extflag.PathOrContent) error { confContentYaml, err := objStoreConfig.Content() diff --git a/pkg/downsample/downsample.go b/pkg/downsample/downsample.go new file mode 100644 index 0000000000..b6c30de8c6 --- /dev/null +++ b/pkg/downsample/downsample.go @@ -0,0 +1,300 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package downsample + +import ( + "context" + "os" + "path/filepath" + "time" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + "github.com/oklog/run" + "github.com/oklog/ulid" + "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/prometheus/prometheus/tsdb" + "github.com/prometheus/prometheus/tsdb/chunkenc" + "github.com/thanos-io/thanos/pkg/block" + "github.com/thanos-io/thanos/pkg/block/metadata" + "github.com/thanos-io/thanos/pkg/compact" + "github.com/thanos-io/thanos/pkg/compact/downsample" + "github.com/thanos-io/thanos/pkg/component" + "github.com/thanos-io/thanos/pkg/extflag" + "github.com/thanos-io/thanos/pkg/extprom" + "github.com/thanos-io/thanos/pkg/objstore" + "github.com/thanos-io/thanos/pkg/objstore/client" + "github.com/thanos-io/thanos/pkg/prober" + "github.com/thanos-io/thanos/pkg/runutil" + httpserver "github.com/thanos-io/thanos/pkg/server/http" +) + +type DownsampleMetrics struct { + downsamples *prometheus.CounterVec + downsampleFailures *prometheus.CounterVec +} + +func newDownsampleMetrics(reg *prometheus.Registry) *DownsampleMetrics { + m := new(DownsampleMetrics) + + m.downsamples = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Name: "thanos_compact_downsample_total", + Help: "Total number of downsampling attempts.", + }, []string{"group"}) + m.downsampleFailures = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Name: "thanos_compact_downsample_failures_total", + Help: "Total number of failed downsampling attempts.", + }, []string{"group"}) + + return m +} + +func RunDownsample( + g *run.Group, + logger log.Logger, + reg *prometheus.Registry, + httpBindAddr string, + httpGracePeriod time.Duration, + dataDir string, + objStoreConfig *extflag.PathOrContent, + comp component.Component, +) error { + confContentYaml, err := objStoreConfig.Content() + if err != nil { + return err + } + + bkt, err := client.NewBucket(logger, confContentYaml, reg, component.Downsample.String()) + if err != nil { + return err + } + + metaFetcher, err := block.NewMetaFetcher(logger, 32, bkt, "", extprom.WrapRegistererWithPrefix("thanos_", reg)) + if err != nil { + return errors.Wrap(err, "create meta fetcher") + } + + // Ensure we close up everything properly. + defer func() { + if err != nil { + runutil.CloseWithLogOnErr(logger, bkt, "bucket client") + } + }() + + httpProbe := prober.NewHTTP() + statusProber := prober.Combine( + httpProbe, + prober.NewInstrumentation(comp, logger, prometheus.WrapRegistererWithPrefix("thanos_", reg)), + ) + + metrics := newDownsampleMetrics(reg) + // Start cycle of syncing blocks from the bucket and garbage collecting the bucket. + { + ctx, cancel := context.WithCancel(context.Background()) + + g.Add(func() error { + defer runutil.CloseWithLogOnErr(logger, bkt, "bucket client") + statusProber.Ready() + + level.Info(logger).Log("msg", "start first pass of downsampling") + + if err := downsampleBucket(ctx, logger, metrics, bkt, metaFetcher, dataDir); err != nil { + return errors.Wrap(err, "downsampling failed") + } + + level.Info(logger).Log("msg", "start second pass of downsampling") + + if err := downsampleBucket(ctx, logger, metrics, bkt, metaFetcher, dataDir); err != nil { + return errors.Wrap(err, "downsampling failed") + } + + return nil + }, func(error) { + cancel() + }) + } + + srv := httpserver.New(logger, reg, comp, httpProbe, + httpserver.WithListen(httpBindAddr), + httpserver.WithGracePeriod(httpGracePeriod), + ) + + g.Add(func() error { + statusProber.Healthy() + + return srv.ListenAndServe() + }, func(err error) { + statusProber.NotReady(err) + defer statusProber.NotHealthy(err) + + srv.Shutdown(err) + }) + + level.Info(logger).Log("msg", "starting downsample node") + return nil +} + +func downsampleBucket( + ctx context.Context, + logger log.Logger, + metrics *DownsampleMetrics, + bkt objstore.Bucket, + fetcher block.MetadataFetcher, + dir string, +) error { + if err := os.RemoveAll(dir); err != nil { + return errors.Wrap(err, "clean working directory") + } + if err := os.MkdirAll(dir, 0777); err != nil { + return errors.Wrap(err, "create dir") + } + + defer func() { + if err := os.RemoveAll(dir); err != nil { + level.Error(logger).Log("msg", "failed to remove downsample cache directory", "path", dir, "err", err) + } + }() + + metas, _, err := fetcher.Fetch(ctx) + if err != nil { + return errors.Wrap(err, "downsampling meta fetch") + } + + // mapping from a hash over all source IDs to blocks. We don't need to downsample a block + // if a downsampled version with the same hash already exists. + sources5m := map[ulid.ULID]struct{}{} + sources1h := map[ulid.ULID]struct{}{} + + for _, m := range metas { + switch m.Thanos.Downsample.Resolution { + case downsample.ResLevel0: + continue + case downsample.ResLevel1: + for _, id := range m.Compaction.Sources { + sources5m[id] = struct{}{} + } + case downsample.ResLevel2: + for _, id := range m.Compaction.Sources { + sources1h[id] = struct{}{} + } + default: + return errors.Errorf("unexpected downsampling resolution %d", m.Thanos.Downsample.Resolution) + } + } + + for _, m := range metas { + switch m.Thanos.Downsample.Resolution { + case downsample.ResLevel0: + missing := false + for _, id := range m.Compaction.Sources { + if _, ok := sources5m[id]; !ok { + missing = true + break + } + } + if !missing { + continue + } + // Only downsample blocks once we are sure to get roughly 2 chunks out of it. + // NOTE(fabxc): this must match with at which block size the compactor creates downsampled + // blocks. Otherwise we may never downsample some data. + if m.MaxTime-m.MinTime < downsample.DownsampleRange0 { + continue + } + if err := processDownsampling(ctx, logger, bkt, m, dir, downsample.ResLevel1); err != nil { + metrics.downsampleFailures.WithLabelValues(compact.GroupKey(m.Thanos)).Inc() + return errors.Wrap(err, "downsampling to 5 min") + } + metrics.downsamples.WithLabelValues(compact.GroupKey(m.Thanos)).Inc() + + case downsample.ResLevel1: + missing := false + for _, id := range m.Compaction.Sources { + if _, ok := sources1h[id]; !ok { + missing = true + break + } + } + if !missing { + continue + } + // Only downsample blocks once we are sure to get roughly 2 chunks out of it. + // NOTE(fabxc): this must match with at which block size the compactor creates downsampled + // blocks. Otherwise we may never downsample some data. + if m.MaxTime-m.MinTime < downsample.DownsampleRange1 { + continue + } + if err := processDownsampling(ctx, logger, bkt, m, dir, downsample.ResLevel2); err != nil { + metrics.downsampleFailures.WithLabelValues(compact.GroupKey(m.Thanos)) + return errors.Wrap(err, "downsampling to 60 min") + } + metrics.downsamples.WithLabelValues(compact.GroupKey(m.Thanos)) + } + } + return nil +} + +func processDownsampling(ctx context.Context, logger log.Logger, bkt objstore.Bucket, m *metadata.Meta, dir string, resolution int64) error { + begin := time.Now() + bdir := filepath.Join(dir, m.ULID.String()) + + err := block.Download(ctx, logger, bkt, m.ULID, bdir) + if err != nil { + return errors.Wrapf(err, "download block %s", m.ULID) + } + level.Info(logger).Log("msg", "downloaded block", "id", m.ULID, "duration", time.Since(begin)) + + if err := block.VerifyIndex(logger, filepath.Join(bdir, block.IndexFilename), m.MinTime, m.MaxTime); err != nil { + return errors.Wrap(err, "input block index not valid") + } + + begin = time.Now() + + var pool chunkenc.Pool + if m.Thanos.Downsample.Resolution == 0 { + pool = chunkenc.NewPool() + } else { + pool = downsample.NewPool() + } + + b, err := tsdb.OpenBlock(logger, bdir, pool) + if err != nil { + return errors.Wrapf(err, "open block %s", m.ULID) + } + defer runutil.CloseWithLogOnErr(log.With(logger, "outcome", "potential left mmap file handlers left"), b, "tsdb reader") + + id, err := downsample.Downsample(logger, m, b, dir, resolution) + if err != nil { + return errors.Wrapf(err, "downsample block %s to window %d", m.ULID, resolution) + } + resdir := filepath.Join(dir, id.String()) + + level.Info(logger).Log("msg", "downsampled block", + "from", m.ULID, "to", id, "duration", time.Since(begin)) + + if err := block.VerifyIndex(logger, filepath.Join(resdir, block.IndexFilename), m.MinTime, m.MaxTime); err != nil { + return errors.Wrap(err, "output block index not valid") + } + + begin = time.Now() + + err = block.Upload(ctx, logger, bkt, resdir) + if err != nil { + return errors.Wrapf(err, "upload downsampled block %s", id) + } + + level.Info(logger).Log("msg", "uploaded block", "id", id, "duration", time.Since(begin)) + + // It is not harmful if these fails. + if err := os.RemoveAll(bdir); err != nil { + level.Warn(logger).Log("msg", "failed to clean directory", "dir", bdir, "err", err) + } + if err := os.RemoveAll(resdir); err != nil { + level.Warn(logger).Log("msg", "failed to clean directory", "resdir", bdir, "err", err) + } + + return nil +} From 6167093809b0080894559fb00b6c6299030099bf Mon Sep 17 00:00:00 2001 From: Xiang Dai <764524258@qq.com> Date: Tue, 10 Mar 2020 17:40:03 +0800 Subject: [PATCH 4/5] update docs Signed-off-by: Xiang Dai <764524258@qq.com> --- docs/components/bucket.md | 71 ++++++++++++++++++++++++++++++++++ docs/components/downsample.md | 72 ----------------------------------- scripts/genflagdocs.sh | 4 +- 3 files changed, 73 insertions(+), 74 deletions(-) delete mode 100644 docs/components/downsample.md diff --git a/docs/components/bucket.md b/docs/components/bucket.md index 3dc3c7aaf9..8c2149f40b 100644 --- a/docs/components/bucket.md +++ b/docs/components/bucket.md @@ -78,6 +78,9 @@ Subcommands: Replicate data from one object storage to another. NOTE: Currently it works only with Thanos blocks (meta.json has to have Thanos metadata). + bucket downsample [] + continuously downsamples blocks in an object store bucket + ``` @@ -389,3 +392,71 @@ Flags: --single-run Run replication only one time, then exit. ``` + +### Downsample + +The downsample component of Thanos implements the downsample API on top of historical data in an object storage bucket. It continuously downsamples blocks in an object store bucket as a service. + +```bash +$ thanos downsample \ + --data-dir "/local/state/data/dir" \ + --objstore.config-file "bucket.yml" +``` + +The content of `bucket.yml`: + +```yaml +type: GCS +config: + bucket: example-bucket +``` + +#### Flags + +[embedmd]:# (flags/bucket_downsample.txt $) +```$ +usage: thanos bucket downsample [] + +continuously downsamples blocks in an object store bucket + +Flags: + -h, --help Show context-sensitive help (also try --help-long + and --help-man). + --version Show application version. + --log.level=info Log filtering level. + --log.format=logfmt Log format to use. Possible options: logfmt or + json. + --tracing.config-file= + Path to YAML file with tracing configuration. See + format details: + https://thanos.io/tracing.md/#configuration + --tracing.config= + Alternative to 'tracing.config-file' flag (lower + priority). Content of YAML file with tracing + configuration. See format details: + https://thanos.io/tracing.md/#configuration + --objstore.config-file= + Path to YAML file that contains object store + configuration. See format details: + https://thanos.io/storage.md/#configuration + --objstore.config= + Alternative to 'objstore.config-file' flag (lower + priority). Content of YAML file that contains + object store configuration. See format details: + https://thanos.io/storage.md/#configuration + --http-address="0.0.0.0:10902" + Listen host:port for HTTP endpoints. + --http-grace-period=2m Time to wait after an interrupt received for HTTP + Server. + --data-dir="./data" Data directory in which to cache blocks and + process downsamplings. + +``` + +#### Probes + +- Thanos downsample exposes two endpoints for probing. + - `/-/healthy` starts as soon as initial setup completed. + - `/-/ready` starts after all the bootstrapping completed (e.g object store bucket connection) and ready to serve traffic. + +> NOTE: Metric endpoint starts immediately so, make sure you set up readiness probe on designated HTTP `/-/ready` path. diff --git a/docs/components/downsample.md b/docs/components/downsample.md deleted file mode 100644 index 2d0f9c22f1..0000000000 --- a/docs/components/downsample.md +++ /dev/null @@ -1,72 +0,0 @@ ---- -title: Downsample -type: docs -menu: components ---- - -# Downsample - -The downsample component of Thanos implements the downsample API on top of historical data in an object storage bucket. It continuously downsamples blocks in an object store bucket as a service. - -```bash -$ thanos downsample \ - --data-dir "/local/state/data/dir" \ - --objstore.config-file "bucket.yml" -``` - -The content of `bucket.yml`: - -```yaml -type: GCS -config: - bucket: example-bucket -``` - -## Flags - -[embedmd]:# (flags/downsample.txt $) -```$ -usage: thanos downsample [] - -continuously downsamples blocks in an object store bucket - -Flags: - -h, --help Show context-sensitive help (also try --help-long - and --help-man). - --version Show application version. - --log.level=info Log filtering level. - --log.format=logfmt Log format to use. - --tracing.config-file= - Path to YAML file with tracing configuration. See - format details: - https://thanos.io/tracing.md/#configuration - --tracing.config= - Alternative to 'tracing.config-file' flag (lower - priority). Content of YAML file with tracing - configuration. See format details: - https://thanos.io/tracing.md/#configuration - --http-address="0.0.0.0:10902" - Listen host:port for HTTP endpoints. - --http-grace-period=2m Time to wait after an interrupt received for HTTP - Server. - --data-dir="./data" Data directory in which to cache blocks and - process downsamplings. - --objstore.config-file= - Path to YAML file that contains object store - configuration. See format details: - https://thanos.io/storage.md/#configuration - --objstore.config= - Alternative to 'objstore.config-file' flag (lower - priority). Content of YAML file that contains - object store configuration. See format details: - https://thanos.io/storage.md/#configuration - -``` - -## Probes - -- Thanos downsample exposes two endpoints for probing. - - `/-/healthy` starts as soon as initial setup completed. - - `/-/ready` starts after all the bootstrapping completed (e.g object store bucket connection) and ready to serve traffic. - -> NOTE: Metric endpoint starts immediately so, make sure you set up readiness probe on designated HTTP `/-/ready` path. diff --git a/scripts/genflagdocs.sh b/scripts/genflagdocs.sh index 471ca7fb95..cfd09bbbb6 100755 --- a/scripts/genflagdocs.sh +++ b/scripts/genflagdocs.sh @@ -34,13 +34,13 @@ fi CHECK=${1:-} -commands=("compact" "query" "rule" "sidecar" "store" "downsample" "bucket" "check") +commands=("compact" "query" "rule" "sidecar" "store" "bucket" "check") for x in "${commands[@]}"; do ./thanos "${x}" --help &> "docs/components/flags/${x}.txt" done -bucketCommands=("verify" "ls" "inspect" "web" "replicate") +bucketCommands=("verify" "ls" "inspect" "web" "replicate" "downsample") for x in "${bucketCommands[@]}"; do ./thanos bucket "${x}" --help &> "docs/components/flags/bucket_${x}.txt" done From b78cba5b643ae77775928daaead9d9f412e84069 Mon Sep 17 00:00:00 2001 From: Xiang Dai <764524258@qq.com> Date: Sat, 14 Mar 2020 09:44:50 +0800 Subject: [PATCH 5/5] feedback Signed-off-by: Xiang Dai <764524258@qq.com> --- cmd/thanos/bucket.go | 3 +- cmd/thanos/downsample.go | 20 +-- cmd/thanos/main.go | 1 - pkg/downsample/downsample.go | 300 ----------------------------------- 4 files changed, 2 insertions(+), 322 deletions(-) delete mode 100644 pkg/downsample/downsample.go diff --git a/cmd/thanos/bucket.go b/cmd/thanos/bucket.go index f2388b84e0..1ce41fd503 100644 --- a/cmd/thanos/bucket.go +++ b/cmd/thanos/bucket.go @@ -29,7 +29,6 @@ import ( "github.com/thanos-io/thanos/pkg/compact" "github.com/thanos-io/thanos/pkg/compact/downsample" "github.com/thanos-io/thanos/pkg/component" - downsamplepkg "github.com/thanos-io/thanos/pkg/downsample" "github.com/thanos-io/thanos/pkg/extflag" "github.com/thanos-io/thanos/pkg/extprom" extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http" @@ -437,7 +436,7 @@ func registerBucketDownsample(m map[string]setupFunc, root *kingpin.CmdClause, n Default("./data").String() m[name+" "+comp.String()] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ <-chan struct{}, _ bool) error { - return downsamplepkg.RunDownsample(g, logger, reg, *httpAddr, time.Duration(*httpGracePeriod), *dataDir, objStoreConfig, comp) + return RunDownsample(g, logger, reg, *httpAddr, time.Duration(*httpGracePeriod), *dataDir, objStoreConfig, comp) } } diff --git a/cmd/thanos/downsample.go b/cmd/thanos/downsample.go index d513d2f0a3..293ef59adf 100644 --- a/cmd/thanos/downsample.go +++ b/cmd/thanos/downsample.go @@ -13,7 +13,6 @@ import ( "github.com/go-kit/kit/log/level" "github.com/oklog/run" "github.com/oklog/ulid" - opentracing "github.com/opentracing/opentracing-go" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" @@ -31,25 +30,8 @@ import ( "github.com/thanos-io/thanos/pkg/prober" "github.com/thanos-io/thanos/pkg/runutil" httpserver "github.com/thanos-io/thanos/pkg/server/http" - kingpin "gopkg.in/alecthomas/kingpin.v2" ) -func registerDownsample(m map[string]setupFunc, app *kingpin.Application) { - comp := component.Downsample - cmd := app.Command(comp.String(), "continuously downsamples blocks in an object store bucket") - - httpAddr, httpGracePeriod := regHTTPFlags(cmd) - - dataDir := cmd.Flag("data-dir", "Data directory in which to cache blocks and process downsamplings."). - Default("./data").String() - - objStoreConfig := regCommonObjStoreFlags(cmd, "", true) - - m[comp.String()] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ <-chan struct{}, _ bool) error { - return runDownsample(g, logger, reg, *httpAddr, time.Duration(*httpGracePeriod), *dataDir, objStoreConfig, comp) - } -} - type DownsampleMetrics struct { downsamples *prometheus.CounterVec downsampleFailures *prometheus.CounterVec @@ -70,7 +52,7 @@ func newDownsampleMetrics(reg *prometheus.Registry) *DownsampleMetrics { return m } -func runDownsample( +func RunDownsample( g *run.Group, logger log.Logger, reg *prometheus.Registry, diff --git a/cmd/thanos/main.go b/cmd/thanos/main.go index b407d252bf..cad3f26d93 100644 --- a/cmd/thanos/main.go +++ b/cmd/thanos/main.go @@ -61,7 +61,6 @@ func main() { registerRule(cmds, app) registerCompact(cmds, app) registerBucket(cmds, app, "bucket") - registerDownsample(cmds, app) registerReceive(cmds, app) registerChecks(cmds, app, "check") diff --git a/pkg/downsample/downsample.go b/pkg/downsample/downsample.go deleted file mode 100644 index b6c30de8c6..0000000000 --- a/pkg/downsample/downsample.go +++ /dev/null @@ -1,300 +0,0 @@ -// Copyright (c) The Thanos Authors. -// Licensed under the Apache License 2.0. - -package downsample - -import ( - "context" - "os" - "path/filepath" - "time" - - "github.com/go-kit/kit/log" - "github.com/go-kit/kit/log/level" - "github.com/oklog/run" - "github.com/oklog/ulid" - "github.com/pkg/errors" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" - "github.com/prometheus/prometheus/tsdb" - "github.com/prometheus/prometheus/tsdb/chunkenc" - "github.com/thanos-io/thanos/pkg/block" - "github.com/thanos-io/thanos/pkg/block/metadata" - "github.com/thanos-io/thanos/pkg/compact" - "github.com/thanos-io/thanos/pkg/compact/downsample" - "github.com/thanos-io/thanos/pkg/component" - "github.com/thanos-io/thanos/pkg/extflag" - "github.com/thanos-io/thanos/pkg/extprom" - "github.com/thanos-io/thanos/pkg/objstore" - "github.com/thanos-io/thanos/pkg/objstore/client" - "github.com/thanos-io/thanos/pkg/prober" - "github.com/thanos-io/thanos/pkg/runutil" - httpserver "github.com/thanos-io/thanos/pkg/server/http" -) - -type DownsampleMetrics struct { - downsamples *prometheus.CounterVec - downsampleFailures *prometheus.CounterVec -} - -func newDownsampleMetrics(reg *prometheus.Registry) *DownsampleMetrics { - m := new(DownsampleMetrics) - - m.downsamples = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ - Name: "thanos_compact_downsample_total", - Help: "Total number of downsampling attempts.", - }, []string{"group"}) - m.downsampleFailures = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ - Name: "thanos_compact_downsample_failures_total", - Help: "Total number of failed downsampling attempts.", - }, []string{"group"}) - - return m -} - -func RunDownsample( - g *run.Group, - logger log.Logger, - reg *prometheus.Registry, - httpBindAddr string, - httpGracePeriod time.Duration, - dataDir string, - objStoreConfig *extflag.PathOrContent, - comp component.Component, -) error { - confContentYaml, err := objStoreConfig.Content() - if err != nil { - return err - } - - bkt, err := client.NewBucket(logger, confContentYaml, reg, component.Downsample.String()) - if err != nil { - return err - } - - metaFetcher, err := block.NewMetaFetcher(logger, 32, bkt, "", extprom.WrapRegistererWithPrefix("thanos_", reg)) - if err != nil { - return errors.Wrap(err, "create meta fetcher") - } - - // Ensure we close up everything properly. - defer func() { - if err != nil { - runutil.CloseWithLogOnErr(logger, bkt, "bucket client") - } - }() - - httpProbe := prober.NewHTTP() - statusProber := prober.Combine( - httpProbe, - prober.NewInstrumentation(comp, logger, prometheus.WrapRegistererWithPrefix("thanos_", reg)), - ) - - metrics := newDownsampleMetrics(reg) - // Start cycle of syncing blocks from the bucket and garbage collecting the bucket. - { - ctx, cancel := context.WithCancel(context.Background()) - - g.Add(func() error { - defer runutil.CloseWithLogOnErr(logger, bkt, "bucket client") - statusProber.Ready() - - level.Info(logger).Log("msg", "start first pass of downsampling") - - if err := downsampleBucket(ctx, logger, metrics, bkt, metaFetcher, dataDir); err != nil { - return errors.Wrap(err, "downsampling failed") - } - - level.Info(logger).Log("msg", "start second pass of downsampling") - - if err := downsampleBucket(ctx, logger, metrics, bkt, metaFetcher, dataDir); err != nil { - return errors.Wrap(err, "downsampling failed") - } - - return nil - }, func(error) { - cancel() - }) - } - - srv := httpserver.New(logger, reg, comp, httpProbe, - httpserver.WithListen(httpBindAddr), - httpserver.WithGracePeriod(httpGracePeriod), - ) - - g.Add(func() error { - statusProber.Healthy() - - return srv.ListenAndServe() - }, func(err error) { - statusProber.NotReady(err) - defer statusProber.NotHealthy(err) - - srv.Shutdown(err) - }) - - level.Info(logger).Log("msg", "starting downsample node") - return nil -} - -func downsampleBucket( - ctx context.Context, - logger log.Logger, - metrics *DownsampleMetrics, - bkt objstore.Bucket, - fetcher block.MetadataFetcher, - dir string, -) error { - if err := os.RemoveAll(dir); err != nil { - return errors.Wrap(err, "clean working directory") - } - if err := os.MkdirAll(dir, 0777); err != nil { - return errors.Wrap(err, "create dir") - } - - defer func() { - if err := os.RemoveAll(dir); err != nil { - level.Error(logger).Log("msg", "failed to remove downsample cache directory", "path", dir, "err", err) - } - }() - - metas, _, err := fetcher.Fetch(ctx) - if err != nil { - return errors.Wrap(err, "downsampling meta fetch") - } - - // mapping from a hash over all source IDs to blocks. We don't need to downsample a block - // if a downsampled version with the same hash already exists. - sources5m := map[ulid.ULID]struct{}{} - sources1h := map[ulid.ULID]struct{}{} - - for _, m := range metas { - switch m.Thanos.Downsample.Resolution { - case downsample.ResLevel0: - continue - case downsample.ResLevel1: - for _, id := range m.Compaction.Sources { - sources5m[id] = struct{}{} - } - case downsample.ResLevel2: - for _, id := range m.Compaction.Sources { - sources1h[id] = struct{}{} - } - default: - return errors.Errorf("unexpected downsampling resolution %d", m.Thanos.Downsample.Resolution) - } - } - - for _, m := range metas { - switch m.Thanos.Downsample.Resolution { - case downsample.ResLevel0: - missing := false - for _, id := range m.Compaction.Sources { - if _, ok := sources5m[id]; !ok { - missing = true - break - } - } - if !missing { - continue - } - // Only downsample blocks once we are sure to get roughly 2 chunks out of it. - // NOTE(fabxc): this must match with at which block size the compactor creates downsampled - // blocks. Otherwise we may never downsample some data. - if m.MaxTime-m.MinTime < downsample.DownsampleRange0 { - continue - } - if err := processDownsampling(ctx, logger, bkt, m, dir, downsample.ResLevel1); err != nil { - metrics.downsampleFailures.WithLabelValues(compact.GroupKey(m.Thanos)).Inc() - return errors.Wrap(err, "downsampling to 5 min") - } - metrics.downsamples.WithLabelValues(compact.GroupKey(m.Thanos)).Inc() - - case downsample.ResLevel1: - missing := false - for _, id := range m.Compaction.Sources { - if _, ok := sources1h[id]; !ok { - missing = true - break - } - } - if !missing { - continue - } - // Only downsample blocks once we are sure to get roughly 2 chunks out of it. - // NOTE(fabxc): this must match with at which block size the compactor creates downsampled - // blocks. Otherwise we may never downsample some data. - if m.MaxTime-m.MinTime < downsample.DownsampleRange1 { - continue - } - if err := processDownsampling(ctx, logger, bkt, m, dir, downsample.ResLevel2); err != nil { - metrics.downsampleFailures.WithLabelValues(compact.GroupKey(m.Thanos)) - return errors.Wrap(err, "downsampling to 60 min") - } - metrics.downsamples.WithLabelValues(compact.GroupKey(m.Thanos)) - } - } - return nil -} - -func processDownsampling(ctx context.Context, logger log.Logger, bkt objstore.Bucket, m *metadata.Meta, dir string, resolution int64) error { - begin := time.Now() - bdir := filepath.Join(dir, m.ULID.String()) - - err := block.Download(ctx, logger, bkt, m.ULID, bdir) - if err != nil { - return errors.Wrapf(err, "download block %s", m.ULID) - } - level.Info(logger).Log("msg", "downloaded block", "id", m.ULID, "duration", time.Since(begin)) - - if err := block.VerifyIndex(logger, filepath.Join(bdir, block.IndexFilename), m.MinTime, m.MaxTime); err != nil { - return errors.Wrap(err, "input block index not valid") - } - - begin = time.Now() - - var pool chunkenc.Pool - if m.Thanos.Downsample.Resolution == 0 { - pool = chunkenc.NewPool() - } else { - pool = downsample.NewPool() - } - - b, err := tsdb.OpenBlock(logger, bdir, pool) - if err != nil { - return errors.Wrapf(err, "open block %s", m.ULID) - } - defer runutil.CloseWithLogOnErr(log.With(logger, "outcome", "potential left mmap file handlers left"), b, "tsdb reader") - - id, err := downsample.Downsample(logger, m, b, dir, resolution) - if err != nil { - return errors.Wrapf(err, "downsample block %s to window %d", m.ULID, resolution) - } - resdir := filepath.Join(dir, id.String()) - - level.Info(logger).Log("msg", "downsampled block", - "from", m.ULID, "to", id, "duration", time.Since(begin)) - - if err := block.VerifyIndex(logger, filepath.Join(resdir, block.IndexFilename), m.MinTime, m.MaxTime); err != nil { - return errors.Wrap(err, "output block index not valid") - } - - begin = time.Now() - - err = block.Upload(ctx, logger, bkt, resdir) - if err != nil { - return errors.Wrapf(err, "upload downsampled block %s", id) - } - - level.Info(logger).Log("msg", "uploaded block", "id", id, "duration", time.Since(begin)) - - // It is not harmful if these fails. - if err := os.RemoveAll(bdir); err != nil { - level.Warn(logger).Log("msg", "failed to clean directory", "dir", bdir, "err", err) - } - if err := os.RemoveAll(resdir); err != nil { - level.Warn(logger).Log("msg", "failed to clean directory", "resdir", bdir, "err", err) - } - - return nil -}