Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactoring of Scrape process, fixing multiple module issues #1111

Merged
merged 8 commits on Feb 19, 2024
190 changes: 90 additions & 100 deletions collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/prometheus/client_golang/prometheus"

"github.com/prometheus/snmp_exporter/config"
"github.com/prometheus/snmp_exporter/scraper"
)

var (
Expand Down Expand Up @@ -77,74 +78,17 @@ func listToOid(l []int) string {
}

type ScrapeResults struct {
pdus []gosnmp.SnmpPDU
packets uint64
retries uint64
pdus []gosnmp.SnmpPDU
}

func ScrapeTarget(ctx context.Context, target string, auth *config.Auth, module *config.Module, logger log.Logger, metrics Metrics) (ScrapeResults, error) {
func ScrapeTarget(snmp scraper.SNMPScraper, target string, auth *config.Auth, module *config.Module, logger log.Logger, metrics Metrics) (ScrapeResults, error) {
results := ScrapeResults{}
// Set the options.
snmp := gosnmp.GoSNMP{}
snmp.Context = ctx
snmp.MaxRepetitions = module.WalkParams.MaxRepetitions
snmp.Retries = *module.WalkParams.Retries
snmp.Timeout = module.WalkParams.Timeout
snmp.UseUnconnectedUDPSocket = module.WalkParams.UseUnconnectedUDPSocket
snmp.LocalAddr = *srcAddress

// Allow a set of OIDs that aren't in a strictly increasing order
if module.WalkParams.AllowNonIncreasingOIDs {
snmp.AppOpts = make(map[string]interface{})
snmp.AppOpts["c"] = true
}

var sent time.Time
snmp.OnSent = func(x *gosnmp.GoSNMP) {
sent = time.Now()
metrics.SNMPPackets.Inc()
results.packets++
}
snmp.OnRecv = func(x *gosnmp.GoSNMP) {
metrics.SNMPDuration.Observe(time.Since(sent).Seconds())
}
snmp.OnRetry = func(x *gosnmp.GoSNMP) {
metrics.SNMPRetries.Inc()
results.retries++
}

// Configure target.
if err := configureTarget(&snmp, target); err != nil {
return results, err
}

// Configure auth.
auth.ConfigureSNMP(&snmp)

// Do the actual walk.
getInitialStart := time.Now()
err := snmp.Connect()
if err != nil {
if err == context.Canceled {
return results, fmt.Errorf("scrape cancelled after %s (possible timeout) connecting to target %s",
time.Since(getInitialStart), snmp.Target)
}
return results, fmt.Errorf("error connecting to target %s: %s", target, err)
}
defer snmp.Conn.Close()

// Evaluate rules.
newGet := module.Get
newWalk := module.Walk
for _, filter := range module.Filters {
var pdus []gosnmp.SnmpPDU
allowedList := []string{}

if snmp.Version == gosnmp.Version1 {
pdus, err = snmp.WalkAll(filter.Oid)
} else {
pdus, err = snmp.BulkWalkAll(filter.Oid)
}
pdus, err := snmp.WalkAll(filter.Oid)
// Do not try to filter anything if we had errors.
if err != nil {
level.Info(logger).Log("msg", "Error getting OID, won't do any filter on this oid", "oid", filter.Oid)
Expand All @@ -165,10 +109,11 @@ func ScrapeTarget(ctx context.Context, target string, auth *config.Auth, module
newGet = newCfg
}

version := auth.Version
getOids := newGet
maxOids := int(module.WalkParams.MaxRepetitions)
// Max Repetition can be 0, maxOids cannot. SNMPv1 can only report one OID error per call.
if maxOids == 0 || snmp.Version == gosnmp.Version1 {
if maxOids == 0 || version == 1 {
maxOids = 1
}
for len(getOids) > 0 {
Expand All @@ -177,27 +122,20 @@ func ScrapeTarget(ctx context.Context, target string, auth *config.Auth, module
oids = maxOids
}

level.Debug(logger).Log("msg", "Getting OIDs", "oids", oids)
getStart := time.Now()
packet, err := snmp.Get(getOids[:oids])
if err != nil {
if err == context.Canceled {
return results, fmt.Errorf("scrape cancelled after %s (possible timeout) getting target %s",
time.Since(getInitialStart), snmp.Target)
}
return results, fmt.Errorf("error getting target %s: %s", snmp.Target, err)
return results, err
}
level.Debug(logger).Log("msg", "Get of OIDs completed", "oids", oids, "duration_seconds", time.Since(getStart))
// SNMPv1 will return packet error for unsupported OIDs.
if packet.Error == gosnmp.NoSuchName && snmp.Version == gosnmp.Version1 {
if packet.Error == gosnmp.NoSuchName && version == 1 {
level.Debug(logger).Log("msg", "OID not supported by target", "oids", getOids[0])
getOids = getOids[oids:]
continue
}
// Response received with errors.
// TODO: "stringify" gosnmp errors instead of showing error code.
if packet.Error != gosnmp.NoError {
return results, fmt.Errorf("error reported by target %s: Error Status %d", snmp.Target, packet.Error)
return results, fmt.Errorf("error reported by target %s: Error Status %d", target, packet.Error)
}
for _, v := range packet.Variables {
if v.Type == gosnmp.NoSuchObject || v.Type == gosnmp.NoSuchInstance {
Expand All @@ -210,23 +148,10 @@ func ScrapeTarget(ctx context.Context, target string, auth *config.Auth, module
}

for _, subtree := range newWalk {
var pdus []gosnmp.SnmpPDU
level.Debug(logger).Log("msg", "Walking subtree", "oid", subtree)
walkStart := time.Now()
if snmp.Version == gosnmp.Version1 {
pdus, err = snmp.WalkAll(subtree)
} else {
pdus, err = snmp.BulkWalkAll(subtree)
}
pdus, err := snmp.WalkAll(subtree)
if err != nil {
if err == context.Canceled {
return results, fmt.Errorf("scrape canceled after %s (possible timeout) walking target %s",
time.Since(getInitialStart), snmp.Target)
}
return results, fmt.Errorf("error walking target %s: %s", snmp.Target, err)
return results, err
}
level.Debug(logger).Log("msg", "Walk of subtree completed", "oid", subtree, "duration_seconds", time.Since(walkStart))

results.pdus = append(results.pdus, pdus...)
}
return results, nil
Expand Down Expand Up @@ -384,11 +309,44 @@ func (c Collector) Describe(ch chan<- *prometheus.Desc) {
ch <- prometheus.NewDesc("dummy", "dummy", nil, nil)
}

func (c Collector) collect(ch chan<- prometheus.Metric, module *NamedModule) {
logger := log.With(c.logger, "module", module.name)
func (c Collector) collect(ch chan<- prometheus.Metric, logger log.Logger, client scraper.SNMPScraper, module *NamedModule) {
var (
packets uint64
retries uint64
)
client.SetOptions(
// Set the metrics options.
func(g *gosnmp.GoSNMP) {
var sent time.Time
g.OnSent = func(x *gosnmp.GoSNMP) {
sent = time.Now()
c.metrics.SNMPPackets.Inc()
packets++
}
g.OnRecv = func(x *gosnmp.GoSNMP) {
c.metrics.SNMPDuration.Observe(time.Since(sent).Seconds())
}
g.OnRetry = func(x *gosnmp.GoSNMP) {
c.metrics.SNMPRetries.Inc()
retries++
}
},
// Set the Walk options.
func(g *gosnmp.GoSNMP) {
g.Retries = *module.WalkParams.Retries
g.Timeout = module.WalkParams.Timeout
g.MaxRepetitions = module.WalkParams.MaxRepetitions
g.UseUnconnectedUDPSocket = module.WalkParams.UseUnconnectedUDPSocket
if module.WalkParams.AllowNonIncreasingOIDs {
g.AppOpts = map[string]interface{}{
"c": true,
}
}
},
)
start := time.Now()
results, err := ScrapeTarget(c.ctx, c.target, c.auth, module.Module, logger, c.metrics)
moduleLabel := prometheus.Labels{"module": module.name}
results, err := ScrapeTarget(client, c.target, c.auth, module.Module, logger, c.metrics)
if err != nil {
level.Info(logger).Log("msg", "Error scraping target", "err", err)
ch <- prometheus.NewInvalidMetric(prometheus.NewDesc("snmp_error", "Error scraping target", nil, moduleLabel), err)
Expand All @@ -401,31 +359,31 @@ func (c Collector) collect(ch chan<- prometheus.Metric, module *NamedModule) {
ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc("snmp_scrape_packets_sent", "Packets sent for get, bulkget, and walk; including retries.", nil, moduleLabel),
prometheus.GaugeValue,
float64(results.packets))
float64(packets))
ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc("snmp_scrape_packets_retried", "Packets retried for get, bulkget, and walk.", nil, moduleLabel),
prometheus.GaugeValue,
float64(results.retries))
float64(retries))
ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc("snmp_scrape_pdus_returned", "PDUs returned from get, bulkget, and walk.", nil, moduleLabel),
prometheus.GaugeValue,
float64(len(results.pdus)))

oidToPdu := make(map[string]gosnmp.SnmpPDU, len(results.pdus))
for _, pdu := range results.pdus {
oidToPdu[pdu.Name[1:]] = pdu
}

metricTree := buildMetricTree(module.Metrics)
// Look for metrics that match each pdu.
PduLoop:
for oid, pdu := range oidToPdu {
head := metricTree
oidList := oidToList(oid)
for i, o := range oidList {
var ok bool
head, ok = head.children[o]
if !ok {
continue PduLoop
break
}
if head.metric != nil {
// Found a match.
Expand All @@ -450,25 +408,57 @@ func (c Collector) Collect(ch chan<- prometheus.Metric) {
if workerCount < 1 {
workerCount = 1
}
ctx, cancel := context.WithCancel(c.ctx)
defer cancel()
workerChan := make(chan *NamedModule)
for i := 0; i < workerCount; i++ {
wg.Add(1)
go func() {
go func(i int) {
defer wg.Done()
logger := log.With(c.logger, "worker", i)
client, err := scraper.NewGoSNMP(logger, c.target, *srcAddress)
if err != nil {
level.Info(logger).Log("msg", err)
cancel()
ch <- prometheus.NewInvalidMetric(prometheus.NewDesc("snmp_error", "Error during initialisation of the Worker", nil, nil), err)
return
}
// Set the options.
client.SetOptions(func(g *gosnmp.GoSNMP) {
g.Context = ctx
c.auth.ConfigureSNMP(g)
})
if err = client.Connect(); err != nil {
level.Info(logger).Log("msg", "Error connecting to target", "err", err)
ch <- prometheus.NewInvalidMetric(prometheus.NewDesc("snmp_error", "Error connecting to target", nil, nil), err)
cancel()
return
}
defer client.Close()
for m := range workerChan {
logger := log.With(c.logger, "module", m.name)
level.Debug(logger).Log("msg", "Starting scrape")
_logger := log.With(logger, "module", m.name)
level.Debug(_logger).Log("msg", "Starting scrape")
start := time.Now()
c.collect(ch, m)
c.collect(ch, _logger, client, m)
duration := time.Since(start).Seconds()
level.Debug(logger).Log("msg", "Finished scrape", "duration_seconds", duration)
level.Debug(_logger).Log("msg", "Finished scrape", "duration_seconds", duration)
c.metrics.SNMPCollectionDuration.WithLabelValues(m.name).Observe(duration)
}
}()
}(i)
}

done := false
for _, module := range c.modules {
workerChan <- module
if done {
break
}
select {
case <-ctx.Done():
done = true
level.Debug(c.logger).Log("msg", "Context canceled", "err", ctx.Err(), "module", module.name)
case workerChan <- module:
level.Debug(c.logger).Log("msg", "Sent module to worker", "module", module.name)
}
}
close(workerChan)
wg.Wait()
Expand Down
Loading
Loading