Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
Signed-off-by: Maksim Paskal <[email protected]>
  • Loading branch information
maksim-paskal committed Mar 21, 2024
1 parent 8329c49 commit f2f710e
Show file tree
Hide file tree
Showing 10 changed files with 382 additions and 117 deletions.
5 changes: 0 additions & 5 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,6 @@ test:
e2e:
go test -v -race ./e2e \
-kubeconfig=$(KUBECONFIG) \
-log.level=INFO \
-log.pretty \
-taint.node \
-taint.effect=NoExecute \
-podGracePeriodSeconds=30 \
-node=${node} \
-telegram.token=${telegramToken} \
-telegram.chatID=${telegramChatID}
Expand Down
1 change: 0 additions & 1 deletion codecov.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,4 @@ ignore:
# ignored because testing these files requires an active connection to a Kubernetes cluster
- "pkg/web/web.go"
- "pkg/api/api.go"
- "pkg/events/events.go"
- "pkg/client/client.go"
73 changes: 55 additions & 18 deletions e2e/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,51 +14,88 @@ package main_test

import (
"context"
"encoding/json"
"flag"
"net/http"
"net/http/httptest"
"testing"

"github.com/maksim-paskal/aks-node-termination-handler/pkg/alert"
"github.com/maksim-paskal/aks-node-termination-handler/internal"
"github.com/maksim-paskal/aks-node-termination-handler/pkg/api"
"github.com/maksim-paskal/aks-node-termination-handler/pkg/client"
"github.com/maksim-paskal/aks-node-termination-handler/pkg/config"
"github.com/maksim-paskal/aks-node-termination-handler/pkg/template"
"github.com/maksim-paskal/aks-node-termination-handler/pkg/types"
log "github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

var ctx = context.TODO()
const testAzureName = "test-e2e-resource"

// TestDrain is an end-to-end test. It serves a fake scheduled-events document
// from a local httptest server, points the handler at it through the
// "endpoint" and "resource.name" flags, calls internal.Run, and then reads the
// node back from the cluster to check that it is unschedulable and tainted.
//
// NOTE(review): this span appears to be a rendered diff without +/- markers, so
// statements of the previous implementation (config.Load, client.Init,
// alert.Init, alert.SendTelegram, api.DrainNode, api.AddNodeEvent) are
// interleaved with the new ones — confirm against the repository.
func TestDrain(t *testing.T) {
func TestDrain(t *testing.T) { //nolint:funlen
t.Parallel()

log.SetLevel(log.DebugLevel)
log.SetReportCaller(true)

eventID := "test-event-id"

// Fake events endpoint: every request gets one Preempt event that targets
// testAzureName.
handler := http.NewServeMux()
handler.HandleFunc("/document", func(w http.ResponseWriter, _ *http.Request) {
// The marshal error is discarded here.
message, _ := json.Marshal(types.ScheduledEventsType{
DocumentIncarnation: 1,
Events: []types.ScheduledEventsEvent{
{
EventId: eventID,
EventType: types.EventTypePreempt,
ResourceType: "resourceType",
Resources: []string{testAzureName},
},
},
})

w.WriteHeader(http.StatusOK)
_, _ = w.Write(message)
})

// NOTE(review): testServer is never closed in the visible code; consider
// deferring testServer.Close().
testServer := httptest.NewServer(handler)

// Configure the handler through its command-line flags; flag.Set errors are
// discarded.
_ = flag.Set("config", "./testdata/config_test.yaml")
_ = flag.Set("endpoint", testServer.URL+"/document")
_ = flag.Set("resource.name", testAzureName)

flag.Parse()

if err := config.Load(); err != nil {
ctx := context.TODO()

if err := internal.Run(ctx); err != nil {
t.Fatal(err)
}

if err := client.Init(); err != nil {
// Read the node back to verify the outcome of the run.
node, err := client.GetKubernetesClient().CoreV1().Nodes().Get(ctx, *config.Get().NodeName, metav1.GetOptions{})
if err != nil {
t.Fatal(err)
}

if err := alert.Init(); err != nil {
t.Fatal(err)
if !node.Spec.Unschedulable {
t.Fatal("node must be unschedulable")
}

if err := alert.SendTelegram(&template.MessageType{Template: "e2e"}); err != nil {
t.Fatal(err)
if len(node.Spec.Taints) == 0 {
t.Fatal("node must have taints")
}

if err := api.DrainNode(ctx, *config.Get().NodeName, "Preempt", "manual"); err != nil {
t.Fatal(err)
taintFound := false

for _, taint := range node.Spec.Taints {
// NOTE(review): the visible DrainNode hunks call
// addTaint(ctx, node, GetTaintKey(eventType), eventID), i.e. the key is
// derived from the event type and the value is the event ID. This condition
// compares the key with GetTaintKey(testAzureName) and the value with the
// NoSchedule effect name instead — confirm which is intended.
if taint.Key == api.GetTaintKey(testAzureName) && taint.Value == string(corev1.TaintEffectNoSchedule) {
taintFound = true

break
}
}

if err := api.AddNodeEvent(ctx, &types.EventMessage{
Type: "Info",
Reason: "TestDrain",
Message: "TestDrain",
}); err != nil {
t.Fatal(err)
if !taintFound {
t.Fatal("taint not found")
}
}
6 changes: 4 additions & 2 deletions e2e/testdata/config_test.yaml
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
taintNode: true
taintEffect: NoSchedule
taintnode: true
tainteffect: NoSchedule
podgraceperiodseconds: 30
exitafternodedrain: true
101 changes: 94 additions & 7 deletions internal/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,23 @@ import (
"github.com/maksim-paskal/aks-node-termination-handler/pkg/client"
"github.com/maksim-paskal/aks-node-termination-handler/pkg/config"
"github.com/maksim-paskal/aks-node-termination-handler/pkg/events"
"github.com/maksim-paskal/aks-node-termination-handler/pkg/template"
"github.com/maksim-paskal/aks-node-termination-handler/pkg/types"
"github.com/maksim-paskal/aks-node-termination-handler/pkg/web"
"github.com/maksim-paskal/aks-node-termination-handler/pkg/webhook"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
)

func Run(ctx context.Context) error {
err := config.Check()
err := config.Load()
if err != nil {
return errors.Wrap(err, "error in config check")
return errors.Wrap(err, "error in config load")
}

err = config.Load()
err = config.Check()
if err != nil {
return errors.Wrap(err, "error in config load")
return errors.Wrap(err, "error in config check")
}

log.Debugf("using config: %s", config.Get().String())
Expand All @@ -49,14 +52,98 @@ func Run(ctx context.Context) error {
return errors.Wrap(err, "error in init api")
}

go cache.SheduleCleaning(ctx)
go web.Start(ctx)

if err := startReadingEvents(ctx); err != nil {
return errors.Wrap(err, "error in startReadingEvents")
}

return nil
}

// startReadingEvents resolves the Azure resource name for the configured node,
// builds an events reader from the loaded config and starts reading events.
// When ExitAfterNodeDrain is set, ReadEvents is called in the current
// goroutine; otherwise it is started in a separate goroutine.
// It returns an error only when the resource name lookup fails.
func startReadingEvents(ctx context.Context) error { //nolint:funlen
azureResource, err := api.GetAzureResourceName(ctx, *config.Get().NodeName)
if err != nil {
return errors.Wrap(err, "error in getting azure resource name")
}

// NOTE(review): the next three `go` statements look like lines removed by this
// commit (rendered diff without +/- markers) — confirm against the repository.
go cache.SheduleCleaning(ctx)
go events.ReadEvents(ctx, azureResource)
go web.Start(ctx)
eventReader := events.NewReader()
eventReader.AzureResource = azureResource
eventReader.Period = *config.Get().Period
eventReader.Endpoint = *config.Get().Endpoint
eventReader.RequestTimeout = *config.Get().RequestTimeout
eventReader.NodeName = *config.Get().NodeName
// BeforeReading adds an informational event to the node.
// NOTE(review): presumably invoked by the reader before it starts polling —
// confirm in pkg/events.
eventReader.BeforeReading = func(ctx context.Context) error {
nodeEvent := types.EventMessage{
Type: "Info",
Reason: "ReadEvents",
Message: "Start to listen events from Azure API",
}
if err := api.AddNodeEvent(ctx, &nodeEvent); err != nil {
return errors.Wrap(err, "error in add node event")
}

return nil
}

// EventReceived adds a warning event to the node, skips event types excluded
// by config, starts alert delivery and drains the node. It returns true only
// after DrainNode succeeded.
// NOTE(review): presumably a true result tells the reader to stop — confirm
// in pkg/events.
eventReader.EventReceived = func(ctx context.Context, event types.ScheduledEventsEvent) (bool, error) {
nodeEvent := types.EventMessage{
Type: "Warning",
Reason: string(event.EventType),
Message: "Azure API sended schedule event for this node",
}
if err := api.AddNodeEvent(ctx, &nodeEvent); err != nil {
return false, errors.Wrap(err, "error in add node event")
}

if config.Get().IsExcludedEvent(event.EventType) {
log.Infof("Excluded event %s by user config", event.EventType)

return false, nil
}

// send event in separate goroutine; its error is only logged
go func() {
if err := sendEvent(ctx, event); err != nil {
log.WithError(err).Error("error in sendEvent")
}
}()

if err := api.DrainNode(ctx, *config.Get().NodeName, string(event.EventType), event.EventId); err != nil {
return false, errors.Wrap(err, "error in DrainNode")
}

return true, nil
}

// read events synchronously when the process must exit after the node drain,
// otherwise read them in a separate goroutine
if *config.Get().ExitAfterNodeDrain {
eventReader.ReadEvents(ctx)
} else {
go eventReader.ReadEvents(ctx)
}

return nil
}

// sendEvent builds the alert message for a scheduled event and passes it to
// the Telegram and webhook senders.
//
// Delivery is best effort: a failure of either sender is logged and neither
// stops the other sender nor fails the call. An error is returned only when
// the message itself cannot be built.
func sendEvent(ctx context.Context, event types.ScheduledEventsEvent) error {
	nodeName := *config.Get().NodeName

	alertMessage, buildErr := template.NewMessageType(ctx, nodeName, event)
	if buildErr != nil {
		return errors.Wrap(buildErr, "error in template.NewMessageType")
	}

	log.Infof("Message: %+v", alertMessage)

	// The template is applied after the message has been logged.
	alertMessage.Template = *config.Get().AlertMessage

	telegramErr := alert.SendTelegram(alertMessage)
	if telegramErr != nil {
		log.WithError(telegramErr).Error("error in alert.SendTelegram")
	}

	webhookErr := webhook.SendWebHook(ctx, alertMessage)
	if webhookErr != nil {
		log.WithError(webhookErr).Error("error in webhook.SendWebHook")
	}

	return nil
}
13 changes: 10 additions & 3 deletions pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ import (
const taintKeyPrefix = "aks-node-termination-handler"

func GetAzureResourceName(ctx context.Context, nodeName string) (string, error) {
// return user defined resource name
if len(*config.Get().ResourceName) > 0 {
return *config.Get().ResourceName, nil
}

node, err := client.GetKubernetesClient().CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
if err != nil {
return "", errors.Wrap(err, "error in Clientset.CoreV1().Nodes().Get")
Expand Down Expand Up @@ -64,7 +69,7 @@ func DrainNode(ctx context.Context, nodeName string, eventType string, eventID s

// taint node before draining if the taint effect is NoSchedule or PreferNoSchedule
if *config.Get().TaintNode && *config.Get().TaintEffect != string(corev1.TaintEffectNoExecute) {
err = addTaint(ctx, node, getTaintKey(eventType), eventID)
err = addTaint(ctx, node, GetTaintKey(eventType), eventID)
if err != nil {
return errors.Wrap(err, "failed to taint node")
}
Expand Down Expand Up @@ -98,7 +103,7 @@ func DrainNode(ctx context.Context, nodeName string, eventType string, eventID s
// taint node after draining if effect is TaintEffectNoExecute
// this NoExecute taint effect will stop all daemonsets on the node that cannot handle this effect
if *config.Get().TaintNode && *config.Get().TaintEffect == string(corev1.TaintEffectNoExecute) {
err = addTaint(ctx, node, getTaintKey(eventType), eventID)
err = addTaint(ctx, node, GetTaintKey(eventType), eventID)
if err != nil {
return errors.Wrap(err, "failed to taint node")
}
Expand All @@ -107,11 +112,13 @@ func DrainNode(ctx context.Context, nodeName string, eventType string, eventID s
return nil
}

func getTaintKey(eventType string) string {
// GetTaintKey returns the taint key for eventType: taintKeyPrefix, a slash and
// the lower-cased event type.
func GetTaintKey(eventType string) string {
	loweredType := strings.ToLower(eventType)

	return fmt.Sprintf("%s/%s", taintKeyPrefix, loweredType)
}

func addTaint(ctx context.Context, node *corev1.Node, taintKey string, taintValue string) error {
log.Infof("Adding taint %s=%s on node %s", taintKey, taintValue, node.Name)

freshNode := node.DeepCopy()

var err error
Expand Down
4 changes: 4 additions & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ type Type struct {
NodeGracePeriodSeconds *int
GracePeriodSeconds *int
DrainOnFreezeEvent *bool
ResourceName *string
ExitAfterNodeDrain *bool
}

var config = Type{
Expand Down Expand Up @@ -97,6 +99,8 @@ var config = Type{
NodeGracePeriodSeconds: flag.Int("nodeGracePeriodSeconds", defaultNodeGracePeriodSeconds, "maximum time in seconds to drain the node"), //nolint:lll
GracePeriodSeconds: flag.Int("gracePeriodSeconds", defaultGracePeriodSecond, "grace period is seconds for application termination"), //nolint:lll
DrainOnFreezeEvent: flag.Bool("drainOnFreezeEvent", false, "drain node on freeze event"),
ResourceName: flag.String("resource.name", "", "Azure resource name to drain"),
ExitAfterNodeDrain: flag.Bool("exitAfterNodeDrain", false, "process will exit after node drain"),
}

func (t *Type) GracePeriod() time.Duration {
Expand Down
Loading

0 comments on commit f2f710e

Please sign in to comment.