Skip to content

Commit

Permalink
feat: add incoming neighbouring checks gauge
Browse files Browse the repository at this point in the history
permits to control that every kubenurse is checked by the
number of nodes defined with NEIGHBOUR_FILTERING env var.

Signed-off-by: Clément Nussbaumer <[email protected]>
  • Loading branch information
clementnuss committed Apr 4, 2024
1 parent 9943b92 commit b27de93
Show file tree
Hide file tree
Showing 6 changed files with 58 additions and 14 deletions.
9 changes: 9 additions & 0 deletions internal/kubenurse/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,3 +62,12 @@ func (s *Server) aliveHandler() func(w http.ResponseWriter, r *http.Request) {
_ = enc.Encode(out)
}
}

func (s *Server) alwaysHappyHandler() func(w http.ResponseWriter, r *http.Request) {
return func(_ http.ResponseWriter, r *http.Request) {
origin := r.Header.Get(servicecheck.NeighbourOriginHeader)
if origin != "" {
s.neighboursTTLCache.Insert(origin)
}
}
}
28 changes: 27 additions & 1 deletion internal/kubenurse/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ type Server struct {
// Mutex to protect ready flag
mu *sync.Mutex
ready bool

// Neighbourhood incoming checks
neighbouringIncomingChecks prometheus.Gauge
neighboursTTLCache TTLCache[string]
}

// New creates a new kubenurse server. The server can be configured with the following environment variables:
Expand Down Expand Up @@ -94,12 +98,23 @@ func New(ctx context.Context, c client.Client) (*Server, error) { //nolint:funle
ready: true,
}

server.neighboursTTLCache.Init(60 * time.Second)

promRegistry := prometheus.NewRegistry()
promRegistry.MustRegister(
collectors.NewGoCollector(),
collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}),
)

server.neighbouringIncomingChecks = prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: servicecheck.MetricsNamespace,
Name: "neighbourhood_incoming_checks",
Help: "Number of unique source nodes checks in the last minute for the neighbourhood checks",
},
)
promRegistry.MustRegister(server.neighbouringIncomingChecks)

var histogramBuckets []float64

