Skip to content

Commit

Permalink
Some refresh cleanups
Browse files Browse the repository at this point in the history
  • Loading branch information
mogren committed Jun 29, 2020
1 parent 8c266e9 commit c369480
Show file tree
Hide file tree
Showing 13 changed files with 66 additions and 107 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ verify-network
portmap
grpc-health-probe
cni-metrics-helper
coverage.txt
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ unit-test:
go test -v -coverprofile=coverage.txt -covermode=atomic $(ALLPKGS)

# Run unit tests with race detection (can only be run natively)
unit-test-race: export AWS_VPC_K8S_CNI_LOG_FILE=stdout
unit-test-race: CGO_ENABLED=1
unit-test-race: GOARCH=
unit-test-race:
Expand Down
2 changes: 1 addition & 1 deletion cmd/routed-eni-cni-plugin/driver/driver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func (m *testMocks) mockWithFailureAt(t *testing.T, failAt string) *createVethPa

//container side
if failAt == "link-byname" {
m.netlink.EXPECT().LinkByName(gomock.Any()).Return(mockContVeth, errors.New("error on LinkByName container")).After(call)
m.netlink.EXPECT().LinkByName(gomock.Any()).Return(mockContVeth, errors.New("error on LinkByName container")).After(call)
return mockContext
}
call = m.netlink.EXPECT().LinkByName(gomock.Any()).Return(mockContVeth, nil).After(call)
Expand Down
70 changes: 24 additions & 46 deletions pkg/awsutils/awsutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,8 @@ type APIs interface {
// GetVPCIPv4CIDR returns VPC's 1st CIDR
GetVPCIPv4CIDR() string

// GetVPCIPv4CIDRs returns VPC's CIDRs
GetVPCIPv4CIDRs() []*string
// GetVPCIPv4CIDRs returns VPC's CIDRs from instance metadata
GetVPCIPv4CIDRs() []string

// GetLocalIPv4 returns the primary IP address on the primary ENI interface
GetLocalIPv4() string
Expand Down Expand Up @@ -227,17 +227,14 @@ func prometheusRegister() {
//StringSet is a set of strings
type StringSet struct {
sync.RWMutex
data sets.String
data sets.String
}

func (ss *StringSet) AWSStrings() []*string {
func (ss *StringSet) SortedList() []string {
ss.RLock()
defer ss.RUnlock()
var dataSlice []*string
for key, _ := range ss.data {
dataSlice = append(dataSlice, aws.String(key))
}
return dataSlice
// sets.String.List() returns a sorted list
return ss.data.List()
}

func (ss *StringSet) Set(items []string) {
Expand All @@ -246,13 +243,7 @@ func (ss *StringSet) Set(items []string) {
ss.data = sets.NewString(items...)
}

func (ss *StringSet) IsEmpty() bool {
ss.RLock()
defer ss.RUnlock()
return ss.data != nil && ss.data.Len() == 0
}

func (ss *StringSet) Difference (other *StringSet) *StringSet {
func (ss *StringSet) Difference(other *StringSet) *StringSet {
ss.RLock()
other.RLock()
defer ss.RUnlock()
Expand Down Expand Up @@ -280,9 +271,7 @@ func New() (*EC2InstanceMetadataCache, error) {
cache.region = region
log.Debugf("Discovered region: %s", cache.region)

sess, err := session.NewSession(
&aws.Config{Region: aws.String(cache.region),
MaxRetries: aws.Int(15)})
sess, err := session.NewSession(&aws.Config{Region: aws.String(cache.region), MaxRetries: aws.Int(15)})
if err != nil {
log.Errorf("Failed to initialize AWS SDK session %v", err)
return nil, errors.Wrap(err, "instance metadata: failed to initialize AWS SDK session")
Expand Down Expand Up @@ -385,7 +374,7 @@ func (cache *EC2InstanceMetadataCache) initWithEC2Metadata(ctx context.Context)
return err
}

// refresh security groups and VPC CIDR blocks in the background
// Refresh security groups and VPC CIDR blocks in the background
// Ignoring errors since we will retry in 30s
go wait.Forever(func() { _ = cache.refreshSGIDs(mac) }, 30*time.Second)
go wait.Forever(func() { _ = cache.refreshVPCIPv4CIDRs(mac) }, 30*time.Second)
Expand All @@ -396,7 +385,6 @@ func (cache *EC2InstanceMetadataCache) initWithEC2Metadata(ctx context.Context)
return nil
default:
}

return nil
}

Expand All @@ -409,25 +397,20 @@ func (cache *EC2InstanceMetadataCache) refreshSGIDs(mac string) error {
return errors.Wrap(err, "get instance metadata: failed to retrieve security-group-ids")
}

sgIDs := strings.Fields(metadataSGIDs)
sgIDs := strings.Fields(metadataSGIDs)

newSGs := StringSet{}
newSGs.Set(sgIDs)
addedSGs := newSGs.Difference(&cache.securityGroups)
addedSGs := newSGs.Difference(&cache.securityGroups)
deletedSGs := cache.securityGroups.Difference(&newSGs)

if !addedSGs.IsEmpty() {
for _, sg := range addedSGs.AWSStrings() {
log.Infof("Found %s, added to ipamd cache", *sg)
}
for _, sg := range addedSGs.SortedList() {
log.Infof("Found %s, added to ipamd cache", sg)
}
if !deletedSGs.IsEmpty() {
for _, sg := range deletedSGs.AWSStrings() {
log.Infof("Removed %s from ipamd cache", *sg)
}
for _, sg := range deletedSGs.SortedList() {
log.Infof("Removed %s from ipamd cache", sg)
}
cache.securityGroups.Set(sgIDs)

return nil
}

Expand All @@ -444,21 +427,16 @@ func (cache *EC2InstanceMetadataCache) refreshVPCIPv4CIDRs(mac string) error {

newVpcIPv4CIDRs := StringSet{}
newVpcIPv4CIDRs.Set(vpcIPv4CIDRs)
addedVpcIPv4CIDRs := newVpcIPv4CIDRs.Difference(&cache.securityGroups)
deletedVpcIPv4CIDRs := cache.securityGroups.Difference(&newVpcIPv4CIDRs)
addedVpcIPv4CIDRs := newVpcIPv4CIDRs.Difference(&cache.vpcIPv4CIDRs)
deletedVpcIPv4CIDRs := cache.vpcIPv4CIDRs.Difference(&newVpcIPv4CIDRs)

if !addedVpcIPv4CIDRs.IsEmpty() {
for _, vpcIPv4CIDR := range addedVpcIPv4CIDRs.AWSStrings() {
log.Infof("Found %s, added to ipamd cache", *vpcIPv4CIDR)
}
for _, vpcIPv4CIDR := range addedVpcIPv4CIDRs.SortedList() {
log.Infof("Found %s, added to ipamd cache", vpcIPv4CIDR)
}
if !deletedVpcIPv4CIDRs.IsEmpty() {
for _, vpcIPv4CIDR := range deletedVpcIPv4CIDRs.AWSStrings() {
log.Infof("Removed %s from ipamd cache", *vpcIPv4CIDR)
}
for _, vpcIPv4CIDR := range deletedVpcIPv4CIDRs.SortedList() {
log.Infof("Removed %s from ipamd cache", vpcIPv4CIDR)
}
cache.vpcIPv4CIDRs.Set(vpcIPv4CIDRs)

return nil
}

Expand Down Expand Up @@ -761,7 +739,7 @@ func (cache *EC2InstanceMetadataCache) createENI(useCustomCfg bool, sg []*string
eniDescription := eniDescriptionPrefix + cache.instanceID
input := &ec2.CreateNetworkInterfaceInput{
Description: aws.String(eniDescription),
Groups: cache.securityGroups.AWSStrings(),
Groups: aws.StringSlice(cache.securityGroups.SortedList()),
SubnetId: aws.String(cache.subnetID),
}

Expand Down Expand Up @@ -1367,8 +1345,8 @@ func (cache *EC2InstanceMetadataCache) GetVPCIPv4CIDR() string {
}

// GetVPCIPv4CIDRs returns VPC CIDRs
func (cache *EC2InstanceMetadataCache) GetVPCIPv4CIDRs() []*string {
return cache.vpcIPv4CIDRs.AWSStrings()
func (cache *EC2InstanceMetadataCache) GetVPCIPv4CIDRs() []string {
return cache.vpcIPv4CIDRs.SortedList()
}

// GetLocalIPv4 returns the primary IP address on the primary interface
Expand Down
24 changes: 12 additions & 12 deletions pkg/awsutils/awsutils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func setup(t *testing.T) (*gomock.Controller,
}

func TestInitWithEC2metadata(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 5 * time.Millisecond)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Millisecond)
defer cancel()
ctrl, mockMetadata, _ := setup(t)
defer ctrl.Finish()
Expand Down Expand Up @@ -96,14 +96,14 @@ func TestInitWithEC2metadata(t *testing.T) {
assert.Equal(t, localIP, ins.localIPv4)
assert.Equal(t, ins.instanceID, instanceID)
assert.Equal(t, ins.primaryENImac, primaryMAC)
assert.Equal(t, len(ins.securityGroups.data), 2)
assert.Equal(t, len(ins.securityGroups.SortedList()), 2)
assert.Equal(t, subnetID, ins.subnetID)
assert.Equal(t, vpcCIDR, ins.vpcIPv4CIDR)
assert.Equal(t, len(ins.vpcIPv4CIDRs.data), 2)
assert.Equal(t, len(ins.vpcIPv4CIDRs.SortedList()), 2)
}

func TestInitWithEC2metadataVPCcidrErr(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 1 * time.Millisecond)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Millisecond)
defer cancel()
ctrl, mockMetadata, _ := setup(t)
defer ctrl.Finish()
Expand All @@ -126,7 +126,7 @@ func TestInitWithEC2metadataVPCcidrErr(t *testing.T) {
}

func TestInitWithEC2metadataSubnetErr(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 1 * time.Millisecond)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Millisecond)
defer cancel()
ctrl, mockMetadata, _ := setup(t)
defer ctrl.Finish()
Expand All @@ -148,7 +148,7 @@ func TestInitWithEC2metadataSubnetErr(t *testing.T) {
}

func TestInitWithEC2metadataSGErr(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 1 * time.Millisecond)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Millisecond)
defer cancel()
ctrl, mockMetadata, _ := setup(t)
defer ctrl.Finish()
Expand All @@ -172,7 +172,7 @@ func TestInitWithEC2metadataSGErr(t *testing.T) {
}

func TestInitWithEC2metadataENIErrs(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 1 * time.Millisecond)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Millisecond)
defer cancel()
ctrl, mockMetadata, _ := setup(t)
defer ctrl.Finish()
Expand All @@ -190,7 +190,7 @@ func TestInitWithEC2metadataENIErrs(t *testing.T) {
}

func TestInitWithEC2metadataMACErr(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 1 * time.Millisecond)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Millisecond)
defer cancel()
ctrl, mockMetadata, _ := setup(t)
defer ctrl.Finish()
Expand All @@ -207,7 +207,7 @@ func TestInitWithEC2metadataMACErr(t *testing.T) {
}

func TestInitWithEC2metadataLocalIPErr(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 1 * time.Millisecond)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Millisecond)
defer cancel()
ctrl, mockMetadata, _ := setup(t)
defer ctrl.Finish()
Expand All @@ -221,7 +221,7 @@ func TestInitWithEC2metadataLocalIPErr(t *testing.T) {
}

func TestInitWithEC2metadataInstanceErr(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 1 * time.Millisecond)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Millisecond)
defer cancel()
ctrl, mockMetadata, _ := setup(t)
defer ctrl.Finish()
Expand All @@ -236,7 +236,7 @@ func TestInitWithEC2metadataInstanceErr(t *testing.T) {
}

func TestInitWithEC2metadataAZErr(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 1 * time.Millisecond)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Millisecond)
defer cancel()
ctrl, mockMetadata, _ := setup(t)
defer ctrl.Finish()
Expand Down Expand Up @@ -434,7 +434,7 @@ func TestDescribeAllENIs(t *testing.T) {
}

func TestTagEni(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 1 * time.Millisecond)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Millisecond)
defer cancel()
ctrl, mockMetadata, mockEC2 := setup(t)
defer ctrl.Finish()
Expand Down
4 changes: 2 additions & 2 deletions pkg/awsutils/mocks/awsutils_mocks.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

