-
Notifications
You must be signed in to change notification settings - Fork 4.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Autodiscover #5245
Autodiscover #5245
Conversation
7344322
to
b7674c4
Compare
|
||
// AddProvider registers a new ProviderBuilder | ||
func (r *registry) AddProvider(name string, provider ProviderBuilder) error { | ||
r.lock.Lock() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do you lock here? Are init
functions called concurrently?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nope, or not at the moment 😇, but some defensive code there won't hurt I think, this may be a thing in the future?
libbeat/common/bus/bus.go
Outdated
// Bus provides a common channel to emit and listen for Events | ||
type Bus interface { | ||
// Emit an event to the bus | ||
Emit(Event) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think Publish
would be a more expressive name than Emit
, as it is a kind of Publish-Subscribe model.
c946667
to
8eb747a
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great start for this feature.
@@ -0,0 +1,63 @@ | |||
package common |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I worry a bit that this will conflict with our libbeat common package in some places.
|
||
import ( | ||
"github.com/elastic/beats/libbeat/autodiscover/common" | ||
"github.com/elastic/beats/libbeat/processors/add_docker_metadata" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Feels strange to include here a processor. Perhaps we should extract the TLS config part to a docker package.
"github.com/elastic/beats/libbeat/common/bus" | ||
"github.com/elastic/beats/libbeat/logp" | ||
|
||
// TODO move add_docker_metadata.Watcher to a common place |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
:-D
@@ -32,6 +33,12 @@ type Watcher interface { | |||
|
|||
// Containers returns the list of known containers | |||
Containers() map[string]*Container | |||
|
|||
// ListenStart returns a bus listener to receive container started events, with a `container` key holding it | |||
ListenStart() bus.Listener |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if we should put these in the same interface or one more specific to autodiscovery which is composeo f this Watcher interface.
@exekias I stumbled over the config a bit because of all the indentation it has. Here an alternative suggestion:
The basic idea is to make it more obvious out of what the config will be generated and allow multiple I don't think we should provide the conditions in the first implementation but wanted to add it here to take into account where this could go. I'm not happy with I'm pretty sure the above config brings up other issues as I only focused on the config and not potential implications on the implementation itself. |
ucfg.VarExp, | ||
} | ||
for _, config := range configs { | ||
c, err := ucfg.NewFrom(config, opts...) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do you need ucfg here?
Minor problem with this approach is, you are loosing the full setting paths. If some config unpacking fails, the setting it's full path is reported with the error message.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I pass provider event data for variable expansion, so config templates can use data from the event. Agree on the minor issue (would affect logs mainly?)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, it's for logs only. As we can't store file positions (by line) yet, not having the correct names can make it more difficult to track-down misconfigurations.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
discovery can be more automatic than providing docker-image: redis
. if we use docker labels, we should be able to pass annotation
like information same like collectbeat to be able to discover workloads automatically. i could run 10 containers in a host. it would be a huge config if i have to say for each via config how it has to be discovered.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree @vjsamuel, this is a first PR, intended to bring the basic functionality and building blocks to grow from there. Definitely that's something we want to have
@exekias on general thought. with autodiscover we should avoid using we should also have some plugability in place for folks who want to run something custom on top of auto discovery. this is similar to collectbeat appenders where we have our own internal appenders more custom tailored for our needs. |
subscribe |
@ruflin I updated config format to take advantage of processor conditions, let me know what you think about it:
|
This is a really great feature as this really great for mid to large scale Docker/Kubernetes installations to auto-configure Beats when container come and go. |
Hi @christiangalsterer, I'm glad you like it! this is something we want to have in the 6.X timeline, probably in a few months |
@exekias How would your example look if multiple conditions are used? I'm still stumbling over the template part but lets separate these discussions :-) |
We had a discussion with @exekias and @urso about the config and that is what we came up with. I added some comments to explain why the decision was made this way:
We couldn't decide if it should be called |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Change LGTM in general. I would appreciate if we would have some automate tests to see if the discovery works as expected with docker but that could become tricky.
I left some minor comments and some questions mainly for my understanding.
@@ -21,6 +21,8 @@ type Crawler struct { | |||
prospectorConfigs []*common.Config | |||
out channel.Factory | |||
wg sync.WaitGroup | |||
ProspectorsFactory cfgfile.RunnerFactory |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note for myself: Need to understand why we need to store the factory now in the Crawler struct.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Short answer: we use this factory from Autodiscovery. In the long term, we should unify all prospector/module initialization in a common place
filebeat/fileset/factory.go
Outdated
@@ -110,6 +110,7 @@ func (p *prospectorsRunner) Stop() { | |||
prospector.Stop() | |||
} | |||
} | |||
func (p *prospectorsRunner) ID() uint64 { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Was the ID still used somewhere? If not, perhaps remove it in a different PR?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It wasn't used, it's a spurious implementation, probably forgot from a previous removal
@@ -141,3 +144,7 @@ func (p *Prospector) stop() { | |||
p.prospectorer.Stop() | |||
} | |||
} | |||
|
|||
func (p *Prospector) String() string { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is that a required method or just for convenience / logging?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's for logging purposes only, autodiscover will tell about new spawned prospectors/modules in the log output
// RunnerFactory provides runner creation by feeding valid configs | ||
cfgfile.RunnerFactory | ||
|
||
// StartFilter returns the bus filter to retrieve runner start triggering events |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So far it seems these are all the same for the current implementation. How will these look for other adapters?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't know yet 😇, I think this code can be used in other parts of beats, not only to detect new modules to launch, so wanted to keep this open
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we even need the filters methods on the interface? Can the filters be passed when registering the adapter, so to keep the interface somewhat 'smaller'?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The adapter is not registered but passed to Autodiscover Manager. If you look at is implementation, it's very small generally, I think it makes sense to keep these filters together with the functions that use it. If in the future we use something different from config we need to update the filters too https://github.com/exekias/beats/blob/411d13a878d64dbe62b6008057bbaa1c207f9e4e/metricbeat/beater/autodiscover.go
libbeat/autodiscover/autodiscover.go
Outdated
} | ||
|
||
// Start autodiscovery process | ||
func (a *Autodiscover) Start() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some idea that popped up in my head when reading this: Will it be needed to enabled / disable autodiscovery dynamically? For example on startup you want the beat to autodiscover all the services that are out there and then stop the discovery for example? Or only use autodiscovery for some time enabled through centralised config? We definitively not need that now, just thinking out loud.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is a good point, something like modules reload could make sense for autodiscover providers
libbeat/common/bus/bus.go
Outdated
Subscribe(filter ...string) Listener | ||
} | ||
|
||
// Listener retrieves Events from a Bus subscription until Close is called |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
until Stop is called?
} | ||
|
||
// New initializes a new bus with the given name and returns it | ||
func New(name string) Bus { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What are the names of the buses you have in mind?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't know, this name is used just for logging
// event list to send on Events call | ||
events []interface{} | ||
|
||
done chan interface{} | ||
} | ||
|
||
func (m *MockClient) ContainerList(ctx context.Context, options types.ContainerListOptions) ([]types.Container, error) { | ||
return m.containers, nil | ||
res := m.containers[0] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you elaborate on this change?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added support for several requests to this mock, each request returns the next response and pops it out the list
"unsafe" | ||
) | ||
|
||
// MapStrPointer stores a pointer to atomically get/set a MapStr object |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would have prefered to have "optimizations" like this in a follow up PR to have the code we put in as simple as possible to start with and then when we do optimizations we also add some benchmark tests for it. Makes it easier to later detect in case these optimizations causes some issues, if it happened with the optimization or the original code.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agreed, but as I try to get a stable API to introduce new providers as soon as possible this somehow made sense while I waited for reviews. Once this is merged, autodiscover API should have everything we need
libbeat/processors/condition.go
Outdated
@@ -42,6 +42,10 @@ type WhenProcessor struct { | |||
p Processor | |||
} | |||
|
|||
type ValuesMap interface { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note to myself: Figure out why this change is needed and if it could happen in a separate PR.
libbeat/beat/pipeline.go
Outdated
@@ -34,6 +34,9 @@ type ClientConfig struct { | |||
// Fields provides additional 'global' fields to be added to every event | |||
Fields common.MapStr | |||
|
|||
// DynamicMeta provides additional fields to be added to every event, supporting live updates | |||
DynamicMeta *common.MapStrPointer |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To reduce confusion, rather use DynamicFields
. The beat event has Meta
and Fields
. The Meta
are somewhat internal (like LS @metadata
). But DynamicMeta
is updating the event Fields
, not Meta
.
Btw. instead of adding the functionality to the publisher pipeline, you can just pass a processor doing the magic.
Instead of passing DynamicMeta, we could add atomic values support to the normalizer of common.MapStr
. This would allow you to pass updatable fields via Fields
(given you don't need them on top-level).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the input, I agree DynamicFields is the right name.
I thought about the processor but that would cause Copy to be called for all cases, I guess that's not too bad?
libbeat/common/mapstr_pointer.go
Outdated
// replacing it. | ||
type MapStrPointer struct { | ||
p unsafe.Pointer | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about changing the type to:
type VolatileMapStr struct {
p *unsafe.Pointer
}
func MakeVolatileMapStr(m MapStr) VolatileMapStr {
return VolatileMapStr{ p: &unsafe.Pointer(&m) }
}
func (m VolatileMapStr) Get() MapStr {
return *((*MapStr)(atomic.LoadPointer(m.p)))
}
func (m VolatileMapStr) Set(new MapStr) {
atomic.StorePointer(m.p, unsafe.Pointer(&new))
}
Then you can use VolatileMapStr
and MapStr
by value, just like MapStr
is already passed by value.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, but don't really like the Volatile name, somehow it hides the real thing?
@@ -27,14 +27,14 @@ func NewFactory(maxStartDelay time.Duration, p beat.Pipeline) *Factory { | |||
} | |||
} | |||
|
|||
func (r *Factory) Create(c *common.Config) (cfgfile.Runner, error) { | |||
func (r *Factory) Create(c *common.Config, meta *common.MapStrPointer) (cfgfile.Runner, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
exported method Factory.Create should have comment or be unexported
metricbeat/mb/module/connector.go
Outdated
} | ||
|
||
type connectorConfig struct { | ||
Processors processors.PluginConfig `config:"processors"` | ||
common.EventMetadata `config:",inline"` // Fields and tags to add to events. | ||
} | ||
|
||
func NewConnector(pipeline beat.Pipeline, c *common.Config) (*Connector, error) { | ||
func NewConnector(pipeline beat.Pipeline, c *common.Config, meta *common.MapStrPointer) (*Connector, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
exported function NewConnector should have comment or be unexported
metricbeat/beater/autodiscover.go
Outdated
return []string{"start", "config"} | ||
} | ||
|
||
func (m *AutodiscoverAdapter) StopFilter() []string { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
exported method AutodiscoverAdapter.StopFilter should have comment or be unexported
metricbeat/beater/autodiscover.go
Outdated
return m.factory.Create(c, meta) | ||
} | ||
|
||
func (m *AutodiscoverAdapter) StartFilter() []string { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
exported method AutodiscoverAdapter.StartFilter should have comment or be unexported
metricbeat/beater/autodiscover.go
Outdated
return nil | ||
} | ||
|
||
func (m *AutodiscoverAdapter) Create(c *common.Config, meta *common.MapStrPointer) (cfgfile.Runner, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
exported method AutodiscoverAdapter.Create should have comment or be unexported
libbeat/common/mapstr_pointer.go
Outdated
return (*MapStr)(atomic.LoadPointer(&m.p)) | ||
} | ||
|
||
func (m *MapStrPointer) Set(p *MapStr) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
exported method MapStrPointer.Set should have comment or be unexported
libbeat/common/mapstr_pointer.go
Outdated
} | ||
} | ||
|
||
func (m *MapStrPointer) Get() *MapStr { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
exported method MapStrPointer.Get should have comment or be unexported
libbeat/common/mapstr_pointer.go
Outdated
p unsafe.Pointer | ||
} | ||
|
||
func NewMapStrPointer(p *MapStr) *MapStrPointer { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
exported function NewMapStrPointer should have comment or be unexported
filebeat/beater/autodiscover.go
Outdated
return []string{"start", "config"} | ||
} | ||
|
||
func (m *AutodiscoverAdapter) StopFilter() []string { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
exported method AutodiscoverAdapter.StopFilter should have comment or be unexported
filebeat/beater/autodiscover.go
Outdated
return m.prospectorFactory.Create(c, meta) | ||
} | ||
|
||
func (m *AutodiscoverAdapter) StartFilter() []string { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
exported method AutodiscoverAdapter.StartFilter should have comment or be unexported
libbeat/cfgfile/registry.go
Outdated
|
||
_, ok := r.List[hash] | ||
return ok | ||
} | ||
|
||
func (r *Registry) Get(hash uint64) Runner { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
exported method Registry.Get should have comment or be unexported
// Event adapts MapStr to processors.ValuesMap interface | ||
type Event common.MapStr | ||
|
||
func (e Event) GetValue(key string) (interface{}, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
exported method Event.GetValue should have comment or be unexported
autodiscover.ProviderRegistry.AddProvider("docker", AutodiscoverBuilder) | ||
} | ||
|
||
// Autodiscover implements autodiscover provider for docker containers |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
comment on exported type Provider should be of the form "Provider ..." (with optional leading article)
f10f5cb
to
34c9aa0
Compare
metricbeat/metricbeat.yml
Outdated
# Autodiscover allows you to detect changes in the system and spawn new modules | ||
# as they happen. | ||
|
||
metricbeat.autodiscover: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think Autodiscovery doesn't need to be in the short config (which we try to keep minimal), but only in the reference one. Especially since Autodiscovery is going to be in the K8s deployment manifersts, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍 that works for me, will remove it from short configs
8f63a35
to
21a0563
Compare
This PR is ready for a (final?) review, I'm currently opening side PRs to shrink this one |
@exekias just merged the other two. Could you rebase on top? If it reduces the hassle of rebasing, feel free to squash it into 1 commit already (or as many as you want in the end). |
a147ee6
to
9f3b4d8
Compare
9f3b4d8
to
26ce229
Compare
26ce229
to
2f59c0b
Compare
@exekias Merged, but please keep in mind to add docs before the 6.1 GA. |
@exekias Awesome stuff. Thanks for making this happen. |
Autodiscover docs were added with #5868, so I'm removing the needs_docs label. |
This PR adds support for Autodiscover, and implements a first provider for Docker.
Autodiscover
Autodiscover allows the user to define different providers to watch for system changes and instantiate new modules as these happen.
Providers watch for changes and emit events to a common bus, then the autodiscovery process detect situations when there is something we can monitor and instantiate a modules for it.
Bus messages contain useful information about what providers see, with some common well-known optional fields (provider specific fields may also be present):
Docker provider
Docker provider watches for docker container events, it supports config mapping from container metadata -> config templates, so new modules are created when a container starts. The available data for a container (that you can use in the config template) is:
Testing
This should be enough to monitor any present and future redis instances that run on docker:
Or reading redis logs only:
You can launch it like this:
This PR is in progress, with some missing parts: