Report metrics from custom-plugin-monitor
Xuewei Zhang committed Jul 25, 2019
1 parent fbebcf3 · commit 94af7de
Showing 8 changed files with 110 additions and 26 deletions.
1 change: 1 addition & 0 deletions config/custom-plugin-monitor.json
@@ -8,6 +8,7 @@
"enable_message_change_based_condition_update": false
},
"source": "ntp-custom-plugin-monitor",
"metricsReporting": true,
"conditions": [
{
"type": "NTPProblem",
1 change: 1 addition & 0 deletions config/kernel-monitor-counter.json
@@ -7,6 +7,7 @@
"concurrency": 1
},
"source": "kernel-monitor",
"metricsReporting": true,
"conditions": [
{
"type": "FrequentUnregisterNetDevice",
1 change: 1 addition & 0 deletions config/network-problem-monitor.json
@@ -7,6 +7,7 @@
"concurrency": 3
},
"source": "network-custom-plugin-monitor",
"metricsReporting": true,
"conditions": [],
"rules": [
{
1 change: 1 addition & 0 deletions config/systemd-monitor-counter.json
@@ -7,6 +7,7 @@
"concurrency": 1
},
"source": "systemd-monitor",
"metricsReporting": true,
"conditions": [
{
"type": "FrequentKubeletRestart",
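
All four config changes above add the same top-level switch. For reference, a minimal sketch of a custom plugin monitor config carrying the new field, assuming the standard custom plugin monitor config layout; apart from the keys visible in the hunks above, the values and the rule entry are illustrative placeholders, not part of this commit:

{
  "plugin": "custom",
  "pluginConfig": {
    "invoke_interval": "30s",
    "timeout": "5s",
    "max_output_length": 80,
    "concurrency": 3
  },
  "source": "ntp-custom-plugin-monitor",
  "metricsReporting": true,
  "conditions": [
    {
      "type": "NTPProblem",
      "reason": "NTPIsUp",
      "message": "ntp service is up"
    }
  ],
  "rules": [
    {
      "type": "permanent",
      "condition": "NTPProblem",
      "reason": "NTPIsDown",
      "path": "./config/plugin/check_ntp.sh",
      "timeout": "3s"
    }
  ]
}

With metricsReporting set to true (the default once this change lands), problems from these rules are exported as counters and, for permanent problems, gauges; setting it to false keeps the event- and condition-only behavior.
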
64 changes: 56 additions & 8 deletions pkg/custompluginmonitor/custom_plugin_monitor.go
@@ -26,6 +26,7 @@ import (
"k8s.io/node-problem-detector/pkg/custompluginmonitor/plugin"
cpmtypes "k8s.io/node-problem-detector/pkg/custompluginmonitor/types"
"k8s.io/node-problem-detector/pkg/problemdaemon"
"k8s.io/node-problem-detector/pkg/problemmetrics"
"k8s.io/node-problem-detector/pkg/types"
"k8s.io/node-problem-detector/pkg/util"
"k8s.io/node-problem-detector/pkg/util/tomb"
@@ -80,9 +81,31 @@ func NewCustomPluginMonitorOrDie(configPath string) types.Monitor {
c.plugin = plugin.NewPlugin(c.config)
// A 1000 size channel should be big enough.
c.statusChan = make(chan *types.Status, 1000)

if *c.config.EnableMetricsReporting {
initializeProblemMetricsOrDie(c.config.Rules)
}
return c
}

// initializeProblemMetricsOrDie creates problem metrics for all problems, sets their values to 0,
// and panics if an error occurs.
func initializeProblemMetricsOrDie(rules []*cpmtypes.CustomRule) {
for _, rule := range rules {
if rule.Type == types.Perm {
err := problemmetrics.GlobalProblemMetricsManager.SetProblemGauge(rule.Condition, rule.Reason, false)
if err != nil {
glog.Fatalf("Failed to initialize problem gauge metrics for problem %q, reason %q: %v",
rule.Condition, rule.Reason, err)
}
}
err := problemmetrics.GlobalProblemMetricsManager.IncrementProblemCounter(rule.Reason, 0)
if err != nil {
glog.Fatalf("Failed to initialize problem counter metrics for %q: %v", rule.Reason, err)
}
}
}

func (c *customPluginMonitor) Start() (<-chan *types.Status, error) {
glog.Info("Start custom plugin monitor")
go c.plugin.Run()
@@ -120,11 +143,12 @@ func (c *customPluginMonitor) monitorLoop() {
// generateStatus generates status from the plugin check result.
func (c *customPluginMonitor) generateStatus(result cpmtypes.Result) *types.Status {
timestamp := time.Now()
var events []types.Event
var activeProblemEvents []types.Event
var inactiveProblemEvents []types.Event
if result.Rule.Type == types.Temp {
// For a temporary error, only generate an event when the exit status is at or above the warning level (NonOK)
if result.ExitStatus >= cpmtypes.NonOK {
events = append(events, types.Event{
activeProblemEvents = append(activeProblemEvents, types.Event{
Severity: types.Warn,
Timestamp: timestamp,
Reason: result.Rule.Reason,
@@ -151,7 +175,7 @@ func (c *customPluginMonitor) generateStatus(result cpmtypes.Result) *types.Stat
}
}

events = append(events, util.GenerateConditionChangeEvent(
inactiveProblemEvents = append(inactiveProblemEvents, util.GenerateConditionChangeEvent(
condition.Type,
status,
defaultConditionReason,
@@ -165,7 +189,7 @@ func (c *customPluginMonitor) generateStatus(result cpmtypes.Result) *types.Stat
// change 2: Condition status change from False/Unknown to True
condition.Transition = timestamp
condition.Message = result.Message
events = append(events, util.GenerateConditionChangeEvent(
activeProblemEvents = append(activeProblemEvents, util.GenerateConditionChangeEvent(
condition.Type,
status,
result.Rule.Reason,
@@ -178,7 +202,7 @@ func (c *customPluginMonitor) generateStatus(result cpmtypes.Result) *types.Stat
// change 3: Condition status change from False to Unknown or vice versa
condition.Transition = timestamp
condition.Message = result.Message
events = append(events, util.GenerateConditionChangeEvent(
inactiveProblemEvents = append(inactiveProblemEvents, util.GenerateConditionChangeEvent(
condition.Type,
status,
result.Rule.Reason,
@@ -196,22 +220,46 @@ func (c *customPluginMonitor) generateStatus(result cpmtypes.Result) *types.Stat
condition.Transition = timestamp
condition.Reason = result.Rule.Reason
condition.Message = result.Message
events = append(events, util.GenerateConditionChangeEvent(
updateEvent := util.GenerateConditionChangeEvent(
condition.Type,
status,
condition.Reason,
timestamp,
))
)
if condition.Status == types.True {
activeProblemEvents = append(activeProblemEvents, updateEvent)
} else {
inactiveProblemEvents = append(inactiveProblemEvents, updateEvent)
}
}

break
}
}
}
if *c.config.EnableMetricsReporting {
// Increment problem counter only for active problems which just got detected.
for _, event := range activeProblemEvents {
err := problemmetrics.GlobalProblemMetricsManager.IncrementProblemCounter(
event.Reason, 1)
if err != nil {
glog.Errorf("Failed to update problem counter metrics for %q: %v",
event.Reason, err)
}
}
for _, condition := range c.conditions {
err := problemmetrics.GlobalProblemMetricsManager.SetProblemGauge(
condition.Type, condition.Reason, condition.Status == types.True)
if err != nil {
glog.Errorf("Failed to update problem gauge metrics for problem %q, reason %q: %v",
condition.Type, condition.Reason, err)
}
}
}
return &types.Status{
Source: c.config.Source,
// TODO(random-liu): Aggregate events and conditions and then do periodically report.
Events: events,
Events: append(activeProblemEvents, inactiveProblemEvents...),
Conditions: c.conditions,
}
}
7 changes: 7 additions & 0 deletions pkg/custompluginmonitor/types/config.go
@@ -32,6 +32,7 @@ var (
defaultMaxOutputLength = 80
defaultConcurrency = 3
defaultMessageChangeBasedConditionUpdate = false
defaultEnableMetricsReporting = true

customPluginName = "custom"
)
@@ -66,6 +67,8 @@ type CustomPluginConfig struct {
DefaultConditions []types.Condition `json:"conditions"`
// Rules are the rules custom plugin monitor will follow to parse and invoke plugins.
Rules []*CustomRule `json:"rules"`
// EnableMetricsReporting describes whether to report problems as metrics or not.
EnableMetricsReporting *bool `json:"metricsReporting,omitempty"`
}

// ApplyConfiguration applies default configurations.
@@ -112,6 +115,10 @@ func (cpc *CustomPluginConfig) ApplyConfiguration() error {
}
}

if cpc.EnableMetricsReporting == nil {
cpc.EnableMetricsReporting = &defaultEnableMetricsReporting
}

return nil
}

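
A quick way to see the effect of the new default is the sketch below (not part of the commit): unmarshal a config that omits metricsReporting, apply the defaults, and EnableMetricsReporting comes back pointing at true, so pre-existing configs keep reporting metrics unless they explicitly set "metricsReporting": false. The "plugin" key is included only for shape and is assumed from the upstream config format; the defaulting itself does not depend on it.

package main

import (
	"encoding/json"
	"fmt"

	cpmtypes "k8s.io/node-problem-detector/pkg/custompluginmonitor/types"
)

func main() {
	// A config that predates this commit: metricsReporting is absent.
	raw := []byte(`{"plugin": "custom", "source": "ntp-custom-plugin-monitor"}`)

	var config cpmtypes.CustomPluginConfig
	if err := json.Unmarshal(raw, &config); err != nil {
		panic(err)
	}
	// ApplyConfiguration fills in the defaults, including EnableMetricsReporting.
	if err := config.ApplyConfiguration(); err != nil {
		panic(err)
	}
	fmt.Println(*config.EnableMetricsReporting) // prints "true"
}
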
24 changes: 24 additions & 0 deletions pkg/custompluginmonitor/types/config_test.go
@@ -30,6 +30,7 @@ func TestCustomPluginConfigApplyConfiguration(t *testing.T) {
maxOutputLength := 79
concurrency := 2
messageChangeBasedConditionUpdate := true
disableMetricsReporting := false

ruleTimeout := 1 * time.Second
ruleTimeoutString := ruleTimeout.String()
@@ -60,6 +61,7 @@ func TestCustomPluginConfigApplyConfiguration(t *testing.T) {
Concurrency: &defaultConcurrency,
EnableMessageChangeBasedConditionUpdate: &defaultMessageChangeBasedConditionUpdate,
},
EnableMetricsReporting: &defaultEnableMetricsReporting,
Rules: []*CustomRule{
{
Path: "../plugin/test-data/ok.sh",
@@ -88,6 +90,7 @@ func TestCustomPluginConfigApplyConfiguration(t *testing.T) {
Concurrency: &defaultConcurrency,
EnableMessageChangeBasedConditionUpdate: &defaultMessageChangeBasedConditionUpdate,
},
EnableMetricsReporting: &defaultEnableMetricsReporting,
},
},
"custom default timeout": {
@@ -106,6 +109,7 @@ func TestCustomPluginConfigApplyConfiguration(t *testing.T) {
Concurrency: &defaultConcurrency,
EnableMessageChangeBasedConditionUpdate: &defaultMessageChangeBasedConditionUpdate,
},
EnableMetricsReporting: &defaultEnableMetricsReporting,
},
},
"custom max output length": {
@@ -124,6 +128,7 @@ func TestCustomPluginConfigApplyConfiguration(t *testing.T) {
Concurrency: &defaultConcurrency,
EnableMessageChangeBasedConditionUpdate: &defaultMessageChangeBasedConditionUpdate,
},
EnableMetricsReporting: &defaultEnableMetricsReporting,
},
},
"custom concurrency": {
@@ -142,6 +147,7 @@ func TestCustomPluginConfigApplyConfiguration(t *testing.T) {
Concurrency: &concurrency,
EnableMessageChangeBasedConditionUpdate: &defaultMessageChangeBasedConditionUpdate,
},
EnableMetricsReporting: &defaultEnableMetricsReporting,
},
},
"custom message change based condition update": {
@@ -160,6 +166,24 @@ func TestCustomPluginConfigApplyConfiguration(t *testing.T) {
Concurrency: &defaultConcurrency,
EnableMessageChangeBasedConditionUpdate: &messageChangeBasedConditionUpdate,
},
EnableMetricsReporting: &defaultEnableMetricsReporting,
},
},
"disable metrics reporting": {
Orig: CustomPluginConfig{
EnableMetricsReporting: &disableMetricsReporting,
},
Wanted: CustomPluginConfig{
PluginGlobalConfig: pluginGlobalConfig{
InvokeIntervalString: &defaultInvokeIntervalString,
InvokeInterval: &defaultInvokeInterval,
TimeoutString: &defaultGlobalTimeoutString,
Timeout: &defaultGlobalTimeout,
MaxOutputLength: &defaultMaxOutputLength,
Concurrency: &defaultConcurrency,
EnableMessageChangeBasedConditionUpdate: &defaultMessageChangeBasedConditionUpdate,
},
EnableMetricsReporting: &disableMetricsReporting,
},
},
}
37 changes: 19 additions & 18 deletions pkg/systemlogmonitor/log_monitor.go
@@ -157,6 +157,7 @@ func (l *logMonitor) generateStatus(logs []*logtypes.Log, rule systemlogtypes.Ru
timestamp := logs[0].Timestamp
message := generateMessage(logs)
var events []types.Event
var changedConditions []*types.Condition
if rule.Type == types.Temp {
// For temporary error only generate event
events = append(events, types.Event{
@@ -165,12 +166,6 @@ func (l *logMonitor) generateStatus(logs []*logtypes.Log, rule systemlogtypes.Ru
Reason: rule.Reason,
Message: message,
})
if *l.config.EnableMetricsReporting {
err := problemmetrics.GlobalProblemMetricsManager.IncrementProblemCounter(rule.Reason, 1)
if err != nil {
glog.Errorf("Failed to update problem counter metrics for %q: %v", rule.Reason, err)
}
}
} else {
// For permanent error changes the condition
for i := range l.conditions {
@@ -188,26 +183,32 @@ func (l *logMonitor) generateStatus(logs []*logtypes.Log, rule systemlogtypes.Ru
rule.Reason,
timestamp,
))

if *l.config.EnableMetricsReporting {
err := problemmetrics.GlobalProblemMetricsManager.SetProblemGauge(rule.Condition, rule.Reason, true)
if err != nil {
glog.Errorf("Failed to update problem gauge metrics for problem %q, reason %q: %v",
rule.Condition, rule.Reason, err)
}
err = problemmetrics.GlobalProblemMetricsManager.IncrementProblemCounter(rule.Reason, 1)
if err != nil {
glog.Errorf("Failed to update problem counter metrics for %q: %v", rule.Reason, err)
}
}
}
condition.Status = types.True
condition.Reason = rule.Reason
changedConditions = append(changedConditions, condition)
break
}
}
}

if *l.config.EnableMetricsReporting {
for _, event := range events {
err := problemmetrics.GlobalProblemMetricsManager.IncrementProblemCounter(event.Reason, 1)
if err != nil {
glog.Errorf("Failed to update problem counter metrics for %q: %v", event.Reason, err)
}
}
for _, condition := range changedConditions {
err := problemmetrics.GlobalProblemMetricsManager.SetProblemGauge(
condition.Type, condition.Reason, condition.Status == types.True)
if err != nil {
glog.Errorf("Failed to update problem gauge metrics for problem %q, reason %q: %v",
condition.Type, condition.Reason, err)
}
}
}

return &types.Status{
Source: l.config.Source,
// TODO(random-liu): Aggregate events and conditions and then do periodically report.
