This repository has been archived by the owner on Aug 13, 2019. It is now read-only.

Provide option to compress WAL records (#609)
In running Prometheus instances, compressing the records was shown to
reduce disk usage by half while incurring a negligible CPU cost.

Signed-off-by: Chris Marchbanks <[email protected]>
csmarchbanks authored and krasi-georgiev committed Jun 19, 2019
1 parent c2c921a commit b40cc43
Showing 16 changed files with 681 additions and 507 deletions.
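
Before the per-file diffs, here is roughly how a caller would opt in to the new behaviour once this change is in place. This is a minimal sketch rather than code from the commit: it assumes the github.com/prometheus/tsdb import path, a local "data" directory, and nil logger/registerer, and it leans on the Options.WALCompression field and the Open signature shown in the db.go hunk further down.

package main

import (
    "log"

    "github.com/prometheus/tsdb"
)

func main() {
    // Start from the library defaults and opt in to Snappy-compressed WAL records.
    opts := *tsdb.DefaultOptions
    opts.WALCompression = true

    // nil logger and registerer keep the sketch small; real callers pass their own.
    db, err := tsdb.Open("data", nil, nil, &opts)
    if err != nil {
        log.Fatal(err)
    }
    defer db.Close()
}

Compression stays off by default (DefaultOptions sets WALCompression: false), so existing callers are unaffected unless they set the field.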
2 changes: 1 addition & 1 deletion CHANGELOG.md
@@ -1,5 +1,5 @@
## master / unreleased

- [FEATURE] Provide option to compress WAL records using Snappy. [#609](https://github.com/prometheus/tsdb/pull/609)

## 0.8.0
- [BUGFIX] Calling `Close` more than once on a querier returns an error instead of a panic.
2 changes: 1 addition & 1 deletion checkpoint.go
@@ -135,7 +135,7 @@ func Checkpoint(w *wal.WAL, from, to int, keep func(id uint64) bool, mint int64)
     if err := os.MkdirAll(cpdirtmp, 0777); err != nil {
         return nil, errors.Wrap(err, "create checkpoint dir")
     }
-    cp, err := wal.New(nil, nil, cpdirtmp)
+    cp, err := wal.New(nil, nil, cpdirtmp, w.CompressionEnabled())
     if err != nil {
         return nil, errors.Wrap(err, "open checkpoint")
     }
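
Checkpoint now copies the compression setting of the source WAL through the new CompressionEnabled accessor. As a rough usage sketch (not part of the commit), assuming the github.com/prometheus/tsdb/wal import path and a hypothetical data/wal directory, the extended constructor looks like this:

package main

import (
    "log"

    "github.com/prometheus/tsdb/wal"
)

func main() {
    // The trailing argument is the new compress flag added by this change.
    w, err := wal.New(nil, nil, "data/wal", true)
    if err != nil {
        log.Fatal(err)
    }
    defer w.Close()

    // Checkpoint (above) reads this accessor so the checkpoint WAL keeps the
    // same compression setting as the WAL it was built from.
    log.Println("compression enabled:", w.CompressionEnabled())

    // Records are logged exactly as before; compression is applied transparently.
    if err := w.Log([]byte("encoded record goes here")); err != nil {
        log.Fatal(err)
    }
}

wal.NewSize gains the same trailing flag, as the later hunks in this commit show.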
188 changes: 96 additions & 92 deletions checkpoint_test.go
@@ -86,108 +86,112 @@ func TestDeleteCheckpoints(t *testing.T) {
 }

 func TestCheckpoint(t *testing.T) {
-    dir, err := ioutil.TempDir("", "test_checkpoint")
-    testutil.Ok(t, err)
-    defer func() {
-        testutil.Ok(t, os.RemoveAll(dir))
-    }()
-
-    var enc RecordEncoder
-    // Create a dummy segment to bump the initial number.
-    seg, err := wal.CreateSegment(dir, 100)
-    testutil.Ok(t, err)
-    testutil.Ok(t, seg.Close())
-
-    // Manually create checkpoint for 99 and earlier.
-    w, err := wal.New(nil, nil, filepath.Join(dir, "checkpoint.0099"))
-    testutil.Ok(t, err)
-
-    // Add some data we expect to be around later.
-    err = w.Log(enc.Series([]RefSeries{
-        {Ref: 0, Labels: labels.FromStrings("a", "b", "c", "0")},
-        {Ref: 1, Labels: labels.FromStrings("a", "b", "c", "1")},
-    }, nil))
-    testutil.Ok(t, err)
-    testutil.Ok(t, w.Close())
-
-    // Start a WAL and write records to it as usual.
-    w, err = wal.NewSize(nil, nil, dir, 64*1024)
-    testutil.Ok(t, err)
-
-    var last int64
-    for i := 0; ; i++ {
-        _, n, err := w.Segments()
-        testutil.Ok(t, err)
-        if n >= 106 {
-            break
-        }
-        // Write some series initially.
-        if i == 0 {
-            b := enc.Series([]RefSeries{
-                {Ref: 2, Labels: labels.FromStrings("a", "b", "c", "2")},
-                {Ref: 3, Labels: labels.FromStrings("a", "b", "c", "3")},
-                {Ref: 4, Labels: labels.FromStrings("a", "b", "c", "4")},
-                {Ref: 5, Labels: labels.FromStrings("a", "b", "c", "5")},
-            }, nil)
-            testutil.Ok(t, w.Log(b))
-        }
-        // Write samples until the WAL has enough segments.
-        // Make them have drifting timestamps within a record to see that they
-        // get filtered properly.
-        b := enc.Samples([]RefSample{
-            {Ref: 0, T: last, V: float64(i)},
-            {Ref: 1, T: last + 10000, V: float64(i)},
-            {Ref: 2, T: last + 20000, V: float64(i)},
-            {Ref: 3, T: last + 30000, V: float64(i)},
-        }, nil)
-        testutil.Ok(t, w.Log(b))
-
-        last += 100
-    }
-    testutil.Ok(t, w.Close())
-
-    _, err = Checkpoint(w, 100, 106, func(x uint64) bool {
-        return x%2 == 0
-    }, last/2)
-    testutil.Ok(t, err)
-    testutil.Ok(t, w.Truncate(107))
-    testutil.Ok(t, DeleteCheckpoints(w.Dir(), 106))
-
-    // Only the new checkpoint should be left.
-    files, err := fileutil.ReadDir(dir)
-    testutil.Ok(t, err)
-    testutil.Equals(t, 1, len(files))
-    testutil.Equals(t, "checkpoint.000106", files[0])
-
-    sr, err := wal.NewSegmentsReader(filepath.Join(dir, "checkpoint.000106"))
-    testutil.Ok(t, err)
-    defer sr.Close()
-
-    var dec RecordDecoder
-    var series []RefSeries
-    r := wal.NewReader(sr)
-
-    for r.Next() {
-        rec := r.Record()
-
-        switch dec.Type(rec) {
-        case RecordSeries:
-            series, err = dec.Series(rec, series)
-            testutil.Ok(t, err)
-        case RecordSamples:
-            samples, err := dec.Samples(rec, nil)
-            testutil.Ok(t, err)
-            for _, s := range samples {
-                testutil.Assert(t, s.T >= last/2, "sample with wrong timestamp")
-            }
-        }
-    }
-    testutil.Ok(t, r.Err())
-    testutil.Equals(t, []RefSeries{
-        {Ref: 0, Labels: labels.FromStrings("a", "b", "c", "0")},
-        {Ref: 2, Labels: labels.FromStrings("a", "b", "c", "2")},
-        {Ref: 4, Labels: labels.FromStrings("a", "b", "c", "4")},
-    }, series)
+    for _, compress := range []bool{false, true} {
+        t.Run(fmt.Sprintf("compress=%t", compress), func(t *testing.T) {
+            dir, err := ioutil.TempDir("", "test_checkpoint")
+            testutil.Ok(t, err)
+            defer func() {
+                testutil.Ok(t, os.RemoveAll(dir))
+            }()
+
+            var enc RecordEncoder
+            // Create a dummy segment to bump the initial number.
+            seg, err := wal.CreateSegment(dir, 100)
+            testutil.Ok(t, err)
+            testutil.Ok(t, seg.Close())
+
+            // Manually create checkpoint for 99 and earlier.
+            w, err := wal.New(nil, nil, filepath.Join(dir, "checkpoint.0099"), compress)
+            testutil.Ok(t, err)
+
+            // Add some data we expect to be around later.
+            err = w.Log(enc.Series([]RefSeries{
+                {Ref: 0, Labels: labels.FromStrings("a", "b", "c", "0")},
+                {Ref: 1, Labels: labels.FromStrings("a", "b", "c", "1")},
+            }, nil))
+            testutil.Ok(t, err)
+            testutil.Ok(t, w.Close())
+
+            // Start a WAL and write records to it as usual.
+            w, err = wal.NewSize(nil, nil, dir, 64*1024, compress)
+            testutil.Ok(t, err)
+
+            var last int64
+            for i := 0; ; i++ {
+                _, n, err := w.Segments()
+                testutil.Ok(t, err)
+                if n >= 106 {
+                    break
+                }
+                // Write some series initially.
+                if i == 0 {
+                    b := enc.Series([]RefSeries{
+                        {Ref: 2, Labels: labels.FromStrings("a", "b", "c", "2")},
+                        {Ref: 3, Labels: labels.FromStrings("a", "b", "c", "3")},
+                        {Ref: 4, Labels: labels.FromStrings("a", "b", "c", "4")},
+                        {Ref: 5, Labels: labels.FromStrings("a", "b", "c", "5")},
+                    }, nil)
+                    testutil.Ok(t, w.Log(b))
+                }
+                // Write samples until the WAL has enough segments.
+                // Make them have drifting timestamps within a record to see that they
+                // get filtered properly.
+                b := enc.Samples([]RefSample{
+                    {Ref: 0, T: last, V: float64(i)},
+                    {Ref: 1, T: last + 10000, V: float64(i)},
+                    {Ref: 2, T: last + 20000, V: float64(i)},
+                    {Ref: 3, T: last + 30000, V: float64(i)},
+                }, nil)
+                testutil.Ok(t, w.Log(b))

+                last += 100
+            }
+            testutil.Ok(t, w.Close())
+
+            _, err = Checkpoint(w, 100, 106, func(x uint64) bool {
+                return x%2 == 0
+            }, last/2)
+            testutil.Ok(t, err)
+            testutil.Ok(t, w.Truncate(107))
+            testutil.Ok(t, DeleteCheckpoints(w.Dir(), 106))
+
+            // Only the new checkpoint should be left.
+            files, err := fileutil.ReadDir(dir)
+            testutil.Ok(t, err)
+            testutil.Equals(t, 1, len(files))
+            testutil.Equals(t, "checkpoint.000106", files[0])
+
+            sr, err := wal.NewSegmentsReader(filepath.Join(dir, "checkpoint.000106"))
+            testutil.Ok(t, err)
+            defer sr.Close()
+
+            var dec RecordDecoder
+            var series []RefSeries
+            r := wal.NewReader(sr)
+
+            for r.Next() {
+                rec := r.Record()
+
+                switch dec.Type(rec) {
+                case RecordSeries:
+                    series, err = dec.Series(rec, series)
+                    testutil.Ok(t, err)
+                case RecordSamples:
+                    samples, err := dec.Samples(rec, nil)
+                    testutil.Ok(t, err)
+                    for _, s := range samples {
+                        testutil.Assert(t, s.T >= last/2, "sample with wrong timestamp")
+                    }
+                }
+            }
+            testutil.Ok(t, r.Err())
+            testutil.Equals(t, []RefSeries{
+                {Ref: 0, Labels: labels.FromStrings("a", "b", "c", "0")},
+                {Ref: 2, Labels: labels.FromStrings("a", "b", "c", "2")},
+                {Ref: 4, Labels: labels.FromStrings("a", "b", "c", "4")},
+            }, series)
+        })
+    }
 }

 func TestCheckpointNoTmpFolderAfterError(t *testing.T) {
@@ -197,7 +201,7 @@ func TestCheckpointNoTmpFolderAfterError(t *testing.T) {
     defer func() {
         testutil.Ok(t, os.RemoveAll(dir))
     }()
-    w, err := wal.NewSize(nil, nil, dir, 64*1024)
+    w, err := wal.NewSize(nil, nil, dir, 64*1024, false)
     testutil.Ok(t, err)
     testutil.Ok(t, w.Log([]byte{99}))
     w.Close()
6 changes: 5 additions & 1 deletion db.go
@@ -51,6 +51,7 @@ var DefaultOptions = &Options{
     BlockRanges:            ExponentialBlockRanges(int64(2*time.Hour)/1e6, 3, 5),
     NoLockfile:             false,
     AllowOverlappingBlocks: false,
+    WALCompression:         false,
 }

 // Options of the DB storage.
@@ -80,6 +81,9 @@ type Options struct {
     // Overlapping blocks are allowed if AllowOverlappingBlocks is true.
     // This in-turn enables vertical compaction and vertical query merge.
     AllowOverlappingBlocks bool
+
+    // WALCompression will turn on Snappy compression for records on the WAL.
+    WALCompression bool
 }

 // Appender allows appending a batch of data. It must be completed with a
@@ -306,7 +310,7 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db
     if opts.WALSegmentSize > 0 {
         segmentSize = opts.WALSegmentSize
     }
-    wlog, err = wal.NewSize(l, r, filepath.Join(dir, "wal"), segmentSize)
+    wlog, err = wal.NewSize(l, r, filepath.Join(dir, "wal"), segmentSize, opts.WALCompression)
     if err != nil {
         return nil, err
     }
4 changes: 2 additions & 2 deletions db_test.go
@@ -1404,7 +1404,7 @@ func TestInitializeHeadTimestamp(t *testing.T) {
         }()

         testutil.Ok(t, os.MkdirAll(path.Join(dir, "wal"), 0777))
-        w, err := wal.New(nil, nil, path.Join(dir, "wal"))
+        w, err := wal.New(nil, nil, path.Join(dir, "wal"), false)
         testutil.Ok(t, err)

         var enc RecordEncoder
@@ -1454,7 +1454,7 @@ func TestInitializeHeadTimestamp(t *testing.T) {
         createBlock(t, dir, genSeries(1, 1, 1000, 6000))

         testutil.Ok(t, os.MkdirAll(path.Join(dir, "wal"), 0777))
-        w, err := wal.New(nil, nil, path.Join(dir, "wal"))
+        w, err := wal.New(nil, nil, path.Join(dir, "wal"), false)
         testutil.Ok(t, err)

         var enc RecordEncoder
1 change: 1 addition & 0 deletions go.mod
@@ -4,6 +4,7 @@ require (
     github.com/cespare/xxhash v1.1.0
     github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954
     github.com/go-kit/kit v0.8.0
+    github.com/golang/snappy v0.0.1
     github.com/oklog/ulid v1.3.1
     github.com/pkg/errors v0.8.0
     github.com/prometheus/client_golang v1.0.0
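
The one new dependency is github.com/golang/snappy, which supplies the block-format Encode and Decode calls behind the record compression. The snippet below is only an illustration of that library's round trip with made-up record contents; it is not the WAL's internal code.

package main

import (
    "bytes"
    "fmt"
    "log"

    "github.com/golang/snappy"
)

func main() {
    // Stand-in for an encoded WAL record; real records are series/sample
    // encodings whose repeated label strings compress well.
    record := bytes.Repeat([]byte(`{__name__="http_requests_total",job="api"} `), 50)

    // Snappy block compression and decompression round trip.
    compressed := snappy.Encode(nil, record)
    decompressed, err := snappy.Decode(nil, compressed)
    if err != nil {
        log.Fatal(err)
    }

    fmt.Printf("raw=%dB compressed=%dB roundtrip ok=%t\n",
        len(record), len(compressed), bytes.Equal(record, decompressed))
}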
2 changes: 2 additions & 0 deletions go.sum
@@ -30,6 +30,8 @@ github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5y
 github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
 github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
 github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
+github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
+github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
 github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515 h1:T+h1c/A9Gawja4Y9mFVWj2vyii2bbUNDw3kt9VxK2EY=
 github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
 github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU=
