Skip to content

Commit

Permalink
Merge pull request #106 from postfinance/cjmn-patch-0314
Browse files Browse the repository at this point in the history
httptrace and better error logging

closes #45
  • Loading branch information
clementnuss authored Jan 22, 2024
2 parents 1d45495 + 03505e9 commit b650ea3
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 68 deletions.
142 changes: 91 additions & 51 deletions internal/servicecheck/httptrace.go
Original file line number Diff line number Diff line change
@@ -1,83 +1,123 @@
package servicecheck

import (
"crypto/tls"
"log"
"net/http"
"net/http/httptrace"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
)

func withRequestTracing(registry *prometheus.Registry, transport http.RoundTripper) http.RoundTripper {
counter := prometheus.NewCounterVec(
// kubenurseTypeKey is the context.Context key under which the label (check
// type) of a request is stored. It is a dedicated unexported type so that the
// key cannot collide with other context keys.
type kubenurseTypeKey struct{}

// RoundTripperFunc is an adapter that allows an ordinary function with the
// matching signature to be used as an http.RoundTripper.
type RoundTripperFunc func(req *http.Request) (*http.Response, error)

// RoundTrip implements http.RoundTripper by calling the function itself and
// handing its results back unchanged.
func (fn RoundTripperFunc) RoundTrip(req *http.Request) (*http.Response, error) {
	resp, err := fn(req)
	return resp, err
}

// withHttptrace wraps next in a RoundTripper that collects httptrace timings and logs trace errors.
// As promhttp.InstrumentRoundTripperTrace doesn't process errors, this is custom made and inspired
// by prometheus/client_golang's promhttp.
func withHttptrace(registry *prometheus.Registry, next http.RoundTripper, durationHistogram []float64) http.RoundTripper {
httpclientReqTotal := prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: metricsNamespace,
Name: "httpclient_requests_total",
Help: "A counter for requests from the kubenurse http client.",
},
// []string{"code", "method", "type"}, // TODO
[]string{"code", "method"},
)

latencyVec := prometheus.NewHistogramVec(
httpclientReqDuration := prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: metricsNamespace,
Name: "httpclient_trace_request_duration_seconds",
Help: "Latency histogram for requests from the kubenurse http client. Time in seconds since the start of the http request.",
Buckets: []float64{.0005, .005, .01, .025, .05, .1, .25, .5, 1},
Name: "httpclient_request_duration_seconds",
Help: "A latency histogram of request latencies from the kubenurse http client.",
Buckets: durationHistogram,
},
[]string{"event"},
// []string{"type"}, // TODO
[]string{},
)

// histVec has no labels, making it a zero-dimensional ObserverVec.
histVec := prometheus.NewHistogramVec(
httpclientTraceReqDuration := prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: metricsNamespace,
Name: "httpclient_request_duration_seconds",
Help: "A latency histogram of request latencies from the kubenurse http client.",
Buckets: prometheus.DefBuckets,
Name: "httpclient_trace_request_duration_seconds",
Help: "Latency histogram for requests from the kubenurse http client. Time in seconds since the start of the http request.",
Buckets: durationHistogram,
},
[]string{},
[]string{"event"},
// []string{"event", "type"}, // TODO
)

// Register all of the metrics in the standard registry.
registry.MustRegister(counter, latencyVec, histVec)
registry.MustRegister(httpclientReqTotal, httpclientReqDuration, httpclientTraceReqDuration)

