Skip to content

Commit

Permalink
chore: proper Context handling and simplifications
Browse files Browse the repository at this point in the history
Signed-off-by: Clément Nussbaumer <[email protected]>
  • Loading branch information
clementnuss committed Jul 22, 2024
1 parent f64d442 commit cafb7ae
Show file tree
Hide file tree
Showing 7 changed files with 41 additions and 73 deletions.
3 changes: 1 addition & 2 deletions internal/kubenurse/handler_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package kubenurse

import (
"context"
"net/http"
"net/http/httptest"
"testing"
Expand All @@ -14,7 +13,7 @@ func TestServerHandler(t *testing.T) {
r := require.New(t)

fakeClient := fake.NewFakeClient()
kubenurse, err := New(context.Background(), fakeClient)
kubenurse, err := New(fakeClient)

r.NoError(err)
r.NotNil(kubenurse)
Expand Down
25 changes: 18 additions & 7 deletions internal/kubenurse/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ type Server struct {
// * KUBENURSE_CHECK_ME_SERVICE
// * KUBENURSE_CHECK_NEIGHBOURHOOD
// * KUBENURSE_CHECK_INTERVAL
func New(ctx context.Context, c client.Client) (*Server, error) { //nolint:funlen // TODO: use a flag parsing library (e.g. ff) to reduce complexity
func New(c client.Client) (*Server, error) { //nolint:funlen // TODO: use a flag parsing library (e.g. ff) to reduce complexity
mux := http.NewServeMux()

checkInterval := defaultCheckInterval
Expand Down Expand Up @@ -134,7 +134,7 @@ func New(ctx context.Context, c client.Client) (*Server, error) { //nolint:funle
}

// setup checker
chk, err := servicecheck.New(ctx, c, promRegistry, server.allowUnschedulable, 1*time.Second, histogramBuckets)
chk, err := servicecheck.New(c, promRegistry, server.allowUnschedulable, 1*time.Second, histogramBuckets)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -189,7 +189,7 @@ func New(ctx context.Context, c client.Client) (*Server, error) { //nolint:funle
}

// Run starts the periodic checker and the http/https server(s) and blocks until Shutdown was called.
func (s *Server) Run() error {
func (s *Server) Run(ctx context.Context) error {
var (
wg sync.WaitGroup
errc = make(chan error, 2) // max two errors can happen
Expand All @@ -211,7 +211,17 @@ func (s *Server) Run() error {
go func() {
defer wg.Done()

s.checker.RunScheduled(s.checkInterval)
ticker := time.NewTicker(s.checkInterval)
defer ticker.Stop()

for {
select {
case <-ticker.C:
s.checker.Run(ctx)
case <-ctx.Done():
return
}
}
}()

wg.Add(1)
Expand Down Expand Up @@ -259,7 +269,7 @@ func (s *Server) Run() error {
}

// Shutdown disables the readiness probe and then gracefully halts the kubenurse http/https server(s).
func (s *Server) Shutdown(ctx context.Context) error {
func (s *Server) Shutdown() error {
s.ready.Store(false)

// wait before actually shutting down the http/s server, as the updated
Expand All @@ -268,8 +278,9 @@ func (s *Server) Shutdown(ctx context.Context) error {
// me_ingress or path errors in other pods
time.Sleep(s.checker.ShutdownDuration)

// stop the scheduled checker
s.checker.StopScheduled()
// background ctx since, the "root" context is already canceled
ctx, shutdownCancel := context.WithTimeout(context.Background(), 5*time.Second)
defer shutdownCancel()

if err := s.http.Shutdown(ctx); err != nil {
return fmt.Errorf("stop http server: %w", err)
Expand Down
9 changes: 6 additions & 3 deletions internal/kubenurse/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package kubenurse
import (
"context"
"testing"
"time"

"github.com/stretchr/testify/require"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
Expand All @@ -12,24 +13,26 @@ func TestCombined(t *testing.T) {
r := require.New(t)

fakeClient := fake.NewFakeClient()
kubenurse, err := New(context.Background(), fakeClient)
kubenurse, err := New(fakeClient)
r.NoError(err)
r.NotNil(kubenurse)

t.Run("start/stop", func(t *testing.T) {
r := require.New(t)
errc := make(chan error, 1)

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
go func() {
// blocks until shutdown is called
err := kubenurse.Run()
err := kubenurse.Run(ctx)

errc <- err
close(errc)
cancel()
}()

// Shutdown, Run() should stop after function completes
err := kubenurse.Shutdown(context.Background())
err := kubenurse.Shutdown()
r.NoError(err)

err = <-errc // blocks until kubenurse.Run() finishes and eventually returns an error
Expand Down
42 changes: 10 additions & 32 deletions internal/servicecheck/servicecheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ const (

// New configures the checker with a httpClient and a cache timeout for check
// results. Other parameters of the Checker struct need to be configured separately.
func New(_ context.Context, cl client.Client, promRegistry *prometheus.Registry,
func New(cl client.Client, promRegistry *prometheus.Registry,
allowUnschedulable bool, cacheTTL time.Duration, durationHistogramBuckets []float64) (*Checker, error) {
// setup http transport
tlsConfig, err := generateTLSConfig(os.Getenv("KUBENURSE_EXTRA_CA"))
Expand Down Expand Up @@ -63,13 +63,12 @@ func New(_ context.Context, cl client.Client, promRegistry *prometheus.Registry,
client: cl,
httpClient: httpClient,
cacheTTL: cacheTTL,
stop: make(chan struct{}),
}, nil
}

// Run runs all servicechecks and returns the result togeter with a boolean which indicates success. The cache
// is respected.
func (c *Checker) Run() {
func (c *Checker) Run(ctx context.Context) {
// Run Checks
result := sync.Map{}

Expand All @@ -91,17 +90,17 @@ func (c *Checker) Run() {

wg.Add(4)

go c.measure(&wg, &result, c.APIServerDirect, APIServerDirect)
go c.measure(&wg, &result, c.APIServerDNS, APIServerDNS)
go c.measure(&wg, &result, c.MeIngress, meIngress)
go c.measure(&wg, &result, c.MeService, meService)
go c.measure(ctx, &wg, &result, c.APIServerDirect, APIServerDirect)
go c.measure(ctx, &wg, &result, c.APIServerDNS, APIServerDNS)
go c.measure(ctx, &wg, &result, c.MeIngress, meIngress)
go c.measure(ctx, &wg, &result, c.MeService, meService)

if c.SkipCheckNeighbourhood {
result.Store(NeighbourhoodState, skippedStr)
return
}

neighbours, err := c.getNeighbours(context.Background(), c.KubenurseNamespace, c.NeighbourFilter)
neighbours, err := c.getNeighbours(ctx, c.KubenurseNamespace, c.NeighbourFilter)
if err != nil {
result.Store(NeighbourhoodState, err.Error())
return
Expand All @@ -121,33 +120,12 @@ func (c *Checker) Run() {
return c.doRequest(ctx, podIPtoURL(neighbour.PodIP, c.UseTLS), true)
}

go c.measure(&wg, &result, check, "path_"+neighbour.NodeName)
go c.measure(ctx, &wg, &result, check, "path_"+neighbour.NodeName)
}

wg.Wait()
}

// RunScheduled runs the checks in the specified interval which can be used to keep the metrics up-to-date. This
// function does not return until StopScheduled is called.
func (c *Checker) RunScheduled(d time.Duration) {
ticker := time.NewTicker(d)
defer ticker.Stop()

for {
select {
case <-ticker.C:
c.Run()
case <-c.stop:
return
}
}
}

// StopScheduled is used to stop the scheduled run of checks.
func (c *Checker) StopScheduled() {
close(c.stop)
}

// APIServerDirect checks the /version endpoint of the Kubernetes API Server through the direct link
func (c *Checker) APIServerDirect(ctx context.Context) string {
if c.SkipCheckAPIServerDirect {
Expand Down Expand Up @@ -189,12 +167,12 @@ func (c *Checker) MeService(ctx context.Context) string {
}

// measure implements metric collections for the check
func (c *Checker) measure(wg *sync.WaitGroup, res *sync.Map, check Check, requestType string) {
func (c *Checker) measure(ctx context.Context, wg *sync.WaitGroup, res *sync.Map, check Check, requestType string) {
// Add our label (check type) to the context so our http tracer can annotate
// metrics and errors based with the label
defer wg.Done()

ctx := context.WithValue(context.Background(), kubenurseTypeKey{}, requestType)
ctx = context.WithValue(ctx, kubenurseTypeKey{}, requestType)
res.Store(requestType, check(ctx))
}

Expand Down
19 changes: 2 additions & 17 deletions internal/servicecheck/servicecheck_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,29 +36,14 @@ func TestCombined(t *testing.T) {
// fake client, with a dummy neighbour pod
fakeClient := fake.NewFakeClient(&fakeNeighbourPod)

checker, err := New(context.Background(), fakeClient, prometheus.NewRegistry(), false, 3*time.Second, prometheus.DefBuckets)
checker, err := New(fakeClient, prometheus.NewRegistry(), false, 3*time.Second, prometheus.DefBuckets)
r.NoError(err)
r.NotNil(checker)

t.Run("run", func(t *testing.T) {
r := require.New(t)
checker.Run()
checker.Run(context.Background())

r.Equal(okStr, checker.LastCheckResult[NeighbourhoodState])
})

t.Run("scheduled", func(t *testing.T) {
stopped := make(chan struct{})

go func() {
// blocks until StopScheduled()
checker.RunScheduled(time.Second * 5)

close(stopped)
}()

checker.StopScheduled()

<-stopped
})
}
5 changes: 1 addition & 4 deletions internal/servicecheck/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type Checker struct {
SkipCheckMeIngress bool
SkipCheckMeService bool

// shutdownDuration defines the time during which kubenurse will wait before stopping
// shutdownDuration defines the time during which kubenurse will accept https requests during shutdown
ShutdownDuration time.Duration

// Kubernetes API
Expand Down Expand Up @@ -55,9 +55,6 @@ type Checker struct {

// cacheTTL defines the TTL of how long a cached result is valid
cacheTTL time.Duration

// stop is used to cancel RunScheduled
stop chan struct{}
}

// Check is the signature used by all checks that the checker can execute.
Expand Down
11 changes: 3 additions & 8 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"os"
"os/signal"
"syscall"
"time"

"github.com/postfinance/kubenurse/internal/kubenurse"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -62,7 +61,7 @@ func main() {
return
}

server, err := kubenurse.New(ctx, c)
server, err := kubenurse.New(c)
if err != nil {
slog.Error("error in kubenurse.New call", "err", err)
return
Expand All @@ -73,17 +72,13 @@ func main() {

slog.Info("shutting down, received signal to stop")

// background ctx since, the "root" context is already canceled
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 10*time.Second)
defer shutdownCancel()

if err := server.Shutdown(shutdownCtx); err != nil {
if err := server.Shutdown(); err != nil {
slog.Error("error during graceful shutdown", "err", err)
}
}()

// blocks, until the server is stopped by calling Shutdown()
if err := server.Run(); err != nil {
if err := server.Run(ctx); err != nil {
slog.Error("error while running kubenurse", "err", err)
}
}

0 comments on commit cafb7ae

Please sign in to comment.