Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate from JSON to Protobuf+Snappy format for index cache #1013

Closed
wants to merge 21 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ require (
github.com/fsnotify/fsnotify v1.4.7
github.com/go-kit/kit v0.8.0
github.com/gogo/protobuf v1.2.0
github.com/golang/protobuf v1.3.1 // indirect
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db
github.com/google/martian v2.1.0+incompatible // indirect
github.com/googleapis/gax-go v2.0.2+incompatible // indirect
Expand Down
5 changes: 3 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ github.com/Azure/azure-storage-blob-go v0.0.0-20181022225951-5152f14ace1c h1:Y5u
github.com/Azure/azure-storage-blob-go v0.0.0-20181022225951-5152f14ace1c/go.mod h1:oGfmITT1V6x//CswqY2gtAHND+xIP64/qL7a5QJix0Y=
github.com/Azure/go-autorest v10.8.1+incompatible h1:u0jVQf+a6k6x8A+sT60l6EY9XZu+kHdnZVPAYqpVRo0=
github.com/Azure/go-autorest v10.8.1+incompatible/go.mod h1:r+4oMnoxhatjLLJ6zxSWATqVooLgysK6ZNox3g/xq24=
github.com/NYTimes/gziphandler v1.0.1 h1:iLrQrdwjDd52kHDA5op2UBJFjmOb9g+7scBan4RN8F0=
github.com/NYTimes/gziphandler v1.0.1/go.mod h1:3wb06e3pkSAbeQ52E9H9iFoQsEEwGN64994WTCIhntQ=
github.com/NYTimes/gziphandler v1.1.1 h1:ZUDjpQae29j0ryrS0u/B8HZfJBtBQHjqw2rQ2cqUQ3I=
github.com/NYTimes/gziphandler v1.1.1/go.mod h1:n/CVRwUEOgIxrgPvAQhUUr9oeUtvrhMomdKFjzJNB0c=
github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE=
Expand Down Expand Up @@ -83,6 +81,8 @@ github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfb
github.com/golang/mock v1.2.0/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg=
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/snappy v0.0.0-20160529050041-d9eb7a3d35ec/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db h1:woRePGFeVFfLKN/pOkfl+p/TAqKOfFu+7KPlMVpok/w=
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
Expand Down Expand Up @@ -272,6 +272,7 @@ github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnIn
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
go.opencensus.io v0.18.0/go.mod h1:vKdFvxhtzZ9onBp9VKHK8z/sRpBMnKAsufL7wlDrCOA=
go.opencensus.io v0.19.0 h1:+jrnNy8MR4GZXvwF9PEuSyHxA4NaTf6601oNRwCSXq0=
Expand Down
5 changes: 4 additions & 1 deletion pkg/block/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,12 @@ import (
"github.com/prometheus/tsdb/labels"
)

// IndexCacheFilename is the canonical name for index cache files stored in
// the JSON format.
const IndexCacheFilename = "index.cache.json"

// indexCacheBinaryFilename is the canonical name for index cache files stored
// in a binary format, as opposed to the JSON format used by IndexCacheFilename.
const indexCacheBinaryFilename = "index.cache.dat"

type postingsRange struct {
Name, Value string
Start, End int64
Expand Down
96 changes: 96 additions & 0 deletions pkg/block/indexcache/bench_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package indexcache

import (
"io/ioutil"
"math/rand"
"os"
"path/filepath"
"testing"
"time"

"github.com/go-kit/kit/log"
"github.com/improbable-eng/thanos/pkg/testutil"
"github.com/prometheus/tsdb/labels"
)

// init seeds the global math/rand source from the current wall-clock time so
// that the random label names and values differ from one run to the next.
func init() {
	seed := time.Now().UnixNano()
	rand.Seed(seed)
}

// letterRunes is the alphabet (ASCII lower- and upper-case letters) from which
// the benchmarks draw random label names and values via randRunes.
var letterRunes = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")

// randRunes returns a string of n runes, each picked independently from runes
// using the global math/rand source.
//
// It panics if n is negative, or if n is positive and runes is empty.
func randRunes(n int, runes []rune) string {
	picked := make([]rune, 0, n)
	for len(picked) < n {
		idx := rand.Intn(len(runes))
		picked = append(picked, runes[idx])
	}
	return string(picked)
}

// benchmarkJSON measures a write followed by a read of the JSON index cache
// for a block created from n label sets, each holding one random 10-letter
// label name and value.
//
// Creating the temporary directory and the block is setup, so the timer is
// reset before the measured loop and stopped before the deferred cleanup.
// The write + read round trip runs b.N times, as the testing package requires
// for meaningful per-operation numbers.
func benchmarkJSON(n int, b *testing.B) {
	tmpDir, err := ioutil.TempDir("", "test-json-encode")
	testutil.Ok(b, err)
	defer func() { testutil.Ok(b, os.RemoveAll(tmpDir)) }()

	lbls := make([]labels.Labels, 0, n)
	for i := 0; i < n; i++ {
		lbls = append(lbls, labels.Labels{labels.Label{Name: randRunes(10, letterRunes), Value: randRunes(10, letterRunes)}})
	}

	block, err := testutil.CreateBlock(tmpDir, lbls, 100, 0, 1000, nil, 124)
	testutil.Ok(b, err)

	indexFn := filepath.Join(tmpDir, block.String(), "index")
	fn := filepath.Join(tmpDir, "index.cache.json")
	j := JSONCache{logger: log.NewNopLogger()}

	b.ReportAllocs()
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		testutil.Ok(b, j.WriteIndexCache(indexFn, fn))

		// Only the cost of decoding matters here; the decoded data is discarded.
		_, _, _, _, err := j.ReadIndexCache(fn)
		testutil.Ok(b, err)
	}
	// Keep the deferred directory removal out of the measurement.
	b.StopTimer()
}

// benchmarkBinary measures a write followed by a read of the binary index
// cache for a block created from n label sets, each holding one random
// 10-letter label name and value.
//
// Creating the temporary directory and the block is setup, so the timer is
// reset before the measured loop and stopped before the deferred cleanup.
// The write + read round trip runs b.N times, as the testing package requires
// for meaningful per-operation numbers.
func benchmarkBinary(n int, b *testing.B) {
	tmpDir, err := ioutil.TempDir("", "test-bin-encode")
	testutil.Ok(b, err)
	defer func() { testutil.Ok(b, os.RemoveAll(tmpDir)) }()

	lbls := make([]labels.Labels, 0, n)
	for i := 0; i < n; i++ {
		lbls = append(lbls, labels.Labels{labels.Label{Name: randRunes(10, letterRunes), Value: randRunes(10, letterRunes)}})
	}

	block, err := testutil.CreateBlock(tmpDir, lbls, 100, 0, 1000, nil, 124)
	testutil.Ok(b, err)

	indexFn := filepath.Join(tmpDir, block.String(), "index")
	fn := filepath.Join(tmpDir, "index.cache.dat")
	bCache := BinaryCache{logger: log.NewNopLogger()}

	b.ReportAllocs()
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		testutil.Ok(b, bCache.WriteIndexCache(indexFn, fn))

		// Only the cost of decoding matters here; the decoded data is discarded.
		_, _, _, _, err := bCache.ReadIndexCache(fn)
		testutil.Ok(b, err)
	}
	// Keep the deferred directory removal out of the measurement.
	b.StopTimer()
}

