Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Abort scrapes after configurable timeout #340

Merged
merged 4 commits into from
Aug 3, 2019
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions appveyor.yml
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,13 @@ after_build:
return
}
$ErrorActionPreference = "Stop"
$BuildVersion = Get-Content VERSION
# The MSI version is not semver compliant, so just take the numerical parts
$Version = $env:APPVEYOR_REPO_TAG_NAME -replace '^v?([0-9\.]+).*$','$1'
$MSIVersion = $env:APPVEYOR_REPO_TAG_NAME -replace '^v?([0-9\.]+).*$','$1'
foreach($Arch in "amd64","386") {
Write-Verbose "Building wmi_exporter $Version msi for $Arch"
.\installer\build.ps1 -PathToExecutable .\output\$Arch\wmi_exporter-$Version-$Arch.exe -Version $Version -Arch "$Arch"
Move-Item installer\Output\wmi_exporter-$Version-$Arch.msi output\$Arch\
Write-Verbose "Building wmi_exporter $MSIVersion msi for $Arch"
.\installer\build.ps1 -PathToExecutable .\output\$Arch\wmi_exporter-$BuildVersion-$Arch.exe -Version $MSIVersion -Arch "$Arch"
Move-Item installer\Output\wmi_exporter-$MSIVersion-$Arch.msi output\$Arch\
}
- promu checksum output\

