Skip to content

Commit

Permalink
refresh subnet/CIDR information every 30 seconds and update ip rules …
Browse files Browse the repository at this point in the history
…to map pods
  • Loading branch information
nithu0115 committed Jun 18, 2020
1 parent 14d5135 commit 01c2d5d
Show file tree
Hide file tree
Showing 5 changed files with 212 additions and 57 deletions.
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ require (
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037
golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2 // indirect
google.golang.org/grpc v1.29.0
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.0.0
gopkg.in/yaml.v2 v2.2.7 // indirect
Expand Down
148 changes: 123 additions & 25 deletions pkg/awsutils/awsutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"regexp"
"strconv"
"strings"
"sync"
"time"

"github.com/pkg/errors"
Expand Down Expand Up @@ -163,14 +164,14 @@ type APIs interface {
// EC2InstanceMetadataCache caches instance metadata
type EC2InstanceMetadataCache struct {
// metadata info
securityGroups []*string
securityGroups StringSet
subnetID string
cidrBlock string
localIPv4 string
instanceID string
instanceType string
vpcIPv4CIDR string
vpcIPv4CIDRs []*string
vpcIPv4CIDRs StringSet
primaryENI string
primaryENImac string
availabilityZone string
Expand Down Expand Up @@ -224,8 +225,47 @@ func prometheusRegister() {
}
}

//StringSet is a set of strings
type StringSet struct {
sync.RWMutex
data sets.String
}

func (ss *StringSet) AWSStrings() []*string {
ss.RLock()
defer ss.RUnlock()
var dataSlice []*string
for key, _ := range ss.data {
dataSlice = append(dataSlice, aws.String(key))
}
return dataSlice
}

func (ss *StringSet) Set(items []string) {
ss.Lock()
defer ss.Unlock()
ss.data = sets.NewString(items...)
}

func (ss *StringSet) IsEmpty() bool {
if ss.data.Len() == 0 {
return true
}
return false
}

func (ss *StringSet) Difference (other *StringSet) *StringSet {
ss.RLock()
defer ss.RUnlock()
//example: s1 = {a1, a2, a3} s2 = {a1, a2, a4, a5} s1.Difference(s2) = {a3} s2.Difference(s1) = {a4, a5}
return &StringSet{data: ss.data.Difference(other.data)}
}

// New creates an EC2InstanceMetadataCache
func New() (*EC2InstanceMetadataCache, error) {
//ctx is passed to initWithEC2Metadata func to cancel spawned go-routines when tests are run
ctx := context.Background()

// Initializes prometheus metrics
prometheusRegister()

Expand All @@ -250,7 +290,7 @@ func New() (*EC2InstanceMetadataCache, error) {

ec2SVC := ec2wrapper.New(sess)
cache.ec2SVC = ec2SVC
err = cache.initWithEC2Metadata()
err = cache.initWithEC2Metadata(ctx)
if err != nil {
return nil, err
}
Expand All @@ -262,7 +302,7 @@ func New() (*EC2InstanceMetadataCache, error) {
}

// InitWithEC2metadata initializes the EC2InstanceMetadataCache with the data retrieved from EC2 metadata service
func (cache *EC2InstanceMetadataCache) initWithEC2Metadata() error {
func (cache *EC2InstanceMetadataCache) initWithEC2Metadata(ctx context.Context) error {
// retrieve availability-zone
az, err := cache.ec2Metadata.GetMetadata(metadataAZ)
if err != nil {
Expand Down Expand Up @@ -315,20 +355,6 @@ func (cache *EC2InstanceMetadataCache) initWithEC2Metadata() error {
return errors.Wrap(err, "get instance metadata: failed to find primary ENI")
}

// retrieve security groups
metadataSGIDs, err := cache.ec2Metadata.GetMetadata(metadataMACPath + mac + metadataSGs)
if err != nil {
awsAPIErrInc("GetMetadata", err)
log.Errorf("Failed to retrieve security-group-ids data from instance metadata service, %v", err)
return errors.Wrap(err, "get instance metadata: failed to retrieve security-group-ids")
}
sgIDs := strings.Fields(metadataSGIDs)

for _, sgID := range sgIDs {
log.Debugf("Found security-group id: %s", sgID)
cache.securityGroups = append(cache.securityGroups, aws.String(sgID))
}

// retrieve sub-id
cache.subnetID, err = cache.ec2Metadata.GetMetadata(metadataMACPath + mac + metadataSubnetID)
if err != nil {
Expand All @@ -347,20 +373,92 @@ func (cache *EC2InstanceMetadataCache) initWithEC2Metadata() error {
}
log.Debugf("Found vpc-ipv4-cidr-block: %s ", cache.vpcIPv4CIDR)

// retrieve vpc-ipv4-cidr-blocks
// initSGIDs retrieves security groups
err = cache.refreshSGIDs(mac)
if err != nil {
return err
}

// initVPCIPv4CIDRs retrieves VPC IPv4 CIDR blocks
err = cache.refreshVPCIPv4CIDRs(mac)
if err != nil {
return err
}

// refresh security groups and VPC CIDR blocks in the background
// Ignoring errors since we will retry in 30s
go wait.Forever(func() { _ = cache.refreshSGIDs(mac) }, 30*time.Second)
go wait.Forever(func() { _ = cache.refreshVPCIPv4CIDRs(mac) }, 30*time.Second)

// We use the ctx here for testing, since we spawn go-routines above which will run forever.
select {
case <-ctx.Done():
return nil
default:
}

return nil
}

// refreshSGIDs retrieves security groups
func (cache *EC2InstanceMetadataCache) refreshSGIDs(mac string) error {
metadataSGIDs, err := cache.ec2Metadata.GetMetadata(metadataMACPath + mac + metadataSGs)
if err != nil {
awsAPIErrInc("GetMetadata", err)
log.Errorf("Failed to retrieve security-group-ids data from instance metadata service")
return errors.Wrap(err, "get instance metadata: failed to retrieve security-group-ids")
}

sgIDs := strings.Fields(metadataSGIDs)

newSGs := StringSet{}
newSGs.Set(sgIDs)
addedSGs := newSGs.Difference(&cache.securityGroups)
deletedSGs := cache.securityGroups.Difference(&newSGs)

if !addedSGs.IsEmpty() {
for _, sg := range addedSGs.AWSStrings() {
log.Infof("Found %s, added to ipamd cache", *sg)
}
}
if !deletedSGs.IsEmpty() {
for _, sg := range deletedSGs.AWSStrings() {
log.Infof("Removed %s from ipamd cache", *sg)
}
}
cache.securityGroups.Set(sgIDs)

return nil
}

// refreshVPCIPv4CIDRs retrieves VPC IPv4 CIDR blocks
func (cache *EC2InstanceMetadataCache) refreshVPCIPv4CIDRs(mac string) error {
metadataVPCIPv4CIDRs, err := cache.ec2Metadata.GetMetadata(metadataMACPath + mac + metadataVPCcidrs)
if err != nil {
awsAPIErrInc("GetMetadata", err)
log.Errorf("Failed to retrieve vpc-ipv4-cidr-blocks from instance metadata service")
return errors.Wrap(err, "get instance metadata: failed to retrieve vpc-ipv4-cidr-block data")
return errors.Wrap(err, "get instance metadata: failed to retrieve vpc-ipv4-cidr-blocks data")
}

vpcIPv4CIDRs := strings.Fields(metadataVPCIPv4CIDRs)

for _, vpcCIDR := range vpcIPv4CIDRs {
log.Debugf("Found VPC CIDR: %s", vpcCIDR)
cache.vpcIPv4CIDRs = append(cache.vpcIPv4CIDRs, aws.String(vpcCIDR))
newVpcIPv4CIDRs := StringSet{}
newVpcIPv4CIDRs.Set(vpcIPv4CIDRs)
addedVpcIPv4CIDRs := newVpcIPv4CIDRs.Difference(&cache.securityGroups)
deletedVpcIPv4CIDRs := cache.securityGroups.Difference(&newVpcIPv4CIDRs)

if !addedVpcIPv4CIDRs.IsEmpty() {
for _, vpcIPv4CIDR := range addedVpcIPv4CIDRs.AWSStrings() {
log.Infof("Found %s, added to ipamd cache", *vpcIPv4CIDR)
}
}
if !deletedVpcIPv4CIDRs.IsEmpty() {
for _, vpcIPv4CIDR := range deletedVpcIPv4CIDRs.AWSStrings() {
log.Infof("Removed %s from ipamd cache", *vpcIPv4CIDR)
}
}
cache.vpcIPv4CIDRs.Set(vpcIPv4CIDRs)

return nil
}

Expand Down Expand Up @@ -663,7 +761,7 @@ func (cache *EC2InstanceMetadataCache) createENI(useCustomCfg bool, sg []*string
eniDescription := eniDescriptionPrefix + cache.instanceID
input := &ec2.CreateNetworkInterfaceInput{
Description: aws.String(eniDescription),
Groups: cache.securityGroups,
Groups: cache.securityGroups.AWSStrings(),
SubnetId: aws.String(cache.subnetID),
}

Expand Down Expand Up @@ -1270,7 +1368,7 @@ func (cache *EC2InstanceMetadataCache) GetVPCIPv4CIDR() string {

// GetVPCIPv4CIDRs returns VPC CIDRs
func (cache *EC2InstanceMetadataCache) GetVPCIPv4CIDRs() []*string {
return cache.vpcIPv4CIDRs
return cache.vpcIPv4CIDRs.AWSStrings()
}

// GetLocalIPv4 returns the primary IP address on the primary interface
Expand Down
Loading

0 comments on commit 01c2d5d

Please sign in to comment.