Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ebs dev #50

Open
wants to merge 16 commits into
base: ebs2
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 24 additions & 3 deletions agent/app/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,7 @@ import (
"github.com/aws/amazon-ecs-agent/agent/utils/mobypkgwrapper"
"github.com/aws/amazon-ecs-agent/agent/version"
acsclient "github.com/aws/amazon-ecs-agent/ecs-agent/acs/client"
"github.com/aws/amazon-ecs-agent/ecs-agent/api/attachmentinfo"
apierrors "github.com/aws/amazon-ecs-agent/ecs-agent/api/errors"
apira "github.com/aws/amazon-ecs-agent/ecs-agent/api/resource"
"github.com/aws/amazon-ecs-agent/ecs-agent/api/status"
"github.com/aws/amazon-ecs-agent/ecs-agent/credentials"
"github.com/aws/amazon-ecs-agent/ecs-agent/doctor"
"github.com/aws/amazon-ecs-agent/ecs-agent/ecs_client/model/ecs"
Expand All @@ -71,6 +68,10 @@ import (
"github.com/aws/aws-sdk-go/aws/awserr"
aws_credentials "github.com/aws/aws-sdk-go/aws/credentials"

"github.com/aws/amazon-ecs-agent/ecs-agent/api/attachmentinfo"
apira "github.com/aws/amazon-ecs-agent/ecs-agent/api/resource"
"github.com/aws/amazon-ecs-agent/ecs-agent/api/status"

"github.com/cihub/seelog"
"github.com/pborman/uuid"
)
Expand Down Expand Up @@ -541,6 +542,26 @@ func (agent *ecsAgent) doStart(containerChangeEventStream *eventstream.EventStre
AttachmentProperties: tempAttachmentProperties3,
})

tempAttachmentProperties4 := map[string]string{
apira.ResourceTypeName: apira.ElasticBlockStorage,
apira.RequestedSizeName: "10",
apira.VolumeSizeInGiBName: "7",
apira.DeviceName: "/dev/nvme4n1",
apira.VolumeIdName: "vol-0797268b9cfec8c61",
apira.FileSystemTypeName: "testXFS",
}
go agent.ebsWatcher.HandleResourceAttachment(&apira.ResourceAttachment{
AttachmentInfo: attachmentinfo.AttachmentInfo{
AttachmentARN: "dummy-arn4",
Status: status.AttachmentNone,
ExpiresAt: time.Now().Add(20000 * time.Millisecond),
AttachStatusSent: false,
ClusterARN: "dummy-cluster-arn4",
ContainerInstanceARN: "dummy-container-instance-arn4",
},
AttachmentProperties: tempAttachmentProperties4,
})

// Start the acs session, which should block doStart
return agent.startACSSession(credentialsManager, taskEngine,
deregisterInstanceEventStream, client, state, taskHandler, doctor)
Expand Down
7 changes: 2 additions & 5 deletions agent/app/agent_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

asmfactory "github.com/aws/amazon-ecs-agent/agent/asm/factory"
"github.com/aws/amazon-ecs-agent/agent/config"

