Skip to content

Commit

Permalink
meta.json: Added meta.Thanos.Files section to meta.json. (#3396)
Browse files Browse the repository at this point in the history
Signed-off-by: Bartlomiej Plotka <[email protected]>
  • Loading branch information
bwplotka authored Nov 4, 2020
1 parent 262efad commit 2ad3805
Show file tree
Hide file tree
Showing 13 changed files with 189 additions and 77 deletions.
60 changes: 52 additions & 8 deletions pkg/block/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"os"
"path"
"path/filepath"
"sort"
"strings"
"time"

Expand Down Expand Up @@ -66,6 +67,7 @@ func Download(ctx context.Context, logger log.Logger, bucket objstore.Bucket, id
// It makes sure cleanup is done on error to avoid partial block uploads.
// It also verifies basic features of Thanos block.
// TODO(bplotka): Ensure bucket operations have reasonable backoff retries.
// NOTE: Upload updates `meta.Thanos.File` section.
func Upload(ctx context.Context, logger log.Logger, bkt objstore.Bucket, bdir string) error {
df, err := os.Stat(bdir)
if err != nil {
Expand All @@ -91,8 +93,18 @@ func Upload(ctx context.Context, logger log.Logger, bkt objstore.Bucket, bdir st
return errors.New("empty external labels are not allowed for Thanos block.")
}

if err := objstore.UploadFile(ctx, logger, bkt, path.Join(bdir, MetaFilename), path.Join(DebugMetas, fmt.Sprintf("%s.json", id))); err != nil {
return errors.Wrap(err, "upload meta file to debug dir")
meta.Thanos.Files, err = gatherFileStats(bdir)
if err != nil {
return errors.Wrap(err, "gather meta file stats")
}

metaEncoded := bytes.Buffer{}
if err := meta.Write(&metaEncoded); err != nil {
return errors.Wrap(err, "encode meta file")
}

if err := bkt.Upload(ctx, path.Join(DebugMetas, fmt.Sprintf("%s.json", id)), bytes.NewReader(metaEncoded.Bytes())); err != nil {
return cleanUp(logger, bkt, id, errors.Wrap(err, "upload debug meta file"))
}

if err := objstore.UploadDir(ctx, logger, bkt, path.Join(bdir, ChunksDirname), path.Join(id.String(), ChunksDirname)); err != nil {
Expand All @@ -103,9 +115,8 @@ func Upload(ctx context.Context, logger log.Logger, bkt objstore.Bucket, bdir st
return cleanUp(logger, bkt, id, errors.Wrap(err, "upload index"))
}

// Meta.json always need to be uploaded as a last item. This will allow to assume block directories without meta file
// to be pending uploads.
if err := objstore.UploadFile(ctx, logger, bkt, path.Join(bdir, MetaFilename), path.Join(id.String(), MetaFilename)); err != nil {
// Meta.json always need to be uploaded as a last item. This will allow to assume block directories without meta file to be pending uploads.
if err := bkt.Upload(ctx, path.Join(id.String(), MetaFilename), &metaEncoded); err != nil {
return cleanUp(logger, bkt, id, errors.Wrap(err, "upload meta file"))
}

Expand Down Expand Up @@ -226,9 +237,7 @@ func IsBlockDir(path string) (id ulid.ULID, ok bool) {
// GetSegmentFiles returns list of segment files for given block. Paths are relative to the chunks directory.
// In case of errors, nil is returned.
func GetSegmentFiles(blockDir string) []string {
chunksDir := filepath.Join(blockDir, ChunksDirname)

files, err := ioutil.ReadDir(chunksDir)
files, err := ioutil.ReadDir(filepath.Join(blockDir, ChunksDirname))
if err != nil {
return nil
}
Expand All @@ -240,3 +249,38 @@ func GetSegmentFiles(blockDir string) []string {
}
return result
}

// gatherFileStats returns a metadata.File entry (relative path and size) for
// every file that makes up the block in blockDir: each chunk segment file, the
// index file and meta.json. The result is sorted by relative path.
// TODO(bwplotka): Gather stats when directly uploading files.
func gatherFileStats(blockDir string) (res []metadata.File, _ error) {
	chunksDir := filepath.Join(blockDir, ChunksDirname)
	files, err := ioutil.ReadDir(chunksDir)
	if err != nil {
		return nil, errors.Wrapf(err, "read dir %v", chunksDir)
	}
	for _, f := range files {
		res = append(res, metadata.File{
			RelPath:   filepath.Join(ChunksDirname, f.Name()),
			SizeBytes: f.Size(),
		})
	}

	indexFile, err := os.Stat(filepath.Join(blockDir, IndexFilename))
	if err != nil {
		return nil, errors.Wrapf(err, "stat %v", filepath.Join(blockDir, IndexFilename))
	}
	res = append(res, metadata.File{
		RelPath:   indexFile.Name(),
		SizeBytes: indexFile.Size(),
	})

	metaFile, err := os.Stat(filepath.Join(blockDir, MetaFilename))
	if err != nil {
		return nil, errors.Wrapf(err, "stat %v", filepath.Join(blockDir, MetaFilename))
	}
	// Size is deliberately omitted for meta.json: the uploaded meta.json embeds
	// this very list, so its final size cannot be known at this point.
	res = append(res, metadata.File{RelPath: metaFile.Name()})

	sort.Slice(res, func(i, j int) bool {
		return res[i].RelPath < res[j].RelPath
	})
	// TODO(bwplotka): Add optional files like tombstones?
	return res, nil
}
59 changes: 47 additions & 12 deletions pkg/block/block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"bytes"
"context"
"encoding/json"
"fmt"
"io/ioutil"
"os"
"path"
Expand Down Expand Up @@ -115,10 +116,7 @@ func TestUpload(t *testing.T) {
// Missing chunks.
err := Upload(ctx, log.NewNopLogger(), bkt, path.Join(tmpDir, "test", b1.String()))
testutil.NotOk(t, err)
testutil.Assert(t, strings.HasSuffix(err.Error(), "/chunks: no such file or directory"), "")

// Only debug meta.json present.
testutil.Equals(t, 1, len(bkt.Objects()))
testutil.Assert(t, strings.HasSuffix(err.Error(), "/chunks: no such file or directory"), err.Error())
}
testutil.Ok(t, os.MkdirAll(path.Join(tmpDir, "test", b1.String(), ChunksDirname), os.ModePerm))
e2eutil.Copy(t, path.Join(tmpDir, b1.String(), ChunksDirname, "000001"), path.Join(tmpDir, "test", b1.String(), ChunksDirname, "000001"))
Expand All @@ -127,9 +125,6 @@ func TestUpload(t *testing.T) {
err := Upload(ctx, log.NewNopLogger(), bkt, path.Join(tmpDir, "test", b1.String()))
testutil.NotOk(t, err)
testutil.Assert(t, strings.HasSuffix(err.Error(), "/index: no such file or directory"), "")

// Only debug meta.json present.
testutil.Equals(t, 1, len(bkt.Objects()))
}
e2eutil.Copy(t, path.Join(tmpDir, b1.String(), IndexFilename), path.Join(tmpDir, "test", b1.String(), IndexFilename))
testutil.Ok(t, os.Remove(path.Join(tmpDir, "test", b1.String(), MetaFilename)))
Expand All @@ -138,9 +133,6 @@ func TestUpload(t *testing.T) {
err := Upload(ctx, log.NewNopLogger(), bkt, path.Join(tmpDir, "test", b1.String()))
testutil.NotOk(t, err)
testutil.Assert(t, strings.HasSuffix(err.Error(), "/meta.json: no such file or directory"), "")

// Only debug meta.json present.
testutil.Equals(t, 1, len(bkt.Objects()))
}
e2eutil.Copy(t, path.Join(tmpDir, b1.String(), MetaFilename), path.Join(tmpDir, "test", b1.String(), MetaFilename))
{
Expand All @@ -149,15 +141,58 @@ func TestUpload(t *testing.T) {
testutil.Equals(t, 4, len(bkt.Objects()))
testutil.Equals(t, 3751, len(bkt.Objects()[path.Join(b1.String(), ChunksDirname, "000001")]))
testutil.Equals(t, 401, len(bkt.Objects()[path.Join(b1.String(), IndexFilename)]))
testutil.Equals(t, 365, len(bkt.Objects()[path.Join(b1.String(), MetaFilename)]))
testutil.Equals(t, 562, len(bkt.Objects()[path.Join(b1.String(), MetaFilename)]))

// File stats are gathered.
testutil.Equals(t, fmt.Sprintf(`{
"ulid": "%s",
"minTime": 0,
"maxTime": 1000,
"stats": {
"numSamples": 500,
"numSeries": 5,
"numChunks": 5
},
"compaction": {
"level": 1,
"sources": [
"%s"
]
},
"version": 1,
"thanos": {
"version": 1,
"labels": {
"ext1": "val1"
},
"downsample": {
"resolution": 124
},
"source": "test",
"files": [
{
"rel_path": "chunks/000001",
"size_bytes": 3751
},
{
"rel_path": "index",
"size_bytes": 401
},
{
"rel_path": "meta.json"
}
]
}
}
`, b1.String(), b1.String()), string(bkt.Objects()[path.Join(b1.String(), MetaFilename)]))
}
{
// Test Upload is idempotent.
testutil.Ok(t, Upload(ctx, log.NewNopLogger(), bkt, path.Join(tmpDir, "test", b1.String())))
testutil.Equals(t, 4, len(bkt.Objects()))
testutil.Equals(t, 3751, len(bkt.Objects()[path.Join(b1.String(), ChunksDirname, "000001")]))
testutil.Equals(t, 401, len(bkt.Objects()[path.Join(b1.String(), IndexFilename)]))
testutil.Equals(t, 365, len(bkt.Objects()[path.Join(b1.String(), MetaFilename)]))
testutil.Equals(t, 562, len(bkt.Objects()[path.Join(b1.String(), MetaFilename)]))
}
{
// Upload with no external labels should be blocked.
Expand Down
4 changes: 2 additions & 2 deletions pkg/block/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ func (f *BaseFetcher) loadMeta(ctx context.Context, id ulid.ULID) (*metadata.Met
return nil, errors.Wrapf(ErrorSyncMetaCorrupted, "meta.json %v unmarshal: %v", metaFile, err)
}

if m.Version != metadata.MetaVersion1 {
if m.Version != metadata.TSDBVersion1 {
return nil, errors.Errorf("unexpected meta file: %s version: %d", metaFile, m.Version)
}

Expand All @@ -268,7 +268,7 @@ func (f *BaseFetcher) loadMeta(ctx context.Context, id ulid.ULID) (*metadata.Met
level.Warn(f.logger).Log("msg", "best effort mkdir of the meta.json block dir failed; ignoring", "dir", cachedBlockDir, "err", err)
}

if err := metadata.Write(f.logger, cachedBlockDir, m); err != nil {
if err := m.WriteToDir(f.logger, cachedBlockDir); err != nil {
level.Warn(f.logger).Log("msg", "best effort save of the meta.json to local dir failed; ignoring", "dir", cachedBlockDir, "err", err)
}
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/block/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,12 +303,12 @@ func Repair(logger log.Logger, dir string, id ulid.ULID, source metadata.SourceT
return resid, errors.Wrap(err, "rewrite block")
}
resmeta.Thanos.SegmentFiles = GetSegmentFiles(resdir)
if err := metadata.Write(logger, resdir, &resmeta); err != nil {
if err := resmeta.WriteToDir(logger, resdir); err != nil {
return resid, err
}
// TSDB may rewrite metadata in bdir.
// TODO: This is not needed in newer TSDB code. See https://github.com/prometheus/tsdb/pull/637.
if err := metadata.Write(logger, bdir, meta); err != nil {
if err := meta.WriteToDir(logger, bdir); err != nil {
return resid, err
}
return resid, nil
Expand Down
55 changes: 42 additions & 13 deletions pkg/block/metadata/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ package metadata

import (
"encoding/json"
"io"
"io/ioutil"
"os"
"path/filepath"
Expand Down Expand Up @@ -37,11 +38,10 @@ const (
const (
// MetaFilename is the known JSON filename for meta information.
MetaFilename = "meta.json"
)

const (
// MetaVersion is a enumeration of meta versions supported by Thanos.
MetaVersion1 = iota + 1
// TSDBVersion1 is a enumeration of TSDB meta versions supported by Thanos.
TSDBVersion1 = 1
// ThanosVersion1 is a enumeration of Thanos section of TSDB meta supported by Thanos.
ThanosVersion1 = 1
)

// Meta describes the a block's meta. It wraps the known TSDB meta structure and
Expand All @@ -54,14 +54,30 @@ type Meta struct {

// Thanos holds block meta information specific to Thanos.
type Thanos struct {
	// Version of Thanos meta file. If none specified, 1 is assumed (since first version did not have explicit version specified).
	Version int `json:"version,omitempty"`

	// Labels are the external labels of the block; Upload rejects blocks with empty external labels.
	Labels map[string]string `json:"labels"`
	// Downsample holds the downsampling resolution of the block's data.
	Downsample ThanosDownsample `json:"downsample"`

	// Source is a real upload source of the block.
	Source SourceType `json:"source"`

	// List of segment files (in chunks directory), in sorted order. Optional.
	// Deprecated. Use Files instead.
	SegmentFiles []string `json:"segment_files,omitempty"`

	// Files is a list of all files in the block directory of this block known to TSDB,
	// sorted by relative path.
	// Useful to avoid API call to get size of each file, as well as for debugging purposes.
	// Optional, added in v0.17.0.
	Files []File `json:"files,omitempty"`
}

// File describes a single file within a block directory, as recorded in the
// Thanos section of meta.json.
type File struct {
	// RelPath is the file's path relative to the block directory.
	RelPath string `json:"rel_path"`
	// SizeBytes is optional (e.g. meta.json does not show size).
	SizeBytes int64 `json:"size_bytes,omitempty"`
}

type ThanosDownsample struct {
Expand All @@ -82,15 +98,15 @@ func InjectThanos(logger log.Logger, bdir string, meta Thanos, downsampledMeta *
newMeta.Compaction = downsampledMeta.Compaction
}

if err := Write(logger, bdir, newMeta); err != nil {
if err := newMeta.WriteToDir(logger, bdir); err != nil {
return nil, errors.Wrap(err, "write new meta")
}

return newMeta, nil
}

// Write writes the given meta into <dir>/meta.json.
func Write(logger log.Logger, dir string, meta *Meta) error {
// WriteToDir writes the encoded meta into <dir>/meta.json.
func (m Meta) WriteToDir(logger log.Logger, dir string) error {
// Make any changes to the file appear atomic.
path := filepath.Join(dir, MetaFilename)
tmp := path + ".tmp"
Expand All @@ -100,10 +116,7 @@ func Write(logger log.Logger, dir string, meta *Meta) error {
return err
}

enc := json.NewEncoder(f)
enc.SetIndent("", "\t")

if err := enc.Encode(meta); err != nil {
if err := m.Write(f); err != nil {
runutil.CloseWithLogOnErr(logger, f, "close meta")
return err
}
Expand All @@ -113,6 +126,13 @@ func Write(logger log.Logger, dir string, meta *Meta) error {
return renameFile(logger, tmp, path)
}

// Write encodes m as tab-indented JSON and streams it to w.
func (m Meta) Write(w io.Writer) error {
	encoder := json.NewEncoder(w)
	encoder.SetIndent("", "\t")
	return encoder.Encode(&m)
}

func renameFile(logger log.Logger, from, to string) error {
if err := os.RemoveAll(to); err != nil {
return err
Expand Down Expand Up @@ -145,8 +165,17 @@ func Read(dir string) (*Meta, error) {
if err := json.Unmarshal(b, &m); err != nil {
return nil, err
}
if m.Version != MetaVersion1 {
if m.Version != TSDBVersion1 {
return nil, errors.Errorf("unexpected meta file version %d", m.Version)
}
if m.Thanos.Version == 0 {
// For compatibility.
m.Thanos.Version = ThanosVersion1
return &m, nil
}

if m.Thanos.Version != ThanosVersion1 {
return nil, errors.Errorf("unexpected meta file Thanos section version %d", m.Version)
}
return &m, nil
}
2 changes: 1 addition & 1 deletion pkg/compact/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -677,7 +677,7 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) (
if err := os.MkdirAll(bdir, 0777); err != nil {
return false, ulid.ULID{}, errors.Wrap(err, "create planning block dir")
}
if err := metadata.Write(cg.logger, bdir, meta); err != nil {
if err := meta.WriteToDir(cg.logger, bdir); err != nil {
return false, ulid.ULID{}, errors.Wrap(err, "write planning meta file")
}
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/compact/downsample/streamed_block_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,12 +201,12 @@ func (w *streamedBlockWriter) syncDir() (err error) {

// writeMetaFile finalizes and persists the block's meta.json: it records the
// collected stats, stamps the TSDB meta version and compactor source, gathers
// the segment file list, and writes the result to the block directory.
func (w *streamedBlockWriter) writeMetaFile() error {
	w.meta.Stats.NumSamples = w.totalSamples
	w.meta.Stats.NumSeries = w.seriesRefs
	w.meta.Stats.NumChunks = w.totalChunks

	w.meta.Version = metadata.TSDBVersion1
	w.meta.Thanos.Source = metadata.CompactorSource
	w.meta.Thanos.SegmentFiles = block.GetSegmentFiles(w.blockDir)

	return w.meta.WriteToDir(w.logger, w.blockDir)
}
2 changes: 1 addition & 1 deletion pkg/replicate/scheme_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func testMeta(ulid ulid.ULID) *metadata.Meta {
Compaction: tsdb.BlockMetaCompaction{
Level: 1,
},
Version: metadata.MetaVersion1,
Version: metadata.TSDBVersion1,
},
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/shipper/shipper.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ func (s *Shipper) upload(ctx context.Context, meta *metadata.Meta) error {
}
meta.Thanos.Source = s.source
meta.Thanos.SegmentFiles = block.GetSegmentFiles(updir)
if err := metadata.Write(s.logger, updir, meta); err != nil {
if err := meta.WriteToDir(s.logger, updir); err != nil {
return errors.Wrap(err, "write meta file")
}
return block.Upload(ctx, s.logger, s.bucket, updir)
Expand Down
Loading

0 comments on commit 2ad3805

Please sign in to comment.