Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(wal): Add sizing information to writer and reader. #11

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 7 additions & 63 deletions pkg/storage/wal/chunks/chunks_test.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,16 @@
package chunks

import (
"bufio"
"bytes"
"fmt"
"os"
"path/filepath"
"strconv"
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/storage/wal/testdata"
)

func TestChunkReaderWriter(t *testing.T) {
Expand Down Expand Up @@ -121,11 +119,11 @@ func TestChunkReaderWriter(t *testing.T) {
}

func TestChunkReaderWriterWithLogGenerator(t *testing.T) {
filenames := testDataFile()
filenames := testdata.Files()

for _, filename := range filenames {
t.Run(filename, func(t *testing.T) {
gen := newLogGenerator(t, filename)
gen := testdata.NewLogGenerator(t, filename)
defer gen.Close()

var entries []*logproto.Entry
Expand Down Expand Up @@ -196,10 +194,10 @@ var (

// Benchmark reads with log generator
func BenchmarkReadChunkWithLogGenerator(b *testing.B) {
filenames := testDataFile()
filenames := testdata.Files()
for _, filename := range filenames {
b.Run(filename, func(b *testing.B) {
gen := newLogGenerator(b, filename)
gen := testdata.NewLogGenerator(b, filename)
defer gen.Close()

var entries []*logproto.Entry
Expand Down Expand Up @@ -239,12 +237,12 @@ func BenchmarkReadChunkWithLogGenerator(b *testing.B) {

// Benchmark with log generator
func BenchmarkWriteChunkWithLogGenerator(b *testing.B) {
filenames := testDataFile()
filenames := testdata.Files()

for _, filename := range filenames {
for _, count := range []int{1000, 10000, 100000} {
b.Run(fmt.Sprintf("%s-%d", filename, count), func(b *testing.B) {
gen := newLogGenerator(b, filename)
gen := testdata.NewLogGenerator(b, filename)
defer gen.Close()

var entries []*logproto.Entry
Expand Down Expand Up @@ -278,24 +276,6 @@ func BenchmarkWriteChunkWithLogGenerator(b *testing.B) {
}
}

// testDataFile returns the path of every regular file found in the
// ../testdata directory. It panics if the directory cannot be read,
// since the tests cannot run without their fixtures.
func testDataFile() []string {
	const dir = "../testdata"

	entries, err := os.ReadDir(dir)
	if err != nil {
		panic(err)
	}

	var paths []string
	for _, entry := range entries {
		if entry.IsDir() {
			continue
		}
		paths = append(paths, filepath.Join(dir, entry.Name()))
	}
	return paths
}

// generateLogEntries generates a slice of logproto.Entry with the given count.
func generateLogEntries(count int) []*logproto.Entry {
entries := make([]*logproto.Entry, count)
Expand All @@ -307,39 +287,3 @@ func generateLogEntries(count int) []*logproto.Entry {
}
return entries
}

type logGenerator struct {
f *os.File
s *bufio.Scanner
}

func (g *logGenerator) Next() (bool, []byte) {
if g.s.Scan() {
return true, g.s.Bytes()
}
g.reset()
return g.s.Scan(), g.s.Bytes()
}

func (g *logGenerator) Close() {
if g.f != nil {
g.f.Close()
}
g.f = nil
}

func (g *logGenerator) reset() {
_, _ = g.f.Seek(0, 0)
g.s = bufio.NewScanner(g.f)
}

// newLogGenerator opens filename and returns a generator that loops
// over its lines forever. The test fails immediately if the file
// cannot be opened.
func newLogGenerator(t testing.TB, filename string) *logGenerator {
	t.Helper()

	f, err := os.Open(filename)
	require.NoError(t, err)

	g := &logGenerator{f: f}
	g.s = bufio.NewScanner(f)
	return g
}
35 changes: 33 additions & 2 deletions pkg/storage/wal/segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,9 @@ type streamID struct {
}

type SegmentWriter struct {
streams *swiss.Map[streamID, *streamSegment]
buf1 encoding.Encbuf
streams *swiss.Map[streamID, *streamSegment]
buf1 encoding.Encbuf
inputSize int64
}

type streamSegment struct {
Expand All @@ -61,6 +62,9 @@ func (b *SegmentWriter) Append(tenantID, labelsString string, lbls labels.Labels
if len(entries) == 0 {
return
}
for _, e := range entries {
b.inputSize += int64(len(e.Line))
}
id := streamID{labels: labelsString, tenant: tenantID}
s, ok := b.streams.Get(id)
if !ok {
Expand Down Expand Up @@ -224,6 +228,13 @@ func (s *streamSegment) WriteTo(w io.Writer) (n int64, err error) {
// Reset clears all buffered streams and encoder state and zeroes the
// running input-size counter so the writer can be reused.
func (b *SegmentWriter) Reset() {
	b.inputSize = 0
	b.buf1.Reset()
	b.streams.Clear()
}

// InputSize reports the cumulative number of bytes of raw log lines
// appended to this writer since the last Reset. Timestamp and label
// overhead is not included in the total.
func (b *SegmentWriter) InputSize() int64 {
	return b.inputSize
}

type SegmentReader struct {
Expand Down Expand Up @@ -332,3 +343,23 @@ func (r *SegmentReader) Series(ctx context.Context) (*SeriesIter, error) {

return NewSeriesIter(r.idr, ps, r.b), nil
}

// Sizes describes the byte footprint of a segment's components as
// reported by SegmentReader.Sizes.
type Sizes struct {
	Index  int64   // size of the segment's index section, in bytes
	Series []int64 // per-series chunk data sizes, in bytes
}

// Sizes reports the byte size of the segment's index and of each
// series' chunk data, decoded from the chunk refs of the series
// iterator.
func (r *SegmentReader) Sizes() (Sizes, error) {
	var sizes Sizes
	sizes.Index = int64(r.idr.Size())
	it, err := r.Series(context.Background())
	if err != nil {
		return sizes, err
	}
	sizes.Series = []int64{}
	for it.Next() {
		// NOTE(review): only the first chunk meta is inspected —
		// presumably each series carries exactly one chunk in a WAL
		// segment; confirm if that invariant ever changes.
		_, size := it.chunksMeta[0].Ref.Unpack()
		sizes.Series = append(sizes.Series, int64(size))
	}
	// err is necessarily nil here; iteration errors (if the iterator
	// exposes any) are not surfaced — TODO confirm whether an Err()
	// check is needed after the loop.
	return sizes, err
}
67 changes: 67 additions & 0 deletions pkg/storage/wal/segment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@ import (
"testing"
"time"

"github.com/dustin/go-humanize"
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/require"

"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/logql/syntax"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb"
"github.com/grafana/loki/v3/pkg/storage/wal/testdata"

"github.com/grafana/loki/pkg/push"
)
Expand Down Expand Up @@ -186,3 +188,68 @@ func TestMultiTenantWrite(t *testing.T) {
require.NoError(t, iter.Err())
require.ElementsMatch(t, expectedSeries, actualSeries)
}

// TestCompression exercises the segment writer across a range of raw
// input volumes, from 250 KiB up to 100 MiB.
func TestCompression(t *testing.T) {
	targets := []int64{250 * 1024, 500 * 1024, 750 * 1024, 1 << 20, 2 << 20, 5 << 20, 10 << 20, 20 << 20, 50 << 20, 100 << 20}
	for _, target := range targets {
		name := fmt.Sprintf("size %.2f", float64(target)/(1024*1024))
		t.Run(name, func(t *testing.T) {
			testCompression(t, target)
		})
	}
}

// testCompression appends generated log lines to a fresh segment
// writer until at least maxInputSize bytes of raw line data have been
// buffered, flushes the segment, and logs the achieved compression
// ratio, write time, index size, and per-series chunk sizes.
func testCompression(t *testing.T, maxInputSize int64) {
	w := NewWalSegmentWriter()
	dst := bytes.NewBuffer(nil)
	files := testdata.Files()
	lbls := []labels.Labels{}
	generators := []*testdata.LogGenerator{}

	for _, file := range files {
		lbls = append(lbls, labels.FromStrings("filename", file, "namespace", "dev"))
		lbls = append(lbls, labels.FromStrings("filename", file, "namespace", "prod"))
		g := testdata.NewLogGenerator(t, file)
		// Close the fixture file when the test returns; previously the
		// handles leaked until process exit.
		defer g.Close()
		// The same generator backs both the dev and prod streams so the
		// indexes of lbls and generators stay aligned.
		generators = append(generators, g, g)
	}
	inputSize := int64(0)
	for inputSize < maxInputSize {
		for i, lbl := range lbls {
			more, line := generators[i].Next()
			if !more {
				continue
			}
			inputSize += int64(len(line))
			w.Append("tenant", lbl.String(), lbl, []*push.Entry{
				{Timestamp: time.Unix(0, int64(i*1e9)), Line: string(line)},
			})
		}
	}

	// The writer's own accounting must match the bytes we fed it.
	require.Equal(t, inputSize, w.InputSize())

	now := time.Now()
	n, err := w.WriteTo(dst)
	require.NoError(t, err)
	require.True(t, n > 0)
	compressionTime := time.Since(now)

	r, err := NewReader(dst.Bytes())
	require.NoError(t, err)
	inputSizeMB := float64(w.InputSize()) / (1024 * 1024)
	outputSizeMB := float64(dst.Len()) / (1024 * 1024)
	compressionRatio := (1 - (outputSizeMB / inputSizeMB)) * 100

	t.Logf("Input Size: %s\n", humanize.Bytes(uint64(w.InputSize())))
	t.Logf("Output Size: %s\n", humanize.Bytes(uint64(dst.Len())))
	t.Logf("Compression Ratio: %.2f%%\n", compressionRatio)
	t.Logf("Write time: %s\n", compressionTime)
	sizes, err := r.Sizes()
	require.NoError(t, err)
	t.Logf("Total chunks %d\n", len(sizes.Series))
	t.Logf("Index size %s\n", humanize.Bytes(uint64(sizes.Index)))
	sizesString := ""
	for _, size := range sizes.Series {
		sizesString += humanize.Bytes(uint64(size)) + ", "
	}
	t.Logf("Series sizes: [%s]\n", sizesString)
}
71 changes: 71 additions & 0 deletions pkg/storage/wal/testdata/generator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package testdata

import (
"bufio"
"os"
"path/filepath"
"testing"

"github.com/stretchr/testify/require"
)

type LogGenerator struct {
f *os.File
s *bufio.Scanner
}

func (g *LogGenerator) Next() (bool, []byte) {
if g.s.Scan() {
return true, g.s.Bytes()
}
g.reset()
return g.s.Scan(), g.s.Bytes()
}

func (g *LogGenerator) Close() {
if g.f != nil {
g.f.Close()
}
g.f = nil
}

func (g *LogGenerator) reset() {
_, _ = g.f.Seek(0, 0)
g.s = bufio.NewScanner(g.f)
}

// NewLogGenerator opens filename and returns a generator that loops
// over its lines forever. The test fails immediately if the file
// cannot be opened.
func NewLogGenerator(t testing.TB, filename string) *LogGenerator {
	t.Helper()

	f, err := os.Open(filename)
	require.NoError(t, err)

	g := &LogGenerator{f: f}
	g.s = bufio.NewScanner(f)
	return g
}

func Files() []string {
testdataDir := "./testdata"
files, err := os.ReadDir(testdataDir)
if err != nil && !os.IsNotExist(err) {
if !os.IsNotExist(err) {
panic(err)
}
testdataDir = "../testdata"
files, err = os.ReadDir(testdataDir)
if err != nil {
panic(err)
}
}

var fileNames []string
for _, file := range files {
if !file.IsDir() {
filePath := filepath.Join(testdataDir, file.Name())
fileNames = append(fileNames, filePath)
}
}

return fileNames
}
Loading