From a1b2d77fe348aee989ea7accf672f28afdaf6fe5 Mon Sep 17 00:00:00 2001
From: Jerad C
Date: Wed, 29 Mar 2023 15:57:49 -0500
Subject: [PATCH] optionally delete sqs message when node not found (#801)

* optionally delete sqs message when node not found

* remove unused variables
---
 cmd/node-termination-handler.go               |  11 +-
 .../aws-node-termination-handler/README.md    |   1 +
 .../templates/deployment.yaml                 |   2 +
 .../aws-node-termination-handler/values.yaml  |   3 +
 pkg/config/config.go                          |   6 +
 ...nce-recommendation-sqs-node-not-found-test | 166 ++++++++++++++++++
 test/k8s-local-cluster-test/run-test          |  12 +-
 7 files changed, 194 insertions(+), 7 deletions(-)
 create mode 100755 test/e2e/rebalance-recommendation-sqs-node-not-found-test

diff --git a/cmd/node-termination-handler.go b/cmd/node-termination-handler.go
index 07104706..7e741974 100644
--- a/cmd/node-termination-handler.go
+++ b/cmd/node-termination-handler.go
@@ -333,6 +333,7 @@ func watchForCancellationEvents(cancelChan <-chan monitor.InterruptionEvent, int
 func drainOrCordonIfNecessary(interruptionEventStore *interruptioneventstore.Store, drainEvent *monitor.InterruptionEvent, node node.Node, nthConfig config.Config, nodeMetadata ec2metadata.NodeMetadata, metrics observability.Metrics, recorder observability.K8sEventRecorder, wg *sync.WaitGroup) {
 	defer wg.Done()
+	nodeFound := true
 	nodeName := drainEvent.NodeName
 
 	if nthConfig.UseProviderId {
@@ -348,6 +349,7 @@ func drainOrCordonIfNecessary(interruptionEventStore *interruptioneventstore.Sto
 	nodeLabels, err := node.GetNodeLabels(nodeName)
 	if err != nil {
 		log.Err(err).Msgf("Unable to fetch node labels for node '%s' ", nodeName)
+		nodeFound = false
 	}
 	drainEvent.NodeLabels = nodeLabels
 	if drainEvent.PreDrainTask != nil {
@@ -376,15 +378,14 @@ func drainOrCordonIfNecessary(interruptionEventStore *interruptioneventstore.Sto
 
 	if err != nil {
 		interruptionEventStore.CancelInterruptionEvent(drainEvent.EventID)
-		<-interruptionEventStore.Workers
 	} else {
 		interruptionEventStore.MarkAllAsProcessed(nodeName)
-		if drainEvent.PostDrainTask != nil {
-			runPostDrainTask(node, nodeName, drainEvent, metrics, recorder)
-		}
-		<-interruptionEventStore.Workers
 	}
+	if (err == nil || (!nodeFound && nthConfig.DeleteSqsMsgIfNodeNotFound)) && drainEvent.PostDrainTask != nil {
+		runPostDrainTask(node, nodeName, drainEvent, metrics, recorder)
+	}
+	<-interruptionEventStore.Workers
 }
 
 func runPreDrainTask(node node.Node, nodeName string, drainEvent *monitor.InterruptionEvent, metrics observability.Metrics, recorder observability.K8sEventRecorder) {
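Note on the hunks above: previously the post-drain task, which in Queue Processor mode deletes the SQS message, ran only after a successful cordon/drain, so a message targeting a node that no longer exists was never deleted and SQS kept redelivering it. With this patch the task also runs when the node lookup failed and the operator opted in. A condensed sketch of the resulting control flow (a simplified restatement of the diff, not a verbatim excerpt):

    // err is the result of the cordon/drain attempt; nodeFound was set to
    // false earlier when GetNodeLabels could not resolve the node.
    if err != nil {
        interruptionEventStore.CancelInterruptionEvent(drainEvent.EventID)
    } else {
        interruptionEventStore.MarkAllAsProcessed(nodeName)
    }
    // Run the post-drain task on a successful drain, or when the node was
    // never found and -delete-sqs-msg-if-node-not-found is enabled.
    if (err == nil || (!nodeFound && nthConfig.DeleteSqsMsgIfNodeNotFound)) && drainEvent.PostDrainTask != nil {
        runPostDrainTask(node, nodeName, drainEvent, metrics, recorder)
    }
    // Release the worker slot in all cases.
    <-interruptionEventStore.Workers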
diff --git a/config/helm/aws-node-termination-handler/README.md b/config/helm/aws-node-termination-handler/README.md
index 2fe7d391..11491272 100644
--- a/config/helm/aws-node-termination-handler/README.md
+++ b/config/helm/aws-node-termination-handler/README.md
@@ -156,6 +156,7 @@ The configuration in this table applies to AWS Node Termination Handler in IMDS
 | `enableScheduledEventDraining` | If `true`, drain nodes before the maintenance window starts for an EC2 instance scheduled event. Only used in IMDS mode. | `true` |
 | `enableRebalanceMonitoring` | If `true`, cordon nodes when the rebalance recommendation notice is received. If you'd like to drain the node in addition to cordoning, then also set `enableRebalanceDraining`. Only used in IMDS mode. | `false` |
 | `enableRebalanceDraining` | If `true`, drain nodes when the rebalance recommendation notice is received. Only used in IMDS mode. | `false` |
+| `deleteSqsMsgIfNodeNotFound` | If `true`, delete the SQS Message from the SQS Queue if the targeted node is not found. Only used in Queue Processor mode. | `false` |
 
 ### Testing Configuration

diff --git a/config/helm/aws-node-termination-handler/templates/deployment.yaml b/config/helm/aws-node-termination-handler/templates/deployment.yaml
index 3024e42f..26a696a4 100644
--- a/config/helm/aws-node-termination-handler/templates/deployment.yaml
+++ b/config/helm/aws-node-termination-handler/templates/deployment.yaml
@@ -164,6 +164,8 @@ spec:
 {{- end }}
             - name: QUEUE_URL
               value: {{ .Values.queueURL | quote }}
+            - name: DELETE_SQS_MSG_IF_NODE_NOT_FOUND
+              value: {{ .Values.deleteSqsMsgIfNodeNotFound | quote }}
             - name: WORKERS
               value: {{ .Values.workers | quote }}
 {{- with .Values.extraEnv }}

diff --git a/config/helm/aws-node-termination-handler/values.yaml b/config/helm/aws-node-termination-handler/values.yaml
index ea7b7f77..3a4d5db4 100644
--- a/config/helm/aws-node-termination-handler/values.yaml
+++ b/config/helm/aws-node-termination-handler/values.yaml
@@ -277,6 +277,9 @@ enableRebalanceMonitoring: false
 # enableRebalanceDraining If true, drain nodes when the rebalance recommendation notice is received. Only used in IMDS mode.
 enableRebalanceDraining: false
 
+# deleteSqsMsgIfNodeNotFound If true, delete the SQS Message from the SQS Queue if the targeted node(s) are not found. Only used in Queue Processor mode.
+deleteSqsMsgIfNodeNotFound: false
+
 # ---------------------------------------------------------------------------------------------------------------------
 # Testing
 # ---------------------------------------------------------------------------------------------------------------------
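For chart users, the new value plugs into the existing Queue Processor settings. A minimal, illustrative invocation (release name, namespace, and queue URL are placeholders, not part of this patch):

    helm upgrade --install aws-node-termination-handler \
      ./config/helm/aws-node-termination-handler \
      --namespace kube-system \
      --set enableSqsTerminationDraining=true \
      --set queueURL=https://sqs.us-east-1.amazonaws.com/123456789012/nth-queue \
      --set deleteSqsMsgIfNodeNotFound=true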
Bool("enable_scheduled_event_draining", c.EnableScheduledEventDraining). Bool("enable_spot_interruption_draining", c.EnableSpotInterruptionDraining). Bool("enable_sqs_termination_draining", c.EnableSQSTerminationDraining). + Bool("delete_sqs_msg_if_node_not_found", c.DeleteSqsMsgIfNodeNotFound). Bool("enable_rebalance_monitoring", c.EnableRebalanceMonitoring). Bool("enable_rebalance_draining", c.EnableRebalanceDraining). Int("metadata_tries", c.MetadataTries). @@ -346,6 +350,7 @@ func (c Config) PrintHumanConfigArgs() { "\tenable-scheduled-event-draining: %t,\n"+ "\tenable-spot-interruption-draining: %t,\n"+ "\tenable-sqs-termination-draining: %t,\n"+ + "\tdelete-sqs-msg-if-node-not-found: %t,\n"+ "\tenable-rebalance-monitoring: %t,\n"+ "\tenable-rebalance-draining: %t,\n"+ "\tmetadata-tries: %d,\n"+ @@ -384,6 +389,7 @@ func (c Config) PrintHumanConfigArgs() { c.EnableScheduledEventDraining, c.EnableSpotInterruptionDraining, c.EnableSQSTerminationDraining, + c.DeleteSqsMsgIfNodeNotFound, c.EnableRebalanceMonitoring, c.EnableRebalanceDraining, c.MetadataTries, diff --git a/test/e2e/rebalance-recommendation-sqs-node-not-found-test b/test/e2e/rebalance-recommendation-sqs-node-not-found-test new file mode 100755 index 00000000..df869c89 --- /dev/null +++ b/test/e2e/rebalance-recommendation-sqs-node-not-found-test @@ -0,0 +1,166 @@ +#!/bin/bash +set -euo pipefail + +# Available env vars: +# $TMP_DIR +# $CLUSTER_NAME +# $KUBECONFIG +# $NODE_TERMINATION_HANDLER_DOCKER_REPO +# $NODE_TERMINATION_HANDLER_DOCKER_TAG +# $WEBHOOK_DOCKER_REPO +# $WEBHOOK_DOCKER_TAG +# $AEMM_URL +# $AEMM_VERSION + +function fail_and_exit { + echo "❌ Rebalance Recommendation SQS Test failed $CLUSTER_NAME ❌" + exit "${1:-1}" +} + +echo "Starting Rebalance Recommendation SQS Test for Node Termination Handler" +START_TIME=$(date -u +"%Y-%m-%dT%TZ") + +SCRIPTPATH="$( cd "$(dirname "$0")" ; pwd -P )" + +common_helm_args=() + +localstack_helm_args=( + upgrade + --install + --namespace default + "$CLUSTER_NAME-localstack" + "$SCRIPTPATH/../../config/helm/localstack/" + --set nodeSelector."${NTH_CONTROL_LABEL}" + --set defaultRegion="${AWS_REGION}" + --wait +) + +set -x +helm "${localstack_helm_args[@]}" +set +x + +sleep 10 + +localstack_pod=$(kubectl get pods --selector app=localstack --field-selector="status.phase=Running" \ + -o go-template --template '{{range .items}}{{.metadata.name}} {{.metadata.creationTimestamp}}{{"\n"}}{{end}}' \ + | awk '$2 >= "'"${START_TIME//+0000/Z}"'" { print $1 }') +echo "🥑 Using localstack pod ${localstack_pod}" + +run_instances_resp=$(kubectl exec -i "${localstack_pod}" -- bash -c "awslocal ec2 run-instances --private-ip-address 192.168.0.4 --region ${AWS_REGION}") +instance_id=$(echo "${run_instances_resp}" | jq -r '.Instances[] .InstanceId') +echo "🥑 Created awslocal EC2 instance (${instance_id}) not backed by a node" + +CREATE_SQS_CMD="awslocal sqs create-queue --queue-name "${CLUSTER_NAME}-queue" --attributes MessageRetentionPeriod=300 --region ${AWS_REGION}" +queue_url=$(kubectl exec -i "${localstack_pod}" -- bash -c "${CREATE_SQS_CMD}" | jq -r .QueueUrl) + +echo "🥑 Created SQS Queue ${queue_url}" + +anth_helm_args=( + upgrade + --install + --namespace kube-system + "$CLUSTER_NAME-acth" + "$SCRIPTPATH/../../config/helm/aws-node-termination-handler/" + --set image.repository="$NODE_TERMINATION_HANDLER_DOCKER_REPO" + --set image.tag="$NODE_TERMINATION_HANDLER_DOCKER_TAG" + --set nodeSelector."${NTH_CONTROL_LABEL}" + --set tolerations[0].operator=Exists + --set awsAccessKeyID=foo + --set 
diff --git a/test/e2e/rebalance-recommendation-sqs-node-not-found-test b/test/e2e/rebalance-recommendation-sqs-node-not-found-test
new file mode 100755
index 00000000..df869c89
--- /dev/null
+++ b/test/e2e/rebalance-recommendation-sqs-node-not-found-test
@@ -0,0 +1,166 @@
+#!/bin/bash
+set -euo pipefail
+
+# Available env vars:
+#   $TMP_DIR
+#   $CLUSTER_NAME
+#   $KUBECONFIG
+#   $NODE_TERMINATION_HANDLER_DOCKER_REPO
+#   $NODE_TERMINATION_HANDLER_DOCKER_TAG
+#   $WEBHOOK_DOCKER_REPO
+#   $WEBHOOK_DOCKER_TAG
+#   $AEMM_URL
+#   $AEMM_VERSION
+
+function fail_and_exit {
+    echo "❌ Rebalance Recommendation SQS Test failed $CLUSTER_NAME ❌"
+    exit "${1:-1}"
+}
+
+echo "Starting Rebalance Recommendation SQS Test for Node Termination Handler"
+START_TIME=$(date -u +"%Y-%m-%dT%TZ")
+
+SCRIPTPATH="$( cd "$(dirname "$0")" ; pwd -P )"
+
+common_helm_args=()
+
+localstack_helm_args=(
+    upgrade
+    --install
+    --namespace default
+    "$CLUSTER_NAME-localstack"
+    "$SCRIPTPATH/../../config/helm/localstack/"
+    --set nodeSelector."${NTH_CONTROL_LABEL}"
+    --set defaultRegion="${AWS_REGION}"
+    --wait
+)
+
+set -x
+helm "${localstack_helm_args[@]}"
+set +x
+
+sleep 10
+
+localstack_pod=$(kubectl get pods --selector app=localstack --field-selector="status.phase=Running" \
+    -o go-template --template '{{range .items}}{{.metadata.name}} {{.metadata.creationTimestamp}}{{"\n"}}{{end}}' \
+    | awk '$2 >= "'"${START_TIME//+0000/Z}"'" { print $1 }')
+echo "🥑 Using localstack pod ${localstack_pod}"
+
+run_instances_resp=$(kubectl exec -i "${localstack_pod}" -- bash -c "awslocal ec2 run-instances --private-ip-address 192.168.0.4 --region ${AWS_REGION}")
+instance_id=$(echo "${run_instances_resp}" | jq -r '.Instances[] .InstanceId')
+echo "🥑 Created awslocal EC2 instance (${instance_id}) not backed by a node"
+
+CREATE_SQS_CMD="awslocal sqs create-queue --queue-name "${CLUSTER_NAME}-queue" --attributes MessageRetentionPeriod=300 --region ${AWS_REGION}"
+queue_url=$(kubectl exec -i "${localstack_pod}" -- bash -c "${CREATE_SQS_CMD}" | jq -r .QueueUrl)
+
+echo "🥑 Created SQS Queue ${queue_url}"
+
+anth_helm_args=(
+    upgrade
+    --install
+    --namespace kube-system
+    "$CLUSTER_NAME-acth"
+    "$SCRIPTPATH/../../config/helm/aws-node-termination-handler/"
+    --set image.repository="$NODE_TERMINATION_HANDLER_DOCKER_REPO"
+    --set image.tag="$NODE_TERMINATION_HANDLER_DOCKER_TAG"
+    --set nodeSelector."${NTH_CONTROL_LABEL}"
+    --set tolerations[0].operator=Exists
+    --set awsAccessKeyID=foo
+    --set awsSecretAccessKey=bar
+    --set awsRegion="${AWS_REGION}"
+    --set awsEndpoint="http://localstack.default"
+    --set checkTagBeforeDraining=false
+    --set enableSqsTerminationDraining=true
+    --set deleteSqsMsgIfNodeNotFound=true
+    --set taintNode="true"
+    --set "queueURL=${queue_url}"
+    --wait
+)
+[[ -n "${NODE_TERMINATION_HANDLER_DOCKER_PULL_POLICY-}" ]] &&
+    anth_helm_args+=(--set image.pullPolicy="$NODE_TERMINATION_HANDLER_DOCKER_PULL_POLICY")
+[[ ${#common_helm_args[@]} -gt 0 ]] &&
+    anth_helm_args+=("${common_helm_args[@]}")
+
+set -x
+helm "${anth_helm_args[@]}"
+set +x
+
+emtp_helm_args=(
+    upgrade
+    --install
+    --namespace default
+    "$CLUSTER_NAME-emtp"
+    "$SCRIPTPATH/../../config/helm/webhook-test-proxy/"
+    --set webhookTestProxy.image.repository="$WEBHOOK_DOCKER_REPO"
+    --set webhookTestProxy.image.tag="$WEBHOOK_DOCKER_TAG"
+    --wait
+)
+[[ -n "${WEBHOOK_DOCKER_PULL_POLICY-}" ]] &&
+    emtp_helm_args+=(--set webhookTestProxy.image.pullPolicy="$WEBHOOK_DOCKER_PULL_POLICY")
+[[ ${#common_helm_args[@]} -gt 0 ]] &&
+    emtp_helm_args+=("${common_helm_args[@]}")
+
+set -x
+helm "${emtp_helm_args[@]}"
+set +x
+
+CHECK_CYCLES=15
+CHECK_SLEEP=15
+
+DEPLOYED=0
+
+for i in $(seq 1 $CHECK_CYCLES); do
+    if [[ $(kubectl get deployments regular-pod-test -o jsonpath='{.status.unavailableReplicas}') -eq 0 ]]; then
+        echo "✅ Verified regular-pod-test pod was scheduled and started!"
+        DEPLOYED=1
+        break
+    fi
+    echo "Setup Loop $i/$CHECK_CYCLES, sleeping for $CHECK_SLEEP seconds"
+    sleep $CHECK_SLEEP
+done
+
+if [[ $DEPLOYED -eq 0 ]]; then
+    echo "❌ regular-pod-test pod deployment failed"
+    fail_and_exit 2
+fi
+
+REBALANCE_EVENT=$(cat <<EOF
[... remainder of the new test script truncated in the source ...]

diff --git a/test/k8s-local-cluster-test/run-test b/test/k8s-local-cluster-test/run-test
--- a/test/k8s-local-cluster-test/run-test
+++ b/test/k8s-local-cluster-test/run-test
@@ ... @@
+cat << EOF >$TMP_DIR/env
+export KUBECONFIG=$TMP_DIR/kubeconfig
+echo "Updated KUBECONFIG=$KUBECONFIG"
+
+export PATH=$TMP_DIR:\$PATH
+echo "Updated PATH=$PATH"
+
+EOF
+
 echo "======================================================================================================"
 echo "To poke around your test manually:"
-echo "export KUBECONFIG=$TMP_DIR/kubeconfig"
-echo "export PATH=$TMP_DIR:\$PATH"
+echo ". $TMP_DIR/env"
 echo "kubectl get pods -A"
 echo "======================================================================================================"
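To spot-check the new behavior by hand against the localstack setup above, one option (a sketch reusing the script's ${localstack_pod}, ${queue_url}, and ${AWS_REGION} variables; not part of the test itself) is to watch the queue depth after the rebalance event for the node-less instance has been sent:

    # With deleteSqsMsgIfNodeNotFound=true, the count should drop to 0 once NTH
    # gives up on the missing node and deletes the message.
    kubectl exec -i "${localstack_pod}" -- bash -c \
        "awslocal sqs get-queue-attributes --queue-url ${queue_url} \
        --attribute-names ApproximateNumberOfMessages --region ${AWS_REGION}"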