Skip to content

Commit

Permalink
Refactoring of Scrape process, fixing multiple module issues (#1111)
Browse files Browse the repository at this point in the history
* Add SNMPScraper Interface for easier maintenance
* Move SNMP-related operations to the scraper side and simplify the ScrapeTarget function
* Remove version retrieval from the scraper; read the SNMP version from the auth information instead
* Add error handling for scraper used in worker
* Add unit test for ScrapeTarget function using a mock SNMP scraper
* Add test cases for scraping targets with different configurations

---------

Signed-off-by: Kakuya Ando <[email protected]>
  • Loading branch information
servak authored Feb 19, 2024
1 parent 59194f6 commit 862b410
Show file tree
Hide file tree
Showing 5 changed files with 425 additions and 100 deletions.
190 changes: 90 additions & 100 deletions collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/prometheus/client_golang/prometheus"

"github.com/prometheus/snmp_exporter/config"
"github.com/prometheus/snmp_exporter/scraper"
)

var (
Expand Down Expand Up @@ -77,74 +78,17 @@ func listToOid(l []int) string {
}

type ScrapeResults struct {
pdus []gosnmp.SnmpPDU
packets uint64
retries uint64
pdus []gosnmp.SnmpPDU
}

func ScrapeTarget(ctx context.Context, target string, auth *config.Auth, module *config.Module, logger log.Logger, metrics Metrics) (ScrapeResults, error) {
func ScrapeTarget(snmp scraper.SNMPScraper, target string, auth *config.Auth, module *config.Module, logger log.Logger, metrics Metrics) (ScrapeResults, error) {
results := ScrapeResults{}
// Set the options.
snmp := gosnmp.GoSNMP{}
snmp.Context = ctx
snmp.MaxRepetitions = module.WalkParams.MaxRepetitions
snmp.Retries = *module.WalkParams.Retries
snmp.Timeout = module.WalkParams.Timeout
snmp.UseUnconnectedUDPSocket = module.WalkParams.UseUnconnectedUDPSocket
snmp.LocalAddr = *srcAddress

// Allow a set of OIDs that aren't in a strictly increasing order
if module.WalkParams.AllowNonIncreasingOIDs {
snmp.AppOpts = make(map[string]interface{})
snmp.AppOpts["c"] = true
}

var sent time.Time
snmp.OnSent = func(x *gosnmp.GoSNMP) {
sent = time.Now()
metrics.SNMPPackets.Inc()
results.packets++
}
snmp.OnRecv = func(x *gosnmp.GoSNMP) {
metrics.SNMPDuration.Observe(time.Since(sent).Seconds())
}
snmp.OnRetry = func(x *gosnmp.GoSNMP) {
metrics.SNMPRetries.Inc()
results.retries++
}

// Configure target.
if err := configureTarget(&snmp, target); err != nil {
return results, err
}

// Configure auth.
auth.ConfigureSNMP(&snmp)

// Do the actual walk.
getInitialStart := time.Now()
err := snmp.Connect()
if err != nil {
if err == context.Canceled {
return results, fmt.Errorf("scrape cancelled after %s (possible timeout) connecting to target %s",
time.Since(getInitialStart), snmp.Target)
}
return results, fmt.Errorf("error connecting to target %s: %s", target, err)
}
defer snmp.Conn.Close()

// Evaluate rules.
newGet := module.Get
newWalk := module.Walk
for _, filter := range module.Filters {
var pdus []gosnmp.SnmpPDU
allowedList := []string{}

if snmp.Version == gosnmp.Version1 {
pdus, err = snmp.WalkAll(filter.Oid)
} else {
pdus, err = snmp.BulkWalkAll(filter.Oid)
}
pdus, err := snmp.WalkAll(filter.Oid)
// Do not try to filter anything if we had errors.
if err != nil {
level.Info(logger).Log("msg", "Error getting OID, won't do any filter on this oid", "oid", filter.Oid)
Expand All @@ -165,10 +109,11 @@ func ScrapeTarget(ctx context.Context, target string, auth *config.Auth, module
newGet = newCfg
}

version := auth.Version
getOids := newGet
maxOids := int(module.WalkParams.MaxRepetitions)
// Max Repetition can be 0, maxOids cannot. SNMPv1 can only report one OID error per call.
if maxOids == 0 || snmp.Version == gosnmp.Version1 {
if maxOids == 0 || version == 1 {
maxOids = 1
}
for len(getOids) > 0 {
Expand All @@ -177,27 +122,20 @@ func ScrapeTarget(ctx context.Context, target string, auth *config.Auth, module
oids = maxOids
}

level.Debug(logger).Log("msg", "Getting OIDs", "oids", oids)
getStart := time.Now()
packet, err := snmp.Get(getOids[:oids])
if err != nil {
if err == context.Canceled {
return results, fmt.Errorf("scrape cancelled after %s (possible timeout) getting target %s",
time.Since(getInitialStart), snmp.Target)
}
return results, fmt.Errorf("error getting target %s: %s", snmp.Target, err)
return results, err
}
level.Debug(logger).Log("msg", "Get of OIDs completed", "oids", oids, "duration_seconds", time.Since(getStart))
// SNMPv1 will return packet error for unsupported OIDs.
if packet.Error == gosnmp.NoSuchName && snmp.Version == gosnmp.Version1 {
if packet.Error == gosnmp.NoSuchName && version == 1 {
level.Debug(logger).Log("msg", "OID not supported by target", "oids", getOids[0])
getOids = getOids[oids:]
continue
}
// Response received with errors.
// TODO: "stringify" gosnmp errors instead of showing error code.
if packet.Error != gosnmp.NoError {
return results, fmt.Errorf("error reported by target %s: Error Status %d", snmp.Target, packet.Error)
return results, fmt.Errorf("error reported by target %s: Error Status %d", target, packet.Error)
}
for _, v := range packet.Variables {
if v.Type == gosnmp.NoSuchObject || v.Type == gosnmp.NoSuchInstance {
Expand All @@ -210,23 +148,10 @@ func ScrapeTarget(ctx context.Context, target string, auth *config.Auth, module
}

for _, subtree := range newWalk {
var pdus []gosnmp.SnmpPDU
level.Debug(logger).Log("msg", "Walking subtree", "oid", subtree)
walkStart := time.Now()
if snmp.Version == gosnmp.Version1 {
pdus, err = snmp.WalkAll(subtree)
} else {
pdus, err = snmp.BulkWalkAll(subtree)
}
pdus, err := snmp.WalkAll(subtree)
if err != nil {
if err == context.Canceled {
return results, fmt.Errorf("scrape canceled after %s (possible timeout) walking target %s",
time.Since(getInitialStart), snmp.Target)
}
return results, fmt.Errorf("error walking target %s: %s", snmp.Target, err)
return results, err
}
level.Debug(logger).Log("msg", "Walk of subtree completed", "oid", subtree, "duration_seconds", time.Since(walkStart))

results.pdus = append(results.pdus, pdus...)
}
return results, nil
Expand Down Expand Up @@ -384,11 +309,44 @@ func (c Collector) Describe(ch chan<- *prometheus.Desc) {
ch <- prometheus.NewDesc("dummy", "dummy", nil, nil)
}

func (c Collector) collect(ch chan<- prometheus.Metric, module *NamedModule) {
logger := log.With(c.logger, "module", module.name)
func (c Collector) collect(ch chan<- prometheus.Metric, logger log.Logger, client scraper.SNMPScraper, module *NamedModule) {
var (
packets uint64
retries uint64
)
client.SetOptions(
// Set the metrics options.
func(g *gosnmp.GoSNMP) {
var sent time.Time
g.OnSent = func(x *gosnmp.GoSNMP) {
sent = time.Now()
c.metrics.SNMPPackets.Inc()
packets++
}
g.OnRecv = func(x *gosnmp.GoSNMP) {
c.metrics.SNMPDuration.Observe(time.Since(sent).Seconds())
}
g.OnRetry = func(x *gosnmp.GoSNMP) {
c.metrics.SNMPRetries.Inc()
retries++
}
},
// Set the Walk options.
func(g *gosnmp.GoSNMP) {
g.Retries = *module.WalkParams.Retries
g.Timeout = module.WalkParams.Timeout
g.MaxRepetitions = module.WalkParams.MaxRepetitions
g.UseUnconnectedUDPSocket = module.WalkParams.UseUnconnectedUDPSocket
if module.WalkParams.AllowNonIncreasingOIDs {
g.AppOpts = map[string]interface{}{
"c": true,
}
}
},
)
start := time.Now()
results, err := ScrapeTarget(c.ctx, c.target, c.auth, module.Module, logger, c.metrics)
moduleLabel := prometheus.Labels{"module": module.name}
results, err := ScrapeTarget(client, c.target, c.auth, module.Module, logger, c.metrics)
if err != nil {
level.Info(logger).Log("msg", "Error scraping target", "err", err)
ch <- prometheus.NewInvalidMetric(prometheus.NewDesc("snmp_error", "Error scraping target", nil, moduleLabel), err)
Expand All @@ -401,31 +359,31 @@ func (c Collector) collect(ch chan<- prometheus.Metric, module *NamedModule) {
ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc("snmp_scrape_packets_sent", "Packets sent for get, bulkget, and walk; including retries.", nil, moduleLabel),
prometheus.GaugeValue,
float64(results.packets))
float64(packets))
ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc("snmp_scrape_packets_retried", "Packets retried for get, bulkget, and walk.", nil, moduleLabel),
prometheus.GaugeValue,
float64(results.retries))
float64(retries))
ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc("snmp_scrape_pdus_returned", "PDUs returned from get, bulkget, and walk.", nil, moduleLabel),
prometheus.GaugeValue,
float64(len(results.pdus)))

oidToPdu := make(map[string]gosnmp.SnmpPDU, len(results.pdus))
for _, pdu := range results.pdus {
oidToPdu[pdu.Name[1:]] = pdu
}

metricTree := buildMetricTree(module.Metrics)
// Look for metrics that match each pdu.
PduLoop:
for oid, pdu := range oidToPdu {
head := metricTree
oidList := oidToList(oid)
for i, o := range oidList {
var ok bool
head, ok = head.children[o]
if !ok {
continue PduLoop
break
}
if head.metric != nil {
// Found a match.
Expand All @@ -450,25 +408,57 @@ func (c Collector) Collect(ch chan<- prometheus.Metric) {
if workerCount < 1 {
workerCount = 1
}
ctx, cancel := context.WithCancel(c.ctx)
defer cancel()
workerChan := make(chan *NamedModule)
for i := 0; i < workerCount; i++ {
wg.Add(1)
go func() {
go func(i int) {
defer wg.Done()
logger := log.With(c.logger, "worker", i)
client, err := scraper.NewGoSNMP(logger, c.target, *srcAddress)
if err != nil {
level.Info(logger).Log("msg", err)
cancel()
ch <- prometheus.NewInvalidMetric(prometheus.NewDesc("snmp_error", "Error during initialisation of the Worker", nil, nil), err)
return
}
// Set the options.
client.SetOptions(func(g *gosnmp.GoSNMP) {
g.Context = ctx
c.auth.ConfigureSNMP(g)
})
if err = client.Connect(); err != nil {
level.Info(logger).Log("msg", "Error connecting to target", "err", err)
ch <- prometheus.NewInvalidMetric(prometheus.NewDesc("snmp_error", "Error connecting to target", nil, nil), err)
cancel()
return
}
defer client.Close()
for m := range workerChan {
logger := log.With(c.logger, "module", m.name)
level.Debug(logger).Log("msg", "Starting scrape")
_logger := log.With(logger, "module", m.name)
level.Debug(_logger).Log("msg", "Starting scrape")
start := time.Now()
c.collect(ch, m)
c.collect(ch, _logger, client, m)
duration := time.Since(start).Seconds()
level.Debug(logger).Log("msg", "Finished scrape", "duration_seconds", duration)
level.Debug(_logger).Log("msg", "Finished scrape", "duration_seconds", duration)
c.metrics.SNMPCollectionDuration.WithLabelValues(m.name).Observe(duration)
}
}()
}(i)
}

done := false
for _, module := range c.modules {
workerChan <- module
if done {
break
}
select {
case <-ctx.Done():
done = true
level.Debug(c.logger).Log("msg", "Context canceled", "err", ctx.Err(), "module", module.name)
case workerChan <- module:
level.Debug(c.logger).Log("msg", "Sent module to worker", "module", module.name)
}
}
close(workerChan)
wg.Wait()
Expand Down
Loading

0 comments on commit 862b410

Please sign in to comment.