Skip to content

Commit

Permalink
Channel based docker stats engine implementation (DockerStatsEngine -…
Browse files Browse the repository at this point in the history
…> TCSClient) (#3683)

* Channel based docker stats engine implementation (DockerStatsEngine -> TCSClient)
  • Loading branch information
Realmonia committed May 25, 2023
1 parent a4ae28c commit bc8945d
Show file tree
Hide file tree
Showing 12 changed files with 540 additions and 248 deletions.
13 changes: 9 additions & 4 deletions agent/app/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,11 @@ const (
asgLifecyclePollWait = time.Minute
asgLifecyclePollMax = 120 // given each poll cycle waits for about a minute, this gives 2-3 hours before timing out

// the size of the buffer for the metric/task health/instance health telemetry channels.
// This values needs to be tuned after all channel related implementation is done.
// By default, TCS (or TACS) will reject metrics that are older than 5 minutes. Since our metrics collection interval
// is currently set to 20 seconds, setting a buffer size of 15 allows us to store exactly 5 minutes of metrics in
// these buffers in the case where we temporarily lose connection to TCS. This value does not change with task number,
// as the number of messages in the channel is equal to the number of times we call `getInstanceMetrics`, which collects
// metrics from all tasks and containers and puts them into one TelemetryMessage object.
telemetryChannelDefaultBufferSize = 15
)

Expand Down Expand Up @@ -849,9 +852,8 @@ func (agent *ecsAgent) startAsyncRoutines(

telemetryMessages := make(chan ecstcs.TelemetryMessage, telemetryChannelDefaultBufferSize)
healthMessages := make(chan ecstcs.HealthMessage, telemetryChannelDefaultBufferSize)
instanceStatusMessages := make(chan ecstcs.InstanceStatusMessage, telemetryChannelDefaultBufferSize)

statsEngine := stats.NewDockerStatsEngine(agent.cfg, agent.dockerClient, containerChangeEventStream, telemetryMessages, healthMessages, instanceStatusMessages)
statsEngine := stats.NewDockerStatsEngine(agent.cfg, agent.dockerClient, containerChangeEventStream, telemetryMessages, healthMessages)

// Start serving the endpoint to fetch IAM Role credentials and other task metadata
if agent.cfg.TaskMetadataAZDisabled {
Expand All @@ -873,6 +875,8 @@ func (agent *ecsAgent) startAsyncRoutines(
ECSClient: client,
TaskEngine: taskEngine,
StatsEngine: statsEngine,
MetricsChannel: telemetryMessages,
HealthChannel: healthMessages,
Doctor: doctor,
}

Expand All @@ -881,6 +885,7 @@ func (agent *ecsAgent) startAsyncRoutines(
seelog.Warnf("Error initializing metrics engine: %v", err)
return
}
go statsEngine.StartMetricsPublish()

// Start metrics session in a go routine
go tcshandler.StartMetricsSession(&telemetrySessionParams)
Expand Down
91 changes: 85 additions & 6 deletions agent/stats/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,15 @@ const (
queueResetThreshold = 2 * dockerclient.StatsInactivityTimeout
hostNetworkMode = "host"
noneNetworkMode = "none"
// defaultPublishServiceConnectTicker means service connect metrics are sent to the backend on every 3rd tick.
// Task metrics are published at a 20-second interval, so a task's service connect metrics are published every 60 seconds.
defaultPublishServiceConnectTicker = 3
// publishMetricsTimeout is the duration that we wait for metrics/health info to be
// pushed to the TCS channels. In theory, this timeout should never be hit since
// the TCS handler should be continually reading from the channels and pushing to
// TCS, but when we lose connection to TCS, these channels back up. In case this
// happens, we need to have a timeout to prevent statsEngine channels from blocking.
publishMetricsTimeout = 1 * time.Second
)

var (
Expand Down Expand Up @@ -100,9 +109,8 @@ type DockerStatsEngine struct {
publishServiceConnectTickerInterval int32
publishMetricsTicker *time.Ticker
// channels to send metrics to TACS Client
metricsChannel chan<- ecstcs.TelemetryMessage
healthChannel chan<- ecstcs.HealthMessage
instanceStatusChannel chan<- ecstcs.InstanceStatusMessage
metricsChannel chan<- ecstcs.TelemetryMessage
healthChannel chan<- ecstcs.HealthMessage
}

// ResolveTask resolves the api task object, given container id.
Expand Down Expand Up @@ -146,8 +154,7 @@ func (resolver *DockerContainerMetadataResolver) ResolveContainer(dockerID strin
// NewDockerStatsEngine creates a new instance of the DockerStatsEngine object.
// MustInit() must be called to initialize the fields of the new event listener.
func NewDockerStatsEngine(cfg *config.Config, client dockerapi.DockerClient, containerChangeEventStream *eventstream.EventStream,
metricsChannel chan<- ecstcs.TelemetryMessage, healthChannel chan<- ecstcs.HealthMessage,
instanceStatusChannel chan<- ecstcs.InstanceStatusMessage) *DockerStatsEngine {
metricsChannel chan<- ecstcs.TelemetryMessage, healthChannel chan<- ecstcs.HealthMessage) *DockerStatsEngine {
return &DockerStatsEngine{
client: client,
resolver: nil,
Expand All @@ -161,7 +168,6 @@ func NewDockerStatsEngine(cfg *config.Config, client dockerapi.DockerClient, con
publishServiceConnectTickerInterval: 0,
metricsChannel: metricsChannel,
healthChannel: healthChannel,
instanceStatusChannel: instanceStatusChannel,
}
}

Expand Down Expand Up @@ -430,6 +436,79 @@ func (engine *DockerStatsEngine) addToStatsContainerMapUnsafe(
return true
}

// StartMetricsPublish starts the loop that collects task metrics and task health
// metrics and publishes them to the TCS client through the engine's channels.
// It blocks until engine.ctx is cancelled, so it is expected to run in its own
// goroutine. It is a no-op if the publish ticker was never initialized.
func (engine *DockerStatsEngine) StartMetricsPublish() {
if engine.publishMetricsTicker == nil {
seelog.Debug("Skipping reporting metrics through channel. Publish ticker is uninitialized")
return
}

// Publish metrics immediately after we start the loop and wait for ticks. This makes sure TACS side has correct
// TaskCount metrics in CX account (especially for short living tasks)
engine.publishMetrics(false)
engine.publishHealth()

for {
// Advance the service connect counter once per loop iteration; when it reaches
// defaultPublishServiceConnectTicker the next publish includes service connect
// stats and the counter wraps back to zero. NOTE(review): the counter is also
// advanced on the iteration that exits via ctx.Done — harmless, since nothing
// is published in that case, but worth confirming it is intentional.
var includeServiceConnectStats bool
metricCounter := engine.GetPublishServiceConnectTickerInterval()
metricCounter++
if metricCounter == defaultPublishServiceConnectTicker {
includeServiceConnectStats = true
metricCounter = 0
}
engine.SetPublishServiceConnectTickerInterval(metricCounter)
select {
case <-engine.publishMetricsTicker.C:
seelog.Debugf("publishMetricsTicker triggered. Sending telemetry messages to tcsClient through channel")
// Publish in separate goroutines so a slow or timed-out channel send
// (see publishMetricsTimeout) does not delay subsequent ticks.
go engine.publishMetrics(includeServiceConnectStats)
go engine.publishHealth()
case <-engine.ctx.Done():
// Engine is shutting down; stop publishing.
return
}
}
}

// publishMetrics collects instance-level task metrics via GetInstanceMetrics and
// sends them as a single TelemetryMessage on engine.metricsChannel.
// includeServiceConnectStats controls whether service connect stats are included
// in this collection cycle. The channel send is bounded by publishMetricsTimeout:
// if the channel is backed up (e.g. the TCS connection was lost and the buffer is
// full), the message is discarded rather than blocking the stats engine.
// Collection errors are logged and the publish is skipped.
func (engine *DockerStatsEngine) publishMetrics(includeServiceConnectStats bool) {
	publishMetricsCtx, cancel := context.WithTimeout(engine.ctx, publishMetricsTimeout)
	defer cancel()

	metricsMetadata, taskMetrics, metricsErr := engine.GetInstanceMetrics(includeServiceConnectStats)
	if metricsErr != nil {
		seelog.Warnf("Error collecting task metrics: %v", metricsErr)
		return
	}

	metricsMessage := ecstcs.TelemetryMessage{
		Metadata:                   metricsMetadata,
		TaskMetrics:                taskMetrics,
		IncludeServiceConnectStats: includeServiceConnectStats,
	}
	select {
	case engine.metricsChannel <- metricsMessage:
		seelog.Debugf("sent telemetry message: %v", metricsMessage)
	case <-publishMetricsCtx.Done():
		seelog.Errorf("timeout sending telemetry message, discarding metrics")
	}
}

// publishHealth collects task health metrics via GetTaskHealthMetrics and sends
// them as a single HealthMessage on engine.healthChannel. The channel send is
// bounded by publishMetricsTimeout so a backed-up channel (e.g. lost TCS
// connection) cannot block the stats engine; on timeout the message is
// discarded. Collection errors are logged and the publish is skipped.
func (engine *DockerStatsEngine) publishHealth() {
	publishHealthCtx, cancel := context.WithTimeout(engine.ctx, publishMetricsTimeout)
	defer cancel()

	healthMetadata, taskHealthMetrics, healthErr := engine.GetTaskHealthMetrics()
	if healthErr != nil {
		seelog.Warnf("Error collecting health metrics: %v", healthErr)
		return
	}

	healthMessage := ecstcs.HealthMessage{
		Metadata:      healthMetadata,
		HealthMetrics: taskHealthMetrics,
	}
	select {
	case engine.healthChannel <- healthMessage:
		seelog.Debugf("sent health message: %v", healthMessage)
	case <-publishHealthCtx.Done():
		seelog.Errorf("timeout sending health message, discarding metrics")
	}
}

// GetInstanceMetrics gets all task metrics and instance metadata from stats engine.
func (engine *DockerStatsEngine) GetInstanceMetrics(includeServiceConnectStats bool) (*ecstcs.MetricsMetadata, []*ecstcs.TaskMetric, error) {
idle := engine.isIdle()
Expand Down
21 changes: 10 additions & 11 deletions agent/stats/engine_integ_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func createRunningTask(networkMode string) *apitask.Task {

func TestStatsEngineWithExistingContainersWithoutHealth(t *testing.T) {
// Create a new docker stats engine
engine := NewDockerStatsEngine(&cfg, dockerClient, eventStream("TestStatsEngineWithExistingContainersWithoutHealth"), nil, nil, nil)
engine := NewDockerStatsEngine(&cfg, dockerClient, eventStream("TestStatsEngineWithExistingContainersWithoutHealth"), nil, nil)
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()

Expand Down Expand Up @@ -127,7 +127,7 @@ func TestStatsEngineWithExistingContainersWithoutHealth(t *testing.T) {

func TestStatsEngineWithNewContainersWithoutHealth(t *testing.T) {
// Create a new docker stats engine
engine := NewDockerStatsEngine(&cfg, dockerClient, eventStream("TestStatsEngineWithNewContainers"), nil, nil, nil)
engine := NewDockerStatsEngine(&cfg, dockerClient, eventStream("TestStatsEngineWithNewContainers"), nil, nil)
defer engine.removeAll()

// Assign ContainerStop timeout to addressable variable
Expand Down Expand Up @@ -199,7 +199,7 @@ func TestStatsEngineWithNewContainersWithoutHealth(t *testing.T) {

func TestStatsEngineWithExistingContainers(t *testing.T) {
// Create a new docker stats engine
engine := NewDockerStatsEngine(&cfg, dockerClient, eventStream("TestStatsEngineWithExistingContainers"), nil, nil, nil)
engine := NewDockerStatsEngine(&cfg, dockerClient, eventStream("TestStatsEngineWithExistingContainers"), nil, nil)
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()

Expand Down Expand Up @@ -270,7 +270,7 @@ func TestStatsEngineWithExistingContainers(t *testing.T) {

func TestStatsEngineWithNewContainers(t *testing.T) {
// Create a new docker stats engine
engine := NewDockerStatsEngine(&cfg, dockerClient, eventStream("TestStatsEngineWithNewContainers"), nil, nil, nil)
engine := NewDockerStatsEngine(&cfg, dockerClient, eventStream("TestStatsEngineWithNewContainers"), nil, nil)
defer engine.removeAll()
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
Expand Down Expand Up @@ -352,7 +352,7 @@ func TestStatsEngineWithNewContainersWithPolling(t *testing.T) {
// Create a new docker client with new config
dockerClientForNewContainersWithPolling, _ := dockerapi.NewDockerGoClient(sdkClientFactory, &cfg, ctx)
// Create a new docker stats engine
engine := NewDockerStatsEngine(&cfg, dockerClientForNewContainersWithPolling, eventStream("TestStatsEngineWithNewContainers"), nil, nil, nil)
engine := NewDockerStatsEngine(&cfg, dockerClientForNewContainersWithPolling, eventStream("TestStatsEngineWithNewContainers"), nil, nil)
defer engine.removeAll()
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
Expand Down Expand Up @@ -459,7 +459,7 @@ func TestStatsEngineWithDockerTaskEngine(t *testing.T) {
testTask)

// Create a new docker stats engine
statsEngine := NewDockerStatsEngine(&cfg, dockerClient, containerChangeEventStream, nil, nil, nil)
statsEngine := NewDockerStatsEngine(&cfg, dockerClient, containerChangeEventStream, nil, nil)
err = statsEngine.MustInit(ctx, taskEngine, defaultCluster, defaultContainerInstance)
require.NoError(t, err, "initializing stats engine failed")
defer statsEngine.removeAll()
Expand Down Expand Up @@ -542,7 +542,7 @@ func TestStatsEngineWithDockerTaskEngineMissingRemoveEvent(t *testing.T) {
testTask)

// Create a new docker stats engine
statsEngine := NewDockerStatsEngine(&cfg, dockerClient, containerChangeEventStream, nil, nil, nil)
statsEngine := NewDockerStatsEngine(&cfg, dockerClient, containerChangeEventStream, nil, nil)
err = statsEngine.MustInit(ctx, taskEngine, defaultCluster, defaultContainerInstance)
require.NoError(t, err, "initializing stats engine failed")
defer statsEngine.removeAll()
Expand Down Expand Up @@ -571,9 +571,8 @@ func TestStatsEngineWithDockerTaskEngineMissingRemoveEvent(t *testing.T) {

time.Sleep(checkPointSleep)

// Simulate tcs client invoking GetInstanceMetrics.
_, _, err = statsEngine.GetInstanceMetrics(false)
assert.Error(t, err, "expect error 'no task metrics tp report' when getting instance metrics")
assert.Error(t, err, "expect error 'no task metrics to report' when getting instance metrics")

// Should not contain any metrics after cleanup.
validateIdleContainerMetrics(t, statsEngine)
Expand All @@ -586,7 +585,7 @@ func TestStatsEngineWithNetworkStatsDefaultMode(t *testing.T) {

func testNetworkModeStatsInteg(t *testing.T, networkMode string, statsEmpty bool) {
// Create a new docker stats engine
engine := NewDockerStatsEngine(&cfg, dockerClient, eventStream("TestStatsEngineWithNetworkStats"), nil, nil, nil)
engine := NewDockerStatsEngine(&cfg, dockerClient, eventStream("TestStatsEngineWithNetworkStats"), nil, nil)
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()

Expand Down Expand Up @@ -667,7 +666,7 @@ func testNetworkModeStatsInteg(t *testing.T, networkMode string, statsEmpty bool

func TestStorageStats(t *testing.T) {
// Create a new docker stats engine
engine := NewDockerStatsEngine(&cfg, dockerClient, eventStream("TestStatsEngineWithStorageStats"), nil, nil, nil)
engine := NewDockerStatsEngine(&cfg, dockerClient, eventStream("TestStatsEngineWithStorageStats"), nil, nil)
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()

Expand Down
Loading

0 comments on commit bc8945d

Please sign in to comment.