Skip to content

Commit

Permalink
eni watcher: retry in first reconciliation loop.
Browse files Browse the repository at this point in the history
  • Loading branch information
fenxiong committed Aug 20, 2020
1 parent d066526 commit c6c4794
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 32 deletions.
20 changes: 16 additions & 4 deletions agent/eni/watcher/watcher_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,8 @@ func newWatcher(ctx context.Context,

// Init initializes a new ENI Watcher
func (udevWatcher *UdevWatcher) Init() error {
return udevWatcher.reconcileOnce()
// Retry in the first reconciliation, in case the ENI is attached before we connect to ACS.
return udevWatcher.reconcileOnce(true)
}

// Start periodically updates the state of ENIs connected to the system
Expand All @@ -157,7 +158,7 @@ func (udevWatcher *UdevWatcher) performPeriodicReconciliation(updateInterval tim
for {
select {
case <-udevWatcher.updateIntervalTicker.C:
if err := udevWatcher.reconcileOnce(); err != nil {
if err := udevWatcher.reconcileOnce(false); err != nil {
log.Warnf("Udev watcher reconciliation failed: %v", err)
}
case <-udevWatcher.ctx.Done():
Expand All @@ -168,7 +169,7 @@ func (udevWatcher *UdevWatcher) performPeriodicReconciliation(updateInterval tim
}

// reconcileOnce is used to reconcile the state of ENIs attached to the instance
func (udevWatcher *UdevWatcher) reconcileOnce() error {
func (udevWatcher *UdevWatcher) reconcileOnce(withRetry bool) error {
links, err := udevWatcher.netlinkClient.LinkList()
if err != nil {
return errors.Wrapf(err, "udev watcher: unable to retrieve network interfaces")
Expand All @@ -188,6 +189,14 @@ func (udevWatcher *UdevWatcher) reconcileOnce() error {

// Add new interfaces next
for mac := range currentState {
if withRetry {
go func(ctx context.Context, macAddress string, timeout time.Duration) {
if err := udevWatcher.sendENIStateChangeWithRetries(ctx, macAddress, timeout); err != nil {
log.Infof("Udev watcher event-handler: unable to send state change: %v", err)
}
}(udevWatcher.ctx, mac, sendENIStateChangeRetryTimeout)
continue
}
if err := udevWatcher.sendENIStateChange(mac); err != nil {
// skip logging status sent error as it's redundant and doesn't really indicate a problem
if strings.Contains(err.Error(), eniStatusSentMsg) {
Expand Down Expand Up @@ -337,7 +346,10 @@ func (udevWatcher *UdevWatcher) sendENIStateChangeWithRetries(parentCtx context.
sendErr := udevWatcher.sendENIStateChange(macAddress)
if sendErr != nil {
if _, ok := sendErr.(*unmanagedENIError); ok {
log.Debugf("Unable to send state change for unmanaged ENI: %v", sendErr)
// This can happen in two scenarios: (1) the ENI is indeed not managed by ECS (i.e. attached manually
// by customer); (2) this is an ENI attached by ECS but we have not yet received its information from
// ACS.
log.Debugf("Not sending state change because we don't know about the ENI: %v", sendErr)
return sendErr
}
// Not unmanagedENIError. Stop retrying when this happens
Expand Down
102 changes: 74 additions & 28 deletions agent/eni/watcher/watcher_linux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,37 +153,14 @@ func TestReconcileENIs(t *testing.T) {
defer mockCtrl.Finish()

ctx := context.Background()
parsedMAC, err := net.ParseMAC(randomMAC)
assert.NoError(t, err)

primaryMACAddr, err := net.ParseMAC("00:0a:95:9d:68:61")
assert.NoError(t, err)

mockNetlink := mock_netlinkwrapper.NewMockNetLink(mockCtrl)
taskEngineState := dockerstate.NewTaskEngineState()
eventChannel := make(chan statechange.Event)

taskEngineState.AddENIAttachment(&apieni.ENIAttachment{
MACAddress: randomMAC,
AttachStatusSent: false,
ExpiresAt: time.Unix(time.Now().Unix()+10, 0),
})
taskEngineState.AddENIAttachment(getMockAttachment())

mockNetlink.EXPECT().LinkList().Return([]netlink.Link{
&netlink.Device{
LinkAttrs: netlink.LinkAttrs{
HardwareAddr: parsedMAC,
Name: randomDevice,
},
},
&netlink.Device{
LinkAttrs: netlink.LinkAttrs{
HardwareAddr: primaryMACAddr,
Name: "lo",
EncapType: "loopback",
},
},
}, nil)
mockNetlink.EXPECT().LinkList().Return(getMockNetLinkResponse(t), nil)

var event statechange.Event
done := make(chan struct{})
Expand All @@ -194,7 +171,7 @@ func TestReconcileENIs(t *testing.T) {

// Create Watcher
watcher := newWatcher(ctx, primaryMAC, mockNetlink, nil, taskEngineState, eventChannel)
watcher.reconcileOnce()
require.NoError(t, watcher.reconcileOnce(false))

<-done
assert.NotNil(t, event.(api.TaskStateChange).Attachment)
Expand All @@ -207,6 +184,75 @@ func TestReconcileENIs(t *testing.T) {
}
}

func TestReconcileENIsWithRetry(t *testing.T) {
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()
ctx := context.Background()
mockNetlink := mock_netlinkwrapper.NewMockNetLink(mockCtrl)
mockState := mock_dockerstate.NewMockTaskEngineState(mockCtrl)
eventChannel := make(chan statechange.Event)

gomock.InOrder(
mockNetlink.EXPECT().LinkList().Return(getMockNetLinkResponse(t), nil),
// Let the first one fail to check that we retry on failure.
mockState.EXPECT().ENIByMac(gomock.Any()).Return(nil, false),
mockState.EXPECT().ENIByMac(gomock.Any()).Return(getMockAttachment(), true),
)

var event statechange.Event
done := make(chan struct{})
go func() {
event = <-eventChannel
done <- struct{}{}
}()

// Create Watcher
watcher := newWatcher(ctx, primaryMAC, mockNetlink, nil, mockState, eventChannel)
require.NoError(t, watcher.reconcileOnce(true))

<-done
require.NotNil(t, event.(api.TaskStateChange).Attachment)
assert.Equal(t, randomMAC, event.(api.TaskStateChange).Attachment.MACAddress)

select {
case <-eventChannel:
t.Errorf("Expect no more state change event")
default:
}
}

func getMockAttachment() *apieni.ENIAttachment {
return &apieni.ENIAttachment{
MACAddress: randomMAC,
AttachStatusSent: false,
ExpiresAt: time.Unix(time.Now().Unix()+10, 0),
}
}

func getMockNetLinkResponse(t *testing.T) []netlink.Link {
parsedMAC, err := net.ParseMAC(randomMAC)
require.NoError(t, err)

primaryMACAddr, err := net.ParseMAC("00:0a:95:9d:68:61")
require.NoError(t, err)

return []netlink.Link{
&netlink.Device{
LinkAttrs: netlink.LinkAttrs{
HardwareAddr: parsedMAC,
Name: randomDevice,
},
},
&netlink.Device{
LinkAttrs: netlink.LinkAttrs{
HardwareAddr: primaryMACAddr,
Name: "lo",
EncapType: "loopback",
},
},
}
}

// TestReconcileENIsWithNetlinkErr tests reconciliation with netlink error
func TestReconcileENIsWithNetlinkErr(t *testing.T) {
mockCtrl := gomock.NewController(t)
Expand All @@ -222,7 +268,7 @@ func TestReconcileENIsWithNetlinkErr(t *testing.T) {

// Create Watcher
watcher := newWatcher(ctx, primaryMAC, mockNetlink, nil, taskEngineState, eventChannel)
watcher.reconcileOnce()
assert.Error(t, watcher.reconcileOnce(false))

select {
case <-eventChannel:
Expand All @@ -246,7 +292,7 @@ func TestReconcileENIsWithEmptyList(t *testing.T) {

// Create Watcher
watcher := newWatcher(ctx, primaryMAC, mockNetlink, nil, taskEngineState, eventChannel)
watcher.reconcileOnce()
assert.NoError(t, watcher.reconcileOnce(false))
watcher.Stop()

select {
Expand Down

0 comments on commit c6c4794

Please sign in to comment.