-
Notifications
You must be signed in to change notification settings - Fork 4.9k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Cherry-pick #20842 to 7.x: [Elastic Agent] Add docker composable dyna…
…mic provider. (#21117) * [Elastic Agent] Add docker composable dynamic provider. (#20842) * Add docker provider. * Add changelog. * Update docker start message to info. (cherry picked from commit f017e24) * Fix docker provider builder.
- Loading branch information
1 parent
772e790
commit c4121e5
Showing
6 changed files
with
248 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
24 changes: 24 additions & 0 deletions
24
x-pack/elastic-agent/pkg/composable/providers/docker/config.go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,24 @@ | ||
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
// or more contributor license agreements. Licensed under the Elastic License; | ||
// you may not use this file except in compliance with the Elastic License. | ||
|
||
package docker | ||
|
||
import ( | ||
"time" | ||
|
||
"github.com/elastic/beats/v7/libbeat/common/docker" | ||
) | ||
|
||
// Config for docker provider | ||
type Config struct { | ||
Host string `config:"host"` | ||
TLS *docker.TLSConfig `config:"ssl"` | ||
CleanupTimeout time.Duration `config:"cleanup_timeout" validate:"positive"` | ||
} | ||
|
||
// InitDefaults initializes the default values for the config. | ||
func (c *Config) InitDefaults() { | ||
c.Host = "unix:///var/run/docker.sock" | ||
c.CleanupTimeout = 60 * time.Second | ||
} |
153 changes: 153 additions & 0 deletions
153
x-pack/elastic-agent/pkg/composable/providers/docker/docker.go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,153 @@ | ||
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
// or more contributor license agreements. Licensed under the Elastic License; | ||
// you may not use this file except in compliance with the Elastic License. | ||
|
||
package docker | ||
|
||
import ( | ||
"fmt" | ||
"time" | ||
|
||
"github.com/elastic/beats/v7/libbeat/common" | ||
"github.com/elastic/beats/v7/libbeat/common/bus" | ||
"github.com/elastic/beats/v7/libbeat/common/docker" | ||
"github.com/elastic/beats/v7/libbeat/common/safemapstr" | ||
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors" | ||
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable" | ||
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/config" | ||
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger" | ||
) | ||
|
||
func init() { | ||
composable.Providers.AddDynamicProvider("docker", DynamicProviderBuilder) | ||
} | ||
|
||
type dockerContainerData struct { | ||
container *docker.Container | ||
mapping map[string]interface{} | ||
processors []map[string]interface{} | ||
} | ||
type dynamicProvider struct { | ||
logger *logger.Logger | ||
config *Config | ||
} | ||
|
||
// Run runs the environment context provider. | ||
func (c *dynamicProvider) Run(comm composable.DynamicProviderComm) error { | ||
watcher, err := docker.NewWatcher(c.logger, c.config.Host, c.config.TLS, false) | ||
if err != nil { | ||
// info only; return nil (do nothing) | ||
c.logger.Infof("Docker provider skipped, unable to connect: %s", err) | ||
return nil | ||
} | ||
startListener := watcher.ListenStart() | ||
stopListener := watcher.ListenStop() | ||
stoppers := map[string]*time.Timer{} | ||
stopTrigger := make(chan *dockerContainerData) | ||
|
||
if err := watcher.Start(); err != nil { | ||
// info only; return nil (do nothing) | ||
c.logger.Infof("Docker provider skipped, unable to connect: %s", err) | ||
return nil | ||
} | ||
|
||
go func() { | ||
for { | ||
select { | ||
case <-comm.Done(): | ||
startListener.Stop() | ||
stopListener.Stop() | ||
|
||
// Stop all timers before closing the channel | ||
for _, stopper := range stoppers { | ||
stopper.Stop() | ||
} | ||
close(stopTrigger) | ||
return | ||
case event := <-startListener.Events(): | ||
data, err := generateData(event) | ||
if err != nil { | ||
c.logger.Errorf("%s", err) | ||
continue | ||
} | ||
if stopper, ok := stoppers[data.container.ID]; ok { | ||
c.logger.Debugf("container %s is restarting, aborting pending stop", data.container.ID) | ||
stopper.Stop() | ||
delete(stoppers, data.container.ID) | ||
return | ||
} | ||
comm.AddOrUpdate(data.container.ID, data.mapping, data.processors) | ||
case event := <-stopListener.Events(): | ||
data, err := generateData(event) | ||
if err != nil { | ||
c.logger.Errorf("%s", err) | ||
continue | ||
} | ||
stopper := time.AfterFunc(c.config.CleanupTimeout, func() { | ||
stopTrigger <- data | ||
}) | ||
stoppers[data.container.ID] = stopper | ||
case data := <-stopTrigger: | ||
if _, ok := stoppers[data.container.ID]; ok { | ||
delete(stoppers, data.container.ID) | ||
} | ||
comm.Remove(data.container.ID) | ||
} | ||
} | ||
}() | ||
|
||
return nil | ||
} | ||
|
||
// DynamicProviderBuilder builds the dynamic provider. | ||
func DynamicProviderBuilder(logger *logger.Logger, c *config.Config) (composable.DynamicProvider, error) { | ||
var cfg Config | ||
if c == nil { | ||
c = config.New() | ||
} | ||
err := c.Unpack(&cfg) | ||
if err != nil { | ||
return nil, errors.New(err, "failed to unpack configuration") | ||
} | ||
return &dynamicProvider{logger, &cfg}, nil | ||
} | ||
|
||
func generateData(event bus.Event) (*dockerContainerData, error) { | ||
container, ok := event["container"].(*docker.Container) | ||
if !ok { | ||
return nil, fmt.Errorf("unable to get container from watcher event") | ||
} | ||
|
||
labelMap := common.MapStr{} | ||
processorLabelMap := common.MapStr{} | ||
for k, v := range container.Labels { | ||
safemapstr.Put(labelMap, k, v) | ||
processorLabelMap.Put(common.DeDot(k), v) | ||
} | ||
|
||
data := &dockerContainerData{ | ||
container: container, | ||
mapping: map[string]interface{}{ | ||
"container": map[string]interface{}{ | ||
"id": container.ID, | ||
"name": container.Name, | ||
"image": container.Image, | ||
"labels": labelMap, | ||
}, | ||
}, | ||
processors: []map[string]interface{}{ | ||
{ | ||
"add_fields": map[string]interface{}{ | ||
"fields": map[string]interface{}{ | ||
"id": container.ID, | ||
"name": container.Name, | ||
"image": container.Image, | ||
"labels": processorLabelMap, | ||
}, | ||
"to": "container", | ||
}, | ||
}, | ||
}, | ||
} | ||
return data, nil | ||
} |
64 changes: 64 additions & 0 deletions
64
x-pack/elastic-agent/pkg/composable/providers/docker/docker_test.go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,64 @@ | ||
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
// or more contributor license agreements. Licensed under the Elastic License; | ||
// you may not use this file except in compliance with the Elastic License. | ||
|
||
package docker | ||
|
||
import ( | ||
"testing" | ||
|
||
"github.com/stretchr/testify/assert" | ||
"github.com/stretchr/testify/require" | ||
|
||
"github.com/elastic/beats/v7/libbeat/common" | ||
"github.com/elastic/beats/v7/libbeat/common/bus" | ||
"github.com/elastic/beats/v7/libbeat/common/docker" | ||
) | ||
|
||
func TestGenerateData(t *testing.T) { | ||
container := &docker.Container{ | ||
ID: "abc", | ||
Name: "foobar", | ||
Labels: map[string]string{ | ||
"do.not.include": "true", | ||
"co.elastic.logs/disable": "true", | ||
}, | ||
} | ||
event := bus.Event{ | ||
"container": container, | ||
} | ||
|
||
data, err := generateData(event) | ||
require.NoError(t, err) | ||
mapping := map[string]interface{}{ | ||
"container": map[string]interface{}{ | ||
"id": container.ID, | ||
"name": container.Name, | ||
"image": container.Image, | ||
"labels": common.MapStr{ | ||
"do": common.MapStr{"not": common.MapStr{"include": "true"}}, | ||
"co": common.MapStr{"elastic": common.MapStr{"logs/disable": "true"}}, | ||
}, | ||
}, | ||
} | ||
processors := []map[string]interface{}{ | ||
{ | ||
"add_fields": map[string]interface{}{ | ||
"fields": map[string]interface{}{ | ||
"id": container.ID, | ||
"name": container.Name, | ||
"image": container.Image, | ||
"labels": common.MapStr{ | ||
"do_not_include": "true", | ||
"co_elastic_logs/disable": "true", | ||
}, | ||
}, | ||
"to": "container", | ||
}, | ||
}, | ||
} | ||
|
||
assert.Equal(t, container, data.container) | ||
assert.Equal(t, mapping, data.mapping) | ||
assert.Equal(t, processors, data.processors) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters