diff --git a/internal/servicecheck/httptrace.go b/internal/servicecheck/httptrace.go index 388a9cdd..20d9786b 100644 --- a/internal/servicecheck/httptrace.go +++ b/internal/servicecheck/httptrace.go @@ -1,83 +1,123 @@ package servicecheck import ( + "crypto/tls" + "log" "net/http" + "net/http/httptrace" + "time" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" ) -func withRequestTracing(registry *prometheus.Registry, transport http.RoundTripper) http.RoundTripper { - counter := prometheus.NewCounterVec( +// unique type for context.Context to avoid collisions. +type kubenurseTypeKey struct{} + +// // http.RoundTripper +type RoundTripperFunc func(req *http.Request) (*http.Response, error) + +func (rt RoundTripperFunc) RoundTrip(r *http.Request) (*http.Response, error) { + return rt(r) +} + +// This collects traces and logs errors. As promhttp.InstrumentRoundTripperTrace doesn't process +// errors, this is custom made and inspired by prometheus/client_golang's promhttp +func withHttptrace(registry *prometheus.Registry, next http.RoundTripper, durationHistogram []float64) http.RoundTripper { + httpclientReqTotal := prometheus.NewCounterVec( prometheus.CounterOpts{ Namespace: metricsNamespace, Name: "httpclient_requests_total", Help: "A counter for requests from the kubenurse http client.", }, + // []string{"code", "method", "type"}, // TODO []string{"code", "method"}, ) - latencyVec := prometheus.NewHistogramVec( + httpclientReqDuration := prometheus.NewHistogramVec( prometheus.HistogramOpts{ Namespace: metricsNamespace, - Name: "httpclient_trace_request_duration_seconds", - Help: "Latency histogram for requests from the kubenurse http client. Time in seconds since the start of the http request.", - Buckets: []float64{.0005, .005, .01, .025, .05, .1, .25, .5, 1}, + Name: "httpclient_request_duration_seconds", + Help: "A latency histogram of request latencies from the kubenurse http client.", + Buckets: durationHistogram, }, - []string{"event"}, + // []string{"type"}, // TODO + []string{}, ) - // histVec has no labels, making it a zero-dimensional ObserverVec. - histVec := prometheus.NewHistogramVec( + httpclientTraceReqDuration := prometheus.NewHistogramVec( prometheus.HistogramOpts{ Namespace: metricsNamespace, - Name: "httpclient_request_duration_seconds", - Help: "A latency histogram of request latencies from the kubenurse http client.", - Buckets: prometheus.DefBuckets, + Name: "httpclient_trace_request_duration_seconds", + Help: "Latency histogram for requests from the kubenurse http client. Time in seconds since the start of the http request.", + Buckets: durationHistogram, }, - []string{}, + []string{"event"}, + // []string{"event", "type"}, // TODO ) - // Register all of the metrics in the standard registry. - registry.MustRegister(counter, latencyVec, histVec) + registry.MustRegister(httpclientReqTotal, httpclientReqDuration, httpclientTraceReqDuration) - // Define functions for the available httptrace.ClientTrace hook - // functions that we want to instrument. - trace := &promhttp.InstrumentTrace{ - DNSStart: func(t float64) { - latencyVec.WithLabelValues("dns_start").Observe(t) - }, - DNSDone: func(t float64) { - latencyVec.WithLabelValues("dns_done").Observe(t) - }, - ConnectStart: func(t float64) { - latencyVec.WithLabelValues("connect_start").Observe(t) - }, - ConnectDone: func(t float64) { - latencyVec.WithLabelValues("connect_done").Observe(t) - }, - TLSHandshakeStart: func(t float64) { - latencyVec.WithLabelValues("tls_handshake_start").Observe(t) - }, - TLSHandshakeDone: func(t float64) { - latencyVec.WithLabelValues("tls_handshake_done").Observe(t) - }, - WroteRequest: func(t float64) { - latencyVec.WithLabelValues("wrote_request").Observe(t) - }, - GotFirstResponseByte: func(t float64) { - latencyVec.WithLabelValues("got_first_resp_byte").Observe(t) - }, + collectMetric := func(traceEventType string, start time.Time, r *http.Request, err error) { + td := time.Since(start).Seconds() + kubenurseTypeLabel := r.Context().Value(kubenurseTypeKey{}).(string) + + // If we got an error inside a trace, log it and do not collect metrics + if err != nil { + log.Printf("httptrace: failed %s for %s with %v", traceEventType, kubenurseTypeLabel, err) + return + } + + httpclientTraceReqDuration.WithLabelValues(traceEventType).Observe(td) // TODO: add back kubenurseTypeKey } - // Wrap the default RoundTripper with middleware. - roundTripper := promhttp.InstrumentRoundTripperCounter(counter, - promhttp.InstrumentRoundTripperTrace(trace, - promhttp.InstrumentRoundTripperDuration(histVec, - transport, - ), - ), - ) + // Return a http.RoundTripper for tracing requests + return RoundTripperFunc(func(r *http.Request) (*http.Response, error) { + // Capture request time + start := time.Now() + + // Add tracing hooks + trace := &httptrace.ClientTrace{ + GotConn: func(info httptrace.GotConnInfo) { + collectMetric("got_conn", start, r, nil) + }, + DNSStart: func(info httptrace.DNSStartInfo) { + collectMetric("dns_start", start, r, nil) + }, + DNSDone: func(info httptrace.DNSDoneInfo) { + collectMetric("dns_done", start, r, info.Err) + }, + ConnectStart: func(_, _ string) { + collectMetric("connect_start", start, r, nil) + }, + ConnectDone: func(_, _ string, err error) { + collectMetric("connect_done", start, r, err) + }, + TLSHandshakeStart: func() { + collectMetric("tls_handshake_start", start, r, nil) + }, + TLSHandshakeDone: func(_ tls.ConnectionState, err error) { + collectMetric("tls_handshake_done", start, r, nil) + }, + WroteRequest: func(info httptrace.WroteRequestInfo) { + collectMetric("wrote_request", start, r, info.Err) + }, + GotFirstResponseByte: func() { + collectMetric("got_first_resp_byte", start, r, nil) + }, + } + + // Do request with tracing enabled + r = r.WithContext(httptrace.WithClientTrace(r.Context(), trace)) + + // // TODO: uncomment when issue #55 is solved (N^2 request will increase cardinality of path_ metrics too much otherwise) + // typeFromCtxFn := promhttp.WithLabelFromCtx("type", func(ctx context.Context) string { + // return ctx.Value(kubenurseTypeKey{}).(string) + // }) - return roundTripper + rt := next // variable pinning :) essential, to prevent always re-instrumenting the original variable + rt = promhttp.InstrumentRoundTripperCounter(httpclientReqTotal, rt) + rt = promhttp.InstrumentRoundTripperDuration(httpclientReqDuration, rt) + return rt.RoundTrip(r) + }) } diff --git a/internal/servicecheck/servicecheck.go b/internal/servicecheck/servicecheck.go index 8517fad4..f5698e72 100644 --- a/internal/servicecheck/servicecheck.go +++ b/internal/servicecheck/servicecheck.go @@ -75,7 +75,7 @@ func New(_ context.Context, discovery *kubediscovery.Client, promRegistry *prome httpClient := &http.Client{ Timeout: 5 * time.Second, - Transport: withRequestTracing(promRegistry, transport), + Transport: withHttptrace(promRegistry, transport, durationHistogramBuckets), } return &Checker{ @@ -163,43 +163,43 @@ func (c *Checker) StopScheduled() { } // APIServerDirect checks the /version endpoint of the Kubernetes API Server through the direct link -func (c *Checker) APIServerDirect() (string, error) { +func (c *Checker) APIServerDirect(ctx context.Context) (string, error) { if c.SkipCheckAPIServerDirect { return skippedStr, nil } apiurl := fmt.Sprintf("https://%s:%s/version", c.KubernetesServiceHost, c.KubernetesServicePort) - return c.doRequest(apiurl) + return c.doRequest(ctx, apiurl) } // APIServerDNS checks the /version endpoint of the Kubernetes API Server through the Cluster DNS URL -func (c *Checker) APIServerDNS() (string, error) { +func (c *Checker) APIServerDNS(ctx context.Context) (string, error) { if c.SkipCheckAPIServerDNS { return skippedStr, nil } apiurl := fmt.Sprintf("https://kubernetes.default.svc.cluster.local:%s/version", c.KubernetesServicePort) - return c.doRequest(apiurl) + return c.doRequest(ctx, apiurl) } // MeIngress checks if the kubenurse is reachable at the /alwayshappy endpoint behind the ingress -func (c *Checker) MeIngress() (string, error) { +func (c *Checker) MeIngress(ctx context.Context) (string, error) { if c.SkipCheckMeIngress { return skippedStr, nil } - return c.doRequest(c.KubenurseIngressURL + "/alwayshappy") //nolint:goconst // readability + return c.doRequest(ctx, c.KubenurseIngressURL+"/alwayshappy") //nolint:goconst // readability } // MeService checks if the kubenurse is reachable at the /alwayshappy endpoint through the kubernetes service -func (c *Checker) MeService() (string, error) { +func (c *Checker) MeService(ctx context.Context) (string, error) { if c.SkipCheckMeService { return skippedStr, nil } - return c.doRequest(c.KubenurseServiceURL + "/alwayshappy") + return c.doRequest(ctx, c.KubenurseServiceURL+"/alwayshappy") } // checkNeighbours checks the /alwayshappy endpoint from every discovered kubenurse neighbour. Neighbour pods on nodes @@ -210,12 +210,12 @@ func (c *Checker) checkNeighbours(nh []kubediscovery.Neighbour) { if neighbour.Phase == v1.PodRunning && // only query running pods (excludes pending ones) !neighbour.Terminating && // exclude terminating pods (c.allowUnschedulable || neighbour.NodeSchedulable == kubediscovery.NodeSchedulable) { - check := func() (string, error) { + check := func(ctx context.Context) (string, error) { if c.UseTLS { - return c.doRequest("https://" + neighbour.PodIP + ":8443/alwayshappy") + return c.doRequest(ctx, "https://"+neighbour.PodIP+":8443/alwayshappy") } - return c.doRequest("http://" + neighbour.PodIP + ":8080/alwayshappy") + return c.doRequest(ctx, "http://"+neighbour.PodIP+":8080/alwayshappy") } _, _ = c.measure(check, "path_"+neighbour.NodeName) @@ -227,8 +227,12 @@ func (c *Checker) checkNeighbours(nh []kubediscovery.Neighbour) { func (c *Checker) measure(check Check, label string) (string, error) { start := time.Now() + // Add our label (check type) to the context so our http tracer can annotate + // metrics and errors based with the label + ctx := context.WithValue(context.Background(), kubenurseTypeKey{}, label) + // Execute check - res, err := check() + res, err := check(ctx) // Process metrics c.durationHistogram.WithLabelValues(label).Observe(time.Since(start).Seconds()) diff --git a/internal/servicecheck/transport.go b/internal/servicecheck/transport.go index 1e176881..def8c2c9 100644 --- a/internal/servicecheck/transport.go +++ b/internal/servicecheck/transport.go @@ -1,6 +1,7 @@ package servicecheck import ( + "context" "crypto/tls" "crypto/x509" "errors" @@ -17,14 +18,14 @@ const ( ) // doRequest does an http request only to get the http status code -func (c *Checker) doRequest(url string) (string, error) { +func (c *Checker) doRequest(ctx context.Context, url string) (string, error) { // Read Bearer Token file from ServiceAccount token, err := os.ReadFile(K8sTokenFile) if err != nil { return errStr, fmt.Errorf("load kubernetes serviceaccount token from %s: %w", K8sTokenFile, err) } - req, _ := http.NewRequest("GET", url, http.NoBody) + req, _ := http.NewRequestWithContext(ctx, "GET", url, http.NoBody) // Only add the Bearer for API Server Requests if strings.HasSuffix(url, "/version") { diff --git a/internal/servicecheck/types.go b/internal/servicecheck/types.go index d08b1e14..47c9e758 100644 --- a/internal/servicecheck/types.go +++ b/internal/servicecheck/types.go @@ -1,6 +1,7 @@ package servicecheck import ( + "context" "net/http" "time" @@ -63,8 +64,8 @@ type Result struct { Neighbourhood []kubediscovery.Neighbour `json:"neighbourhood"` } -// Check is the signature used by all checks that the checker can execute -type Check func() (string, error) +// Check is the signature used by all checks that the checker can execute. +type Check func(ctx context.Context) (string, error) // CachedResult represents a cached check result that is valid until the expiration. type CachedResult struct {