// Define functions for the available httptrace.ClientTrace hook
// functions that we want to instrument.
trace := &promhttp.InstrumentTrace{
DNSStart: func(t float64) {
latencyVec.WithLabelValues("dns_start").Observe(t)
},
DNSDone: func(t float64) {
latencyVec.WithLabelValues("dns_done").Observe(t)
},
ConnectStart: func(t float64) {
latencyVec.WithLabelValues("connect_start").Observe(t)
},
ConnectDone: func(t float64) {
latencyVec.WithLabelValues("connect_done").Observe(t)
},
TLSHandshakeStart: func(t float64) {
latencyVec.WithLabelValues("tls_handshake_start").Observe(t)
},
TLSHandshakeDone: func(t float64) {
latencyVec.WithLabelValues("tls_handshake_done").Observe(t)
},
WroteRequest: func(t float64) {
latencyVec.WithLabelValues("wrote_request").Observe(t)
},
GotFirstResponseByte: func(t float64) {
latencyVec.WithLabelValues("got_first_resp_byte").Observe(t)
},
// collectMetric handles a single httptrace event for request r.
//
// traceEventType names the hook (used as the "event" label), start is the time
// the request began, and err is the error reported by the hook, if any. On
// error the event is logged together with the request's check label and no
// metric is recorded; otherwise the seconds elapsed since start are observed
// in the trace duration histogram.
collectMetric := func(traceEventType string, start time.Time, r *http.Request, err error) {
	td := time.Since(start).Seconds()

	// If we got an error inside a trace, log it and do not collect metrics
	if err != nil {
		// Use the comma-ok form: a request whose context carries no label
		// must not panic the tracer. The label is only needed for logging.
		kubenurseTypeLabel, ok := r.Context().Value(kubenurseTypeKey{}).(string)
		if !ok {
			kubenurseTypeLabel = "unknown"
		}

		log.Printf("httptrace: failed %s for %s with %v", traceEventType, kubenurseTypeLabel, err)

		return
	}

	httpclientTraceReqDuration.WithLabelValues(traceEventType).Observe(td) // TODO: add back kubenurseTypeKey
}

// Wrap the default RoundTripper with middleware.
roundTripper := promhttp.InstrumentRoundTripperCounter(counter,
promhttp.InstrumentRoundTripperTrace(trace,
promhttp.InstrumentRoundTripperDuration(histVec,
transport,
),
),
)
// Return a http.RoundTripper for tracing requests
return RoundTripperFunc(func(r *http.Request) (*http.Response, error) {
// Capture request time
start := time.Now()

// Add tracing hooks
trace := &httptrace.ClientTrace{
GotConn: func(info httptrace.GotConnInfo) {
collectMetric("got_conn", start, r, nil)
},
DNSStart: func(info httptrace.DNSStartInfo) {
collectMetric("dns_start", start, r, nil)
},
DNSDone: func(info httptrace.DNSDoneInfo) {
collectMetric("dns_done", start, r, info.Err)
},
ConnectStart: func(_, _ string) {
collectMetric("connect_start", start, r, nil)
},
ConnectDone: func(_, _ string, err error) {
collectMetric("connect_done", start, r, err)
},
TLSHandshakeStart: func() {
collectMetric("tls_handshake_start", start, r, nil)
},
// Forward the handshake error so that failed TLS handshakes are logged
// instead of being recorded as a successful timing.
TLSHandshakeDone: func(_ tls.ConnectionState, err error) {
	collectMetric("tls_handshake_done", start, r, err)
},
WroteRequest: func(info httptrace.WroteRequestInfo) {
collectMetric("wrote_request", start, r, info.Err)
},
GotFirstResponseByte: func() {
collectMetric("got_first_resp_byte", start, r, nil)
},
}

// Do request with tracing enabled
r = r.WithContext(httptrace.WithClientTrace(r.Context(), trace))

// // TODO: uncomment when issue #55 is solved (N^2 request will increase cardinality of path_ metrics too much otherwise)
// typeFromCtxFn := promhttp.WithLabelFromCtx("type", func(ctx context.Context) string {
// return ctx.Value(kubenurseTypeKey{}).(string)
// })

return roundTripper
rt := next // variable pinning :) essential, to prevent always re-instrumenting the original variable
rt = promhttp.InstrumentRoundTripperCounter(httpclientReqTotal, rt)
rt = promhttp.InstrumentRoundTripperDuration(httpclientReqDuration, rt)
return rt.RoundTrip(r)
})
}
30 changes: 17 additions & 13 deletions internal/servicecheck/servicecheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func New(_ context.Context, discovery *kubediscovery.Client, promRegistry *prome

httpClient := &http.Client{
Timeout: 5 * time.Second,
Transport: withRequestTracing(promRegistry, transport),
Transport: withHttptrace(promRegistry, transport, durationHistogramBuckets),
}

return &Checker{
Expand Down Expand Up @@ -163,43 +163,43 @@ func (c *Checker) StopScheduled() {
}

// APIServerDirect checks the /version endpoint of the Kubernetes API Server through the direct link
func (c *Checker) APIServerDirect() (string, error) {
func (c *Checker) APIServerDirect(ctx context.Context) (string, error) {
if c.SkipCheckAPIServerDirect {
return skippedStr, nil
}

apiurl := fmt.Sprintf("https://%s:%s/version", c.KubernetesServiceHost, c.KubernetesServicePort)

return c.doRequest(apiurl)
return c.doRequest(ctx, apiurl)
}

// APIServerDNS checks the /version endpoint of the Kubernetes API Server through the Cluster DNS URL
func (c *Checker) APIServerDNS() (string, error) {
func (c *Checker) APIServerDNS(ctx context.Context) (string, error) {
if c.SkipCheckAPIServerDNS {
return skippedStr, nil
}

apiurl := fmt.Sprintf("https://kubernetes.default.svc.cluster.local:%s/version", c.KubernetesServicePort)

return c.doRequest(apiurl)
return c.doRequest(ctx, apiurl)
}

// MeIngress checks if the kubenurse is reachable at the /alwayshappy endpoint behind the ingress
func (c *Checker) MeIngress() (string, error) {
func (c *Checker) MeIngress(ctx context.Context) (string, error) {
if c.SkipCheckMeIngress {
return skippedStr, nil
}

return c.doRequest(c.KubenurseIngressURL + "/alwayshappy") //nolint:goconst // readability
return c.doRequest(ctx, c.KubenurseIngressURL+"/alwayshappy") //nolint:goconst // readability
}

// MeService checks if the kubenurse is reachable at the /alwayshappy endpoint through the kubernetes service
func (c *Checker) MeService() (string, error) {
func (c *Checker) MeService(ctx context.Context) (string, error) {
if c.SkipCheckMeService {
return skippedStr, nil
}

return c.doRequest(c.KubenurseServiceURL + "/alwayshappy")
return c.doRequest(ctx, c.KubenurseServiceURL+"/alwayshappy")
}

// checkNeighbours checks the /alwayshappy endpoint from every discovered kubenurse neighbour. Neighbour pods on nodes
Expand All @@ -210,12 +210,12 @@ func (c *Checker) checkNeighbours(nh []kubediscovery.Neighbour) {
if neighbour.Phase == v1.PodRunning && // only query running pods (excludes pending ones)
!neighbour.Terminating && // exclude terminating pods
(c.allowUnschedulable || neighbour.NodeSchedulable == kubediscovery.NodeSchedulable) {
check := func() (string, error) {
check := func(ctx context.Context) (string, error) {
if c.UseTLS {
return c.doRequest("https://" + neighbour.PodIP + ":8443/alwayshappy")
return c.doRequest(ctx, "https://"+neighbour.PodIP+":8443/alwayshappy")
}

return c.doRequest("http://" + neighbour.PodIP + ":8080/alwayshappy")
return c.doRequest(ctx, "http://"+neighbour.PodIP+":8080/alwayshappy")
}

_, _ = c.measure(check, "path_"+neighbour.NodeName)
Expand All @@ -227,8 +227,12 @@ func (c *Checker) checkNeighbours(nh []kubediscovery.Neighbour) {
func (c *Checker) measure(check Check, label string) (string, error) {
start := time.Now()

// Add our label (check type) to the context so our http tracer can annotate
// metrics and errors with the label
ctx := context.WithValue(context.Background(), kubenurseTypeKey{}, label)

// Execute check
res, err := check()
res, err := check(ctx)

// Process metrics
c.durationHistogram.WithLabelValues(label).Observe(time.Since(start).Seconds())
Expand Down
5 changes: 3 additions & 2 deletions internal/servicecheck/transport.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package servicecheck

import (
"context"
"crypto/tls"
"crypto/x509"
"errors"
Expand All @@ -17,14 +18,14 @@ const (
)

// doRequest does an http request only to get the http status code
func (c *Checker) doRequest(url string) (string, error) {
func (c *Checker) doRequest(ctx context.Context, url string) (string, error) {
// Read Bearer Token file from ServiceAccount
token, err := os.ReadFile(K8sTokenFile)
if err != nil {
return errStr, fmt.Errorf("load kubernetes serviceaccount token from %s: %w", K8sTokenFile, err)
}

req, _ := http.NewRequest("GET", url, http.NoBody)
req, _ := http.NewRequestWithContext(ctx, "GET", url, http.NoBody)

// Only add the Bearer for API Server Requests
if strings.HasSuffix(url, "/version") {
Expand Down
5 changes: 3 additions & 2 deletions internal/servicecheck/types.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package servicecheck

import (
"context"
"net/http"
"time"

Expand Down Expand Up @@ -63,8 +64,8 @@ type Result struct {
Neighbourhood []kubediscovery.Neighbour `json:"neighbourhood"`
}

// Check is the signature used by all checks that the checker can execute
type Check func() (string, error)
// Check is the signature used by all checks that the checker can execute.
type Check func(ctx context.Context) (string, error)

// CachedResult represents a cached check result that is valid until the expiration.
type CachedResult struct {
Expand Down

0 comments on commit b650ea3

Please sign in to comment.