Skip to content

Commit

Permalink
Added infinite loop to awsutils.Initmetadata to fetch new CIDRs every…
Browse files Browse the repository at this point in the history
… 30s, and update rule for running pods
  • Loading branch information
nithu0115 committed May 6, 2020
1 parent 5ca609d commit 4b5cf58
Show file tree
Hide file tree
Showing 4 changed files with 170 additions and 49 deletions.
123 changes: 100 additions & 23 deletions pkg/awsutils/awsutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"os"
"strconv"
"strings"
"sync"
"time"

"github.com/pkg/errors"
Expand Down Expand Up @@ -159,14 +160,14 @@ type APIs interface {
// EC2InstanceMetadataCache caches instance metadata
type EC2InstanceMetadataCache struct {
// metadata info
securityGroups []*string
securityGroups StringSet
subnetID string
cidrBlock string
localIPv4 string
instanceID string
instanceType string
vpcIPv4CIDR string
vpcIPv4CIDRs []*string
vpcIPv4CIDRs StringSet
primaryENI string
primaryENImac string
availabilityZone string
Expand Down Expand Up @@ -220,6 +221,32 @@ func prometheusRegister() {
}
}

//StringSet is a set of strings
type StringSet struct {
sync.RWMutex
data map[string]bool
}

func (ss *StringSet) add(value string) {
ss.Lock()
defer ss.Unlock()
if len(ss.data) == 0 {
ss.data = make(map[string]bool)
}
ss.data[value] = true
}

func (ss *StringSet) contains(value string) bool {
_, found := ss.data[value]
return found //true if it existed already
}

func (ss *StringSet) reset() {
ss.Lock()
defer ss.Unlock()
ss.data = make(map[string]bool)
}

// New creates an EC2InstanceMetadataCache
func New() (*EC2InstanceMetadataCache, error) {
// Initializes prometheus metrics
Expand Down Expand Up @@ -311,20 +338,6 @@ func (cache *EC2InstanceMetadataCache) initWithEC2Metadata() error {
return errors.Wrap(err, "get instance metadata: failed to find primary ENI")
}

// retrieve security groups
metadataSGIDs, err := cache.ec2Metadata.GetMetadata(metadataMACPath + mac + metadataSGs)
if err != nil {
awsAPIErrInc("GetMetadata", err)
log.Errorf("Failed to retrieve security-group-ids data from instance metadata service, %v", err)
return errors.Wrap(err, "get instance metadata: failed to retrieve security-group-ids")
}
sgIDs := strings.Fields(metadataSGIDs)

for _, sgID := range sgIDs {
log.Debugf("Found security-group id: %s", sgID)
cache.securityGroups = append(cache.securityGroups, aws.String(sgID))
}

// retrieve sub-id
cache.subnetID, err = cache.ec2Metadata.GetMetadata(metadataMACPath + mac + metadataSubnetID)
if err != nil {
Expand All @@ -343,21 +356,71 @@ func (cache *EC2InstanceMetadataCache) initWithEC2Metadata() error {
}
log.Debugf("Found vpc-ipv4-cidr-block: %s ", cache.vpcIPv4CIDR)

// retrieve vpc-ipv4-cidr-blocks
// spawing go routine and retrieving vpc-ipv4-cidr-block
waitSGCh := make(chan string)
waitVPCCh := make(chan string)
errorCh := make(chan error)
go wait.Forever(func() {
cache.initSGIDs(mac, waitSGCh, errorCh)
cache.initVPCIPv4CIDRs(mac, waitVPCCh, errorCh)
}, 30*time.Second)

//blocks until there's data available on waitSGCh and waitVPCCh or errorCh
for {
select {
case <-waitSGCh:
break
case errs := <-errorCh:
return errs
}
select {
case <- waitVPCCh:
break
case errs := <-errorCh:
return errs
}
break
}
return nil
}

// initSGIDs retrieves security groups
func (cache *EC2InstanceMetadataCache) initSGIDs(mac string, waitSGCh chan string, errorCh chan error) {
metadataSGIDs, err := cache.ec2Metadata.GetMetadata(metadataMACPath + mac + metadataSGs)
if err != nil {
awsAPIErrInc("GetMetadata", err)
log.Errorf("Failed to retrieve security-group-ids data from instance metadata service")
errorCh <- errors.Wrap(err, "get instance metadata: failed to retrieve security-group-ids")
return
}

sgIDs := strings.Fields(metadataSGIDs)
cache.securityGroups.reset()
for _, sgID := range sgIDs {
log.Debugf("Found security-group id: %s", sgID)
cache.securityGroups.add(sgID)
}
waitSGCh <- "Security Groups retrieved"
}

// initVPCIPv4CIDRs retrieves VPC IPv4 CIDR blocks
func (cache *EC2InstanceMetadataCache) initVPCIPv4CIDRs(mac string, waitVPCCh chan string, errorCh chan error) {
metadataVPCIPv4CIDRs, err := cache.ec2Metadata.GetMetadata(metadataMACPath + mac + metadataVPCcidrs)

if err != nil {
awsAPIErrInc("GetMetadata", err)
log.Errorf("Failed to retrieve vpc-ipv4-cidr-blocks from instance metadata service")
return errors.Wrap(err, "get instance metadata: failed to retrieve vpc-ipv4-cidr-block data")
errorCh <- errors.Wrap(err, "get instance metadata: failed to retrieve vpc-ipv4-cidr-blocks data")
return
}

vpcIPv4CIDRs := strings.Fields(metadataVPCIPv4CIDRs)

cache.vpcIPv4CIDRs.reset()
for _, vpcCIDR := range vpcIPv4CIDRs {
log.Debugf("Found VPC CIDR: %s", vpcCIDR)
cache.vpcIPv4CIDRs = append(cache.vpcIPv4CIDRs, aws.String(vpcCIDR))
cache.vpcIPv4CIDRs.add(vpcCIDR)
}
return nil
waitVPCCh <- "VPC CIDR blocks retrieved"
}

func (cache *EC2InstanceMetadataCache) setPrimaryENI() error {
Expand Down Expand Up @@ -657,9 +720,15 @@ func (cache *EC2InstanceMetadataCache) attachENI(eniID string) (string, error) {
// return ENI id, error
func (cache *EC2InstanceMetadataCache) createENI(useCustomCfg bool, sg []*string, subnet string) (string, error) {
eniDescription := eniDescriptionPrefix + cache.instanceID
var securityGroups []*string
for securityGroup := range cache.securityGroups.data {
//In Golang, the loop iterator variable is a single variable that takes different values in each loop iteration.
tmpVar := securityGroup
securityGroups = append(securityGroups, aws.String(tmpVar))
}
input := &ec2.CreateNetworkInterfaceInput{
Description: aws.String(eniDescription),
Groups: cache.securityGroups,
Groups: securityGroups,
SubnetId: aws.String(cache.subnetID),
}

Expand Down Expand Up @@ -1225,7 +1294,15 @@ func (cache *EC2InstanceMetadataCache) GetVPCIPv4CIDR() string {

// GetVPCIPv4CIDRs returns VPC CIDRs
func (cache *EC2InstanceMetadataCache) GetVPCIPv4CIDRs() []*string {
return cache.vpcIPv4CIDRs
cache.vpcIPv4CIDRs.RLock()
defer cache.vpcIPv4CIDRs.RUnlock()
var vpcIPv4CIDRslice []*string
for vpcIPv4CIDR := range cache.vpcIPv4CIDRs.data{
//In Golang, the loop iterator variable is a single variable that takes different values in each loop iteration.
tmpVar := vpcIPv4CIDR
vpcIPv4CIDRslice = append(vpcIPv4CIDRslice, aws.String(tmpVar))
}
return vpcIPv4CIDRslice
}

// GetLocalIPv4 returns the primary IP address on the primary interface
Expand Down
19 changes: 15 additions & 4 deletions pkg/awsutils/awsutils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"errors"
"os"
"sort"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -70,6 +71,14 @@ func TestInitWithEC2metadata(t *testing.T) {
ctrl, mockMetadata, _ := setup(t)
defer ctrl.Finish()

metadataVPCIPv4CIDRs := "192.168.0.0/16 100.66.0.0/1"
vpcIPv4CIDRs := strings.Fields(metadataVPCIPv4CIDRs)
var cidr []*string
for _, vpcCIDR := range vpcIPv4CIDRs {
log.Debugf("Found VPC CIDR: %s", vpcCIDR)
cidr = append(cidr, aws.String(vpcCIDR))
}

mockMetadata.EXPECT().GetMetadata(metadataAZ).Return(az, nil)
mockMetadata.EXPECT().GetMetadata(metadataLocalIP).Return(localIP, nil)
mockMetadata.EXPECT().GetMetadata(metadataInstanceID).Return(instanceID, nil)
Expand All @@ -82,7 +91,7 @@ func TestInitWithEC2metadata(t *testing.T) {
mockMetadata.EXPECT().GetMetadata(metadataMACPath+primaryMAC+metadataSGs).Return(sgs, nil)
mockMetadata.EXPECT().GetMetadata(metadataMACPath+primaryMAC+metadataSubnetID).Return(subnetID, nil)
mockMetadata.EXPECT().GetMetadata(metadataMACPath+primaryMAC+metadataVPCcidr).Return(vpcCIDR, nil)
mockMetadata.EXPECT().GetMetadata(metadataMACPath+primaryMAC+metadataVPCcidrs).Return(vpcCIDR, nil)
mockMetadata.EXPECT().GetMetadata(metadataMACPath+primaryMAC+metadataVPCcidrs).Return(metadataVPCIPv4CIDRs, nil)

ins := &EC2InstanceMetadataCache{ec2Metadata: mockMetadata}
err := ins.initWithEC2Metadata()
Expand All @@ -91,9 +100,10 @@ func TestInitWithEC2metadata(t *testing.T) {
assert.Equal(t, localIP, ins.localIPv4)
assert.Equal(t, ins.instanceID, instanceID)
assert.Equal(t, ins.primaryENImac, primaryMAC)
assert.Equal(t, len(ins.securityGroups), 2)
assert.Equal(t, len(ins.securityGroups.data), 2)
assert.Equal(t, subnetID, ins.subnetID)
assert.Equal(t, vpcCIDR, ins.vpcIPv4CIDR)
assert.Equal(t, len(ins.vpcIPv4CIDRs.data), 2)
}

func TestInitWithEC2metadataVPCcidrErr(t *testing.T) {
Expand All @@ -109,7 +119,6 @@ func TestInitWithEC2metadataVPCcidrErr(t *testing.T) {
mockMetadata.EXPECT().GetMetadata(metadataMACPath+primaryMAC+metadataDeviceNum).Return(eni1Device, nil)
mockMetadata.EXPECT().GetMetadata(metadataMACPath+primaryMAC+metadataOwnerID).Return("1234", nil)
mockMetadata.EXPECT().GetMetadata(metadataMACPath+primaryMAC+metadataInterface).Return(primaryMAC, nil)
mockMetadata.EXPECT().GetMetadata(metadataMACPath+primaryMAC+metadataSGs).Return(sgs, nil)
mockMetadata.EXPECT().GetMetadata(metadataMACPath+primaryMAC+metadataSubnetID).Return(subnetID, nil)
mockMetadata.EXPECT().GetMetadata(metadataMACPath+primaryMAC+metadataVPCcidr).Return(vpcCIDR, errors.New("Error on VPCcidr"))

Expand All @@ -131,7 +140,6 @@ func TestInitWithEC2metadataSubnetErr(t *testing.T) {
mockMetadata.EXPECT().GetMetadata(metadataMACPath+primaryMAC+metadataDeviceNum).Return(eni1Device, nil)
mockMetadata.EXPECT().GetMetadata(metadataMACPath+primaryMAC+metadataOwnerID).Return("1234", nil)
mockMetadata.EXPECT().GetMetadata(metadataMACPath+primaryMAC+metadataInterface).Return(primaryMAC, nil)
mockMetadata.EXPECT().GetMetadata(metadataMACPath+primaryMAC+metadataSGs).Return(sgs, nil)
mockMetadata.EXPECT().GetMetadata(metadataMACPath+primaryMAC+metadataSubnetID).Return(subnetID, errors.New("Error on Subnet"))

ins := &EC2InstanceMetadataCache{ec2Metadata: mockMetadata}
Expand All @@ -152,6 +160,9 @@ func TestInitWithEC2metadataSGErr(t *testing.T) {
mockMetadata.EXPECT().GetMetadata(metadataMACPath+primaryMAC+metadataDeviceNum).Return(eni1Device, nil)
mockMetadata.EXPECT().GetMetadata(metadataMACPath+primaryMAC+metadataOwnerID).Return("1234", nil)
mockMetadata.EXPECT().GetMetadata(metadataMACPath+primaryMAC+metadataInterface).Return(primaryMAC, nil)
mockMetadata.EXPECT().GetMetadata(metadataMACPath+primaryMAC+metadataSubnetID).Return(subnetID, nil)
mockMetadata.EXPECT().GetMetadata(metadataMACPath+primaryMAC+metadataVPCcidr).Return(vpcCIDR, nil)
mockMetadata.EXPECT().GetMetadata(metadataMACPath+primaryMAC+metadataVPCcidrs).Return(vpcCIDR, nil)
mockMetadata.EXPECT().GetMetadata(metadataMACPath+primaryMAC+metadataSGs).Return(sgs, errors.New("Error on SG"))

ins := &EC2InstanceMetadataCache{ec2Metadata: mockMetadata}
Expand Down
67 changes: 51 additions & 16 deletions pkg/ipamd/ipamd.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,21 @@ import (
"sync"
"sync/atomic"
"time"

"github.com/aws/amazon-vpc-cni-k8s/pkg/utils/logger"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"k8s.io/apimachinery/pkg/util/sets"
"reflect"

"github.com/aws/amazon-vpc-cni-k8s/pkg/awsutils"
"github.com/aws/amazon-vpc-cni-k8s/pkg/cri"
"github.com/aws/amazon-vpc-cni-k8s/pkg/eniconfig"
"github.com/aws/amazon-vpc-cni-k8s/pkg/ipamd/datastore"
"github.com/aws/amazon-vpc-cni-k8s/pkg/k8sapi"
"github.com/aws/amazon-vpc-cni-k8s/pkg/networkutils"
"github.com/aws/amazon-vpc-cni-k8s/pkg/utils/logger"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
)

// The package ipamd is a long running daemon which manages a warm pool of available IP addresses.
Expand Down Expand Up @@ -375,7 +376,7 @@ func (c *IPAMContext) nodeInit() error {
}

if retry > maxRetryCheckENI {
log.Warn("Unable to discover attached IPs for ENI from metadata service")
log.Warnf("Reached max retry: Unable to discover attached IPs for ENI from metadata service (attempts %d/%d): %v", retry, maxRetryCheckENI, err)
ipamdErrInc("waitENIAttachedMaxRetryExceeded")
break
}
Expand All @@ -390,11 +391,33 @@ func (c *IPAMContext) nodeInit() error {
time.Sleep(eniAttachTime)
}
}
localPods, err := c.getLocalPodsWithRetry()

err = c.configureIPRulesForPods(pbVPCcidrs)
if err != nil {
log.Warnf("During ipamd init, failed to get Pod information from Kubernetes API Server %v", err)
ipamdErrInc("nodeInitK8SGetLocalPodIPsFailed")
// This can happens when L-IPAMD starts before kubelet.
return err
}
// For a new node, attach IPs
increasedPool, err := c.tryAssignIPs()
if err == nil && increasedPool {
c.updateLastNodeIPPoolAction()
} else {
return err
}

//Spawning checkAndUpdateRules go-routine
go wait.Forever(func() {
pbVPCcidrs = c.checkVPCCIDRsAndRules(pbVPCcidrs)
}, 30*time.Second)

return nil
}

func (c *IPAMContext) configureIPRulesForPods(pbVPCcidrs []string) error {
localPods, err := c.getLocalPodsWithRetry()
if err != nil {
log.Warnf("Failed to get Pod information from Kubernetes API Server %v", err)
// This can happens when L-IPAMD starts before kube-proxy.
return errors.Wrap(err, "failed to get running pods!")
}
log.Debugf("getLocalPodsWithRetry() found %d local pods", len(localPods))
Expand Down Expand Up @@ -429,12 +452,24 @@ func (c *IPAMContext) nodeInit() error {
log.Errorf("UpdateRuleListBySrc in nodeInit() failed for IP %s: %v", ip.IP, err)
}
}
// For a new node, attach IPs
increasedPool, err := c.tryAssignIPs()
if err == nil && increasedPool {
c.updateLastNodeIPPoolAction()

return nil
}

func (c *IPAMContext) checkVPCCIDRsAndRules(oldVPCCidrs []string) []string {
var pbVPCCIDRs []string
newVPCCIDRs := c.awsClient.GetVPCIPv4CIDRs()
for _, cidr := range newVPCCIDRs {
pbVPCCIDRs = append(pbVPCCIDRs, *cidr)
}

if len(oldVPCCidrs) != len(pbVPCCIDRs) || !reflect.DeepEqual(oldVPCCidrs, pbVPCCIDRs) {
err := c.configureIPRulesForPods(pbVPCCIDRs)
if err != nil {
log.Errorf("%v", err)
}
}
return err
return pbVPCCIDRs
}

func (c *IPAMContext) updateIPStats(unmanaged int) {
Expand Down
10 changes: 4 additions & 6 deletions pkg/ipamd/ipamd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,6 @@ func TestNodeInit(t *testing.T) {
ctrl, mockAWS, mockK8S, mockCRI, mockNetwork, _ := setup(t)
defer ctrl.Finish()



mockContext := &IPAMContext{
awsClient: mockAWS,
k8sClient: mockK8S,
Expand All @@ -95,7 +93,7 @@ func TestNodeInit(t *testing.T) {

_, parsedVPCCIDR, _ := net.ParseCIDR(vpcCIDR)
primaryIP := net.ParseIP(ipaddr01)
mockAWS.EXPECT().GetVPCIPv4CIDRs().Return(cidrs)
mockAWS.EXPECT().GetVPCIPv4CIDRs().AnyTimes().Return(cidrs)
mockAWS.EXPECT().GetPrimaryENImac().Return("")
mockNetwork.EXPECT().SetupHostNetwork(parsedVPCCIDR, cidrs, "", &primaryIP).Return(nil)

Expand Down Expand Up @@ -577,10 +575,10 @@ func TestIPAMContext_filterUnmanagedENIs(t *testing.T) {
mockAWSUtils.EXPECT().GetPrimaryENI().Times(2).Return(eni1.ENIID)

tests := []struct {
name string
name string
tagMap map[string]awsutils.TagMap
enis []awsutils.ENIMetadata
want []awsutils.ENIMetadata
enis []awsutils.ENIMetadata
want []awsutils.ENIMetadata
}{
{"No tags at all", nil, allENIs, allENIs},
{"Primary ENI unmanaged", eni1TagMap, allENIs, allENIs},
Expand Down

0 comments on commit 4b5cf58

Please sign in to comment.