Skip to content

Commit

Permalink
Graduation of EventBridge feature in CAPA
Browse files Browse the repository at this point in the history
  • Loading branch information
Ankitasw committed May 23, 2023
1 parent 8a2e6b7 commit cc23260
Show file tree
Hide file tree
Showing 3 changed files with 205 additions and 60 deletions.
152 changes: 95 additions & 57 deletions exp/instancestate/awsinstancestate_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package instancestate
import (
"context"
"encoding/json"
"fmt"
"sync"
"time"

Expand Down Expand Up @@ -46,6 +47,12 @@ import (
// Ec2InstanceStateLabelKey defines an ec2 instance state label.
const Ec2InstanceStateLabelKey = "ec2-instance-state"

// ASGInstanceStateLabelKey defines an ASG EC2 instance state label.
const ASGInstanceStateLabelKey = "asg-instance-state"

// EC2InstanceHealthStateLabelKey defines an EC2 instance health state label.
const EC2InstanceHealthStateLabelKey = "ec2-instance-health-state"

// AwsInstanceStateReconciler reconciles a AwsInstanceState object.
type AwsInstanceStateReconciler struct {
client.Client
Expand Down Expand Up @@ -134,82 +141,99 @@ func (r *AwsInstanceStateReconciler) watchQueuesForInstanceEvents() {
for range time.Tick(1 * time.Second) {
// go through each cluster and check for messages on its queue
r.queueURLs.Range(func(key, val interface{}) bool {
go func() {
qp := val.(queueParams)
sqsSvs, err := r.getSQSService(qp.region)
if err != nil {
r.Log.Error(err, "unable to create SQS client")
return
}
resp, err := sqsSvs.ReceiveMessage(&sqs.ReceiveMessageInput{QueueUrl: aws.String(qp.URL)})
if err != nil {
r.Log.Error(err, "failed to receive messages")
return
}
for _, msg := range resp.Messages {
m := message{}
err := json.Unmarshal([]byte(*msg.Body), &m)

if err != nil {
r.Log.Error(err, "unable to marshall")
return
}
// TODO: handle errors during process message. We currently deletes the message regardless.
r.processMessage(ctx, m)

_, err = sqsSvs.DeleteMessage(&sqs.DeleteMessageInput{
QueueUrl: aws.String(qp.URL),
ReceiptHandle: msg.ReceiptHandle,
})

if err != nil {
r.Log.Error(err, "error deleting message", "queueURL", qp.URL, "messageReceiptHandle", msg.ReceiptHandle)
}
}
}()

qp := val.(queueParams)
go r.processQueue(ctx, qp)
return true
})
}
}

// processMessage triggers a reconcile on an AWSMachine if its EC2 instance state changed.
func (r *AwsInstanceStateReconciler) processMessage(ctx context.Context, msg message) {
if msg.Source != "aws.ec2" || msg.DetailType != instancestate.Ec2StateChangeNotification || msg.MessageDetail == nil {
func (r *AwsInstanceStateReconciler) processQueue(ctx context.Context, qp queueParams) {
sqsSvs, err := r.getSQSService(qp.region)
if err != nil {
r.Log.Error(err, "unable to create SQS client")
return
}
resp, err := receiveMessageFromQueue(sqsSvs, qp)
if err != nil {
r.Log.Error(err, "unable to receive message from queue")
return
}

queueEvents := make([]instancestate.EventDetails, 10)
for _, msg := range resp.Messages {
event := instancestate.EventBridgeEvent{}
err := json.Unmarshal([]byte(*msg.Body), &event)

if err != nil {
r.Log.Error(err, "unable to marshall")
return
}

err = r.processMessage(ctx, event)
if err != nil {
r.Log.Error(err, "unable to process the message from queue")
return
}
message := msg
queueEvents = append(queueEvents, instancestate.EventDetails{
Message: message,
EventBridgeEvent: event,
})
}
instancestate.QueueEventMapping.Store(qp.URL, queueEvents)
}

func receiveMessageFromQueue(sqsSvs sqsiface.SQSAPI, qp queueParams) (*sqs.ReceiveMessageOutput, error) {
resp, err := sqsSvs.ReceiveMessage(&sqs.ReceiveMessageInput{
QueueUrl: aws.String(qp.URL),
MaxNumberOfMessages: aws.Int64(10),
VisibilityTimeout: aws.Int64(20),
WaitTimeSeconds: aws.Int64(10),
})
if err != nil {
return nil, err
}
return resp, nil
}

// processMessage triggers reconcile on an AWSMachine if its EC2 instance state changed.
func (r *AwsInstanceStateReconciler) processMessage(ctx context.Context, event instancestate.EventBridgeEvent) error {
instanceID := event.Detail.InstanceID
if event.Source == "aws.health" {
instanceID = event.Detail.AffectedEntities[0].EntityValue
}
// Fetch the awsMachine instance by InstanceID
awsMachines := &infrav1.AWSMachineList{}
err := r.List(ctx, awsMachines, client.MatchingFields{controllers.InstanceIDIndex: msg.MessageDetail.InstanceID})
err := r.List(ctx, awsMachines, client.MatchingFields{controllers.InstanceIDIndex: instanceID})

if err != nil {
r.Log.Error(err, "unable to list machines by instance ID", "instanceID", msg.MessageDetail.InstanceID)
r.Log.Error(err, "unable to list machines by instance ID", "instanceID", instanceID)
}

if len(awsMachines.Items) > 0 {
machine := awsMachines.Items[0]
if !machine.ObjectMeta.DeletionTimestamp.IsZero() {
return
return nil
}
patchHelper, err := patch.NewHelper(&machine, r.Client)
if err != nil {
r.Log.Error(err, "unable to create patch helper")
}
// Trigger an update on the machine
labels := machine.GetLabels()
if labels == nil {
labels = make(map[string]string)

labels, err := r.getMachineLabelOnEventType(machine, event)
if err != nil {
return err
}

labels[Ec2InstanceStateLabelKey] = string(msg.MessageDetail.State)
// Trigger an update on the machine
machine.SetLabels(labels)

err = patchHelper.Patch(ctx, &machine)
if err != nil {
r.Log.Error(err, "unable to patch AWS machine")
}
}
return nil
}

// getQueueURL retrieves the SQS queue URL for a given cluster.
Expand All @@ -228,6 +252,31 @@ func (r *AwsInstanceStateReconciler) getQueueURL(cluster *infrav1.AWSCluster) (s
return *resp.QueueUrl, nil
}

func (r *AwsInstanceStateReconciler) getMachineLabelOnEventType(machine infrav1.AWSMachine, event instancestate.EventBridgeEvent) (map[string]string, error) {
labels := machine.GetLabels()
if labels == nil {
labels = make(map[string]string)
}

switch event.Source {
case "aws.autoscaling":
labels[ASGInstanceStateLabelKey] = string(event.Detail.State)
case "aws.ec2":
labels[Ec2InstanceStateLabelKey] = string(event.Detail.State)
case "aws.health":
if event.Detail.Service != "EC2" {
return nil, fmt.Errorf("events from Amazon EventBridge for service (%s) are not supported", event.Detail.Service)
}
if event.Detail.EventTypeCategory != "scheduledChange" {
return nil, fmt.Errorf("events from Amazon EventBridge with EventTypeCategory (%s) are not supported", event.Detail.EventTypeCategory)
}
labels[EC2InstanceHealthStateLabelKey] = event.Detail.EventTypeCategory
default:
r.Log.V(4).Info("event type from Amazon EventBridge is not supported", "event", event.Source)
}
return labels, nil
}

func queueNotFoundError(err error) bool {
if aerr, ok := err.(awserr.Error); ok {
if aerr.Code() == sqs.ErrCodeQueueDoesNotExist {
Expand All @@ -241,14 +290,3 @@ type queueParams struct {
region string
URL string
}

type message struct {
Source string `json:"source"`
DetailType string `json:"detail-type,omitempty"`
MessageDetail *messageDetail `json:"detail,omitempty"`
}

type messageDetail struct {
InstanceID string `json:"instance-id,omitempty"`
State infrav1.InstanceState `json:"state,omitempty"`
}
21 changes: 18 additions & 3 deletions exp/instancestate/awsinstancestate_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,12 @@ func TestAWSInstanceStateController(t *testing.T) {
Return(&sqs.GetQueueUrlOutput{QueueUrl: aws.String("aws-cluster-2-url")}, nil)
sqsSvs.EXPECT().GetQueueUrl(&sqs.GetQueueUrlInput{QueueName: aws.String("aws-cluster-3-queue")}).AnyTimes().
Return(&sqs.GetQueueUrlOutput{QueueUrl: aws.String("aws-cluster-3-url")}, nil)
sqsSvs.EXPECT().ReceiveMessage(&sqs.ReceiveMessageInput{QueueUrl: aws.String("aws-cluster-1-url")}).AnyTimes().
sqsSvs.EXPECT().ReceiveMessage(&sqs.ReceiveMessageInput{
MaxNumberOfMessages: aws.Int64(10),
QueueUrl: aws.String("aws-cluster-1-url"),
VisibilityTimeout: aws.Int64(20),
WaitTimeSeconds: aws.Int64(10),
}).AnyTimes().
DoAndReturn(func(arg *sqs.ReceiveMessageInput) (*sqs.ReceiveMessageOutput, error) {
m := &infrav1.AWSMachine{}
lookupKey := types.NamespacedName{
Expand All @@ -85,9 +90,19 @@ func TestAWSInstanceStateController(t *testing.T) {
return &sqs.ReceiveMessageOutput{Messages: []*sqs.Message{}}, nil
})

sqsSvs.EXPECT().ReceiveMessage(&sqs.ReceiveMessageInput{QueueUrl: aws.String("aws-cluster-2-url")}).AnyTimes().
sqsSvs.EXPECT().ReceiveMessage(&sqs.ReceiveMessageInput{
MaxNumberOfMessages: aws.Int64(10),
QueueUrl: aws.String("aws-cluster-2-url"),
VisibilityTimeout: aws.Int64(20),
WaitTimeSeconds: aws.Int64(10),
}).AnyTimes().
Return(&sqs.ReceiveMessageOutput{Messages: []*sqs.Message{}}, nil)
sqsSvs.EXPECT().ReceiveMessage(&sqs.ReceiveMessageInput{QueueUrl: aws.String("aws-cluster-3-url")}).AnyTimes().
sqsSvs.EXPECT().ReceiveMessage(&sqs.ReceiveMessageInput{
MaxNumberOfMessages: aws.Int64(10),
QueueUrl: aws.String("aws-cluster-3-url"),
VisibilityTimeout: aws.Int64(20),
WaitTimeSeconds: aws.Int64(10),
}).AnyTimes().
Return(&sqs.ReceiveMessageOutput{Messages: []*sqs.Message{}}, nil)
sqsSvs.EXPECT().DeleteMessage(&sqs.DeleteMessageInput{QueueUrl: aws.String("aws-cluster-1-url"), ReceiptHandle: aws.String("message-receipt-handle")}).AnyTimes().
Return(nil, nil)
Expand Down
92 changes: 92 additions & 0 deletions pkg/cloud/services/instancestate/eventbridge_events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package instancestate

import (
"reflect"
"sync"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/sqs"
"github.com/pkg/errors"

infrav1 "sigs.k8s.io/cluster-api-provider-aws/v2/api/v1beta2"
)

// QueueEventMapping stores the mapping of queueURL with all the events and its related details
// necessary to act upon based on the event type.
var QueueEventMapping sync.Map

// EventDetails represents all the necessary details present in the SQS message.
type EventDetails struct {
EventBridgeEvent EventBridgeEvent
Message *sqs.Message
}

// EventBridgeEvent is a structure to hold generic event details from Amazon EventBridge.
type EventBridgeEvent struct {
Version string `json:"version"`
ID string `json:"id"`
DetailType string `json:"detail-type"`
Source string `json:"source"`
Account string `json:"account"`
Time string `json:"time"`
Region string `json:"region"`
Resources []string `json:"resources"`
Detail *messageDetail `json:"detail"`
}

// messageDetail holds information on the affected instance/entity.
type messageDetail struct {
InstanceID string `json:"instance-id,omitempty"`
State infrav1.InstanceState `json:"state,omitempty"`
EventTypeCategory string `json:"eventTypeCategory,omitempty"`
Service string `json:"service,omitempty"`
AffectedEntities []affectedEntity `json:"affectedEntities,omitempty"`
}

// affectedEntity holds information about an entity that is affected by a Health event.
type affectedEntity struct {
EntityValue string `json:"entityValue"`
}

// GetEventsFromSQS is a utility used across CAPA controllers in tandem with a watcher
// to get the events in SQS queue to be processed further by the controller.
func GetEventsFromSQS(queueURL string) []EventDetails {
var eventBridgeValues reflect.Value
if events, ok := QueueEventMapping.Load(queueURL); ok {
eventBridgeValues = reflect.ValueOf(events)
}

return eventBridgeValues.Interface().([]EventDetails)
}

// DeleteProcessedEvent deletes the event from queue if it's already processed by the awsinstancestate controller.
func (s Service) DeleteProcessedEvent(msg sqs.Message, queueURL string) error {
if _, ok := QueueEventMapping.Load(queueURL); ok {
QueueEventMapping.Delete(queueURL)
_, err := s.SQSClient.DeleteMessage(&sqs.DeleteMessageInput{
QueueUrl: aws.String(queueURL),
ReceiptHandle: msg.ReceiptHandle,
})

if err != nil {
return errors.Wrapf(err, "error deleting message, queueURL %v, messageReceiptHandle %v", queueURL, msg.ReceiptHandle)
}
}
return nil
}

0 comments on commit cc23260

Please sign in to comment.