Skip to content

Commit

Permalink
Fixed Downsampling process; Fixed runutil.CloseAndCaptureErr
Browse files Browse the repository at this point in the history
Fixes #1065

Root cause:
* runutil defered capture error function was not passing error properly so unit tests were passing, event though there was bug
* streamed block write index cache requires index file which was not closed (saved) properly yet. Closers need to be closed to perform this.

Signed-off-by: Bartek Plotka <[email protected]>
  • Loading branch information
bwplotka committed Apr 23, 2019
1 parent 3e04af3 commit ddc5b1d
Show file tree
Hide file tree
Showing 6 changed files with 41 additions and 32 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,12 @@ We use *breaking* word for marking changes that are not backward compatible (rel

## Unreleased


### Fixed

- [#1070](https://github.com/improbable-eng/thanos/pull/1070) Downsampling works back again. Deferred closer errors are now properly captured.


## [v0.4.0-rc.0](https://github.com/improbable-eng/thanos/releases/tag/v0.4.0-rc.0) - 2019.04.18

:warning: **IMPORTANT** :warning: This is the last release that supports gossip. From Thanos v0.5.0, gossip will be completely removed.
Expand Down
3 changes: 2 additions & 1 deletion pkg/compact/downsample/downsample.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,13 @@ func Downsample(
if err != nil {
return id, errors.Wrap(err, "get streamed block writer")
}
defer runutil.CloseWithErrCapture(logger, &err, streamedBlockWriter, "close stream block writer")
defer runutil.CloseWithErrCapture(&err, streamedBlockWriter, "close stream block writer")

postings, err := indexr.Postings(index.AllPostingsKey())
if err != nil {
return id, errors.Wrap(err, "get all postings list")
}

var (
aggrChunks []*AggrChunk
all []sample
Expand Down
3 changes: 3 additions & 0 deletions pkg/compact/downsample/downsample_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,9 @@ func testDownsample(t *testing.T, data []*downsampleTestSet, meta *metadata.Meta
id, err := Downsample(log.NewNopLogger(), meta, mb, dir, resolution)
testutil.Ok(t, err)

_, err = metadata.Read(filepath.Join(dir, id.String()))
testutil.Ok(t, err)

exp := map[uint64]map[AggrType][]sample{}
got := map[uint64]map[AggrType][]sample{}

Expand Down
52 changes: 25 additions & 27 deletions pkg/compact/downsample/streamed_block_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,26 +167,20 @@ func (w *streamedBlockWriter) Close() error {
if w.finalized {
return nil
}

var merr tsdb.MultiError
w.finalized = true

// Finalise data block only if there wasn't any internal errors.
if !w.ignoreFinalize {
merr.Add(w.finalize())
}
merr := tsdb.MultiError{}

for _, cl := range w.closers {
merr.Add(cl.Close())
if w.ignoreFinalize {
// Close open file descriptors anyway.
for _, cl := range w.closers {
merr.Add(cl.Close())
}
return merr.Err()
}

return errors.Wrap(merr.Err(), "close closers")
}
// Finalize saves prepared index and metadata to corresponding files.

// finalize saves prepared index and meta data to corresponding files.
// It is called on Close. Even if an error happened outside of StreamWriter, it will finalize the block anyway,
// so it's a caller's responsibility to remove the block's directory.
func (w *streamedBlockWriter) finalize() error {
if err := w.writeLabelSets(); err != nil {
return errors.Wrap(err, "write label sets")
}
Expand All @@ -195,7 +189,15 @@ func (w *streamedBlockWriter) finalize() error {
return errors.Wrap(err, "write mem postings")
}

if err := w.writeIndexCache(); err != nil {
for _, cl := range w.closers {
merr.Add(cl.Close())
}

if err := block.WriteIndexCache(
w.logger,
filepath.Join(w.blockDir, block.IndexFilename),
filepath.Join(w.blockDir, block.IndexCacheFilename),
); err != nil {
return errors.Wrap(err, "write index cache")
}

Expand All @@ -207,8 +209,14 @@ func (w *streamedBlockWriter) finalize() error {
return errors.Wrap(err, "sync blockDir")
}

if err := merr.Err(); err != nil {
return errors.Wrap(err, "finalize")
}

// No error, claim success.

level.Info(w.logger).Log(
"msg", "write downsampled block",
"msg", "finalized downsampled block",
"mint", w.meta.MinTime,
"maxt", w.meta.MaxTime,
"ulid", w.meta.ULID,
Expand All @@ -224,7 +232,7 @@ func (w *streamedBlockWriter) syncDir() (err error) {
return errors.Wrap(err, "open temporary block blockDir")
}

defer runutil.CloseWithErrCapture(w.logger, &err, df, "close temporary block blockDir")
defer runutil.CloseWithErrCapture(&err, df, "close temporary block blockDir")

if err := fileutil.Fsync(df); err != nil {
return errors.Wrap(err, "sync temporary blockDir")
Expand Down Expand Up @@ -257,16 +265,6 @@ func (w *streamedBlockWriter) writeMemPostings() error {
return nil
}

func (w *streamedBlockWriter) writeIndexCache() error {
indexFile := filepath.Join(w.blockDir, block.IndexFilename)
indexCacheFile := filepath.Join(w.blockDir, block.IndexCacheFilename)
if err := block.WriteIndexCache(w.logger, indexFile, indexCacheFile); err != nil {
return errors.Wrap(err, "write index cache")
}

return nil
}

// writeMetaFile writes meta file.
func (w *streamedBlockWriter) writeMetaFile() error {
w.meta.Version = metadata.MetaVersion1
Expand Down
5 changes: 3 additions & 2 deletions pkg/runutil/runutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,11 +109,12 @@ func CloseWithLogOnErr(logger log.Logger, closer io.Closer, format string, a ...
}

// CloseWithErrCapture runs function and on error return error by argument including the given error (usually
// from caller function)
// from caller function).
func CloseWithErrCapture(err *error, closer io.Closer, format string, a ...interface{}) {
var merr tsdb.MultiError
merr := tsdb.MultiError{}

merr.Add(*err)
merr.Add(errors.Wrapf(closer.Close(), format, a...))

*err = merr.Err()
}
4 changes: 2 additions & 2 deletions pkg/testutil/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ func (p *Prometheus) SetConfig(s string) (err error) {
if err != nil {
return err
}
defer runutil.CloseWithErrCapture(nil, &err, f, "prometheus config")
defer runutil.CloseWithErrCapture(&err, f, "prometheus config")

_, err = f.Write([]byte(s))
return err
Expand Down Expand Up @@ -302,7 +302,7 @@ func createBlock(
if err != nil {
return id, errors.Wrap(err, "create head block")
}
defer runutil.CloseWithErrCapture(log.NewNopLogger(), &err, h, "TSDB Head")
defer runutil.CloseWithErrCapture(&err, h, "TSDB Head")

var g errgroup.Group
var timeStepSize = (maxt - mint) / int64(numSamples+1)
Expand Down

0 comments on commit ddc5b1d

Please sign in to comment.