diff --git a/exp/instancestate/awsinstancestate_controller.go b/exp/instancestate/awsinstancestate_controller.go index c04f8687c4..b40155e8cc 100644 --- a/exp/instancestate/awsinstancestate_controller.go +++ b/exp/instancestate/awsinstancestate_controller.go @@ -19,6 +19,7 @@ package instancestate import ( "context" "encoding/json" + "fmt" "sync" "time" @@ -46,6 +47,12 @@ import ( // Ec2InstanceStateLabelKey defines an ec2 instance state label. const Ec2InstanceStateLabelKey = "ec2-instance-state" +// ASGInstanceStateLabelKey defines an ASG EC2 instance state label. +const ASGInstanceStateLabelKey = "asg-instance-state" + +// EC2InstanceHealthStateLabelKey defines an EC2 instance health state label. +const EC2InstanceHealthStateLabelKey = "ec2-instance-health-state" + // AwsInstanceStateReconciler reconciles a AwsInstanceState object. type AwsInstanceStateReconciler struct { client.Client @@ -134,82 +141,99 @@ func (r *AwsInstanceStateReconciler) watchQueuesForInstanceEvents() { for range time.Tick(1 * time.Second) { // go through each cluster and check for messages on its queue r.queueURLs.Range(func(key, val interface{}) bool { - go func() { - qp := val.(queueParams) - sqsSvs, err := r.getSQSService(qp.region) - if err != nil { - r.Log.Error(err, "unable to create SQS client") - return - } - resp, err := sqsSvs.ReceiveMessage(&sqs.ReceiveMessageInput{QueueUrl: aws.String(qp.URL)}) - if err != nil { - r.Log.Error(err, "failed to receive messages") - return - } - for _, msg := range resp.Messages { - m := message{} - err := json.Unmarshal([]byte(*msg.Body), &m) - - if err != nil { - r.Log.Error(err, "unable to marshall") - return - } - // TODO: handle errors during process message. We currently deletes the message regardless. - r.processMessage(ctx, m) - - _, err = sqsSvs.DeleteMessage(&sqs.DeleteMessageInput{ - QueueUrl: aws.String(qp.URL), - ReceiptHandle: msg.ReceiptHandle, - }) - - if err != nil { - r.Log.Error(err, "error deleting message", "queueURL", qp.URL, "messageReceiptHandle", msg.ReceiptHandle) - } - } - }() - + qp := val.(queueParams) + go r.processQueue(ctx, qp) return true }) } } -// processMessage triggers a reconcile on an AWSMachine if its EC2 instance state changed. -func (r *AwsInstanceStateReconciler) processMessage(ctx context.Context, msg message) { - if msg.Source != "aws.ec2" || msg.DetailType != instancestate.Ec2StateChangeNotification || msg.MessageDetail == nil { +func (r *AwsInstanceStateReconciler) processQueue(ctx context.Context, qp queueParams) { + sqsSvs, err := r.getSQSService(qp.region) + if err != nil { + r.Log.Error(err, "unable to create SQS client") return } + resp, err := receiveMessageFromQueue(sqsSvs, qp) + if err != nil { + r.Log.Error(err, "unable to receive message from queue") + return + } + + queueEvents := make([]instancestate.EventDetails, 10) + for _, msg := range resp.Messages { + event := instancestate.EventBridgeEvent{} + err := json.Unmarshal([]byte(*msg.Body), &event) + + if err != nil { + r.Log.Error(err, "unable to marshall") + return + } + + err = r.processMessage(ctx, event) + if err != nil { + r.Log.Error(err, "unable to process the message from queue") + return + } + message := msg + queueEvents = append(queueEvents, instancestate.EventDetails{ + Message: message, + EventBridgeEvent: event, + }) + } + instancestate.QueueEventMapping.Store(qp.URL, queueEvents) +} +func receiveMessageFromQueue(sqsSvs sqsiface.SQSAPI, qp queueParams) (*sqs.ReceiveMessageOutput, error) { + resp, err := sqsSvs.ReceiveMessage(&sqs.ReceiveMessageInput{ + QueueUrl: aws.String(qp.URL), + MaxNumberOfMessages: aws.Int64(10), + VisibilityTimeout: aws.Int64(20), + WaitTimeSeconds: aws.Int64(10), + }) + if err != nil { + return nil, err + } + return resp, nil +} + +// processMessage triggers reconcile on an AWSMachine if its EC2 instance state changed. +func (r *AwsInstanceStateReconciler) processMessage(ctx context.Context, event instancestate.EventBridgeEvent) error { + instanceID := event.Detail.InstanceID + if event.Source == "aws.health" { + instanceID = event.Detail.AffectedEntities[0].EntityValue + } // Fetch the awsMachine instance by InstanceID awsMachines := &infrav1.AWSMachineList{} - err := r.List(ctx, awsMachines, client.MatchingFields{controllers.InstanceIDIndex: msg.MessageDetail.InstanceID}) + err := r.List(ctx, awsMachines, client.MatchingFields{controllers.InstanceIDIndex: instanceID}) if err != nil { - r.Log.Error(err, "unable to list machines by instance ID", "instanceID", msg.MessageDetail.InstanceID) + r.Log.Error(err, "unable to list machines by instance ID", "instanceID", instanceID) } if len(awsMachines.Items) > 0 { machine := awsMachines.Items[0] if !machine.ObjectMeta.DeletionTimestamp.IsZero() { - return + return nil } patchHelper, err := patch.NewHelper(&machine, r.Client) if err != nil { r.Log.Error(err, "unable to create patch helper") } - // Trigger an update on the machine - labels := machine.GetLabels() - if labels == nil { - labels = make(map[string]string) + + labels, err := r.getMachineLabelOnEventType(machine, event) + if err != nil { + return err } - labels[Ec2InstanceStateLabelKey] = string(msg.MessageDetail.State) + // Trigger an update on the machine machine.SetLabels(labels) - err = patchHelper.Patch(ctx, &machine) if err != nil { r.Log.Error(err, "unable to patch AWS machine") } } + return nil } // getQueueURL retrieves the SQS queue URL for a given cluster. @@ -228,6 +252,31 @@ func (r *AwsInstanceStateReconciler) getQueueURL(cluster *infrav1.AWSCluster) (s return *resp.QueueUrl, nil } +func (r *AwsInstanceStateReconciler) getMachineLabelOnEventType(machine infrav1.AWSMachine, event instancestate.EventBridgeEvent) (map[string]string, error) { + labels := machine.GetLabels() + if labels == nil { + labels = make(map[string]string) + } + + switch event.Source { + case "aws.autoscaling": + labels[ASGInstanceStateLabelKey] = string(event.Detail.State) + case "aws.ec2": + labels[Ec2InstanceStateLabelKey] = string(event.Detail.State) + case "aws.health": + if event.Detail.Service != "EC2" { + return nil, fmt.Errorf("events from Amazon EventBridge for service (%s) are not supported", event.Detail.Service) + } + if event.Detail.EventTypeCategory != "scheduledChange" { + return nil, fmt.Errorf("events from Amazon EventBridge with EventTypeCategory (%s) are not supported", event.Detail.EventTypeCategory) + } + labels[EC2InstanceHealthStateLabelKey] = event.Detail.EventTypeCategory + default: + r.Log.V(4).Info("event type from Amazon EventBridge is not supported", "event", event.Source) + } + return labels, nil +} + func queueNotFoundError(err error) bool { if aerr, ok := err.(awserr.Error); ok { if aerr.Code() == sqs.ErrCodeQueueDoesNotExist { @@ -241,14 +290,3 @@ type queueParams struct { region string URL string } - -type message struct { - Source string `json:"source"` - DetailType string `json:"detail-type,omitempty"` - MessageDetail *messageDetail `json:"detail,omitempty"` -} - -type messageDetail struct { - InstanceID string `json:"instance-id,omitempty"` - State infrav1.InstanceState `json:"state,omitempty"` -} diff --git a/exp/instancestate/awsinstancestate_controller_test.go b/exp/instancestate/awsinstancestate_controller_test.go index ebd9702c0d..9cce2b994f 100644 --- a/exp/instancestate/awsinstancestate_controller_test.go +++ b/exp/instancestate/awsinstancestate_controller_test.go @@ -64,7 +64,12 @@ func TestAWSInstanceStateController(t *testing.T) { Return(&sqs.GetQueueUrlOutput{QueueUrl: aws.String("aws-cluster-2-url")}, nil) sqsSvs.EXPECT().GetQueueUrl(&sqs.GetQueueUrlInput{QueueName: aws.String("aws-cluster-3-queue")}).AnyTimes(). Return(&sqs.GetQueueUrlOutput{QueueUrl: aws.String("aws-cluster-3-url")}, nil) - sqsSvs.EXPECT().ReceiveMessage(&sqs.ReceiveMessageInput{QueueUrl: aws.String("aws-cluster-1-url")}).AnyTimes(). + sqsSvs.EXPECT().ReceiveMessage(&sqs.ReceiveMessageInput{ + MaxNumberOfMessages: aws.Int64(10), + QueueUrl: aws.String("aws-cluster-1-url"), + VisibilityTimeout: aws.Int64(20), + WaitTimeSeconds: aws.Int64(10), + }).AnyTimes(). DoAndReturn(func(arg *sqs.ReceiveMessageInput) (*sqs.ReceiveMessageOutput, error) { m := &infrav1.AWSMachine{} lookupKey := types.NamespacedName{ @@ -85,9 +90,19 @@ func TestAWSInstanceStateController(t *testing.T) { return &sqs.ReceiveMessageOutput{Messages: []*sqs.Message{}}, nil }) - sqsSvs.EXPECT().ReceiveMessage(&sqs.ReceiveMessageInput{QueueUrl: aws.String("aws-cluster-2-url")}).AnyTimes(). + sqsSvs.EXPECT().ReceiveMessage(&sqs.ReceiveMessageInput{ + MaxNumberOfMessages: aws.Int64(10), + QueueUrl: aws.String("aws-cluster-2-url"), + VisibilityTimeout: aws.Int64(20), + WaitTimeSeconds: aws.Int64(10), + }).AnyTimes(). Return(&sqs.ReceiveMessageOutput{Messages: []*sqs.Message{}}, nil) - sqsSvs.EXPECT().ReceiveMessage(&sqs.ReceiveMessageInput{QueueUrl: aws.String("aws-cluster-3-url")}).AnyTimes(). + sqsSvs.EXPECT().ReceiveMessage(&sqs.ReceiveMessageInput{ + MaxNumberOfMessages: aws.Int64(10), + QueueUrl: aws.String("aws-cluster-3-url"), + VisibilityTimeout: aws.Int64(20), + WaitTimeSeconds: aws.Int64(10), + }).AnyTimes(). Return(&sqs.ReceiveMessageOutput{Messages: []*sqs.Message{}}, nil) sqsSvs.EXPECT().DeleteMessage(&sqs.DeleteMessageInput{QueueUrl: aws.String("aws-cluster-1-url"), ReceiptHandle: aws.String("message-receipt-handle")}).AnyTimes(). Return(nil, nil) diff --git a/pkg/cloud/services/instancestate/eventbridge_events.go b/pkg/cloud/services/instancestate/eventbridge_events.go new file mode 100644 index 0000000000..faa2da685d --- /dev/null +++ b/pkg/cloud/services/instancestate/eventbridge_events.go @@ -0,0 +1,92 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package instancestate + +import ( + "reflect" + "sync" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/sqs" + "github.com/pkg/errors" + + infrav1 "sigs.k8s.io/cluster-api-provider-aws/v2/api/v1beta2" +) + +// QueueEventMapping stores the mapping of queueURL with all the events and its related details +// necessary to act upon based on the event type. +var QueueEventMapping sync.Map + +// EventDetails represents all the necessary details present in the SQS message. +type EventDetails struct { + EventBridgeEvent EventBridgeEvent + Message *sqs.Message +} + +// EventBridgeEvent is a structure to hold generic event details from Amazon EventBridge. +type EventBridgeEvent struct { + Version string `json:"version"` + ID string `json:"id"` + DetailType string `json:"detail-type"` + Source string `json:"source"` + Account string `json:"account"` + Time string `json:"time"` + Region string `json:"region"` + Resources []string `json:"resources"` + Detail *messageDetail `json:"detail"` +} + +// messageDetail holds information on the affected instance/entity. +type messageDetail struct { + InstanceID string `json:"instance-id,omitempty"` + State infrav1.InstanceState `json:"state,omitempty"` + EventTypeCategory string `json:"eventTypeCategory,omitempty"` + Service string `json:"service,omitempty"` + AffectedEntities []affectedEntity `json:"affectedEntities,omitempty"` +} + +// affectedEntity holds information about an entity that is affected by a Health event. +type affectedEntity struct { + EntityValue string `json:"entityValue"` +} + +// GetEventsFromSQS is a utility used across CAPA controllers in tandem with a watcher +// to get the events in SQS queue to be processed further by the controller. +func GetEventsFromSQS(queueURL string) []EventDetails { + var eventBridgeValues reflect.Value + if events, ok := QueueEventMapping.Load(queueURL); ok { + eventBridgeValues = reflect.ValueOf(events) + } + + return eventBridgeValues.Interface().([]EventDetails) +} + +// DeleteProcessedEvent deletes the event from queue if it's already processed by the awsinstancestate controller. +func (s Service) DeleteProcessedEvent(msg sqs.Message, queueURL string) error { + if _, ok := QueueEventMapping.Load(queueURL); ok { + QueueEventMapping.Delete(queueURL) + _, err := s.SQSClient.DeleteMessage(&sqs.DeleteMessageInput{ + QueueUrl: aws.String(queueURL), + ReceiptHandle: msg.ReceiptHandle, + }) + + if err != nil { + return errors.Wrapf(err, "error deleting message, queueURL %v, messageReceiptHandle %v", queueURL, msg.ReceiptHandle) + } + } + return nil +}