optionally delete sqs message when node not found #801

Merged
merged 2 commits into from Mar 29, 2023
11 changes: 6 additions & 5 deletions cmd/node-termination-handler.go
@@ -333,6 +333,7 @@ func watchForCancellationEvents(cancelChan <-chan monitor.InterruptionEvent, int

func drainOrCordonIfNecessary(interruptionEventStore *interruptioneventstore.Store, drainEvent *monitor.InterruptionEvent, node node.Node, nthConfig config.Config, nodeMetadata ec2metadata.NodeMetadata, metrics observability.Metrics, recorder observability.K8sEventRecorder, wg *sync.WaitGroup) {
	defer wg.Done()
	nodeFound := true
	nodeName := drainEvent.NodeName

	if nthConfig.UseProviderId {
@@ -348,6 +349,7 @@ func drainOrCordonIfNecessary(interruptionEventStore *interruptioneventstore.Sto
	nodeLabels, err := node.GetNodeLabels(nodeName)
	if err != nil {
		log.Err(err).Msgf("Unable to fetch node labels for node '%s' ", nodeName)
		nodeFound = false
	}
	drainEvent.NodeLabels = nodeLabels
	if drainEvent.PreDrainTask != nil {
@@ -376,15 +378,14 @@ func drainOrCordonIfNecessary(interruptionEventStore *interruptioneventstore.Sto

	if err != nil {
		interruptionEventStore.CancelInterruptionEvent(drainEvent.EventID)
		<-interruptionEventStore.Workers
	} else {
		interruptionEventStore.MarkAllAsProcessed(nodeName)
		if drainEvent.PostDrainTask != nil {
			runPostDrainTask(node, nodeName, drainEvent, metrics, recorder)
		}
		<-interruptionEventStore.Workers
	}

	if (err == nil || (!nodeFound && nthConfig.DeleteSqsMsgIfNodeNotFound)) && drainEvent.PostDrainTask != nil {
		runPostDrainTask(node, nodeName, drainEvent, metrics, recorder)
	}
	<-interruptionEventStore.Workers
}

func runPreDrainTask(node node.Node, nodeName string, drainEvent *monitor.InterruptionEvent, metrics observability.Metrics, recorder observability.K8sEventRecorder) {
1 change: 1 addition & 0 deletions config/helm/aws-node-termination-handler/README.md
@@ -156,6 +156,7 @@ The configuration in this table applies to AWS Node Termination Handler in IMDS
| `enableScheduledEventDraining` | If `true`, drain nodes before the maintenance window starts for an EC2 instance scheduled event. Only used in IMDS mode. | `true` |
| `enableRebalanceMonitoring` | If `true`, cordon nodes when the rebalance recommendation notice is received. If you'd like to drain the node in addition to cordoning, then also set `enableRebalanceDraining`. Only used in IMDS mode. | `false` |
| `enableRebalanceDraining` | If `true`, drain nodes when the rebalance recommendation notice is received. Only used in IMDS mode. | `false` |
| `deleteSqsMsgIfNodeNotFound` | If `true`, delete the SQS Message from the SQS Queue if the targeted node is not found. Only used in Queue Processor mode. | `false` |
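
For example, a minimal sketch of opting in via Helm alongside Queue Processor mode (the release name, chart path, and queue URL below are illustrative placeholders, not values taken from this change):

```sh
# Hypothetical install command; substitute your own release name, chart source, and queue URL.
helm upgrade --install aws-node-termination-handler \
  ./config/helm/aws-node-termination-handler \
  --namespace kube-system \
  --set enableSqsTerminationDraining=true \
  --set queueURL="https://sqs.us-east-1.amazonaws.com/123456789012/example-queue" \
  --set deleteSqsMsgIfNodeNotFound=true
```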

### Testing Configuration

@@ -164,6 +164,8 @@ spec:
{{- end }}
- name: QUEUE_URL
value: {{ .Values.queueURL | quote }}
- name: DELETE_SQS_MSG_IF_NODE_NOT_FOUND
value: {{ .Values.deleteSqsMsgIfNodeNotFound | quote }}
- name: WORKERS
value: {{ .Values.workers | quote }}
{{- with .Values.extraEnv }}
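As a quick sanity check of the new chart wiring, one can render the chart locally and confirm the environment variable appears in the output; this is only an illustrative sketch (release name and chart path are assumptions):

```sh
# Render the chart and look for the new env var; all values here are placeholders.
helm template nth-sketch ./config/helm/aws-node-termination-handler \
  --set enableSqsTerminationDraining=true \
  --set deleteSqsMsgIfNodeNotFound=true \
  | grep -A1 DELETE_SQS_MSG_IF_NODE_NOT_FOUND
```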
3 changes: 3 additions & 0 deletions config/helm/aws-node-termination-handler/values.yaml
@@ -277,6 +277,9 @@ enableRebalanceMonitoring: false
# enableRebalanceDraining If true, drain nodes when the rebalance recommendation notice is received. Only used in IMDS mode.
enableRebalanceDraining: false

# deleteSqsMsgIfNodeNotFound If true, delete the SQS Message from the SQS Queue if the targeted node(s) are not found. Only used in Queue Processor mode.
deleteSqsMsgIfNodeNotFound: false

# ---------------------------------------------------------------------------------------------------------------------
# Testing
# ---------------------------------------------------------------------------------------------------------------------
6 changes: 6 additions & 0 deletions pkg/config/config.go
@@ -108,6 +108,7 @@ const (
awsEndpointConfigKey = "AWS_ENDPOINT"
queueURLConfigKey = "QUEUE_URL"
completeLifecycleActionDelaySecondsKey = "COMPLETE_LIFECYCLE_ACTION_DELAY_SECONDS"
deleteSqsMsgIfNodeNotFoundKey = "DELETE_SQS_MSG_IF_NODE_NOT_FOUND"
)

// Config arguments set via CLI, environment variables, or defaults
@@ -159,6 +160,7 @@ type Config struct {
Workers int
UseProviderId bool
CompleteLifecycleActionDelaySeconds int
DeleteSqsMsgIfNodeNotFound bool
}

// ParseCliArgs parses cli arguments and uses environment variables as fallback values
@@ -220,6 +222,7 @@ func ParseCliArgs() (config Config, err error) {
flag.IntVar(&config.Workers, "workers", getIntEnv(workersConfigKey, workersDefault), "The amount of parallel event processors.")
flag.BoolVar(&config.UseProviderId, "use-provider-id", getBoolEnv(useProviderIdConfigKey, useProviderIdDefault), "If true, fetch node name through Kubernetes node spec ProviderID instead of AWS event PrivateDnsHostname.")
flag.IntVar(&config.CompleteLifecycleActionDelaySeconds, "complete-lifecycle-action-delay-seconds", getIntEnv(completeLifecycleActionDelaySecondsKey, -1), "Delay completing the Autoscaling lifecycle action after a node has been drained.")
flag.BoolVar(&config.DeleteSqsMsgIfNodeNotFound, "delete-sqs-msg-if-node-not-found", getBoolEnv(deleteSqsMsgIfNodeNotFoundKey, false), "If true, delete SQS Messages from the SQS Queue if the targeted node(s) are not found.")
flag.Parse()

if isConfigProvided("pod-termination-grace-period", podTerminationGracePeriodConfigKey) && isConfigProvided("grace-period", gracePeriodConfigKey) {
@@ -299,6 +302,7 @@ func (c Config) PrintJsonConfigArgs() {
Bool("enable_scheduled_event_draining", c.EnableScheduledEventDraining).
Bool("enable_spot_interruption_draining", c.EnableSpotInterruptionDraining).
Bool("enable_sqs_termination_draining", c.EnableSQSTerminationDraining).
Bool("delete_sqs_msg_if_node_not_found", c.DeleteSqsMsgIfNodeNotFound).
Bool("enable_rebalance_monitoring", c.EnableRebalanceMonitoring).
Bool("enable_rebalance_draining", c.EnableRebalanceDraining).
Int("metadata_tries", c.MetadataTries).
@@ -346,6 +350,7 @@ func (c Config) PrintHumanConfigArgs() {
"\tenable-scheduled-event-draining: %t,\n"+
"\tenable-spot-interruption-draining: %t,\n"+
"\tenable-sqs-termination-draining: %t,\n"+
"\tdelete-sqs-msg-if-node-not-found: %t,\n"+
"\tenable-rebalance-monitoring: %t,\n"+
"\tenable-rebalance-draining: %t,\n"+
"\tmetadata-tries: %d,\n"+
@@ -384,6 +389,7 @@ func (c Config) PrintHumanConfigArgs() {
c.EnableScheduledEventDraining,
c.EnableSpotInterruptionDraining,
c.EnableSQSTerminationDraining,
c.DeleteSqsMsgIfNodeNotFound,
c.EnableRebalanceMonitoring,
c.EnableRebalanceDraining,
c.MetadataTries,
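The new flag and the environment variable feed the same config field, with the flag taking its default from the env var; a hedged usage sketch follows (the binary path and the companion --enable-sqs-termination-draining flag are shown for illustration only):

```sh
# Opt in via the new CLI flag (illustrative invocation).
./node-termination-handler --enable-sqs-termination-draining --delete-sqs-msg-if-node-not-found=true

# Or rely on the environment-variable fallback read during flag parsing.
DELETE_SQS_MSG_IF_NODE_NOT_FOUND=true ./node-termination-handler --enable-sqs-termination-draining
```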
166 changes: 166 additions & 0 deletions test/e2e/rebalance-recommendation-sqs-node-not-found-test
@@ -0,0 +1,166 @@
#!/bin/bash
set -euo pipefail

# Available env vars:
# $TMP_DIR
# $CLUSTER_NAME
# $KUBECONFIG
# $NODE_TERMINATION_HANDLER_DOCKER_REPO
# $NODE_TERMINATION_HANDLER_DOCKER_TAG
# $WEBHOOK_DOCKER_REPO
# $WEBHOOK_DOCKER_TAG
# $AEMM_URL
# $AEMM_VERSION

function fail_and_exit {
    echo "❌ Rebalance Recommendation SQS Test failed $CLUSTER_NAME ❌"
    exit "${1:-1}"
}

echo "Starting Rebalance Recommendation SQS Test for Node Termination Handler"
START_TIME=$(date -u +"%Y-%m-%dT%TZ")

SCRIPTPATH="$( cd "$(dirname "$0")" ; pwd -P )"

common_helm_args=()

localstack_helm_args=(
    upgrade
    --install
    --namespace default
    "$CLUSTER_NAME-localstack"
    "$SCRIPTPATH/../../config/helm/localstack/"
    --set nodeSelector."${NTH_CONTROL_LABEL}"
    --set defaultRegion="${AWS_REGION}"
    --wait
)

set -x
helm "${localstack_helm_args[@]}"
set +x

sleep 10

localstack_pod=$(kubectl get pods --selector app=localstack --field-selector="status.phase=Running" \
-o go-template --template '{{range .items}}{{.metadata.name}} {{.metadata.creationTimestamp}}{{"\n"}}{{end}}' \
| awk '$2 >= "'"${START_TIME//+0000/Z}"'" { print $1 }')
echo "🥑 Using localstack pod ${localstack_pod}"

run_instances_resp=$(kubectl exec -i "${localstack_pod}" -- bash -c "awslocal ec2 run-instances --private-ip-address 192.168.0.4 --region ${AWS_REGION}")
instance_id=$(echo "${run_instances_resp}" | jq -r '.Instances[] .InstanceId')
echo "🥑 Created awslocal EC2 instance (${instance_id}) not backed by a node"

CREATE_SQS_CMD="awslocal sqs create-queue --queue-name "${CLUSTER_NAME}-queue" --attributes MessageRetentionPeriod=300 --region ${AWS_REGION}"
queue_url=$(kubectl exec -i "${localstack_pod}" -- bash -c "${CREATE_SQS_CMD}" | jq -r .QueueUrl)

echo "🥑 Created SQS Queue ${queue_url}"

anth_helm_args=(
    upgrade
    --install
    --namespace kube-system
    "$CLUSTER_NAME-acth"
    "$SCRIPTPATH/../../config/helm/aws-node-termination-handler/"
    --set image.repository="$NODE_TERMINATION_HANDLER_DOCKER_REPO"
    --set image.tag="$NODE_TERMINATION_HANDLER_DOCKER_TAG"
    --set nodeSelector."${NTH_CONTROL_LABEL}"
    --set tolerations[0].operator=Exists
    --set awsAccessKeyID=foo
    --set awsSecretAccessKey=bar
    --set awsRegion="${AWS_REGION}"
    --set awsEndpoint="http://localstack.default"
    --set checkTagBeforeDraining=false
    --set enableSqsTerminationDraining=true
    --set deleteSqsMsgIfNodeNotFound=true
    --set taintNode="true"
    --set "queueURL=${queue_url}"
    --wait
)
[[ -n "${NODE_TERMINATION_HANDLER_DOCKER_PULL_POLICY-}" ]] &&
anth_helm_args+=(--set image.pullPolicy="$NODE_TERMINATION_HANDLER_DOCKER_PULL_POLICY")
[[ ${#common_helm_args[@]} -gt 0 ]] &&
anth_helm_args+=("${common_helm_args[@]}")

set -x
helm "${anth_helm_args[@]}"
set +x

emtp_helm_args=(
    upgrade
    --install
    --namespace default
    "$CLUSTER_NAME-emtp"
    "$SCRIPTPATH/../../config/helm/webhook-test-proxy/"
    --set webhookTestProxy.image.repository="$WEBHOOK_DOCKER_REPO"
    --set webhookTestProxy.image.tag="$WEBHOOK_DOCKER_TAG"
    --wait
)
[[ -n "${WEBHOOK_DOCKER_PULL_POLICY-}" ]] &&
emtp_helm_args+=(--set webhookTestProxy.image.pullPolicy="$WEBHOOK_DOCKER_PULL_POLICY")
[[ ${#common_helm_args[@]} -gt 0 ]] &&
emtp_helm_args+=("${common_helm_args[@]}")

set -x
helm "${emtp_helm_args[@]}"
set +x

CHECK_CYCLES=15
CHECK_SLEEP=15

DEPLOYED=0

for i in $(seq 1 $CHECK_CYCLES); do
    if [[ $(kubectl get deployments regular-pod-test -o jsonpath='{.status.unavailableReplicas}') -eq 0 ]]; then
        echo "✅ Verified regular-pod-test pod was scheduled and started!"
        DEPLOYED=1
        break
    fi
    echo "Setup Loop $i/$CHECK_CYCLES, sleeping for $CHECK_SLEEP seconds"
    sleep $CHECK_SLEEP
done

if [[ $DEPLOYED -eq 0 ]]; then
    echo "❌ regular-pod-test pod deployment failed"
    fail_and_exit 2
fi

REBALANCE_EVENT=$(cat <<EOF
{
  "version": "0",
  "id": "5d5555d5-dd55-5555-5555-5555dd55d55d",
  "detail-type": "EC2 Instance Rebalance Recommendation",
  "source": "aws.ec2",
  "account": "123456789012",
  "time": "$(date -u +"%Y-%m-%dT%TZ")",
  "region": "us-east-1",
  "resources": [
    "arn:aws:ec2:us-east-1b:instance/${instance_id}"
  ],
  "detail": {
    "instance-id": "${instance_id}"
  }
}
EOF
)

REBALANCE_EVENT_ONE_LINE=$(echo "${REBALANCE_EVENT}" | tr -d '\n' |sed 's/\"/\\"/g')
SEND_SQS_CMD="awslocal sqs send-message --queue-url ${queue_url} --message-body \"${REBALANCE_EVENT_ONE_LINE}\" --region ${AWS_REGION}"
kubectl exec -i "${localstack_pod}" -- bash -c "${SEND_SQS_CMD}"
echo "✅ Sent Rebalance Recommendation to SQS queue: ${queue_url}"

GET_ATTRS_SQS_CMD="awslocal sqs get-queue-attributes --queue-url ${queue_url} --attribute-names All --region ${AWS_REGION}"

for i in $(seq 1 $CHECK_CYCLES); do
    if [[ $(kubectl exec -i "${localstack_pod}" -- bash -c "${GET_ATTRS_SQS_CMD}" | jq '(.Attributes.ApproximateNumberOfMessagesNotVisible|tonumber) + (.Attributes.ApproximateNumberOfMessages|tonumber)' ) -eq 0 ]]; then
        kubectl exec -i "${localstack_pod}" -- bash -c "${GET_ATTRS_SQS_CMD}"
        echo "✅ Verified the message was deleted from the queue after processing!"
        echo "✅ Rebalance Recommendation SQS Test Passed $CLUSTER_NAME! ✅"
        exit 0
    fi

    echo "Assertion Loop $i/$CHECK_CYCLES, sleeping for $CHECK_SLEEP seconds"
    sleep $CHECK_SLEEP
done

echo "❌ message was not removed from the queue after processing"
fail_and_exit 3
12 changes: 10 additions & 2 deletions test/k8s-local-cluster-test/run-test
@@ -228,10 +228,18 @@ export KUBECONFIG="$TMP_DIR/kubeconfig"
trap "exit_and_fail" INT TERM ERR
trap "clean_up" EXIT

cat << EOF >$TMP_DIR/env
export KUBECONFIG=$TMP_DIR/kubeconfig
echo "Updated KUBECONFIG=$KUBECONFIG"

export PATH=$TMP_DIR:\$PATH
echo "Updated PATH=$PATH"

EOF

echo "======================================================================================================"
echo "To poke around your test manually:"
echo "export KUBECONFIG=$TMP_DIR/kubeconfig"
echo "export PATH=$TMP_DIR:\$PATH"
echo ". $TMP_DIR/env"
echo "kubectl get pods -A"
echo "======================================================================================================"
