optionally delete sqs message when node not found (#801)
* optionally delete sqs message when node not found

* remove unused variables
cjerad authored Mar 29, 2023
1 parent 40f0251 commit a1b2d77
Showing 7 changed files with 194 additions and 7 deletions.
11 changes: 6 additions & 5 deletions cmd/node-termination-handler.go
@@ -333,6 +333,7 @@ func watchForCancellationEvents(cancelChan <-chan monitor.InterruptionEvent, int

func drainOrCordonIfNecessary(interruptionEventStore *interruptioneventstore.Store, drainEvent *monitor.InterruptionEvent, node node.Node, nthConfig config.Config, nodeMetadata ec2metadata.NodeMetadata, metrics observability.Metrics, recorder observability.K8sEventRecorder, wg *sync.WaitGroup) {
defer wg.Done()
nodeFound := true
nodeName := drainEvent.NodeName

if nthConfig.UseProviderId {
@@ -348,6 +349,7 @@ func drainOrCordonIfNecessary(interruptionEventStore *interruptioneventstore.Sto
nodeLabels, err := node.GetNodeLabels(nodeName)
if err != nil {
log.Err(err).Msgf("Unable to fetch node labels for node '%s' ", nodeName)
nodeFound = false
}
drainEvent.NodeLabels = nodeLabels
if drainEvent.PreDrainTask != nil {
@@ -376,15 +378,14 @@ func drainOrCordonIfNecessary(interruptionEventStore *interruptioneventstore.Sto

if err != nil {
interruptionEventStore.CancelInterruptionEvent(drainEvent.EventID)
<-interruptionEventStore.Workers
} else {
interruptionEventStore.MarkAllAsProcessed(nodeName)
if drainEvent.PostDrainTask != nil {
runPostDrainTask(node, nodeName, drainEvent, metrics, recorder)
}
<-interruptionEventStore.Workers
}

if (err == nil || (!nodeFound && nthConfig.DeleteSqsMsgIfNodeNotFound)) && drainEvent.PostDrainTask != nil {
runPostDrainTask(node, nodeName, drainEvent, metrics, recorder)
}
<-interruptionEventStore.Workers
}

func runPreDrainTask(node node.Node, nodeName string, drainEvent *monitor.InterruptionEvent, metrics observability.Metrics, recorder observability.K8sEventRecorder) {
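
Read together, the hunks above change when the post-drain task runs. Previously it ran only inside the successful-drain branch, so an event targeting an instance with no matching node never reached that step and its SQS message was left on the queue to be re-delivered. The new condition also runs the task when the node was not found and the operator opted in via the new setting, which (per the README entry added below) is what lets the message be deleted in Queue Processor mode. A minimal, self-contained sketch of just that gate; `shouldRunPostDrainTask` is an illustrative name, not a function in the handler:

```go
package main

import (
	"errors"
	"fmt"
)

// shouldRunPostDrainTask mirrors the condition introduced above: run the
// post-drain task when draining succeeded, or when the node was not found and
// delete-sqs-msg-if-node-not-found is enabled. Illustrative sketch only.
func shouldRunPostDrainTask(drainErr error, nodeFound, deleteSqsMsgIfNodeNotFound, hasPostDrainTask bool) bool {
	return (drainErr == nil || (!nodeFound && deleteSqsMsgIfNodeNotFound)) && hasPostDrainTask
}

func main() {
	errDrain := errors.New("unable to fetch node labels")
	fmt.Println(shouldRunPostDrainTask(nil, true, false, true))       // true: drain succeeded
	fmt.Println(shouldRunPostDrainTask(errDrain, false, true, true))  // true: node missing, opt-in cleanup
	fmt.Println(shouldRunPostDrainTask(errDrain, false, false, true)) // false: default leaves the message for retry
}
```
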
1 change: 1 addition & 0 deletions config/helm/aws-node-termination-handler/README.md
@@ -156,6 +156,7 @@ The configuration in this table applies to AWS Node Termination Handler in IMDS
| `enableScheduledEventDraining` | If `true`, drain nodes before the maintenance window starts for an EC2 instance scheduled event. Only used in IMDS mode. | `true` |
| `enableRebalanceMonitoring` | If `true`, cordon nodes when the rebalance recommendation notice is received. If you'd like to drain the node in addition to cordoning, then also set `enableRebalanceDraining`. Only used in IMDS mode. | `false` |
| `enableRebalanceDraining` | If `true`, drain nodes when the rebalance recommendation notice is received. Only used in IMDS mode. | `false` |
| `deleteSqsMsgIfNodeNotFound` | If `true`, delete the SQS Message from the SQS Queue if the targeted node is not found. Only used in Queue Processor mode. | `false` |

### Testing Configuration

2 changes: 2 additions & 0 deletions config/helm/aws-node-termination-handler/templates/deployment.yaml
@@ -164,6 +164,8 @@ spec:
{{- end }}
- name: QUEUE_URL
  value: {{ .Values.queueURL | quote }}
- name: DELETE_SQS_MSG_IF_NODE_NOT_FOUND
  value: {{ .Values.deleteSqsMsgIfNodeNotFound | quote }}
- name: WORKERS
  value: {{ .Values.workers | quote }}
{{- with .Values.extraEnv }}
3 changes: 3 additions & 0 deletions config/helm/aws-node-termination-handler/values.yaml
@@ -277,6 +277,9 @@ enableRebalanceMonitoring: false
# enableRebalanceDraining If true, drain nodes when the rebalance recommendation notice is received. Only used in IMDS mode.
enableRebalanceDraining: false

# deleteSqsMsgIfNodeNotFound If true, delete the SQS Message from the SQS Queue if the targeted node(s) are not found. Only used in Queue Processor mode.
deleteSqsMsgIfNodeNotFound: false

# ---------------------------------------------------------------------------------------------------------------------
# Testing
# ---------------------------------------------------------------------------------------------------------------------
6 changes: 6 additions & 0 deletions pkg/config/config.go
@@ -108,6 +108,7 @@ const (
awsEndpointConfigKey = "AWS_ENDPOINT"
queueURLConfigKey = "QUEUE_URL"
completeLifecycleActionDelaySecondsKey = "COMPLETE_LIFECYCLE_ACTION_DELAY_SECONDS"
deleteSqsMsgIfNodeNotFoundKey = "DELETE_SQS_MSG_IF_NODE_NOT_FOUND"
)

// Config arguments set via CLI, environment variables, or defaults
@@ -159,6 +160,7 @@ type Config struct {
Workers int
UseProviderId bool
CompleteLifecycleActionDelaySeconds int
DeleteSqsMsgIfNodeNotFound bool
}

// ParseCliArgs parses cli arguments and uses environment variables as fallback values
@@ -220,6 +222,7 @@ func ParseCliArgs() (config Config, err error) {
flag.IntVar(&config.Workers, "workers", getIntEnv(workersConfigKey, workersDefault), "The amount of parallel event processors.")
flag.BoolVar(&config.UseProviderId, "use-provider-id", getBoolEnv(useProviderIdConfigKey, useProviderIdDefault), "If true, fetch node name through Kubernetes node spec ProviderID instead of AWS event PrivateDnsHostname.")
flag.IntVar(&config.CompleteLifecycleActionDelaySeconds, "complete-lifecycle-action-delay-seconds", getIntEnv(completeLifecycleActionDelaySecondsKey, -1), "Delay completing the Autoscaling lifecycle action after a node has been drained.")
flag.BoolVar(&config.DeleteSqsMsgIfNodeNotFound, "delete-sqs-msg-if-node-not-found", getBoolEnv(deleteSqsMsgIfNodeNotFoundKey, false), "If true, delete SQS Messages from the SQS Queue if the targeted node(s) are not found.")
flag.Parse()

if isConfigProvided("pod-termination-grace-period", podTerminationGracePeriodConfigKey) && isConfigProvided("grace-period", gracePeriodConfigKey) {
@@ -299,6 +302,7 @@ func (c Config) PrintJsonConfigArgs() {
Bool("enable_scheduled_event_draining", c.EnableScheduledEventDraining).
Bool("enable_spot_interruption_draining", c.EnableSpotInterruptionDraining).
Bool("enable_sqs_termination_draining", c.EnableSQSTerminationDraining).
Bool("delete_sqs_msg_if_node_not_found", c.DeleteSqsMsgIfNodeNotFound).
Bool("enable_rebalance_monitoring", c.EnableRebalanceMonitoring).
Bool("enable_rebalance_draining", c.EnableRebalanceDraining).
Int("metadata_tries", c.MetadataTries).
@@ -346,6 +350,7 @@ func (c Config) PrintHumanConfigArgs() {
"\tenable-scheduled-event-draining: %t,\n"+
"\tenable-spot-interruption-draining: %t,\n"+
"\tenable-sqs-termination-draining: %t,\n"+
"\tdelete-sqs-msg-if-node-not-found: %t,\n"+
"\tenable-rebalance-monitoring: %t,\n"+
"\tenable-rebalance-draining: %t,\n"+
"\tmetadata-tries: %d,\n"+
@@ -384,6 +389,7 @@ func (c Config) PrintHumanConfigArgs() {
c.EnableScheduledEventDraining,
c.EnableSpotInterruptionDraining,
c.EnableSQSTerminationDraining,
c.DeleteSqsMsgIfNodeNotFound,
c.EnableRebalanceMonitoring,
c.EnableRebalanceDraining,
c.MetadataTries,
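
The new option is wired with the same precedence as the existing settings: the `DELETE_SQS_MSG_IF_NODE_NOT_FOUND` environment variable supplies the fallback, the `--delete-sqs-msg-if-node-not-found` CLI flag overrides it, and the default is `false`. A self-contained sketch of that pattern; `getBoolEnvOrDefault` is a stand-in for the repository's unexported `getBoolEnv` helper, whose body is not part of this diff:

```go
package main

import (
	"flag"
	"fmt"
	"os"
	"strconv"
)

// getBoolEnvOrDefault reads a boolean from the environment, falling back to
// the supplied default when the variable is unset or unparsable.
func getBoolEnvOrDefault(key string, fallback bool) bool {
	if raw, ok := os.LookupEnv(key); ok {
		if parsed, err := strconv.ParseBool(raw); err == nil {
			return parsed
		}
	}
	return fallback
}

func main() {
	var deleteSqsMsgIfNodeNotFound bool
	flag.BoolVar(&deleteSqsMsgIfNodeNotFound, "delete-sqs-msg-if-node-not-found",
		getBoolEnvOrDefault("DELETE_SQS_MSG_IF_NODE_NOT_FOUND", false),
		"If true, delete SQS Messages from the SQS Queue if the targeted node(s) are not found.")
	flag.Parse()
	fmt.Println("delete-sqs-msg-if-node-not-found:", deleteSqsMsgIfNodeNotFound)
}
```
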
166 changes: 166 additions & 0 deletions test/e2e/rebalance-recommendation-sqs-node-not-found-test
@@ -0,0 +1,166 @@
#!/bin/bash
set -euo pipefail

# Available env vars:
# $TMP_DIR
# $CLUSTER_NAME
# $KUBECONFIG
# $NODE_TERMINATION_HANDLER_DOCKER_REPO
# $NODE_TERMINATION_HANDLER_DOCKER_TAG
# $WEBHOOK_DOCKER_REPO
# $WEBHOOK_DOCKER_TAG
# $AEMM_URL
# $AEMM_VERSION

function fail_and_exit {
    echo "❌ Rebalance Recommendation SQS Test failed $CLUSTER_NAME"
    exit "${1:-1}"
}

echo "Starting Rebalance Recommendation SQS Test for Node Termination Handler"
START_TIME=$(date -u +"%Y-%m-%dT%TZ")

SCRIPTPATH="$( cd "$(dirname "$0")" ; pwd -P )"

common_helm_args=()

localstack_helm_args=(
    upgrade
    --install
    --namespace default
    "$CLUSTER_NAME-localstack"
    "$SCRIPTPATH/../../config/helm/localstack/"
    --set nodeSelector."${NTH_CONTROL_LABEL}"
    --set defaultRegion="${AWS_REGION}"
    --wait
)

set -x
helm "${localstack_helm_args[@]}"
set +x

sleep 10

localstack_pod=$(kubectl get pods --selector app=localstack --field-selector="status.phase=Running" \
-o go-template --template '{{range .items}}{{.metadata.name}} {{.metadata.creationTimestamp}}{{"\n"}}{{end}}' \
| awk '$2 >= "'"${START_TIME//+0000/Z}"'" { print $1 }')
echo "🥑 Using localstack pod ${localstack_pod}"

run_instances_resp=$(kubectl exec -i "${localstack_pod}" -- bash -c "awslocal ec2 run-instances --private-ip-address 192.168.0.4 --region ${AWS_REGION}")
instance_id=$(echo "${run_instances_resp}" | jq -r '.Instances[] .InstanceId')
echo "🥑 Created awslocal EC2 instance (${instance_id}) not backed by a node"

CREATE_SQS_CMD="awslocal sqs create-queue --queue-name "${CLUSTER_NAME}-queue" --attributes MessageRetentionPeriod=300 --region ${AWS_REGION}"
queue_url=$(kubectl exec -i "${localstack_pod}" -- bash -c "${CREATE_SQS_CMD}" | jq -r .QueueUrl)

echo "🥑 Created SQS Queue ${queue_url}"

anth_helm_args=(
    upgrade
    --install
    --namespace kube-system
    "$CLUSTER_NAME-acth"
    "$SCRIPTPATH/../../config/helm/aws-node-termination-handler/"
    --set image.repository="$NODE_TERMINATION_HANDLER_DOCKER_REPO"
    --set image.tag="$NODE_TERMINATION_HANDLER_DOCKER_TAG"
    --set nodeSelector."${NTH_CONTROL_LABEL}"
    --set tolerations[0].operator=Exists
    --set awsAccessKeyID=foo
    --set awsSecretAccessKey=bar
    --set awsRegion="${AWS_REGION}"
    --set awsEndpoint="http://localstack.default"
    --set checkTagBeforeDraining=false
    --set enableSqsTerminationDraining=true
    --set deleteSqsMsgIfNodeNotFound=true
    --set taintNode="true"
    --set "queueURL=${queue_url}"
    --wait
)
[[ -n "${NODE_TERMINATION_HANDLER_DOCKER_PULL_POLICY-}" ]] &&
anth_helm_args+=(--set image.pullPolicy="$NODE_TERMINATION_HANDLER_DOCKER_PULL_POLICY")
[[ ${#common_helm_args[@]} -gt 0 ]] &&
anth_helm_args+=("${common_helm_args[@]}")

set -x
helm "${anth_helm_args[@]}"
set +x

emtp_helm_args=(
    upgrade
    --install
    --namespace default
    "$CLUSTER_NAME-emtp"
    "$SCRIPTPATH/../../config/helm/webhook-test-proxy/"
    --set webhookTestProxy.image.repository="$WEBHOOK_DOCKER_REPO"
    --set webhookTestProxy.image.tag="$WEBHOOK_DOCKER_TAG"
    --wait
)
[[ -n "${WEBHOOK_DOCKER_PULL_POLICY-}" ]] &&
emtp_helm_args+=(--set webhookTestProxy.image.pullPolicy="$WEBHOOK_DOCKER_PULL_POLICY")
[[ ${#common_helm_args[@]} -gt 0 ]] &&
emtp_helm_args+=("${common_helm_args[@]}")

set -x
helm "${emtp_helm_args[@]}"
set +x

CHECK_CYCLES=15
CHECK_SLEEP=15

DEPLOYED=0

for i in $(seq 1 $CHECK_CYCLES); do
    if [[ $(kubectl get deployments regular-pod-test -o jsonpath='{.status.unavailableReplicas}') -eq 0 ]]; then
        echo "✅ Verified regular-pod-test pod was scheduled and started!"
        DEPLOYED=1
        break
    fi
    echo "Setup Loop $i/$CHECK_CYCLES, sleeping for $CHECK_SLEEP seconds"
    sleep $CHECK_SLEEP
done

if [[ $DEPLOYED -eq 0 ]]; then
    echo "❌ regular-pod-test pod deployment failed"
    fail_and_exit 2
fi

REBALANCE_EVENT=$(cat <<EOF
{
    "version": "0",
    "id": "5d5555d5-dd55-5555-5555-5555dd55d55d",
    "detail-type": "EC2 Instance Rebalance Recommendation",
    "source": "aws.ec2",
    "account": "123456789012",
    "time": "$(date -u +"%Y-%m-%dT%TZ")",
    "region": "us-east-1",
    "resources": [
        "arn:aws:ec2:us-east-1b:instance/${instance_id}"
    ],
    "detail": {
        "instance-id": "${instance_id}"
    }
}
EOF
)

REBALANCE_EVENT_ONE_LINE=$(echo "${REBALANCE_EVENT}" | tr -d '\n' |sed 's/\"/\\"/g')
SEND_SQS_CMD="awslocal sqs send-message --queue-url ${queue_url} --message-body \"${REBALANCE_EVENT_ONE_LINE}\" --region ${AWS_REGION}"
kubectl exec -i "${localstack_pod}" -- bash -c "${SEND_SQS_CMD}"
echo "✅ Sent Rebalance Recommendation to SQS queue: ${queue_url}"

GET_ATTRS_SQS_CMD="awslocal sqs get-queue-attributes --queue-url ${queue_url} --attribute-names All --region ${AWS_REGION}"

for i in $(seq 1 $CHECK_CYCLES); do
    if [[ $(kubectl exec -i "${localstack_pod}" -- bash -c "${GET_ATTRS_SQS_CMD}" | jq '(.Attributes.ApproximateNumberOfMessagesNotVisible|tonumber) + (.Attributes.ApproximateNumberOfMessages|tonumber)' ) -eq 0 ]]; then
        kubectl exec -i "${localstack_pod}" -- bash -c "${GET_ATTRS_SQS_CMD}"
        echo "✅ Verified the message was deleted from the queue after processing!"
        echo "✅ Rebalance Recommendation SQS Test Passed $CLUSTER_NAME! ✅"
        exit 0
    fi

    echo "Assertion Loop $i/$CHECK_CYCLES, sleeping for $CHECK_SLEEP seconds"
    sleep $CHECK_SLEEP
done

echo "❌ message was not removed from the queue after processing"
fail_and_exit 3
12 changes: 10 additions & 2 deletions test/k8s-local-cluster-test/run-test
@@ -228,10 +228,18 @@ export KUBECONFIG="$TMP_DIR/kubeconfig"
trap "exit_and_fail" INT TERM ERR
trap "clean_up" EXIT

cat << EOF >$TMP_DIR/env
export KUBECONFIG=$TMP_DIR/kubeconfig
echo "Updated KUBECONFIG=$KUBECONFIG"
export PATH=$TMP_DIR:\$PATH
echo "Updated PATH=$PATH"
EOF

echo "======================================================================================================"
echo "To poke around your test manually:"
echo "export KUBECONFIG=$TMP_DIR/kubeconfig"
echo "export PATH=$TMP_DIR:\$PATH"
echo ". $TMP_DIR/env"
echo "kubectl get pods -A"
echo "======================================================================================================"

