diff --git a/Dockerfile b/Dockerfile index a7dd7b6..813ea13 100644 --- a/Dockerfile +++ b/Dockerfile @@ -14,6 +14,7 @@ FROM quay.io/prometheus/busybox:latest LABEL maintainer="EMQX" COPY --from=builder /workspace/emqx-exporter /bin/emqx-exporter +COPY config/example/config.yaml /etc/emqx-exporter/config.yaml EXPOSE 8085 USER nobody diff --git a/config/config.go b/config/config.go new file mode 100644 index 0000000..2b3d559 --- /dev/null +++ b/config/config.go @@ -0,0 +1,91 @@ +package config + +import ( + "fmt" + "os" + "sync" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + + yaml "gopkg.in/yaml.v3" +) + +type Config struct { + Probes []Probe `yaml:"probes"` +} + +type Probe struct { + Target string `yaml:"target"` + Scheme string `yaml:"scheme,omitempty"` + ClientID string `yaml:"client_id,omitempty"` + Username string `yaml:"username,omitempty"` + Password string `yaml:"password,omitempty"` + Topic string `yaml:"topic,omitempty"` + QoS byte `yaml:"qos,omitempty"` +} + +type SafeConfig struct { + sync.RWMutex + C *Config + configReloadSuccess prometheus.Gauge + configReloadSeconds prometheus.Gauge +} + +func NewSafeConfig(reg prometheus.Registerer) *SafeConfig { + configReloadSuccess := promauto.With(reg).NewGauge(prometheus.GaugeOpts{ + Namespace: "emqx_exporter", + Name: "config_last_reload_successful", + Help: "EMQX exporter config loaded successfully.", + }) + + configReloadSeconds := promauto.With(reg).NewGauge(prometheus.GaugeOpts{ + Namespace: "emqx_exporter", + Name: "config_last_reload_success_timestamp_seconds", + Help: "Timestamp of the last successful configuration reload.", + }) + return &SafeConfig{C: &Config{}, configReloadSuccess: configReloadSuccess, configReloadSeconds: configReloadSeconds} +} + +func (sc *SafeConfig) ReloadConfig(confFile string) (err error) { + var c = &Config{} + defer func() { + if err != nil { + sc.configReloadSuccess.Set(0) + } else { + sc.configReloadSuccess.Set(1) + sc.configReloadSeconds.SetToCurrentTime() + } + }() + + yamlReader, err := os.Open(confFile) + if err != nil { + return fmt.Errorf("error reading config file: %s", err) + } + defer yamlReader.Close() + decoder := yaml.NewDecoder(yamlReader) + decoder.KnownFields(true) + + if err = decoder.Decode(c); err != nil { + return fmt.Errorf("error parsing config file: %s", err) + } + + for index, probe := range c.Probes { + if probe.Scheme == "" { + probe.Scheme = "tcp" + } + if probe.ClientID == "" { + probe.ClientID = "emqx_exporter_probe" + } + if probe.Topic == "" { + probe.Topic = "emqx_exporter_probe" + } + c.Probes[index] = probe + } + + sc.Lock() + sc.C = c + sc.Unlock() + + return nil +} diff --git a/config/example/config.yaml b/config/example/config.yaml new file mode 100644 index 0000000..d63cf32 --- /dev/null +++ b/config/example/config.yaml @@ -0,0 +1,8 @@ +probes: + - target: broker.emqx.io:1883 + scheme: + client_id: + username: + password: + topic: + qos: diff --git a/examples/docker-compose/prometheus-emqx5.yaml b/examples/docker-compose/prometheus-emqx5.yaml index d9b1581..b2fd9b7 100644 --- a/examples/docker-compose/prometheus-emqx5.yaml +++ b/examples/docker-compose/prometheus-emqx5.yaml @@ -30,7 +30,10 @@ scrape_configs: # fix value, don't modify from: exporter - job_name: 'mqtt-probe' - metrics_path: /probe + metrics_path: '/probe' + params: + target: + - "broker.emqx.io:1883" scrape_interval: 5s static_configs: - targets: [exporter-demo:8085] diff --git a/go.mod b/go.mod index 31d54a2..6784a2a 100644 --- a/go.mod +++ b/go.mod @@ -43,4 +43,5 @@ require ( google.golang.org/appengine v1.6.7 // indirect google.golang.org/protobuf v1.30.0 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 0156b0a..9eda24b 100644 --- a/go.sum +++ b/go.sum @@ -106,3 +106,4 @@ gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/main.go b/main.go index 67edd64..3b5b9a6 100644 --- a/main.go +++ b/main.go @@ -15,7 +15,9 @@ package main import ( "emqx-exporter/client" + "emqx-exporter/config" "emqx-exporter/prober" + "fmt" stdlog "log" "net/http" @@ -29,6 +31,7 @@ import ( promcollectors "github.com/prometheus/client_golang/prometheus/collectors" "github.com/prometheus/common/promlog" "github.com/prometheus/common/promlog/flag" + "gopkg.in/yaml.v2" "emqx-exporter/collector" @@ -112,6 +115,16 @@ func newHandler(includeExporterMetrics bool, maxRequests int, logger log.Logger) } } +var ( + sc = config.NewSafeConfig(prometheus.DefaultRegisterer) + + configFile = kingpin.Flag("config.file", "EMQX exporter configuration file.").Default("/etc/emqx-exporter/config.yaml").String() +) + +func init() { + prometheus.MustRegister(version.NewCollector("emqx_exporter")) +} + func main() { var ( disableExporterMetrics = kingpin.Flag( @@ -134,10 +147,17 @@ func main() { kingpin.CommandLine.UsageWriter(os.Stdout) kingpin.HelpFlag.Short('h') kingpin.Parse() - logger := promlog.New(promlogConfig) + logger := promlog.New(promlogConfig) level.Info(logger).Log("msg", "Starting emqx-exporter", "version", version.Info()) level.Info(logger).Log("msg", "Build context", "build_context", version.BuildContext()) + + if err := sc.ReloadConfig(*configFile); err != nil { + level.Error(logger).Log("msg", "Error loading config", "err", err) + os.Exit(1) + } + level.Info(logger).Log("msg", "Loaded config file") + if user, err := user.Current(); err == nil && user.Uid == "0" { level.Warn(logger).Log("msg", "EMQX Exporter is running as root user. This exporter is designed to run as unprivileged user, root is not required.") } @@ -147,7 +167,23 @@ func main() { http.Handle("/metrics", newHandler(!*disableExporterMetrics, *maxRequests, logger)) http.HandleFunc("/probe", func(w http.ResponseWriter, r *http.Request) { - prober.Handler(w, r, logger) + sc.Lock() + probes := sc.C.Probes + sc.Unlock() + prober.Handler(w, r, probes, logger, nil) + }) + + http.HandleFunc("/config", func(w http.ResponseWriter, r *http.Request) { + sc.RLock() + c, err := yaml.Marshal(sc.C) + sc.RUnlock() + if err != nil { + level.Warn(logger).Log("msg", "Error marshalling configuration", "err", err) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + w.Header().Set("Content-Type", "text/plain") + w.Write(c) }) landingConfig := web.LandingConfig{ @@ -163,6 +199,10 @@ func main() { Address: "/probe", Text: "Probe", }, + { + Address: "/config", + Text: "Config", + }, }, } landingPage, err := web.NewLandingPage(landingConfig) diff --git a/prober/handler.go b/prober/handler.go index 20e2613..fb11dc7 100644 --- a/prober/handler.go +++ b/prober/handler.go @@ -1,15 +1,37 @@ package prober import ( + "emqx-exporter/config" + "fmt" + "net/http" + "net/url" "time" "github.com/go-kit/log" + "github.com/go-kit/log/level" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" ) -func Handler(w http.ResponseWriter, r *http.Request, logger log.Logger) { +func Handler(w http.ResponseWriter, r *http.Request, probes []config.Probe, logger log.Logger, params url.Values) { + var probe config.Probe + if params == nil { + params = r.URL.Query() + } + target := params.Get("target") + for i := 0; i < len(probes); i++ { + if probes[i].Target == target { + probe = probes[i] + break + } + } + if probe.Target == "" { + http.Error(w, fmt.Sprintf("Unknown probe target %q", target), http.StatusBadRequest) + level.Debug(logger).Log("msg", "Unknown probe target", "target", target) + return + } + probeSuccessGauge := prometheus.NewGauge(prometheus.GaugeOpts{ Namespace: "emqx", Subsystem: "mqtt", @@ -28,7 +50,7 @@ func Handler(w http.ResponseWriter, r *http.Request, logger log.Logger) { registry.MustRegister(probeDurationGauge) start := time.Now() - if ProbeMQTT(logger) { + if ProbeMQTT(probe, logger) { probeSuccessGauge.Set(1) } else { probeSuccessGauge.Set(0) diff --git a/prober/mqtt.go b/prober/mqtt.go index c41e513..9412e24 100644 --- a/prober/mqtt.go +++ b/prober/mqtt.go @@ -1,6 +1,7 @@ package prober import ( + "emqx-exporter/config" "time" mqtt "github.com/eclipse/paho.mqtt.golang" @@ -15,8 +16,8 @@ type MQTTProbe struct { var mqttProbe *MQTTProbe -func initMQTTProbe(logger log.Logger) (*MQTTProbe, error) { - opt := mqtt.NewClientOptions().AddBroker("tcp://broker.emqx.io:1883").SetClientID("emqx-exporter") +func initMQTTProbe(probe config.Probe, logger log.Logger) (*MQTTProbe, error) { + opt := mqtt.NewClientOptions().AddBroker(probe.Scheme + "://" + probe.Target).SetClientID(probe.ClientID).SetUsername(probe.Username).SetPassword(probe.Password) opt.SetOnConnectHandler(func(c mqtt.Client) { level.Info(logger).Log("msg", "Connected to MQTT broker") }) @@ -30,7 +31,7 @@ func initMQTTProbe(logger log.Logger) (*MQTTProbe, error) { } var msgChan = make(chan mqtt.Message) - if token := c.Subscribe("emqx-exporter", 1, func(c mqtt.Client, m mqtt.Message) { + if token := c.Subscribe(probe.Topic, probe.QoS, func(c mqtt.Client, m mqtt.Message) { msgChan <- m }); token.Wait() && token.Error() != nil { level.Error(logger).Log("msg", "Failed to subscribe to MQTT topic", "err", token.Error()) @@ -43,10 +44,10 @@ func initMQTTProbe(logger log.Logger) (*MQTTProbe, error) { }, nil } -func ProbeMQTT(logger log.Logger) bool { +func ProbeMQTT(probe config.Probe, logger log.Logger) bool { if mqttProbe == nil { var err error - if mqttProbe, err = initMQTTProbe(logger); err != nil { + if mqttProbe, err = initMQTTProbe(probe, logger); err != nil { return false } } @@ -55,7 +56,7 @@ func ProbeMQTT(logger log.Logger) bool { return false } - if token := mqttProbe.Client.Publish("emqx-exporter", 1, false, "hello world"); token.Wait() && token.Error() != nil { + if token := mqttProbe.Client.Publish(probe.Topic, probe.QoS, false, "hello world"); token.Wait() && token.Error() != nil { return false }