Use prometheus/tsdb in ingester. #1427

Closed
wants to merge 2 commits into from
2 changes: 1 addition & 1 deletion go.mod
@@ -57,7 +57,7 @@ require (
github.com/prometheus/client_golang v0.9.3-0.20190127221311-3c4408c8b829
github.com/prometheus/common v0.3.0
github.com/prometheus/prometheus v0.0.0-20190417125241-3cc5f9d88062
-github.com/prometheus/tsdb v0.7.2-0.20190506134726-2ae028114c89
+github.com/prometheus/tsdb v0.8.0
github.com/segmentio/fasthash v0.0.0-20180216231524-a72b379d632e
github.com/sercand/kuberesolver v2.1.0+incompatible // indirect
github.com/stretchr/testify v1.3.0
2 changes: 2 additions & 0 deletions go.sum
@@ -367,6 +367,8 @@ github.com/prometheus/prometheus v0.0.0-20190417125241-3cc5f9d88062/go.mod h1:nq
github.com/prometheus/tsdb v0.7.0/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU=
github.com/prometheus/tsdb v0.7.2-0.20190506134726-2ae028114c89 h1:r2TOLePIzxV3KQHQuFz5wPllXq+9Y3g0pjj989nizJo=
github.com/prometheus/tsdb v0.7.2-0.20190506134726-2ae028114c89/go.mod h1:fSI0j+IUQrDd7+ZtR9WKIGtoYAYAJUKcKhYLG25tN4g=
+github.com/prometheus/tsdb v0.8.0 h1:w1tAGxsBMLkuGrFMhqgcCeBkM5d1YI24udArs+aASuQ=
+github.com/prometheus/tsdb v0.8.0/go.mod h1:fSI0j+IUQrDd7+ZtR9WKIGtoYAYAJUKcKhYLG25tN4g=
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/rlmcpherson/s3gof3r v0.5.0/go.mod h1:s7vv7SMDPInkitQMuZzH615G7yWHdrU2r/Go7Bo71Rs=
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
4 changes: 2 additions & 2 deletions pkg/chunk/chunk_test.go
@@ -129,7 +129,7 @@ func TestChunkCodec(t *testing.T) {
const fixedTimestamp = model.Time(1557654321000)

func encodeForCompatibilityTest(t *testing.T) {
-dummy := dummyChunkForEncoding(fixedTimestamp, labelsForDummyChunks, encoding.Bigchunk, 1)
+dummy := dummyChunkForEncoding(fixedTimestamp, labelsForDummyChunks, encoding.BigChunk, 1)
encoded, err := dummy.Encoded()
require.NoError(t, err)
fmt.Printf("%q\n%q\n", dummy.ExternalKey(), encoded)
@@ -142,7 +142,7 @@ func TestChunkDecodeBackwardsCompatibility(t *testing.T) {
have, err := ParseExternalKey(userID, "userID/fd3477666dacf92a:16aab37c8e8:16aab6eb768:38eb373c")
require.NoError(t, err)
require.NoError(t, have.Decode(decodeContext, rawData))
-want := dummyChunkForEncoding(fixedTimestamp, labelsForDummyChunks, encoding.Bigchunk, 1)
+want := dummyChunkForEncoding(fixedTimestamp, labelsForDummyChunks, encoding.BigChunk, 1)
// We can't just compare these two chunks, since the Bigchunk internals are different on construction and read-in.
// Compare the serialised version instead
require.NoError(t, have.Encode())
50 changes: 32 additions & 18 deletions pkg/chunk/encoding/bigchunk.go
@@ -8,6 +8,7 @@ import (

"github.com/prometheus/common/model"
"github.com/prometheus/tsdb/chunkenc"
"github.com/prometheus/tsdb/chunks"
)

const samplesPerChunk = 120
@@ -22,18 +22,19 @@ type smallChunk struct {

// bigchunk is a set of prometheus/tsdb chunks. It grows over time and has no
// upper bound on the number of samples it can contain.
-type bigchunk struct {
+type Bigchunk struct {
chunks []smallChunk

appender chunkenc.Appender
remainingSamples int
}

-func newBigchunk() *bigchunk {
-return &bigchunk{}
+// NewBigchunk makes a new Bigchunk.
+func NewBigchunk() *Bigchunk {
+return &Bigchunk{}
}

-func (b *bigchunk) Add(sample model.SamplePair) ([]Chunk, error) {
+func (b *Bigchunk) Add(sample model.SamplePair) ([]Chunk, error) {
if b.remainingSamples == 0 {
if bigchunkSizeCapBytes > 0 && b.Size() > bigchunkSizeCapBytes {
return addToOverflowChunk(b, sample)
@@ -50,7 +52,7 @@ func (b *bigchunk) Add(sample model.SamplePair) ([]Chunk, error) {
}

// addNextChunk adds a new XOR "subchunk" to the internal list of chunks.
-func (b *bigchunk) addNextChunk(start model.Time) error {
+func (b *Bigchunk) addNextChunk(start model.Time) error {
// To save memory, we "compact" the previous chunk - the array backing the slice
// will be up to 2x too big, and we can save this space.
const chunkCapacityExcess = 32 // don't bother copying if it's within this range
@@ -84,7 +86,7 @@ func (b *bigchunk) addNextChunk(start model.Time) error {
return nil
}

-func (b *bigchunk) Marshal(wio io.Writer) error {
+func (b *Bigchunk) Marshal(wio io.Writer) error {
w := writer{wio}
if err := w.WriteVarInt16(uint16(len(b.chunks))); err != nil {
return err
@@ -101,12 +103,12 @@ func (b *bigchunk) Marshal(wio io.Writer) error {
return nil
}

-func (b *bigchunk) MarshalToBuf(buf []byte) error {
+func (b *Bigchunk) MarshalToBuf(buf []byte) error {
writer := bytes.NewBuffer(buf)
return b.Marshal(writer)
}

-func (b *bigchunk) UnmarshalFromBuf(buf []byte) error {
+func (b *Bigchunk) UnmarshalFromBuf(buf []byte) error {
r := reader{buf: buf}
numChunks, err := r.ReadUint16()
if err != nil {
@@ -144,23 +146,23 @@ func (b *bigchunk) UnmarshalFromBuf(buf []byte) error {
return nil
}

-func (b *bigchunk) Encoding() Encoding {
-return Bigchunk
+func (b *Bigchunk) Encoding() Encoding {
+return BigChunk
}

-func (b *bigchunk) Utilization() float64 {
+func (b *Bigchunk) Utilization() float64 {
return 1.0
}

-func (b *bigchunk) Len() int {
+func (b *Bigchunk) Len() int {
sum := 0
for _, c := range b.chunks {
sum += c.NumSamples()
}
return sum
}

-func (b *bigchunk) Size() int {
+func (b *Bigchunk) Size() int {
sum := 2 // For the number of sub chunks.
for _, c := range b.chunks {
sum += 2 // For the length of the sub chunk.
@@ -169,14 +171,14 @@ func (b *bigchunk) Size() int {
return sum
}

-func (b *bigchunk) NewIterator() Iterator {
+func (b *Bigchunk) NewIterator() Iterator {
return &bigchunkIterator{
-bigchunk: b,
+Bigchunk: b,
curr: b.chunks[0].Iterator(),
}
}

-func (b *bigchunk) Slice(start, end model.Time) Chunk {
+func (b *Bigchunk) Slice(start, end model.Time) Chunk {
i, j := 0, len(b.chunks)
for k := 0; k < len(b.chunks); k++ {
if b.chunks[k].end < int64(start) {
@@ -187,11 +189,23 @@ func (b *bigchunk) Slice(start, end model.Time) Chunk {
break
}
}
-return &bigchunk{
+return &Bigchunk{
chunks: b.chunks[i:j],
}
}

+// AddSmallChunks converts already-sealed tsdb chunks into smallChunks and
+// appends them to this Bigchunk.
+func (b *Bigchunk) AddSmallChunks(cs []chunks.Meta) {
+scs := make([]smallChunk, 0, len(cs))
+for _, c := range cs {
+xoc := c.Chunk.(*chunkenc.XORChunk)
+scs = append(scs, smallChunk{
+XORChunk: xoc,
+start: c.MinTime,
+end: c.MaxTime,
+})
+}
+b.chunks = append(b.chunks, scs...)
+}

type writer struct {
io.Writer
}
@@ -227,7 +241,7 @@ func (r *reader) ReadBytes(count int) ([]byte, error) {
}

type bigchunkIterator struct {
-*bigchunk
+*Bigchunk

curr chunkenc.Iterator
i int
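
Taken together, the bigchunk.go changes export the type and add a bulk-load path for chunks that a prometheus/tsdb head block has already sealed. Below is a minimal sketch of how an ingester might drive the exported API; NewBigchunk, Add and AddSmallChunks come from this diff, while the import path, headChunks and the surrounding wiring are illustrative assumptions.

package main

import (
	"fmt"

	"github.com/cortexproject/cortex/pkg/chunk/encoding"
	"github.com/prometheus/common/model"
	"github.com/prometheus/tsdb/chunks"
)

func buildBigchunk(headChunks []chunks.Meta) (encoding.Chunk, error) {
	b := encoding.NewBigchunk()
	// Bulk-load XOR chunks already sealed by the tsdb head block.
	b.AddSmallChunks(headChunks)

	// Keep appending live samples. Add can return overflow chunks,
	// so callers must carry on with the chunk(s) it returns.
	cs, err := b.Add(model.SamplePair{Timestamp: 1557654321000, Value: 42})
	if err != nil {
		return nil, err
	}
	fmt.Printf("samples=%d encodedSize=%dB\n", b.Len(), b.Size())
	return cs[0], nil
}
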
8 changes: 4 additions & 4 deletions pkg/chunk/encoding/bigchunk_test.go
@@ -10,7 +10,7 @@ import (
)

func TestSliceBiggerChunk(t *testing.T) {
-var c Chunk = newBigchunk()
+var c Chunk = NewBigchunk()
for i := 0; i < 12*3600/15; i++ {
cs, err := c.Add(model.SamplePair{
Timestamp: model.Time(i * step),
@@ -60,7 +60,7 @@ func TestSliceBiggerChunk(t *testing.T) {

func BenchmarkBiggerChunkMemory(b *testing.B) {
for i := 0; i < b.N; i++ {
-var c Chunk = newBigchunk()
+var c Chunk = NewBigchunk()
for i := 0; i < 12*3600/15; i++ {
cs, err := c.Add(model.SamplePair{
Timestamp: model.Time(i * step),
@@ -70,12 +70,12 @@ func BenchmarkBiggerChunkMemory(b *testing.B) {
c = cs[0]
}

-c.(*bigchunk).printSize()
+c.(*Bigchunk).printSize()
}
}

// printSize calculates various sizes of the chunk when encoded, and in memory.
-func (b *bigchunk) printSize() {
+func (b *Bigchunk) printSize() {
var buf bytes.Buffer
b.Marshal(&buf)

2 changes: 1 addition & 1 deletion pkg/chunk/encoding/chunk_test.go
@@ -62,7 +62,7 @@ func TestChunk(t *testing.T) {
}{
{DoubleDelta, 989},
{Varbit, 2048},
-{Bigchunk, 4096},
+{BigChunk, 4096},
} {
for samples := tc.maxSamples / 10; samples < tc.maxSamples; samples += tc.maxSamples / 10 {

6 changes: 3 additions & 3 deletions pkg/chunk/encoding/factory.go
@@ -39,7 +39,7 @@ const (
// Varbit encoding
Varbit
// Bigchunk encoding
-Bigchunk
+BigChunk
)

type encoding struct {
@@ -66,10 +66,10 @@ var encodings = map[Encoding]encoding{
return newVarbitChunk(varbitZeroEncoding)
},
},
-Bigchunk: {
+BigChunk: {
Name: "Bigchunk",
New: func() Chunk {
-return newBigchunk()
+return NewBigchunk()
},
},
}
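
For reference, a hedged sketch of how a caller selects the renamed encoding through this factory; it assumes the package's existing NewForEncoding helper (unchanged and not shown in this diff) dispatches through the encodings map above.

package main

import (
	"fmt"
	"log"

	"github.com/cortexproject/cortex/pkg/chunk/encoding"
)

func main() {
	// Look up the BigChunk encoding and construct an empty chunk via its New func.
	c, err := encoding.NewForEncoding(encoding.BigChunk)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(c.Encoding() == encoding.BigChunk) // true
}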