diff --git a/Gopkg.lock b/Gopkg.lock
index 94d9dfb1be..57e6d89a26 100644
--- a/Gopkg.lock
+++ b/Gopkg.lock
@@ -199,12 +199,12 @@
revision = "2e65f85255dbc3072edf28d6b5b8efc472979f5a"
[[projects]]
- branch = "master"
- digest = "1:9abc49f39e3e23e262594bb4fb70abf74c0c99e94f99153f43b143805e850719"
+ digest = "1:cea4aa2038169ee558bf507d5ea02c94ca85bcca28a4c7bb99fd59b31e43a686"
name = "github.com/google/go-querystring"
packages = ["query"]
pruneopts = ""
- revision = "53e6ce116135b80d037921a7fdd5138cf32d7a8a"
+ revision = "44c6ddd0a2342c386950e880b658017258da92fc"
+ version = "v1.0.0"
[[projects]]
digest = "1:e097a364f4e8d8d91b9b9eeafb992d3796a41fde3eb548c1a87eb9d9f60725cf"
@@ -335,12 +335,12 @@
version = "v0.3.0"
[[projects]]
- digest = "1:82b912465c1da0668582a7d1117339c278e786c2536b3c3623029a0c7141c2d0"
+ digest = "1:84c28d9899cc4e00c38042d345cea8819275a5a62403a58530cac67022894776"
name = "github.com/mattn/go-runewidth"
packages = ["."]
pruneopts = ""
- revision = "ce7b0b5c7b45a81508558cd1dba6bb1e4ddb51bb"
- version = "v0.0.3"
+ revision = "3ee7d812e62a0804a7d0a324e0249ca2db3476d3"
+ version = "v0.0.4"
[[projects]]
digest = "1:49a8b01a6cd6558d504b65608214ca40a78000e1b343ed0da5c6a9ccd83d6d30"
@@ -390,20 +390,12 @@
version = "v0.11.0"
[[projects]]
- digest = "1:912349f5cf927bf96dca709623631ace7db723f07c70c4d56cfc22d9a667ed16"
+ digest = "1:b09858acd58e0873236c7b96903e3ec4e238d5de644c08bd8e712fa2d3d51ad2"
name = "github.com/mozillazg/go-httpheader"
packages = ["."]
pruneopts = ""
- revision = "4e5d6424981844faafc4b0649036b2e0395bdf99"
- version = "v0.2.0"
-
-[[projects]]
- branch = "master"
- digest = "1:3adc46876d4d0e4d5bbcfcc44c2116b95d7a5c966e2ee92a219488547fd453f2"
- name = "github.com/nightlyone/lockfile"
- packages = ["."]
- pruneopts = ""
- revision = "0ad87eef1443f64d3d8c50da647e2b1552851124"
+ revision = "61f2392c3317b60616c9dcb10d0a4cfef131fe62"
+ version = "v0.2.1"
[[projects]]
digest = "1:94e9081cc450d2cdf4e6886fc2c06c07272f86477df2d74ee5931951fa3d2577"
@@ -508,11 +500,12 @@
revision = "05ee40e3a273f7245e8777337fc7b46e533a9a92"
[[projects]]
- digest = "1:b5ff9852eabe841003da4b0a4b742a2878c722dda6481003432344f633a814fc"
+ digest = "1:912a82611430bfd1e597e76aac99cac6fd34094a07b07d7c5996cf51a21e7e07"
name = "github.com/prometheus/prometheus"
packages = [
"discovery/file",
"discovery/targetgroup",
+ "pkg/gate",
"pkg/labels",
"pkg/rulefmt",
"pkg/textparse",
@@ -528,11 +521,10 @@
"util/testutil",
]
pruneopts = ""
- revision = "71af5e29e815795e9dd14742ee7725682fa14b7b"
- version = "v2.3.2"
+ revision = "6e08029b56ae17c49e133d92a2792f6f119f2cbd"
[[projects]]
- digest = "1:216dcf26fbfb3f36f286ca3306882a157c51648e4b5d4f3a9e9c719faea6ea58"
+ digest = "1:0a03b362c09b1186dd53330881430c7c2c26ba07806ebad861f2aa99d9c5c6ae"
name = "github.com/prometheus/tsdb"
packages = [
".",
@@ -541,9 +533,11 @@
"fileutil",
"index",
"labels",
+ "wal",
]
pruneopts = ""
- revision = "bd832fc8274e8fe63999ac749daaaff9d881241f"
+ revision = "fd04e0963c04a1fbd891be7552b50f58e357f75c"
+ source = "github.com/bwplotka/tsdb"
[[projects]]
branch = "master"
diff --git a/Gopkg.toml b/Gopkg.toml
index 98bbc9494d..e639afa84d 100644
--- a/Gopkg.toml
+++ b/Gopkg.toml
@@ -37,7 +37,8 @@ ignored = ["github.com/improbable-eng/thanos/benchmark/*"]
name = "github.com/prometheus/common"
[[constraint]]
- version = "v2.3.2"
+ # TODO(bwplotka): Move to released version once our recent fixes will be merged & released.
+ revision = "6e08029b56ae17c49e133d92a2792f6f119f2cbd"
name = "github.com/prometheus/prometheus"
[[override]]
@@ -46,7 +47,9 @@ ignored = ["github.com/improbable-eng/thanos/benchmark/*"]
[[constraint]]
name = "github.com/prometheus/tsdb"
- revision = "bd832fc8274e8fe63999ac749daaaff9d881241f"
+ # TODO(bwplotka): Move to upstream version once https://github.com/prometheus/tsdb/pull/492 is merged.
+ revision = "fd04e0963c04a1fbd891be7552b50f58e357f75c"
+ source = "github.com/bwplotka/tsdb"
[[constraint]]
branch = "master"
diff --git a/cmd/thanos/bucket.go b/cmd/thanos/bucket.go
index 01b9b54bf6..eca845231b 100644
--- a/cmd/thanos/bucket.go
+++ b/cmd/thanos/bucket.go
@@ -10,10 +10,9 @@ import (
"text/template"
"time"
- "github.com/prometheus/tsdb/labels"
-
"github.com/go-kit/kit/log"
"github.com/improbable-eng/thanos/pkg/block"
+ "github.com/improbable-eng/thanos/pkg/block/blockmeta"
"github.com/improbable-eng/thanos/pkg/objstore/client"
"github.com/improbable-eng/thanos/pkg/runutil"
"github.com/improbable-eng/thanos/pkg/verifier"
@@ -23,6 +22,7 @@ import (
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
+ "github.com/prometheus/tsdb/labels"
"golang.org/x/text/language"
"golang.org/x/text/message"
"gopkg.in/alecthomas/kingpin.v2"
@@ -254,7 +254,7 @@ func registerBucket(m map[string]setupFunc, app *kingpin.Application, name strin
defer cancel()
// Getting Metas.
- var blockMetas []*block.Meta
+ var blockMetas []*blockmeta.Meta
if err = bkt.Iter(ctx, "", func(name string) error {
id, ok := block.IsBlockDir(name)
if !ok {
@@ -277,7 +277,7 @@ func registerBucket(m map[string]setupFunc, app *kingpin.Application, name strin
}
}
-func printTable(blockMetas []*block.Meta, selectorLabels labels.Labels, sortBy []string) error {
+func printTable(blockMetas []*blockmeta.Meta, selectorLabels labels.Labels, sortBy []string) error {
header := inspectColumns
var lines [][]string
@@ -355,7 +355,7 @@ func getKeysAlphabetically(labels map[string]string) []string {
// matchesSelector checks if blockMeta contains every label from
// the selector with the correct value
-func matchesSelector(blockMeta *block.Meta, selectorLabels labels.Labels) bool {
+func matchesSelector(blockMeta *blockmeta.Meta, selectorLabels labels.Labels) bool {
for _, l := range selectorLabels {
if v, ok := blockMeta.Thanos.Labels[l.Name]; !ok || v != l.Value {
return false
diff --git a/cmd/thanos/downsample.go b/cmd/thanos/downsample.go
index c00aadd43d..dddd1e91af 100644
--- a/cmd/thanos/downsample.go
+++ b/cmd/thanos/downsample.go
@@ -8,11 +8,10 @@ import (
"path/filepath"
"time"
- "github.com/prometheus/tsdb/chunkenc"
-
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/improbable-eng/thanos/pkg/block"
+ "github.com/improbable-eng/thanos/pkg/block/blockmeta"
"github.com/improbable-eng/thanos/pkg/compact/downsample"
"github.com/improbable-eng/thanos/pkg/objstore"
"github.com/improbable-eng/thanos/pkg/objstore/client"
@@ -23,6 +22,7 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/tsdb"
+ "github.com/prometheus/tsdb/chunkenc"
"gopkg.in/alecthomas/kingpin.v2"
)
@@ -105,7 +105,7 @@ func downsampleBucket(
if err := os.MkdirAll(dir, 0777); err != nil {
return errors.Wrap(err, "create dir")
}
- var metas []*block.Meta
+ var metas []*blockmeta.Meta
err := bkt.Iter(ctx, "", func(name string) error {
id, ok := block.IsBlockDir(name)
@@ -119,7 +119,7 @@ func downsampleBucket(
}
defer runutil.CloseWithLogOnErr(logger, rc, "block reader")
- var m block.Meta
+ var m blockmeta.Meta
if err := json.NewDecoder(rc).Decode(&m); err != nil {
return errors.Wrap(err, "decode meta")
}
@@ -201,7 +201,7 @@ func downsampleBucket(
return nil
}
-func processDownsampling(ctx context.Context, logger log.Logger, bkt objstore.Bucket, m *block.Meta, dir string, resolution int64) error {
+func processDownsampling(ctx context.Context, logger log.Logger, bkt objstore.Bucket, m *blockmeta.Meta, dir string, resolution int64) error {
begin := time.Now()
bdir := filepath.Join(dir, m.ULID.String())
diff --git a/cmd/thanos/query.go b/cmd/thanos/query.go
index bf6cfa64fe..9e6c5ac19e 100644
--- a/cmd/thanos/query.go
+++ b/cmd/thanos/query.go
@@ -296,7 +296,16 @@ func runQuery(
return stores.Get(), nil
}, selectorLset)
queryableCreator = query.NewQueryableCreator(logger, proxy, replicaLabel)
- engine = promql.NewEngine(logger, reg, maxConcurrentQueries, queryTimeout)
+ engine = promql.NewEngine(
+ promql.EngineOpts{
+ Logger: logger,
+ Reg: reg,
+ MaxConcurrent: maxConcurrentQueries,
+ // TODO(bwplotka): Expose this as a flag: https://github.com/improbable-eng/thanos/issues/703
+ MaxSamples: math.MaxInt32,
+ Timeout: queryTimeout,
+ },
+ )
)
// Periodically update the store set with the addresses we see in our cluster.
{
diff --git a/cmd/thanos/rule.go b/cmd/thanos/rule.go
index 03fed102d2..35d81858c1 100644
--- a/cmd/thanos/rule.go
+++ b/cmd/thanos/rule.go
@@ -19,15 +19,14 @@ import (
"syscall"
"time"
- "github.com/improbable-eng/thanos/pkg/extprom"
-
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/improbable-eng/thanos/pkg/alert"
- "github.com/improbable-eng/thanos/pkg/block"
+ "github.com/improbable-eng/thanos/pkg/block/blockmeta"
"github.com/improbable-eng/thanos/pkg/cluster"
"github.com/improbable-eng/thanos/pkg/discovery/cache"
"github.com/improbable-eng/thanos/pkg/discovery/dns"
+ "github.com/improbable-eng/thanos/pkg/extprom"
"github.com/improbable-eng/thanos/pkg/objstore/client"
"github.com/improbable-eng/thanos/pkg/runutil"
"github.com/improbable-eng/thanos/pkg/shipper"
@@ -290,7 +289,7 @@ func runRule(
ctx, cancel := context.WithCancel(context.Background())
ctx = tracing.ContextWithTracer(ctx, tracer)
- notify := func(ctx context.Context, expr string, alerts ...*rules.Alert) error {
+ notify := func(ctx context.Context, expr string, alerts ...*rules.Alert) {
res := make([]*alert.Alert, 0, len(alerts))
for _, alrt := range alerts {
// Only send actually firing alerts.
@@ -309,8 +308,6 @@ func runRule(
res = append(res, a)
}
alertQ.Push(res)
-
- return nil
}
mgr = rules.NewManager(&rules.ManagerOptions{
Context: ctx,
@@ -579,7 +576,7 @@ func runRule(
}
}()
- s := shipper.New(logger, nil, dataDir, bkt, func() labels.Labels { return lset }, block.RulerSource)
+ s := shipper.New(logger, nil, dataDir, bkt, func() labels.Labels { return lset }, blockmeta.RulerSource)
ctx, cancel := context.WithCancel(context.Background())
diff --git a/cmd/thanos/sidecar.go b/cmd/thanos/sidecar.go
index 61f493f93a..1662a310fa 100644
--- a/cmd/thanos/sidecar.go
+++ b/cmd/thanos/sidecar.go
@@ -14,7 +14,7 @@ import (
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
- "github.com/improbable-eng/thanos/pkg/block"
+ "github.com/improbable-eng/thanos/pkg/block/blockmeta"
"github.com/improbable-eng/thanos/pkg/cluster"
"github.com/improbable-eng/thanos/pkg/objstore/client"
"github.com/improbable-eng/thanos/pkg/reloader"
@@ -252,7 +252,7 @@ func runSidecar(
}
}()
- s := shipper.New(logger, nil, dataDir, bkt, metadata.Labels, block.SidecarSource)
+ s := shipper.New(logger, nil, dataDir, bkt, metadata.Labels, blockmeta.SidecarSource)
ctx, cancel := context.WithCancel(context.Background())
g.Add(func() error {
diff --git a/pkg/block/block.go b/pkg/block/block.go
index 118b7ea96c..e6750e0881 100644
--- a/pkg/block/block.go
+++ b/pkg/block/block.go
@@ -5,11 +5,12 @@ package block
import (
"context"
"encoding/json"
- "io/ioutil"
"os"
"path"
"path/filepath"
+ "github.com/improbable-eng/thanos/pkg/block/blockmeta"
+
"fmt"
"github.com/go-kit/kit/log"
@@ -17,8 +18,6 @@ import (
"github.com/improbable-eng/thanos/pkg/runutil"
"github.com/oklog/ulid"
"github.com/pkg/errors"
- "github.com/prometheus/tsdb"
- "github.com/prometheus/tsdb/fileutil"
)
const (
@@ -33,103 +32,6 @@ const (
DebugMetas = "debug/metas"
)
-type SourceType string
-
-const (
- UnknownSource SourceType = ""
- SidecarSource SourceType = "sidecar"
- CompactorSource SourceType = "compactor"
- CompactorRepairSource SourceType = "compactor.repair"
- RulerSource SourceType = "ruler"
- BucketRepairSource SourceType = "bucket.repair"
- TestSource SourceType = "test"
-)
-
-// Meta describes the a block's meta. It wraps the known TSDB meta structure and
-// extends it by Thanos-specific fields.
-type Meta struct {
- Version int `json:"version"`
-
- tsdb.BlockMeta
-
- Thanos ThanosMeta `json:"thanos"`
-}
-
-// ThanosMeta holds block meta information specific to Thanos.
-type ThanosMeta struct {
- Labels map[string]string `json:"labels"`
- Downsample ThanosDownsampleMeta `json:"downsample"`
-
- // Source is a real upload source of the block.
- Source SourceType `json:"source"`
-}
-
-type ThanosDownsampleMeta struct {
- Resolution int64 `json:"resolution"`
-}
-
-// WriteMetaFile writes the given meta into
/meta.json.
-func WriteMetaFile(logger log.Logger, dir string, meta *Meta) error {
- // Make any changes to the file appear atomic.
- path := filepath.Join(dir, MetaFilename)
- tmp := path + ".tmp"
-
- f, err := os.Create(tmp)
- if err != nil {
- return err
- }
-
- enc := json.NewEncoder(f)
- enc.SetIndent("", "\t")
-
- if err := enc.Encode(meta); err != nil {
- runutil.CloseWithLogOnErr(logger, f, "close meta")
- return err
- }
- if err := f.Close(); err != nil {
- return err
- }
- return renameFile(logger, tmp, path)
-}
-
-// ReadMetaFile reads the given meta from /meta.json.
-func ReadMetaFile(dir string) (*Meta, error) {
- b, err := ioutil.ReadFile(filepath.Join(dir, MetaFilename))
- if err != nil {
- return nil, err
- }
- var m Meta
-
- if err := json.Unmarshal(b, &m); err != nil {
- return nil, err
- }
- if m.Version != 1 {
- return nil, errors.Errorf("unexpected meta file version %d", m.Version)
- }
- return &m, nil
-}
-
-func renameFile(logger log.Logger, from, to string) error {
- if err := os.RemoveAll(to); err != nil {
- return err
- }
- if err := os.Rename(from, to); err != nil {
- return err
- }
-
- // Directory was renamed; sync parent dir to persist rename.
- pdir, err := fileutil.OpenDir(filepath.Dir(to))
- if err != nil {
- return err
- }
-
- if err = fileutil.Fsync(pdir); err != nil {
- runutil.CloseWithLogOnErr(logger, pdir, "close dir")
- return err
- }
- return pdir.Close()
-}
-
// Download downloads directory that is mean to be block directory.
func Download(ctx context.Context, logger log.Logger, bucket objstore.Bucket, id ulid.ULID, dst string) error {
if err := objstore.DownloadDir(ctx, logger, bucket, id.String(), dst); err != nil {
@@ -169,7 +71,7 @@ func Upload(ctx context.Context, logger log.Logger, bkt objstore.Bucket, bdir st
return errors.Wrap(err, "not a block dir")
}
- meta, err := ReadMetaFile(bdir)
+ meta, err := blockmeta.Read(bdir)
if err != nil {
// No meta or broken meta file.
return errors.Wrap(err, "read meta")
@@ -216,16 +118,16 @@ func Delete(ctx context.Context, bucket objstore.Bucket, id ulid.ULID) error {
}
// DownloadMeta downloads only meta file from bucket by block ID.
-func DownloadMeta(ctx context.Context, logger log.Logger, bkt objstore.Bucket, id ulid.ULID) (Meta, error) {
+func DownloadMeta(ctx context.Context, logger log.Logger, bkt objstore.Bucket, id ulid.ULID) (blockmeta.Meta, error) {
rc, err := bkt.Get(ctx, path.Join(id.String(), MetaFilename))
if err != nil {
- return Meta{}, errors.Wrapf(err, "meta.json bkt get for %s", id.String())
+ return blockmeta.Meta{}, errors.Wrapf(err, "meta.json bkt get for %s", id.String())
}
defer runutil.CloseWithLogOnErr(logger, rc, "download meta bucket client")
- var m Meta
+ var m blockmeta.Meta
if err := json.NewDecoder(rc).Decode(&m); err != nil {
- return Meta{}, errors.Wrapf(err, "decode meta.json for block %s", id.String())
+ return blockmeta.Meta{}, errors.Wrapf(err, "decode meta.json for block %s", id.String())
}
return m, nil
}
@@ -234,24 +136,3 @@ func IsBlockDir(path string) (id ulid.ULID, ok bool) {
id, err := ulid.Parse(filepath.Base(path))
return id, err == nil
}
-
-// InjectThanosMeta sets Thanos meta to the block meta JSON and saves it to the disk.
-// NOTE: It should be used after writing any block by any Thanos component, otherwise we will miss crucial metadata.
-func InjectThanosMeta(logger log.Logger, bdir string, meta ThanosMeta, downsampledMeta *tsdb.BlockMeta) (*Meta, error) {
- newMeta, err := ReadMetaFile(bdir)
- if err != nil {
- return nil, errors.Wrap(err, "read new meta")
- }
- newMeta.Thanos = meta
-
- // While downsampling we need to copy original compaction.
- if downsampledMeta != nil {
- newMeta.Compaction = downsampledMeta.Compaction
- }
-
- if err := WriteMetaFile(logger, bdir, newMeta); err != nil {
- return nil, errors.Wrap(err, "write new meta")
- }
-
- return newMeta, nil
-}
diff --git a/pkg/block/blockmeta/meta.go b/pkg/block/blockmeta/meta.go
new file mode 100644
index 0000000000..d470b4ec2c
--- /dev/null
+++ b/pkg/block/blockmeta/meta.go
@@ -0,0 +1,142 @@
+package blockmeta
+
+// blockmeta package is implements writing and reading wrapped meta.json where Thanos puts its metadata.
+// Those metadata contains external labels, downsampling resolution and source type.
+// This package is minimal and separated because it usited by testutils which limits test helpers we can use in
+// this package.
+
+import (
+ "encoding/json"
+ "io/ioutil"
+ "os"
+ "path/filepath"
+
+ "github.com/go-kit/kit/log"
+ "github.com/improbable-eng/thanos/pkg/runutil"
+ "github.com/pkg/errors"
+ "github.com/prometheus/tsdb"
+ "github.com/prometheus/tsdb/fileutil"
+)
+
+type SourceType string
+
+const (
+ UnknownSource SourceType = ""
+ SidecarSource SourceType = "sidecar"
+ CompactorSource SourceType = "compactor"
+ CompactorRepairSource SourceType = "compactor.repair"
+ RulerSource SourceType = "ruler"
+ BucketRepairSource SourceType = "bucket.repair"
+ TestSource SourceType = "test"
+)
+
+const (
+ // MetaFilename is the known JSON filename for meta information.
+ MetaFilename = "meta.json"
+)
+
+// Meta describes the a block's meta. It wraps the known TSDB meta structure and
+// extends it by Thanos-specific fields.
+type Meta struct {
+ Version int `json:"version"`
+
+ tsdb.BlockMeta
+
+ Thanos Thanos `json:"thanos"`
+}
+
+// Thanos holds block meta information specific to Thanos.
+type Thanos struct {
+ Labels map[string]string `json:"labels"`
+ Downsample ThanosDownsample `json:"downsample"`
+
+ // Source is a real upload source of the block.
+ Source SourceType `json:"source"`
+}
+
+type ThanosDownsample struct {
+ Resolution int64 `json:"resolution"`
+}
+
+// InjectThanos sets Thanos meta to the block meta JSON and saves it to the disk.
+// NOTE: It should be used after writing any block by any Thanos component, otherwise we will miss crucial metadata.
+func InjectThanos(logger log.Logger, bdir string, meta Thanos, downsampledMeta *tsdb.BlockMeta) (*Meta, error) {
+ newMeta, err := Read(bdir)
+ if err != nil {
+ return nil, errors.Wrap(err, "read new meta")
+ }
+ newMeta.Thanos = meta
+
+ // While downsampling we need to copy original compaction.
+ if downsampledMeta != nil {
+ newMeta.Compaction = downsampledMeta.Compaction
+ }
+
+ if err := Write(logger, bdir, newMeta); err != nil {
+ return nil, errors.Wrap(err, "write new meta")
+ }
+
+ return newMeta, nil
+}
+
+// Write writes the given meta into /meta.json.
+func Write(logger log.Logger, dir string, meta *Meta) error {
+ // Make any changes to the file appear atomic.
+ path := filepath.Join(dir, MetaFilename)
+ tmp := path + ".tmp"
+
+ f, err := os.Create(tmp)
+ if err != nil {
+ return err
+ }
+
+ enc := json.NewEncoder(f)
+ enc.SetIndent("", "\t")
+
+ if err := enc.Encode(meta); err != nil {
+ runutil.CloseWithLogOnErr(logger, f, "close meta")
+ return err
+ }
+ if err := f.Close(); err != nil {
+ return err
+ }
+ return renameFile(logger, tmp, path)
+}
+
+func renameFile(logger log.Logger, from, to string) error {
+ if err := os.RemoveAll(to); err != nil {
+ return err
+ }
+ if err := os.Rename(from, to); err != nil {
+ return err
+ }
+
+ // Directory was renamed; sync parent dir to persist rename.
+ pdir, err := fileutil.OpenDir(filepath.Dir(to))
+ if err != nil {
+ return err
+ }
+
+ if err = fileutil.Fsync(pdir); err != nil {
+ runutil.CloseWithLogOnErr(logger, pdir, "close dir")
+ return err
+ }
+ return pdir.Close()
+}
+
+// Read reads the given meta from /meta.json.
+func Read(dir string) (*Meta, error) {
+ b, err := ioutil.ReadFile(filepath.Join(dir, MetaFilename))
+ if err != nil {
+ return nil, err
+ }
+ var m Meta
+
+ if err := json.Unmarshal(b, &m); err != nil {
+ return nil, err
+ }
+ if m.Version != 1 {
+ return nil, errors.Errorf("unexpected meta file version %d", m.Version)
+ }
+ return &m, nil
+}
diff --git a/pkg/block/index.go b/pkg/block/index.go
index 2249863b2d..2d088953fc 100644
--- a/pkg/block/index.go
+++ b/pkg/block/index.go
@@ -11,6 +11,10 @@ import (
"strings"
"time"
+ "github.com/improbable-eng/thanos/pkg/block/blockmeta"
+
+ "github.com/prometheus/tsdb/fileutil"
+
"github.com/go-kit/kit/log"
"github.com/improbable-eng/thanos/pkg/runutil"
"github.com/oklog/ulid"
@@ -36,23 +40,84 @@ type indexCache struct {
Postings []postingsRange
}
+type realByteSlice []byte
+
+func (b realByteSlice) Len() int {
+ return len(b)
+}
+
+func (b realByteSlice) Range(start, end int) []byte {
+ return b[start:end]
+}
+
+func (b realByteSlice) Sub(start, end int) index.ByteSlice {
+ return b[start:end]
+}
+
+func getSymbolTable(b index.ByteSlice) (map[uint32]string, error) {
+ version := int(b.Range(4, 5)[0])
+
+ if version != 1 && version != 2 {
+ return nil, errors.Errorf("unknown index file version %d", version)
+ }
+
+ toc, err := index.NewTOCFromByteSlice(b)
+ if err != nil {
+ return nil, errors.Wrap(err, "read TOC")
+ }
+
+ symbolsV2, symbolsV1, err := index.ReadSymbols(b, version, int(toc.Symbols))
+ if err != nil {
+ return nil, errors.Wrap(err, "read symbols")
+ }
+
+ symbolsTable := make(map[uint32]string, len(symbolsV1)+len(symbolsV2))
+ for o, s := range symbolsV1 {
+ symbolsTable[o] = s
+ }
+ for o, s := range symbolsV2 {
+ symbolsTable[uint32(o)] = s
+ }
+
+ return symbolsTable, nil
+}
+
// WriteIndexCache writes a cache file containing the first lookup stages
// for an index file.
-func WriteIndexCache(logger log.Logger, fn string, r *index.Reader) error {
+func WriteIndexCache(logger log.Logger, indexFn string, fn string) error {
+ indexFile, err := fileutil.OpenMmapFile(indexFn)
+ if err != nil {
+ return errors.Wrapf(err, "open mmap index file %s", indexFn)
+ }
+ defer runutil.CloseWithLogOnErr(logger, indexFile, "close index cache mmap file from %s", indexFn)
+
+ b := realByteSlice(indexFile.Bytes())
+ indexr, err := index.NewReader(b)
+ if err != nil {
+ return errors.Wrap(err, "open index reader")
+ }
+ defer runutil.CloseWithLogOnErr(logger, indexr, "load index cache reader")
+
+ // We assume reader verified index already.
+ symbols, err := getSymbolTable(b)
+ if err != nil {
+ return err
+ }
+
f, err := os.Create(fn)
if err != nil {
- return errors.Wrap(err, "create file")
+ return errors.Wrap(err, "create index cache file")
}
defer runutil.CloseWithLogOnErr(logger, f, "index cache writer")
v := indexCache{
- Version: r.Version(),
- Symbols: r.SymbolTable(),
+ Version: indexr.Version(),
+ Symbols: symbols,
LabelValues: map[string][]string{},
}
// Extract label value indices.
- lnames, err := r.LabelIndices()
+ lnames, err := indexr.LabelIndices()
if err != nil {
return errors.Wrap(err, "read label indices")
}
@@ -62,7 +127,7 @@ func WriteIndexCache(logger log.Logger, fn string, r *index.Reader) error {
}
ln := lns[0]
- tpls, err := r.LabelValues(ln)
+ tpls, err := indexr.LabelValues(ln)
if err != nil {
return errors.Wrap(err, "get label values")
}
@@ -82,7 +147,7 @@ func WriteIndexCache(logger log.Logger, fn string, r *index.Reader) error {
}
// Extract postings ranges.
- pranges, err := r.PostingsRanges()
+ pranges, err := indexr.PostingsRanges()
if err != nil {
return errors.Wrap(err, "read postings ranges")
}
@@ -164,7 +229,7 @@ func VerifyIndex(logger log.Logger, fn string, minTime int64, maxTime int64) err
}
type Stats struct {
- // TotalSeries represents total number of series in block.
+ // TotalSeries represnts total number of series in block.
TotalSeries int
// OutOfOrderSeries represents number of series that have out of order chunks.
OutOfOrderSeries int
@@ -346,7 +411,7 @@ type ignoreFnType func(mint, maxt int64, prev *chunks.Meta, curr *chunks.Meta) (
// - removes all near "complete" outside chunks introduced by https://github.com/prometheus/tsdb/issues/347.
// Fixable inconsistencies are resolved in the new block.
// TODO(bplotka): https://github.com/improbable-eng/thanos/issues/378
-func Repair(logger log.Logger, dir string, id ulid.ULID, source SourceType, ignoreChkFns ...ignoreFnType) (resid ulid.ULID, err error) {
+func Repair(logger log.Logger, dir string, id ulid.ULID, source blockmeta.SourceType, ignoreChkFns ...ignoreFnType) (resid ulid.ULID, err error) {
if len(ignoreChkFns) == 0 {
return resid, errors.New("no ignore chunk function specified")
}
@@ -355,7 +420,7 @@ func Repair(logger log.Logger, dir string, id ulid.ULID, source SourceType, igno
entropy := rand.New(rand.NewSource(time.Now().UnixNano()))
resid = ulid.MustNew(ulid.Now(), entropy)
- meta, err := ReadMetaFile(bdir)
+ meta, err := blockmeta.Read(bdir)
if err != nil {
return resid, errors.Wrap(err, "read meta file")
}
@@ -405,7 +470,7 @@ func Repair(logger log.Logger, dir string, id ulid.ULID, source SourceType, igno
if err := rewrite(indexr, chunkr, indexw, chunkw, &resmeta, ignoreChkFns); err != nil {
return resid, errors.Wrap(err, "rewrite block")
}
- if err := WriteMetaFile(logger, resdir, &resmeta); err != nil {
+ if err := blockmeta.Write(logger, resdir, &resmeta); err != nil {
return resid, err
}
return resid, nil
@@ -494,7 +559,7 @@ OUTER:
func rewrite(
indexr tsdb.IndexReader, chunkr tsdb.ChunkReader,
indexw tsdb.IndexWriter, chunkw tsdb.ChunkWriter,
- meta *Meta,
+ meta *blockmeta.Meta,
ignoreChkFns []ignoreFnType,
) error {
symbols, err := indexr.Symbols()
diff --git a/pkg/block/index_test.go b/pkg/block/index_test.go
new file mode 100644
index 0000000000..80c10e8e6e
--- /dev/null
+++ b/pkg/block/index_test.go
@@ -0,0 +1,46 @@
+package block
+
+import (
+ "io/ioutil"
+ "os"
+ "path/filepath"
+ "testing"
+
+ "github.com/go-kit/kit/log"
+ "github.com/improbable-eng/thanos/pkg/testutil"
+ "github.com/prometheus/tsdb/labels"
+)
+
+func TestWriteReadIndexCache(t *testing.T) {
+ tmpDir, err := ioutil.TempDir("", "test-compact-prepare")
+ testutil.Ok(t, err)
+ defer func() { testutil.Ok(t, os.RemoveAll(tmpDir)) }()
+
+ b, err := testutil.CreateBlock(tmpDir, []labels.Labels{
+ {{Name: "a", Value: "1"}},
+ {{Name: "a", Value: "2"}},
+ {{Name: "a", Value: "3"}},
+ {{Name: "a", Value: "4"}},
+ {{Name: "b", Value: "1"}},
+ }, 100, 0, 1000, nil, 124)
+ testutil.Ok(t, err)
+
+ fn := filepath.Join(tmpDir, "index.cache.json")
+ testutil.Ok(t, WriteIndexCache(log.NewNopLogger(), filepath.Join(tmpDir, b.String(), "index"), fn))
+
+ version, symbols, lvals, postings, err := ReadIndexCache(log.NewNopLogger(), fn)
+ testutil.Ok(t, err)
+
+ testutil.Equals(t, 2, version)
+ testutil.Equals(t, 6, len(symbols))
+ testutil.Equals(t, 2, len(lvals))
+
+ vals, ok := lvals["a"]
+ testutil.Assert(t, ok, "")
+ testutil.Equals(t, []string{"1", "2", "3", "4"}, vals)
+
+ vals, ok = lvals["b"]
+ testutil.Assert(t, ok, "")
+ testutil.Equals(t, []string{"1"}, vals)
+ testutil.Equals(t, 6, len(postings))
+}
diff --git a/pkg/compact/compact.go b/pkg/compact/compact.go
index 544de920ea..fe95ccfd71 100644
--- a/pkg/compact/compact.go
+++ b/pkg/compact/compact.go
@@ -9,6 +9,8 @@ import (
"sync"
"time"
+ "github.com/improbable-eng/thanos/pkg/block/blockmeta"
+
"io/ioutil"
"github.com/go-kit/kit/log"
@@ -39,7 +41,7 @@ type Syncer struct {
bkt objstore.Bucket
syncDelay time.Duration
mtx sync.Mutex
- blocks map[ulid.ULID]*block.Meta
+ blocks map[ulid.ULID]*blockmeta.Meta
metrics *syncerMetrics
}
@@ -130,7 +132,7 @@ func NewSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket
logger: logger,
reg: reg,
syncDelay: syncDelay,
- blocks: map[ulid.ULID]*block.Meta{},
+ blocks: map[ulid.ULID]*blockmeta.Meta{},
bkt: bkt,
metrics: newSyncerMetrics(reg),
}, nil
@@ -185,9 +187,9 @@ func (c *Syncer) syncMetas(ctx context.Context) error {
// NOTE: It is not safe to miss "old" block (even that it is newly created) in sync step. Compactor needs to aware of ALL old blocks.
// TODO(bplotka): https://github.com/improbable-eng/thanos/issues/377
if ulid.Now()-id.Time() < uint64(c.syncDelay/time.Millisecond) &&
- meta.Thanos.Source != block.BucketRepairSource &&
- meta.Thanos.Source != block.CompactorSource &&
- meta.Thanos.Source != block.CompactorRepairSource {
+ meta.Thanos.Source != blockmeta.BucketRepairSource &&
+ meta.Thanos.Source != blockmeta.CompactorSource &&
+ meta.Thanos.Source != blockmeta.CompactorRepairSource {
level.Debug(c.logger).Log("msg", "block is too fresh for now", "block", id)
return nil
@@ -214,7 +216,7 @@ func (c *Syncer) syncMetas(ctx context.Context) error {
// GroupKey returns a unique identifier for the group the block belongs to. It considers
// the downsampling resolution and the block's labels.
-func GroupKey(meta block.Meta) string {
+func GroupKey(meta blockmeta.Meta) string {
return groupKey(meta.Thanos.Downsample.Resolution, labels.FromMap(meta.Thanos.Labels))
}
@@ -381,7 +383,7 @@ type Group struct {
labels labels.Labels
resolution int64
mtx sync.Mutex
- blocks map[ulid.ULID]*block.Meta
+ blocks map[ulid.ULID]*blockmeta.Meta
compactions prometheus.Counter
compactionFailures prometheus.Counter
groupGarbageCollectedBlocks prometheus.Counter
@@ -405,7 +407,7 @@ func newGroup(
bkt: bkt,
labels: lset,
resolution: resolution,
- blocks: map[ulid.ULID]*block.Meta{},
+ blocks: map[ulid.ULID]*blockmeta.Meta{},
compactions: compactions,
compactionFailures: compactionFailures,
groupGarbageCollectedBlocks: groupGarbageCollectedBlocks,
@@ -419,7 +421,7 @@ func (cg *Group) Key() string {
}
// Add the block with the given meta to the group.
-func (cg *Group) Add(meta *block.Meta) error {
+func (cg *Group) Add(meta *blockmeta.Meta) error {
cg.mtx.Lock()
defer cg.mtx.Unlock()
@@ -541,7 +543,7 @@ func IsRetryError(err error) bool {
return ok
}
-func (cg *Group) areBlocksOverlapping(include *block.Meta, excludeDirs ...string) error {
+func (cg *Group) areBlocksOverlapping(include *blockmeta.Meta, excludeDirs ...string) error {
var (
metas []tsdb.BlockMeta
exclude = map[ulid.ULID]struct{}{}
@@ -597,12 +599,12 @@ func RepairIssue347(ctx context.Context, logger log.Logger, bkt objstore.Bucket,
return retry(errors.Wrapf(err, "download block %s", ie.id))
}
- meta, err := block.ReadMetaFile(bdir)
+ meta, err := blockmeta.Read(bdir)
if err != nil {
return errors.Wrapf(err, "read meta from %s", bdir)
}
- resid, err := block.Repair(logger, tmpdir, ie.id, block.CompactorRepairSource, block.IgnoreIssue347OutsideChunk)
+ resid, err := block.Repair(logger, tmpdir, ie.id, blockmeta.CompactorRepairSource, block.IgnoreIssue347OutsideChunk)
if err != nil {
return errors.Wrapf(err, "repair failed for block %s", ie.id)
}
@@ -647,7 +649,7 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) (
if err := os.MkdirAll(bdir, 0777); err != nil {
return compID, errors.Wrap(err, "create planning block dir")
}
- if err := block.WriteMetaFile(cg.logger, bdir, meta); err != nil {
+ if err := blockmeta.Write(cg.logger, bdir, meta); err != nil {
return compID, errors.Wrap(err, "write planning meta file")
}
}
@@ -670,7 +672,7 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) (
begin := time.Now()
for _, pdir := range plan {
- meta, err := block.ReadMetaFile(pdir)
+ meta, err := blockmeta.Read(pdir)
if err != nil {
return compID, errors.Wrapf(err, "read meta from %s", pdir)
}
@@ -718,7 +720,7 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) (
begin = time.Now()
- compID, err = comp.Compact(dir, plan...)
+ compID, err = comp.Compact(dir, plan, nil)
if err != nil {
return compID, halt(errors.Wrapf(err, "compact blocks %v", plan))
}
@@ -727,10 +729,10 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) (
bdir := filepath.Join(dir, compID.String())
- newMeta, err := block.InjectThanosMeta(cg.logger, bdir, block.ThanosMeta{
+ newMeta, err := blockmeta.InjectThanos(cg.logger, bdir, blockmeta.Thanos{
Labels: cg.labels.Map(),
- Downsample: block.ThanosDownsampleMeta{Resolution: cg.resolution},
- Source: block.CompactorSource,
+ Downsample: blockmeta.ThanosDownsample{Resolution: cg.resolution},
+ Source: blockmeta.CompactorSource,
}, nil)
if err != nil {
return compID, errors.Wrapf(err, "failed to finalize the block %s", bdir)
diff --git a/pkg/compact/compact_e2e_test.go b/pkg/compact/compact_e2e_test.go
index 74df73cc6f..266fb93cdf 100644
--- a/pkg/compact/compact_e2e_test.go
+++ b/pkg/compact/compact_e2e_test.go
@@ -13,6 +13,8 @@ import (
"testing"
"time"
+ "github.com/improbable-eng/thanos/pkg/block/blockmeta"
+
"github.com/go-kit/kit/log"
"github.com/improbable-eng/thanos/pkg/block"
"github.com/improbable-eng/thanos/pkg/objstore"
@@ -37,13 +39,13 @@ func TestSyncer_SyncMetas_e2e(t *testing.T) {
// After the first synchronization the first 5 should be dropped and the
// last 5 be loaded from the bucket.
var ids []ulid.ULID
- var metas []*block.Meta
+ var metas []*blockmeta.Meta
for i := 0; i < 15; i++ {
id, err := ulid.New(uint64(i), nil)
testutil.Ok(t, err)
- var meta block.Meta
+ var meta blockmeta.Meta
meta.Version = 1
meta.ULID = id
@@ -56,7 +58,7 @@ func TestSyncer_SyncMetas_e2e(t *testing.T) {
for _, m := range metas[5:] {
var buf bytes.Buffer
testutil.Ok(t, json.NewEncoder(&buf).Encode(&m))
- testutil.Ok(t, bkt.Upload(ctx, path.Join(m.ULID.String(), block.MetaFilename), &buf))
+ testutil.Ok(t, bkt.Upload(ctx, path.Join(m.ULID.String(), blockmeta.MetaFilename), &buf))
}
groups, err := sy.Groups()
@@ -79,11 +81,11 @@ func TestSyncer_GarbageCollect_e2e(t *testing.T) {
// Generate 10 source block metas and construct higher level blocks
// that are higher compactions of them.
- var metas []*block.Meta
+ var metas []*blockmeta.Meta
var ids []ulid.ULID
for i := 0; i < 10; i++ {
- var m block.Meta
+ var m blockmeta.Meta
m.Version = 1
m.ULID = ulid.MustNew(uint64(i), nil)
@@ -94,28 +96,28 @@ func TestSyncer_GarbageCollect_e2e(t *testing.T) {
metas = append(metas, &m)
}
- var m1 block.Meta
+ var m1 blockmeta.Meta
m1.Version = 1
m1.ULID = ulid.MustNew(100, nil)
m1.Compaction.Level = 2
m1.Compaction.Sources = ids[:4]
m1.Thanos.Downsample.Resolution = 0
- var m2 block.Meta
+ var m2 blockmeta.Meta
m2.Version = 1
m2.ULID = ulid.MustNew(200, nil)
m2.Compaction.Level = 2
m2.Compaction.Sources = ids[4:8] // last two source IDs is not part of a level 2 block.
m2.Thanos.Downsample.Resolution = 0
- var m3 block.Meta
+ var m3 blockmeta.Meta
m3.Version = 1
m3.ULID = ulid.MustNew(300, nil)
m3.Compaction.Level = 3
m3.Compaction.Sources = ids[:9] // last source ID is not part of level 3 block.
m3.Thanos.Downsample.Resolution = 0
- var m4 block.Meta
+ var m4 blockmeta.Meta
m4.Version = 14
m4.ULID = ulid.MustNew(400, nil)
m4.Compaction.Level = 2
@@ -127,7 +129,7 @@ func TestSyncer_GarbageCollect_e2e(t *testing.T) {
fmt.Println("create", m.ULID)
var buf bytes.Buffer
testutil.Ok(t, json.NewEncoder(&buf).Encode(&m))
- testutil.Ok(t, bkt.Upload(ctx, path.Join(m.ULID.String(), block.MetaFilename), &buf))
+ testutil.Ok(t, bkt.Upload(ctx, path.Join(m.ULID.String(), blockmeta.MetaFilename), &buf))
}
// Do one initial synchronization with the bucket.
@@ -173,7 +175,7 @@ func TestGroup_Compact_e2e(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
defer cancel()
- var metas []*block.Meta
+ var metas []*blockmeta.Meta
extLset := labels.Labels{{Name: "e1", Value: "1"}}
b1, err := testutil.CreateBlock(prepareDir, []labels.Labels{
{{Name: "a", Value: "1"}},
@@ -183,7 +185,7 @@ func TestGroup_Compact_e2e(t *testing.T) {
}, 100, 0, 1000, extLset, 124)
testutil.Ok(t, err)
- meta, err := block.ReadMetaFile(filepath.Join(prepareDir, b1.String()))
+ meta, err := blockmeta.Read(filepath.Join(prepareDir, b1.String()))
testutil.Ok(t, err)
metas = append(metas, meta)
@@ -196,7 +198,7 @@ func TestGroup_Compact_e2e(t *testing.T) {
testutil.Ok(t, err)
// Mix order to make sure compact is able to deduct min time / max time.
- meta, err = block.ReadMetaFile(filepath.Join(prepareDir, b3.String()))
+ meta, err = blockmeta.Read(filepath.Join(prepareDir, b3.String()))
testutil.Ok(t, err)
metas = append(metas, meta)
@@ -204,7 +206,7 @@ func TestGroup_Compact_e2e(t *testing.T) {
b2, err := testutil.CreateBlock(prepareDir, []labels.Labels{}, 100, 1001, 2000, extLset, 124)
testutil.Ok(t, err)
- meta, err = block.ReadMetaFile(filepath.Join(prepareDir, b2.String()))
+ meta, err = blockmeta.Read(filepath.Join(prepareDir, b2.String()))
testutil.Ok(t, err)
metas = append(metas, meta)
@@ -217,7 +219,7 @@ func TestGroup_Compact_e2e(t *testing.T) {
}, 100, 3001, 4000, extLset, 124)
testutil.Ok(t, err)
- meta, err = block.ReadMetaFile(filepath.Join(prepareDir, freshB.String()))
+ meta, err = blockmeta.Read(filepath.Join(prepareDir, freshB.String()))
testutil.Ok(t, err)
metas = append(metas, meta)
@@ -263,7 +265,7 @@ func TestGroup_Compact_e2e(t *testing.T) {
resDir := filepath.Join(dir, id.String())
testutil.Ok(t, block.Download(ctx, log.NewNopLogger(), bkt, id, resDir))
- meta, err = block.ReadMetaFile(resDir)
+ meta, err = blockmeta.Read(resDir)
testutil.Ok(t, err)
testutil.Equals(t, int64(0), meta.MinTime)
diff --git a/pkg/compact/downsample/downsample.go b/pkg/compact/downsample/downsample.go
index 305f72021c..5b69b3dc7f 100644
--- a/pkg/compact/downsample/downsample.go
+++ b/pkg/compact/downsample/downsample.go
@@ -5,7 +5,8 @@ import (
"path/filepath"
"sort"
- "github.com/improbable-eng/thanos/pkg/block"
+ "github.com/improbable-eng/thanos/pkg/block/blockmeta"
+
"github.com/prometheus/prometheus/pkg/value"
"github.com/prometheus/tsdb/chunkenc"
@@ -31,7 +32,7 @@ const (
// Downsample downsamples the given block. It writes a new block into dir and returns its ID.
func Downsample(
logger log.Logger,
- origMeta *block.Meta,
+ origMeta *blockmeta.Meta,
b tsdb.BlockReader,
dir string,
resolution int64,
@@ -125,18 +126,18 @@ func Downsample(
if err != nil {
return id, errors.Wrap(err, "create compactor")
}
- id, err = comp.Write(dir, newb, origMeta.MinTime, origMeta.MaxTime)
+ id, err = comp.Write(dir, newb, origMeta.MinTime, origMeta.MaxTime, &origMeta.BlockMeta)
if err != nil {
return id, errors.Wrap(err, "compact head")
}
bdir := filepath.Join(dir, id.String())
- var tmeta block.ThanosMeta
+ var tmeta blockmeta.Thanos
tmeta = origMeta.Thanos
- tmeta.Source = block.CompactorSource
+ tmeta.Source = blockmeta.CompactorSource
tmeta.Downsample.Resolution = resolution
- _, err = block.InjectThanosMeta(logger, bdir, tmeta, &origMeta.BlockMeta)
+ _, err = blockmeta.InjectThanos(logger, bdir, tmeta, &origMeta.BlockMeta)
if err != nil {
return id, errors.Wrapf(err, "failed to finalize the block %s", bdir)
}
@@ -228,13 +229,20 @@ func (b *memBlock) Chunks() (tsdb.ChunkReader, error) {
}
func (b *memBlock) Tombstones() (tsdb.TombstoneReader, error) {
- return tsdb.EmptyTombstoneReader(), nil
+ return emptyTombstoneReader{}, nil
}
func (b *memBlock) Close() error {
return nil
}
+type emptyTombstoneReader struct{}
+
+func (emptyTombstoneReader) Get(ref uint64) (tsdb.Intervals, error) { return nil, nil }
+func (emptyTombstoneReader) Iter(func(uint64, tsdb.Intervals) error) error { return nil }
+func (emptyTombstoneReader) Total() uint64 { return 0 }
+func (emptyTombstoneReader) Close() error { return nil }
+
// currentWindow returns the end timestamp of the window that t falls into.
func currentWindow(t, r int64) int64 {
// The next timestamp is the next number after s.t that's aligned with window.
diff --git a/pkg/compact/downsample/downsample_test.go b/pkg/compact/downsample/downsample_test.go
index d384478416..1d82dabe77 100644
--- a/pkg/compact/downsample/downsample_test.go
+++ b/pkg/compact/downsample/downsample_test.go
@@ -7,6 +7,8 @@ import (
"path/filepath"
"testing"
+ "github.com/improbable-eng/thanos/pkg/block/blockmeta"
+
"github.com/prometheus/prometheus/pkg/value"
"github.com/prometheus/tsdb/chunks"
@@ -59,7 +61,7 @@ func TestDownsampleRaw(t *testing.T) {
},
},
}
- testDownsample(t, input, &block.Meta{}, 100)
+ testDownsample(t, input, &blockmeta.Meta{}, 100)
}
func TestDownsampleAggr(t *testing.T) {
@@ -96,7 +98,7 @@ func TestDownsampleAggr(t *testing.T) {
},
},
}
- var meta block.Meta
+ var meta blockmeta.Meta
meta.Thanos.Downsample.Resolution = 10
testDownsample(t, input, &meta, 500)
@@ -123,7 +125,7 @@ type downsampleTestSet struct {
// testDownsample inserts the input into a block and invokes the downsampler with the given resolution.
// The chunk ranges within the input block are aligned at 500 time units.
-func testDownsample(t *testing.T, data []*downsampleTestSet, meta *block.Meta, resolution int64) {
+func testDownsample(t *testing.T, data []*downsampleTestSet, meta *blockmeta.Meta, resolution int64) {
t.Helper()
dir, err := ioutil.TempDir("", "downsample-raw")
diff --git a/pkg/compact/retention_test.go b/pkg/compact/retention_test.go
index 0aef91c697..3ec3b9c50b 100644
--- a/pkg/compact/retention_test.go
+++ b/pkg/compact/retention_test.go
@@ -9,7 +9,7 @@ import (
"time"
"github.com/go-kit/kit/log"
- "github.com/improbable-eng/thanos/pkg/block"
+ "github.com/improbable-eng/thanos/pkg/block/blockmeta"
"github.com/improbable-eng/thanos/pkg/compact"
"github.com/improbable-eng/thanos/pkg/objstore"
"github.com/improbable-eng/thanos/pkg/objstore/inmem"
@@ -253,15 +253,15 @@ func TestApplyRetentionPolicyByResolution(t *testing.T) {
func uploadMockBlock(t *testing.T, bkt objstore.Bucket, id string, minTime, maxTime time.Time, resolutionLevel int64) {
t.Helper()
- meta1 := block.Meta{
+ meta1 := blockmeta.Meta{
Version: 1,
BlockMeta: tsdb.BlockMeta{
ULID: ulid.MustParse(id),
MinTime: minTime.Unix() * 1000,
MaxTime: maxTime.Unix() * 1000,
},
- Thanos: block.ThanosMeta{
- Downsample: block.ThanosDownsampleMeta{
+ Thanos: blockmeta.Thanos{
+ Downsample: blockmeta.ThanosDownsample{
Resolution: resolutionLevel,
},
},
diff --git a/pkg/query/api/v1.go b/pkg/query/api/v1.go
index 23e5b0a801..cf693dd2fb 100644
--- a/pkg/query/api/v1.go
+++ b/pkg/query/api/v1.go
@@ -527,15 +527,16 @@ func (api *API) series(r *http.Request) (interface{}, []error, *apiError) {
var sets []storage.SeriesSet
for _, mset := range matcherSets {
- s, err := q.Select(&storage.SelectParams{}, mset...)
+ s, _, err := q.Select(&storage.SelectParams{}, mset...)
if err != nil {
return nil, nil, &apiError{errorExec, err}
}
sets = append(sets, s)
}
- set := storage.NewMergeSeriesSet(sets)
- metrics := []labels.Labels{}
+ set := storage.NewMergeSeriesSet(sets, nil)
+
+ var metrics []labels.Labels
for set.Next() {
metrics = append(metrics, set.At().Labels())
}
diff --git a/pkg/query/querier.go b/pkg/query/querier.go
index 6e962f6472..819ff3ac2a 100644
--- a/pkg/query/querier.go
+++ b/pkg/query/querier.go
@@ -169,13 +169,13 @@ func aggrsFromFunc(f string) ([]storepb.Aggr, resAggr) {
return []storepb.Aggr{storepb.Aggr_COUNT, storepb.Aggr_SUM}, resAggrAvg
}
-func (q *querier) Select(params *storage.SelectParams, ms ...*labels.Matcher) (storage.SeriesSet, error) {
+func (q *querier) Select(params *storage.SelectParams, ms ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
span, ctx := tracing.StartSpan(q.ctx, "querier_select")
defer span.Finish()
sms, err := translateMatchers(ms...)
if err != nil {
- return nil, errors.Wrap(err, "convert matchers")
+ return nil, nil, errors.Wrap(err, "convert matchers")
}
queryAggrs, resAggr := aggrsFromFunc(params.Func)
@@ -189,10 +189,12 @@ func (q *querier) Select(params *storage.SelectParams, ms ...*labels.Matcher) (s
Aggregates: queryAggrs,
PartialResponseDisabled: !q.partialResponse,
}, resp); err != nil {
- return nil, errors.Wrap(err, "proxy Series()")
+ return nil, nil, errors.Wrap(err, "proxy Series()")
}
for _, w := range resp.warnings {
+ // NOTE(bwplotka): We could use warnings return arguments here, however need reporter anyway for LabelValues and LabelNames method,
+ // so we choose to be consistent and keep reporter.
q.warningReporter(errors.New(w))
}
@@ -203,7 +205,7 @@ func (q *querier) Select(params *storage.SelectParams, ms ...*labels.Matcher) (s
maxt: q.maxt,
set: newStoreSeriesSet(resp.seriesSet),
aggr: resAggr,
- }, nil
+ }, nil, nil
}
// TODO(fabxc): this could potentially pushed further down into the store API
@@ -220,7 +222,7 @@ func (q *querier) Select(params *storage.SelectParams, ms ...*labels.Matcher) (s
// The merged series set assembles all potentially-overlapping time ranges
// of the same series into a single one. The series are ordered so that equal series
// from different replicas are sequential. We can now deduplicate those.
- return newDedupSeriesSet(set, q.replicaLabel), nil
+ return newDedupSeriesSet(set, q.replicaLabel), nil, nil
}
// sortDedupLabels resorts the set so that the same series with different replica
@@ -245,6 +247,7 @@ func sortDedupLabels(set []storepb.Series, replicaLabel string) {
})
}
+// LabelValues returns all potential values for a label name.
func (q *querier) LabelValues(name string) ([]string, error) {
span, ctx := tracing.StartSpan(q.ctx, "querier_label_values")
defer span.Finish()
@@ -261,6 +264,12 @@ func (q *querier) LabelValues(name string) ([]string, error) {
return resp.Values, nil
}
+// LabelNames returns all the unique label names present in the block in sorted order.
+// TODO(bwplotka): Consider adding labelNames to thanos Query API https://github.com/improbable-eng/thanos/issues/702.
+func (q *querier) LabelNames() ([]string, error) {
+ return nil, errors.New("not implemented")
+}
+
func (q *querier) Close() error {
q.cancel()
return nil
diff --git a/pkg/query/test_print.go b/pkg/query/test_print.go
new file mode 100644
index 0000000000..70bc292439
--- /dev/null
+++ b/pkg/query/test_print.go
@@ -0,0 +1,34 @@
+package query
+
+import (
+ "fmt"
+
+ "github.com/prometheus/prometheus/storage"
+)
+
+type printSeriesSet struct {
+ set storage.SeriesSet
+}
+
+func newPrintSeriesSet(set storage.SeriesSet) storage.SeriesSet {
+ return &printSeriesSet{set: set}
+}
+
+func (s *printSeriesSet) Next() bool {
+ return s.set.Next()
+}
+
+func (s *printSeriesSet) At() storage.Series {
+ at := s.set.At()
+ fmt.Println("Series", at.Labels())
+
+ i := at.Iterator()
+ for i.Next() {
+ fmt.Println(i.At())
+ }
+ return at
+}
+
+func (s *printSeriesSet) Err() error {
+ return s.set.Err()
+}
diff --git a/pkg/shipper/shipper.go b/pkg/shipper/shipper.go
index 5c1df9a9b7..542799ce5e 100644
--- a/pkg/shipper/shipper.go
+++ b/pkg/shipper/shipper.go
@@ -11,6 +11,8 @@ import (
"path"
"path/filepath"
+ "github.com/improbable-eng/thanos/pkg/block/blockmeta"
+
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/improbable-eng/thanos/pkg/block"
@@ -69,7 +71,7 @@ type Shipper struct {
metrics *metrics
bucket objstore.Bucket
labels func() labels.Labels
- source block.SourceType
+ source blockmeta.SourceType
}
// New creates a new shipper that detects new TSDB blocks in dir and uploads them
@@ -80,7 +82,7 @@ func New(
dir string,
bucket objstore.Bucket,
lbls func() labels.Labels,
- source block.SourceType,
+ source blockmeta.SourceType,
) *Shipper {
if logger == nil {
logger = log.NewNopLogger()
@@ -114,7 +116,7 @@ func (s *Shipper) Timestamps() (minTime, maxSyncTime int64, err error) {
minTime = math.MaxInt64
maxSyncTime = math.MinInt64
- if err := s.iterBlockMetas(func(m *block.Meta) error {
+ if err := s.iterBlockMetas(func(m *blockmeta.Meta) error {
if m.MinTime < minTime {
minTime = m.MinTime
}
@@ -158,7 +160,7 @@ func (s *Shipper) Sync(ctx context.Context) {
// TODO(bplotka): If there are no blocks in the system check for WAL dir to ensure we have actually
// access to real TSDB dir (!).
- if err = s.iterBlockMetas(func(m *block.Meta) error {
+ if err = s.iterBlockMetas(func(m *blockmeta.Meta) error {
// Do not sync a block if we already uploaded it. If it is no longer found in the bucket,
// it was generally removed by the compaction process.
if _, ok := hasUploaded[m.ULID]; !ok {
@@ -180,7 +182,7 @@ func (s *Shipper) Sync(ctx context.Context) {
}
}
-func (s *Shipper) sync(ctx context.Context, meta *block.Meta) (err error) {
+func (s *Shipper) sync(ctx context.Context, meta *blockmeta.Meta) (err error) {
dir := filepath.Join(s.dir, meta.ULID.String())
// We only ship of the first compacted block level.
@@ -225,7 +227,7 @@ func (s *Shipper) sync(ctx context.Context, meta *block.Meta) (err error) {
meta.Thanos.Labels = lset.Map()
}
meta.Thanos.Source = s.source
- if err := block.WriteMetaFile(s.logger, updir, meta); err != nil {
+ if err := blockmeta.Write(s.logger, updir, meta); err != nil {
return errors.Wrap(err, "write meta file")
}
return block.Upload(ctx, s.logger, s.bucket, updir)
@@ -234,7 +236,7 @@ func (s *Shipper) sync(ctx context.Context, meta *block.Meta) (err error) {
// iterBlockMetas calls f with the block meta for each block found in dir. It logs
// an error and continues if it cannot access a meta.json file.
// If f returns an error, the function returns with the same error.
-func (s *Shipper) iterBlockMetas(f func(m *block.Meta) error) error {
+func (s *Shipper) iterBlockMetas(f func(m *blockmeta.Meta) error) error {
names, err := fileutil.ReadDir(s.dir)
if err != nil {
return errors.Wrap(err, "read dir")
@@ -253,7 +255,7 @@ func (s *Shipper) iterBlockMetas(f func(m *block.Meta) error) error {
if !fi.IsDir() {
continue
}
- m, err := block.ReadMetaFile(dir)
+ m, err := blockmeta.Read(dir)
if err != nil {
level.Warn(s.logger).Log("msg", "reading meta file failed", "err", err)
continue
diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go
index e121e9d408..a7de1cf976 100644
--- a/pkg/store/bucket.go
+++ b/pkg/store/bucket.go
@@ -16,6 +16,8 @@ import (
"sync"
"time"
+ "github.com/improbable-eng/thanos/pkg/block/blockmeta"
+
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/improbable-eng/thanos/pkg/block"
@@ -459,6 +461,7 @@ func (s *bucketSeriesSet) Err() error {
return s.err
}
+// blockSeries return requested series from given index and chunk readers.
func (s *BucketStore) blockSeries(
ctx context.Context,
ulid ulid.ULID,
@@ -488,7 +491,6 @@ func (s *BucketStore) blockSeries(
}
// Get result postings list by resolving the postings tree.
- // TODO(bwplotka): Users are seeing panics here, because of lazyPosting being not loaded by preloadPostings.
ps, err := index.ExpandPostings(lazyPostings)
if err != nil {
return nil, stats, errors.Wrap(err, "expand postings")
@@ -504,7 +506,8 @@ func (s *BucketStore) blockSeries(
}
}
- // Preload all series index data
+ // Preload all series index data.
+ // TODO(bwplotka): Consider not keeping all series in memory all the time.
if err := indexr.preloadSeries(ps); err != nil {
return nil, stats, errors.Wrap(err, "preload series")
}
@@ -1001,7 +1004,7 @@ func (s *bucketBlockSet) labelMatchers(matchers ...labels.Matcher) ([]labels.Mat
type bucketBlock struct {
logger log.Logger
bucket objstore.BucketReader
- meta *block.Meta
+ meta *blockmeta.Meta
dir string
indexCache *indexCache
chunkPool *pool.BytesPool
@@ -1065,7 +1068,7 @@ func (b *bucketBlock) loadMeta(ctx context.Context, id ulid.ULID) error {
} else if err != nil {
return err
}
- meta, err := block.ReadMetaFile(b.dir)
+ meta, err := blockmeta.Read(b.dir)
if err != nil {
return errors.Wrap(err, "read meta.json")
}
@@ -1095,19 +1098,15 @@ func (b *bucketBlock) loadIndexCache(ctx context.Context) (err error) {
}
}()
- indexr, err := index.NewFileReader(fn)
- if err != nil {
- return errors.Wrap(err, "open index reader")
- }
- defer runutil.CloseWithLogOnErr(b.logger, indexr, "load index cache reader")
+ // Create index cache adhoc.
- if err := block.WriteIndexCache(b.logger, cachefn, indexr); err != nil {
+ if err := block.WriteIndexCache(b.logger, fn, cachefn); err != nil {
return errors.Wrap(err, "write index cache")
}
b.indexVersion, b.symbols, b.lvals, b.postings, err = block.ReadIndexCache(b.logger, cachefn)
if err != nil {
- return errors.Wrap(err, "read index cache")
+ return errors.Wrap(err, "read fresh index cache")
}
return nil
}
@@ -1179,15 +1178,22 @@ func newBucketIndexReader(ctx context.Context, logger log.Logger, block *bucketB
logger: logger,
ctx: ctx,
block: block,
- dec: &index.Decoder{},
stats: &queryStats{},
cache: cache,
loadedSeries: map[uint64][]byte{},
}
- r.dec.SetSymbolTable(r.block.symbols)
+ r.dec = &index.Decoder{LookupSymbol: r.lookupSymbol}
return r
}
+func (r *bucketIndexReader) lookupSymbol(o uint32) (string, error) {
+ s, ok := r.block.symbols[o]
+ if !ok {
+ return "", errors.Errorf("bucketIndexReader: unknown symbol offset %d", o)
+ }
+ return s, nil
+}
+
func (r *bucketIndexReader) preloadPostings() error {
const maxGapSize = 512 * 1024
@@ -1270,23 +1276,24 @@ func (r *bucketIndexReader) loadPostings(ctx context.Context, postings []*lazyPo
return nil
}
-func (r *bucketIndexReader) preloadSeries(ids []uint64) error {
+func (r *bucketIndexReader) preloadSeries(refs []uint64) error {
const maxSeriesSize = 64 * 1024
const maxGapSize = 512 * 1024
- var newIDs []uint64
+ var newRefs []uint64
- for _, id := range ids {
- if b, ok := r.cache.series(r.block.meta.ULID, id); ok {
- r.loadedSeries[id] = b
+ for _, ref := range refs {
+ if b, ok := r.cache.series(r.block.meta.ULID, ref); ok {
+ r.loadedSeries[ref] = b
continue
}
- newIDs = append(newIDs, id)
+ newRefs = append(newRefs, ref)
}
- ids = newIDs
+ refs = newRefs
- parts := partitionRanges(len(ids), func(i int) (start, end uint64) {
- return ids[i], ids[i] + maxSeriesSize
+ // Combine multiple close byte ranges to not be rate-limited from object storage.
+ parts := partitionRanges(len(refs), func(i int) (start, end uint64) {
+ return refs[i], refs[i] + maxSeriesSize
}, maxGapSize)
var g run.Group
@@ -1295,7 +1302,7 @@ func (r *bucketIndexReader) preloadSeries(ids []uint64) error {
i, j := p[0], p[1]
g.Add(func() error {
- return r.loadSeries(ctx, ids[i:j], ids[i], ids[j-1]+maxSeriesSize)
+ return r.loadSeries(ctx, refs[i:j], refs[i], refs[j-1]+maxSeriesSize)
}, func(err error) {
if err != nil {
cancel()
@@ -1305,7 +1312,7 @@ func (r *bucketIndexReader) preloadSeries(ids []uint64) error {
return g.Run()
}
-func (r *bucketIndexReader) loadSeries(ctx context.Context, ids []uint64, start, end uint64) error {
+func (r *bucketIndexReader) loadSeries(ctx context.Context, refs []uint64, start, end uint64) error {
begin := time.Now()
b, err := r.block.readIndexRange(ctx, int64(start), int64(end-start))
@@ -1317,12 +1324,12 @@ func (r *bucketIndexReader) loadSeries(ctx context.Context, ids []uint64, start,
defer r.mtx.Unlock()
r.stats.seriesFetchCount++
- r.stats.seriesFetched += len(ids)
+ r.stats.seriesFetched += len(refs)
r.stats.seriesFetchDurationSum += time.Since(begin)
r.stats.seriesFetchedSizeSum += int(end - start)
- for _, id := range ids {
- c := b[id-start:]
+ for _, ref := range refs {
+ c := b[ref-start:]
l, n := binary.Uvarint(c)
if n < 1 {
@@ -1332,8 +1339,8 @@ func (r *bucketIndexReader) loadSeries(ctx context.Context, ids []uint64, start,
return errors.Errorf("invalid remaining size %d, expected %d", len(c), n+int(l))
}
c = c[n : n+int(l)]
- r.loadedSeries[id] = c
- r.cache.setSeries(r.block.meta.ULID, id, c)
+ r.loadedSeries[ref] = c
+ r.cache.setSeries(r.block.meta.ULID, ref, c)
}
return nil
}
@@ -1421,6 +1428,7 @@ func (r *bucketIndexReader) SortedPostings(p index.Postings) index.Postings {
// Series populates the given labels and chunk metas for the series identified
// by the reference.
// Returns ErrNotFound if the ref does not resolve to a known series.
+// prealoadSeries needs to be invoked first to have this method return loaded results.
func (r *bucketIndexReader) Series(ref uint64, lset *labels.Labels, chks *[]chunks.Meta) error {
b, ok := r.loadedSeries[ref]
if !ok {
@@ -1438,6 +1446,11 @@ func (r *bucketIndexReader) LabelIndices() ([][]string, error) {
return nil, errors.New("not implemented")
}
+// LabelNames returns all the unique label names present in the index in sorted order.
+func (r *bucketIndexReader) LabelNames() ([]string, error) {
+ return nil, errors.New("not implemented")
+}
+
// Close released the underlying resources of the reader.
func (r *bucketIndexReader) Close() error {
r.block.pendingReaders.Done()
diff --git a/pkg/store/bucket_profile_test.go b/pkg/store/bucket_profile_test.go
new file mode 100644
index 0000000000..942231418f
--- /dev/null
+++ b/pkg/store/bucket_profile_test.go
@@ -0,0 +1,408 @@
+package store
+
+import (
+ "context"
+ "fmt"
+ "io/ioutil"
+ "os"
+ "path/filepath"
+ "runtime"
+ "runtime/pprof"
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/oklog/ulid"
+
+ "github.com/improbable-eng/thanos/pkg/objstore/inmem"
+
+ "github.com/go-kit/kit/log"
+ "github.com/improbable-eng/thanos/pkg/block"
+ "github.com/improbable-eng/thanos/pkg/runutil"
+ "github.com/improbable-eng/thanos/pkg/store/storepb"
+ "github.com/improbable-eng/thanos/pkg/testutil"
+ "github.com/pkg/errors"
+ "github.com/prometheus/prometheus/pkg/timestamp"
+ "github.com/prometheus/tsdb/labels"
+)
+
+func saveHeap(t *testing.T, name string) {
+ time.Sleep(500 * time.Millisecond)
+ runtime.GC()
+ f, err := os.OpenFile("heap-"+name, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, os.ModePerm)
+ testutil.Ok(t, err)
+
+ defer f.Close()
+ testutil.Ok(t, pprof.WriteHeapProfile(f))
+}
+
+func TestBucketStore_PROFILE(t *testing.T) {
+ bkt := inmem.NewBucket()
+
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ dir, err := ioutil.TempDir("", "test_bucketstore_e2e")
+ testutil.Ok(t, err)
+ //defer func() { testutil.Ok(t, os.RemoveAll(dir)) }()
+
+ series := []labels.Labels{
+ labels.FromStrings("a", "1", "b", "1"),
+ labels.FromStrings("a", "1", "b", "2"),
+ labels.FromStrings("a", "2", "b", "1"),
+ labels.FromStrings("a", "2", "b", "2"),
+ labels.FromStrings("a", "1", "c", "1"),
+ labels.FromStrings("a", "1", "c", "2"),
+ labels.FromStrings("a", "2", "c", "1"),
+ labels.FromStrings("a", "2", "c", "2"),
+ }
+ extLset := labels.FromStrings("ext1", "value1")
+
+ start := time.Now()
+ now := start
+
+ var ids []ulid.ULID
+ for i := 0; i < 3; i++ {
+ mint := timestamp.FromTime(now)
+ now = now.Add(2 * time.Hour)
+ maxt := timestamp.FromTime(now)
+
+ // Create two blocks per time slot. Only add 10 samples each so only one chunk
+ // gets created each. This way we can easily verify we got 10 chunks per series below.
+ id1, err := testutil.CreateBlock(dir, series[:4], 10, mint, maxt, extLset, 0)
+ testutil.Ok(t, err)
+ id2, err := testutil.CreateBlock(dir, series[4:], 10, mint, maxt, extLset, 0)
+ testutil.Ok(t, err)
+
+ ids = append(ids, id1, id2)
+ dir1, dir2 := filepath.Join(dir, id1.String()), filepath.Join(dir, id2.String())
+
+ // Add labels to the meta of the second block.
+ meta, err := block.ReadMetaFile(dir2)
+ testutil.Ok(t, err)
+ meta.Thanos.Labels = map[string]string{"ext2": "value2"}
+ testutil.Ok(t, block.WriteMetaFile(log.NewNopLogger(), dir2, meta))
+
+ testutil.Ok(t, block.Upload(ctx, log.NewNopLogger(), bkt, dir1))
+ testutil.Ok(t, block.Upload(ctx, log.NewNopLogger(), bkt, dir2))
+
+ testutil.Ok(t, os.RemoveAll(dir1))
+ testutil.Ok(t, os.RemoveAll(dir2))
+ }
+
+ store, err := NewBucketStore(nil, nil, bkt, dir, 100, 0, false)
+ testutil.Ok(t, err)
+
+ ctx, _ = context.WithTimeout(ctx, 30*time.Second)
+
+ if err := runutil.Retry(100*time.Millisecond, ctx.Done(), func() error {
+ if err := store.SyncBlocks(ctx); err != nil {
+ return err
+ }
+ if store.numBlocks() < 6 {
+ return errors.New("not all blocks loaded")
+ }
+ return nil
+ }); err != nil && errors.Cause(err) != context.Canceled {
+ t.Error(err)
+ t.FailNow()
+ }
+ testutil.Ok(t, err)
+
+ pbseries := [][]storepb.Label{
+ {{Name: "a", Value: "1"}, {Name: "b", Value: "1"}, {Name: "ext1", Value: "value1"}},
+ {{Name: "a", Value: "1"}, {Name: "b", Value: "2"}, {Name: "ext1", Value: "value1"}},
+ {{Name: "a", Value: "1"}, {Name: "c", Value: "1"}, {Name: "ext2", Value: "value2"}},
+ {{Name: "a", Value: "1"}, {Name: "c", Value: "2"}, {Name: "ext2", Value: "value2"}},
+ {{Name: "a", Value: "2"}, {Name: "b", Value: "1"}, {Name: "ext1", Value: "value1"}},
+ {{Name: "a", Value: "2"}, {Name: "b", Value: "2"}, {Name: "ext1", Value: "value1"}},
+ {{Name: "a", Value: "2"}, {Name: "c", Value: "1"}, {Name: "ext2", Value: "value2"}},
+ {{Name: "a", Value: "2"}, {Name: "c", Value: "2"}, {Name: "ext2", Value: "value2"}},
+ }
+
+ srv := newStoreSeriesServer(ctx)
+
+ err = store.Series(&storepb.SeriesRequest{
+ Matchers: []storepb.LabelMatcher{
+ {Type: storepb.LabelMatcher_RE, Name: "a", Value: "1|2"},
+ },
+ MinTime: timestamp.FromTime(start),
+ MaxTime: timestamp.FromTime(now),
+ }, srv)
+ testutil.Ok(t, err)
+ testutil.Equals(t, len(pbseries), len(srv.SeriesSet))
+
+ g := sync.WaitGroup{}
+
+ // NO REPRO
+ go func() {
+ g.Add(1)
+ time.Sleep(10 * time.Millisecond)
+ // Simulate deleted blocks without sync (compaction!)
+ testutil.Ok(t, block.Delete(ctx, bkt, ids[2]))
+ time.Sleep(10 * time.Millisecond)
+ store.SyncBlocks(ctx)
+ store.SyncBlocks(ctx)
+
+ g.Done()
+ }()
+
+ for i := 0; i < 1000; i++ {
+ go func() {
+ g.Add(1)
+ srv := newStoreSeriesServer(ctx)
+
+ err = store.Series(&storepb.SeriesRequest{
+ Matchers: []storepb.LabelMatcher{
+ {Type: storepb.LabelMatcher_RE, Name: "a", Value: "1|2"},
+ },
+ MinTime: timestamp.FromTime(start),
+ MaxTime: timestamp.FromTime(now),
+ }, srv)
+ fmt.Println(err)
+ //testutil.Ok(t, err)
+ //testutil.Equals(t, len(pbseries), len(srv.SeriesSet))
+
+ g.Done()
+ }()
+ }
+ time.Sleep(10 * time.Millisecond)
+ for i := 0; i < 1000; i++ {
+ go func() {
+ g.Add(1)
+ srv := newStoreSeriesServer(ctx)
+
+ err = store.Series(&storepb.SeriesRequest{
+ Matchers: []storepb.LabelMatcher{
+ {Type: storepb.LabelMatcher_RE, Name: "a", Value: "1|2"},
+ },
+ MinTime: timestamp.FromTime(start),
+ MaxTime: timestamp.FromTime(now),
+ }, srv)
+ fmt.Println(err)
+ //testutil.Ok(t, err)
+ //testutil.Equals(t, len(pbseries), len(srv.SeriesSet))
+
+ g.Done()
+ }()
+ }
+
+ g.Wait()
+
+ //for i, s := range srv.SeriesSet {
+ // testutil.Equals(t, pbseries[i], s.Labels)
+ // testutil.Equals(t, 3, len(s.Chunks))
+ //}
+
+ saveHeap(t, "2")
+}
+
+/*
+==================
+WARNING: DATA RACE
+Read at 0x00c4201c22f8 by goroutine 75:
+ github.com/improbable-eng/thanos/pkg/pool.(*BytesPool).Put()
+ /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/pool/pool.go:83 +0x14c
+ github.com/improbable-eng/thanos/pkg/store.(*bucketChunkReader).Close()
+ /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/store/bucket.go:1570 +0x115
+ github.com/improbable-eng/thanos/pkg/runutil.CloseWithLogOnErr()
+ /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/runutil/runutil.go:60 +0x59
+ github.com/improbable-eng/thanos/pkg/store.(*BucketStore).Series()
+ /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/store/bucket.go:811 +0x2d7e
+ github.com/improbable-eng/thanos/pkg/store.TestBucketStore_PROFILE.func2()
+ /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/store/bucket_profile_test.go:140 +0x4e6
+
+Previous write at 0x00c4201c22f8 by goroutine 25:
+ sync/atomic.AddInt64()
+ /usr/local/go/src/runtime/race_amd64.s:276 +0xb
+ github.com/improbable-eng/thanos/pkg/pool.(*BytesPool).Get()
+ /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/pool/pool.go:65 +0x1ad
+ github.com/improbable-eng/thanos/pkg/store.(*bucketBlock).readChunkRange()
+ /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/store/bucket.go:1130 +0x95
+ github.com/improbable-eng/thanos/pkg/store.(*bucketChunkReader).loadChunks()
+ /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/store/bucket.go:1499 +0xe7
+ github.com/improbable-eng/thanos/pkg/store.(*bucketChunkReader).preload.func3()
+ /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/store/bucket.go:1485 +0x23a
+ github.com/improbable-eng/thanos/vendor/github.com/oklog/run.(*Group).Run.func1()
+ /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/vendor/github.com/oklog/run/group.go:38 +0x34
+
+Goroutine 75 (running) created at:
+ github.com/improbable-eng/thanos/pkg/store.TestBucketStore_PROFILE()
+ /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/store/bucket_profile_test.go:136 +0x238e
+ testing.tRunner()
+ /usr/local/go/src/testing/testing.go:777 +0x16d
+
+Goroutine 25 (finished) created at:
+ github.com/improbable-eng/thanos/vendor/github.com/oklog/run.(*Group).Run()
+ /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/vendor/github.com/oklog/run/group.go:37 +0x10b
+ github.com/improbable-eng/thanos/pkg/store.(*bucketChunkReader).preload()
+ /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/store/bucket.go:1493 +0x6f0
+ github.com/improbable-eng/thanos/pkg/store.(*BucketStore).blockSeries()
+ /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/store/bucket.go:572 +0x1109
+ github.com/improbable-eng/thanos/pkg/store.(*BucketStore).Series.func1()
+ /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/store/bucket.go:721 +0x1e7
+ github.com/improbable-eng/thanos/vendor/github.com/oklog/run.(*Group).Run.func1()
+ /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/vendor/github.com/oklog/run/group.go:38 +0x34
+==================
+
+==================
+WARNING: DATA RACE
+Write at 0x00c42029c2fc by goroutine 10:
+ internal/race.Write()
+ /usr/local/go/src/internal/race/race.go:41 +0x38
+ sync.(*WaitGroup).Wait()
+ /usr/local/go/src/sync/waitgroup.go:127 +0xf3
+ github.com/improbable-eng/thanos/pkg/store.TestBucketStore_PROFILE()
+ /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/store/bucket_profile_test.go:164 +0x2442
+ testing.tRunner()
+ /usr/local/go/src/testing/testing.go:777 +0x16d
+
+Previous read at 0x00c42029c2fc by goroutine 74:
+ internal/race.Read()
+ /usr/local/go/src/internal/race/race.go:37 +0x38
+ sync.(*WaitGroup).Add()
+ /usr/local/go/src/sync/waitgroup.go:70 +0x16e
+ github.com/improbable-eng/thanos/pkg/store.TestBucketStore_PROFILE.func2()
+ /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/store/bucket_profile_test.go:137 +0x5c
+
+Goroutine 10 (running) created at:
+ testing.(*T).Run()
+ /usr/local/go/src/testing/testing.go:824 +0x564
+ testing.runTests.func1()
+ /usr/local/go/src/testing/testing.go:1063 +0xa4
+ testing.tRunner()
+ /usr/local/go/src/testing/testing.go:777 +0x16d
+ testing.runTests()
+ /usr/local/go/src/testing/testing.go:1061 +0x4e1
+ testing.(*M).Run()
+ /usr/local/go/src/testing/testing.go:978 +0x2cd
+ main.main()
+ _testmain.go:70 +0x22a
+
+Goroutine 74 (running) created at:
+ github.com/improbable-eng/thanos/pkg/store.TestBucketStore_PROFILE()
+ /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/store/bucket_profile_test.go:136 +0x238e
+ testing.tRunner()
+ /usr/local/go/src/testing/testing.go:777 +0x16d
+==================
+==================
+WARNING: DATA RACE
+Write at 0x00c4202647b0 by goroutine 230:
+ runtime.mapdelete_faststr()
+ /usr/local/go/src/runtime/hashmap_fast.go:883 +0x0
+ github.com/improbable-eng/thanos/pkg/objstore/inmem.(*Bucket).Delete()
+ /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/objstore/inmem/inmem.go:138 +0x69
+ github.com/improbable-eng/thanos/pkg/objstore.DeleteDir.func1()
+ /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/objstore/objstore.go:102 +0x113
+ github.com/improbable-eng/thanos/pkg/objstore/inmem.(*Bucket).Iter()
+ /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/objstore/inmem/inmem.go:76 +0x616
+ github.com/improbable-eng/thanos/pkg/objstore.DeleteDir()
+ /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/objstore/objstore.go:97 +0x10c
+ github.com/improbable-eng/thanos/pkg/block.Delete()
+ /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/block/block.go:215 +0x7f
+ github.com/improbable-eng/thanos/pkg/store.TestBucketStore_PROFILE.func3()
+ /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/store/bucket_profile_test.go:157 +0xae
+
+Previous read at 0x00c4202647b0 by goroutine 85:
+ runtime.mapaccess2_faststr()
+ /usr/local/go/src/runtime/hashmap_fast.go:261 +0x0
+ github.com/improbable-eng/thanos/pkg/objstore/inmem.(*Bucket).GetRange()
+ /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/objstore/inmem/inmem.go:103 +0x9b
+ github.com/improbable-eng/thanos/pkg/store.(*bucketBlock).readChunkRange()
+ /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/store/bucket.go:1136 +0x255
+ github.com/improbable-eng/thanos/pkg/store.(*bucketChunkReader).loadChunks()
+ /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/store/bucket.go:1499 +0xe7
+ github.com/improbable-eng/thanos/pkg/store.(*bucketChunkReader).preload.func3()
+ /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/store/bucket.go:1485 +0x23a
+ github.com/improbable-eng/thanos/vendor/github.com/oklog/run.(*Group).Run.func1()
+ /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/vendor/github.com/oklog/run/group.go:38 +0x34
+
+Goroutine 230 (running) created at:
+ github.com/improbable-eng/thanos/pkg/store.TestBucketStore_PROFILE()
+ /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/store/bucket_profile_test.go:154 +0x2431
+ testing.tRunner()
+ /usr/local/go/src/testing/testing.go:777 +0x16d
+
+Goroutine 85 (finished) created at:
+ github.com/improbable-eng/thanos/vendor/github.com/oklog/run.(*Group).Run()
+ /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/vendor/github.com/oklog/run/group.go:37 +0x10b
+ github.com/improbable-eng/thanos/pkg/store.(*bucketChunkReader).preload()
+ /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/store/bucket.go:1493 +0x6f0
+ github.com/improbable-eng/thanos/pkg/store.(*BucketStore).blockSeries()
+ /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/store/bucket.go:572 +0x1109
+ github.com/improbable-eng/thanos/pkg/store.(*BucketStore).Series.func1()
+ /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/store/bucket.go:721 +0x1e7
+ github.com/improbable-eng/thanos/vendor/github.com/oklog/run.(*Group).Run.func1()
+ /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/vendor/github.com/oklog/run/group.go:38 +0x34
+==================
+==================
+WARNING: DATA RACE
+Read at 0x00c4200d4978 by goroutine 76:
+ github.com/improbable-eng/thanos/pkg/pool.(*BytesPool).Put()
+ /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/pool/pool.go:83 +0x14c
+ github.com/improbable-eng/thanos/pkg/store.(*bucketChunkReader).Close()
+ /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/store/bucket.go:1570 +0x115
+ github.com/improbable-eng/thanos/pkg/runutil.CloseWithLogOnErr()
+ /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/runutil/runutil.go:60 +0x59
+ github.com/improbable-eng/thanos/pkg/store.(*BucketStore).Series()
+ /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/store/bucket.go:811 +0x2d7e
+ github.com/improbable-eng/thanos/pkg/store.TestBucketStore_PROFILE.func2()
+ /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/store/bucket_profile_test.go:140 +0x4e6
+
+Previous write at 0x00c4200d4978 by goroutine 365:
+ sync/atomic.AddInt64()
+ /usr/local/go/src/runtime/race_amd64.s:276 +0xb
+ github.com/improbable-eng/thanos/pkg/pool.(*BytesPool).Get()
+ /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/pool/pool.go:65 +0x1ad
+ github.com/improbable-eng/thanos/pkg/store.(*bucketBlock).readChunkRange()
+ /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/store/bucket.go:1130 +0x95
+ github.com/improbable-eng/thanos/pkg/store.(*bucketChunkReader).loadChunks()
+ /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/store/bucket.go:1499 +0xe7
+ github.com/improbable-eng/thanos/pkg/store.(*bucketChunkReader).preload.func3()
+ /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/store/bucket.go:1485 +0x23a
+ github.com/improbable-eng/thanos/vendor/github.com/oklog/run.(*Group).Run.func1()
+ /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/vendor/github.com/oklog/run/group.go:38 +0x34
+
+Goroutine 76 (running) created at:
+ github.com/improbable-eng/thanos/pkg/store.TestBucketStore_PROFILE()
+ /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/store/bucket_profile_test.go:136 +0x238e
+ testing.tRunner()
+ /usr/local/go/src/testing/testing.go:777 +0x16d
+
+Goroutine 365 (finished) created at:
+ github.com/improbable-eng/thanos/vendor/github.com/oklog/run.(*Group).Run()
+ /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/vendor/github.com/oklog/run/group.go:37 +0x10b
+ github.com/improbable-eng/thanos/pkg/store.(*bucketChunkReader).preload()
+ /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/store/bucket.go:1493 +0x6f0
+ github.com/improbable-eng/thanos/pkg/store.(*BucketStore).blockSeries()
+ /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/store/bucket.go:572 +0x1109
+ github.com/improbable-eng/thanos/pkg/store.(*BucketStore).Series.func1()
+ /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/store/bucket.go:721 +0x1e7
+ github.com/improbable-eng/thanos/vendor/github.com/oklog/run.(*Group).Run.func1()
+ /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/vendor/github.com/oklog/run/group.go:38 +0x34
+==================
+
+==================
+WARNING: DATA RACE
+Write at 0x00c4200837d0 by goroutine 77:
+ github.com/improbable-eng/thanos/pkg/store.TestBucketStore_PROFILE.func2()
+ /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/store/bucket_profile_test.go:140 +0x50b
+
+Previous write at 0x00c4200837d0 by goroutine 76:
+ github.com/improbable-eng/thanos/pkg/store.TestBucketStore_PROFILE.func2()
+ /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/store/bucket_profile_test.go:140 +0x50b
+
+Goroutine 77 (running) created at:
+ github.com/improbable-eng/thanos/pkg/store.TestBucketStore_PROFILE()
+ /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/store/bucket_profile_test.go:136 +0x238e
+ testing.tRunner()
+ /usr/local/go/src/testing/testing.go:777 +0x16d
+
+Goroutine 76 (finished) created at:
+ github.com/improbable-eng/thanos/pkg/store.TestBucketStore_PROFILE()
+ /home/bartek/Repos/thanosGo/src/github.com/improbable-eng/thanos/pkg/store/bucket_profile_test.go:136 +0x238e
+ testing.tRunner()
+ /usr/local/go/src/testing/testing.go:777 +0x16d
+==================
+
+*/
diff --git a/pkg/testutil/prometheus.go b/pkg/testutil/prometheus.go
index 807b61f8a0..6c5729531b 100644
--- a/pkg/testutil/prometheus.go
+++ b/pkg/testutil/prometheus.go
@@ -12,8 +12,9 @@ import (
"syscall"
"time"
+ "github.com/improbable-eng/thanos/pkg/block/blockmeta"
+
"github.com/go-kit/kit/log"
- "github.com/improbable-eng/thanos/pkg/block"
"github.com/improbable-eng/thanos/pkg/runutil"
"github.com/oklog/ulid"
"github.com/pkg/errors"
@@ -188,7 +189,7 @@ func CreateBlock(
extLset labels.Labels,
resolution int64,
) (id ulid.ULID, err error) {
- h, err := tsdb.NewHead(nil, nil, tsdb.NopWAL(), 10000000000)
+ h, err := tsdb.NewHead(nil, nil, nil, 10000000000)
if err != nil {
return id, errors.Wrap(err, "create head block")
}
@@ -238,15 +239,15 @@ func CreateBlock(
return id, errors.Wrap(err, "create compactor")
}
- id, err = c.Write(dir, h, mint, maxt)
+ id, err = c.Write(dir, h, mint, maxt, nil)
if err != nil {
return id, errors.Wrap(err, "write block")
}
- if _, err = block.InjectThanosMeta(log.NewNopLogger(), filepath.Join(dir, id.String()), block.ThanosMeta{
+ if _, err = blockmeta.InjectThanos(log.NewNopLogger(), filepath.Join(dir, id.String()), blockmeta.Thanos{
Labels: extLset.Map(),
- Downsample: block.ThanosDownsampleMeta{Resolution: resolution},
- Source: block.TestSource,
+ Downsample: blockmeta.ThanosDownsample{Resolution: resolution},
+ Source: blockmeta.TestSource,
}, nil); err != nil {
return id, errors.Wrap(err, "finalize block")
}
diff --git a/pkg/verifier/index_issue.go b/pkg/verifier/index_issue.go
index 54a20703d4..3e96ccb206 100644
--- a/pkg/verifier/index_issue.go
+++ b/pkg/verifier/index_issue.go
@@ -8,6 +8,8 @@ import (
"path"
"path/filepath"
+ "github.com/improbable-eng/thanos/pkg/block/blockmeta"
+
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/improbable-eng/thanos/pkg/block"
@@ -94,7 +96,7 @@ func IndexIssue(ctx context.Context, logger log.Logger, bkt objstore.Bucket, bac
logger,
tmpdir,
id,
- block.BucketRepairSource,
+ blockmeta.BucketRepairSource,
block.IgnoreCompleteOutsideChunk,
block.IgnoreDuplicateOutsideChunk,
block.IgnoreIssue347OutsideChunk,