-
Notifications
You must be signed in to change notification settings - Fork 797
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Query Sample Statistics #4708
Query Sample Statistics #4708
Changes from 4 commits
32cee1c
73ad6fb
7f6db7a
3f6592a
665b16b
c1899fd
2b2a978
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -12,6 +12,7 @@ import ( | |
"strconv" | ||
"strings" | ||
"time" | ||
"unsafe" | ||
|
||
"github.com/gogo/protobuf/proto" | ||
"github.com/gogo/status" | ||
|
@@ -88,6 +89,10 @@ type Request interface { | |
proto.Message | ||
// LogToSpan writes information about this request to an OpenTracing span | ||
LogToSpan(opentracing.Span) | ||
// GetStats returns the stats of the request. | ||
GetStats() string | ||
// WithStats clones the current `PrometheusRequest` with a new stats. | ||
WithStats(stats string) Request | ||
} | ||
|
||
// Response represents a query range response. | ||
|
@@ -114,6 +119,13 @@ func (q *PrometheusRequest) WithQuery(query string) Request { | |
return &new | ||
} | ||
|
||
// WithStats clones the current `PrometheusRequest` with a new stats. | ||
func (q *PrometheusRequest) WithStats(stats string) Request { | ||
new := *q | ||
new.Stats = stats | ||
return &new | ||
} | ||
|
||
// LogToSpan logs the current `PrometheusRequest` parameters to the specified span. | ||
func (q *PrometheusRequest) LogToSpan(sp opentracing.Span) { | ||
sp.LogFields( | ||
|
@@ -174,6 +186,7 @@ func (prometheusCodec) MergeResponse(responses ...Response) (Response, error) { | |
Data: PrometheusData{ | ||
ResultType: model.ValMatrix.String(), | ||
Result: matrixMerge(promResponses), | ||
Stats: statsMerge(promResponses), | ||
}, | ||
} | ||
|
||
|
@@ -220,6 +233,7 @@ func (prometheusCodec) DecodeRequest(_ context.Context, r *http.Request, forward | |
} | ||
|
||
result.Query = r.FormValue("query") | ||
result.Stats = r.FormValue("stats") | ||
result.Path = r.URL.Path | ||
|
||
// Include the specified headers from http request in prometheusRequest. | ||
|
@@ -252,6 +266,7 @@ func (prometheusCodec) EncodeRequest(ctx context.Context, r Request) (*http.Requ | |
"end": []string{encodeTime(promReq.End)}, | ||
"step": []string{encodeDurationMs(promReq.Step)}, | ||
"query": []string{promReq.Query}, | ||
"stats": []string{promReq.Stats}, | ||
} | ||
u := &url.URL{ | ||
Path: promReq.Path, | ||
|
@@ -380,6 +395,46 @@ func (s *SampleStream) MarshalJSON() ([]byte, error) { | |
return json.Marshal(stream) | ||
} | ||
|
||
// statsMerge merge the stats from 2 responses | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It can merge more than 2 responses right? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||
// this function is similar to matrixMerge | ||
func statsMerge(resps []*PrometheusResponse) *PrometheusResponseStats { | ||
output := map[int64]*PrometheusResponseQueryableSamplesStatsPerStep{} | ||
hasStats := false | ||
for _, resp := range resps { | ||
if resp.Data.Stats == nil { | ||
continue | ||
} | ||
|
||
hasStats = true | ||
if resp.Data.Stats.Samples == nil { | ||
continue | ||
} | ||
|
||
for _, s := range resp.Data.Stats.Samples.TotalQueryableSamplesPerStep { | ||
output[s.GetTimestampMs()] = s | ||
} | ||
} | ||
|
||
keys := make([]int64, 0, len(output)) | ||
for key := range output { | ||
keys = append(keys, key) | ||
} | ||
|
||
sort.Slice(keys, func(i, j int) bool { return keys[i] < keys[j] }) | ||
|
||
result := &PrometheusResponseStats{Samples: &PrometheusResponseSamplesStats{}} | ||
for _, key := range keys { | ||
result.Samples.TotalQueryableSamplesPerStep = append(result.Samples.TotalQueryableSamplesPerStep, output[key]) | ||
result.Samples.TotalQueryableSamples += output[key].Value | ||
} | ||
|
||
if !hasStats { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We can move this check right after the above |
||
return nil | ||
} | ||
|
||
return result | ||
} | ||
|
||
func matrixMerge(resps []*PrometheusResponse) []SampleStream { | ||
output := map[string]*SampleStream{} | ||
for _, resp := range resps { | ||
|
@@ -473,3 +528,41 @@ func decorateWithParamName(err error, field string) error { | |
} | ||
return fmt.Errorf(errTmpl, field, err) | ||
} | ||
|
||
func PrometheusResponseQueryableSamplesStatsPerStepJsoniterDecode(ptr unsafe.Pointer, iter *jsoniter.Iterator) { | ||
if !iter.ReadArray() { | ||
iter.ReportError("queryrange.PrometheusResponseQueryableSamplesStatsPerStep", "expected [") | ||
return | ||
} | ||
|
||
t := model.Time(iter.ReadFloat64() * float64(time.Second/time.Millisecond)) | ||
|
||
if !iter.ReadArray() { | ||
iter.ReportError("queryrange.PrometheusResponseQueryableSamplesStatsPerStep", "expected ,") | ||
return | ||
} | ||
v := iter.ReadInt64() | ||
|
||
if iter.ReadArray() { | ||
iter.ReportError("queryrange.PrometheusResponseQueryableSamplesStatsPerStep", "expected ]") | ||
} | ||
|
||
*(*PrometheusResponseQueryableSamplesStatsPerStep)(ptr) = PrometheusResponseQueryableSamplesStatsPerStep{ | ||
TimestampMs: int64(t), | ||
Value: v, | ||
} | ||
} | ||
|
||
func PrometheusResponseQueryableSamplesStatsPerStepJsoniterEncode(ptr unsafe.Pointer, stream *jsoniter.Stream) { | ||
stats := (*PrometheusResponseQueryableSamplesStatsPerStep)(ptr) | ||
stream.WriteArrayStart() | ||
stream.WriteFloat64(float64(stats.TimestampMs) / float64(time.Second/time.Millisecond)) | ||
stream.WriteMore() | ||
stream.WriteInt64(stats.Value) | ||
stream.WriteArrayEnd() | ||
} | ||
|
||
func init() { | ||
jsoniter.RegisterTypeEncoderFunc("queryrange.PrometheusResponseQueryableSamplesStatsPerStep", PrometheusResponseQueryableSamplesStatsPerStepJsoniterEncode, func(unsafe.Pointer) bool { return false }) | ||
jsoniter.RegisterTypeDecoderFunc("queryrange.PrometheusResponseQueryableSamplesStatsPerStep", PrometheusResponseQueryableSamplesStatsPerStepJsoniterDecode) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The description of the flag is a bit weird, the stats is not returned in PromQL, but PromQL response. Or maybe just "query response".