ebs "github.com/aws/amazon-ecs-agent/agent/ebs"
"github.com/aws/amazon-ecs-agent/agent/ecscni"
"github.com/aws/amazon-ecs-agent/agent/engine"
Expand Down Expand Up @@ -154,11 +155,7 @@ func (agent *ecsAgent) startEBSWatcher(state dockerstate.TaskEngineState, stateC
seelog.Debug("Setting up EBS Watcher...")
if agent.ebsWatcher == nil {
seelog.Debug("Creating new EBS watcher...")
ebsWatcher, err := ebs.NewWatcher(agent.ctx, state, stateChangeEvents)
if err != nil {
return errors.Wrapf(err, "unable to create EBS watcher")
}
agent.ebsWatcher = ebsWatcher
agent.ebsWatcher = ebs.NewWatcher(agent.ctx, state, stateChangeEvents)

go agent.ebsWatcher.Start()
}
Expand Down
7 changes: 2 additions & 5 deletions agent/app/agent_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

asmfactory "github.com/aws/amazon-ecs-agent/agent/asm/factory"
"github.com/aws/amazon-ecs-agent/agent/data"

ebs "github.com/aws/amazon-ecs-agent/agent/ebs"
"github.com/aws/amazon-ecs-agent/agent/ecscni"
"github.com/aws/amazon-ecs-agent/agent/engine"
Expand Down Expand Up @@ -354,11 +355,7 @@ func (agent *ecsAgent) startENIWatcher(state dockerstate.TaskEngineState, stateC
func (agent *ecsAgent) startEBSWatcher(state dockerstate.TaskEngineState, stateChangeEvents chan<- statechange.Event) error {
seelog.Debug("Setting up EBS Watcher...")
if agent.ebsWatcher == nil {
ebsWatcher, err := ebs.NewWatcher(agent.ctx, state, stateChangeEvents)
if err != nil {
return errors.Wrapf(err, "unable to create EBS watcher")
}
agent.ebsWatcher = ebsWatcher
agent.ebsWatcher = ebs.NewWatcher(agent.ctx, state, stateChangeEvents)

go agent.ebsWatcher.Start()
}
Expand Down
177 changes: 85 additions & 92 deletions agent/ebs/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,72 +16,87 @@ package ebs
import (
"context"
"fmt"
"sync"
"time"

"github.com/aws/amazon-ecs-agent/agent/engine/dockerstate"
"github.com/aws/amazon-ecs-agent/agent/statechange"
apiebs "github.com/aws/amazon-ecs-agent/ecs-agent/api/resource"
log "github.com/cihub/seelog"
"github.com/pkg/errors"
)

type EBSWatcher struct {
ctx context.Context
cancel context.CancelFunc
agentState dockerstate.TaskEngineState
// TODO: The ebsChangeEvent will be used to send over the state change event for EBS attachments once it's been found and mounted/resize/format.
ebsChangeEvent chan<- statechange.Event
// TODO: The dataClient will be used to save to agent's data client as well as start the ACK timer. This will be added once the data client functionality have been added
// dataClient data.Client
ebsChangeEvent chan<- statechange.Event
discoveryClient apiebs.EBSDiscovery
scanTicker *time.Ticker
lock sync.RWMutex
isScanning bool
// scanTickerController *apiebs.ScanTickerController
scanTicker *time.Ticker
}

// NewWatcher is used to return a new instance of the EBSWatcher struct
func NewWatcher(ctx context.Context,
state dockerstate.TaskEngineState,
stateChangeEvents chan<- statechange.Event) (*EBSWatcher, error) {
stateChangeEvents chan<- statechange.Event) *EBSWatcher {
derivedContext, cancel := context.WithCancel(ctx)
discoveryClient := apiebs.NewDiscoveryClient(derivedContext)
// scanTickerController := apiebs.NewScanTickerController()
return &EBSWatcher{
ctx: derivedContext,
cancel: cancel,
agentState: state,
ebsChangeEvent: stateChangeEvents,
discoveryClient: discoveryClient,
isScanning: false,
}, nil
}
}

// Start is used to kick off the periodic scanning process of the EBS volume attachments for the EBS watcher.
// It will be start and continue to run whenever there's a pending EBS volume attachment that hasn't been found.
// If there aren't any, the scan ticker will not start up/scan for volumes.
func (w *EBSWatcher) Start() {
if w.IsScanning() {
log.Info("EBS watcher is already scanning.")
return
}

err := w.SetScanTicker()
if err != nil {
log.Warn("Unable to start scan ticker for EBS watcher")
return
}
log.Info("Starting EBS watcher.")
w.scanTicker = time.NewTicker(apiebs.ScanPeriod)

log.Info("New resource attachment to handle. Starting EBS watcher.")
for {
select {
case <-w.scanTicker.C:
pendingEBS := w.agentState.GetAllPendingEBSAttachmentWithKey()
if len(pendingEBS) > 0 {
log.Info("Pending attachments to be found...")
foundVolumes := apiebs.ScanEBSVolumes(pendingEBS, w.discoveryClient)
w.NotifyFound(foundVolumes)
// w.agentState.RemoveExpiredEBSVolumes()
} else {
log.Info("No volumes to find.")
}
case <-w.ctx.Done():
w.StopScanner()
log.Info("EBS Watcher Stopped")
w.scanTicker.Stop()
log.Info("EBS Watcher Stopped due to agent stop")
return
}
}

// go func() {
// for {
// select {
// case <-w.scanTickerController.ScanTicker.C:
// pendingEBS := w.agentState.GetAllPendingEBSAttachmentWithKey()
// foundVolumes := apiebs.ScanEBSVolumes(pendingEBS, w.discoveryClient)
// w.NotifyFound(foundVolumes)
// case <-w.scanTickerController.Done:
// w.scanTickerController.Running = false
// w.scanTickerController.ScanTicker.Stop()
// return
// case <-w.ctx.Done():
// w.scanTickerController.StopScanTicker()
// log.Info("EBS Watcher Stopped due to agent stop")
// return
// }
// }
// }()
}

// Stop will stop the EBS watcher
Expand All @@ -90,96 +105,102 @@ func (w *EBSWatcher) Stop() {
w.cancel()
}

// HandleResourceAttachment processes the resource attachment message. It will:
// 1. Check whether we already have this attachment in state, if so, return
// 2. Otherwise add the attachment to state, start its ack timer, and save to the agent state
// If it's the first pending volume to be added to the agent state, then the EBS watcher will start scanning.
func (w *EBSWatcher) HandleResourceAttachment(ebs *apiebs.ResourceAttachment) error {
empty := len(w.agentState.GetAllPendingEBSAttachments()) == 0
attachmentType := ebs.AttachmentProperties[apiebs.ResourceTypeName]
// wasEmpty := len(w.agentState.GetAllPendingEBSAttachments()) == 0
attachmentType := ebs.GetAttachmentProperties(apiebs.ResourceTypeName)
if attachmentType != apiebs.ElasticBlockStorage {
log.Info("Resource type not Elastic Block Storage. Skip handling resource attachment.")
log.Warnf("Resource type not Elastic Block Storage. Skip handling resource attachment with type: %v.", attachmentType)
return nil
}
volumeID := ebs.AttachmentProperties[apiebs.VolumeIdName]
_, ok := w.agentState.GetEBSByVolumeId(volumeID)

volumeId := ebs.GetAttachmentProperties(apiebs.VolumeIdName)
ebsAttachment, ok := w.agentState.GetEBSByVolumeId(volumeId)
if ok {
log.Infof("EBS Volume attachment already exists. Skip handling EBS attachment %v.", ebs.String())
return nil
log.Infof("EBS Volume attachment already exists. Skip handling EBS attachment %v.", ebs.EBSToString())
return ebsAttachment.StartTimer(func() {
w.handleEBSAckTimeout(volumeId)
})
}

if err := w.addEBSAttachmentToState(ebs); err != nil {
return errors.Wrapf(err, fmt.Sprintf("attach %s message handler: unable to add ebs attachment to engine state: %s",
attachmentType, ebs.String()))
}

if empty && len(w.agentState.GetAllPendingEBSAttachments()) == 1 && !w.IsScanning() {
go w.Start()
time.Sleep(5 * time.Millisecond)
// return errors.Wrapf(err, fmt.Sprintf("attach %s message handler: unable to add ebs attachment to engine state: %s",
// attachmentType, ebs.EBSToString()))
return fmt.Errorf("%w; attach %s message handler: unable to add ebs attachment to engine state: %s",
err, attachmentType, ebs.EBSToString())
}

// // If it was originally empty and now there's a pending EBS volume to scan for.
// if wasEmpty && len(w.agentState.GetAllPendingEBSAttachments()) == 1 {
// go w.Start()
// }
return nil
}

// NotifyFound will go through the list of found EBS volumes from the scanning process and mark them as found.
// Afterwards, it stops the EBS watcher if there are no more EBS volumes to find on the host.
func (w *EBSWatcher) NotifyFound(foundVolumes []string) {
var wg sync.WaitGroup
for _, volumeId := range foundVolumes {
wg.Add(1)
go func() {
defer wg.Done()
w.notifyFoundEBS(volumeId)
}()
}
wg.Wait()
if len(w.agentState.GetAllPendingEBSAttachments()) == 0 {
w.StopScanner()
w.notifyFoundEBS(volumeId)
}
// w.checkPendingEBSVolumes()
// if len(w.agentState.GetAllPendingEBSAttachments()) == 0 {
// w.scanTickerController.StopScanTicker()
// }
}

// notifyFoundEBS will mark it as found within the agent state
func (w *EBSWatcher) notifyFoundEBS(volumeId string) {
// TODO: Add the EBS volume to data client
ebs, ok := w.agentState.GetEBSByVolumeId(volumeId)
if !ok {
log.Warnf("Unable to find EBS volume with volume ID: %v.", volumeId)
return
}

if ebs.HasExpired() {
log.Warnf("EBS status expired, no longer tracking EBS volume: %v.", ebs.String())
log.Warnf("EBS status expired, no longer tracking EBS volume: %v.", ebs.EBSToString())
return
}

// TODO: This is a placeholder for now until the attachment ACS handler gets implemented
if ebs.IsSent() {
log.Warnf("State change event has already been emitted for EBS volume: %v.", ebs.String())
log.Warnf("State change event has already been emitted for EBS volume: %v.", ebs.EBSToString())
return
}

if ebs.IsAttached() {
log.Infof("EBS volume: %v, has been found already.", ebs.String())
log.Infof("EBS volume: %v, has been found already.", ebs.EBSToString())
return
}

ebs.StopAckTimer()
ebs.SetAttachedStatus()

log.Infof("Successfully found attached EBS volume: %v", ebs.String())
log.Infof("Successfully found attached EBS volume: %v", ebs.EBSToString())
}

// RemoveAttachment will stop tracking an EBS attachment
// func (w *EBSWatcher) RemoveAttachment(volumeID string) {
// w.mailbox <- func() {
// w.removeEBSAttachment(volumeID)
// }
// }

func (w *EBSWatcher) removeEBSAttachment(volumeID string) {
// TODO: Remove the EBS volume from the data client.
w.agentState.RemoveEBSAttachment(volumeID)
if len(w.agentState.GetAllPendingEBSAttachments()) == 0 {
log.Info("No more attachments to scan for. Stopping scan ticker.")
w.StopScanner()
}
// w.checkPendingEBSVolumes()
// if len(w.agentState.GetAllPendingEBSAttachments()) == 0 {
// w.scanTickerController.StopScanTicker()
// }
}

// func (w *EBSWatcher) checkPendingEBSVolumes() {
// if len(w.agentState.GetAllPendingEBSAttachments()) == 0 {
// // log.Info("No more attachments to scan for. Stopping scan ticker.")
// w.scanTickerController.StopScanTicker()
// }
// }

// addEBSAttachmentToState adds an EBS attachment to state, and start its ack timer
func (w *EBSWatcher) addEBSAttachmentToState(ebs *apiebs.ResourceAttachment) error {
volumeId := string(ebs.AttachmentProperties[apiebs.VolumeIdName])
volumeId := ebs.AttachmentProperties[apiebs.VolumeIdName]
err := ebs.StartTimer(func() {
w.handleEBSAckTimeout(volumeId)
})
Expand All @@ -199,35 +220,7 @@ func (w *EBSWatcher) handleEBSAckTimeout(volumeId string) {
return
}
if !ebsAttachment.IsSent() {
log.Warnf("Timed out waiting for EBS ack; removing EBS attachment record %v", ebsAttachment.String())
// w.RemoveAttachment(volumeId)
log.Warnf("Timed out waiting for EBS ack; removing EBS attachment record %v.", ebsAttachment.EBSToString())
w.removeEBSAttachment(volumeId)
}
}

func (w *EBSWatcher) IsScanning() bool {
w.lock.RLock()
defer w.lock.RUnlock()
return w.isScanning
}

func (w *EBSWatcher) StopScanner() {
w.lock.Lock()
defer w.lock.Unlock()
if w.scanTicker != nil {
w.scanTicker.Stop()
w.isScanning = false
}
return
}

func (w *EBSWatcher) SetScanTicker() error {
w.lock.Lock()
defer w.lock.Unlock()
if w.scanTicker != nil {
return errors.New("unable to start scan ticker")
}
w.scanTicker = time.NewTicker(apiebs.ScanPeriod)
w.isScanning = true
return nil
}
Loading
Loading