Add Beats state reporting #7075
func (vs *KeyValueVisitor) OnBool(b bool) { vs.cb(vs.getName(), b) }
func (vs *KeyValueVisitor) OnNil() { vs.cb(vs.getName(), nil) }
func (vs *KeyValueVisitor) OnInt(i int64) { vs.cb(vs.getName(), i) }
func (vs *KeyValueVisitor) OnFloat(f float64) { vs.cb(vs.getName(), f) }
exported method KeyValueVisitor.OnFloat should have comment or be unexported
func (vs *KeyValueVisitor) OnString(s string) { vs.cb(vs.getName(), s) }
func (vs *KeyValueVisitor) OnBool(b bool) { vs.cb(vs.getName(), b) }
func (vs *KeyValueVisitor) OnNil() { vs.cb(vs.getName(), nil) }
func (vs *KeyValueVisitor) OnInt(i int64) { vs.cb(vs.getName(), i) }
exported method KeyValueVisitor.OnInt should have comment or be unexported
func (vs *KeyValueVisitor) OnFloat(f float64) { vs.cb(vs.getName(), f) }
func (vs *KeyValueVisitor) OnString(s string) { vs.cb(vs.getName(), s) }
func (vs *KeyValueVisitor) OnBool(b bool) { vs.cb(vs.getName(), b) }
func (vs *KeyValueVisitor) OnNil() { vs.cb(vs.getName(), nil) }
exported method KeyValueVisitor.OnNil should have comment or be unexported
func (vs *KeyValueVisitor) OnInt(i int64) { vs.cb(vs.getName(), i) }
func (vs *KeyValueVisitor) OnFloat(f float64) { vs.cb(vs.getName(), f) }
func (vs *KeyValueVisitor) OnString(s string) { vs.cb(vs.getName(), s) }
func (vs *KeyValueVisitor) OnBool(b bool) { vs.cb(vs.getName(), b) }
exported method KeyValueVisitor.OnBool should have comment or be unexported
func (vs *KeyValueVisitor) OnNil() { vs.cb(vs.getName(), nil) }
func (vs *KeyValueVisitor) OnInt(i int64) { vs.cb(vs.getName(), i) }
func (vs *KeyValueVisitor) OnFloat(f float64) { vs.cb(vs.getName(), f) }
func (vs *KeyValueVisitor) OnString(s string) { vs.cb(vs.getName(), s) }
exported method KeyValueVisitor.OnString should have comment or be unexported
@@ -50,3 +51,8 @@ func ReportFloat(V Visitor, name string, value float64) {
	V.OnKey(name)
	V.OnFloat(value)
}

func ReportStringSlice(V Visitor, name string, value []string) {
exported function ReportStringSlice should have comment or be unexported
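For readers following the diff, here is a minimal, self-contained sketch of how a report helper and a key/value visitor of this shape could fit together. The `visitor` interface, `keyValueVisitor`, and `collect` below are hypothetical, trimmed-down stand-ins written for illustration; they are not the actual libbeat `monitoring` types.

```go
package main

import "fmt"

// visitor is a hypothetical, reduced stand-in for the monitoring Visitor
// interface; only the two methods needed for string slices are modelled.
type visitor interface {
	OnKey(name string)
	OnStringSlice(values []string)
}

// keyValueVisitor remembers the most recent key and forwards each
// (key, value) pair to a callback.
type keyValueVisitor struct {
	cb  func(key string, value interface{})
	key string
}

func (v *keyValueVisitor) OnKey(name string)             { v.key = name }
func (v *keyValueVisitor) OnStringSlice(values []string) { v.cb(v.key, values) }

// reportStringSlice announces the key first, then the value, mirroring the
// OnKey/OnFloat sequence visible in ReportFloat.
func reportStringSlice(v visitor, name string, values []string) {
	v.OnKey(name)
	v.OnStringSlice(values)
}

// collect gathers one reported value into a map (illustration only).
func collect(name string, values []string) map[string]interface{} {
	out := map[string]interface{}{}
	kv := &keyValueVisitor{cb: func(k string, val interface{}) { out[k] = val }}
	reportStringSlice(kv, name, values)
	return out
}

func main() {
	fmt.Println(collect("names", []string{"system"}))
}
```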
libbeat/monitoring/namespace.go
Outdated
	}
}

func (n *Namespaces) Get(key string) *Namespace {
exported method Namespaces.Get should have comment or be unexported
libbeat/monitoring/namespace.go
Outdated
	namespaces map[string]*Namespace
}

func NewNamespaces() *Namespaces {
exported function NewNamespaces should have comment or be unexported
libbeat/monitoring/namespace.go
Outdated
@@ -37,3 +35,25 @@ func (n *Namespace) GetRegistry() *Registry {
	}
	return n.registry
}

type Namespaces struct {
exported type Namespaces should have comment or be unexported
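The PR description lists "Improve namespace handling to prevent races" among its changes. One common way to get a race-free, lazily populated lookup is a mutex-guarded map, sketched below. The lower-case type names and the `name` field are assumptions made for this example; the real `Namespaces` implementation in the PR may differ.

```go
package main

import (
	"fmt"
	"sync"
)

// namespace is a hypothetical placeholder for the real Namespace type.
type namespace struct{ name string }

// namespaces guards its map with a mutex so concurrent Get calls are safe.
type namespaces struct {
	mu sync.Mutex
	m  map[string]*namespace
}

func newNamespaces() *namespaces {
	return &namespaces{m: map[string]*namespace{}}
}

// Get returns the namespace registered under key, creating it on first use.
func (n *namespaces) Get(key string) *namespace {
	n.mu.Lock()
	defer n.mu.Unlock()
	if ns, ok := n.m[key]; ok {
		return ns
	}
	ns := &namespace{name: key}
	n.m[key] = ns
	return ns
}

func main() {
	nss := newNamespaces()
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			nss.Get("state") // concurrent callers all see the same instance
		}()
	}
	wg.Wait()
	fmt.Println(nss.Get("state") == nss.Get("state"))
}
```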
@pickypg @tsullivan I think we should have a different reporting interval for the beat state. Any recommendations on the interval?
This is going to be a great source of info. It may be especially useful for debugging autodiscover status at some point 🎉
metricbeat/mb/module/runner.go
Outdated
}

func reportModules(m monitoring.Mode, V monitoring.Visitor) {

empty line here
@@ -24,6 +24,7 @@ func Start(cfg *common.Config) {

	// register handlers
	mux.HandleFunc("/", rootHandler())
	mux.HandleFunc("/state", stateHandler)
I wonder if it would make more sense to call this `/modules`. Do you plan to add anything else?
I'm thinking of also adding information about the datasets and the output used, but I'm not sure what it will evolve into. We will probably soon need a more general discussion about API endpoint naming and structure.
SGTM, I guess we don't have a hard commit on these names, so we can change them later if needed (?)
yes, API endpoints are not documented so far and we can still change them.
func (vs *KeyValueVisitor) OnNil() { vs.cb(vs.getName(), nil) }
func (vs *KeyValueVisitor) OnInt(i int64) { vs.cb(vs.getName(), i) }
func (vs *KeyValueVisitor) OnFloat(f float64) { vs.cb(vs.getName(), f) }
func (vs *KeyValueVisitor) OnStringSlice(f []string) { vs.cb(vs.getName(), f) }
exported method KeyValueVisitor.OnStringSlice should have comment or be unexported
@@ -217,6 +216,7 @@ func (r *reporter) snapshotLoop() {
		r.client.Publish(beat.Event{
			Timestamp: ts,
			Fields: fields,
			Meta: common.MapStr{"type": "beats_" + namespace, "interval_ms": period * time.Millisecond},
@pickypg I'm trying to work around the requirement that the interval has to be set as a request param and instead have it directly as part of the event. The reason is that I try to reuse the Elasticsearch client / reporter, where we set the params only once on setup and cannot pass them dynamically. I can modify the code to have a client for each reporting type, but was curious whether there is already a workaround today in the xpack endpoint?
Unfortunately not. The parameter is required to be passed as a querystring field.
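A side note on the `interval_ms` value in the diff above: assuming `period` is a `time.Duration` (its declaration is not visible in this excerpt), multiplying it by `time.Millisecond` would scale the nanosecond count by a factor of 1,000,000 rather than express it in milliseconds. A minimal sketch of the conversion, with `intervalMillis` as a hypothetical helper name:

```go
package main

import (
	"fmt"
	"time"
)

// intervalMillis converts a duration into a whole number of milliseconds.
// Dividing by time.Millisecond yields the count; multiplying would not.
func intervalMillis(period time.Duration) int64 {
	return int64(period / time.Millisecond)
}

func main() {
	period := 10 * time.Second
	fmt.Println(intervalMillis(period))
}
```

On Go 1.13 and later, `period.Milliseconds()` performs the same conversion.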
NOTE: I'm ignoring HoundCI on the very short interface methods on purpose.
Perhaps once every 5 minutes?
})
}

statsPipeline, err := getPipeline("stats", config.StatsPeriod)
@urso This code works but is not very nice. The reason I have to define 2 different pipelines is that the interval param differs and the monitoring registry conflicts. I would prefer to have only 1 pipeline but be able to set the params per request. Are there other options?
ab6edff to 1c27307

var namespaces = struct {
@jsoriano This code changed, as I had already made the same change in this PR. Let me know if my implementation also looks good to you.
LGTM
			params[k] = v
		}
	}
}
How about copying/allocating the contents only if required?
This loop rebuilds params every single time, for every event. If the batch contains multiple events and only one of them has params set, we might lose some params again.
Are c.params and the params passed via 'meta' always configured? If not, we can reduce some copying here:
// eventParams returns the per-event params stored in meta, or nil if absent.
eventParams := func(meta common.MapStr) map[string]string {
	tmp, err := meta.GetValue("params")
	if err != nil {
		return nil
	}
	p, ok := tmp.(map[string]string)
	if !ok {
		return nil
	}
	return p
}
// cpyKV copies all key/value pairs from one map into another.
cpyKV := func(to map[string]string, from map[string]string) {
	for k, v := range from {
		to[k] = v
	}
}
// Allocate a merged map only when both sources are non-empty.
params = c.params
if userParams := eventParams(event.Content.Meta); len(userParams) > 0 {
	if len(c.params) > 0 {
		params = make(map[string]string, len(c.params)+len(userParams))
		cpyKV(params, c.params)
		cpyKV(params, userParams)
	} else {
		params = userParams
	}
}
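The same allocate-only-when-needed idea, as a self-contained sketch that can be run outside the Beats code base. `mergeParams` is a hypothetical helper, and the parameter keys used in `main` are illustrative values only.

```go
package main

import "fmt"

// mergeParams combines client-wide base params with per-event params.
// It returns one of the inputs unchanged when the other is empty and
// allocates a new map only when both contain entries. Neither input is
// modified; per-event values win on key collisions.
func mergeParams(base, perEvent map[string]string) map[string]string {
	if len(perEvent) == 0 {
		return base
	}
	if len(base) == 0 {
		return perEvent
	}
	merged := make(map[string]string, len(base)+len(perEvent))
	for k, v := range base {
		merged[k] = v
	}
	for k, v := range perEvent {
		merged[k] = v
	}
	return merged
}

func main() {
	base := map[string]string{"system_id": "beats"} // illustrative key
	perEvent := map[string]string{"interval": "10s"}
	fmt.Println(mergeParams(base, perEvent))
	fmt.Println(len(base)) // base is left untouched
}
```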
As each event contains the interval param in meta and it's required at the moment, I think we have to reprocess it every time.
What do you mean by "we lose params"? As it's reprocessed every time and we don't modify c.params, all "base" params should always be there?
tl;dr: this PR might break x-pack monitoring.
The HTTP parameters are global, while this PR tries to set parameters per event. As intervals of the different reporters overlap, the HTTP parameters will eventually be filled by one or the other reporter at some point in time. Plus params are reset on every event being processed in a batch.
For this to work, we need to ensure some grouping by common event parameters, either by:
- having a client/pipeline per 'loop'
- or by doing some more grouping in the client, such that multiple bulk requests will
- do not use bulk API or do bulk API request per event
- better, don't have monitoring enforce global parameters, while in reality we want to provide per event parameters/meta-data
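To make the grouping option more concrete, here is a minimal sketch that partitions a batch by a per-event interval so that each group could be sent as its own bulk request with consistent HTTP parameters. The `event` struct and `groupByInterval` are hypothetical names for this illustration; they do not reflect the actual publisher types.

```go
package main

import "fmt"

// event is a hypothetical, reduced event carrying one per-event parameter.
type event struct {
	interval string
	body     string
}

// groupByInterval partitions events by their interval. It returns the
// intervals in first-seen order plus the events belonging to each one,
// so every group can go out as a separate bulk request.
func groupByInterval(events []event) ([]string, map[string][]event) {
	var order []string
	groups := map[string][]event{}
	for _, e := range events {
		if _, seen := groups[e.interval]; !seen {
			order = append(order, e.interval)
		}
		groups[e.interval] = append(groups[e.interval], e)
	}
	return order, groups
}

func main() {
	batch := []event{
		{interval: "10s", body: "stats-1"},
		{interval: "1m", body: "state-1"},
		{interval: "10s", body: "stats-2"},
	}
	order, groups := groupByInterval(batch)
	for _, interval := range order {
		fmt.Println(interval, len(groups[interval]))
	}
}
```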
libbeat/_meta/config.reference.yml
Outdated
@@ -982,6 +982,9 @@ logging.files:
# never, once, and freely. Default is never.
#ssl.renegotiation: never

#stats.period: 10s
#state.period: 1m
stats/state? I wonder how many users will get confused by this (or how many typos we will see). How about metrics.period and state.period?
@pickypg It seems because of the interval param that we have to send each type separately and can't send it in one bulk request. I wonder if moving forward we could make the |
1e4d3e7 to e76ecc2
@urso This should be ready for another round of reviews.
bulk := make([]interface{}, 0, 2*len(events))
var failed []publisher.Event
var reason error
var params map[string]string
Do we still need the global 'params' variable? It's re-initialized in the loop for every bulk request.
No, this also resolves my comment above.
@@ -1218,6 +1218,9 @@ logging.files:
# never, once, and freely. Default is never.
#ssl.renegotiation: never

#metrics.period: 10s
@urso I have second thoughts about whether we even want to add this to the config file. Should we even recommend that users change it?
It's the reference config. I don't really see a problem in configuring the interval. Allows users to reduce number of documents for monitoring if required.
@urso Ready for another round. I will follow up with the ES team to potentially change the requirement on the xpack _bulk endpoint to simplify this implementation.
As soon as this PR is merged, the monitoring template in Elasticsearch has to be updated.
Currently in monitoring or the API endpoint it's not visible which modules are running. This PR adds an endpoint `/state` which shows the following data:

```
{
  "module": {
    "count": 3,
    "names": [
      "system"
    ]
  }
}
```

The names are a unique list of module names that are running and the count is a total of all the modules that are running. In the above case it's 3 as the system module is started by default 3 times with different configs / periods. Currently this reporting only works for Metricbeat.

The same state metrics are reported to X-Pack Monitoring. The data is reported every minute by default.

Additional changes:

* Introduce ArraySlice support in metrics
* Improve namespace handling to prevent races
* Add monitoring reporter for state endpoint
* Add config options for reporting frequency. The old config option was removed. I don't consider this a breaking change as the old option was never documented and not intended to be changed.
PR for template can be found here: elastic/elasticsearch#31809
With elastic/beats#7075 Beats introduces state reporting for X-Pack Monitoring. The data sent up to Elasticsearch ends up stored in the following format.

```
"beats_state": {
  "timestamp": "2018-07-05T07:21:03.581Z",
  "state": {
    "module": {
      "count": 1,
      "names": [
        "http"
      ]
    }
  },
  "beat": {
    "uuid": "594039b5-6353-4d78-9bad-778ecc0fe83f",
    "type": "metricbeat",
    "version": "7.0.0-alpha1",
    "name": "ruflin",
    "host": "ruflin"
  }
}
```

This PR adds the new fields to the template.