diff --git a/pkg/awsutils/awsutils.go b/pkg/awsutils/awsutils.go index c013b34df7b..d4cf0b8b77f 100644 --- a/pkg/awsutils/awsutils.go +++ b/pkg/awsutils/awsutils.go @@ -21,6 +21,7 @@ import ( "os" "strconv" "strings" + "sync" "time" "github.com/pkg/errors" @@ -159,14 +160,14 @@ type APIs interface { // EC2InstanceMetadataCache caches instance metadata type EC2InstanceMetadataCache struct { // metadata info - securityGroups []*string + securityGroups StringSet subnetID string cidrBlock string localIPv4 string instanceID string instanceType string vpcIPv4CIDR string - vpcIPv4CIDRs []*string + vpcIPv4CIDRs StringSet primaryENI string primaryENImac string availabilityZone string @@ -220,6 +221,32 @@ func prometheusRegister() { } } +//StringSet is a set of strings +type StringSet struct { + sync.RWMutex + data map[string]bool +} + +func (ss *StringSet) add(value string) { + ss.Lock() + defer ss.Unlock() + if len(ss.data) == 0 { + ss.data = make(map[string]bool) + } + ss.data[value] = true +} + +func (ss *StringSet) contains(value string) bool { + _, found := ss.data[value] + return found //true if it existed already +} + +func (ss *StringSet) reset() { + ss.Lock() + defer ss.Unlock() + ss.data = make(map[string]bool) +} + // New creates an EC2InstanceMetadataCache func New() (*EC2InstanceMetadataCache, error) { // Initializes prometheus metrics @@ -311,20 +338,6 @@ func (cache *EC2InstanceMetadataCache) initWithEC2Metadata() error { return errors.Wrap(err, "get instance metadata: failed to find primary ENI") } - // retrieve security groups - metadataSGIDs, err := cache.ec2Metadata.GetMetadata(metadataMACPath + mac + metadataSGs) - if err != nil { - awsAPIErrInc("GetMetadata", err) - log.Errorf("Failed to retrieve security-group-ids data from instance metadata service, %v", err) - return errors.Wrap(err, "get instance metadata: failed to retrieve security-group-ids") - } - sgIDs := strings.Fields(metadataSGIDs) - - for _, sgID := range sgIDs { - log.Debugf("Found security-group id: %s", sgID) - cache.securityGroups = append(cache.securityGroups, aws.String(sgID)) - } - // retrieve sub-id cache.subnetID, err = cache.ec2Metadata.GetMetadata(metadataMACPath + mac + metadataSubnetID) if err != nil { @@ -343,21 +356,71 @@ func (cache *EC2InstanceMetadataCache) initWithEC2Metadata() error { } log.Debugf("Found vpc-ipv4-cidr-block: %s ", cache.vpcIPv4CIDR) - // retrieve vpc-ipv4-cidr-blocks + // spawing go routine and retrieving vpc-ipv4-cidr-block + waitSGCh := make(chan string) + waitVPCCh := make(chan string) + errorCh := make(chan error) + go wait.Forever(func() { + cache.initSGIDs(mac, waitSGCh, errorCh) + cache.initVPCIPv4CIDRs(mac, waitVPCCh, errorCh) + }, 30*time.Second) + + //blocks until there's data available on waitSGCh and waitVPCCh or errorCh + for { + select { + case <-waitSGCh: + break + case errs := <-errorCh: + return errs + } + select { + case <- waitVPCCh: + break + case errs := <-errorCh: + return errs + } + break + } + return nil +} + +// initSGIDs retrieves security groups +func (cache *EC2InstanceMetadataCache) initSGIDs(mac string, waitSGCh chan string, errorCh chan error) { + metadataSGIDs, err := cache.ec2Metadata.GetMetadata(metadataMACPath + mac + metadataSGs) + if err != nil { + awsAPIErrInc("GetMetadata", err) + log.Errorf("Failed to retrieve security-group-ids data from instance metadata service") + errorCh <- errors.Wrap(err, "get instance metadata: failed to retrieve security-group-ids") + return + } + + sgIDs := strings.Fields(metadataSGIDs) + cache.securityGroups.reset() + for _, sgID := range sgIDs { + log.Debugf("Found security-group id: %s", sgID) + cache.securityGroups.add(sgID) + } + waitSGCh <- "Security Groups retrieved" +} + +// initVPCIPv4CIDRs retrieves VPC IPv4 CIDR blocks +func (cache *EC2InstanceMetadataCache) initVPCIPv4CIDRs(mac string, waitVPCCh chan string, errorCh chan error) { metadataVPCIPv4CIDRs, err := cache.ec2Metadata.GetMetadata(metadataMACPath + mac + metadataVPCcidrs) + if err != nil { awsAPIErrInc("GetMetadata", err) log.Errorf("Failed to retrieve vpc-ipv4-cidr-blocks from instance metadata service") - return errors.Wrap(err, "get instance metadata: failed to retrieve vpc-ipv4-cidr-block data") + errorCh <- errors.Wrap(err, "get instance metadata: failed to retrieve vpc-ipv4-cidr-blocks data") + return } vpcIPv4CIDRs := strings.Fields(metadataVPCIPv4CIDRs) - + cache.vpcIPv4CIDRs.reset() for _, vpcCIDR := range vpcIPv4CIDRs { log.Debugf("Found VPC CIDR: %s", vpcCIDR) - cache.vpcIPv4CIDRs = append(cache.vpcIPv4CIDRs, aws.String(vpcCIDR)) + cache.vpcIPv4CIDRs.add(vpcCIDR) } - return nil + waitVPCCh <- "VPC CIDR blocks retrieved" } func (cache *EC2InstanceMetadataCache) setPrimaryENI() error { @@ -657,9 +720,15 @@ func (cache *EC2InstanceMetadataCache) attachENI(eniID string) (string, error) { // return ENI id, error func (cache *EC2InstanceMetadataCache) createENI(useCustomCfg bool, sg []*string, subnet string) (string, error) { eniDescription := eniDescriptionPrefix + cache.instanceID + var securityGroups []*string + for securityGroup := range cache.securityGroups.data { + //In Golang, the loop iterator variable is a single variable that takes different values in each loop iteration. + tmpVar := securityGroup + securityGroups = append(securityGroups, aws.String(tmpVar)) + } input := &ec2.CreateNetworkInterfaceInput{ Description: aws.String(eniDescription), - Groups: cache.securityGroups, + Groups: securityGroups, SubnetId: aws.String(cache.subnetID), } @@ -1225,7 +1294,15 @@ func (cache *EC2InstanceMetadataCache) GetVPCIPv4CIDR() string { // GetVPCIPv4CIDRs returns VPC CIDRs func (cache *EC2InstanceMetadataCache) GetVPCIPv4CIDRs() []*string { - return cache.vpcIPv4CIDRs + cache.vpcIPv4CIDRs.RLock() + defer cache.vpcIPv4CIDRs.RUnlock() + var vpcIPv4CIDRslice []*string + for vpcIPv4CIDR := range cache.vpcIPv4CIDRs.data{ + //In Golang, the loop iterator variable is a single variable that takes different values in each loop iteration. + tmpVar := vpcIPv4CIDR + vpcIPv4CIDRslice = append(vpcIPv4CIDRslice, aws.String(tmpVar)) + } + return vpcIPv4CIDRslice } // GetLocalIPv4 returns the primary IP address on the primary interface diff --git a/pkg/awsutils/awsutils_test.go b/pkg/awsutils/awsutils_test.go index 31cfc840601..430e772d7c5 100644 --- a/pkg/awsutils/awsutils_test.go +++ b/pkg/awsutils/awsutils_test.go @@ -17,6 +17,7 @@ import ( "errors" "os" "sort" + "strings" "testing" "time" @@ -70,6 +71,14 @@ func TestInitWithEC2metadata(t *testing.T) { ctrl, mockMetadata, _ := setup(t) defer ctrl.Finish() + metadataVPCIPv4CIDRs := "192.168.0.0/16 100.66.0.0/1" + vpcIPv4CIDRs := strings.Fields(metadataVPCIPv4CIDRs) + var cidr []*string + for _, vpcCIDR := range vpcIPv4CIDRs { + log.Debugf("Found VPC CIDR: %s", vpcCIDR) + cidr = append(cidr, aws.String(vpcCIDR)) + } + mockMetadata.EXPECT().GetMetadata(metadataAZ).Return(az, nil) mockMetadata.EXPECT().GetMetadata(metadataLocalIP).Return(localIP, nil) mockMetadata.EXPECT().GetMetadata(metadataInstanceID).Return(instanceID, nil) @@ -82,7 +91,7 @@ func TestInitWithEC2metadata(t *testing.T) { mockMetadata.EXPECT().GetMetadata(metadataMACPath+primaryMAC+metadataSGs).Return(sgs, nil) mockMetadata.EXPECT().GetMetadata(metadataMACPath+primaryMAC+metadataSubnetID).Return(subnetID, nil) mockMetadata.EXPECT().GetMetadata(metadataMACPath+primaryMAC+metadataVPCcidr).Return(vpcCIDR, nil) - mockMetadata.EXPECT().GetMetadata(metadataMACPath+primaryMAC+metadataVPCcidrs).Return(vpcCIDR, nil) + mockMetadata.EXPECT().GetMetadata(metadataMACPath+primaryMAC+metadataVPCcidrs).Return(metadataVPCIPv4CIDRs, nil) ins := &EC2InstanceMetadataCache{ec2Metadata: mockMetadata} err := ins.initWithEC2Metadata() @@ -91,9 +100,10 @@ func TestInitWithEC2metadata(t *testing.T) { assert.Equal(t, localIP, ins.localIPv4) assert.Equal(t, ins.instanceID, instanceID) assert.Equal(t, ins.primaryENImac, primaryMAC) - assert.Equal(t, len(ins.securityGroups), 2) + assert.Equal(t, len(ins.securityGroups.data), 2) assert.Equal(t, subnetID, ins.subnetID) assert.Equal(t, vpcCIDR, ins.vpcIPv4CIDR) + assert.Equal(t, len(ins.vpcIPv4CIDRs.data), 2) } func TestInitWithEC2metadataVPCcidrErr(t *testing.T) { @@ -109,7 +119,6 @@ func TestInitWithEC2metadataVPCcidrErr(t *testing.T) { mockMetadata.EXPECT().GetMetadata(metadataMACPath+primaryMAC+metadataDeviceNum).Return(eni1Device, nil) mockMetadata.EXPECT().GetMetadata(metadataMACPath+primaryMAC+metadataOwnerID).Return("1234", nil) mockMetadata.EXPECT().GetMetadata(metadataMACPath+primaryMAC+metadataInterface).Return(primaryMAC, nil) - mockMetadata.EXPECT().GetMetadata(metadataMACPath+primaryMAC+metadataSGs).Return(sgs, nil) mockMetadata.EXPECT().GetMetadata(metadataMACPath+primaryMAC+metadataSubnetID).Return(subnetID, nil) mockMetadata.EXPECT().GetMetadata(metadataMACPath+primaryMAC+metadataVPCcidr).Return(vpcCIDR, errors.New("Error on VPCcidr")) @@ -131,7 +140,6 @@ func TestInitWithEC2metadataSubnetErr(t *testing.T) { mockMetadata.EXPECT().GetMetadata(metadataMACPath+primaryMAC+metadataDeviceNum).Return(eni1Device, nil) mockMetadata.EXPECT().GetMetadata(metadataMACPath+primaryMAC+metadataOwnerID).Return("1234", nil) mockMetadata.EXPECT().GetMetadata(metadataMACPath+primaryMAC+metadataInterface).Return(primaryMAC, nil) - mockMetadata.EXPECT().GetMetadata(metadataMACPath+primaryMAC+metadataSGs).Return(sgs, nil) mockMetadata.EXPECT().GetMetadata(metadataMACPath+primaryMAC+metadataSubnetID).Return(subnetID, errors.New("Error on Subnet")) ins := &EC2InstanceMetadataCache{ec2Metadata: mockMetadata} @@ -152,6 +160,9 @@ func TestInitWithEC2metadataSGErr(t *testing.T) { mockMetadata.EXPECT().GetMetadata(metadataMACPath+primaryMAC+metadataDeviceNum).Return(eni1Device, nil) mockMetadata.EXPECT().GetMetadata(metadataMACPath+primaryMAC+metadataOwnerID).Return("1234", nil) mockMetadata.EXPECT().GetMetadata(metadataMACPath+primaryMAC+metadataInterface).Return(primaryMAC, nil) + mockMetadata.EXPECT().GetMetadata(metadataMACPath+primaryMAC+metadataSubnetID).Return(subnetID, nil) + mockMetadata.EXPECT().GetMetadata(metadataMACPath+primaryMAC+metadataVPCcidr).Return(vpcCIDR, nil) + mockMetadata.EXPECT().GetMetadata(metadataMACPath+primaryMAC+metadataVPCcidrs).Return(vpcCIDR, nil) mockMetadata.EXPECT().GetMetadata(metadataMACPath+primaryMAC+metadataSGs).Return(sgs, errors.New("Error on SG")) ins := &EC2InstanceMetadataCache{ec2Metadata: mockMetadata} diff --git a/pkg/ipamd/ipamd.go b/pkg/ipamd/ipamd.go index 9493675f6ec..45018a21863 100644 --- a/pkg/ipamd/ipamd.go +++ b/pkg/ipamd/ipamd.go @@ -22,13 +22,7 @@ import ( "sync" "sync/atomic" "time" - - "github.com/aws/amazon-vpc-cni-k8s/pkg/utils/logger" - "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/service/ec2" - "github.com/pkg/errors" - "github.com/prometheus/client_golang/prometheus" - "k8s.io/apimachinery/pkg/util/sets" + "reflect" "github.com/aws/amazon-vpc-cni-k8s/pkg/awsutils" "github.com/aws/amazon-vpc-cni-k8s/pkg/cri" @@ -36,6 +30,13 @@ import ( "github.com/aws/amazon-vpc-cni-k8s/pkg/ipamd/datastore" "github.com/aws/amazon-vpc-cni-k8s/pkg/k8sapi" "github.com/aws/amazon-vpc-cni-k8s/pkg/networkutils" + "github.com/aws/amazon-vpc-cni-k8s/pkg/utils/logger" + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/ec2" + "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/util/wait" ) // The package ipamd is a long running daemon which manages a warm pool of available IP addresses. @@ -375,7 +376,7 @@ func (c *IPAMContext) nodeInit() error { } if retry > maxRetryCheckENI { - log.Warn("Unable to discover attached IPs for ENI from metadata service") + log.Warnf("Reached max retry: Unable to discover attached IPs for ENI from metadata service (attempts %d/%d): %v", retry, maxRetryCheckENI, err) ipamdErrInc("waitENIAttachedMaxRetryExceeded") break } @@ -390,11 +391,33 @@ func (c *IPAMContext) nodeInit() error { time.Sleep(eniAttachTime) } } - localPods, err := c.getLocalPodsWithRetry() + + err = c.configureIPRulesForPods(pbVPCcidrs) if err != nil { - log.Warnf("During ipamd init, failed to get Pod information from Kubernetes API Server %v", err) ipamdErrInc("nodeInitK8SGetLocalPodIPsFailed") - // This can happens when L-IPAMD starts before kubelet. + return err + } + // For a new node, attach IPs + increasedPool, err := c.tryAssignIPs() + if err == nil && increasedPool { + c.updateLastNodeIPPoolAction() + } else { + return err + } + + //Spawning checkAndUpdateRules go-routine + go wait.Forever(func() { + pbVPCcidrs = c.checkVPCCIDRsAndRules(pbVPCcidrs) + }, 30*time.Second) + + return nil +} + +func (c *IPAMContext) configureIPRulesForPods(pbVPCcidrs []string) error { + localPods, err := c.getLocalPodsWithRetry() + if err != nil { + log.Warnf("Failed to get Pod information from Kubernetes API Server %v", err) + // This can happens when L-IPAMD starts before kube-proxy. return errors.Wrap(err, "failed to get running pods!") } log.Debugf("getLocalPodsWithRetry() found %d local pods", len(localPods)) @@ -429,12 +452,24 @@ func (c *IPAMContext) nodeInit() error { log.Errorf("UpdateRuleListBySrc in nodeInit() failed for IP %s: %v", ip.IP, err) } } - // For a new node, attach IPs - increasedPool, err := c.tryAssignIPs() - if err == nil && increasedPool { - c.updateLastNodeIPPoolAction() + + return nil +} + +func (c *IPAMContext) checkVPCCIDRsAndRules(oldVPCCidrs []string) []string { + var pbVPCCIDRs []string + newVPCCIDRs := c.awsClient.GetVPCIPv4CIDRs() + for _, cidr := range newVPCCIDRs { + pbVPCCIDRs = append(pbVPCCIDRs, *cidr) + } + + if len(oldVPCCidrs) != len(pbVPCCIDRs) || !reflect.DeepEqual(oldVPCCidrs, pbVPCCIDRs) { + err := c.configureIPRulesForPods(pbVPCCIDRs) + if err != nil { + log.Errorf("%v", err) + } } - return err + return pbVPCCIDRs } func (c *IPAMContext) updateIPStats(unmanaged int) { diff --git a/pkg/ipamd/ipamd_test.go b/pkg/ipamd/ipamd_test.go index 597131a7f3e..0396773814a 100644 --- a/pkg/ipamd/ipamd_test.go +++ b/pkg/ipamd/ipamd_test.go @@ -71,8 +71,6 @@ func TestNodeInit(t *testing.T) { ctrl, mockAWS, mockK8S, mockCRI, mockNetwork, _ := setup(t) defer ctrl.Finish() - - mockContext := &IPAMContext{ awsClient: mockAWS, k8sClient: mockK8S, @@ -95,7 +93,7 @@ func TestNodeInit(t *testing.T) { _, parsedVPCCIDR, _ := net.ParseCIDR(vpcCIDR) primaryIP := net.ParseIP(ipaddr01) - mockAWS.EXPECT().GetVPCIPv4CIDRs().Return(cidrs) + mockAWS.EXPECT().GetVPCIPv4CIDRs().AnyTimes().Return(cidrs) mockAWS.EXPECT().GetPrimaryENImac().Return("") mockNetwork.EXPECT().SetupHostNetwork(parsedVPCCIDR, cidrs, "", &primaryIP).Return(nil) @@ -577,10 +575,10 @@ func TestIPAMContext_filterUnmanagedENIs(t *testing.T) { mockAWSUtils.EXPECT().GetPrimaryENI().Times(2).Return(eni1.ENIID) tests := []struct { - name string + name string tagMap map[string]awsutils.TagMap - enis []awsutils.ENIMetadata - want []awsutils.ENIMetadata + enis []awsutils.ENIMetadata + want []awsutils.ENIMetadata }{ {"No tags at all", nil, allENIs, allENIs}, {"Primary ENI unmanaged", eni1TagMap, allENIs, allENIs},