// The BenchmarkJSON* functions run benchmarkJSON with an increasing number of
// label sets; the numeric suffix is the label-set count in thousands.

func BenchmarkJSON1(b *testing.B) {
	benchmarkJSON(1000, b)
}

func BenchmarkJSON2(b *testing.B) {
	benchmarkJSON(2000, b)
}

func BenchmarkJSON3(b *testing.B) {
	benchmarkJSON(3000, b)
}

func BenchmarkJSON4(b *testing.B) {
	benchmarkJSON(4000, b)
}

func BenchmarkJSON10(b *testing.B) {
	benchmarkJSON(10000, b)
}

func BenchmarkJSON20(b *testing.B) {
	benchmarkJSON(20000, b)
}

// The BenchmarkBinary* functions run benchmarkBinary with an increasing number
// of label sets; the numeric suffix is the label-set count in thousands.

func BenchmarkBinary1(b *testing.B) {
	benchmarkBinary(1000, b)
}

func BenchmarkBinary2(b *testing.B) {
	benchmarkBinary(2000, b)
}

func BenchmarkBinary3(b *testing.B) {
	benchmarkBinary(3000, b)
}

func BenchmarkBinary4(b *testing.B) {
	benchmarkBinary(4000, b)
}

func BenchmarkBinary10(b *testing.B) {
	benchmarkBinary(10000, b)
}

