diff --git a/client/client_4.x.go b/client/client_4.x.go index 6c2f2c0..62f8c20 100644 --- a/client/client_4.x.go +++ b/client/client_4.x.go @@ -15,10 +15,9 @@ import ( var _ client = &cluster4x{} type cluster4x struct { - username string - password string - version string - client *fasthttp.Client + version string + client *fasthttp.Client + uri *fasthttp.URI } func (n *cluster4x) getVersion() string { @@ -33,7 +32,7 @@ func (n *cluster4x) getLicense() (lic *collector.LicenseInfo, err error) { } Code int }{} - data, statusCode, err := callHTTPGet(n.client, "/api/v4/license", n.username, n.password) + data, statusCode, err := callHTTPGet(n.client, n.uri, "/api/v4/license") if statusCode == http.StatusNotFound { // open source version doesn't support license api err = nil @@ -82,7 +81,7 @@ func (n *cluster4x) getClusterStatus() (cluster collector.ClusterStatus, err err } Code int }{} - err = callHTTPGetWithResp(n.client, "/api/v4/nodes", n.username, n.password, &resp) + err = callHTTPGetWithResp(n.client, n.uri, "/api/v4/nodes", &resp) if err != nil { return } @@ -123,7 +122,7 @@ func (n *cluster4x) getBrokerMetrics() (metrics *collector.Broker, err error) { } Code int }{} - data, statusCode, err := callHTTPGet(n.client, "/api/v4/monitor/current_metrics", n.username, n.password) + data, statusCode, err := callHTTPGet(n.client, n.uri, "/api/v4/monitor/current_metrics") if statusCode == http.StatusNotFound { // open source version doesn't support this api err = nil @@ -178,7 +177,7 @@ func (n *cluster4x) getRuleEngineMetrics() (metrics []collector.RuleEngine, err } Code int }{} - err = callHTTPGetWithResp(n.client, "/api/v4/rules?_limit=10000", n.username, n.password, &resp) + err = callHTTPGetWithResp(n.client, n.uri, "/api/v4/rules?_limit=10000", &resp) if err != nil { return } @@ -236,7 +235,7 @@ func (n *cluster4x) getDataBridge() (bridges []collector.DataBridge, err error) } Code int }{} - err = callHTTPGetWithResp(n.client, "/api/v4/resources", n.username, n.password, &resp) + err = callHTTPGetWithResp(n.client, n.uri, "/api/v4/resources", &resp) if err != nil { return } diff --git a/client/client_5.x.go b/client/client_5.x.go index 0eab335..08df9e8 100644 --- a/client/client_5.x.go +++ b/client/client_5.x.go @@ -12,11 +12,10 @@ import ( var _ client = &cluster5x{} type cluster5x struct { - username string - password string - version string - edition edition - client *fasthttp.Client + version string + edition edition + client *fasthttp.Client + uri *fasthttp.URI } func (n *cluster5x) getVersion() string { @@ -32,7 +31,7 @@ func (n *cluster5x) getLicense() (lic *collector.LicenseInfo, err error) { MaxConnections int64 `json:"max_connections"` ExpiryAt string `json:"expiry_at"` }{} - err = callHTTPGetWithResp(n.client, "/api/v5/license", n.username, n.password, &resp) + err = callHTTPGetWithResp(n.client, n.uri, "/api/v5/license", &resp) if err != nil { return } @@ -63,7 +62,7 @@ func (n *cluster5x) getClusterStatus() (cluster collector.ClusterStatus, err err Load5 any `json:"load5"` Load15 any `json:"load15"` }{{}} - err = callHTTPGetWithResp(n.client, "/api/v5/nodes", n.username, n.password, &resp) + err = callHTTPGetWithResp(n.client, n.uri, "/api/v5/nodes", &resp) if err != nil { return } @@ -109,7 +108,7 @@ func (n *cluster5x) getBrokerMetrics() (metrics *collector.Broker, err error) { SentMsgRate int64 `json:"sent_msg_rate"` ReceivedMsgRate int64 `json:"received_msg_rate"` }{} - err = callHTTPGetWithResp(n.client, "/api/v5/monitor_current", n.username, n.password, &resp) + err = callHTTPGetWithResp(n.client, n.uri, "/api/v5/monitor_current", &resp) if err != nil { return } @@ -129,7 +128,7 @@ func (n *cluster5x) getRuleEngineMetrics() (metrics []collector.RuleEngine, err Enable bool } }{} - err = callHTTPGetWithResp(n.client, "/api/v5/rules?limit=10000", n.username, n.password, &resp) + err = callHTTPGetWithResp(n.client, n.uri, "/api/v5/rules?limit=10000", &resp) if err != nil { return } @@ -157,7 +156,7 @@ func (n *cluster5x) getRuleEngineMetrics() (metrics []collector.RuleEngine, err } } `json:"node_metrics"` }{} - err = callHTTPGetWithResp(n.client, fmt.Sprintf("/api/v5/rules/%s/metrics", rule.ID), n.username, n.password, &metricsResp) + err = callHTTPGetWithResp(n.client, n.uri, fmt.Sprintf("/api/v5/rules/%s/metrics", rule.ID), &metricsResp) if err != nil { return } @@ -190,7 +189,7 @@ func (n *cluster5x) getDataBridge() (bridges []collector.DataBridge, err error) Type string Status string }{{}} - err = callHTTPGetWithResp(n.client, "/api/v5/bridges", n.username, n.password, &bridgesResp) + err = callHTTPGetWithResp(n.client, n.uri, "/api/v5/bridges", &bridgesResp) if err != nil { return } @@ -214,7 +213,7 @@ func (n *cluster5x) getDataBridge() (bridges []collector.DataBridge, err error) Dropped int64 } }{} - err = callHTTPGetWithResp(n.client, fmt.Sprintf("/api/v5/bridges/%s:%s/metrics", data.Type, data.Name), n.username, n.password, &metricsResp) + err = callHTTPGetWithResp(n.client, n.uri, fmt.Sprintf("/api/v5/bridges/%s:%s/metrics", data.Type, data.Name), &metricsResp) if err != nil { return } @@ -233,7 +232,7 @@ func (n *cluster5x) getAuthenticationMetrics() (dataSources []collector.DataSour Backend string Enable bool }{{}} - err = callHTTPGetWithResp(n.client, "/api/v5/authentication", n.username, n.password, &resp) + err = callHTTPGetWithResp(n.client, n.uri, "/api/v5/authentication", &resp) if err != nil { return } @@ -257,7 +256,7 @@ func (n *cluster5x) getAuthenticationMetrics() (dataSources []collector.DataSour } `json:"node_metrics"` Status string }{} - err = callHTTPGetWithResp(n.client, fmt.Sprintf("/api/v5/authentication/%s/status", plugin.ID), n.username, n.password, &status) + err = callHTTPGetWithResp(n.client, n.uri, fmt.Sprintf("/api/v5/authentication/%s/status", plugin.ID), &status) if err != nil { return } @@ -296,7 +295,7 @@ func (n *cluster5x) getAuthorizationMetrics() (dataSources []collector.DataSourc Enable bool } }{} - err = callHTTPGetWithResp(n.client, "/api/v5/authorization/sources", n.username, n.password, &resp) + err = callHTTPGetWithResp(n.client, n.uri, "/api/v5/authorization/sources", &resp) if err != nil { return } @@ -320,7 +319,7 @@ func (n *cluster5x) getAuthorizationMetrics() (dataSources []collector.DataSourc } `json:"node_metrics"` Status string }{} - err = callHTTPGetWithResp(n.client, fmt.Sprintf("/api/v5/authorization/sources/%s/status", plugin.Type), n.username, n.password, &status) + err = callHTTPGetWithResp(n.client, n.uri, fmt.Sprintf("/api/v5/authorization/sources/%s/status", plugin.Type), &status) if err != nil { return } diff --git a/client/cluster.go b/client/cluster.go index 1944e5a..326adf5 100644 --- a/client/cluster.go +++ b/client/cluster.go @@ -22,12 +22,12 @@ func NewCluster(metrics *config.Metrics, logger log.Logger) collector.Cluster { c := &cluster{} go func() { - httpClient := getHTTPClient(metrics.Target) + httpClient := getHTTPClient(metrics) + uri := getURI(metrics) for { client4 := &cluster4x{ - username: metrics.APIKey, - password: metrics.APISecret, - client: httpClient, + client: httpClient, + uri: uri, } if _, err := client4.getClusterStatus(); err == nil { c.client = client4 @@ -38,9 +38,8 @@ func NewCluster(metrics *config.Metrics, logger log.Logger) collector.Cluster { } client5 := &cluster5x{ - username: metrics.APIKey, - password: metrics.APISecret, - client: httpClient, + client: httpClient, + uri: uri, } if _, err := client5.getClusterStatus(); err == nil { c.client = client5 diff --git a/client/utils.go b/client/utils.go index 4f178a4..d6ad500 100644 --- a/client/utils.go +++ b/client/utils.go @@ -1,7 +1,7 @@ package client import ( - "encoding/base64" + "emqx-exporter/config" "errors" "fmt" "net/http" @@ -29,7 +29,16 @@ func cutNodeName(nodeName string) string { return slice[1] } -func getHTTPClient(host string) *fasthttp.Client { +func getURI(metrics *config.Metrics) *fasthttp.URI { + uri := &fasthttp.URI{} + uri.SetUsername(metrics.APIKey) + uri.SetPassword(metrics.APISecret) + uri.SetScheme(metrics.Scheme) + uri.SetHost(metrics.Target) + return uri +} + +func getHTTPClient(metrics *config.Metrics) *fasthttp.Client { return &fasthttp.Client{ Name: "EMQX-Exporter", //User-Agent MaxConnsPerHost: 5, @@ -37,44 +46,37 @@ func getHTTPClient(host string) *fasthttp.Client { ReadTimeout: 5 * time.Second, WriteTimeout: 5 * time.Second, MaxConnWaitTimeout: 5 * time.Second, - ConfigureClient: func(hc *fasthttp.HostClient) error { - hc.Addr = host - return nil - }, + TLSConfig: metrics.TLSClientConfig.ToTLSConfig(), } } -func callHTTPGet(client *fasthttp.Client, uri, username, password string) (data []byte, statusCode int, err error) { +func callHTTPGet(client *fasthttp.Client, uri *fasthttp.URI, requestURI string) (data []byte, statusCode int, err error) { req := fasthttp.AcquireRequest() defer fasthttp.ReleaseRequest(req) - req.SetRequestURI(uri) + req.SetURI(uri) + req.URI().SetPath(requestURI) req.Header.SetMethod(http.MethodGet) - req.Header.Set("Authorization", "Basic "+base64.StdEncoding.EncodeToString([]byte(username+":"+password))) - // for fasthttp, must set host, otherwise will panic - // but it doesn't matter what value is set - // the host will be replaced by the real host in fasthttp.Client.ConfigureClient - req.Header.SetHost("emqx-exporter") resp := fasthttp.AcquireResponse() defer fasthttp.ReleaseResponse(resp) err = client.Do(req, resp) if err != nil { - err = fmt.Errorf("request %s failed. %w", uri, err) + err = fmt.Errorf("request %s failed. %w", req.URI().String(), err) return } statusCode = resp.StatusCode() if resp.StatusCode() != http.StatusOK { - err = fmt.Errorf("%s: %s", uri, http.StatusText(resp.StatusCode())) + err = fmt.Errorf("%s: %s", req.URI().String(), http.StatusText(resp.StatusCode())) return } data = resp.Body() if len(data) == 0 { - err = fmt.Errorf("get nothing from api %s", uri) + err = fmt.Errorf("get nothing from api %s", req.URI().String()) return } if !jsoniter.Valid(data) { @@ -87,12 +89,12 @@ func callHTTPGet(client *fasthttp.Client, uri, username, password string) (data if code.ValueType() == jsoniter.NumberValue { // for emqx 4.4, it will return integer type code if occurred error if code.ToInt() != 0 { - errMsg = fmt.Sprintf("%s: %d", uri, code.ToInt()) + errMsg = fmt.Sprintf("%s: %d", req.URI().String(), code.ToInt()) } } else if code.ValueType() == jsoniter.StringValue { // for emqx 5, it will return string type code if occurred error if code.ToString() != "" { - errMsg = fmt.Sprintf("%s: %s", uri, code.ToString()) + errMsg = fmt.Sprintf("%s: %s", req.URI().String(), code.ToString()) } } @@ -106,15 +108,15 @@ func callHTTPGet(client *fasthttp.Client, uri, username, password string) (data return } -func callHTTPGetWithResp(client *fasthttp.Client, uri, username, password string, respData interface{}) (err error) { - data, _, err := callHTTPGet(client, uri, username, password) +func callHTTPGetWithResp(client *fasthttp.Client, uri *fasthttp.URI, requestURI string, respData interface{}) (err error) { + data, _, err := callHTTPGet(client, uri, requestURI) if err != nil { return } err = jsoniter.Unmarshal(data, respData) if err != nil { - err = fmt.Errorf("unmarshal api resp failed: %s, %s", uri, err.Error()) + err = fmt.Errorf("unmarshal api resp failed: %s, %s", requestURI, err.Error()) return } return diff --git a/config/config.go b/config/config.go index 005e2dd..18f4e2b 100644 --- a/config/config.go +++ b/config/config.go @@ -19,9 +19,11 @@ type Config struct { } type Metrics struct { - Target string `yaml:"target"` - APIKey string `yaml:"api_key"` - APISecret string `yaml:"api_secret"` + APIKey string `yaml:"api_key"` + APISecret string `yaml:"api_secret"` + Target string `yaml:"target"` + Scheme string `yaml:"scheme,omitempty"` + TLSClientConfig *TLSClientConfig `yaml:"tls_config,omitempty"` } type Probe struct { @@ -103,16 +105,32 @@ func (sc *SafeConfig) ReloadConfig(confFile string) (err error) { } if c.Metrics != nil { - if c.Metrics.Target == "" { - return fmt.Errorf("metrics.target is required") - } if c.Metrics.APIKey == "" { return fmt.Errorf("metrics.api_key is required") } - if c.Metrics.APISecret == "" { return fmt.Errorf("metrics.api_secret is required") } + if c.Metrics.Target == "" { + return fmt.Errorf("metrics.target is required") + } + if c.Metrics.TLSClientConfig != nil { + if c.Metrics.Scheme == "" { + c.Metrics.Scheme = "https" + } + if c.Metrics.TLSClientConfig.CAData, err = dataFromSliceOrFile(c.Metrics.TLSClientConfig.CAData, c.Metrics.TLSClientConfig.CAFile); err != nil { + return fmt.Errorf("metrics.ssl_config.ca_data: %s", err) + } + if c.Metrics.TLSClientConfig.CertData, err = dataFromSliceOrFile(c.Metrics.TLSClientConfig.CertData, c.Metrics.TLSClientConfig.CertFile); err != nil { + return fmt.Errorf("metrics.ssl_config.cert_data: %s", err) + } + if c.Metrics.TLSClientConfig.KeyData, err = dataFromSliceOrFile(c.Metrics.TLSClientConfig.KeyData, c.Metrics.TLSClientConfig.KeyFile); err != nil { + return fmt.Errorf("metrics.ssl_config.key_data: %s", err) + } + } + if c.Metrics.Scheme == "" { + c.Metrics.Scheme = "http" + } } for index, probe := range c.Probes { @@ -153,6 +171,9 @@ func (sc *SafeConfig) ReloadConfig(confFile string) (err error) { } func (conf *TLSClientConfig) ToTLSConfig() *tls.Config { + if conf == nil { + return nil + } certpool := x509.NewCertPool() certpool.AppendCertsFromPEM(conf.CAData) clientKeyPair, _ := tls.X509KeyPair(conf.CertData, conf.KeyData) diff --git a/config/example/config.yaml b/config/example/config.yaml index 8105c07..5cd7790 100644 --- a/config/example/config.yaml +++ b/config/example/config.yaml @@ -1,13 +1,10 @@ metrics: - ## EMQX API + api_key: "some_api_key" ## EMQX API key + api_secret: "some_api_secret" ## EMQX API secret target: 127.0.0.1:18083 - ## EMQX API key - api_key: "some_api_key" - ## EMQX API secret - api_secret: "some_api_secret" probes: - target: 127.0.0.1:1883 ## MQTT broker address - scheme: ## tcp, default is tcp + scheme: ## mqtt | tcp | mqtts | ssl | tls | ws | wss, default is tcp client_id: username: password: diff --git a/config/example/just_metrics_https.yaml b/config/example/just_metrics_https.yaml new file mode 100644 index 0000000..d4dfe6b --- /dev/null +++ b/config/example/just_metrics_https.yaml @@ -0,0 +1,10 @@ +metrics: + api_key: "some_api_key" + api_secret: "some_api_secret" + target: 127.0.0.1:18084 + scheme: https + tls_config: + insecure_skip_verify: true + ca_file: config/example/certs/cacert.pem + cert_file: config/example/certs/client-cert.pem + key_file: config/example/certs/client-key.pem