Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refresh subnet/CIDR information periodically #903

Merged
merged 2 commits into from
Jun 24, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ require (
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037
golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2 // indirect
google.golang.org/grpc v1.29.0
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.0.0
gopkg.in/yaml.v2 v2.2.7 // indirect
Expand Down
148 changes: 123 additions & 25 deletions pkg/awsutils/awsutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"regexp"
"strconv"
"strings"
"sync"
"time"

"github.com/pkg/errors"
Expand Down Expand Up @@ -163,14 +164,14 @@ type APIs interface {
// EC2InstanceMetadataCache caches instance metadata
type EC2InstanceMetadataCache struct {
// metadata info
securityGroups []*string
securityGroups StringSet
subnetID string
cidrBlock string
localIPv4 string
instanceID string
instanceType string
vpcIPv4CIDR string
vpcIPv4CIDRs []*string
vpcIPv4CIDRs StringSet
primaryENI string
primaryENImac string
availabilityZone string
Expand Down Expand Up @@ -224,8 +225,47 @@ func prometheusRegister() {
}
}

//StringSet is a set of strings
type StringSet struct {
sync.RWMutex
data sets.String
}

func (ss *StringSet) AWSStrings() []*string {
ss.RLock()
defer ss.RUnlock()
var dataSlice []*string
for key, _ := range ss.data {
dataSlice = append(dataSlice, aws.String(key))
}
return dataSlice
}

func (ss *StringSet) Set(items []string) {
ss.Lock()
defer ss.Unlock()
ss.data = sets.NewString(items...)
}

func (ss *StringSet) IsEmpty() bool {
if ss.data.Len() == 0 {
nithu0115 marked this conversation as resolved.
Show resolved Hide resolved
return true
}
return false
}

func (ss *StringSet) Difference (other *StringSet) *StringSet {
ss.RLock()
defer ss.RUnlock()
//example: s1 = {a1, a2, a3} s2 = {a1, a2, a4, a5} s1.Difference(s2) = {a3} s2.Difference(s1) = {a4, a5}
return &StringSet{data: ss.data.Difference(other.data)}
}

// New creates an EC2InstanceMetadataCache
func New() (*EC2InstanceMetadataCache, error) {
//ctx is passed to initWithEC2Metadata func to cancel spawned go-routines when tests are run
ctx := context.Background()

// Initializes prometheus metrics
prometheusRegister()

Expand All @@ -250,7 +290,7 @@ func New() (*EC2InstanceMetadataCache, error) {

ec2SVC := ec2wrapper.New(sess)
cache.ec2SVC = ec2SVC
err = cache.initWithEC2Metadata()
err = cache.initWithEC2Metadata(ctx)
if err != nil {
return nil, err
}
Expand All @@ -262,7 +302,7 @@ func New() (*EC2InstanceMetadataCache, error) {
}

// InitWithEC2metadata initializes the EC2InstanceMetadataCache with the data retrieved from EC2 metadata service
func (cache *EC2InstanceMetadataCache) initWithEC2Metadata() error {
func (cache *EC2InstanceMetadataCache) initWithEC2Metadata(ctx context.Context) error {
// retrieve availability-zone
az, err := cache.ec2Metadata.GetMetadata(metadataAZ)
if err != nil {
Expand Down Expand Up @@ -315,20 +355,6 @@ func (cache *EC2InstanceMetadataCache) initWithEC2Metadata() error {
return errors.Wrap(err, "get instance metadata: failed to find primary ENI")
}

// retrieve security groups
metadataSGIDs, err := cache.ec2Metadata.GetMetadata(metadataMACPath + mac + metadataSGs)
if err != nil {
awsAPIErrInc("GetMetadata", err)
log.Errorf("Failed to retrieve security-group-ids data from instance metadata service, %v", err)
return errors.Wrap(err, "get instance metadata: failed to retrieve security-group-ids")
}
sgIDs := strings.Fields(metadataSGIDs)

for _, sgID := range sgIDs {
log.Debugf("Found security-group id: %s", sgID)
cache.securityGroups = append(cache.securityGroups, aws.String(sgID))
}

// retrieve sub-id
cache.subnetID, err = cache.ec2Metadata.GetMetadata(metadataMACPath + mac + metadataSubnetID)
if err != nil {
Expand All @@ -347,20 +373,92 @@ func (cache *EC2InstanceMetadataCache) initWithEC2Metadata() error {
}
log.Debugf("Found vpc-ipv4-cidr-block: %s ", cache.vpcIPv4CIDR)

// retrieve vpc-ipv4-cidr-blocks
// initSGIDs retrieves security groups
nithu0115 marked this conversation as resolved.
Show resolved Hide resolved
err = cache.refreshSGIDs(mac)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not invoked for CustomNetworking right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, it is not!

if err != nil {
return err
}

// initVPCIPv4CIDRs retrieves VPC IPv4 CIDR blocks
nithu0115 marked this conversation as resolved.
Show resolved Hide resolved
err = cache.refreshVPCIPv4CIDRs(mac)
if err != nil {
return err
}

// refresh security groups and VPC CIDR blocks in the background
// Ignoring errors since we will retry in 30s
go wait.Forever(func() { _ = cache.refreshSGIDs(mac) }, 30*time.Second)
go wait.Forever(func() { _ = cache.refreshVPCIPv4CIDRs(mac) }, 30*time.Second)
nithu0115 marked this conversation as resolved.
Show resolved Hide resolved

// We use the ctx here for testing, since we spawn go-routines above which will run forever.
select {
case <-ctx.Done():
return nil
default:
}

return nil
}

// refreshSGIDs retrieves security groups
func (cache *EC2InstanceMetadataCache) refreshSGIDs(mac string) error {
metadataSGIDs, err := cache.ec2Metadata.GetMetadata(metadataMACPath + mac + metadataSGs)
if err != nil {
awsAPIErrInc("GetMetadata", err)
log.Errorf("Failed to retrieve security-group-ids data from instance metadata service")
return errors.Wrap(err, "get instance metadata: failed to retrieve security-group-ids")
}

sgIDs := strings.Fields(metadataSGIDs)

newSGs := StringSet{}
newSGs.Set(sgIDs)
addedSGs := newSGs.Difference(&cache.securityGroups)
deletedSGs := cache.securityGroups.Difference(&newSGs)

if !addedSGs.IsEmpty() {
for _, sg := range addedSGs.AWSStrings() {
log.Infof("Found %s, added to ipamd cache", *sg)
}
}
if !deletedSGs.IsEmpty() {
for _, sg := range deletedSGs.AWSStrings() {
log.Infof("Removed %s from ipamd cache", *sg)
}
}
cache.securityGroups.Set(sgIDs)

return nil
}

// refreshVPCIPv4CIDRs retrieves VPC IPv4 CIDR blocks
func (cache *EC2InstanceMetadataCache) refreshVPCIPv4CIDRs(mac string) error {
metadataVPCIPv4CIDRs, err := cache.ec2Metadata.GetMetadata(metadataMACPath + mac + metadataVPCcidrs)
if err != nil {
awsAPIErrInc("GetMetadata", err)
log.Errorf("Failed to retrieve vpc-ipv4-cidr-blocks from instance metadata service")
return errors.Wrap(err, "get instance metadata: failed to retrieve vpc-ipv4-cidr-block data")
return errors.Wrap(err, "get instance metadata: failed to retrieve vpc-ipv4-cidr-blocks data")
}

vpcIPv4CIDRs := strings.Fields(metadataVPCIPv4CIDRs)

for _, vpcCIDR := range vpcIPv4CIDRs {
log.Debugf("Found VPC CIDR: %s", vpcCIDR)
cache.vpcIPv4CIDRs = append(cache.vpcIPv4CIDRs, aws.String(vpcCIDR))
newVpcIPv4CIDRs := StringSet{}
newVpcIPv4CIDRs.Set(vpcIPv4CIDRs)
addedVpcIPv4CIDRs := newVpcIPv4CIDRs.Difference(&cache.securityGroups)
deletedVpcIPv4CIDRs := cache.securityGroups.Difference(&newVpcIPv4CIDRs)

if !addedVpcIPv4CIDRs.IsEmpty() {
for _, vpcIPv4CIDR := range addedVpcIPv4CIDRs.AWSStrings() {
log.Infof("Found %s, added to ipamd cache", *vpcIPv4CIDR)
}
}
if !deletedVpcIPv4CIDRs.IsEmpty() {
for _, vpcIPv4CIDR := range deletedVpcIPv4CIDRs.AWSStrings() {
log.Infof("Removed %s from ipamd cache", *vpcIPv4CIDR)
}
}
cache.vpcIPv4CIDRs.Set(vpcIPv4CIDRs)

return nil
}

Expand Down Expand Up @@ -663,7 +761,7 @@ func (cache *EC2InstanceMetadataCache) createENI(useCustomCfg bool, sg []*string
eniDescription := eniDescriptionPrefix + cache.instanceID
input := &ec2.CreateNetworkInterfaceInput{
Description: aws.String(eniDescription),
Groups: cache.securityGroups,
Groups: cache.securityGroups.AWSStrings(),
SubnetId: aws.String(cache.subnetID),
}

Expand Down Expand Up @@ -1270,7 +1368,7 @@ func (cache *EC2InstanceMetadataCache) GetVPCIPv4CIDR() string {

// GetVPCIPv4CIDRs returns VPC CIDRs
func (cache *EC2InstanceMetadataCache) GetVPCIPv4CIDRs() []*string {
return cache.vpcIPv4CIDRs
return cache.vpcIPv4CIDRs.AWSStrings()
}

// GetLocalIPv4 returns the primary IP address on the primary interface
Expand Down
Loading