Skip to content

Commit

Permalink
Add missing EBSTaskAttach changes from feature branch
Browse files Browse the repository at this point in the history
  • Loading branch information
fierlion committed Oct 13, 2023
1 parent 352ec50 commit 7a0d116
Show file tree
Hide file tree
Showing 6 changed files with 151 additions and 78 deletions.
2 changes: 1 addition & 1 deletion agent/app/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -1041,7 +1041,7 @@ func (agent *ecsAgent) startACSSession(
taskComparer,
sequenceNumberAccessor,
taskStopper,
nil,
agent.ebsWatcher,
updater.NewUpdater(agent.cfg, state, agent.dataClient, taskEngine).AddAgentUpdateHandlers,
)
logger.Info("Beginning Polling for updates")
Expand Down
145 changes: 70 additions & 75 deletions agent/ebs/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"context"
"errors"
"fmt"
"path/filepath"
"strconv"
"time"

Expand All @@ -36,6 +37,7 @@ import (

const (
nodeStageTimeout = 2 * time.Second
hostMountDir = "/mnt/ecs/ebs"
)

type EBSWatcher struct {
Expand Down Expand Up @@ -82,11 +84,7 @@ func (w *EBSWatcher) Start() {
if len(pendingEBS) > 0 {
foundVolumes := apiebs.ScanEBSVolumes(pendingEBS, w.discoveryClient)
w.overrideDeviceName(foundVolumes)
if err := w.StageAll(foundVolumes); err != nil {
log.Errorf("stage error: %s", err)
continue
}
// TODO only notify attached for volumes that are successfully staged
w.StageAll(foundVolumes)
w.NotifyAttached(foundVolumes)
}
case <-w.ctx.Done():
Expand Down Expand Up @@ -174,95 +172,87 @@ func (w *EBSWatcher) overrideDeviceName(foundVolumes map[string]string) {
}

// assumes CSI Driver Managed Daemon is running else call will timeout
func (w *EBSWatcher) StageAll(foundVolumes map[string]string) error {
for volID, deviceName := range foundVolumes {
// get volume details from attachment
ebsAttachment, ok := w.agentState.GetEBSByVolumeId(volID)
if !ok {
continue
}
if ebsAttachment.IsSent() {
log.Debugf("State change event has already been emitted for EBS volume: %v.", ebsAttachment.EBSToString())
continue
}
if ebsAttachment.HasExpired() {
log.Debugf("EBS status expired, no longer tracking EBS volume: %v.", ebsAttachment.EBSToString())
continue
}
if ebsAttachment.IsAttached() {
log.Debugf("EBS status is already attached, skipping: %v.", ebsAttachment.EBSToString())
continue
func (w *EBSWatcher) StageAll(foundVolumes map[string]string) []error {
errors := make([]error, 0)
for volumeId, deviceName := range foundVolumes {
if err := w.stageVolumeEBS(volumeId, deviceName); err != nil {
log.Error(err)
errors = append(errors, err)
}
hostPath := ebsAttachment.GetAttachmentProperties(apiebs.SourceVolumeHostPathKey)
filesystemType := ebsAttachment.GetAttachmentProperties(apiebs.FileSystemTypeName)

// CSI NodeStage stub required fields
stubSecrets := make(map[string]string)
stubVolumeContext := make(map[string]string)
stubMountOptions := []string{}
// note that the numbers '123456', '10' and '8' here are dummy data
// we don't use the fsGroup for now
stubFsGroup, _ := strconv.ParseInt("123456", 10, 8)
publishContext := map[string]string{"devicePath": deviceName}
// call CSI NodeStage
timeoutCtx, cancelFunc := context.WithTimeout(w.ctx, nodeStageTimeout)
defer cancelFunc()
err := w.csiClient.NodeStageVolume(timeoutCtx,
volID,
publishContext,
hostPath,
filesystemType,
v1.ReadWriteMany,
stubSecrets,
stubVolumeContext,
stubMountOptions,
&stubFsGroup)
}
return errors
}

if err != nil {
log.Errorf("Failed to initialize EBS volume: error: %s", err)
continue
}
// set attached status
log.Infof("We've set attached status for %v", ebsAttachment.EBSToString())
ebsAttachment.SetAttachedStatus()
func (w *EBSWatcher) stageVolumeEBS(volID, deviceName string) error {
// get volume details from attachment
ebsAttachment, ok := w.agentState.GetEBSByVolumeId(volID)
if !ok {
return fmt.Errorf("Unable to find EBS volume with volume ID: %v within agent state.", volID)
}
if !ebsAttachment.ShouldAttach() {
return nil
}
attachmentMountPath := ebsAttachment.GetAttachmentProperties(apiebs.SourceVolumeHostPathKey)
hostPath := filepath.Join(hostMountDir, attachmentMountPath)
filesystemType := ebsAttachment.GetAttachmentProperties(apiebs.FileSystemTypeName)
// CSI NodeStage stub required fields
stubSecrets := make(map[string]string)
stubVolumeContext := make(map[string]string)
stubMountOptions := []string{}
// note that the numbers '123456', '10' and '8' here are dummy data
// we don't use the fsGroup for now
stubFsGroup, _ := strconv.ParseInt("123456", 10, 8)
publishContext := map[string]string{"devicePath": deviceName}
// call CSI NodeStage
timeoutCtx, cancelFunc := context.WithTimeout(w.ctx, nodeStageTimeout)
defer cancelFunc()
err := w.csiClient.NodeStageVolume(timeoutCtx,
volID,
publishContext,
hostPath,
filesystemType,
v1.ReadWriteMany,
stubSecrets,
stubVolumeContext,
stubMountOptions,
&stubFsGroup)
if err != nil {
return fmt.Errorf("Failed to initialize EBS volume ID: %v: error: %w", ebsAttachment.EBSToString(), err)
}
ebsAttachment.SetAttachedStatus()
log.Debugf("We've set attached status for %v", ebsAttachment.EBSToString())
return nil
}

// NotifyAttached will go through the list of found EBS volumes from the scanning process and mark them as found.
func (w *EBSWatcher) NotifyAttached(foundVolumes map[string]string) {
func (w *EBSWatcher) NotifyAttached(foundVolumes map[string]string) []error {
errors := make([]error, 0)
for volID := range foundVolumes {
w.notifyAttachedEBS(volID)
if err := w.notifyAttachedEBS(volID); err != nil {
log.Error(err)
errors = append(errors, err)
}
}
return errors
}

// notifyAttachedEBS will mark it as found within the agent state
func (w *EBSWatcher) notifyAttachedEBS(volumeId string) {
func (w *EBSWatcher) notifyAttachedEBS(volumeId string) error {
// TODO: Add the EBS volume to data client
ebs, ok := w.agentState.GetEBSByVolumeId(volumeId)
if !ok {
log.Errorf("Unable to find EBS volume with volume ID: %v within agent state.", volumeId)
return
}

if ebs.HasExpired() {
log.Debugf("EBS status expired, no longer tracking EBS volume: %v.", ebs.EBSToString())
return
return fmt.Errorf("Unable to find EBS volume with volume ID: %v within agent state.", volumeId)
}

if ebs.IsSent() {
log.Debugf("State change event has already been emitted for EBS volume: %v.", ebs.EBSToString())
return
if !ebs.ShouldNotify() {
return nil
}
// We found an EBS volume which has the expiration time set in future and
// needs to be acknowledged as having been 'attached' to the Instance
if err := w.sendEBSStateChange(ebs); err != nil {
log.Warnf("Unable to send state EBS change, %s", err)
return
return fmt.Errorf("Unable to send state EBS change, %s", err)
}
ebs.SetSentStatus()
log.Infof("We've set sent status for %v", ebs.EBSToString())
ebs.StopAckTimer()
log.Infof("We've set sent status for %v", ebs.EBSToString())
return nil
}

// removeEBSAttachment removes a EBS volume with a specific volume ID
Expand Down Expand Up @@ -314,10 +304,15 @@ func (w *EBSWatcher) emitEBSAttachedEvent(ebsvol *apiebs.ResourceAttachment) {
ClusterARN: ebsvol.GetClusterARN(),
ContainerInstanceARN: ebsvol.GetContainerInstanceARN(),
}
eniWrapper := apieni.ENIAttachment{AttachmentInfo: attachmentInfo}
// TODO update separate out ENI and EBS attachment types in attachment
// handler. For now we use fake task ENI with dummy fields
eniWrapper.AttachmentType = apieni.ENIAttachmentTypeTaskENI
eniWrapper.MACAddress = "ebs1"
eniWrapper.StartTimer(func() {})
attachmentChange := ecsapi.AttachmentStateChange{
Attachment: &apieni.ENIAttachment{AttachmentInfo: attachmentInfo},
Attachment: &eniWrapper,
}

log.Debugf("Emitting EBS volume attached event for: %v", ebsvol)
w.taskEngine.StateChangeEvents() <- attachmentChange
}
4 changes: 2 additions & 2 deletions agent/engine/dockerstate/docker_task_engine_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ func (state *DockerTaskEngineState) GetAllPendingEBSAttachments() []*apiresource
func (state *DockerTaskEngineState) allPendingEBSAttachmentsUnsafe() []*apiresource.ResourceAttachment {
var pendingEBSAttachments []*apiresource.ResourceAttachment
for _, v := range state.ebsAttachments {
if !v.IsAttached() && !v.IsSent() {
if !v.IsAttached() || !v.IsSent() {
pendingEBSAttachments = append(pendingEBSAttachments, v)
}
}
Expand All @@ -319,7 +319,7 @@ func (state *DockerTaskEngineState) GetAllPendingEBSAttachmentsWithKey() map[str
func (state *DockerTaskEngineState) allPendingEBSAttachmentsWithKeyUnsafe() map[string]*apiresource.ResourceAttachment {
pendingEBSAttachments := make(map[string]*apiresource.ResourceAttachment)
for k, v := range state.ebsAttachments {
if !v.IsAttached() && !v.IsSent() {
if !v.IsAttached() || !v.IsSent() {
pendingEBSAttachments[k] = v
}
}
Expand Down
46 changes: 46 additions & 0 deletions agent/engine/dockerstate/dockerstate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,52 @@ func TestAddPendingEBSAttachment(t *testing.T) {

}

func TestAddPendingEBSAttachmentExclusion(t *testing.T) {
state := NewTaskEngineState()

testSentAttachmentProperties := map[string]string{
apiresource.VolumeNameKey: "myCoolVolume",
apiresource.SourceVolumeHostPathKey: "/testpath2",
apiresource.VolumeSizeGibKey: "7",
apiresource.DeviceNameKey: "/dev/nvme1n0",
apiresource.VolumeIdKey: "vol-456",
apiresource.FileSystemKey: "testXFS",
}

// not attached but sent should be included (||)
sentAttachment := &apiresource.ResourceAttachment{
AttachmentInfo: attachmentinfo.AttachmentInfo{
TaskARN: "taskarn1",
AttachmentARN: "ebs1",
AttachStatusSent: true,
Status: status.AttachmentNone,
},
AttachmentProperties: testAttachmentProperties,
AttachmentType: apiresource.EBSTaskAttach,
}

// attached and sent attachment should be excluded (&&)
foundAttachment := &apiresource.ResourceAttachment{
AttachmentInfo: attachmentinfo.AttachmentInfo{
TaskARN: "taskarn2",
AttachmentARN: "ebs2",
AttachStatusSent: true,
Status: status.AttachmentAttached,
},
AttachmentProperties: testSentAttachmentProperties,
AttachmentType: apiresource.EBSTaskAttach,
}

state.AddEBSAttachment(foundAttachment)
state.AddEBSAttachment(sentAttachment)
assert.Len(t, state.(*DockerTaskEngineState).GetAllPendingEBSAttachments(), 1)
assert.Len(t, state.(*DockerTaskEngineState).GetAllPendingEBSAttachmentsWithKey(), 1)
assert.Len(t, state.(*DockerTaskEngineState).GetAllEBSAttachments(), 2)

_, ok := state.(*DockerTaskEngineState).GetAllPendingEBSAttachmentsWithKey()["vol-123"]
assert.True(t, ok)
}

func TestTwophaseAddContainer(t *testing.T) {
state := NewTaskEngineState()
testTask := &apitask.Task{Arn: "test", Containers: []*apicontainer.Container{{
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 16 additions & 0 deletions ecs-agent/api/resource/resource_attachment.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,3 +325,19 @@ func (ra *ResourceAttachment) GetContainerInstanceARN() string {

return ra.ContainerInstanceARN
}

// should attach when not attached, and not sent/not expired
func (ra *ResourceAttachment) ShouldAttach() bool {
ra.guard.RLock()
defer ra.guard.RUnlock()

return !(ra.Status == status.AttachmentAttached) && !ra.AttachStatusSent && !(time.Now().After(ra.ExpiresAt))
}

// should notify when attached, and not sent/not expired
func (ra *ResourceAttachment) ShouldNotify() bool {
ra.guard.RLock()
defer ra.guard.RUnlock()

return (ra.Status == status.AttachmentAttached) && !ra.AttachStatusSent && !(time.Now().After(ra.ExpiresAt))
}

0 comments on commit 7a0d116

Please sign in to comment.