Expand Down
146 changes: 133 additions & 13 deletions exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ package main
import (
"fmt"
"net/http"
"os"
"sort"
"strconv"
"strings"
"sync"
"time"
Expand All @@ -23,7 +25,8 @@ import (

// WmiCollector implements the prometheus.Collector interface.
type WmiCollector struct {
collectors map[string]collector.Collector
maxScrapeDuration time.Duration
collectors map[string]collector.Collector
}

const (
Expand All @@ -45,6 +48,18 @@ var (
[]string{"collector"},
nil,
)
scrapeTimeoutDesc = prometheus.NewDesc(
prometheus.BuildFQName(collector.Namespace, "exporter", "collector_timeout"),
"wmi_exporter: Whether the collector timed out.",
[]string{"collector"},
nil,
)
snapshotDuration = prometheus.NewDesc(
prometheus.BuildFQName(collector.Namespace, "exporter", "perflib_snapshot_duration_seconds"),
"Duration of perflib snapshot capture",
nil,
nil,
)

// This can be removed when client_golang exposes this on Windows
// (See https://github.com/prometheus/client_golang/issues/376)
Expand All @@ -65,21 +80,54 @@ func (coll WmiCollector) Describe(ch chan<- *prometheus.Desc) {
}

// Collect sends the collected metrics from each of the collectors to
// prometheus. Collect could be called several times concurrently
// and thus its run is protected by a single mutex.
// prometheus.
func (coll WmiCollector) Collect(ch chan<- prometheus.Metric) {
t := time.Now()
scrapeContext, err := collector.PrepareScrapeContext()
ch <- prometheus.MustNewConstMetric(
snapshotDuration,
prometheus.GaugeValue,
time.Since(t).Seconds(),
)
if err != nil {
ch <- prometheus.NewInvalidMetric(scrapeSuccessDesc, fmt.Errorf("failed to prepare scrape: %v", err))
return
}

remainingCollectors := make(map[string]bool)
for name := range coll.collectors {
remainingCollectors[name] = true
}

metricsBuffer := make(chan prometheus.Metric)
allDone := make(chan struct{})
stopped := false
go func() {
for {
select {
case m, ok := <-metricsBuffer:
if ok && !stopped {
ch <- m
}
case <-allDone:
return
}
}
}()

wg := sync.WaitGroup{}
wg.Add(len(coll.collectors))
go func() {
wg.Wait()
close(allDone)
close(metricsBuffer)
}()

for name, c := range coll.collectors {
go func(name string, c collector.Collector) {
execute(name, c, scrapeContext, ch)
wg.Done()
defer wg.Done()
execute(name, c, scrapeContext, metricsBuffer)
delete(remainingCollectors, name)
}(name, c)
}

Expand All @@ -88,7 +136,33 @@ func (coll WmiCollector) Collect(ch chan<- prometheus.Metric) {
prometheus.CounterValue,
startTime,
)
wg.Wait()

select {
case <-allDone:
stopped = true
return
case <-time.After(coll.maxScrapeDuration):
stopped = true
remainingCollectorNames := make([]string, 0, len(remainingCollectors))
for rc := range remainingCollectors {
remainingCollectorNames = append(remainingCollectorNames, rc)
}
log.Warn("Collection timed out, still waiting for ", remainingCollectorNames)
for name := range remainingCollectors {
ch <- prometheus.MustNewConstMetric(
scrapeSuccessDesc,
prometheus.GaugeValue,
0.0,
name,
)
ch <- prometheus.MustNewConstMetric(
scrapeTimeoutDesc,
prometheus.GaugeValue,
1.0,
name,
)
}
}
}

func filterAvailableCollectors(collectors string) string {
Expand Down Expand Up @@ -127,6 +201,12 @@ func execute(name string, c collector.Collector, ctx *collector.ScrapeContext, c
success,
name,
)
ch <- prometheus.MustNewConstMetric(
scrapeTimeoutDesc,
prometheus.GaugeValue,
0.0,
name,
)
}

func expandEnabledCollectors(enabled string) []string {
Expand Down Expand Up @@ -163,10 +243,6 @@ func loadCollectors(list string) (map[string]collector.Collector, error) {
return collectors, nil
}

// init registers the "wmi_exporter" version collector through
// prometheus.MustRegister when the package is loaded.
func init() {
prometheus.MustRegister(version.NewCollector("wmi_exporter"))
}
func initWbem() {
// This initialization prevents a memory leak on WMF 5+. See
// https://github.com/martinlindhe/wmi_exporter/issues/77 and linked issues
Expand Down Expand Up @@ -198,6 +274,10 @@ func main() {
"collectors.print",
"If true, print available collectors and exit.",
).Bool()
timeoutMargin = kingpin.Flag(
"scrape.timeout-margin",
"Seconds to subtract from the timeout allowed by the client. Tune to allow for overhead or high loads.",
).Default("0.5").Float64()
)

log.AddFlags(kingpin.CommandLine)
Expand Down Expand Up @@ -242,10 +322,17 @@ func main() {

log.Infof("Enabled collectors: %v", strings.Join(keys(collectors), ", "))

nodeCollector := WmiCollector{collectors: collectors}
prometheus.MustRegister(nodeCollector)
h := &metricsHandler{
timeoutMargin: *timeoutMargin,
collectorFactory: func(timeout time.Duration) *WmiCollector {
return &WmiCollector{
collectors: collectors,
maxScrapeDuration: timeout,
}
},
}

http.Handle(*metricsPath, promhttp.Handler())
http.Handle(*metricsPath, h)
http.HandleFunc("/health", healthCheck)
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
http.Redirect(w, r, *metricsPath, http.StatusMovedPermanently)
Expand Down Expand Up @@ -309,3 +396,36 @@ loop:
changes <- svc.Status{State: svc.StopPending}
return
}

// metricsHandler serves the metrics endpoint. Its ServeHTTP method builds a
// new registry and a new WmiCollector for every request, so that each scrape
// can be given its own maximum duration.
type metricsHandler struct {
// timeoutMargin is the number of seconds subtracted from the scrape
// timeout before it is handed to the collector.
timeoutMargin float64
// collectorFactory returns the WmiCollector to register for one scrape;
// timeout is the maximum duration that scrape is allowed to take.
collectorFactory func(timeout time.Duration) *WmiCollector
}

func (mh *metricsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
const defaultTimeout = 10.0

var timeoutSeconds float64
if v := r.Header.Get("X-Prometheus-Scrape-Timeout-Seconds"); v != "" {
var err error
timeoutSeconds, err = strconv.ParseFloat(v, 64)
if err != nil {
log.Warnf("Couldn't parse X-Prometheus-Scrape-Timeout-Seconds: %q. Defaulting timeout to %d", v, defaultTimeout)
martinlindhe marked this conversation as resolved.
Show resolved Hide resolved
}
}
if timeoutSeconds == 0 {
timeoutSeconds = defaultTimeout
}
timeoutSeconds = timeoutSeconds - mh.timeoutMargin

reg := prometheus.NewRegistry()
reg.MustRegister(mh.collectorFactory(time.Duration(timeoutSeconds * float64(time.Second))))
reg.MustRegister(
prometheus.NewProcessCollector(os.Getpid(), ""),
prometheus.NewGoCollector(),
version.NewCollector("wmi_exporter"),
)

h := promhttp.HandlerFor(reg, promhttp.HandlerOpts{})
h.ServeHTTP(w, r)
}