func BenchmarkBinary20(b *testing.B) {
	benchmarkBinary(20000, b)
}
174 changes: 174 additions & 0 deletions pkg/block/indexcache/binary.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
package indexcache

import (
"io/ioutil"
"os"

"github.com/golang/snappy"

"github.com/go-kit/kit/log"
"github.com/improbable-eng/thanos/pkg/runutil"
"github.com/pkg/errors"
"github.com/prometheus/tsdb/fileutil"
"github.com/prometheus/tsdb/index"
"github.com/prometheus/tsdb/labels"
)

// BinaryCache is a binary index cache that uses protobufs + snappy compression.
// Its WriteIndexCache and ReadIndexCache methods write and read the cache file.
type BinaryCache struct {
// NOTE(review): IndexCache is declared outside this file — presumably the
// package's common cache interface; confirm that embedding it (rather than
// just satisfying it) is intended, since it is never assigned here.
IndexCache

// logger receives errors from deferred Close calls in the methods.
logger log.Logger
}

// WriteIndexCache builds an index cache from the TSDB index file indexFn and
// writes it to fn as a Snappy-compressed, marshalled Cache message.
//
// The cache holds the index version, the symbol table, the values of every
// label name and the postings ranges. The output file is only created once
// the cache has been fully assembled and marshalled, so a failure while
// reading the index does not leave an empty cache file behind.
//
// An error is returned if the index cannot be opened or read, or if the cache
// cannot be marshalled, created or written.
func (c *BinaryCache) WriteIndexCache(indexFn string, fn string) error {
	indexFile, err := fileutil.OpenMmapFile(indexFn)
	if err != nil {
		return errors.Wrapf(err, "open mmap index file %s", indexFn)
	}
	defer runutil.CloseWithLogOnErr(c.logger, indexFile, "close index cache mmap file from %s", indexFn)

	b := realByteSlice(indexFile.Bytes())
	indexr, err := index.NewReader(b)
	if err != nil {
		return errors.Wrap(err, "open index reader")
	}
	defer runutil.CloseWithLogOnErr(c.logger, indexr, "load index cache reader")

	// We assume reader verified index already.
	symbols, err := getSymbolTableJSON(b)
	if err != nil {
		return err
	}

	// Extract label values. LabelNames yields one plain name per element, so
	// every name is used as-is (no tuple unpacking, no length filtering).
	lnames, err := indexr.LabelNames()
	if err != nil {
		return errors.Wrap(err, "read label indices")
	}
	labelValues := make(map[string]*Values, len(lnames))
	for _, ln := range lnames {
		tpls, err := indexr.LabelValues(ln)
		if err != nil {
			return errors.Wrap(err, "get label values")
		}
		vals := &Values{}

		for i := 0; i < tpls.Len(); i++ {
			v, err := tpls.At(i)
			if err != nil {
				return errors.Wrap(err, "get label value")
			}
			if len(v) != 1 {
				return errors.Errorf("unexpected tuple length %d", len(v))
			}
			vals.Val = append(vals.Val, v[0])
		}

		labelValues[ln] = vals
	}

	// Extract postings ranges.
	pranges, err := indexr.PostingsRanges()
	if err != nil {
		return errors.Wrap(err, "read postings ranges")
	}
	postings := make([]*PostingsRange, 0, len(pranges))
	for l, rng := range pranges {
		postings = append(postings, &PostingsRange{
			Name:  l.Name,
			Value: l.Value,
			Start: rng.Start,
			End:   rng.End,
		})
	}

	indexCache := Cache{
		Version:     int32(indexr.Version()),
		Symbols:     symbols,
		Labelvalues: labelValues,
		Postings:    postings,
	}
	encoded, err := indexCache.Marshal()
	if err != nil {
		return errors.Wrap(err, "index cache marshal")
	}

	f, err := os.Create(fn)
	if err != nil {
		return errors.Wrap(err, "create index cache file")
	}
	// f is closed exactly once; the Snappy writer only needs the explicit
	// Flush below to push its buffered data into f.
	defer runutil.CloseWithLogOnErr(c.logger, f, "index cache writer")

	snappyWriter := snappy.NewBufferedWriter(f)
	if _, err := snappyWriter.Write(encoded); err != nil {
		return errors.Wrap(err, "snappy writer write index cache")
	}
	if err := snappyWriter.Flush(); err != nil {
		return errors.Wrap(err, "snappy writer flush")
	}

	return nil
}

