Skip to content

Commit

Permalink
Improve protobuf serialization (#2031)
Browse files Browse the repository at this point in the history
* Generate custom proto type for Streams.

This is the first step to improve protobuf serialization.

Signed-off-by: Cyril Tovena <[email protected]>

* Improve protobuf serialization allocations.

Signed-off-by: Cyril Tovena <[email protected]>

* Saves allocations by stringifying labels only once.

Signed-off-by: Cyril Tovena <[email protected]>
  • Loading branch information
cyriltovena authored May 11, 2020
1 parent bce4470 commit 9988ce4
Show file tree
Hide file tree
Showing 40 changed files with 1,063 additions and 449 deletions.
4 changes: 2 additions & 2 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func (d *Distributor) stopping(_ error) error {

// TODO taken from Cortex, see if we can refactor out an usable interface.
type streamTracker struct {
stream *logproto.Stream
stream logproto.Stream
minSuccess int
maxFailures int
succeeded int32
Expand Down Expand Up @@ -329,7 +329,7 @@ func (d *Distributor) sendSamplesErr(ctx context.Context, ingester ring.Ingester
}

req := &logproto.PushRequest{
Streams: make([]*logproto.Stream, len(streams)),
Streams: make([]logproto.Stream, len(streams)),
}
for i, s := range streams {
req.Streams[i] = s.stream
Expand Down
2 changes: 1 addition & 1 deletion pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ func prepare(t *testing.T, limits *validation.Limits, kvStore kv.Client) *Distri

func makeWriteRequest(lines int, size int) *logproto.PushRequest {
req := logproto.PushRequest{
Streams: []*logproto.Stream{
Streams: []logproto.Stream{
{
Labels: `{foo="bar"}`,
},
Expand Down
4 changes: 2 additions & 2 deletions pkg/distributor/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func (v Validator) ValidateEntry(userID string, labels string, entry logproto.En
}

// Validate labels returns an error if the labels are invalid
func (v Validator) ValidateLabels(userID string, stream *logproto.Stream) error {
func (v Validator) ValidateLabels(userID string, stream logproto.Stream) error {
ls, err := util.ToClientLabels(stream.Labels)
if err != nil {
// I wish we didn't return httpgrpc errors here as it seems
Expand Down Expand Up @@ -93,7 +93,7 @@ func (v Validator) ValidateLabels(userID string, stream *logproto.Stream) error
return nil
}

func updateMetrics(reason, userID string, stream *logproto.Stream) {
func updateMetrics(reason, userID string, stream logproto.Stream) {
validation.DiscardedSamples.WithLabelValues(reason, userID).Inc()
bytes := 0
for _, e := range stream.Entries {
Expand Down
2 changes: 1 addition & 1 deletion pkg/distributor/validator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func TestValidator_ValidateLabels(t *testing.T) {
v, err := NewValidator(o)
assert.NoError(t, err)

err = v.ValidateLabels(tt.userID, &logproto.Stream{Labels: tt.labels})
err = v.ValidateLabels(tt.userID, logproto.Stream{Labels: tt.labels})
assert.Equal(t, tt.expected, err)
})
}
Expand Down
32 changes: 16 additions & 16 deletions pkg/ingester/flush_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func TestFlushingCollidingLabels(t *testing.T) {
// checkData only iterates between unix seconds 0 and 1000
now := time.Unix(0, 0)

req := &logproto.PushRequest{Streams: []*logproto.Stream{
req := &logproto.PushRequest{Streams: []logproto.Stream{
// some colliding label sets
{Labels: model.LabelSet{"app": "l", "uniq0": "0", "uniq1": "1"}.String(), Entries: entries(5, now.Add(time.Minute))},
{Labels: model.LabelSet{"app": "m", "uniq0": "1", "uniq1": "1"}.String(), Entries: entries(5, now)},
Expand All @@ -95,7 +95,7 @@ func TestFlushingCollidingLabels(t *testing.T) {
require.NoError(t, services.StopAndAwaitTerminated(context.Background(), ing))

// verify that we get all the data back
store.checkData(t, map[string][]*logproto.Stream{userID: req.Streams})
store.checkData(t, map[string][]logproto.Stream{userID: req.Streams})

// make sure all chunks have different fingerprint, even colliding ones.
chunkFingerprints := map[model.Fingerprint]bool{}
Expand Down Expand Up @@ -125,7 +125,7 @@ func TestFlushMaxAge(t *testing.T) {
{Timestamp: now.Add(time.Second * 61), Line: "3"},
}

req := &logproto.PushRequest{Streams: []*logproto.Stream{
req := &logproto.PushRequest{Streams: []logproto.Stream{
{Labels: model.LabelSet{"app": "l"}.String(), Entries: firstEntries},
}}

Expand All @@ -138,9 +138,9 @@ func TestFlushMaxAge(t *testing.T) {
time.Sleep(2 * cfg.FlushCheckPeriod)

// ensure chunk is not flushed after flush period elapses
store.checkData(t, map[string][]*logproto.Stream{})
store.checkData(t, map[string][]logproto.Stream{})

req2 := &logproto.PushRequest{Streams: []*logproto.Stream{
req2 := &logproto.PushRequest{Streams: []logproto.Stream{
{Labels: model.LabelSet{"app": "l"}.String(), Entries: secondEntries},
}}

Expand All @@ -150,7 +150,7 @@ func TestFlushMaxAge(t *testing.T) {
time.Sleep(2 * cfg.FlushCheckPeriod)

// assert stream is now both batches
store.checkData(t, map[string][]*logproto.Stream{
store.checkData(t, map[string][]logproto.Stream{
userID: {
{Labels: model.LabelSet{"app": "l"}.String(), Entries: append(firstEntries, secondEntries...)},
},
Expand Down Expand Up @@ -235,11 +235,11 @@ func (s *testStore) LazyQuery(ctx context.Context, req logql.SelectParams) (iter

func (s *testStore) Stop() {}

func pushTestSamples(t *testing.T, ing logproto.PusherServer) map[string][]*logproto.Stream {
func pushTestSamples(t *testing.T, ing logproto.PusherServer) map[string][]logproto.Stream {
userIDs := []string{"1", "2", "3"}

// Create test samples.
testData := map[string][]*logproto.Stream{}
testData := map[string][]logproto.Stream{}
for i, userID := range userIDs {
testData[userID] = buildTestStreams(i)
}
Expand All @@ -255,8 +255,8 @@ func pushTestSamples(t *testing.T, ing logproto.PusherServer) map[string][]*logp
return testData
}

func buildTestStreams(offset int) []*logproto.Stream {
var m []*logproto.Stream
func buildTestStreams(offset int) []logproto.Stream {
var m []logproto.Stream
for i := 0; i < numSeries; i++ {
ss := logproto.Stream{
Labels: model.Metric{
Expand All @@ -270,7 +270,7 @@ func buildTestStreams(offset int) []*logproto.Stream {
Line: "line",
})
}
m = append(m, &ss)
m = append(m, ss)
}

sort.Slice(m, func(i, j int) bool {
Expand All @@ -281,15 +281,15 @@ func buildTestStreams(offset int) []*logproto.Stream {
}

// check that the store is holding data equivalent to what we expect
func (s *testStore) checkData(t *testing.T, testData map[string][]*logproto.Stream) {
func (s *testStore) checkData(t *testing.T, testData map[string][]logproto.Stream) {
for userID, expected := range testData {
streams := s.getStreamsForUser(t, userID)
require.Equal(t, expected, streams)
}
}

func (s *testStore) getStreamsForUser(t *testing.T, userID string) []*logproto.Stream {
var streams []*logproto.Stream
func (s *testStore) getStreamsForUser(t *testing.T, userID string) []logproto.Stream {
var streams []logproto.Stream
for _, c := range s.getChunksForUser(userID) {
lokiChunk := c.Data.(*chunkenc.Facade).LokiChunk()
streams = append(streams, buildStreamsFromChunk(t, c.Metric.String(), lokiChunk))
Expand All @@ -307,11 +307,11 @@ func (s *testStore) getChunksForUser(userID string) []chunk.Chunk {
return s.chunks[userID]
}

func buildStreamsFromChunk(t *testing.T, labels string, chk chunkenc.Chunk) *logproto.Stream {
func buildStreamsFromChunk(t *testing.T, labels string, chk chunkenc.Chunk) logproto.Stream {
it, err := chk.Iterator(context.TODO(), time.Unix(0, 0), time.Unix(1000, 0), logproto.FORWARD, nil)
require.NoError(t, err)

stream := &logproto.Stream{
stream := logproto.Stream{
Labels: labels,
}
for it.Next() {
Expand Down
4 changes: 2 additions & 2 deletions pkg/ingester/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func TestIngester(t *testing.T) {
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck

req := logproto.PushRequest{
Streams: []*logproto.Stream{
Streams: []logproto.Stream{
{
Labels: `{foo="bar",bar="baz1"}`,
},
Expand Down Expand Up @@ -206,7 +206,7 @@ func TestIngesterStreamLimitExceeded(t *testing.T) {
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck

req := logproto.PushRequest{
Streams: []*logproto.Stream{
Streams: []logproto.Stream{
{
Labels: `{foo="bar",bar="baz1"}`,
},
Expand Down
2 changes: 1 addition & 1 deletion pkg/ingester/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ func (i *instance) Push(ctx context.Context, req *logproto.PushRequest) error {
return appendErr
}

func (i *instance) getOrCreateStream(pushReqStream *logproto.Stream) (*stream, error) {
func (i *instance) getOrCreateStream(pushReqStream logproto.Stream) (*stream, error) {
labels, err := util.ToClientLabels(pushReqStream.Labels)
if err != nil {
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
Expand Down
6 changes: 3 additions & 3 deletions pkg/ingester/instance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func TestLabelsCollisions(t *testing.T) {
tt := time.Now().Add(-5 * time.Minute)

// Notice how labels aren't sorted.
err = i.Push(context.Background(), &logproto.PushRequest{Streams: []*logproto.Stream{
err = i.Push(context.Background(), &logproto.PushRequest{Streams: []logproto.Stream{
// both label sets have FastFingerprint=e002a3a451262627
{Labels: "{app=\"l\",uniq0=\"0\",uniq1=\"1\"}", Entries: entries(5, tt.Add(time.Minute))},
{Labels: "{uniq0=\"1\",app=\"m\",uniq1=\"1\"}", Entries: entries(5, tt)},
Expand Down Expand Up @@ -82,7 +82,7 @@ func TestConcurrentPushes(t *testing.T) {
tt := time.Now().Add(-5 * time.Minute)

for i := 0; i < iterations; i++ {
err := inst.Push(context.Background(), &logproto.PushRequest{Streams: []*logproto.Stream{
err := inst.Push(context.Background(), &logproto.PushRequest{Streams: []logproto.Stream{
{Labels: labels, Entries: entries(entriesPerIteration, tt)},
}})

Expand Down Expand Up @@ -122,7 +122,7 @@ func TestSyncPeriod(t *testing.T) {
result = append(result, logproto.Entry{Timestamp: tt, Line: fmt.Sprintf("hello %d", i)})
tt = tt.Add(time.Duration(1 + rand.Int63n(randomStep.Nanoseconds())))
}
pr := &logproto.PushRequest{Streams: []*logproto.Stream{{Labels: lbls, Entries: result}}}
pr := &logproto.PushRequest{Streams: []logproto.Stream{{Labels: lbls, Entries: result}}}
err = inst.Push(context.Background(), pr)
require.NoError(t, err)

Expand Down
28 changes: 15 additions & 13 deletions pkg/ingester/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,12 @@ type stream struct {
cfg *Config
// Newest chunk at chunks[n-1].
// Not thread-safe; assume accesses to this are locked by caller.
chunks []chunkDesc
fp model.Fingerprint // possibly remapped fingerprint, used in the streams map
labels labels.Labels
factory func() chunkenc.Chunk
lastLine line
chunks []chunkDesc
fp model.Fingerprint // possibly remapped fingerprint, used in the streams map
labels labels.Labels
labelsString string
factory func() chunkenc.Chunk
lastLine line

tailers map[uint32]*tailer
tailerMtx sync.RWMutex
Expand All @@ -86,11 +87,12 @@ type entryWithError struct {

func newStream(cfg *Config, fp model.Fingerprint, labels labels.Labels, factory func() chunkenc.Chunk) *stream {
return &stream{
cfg: cfg,
fp: fp,
labels: labels,
factory: factory,
tailers: map[uint32]*tailer{},
cfg: cfg,
fp: fp,
labels: labels,
labelsString: labels.String(),
factory: factory,
tailers: map[uint32]*tailer{},
}
}

Expand Down Expand Up @@ -172,7 +174,7 @@ func (s *stream) Push(ctx context.Context, entries []logproto.Entry, synchronize

if len(storedEntries) != 0 {
go func() {
stream := logproto.Stream{Labels: s.labels.String(), Entries: storedEntries}
stream := logproto.Stream{Labels: s.labelsString, Entries: storedEntries}

closedTailers := []uint32{}

Expand Down Expand Up @@ -202,7 +204,7 @@ func (s *stream) Push(ctx context.Context, entries []logproto.Entry, synchronize
if lastEntryWithErr.e == chunkenc.ErrOutOfOrder {
// return bad http status request response with all failed entries
buf := bytes.Buffer{}
streamName := s.labels.String()
streamName := s.labelsString

limitedFailedEntries := failedEntriesWithError
if maxIgnore := s.cfg.MaxReturnedErrors; maxIgnore > 0 && len(limitedFailedEntries) > maxIgnore {
Expand Down Expand Up @@ -272,7 +274,7 @@ func (s *stream) Iterator(ctx context.Context, from, through time.Time, directio
}
}

return iter.NewNonOverlappingIterator(iterators, s.labels.String()), nil
return iter.NewNonOverlappingIterator(iterators, s.labelsString), nil
}

func (s *stream) addTailer(t *tailer) {
Expand Down
4 changes: 2 additions & 2 deletions pkg/ingester/transfer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func TestTransferOut(t *testing.T) {
// Push some data into our original ingester
ctx := user.InjectOrgID(context.Background(), "test")
_, err := ing.Push(ctx, &logproto.PushRequest{
Streams: []*logproto.Stream{
Streams: []logproto.Stream{
{
Entries: []logproto.Entry{
{Line: "line 0", Timestamp: time.Unix(0, 0)},
Expand All @@ -59,7 +59,7 @@ func TestTransferOut(t *testing.T) {

// verify we get out of order exception on adding an entry with older timestamps
_, err2 := ing.Push(ctx, &logproto.PushRequest{
Streams: []*logproto.Stream{
Streams: []logproto.Stream{
{
Entries: []logproto.Entry{
{Line: "out of order line", Timestamp: time.Unix(0, 0)},
Expand Down
8 changes: 4 additions & 4 deletions pkg/iter/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type streamIterator struct {
}

// NewStreamIterator iterates over entries in a stream.
func NewStreamIterator(stream *logproto.Stream) EntryIterator {
func NewStreamIterator(stream logproto.Stream) EntryIterator {
return &streamIterator{
i: -1,
entries: stream.Entries,
Expand Down Expand Up @@ -355,7 +355,7 @@ func (i *heapIterator) Len() int {
}

// NewStreamsIterator returns an iterator over logproto.Stream
func NewStreamsIterator(ctx context.Context, streams []*logproto.Stream, direction logproto.Direction) EntryIterator {
func NewStreamsIterator(ctx context.Context, streams []logproto.Stream, direction logproto.Direction) EntryIterator {
is := make([]EntryIterator, 0, len(streams))
for i := range streams {
is = append(is, NewStreamIterator(streams[i]))
Expand Down Expand Up @@ -603,10 +603,10 @@ func ReadBatch(i EntryIterator, size uint32) (*logproto.QueryResponse, uint32, e
}

result := logproto.QueryResponse{
Streams: make([]*logproto.Stream, 0, len(streams)),
Streams: make([]logproto.Stream, 0, len(streams)),
}
for _, stream := range streams {
result.Streams = append(result.Streams, stream)
result.Streams = append(result.Streams, *stream)
}
return &result, respSize, i.Error()
}
Expand Down
14 changes: 7 additions & 7 deletions pkg/iter/iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ func mkStreamIterator(f generator, labels string) EntryIterator {
for i := int64(0); i < testSize; i++ {
entries = append(entries, f(i))
}
return NewStreamIterator(&logproto.Stream{
return NewStreamIterator(logproto.Stream{
Entries: entries,
Labels: labels,
})
Expand Down Expand Up @@ -318,7 +318,7 @@ func TestReverseEntryIteratorUnlimited(t *testing.T) {
}

func Test_PeekingIterator(t *testing.T) {
iter := NewPeekingIterator(NewStreamIterator(&logproto.Stream{
iter := NewPeekingIterator(NewStreamIterator(logproto.Stream{
Entries: []logproto.Entry{
{
Timestamp: time.Unix(0, 1),
Expand Down Expand Up @@ -381,7 +381,7 @@ func Test_PeekingIterator(t *testing.T) {
}

func Test_DuplicateCount(t *testing.T) {
stream := &logproto.Stream{
stream := logproto.Stream{
Entries: []logproto.Entry{
{
Timestamp: time.Unix(0, 1),
Expand Down Expand Up @@ -440,7 +440,7 @@ func Test_DuplicateCount(t *testing.T) {
NewStreamIterator(stream),
NewStreamIterator(stream),
NewStreamIterator(stream),
NewStreamIterator(&logproto.Stream{
NewStreamIterator(logproto.Stream{
Entries: []logproto.Entry{
{
Timestamp: time.Unix(0, 4),
Expand All @@ -457,7 +457,7 @@ func Test_DuplicateCount(t *testing.T) {
NewStreamIterator(stream),
NewStreamIterator(stream),
NewStreamIterator(stream),
NewStreamIterator(&logproto.Stream{
NewStreamIterator(logproto.Stream{
Entries: []logproto.Entry{
{
Timestamp: time.Unix(0, 4),
Expand All @@ -471,7 +471,7 @@ func Test_DuplicateCount(t *testing.T) {
{
"single f",
[]EntryIterator{
NewStreamIterator(&logproto.Stream{
NewStreamIterator(logproto.Stream{
Entries: []logproto.Entry{
{
Timestamp: time.Unix(0, 4),
Expand All @@ -485,7 +485,7 @@ func Test_DuplicateCount(t *testing.T) {
{
"single b",
[]EntryIterator{
NewStreamIterator(&logproto.Stream{
NewStreamIterator(logproto.Stream{
Entries: []logproto.Entry{
{
Timestamp: time.Unix(0, 4),
Expand Down
Loading

0 comments on commit 9988ce4

Please sign in to comment.