25 changes: 7 additions & 18 deletions pkg/ipamd/ipamd.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,6 @@ const (
eniAttachTime = 10 * time.Second
nodeIPPoolReconcileInterval = 60 * time.Second
decreaseIPPoolInterval = 30 * time.Second
maxK8SRetries = 5
retryK8SInterval = 3 * time.Second

// ipReconcileCooldown is the amount of time that an IP address must wait until it can be added to the data store
// during reconciliation after being discovered on the EC2 instance metadata.
Expand Down Expand Up @@ -340,18 +338,13 @@ func (c *IPAMContext) nodeInit() error {
return err
}

var pbVPCcidrs []string
vpcCIDRs := c.awsClient.GetVPCIPv4CIDRs()

for _, cidr := range vpcCIDRs {
pbVPCcidrs = append(pbVPCcidrs, *cidr)
}

_, vpcCIDR, err := net.ParseCIDR(c.awsClient.GetVPCIPv4CIDR())
if err != nil {
return errors.Wrap(err, "ipamd init: failed to retrieve VPC CIDR")
}

vpcCIDRs := c.awsClient.GetVPCIPv4CIDRs()
primaryIP := net.ParseIP(c.awsClient.GetLocalIPv4())
err = c.networkClient.SetupHostNetwork(vpcCIDR, vpcCIDRs, c.awsClient.GetPrimaryENImac(), &primaryIP)
if err != nil {
Expand Down Expand Up @@ -398,7 +391,7 @@ func (c *IPAMContext) nodeInit() error {
return err
}

if err = c.configureIPRulesForPods(pbVPCcidrs); err != nil {
if err = c.configureIPRulesForPods(vpcCIDRs); err != nil {
return err
}

Expand All @@ -410,8 +403,8 @@ func (c *IPAMContext) nodeInit() error {
return err
}

//Spawning updateCIDRsRulesOnChange go-routine
go wait.Forever(func() { pbVPCcidrs = c.updateCIDRsRulesOnChange(pbVPCcidrs)}, 30*time.Second)
// Spawning updateCIDRsRulesOnChange go-routine
go wait.Forever(func() { vpcCIDRs = c.updateCIDRsRulesOnChange(vpcCIDRs) }, 30*time.Second)
return nil
}

Expand All @@ -437,16 +430,12 @@ func (c *IPAMContext) configureIPRulesForPods(pbVPCcidrs []string) error {
}

func (c *IPAMContext) updateCIDRsRulesOnChange(oldVPCCidrs []string) []string {
var pbVPCCIDRs []string
newVPCCIDRs := c.awsClient.GetVPCIPv4CIDRs()
for _, cidr := range newVPCCIDRs {
pbVPCCIDRs = append(pbVPCCIDRs, *cidr)
}

if len(oldVPCCidrs) != len(pbVPCCIDRs) || !reflect.DeepEqual(oldVPCCidrs, pbVPCCIDRs) {
_ = c.configureIPRulesForPods(pbVPCCIDRs)
if len(oldVPCCidrs) != len(newVPCCIDRs) || !reflect.DeepEqual(oldVPCCidrs, newVPCCIDRs) {
_ = c.configureIPRulesForPods(newVPCCIDRs)
}
return pbVPCCIDRs
return newVPCCIDRs
}

func (c *IPAMContext) updateIPStats(unmanaged int) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/ipamd/ipamd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func TestNodeInit(t *testing.T) {

eni1, eni2 := getDummyENIMetadata()

var cidrs []*string
var cidrs []string
m.awsutils.EXPECT().GetENILimit().Return(4, nil)
m.awsutils.EXPECT().GetENIipLimit().Return(14, nil)
m.awsutils.EXPECT().GetIPv4sFromEC2(eni1.ENIID).AnyTimes().Return(eni1.IPv4Addresses, nil)
Expand Down
7 changes: 3 additions & 4 deletions pkg/ipamd/rpc_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,9 @@ func (s *server) AddNetwork(ctx context.Context, in *rpc.AddNetworkRequest) (*rp
}
addr, deviceNumber, err := s.ipamContext.dataStore.AssignPodIPv4Address(ipamKey)

var pbVPCcidrs []string
for _, cidr := range s.ipamContext.awsClient.GetVPCIPv4CIDRs() {
log.Debugf("VPC CIDR %s", *cidr)
pbVPCcidrs = append(pbVPCcidrs, *cidr)
pbVPCcidrs := s.ipamContext.awsClient.GetVPCIPv4CIDRs()
for _, cidr := range pbVPCcidrs {
log.Debugf("VPC CIDR %s", cidr)
}

useExternalSNAT := s.ipamContext.networkClient.UseExternalSNAT()
Expand Down
Loading

0 comments on commit c369480

Please sign in to comment.