// ReadIndexCache loads the Snappy-compressed, marshalled Cache message stored
// in fn and returns its contents: the index version, the symbol table keyed
// by uint32, the values per label name, and the postings range per label.
//
// On failure to open, decompress or unmarshal the file, the zero version,
// nil maps and a wrapped error are returned.
func (c *BinaryCache) ReadIndexCache(fn string) (version int,
	symbols map[uint32]string,
	lvals map[string][]string,
	postings map[labels.Label]index.Range,
	err error) {
	file, err := os.Open(fn)
	if err != nil {
		return 0, nil, nil, nil, errors.Wrapf(err, "open index file %s", fn)
	}
	defer runutil.CloseWithLogOnErr(c.logger, file, "index cache reader")

	raw, err := ioutil.ReadAll(snappy.NewReader(file))
	if err != nil {
		return 0, nil, nil, nil, errors.Wrap(err, "snappy read all index cache")
	}

	var decoded Cache
	if err = decoded.Unmarshal(raw); err != nil {
		return 0, nil, nil, nil, errors.Wrap(err, "unmarshal index content")
	}

	lvals = make(map[string][]string, len(decoded.Labelvalues))
	postings = make(map[labels.Label]index.Range, len(decoded.Postings))

	// Most strings we encounter are duplicates. Interning them means the data
	// handed back to the caller shares a single string object per distinct
	// value, which reduces total memory usage.
	// NOTE(fabxc): it could even make sense to deduplicate globally.
	interned := make(map[string]string)
	intern := func(s string) string {
		canonical, seen := interned[s]
		if !seen {
			interned[s] = s
			canonical = s
		}
		return canonical
	}

	for offset := range decoded.Symbols {
		decoded.Symbols[offset] = intern(decoded.Symbols[offset])
	}
	for name, values := range decoded.Labelvalues {
		for i, v := range values.Val {
			values.Val[i] = intern(v)
		}
		lvals[intern(name)] = values.Val
	}
	for _, entry := range decoded.Postings {
		key := labels.Label{Name: intern(entry.Name), Value: intern(entry.Value)}
		postings[key] = index.Range{Start: entry.Start, End: entry.End}
	}

	return int(decoded.Version), decoded.Symbols, lvals, postings, nil
}
49 changes: 49 additions & 0 deletions pkg/block/indexcache/binary_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package indexcache

import (
"io/ioutil"
"os"
"path/filepath"
"testing"

"github.com/go-kit/kit/log"
"github.com/improbable-eng/thanos/pkg/testutil"
"github.com/prometheus/tsdb/labels"
)

// TestWriteReadBinaryCache creates a small block, writes its binary index
// cache, reads it back and checks the version, the symbol count, the label
// values and the number of postings ranges.
func TestWriteReadBinaryCache(t *testing.T) {
	dir, err := ioutil.TempDir("", "test-compact-prepare")
	testutil.Ok(t, err)
	defer func() { testutil.Ok(t, os.RemoveAll(dir)) }()

	series := []labels.Labels{
		{{Name: "a", Value: "1"}},
		{{Name: "a", Value: "2"}},
		{{Name: "a", Value: "3"}},
		{{Name: "a", Value: "4"}},
		{{Name: "b", Value: "1"}},
	}
	id, err := testutil.CreateBlock(dir, series, 100, 0, 1000, nil, 124)
	testutil.Ok(t, err)

	cache := BinaryCache{logger: log.NewNopLogger()}

	cachePath := filepath.Join(dir, "index.cache.dat")
	indexPath := filepath.Join(dir, id.String(), "index")
	testutil.Ok(t, cache.WriteIndexCache(indexPath, cachePath))

	version, symbols, lvals, postings, err := cache.ReadIndexCache(cachePath)
	testutil.Ok(t, err)

	testutil.Equals(t, 6, len(symbols))
	testutil.Equals(t, 2, len(lvals))
	testutil.Equals(t, 2, version)

	// Expected values per label name, checked in a fixed order.
	expected := []struct {
		name string
		vals []string
	}{
		{name: "a", vals: []string{"1", "2", "3", "4"}},
		{name: "b", vals: []string{"1"}},
	}
	for _, want := range expected {
		got, ok := lvals[want.name]
		testutil.Assert(t, ok, "")
		testutil.Equals(t, want.vals, got)
	}

	testutil.Equals(t, 6, len(postings))
}
Loading