if bucketsString := os.Getenv("KUBENURSE_HISTOGRAM_BUCKETS"); bucketsString != "" {
Expand Down Expand Up @@ -166,7 +181,7 @@ func New(ctx context.Context, c client.Client) (*Server, error) { //nolint:funle
// setup http routes
mux.HandleFunc("/ready", server.readyHandler())
mux.HandleFunc("/alive", server.aliveHandler())
mux.HandleFunc("/alwayshappy", func(http.ResponseWriter, *http.Request) {})
mux.HandleFunc("/alwayshappy", server.alwaysHappyHandler())
mux.Handle("/metrics", promhttp.HandlerFor(promRegistry, promhttp.HandlerOpts{}))
mux.Handle("/", http.RedirectHandler("/alive", http.StatusMovedPermanently))

Expand All @@ -180,6 +195,17 @@ func (s *Server) Run() error {
errc = make(chan error, 2) // max two errors can happen
)

go func() { // update the incoming neighbouring check gauge every second
t := time.NewTicker(1 * time.Second)
defer t.Stop()

for range t.C {
s.neighbouringIncomingChecks.Set(
float64(s.neighboursTTLCache.ActiveEntries()),
)
}
}()

wg.Add(1)

go func() {
Expand Down
6 changes: 3 additions & 3 deletions internal/servicecheck/httptrace.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func (rt RoundTripperFunc) RoundTrip(r *http.Request) (*http.Response, error) {
func withHttptrace(registry *prometheus.Registry, next http.RoundTripper, durationHistogram []float64) http.RoundTripper {
httpclientReqTotal := prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: metricsNamespace,
Namespace: MetricsNamespace,
Name: "httpclient_requests_total",
Help: "A counter for requests from the kubenurse http client.",
},
Expand All @@ -36,7 +36,7 @@ func withHttptrace(registry *prometheus.Registry, next http.RoundTripper, durati

httpclientReqDuration := prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: metricsNamespace,
Namespace: MetricsNamespace,
Name: "httpclient_request_duration_seconds",
Help: "A latency histogram of request latencies from the kubenurse http client.",
Buckets: durationHistogram,
Expand All @@ -46,7 +46,7 @@ func withHttptrace(registry *prometheus.Registry, next http.RoundTripper, durati

httpclientTraceReqDuration := prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: metricsNamespace,
Namespace: MetricsNamespace,
Name: "httpclient_trace_request_duration_seconds",
Help: "Latency histogram for requests from the kubenurse http client. Time in seconds since the start of the http request.",
Buckets: durationHistogram,
Expand Down
8 changes: 6 additions & 2 deletions internal/servicecheck/neighbours.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ var (
currentNode string
)

const (
NeighbourOriginHeader = "KUBENURSE-NEIGHBOUR-ORIGIN"
)

// Neighbour represents a kubenurse which should be reachable
type Neighbour struct {
PodName string
Expand Down Expand Up @@ -93,10 +97,10 @@ func (c *Checker) checkNeighbours(nh []*Neighbour) {
for _, neighbour := range nh {
check := func(ctx context.Context) (string, error) {
if c.UseTLS {
return c.doRequest(ctx, "https://"+neighbour.PodIP+":8443/alwayshappy")
return c.doRequest(ctx, "https://"+neighbour.PodIP+":8443/alwayshappy", true)
}

return c.doRequest(ctx, "http://"+neighbour.PodIP+":8080/alwayshappy")
return c.doRequest(ctx, "http://"+neighbour.PodIP+":8080/alwayshappy", true)
}

_, _ = c.measure(check, "path_"+neighbour.NodeName)
Expand Down
14 changes: 7 additions & 7 deletions internal/servicecheck/servicecheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ const (
okStr = "ok"
errStr = "error"
skippedStr = "skipped"
metricsNamespace = "kubenurse"
MetricsNamespace = "kubenurse"
)

// New configures the checker with a httpClient and a cache timeout for check
Expand All @@ -28,7 +28,7 @@ func New(_ context.Context, cl client.Client, promRegistry *prometheus.Registry,
allowUnschedulable bool, cacheTTL time.Duration, durationHistogramBuckets []float64) (*Checker, error) {
errorCounter := prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: metricsNamespace,
Namespace: MetricsNamespace,
Name: "errors_total",
Help: "Kubenurse error counter partitioned by error type",
},
Expand All @@ -37,7 +37,7 @@ func New(_ context.Context, cl client.Client, promRegistry *prometheus.Registry,

durationHistogram := prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: metricsNamespace,
Namespace: MetricsNamespace,
Name: "request_duration",
Help: "Kubenurse request duration partitioned by target path",
Buckets: durationHistogramBuckets,
Expand Down Expand Up @@ -163,7 +163,7 @@ func (c *Checker) APIServerDirect(ctx context.Context) (string, error) {

apiurl := fmt.Sprintf("https://%s:%s/version", c.KubernetesServiceHost, c.KubernetesServicePort)

return c.doRequest(ctx, apiurl)
return c.doRequest(ctx, apiurl, false)
}

// APIServerDNS checks the /version endpoint of the Kubernetes API Server through the Cluster DNS URL
Expand All @@ -174,7 +174,7 @@ func (c *Checker) APIServerDNS(ctx context.Context) (string, error) {

apiurl := fmt.Sprintf("https://kubernetes.default.svc.cluster.local:%s/version", c.KubernetesServicePort)

return c.doRequest(ctx, apiurl)
return c.doRequest(ctx, apiurl, false)
}

// MeIngress checks if the kubenurse is reachable at the /alwayshappy endpoint behind the ingress
Expand All @@ -183,7 +183,7 @@ func (c *Checker) MeIngress(ctx context.Context) (string, error) {
return skippedStr, nil
}

return c.doRequest(ctx, c.KubenurseIngressURL+"/alwayshappy") //nolint:goconst // readability
return c.doRequest(ctx, c.KubenurseIngressURL+"/alwayshappy", false) //nolint:goconst // readability
}

// MeService checks if the kubenurse is reachable at the /alwayshappy endpoint through the kubernetes service
Expand All @@ -192,7 +192,7 @@ func (c *Checker) MeService(ctx context.Context) (string, error) {
return skippedStr, nil
}

return c.doRequest(ctx, c.KubenurseServiceURL+"/alwayshappy")
return c.doRequest(ctx, c.KubenurseServiceURL+"/alwayshappy", false)
}

// measure implements metric collections for the check
Expand Down
7 changes: 6 additions & 1 deletion internal/servicecheck/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ const (
)

// doRequest does an http request only to get the http status code
func (c *Checker) doRequest(ctx context.Context, url string) (string, error) {
func (c *Checker) doRequest(ctx context.Context, url string, addOriginHeader bool) (string, error) {
// Read Bearer Token file from ServiceAccount
token, err := os.ReadFile(K8sTokenFile)
if err != nil {
Expand All @@ -32,6 +32,11 @@ func (c *Checker) doRequest(ctx context.Context, url string) (string, error) {
req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", token))
}

if addOriginHeader {
hostname, _ := os.Hostname()
req.Header.Add(NeighbourOriginHeader, hostname)
}

resp, err := c.httpClient.Do(req)
if err != nil {
return err.Error(), err
Expand Down

0 comments on commit b27de93

Please sign in to comment.