diff --git a/pkg/query/querier.go b/pkg/query/querier.go index b094cbd45c..8730185fc4 100644 --- a/pkg/query/querier.go +++ b/pkg/query/querier.go @@ -5,12 +5,14 @@ package query import ( "context" + "fmt" "sort" "strings" "sync" "time" "github.com/go-kit/log" + "github.com/gogo/protobuf/types" "github.com/opentracing/opentracing-go" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" @@ -24,6 +26,7 @@ import ( "github.com/thanos-io/thanos/pkg/extprom" "github.com/thanos-io/thanos/pkg/gate" "github.com/thanos-io/thanos/pkg/store" + "github.com/thanos-io/thanos/pkg/store/hintspb" "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb" "github.com/thanos-io/thanos/pkg/tracing" @@ -208,6 +211,47 @@ type seriesServer struct { seriesSet []storepb.Series seriesSetStats storepb.SeriesStatsCounter warnings []string + symbolTables []map[uint64]string + + compressedSeriesSet []storepb.CompressedSeries +} + +func (s *seriesServer) DecompressSeries() error { + for _, cs := range s.compressedSeriesSet { + newSeries := &storepb.Series{ + Chunks: cs.Chunks, + } + + lbls := labels.Labels{} + + for _, cLabel := range cs.Labels { + var name, val string + for _, symTable := range s.symbolTables { + if foundName, ok := symTable[uint64(cLabel.NameRef)]; ok { + name = foundName + } + + if foundValue, ok := symTable[uint64(cLabel.ValueRef)]; ok { + val = foundValue + } + } + if name == "" { + return fmt.Errorf("found no reference for name ref %d", cLabel.NameRef) + } + if val == "" { + return fmt.Errorf("found no reference for value ref %d", cLabel.ValueRef) + } + + lbls = append(lbls, labels.Label{ + Name: name, + Value: val, + }) + } + + newSeries.Labels = labelpb.ZLabelsFromPromLabels(lbls) + s.seriesSet = append(s.seriesSet, *newSeries) + } + return nil } func (s *seriesServer) Send(r *storepb.SeriesResponse) error { @@ -222,6 +266,23 @@ func (s *seriesServer) Send(r *storepb.SeriesResponse) error { return nil } + if r.GetCompressedSeries() != nil { + s.compressedSeriesSet = append(s.compressedSeriesSet, *r.GetCompressedSeries()) + return nil + } + + if r.GetHints() != nil { + var seriesResponseHints hintspb.SeriesResponseHints + + // Some other, unknown type. Skip it. + if err := types.UnmarshalAny(r.GetHints(), &seriesResponseHints); err != nil { + return nil + } + + s.symbolTables = append(s.symbolTables, seriesResponseHints.StringSymbolTable) + return nil + } + // Unsupported field, skip. return nil } @@ -369,6 +430,10 @@ func (q *querier) selectFn(ctx context.Context, hints *storage.SelectHints, ms . warns = append(warns, errors.New(w)) } + if err := resp.DecompressSeries(); err != nil { + return nil, storepb.SeriesStatsCounter{}, errors.Wrap(err, "decompressing series") + } + // Delete the metric's name from the result because that's what the // PromQL does either way and we want our iterator to work with data // that was either pushed down or not. diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index a7a5d3abd9..6edea6c15a 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -333,7 +333,7 @@ type BucketStore struct { postingOffsetsInMemSampling int // Enables hints in the Series() response. - enableSeriesResponseHints bool + enableQueriedBlocksHints bool enableChunkHashCalculation bool } @@ -454,7 +454,7 @@ func NewBucketStore( partitioner: partitioner, enableCompatibilityLabel: enableCompatibilityLabel, postingOffsetsInMemSampling: postingOffsetsInMemSampling, - enableSeriesResponseHints: enableSeriesResponseHints, + enableQueriedBlocksHints: enableSeriesResponseHints, enableChunkHashCalculation: enableChunkHashCalculation, } @@ -1083,6 +1083,8 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie } } + lookupTable := newLookupTableBuilder(req.MaximumStringSlots) + s.mtx.RLock() for _, bs := range s.blockSets { blockMatchers, ok := bs.labelMatchers(matchers...) @@ -1100,7 +1102,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie b := b gctx := gctx - if s.enableSeriesResponseHints { + if s.enableQueriedBlocksHints { // Keep track of queried blocks. resHints.AddQueriedBlock(b.meta.ULID) } @@ -1231,9 +1233,38 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie s.metrics.chunkSizeBytes.Observe(float64(chunksSize(series.Chunks))) } series.Labels = labelpb.ZLabelsFromPromLabels(lset) - if err = srv.Send(storepb.NewSeriesResponse(&series)); err != nil { - err = status.Error(codes.Unknown, errors.Wrap(err, "send series response").Error()) - return + + var compressedResponse bool + compressedLabels := make([]labelpb.CompressedLabel, 0, len(lset)) + + for _, lbl := range lset { + nameRef, nerr := lookupTable.putString(lbl.Name) + valueRef, verr := lookupTable.putString(lbl.Value) + + if nerr != nil || verr != nil { + compressedResponse = false + break + } else if compressedResponse && nerr == nil && verr == nil { + compressedLabels = append(compressedLabels, labelpb.CompressedLabel{ + NameRef: nameRef, + ValueRef: valueRef, + }) + } + } + + if compressedResponse { + if err = srv.Send(storepb.NewCompressedSeriesResponse(&storepb.CompressedSeries{ + Labels: compressedLabels, + Chunks: series.Chunks, + })); err != nil { + err = status.Error(codes.Unknown, errors.Wrap(err, "send series response").Error()) + return + } + } else { + if err = srv.Send(storepb.NewSeriesResponse(&series)); err != nil { + err = status.Error(codes.Unknown, errors.Wrap(err, "send series response").Error()) + return + } } } if set.Err() != nil { @@ -1246,7 +1277,9 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie err = nil }) - if s.enableSeriesResponseHints { + resHints.StringSymbolTable = lookupTable.getTable() + + { var anyHints *types.Any if anyHints, err = types.MarshalAny(resHints); err != nil { diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index a186b32376..d313c2949d 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -1379,6 +1379,9 @@ func benchBucketSeries(t testutil.TB, skipChunk bool, samplesPerSeries, totalSer }, // This does not cut chunks properly, but those are assured against for non benchmarks only, where we use 100% case only. ExpectedSeries: series[:seriesCut], + ExpectedHints: []hintspb.SeriesResponseHints{ + {}, + }, }) } storetestutil.TestServerSeries(t, st, bCases...) diff --git a/pkg/store/hintspb/hints.pb.go b/pkg/store/hintspb/hints.pb.go index c22d52586e..2e5f0d7096 100644 --- a/pkg/store/hintspb/hints.pb.go +++ b/pkg/store/hintspb/hints.pb.go @@ -68,6 +68,8 @@ var xxx_messageInfo_SeriesRequestHints proto.InternalMessageInfo type SeriesResponseHints struct { /// queried_blocks is the list of blocks that have been queried. QueriedBlocks []Block `protobuf:"bytes,1,rep,name=queried_blocks,json=queriedBlocks,proto3" json:"queried_blocks"` + // Symbol table for label names/values. + StringSymbolTable map[uint64]string `protobuf:"bytes,2,rep,name=string_symbol_table,json=stringSymbolTable,proto3" json:"string_symbol_table" protobuf_key:"varint,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` } func (m *SeriesResponseHints) Reset() { *m = SeriesResponseHints{} } @@ -299,6 +301,7 @@ var xxx_messageInfo_LabelValuesResponseHints proto.InternalMessageInfo func init() { proto.RegisterType((*SeriesRequestHints)(nil), "hintspb.SeriesRequestHints") proto.RegisterType((*SeriesResponseHints)(nil), "hintspb.SeriesResponseHints") + proto.RegisterMapType((map[uint64]string)(nil), "hintspb.SeriesResponseHints.StringSymbolTableEntry") proto.RegisterType((*Block)(nil), "hintspb.Block") proto.RegisterType((*LabelNamesRequestHints)(nil), "hintspb.LabelNamesRequestHints") proto.RegisterType((*LabelNamesResponseHints)(nil), "hintspb.LabelNamesResponseHints") @@ -309,26 +312,31 @@ func init() { func init() { proto.RegisterFile("store/hintspb/hints.proto", fileDescriptor_b82aa23c4c11e83f) } var fileDescriptor_b82aa23c4c11e83f = []byte{ - // 295 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x92, 0x2c, 0x2e, 0xc9, 0x2f, - 0x4a, 0xd5, 0xcf, 0xc8, 0xcc, 0x2b, 0x29, 0x2e, 0x48, 0x82, 0xd0, 0x7a, 0x05, 0x45, 0xf9, 0x25, - 0xf9, 0x42, 0xec, 0x50, 0x41, 0x29, 0x91, 0xf4, 0xfc, 0xf4, 0x7c, 0xb0, 0x98, 0x3e, 0x88, 0x05, - 0x91, 0x96, 0x82, 0xea, 0x04, 0x93, 0x05, 0x49, 0xfa, 0x25, 0x95, 0x05, 0xa9, 0x50, 0x9d, 0x4a, - 0xe1, 0x5c, 0x42, 0xc1, 0xa9, 0x45, 0x99, 0xa9, 0xc5, 0x41, 0xa9, 0x85, 0xa5, 0xa9, 0xc5, 0x25, - 0x1e, 0x20, 0x83, 0x84, 0x1c, 0xb9, 0xf8, 0x92, 0x72, 0xf2, 0x93, 0xb3, 0xe3, 0x73, 0x13, 0x4b, - 0x92, 0x33, 0x52, 0x8b, 0x8a, 0x25, 0x18, 0x15, 0x98, 0x35, 0xb8, 0x8d, 0x44, 0xf4, 0x4a, 0x32, - 0x12, 0xf3, 0xf2, 0x8b, 0xf5, 0x7c, 0x12, 0x93, 0x52, 0x73, 0x7c, 0x21, 0x92, 0x4e, 0x2c, 0x27, - 0xee, 0xc9, 0x33, 0x04, 0xf1, 0x82, 0x75, 0x40, 0xc5, 0x8a, 0x95, 0x82, 0xb8, 0x84, 0x61, 0x06, - 0x17, 0x17, 0xe4, 0xe7, 0x15, 0xa7, 0x42, 0x4c, 0xb6, 0xe6, 0xe2, 0x2b, 0x2c, 0x05, 0x89, 0xa7, - 0xc4, 0x83, 0xd5, 0xc3, 0x4c, 0xe6, 0xd3, 0x83, 0x7a, 0x41, 0xcf, 0x09, 0x24, 0x0c, 0x33, 0x13, - 0xaa, 0x16, 0x2c, 0x56, 0xac, 0x24, 0xce, 0xc5, 0x0a, 0x66, 0x09, 0xf1, 0x71, 0x31, 0x65, 0xa6, - 0x48, 0x30, 0x2a, 0x30, 0x6a, 0x70, 0x06, 0x31, 0x65, 0xa6, 0x28, 0x45, 0x73, 0x89, 0x81, 0x5d, - 0xe4, 0x97, 0x98, 0x4b, 0x7d, 0x9f, 0x84, 0x71, 0x89, 0x23, 0x1b, 0x4e, 0x35, 0xdf, 0xc4, 0x40, - 0xcd, 0x0d, 0x4b, 0xcc, 0x29, 0xa5, 0xbe, 0xab, 0xc3, 0xb9, 0x24, 0x50, 0x4c, 0xa7, 0x96, 0xb3, - 0x9d, 0x54, 0x4f, 0x3c, 0x94, 0x63, 0x38, 0xf1, 0x48, 0x8e, 0xf1, 0xc2, 0x23, 0x39, 0xc6, 0x07, - 0x8f, 0xe4, 0x18, 0x27, 0x3c, 0x96, 0x63, 0xb8, 0xf0, 0x58, 0x8e, 0xe1, 0xc6, 0x63, 0x39, 0x86, - 0x28, 0x58, 0x4a, 0x4c, 0x62, 0x03, 0xa7, 0x2f, 0x63, 0x40, 0x00, 0x00, 0x00, 0xff, 0xff, 0x47, - 0x2f, 0x08, 0x1f, 0xb6, 0x02, 0x00, 0x00, + // 382 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xac, 0x93, 0xc1, 0x4a, 0xc3, 0x30, + 0x18, 0xc7, 0x9b, 0x6e, 0x53, 0x16, 0xb1, 0x68, 0x37, 0xb6, 0xba, 0x43, 0x1d, 0x05, 0x61, 0xa7, + 0x0e, 0xdc, 0x45, 0xf4, 0xe4, 0x50, 0xf0, 0xa0, 0x1e, 0x3a, 0xd9, 0x40, 0x85, 0xd2, 0x6c, 0x61, + 0x2b, 0xeb, 0x9a, 0xae, 0xc9, 0x84, 0x9e, 0x7d, 0x01, 0x1f, 0x6b, 0xc7, 0x1d, 0x3d, 0x89, 0x6e, + 0x2f, 0x22, 0x49, 0x53, 0x51, 0xd9, 0xb1, 0x97, 0x36, 0xfd, 0x7f, 0xfd, 0xfd, 0xf2, 0xe5, 0x83, + 0xc0, 0x23, 0xca, 0x48, 0x8c, 0xdb, 0x13, 0x3f, 0x64, 0x34, 0x42, 0xe9, 0xdb, 0x8e, 0x62, 0xc2, + 0x88, 0xbe, 0x2b, 0xc3, 0x46, 0x75, 0x4c, 0xc6, 0x44, 0x64, 0x6d, 0xbe, 0x4a, 0xcb, 0x0d, 0x49, + 0x8a, 0x67, 0x84, 0xda, 0x2c, 0x89, 0xb0, 0x24, 0xad, 0x01, 0xd4, 0x7b, 0x38, 0xf6, 0x31, 0x75, + 0xf0, 0x7c, 0x81, 0x29, 0xbb, 0xe1, 0x22, 0xfd, 0x12, 0x6a, 0x28, 0x20, 0xc3, 0xa9, 0x3b, 0xf3, + 0xd8, 0x70, 0x82, 0x63, 0x6a, 0x80, 0x66, 0xa1, 0xb5, 0x77, 0x5a, 0xb5, 0xd9, 0xc4, 0x0b, 0x09, + 0xb5, 0x6f, 0x3d, 0x84, 0x83, 0xbb, 0xb4, 0xd8, 0x2d, 0x2e, 0x3f, 0x8e, 0x15, 0x67, 0x5f, 0x10, + 0x32, 0xa3, 0xd6, 0xab, 0x0a, 0x2b, 0x99, 0x99, 0x46, 0x24, 0xa4, 0x38, 0x55, 0x5f, 0x40, 0x6d, + 0xbe, 0xe0, 0xf9, 0xc8, 0x15, 0x40, 0xa6, 0xd6, 0x6c, 0x79, 0x06, 0xbb, 0xcb, 0xe3, 0x4c, 0x2a, + 0xff, 0x15, 0x19, 0xd5, 0x7d, 0x58, 0xa1, 0x2c, 0xf6, 0xc3, 0xb1, 0x4b, 0x93, 0x19, 0x22, 0x81, + 0xcb, 0x3c, 0x14, 0x60, 0x43, 0x15, 0x86, 0xce, 0x8f, 0x61, 0xcb, 0xbe, 0x76, 0x4f, 0x70, 0x3d, + 0x81, 0x3d, 0x70, 0xea, 0x3a, 0x64, 0x71, 0x22, 0xb7, 0x39, 0xa4, 0xff, 0xab, 0x8d, 0x2b, 0x58, + 0xdb, 0x8e, 0xe8, 0x07, 0xb0, 0x30, 0xc5, 0x89, 0x01, 0x9a, 0xa0, 0x55, 0x74, 0xf8, 0x52, 0xaf, + 0xc2, 0xd2, 0x8b, 0x17, 0x2c, 0x78, 0x23, 0xa0, 0x55, 0x76, 0xd2, 0x8f, 0x73, 0xf5, 0x0c, 0x58, + 0x75, 0x58, 0x12, 0xad, 0xeb, 0x1a, 0x54, 0xfd, 0x91, 0x60, 0xca, 0x8e, 0xea, 0x8f, 0xac, 0x27, + 0x58, 0x13, 0x33, 0xbc, 0xf7, 0x66, 0xf9, 0xcf, 0xbe, 0x0f, 0xeb, 0xbf, 0xe5, 0x79, 0x8d, 0xdf, + 0x7a, 0x96, 0xde, 0x3e, 0x3f, 0x5f, 0xee, 0x5d, 0x0f, 0xa0, 0xf1, 0xc7, 0x9e, 0x57, 0xdb, 0xdd, + 0x93, 0xe5, 0x97, 0xa9, 0x2c, 0xd7, 0x26, 0x58, 0xad, 0x4d, 0xf0, 0xb9, 0x36, 0xc1, 0xdb, 0xc6, + 0x54, 0x56, 0x1b, 0x53, 0x79, 0xdf, 0x98, 0xca, 0x63, 0x76, 0x77, 0xd0, 0x8e, 0xb8, 0x11, 0x9d, + 0xef, 0x00, 0x00, 0x00, 0xff, 0xff, 0x27, 0x33, 0xcb, 0xf7, 0x68, 0x03, 0x00, 0x00, } func (m *SeriesRequestHints) Marshal() (dAtA []byte, err error) { @@ -388,6 +396,23 @@ func (m *SeriesResponseHints) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l + if len(m.StringSymbolTable) > 0 { + for k := range m.StringSymbolTable { + v := m.StringSymbolTable[k] + baseI := i + i -= len(v) + copy(dAtA[i:], v) + i = encodeVarintHints(dAtA, i, uint64(len(v))) + i-- + dAtA[i] = 0x12 + i = encodeVarintHints(dAtA, i, uint64(k)) + i-- + dAtA[i] = 0x8 + i = encodeVarintHints(dAtA, i, uint64(baseI-i)) + i-- + dAtA[i] = 0x12 + } + } if len(m.QueriedBlocks) > 0 { for iNdEx := len(m.QueriedBlocks) - 1; iNdEx >= 0; iNdEx-- { { @@ -621,6 +646,14 @@ func (m *SeriesResponseHints) Size() (n int) { n += 1 + l + sovHints(uint64(l)) } } + if len(m.StringSymbolTable) > 0 { + for k, v := range m.StringSymbolTable { + _ = k + _ = v + mapEntrySize := 1 + sovHints(uint64(k)) + 1 + len(v) + sovHints(uint64(len(v))) + n += mapEntrySize + 1 + sovHints(uint64(mapEntrySize)) + } + } return n } @@ -850,6 +883,119 @@ func (m *SeriesResponseHints) Unmarshal(dAtA []byte) error { return err } iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field StringSymbolTable", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowHints + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthHints + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthHints + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.StringSymbolTable == nil { + m.StringSymbolTable = make(map[uint64]string) + } + var mapkey uint64 + var mapvalue string + for iNdEx < postIndex { + entryPreIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowHints + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + if fieldNum == 1 { + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowHints + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + mapkey |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + } else if fieldNum == 2 { + var stringLenmapvalue uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowHints + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLenmapvalue |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLenmapvalue := int(stringLenmapvalue) + if intStringLenmapvalue < 0 { + return ErrInvalidLengthHints + } + postStringIndexmapvalue := iNdEx + intStringLenmapvalue + if postStringIndexmapvalue < 0 { + return ErrInvalidLengthHints + } + if postStringIndexmapvalue > l { + return io.ErrUnexpectedEOF + } + mapvalue = string(dAtA[iNdEx:postStringIndexmapvalue]) + iNdEx = postStringIndexmapvalue + } else { + iNdEx = entryPreIndex + skippy, err := skipHints(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthHints + } + if (iNdEx + skippy) > postIndex { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + m.StringSymbolTable[mapkey] = mapvalue + iNdEx = postIndex default: iNdEx = preIndex skippy, err := skipHints(dAtA[iNdEx:]) diff --git a/pkg/store/hintspb/hints.proto b/pkg/store/hintspb/hints.proto index f7cf68d3ff..33e27a3759 100644 --- a/pkg/store/hintspb/hints.proto +++ b/pkg/store/hintspb/hints.proto @@ -29,6 +29,9 @@ message SeriesRequestHints { message SeriesResponseHints { /// queried_blocks is the list of blocks that have been queried. repeated Block queried_blocks = 1 [(gogoproto.nullable) = false]; + + // Symbol table for label names/values. + map string_symbol_table = 2 [(gogoproto.nullable) = false]; } message Block { @@ -58,4 +61,4 @@ message LabelValuesRequestHints { message LabelValuesResponseHints { /// queried_blocks is the list of blocks that have been queried. repeated Block queried_blocks = 1 [(gogoproto.nullable) = false]; -} \ No newline at end of file +} diff --git a/pkg/store/labelpb/types.pb.go b/pkg/store/labelpb/types.pb.go index 3dd6d97299..40aeff582e 100644 --- a/pkg/store/labelpb/types.pb.go +++ b/pkg/store/labelpb/types.pb.go @@ -62,6 +62,44 @@ func (m *Label) XXX_DiscardUnknown() { var xxx_messageInfo_Label proto.InternalMessageInfo +type CompressedLabel struct { + NameRef uint64 `protobuf:"varint,1,opt,name=name_ref,json=nameRef,proto3" json:"name_ref,omitempty"` + ValueRef uint64 `protobuf:"varint,2,opt,name=value_ref,json=valueRef,proto3" json:"value_ref,omitempty"` +} + +func (m *CompressedLabel) Reset() { *m = CompressedLabel{} } +func (m *CompressedLabel) String() string { return proto.CompactTextString(m) } +func (*CompressedLabel) ProtoMessage() {} +func (*CompressedLabel) Descriptor() ([]byte, []int) { + return fileDescriptor_cdcc9e7dae4870e8, []int{1} +} +func (m *CompressedLabel) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *CompressedLabel) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_CompressedLabel.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *CompressedLabel) XXX_Merge(src proto.Message) { + xxx_messageInfo_CompressedLabel.Merge(m, src) +} +func (m *CompressedLabel) XXX_Size() int { + return m.Size() +} +func (m *CompressedLabel) XXX_DiscardUnknown() { + xxx_messageInfo_CompressedLabel.DiscardUnknown(m) +} + +var xxx_messageInfo_CompressedLabel proto.InternalMessageInfo + type LabelSet struct { Labels []Label `protobuf:"bytes,1,rep,name=labels,proto3" json:"labels"` } @@ -70,7 +108,7 @@ func (m *LabelSet) Reset() { *m = LabelSet{} } func (m *LabelSet) String() string { return proto.CompactTextString(m) } func (*LabelSet) ProtoMessage() {} func (*LabelSet) Descriptor() ([]byte, []int) { - return fileDescriptor_cdcc9e7dae4870e8, []int{1} + return fileDescriptor_cdcc9e7dae4870e8, []int{2} } func (m *LabelSet) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -107,7 +145,7 @@ func (m *ZLabelSet) Reset() { *m = ZLabelSet{} } func (m *ZLabelSet) String() string { return proto.CompactTextString(m) } func (*ZLabelSet) ProtoMessage() {} func (*ZLabelSet) Descriptor() ([]byte, []int) { - return fileDescriptor_cdcc9e7dae4870e8, []int{2} + return fileDescriptor_cdcc9e7dae4870e8, []int{3} } func (m *ZLabelSet) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -138,6 +176,7 @@ var xxx_messageInfo_ZLabelSet proto.InternalMessageInfo func init() { proto.RegisterType((*Label)(nil), "thanos.Label") + proto.RegisterType((*CompressedLabel)(nil), "thanos.CompressedLabel") proto.RegisterType((*LabelSet)(nil), "thanos.LabelSet") proto.RegisterType((*ZLabelSet)(nil), "thanos.ZLabelSet") } @@ -145,21 +184,24 @@ func init() { func init() { proto.RegisterFile("store/labelpb/types.proto", fileDescriptor_cdcc9e7dae4870e8) } var fileDescriptor_cdcc9e7dae4870e8 = []byte{ - // 212 bytes of a gzipped FileDescriptorProto + // 264 bytes of a gzipped FileDescriptorProto 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x92, 0x2c, 0x2e, 0xc9, 0x2f, 0x4a, 0xd5, 0xcf, 0x49, 0x4c, 0x4a, 0xcd, 0x29, 0x48, 0xd2, 0x2f, 0xa9, 0x2c, 0x48, 0x2d, 0xd6, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x2b, 0xc9, 0x48, 0xcc, 0xcb, 0x2f, 0x96, 0x12, 0x49, 0xcf, 0x4f, 0xcf, 0x07, 0x0b, 0xe9, 0x83, 0x58, 0x10, 0x59, 0x25, 0x43, 0x2e, 0x56, 0x1f, 0x90, 0x26, 0x21, 0x21, 0x2e, 0x96, 0xbc, 0xc4, 0xdc, 0x54, 0x09, 0x46, 0x05, 0x46, 0x0d, 0xce, 0x20, 0x30, 0x5b, 0x48, 0x84, 0x8b, 0xb5, 0x2c, 0x31, 0xa7, 0x34, 0x55, 0x82, 0x09, 0x2c, 0x08, 0xe1, - 0x28, 0x99, 0x73, 0x71, 0x80, 0xb5, 0x04, 0xa7, 0x96, 0x08, 0x69, 0x73, 0xb1, 0x81, 0xed, 0x2c, - 0x96, 0x60, 0x54, 0x60, 0xd6, 0xe0, 0x36, 0xe2, 0xd5, 0x83, 0xd8, 0xa6, 0x07, 0x56, 0xe1, 0xc4, - 0x72, 0xe2, 0x9e, 0x3c, 0x43, 0x10, 0x54, 0x89, 0x92, 0x13, 0x17, 0x67, 0x14, 0x5c, 0xa7, 0x29, - 0x7e, 0x9d, 0x7c, 0x20, 0x9d, 0xb7, 0xee, 0xc9, 0xb3, 0x41, 0x74, 0xc0, 0xcc, 0x70, 0x52, 0x3d, - 0xf1, 0x50, 0x8e, 0xe1, 0xc4, 0x23, 0x39, 0xc6, 0x0b, 0x8f, 0xe4, 0x18, 0x1f, 0x3c, 0x92, 0x63, - 0x9c, 0xf0, 0x58, 0x8e, 0xe1, 0xc2, 0x63, 0x39, 0x86, 0x1b, 0x8f, 0xe5, 0x18, 0xa2, 0xd8, 0xa1, - 0x01, 0x90, 0xc4, 0x06, 0xf6, 0x9d, 0x31, 0x20, 0x00, 0x00, 0xff, 0xff, 0xd0, 0x80, 0xe8, 0x16, - 0x18, 0x01, 0x00, 0x00, + 0x28, 0x79, 0x72, 0xf1, 0x3b, 0xe7, 0xe7, 0x16, 0x14, 0xa5, 0x16, 0x17, 0xa7, 0xa6, 0x40, 0x34, + 0x4b, 0x72, 0x71, 0x80, 0x34, 0xc4, 0x17, 0xa5, 0xa6, 0x81, 0x0d, 0x60, 0x09, 0x62, 0x07, 0xf1, + 0x83, 0x52, 0xd3, 0x84, 0xa4, 0xb9, 0x38, 0xc1, 0xda, 0xc0, 0x72, 0x4c, 0x60, 0x39, 0x0e, 0xb0, + 0x40, 0x50, 0x6a, 0x9a, 0x92, 0x39, 0x17, 0x07, 0xd8, 0x80, 0xe0, 0xd4, 0x12, 0x21, 0x6d, 0x2e, + 0x36, 0xb0, 0xf3, 0x8b, 0x25, 0x18, 0x15, 0x98, 0x35, 0xb8, 0x8d, 0x78, 0xf5, 0x20, 0x0e, 0xd7, + 0x03, 0xab, 0x70, 0x62, 0x39, 0x71, 0x4f, 0x9e, 0x21, 0x08, 0xaa, 0x44, 0xc9, 0x89, 0x8b, 0x33, + 0x0a, 0xae, 0xd3, 0x14, 0xbf, 0x4e, 0x3e, 0x90, 0xce, 0x5b, 0xf7, 0xe4, 0xd9, 0x20, 0x3a, 0x60, + 0x66, 0x38, 0xa9, 0x9e, 0x78, 0x28, 0xc7, 0x70, 0xe2, 0x91, 0x1c, 0xe3, 0x85, 0x47, 0x72, 0x8c, + 0x0f, 0x1e, 0xc9, 0x31, 0x4e, 0x78, 0x2c, 0xc7, 0x70, 0xe1, 0xb1, 0x1c, 0xc3, 0x8d, 0xc7, 0x72, + 0x0c, 0x51, 0xec, 0xd0, 0xb0, 0x4c, 0x62, 0x03, 0x07, 0x94, 0x31, 0x20, 0x00, 0x00, 0xff, 0xff, + 0x40, 0x5a, 0x96, 0x56, 0x63, 0x01, 0x00, 0x00, } func (m *Label) Marshal() (dAtA []byte, err error) { @@ -199,6 +241,39 @@ func (m *Label) MarshalToSizedBuffer(dAtA []byte) (int, error) { return len(dAtA) - i, nil } +func (m *CompressedLabel) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *CompressedLabel) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *CompressedLabel) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.ValueRef != 0 { + i = encodeVarintTypes(dAtA, i, uint64(m.ValueRef)) + i-- + dAtA[i] = 0x10 + } + if m.NameRef != 0 { + i = encodeVarintTypes(dAtA, i, uint64(m.NameRef)) + i-- + dAtA[i] = 0x8 + } + return len(dAtA) - i, nil +} + func (m *LabelSet) Marshal() (dAtA []byte, err error) { size := m.Size() dAtA = make([]byte, size) @@ -301,6 +376,21 @@ func (m *Label) Size() (n int) { return n } +func (m *CompressedLabel) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.NameRef != 0 { + n += 1 + sovTypes(uint64(m.NameRef)) + } + if m.ValueRef != 0 { + n += 1 + sovTypes(uint64(m.ValueRef)) + } + return n +} + func (m *LabelSet) Size() (n int) { if m == nil { return 0 @@ -451,6 +541,94 @@ func (m *Label) Unmarshal(dAtA []byte) error { } return nil } +func (m *CompressedLabel) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowTypes + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: CompressedLabel: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: CompressedLabel: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field NameRef", wireType) + } + m.NameRef = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowTypes + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.NameRef |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 2: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field ValueRef", wireType) + } + m.ValueRef = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowTypes + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.ValueRef |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + default: + iNdEx = preIndex + skippy, err := skipTypes(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthTypes + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} func (m *LabelSet) Unmarshal(dAtA []byte) error { l := len(dAtA) iNdEx := 0 diff --git a/pkg/store/labelpb/types.proto b/pkg/store/labelpb/types.proto index 65aa195ec1..bf9efce07b 100644 --- a/pkg/store/labelpb/types.proto +++ b/pkg/store/labelpb/types.proto @@ -24,6 +24,11 @@ message Label { string value = 2; } +message CompressedLabel { + uint64 name_ref = 1; + uint64 value_ref = 2; +} + message LabelSet { repeated Label labels = 1 [(gogoproto.nullable) = false]; } diff --git a/pkg/store/lookup_table.go b/pkg/store/lookup_table.go new file mode 100644 index 0000000000..db6b52f11c --- /dev/null +++ b/pkg/store/lookup_table.go @@ -0,0 +1,68 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package store + +import ( + "math" + "strings" + + "github.com/pkg/errors" +) + +type adjusterFn func(uint64) uint64 + +func maxStringsPerStore(storeCount uint64) uint64 { + return math.MaxUint64 / uint64(storeCount) +} + +func newReferenceAdjusterFactory(storeCount uint64) func(storeIndex uint64) adjusterFn { + // Adjuster adjusts each incoming reference according to the number of stores. + // Whole label space is stored in uint64 so that's how many + // strings we are able to store. + eachStore := maxStringsPerStore(storeCount) + + return func(storeIndex uint64) adjusterFn { + startFrom := eachStore * storeIndex + + return func(ref uint64) uint64 { + return startFrom + (ref % eachStore) + } + } +} + +// lookupTableBuilder provides a way of building +// a lookup table for static strings to compress +// responses better. +type lookupTableBuilder struct { + maxElements uint64 + + current uint64 + table map[uint64]string + reverseTable map[string]uint64 +} + +func newLookupTableBuilder(maxElements uint64) *lookupTableBuilder { + return &lookupTableBuilder{maxElements: maxElements, table: make(map[uint64]string), reverseTable: make(map[string]uint64)} +} + +var maxElementsReached = errors.New("max elements reached in lookup table builder") + +func (b *lookupTableBuilder) putString(s string) (uint64, error) { + if b.current >= b.maxElements { + return 0, maxElementsReached + } + if num, ok := b.reverseTable[s]; ok { + return num, nil + } + + s = strings.Clone(s) + b.reverseTable[s] = b.current + b.table[b.current] = s + b.current++ + return b.current - 1, nil +} + +func (b *lookupTableBuilder) getTable() map[uint64]string { + return b.table +} diff --git a/pkg/store/prometheus.go b/pkg/store/prometheus.go index b4546c7d05..67a403a9e9 100644 --- a/pkg/store/prometheus.go +++ b/pkg/store/prometheus.go @@ -23,6 +23,7 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/gogo/protobuf/proto" + "github.com/gogo/protobuf/types" "github.com/golang/snappy" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" @@ -35,6 +36,7 @@ import ( "github.com/thanos-io/thanos/pkg/httpconfig" "github.com/thanos-io/thanos/pkg/promclient" "github.com/thanos-io/thanos/pkg/runutil" + "github.com/thanos-io/thanos/pkg/store/hintspb" "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb" "github.com/thanos-io/thanos/pkg/store/storepb/prompb" @@ -174,6 +176,8 @@ func (p *PrometheusStore) Series(r *storepb.SeriesRequest, s storepb.Store_Serie } } + lookupTable := newLookupTableBuilder(r.MaximumStringSlots) + if r.SkipChunks { labelMaps, err := p.client.SeriesInGRPC(s.Context(), p.base, matchers, r.MinTime, r.MaxTime) if err != nil { @@ -188,10 +192,27 @@ func (p *PrometheusStore) Series(r *storepb.SeriesRequest, s storepb.Store_Serie sort.Slice(lset, func(i, j int) bool { return lset[i].Name < lset[j].Name }) + if err = s.Send(storepb.NewSeriesResponse(&storepb.Series{Labels: lset})); err != nil { return err } } + + { + var anyHints *types.Any + + resHints := &hintspb.SeriesResponseHints{StringSymbolTable: lookupTable.getTable()} + + if anyHints, err = types.MarshalAny(resHints); err != nil { + err = status.Error(codes.Unknown, errors.Wrap(err, "marshal series response hints").Error()) + return err + } + + if err = s.Send(storepb.NewHintsSeriesResponse(anyHints)); err != nil { + err = status.Error(codes.Unknown, errors.Wrap(err, "send series response hints").Error()) + return err + } + } return nil } @@ -240,7 +261,7 @@ func (p *PrometheusStore) Series(r *storepb.SeriesRequest, s storepb.Store_Serie if !strings.HasPrefix(contentType, "application/x-streamed-protobuf; proto=prometheus.ChunkedReadResponse") { return errors.Errorf("not supported remote read content type: %s", contentType) } - return p.handleStreamedPrometheusResponse(s, shardMatcher, httpResp, queryPrometheusSpan, extLset, enableChunkHashCalculation) + return p.handleStreamedPrometheusResponse(s, shardMatcher, httpResp, queryPrometheusSpan, extLset, enableChunkHashCalculation, lookupTable) } func (p *PrometheusStore) queryPrometheus(s storepb.Store_SeriesServer, r *storepb.SeriesRequest) error { @@ -367,6 +388,7 @@ func (p *PrometheusStore) handleStreamedPrometheusResponse( querySpan tracing.Span, extLset labels.Labels, calculateChecksums bool, + lookupTable *lookupTableBuilder, ) error { level.Debug(p.logger).Log("msg", "started handling ReadRequest_STREAMED_XOR_CHUNKS streamed read response.") @@ -410,6 +432,23 @@ func (p *PrometheusStore) handleStreamedPrometheusResponse( continue } + compressedLabels := make([]labelpb.CompressedLabel, 0, len(completeLabelset)) + var compressedResponse bool = true + for _, lbl := range completeLabelset { + nameRef, nerr := lookupTable.putString(lbl.Name) + valueRef, verr := lookupTable.putString(lbl.Value) + + if nerr != nil || verr != nil { + compressedResponse = false + break + } else if compressedResponse && nerr == nil && verr == nil { + compressedLabels = append(compressedLabels, labelpb.CompressedLabel{ + NameRef: nameRef, + ValueRef: valueRef, + }) + } + } + seriesStats.CountSeries(series.Labels) thanosChks := make([]storepb.AggrChunk, len(series.Chunks)) @@ -434,18 +473,43 @@ func (p *PrometheusStore) handleStreamedPrometheusResponse( series.Chunks[i].Data = nil } - r := storepb.NewSeriesResponse(&storepb.Series{ - Labels: labelpb.ZLabelsFromPromLabels( - completeLabelset, - ), - Chunks: thanosChks, - }) + var r *storepb.SeriesResponse + + if compressedResponse { + r = storepb.NewCompressedSeriesResponse(&storepb.CompressedSeries{ + Chunks: thanosChks, + Labels: compressedLabels, + }) + } else { + r = storepb.NewSeriesResponse(&storepb.Series{ + Labels: labelpb.ZLabelsFromPromLabels(completeLabelset), + Chunks: thanosChks, + }) + } + if err := s.Send(r); err != nil { return err } } } + { + var anyHints *types.Any + var err error + + resHints := &hintspb.SeriesResponseHints{StringSymbolTable: lookupTable.getTable()} + + if anyHints, err = types.MarshalAny(resHints); err != nil { + err = status.Error(codes.Unknown, errors.Wrap(err, "marshal series response hints").Error()) + return err + } + + if err = s.Send(storepb.NewHintsSeriesResponse(anyHints)); err != nil { + err = status.Error(codes.Unknown, errors.Wrap(err, "send series response hints").Error()) + return err + } + } + querySpan.SetTag("processed.series", seriesStats.Series) querySpan.SetTag("processed.chunks", seriesStats.Chunks) querySpan.SetTag("processed.samples", seriesStats.Samples) diff --git a/pkg/store/proxy.go b/pkg/store/proxy.go index 508b4c62ca..2555a97f58 100644 --- a/pkg/store/proxy.go +++ b/pkg/store/proxy.go @@ -287,14 +287,18 @@ func (s *ProxyStore) Series(originalRequest *storepb.SeriesRequest, srv storepb. return nil } + r.MaximumStringSlots = maxStringsPerStore(uint64(len(stores))) + adjusterFactory := newReferenceAdjusterFactory(uint64(len(stores))) + storeResponses := make([]respSet, 0, len(stores)) - for _, st := range stores { + for storeIndex, st := range stores { st := st storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("store %s queried", st)) - respSet, err := newAsyncRespSet(srv.Context(), st, r, s.responseTimeout, s.retrievalStrategy, st.SupportsSharding(), &s.buffers, r.ShardInfo, reqLogger, s.metrics.emptyStreamResponses) + adjuster := adjusterFactory(uint64(storeIndex)) + respSet, err := newAsyncRespSet(srv.Context(), st, r, s.responseTimeout, s.retrievalStrategy, st.SupportsSharding(), &s.buffers, r.ShardInfo, reqLogger, s.metrics.emptyStreamResponses, adjuster) if err != nil { level.Error(reqLogger).Log("err", err) diff --git a/pkg/store/proxy_heap.go b/pkg/store/proxy_heap.go index d354d4db06..78c6f12f3c 100644 --- a/pkg/store/proxy_heap.go +++ b/pkg/store/proxy_heap.go @@ -13,6 +13,7 @@ import ( "time" "github.com/go-kit/log" + "github.com/gogo/protobuf/types" "github.com/cespare/xxhash/v2" "github.com/go-kit/log/level" @@ -22,6 +23,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/model/labels" + "github.com/thanos-io/thanos/pkg/store/hintspb" "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb" "github.com/thanos-io/thanos/pkg/tracing" @@ -364,7 +366,8 @@ func newLazyRespSet( shardMatcher *storepb.ShardMatcher, applySharding bool, emptyStreamResponses prometheus.Counter, - + maximumStrings uint64, + adjuster adjusterFn, ) respSet { bufferedResponses := []*storepb.SeriesResponse{} bufferedResponsesMtx := &sync.Mutex{} @@ -395,6 +398,7 @@ func newLazyRespSet( l.span.Finish() }() + var stringCnt uint64 numResponses := 0 defer func() { if numResponses == 0 { @@ -402,6 +406,15 @@ func newLazyRespSet( } }() + handleErr := func(e error) { + l.bufferedResponsesMtx.Lock() + l.bufferedResponses = append(l.bufferedResponses, storepb.NewWarnSeriesResponse(e)) + l.noMoreData = true + l.dataOrFinishEvent.Signal() + l.bufferedResponsesMtx.Unlock() + l.span.SetTag("err", e.Error()) + } + handleRecvResponse := func(t *time.Timer) bool { if t != nil { defer t.Reset(frameTimeout) @@ -409,14 +422,7 @@ func newLazyRespSet( select { case <-l.ctx.Done(): - err := errors.Wrapf(l.ctx.Err(), "failed to receive any data from %s", st.String()) - l.span.SetTag("err", err.Error()) - - l.bufferedResponsesMtx.Lock() - l.bufferedResponses = append(l.bufferedResponses, storepb.NewWarnSeriesResponse(err)) - l.noMoreData = true - l.dataOrFinishEvent.Signal() - l.bufferedResponsesMtx.Unlock() + handleErr(errors.Wrapf(l.ctx.Err(), "failed to receive any data from %s", st.String())) return false default: resp, err := cl.Recv() @@ -439,13 +445,7 @@ func newLazyRespSet( rerr = errors.Wrapf(err, "receive series from %s", st.String()) } - l.span.SetTag("err", rerr.Error()) - - l.bufferedResponsesMtx.Lock() - l.bufferedResponses = append(l.bufferedResponses, storepb.NewWarnSeriesResponse(rerr)) - l.noMoreData = true - l.dataOrFinishEvent.Signal() - l.bufferedResponsesMtx.Unlock() + handleErr(rerr) return false } @@ -460,6 +460,49 @@ func newLazyRespSet( seriesStats.Count(resp.GetSeries()) } + if resp.GetCompressedSeries() != nil { + cs := resp.GetCompressedSeries() + + stringCnt += uint64(len(cs.Labels)) + + if stringCnt > maximumStrings { + handleErr(fmt.Errorf("maximum string limit %d exceeded", maximumStrings)) + return false + } + + for i := range cs.Labels { + cs.Labels[i].NameRef = adjuster(cs.Labels[i].NameRef) + cs.Labels[i].ValueRef = adjuster(cs.Labels[i].ValueRef) + } + } + + if resp.GetHints() != nil { + var seriesResponseHints hintspb.SeriesResponseHints + var anyHints *types.Any + + if err := types.UnmarshalAny(resp.GetHints(), &seriesResponseHints); err != nil { + handleErr(err) + return false + } + + adjustedTable := map[uint64]string{} + + for k, v := range seriesResponseHints.StringSymbolTable { + adjustedTable[adjuster(k)] = v + } + + seriesResponseHints.StringSymbolTable = adjustedTable + + resHints := &hintspb.SeriesResponseHints{StringSymbolTable: adjustedTable, QueriedBlocks: seriesResponseHints.QueriedBlocks} + + if anyHints, err = types.MarshalAny(resHints); err != nil { + handleErr(err) + return false + } + + resp = storepb.NewHintsSeriesResponse(anyHints) + } + l.bufferedResponsesMtx.Lock() l.bufferedResponses = append(l.bufferedResponses, resp) l.dataOrFinishEvent.Signal() @@ -505,7 +548,9 @@ func newAsyncRespSet(ctx context.Context, buffers *sync.Pool, shardInfo *storepb.ShardInfo, logger log.Logger, - emptyStreamResponses prometheus.Counter) (respSet, error) { + emptyStreamResponses prometheus.Counter, + adjuster adjusterFn, +) (respSet, error) { var span opentracing.Span var closeSeries context.CancelFunc @@ -558,6 +603,8 @@ func newAsyncRespSet(ctx context.Context, shardMatcher, applySharding, emptyStreamResponses, + req.MaximumStringSlots, + adjuster, ), nil case EagerRetrieval: return newEagerRespSet( @@ -570,6 +617,8 @@ func newAsyncRespSet(ctx context.Context, shardMatcher, applySharding, emptyStreamResponses, + req.MaximumStringSlots, + adjuster, ), nil default: panic(fmt.Sprintf("unsupported retrieval strategy %s", retrievalStrategy)) @@ -617,6 +666,8 @@ func newEagerRespSet( shardMatcher *storepb.ShardMatcher, applySharding bool, emptyStreamResponses prometheus.Counter, + maximumStrings uint64, + adjuster adjusterFn, ) respSet { ret := &eagerRespSet{ span: span, @@ -647,6 +698,7 @@ func newEagerRespSet( }() numResponses := 0 + var stringCnt uint64 defer func() { if numResponses == 0 { emptyStreamResponses.Inc() @@ -695,6 +747,50 @@ func newEagerRespSet( seriesStats.Count(resp.GetSeries()) } + if resp.GetCompressedSeries() != nil { + cs := resp.GetCompressedSeries() + + stringCnt += uint64(len(cs.Labels)) + + if stringCnt > maximumStrings { + err := fmt.Errorf("maximum string limit %d exceeded", maximumStrings) + l.span.SetTag("err", err.Error()) + return false + } + + for i := range cs.Labels { + cs.Labels[i].NameRef = adjuster(cs.Labels[i].NameRef) + cs.Labels[i].ValueRef = adjuster(cs.Labels[i].ValueRef) + } + } + + if resp.GetHints() != nil { + var seriesResponseHints hintspb.SeriesResponseHints + var anyHints *types.Any + + if err := types.UnmarshalAny(resp.GetHints(), &seriesResponseHints); err != nil { + l.span.SetTag("err", err.Error()) + return false + } + + adjustedTable := map[uint64]string{} + + for k, v := range seriesResponseHints.StringSymbolTable { + adjustedTable[adjuster(k)] = v + } + + seriesResponseHints.StringSymbolTable = adjustedTable + + resHints := &hintspb.SeriesResponseHints{StringSymbolTable: adjustedTable, QueriedBlocks: seriesResponseHints.QueriedBlocks} + + if anyHints, err = types.MarshalAny(resHints); err != nil { + l.span.SetTag("err", err.Error()) + return false + } + + resp = storepb.NewHintsSeriesResponse(anyHints) + } + l.bufferedResponses = append(l.bufferedResponses, resp) return true } diff --git a/pkg/store/proxy_test.go b/pkg/store/proxy_test.go index 8a997907e4..7b711757dc 100644 --- a/pkg/store/proxy_test.go +++ b/pkg/store/proxy_test.go @@ -1155,6 +1155,7 @@ func TestProxyStore_Series_RequestParamsProxied(t *testing.T) { }, PartialResponseStrategy: storepb.PartialResponseStrategy_WARN, MaxResolutionWindow: 1234, + MaximumStringSlots: math.MaxUint64, } testutil.Ok(t, q.Series(req, s)) diff --git a/pkg/store/storepb/custom.go b/pkg/store/storepb/custom.go index eaa96f1ede..56ed32107a 100644 --- a/pkg/store/storepb/custom.go +++ b/pkg/store/storepb/custom.go @@ -43,6 +43,14 @@ func NewSeriesResponse(series *Series) *SeriesResponse { } } +func NewCompressedSeriesResponse(series *CompressedSeries) *SeriesResponse { + return &SeriesResponse{ + Result: &SeriesResponse_CompressedSeries{ + CompressedSeries: series, + }, + } +} + func NewHintsSeriesResponse(hints *types.Any) *SeriesResponse { return &SeriesResponse{ Result: &SeriesResponse_Hints{ diff --git a/pkg/store/storepb/rpc.pb.go b/pkg/store/storepb/rpc.pb.go index bf670d0f63..9d647efc9a 100644 --- a/pkg/store/storepb/rpc.pb.go +++ b/pkg/store/storepb/rpc.pb.go @@ -292,6 +292,11 @@ type SeriesRequest struct { // shard_info is used by the querier to request a specific // shard of blocks instead of entire blocks. ShardInfo *ShardInfo `protobuf:"bytes,13,opt,name=shard_info,json=shardInfo,proto3" json:"shard_info,omitempty"` + // maximum_string_slots is the number of maximum strings + // that the receiver can send. It is safe to ignore this by + // the receiver if it does not intend on sending a symbol table + // via hints later on. + MaximumStringSlots uint64 `protobuf:"varint,14,opt,name=maximum_string_slots,json=maximumStringSlots,proto3" json:"maximum_string_slots,omitempty"` } func (m *SeriesRequest) Reset() { *m = SeriesRequest{} } @@ -537,6 +542,7 @@ type SeriesResponse struct { // *SeriesResponse_Series // *SeriesResponse_Warning // *SeriesResponse_Hints + // *SeriesResponse_CompressedSeries Result isSeriesResponse_Result `protobuf_oneof:"result"` } @@ -588,10 +594,14 @@ type SeriesResponse_Warning struct { type SeriesResponse_Hints struct { Hints *types.Any `protobuf:"bytes,3,opt,name=hints,proto3,oneof" json:"hints,omitempty"` } +type SeriesResponse_CompressedSeries struct { + CompressedSeries *CompressedSeries `protobuf:"bytes,4,opt,name=compressed_series,json=compressedSeries,proto3,oneof" json:"compressed_series,omitempty"` +} -func (*SeriesResponse_Series) isSeriesResponse_Result() {} -func (*SeriesResponse_Warning) isSeriesResponse_Result() {} -func (*SeriesResponse_Hints) isSeriesResponse_Result() {} +func (*SeriesResponse_Series) isSeriesResponse_Result() {} +func (*SeriesResponse_Warning) isSeriesResponse_Result() {} +func (*SeriesResponse_Hints) isSeriesResponse_Result() {} +func (*SeriesResponse_CompressedSeries) isSeriesResponse_Result() {} func (m *SeriesResponse) GetResult() isSeriesResponse_Result { if m != nil { @@ -621,12 +631,20 @@ func (m *SeriesResponse) GetHints() *types.Any { return nil } +func (m *SeriesResponse) GetCompressedSeries() *CompressedSeries { + if x, ok := m.GetResult().(*SeriesResponse_CompressedSeries); ok { + return x.CompressedSeries + } + return nil +} + // XXX_OneofWrappers is for the internal use of the proto package. func (*SeriesResponse) XXX_OneofWrappers() []interface{} { return []interface{}{ (*SeriesResponse_Series)(nil), (*SeriesResponse_Warning)(nil), (*SeriesResponse_Hints)(nil), + (*SeriesResponse_CompressedSeries)(nil), } } @@ -830,89 +848,92 @@ func init() { func init() { proto.RegisterFile("store/storepb/rpc.proto", fileDescriptor_a938d55a388af629) } var fileDescriptor_a938d55a388af629 = []byte{ - // 1298 bytes of a gzipped FileDescriptorProto + // 1358 bytes of a gzipped FileDescriptorProto 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xac, 0x57, 0x5d, 0x6f, 0x13, 0x47, - 0x17, 0xf6, 0x7a, 0xbd, 0xfe, 0x38, 0x4e, 0xf2, 0x9a, 0xc1, 0xc0, 0xc6, 0x48, 0x8e, 0xdf, 0x7d, - 0xf5, 0x4a, 0x11, 0xa2, 0x36, 0x35, 0x15, 0x52, 0x2b, 0x6e, 0x92, 0x60, 0x48, 0x54, 0x62, 0xca, - 0x38, 0x21, 0x2d, 0x55, 0x65, 0xad, 0xed, 0xc9, 0x7a, 0xc5, 0x7a, 0x77, 0xd9, 0x99, 0x6d, 0xe2, - 0xdb, 0xf6, 0xbe, 0xaa, 0xfa, 0x13, 0xfa, 0x2b, 0xfa, 0x13, 0xb8, 0x2b, 0x57, 0x55, 0xd5, 0x0b, - 0xd4, 0xc2, 0x1f, 0xa9, 0xe6, 0x63, 0xd7, 0xde, 0x34, 0x40, 0x11, 0xdc, 0x44, 0x73, 0x9e, 0xe7, - 0xcc, 0x99, 0xf3, 0xed, 0x0d, 0x5c, 0xa1, 0x2c, 0x88, 0x48, 0x47, 0xfc, 0x0d, 0x47, 0x9d, 0x28, - 0x1c, 0xb7, 0xc3, 0x28, 0x60, 0x01, 0x2a, 0xb2, 0xa9, 0xed, 0x07, 0xb4, 0xb1, 0x9e, 0x55, 0x60, - 0xf3, 0x90, 0x50, 0xa9, 0xd2, 0xa8, 0x3b, 0x81, 0x13, 0x88, 0x63, 0x87, 0x9f, 0x14, 0xda, 0xca, - 0x5e, 0x08, 0xa3, 0x60, 0x76, 0xe6, 0x9e, 0x32, 0xe9, 0xd9, 0x23, 0xe2, 0x9d, 0xa5, 0x9c, 0x20, - 0x70, 0x3c, 0xd2, 0x11, 0xd2, 0x28, 0x3e, 0xee, 0xd8, 0xfe, 0x5c, 0x52, 0xd6, 0x7f, 0x60, 0xf5, - 0x28, 0x72, 0x19, 0xc1, 0x84, 0x86, 0x81, 0x4f, 0x89, 0xf5, 0xbd, 0x06, 0x2b, 0x0a, 0x79, 0x1a, - 0x13, 0xca, 0xd0, 0x16, 0x00, 0x73, 0x67, 0x84, 0x92, 0xc8, 0x25, 0xd4, 0xd4, 0x5a, 0xfa, 0x66, - 0xb5, 0x7b, 0x95, 0xdf, 0x9e, 0x11, 0x36, 0x25, 0x31, 0x1d, 0x8e, 0x83, 0x70, 0xde, 0x3e, 0x70, - 0x67, 0x64, 0x20, 0x54, 0xb6, 0x0b, 0xcf, 0x5e, 0x6c, 0xe4, 0xf0, 0xd2, 0x25, 0x74, 0x19, 0x8a, - 0x8c, 0xf8, 0xb6, 0xcf, 0xcc, 0x7c, 0x4b, 0xdb, 0xac, 0x60, 0x25, 0x21, 0x13, 0x4a, 0x11, 0x09, - 0x3d, 0x77, 0x6c, 0x9b, 0x7a, 0x4b, 0xdb, 0xd4, 0x71, 0x22, 0x5a, 0xab, 0x50, 0xdd, 0xf3, 0x8f, - 0x03, 0xe5, 0x83, 0xf5, 0x53, 0x1e, 0x56, 0xa4, 0x2c, 0xbd, 0x44, 0x63, 0x28, 0x8a, 0x40, 0x13, - 0x87, 0x56, 0xdb, 0x32, 0xb1, 0xed, 0xfb, 0x1c, 0xdd, 0xbe, 0xcd, 0x5d, 0xf8, 0xe3, 0xc5, 0xc6, - 0x27, 0x8e, 0xcb, 0xa6, 0xf1, 0xa8, 0x3d, 0x0e, 0x66, 0x1d, 0xa9, 0xf0, 0x91, 0x1b, 0xa8, 0x53, - 0x27, 0x7c, 0xe2, 0x74, 0x32, 0x39, 0x6b, 0x3f, 0x16, 0xb7, 0xb1, 0x32, 0x8d, 0xd6, 0xa1, 0x3c, - 0x73, 0xfd, 0x21, 0x0f, 0x44, 0x38, 0xae, 0xe3, 0xd2, 0xcc, 0xf5, 0x79, 0xa4, 0x82, 0xb2, 0x4f, - 0x25, 0xa5, 0x5c, 0x9f, 0xd9, 0xa7, 0x82, 0xea, 0x40, 0x45, 0x58, 0x3d, 0x98, 0x87, 0xc4, 0x2c, - 0xb4, 0xb4, 0xcd, 0xb5, 0xee, 0x85, 0xc4, 0xbb, 0x41, 0x42, 0xe0, 0x85, 0x0e, 0xba, 0x05, 0x20, - 0x1e, 0x1c, 0x52, 0xc2, 0xa8, 0x69, 0x88, 0x78, 0xd2, 0x1b, 0xd2, 0xa5, 0x01, 0x61, 0x2a, 0xad, - 0x15, 0x4f, 0xc9, 0xd4, 0xfa, 0xad, 0x00, 0xab, 0x32, 0xe5, 0x49, 0xa9, 0x96, 0x1d, 0xd6, 0x5e, - 0xef, 0x70, 0x3e, 0xeb, 0xf0, 0x2d, 0x4e, 0xb1, 0xf1, 0x94, 0x44, 0xd4, 0xd4, 0xc5, 0xeb, 0xf5, - 0x4c, 0x36, 0xf7, 0x25, 0xa9, 0x1c, 0x48, 0x75, 0x51, 0x17, 0x2e, 0x71, 0x93, 0x11, 0xa1, 0x81, - 0x17, 0x33, 0x37, 0xf0, 0x87, 0x27, 0xae, 0x3f, 0x09, 0x4e, 0x44, 0xd0, 0x3a, 0xbe, 0x38, 0xb3, - 0x4f, 0x71, 0xca, 0x1d, 0x09, 0x0a, 0x5d, 0x07, 0xb0, 0x1d, 0x27, 0x22, 0x8e, 0xcd, 0x88, 0x8c, - 0x75, 0xad, 0xbb, 0x92, 0xbc, 0xb6, 0xe5, 0x38, 0x11, 0x5e, 0xe2, 0xd1, 0x67, 0xb0, 0x1e, 0xda, - 0x11, 0x73, 0x6d, 0x8f, 0xbf, 0x22, 0x2a, 0x3f, 0x9c, 0xb8, 0xd4, 0x1e, 0x79, 0x64, 0x62, 0x16, - 0x5b, 0xda, 0x66, 0x19, 0x5f, 0x51, 0x0a, 0x49, 0x67, 0xdc, 0x51, 0x34, 0xfa, 0xfa, 0x9c, 0xbb, - 0x94, 0x45, 0x36, 0x23, 0xce, 0xdc, 0x2c, 0x89, 0xb2, 0x6c, 0x24, 0x0f, 0x7f, 0x91, 0xb5, 0x31, - 0x50, 0x6a, 0xff, 0x30, 0x9e, 0x10, 0x68, 0x03, 0xaa, 0xf4, 0x89, 0x1b, 0x0e, 0xc7, 0xd3, 0xd8, - 0x7f, 0x42, 0xcd, 0xb2, 0x70, 0x05, 0x38, 0xb4, 0x23, 0x10, 0x74, 0x0d, 0x8c, 0xa9, 0xeb, 0x33, - 0x6a, 0x56, 0x5a, 0x9a, 0x48, 0xa8, 0x9c, 0xc0, 0x76, 0x32, 0x81, 0xed, 0x2d, 0x7f, 0x8e, 0xa5, - 0x0a, 0x42, 0x50, 0xa0, 0x8c, 0x84, 0x26, 0x88, 0xb4, 0x89, 0x33, 0xaa, 0x83, 0x11, 0xd9, 0xbe, - 0x43, 0xcc, 0xaa, 0x00, 0xa5, 0x80, 0x6e, 0x42, 0xf5, 0x69, 0x4c, 0xa2, 0xf9, 0x50, 0xda, 0x5e, - 0x11, 0xb6, 0x51, 0x12, 0xc5, 0x43, 0x4e, 0xed, 0x72, 0x06, 0xc3, 0xd3, 0xf4, 0x8c, 0x6e, 0x00, - 0xd0, 0xa9, 0x1d, 0x4d, 0x86, 0xae, 0x7f, 0x1c, 0x98, 0xab, 0xe2, 0xce, 0xa2, 0x21, 0x39, 0x23, - 0x26, 0xab, 0x42, 0x93, 0xa3, 0xf5, 0xb3, 0x06, 0xb0, 0x30, 0x26, 0x82, 0x65, 0x24, 0x1c, 0xce, - 0x5c, 0xcf, 0x73, 0xa9, 0x6a, 0x2c, 0xe0, 0xd0, 0xbe, 0x40, 0x50, 0x0b, 0x0a, 0xc7, 0xb1, 0x3f, - 0x16, 0x7d, 0x55, 0x5d, 0x94, 0xf3, 0x6e, 0xec, 0x8f, 0xb1, 0x60, 0xd0, 0x75, 0x28, 0x3b, 0x51, - 0x10, 0x87, 0xae, 0xef, 0x88, 0xee, 0xa8, 0x76, 0x6b, 0x89, 0xd6, 0x3d, 0x85, 0xe3, 0x54, 0x03, - 0xfd, 0x2f, 0x09, 0xde, 0x10, 0xaa, 0xe9, 0x6c, 0x63, 0x0e, 0xaa, 0x5c, 0x58, 0x27, 0x50, 0x49, - 0x9d, 0x17, 0x2e, 0xaa, 0x18, 0x27, 0xe4, 0x34, 0x75, 0x51, 0xf2, 0x13, 0x72, 0x8a, 0xfe, 0x0b, - 0x2b, 0x2c, 0x60, 0xb6, 0x37, 0x14, 0x18, 0x55, 0x23, 0x50, 0x15, 0x98, 0x30, 0x43, 0xd1, 0x1a, - 0xe4, 0x47, 0x73, 0x31, 0xcc, 0x65, 0x9c, 0x1f, 0xcd, 0xf9, 0xd2, 0x52, 0x2b, 0xa6, 0xd0, 0xd2, - 0xf9, 0xd2, 0x92, 0x92, 0xd5, 0x80, 0x02, 0x8f, 0x8c, 0x97, 0xcd, 0xb7, 0xd5, 0xa0, 0x55, 0xb0, - 0x38, 0x5b, 0x5d, 0x28, 0x27, 0xf1, 0x28, 0x7b, 0xda, 0x39, 0xf6, 0xf4, 0x8c, 0xbd, 0x0d, 0x30, - 0x44, 0x60, 0x5c, 0x21, 0x93, 0x62, 0x25, 0x59, 0x3f, 0x68, 0xb0, 0x96, 0xcc, 0xb9, 0x5a, 0x7f, - 0x9b, 0x50, 0x4c, 0xf7, 0x31, 0x4f, 0xd1, 0x5a, 0x5a, 0x4f, 0x81, 0xee, 0xe6, 0xb0, 0xe2, 0x51, - 0x03, 0x4a, 0x27, 0x76, 0xe4, 0xf3, 0xc4, 0x8b, 0xdd, 0xbb, 0x9b, 0xc3, 0x09, 0x80, 0xae, 0x27, - 0x4d, 0xaa, 0xbf, 0xbe, 0x49, 0x77, 0x73, 0xaa, 0x4d, 0xb7, 0xcb, 0x50, 0x8c, 0x08, 0x8d, 0x3d, - 0x66, 0xfd, 0x92, 0x87, 0x0b, 0x62, 0x33, 0xf4, 0xed, 0xd9, 0x62, 0xf9, 0xbc, 0x71, 0x58, 0xb5, - 0xf7, 0x18, 0xd6, 0xfc, 0x7b, 0x0e, 0x6b, 0x1d, 0x0c, 0xca, 0xec, 0x88, 0xa9, 0x45, 0x2d, 0x05, - 0x54, 0x03, 0x9d, 0xf8, 0x13, 0xb5, 0xab, 0xf8, 0x71, 0x31, 0xb3, 0xc6, 0xdb, 0x67, 0x76, 0x79, - 0x67, 0x16, 0xff, 0xfd, 0xce, 0xb4, 0x22, 0x40, 0xcb, 0x99, 0x53, 0xe5, 0xac, 0x83, 0xc1, 0xdb, - 0x47, 0xfe, 0x98, 0x55, 0xb0, 0x14, 0x50, 0x03, 0xca, 0xaa, 0x52, 0xbc, 0x5f, 0x39, 0x91, 0xca, - 0x0b, 0x5f, 0xf5, 0xb7, 0xfa, 0x6a, 0xfd, 0x9a, 0x57, 0x8f, 0x3e, 0xb2, 0xbd, 0x78, 0x51, 0xaf, - 0x3a, 0x18, 0xa2, 0x03, 0x55, 0x03, 0x4b, 0xe1, 0xcd, 0x55, 0xcc, 0xbf, 0x47, 0x15, 0xf5, 0x0f, - 0x55, 0xc5, 0xc2, 0x39, 0x55, 0x34, 0xce, 0xa9, 0x62, 0xf1, 0xdd, 0xaa, 0x58, 0x7a, 0x87, 0x2a, - 0xc6, 0x70, 0x31, 0x93, 0x50, 0x55, 0xc6, 0xcb, 0x50, 0xfc, 0x56, 0x20, 0xaa, 0x8e, 0x4a, 0xfa, - 0x50, 0x85, 0xbc, 0xf6, 0x0d, 0x54, 0xd2, 0x0f, 0x08, 0x54, 0x85, 0xd2, 0x61, 0xff, 0xf3, 0xfe, - 0x83, 0xa3, 0x7e, 0x2d, 0x87, 0x2a, 0x60, 0x3c, 0x3c, 0xec, 0xe1, 0xaf, 0x6a, 0x1a, 0x2a, 0x43, - 0x01, 0x1f, 0xde, 0xef, 0xd5, 0xf2, 0x5c, 0x63, 0xb0, 0x77, 0xa7, 0xb7, 0xb3, 0x85, 0x6b, 0x3a, - 0xd7, 0x18, 0x1c, 0x3c, 0xc0, 0xbd, 0x5a, 0x81, 0xe3, 0xb8, 0xb7, 0xd3, 0xdb, 0x7b, 0xd4, 0xab, - 0x19, 0x1c, 0xbf, 0xd3, 0xdb, 0x3e, 0xbc, 0x57, 0x2b, 0x5e, 0xdb, 0x86, 0x02, 0xff, 0x05, 0x46, - 0x25, 0xd0, 0xf1, 0xd6, 0x91, 0xb4, 0xba, 0xf3, 0xe0, 0xb0, 0x7f, 0x50, 0xd3, 0x38, 0x36, 0x38, - 0xdc, 0xaf, 0xe5, 0xf9, 0x61, 0x7f, 0xaf, 0x5f, 0xd3, 0xc5, 0x61, 0xeb, 0x4b, 0x69, 0x4e, 0x68, - 0xf5, 0x70, 0xcd, 0xe8, 0x7e, 0x97, 0x07, 0x43, 0xf8, 0x88, 0x3e, 0x86, 0x82, 0x58, 0xcd, 0x17, - 0x93, 0x8c, 0x2e, 0x7d, 0xcf, 0x35, 0xea, 0x59, 0x50, 0xe5, 0xef, 0x53, 0x28, 0xca, 0xfd, 0x85, - 0x2e, 0x65, 0xf7, 0x59, 0x72, 0xed, 0xf2, 0x59, 0x58, 0x5e, 0xbc, 0xa1, 0xa1, 0x1d, 0x80, 0xc5, - 0x5c, 0xa1, 0xf5, 0x4c, 0x15, 0x97, 0xb7, 0x54, 0xa3, 0x71, 0x1e, 0xa5, 0xde, 0xbf, 0x0b, 0xd5, - 0xa5, 0xb2, 0xa2, 0xac, 0x6a, 0x66, 0x78, 0x1a, 0x57, 0xcf, 0xe5, 0xa4, 0x9d, 0x6e, 0x1f, 0xd6, - 0xc4, 0x17, 0x34, 0x9f, 0x0a, 0x99, 0x8c, 0xdb, 0x50, 0xc5, 0x64, 0x16, 0x30, 0x22, 0x70, 0x94, - 0x86, 0xbf, 0xfc, 0xa1, 0xdd, 0xb8, 0x74, 0x06, 0x55, 0x1f, 0xe4, 0xb9, 0xed, 0xff, 0x3f, 0xfb, - 0xab, 0x99, 0x7b, 0xf6, 0xb2, 0xa9, 0x3d, 0x7f, 0xd9, 0xd4, 0xfe, 0x7c, 0xd9, 0xd4, 0x7e, 0x7c, - 0xd5, 0xcc, 0x3d, 0x7f, 0xd5, 0xcc, 0xfd, 0xfe, 0xaa, 0x99, 0x7b, 0x5c, 0x52, 0xff, 0x13, 0x8c, - 0x8a, 0xa2, 0x67, 0x6e, 0xfe, 0x1d, 0x00, 0x00, 0xff, 0xff, 0x02, 0x42, 0x0e, 0xd0, 0x7d, 0x0c, - 0x00, 0x00, + 0x17, 0xf6, 0x7a, 0xbd, 0xfe, 0x38, 0x4e, 0xf2, 0x2e, 0x83, 0x81, 0x8d, 0x91, 0x12, 0xbf, 0x7e, + 0xf5, 0x4a, 0x11, 0xa2, 0x36, 0x35, 0x15, 0x52, 0x2b, 0x6e, 0x92, 0x60, 0x48, 0x54, 0x12, 0xca, + 0x38, 0x21, 0x2d, 0x55, 0x65, 0xad, 0xed, 0xc9, 0x7a, 0xc5, 0x7e, 0xb1, 0x33, 0xdb, 0xc4, 0xb7, + 0xed, 0x4d, 0x2f, 0xab, 0xfe, 0x84, 0xfe, 0x8a, 0xfe, 0x04, 0xee, 0xca, 0x5d, 0xab, 0x5e, 0xa0, + 0x16, 0xfe, 0x48, 0x35, 0x1f, 0xbb, 0xf6, 0xa6, 0x01, 0x8a, 0xe0, 0x26, 0x9a, 0xf3, 0x3c, 0x67, + 0xce, 0x9c, 0x39, 0xe7, 0xcc, 0xe3, 0x0d, 0x5c, 0xa1, 0x2c, 0x8c, 0x49, 0x57, 0xfc, 0x8d, 0x46, + 0xdd, 0x38, 0x1a, 0x77, 0xa2, 0x38, 0x64, 0x21, 0x2a, 0xb3, 0xa9, 0x1d, 0x84, 0xb4, 0xb9, 0x9a, + 0x77, 0x60, 0xb3, 0x88, 0x50, 0xe9, 0xd2, 0x6c, 0x38, 0xa1, 0x13, 0x8a, 0x65, 0x97, 0xaf, 0x14, + 0xda, 0xca, 0x6f, 0x88, 0xe2, 0xd0, 0x3f, 0xb3, 0x4f, 0x85, 0xf4, 0xec, 0x11, 0xf1, 0xce, 0x52, + 0x4e, 0x18, 0x3a, 0x1e, 0xe9, 0x0a, 0x6b, 0x94, 0x1c, 0x77, 0xed, 0x60, 0x26, 0xa9, 0xf6, 0x7f, + 0x60, 0xf9, 0x28, 0x76, 0x19, 0xc1, 0x84, 0x46, 0x61, 0x40, 0x49, 0xfb, 0x7b, 0x0d, 0x96, 0x14, + 0xf2, 0x34, 0x21, 0x94, 0xa1, 0x4d, 0x00, 0xe6, 0xfa, 0x84, 0x92, 0xd8, 0x25, 0xd4, 0xd2, 0x5a, + 0xfa, 0x46, 0xbd, 0x77, 0x95, 0xef, 0xf6, 0x09, 0x9b, 0x92, 0x84, 0x0e, 0xc7, 0x61, 0x34, 0xeb, + 0x1c, 0xb8, 0x3e, 0x19, 0x08, 0x97, 0xad, 0xd2, 0xb3, 0x17, 0xeb, 0x05, 0xbc, 0xb0, 0x09, 0x5d, + 0x86, 0x32, 0x23, 0x81, 0x1d, 0x30, 0xab, 0xd8, 0xd2, 0x36, 0x6a, 0x58, 0x59, 0xc8, 0x82, 0x4a, + 0x4c, 0x22, 0xcf, 0x1d, 0xdb, 0x96, 0xde, 0xd2, 0x36, 0x74, 0x9c, 0x9a, 0xed, 0x65, 0xa8, 0xef, + 0x06, 0xc7, 0xa1, 0xca, 0xa1, 0xfd, 0x53, 0x11, 0x96, 0xa4, 0x2d, 0xb3, 0x44, 0x63, 0x28, 0x8b, + 0x8b, 0xa6, 0x09, 0x2d, 0x77, 0x64, 0x61, 0x3b, 0xf7, 0x39, 0xba, 0x75, 0x9b, 0xa7, 0xf0, 0xc7, + 0x8b, 0xf5, 0x4f, 0x1c, 0x97, 0x4d, 0x93, 0x51, 0x67, 0x1c, 0xfa, 0x5d, 0xe9, 0xf0, 0x91, 0x1b, + 0xaa, 0x55, 0x37, 0x7a, 0xe2, 0x74, 0x73, 0x35, 0xeb, 0x3c, 0x16, 0xbb, 0xb1, 0x0a, 0x8d, 0x56, + 0xa1, 0xea, 0xbb, 0xc1, 0x90, 0x5f, 0x44, 0x24, 0xae, 0xe3, 0x8a, 0xef, 0x06, 0xfc, 0xa6, 0x82, + 0xb2, 0x4f, 0x25, 0xa5, 0x52, 0xf7, 0xed, 0x53, 0x41, 0x75, 0xa1, 0x26, 0xa2, 0x1e, 0xcc, 0x22, + 0x62, 0x95, 0x5a, 0xda, 0xc6, 0x4a, 0xef, 0x42, 0x9a, 0xdd, 0x20, 0x25, 0xf0, 0xdc, 0x07, 0xdd, + 0x02, 0x10, 0x07, 0x0e, 0x29, 0x61, 0xd4, 0x32, 0xc4, 0x7d, 0xb2, 0x1d, 0x32, 0xa5, 0x01, 0x61, + 0xaa, 0xac, 0x35, 0x4f, 0xd9, 0xb4, 0xfd, 0x83, 0x01, 0xcb, 0xb2, 0xe4, 0x69, 0xab, 0x16, 0x13, + 0xd6, 0x5e, 0x9f, 0x70, 0x31, 0x9f, 0xf0, 0x2d, 0x4e, 0xb1, 0xf1, 0x94, 0xc4, 0xd4, 0xd2, 0xc5, + 0xe9, 0x8d, 0x5c, 0x35, 0xf7, 0x24, 0xa9, 0x12, 0xc8, 0x7c, 0x51, 0x0f, 0x2e, 0xf1, 0x90, 0x31, + 0xa1, 0xa1, 0x97, 0x30, 0x37, 0x0c, 0x86, 0x27, 0x6e, 0x30, 0x09, 0x4f, 0xc4, 0xa5, 0x75, 0x7c, + 0xd1, 0xb7, 0x4f, 0x71, 0xc6, 0x1d, 0x09, 0x0a, 0x5d, 0x07, 0xb0, 0x1d, 0x27, 0x26, 0x8e, 0xcd, + 0x88, 0xbc, 0xeb, 0x4a, 0x6f, 0x29, 0x3d, 0x6d, 0xd3, 0x71, 0x62, 0xbc, 0xc0, 0xa3, 0xcf, 0x60, + 0x35, 0xb2, 0x63, 0xe6, 0xda, 0x1e, 0x3f, 0x45, 0x74, 0x7e, 0x38, 0x71, 0xa9, 0x3d, 0xf2, 0xc8, + 0xc4, 0x2a, 0xb7, 0xb4, 0x8d, 0x2a, 0xbe, 0xa2, 0x1c, 0xd2, 0xc9, 0xb8, 0xa3, 0x68, 0xf4, 0xf5, + 0x39, 0x7b, 0x29, 0x8b, 0x6d, 0x46, 0x9c, 0x99, 0x55, 0x11, 0x6d, 0x59, 0x4f, 0x0f, 0xfe, 0x22, + 0x1f, 0x63, 0xa0, 0xdc, 0xfe, 0x11, 0x3c, 0x25, 0xd0, 0x3a, 0xd4, 0xe9, 0x13, 0x37, 0x1a, 0x8e, + 0xa7, 0x49, 0xf0, 0x84, 0x5a, 0x55, 0x91, 0x0a, 0x70, 0x68, 0x5b, 0x20, 0xe8, 0x1a, 0x18, 0x53, + 0x37, 0x60, 0xd4, 0xaa, 0xb5, 0x34, 0x51, 0x50, 0xf9, 0x02, 0x3b, 0xe9, 0x0b, 0xec, 0x6c, 0x06, + 0x33, 0x2c, 0x5d, 0x10, 0x82, 0x12, 0x65, 0x24, 0xb2, 0x40, 0x94, 0x4d, 0xac, 0x51, 0x03, 0x8c, + 0xd8, 0x0e, 0x1c, 0x62, 0xd5, 0x05, 0x28, 0x0d, 0x74, 0x13, 0xea, 0x4f, 0x13, 0x12, 0xcf, 0x86, + 0x32, 0xf6, 0x92, 0x88, 0x8d, 0xd2, 0x5b, 0x3c, 0xe4, 0xd4, 0x0e, 0x67, 0x30, 0x3c, 0xcd, 0xd6, + 0xe8, 0x06, 0x00, 0x9d, 0xda, 0xf1, 0x64, 0xe8, 0x06, 0xc7, 0xa1, 0xb5, 0x2c, 0xf6, 0xcc, 0x07, + 0x92, 0x33, 0xe2, 0x65, 0xd5, 0x68, 0xba, 0x44, 0x37, 0xa0, 0xe1, 0xdb, 0xa7, 0xae, 0x9f, 0xf8, + 0xbc, 0x62, 0x6e, 0xe0, 0x0c, 0xa9, 0x17, 0x32, 0x6a, 0xad, 0xb4, 0xb4, 0x8d, 0x12, 0x46, 0x8a, + 0x1b, 0x08, 0x6a, 0xc0, 0x99, 0xf6, 0xcf, 0x1a, 0xc0, 0xfc, 0x78, 0x51, 0x1e, 0x46, 0xa2, 0xa1, + 0xef, 0x7a, 0x9e, 0x4b, 0xd5, 0x28, 0x02, 0x87, 0xf6, 0x04, 0x82, 0x5a, 0x50, 0x3a, 0x4e, 0x82, + 0xb1, 0x98, 0xc4, 0xfa, 0x7c, 0x00, 0xee, 0x26, 0xc1, 0x18, 0x0b, 0x06, 0x5d, 0x87, 0xaa, 0x13, + 0x87, 0x49, 0xe4, 0x06, 0x8e, 0x98, 0xa7, 0x7a, 0xcf, 0x4c, 0xbd, 0xee, 0x29, 0x1c, 0x67, 0x1e, + 0xe8, 0x7f, 0x69, 0xb9, 0x0c, 0xe1, 0x9a, 0xa9, 0x01, 0xe6, 0xa0, 0xaa, 0x5e, 0xfb, 0x04, 0x6a, + 0xd9, 0x75, 0x45, 0x8a, 0xaa, 0x2a, 0x13, 0x72, 0x9a, 0xa5, 0x28, 0xf9, 0x09, 0x39, 0x45, 0xff, + 0x85, 0x25, 0x16, 0x32, 0xdb, 0x1b, 0x0a, 0x8c, 0xaa, 0x47, 0x53, 0x17, 0x98, 0x08, 0x43, 0xd1, + 0x0a, 0x14, 0x47, 0x33, 0xf1, 0xfc, 0xab, 0xb8, 0x38, 0x9a, 0x71, 0x99, 0x53, 0xa2, 0x54, 0x6a, + 0xe9, 0x5c, 0xe6, 0xa4, 0xd5, 0x6e, 0x42, 0x89, 0xdf, 0x8c, 0x37, 0x3a, 0xb0, 0xd5, 0xd3, 0xac, + 0x61, 0xb1, 0x6e, 0xf7, 0xa0, 0x9a, 0xde, 0x47, 0xc5, 0xd3, 0xce, 0x89, 0xa7, 0xe7, 0xe2, 0xad, + 0x83, 0x21, 0x2e, 0xc6, 0x1d, 0x72, 0x25, 0x56, 0x56, 0xfb, 0x37, 0x0d, 0x56, 0x52, 0x65, 0x50, + 0x82, 0xb9, 0x01, 0xe5, 0x4c, 0xc1, 0x79, 0x89, 0x56, 0xb2, 0x09, 0x10, 0xe8, 0x4e, 0x01, 0x2b, + 0x1e, 0x35, 0xa1, 0x72, 0x62, 0xc7, 0x01, 0x2f, 0xbc, 0x50, 0xeb, 0x9d, 0x02, 0x4e, 0x01, 0x74, + 0x3d, 0x1d, 0x6b, 0xfd, 0xf5, 0x63, 0xbd, 0x53, 0x48, 0x07, 0xfb, 0x1e, 0x5c, 0x18, 0x87, 0x7e, + 0x14, 0x13, 0x4a, 0xc9, 0x64, 0xa8, 0x8e, 0x97, 0xcd, 0xb4, 0xd2, 0xe3, 0xb7, 0x33, 0x87, 0x2c, + 0x11, 0x73, 0x7c, 0x06, 0xdb, 0xaa, 0x42, 0x39, 0x26, 0x34, 0xf1, 0x58, 0xfb, 0x97, 0x22, 0x5c, + 0x10, 0xa2, 0xb4, 0x6f, 0xfb, 0x73, 0xdd, 0x7b, 0xa3, 0x4e, 0x68, 0xef, 0xa1, 0x13, 0xc5, 0xf7, + 0xd4, 0x89, 0x06, 0x18, 0x94, 0xd9, 0x31, 0x53, 0xbf, 0x11, 0xd2, 0x40, 0x26, 0xe8, 0x24, 0x98, + 0x28, 0x99, 0xe4, 0xcb, 0xb9, 0x5c, 0x18, 0x6f, 0x97, 0x8b, 0x45, 0xb9, 0x2e, 0xff, 0x7b, 0xb9, + 0x6e, 0xc7, 0x80, 0x16, 0x2b, 0xa7, 0xe6, 0xa2, 0x01, 0x06, 0x9f, 0x43, 0xf9, 0x3b, 0x5a, 0xc3, + 0xd2, 0x40, 0x4d, 0xa8, 0xaa, 0x96, 0xf3, 0xc1, 0xe7, 0x44, 0x66, 0xcf, 0x73, 0xd5, 0xdf, 0x9a, + 0x6b, 0xfb, 0xd7, 0xa2, 0x3a, 0xf4, 0x91, 0xed, 0x25, 0xf3, 0x7e, 0x35, 0xc0, 0x10, 0xa3, 0xac, + 0x5e, 0x82, 0x34, 0xde, 0xdc, 0xc5, 0xe2, 0x7b, 0x74, 0x51, 0xff, 0x50, 0x5d, 0x2c, 0x9d, 0xd3, + 0x45, 0xe3, 0x9c, 0x2e, 0x96, 0xdf, 0xad, 0x8b, 0x95, 0x77, 0xe8, 0x62, 0x02, 0x17, 0x73, 0x05, + 0x55, 0x6d, 0xbc, 0x0c, 0xe5, 0x6f, 0x05, 0xa2, 0xfa, 0xa8, 0xac, 0x0f, 0xd5, 0xc8, 0x6b, 0xdf, + 0x40, 0x2d, 0xfb, 0x76, 0x41, 0x75, 0xa8, 0x1c, 0xee, 0x7f, 0xbe, 0xff, 0xe0, 0x68, 0xdf, 0x2c, + 0xa0, 0x1a, 0x18, 0x0f, 0x0f, 0xfb, 0xf8, 0x2b, 0x53, 0x43, 0x55, 0x28, 0xe1, 0xc3, 0xfb, 0x7d, + 0xb3, 0xc8, 0x3d, 0x06, 0xbb, 0x77, 0xfa, 0xdb, 0x9b, 0xd8, 0xd4, 0xb9, 0xc7, 0xe0, 0xe0, 0x01, + 0xee, 0x9b, 0x25, 0x8e, 0xe3, 0xfe, 0x76, 0x7f, 0xf7, 0x51, 0xdf, 0x34, 0x38, 0x7e, 0xa7, 0xbf, + 0x75, 0x78, 0xcf, 0x2c, 0x5f, 0xdb, 0x82, 0x12, 0xff, 0xf1, 0x47, 0x15, 0xd0, 0xf1, 0xe6, 0x91, + 0x8c, 0xba, 0xfd, 0xe0, 0x70, 0xff, 0xc0, 0xd4, 0x38, 0x36, 0x38, 0xdc, 0x33, 0x8b, 0x7c, 0xb1, + 0xb7, 0xbb, 0x6f, 0xea, 0x62, 0xb1, 0xf9, 0xa5, 0x0c, 0x27, 0xbc, 0xfa, 0xd8, 0x34, 0x7a, 0xdf, + 0x15, 0xc1, 0x10, 0x39, 0xa2, 0x8f, 0xa1, 0x24, 0x34, 0xfe, 0x62, 0x5a, 0xd1, 0x85, 0x4f, 0xc9, + 0x66, 0x23, 0x0f, 0xaa, 0xfa, 0x7d, 0x0a, 0x65, 0xa9, 0x35, 0xe8, 0x52, 0x5e, 0x18, 0xd3, 0x6d, + 0x97, 0xcf, 0xc2, 0x72, 0xe3, 0x0d, 0x0d, 0x6d, 0x03, 0xcc, 0xdf, 0x15, 0x5a, 0xcd, 0x75, 0x71, + 0x51, 0xa5, 0x9a, 0xcd, 0xf3, 0x28, 0x75, 0xfe, 0x5d, 0xa8, 0x2f, 0xb4, 0x15, 0xe5, 0x5d, 0x73, + 0x8f, 0xa7, 0x79, 0xf5, 0x5c, 0x4e, 0xc6, 0xe9, 0xed, 0xc3, 0x8a, 0xf8, 0x78, 0xe7, 0xaf, 0x42, + 0x16, 0xe3, 0x36, 0xd4, 0x31, 0xf1, 0x43, 0x46, 0x04, 0x8e, 0xb2, 0xeb, 0x2f, 0x7e, 0xe3, 0x37, + 0x2f, 0x9d, 0x41, 0xd5, 0xff, 0x02, 0x85, 0xad, 0xff, 0x3f, 0xfb, 0x6b, 0xad, 0xf0, 0xec, 0xe5, + 0x9a, 0xf6, 0xfc, 0xe5, 0x9a, 0xf6, 0xe7, 0xcb, 0x35, 0xed, 0xc7, 0x57, 0x6b, 0x85, 0xe7, 0xaf, + 0xd6, 0x0a, 0xbf, 0xbf, 0x5a, 0x2b, 0x3c, 0xae, 0xa8, 0x7f, 0x47, 0x46, 0x65, 0x31, 0x33, 0x37, + 0xff, 0x0e, 0x00, 0x00, 0xff, 0xff, 0xdd, 0x80, 0xf5, 0xc0, 0xf8, 0x0c, 0x00, 0x00, } // Reference imports to suppress errors if they are not otherwise used. @@ -1412,6 +1433,11 @@ func (m *SeriesRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l + if m.MaximumStringSlots != 0 { + i = encodeVarintRpc(dAtA, i, uint64(m.MaximumStringSlots)) + i-- + dAtA[i] = 0x70 + } if m.ShardInfo != nil { { size, err := m.ShardInfo.MarshalToSizedBuffer(dAtA[:i]) @@ -1837,6 +1863,27 @@ func (m *SeriesResponse_Hints) MarshalToSizedBuffer(dAtA []byte) (int, error) { } return len(dAtA) - i, nil } +func (m *SeriesResponse_CompressedSeries) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *SeriesResponse_CompressedSeries) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + if m.CompressedSeries != nil { + { + size, err := m.CompressedSeries.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintRpc(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x22 + } + return len(dAtA) - i, nil +} func (m *LabelNamesRequest) Marshal() (dAtA []byte, err error) { size := m.Size() dAtA = make([]byte, size) @@ -2234,6 +2281,9 @@ func (m *SeriesRequest) Size() (n int) { l = m.ShardInfo.Size() n += 1 + l + sovRpc(uint64(l)) } + if m.MaximumStringSlots != 0 { + n += 1 + sovRpc(uint64(m.MaximumStringSlots)) + } return n } @@ -2374,6 +2424,18 @@ func (m *SeriesResponse_Hints) Size() (n int) { } return n } +func (m *SeriesResponse_CompressedSeries) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.CompressedSeries != nil { + l = m.CompressedSeries.Size() + n += 1 + l + sovRpc(uint64(l)) + } + return n +} func (m *LabelNamesRequest) Size() (n int) { if m == nil { return 0 @@ -3300,6 +3362,25 @@ func (m *SeriesRequest) Unmarshal(dAtA []byte) error { return err } iNdEx = postIndex + case 14: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field MaximumStringSlots", wireType) + } + m.MaximumStringSlots = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.MaximumStringSlots |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } default: iNdEx = preIndex skippy, err := skipRpc(dAtA[iNdEx:]) @@ -4022,6 +4103,41 @@ func (m *SeriesResponse) Unmarshal(dAtA []byte) error { } m.Result = &SeriesResponse_Hints{v} iNdEx = postIndex + case 4: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field CompressedSeries", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthRpc + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthRpc + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + v := &CompressedSeries{} + if err := v.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + m.Result = &SeriesResponse_CompressedSeries{v} + iNdEx = postIndex default: iNdEx = preIndex skippy, err := skipRpc(dAtA[iNdEx:]) diff --git a/pkg/store/storepb/rpc.proto b/pkg/store/storepb/rpc.proto index 72afaba8ed..74bbc260b2 100644 --- a/pkg/store/storepb/rpc.proto +++ b/pkg/store/storepb/rpc.proto @@ -122,6 +122,12 @@ message SeriesRequest { // shard_info is used by the querier to request a specific // shard of blocks instead of entire blocks. ShardInfo shard_info = 13; + + // maximum_string_slots is the number of maximum strings + // that the receiver can send. It is safe to ignore this by + // the receiver if it does not intend on sending a symbol table + // via hints later on. + uint64 maximum_string_slots = 14; } @@ -196,6 +202,10 @@ message SeriesResponse { /// multiple SeriesResponse frames contain hints for a single Series() request and how should they /// be handled in such case (ie. merged vs keep the first/last one). google.protobuf.Any hints = 3; + + /// CompressedSeries is like series but instead of raw strings for label names/values, + /// it contains references to strings that are later sent via hints. + CompressedSeries compressed_series = 4; } } diff --git a/pkg/store/storepb/types.pb.go b/pkg/store/storepb/types.pb.go index 271a785bf5..33f3a63d20 100644 --- a/pkg/store/storepb/types.pb.go +++ b/pkg/store/storepb/types.pb.go @@ -11,8 +11,8 @@ import ( _ "github.com/gogo/protobuf/gogoproto" proto "github.com/gogo/protobuf/proto" - _ "github.com/thanos-io/thanos/pkg/store/labelpb" github_com_thanos_io_thanos_pkg_store_labelpb "github.com/thanos-io/thanos/pkg/store/labelpb" + labelpb "github.com/thanos-io/thanos/pkg/store/labelpb" ) // Reference imports to suppress errors if they are not otherwise used. @@ -110,7 +110,7 @@ func (x LabelMatcher_Type) String() string { } func (LabelMatcher_Type) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_121fba57de02d8e0, []int{3, 0} + return fileDescriptor_121fba57de02d8e0, []int{4, 0} } type Chunk struct { @@ -152,6 +152,44 @@ func (m *Chunk) XXX_DiscardUnknown() { var xxx_messageInfo_Chunk proto.InternalMessageInfo +type CompressedSeries struct { + Labels []labelpb.CompressedLabel `protobuf:"bytes,1,rep,name=labels,proto3" json:"labels"` + Chunks []AggrChunk `protobuf:"bytes,2,rep,name=chunks,proto3" json:"chunks"` +} + +func (m *CompressedSeries) Reset() { *m = CompressedSeries{} } +func (m *CompressedSeries) String() string { return proto.CompactTextString(m) } +func (*CompressedSeries) ProtoMessage() {} +func (*CompressedSeries) Descriptor() ([]byte, []int) { + return fileDescriptor_121fba57de02d8e0, []int{1} +} +func (m *CompressedSeries) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *CompressedSeries) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_CompressedSeries.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *CompressedSeries) XXX_Merge(src proto.Message) { + xxx_messageInfo_CompressedSeries.Merge(m, src) +} +func (m *CompressedSeries) XXX_Size() int { + return m.Size() +} +func (m *CompressedSeries) XXX_DiscardUnknown() { + xxx_messageInfo_CompressedSeries.DiscardUnknown(m) +} + +var xxx_messageInfo_CompressedSeries proto.InternalMessageInfo + type Series struct { Labels []github_com_thanos_io_thanos_pkg_store_labelpb.ZLabel `protobuf:"bytes,1,rep,name=labels,proto3,customtype=github.com/thanos-io/thanos/pkg/store/labelpb.ZLabel" json:"labels"` Chunks []AggrChunk `protobuf:"bytes,2,rep,name=chunks,proto3" json:"chunks"` @@ -161,7 +199,7 @@ func (m *Series) Reset() { *m = Series{} } func (m *Series) String() string { return proto.CompactTextString(m) } func (*Series) ProtoMessage() {} func (*Series) Descriptor() ([]byte, []int) { - return fileDescriptor_121fba57de02d8e0, []int{1} + return fileDescriptor_121fba57de02d8e0, []int{2} } func (m *Series) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -205,7 +243,7 @@ func (m *AggrChunk) Reset() { *m = AggrChunk{} } func (m *AggrChunk) String() string { return proto.CompactTextString(m) } func (*AggrChunk) ProtoMessage() {} func (*AggrChunk) Descriptor() ([]byte, []int) { - return fileDescriptor_121fba57de02d8e0, []int{2} + return fileDescriptor_121fba57de02d8e0, []int{3} } func (m *AggrChunk) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -245,7 +283,7 @@ func (m *LabelMatcher) Reset() { *m = LabelMatcher{} } func (m *LabelMatcher) String() string { return proto.CompactTextString(m) } func (*LabelMatcher) ProtoMessage() {} func (*LabelMatcher) Descriptor() ([]byte, []int) { - return fileDescriptor_121fba57de02d8e0, []int{3} + return fileDescriptor_121fba57de02d8e0, []int{4} } func (m *LabelMatcher) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -279,6 +317,7 @@ func init() { proto.RegisterEnum("thanos.Chunk_Encoding", Chunk_Encoding_name, Chunk_Encoding_value) proto.RegisterEnum("thanos.LabelMatcher_Type", LabelMatcher_Type_name, LabelMatcher_Type_value) proto.RegisterType((*Chunk)(nil), "thanos.Chunk") + proto.RegisterType((*CompressedSeries)(nil), "thanos.CompressedSeries") proto.RegisterType((*Series)(nil), "thanos.Series") proto.RegisterType((*AggrChunk)(nil), "thanos.AggrChunk") proto.RegisterType((*LabelMatcher)(nil), "thanos.LabelMatcher") @@ -287,41 +326,43 @@ func init() { func init() { proto.RegisterFile("store/storepb/types.proto", fileDescriptor_121fba57de02d8e0) } var fileDescriptor_121fba57de02d8e0 = []byte{ - // 536 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x6c, 0x53, 0x4f, 0x6f, 0xd3, 0x4e, - 0x10, 0xf5, 0xda, 0x8e, 0x93, 0xcc, 0xaf, 0x3f, 0x64, 0x96, 0x0a, 0xdc, 0x1e, 0x9c, 0xc8, 0x08, - 0x11, 0x55, 0xaa, 0x2d, 0x15, 0x8e, 0x5c, 0x12, 0x94, 0x1b, 0xb4, 0x74, 0x1b, 0x09, 0xd4, 0x0b, - 0xda, 0xb8, 0x2b, 0xdb, 0x6a, 0xfc, 0x47, 0xf6, 0xba, 0x24, 0xdf, 0x02, 0xc4, 0x9d, 0xcf, 0x93, - 0x63, 0x8f, 0x88, 0x43, 0x04, 0xc9, 0x17, 0x41, 0x1e, 0x3b, 0x40, 0xa4, 0x5c, 0xac, 0xf1, 0x7b, - 0x6f, 0x66, 0x76, 0xde, 0xce, 0xc2, 0x51, 0x21, 0xd3, 0x5c, 0x78, 0xf8, 0xcd, 0xa6, 0x9e, 0x5c, - 0x64, 0xa2, 0x70, 0xb3, 0x3c, 0x95, 0x29, 0x35, 0x64, 0xc8, 0x93, 0xb4, 0x38, 0x3e, 0x0c, 0xd2, - 0x20, 0x45, 0xc8, 0xab, 0xa2, 0x9a, 0x3d, 0x6e, 0x12, 0x67, 0x7c, 0x2a, 0x66, 0xbb, 0x89, 0xce, - 0x1d, 0xb4, 0x5e, 0x87, 0x65, 0x72, 0x4b, 0x4f, 0x40, 0xaf, 0x70, 0x8b, 0xf4, 0xc9, 0xe0, 0xc1, - 0xd9, 0x63, 0xb7, 0x2e, 0xe8, 0x22, 0xe9, 0x8e, 0x13, 0x3f, 0xbd, 0x89, 0x92, 0x80, 0xa1, 0x86, - 0x52, 0xd0, 0x6f, 0xb8, 0xe4, 0x96, 0xda, 0x27, 0x83, 0x03, 0x86, 0x31, 0xb5, 0x40, 0x0f, 0x79, - 0x11, 0x5a, 0x5a, 0x9f, 0x0c, 0xf4, 0x91, 0xbe, 0x5c, 0xf5, 0x08, 0x43, 0xc4, 0x79, 0x04, 0x9d, - 0x6d, 0x3e, 0x6d, 0x83, 0xf6, 0xe1, 0x82, 0x99, 0x8a, 0xf3, 0x8d, 0x80, 0x71, 0x25, 0xf2, 0x48, - 0x14, 0xd4, 0x07, 0x03, 0x4f, 0x56, 0x58, 0xa4, 0xaf, 0x0d, 0xfe, 0x3b, 0xfb, 0x7f, 0xdb, 0xfb, - 0x4d, 0x85, 0x8e, 0x5e, 0x2d, 0x57, 0x3d, 0xe5, 0xc7, 0xaa, 0xf7, 0x32, 0x88, 0x64, 0x58, 0x4e, - 0x5d, 0x3f, 0x8d, 0xbd, 0x5a, 0x70, 0x1a, 0xa5, 0x4d, 0xe4, 0x65, 0xb7, 0x81, 0xb7, 0x33, 0xa4, - 0x7b, 0x8d, 0xd9, 0xac, 0x29, 0x4d, 0x3d, 0x30, 0xfc, 0x6a, 0x94, 0xc2, 0x52, 0xb1, 0xc9, 0xc3, - 0x6d, 0x93, 0x61, 0x10, 0xe4, 0x38, 0x24, 0x9e, 0x59, 0x61, 0x8d, 0xcc, 0xf9, 0xaa, 0x42, 0xf7, - 0x0f, 0x47, 0x8f, 0xa0, 0x13, 0x47, 0xc9, 0x47, 0x19, 0xc5, 0xb5, 0x43, 0x1a, 0x6b, 0xc7, 0x51, - 0x32, 0x89, 0x62, 0x81, 0x14, 0x9f, 0xd7, 0x94, 0xda, 0x50, 0x7c, 0x8e, 0x54, 0x0f, 0xb4, 0x9c, - 0x7f, 0x42, 0x4b, 0xfe, 0x19, 0x0b, 0x2b, 0xb2, 0x8a, 0xa1, 0x4f, 0xa1, 0xe5, 0xa7, 0x65, 0x22, - 0x2d, 0x7d, 0x9f, 0xa4, 0xe6, 0xaa, 0x2a, 0x45, 0x19, 0x5b, 0xad, 0xbd, 0x55, 0x8a, 0x32, 0xae, - 0x04, 0x71, 0x94, 0x58, 0xc6, 0x5e, 0x41, 0x1c, 0x25, 0x28, 0xe0, 0x73, 0xab, 0xbd, 0x5f, 0xc0, - 0xe7, 0xf4, 0x39, 0xb4, 0xb1, 0x97, 0xc8, 0xad, 0xce, 0x3e, 0xd1, 0x96, 0x75, 0xbe, 0x10, 0x38, - 0x40, 0x63, 0xdf, 0x72, 0xe9, 0x87, 0x22, 0xa7, 0xa7, 0x3b, 0x6b, 0x73, 0xb4, 0x73, 0x75, 0x8d, - 0xc6, 0x9d, 0x2c, 0x32, 0xf1, 0x77, 0x73, 0x12, 0xde, 0x18, 0xd5, 0x65, 0x18, 0xd3, 0x43, 0x68, - 0xdd, 0xf1, 0x59, 0x29, 0xd0, 0xa7, 0x2e, 0xab, 0x7f, 0x9c, 0x01, 0xe8, 0x55, 0x1e, 0x35, 0x40, - 0x1d, 0x5f, 0x9a, 0x4a, 0xb5, 0x39, 0xe7, 0xe3, 0x4b, 0x93, 0x54, 0x00, 0x1b, 0x9b, 0x2a, 0x02, - 0x6c, 0x6c, 0x6a, 0x27, 0x2e, 0x3c, 0x79, 0xc7, 0x73, 0x19, 0xf1, 0x19, 0x13, 0x45, 0x96, 0x26, - 0x85, 0xb8, 0x92, 0x39, 0x97, 0x22, 0x58, 0xd0, 0x0e, 0xe8, 0xef, 0x87, 0xec, 0xdc, 0x54, 0x68, - 0x17, 0x5a, 0xc3, 0xd1, 0x05, 0x9b, 0x98, 0x64, 0xf4, 0x6c, 0xf9, 0xcb, 0x56, 0x96, 0x6b, 0x9b, - 0xdc, 0xaf, 0x6d, 0xf2, 0x73, 0x6d, 0x93, 0xcf, 0x1b, 0x5b, 0xb9, 0xdf, 0xd8, 0xca, 0xf7, 0x8d, - 0xad, 0x5c, 0xb7, 0x9b, 0xe7, 0x35, 0x35, 0xf0, 0x81, 0xbc, 0xf8, 0x1d, 0x00, 0x00, 0xff, 0xff, - 0x13, 0xe7, 0xb3, 0x25, 0x76, 0x03, 0x00, 0x00, + // 561 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x93, 0xc1, 0x6e, 0xd3, 0x40, + 0x10, 0x86, 0xbd, 0xb6, 0xe3, 0x24, 0x43, 0x41, 0x66, 0xa9, 0xa8, 0xdb, 0x83, 0x1b, 0x19, 0x21, + 0xa2, 0x4a, 0xb5, 0xa5, 0x02, 0x37, 0x2e, 0x6d, 0x95, 0x1b, 0xb4, 0x74, 0x5b, 0x09, 0xd4, 0x0b, + 0xda, 0xb8, 0x2b, 0xdb, 0x6a, 0xec, 0xb5, 0xbc, 0xeb, 0x92, 0xf0, 0x14, 0x20, 0xee, 0x3c, 0x4f, + 0x8e, 0x3d, 0x22, 0x0e, 0x15, 0x24, 0x2f, 0x82, 0xbc, 0x76, 0x02, 0x41, 0x39, 0x71, 0x89, 0x26, + 0xf3, 0x7f, 0xff, 0xcc, 0x78, 0x76, 0x17, 0xb6, 0x85, 0xe4, 0x05, 0x0b, 0xd4, 0x6f, 0x3e, 0x0c, + 0xe4, 0x24, 0x67, 0xc2, 0xcf, 0x0b, 0x2e, 0x39, 0xb6, 0x64, 0x4c, 0x33, 0x2e, 0x76, 0x36, 0x23, + 0x1e, 0x71, 0x95, 0x0a, 0xaa, 0xa8, 0x56, 0x77, 0x1a, 0xe3, 0x88, 0x0e, 0xd9, 0x68, 0xd5, 0xe8, + 0xdd, 0x40, 0xeb, 0x38, 0x2e, 0xb3, 0x6b, 0xbc, 0x07, 0x66, 0x95, 0x77, 0x50, 0x0f, 0xf5, 0x1f, + 0x1c, 0x3c, 0xf6, 0xeb, 0x82, 0xbe, 0x12, 0xfd, 0x41, 0x16, 0xf2, 0xab, 0x24, 0x8b, 0x88, 0x62, + 0x30, 0x06, 0xf3, 0x8a, 0x4a, 0xea, 0xe8, 0x3d, 0xd4, 0xdf, 0x20, 0x2a, 0xc6, 0x0e, 0x98, 0x31, + 0x15, 0xb1, 0x63, 0xf4, 0x50, 0xdf, 0x3c, 0x32, 0xa7, 0x77, 0xbb, 0x88, 0xa8, 0x8c, 0xf7, 0x08, + 0x3a, 0x0b, 0x3f, 0x6e, 0x83, 0xf1, 0xfe, 0x94, 0xd8, 0x9a, 0xf7, 0x09, 0xec, 0x63, 0x9e, 0xe6, + 0x05, 0x13, 0x82, 0x5d, 0x9d, 0xb3, 0x22, 0x61, 0x02, 0xbf, 0x04, 0x4b, 0x8d, 0x28, 0x1c, 0xd4, + 0x33, 0xfa, 0xf7, 0x0e, 0xb6, 0x96, 0x43, 0x2c, 0xc9, 0xd7, 0x95, 0xae, 0xaa, 0x6b, 0xa4, 0x81, + 0x71, 0x00, 0x56, 0x58, 0x4d, 0x29, 0x1c, 0x5d, 0xd9, 0x1e, 0x2e, 0x6c, 0x87, 0x51, 0x54, 0xa8, + 0xf9, 0x17, 0x86, 0x1a, 0xf3, 0xbe, 0x21, 0xb0, 0x9a, 0x96, 0xe1, 0x3f, 0x2d, 0xef, 0x2f, 0xbc, + 0x75, 0xa3, 0x57, 0x95, 0xef, 0xc7, 0xdd, 0xee, 0x8b, 0x28, 0x91, 0x71, 0x39, 0xf4, 0x43, 0x9e, + 0x06, 0x35, 0xb0, 0x9f, 0xf0, 0x26, 0x0a, 0xf2, 0xeb, 0x28, 0x58, 0x59, 0xb0, 0x7f, 0xa9, 0xdc, + 0xff, 0x3f, 0xe0, 0x57, 0x1d, 0xba, 0x4b, 0x0d, 0x6f, 0x43, 0x27, 0x4d, 0xb2, 0x0f, 0x32, 0x49, + 0xeb, 0xd3, 0x31, 0x48, 0x3b, 0x4d, 0xb2, 0x8b, 0x24, 0x65, 0x4a, 0xa2, 0xe3, 0x5a, 0xd2, 0x1b, + 0x89, 0x8e, 0x95, 0xb4, 0x0b, 0x46, 0x41, 0x3f, 0xaa, 0xe3, 0xf8, 0xeb, 0xb3, 0x54, 0x45, 0x52, + 0x29, 0xf8, 0x09, 0xb4, 0x42, 0x5e, 0x66, 0xd2, 0x31, 0xd7, 0x21, 0xb5, 0x56, 0x55, 0x11, 0x65, + 0xea, 0xb4, 0xd6, 0x56, 0x11, 0x65, 0x5a, 0x01, 0x69, 0x92, 0x39, 0xd6, 0x5a, 0x20, 0x4d, 0x32, + 0x05, 0xd0, 0xb1, 0xd3, 0x5e, 0x0f, 0xd0, 0x31, 0x7e, 0x06, 0x6d, 0xd5, 0x8b, 0x15, 0x4e, 0x67, + 0x1d, 0xb4, 0x50, 0xbd, 0x2f, 0x08, 0x36, 0xd4, 0x62, 0xdf, 0x50, 0x19, 0xc6, 0xac, 0xc0, 0xfb, + 0x2b, 0x57, 0x76, 0x7b, 0xe5, 0xe8, 0x1a, 0xc6, 0xbf, 0x98, 0xe4, 0xec, 0xcf, 0xad, 0xcd, 0x68, + 0xb3, 0xa8, 0x2e, 0x51, 0x31, 0xde, 0x84, 0xd6, 0x0d, 0x1d, 0x95, 0x4c, 0xed, 0xa9, 0x4b, 0xea, + 0x3f, 0x5e, 0x1f, 0xcc, 0xca, 0x87, 0x2d, 0xd0, 0x07, 0x67, 0xb6, 0x56, 0xdd, 0xda, 0x93, 0xc1, + 0x99, 0x8d, 0xaa, 0x04, 0x19, 0xd8, 0xba, 0x4a, 0x90, 0x81, 0x6d, 0xec, 0xf9, 0xb0, 0xf5, 0x96, + 0x16, 0x32, 0xa1, 0x23, 0xc2, 0x44, 0xce, 0x33, 0xc1, 0xce, 0x65, 0x41, 0x25, 0x8b, 0x26, 0xb8, + 0x03, 0xe6, 0xbb, 0x43, 0x72, 0x62, 0x6b, 0xb8, 0x0b, 0xad, 0xc3, 0xa3, 0x53, 0x72, 0x61, 0xa3, + 0xa3, 0xa7, 0xd3, 0x5f, 0xae, 0x36, 0x9d, 0xb9, 0xe8, 0x76, 0xe6, 0xa2, 0x9f, 0x33, 0x17, 0x7d, + 0x9e, 0xbb, 0xda, 0xed, 0xdc, 0xd5, 0xbe, 0xcf, 0x5d, 0xed, 0xb2, 0xdd, 0x3c, 0xed, 0xa1, 0xa5, + 0x1e, 0xe7, 0xf3, 0xdf, 0x01, 0x00, 0x00, 0xff, 0xff, 0x0b, 0xf7, 0x3a, 0x8f, 0xf2, 0x03, 0x00, + 0x00, } func (m *Chunk) Marshal() (dAtA []byte, err error) { @@ -364,6 +405,57 @@ func (m *Chunk) MarshalToSizedBuffer(dAtA []byte) (int, error) { return len(dAtA) - i, nil } +func (m *CompressedSeries) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *CompressedSeries) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *CompressedSeries) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if len(m.Chunks) > 0 { + for iNdEx := len(m.Chunks) - 1; iNdEx >= 0; iNdEx-- { + { + size, err := m.Chunks[iNdEx].MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintTypes(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x12 + } + } + if len(m.Labels) > 0 { + for iNdEx := len(m.Labels) - 1; iNdEx >= 0; iNdEx-- { + { + size, err := m.Labels[iNdEx].MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintTypes(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0xa + } + } + return len(dAtA) - i, nil +} + func (m *Series) Marshal() (dAtA []byte, err error) { size := m.Size() dAtA = make([]byte, size) @@ -592,6 +684,27 @@ func (m *Chunk) Size() (n int) { return n } +func (m *CompressedSeries) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if len(m.Labels) > 0 { + for _, e := range m.Labels { + l = e.Size() + n += 1 + l + sovTypes(uint64(l)) + } + } + if len(m.Chunks) > 0 { + for _, e := range m.Chunks { + l = e.Size() + n += 1 + l + sovTypes(uint64(l)) + } + } + return n +} + func (m *Series) Size() (n int) { if m == nil { return 0 @@ -800,6 +913,124 @@ func (m *Chunk) Unmarshal(dAtA []byte) error { } return nil } +func (m *CompressedSeries) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowTypes + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: CompressedSeries: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: CompressedSeries: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Labels", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowTypes + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthTypes + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthTypes + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Labels = append(m.Labels, labelpb.CompressedLabel{}) + if err := m.Labels[len(m.Labels)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Chunks", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowTypes + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthTypes + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthTypes + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Chunks = append(m.Chunks, AggrChunk{}) + if err := m.Chunks[len(m.Chunks)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipTypes(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthTypes + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} func (m *Series) Unmarshal(dAtA []byte) error { l := len(dAtA) iNdEx := 0 diff --git a/pkg/store/storepb/types.proto b/pkg/store/storepb/types.proto index 3739e23b8f..177ed13538 100644 --- a/pkg/store/storepb/types.proto +++ b/pkg/store/storepb/types.proto @@ -29,6 +29,11 @@ message Chunk { uint64 hash = 3 [(gogoproto.nullable) = true]; } +message CompressedSeries { + repeated CompressedLabel labels = 1 [(gogoproto.nullable) = false]; + repeated AggrChunk chunks = 2 [(gogoproto.nullable) = false]; +} + message Series { repeated Label labels = 1 [(gogoproto.nullable) = false, (gogoproto.customtype) = "github.com/thanos-io/thanos/pkg/store/labelpb.ZLabel"]; repeated AggrChunk chunks = 2 [(gogoproto.nullable) = false]; diff --git a/pkg/store/tsdb.go b/pkg/store/tsdb.go index d876872e2e..aba73aa31f 100644 --- a/pkg/store/tsdb.go +++ b/pkg/store/tsdb.go @@ -171,7 +171,7 @@ func (s *TSDBStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSer continue } - storeSeries := storepb.Series{Labels: labelpb.ZLabelsFromPromLabels(completeLabelset)} + storeSeries := storepb.Series{Labels: (labelpb.ZLabelsFromPromLabels(completeLabelset))} if r.SkipChunks { if err := srv.Send(storepb.NewSeriesResponse(&storeSeries)); err != nil { return status.Error(codes.Aborted, err.Error()) diff --git a/test/e2e/query_test.go b/test/e2e/query_test.go index 04b425061a..b3288831a8 100644 --- a/test/e2e/query_test.go +++ b/test/e2e/query_test.go @@ -1698,5 +1698,62 @@ func TestConnectedQueriesWithLazyProxy(t *testing.T) { instantQuery(t, context.Background(), querier2.Endpoint("http"), func() string { return "sum(metric_that_does_not_exist)" }, time.Now, promclient.QueryOptions{}, 0) +} + +// TestCompressionCompatibility tests whether symbol (string) table +// compression is compatible with older versions. +func TestCompressionCompatibility(t *testing.T) { + t.Parallel() + + e, err := e2e.NewDockerEnvironment("compr-compat") + testutil.Ok(t, err) + t.Cleanup(e2ethanos.CleanScenario(t, e)) + + promConfig := e2ethanos.DefaultPromConfig("p1", 0, "", "", e2ethanos.LocalPrometheusTarget) + prom, sidecar := e2ethanos.NewPrometheusWithSidecar(e, "p1", promConfig, "", e2ethanos.DefaultPrometheusImage(), "") + querier1 := e2ethanos.NewQuerierBuilder(e, "1", sidecar.InternalEndpoint("grpc")).WithDisablePartialResponses(true).Init() + querier2 := e2ethanos.NewQuerierBuilder(e, "2", sidecar.InternalEndpoint("grpc")).WithDisablePartialResponses(true).WithImage("quay.io/thanos/thanos:v0.26.0").Init() + querierConnecting := e2ethanos.NewQuerierBuilder(e, "3", querier2.InternalEndpoint("grpc")).WithDisablePartialResponses(true).Init() + testutil.Ok(t, e2e.StartAndWaitReady(prom, sidecar, querier1, querier2, querierConnecting)) + + testutil.Ok(t, querier2.WaitSumMetricsWithOptions(e2emon.Equals(1), []string{"thanos_store_nodes_grpc_connections"}, e2emon.WaitMissingMetrics())) + testutil.Ok(t, querier1.WaitSumMetricsWithOptions(e2emon.Equals(1), []string{"thanos_store_nodes_grpc_connections"}, e2emon.WaitMissingMetrics())) + testutil.Ok(t, querierConnecting.WaitSumMetricsWithOptions(e2emon.Equals(1), []string{"thanos_store_nodes_grpc_connections"}, e2emon.WaitMissingMetrics())) + + for _, querier := range []*e2emon.InstrumentedRunnable{querier1, querier2, querierConnecting} { + result := instantQuery(t, context.Background(), querier.Endpoint("http"), func() string { + return "sum(up)" + }, time.Now, promclient.QueryOptions{}, 1) + testutil.Equals(t, model.SampleValue(1.0), result[0].Value) + } + +} + +// TestCompressionReferenceAdjustments tests whether symbol (string) table +// compression adjusts references properly in the case of a Querier +// connected to a Querier. +func TestCompressionReferenceAdjustments(t *testing.T) { + t.Parallel() + + e, err := e2e.NewDockerEnvironment("compr-adjust") + testutil.Ok(t, err) + t.Cleanup(e2ethanos.CleanScenario(t, e)) + + promConfig := e2ethanos.DefaultPromConfig("p1", 0, "", "", e2ethanos.LocalPrometheusTarget) + prom, sidecar := e2ethanos.NewPrometheusWithSidecar(e, "p1", promConfig, "", e2ethanos.DefaultPrometheusImage(), "") + + querier1 := e2ethanos.NewQuerierBuilder(e, "1", sidecar.InternalEndpoint("grpc")).WithDisablePartialResponses(true).Init() + querier2 := e2ethanos.NewQuerierBuilder(e, "2", sidecar.InternalEndpoint("grpc")).WithDisablePartialResponses(true).Init() + querierGlobal := e2ethanos.NewQuerierBuilder(e, "3", querier1.InternalEndpoint("grpc"), querier2.InternalEndpoint("grpc")).WithDisablePartialResponses(true).Init() + testutil.Ok(t, e2e.StartAndWaitReady(prom, sidecar, querier1, querier2, querierGlobal)) + + testutil.Ok(t, querier2.WaitSumMetricsWithOptions(e2emon.Equals(1), []string{"thanos_store_nodes_grpc_connections"}, e2emon.WaitMissingMetrics())) + testutil.Ok(t, querier1.WaitSumMetricsWithOptions(e2emon.Equals(1), []string{"thanos_store_nodes_grpc_connections"}, e2emon.WaitMissingMetrics())) + testutil.Ok(t, querierGlobal.WaitSumMetricsWithOptions(e2emon.Equals(2), []string{"thanos_store_nodes_grpc_connections"}, e2emon.WaitMissingMetrics())) + + result := instantQuery(t, context.Background(), querierGlobal.Endpoint("http"), func() string { + return "up" + }, time.Now, promclient.QueryOptions{}, 1) + testutil.Equals(t, `up{instance="localhost:9090", job="myself", prometheus="p1", replica="0"}`, result[0].Metric.String()) } diff --git a/test/e2e/store_gateway_test.go b/test/e2e/store_gateway_test.go index 2f10e73012..8c8355595a 100644 --- a/test/e2e/store_gateway_test.go +++ b/test/e2e/store_gateway_test.go @@ -515,6 +515,11 @@ func TestStoreGatewayBytesLimit(t *testing.T) { m := e2ethanos.NewMinio(e, "thanos-minio", bucket) testutil.Ok(t, e2e.StartAndWaitReady(m)) + const cacheCfg = `type: IN-MEMORY +config: + max_size: 2B + max_item_size: 1B` + store1 := e2ethanos.NewStoreGW( e, "1", @@ -522,7 +527,7 @@ func TestStoreGatewayBytesLimit(t *testing.T) { Type: client.S3, Config: e2ethanos.NewS3Config(bucket, m.InternalEndpoint("https"), m.InternalDir()), }, - "", + string(cacheCfg), []string{"--store.grpc.downloaded-bytes-limit=1B"}, ) @@ -533,7 +538,7 @@ func TestStoreGatewayBytesLimit(t *testing.T) { Type: client.S3, Config: e2ethanos.NewS3Config(bucket, m.InternalEndpoint("https"), m.InternalDir()), }, - "", + string(cacheCfg), []string{"--store.grpc.downloaded-bytes-limit=100B"}, ) store3 := e2ethanos.NewStoreGW( @@ -543,7 +548,7 @@ func TestStoreGatewayBytesLimit(t *testing.T) { Type: client.S3, Config: e2ethanos.NewS3Config(bucket, m.InternalEndpoint("https"), m.InternalDir()), }, - "", + string(cacheCfg), []string{"--store.grpc.downloaded-bytes-limit=196627B"}, )