Merge pull request #1 from prometheus/refactor-multiple-collectors
Refactor node_exporter to support collectors.
discordianfish committed May 14, 2013
2 parents a6e8bcb + 588ef8b commit b199e6d
Showing 9 changed files with 782 additions and 224 deletions.
177 changes: 177 additions & 0 deletions exporter/exporter.go
@@ -0,0 +1,177 @@
// Package exporter implements a Prometheus exporter that uses multiple
// collectors to collect and export system metrics.
package exporter

import (
"encoding/json"
"flag"
"fmt"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/exp"
"io/ioutil"
"log"
"net/http"
"os"
"os/signal"
"runtime/pprof"
"sync"
"syscall"
"time"
)

var verbose = flag.Bool("verbose", false, "Verbose output.")

// Collector is the interface a collector has to implement.
type Collector interface {
// Update gets new metrics and exposes them via the prometheus registry.
// It returns the number of metrics updated.
Update() (n int, err error)

// Name returns the name of the collector.
Name() string
}

// config mirrors the structure of the JSON configuration file.
type config struct {
Attributes map[string]string `json:"attributes"`
ListeningAddress string `json:"listeningAddress"`
ScrapeInterval int `json:"scrapeInterval"`
Collectors []string `json:"collectors"`
}

// loadConfig reads and unmarshals the JSON config file into e.config.
func (e *exporter) loadConfig() (err error) {
log.Printf("Reading config %s", e.configFile)
bytes, err := ioutil.ReadFile(e.configFile)
if err != nil {
return
}

return json.Unmarshal(bytes, &e.config) // TODO: guard against concurrent access to e.config
}

type exporter struct {
configFile string
listeningAddress string
scrapeInterval time.Duration
scrapeDurations prometheus.Histogram
metricsUpdated prometheus.Gauge
config config
registry prometheus.Registry
collectors []Collector
MemProfile string
}

// New takes the path to a config file and returns an exporter instance,
// or an error if the config can't be read.
func New(configFile string) (e exporter, err error) {
registry := prometheus.NewRegistry()
e = exporter{
configFile: configFile,
scrapeDurations: prometheus.NewDefaultHistogram(),
metricsUpdated: prometheus.NewGauge(),
listeningAddress: ":8080",
scrapeInterval: 60 * time.Second,
registry: registry,
}

err = e.loadConfig()
if err != nil {
return e, fmt.Errorf("Couldn't read config: %s", err)
}

cn, err := NewNativeCollector(e.config, e.registry)
if err != nil {
log.Fatalf("Couldn't attach collector: %s", err)
}

cg, err := NewGmondCollector(e.config, e.registry)
if err != nil {
log.Fatalf("Couldn't attach collector: %s", err)
}

cm, err := NewMuninCollector(e.config, e.registry)
if err != nil {
log.Fatalf("Couldn't attach collector: %s", err)
}

e.collectors = []Collector{&cn, &cg, &cm}

if e.config.ListeningAddress != "" {
e.listeningAddress = e.config.ListeningAddress
}
if e.config.ScrapeInterval != 0 {
e.scrapeInterval = time.Duration(e.config.ScrapeInterval) * time.Second
}

registry.Register("node_exporter_scrape_duration_seconds", "node_exporter: Duration of a scrape job.", prometheus.NilLabels, e.scrapeDurations)
registry.Register("node_exporter_metrics_updated", "node_exporter: Number of metrics updated.", prometheus.NilLabels, e.metricsUpdated)

return e, nil
}

// serveStatus exposes the prometheus registry over HTTP on the configured listening address.
func (e *exporter) serveStatus() {
exp.Handle(prometheus.ExpositionResource, e.registry.Handler())
http.ListenAndServe(e.listeningAddress, exp.DefaultCoarseMux)
}

// Execute runs a single collector update and records its duration and result.
func (e *exporter) Execute(c Collector) {
begin := time.Now()
updates, err := c.Update()
duration := time.Since(begin)

label := map[string]string{
"collector": c.Name(),
}
if err != nil {
log.Printf("ERROR: %s failed after %fs: %s", c.Name(), duration.Seconds(), err)
label["result"] = "error"
} else {
log.Printf("OK: %s success after %fs.", c.Name(), duration.Seconds())
label["result"] = "success"
}
e.scrapeDurations.Add(label, duration.Seconds())
e.metricsUpdated.Set(label, float64(updates))
}

// Loop starts the HTTP handler and updates all collectors once per scrape interval.
// A SIGHUP reloads the config; a SIGUSR1 writes a memory profile if MemProfile is set.
func (e *exporter) Loop() {
sigHup := make(chan os.Signal, 1)
sigUsr1 := make(chan os.Signal, 1)
signal.Notify(sigHup, syscall.SIGHUP)
signal.Notify(sigUsr1, syscall.SIGUSR1)

go e.serveStatus()

tick := time.Tick(e.scrapeInterval)
for {
select {
case <-sigHup:
err := e.loadConfig()
if err != nil {
log.Printf("Couldn't reload config: %s", err)
continue
}
log.Printf("Got new config")
tick = time.Tick(e.scrapeInterval)

case <-tick:
log.Printf("Starting new scrape interval")
wg := sync.WaitGroup{}
wg.Add(len(e.collectors))
for _, c := range e.collectors {
go func(c Collector) {
e.Execute(c)
wg.Done()
}(c)
}
wg.Wait()

case <-sigUsr1:
log.Printf("got signal")
if e.MemProfile != "" {
log.Printf("Writing memory profile to %s", e.MemProfile)
f, err := os.Create(e.MemProfile)
if err != nil {
log.Fatal(err)
}
pprof.WriteHeapProfile(f)
f.Close()
}
}
}
}
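
For context, the config struct above maps to a JSON file along these lines; the field names come from the struct's json tags, while the concrete values and the file name node_exporter.conf are purely illustrative (note that in this revision the "collectors" list is declared but New attaches all three collectors unconditionally):

{
  "attributes": {"datacenter": "dc1"},
  "listeningAddress": ":8080",
  "scrapeInterval": 60,
  "collectors": ["native", "gmond", "munin"]
}

A minimal main package wiring the exporter together could look like the following sketch. It only relies on the exported New, Loop and MemProfile shown above; the flag names are assumptions, not part of this commit.

package main

import (
    "flag"
    "log"

    "github.com/prometheus/node_exporter/exporter"
)

var (
    configFile = flag.String("config", "node_exporter.conf", "Path to the config file.")
    memProfile = flag.String("memprofile", "", "File to write a memory profile to on SIGUSR1.")
)

func main() {
    flag.Parse()

    e, err := exporter.New(*configFile)
    if err != nil {
        log.Fatalf("Couldn't instantiate exporter: %s", err)
    }
    e.MemProfile = *memProfile

    // Blocks forever, scraping all collectors once per scrape interval.
    e.Loop()
}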
61 changes: 61 additions & 0 deletions exporter/ganglia/format.go
@@ -0,0 +1,61 @@
// Package ganglia provides types for unmarshalling gmond's XML output.
//
// Elements of gmond's XML output that are not used are commented out.
// If you want to use them, please rename them so that they can be
// understood without knowing what the acronym stands for.
package ganglia

import "encoding/xml"

type ExtraElement struct {
Name string `xml:"NAME,attr"`
Val string `xml:"VAL,attr"`
}

type ExtraData struct {
ExtraElements []ExtraElement `xml:"EXTRA_ELEMENT"`
}

type Metric struct {
Name string `xml:"NAME,attr"`
Value float64 `xml:"VAL,attr"`
/*
Unit string `xml:"UNITS,attr"`
Slope string `xml:"SLOPE,attr"`
Tn int `xml:"TN,attr"`
Tmax int `xml:"TMAX,attr"`
Dmax int `xml:"DMAX,attr"`
*/
ExtraData ExtraData `xml:"EXTRA_DATA"`
}

type Host struct {
Name string `xml:"NAME,attr"`
/*
Ip string `xml:"IP,attr"`
Tags string `xml:"TAGS,attr"`
Reported int `xml:"REPORTED,attr"`
Tn int `xml:"TN,attr"`
Tmax int `xml:"TMAX,attr"`
Dmax int `xml:"DMAX,attr"`
Location string `xml:"LOCATION,attr"`
GmondStarted int `xml:"GMOND_STARTED,attr"`
*/
Metrics []Metric `xml:"METRIC"`
}

type Cluster struct {
Name string `xml:"NAME,attr"`
/*
Owner string `xml:"OWNER,attr"`
LatLong string `xml:"LATLONG,attr"`
Url string `xml:"URL,attr"`
Localtime int `xml:"LOCALTIME,attr"`
*/
Hosts []Host `xml:"HOST"`
}

type Ganglia struct {
XMLNAME xml.Name `xml:"GANGLIA_XML"`
Clusters []Cluster `xml:"CLUSTER"`
}
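
To make the mapping between these types and gmond's XML concrete, here is a small stand-alone sketch that unmarshals a hand-written snippet into the Ganglia struct; the XML values are made up and only attributes that the types above declare are shown:

package main

import (
    "encoding/xml"
    "fmt"
    "log"

    "github.com/prometheus/node_exporter/exporter/ganglia"
)

const sample = `
<GANGLIA_XML>
  <CLUSTER NAME="example">
    <HOST NAME="host1.example.org">
      <METRIC NAME="load_one" VAL="0.42">
        <EXTRA_DATA>
          <EXTRA_ELEMENT NAME="DESC" VAL="One minute load average"/>
        </EXTRA_DATA>
      </METRIC>
    </HOST>
  </CLUSTER>
</GANGLIA_XML>`

func main() {
    var g ganglia.Ganglia
    if err := xml.Unmarshal([]byte(sample), &g); err != nil {
        log.Fatalf("Couldn't parse xml: %s", err)
    }
    // Prints: example host1.example.org load_one 0.42
    for _, cluster := range g.Clusters {
        for _, host := range cluster.Hosts {
            for _, metric := range host.Metrics {
                fmt.Println(cluster.Name, host.Name, metric.Name, metric.Value)
            }
        }
    }
}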
103 changes: 103 additions & 0 deletions exporter/gmond_collector.go
@@ -0,0 +1,103 @@
package exporter

import (
"bufio"
"encoding/xml"
"fmt"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/node_exporter/exporter/ganglia"
"io"
"net"
"time"
"strings"
)

const (
gangliaAddress = "127.0.0.1:8649"
gangliaProto = "tcp"
gangliaTimeout = 30 * time.Second
)

type gmondCollector struct {
name string
Metrics map[string]prometheus.Gauge
config config
registry prometheus.Registry
}

// NewGmondCollector takes a config struct and a prometheus registry and returns a new collector scraping ganglia.
func NewGmondCollector(config config, registry prometheus.Registry) (collector gmondCollector, err error) {
collector = gmondCollector{
name: "gmond_collector",
config: config,
Metrics: make(map[string]prometheus.Gauge),
registry: registry,
}

return collector, nil
}

func (c *gmondCollector) Name() string { return c.name }

// setMetric registers a gauge for name on first use and then sets its value for the given labels.
func (c *gmondCollector) setMetric(name string, labels map[string]string, metric ganglia.Metric) {
if _, ok := c.Metrics[name]; !ok {
var desc string
var title string
for _, element := range metric.ExtraData.ExtraElements {
switch element.Name {
case "DESC":
desc = element.Val
case "TITLE":
title = element.Val
}
if title != "" && desc != "" {
break
}
}
debug(c.Name(), "Register %s: %s", name, desc)
gauge := prometheus.NewGauge()
c.Metrics[name] = gauge
c.registry.Register(name, desc, prometheus.NilLabels, gauge) // one gauge per metric!
}
debug(c.Name(), "Set %s{%s}: %f", name, labels, metric.Value)
c.Metrics[name].Set(labels, metric.Value)
}

// Update connects to the local gmond, parses its XML dump and updates one gauge per metric.
func (c *gmondCollector) Update() (updates int, err error) {
conn, err := net.Dial(gangliaProto, gangliaAddress)
debug(c.Name(), "gmondCollector Update")
if err != nil {
return updates, fmt.Errorf("Can't connect to gmond: %s", err)
}
defer conn.Close()
conn.SetDeadline(time.Now().Add(gangliaTimeout))

ganglia := ganglia.Ganglia{}
decoder := xml.NewDecoder(bufio.NewReader(conn))
decoder.CharsetReader = toUtf8

err = decoder.Decode(&ganglia)
if err != nil {
return updates, fmt.Errorf("Couldn't parse xml: %s", err)
}

for _, cluster := range ganglia.Clusters {
for _, host := range cluster.Hosts {

for _, metric := range host.Metrics {
name := strings.ToLower(metric.Name)

var labels = map[string]string{
"hostname": host.Name,
"cluster": cluster.Name,
}
c.setMetric(name, labels, metric)
updates++
}
}
}
return updates, err
}

// toUtf8 is a stub CharsetReader that passes the input through unchanged.
// FIXME: convert non-UTF-8 charsets instead of assuming the input is already UTF-8.
func toUtf8(charset string, input io.Reader) (io.Reader, error) {
return input, nil
}
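
gmond usually declares ISO-8859-1 in its XML prolog, so the pass-through toUtf8 stub above only works while the payload happens to be plain ASCII. One possible way to resolve the FIXME (a sketch, not part of this commit) would be to delegate the conversion to golang.org/x/net/html/charset, whose NewReaderLabel has the signature xml.Decoder.CharsetReader expects; the package would have to be added to the import block above:

// Drop-in replacement sketch for toUtf8; adds an external dependency.
func toUtf8(name string, input io.Reader) (io.Reader, error) {
    // charset.NewReaderLabel looks up the encoding by its IANA label
    // (e.g. "ISO-8859-1") and returns a reader that emits UTF-8.
    return charset.NewReaderLabel(name, input)
}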
26 changes: 26 additions & 0 deletions exporter/helper.go
@@ -0,0 +1,26 @@
package exporter

import (
"fmt"
"log"
"strconv"
"strings"
)

// debug logs a formatted message prefixed with the collector name when -verbose is set.
func debug(name string, format string, a ...interface{}) {
if *verbose {
f := fmt.Sprintf("%s: %s", name, format)
log.Printf(f, a...)
}
}

// splitToInts splits str on sep and converts each part to an int.
func splitToInts(str string, sep string) (ints []int, err error) {
for _, part := range strings.Split(str, sep) {
i, err := strconv.Atoi(part)
if err != nil {
return nil, fmt.Errorf("Could not split '%s' because %s is not an integer: %s", str, part, err)
}
ints = append(ints, i)
}
return ints, nil
}
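
As a quick illustration of splitToInts' contract, a hypothetical in-package test (not part of this commit) might look like this:

package exporter

import "testing"

// TestSplitToInts demonstrates the expected behaviour of the helper:
// every separated part must parse as an integer, otherwise an error is returned.
func TestSplitToInts(t *testing.T) {
    ints, err := splitToInts("10:20:30", ":")
    if err != nil {
        t.Fatalf("unexpected error: %s", err)
    }
    if len(ints) != 3 || ints[0] != 10 || ints[1] != 20 || ints[2] != 30 {
        t.Errorf("got %v, want [10 20 30]", ints)
    }

    if _, err := splitToInts("1:x", ":"); err == nil {
        t.Error("expected an error for non-integer input")
    }
}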