Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

metadata: Fixed panic when no ext labels are set; Added more tests. #3461

Merged
merged 1 commit into from
Nov 18, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions cmd/thanos/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,10 @@ func registerReceive(app *extkingpin.App) {
return errors.Wrap(err, "parse labels")
}

if len(lset) == 0 {
return errors.New("no external labels configured for receive, uniquely identifying external labels must be configured (ideally with `receive_` prefix); see https://thanos.io/tip/thanos/storage.md#external-labels for details.")
}

var cw *receive.ConfigWatcher
if *hashringsFile != "" {
cw, err = receive.NewConfigWatcher(log.With(logger, "component", "config-watcher"), reg, *hashringsFile, *refreshInterval)
Expand Down
2 changes: 1 addition & 1 deletion cmd/thanos/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ func runSidecar(
}

if len(m.Labels()) == 0 {
return errors.New("no external labels configured on Prometheus server, uniquely identifying external labels must be configured")
return errors.New("no external labels configured on Prometheus server, uniquely identifying external labels must be configured; see https://thanos.io/tip/thanos/storage.md#external-labels for details.")
}

// Periodically query the Prometheus config. We use this as a heartbeat as well as for updating
Expand Down
2 changes: 1 addition & 1 deletion pkg/block/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func Upload(ctx context.Context, logger log.Logger, bkt objstore.Bucket, bdir st
return errors.Wrap(err, "not a block dir")
}

meta, err := metadata.Read(bdir)
meta, err := metadata.ReadFromDir(bdir)
if err != nil {
// No meta or broken meta file.
return errors.Wrap(err, "read meta")
Expand Down
2 changes: 1 addition & 1 deletion pkg/block/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ func (f *BaseFetcher) loadMeta(ctx context.Context, id ulid.ULID) (*metadata.Met

// Best effort load from local dir.
if f.cacheDir != "" {
m, err := metadata.Read(cachedBlockDir)
m, err := metadata.ReadFromDir(cachedBlockDir)
if err == nil {
return m, nil
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/block/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,7 @@ func Repair(logger log.Logger, dir string, id ulid.ULID, source metadata.SourceT
entropy := rand.New(rand.NewSource(time.Now().UnixNano()))
resid = ulid.MustNew(ulid.Now(), entropy)

meta, err := metadata.Read(bdir)
meta, err := metadata.ReadFromDir(bdir)
if err != nil {
return resid, errors.Wrap(err, "read meta file")
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/block/indexheader/header_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func TestReaders(t *testing.T) {
db.Close()
*/

m, err := metadata.Read("./testdata/index_format_v1")
m, err := metadata.ReadFromDir("./testdata/index_format_v1")
testutil.Ok(t, err)
e2eutil.Copy(t, "./testdata/index_format_v1", filepath.Join(tmpDir, m.ULID.String()))

Expand Down Expand Up @@ -312,7 +312,7 @@ func prepareIndexV2Block(t testing.TB, tmpDir string, bkt objstore.Bucket) *meta
}
*/

m, err := metadata.Read("./testdata/index_format_v2")
m, err := metadata.ReadFromDir("./testdata/index_format_v2")
testutil.Ok(t, err)
e2eutil.Copy(t, "./testdata/index_format_v2", filepath.Join(tmpDir, m.ULID.String()))

Expand Down
26 changes: 19 additions & 7 deletions pkg/block/metadata/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ package metadata
import (
"encoding/json"
"io"
"io/ioutil"
"os"
"path/filepath"

Expand Down Expand Up @@ -57,6 +56,8 @@ type Thanos struct {
// Version of Thanos meta file. If none specified, 1 is assumed (since first version did not have explicit version specified).
Version int `json:"version,omitempty"`

// Labels are the external labels identifying the producer as well as tenant.
// See https://thanos.io/tip/thanos/storage.md#external-labels for details.
Labels map[string]string `json:"labels"`
Downsample ThanosDownsample `json:"downsample"`

Expand Down Expand Up @@ -87,7 +88,7 @@ type ThanosDownsample struct {
// InjectThanos sets Thanos meta to the block meta JSON and saves it to the disk.
// NOTE: It should be used after writing any block by any Thanos component, otherwise we will miss crucial metadata.
func InjectThanos(logger log.Logger, bdir string, meta Thanos, downsampledMeta *tsdb.BlockMeta) (*Meta, error) {
newMeta, err := Read(bdir)
newMeta, err := ReadFromDir(bdir)
if err != nil {
return nil, errors.Wrap(err, "read new meta")
}
Expand Down Expand Up @@ -154,17 +155,23 @@ func renameFile(logger log.Logger, from, to string) error {
return pdir.Close()
}

// Read reads the given meta from <dir>/meta.json.
func Read(dir string) (*Meta, error) {
b, err := ioutil.ReadFile(filepath.Join(dir, MetaFilename))
// ReadFromDir reads the given meta from <dir>/meta.json.
func ReadFromDir(dir string) (*Meta, error) {
f, err := os.Open(filepath.Join(dir, MetaFilename))
if err != nil {
return nil, err
}
var m Meta
return read(f)
}

if err := json.Unmarshal(b, &m); err != nil {
func read(rc io.ReadCloser) (_ *Meta, err error) {
defer runutil.ExhaustCloseWithErrCapture(&err, rc, "close meta JSON")

var m Meta
if err = json.NewDecoder(rc).Decode(&m); err != nil {
return nil, err
}

if m.Version != TSDBVersion1 {
return nil, errors.Errorf("unexpected meta file version %d", m.Version)
}
Expand All @@ -178,5 +185,10 @@ func Read(dir string) (*Meta, error) {
if version != ThanosVersion1 {
return nil, errors.Errorf("unexpected meta file Thanos section version %d", m.Version)
}

if m.Thanos.Labels == nil {
// To avoid extra nil checks, allocate map here if empty.
m.Thanos.Labels = make(map[string]string)
}
return &m, nil
}
205 changes: 205 additions & 0 deletions pkg/block/metadata/meta_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package metadata

import (
"bytes"
"io/ioutil"
"testing"

"github.com/oklog/ulid"
"github.com/prometheus/prometheus/tsdb"
"github.com/thanos-io/thanos/pkg/testutil"
)

func TestMeta_ReadWrite(t *testing.T) {
t.Run("empty write/read/write", func(t *testing.T) {
b := bytes.Buffer{}
testutil.Ok(t, Meta{}.Write(&b))
testutil.Equals(t, `{
"ulid": "00000000000000000000000000",
"minTime": 0,
"maxTime": 0,
"stats": {},
"compaction": {
"level": 0
},
"version": 0,
"thanos": {
"labels": null,
"downsample": {
"resolution": 0
},
"source": ""
}
}
`, b.String())
_, err := read(ioutil.NopCloser(&b))
testutil.NotOk(t, err)
testutil.Equals(t, "unexpected meta file version 0", err.Error())
})

t.Run("real write/read/write", func(t *testing.T) {
b := bytes.Buffer{}
m1 := Meta{
BlockMeta: tsdb.BlockMeta{
ULID: ulid.MustNew(5, nil),
MinTime: 2424,
MaxTime: 134,
Version: 1,
Compaction: tsdb.BlockMetaCompaction{
Sources: []ulid.ULID{ulid.MustNew(1, nil), ulid.MustNew(2, nil)},
Parents: []tsdb.BlockDesc{
{
ULID: ulid.MustNew(3, nil),
MinTime: 442,
MaxTime: 24225,
},
},
Level: 123,
},
Stats: tsdb.BlockStats{NumChunks: 14, NumSamples: 245, NumSeries: 4},
},
Thanos: Thanos{
Version: 1,
Labels: map[string]string{"ext": "lset1"},
Source: ReceiveSource,
Files: []File{
{RelPath: "index", SizeBytes: 1313},
},
Downsample: ThanosDownsample{
Resolution: 123144,
},
},
}
testutil.Ok(t, m1.Write(&b))
testutil.Equals(t, `{
"ulid": "00000000050000000000000000",
"minTime": 2424,
"maxTime": 134,
"stats": {
"numSamples": 245,
"numSeries": 4,
"numChunks": 14
},
"compaction": {
"level": 123,
"sources": [
"00000000010000000000000000",
"00000000020000000000000000"
],
"parents": [
{
"ulid": "00000000030000000000000000",
"minTime": 442,
"maxTime": 24225
}
]
},
"version": 1,
"thanos": {
"version": 1,
"labels": {
"ext": "lset1"
},
"downsample": {
"resolution": 123144
},
"source": "receive",
"files": [
{
"rel_path": "index",
"size_bytes": 1313
}
]
}
}
`, b.String())
retMeta, err := read(ioutil.NopCloser(&b))
testutil.Ok(t, err)
testutil.Equals(t, m1, *retMeta)
})

t.Run("missing external labels write/read/write", func(t *testing.T) {
b := bytes.Buffer{}
m1 := Meta{
BlockMeta: tsdb.BlockMeta{
ULID: ulid.MustNew(5, nil),
MinTime: 2424,
MaxTime: 134,
Version: 1,
Compaction: tsdb.BlockMetaCompaction{
Sources: []ulid.ULID{ulid.MustNew(1, nil), ulid.MustNew(2, nil)},
Parents: []tsdb.BlockDesc{
{
ULID: ulid.MustNew(3, nil),
MinTime: 442,
MaxTime: 24225,
},
},
Level: 123,
},
Stats: tsdb.BlockStats{NumChunks: 14, NumSamples: 245, NumSeries: 4},
},
Thanos: Thanos{
Version: 1,
Source: ReceiveSource,
Files: []File{
{RelPath: "index", SizeBytes: 1313},
},
Downsample: ThanosDownsample{
Resolution: 123144,
},
},
}
testutil.Ok(t, m1.Write(&b))
testutil.Equals(t, `{
"ulid": "00000000050000000000000000",
"minTime": 2424,
"maxTime": 134,
"stats": {
"numSamples": 245,
"numSeries": 4,
"numChunks": 14
},
"compaction": {
"level": 123,
"sources": [
"00000000010000000000000000",
"00000000020000000000000000"
],
"parents": [
{
"ulid": "00000000030000000000000000",
"minTime": 442,
"maxTime": 24225
}
]
},
"version": 1,
"thanos": {
"version": 1,
"labels": null,
"downsample": {
"resolution": 123144
},
"source": "receive",
"files": [
{
"rel_path": "index",
"size_bytes": 1313
}
]
}
}
`, b.String())
retMeta, err := read(ioutil.NopCloser(&b))
testutil.Ok(t, err)

// We expect map to be empty but allocated.
testutil.Equals(t, map[string]string(nil), m1.Thanos.Labels)
m1.Thanos.Labels = map[string]string{}
testutil.Equals(t, m1, *retMeta)
})
}
2 changes: 1 addition & 1 deletion pkg/compact/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -644,7 +644,7 @@ func RepairIssue347(ctx context.Context, logger log.Logger, bkt objstore.Bucket,
return retry(errors.Wrapf(err, "download block %s", ie.id))
}

meta, err := metadata.Read(bdir)
meta, err := metadata.ReadFromDir(bdir)
if err != nil {
return errors.Wrapf(err, "read meta from %s", bdir)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/compact/compact_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,7 @@ func createAndUpload(t testing.TB, bkt objstore.Bucket, blocks []blockgenSpec) (
}
testutil.Ok(t, err)

meta, err := metadata.Read(filepath.Join(prepareDir, id.String()))
meta, err := metadata.ReadFromDir(filepath.Join(prepareDir, id.String()))
testutil.Ok(t, err)
metas = append(metas, meta)

Expand Down
2 changes: 1 addition & 1 deletion pkg/compact/downsample/downsample_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,7 @@ func TestDownsample(t *testing.T) {
}
testutil.Ok(t, err)

_, err = metadata.Read(filepath.Join(dir, id.String()))
_, err = metadata.ReadFromDir(filepath.Join(dir, id.String()))
testutil.Ok(t, err)

indexr, err := index.NewFileReader(filepath.Join(dir, id.String(), block.IndexFilename))
Expand Down
11 changes: 10 additions & 1 deletion pkg/compact/planner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func (p *tsdbPlannerAdapter) Plan(_ context.Context, metasByMinTime []*metadata.

var res []*metadata.Meta
for _, pdir := range plan {
meta, err := metadata.Read(pdir)
meta, err := metadata.ReadFromDir(pdir)
if err != nil {
return nil, errors.Wrapf(err, "read meta from %s", pdir)
}
Expand Down Expand Up @@ -340,6 +340,15 @@ func TestPlanners_Plan_Compatibility(t *testing.T) {
},
} {
t.Run(c.name, func(t *testing.T) {
for _, e := range c.expected {
// Add here to avoid boilerplate.
e.Thanos.Labels = make(map[string]string)
}
for _, e := range c.metas {
// Add here to avoid boilerplate.
e.Thanos.Labels = make(map[string]string)
}

// For compatibility.
t.Run("tsdbPlannerAdapter", func(t *testing.T) {
dir, err := ioutil.TempDir("", "test-compact")
Expand Down
2 changes: 1 addition & 1 deletion pkg/shipper/shipper.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,7 @@ func (s *Shipper) blockMetasFromOldest() (metas []*metadata.Meta, _ error) {
if !fi.IsDir() {
continue
}
m, err := metadata.Read(dir)
m, err := metadata.ReadFromDir(dir)
if err != nil {
return nil, errors.Wrapf(err, "read metadata for block %v", dir)
}
Expand Down
Loading