Skip to content

Commit

Permalink
Added infinite loop to awsutils.Initmetadata to fetch new CIDRs every…
Browse files Browse the repository at this point in the history
… 30s, and update rule for running pods
  • Loading branch information
nithu0115 committed May 11, 2020
1 parent 5ca609d commit 355549c
Show file tree
Hide file tree
Showing 4 changed files with 188 additions and 64 deletions.
118 changes: 94 additions & 24 deletions pkg/awsutils/awsutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"os"
"strconv"
"strings"
"sync"
"time"

"github.com/pkg/errors"
Expand Down Expand Up @@ -159,14 +160,14 @@ type APIs interface {
// EC2InstanceMetadataCache caches instance metadata
type EC2InstanceMetadataCache struct {
// metadata info
securityGroups []*string
securityGroups StringSet
subnetID string
cidrBlock string
localIPv4 string
instanceID string
instanceType string
vpcIPv4CIDR string
vpcIPv4CIDRs []*string
vpcIPv4CIDRs StringSet
primaryENI string
primaryENImac string
availabilityZone string
Expand Down Expand Up @@ -220,8 +221,35 @@ func prometheusRegister() {
}
}

//StringSet is a set of strings
type StringSet struct {
sync.RWMutex
data map[string]bool
}

func (ss *StringSet) add(value string) {
ss.Lock()
defer ss.Unlock()
if len(ss.data) == 0 {
ss.data = make(map[string]bool)
}
ss.data[value] = true
}

func (ss *StringSet) contains(value string) bool {
_, found := ss.data[value]
return found //true if it existed already
}

func (ss *StringSet) reset() {
ss.Lock()
defer ss.Unlock()
ss.data = make(map[string]bool)
}

// New creates an EC2InstanceMetadataCache
func New() (*EC2InstanceMetadataCache, error) {
ctx, _ := context.WithCancel(context.Background())
// Initializes prometheus metrics
prometheusRegister()

Expand All @@ -246,7 +274,7 @@ func New() (*EC2InstanceMetadataCache, error) {

ec2SVC := ec2wrapper.New(sess)
cache.ec2SVC = ec2SVC
err = cache.initWithEC2Metadata()
err = cache.initWithEC2Metadata(ctx)
if err != nil {
return nil, err
}
Expand All @@ -258,7 +286,7 @@ func New() (*EC2InstanceMetadataCache, error) {
}

// InitWithEC2metadata initializes the EC2InstanceMetadataCache with the data retrieved from EC2 metadata service
func (cache *EC2InstanceMetadataCache) initWithEC2Metadata() error {
func (cache *EC2InstanceMetadataCache) initWithEC2Metadata(ctx context.Context) error {
// retrieve availability-zone
az, err := cache.ec2Metadata.GetMetadata(metadataAZ)
if err != nil {
Expand Down Expand Up @@ -311,20 +339,6 @@ func (cache *EC2InstanceMetadataCache) initWithEC2Metadata() error {
return errors.Wrap(err, "get instance metadata: failed to find primary ENI")
}

// retrieve security groups
metadataSGIDs, err := cache.ec2Metadata.GetMetadata(metadataMACPath + mac + metadataSGs)
if err != nil {
awsAPIErrInc("GetMetadata", err)
log.Errorf("Failed to retrieve security-group-ids data from instance metadata service, %v", err)
return errors.Wrap(err, "get instance metadata: failed to retrieve security-group-ids")
}
sgIDs := strings.Fields(metadataSGIDs)

for _, sgID := range sgIDs {
log.Debugf("Found security-group id: %s", sgID)
cache.securityGroups = append(cache.securityGroups, aws.String(sgID))
}

// retrieve sub-id
cache.subnetID, err = cache.ec2Metadata.GetMetadata(metadataMACPath + mac + metadataSubnetID)
if err != nil {
Expand All @@ -343,19 +357,61 @@ func (cache *EC2InstanceMetadataCache) initWithEC2Metadata() error {
}
log.Debugf("Found vpc-ipv4-cidr-block: %s ", cache.vpcIPv4CIDR)

// retrieve vpc-ipv4-cidr-blocks
// initSGIDs retrieves security groups
err = cache.initSGIDs(mac)
if err != nil {
return err
}

// initVPCIPv4CIDRs retrieves VPC IPv4 CIDR blocks
err = cache.initVPCIPv4CIDRs(mac)
if err != nil {
return err
}

// refresh security groups and VPC CIDR blocks in the background
go wait.Forever(func() { cache.initSGIDs(mac) }, 30*time.Second)
go wait.Forever(func() { cache.initVPCIPv4CIDRs(mac) }, 30*time.Second)

select {
case <-ctx.Done():
return nil
}
}

// initSGIDs retrieves security groups
func (cache *EC2InstanceMetadataCache) initSGIDs(mac string) error {
metadataSGIDs, err := cache.ec2Metadata.GetMetadata(metadataMACPath + mac + metadataSGs)
if err != nil {
awsAPIErrInc("GetMetadata", err)
log.Errorf("Failed to retrieve security-group-ids data from instance metadata service")
return errors.Wrap(err, "get instance metadata: failed to retrieve security-group-ids")
}

sgIDs := strings.Fields(metadataSGIDs)
cache.securityGroups.reset()
for _, sgID := range sgIDs {
log.Debugf("Found security-group id: %s", sgID)
cache.securityGroups.add(sgID)
}
return nil
}

// initVPCIPv4CIDRs retrieves VPC IPv4 CIDR blocks
func (cache *EC2InstanceMetadataCache) initVPCIPv4CIDRs(mac string) error {
metadataVPCIPv4CIDRs, err := cache.ec2Metadata.GetMetadata(metadataMACPath + mac + metadataVPCcidrs)

if err != nil {
awsAPIErrInc("GetMetadata", err)
log.Errorf("Failed to retrieve vpc-ipv4-cidr-blocks from instance metadata service")
return errors.Wrap(err, "get instance metadata: failed to retrieve vpc-ipv4-cidr-block data")
return errors.Wrap(err, "get instance metadata: failed to retrieve vpc-ipv4-cidr-blocks data")
}

vpcIPv4CIDRs := strings.Fields(metadataVPCIPv4CIDRs)

cache.vpcIPv4CIDRs.reset()
for _, vpcCIDR := range vpcIPv4CIDRs {
log.Debugf("Found VPC CIDR: %s", vpcCIDR)
cache.vpcIPv4CIDRs = append(cache.vpcIPv4CIDRs, aws.String(vpcCIDR))
cache.vpcIPv4CIDRs.add(vpcCIDR)
}
return nil
}
Expand Down Expand Up @@ -657,9 +713,15 @@ func (cache *EC2InstanceMetadataCache) attachENI(eniID string) (string, error) {
// return ENI id, error
func (cache *EC2InstanceMetadataCache) createENI(useCustomCfg bool, sg []*string, subnet string) (string, error) {
eniDescription := eniDescriptionPrefix + cache.instanceID
var securityGroups []*string
for securityGroup := range cache.securityGroups.data {
//In go, the loop iterator variable is a single variable that takes different values in each loop iteration.
tmpVar := securityGroup
securityGroups = append(securityGroups, aws.String(tmpVar))
}
input := &ec2.CreateNetworkInterfaceInput{
Description: aws.String(eniDescription),
Groups: cache.securityGroups,
Groups: securityGroups,
SubnetId: aws.String(cache.subnetID),
}

Expand Down Expand Up @@ -1225,7 +1287,15 @@ func (cache *EC2InstanceMetadataCache) GetVPCIPv4CIDR() string {

// GetVPCIPv4CIDRs returns VPC CIDRs
func (cache *EC2InstanceMetadataCache) GetVPCIPv4CIDRs() []*string {
return cache.vpcIPv4CIDRs
cache.vpcIPv4CIDRs.RLock()
defer cache.vpcIPv4CIDRs.RUnlock()
var vpcIPv4CIDRslice []*string
for vpcIPv4CIDR := range cache.vpcIPv4CIDRs.data{
//In go, the loop iterator variable is a single variable that takes different values in each loop iteration.
tmpVar := vpcIPv4CIDR
vpcIPv4CIDRslice = append(vpcIPv4CIDRslice, aws.String(tmpVar))
}
return vpcIPv4CIDRslice
}

// GetLocalIPv4 returns the primary IP address on the primary interface
Expand Down
Loading

0 comments on commit 355549c

Please sign in to comment.