Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add container meta info on module level #2682

Merged
merged 4 commits into from
Oct 5, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 44 additions & 30 deletions metricbeat/beater/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,40 +6,41 @@ import (
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/processors"
"github.com/elastic/beats/metricbeat/mb"
)

const (
defaultType = "metricsets"
)

// eventBuilder is used for building MetricSet events. MetricSets generate a
// EventBuilder is used for building MetricSet events. MetricSets generate a
// data in the form of a common.MapStr. This builder transforms that data into
// a complete event and applies any Module-level filtering.
type eventBuilder struct {
moduleName string
metricSetName string
host string
startTime time.Time
fetchDuration time.Duration
event common.MapStr
type EventBuilder struct {
ModuleName string
MetricSetName string
Host string
StartTime time.Time
FetchDuration time.Duration
Event common.MapStr
fetchErr error
filters *processors.Processors
metadata common.EventMetadata
}

// build builds an event from MetricSet data and applies the Module-level
// Build builds an event from MetricSet data and applies the Module-level
// filters.
func (b eventBuilder) build() (common.MapStr, error) {
func (b EventBuilder) Build() (common.MapStr, error) {
// event may be nil when there was an error fetching.
event := b.event
event := b.Event
if event == nil {
event = common.MapStr{} // TODO (akroh): do we want to send an empty event field?
}

// Get and remove meta fields from the event created by the MetricSet.
indexName := getIndex(event, "")
typeName := getType(event, defaultType)
timestamp := getTimestamp(event, common.Time(b.startTime))
timestamp := getTimestamp(event, common.Time(b.StartTime))

// Apply filters.
if b.filters != nil {
Expand All @@ -48,21 +49,34 @@ func (b eventBuilder) build() (common.MapStr, error) {
}
}

event = common.MapStr{
"@timestamp": timestamp,
"type": typeName,
// Checks if additional meta information is provided by the MetricSet under the key ModuleData
// This is based on the convention that each MetricSet can provide module data under the key ModuleData
moduleData, moudleDataExists := event[mb.ModuleData]
if moudleDataExists {
delete(event, mb.ModuleData)
}

event = common.MapStr{
"@timestamp": timestamp,
"type": typeName,
common.EventMetadataKey: b.metadata,
b.moduleName: common.MapStr{
b.metricSetName: event,
b.ModuleName: common.MapStr{
b.MetricSetName: event,
},
"metricset": common.MapStr{
"module": b.moduleName,
"name": b.metricSetName,
"rtt": b.fetchDuration.Nanoseconds() / int64(time.Microsecond),
"module": b.ModuleName,
"name": b.MetricSetName,
"rtt": b.FetchDuration.Nanoseconds() / int64(time.Microsecond),
},
}

// In case meta data exists, it is added on the module level
if moudleDataExists {
if _, ok := moduleData.(common.MapStr); ok {
event[b.ModuleName].(common.MapStr).Update(moduleData.(common.MapStr))
}
}

// Overwrite default index if set.
if indexName != "" {
event["beat"] = common.MapStr{
Expand All @@ -72,10 +86,10 @@ func (b eventBuilder) build() (common.MapStr, error) {

// Adds host name to event. In case credentials are passed through
// hostname, these are contained in this string.
if b.host != "" {
if b.Host != "" {
// TODO (akroh): allow metricset to specify this value so that
// a proper URL can be specified and passwords be redacted.
event["metricset"].(common.MapStr)["host"] = b.host
event["metricset"].(common.MapStr)["host"] = b.Host
}

// Adds error to event in case error happened
Expand Down Expand Up @@ -135,15 +149,15 @@ func createEvent(
start time.Time,
elapsed time.Duration,
) (common.MapStr, error) {
return eventBuilder{
moduleName: msw.Module().Name(),
metricSetName: msw.Name(),
host: msw.Host(),
startTime: start,
fetchDuration: elapsed,
event: event,
return EventBuilder{
ModuleName: msw.Module().Name(),
MetricSetName: msw.Name(),
Host: msw.Host(),
StartTime: start,
FetchDuration: elapsed,
Event: event,
fetchErr: fetchErr,
filters: msw.module.filters,
metadata: msw.module.Config().EventMetadata,
}.build()
}.Build()
}
18 changes: 9 additions & 9 deletions metricbeat/beater/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@ var (
tags = []string{tag}
)

var builder = eventBuilder{
moduleName: moduleName,
metricSetName: metricSetName,
var builder = EventBuilder{
ModuleName: moduleName,
MetricSetName: metricSetName,
// host
startTime: startTime,
fetchDuration: elapsed,
StartTime: startTime,
FetchDuration: elapsed,
// event
// fetchErr
// processors
Expand All @@ -39,8 +39,8 @@ var builder = eventBuilder{

func TestEventBuilder(t *testing.T) {
b := builder
b.host = host
event, err := b.build()
b.Host = host
event, err := b.Build()
if err != nil {
t.Fatal(err)
}
Expand All @@ -60,7 +60,7 @@ func TestEventBuilder(t *testing.T) {
func TestEventBuilderError(t *testing.T) {
b := builder
b.fetchErr = errFetch
event, err := b.build()
event, err := b.Build()
if err != nil {
t.Fatal(err)
}
Expand All @@ -70,7 +70,7 @@ func TestEventBuilderError(t *testing.T) {

func TestEventBuilderNoHost(t *testing.T) {
b := builder
event, err := b.build()
event, err := b.Build()
if err != nil {
t.Fatal(err)
}
Expand Down
134 changes: 5 additions & 129 deletions metricbeat/docs/fields.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -802,37 +802,6 @@ Runtime cpu metrics.



[float]
== container Fields

Container meta data.



[float]
=== docker.cpu.container.id

type: keyword

Unique container id.


[float]
=== docker.cpu.container.name

type: keyword

Container name.


[float]
=== docker.cpu.container.socket

type: keyword

The daemon socket option. By default, docker daemon listens on the Unix socket : unix:///var/run/docker.sock.


[float]
== usage Fields

Expand Down Expand Up @@ -871,37 +840,6 @@ Diskio metrics.



[float]
== container Fields

Container meta data.



[float]
=== docker.diskio.container.id

type: keyword

Unique container id.


[float]
=== docker.diskio.container.name

type: keyword

Container name.


[float]
=== docker.diskio.container.socket

type: keyword

The daemon socket option. By default, docker daemon listens on the Unix socket : unix:///var/run/docker.sock.


[float]
=== docker.diskio.reads

Expand Down Expand Up @@ -996,37 +934,6 @@ Memory metrics.



[float]
== container Fields

Container meta data.



[float]
=== docker.memory.container.id

type: keyword

Unique container id.


[float]
=== docker.memory.container.name

type: keyword

Container name.


[float]
=== docker.memory.container.socket

type: keyword

The daemon socket option. By default, docker daemon listens on the Unix socket : unix:///var/run/docker.sock.


[float]
=== docker.memory.fail.count

Expand All @@ -1044,22 +951,22 @@ Memory limit.


[float]
== total Fields
== rss Fields

Total memory stats.
Rss memory stats.



[float]
=== docker.memory.total.rss
=== docker.memory.rss.total

type: long

Memory resident set size.
Total memory resident set size.


[float]
=== docker.memory.total.rss.pct
=== docker.memory.rss.pct

type: scaled_float

Expand Down Expand Up @@ -1104,37 +1011,6 @@ Netowrk metrics.



[float]
== container Fields

Container meta data.



[float]
=== docker.network.container.id

type: keyword

Unique container id.


[float]
=== docker.network.container.name

type: keyword

Container name.


[float]
=== docker.network.container.socket

type: keyword

The daemon socket option. By default, docker daemon listens on the Unix socket : unix:///var/run/docker.sock.


[float]
=== docker.network.interface

Expand Down
Loading