Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

OCPBUGS-44698: Create AWS clients on every reconcile instead of at initialization #5179

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,16 @@ import (
"errors"
"fmt"
"strings"
"sync"
"time"

"github.com/go-logr/logr"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/credentials/stscreds"
"github.com/aws/aws-sdk-go/aws/request"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/aws/aws-sdk-go/service/ec2/ec2iface"
"github.com/aws/aws-sdk-go/service/route53"
Expand Down Expand Up @@ -40,7 +43,6 @@ import (
"sigs.k8s.io/controller-runtime/pkg/source"

hyperv1 "github.com/openshift/hypershift/api/hypershift/v1beta1"
awsutil "github.com/openshift/hypershift/cmd/infra/aws/util"
"github.com/openshift/hypershift/control-plane-operator/controllers/hostedcontrolplane/manifests"
supportawsutil "github.com/openshift/hypershift/support/awsutil"
"github.com/openshift/hypershift/support/config"
Expand Down Expand Up @@ -187,12 +189,113 @@ const (
// the existence of AWS Endpoints for it in the guest cluster infrastructure.
type AWSEndpointServiceReconciler struct {
client.Client
ec2Client ec2iface.EC2API
route53Client route53iface.Route53API
upsert.CreateOrUpdateProvider
AssumeEndpointRoleARN *string
AssumeRoute53RoleARN *string
LocalZoneID *string
awsClientBuilder clientBuilder
}

type clientBuilder struct {
mu sync.Mutex
initialized bool
assumeEndpointRoleARN string
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

assumeSharedVPCEndpointRoleARN
assumeSharedVPCRoute53RoleARN
?

assumeRoute53RoleARN string
localZoneID string
}

func (b *clientBuilder) awsSession() (*session.Session, error) {
s, err := session.NewSession()
if err != nil {
return nil, fmt.Errorf("failed to create AWS session: %w", err)
}
s.Handlers.Build.PushBackNamed(request.NamedHandler{
Name: "openshift.io/hypershift",
Fn: request.MakeAddToUserAgentHandler("openshift.io hypershift", "control-plane-operator"),
})
return s, nil
}

func (b *clientBuilder) getClients() (ec2iface.EC2API, route53iface.Route53API, error) {
b.mu.Lock()
defer b.mu.Unlock()

if !b.initialized {
return nil, nil, errors.New("clients not initialized")
}

// AWS_SHARED_CREDENTIALS_FILE and AWS_REGION envvar should be set in operator deployment
awsEndpointSession, err := b.awsSession()
if err != nil {
return nil, nil, err
}
if b.assumeEndpointRoleARN != "" {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit:

// When sharedVPC we need assume these additional roles

?

awsEndpointSession.Config.WithCredentials(stscreds.NewCredentials(awsEndpointSession, b.assumeEndpointRoleARN))
}
awsRoute53Session, err := b.awsSession()
if err != nil {
return nil, nil, err
}
if b.assumeRoute53RoleARN != "" {
awsRoute53Session.Config.WithCredentials(stscreds.NewCredentials(awsRoute53Session, b.assumeRoute53RoleARN))
}
awsConfig := aws.NewConfig()
ec2Client := ec2.New(awsEndpointSession, awsConfig)
route53Config := aws.NewConfig()
// Hardcode region for route53 config
route53Config.Region = aws.String("us-east-1")
route53Client := route53.New(awsRoute53Session, route53Config)

return ec2Client, route53Client, nil
}

func (b *clientBuilder) localHostedZoneID() string {
b.mu.Lock()
defer b.mu.Unlock()

return b.localZoneID
}

func (b *clientBuilder) initializeWithHCP(log logr.Logger, hcp *hyperv1.HostedControlPlane) {
b.mu.Lock()
defer b.mu.Unlock()

if !b.initialized {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would it make any difference if this func is just

b.warnOnDifferentValues(log, hcp)
b.setFromHCP(hcp)
b.initialized = true

without the conditional?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we'd warn unnecessarily when initially setting the values

b.setFromHCP(hcp)
b.initialized = true
} else {
b.warnOnDifferentValues(log, hcp)
b.setFromHCP(hcp)
}
}

func (b *clientBuilder) warnOnDifferentValues(log logr.Logger, hcp *hyperv1.HostedControlPlane) {
newEndpointRoleARN := ""
newRoute53RoleARN := ""
newLocalZoneID := ""
if hcp.Spec.Platform.AWS != nil && hcp.Spec.Platform.AWS.SharedVPC != nil {
newEndpointRoleARN = hcp.Spec.Platform.AWS.SharedVPC.RolesRef.ControlPlaneARN
newRoute53RoleARN = hcp.Spec.Platform.AWS.SharedVPC.RolesRef.IngressARN
newLocalZoneID = hcp.Spec.Platform.AWS.SharedVPC.LocalZoneID
}
if b.assumeEndpointRoleARN != newEndpointRoleARN {
log.Info("WARNING: Setting different value for the endpoint role ARN", "previous", b.assumeEndpointRoleARN, "new", newEndpointRoleARN)
}
if b.assumeRoute53RoleARN != newRoute53RoleARN {
log.Info("WARNING: Setting different value for the route53 role ARN", "previous", b.assumeRoute53RoleARN, "new", newRoute53RoleARN)
}
if b.localZoneID != newLocalZoneID {
log.Info("WARNING: Setting different value for local zone ID", "previous", b.localZoneID, "new", newLocalZoneID)
}
}

func (b *clientBuilder) setFromHCP(hcp *hyperv1.HostedControlPlane) {
if hcp.Spec.Platform.AWS != nil && hcp.Spec.Platform.AWS.SharedVPC != nil {
b.assumeEndpointRoleARN = hcp.Spec.Platform.AWS.SharedVPC.RolesRef.ControlPlaneARN
b.assumeRoute53RoleARN = hcp.Spec.Platform.AWS.SharedVPC.RolesRef.IngressARN
b.localZoneID = hcp.Spec.Platform.AWS.SharedVPC.LocalZoneID
} else {
b.assumeEndpointRoleARN = ""
b.assumeRoute53RoleARN = ""
b.localZoneID = ""
}
}

func (r *AWSEndpointServiceReconciler) SetupWithManager(mgr ctrl.Manager) error {
Expand All @@ -209,23 +312,6 @@ func (r *AWSEndpointServiceReconciler) SetupWithManager(mgr ctrl.Manager) error
}
r.Client = mgr.GetClient()

// AWS_SHARED_CREDENTIALS_FILE and AWS_REGION envvar should be set in operator deployment
awsEndpointSession := awsutil.NewSession("control-plane-operator", "", "", "", "")
if r.AssumeEndpointRoleARN != nil {
awsEndpointSession.Config.WithCredentials(stscreds.NewCredentials(awsEndpointSession, *r.AssumeEndpointRoleARN))
}
awsRoute53Session := awsutil.NewSession("control-plane-operator", "", "", "", "")
if r.AssumeRoute53RoleARN != nil {
awsRoute53Session.Config.WithCredentials(stscreds.NewCredentials(awsRoute53Session, *r.AssumeRoute53RoleARN))
}

awsConfig := aws.NewConfig()
r.ec2Client = ec2.New(awsEndpointSession, awsConfig)
route53Config := aws.NewConfig()
// Hardcode region for route53 config
route53Config.Region = aws.String("us-east-1")
r.route53Client = route53.New(awsRoute53Session, route53Config)

return nil
}

Expand Down Expand Up @@ -287,12 +373,18 @@ func (r *AWSEndpointServiceReconciler) Reconcile(ctx context.Context, req ctrl.R
// If we previously removed our finalizer, don't delete again and return early
return ctrl.Result{}, nil
}
completed, err := r.delete(ctx, awsEndpointService, r.ec2Client, r.route53Client)

ec2Client, route53Client, err := r.awsClientBuilder.getClients()
if err != nil {
return ctrl.Result{}, fmt.Errorf("failed to delete resource: %w", err)
}
if !completed {
return ctrl.Result{RequeueAfter: endpointServiceDeletionRequeueDuration}, nil
log.Error(err, "failed to get AWS client, skipping aws endpoint service cleanup")
} else {
completed, err := r.delete(ctx, awsEndpointService, ec2Client, route53Client)
if err != nil {
return ctrl.Result{}, fmt.Errorf("failed to delete resource: %w", err)
}
if !completed {
return ctrl.Result{RequeueAfter: endpointServiceDeletionRequeueDuration}, nil
}
}
if controllerutil.ContainsFinalizer(awsEndpointService, finalizer) {
controllerutil.RemoveFinalizer(awsEndpointService, finalizer)
Expand Down Expand Up @@ -340,9 +432,15 @@ func (r *AWSEndpointServiceReconciler) Reconcile(ctx context.Context, req ctrl.R
return ctrl.Result{RequeueAfter: duration}, nil
}

r.awsClientBuilder.initializeWithHCP(log, hcp)
ec2Client, route53Client, err := r.awsClientBuilder.getClients()
if err != nil {
return ctrl.Result{}, err
}

// Reconcile the AWSEndpointService
oldStatus := awsEndpointService.Status.DeepCopy()
if err := r.reconcileAWSEndpointService(ctx, awsEndpointService, hcp, r.ec2Client, r.route53Client); err != nil {
if err := r.reconcileAWSEndpointService(ctx, awsEndpointService, hcp, ec2Client, route53Client); err != nil {
meta.SetStatusCondition(&awsEndpointService.Status.Conditions, metav1.Condition{
Type: string(hyperv1.AWSEndpointAvailable),
Status: metav1.ConditionFalse,
Expand Down Expand Up @@ -421,7 +519,7 @@ func (r *AWSEndpointServiceReconciler) reconcileAWSEndpointService(ctx context.C
return nil
}

if err := r.reconcileAWSEndpointSecurityGroup(ctx, awsEndpointService, hcp); err != nil {
if err := r.reconcileAWSEndpointSecurityGroup(ctx, ec2Client, awsEndpointService, hcp); err != nil {
return err
}

Expand All @@ -433,6 +531,7 @@ func (r *AWSEndpointServiceReconciler) reconcileAWSEndpointService(ctx context.C
VpcEndpointIds: []*string{aws.String(endpointID)},
})
if err != nil {
log.Error(err, "failed to describe vpc endpoint", "endpointID", endpointID)
if awsErr, ok := err.(awserr.Error); ok {
if awsErr.Code() == "InvalidVpcEndpointId.NotFound" {
// clear the EndpointID so a new Endpoint is created on the requeue
Expand All @@ -450,6 +549,7 @@ func (r *AWSEndpointServiceReconciler) reconcileAWSEndpointService(ctx context.C
if _, err := ec2Client.DeleteVpcEndpointsWithContext(ctx, &ec2.DeleteVpcEndpointsInput{
VpcEndpointIds: []*string{output.VpcEndpoints[0].VpcEndpointId},
}); err != nil {
log.Error(err, "failed to delete vpc endpoint", "id", output.VpcEndpoints[0].VpcEndpointId)
return fmt.Errorf("error deleting AWSEndpoint: %w", err)
}

Expand Down Expand Up @@ -485,6 +585,7 @@ func (r *AWSEndpointServiceReconciler) reconcileAWSEndpointService(ctx context.C
AddSecurityGroupIds: addedSG,
})
if err != nil {
log.Error(err, "failed to modify vpc endpoint", "id", endpointID, "addSubnets", addedSubnet, "removeSubnets", removedSubnet, "addSG", addedSG)
msg := err.Error()
if awsErr, ok := err.(awserr.Error); ok {
msg = awsErr.Code()
Expand Down Expand Up @@ -521,6 +622,7 @@ func (r *AWSEndpointServiceReconciler) reconcileAWSEndpointService(ctx context.C
if _, err := ec2Client.DeleteVpcEndpointsWithContext(ctx, &ec2.DeleteVpcEndpointsInput{
VpcEndpointIds: []*string{output.VpcEndpoints[0].VpcEndpointId},
}); err != nil {
log.Error(err, "failed to delete vpc endpoint", "id", output.VpcEndpoints[0].VpcEndpointId)
return fmt.Errorf("error deleting AWSEndpoint: %w", err)
}

Expand Down Expand Up @@ -585,13 +687,13 @@ func (r *AWSEndpointServiceReconciler) reconcileAWSEndpointService(ctx context.C

zoneName := zoneName(hcp.Name)
var zoneID string
if r.LocalZoneID == nil {
if r.awsClientBuilder.localHostedZoneID() == "" {
zoneID, err = lookupZoneID(ctx, route53Client, zoneName)
if err != nil {
return err
}
} else {
zoneID = *r.LocalZoneID
zoneID = r.awsClientBuilder.localHostedZoneID()
}

var fqdns []string
Expand Down Expand Up @@ -654,14 +756,15 @@ func (r *AWSEndpointServiceReconciler) reconcileAWSEndpointService(ctx context.C
return nil
}

func (r *AWSEndpointServiceReconciler) reconcileAWSEndpointSecurityGroup(ctx context.Context, awsEndpointService *hyperv1.AWSEndpointService, hcp *hyperv1.HostedControlPlane) error {
func (r *AWSEndpointServiceReconciler) reconcileAWSEndpointSecurityGroup(ctx context.Context, ec2Client ec2iface.EC2API, awsEndpointService *hyperv1.AWSEndpointService, hcp *hyperv1.HostedControlPlane) error {
var sgID string
var sg *ec2.SecurityGroup

log := ctrl.LoggerFrom(ctx)
var err error
sg, err = supportawsutil.GetSecurityGroup(r.ec2Client, vpcEndpointSecurityGroupFilter(hcp.Spec.InfraID, awsEndpointService.Name))
sg, err = supportawsutil.GetSecurityGroup(ec2Client, vpcEndpointSecurityGroupFilter(hcp.Spec.InfraID, awsEndpointService.Name))
if err != nil {
log.Error(err, "failed to get security group for endpoint", "infraID", hcp.Spec.InfraID, "name", awsEndpointService.Name)
return err
}
if sg != nil {
Expand All @@ -673,7 +776,7 @@ func (r *AWSEndpointServiceReconciler) reconcileAWSEndpointSecurityGroup(ctx con
}
if sgID == "" {
var err error
if sg, err = r.createSecurityGroup(ctx, awsEndpointService, hcp); err != nil {
if sg, err = r.createSecurityGroup(ctx, ec2Client, awsEndpointService, hcp); err != nil {
return err
}
sgID = aws.StringValue(sg.GroupId)
Expand All @@ -686,10 +789,11 @@ func (r *AWSEndpointServiceReconciler) reconcileAWSEndpointSecurityGroup(ctx con
ingressPermissions := supportawsutil.VPCEndpointSecurityGroupRules(machineCIDRs, vpcEndpointPort(awsEndpointService))
missingPermissions := diffPermissions(sg.IpPermissions, ingressPermissions)
if len(missingPermissions) > 0 {
if _, err = r.ec2Client.AuthorizeSecurityGroupIngressWithContext(ctx, &ec2.AuthorizeSecurityGroupIngressInput{
if _, err = ec2Client.AuthorizeSecurityGroupIngressWithContext(ctx, &ec2.AuthorizeSecurityGroupIngressInput{
GroupId: aws.String(sgID),
IpPermissions: ingressPermissions,
}); err != nil {
log.Error(err, "failed to sect security group ingress rules", "id", sgID)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo sect?
Why are we unconditionally logging here then right below we are logging again via

			if supportawsutil.AWSErrorCode(err) != "InvalidPermission.Duplicate" {
				return fmt.Errorf("failed to set security group ingress rules, code: %s", supportawsutil.AWSErrorCode(err))
			}
			log.Info("WARNING: got duplicate permissions error when setting security group ingress permissions", "sgID", sgID)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo sect?
yes

Will also fix the unconditional logging

if supportawsutil.AWSErrorCode(err) != "InvalidPermission.Duplicate" {
return fmt.Errorf("failed to set security group ingress rules, code: %s", supportawsutil.AWSErrorCode(err))
}
Expand All @@ -711,7 +815,7 @@ func vpcEndpointPort(awsEndpointService *hyperv1.AWSEndpointService) int64 {
}
}

func (r *AWSEndpointServiceReconciler) createSecurityGroup(ctx context.Context, awsEndpointService *hyperv1.AWSEndpointService, hcp *hyperv1.HostedControlPlane) (*ec2.SecurityGroup, error) {
func (r *AWSEndpointServiceReconciler) createSecurityGroup(ctx context.Context, ec2Client ec2iface.EC2API, awsEndpointService *hyperv1.AWSEndpointService, hcp *hyperv1.HostedControlPlane) (*ec2.SecurityGroup, error) {
log := ctrl.LoggerFrom(ctx)
tagKeys := sets.NewString()
var tags []*ec2.Tag
Expand All @@ -736,7 +840,7 @@ func (r *AWSEndpointServiceReconciler) createSecurityGroup(ctx context.Context,
Value: aws.String(name),
})
}
createSGResult, err := r.ec2Client.CreateSecurityGroup(&ec2.CreateSecurityGroupInput{
createSGResult, err := ec2Client.CreateSecurityGroup(&ec2.CreateSecurityGroupInput{
GroupName: aws.String(name),
Description: aws.String("VPC endpoint security group"),
VpcId: aws.String(hcp.Spec.Platform.AWS.CloudProviderConfig.VPC),
Expand All @@ -748,6 +852,7 @@ func (r *AWSEndpointServiceReconciler) createSecurityGroup(ctx context.Context,
},
})
if err != nil {
log.Error(err, "failed to create security group for aws endpoint", "name", name, "vpc", hcp.Spec.Platform.AWS.CloudProviderConfig.VPC)
return nil, fmt.Errorf("failed to create security group, code: %s", supportawsutil.AWSErrorCode(err))
}
sgID := aws.StringValue(createSGResult.GroupId)
Expand All @@ -756,14 +861,17 @@ func (r *AWSEndpointServiceReconciler) createSecurityGroup(ctx context.Context,
describeSGInput := &ec2.DescribeSecurityGroupsInput{
GroupIds: []*string{aws.String(sgID)},
}
if err = r.ec2Client.WaitUntilSecurityGroupExistsWithContext(ctx, describeSGInput); err != nil {
if err = ec2Client.WaitUntilSecurityGroupExistsWithContext(ctx, describeSGInput); err != nil {
log.Error(err, "failed to wait for security group to exist", "id", sgID)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why are we logging in addition to the returned error and with different message?

return nil, fmt.Errorf("failed to find created security group (id: %s), code: %s", sgID, supportawsutil.AWSErrorCode(err))
}
sg, err := supportawsutil.GetSecurityGroupById(r.ec2Client, sgID)
sg, err := supportawsutil.GetSecurityGroupById(ec2Client, sgID)
if err != nil {
log.Error(err, "failed to fetch security group by ID", "id", sgID)
return nil, err
}
if sg == nil {
log.Error(errors.New("security group not found"), "id", sgID)
return nil, fmt.Errorf("failed to fetch security group (id: %s)", sgID)
}
log.Info("created security group", "id", sgID)
Expand Down Expand Up @@ -902,7 +1010,7 @@ func (r *AWSEndpointServiceReconciler) delete(ctx context.Context, awsEndpointSe
}

if awsEndpointService.Status.SecurityGroupID != "" {
if err := r.deleteSecurityGroup(ctx, awsEndpointService.Status.SecurityGroupID); err != nil {
if err := r.deleteSecurityGroup(ctx, ec2Client, awsEndpointService.Status.SecurityGroupID); err != nil {
return false, err
}
log.Info("security group deleted", "id", awsEndpointService.Status.SecurityGroupID)
Expand Down Expand Up @@ -938,9 +1046,9 @@ func (r *AWSEndpointServiceReconciler) delete(ctx context.Context, awsEndpointSe
return true, nil
}

func (r *AWSEndpointServiceReconciler) deleteSecurityGroup(ctx context.Context, sgID string) error {
func (r *AWSEndpointServiceReconciler) deleteSecurityGroup(ctx context.Context, ec2Client ec2iface.EC2API, sgID string) error {
log := ctrl.LoggerFrom(ctx)
describeSGResult, err := r.ec2Client.DescribeSecurityGroupsWithContext(ctx, &ec2.DescribeSecurityGroupsInput{GroupIds: []*string{aws.String(sgID)}})
describeSGResult, err := ec2Client.DescribeSecurityGroupsWithContext(ctx, &ec2.DescribeSecurityGroupsInput{GroupIds: []*string{aws.String(sgID)}})
if err != nil {
if supportawsutil.AWSErrorCode(err) == "InvalidGroup.NotFound" {
return nil
Expand All @@ -953,7 +1061,7 @@ func (r *AWSEndpointServiceReconciler) deleteSecurityGroup(ctx context.Context,
sg := describeSGResult.SecurityGroups[0]

if len(sg.IpPermissions) > 0 {
if _, err = r.ec2Client.RevokeSecurityGroupIngressWithContext(ctx, &ec2.RevokeSecurityGroupIngressInput{
if _, err = ec2Client.RevokeSecurityGroupIngressWithContext(ctx, &ec2.RevokeSecurityGroupIngressInput{
GroupId: sg.GroupId,
IpPermissions: sg.IpPermissions,
}); err != nil {
Expand All @@ -964,7 +1072,7 @@ func (r *AWSEndpointServiceReconciler) deleteSecurityGroup(ctx context.Context,
}

if len(sg.IpPermissionsEgress) > 0 {
if _, err = r.ec2Client.RevokeSecurityGroupEgressWithContext(ctx, &ec2.RevokeSecurityGroupEgressInput{
if _, err = ec2Client.RevokeSecurityGroupEgressWithContext(ctx, &ec2.RevokeSecurityGroupEgressInput{
GroupId: sg.GroupId,
IpPermissions: sg.IpPermissionsEgress,
}); err != nil {
Expand All @@ -973,7 +1081,7 @@ func (r *AWSEndpointServiceReconciler) deleteSecurityGroup(ctx context.Context,
}
}

if _, err = r.ec2Client.DeleteSecurityGroupWithContext(ctx, &ec2.DeleteSecurityGroupInput{
if _, err = ec2Client.DeleteSecurityGroupWithContext(ctx, &ec2.DeleteSecurityGroupInput{
GroupId: sg.GroupId,
}); err != nil {
log.Error(err, "failed to delete security group", "SecurityGroupID", aws.StringValue(sg.GroupId), "code", supportawsutil.AWSErrorCode(err))
Expand Down
Loading