-
Notifications
You must be signed in to change notification settings - Fork 177
/
handlers.go
1075 lines (900 loc) · 37 KB
/
handlers.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/**
* Copyright 2018 Comcast Cable Communications Management, LLC
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package main
import (
"crypto/md5"
"encoding/json"
"fmt"
"io/ioutil"
"math"
"net/http"
"net/url"
"regexp"
"strconv"
"strings"
"sync"
"time"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/golang/snappy"
"github.com/gorilla/mux"
"github.com/prometheus/common/model"
)
const (
// Origin database types
otPrometheus = "prometheus"
// Common HTTP Header Values
hvNoCache = "no-cache"
hvApplicationJSON = "application/json"
// Common HTTP Header Names
hnCacheControl = "Cache-Control"
hnAllowOrigin = "Access-Control-Allow-Origin"
hnContentType = "Content-Type"
hnAuthorization = "Authorization"
// HTTP methods
hmGet = "GET"
// Prometheus response values
rvSuccess = "success"
rvMatrix = "matrix"
rvVector = "vector"
// Common URL parameter names
upQuery = "query"
upStart = "start"
upEnd = "end"
upStep = "step"
upOriginFqdn = "origin_fqdn"
upOriginPort = "origin_port"
upTimeout = "timeout"
upOrigin = "origin"
upTime = "time"
// Cache lookup results
crKeyMiss = "kmiss"
crRangeMiss = "rmiss"
crHit = "hit"
crPartialHit = "phit"
crPurge = "purge"
)
// TricksterHandler contains the services the Handlers need to operate
type TricksterHandler struct {
Logger log.Logger
Config *Config
Metrics *ApplicationMetrics
Cacher Cache
ResponseChannels map[string]chan *ClientRequestContext
ChannelCreateMtx sync.Mutex
}
// HTTP Handlers
// promHealthCheckHandler returns the health of Trickster
// can't suppport multi-origin full proxy for path-based proxying
func (t *TricksterHandler) promHealthCheckHandler(w http.ResponseWriter, r *http.Request) {
level.Debug(t.Logger).Log(lfEvent, "promHealthCheckHandler", "path", r.URL.Path, "method", r.Method)
// Check the labels path for Prometheus Origin Handler to satisfy health check
path := prometheusAPIv1Path + mnLabels
originURL := t.getOrigin(r).OriginURL + strings.Replace(path, "//", "/", 1)
body, resp, _, err := t.getURL(hmGet, originURL, r.URL.Query(), getProxyableClientHeaders(r))
if err != nil {
level.Error(t.Logger).Log(lfEvent, "error fetching data from origin Prometheus", lfDetail, err.Error())
w.WriteHeader(http.StatusBadGateway)
return
}
for k, v := range resp.Header {
w.Header().Set(k, strings.Join(v, ","))
}
w.WriteHeader(resp.StatusCode)
w.Write(body)
}
// promFullProxyHandler handles calls to non-api paths for single-origin configurations and multi-origin via param or hostname
// can't suppport multi-origin full proxy for path-based proxying
func (t *TricksterHandler) promFullProxyHandler(w http.ResponseWriter, r *http.Request) {
level.Debug(t.Logger).Log(lfEvent, "promFullProxyHandler", "path", r.URL.Path, "method", r.Method)
path := r.URL.Path
vars := mux.Vars(r)
// clear out the origin moniker from the front of the API path
if originName, ok := vars["originMoniker"]; ok {
if strings.HasPrefix(path, "/"+originName) {
path = strings.Replace(path, "/"+originName, "", 1)
}
}
originURL := t.getOrigin(r).OriginURL + strings.Replace(path, "//", "/", 1)
body, resp, _, err := t.getURL(hmGet, originURL, r.URL.Query(), getProxyableClientHeaders(r))
if err != nil {
level.Error(t.Logger).Log(lfEvent, "error fetching data from origin Prometheus", lfDetail, err.Error())
w.WriteHeader(http.StatusBadGateway)
return
}
for k, v := range resp.Header {
w.Header().Set(k, strings.Join(v, ","))
}
w.WriteHeader(resp.StatusCode)
w.Write(body)
}
// promAPIProxyHandler handles proxying of non-query/query_range API calls such as the labels path
func (t *TricksterHandler) promAPIProxyHandler(w http.ResponseWriter, r *http.Request) {
path := r.URL.Path
vars := mux.Vars(r)
// clear out the origin moniker from the front of the API path
if originName, ok := vars["originMoniker"]; ok {
if strings.HasPrefix(path, "/"+originName) {
path = strings.Replace(path, "/"+originName, "", 1)
}
}
originURL := t.getOrigin(r).OriginURL + strings.Replace(path, "//", "/", 1)
body, resp, err := t.fetchPromQuery(originURL, r.URL.Query(), r)
if err != nil {
level.Error(t.Logger).Log(lfEvent, "error fetching data from origin Prometheus", lfDetail, err.Error())
w.WriteHeader(http.StatusBadGateway)
return
}
writeResponse(w, body, resp)
}
// promQueryHandler handles calls to /query (for instantaneous values)
func (t *TricksterHandler) promQueryHandler(w http.ResponseWriter, r *http.Request) {
path := r.URL.Path
vars := mux.Vars(r)
// clear out the origin moniker from the front of the API path
if originName, ok := vars["originMoniker"]; ok {
if strings.HasPrefix(path, "/"+originName) {
path = strings.Replace(path, "/"+originName, "", 1)
}
}
originURL := t.getOrigin(r).OriginURL + strings.Replace(path, "//", "/", 1)
params := r.URL.Query()
body, resp, err := t.fetchPromQuery(originURL, params, r)
if err != nil {
level.Error(t.Logger).Log(lfEvent, "error fetching data from origin Prometheus", lfDetail, err.Error())
w.WriteHeader(http.StatusBadGateway)
return
}
writeResponse(w, body, resp)
}
// promQueryRangeHandler handles calls to /query_range (requests for timeseries values)
func (t *TricksterHandler) promQueryRangeHandler(w http.ResponseWriter, r *http.Request) {
ctx, err := t.buildRequestContext(w, r)
if err != nil {
level.Error(t.Logger).Log(lfEvent, "error building request context", lfDetail, err.Error())
w.WriteHeader(http.StatusBadRequest)
return
}
// This WaitGroup ensures that the server does not write the response until we are 100% done Trickstering the range request.
// The responsders that fulfill client requests will mark the waitgroup done when the response is ready for delivery.
ctx.WaitGroup.Add(1)
if ctx.CacheLookupResult == crHit {
t.respondToCacheHit(ctx)
} else {
t.queueRangeProxyRequest(ctx)
}
// Wait until the response is fulfilled before delivering.
ctx.WaitGroup.Wait()
}
// End HTTP Handlers
// Helper functions
// defaultPrometheusMatrixEnvelope returns an empty envelope
func defaultPrometheusMatrixEnvelope() PrometheusMatrixEnvelope {
return PrometheusMatrixEnvelope{
Data: PrometheusMatrixData{
ResultType: rvMatrix,
Result: make([]*model.SampleStream, 0),
},
}
}
// getProxyableClientHeaders returns any pertinent http headers from the client that we should pass through to the Origin when proxying
func getProxyableClientHeaders(r *http.Request) http.Header {
headers := http.Header{}
// pass through Authorization Header
if authorization, ok := r.Header[hnAuthorization]; ok {
headers.Add(hnAuthorization, strings.Join(authorization, " "))
}
return headers
}
// getOrigin determines the origin server to service the request based on the Host header and url params
func (t *TricksterHandler) getOrigin(r *http.Request) PrometheusOriginConfig {
var originName string
var ok bool
vars := mux.Vars(r)
// Check for the Origin Name URL Path
if originName, ok = vars["originMoniker"]; !ok {
// Check for the Origin Name URL Parmameter (origin=)
if on, ok := r.URL.Query()[upOrigin]; ok {
originName = on[1]
} else {
// Otherwise use the Host Header
originName = r.Host
}
}
// If we have matching origin in our Origins Map, return it.
if p, ok := t.Config.Origins[originName]; ok {
return p
}
// Otherwise, return the default origin if it is configured
p, ok := t.Config.Origins["default"]
if !ok {
p = defaultOriginConfig()
}
if t.Config.DefaultOriginURL != "" {
p.OriginURL = t.Config.DefaultOriginURL
}
return p
}
// setResponseHeaders adds any needed headers to the response object.
// this should be called before the body is written
func setResponseHeaders(w http.ResponseWriter) {
// We're read only and a harmless API, so allow all CORS
w.Header().Set(hnAllowOrigin, "*")
// Set the Content-Type so browser's jQuery will auto-parse the response payload
w.Header().Set(hnContentType, hvApplicationJSON)
}
// getURL makes an HTTP request to the provided URL with the provided parameters and returns the response body
func (t *TricksterHandler) getURL(method string, uri string, params url.Values, headers http.Header) ([]byte, *http.Response, time.Duration, error) {
if len(params) > 0 {
uri += "?" + params.Encode()
}
parsedURL, err := url.Parse(uri)
if err != nil {
return nil, nil, 0, fmt.Errorf("error parsing URL %q: %v", uri, err)
}
startTime := time.Now()
client := &http.Client{}
resp, err := client.Do(&http.Request{Method: method, URL: parsedURL})
if err != nil {
return nil, nil, 0, fmt.Errorf("error downloading URL %q: %v", uri, err)
}
defer resp.Body.Close()
if resp.StatusCode != 200 {
// We don't want to return non-200 status codes as internal Go errors,
// as we want to proxy those status codes all the way back to the user.
level.Warn(t.Logger).Log(lfEvent, "error downloading URL", "url", uri, "status", resp.Status)
return []byte{}, resp, 0, nil
}
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, nil, 0, fmt.Errorf("error reading body from HTTP response for URL %q: %v", uri, err)
}
duration := time.Since(startTime)
level.Debug(t.Logger).Log(lfEvent, "prometheusOriginHttpRequest", "url", uri, "duration", duration)
return body, resp, duration, nil
}
func (t *TricksterHandler) getVectorFromPrometheus(url string, params url.Values, r *http.Request) (PrometheusVectorEnvelope, *http.Response, error) {
pe := PrometheusVectorEnvelope{}
// Make the HTTP Request
body, resp, err := t.fetchPromQuery(url, params, r)
if err != nil {
return pe, nil, fmt.Errorf("error fetching data from Prometheus: %v", err)
}
// Unmarshal the prometheus data into another PrometheusMatrixEnvelope
err = json.Unmarshal(body, &pe)
if err != nil {
return pe, nil, fmt.Errorf("Prometheus vector unmarshaling error for URL %q: %v", url, err)
}
return pe, resp, nil
}
func (t *TricksterHandler) getMatrixFromPrometheus(url string, params url.Values, r *http.Request) (PrometheusMatrixEnvelope, *http.Response, time.Duration, error) {
pe := PrometheusMatrixEnvelope{}
// Make the HTTP Request - dont use fetchPromQuery here, that is for instantaneous only.
body, resp, duration, err := t.getURL(hmGet, url, params, getProxyableClientHeaders(r))
if err != nil {
return pe, nil, 0, err
}
if resp.StatusCode == 200 {
// Unmarshal the prometheus data into another PrometheusMatrixEnvelope
err := json.Unmarshal(body, &pe)
if err != nil {
return pe, nil, 0, fmt.Errorf("Prometheus matrix unmarshaling error for URL %q: %v", url, err)
}
}
return pe, resp, duration, nil
}
// fetchPromQuery checks for cached instantaneous value for the query and returns it if found,
// otherwise proxies the request to the Prometheus origin and sets the cache with a low TTL
// fetchPromQuery does not do any data marshalling
func (t *TricksterHandler) fetchPromQuery(originURL string, params url.Values, r *http.Request) ([]byte, *http.Response, error) {
var ttl int64 = 15
var end int64 = 0
var err error
cacheKeyBase := originURL
// if we have an authorization header, that should be part of the cache key to ensure only authorized users can access cached datasets
if authorization, ok := r.Header[hnAuthorization]; ok {
cacheKeyBase += strings.Join(authorization, " ")
}
if t, ok := params[upTime]; ok {
end, err = strconv.ParseInt(t[0], 10, 64)
if end <= (time.Now().Unix()-1800) && end%1800 == 0 {
// the Time param is perfectly on the hour and not recent, this is unusual for random dashboard loads.
// It might be some kind of a daily or hourly rollup. Let's cache it longer than 15s
ttl = 1800
}
end = (end / 15) * 15
params.Set(upTime, strconv.Itoa(int(end)))
}
cacheKey := deriveCacheKey(cacheKeyBase, params)
var body []byte
resp := &http.Response{}
var duration time.Duration
cacheResult := crKeyMiss
// check for it in the cache
cachedBody, err := t.Cacher.Retrieve(cacheKey)
if err != nil {
// Cache Miss, we need to get it from prometheus
body, resp, duration, err = t.getURL(hmGet, originURL, params, getProxyableClientHeaders(r))
if err != nil {
return nil, nil, err
}
t.Metrics.ProxyRequestDuration.WithLabelValues(originURL, otPrometheus, mnQuery, crKeyMiss, strconv.Itoa(resp.StatusCode)).Observe(duration.Seconds())
t.Cacher.Store(cacheKey, string(body), ttl)
} else {
// Cache hit, return the data set
body = []byte(cachedBody)
cacheResult = crHit
resp.StatusCode = 200
}
t.Metrics.CacheRequestStatus.WithLabelValues(originURL, otPrometheus, mnQuery, cacheResult, strconv.Itoa(resp.StatusCode)).Inc()
return body, resp, nil
}
// buildRequestContext Creates a ClientRequestContext based on the incoming client request
func (t *TricksterHandler) buildRequestContext(w http.ResponseWriter, r *http.Request) (*ClientRequestContext, error) {
var err error
ctx := &ClientRequestContext{
Request: r,
Writer: w,
Origin: t.getOrigin(r),
Time: time.Now().Unix(),
}
ctx.Origin.OriginURL += strings.Replace(ctx.Origin.APIPath+"/", "//", "/", 1)
// Get the params from the User request so we can inspect them and pass on to prometheus
ctx.RequestParams = r.URL.Query()
// setup the default step value if it is missing
ctx.StepParam = strconv.FormatInt(int64(ctx.Origin.DefaultStep), 10)
ctx.StepMS = int64(ctx.Origin.DefaultStep * 1000)
// Pull the Step Value from the User Reqest urlparams if it exists
if step, ok := ctx.RequestParams[upStep]; ok {
if ctx.StepMS, err = strconv.ParseInt(step[0], 10, 64); err == nil && ctx.StepMS > 0 {
ctx.StepParam = step[0]
ctx.StepMS *= 1000
}
}
cacheKeyBase := ctx.Origin.OriginURL + ctx.StepParam
// if we have an authorization header, that should be part of the cache key to ensure only authorized users can access cached datasets
if authorization, ok := r.Header[hnAuthorization]; ok {
cacheKeyBase += strings.Join(authorization, " ")
}
// Derive a hashed cacheKey for the query where we will get and set the result set
// inclusion of the step ensures that datasets with different resolutions are not written to the same key.
ctx.CacheKey = deriveCacheKey(cacheKeyBase, ctx.RequestParams)
// We will look for a Cache-Control: No-Cache request header and,
// if present, bypass the cache for a fresh full query from prometheus.
// Any user can trigger w/ hard reload (ctrl/cmd+shift+r) to clear out cache-related anomalies
noCache := false
if ctx.Origin.IgnoreNoCacheHeader == false && (strings.ToLower(r.Header.Get(hnCacheControl)) == hvNoCache) {
noCache = true
}
// get the browser-requested start/end times, so we can determine what part of the range is not in the cache
if len(ctx.RequestParams[upStart]) == 0 {
return nil, fmt.Errorf("missing start time parameter")
}
reqStart, err := parseTime(ctx.RequestParams[upStart][0])
if err != nil {
return nil, fmt.Errorf("failed to parse parameter %q with value %q: %v", upStart, ctx.RequestParams[upStart][0], err)
}
if len(ctx.RequestParams[upEnd]) == 0 {
return nil, fmt.Errorf("missing end time parameter")
}
reqEnd, err := parseTime(ctx.RequestParams[upEnd][0])
if err != nil {
return nil, fmt.Errorf("failed to parse parameter %q with value %q: %v", upEnd, ctx.RequestParams[upEnd][0], err)
}
ctx.RequestExtents.Start, ctx.RequestExtents.End = alignStepBoundaries(reqStart.Unix()*1000, reqEnd.Unix()*1000, ctx.StepMS, ctx.Time)
// setup some variables to determine and track the status of the query vs whats in the cache
ctx.Matrix = defaultPrometheusMatrixEnvelope()
ctx.CacheLookupResult = crKeyMiss
// parameters for filling gap on the upper bound
ctx.OriginUpperExtents.Start = ctx.RequestExtents.Start
ctx.OriginUpperExtents.End = ctx.RequestExtents.End
// Get the cached result set if present
cachedBody, err := t.Cacher.Retrieve(ctx.CacheKey)
if err != nil || noCache {
// Cache Miss, Get the whole blob from Prometheus.
// Pass on the browser-requested start/end parameters to our Prom Query
if noCache {
ctx.CacheLookupResult = crPurge
}
} else {
// We had a Redis Key Hit for the hashed query key, but we may not have all points requested by browser
// So we can have a Range Miss, Partial Hit, Full Hit when comparing cached range to what the client requested.
// So let's find out what we are missing (if anything) and fetch what we don't have
// See if cache data is compressed by looking for the first character to be "{":, with which the uncompressed JSON would start
// We do this instead of checking the Compression config bit because if someone turns compression on or off when using filesystem or redis cache,
// we will have no idea if what is already in the cache was compressed or not based on previous settings
cb := []byte(cachedBody)
if cb[0] != 123 {
// Not a JSON object, try decompressing
level.Debug(t.Logger).Log("event", "Decompressing Cached Data", "cacheKey", ctx.CacheKey)
cb, err = snappy.Decode(nil, cb)
if err == nil {
cachedBody = string(cb)
}
}
// Marshall the cache payload into a PrometheusMatrixEnvelope struct
err = json.Unmarshal([]byte(cachedBody), &ctx.Matrix)
if err != nil {
return nil, fmt.Errorf("error unmarshalling cached data for key %q with content %q: %v", ctx.CacheKey, cachedBody, err)
}
// Get the Extents of the data in the cache
ce := ctx.Matrix.getExtents()
extent := "none"
// Figure out our Deltas
if ce.End == 0 || ce.Start == 0 {
// Something went wrong fetching extents
ctx.CacheLookupResult = crRangeMiss
} else if ctx.RequestExtents.Start >= ce.Start && ctx.RequestExtents.End <= ce.End {
// Full cache hit, no need to refresh dataset.
// Everything we are requesting is already in cache
ctx.CacheLookupResult = crHit
ctx.OriginUpperExtents.Start = 0
ctx.OriginUpperExtents.End = 0
} else if ctx.RequestExtents.Start < ce.Start && ctx.RequestExtents.End > ce.End {
// Partial Cache hit on both ends.
ctx.CacheLookupResult = crPartialHit
ctx.OriginUpperExtents.Start = ce.End + ctx.StepMS
ctx.OriginUpperExtents.End = ctx.RequestExtents.End
ctx.OriginLowerExtents.Start = ((ctx.RequestExtents.Start / ctx.StepMS) * ctx.StepMS)
ctx.OriginLowerExtents.End = ce.Start
extent = "both"
} else if ctx.RequestExtents.Start > ce.End {
// Range Miss on the Upper Extent of Cache. We will fill from where our cached data stops to the requested end
ctx.CacheLookupResult = crRangeMiss
ctx.OriginUpperExtents.Start = ce.End + ctx.StepMS
extent = "upper"
} else if ctx.RequestExtents.End > ce.End {
// Partial Cache Hit, Missing the Upper Extent
ctx.CacheLookupResult = crPartialHit
ctx.OriginUpperExtents.Start = ce.End + ctx.StepMS
extent = "upper"
} else if ctx.RequestExtents.End < ce.Start {
// Range Miss on the Lower Extent of Cache. We will fill from the requested start up to where our cached data stops
ctx.CacheLookupResult = crRangeMiss
ctx.OriginLowerExtents.Start = ((ctx.RequestExtents.Start / ctx.StepMS) * ctx.StepMS)
ctx.OriginLowerExtents.End = ce.Start - ctx.StepMS
ctx.OriginUpperExtents.Start = 0
ctx.OriginUpperExtents.End = 0
extent = "lower"
} else if ctx.RequestExtents.Start < ce.Start {
// Partial Cache Hit, Missing Lower Extent
ctx.CacheLookupResult = crPartialHit
ctx.OriginLowerExtents.Start = ((ctx.RequestExtents.Start / ctx.StepMS) * ctx.StepMS)
ctx.OriginLowerExtents.End = ce.Start - ctx.StepMS
ctx.OriginUpperExtents.Start = 0
ctx.OriginUpperExtents.End = 0
extent = "upper"
} else {
panic(fmt.Sprintf("Reaching this final clause should be impossible. Yikes! reqStart=%d, reqEnd=%d, ce.Start=%d, ce.End=%d", ctx.RequestExtents.Start, ctx.RequestExtents.End, ce.Start, ce.End))
}
level.Debug(t.Logger).Log(lfEvent, "deltaRoutineCompleted", "CacheLookupResult", ctx.CacheLookupResult, lfCacheKey, ctx.CacheKey,
"cacheStart", ce.Start, "cacheEnd", ce.End, "reqStart", ctx.RequestExtents.Start, "reqEnd", ctx.RequestExtents.End,
"OriginLowerExtents.Start", ctx.OriginLowerExtents.Start, "OriginLowerExtents.End", ctx.OriginLowerExtents.End,
"OriginUpperExtents.Start", ctx.OriginUpperExtents.Start, "OriginUpperExtents.End", ctx.OriginUpperExtents.End, "extent", extent)
}
return ctx, nil
}
func (t *TricksterHandler) respondToCacheHit(ctx *ClientRequestContext) {
defer ctx.WaitGroup.Done()
t.Metrics.CacheRequestStatus.WithLabelValues(ctx.Origin.OriginURL, otPrometheus, mnQueryRange, ctx.CacheLookupResult, "200").Inc()
// Do the extraction of the range the user requested from the fully cached dataset, if needed.
ctx.Matrix.cropToRange(ctx.RequestExtents.Start, ctx.RequestExtents.End+ctx.StepMS)
r := &http.Response{}
// If Fast Forward is enabled and the request is a real-time request, go get that data
if !ctx.Origin.FastForwardDisable && !(ctx.RequestExtents.End < ctx.Time-ctx.StepMS) {
// Query the latest points if Fast Forward is enabled
queryURL := ctx.Origin.OriginURL + mnQuery
originParams := url.Values{}
// Add the prometheus query params from the user urlparams to the origin request
passthroughParam(upQuery, ctx.RequestParams, originParams, nil)
passthroughParam(upTimeout, ctx.RequestParams, originParams, nil)
passthroughParam(upTime, ctx.RequestParams, originParams, nil)
ffd, resp, err := t.getVectorFromPrometheus(queryURL, originParams, ctx.Request)
if err != nil {
level.Error(t.Logger).Log(lfEvent, "error fetching data from origin Prometheus", lfDetail, err.Error())
ctx.Writer.WriteHeader(http.StatusBadGateway)
return
}
r = resp
if resp.StatusCode == 200 && ffd.Status == rvSuccess {
ctx.Matrix = t.mergeVector(ctx.Matrix, ffd)
}
}
// Marshal the Envelope back to a json object for User Response)
body, err := json.Marshal(ctx.Matrix)
if err != nil {
level.Error(t.Logger).Log(lfEvent, "prometheus matrix marshaling error", lfDetail, err.Error())
ctx.Writer.WriteHeader(http.StatusInternalServerError)
return
}
writeResponse(ctx.Writer, body, r)
}
func writeResponse(w http.ResponseWriter, body []byte, resp *http.Response) {
// Now we need to respond to the user request with the dataset
setResponseHeaders(w)
if resp.StatusCode == 0 {
resp.StatusCode = 200
}
w.WriteHeader(resp.StatusCode)
w.Write(body)
}
func (t *TricksterHandler) queueRangeProxyRequest(ctx *ClientRequestContext) {
t.ChannelCreateMtx.Lock()
ch, ok := t.ResponseChannels[ctx.CacheKey]
if !ok {
level.Info(t.Logger).Log(lfEvent, "starting originRangeProxyHandler", lfCacheKey, ctx.CacheKey)
ch = make(chan *ClientRequestContext, 100)
t.ResponseChannels[ctx.CacheKey] = ch
go t.originRangeProxyHandler(ctx.CacheKey, ch)
}
t.ChannelCreateMtx.Unlock()
ch <- ctx
}
func (t *TricksterHandler) originRangeProxyHandler(cacheKey string, originRangeRequests <-chan *ClientRequestContext) {
for r := range originRangeRequests {
// get the cache data for this request again, in case anything about the record has changed
// between the time we queued the request and the time it was consumed from the channel
ctx, err := t.buildRequestContext(r.Writer, r.Request)
if err != nil {
level.Error(t.Logger).Log(lfEvent, "error building request context", lfDetail, err.Error())
r.Writer.WriteHeader(http.StatusBadRequest)
r.WaitGroup.Done()
continue
}
// The cache miss became a cache hit between the time it was queued and processed.
if ctx.CacheLookupResult == crHit {
level.Debug(t.Logger).Log(lfEvent, "delayedCacheHit", lfDetail, "cache was populated with needed data by another proxy request while this one was queued.")
// Lay the newly-retreived data into the original origin range request so it can fully service the client
r.Matrix = ctx.Matrix
// And change the lookup result to a hit.
r.CacheLookupResult = crHit
// Respond with the modified original request object so the right WaitGroup is marked as Done()
t.respondToCacheHit(r)
} else {
// Now we know if we need to make any calls to the Origin, lets set those up
upperDeltaData := PrometheusMatrixEnvelope{}
lowerDeltaData := PrometheusMatrixEnvelope{}
fastForwardData := PrometheusVectorEnvelope{}
var wg sync.WaitGroup
var m sync.Mutex // Protects originErr and resp below.
var originErr error
resp := &http.Response{}
if ctx.OriginLowerExtents.Start > 0 && ctx.OriginLowerExtents.End > 0 {
wg.Add(1)
go func() {
defer wg.Done()
queryURL := ctx.Origin.OriginURL + mnQueryRange
originParams := url.Values{}
// Add the prometheus query params from the user urlparams to the origin request
passthroughParam(upQuery, ctx.RequestParams, originParams, nil)
passthroughParam(upTimeout, ctx.RequestParams, originParams, nil)
originParams.Add(upStep, ctx.StepParam)
originParams.Add(upStart, strconv.FormatInt(ctx.OriginLowerExtents.Start/1000, 10))
originParams.Add(upEnd, strconv.FormatInt(ctx.OriginLowerExtents.End/1000, 10))
ldd, r, duration, err := t.getMatrixFromPrometheus(queryURL, originParams, r.Request)
if err != nil {
m.Lock()
originErr = err
m.Unlock()
return
}
m.Lock()
if resp == (&http.Response{}) {
resp = r
}
m.Unlock()
if r.StatusCode == 200 && ldd.Status == rvSuccess {
lowerDeltaData = ldd
t.Metrics.ProxyRequestDuration.WithLabelValues(ctx.Origin.OriginURL, otPrometheus,
mnQueryRange, ctx.CacheLookupResult, strconv.Itoa(r.StatusCode)).Observe(duration.Seconds())
}
}()
}
if ctx.OriginUpperExtents.Start > 0 && ctx.OriginUpperExtents.End > 0 {
wg.Add(1)
go func() {
defer wg.Done()
queryURL := ctx.Origin.OriginURL + mnQueryRange
originParams := url.Values{}
// Add the prometheus query params from the user urlparams to the origin request
passthroughParam(upQuery, ctx.RequestParams, originParams, nil)
passthroughParam(upTimeout, ctx.RequestParams, originParams, nil)
originParams.Add(upStep, ctx.StepParam)
originParams.Add(upStart, strconv.FormatInt(ctx.OriginUpperExtents.Start/1000, 10))
originParams.Add(upEnd, strconv.FormatInt(ctx.OriginUpperExtents.End/1000, 10))
udd, r, duration, err := t.getMatrixFromPrometheus(queryURL, originParams, r.Request)
if err != nil {
m.Lock()
originErr = err
m.Unlock()
return
}
m.Lock()
if resp == (&http.Response{}) {
resp = r
}
m.Unlock()
if r != nil && r.StatusCode == 200 && udd.Status == rvSuccess {
upperDeltaData = udd
t.Metrics.ProxyRequestDuration.WithLabelValues(ctx.Origin.OriginURL, otPrometheus,
mnQueryRange, ctx.CacheLookupResult, strconv.Itoa(r.StatusCode)).Observe(duration.Seconds())
}
}()
}
if !ctx.Origin.FastForwardDisable && !(ctx.RequestExtents.End < ctx.Time-ctx.StepMS) {
wg.Add(1)
go func() {
defer wg.Done()
// Query the latest points if Fast Forward is enabled
queryURL := ctx.Origin.OriginURL + mnQuery
originParams := url.Values{}
// Add the prometheus query params from the user urlparams to the origin request
passthroughParam(upQuery, ctx.RequestParams, originParams, nil)
passthroughParam(upTimeout, ctx.RequestParams, originParams, nil)
passthroughParam(upTime, ctx.RequestParams, originParams, nil)
ffd, r, err := t.getVectorFromPrometheus(queryURL, originParams, r.Request)
if err != nil {
m.Lock()
originErr = err
m.Unlock()
return
}
m.Lock()
if resp == (&http.Response{}) {
resp = r
}
m.Unlock()
if r != nil && r.StatusCode == 200 && ffd.Status == rvSuccess {
fastForwardData = ffd
}
}()
}
wg.Wait()
if originErr != nil {
level.Error(t.Logger).Log(lfEvent, "error fetching data from origin Prometheus", lfDetail, originErr.Error())
r.Writer.WriteHeader(http.StatusBadGateway)
r.WaitGroup.Done()
continue
}
t.Metrics.CacheRequestStatus.WithLabelValues(ctx.Origin.OriginURL, otPrometheus, mnQueryRange, ctx.CacheLookupResult, strconv.Itoa(resp.StatusCode)).Inc()
uncachedElementCnt := int64(0)
if lowerDeltaData.Status == rvSuccess {
uncachedElementCnt += lowerDeltaData.getValueCount()
ctx.Matrix = t.mergeMatrix(ctx.Matrix, lowerDeltaData)
}
if upperDeltaData.Status == rvSuccess {
uncachedElementCnt += upperDeltaData.getValueCount()
ctx.Matrix = t.mergeMatrix(upperDeltaData, ctx.Matrix)
}
// Prune any old points based on retention policy
ctx.Matrix.cropToRange(ctx.Time-ctx.Origin.MaxValueAgeSecs, 0)
// If it's not a full cache hit, we want to write this back to the cache
if ctx.CacheLookupResult != crHit {
// Marshal the Envelope back to a json object for Cache Storage
cacheBody, err := json.Marshal(ctx.Matrix)
if err != nil {
level.Error(t.Logger).Log(lfEvent, "prometheus matrix marshaling error", lfDetail, err.Error())
r.Writer.WriteHeader(http.StatusInternalServerError)
r.WaitGroup.Done()
continue
}
if t.Config.Caching.Compression {
level.Debug(t.Logger).Log("event", "Compressing Cached Data", "cacheKey", ctx.CacheKey)
cacheBody = snappy.Encode(nil, cacheBody)
}
// Set the Cache Key with the merged dataset
t.Cacher.Store(cacheKey, string(cacheBody), t.Config.Caching.RecordTTLSecs)
level.Debug(t.Logger).Log(lfEvent, "setCacheRecord", lfCacheKey, cacheKey, "ttl", t.Config.Caching.RecordTTLSecs)
}
//Do the extraction of the range the user requested, if needed.
// The only time it may not be needed is if the result was a Key Miss (so the dataset we have is exactly what the user asked for)
// I add one more step on the end of the request to ensure we catch the fast forward data
if ctx.CacheLookupResult != crKeyMiss {
ctx.Matrix.cropToRange(ctx.RequestExtents.Start, ctx.RequestExtents.End+ctx.StepMS)
}
allElementCnt := ctx.Matrix.getValueCount()
cachedElementCnt := allElementCnt - uncachedElementCnt
if uncachedElementCnt > 0 {
t.Metrics.CacheRequestElements.WithLabelValues(ctx.Origin.OriginURL, otPrometheus, "uncached").Add(float64(uncachedElementCnt))
}
if cachedElementCnt > 0 {
t.Metrics.CacheRequestElements.WithLabelValues(ctx.Origin.OriginURL, otPrometheus, "cached").Add(float64(cachedElementCnt))
}
// Stictch in Fast Forward Data
if fastForwardData.Status == rvSuccess {
ctx.Matrix = t.mergeVector(ctx.Matrix, fastForwardData)
}
// Marshal the Envelope back to a json object for User Response)
body, err := json.Marshal(ctx.Matrix)
if err != nil {
level.Error(t.Logger).Log(lfEvent, "prometheus matrix marshaling error", lfDetail, err.Error())
r.Writer.WriteHeader(http.StatusInternalServerError)
r.WaitGroup.Done()
continue
}
writeResponse(r.Writer, body, resp)
r.WaitGroup.Done()
}
}
}
func alignStepBoundaries(start int64, end int64, stepMS int64, now int64) (int64, int64) {
// In case the user had the start/end parameters reversed chronologically, we can fix that up for them
if start > end {
x := end
end = start
start = x
}
// Don't query beyond Time.Now() or charts will have weird data on the far right
if end > now*1000 {
end = now * 1000
}
// Failsafe to 60s if something inexplicably happened to the step param
if stepMS == 0 {
stepMS = 60000
}
// Align start/end to step boundaries
start = (start / stepMS) * stepMS
end = ((end / stepMS) * stepMS)
return start, end
}
func (pe PrometheusMatrixEnvelope) getValueCount() int64 {
i := int64(0)
for j := range pe.Data.Result {
i += int64(len(pe.Data.Result[j].Values))
}
return i
}
// mergeVector merges the passed PrometheusVectorEnvelope object with the calling PrometheusVectorEnvelope object
func (t *TricksterHandler) mergeVector(pe PrometheusMatrixEnvelope, pv PrometheusVectorEnvelope) PrometheusMatrixEnvelope {
if len(pv.Data.Result) == 0 {
level.Debug(t.Logger).Log(lfEvent, "mergeVectorPrematureExit")
return pe
}
for i := range pv.Data.Result {
result2 := pv.Data.Result[i]
for j := range pe.Data.Result {
result1 := pe.Data.Result[j]
if result2.Metric.Equal(result1.Metric) {
if result2.Timestamp > result1.Values[len(result1.Values)-1].Timestamp {
pe.Data.Result[j].Values = append(pe.Data.Result[j].Values, model.SamplePair{
Timestamp: model.Time((int64(result2.Timestamp) / 1000) * 1000),
Value: result2.Value,
})
}
}
}
}
return pe
}
// mergeMatrix merges the passed PrometheusMatrixEnvelope object with the calling PrometheusMatrixEnvelope object
func (t *TricksterHandler) mergeMatrix(pe PrometheusMatrixEnvelope, pe2 PrometheusMatrixEnvelope) PrometheusMatrixEnvelope {
if pe.Status != rvSuccess {
pe = pe2
return pe2
} else if pe2.Status != rvSuccess {
return pe
}
for i := range pe2.Data.Result {
metricSetFound := false
result2 := pe2.Data.Result[i]
for j := range pe.Data.Result {
result1 := pe.Data.Result[j]
if result2.Metric.Equal(result1.Metric) {
metricSetFound = true
pe.Data.Result[j].Values = append(pe2.Data.Result[i].Values, pe.Data.Result[j].Values...)
break
}
}
if !metricSetFound {
level.Debug(t.Logger).Log(lfEvent, "MergeMatrixEnvelopeNewMetric", lfDetail, "Did not find mergable metric set in cache", "metricFingerprint", result2.Metric.Fingerprint())
// Couldn't find metrics with that name in the existing resultset, so this must
// be new for this poll. That's fine, just add it outright instead of merging.
pe.Data.Result = append(pe.Data.Result, result2)
}
}
return pe
}
// cropToRange crops the datasets in a given PrometheusMatrixEnvelope down to the provided start and end times
func (pe PrometheusMatrixEnvelope) cropToRange(start int64, end int64) {
// iterate through each metric series in the result
for i := range pe.Data.Result {
if start > 0 {
// Now we First determine the correct start index for each series in the Matrix
// iterate through each value in the given metric series
for j := range pe.Data.Result[i].Values {
// If the timestamp for this data point is at or after the client requested start time,
// update the slice and break the loop.
ts := int64(pe.Data.Result[i].Values[j].Timestamp)
if ts >= start {
pe.Data.Result[i].Values = pe.Data.Result[i].Values[j:]
break
}
}
}
if end > 0 {
// Then we determine the correct end index for each series in the Matrix
// iterate *backwards* through each value in the given metric series
for j := len(pe.Data.Result[i].Values) - 1; j >= 0; j-- {
// If the timestamp of this metric is at or after the client requested start time,
// update the offset and break.
ts := int64(pe.Data.Result[i].Values[j].Timestamp)
if ts <= end {
pe.Data.Result[i].Values = pe.Data.Result[i].Values[:j+1]
break
}
}
}
}
}