Skip to content

Commit

Permalink
Disable mesos tasks statistics until we find a better way to deal wit…
Browse files Browse the repository at this point in the history
…h them.

Due to quite real problem of generating vast number of data series through
mesos tasks metrics this feature is disabled until better solution is found.
  • Loading branch information
harnash authored and sparrc committed Sep 28, 2016
1 parent 80391bf commit 32268fb
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 96 deletions.
39 changes: 1 addition & 38 deletions plugins/inputs/mesos/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,10 @@ For more information, please check the [Mesos Observability Metrics](http://meso
# "tasks",
# "messages",
# ]
## Include mesos tasks statistics, default is false
# slave_tasks = true
## Should tags in slave task metrics be normalized? This will remove UUIDs from
## task_id tag so we don't generate milions of series in InfluxDB, default is false
# slave_tasks_normalize = true
```

By default this plugin is not configured to gather metrics from mesos. Since a mesos cluster can be deployed in numerous ways it does not provide any default
values. User needs to specify master/slave nodes this plugin will gather metrics from. Additionally, enabling `slave_tasks` will allow
gathering metrics from tasks running on specified slaves (this option is disabled by default).
values. User needs to specify master/slave nodes this plugin will gather metrics from.

### Measurements & Fields:

Expand Down Expand Up @@ -238,27 +232,6 @@ Mesos slave metric groups
- slave/valid_framework_messages
- slave/valid_status_updates

Mesos tasks metric groups

- executor_id
- cpus_limit
- cpus_system_time_secs
- cpus_user_time_secs
- mem_anon_bytes
- mem_cache_bytes
- mem_critical_pressure_counter
- mem_file_bytes
- mem_limit_bytes
- mem_low_pressure_counter
- mem_mapped_file_bytes
- mem_medium_pressure_counter
- mem_rss_bytes
- mem_swap_bytes
- mem_total_bytes
- mem_total_memsw_bytes
- mem_unevictable_bytes
- timestamp

### Tags:

- All master/slave measurements have the following tags:
Expand All @@ -268,10 +241,6 @@ Mesos tasks metric groups
- All master measurements have the extra tags:
- state (leader/follower)

- Tasks measurements have the following tags:
- server
- framework_id

### Example Output:
```
$ telegraf -config ~/mesos.conf -input-filter mesos -test
Expand All @@ -295,9 +264,3 @@ master/mem_used=0,master/messages_authenticate=0,
master/messages_deactivate_framework=0 ...
```

Meoso tasks metrics (if enabled):
```
> mesos_tasks,framework_id=20151016-120318-1243483658-5050-6139-0000,host=localhost,server=mesos-1
cpus_limit=0.2,cpus_system_time_secs=84.04,cpus_user_time_secs=1161,executor_id="some_app.5d9f3cf8-6b19-11e6-8d24-0242f3fd597e",
mem_limit_bytes=348127232,mem_rss_bytes=310820864,timestamp=1472572204.22177 1472572204000000000...
```
22 changes: 10 additions & 12 deletions plugins/inputs/mesos/mesos.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type Mesos struct {
MasterCols []string `toml:"master_collections"`
Slaves []string
SlaveCols []string `toml:"slave_collections"`
SlaveTasks bool
//SlaveTasks bool
}

var allMetrics = map[Role][]string{
Expand Down Expand Up @@ -66,8 +66,6 @@ var sampleConfig = `
# "tasks",
# "messages",
# ]
## Include mesos tasks statistics, default is false
# slave_tasks = true
`

// SampleConfig returns a sample configuration block
Expand Down Expand Up @@ -121,16 +119,16 @@ func (m *Mesos) Gather(acc telegraf.Accumulator) error {
return
}(v)

if !m.SlaveTasks {
continue
}
// if !m.SlaveTasks {
// continue
// }

wg.Add(1)
go func(c string) {
errorChannel <- m.gatherSlaveTaskMetrics(c, ":5051", acc)
wg.Done()
return
}(v)
// wg.Add(1)
// go func(c string) {
// errorChannel <- m.gatherSlaveTaskMetrics(c, ":5051", acc)
// wg.Done()
// return
// }(v)
}

wg.Wait()
Expand Down
93 changes: 47 additions & 46 deletions plugins/inputs/mesos/mesos_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ import (
var masterMetrics map[string]interface{}
var masterTestServer *httptest.Server
var slaveMetrics map[string]interface{}
var slaveTaskMetrics map[string]interface{}

// var slaveTaskMetrics map[string]interface{}
var slaveTestServer *httptest.Server

func randUUID() string {
Expand Down Expand Up @@ -215,31 +216,31 @@ func generateMetrics() {
slaveMetrics[k] = rand.Float64()
}

slaveTaskMetrics = map[string]interface{}{
"executor_id": fmt.Sprintf("task_name.%s", randUUID()),
"executor_name": "Some task description",
"framework_id": randUUID(),
"source": fmt.Sprintf("task_source.%s", randUUID()),
"statistics": map[string]interface{}{
"cpus_limit": rand.Float64(),
"cpus_system_time_secs": rand.Float64(),
"cpus_user_time_secs": rand.Float64(),
"mem_anon_bytes": float64(rand.Int63()),
"mem_cache_bytes": float64(rand.Int63()),
"mem_critical_pressure_counter": float64(rand.Int63()),
"mem_file_bytes": float64(rand.Int63()),
"mem_limit_bytes": float64(rand.Int63()),
"mem_low_pressure_counter": float64(rand.Int63()),
"mem_mapped_file_bytes": float64(rand.Int63()),
"mem_medium_pressure_counter": float64(rand.Int63()),
"mem_rss_bytes": float64(rand.Int63()),
"mem_swap_bytes": float64(rand.Int63()),
"mem_total_bytes": float64(rand.Int63()),
"mem_total_memsw_bytes": float64(rand.Int63()),
"mem_unevictable_bytes": float64(rand.Int63()),
"timestamp": rand.Float64(),
},
}
// slaveTaskMetrics = map[string]interface{}{
// "executor_id": fmt.Sprintf("task_name.%s", randUUID()),
// "executor_name": "Some task description",
// "framework_id": randUUID(),
// "source": fmt.Sprintf("task_source.%s", randUUID()),
// "statistics": map[string]interface{}{
// "cpus_limit": rand.Float64(),
// "cpus_system_time_secs": rand.Float64(),
// "cpus_user_time_secs": rand.Float64(),
// "mem_anon_bytes": float64(rand.Int63()),
// "mem_cache_bytes": float64(rand.Int63()),
// "mem_critical_pressure_counter": float64(rand.Int63()),
// "mem_file_bytes": float64(rand.Int63()),
// "mem_limit_bytes": float64(rand.Int63()),
// "mem_low_pressure_counter": float64(rand.Int63()),
// "mem_mapped_file_bytes": float64(rand.Int63()),
// "mem_medium_pressure_counter": float64(rand.Int63()),
// "mem_rss_bytes": float64(rand.Int63()),
// "mem_swap_bytes": float64(rand.Int63()),
// "mem_total_bytes": float64(rand.Int63()),
// "mem_total_memsw_bytes": float64(rand.Int63()),
// "mem_unevictable_bytes": float64(rand.Int63()),
// "timestamp": rand.Float64(),
// },
// }
}

func TestMain(m *testing.M) {
Expand All @@ -259,11 +260,11 @@ func TestMain(m *testing.M) {
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(slaveMetrics)
})
slaveRouter.HandleFunc("/monitor/statistics", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode([]map[string]interface{}{slaveTaskMetrics})
})
// slaveRouter.HandleFunc("/monitor/statistics", func(w http.ResponseWriter, r *http.Request) {
// w.WriteHeader(http.StatusOK)
// w.Header().Set("Content-Type", "application/json")
// json.NewEncoder(w).Encode([]map[string]interface{}{slaveTaskMetrics})
// })
slaveTestServer = httptest.NewServer(slaveRouter)

rc := m.Run()
Expand Down Expand Up @@ -323,10 +324,10 @@ func TestMesosSlave(t *testing.T) {
var acc testutil.Accumulator

m := Mesos{
Masters: []string{},
Slaves: []string{slaveTestServer.Listener.Addr().String()},
SlaveTasks: true,
Timeout: 10,
Masters: []string{},
Slaves: []string{slaveTestServer.Listener.Addr().String()},
// SlaveTasks: true,
Timeout: 10,
}

err := m.Gather(&acc)
Expand All @@ -337,17 +338,17 @@ func TestMesosSlave(t *testing.T) {

acc.AssertContainsFields(t, "mesos", slaveMetrics)

expectedFields := make(map[string]interface{}, len(slaveTaskMetrics["statistics"].(map[string]interface{}))+1)
for k, v := range slaveTaskMetrics["statistics"].(map[string]interface{}) {
expectedFields[k] = v
}
expectedFields["executor_id"] = slaveTaskMetrics["executor_id"]

acc.AssertContainsTaggedFields(
t,
"mesos_tasks",
expectedFields,
map[string]string{"server": "127.0.0.1", "framework_id": slaveTaskMetrics["framework_id"].(string)})
// expectedFields := make(map[string]interface{}, len(slaveTaskMetrics["statistics"].(map[string]interface{}))+1)
// for k, v := range slaveTaskMetrics["statistics"].(map[string]interface{}) {
// expectedFields[k] = v
// }
// expectedFields["executor_id"] = slaveTaskMetrics["executor_id"]

// acc.AssertContainsTaggedFields(
// t,
// "mesos_tasks",
// expectedFields,
// map[string]string{"server": "127.0.0.1", "framework_id": slaveTaskMetrics["framework_id"].(string)})
}

func TestSlaveFilter(t *testing.T) {
Expand Down

0 comments on commit 32268fb

Please sign in to comment.