Skip to content

Commit

Permalink
Query batch feature (#1052)
Browse files Browse the repository at this point in the history
* batch query prom for single panel

* make code better:

1.extract server/api.go

2.make webapi reading prom with reusing server's API,not a new prom client

* clear code

* clear code

* format code
clear code

* move reader.go,reuse webapi/prom/prom.go clusterTypes clients cache

* clear code,extract common method
  • Loading branch information
SunnyBoy-WYH authored Jul 17, 2022
1 parent b7ff82d commit 05651ad
Show file tree
Hide file tree
Showing 7 changed files with 139 additions and 39 deletions.
20 changes: 11 additions & 9 deletions src/server/reader/api.go → src/pkg/prom/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

// Package v1 provides bindings to the Prometheus HTTP API v1:
// http://prometheus.io/docs/querying/api/
package reader
package prom

import (
"context"
Expand Down Expand Up @@ -558,10 +558,11 @@ func (qr *queryResult) UnmarshalJSON(b []byte) error {
// NewAPI returns a new API for the client.
//
// It is safe to use the returned API from multiple goroutines.
func NewAPI(c api.Client) API {
func NewAPI(c api.Client, opt ClientOptions) API {
return &httpAPI{
client: &apiClientImpl{
client: c,
opt: opt,
},
}
}
Expand Down Expand Up @@ -891,6 +892,7 @@ type apiClient interface {

type apiClientImpl struct {
client api.Client
opt ClientOptions
}

type apiResponse struct {
Expand Down Expand Up @@ -921,16 +923,16 @@ func (h *apiClientImpl) URL(ep string, args map[string]string) *url.URL {
}

func (h *apiClientImpl) Do(ctx context.Context, req *http.Request) (*http.Response, []byte, Warnings, error) {
if Reader.Opts.BasicAuthUser != "" && Reader.Opts.BasicAuthPass != "" {
req.SetBasicAuth(Reader.Opts.BasicAuthUser, Reader.Opts.BasicAuthPass)
if h.opt.BasicAuthUser != "" && h.opt.BasicAuthPass != "" {
req.SetBasicAuth(h.opt.BasicAuthUser, h.opt.BasicAuthPass)
}

headerCount := len(Reader.Opts.Headers)
headerCount := len(h.opt.Headers)
if headerCount > 0 && headerCount%2 == 0 {
for i := 0; i < len(Reader.Opts.Headers); i += 2 {
req.Header.Add(Reader.Opts.Headers[i], Reader.Opts.Headers[i+1])
if Reader.Opts.Headers[i] == "Host" {
req.Host = Reader.Opts.Headers[i+1]
for i := 0; i < len(h.opt.Headers); i += 2 {
req.Header.Add(h.opt.Headers[i], h.opt.Headers[i+1])
if h.opt.Headers[i] == "Host" {
req.Host = h.opt.Headers[i+1]
}
}
}
Expand Down
9 changes: 9 additions & 0 deletions src/pkg/prom/client_option.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package prom

type ClientOptions struct {
BasicAuthUser string

BasicAuthPass string

Headers []string
}
3 changes: 2 additions & 1 deletion src/server/engine/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/toolkits/pkg/str"

"github.com/didi/nightingale/v5/src/models"
"github.com/didi/nightingale/v5/src/pkg/prom"
"github.com/didi/nightingale/v5/src/server/common/conv"
"github.com/didi/nightingale/v5/src/server/config"
"github.com/didi/nightingale/v5/src/server/memsto"
Expand Down Expand Up @@ -111,7 +112,7 @@ func (r RuleEval) Work() {
var value model.Value
var err error
if r.rule.Algorithm == "" {
var warnings reader.Warnings
var warnings prom.Warnings
value, warnings, err = reader.Reader.Client.Query(context.Background(), promql, time.Now())
if err != nil {
logger.Errorf("rule_eval:%d promql:%s, error:%v", r.RuleID(), promql, err)
Expand Down
11 changes: 8 additions & 3 deletions src/server/reader/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@ import (
"net/http"
"time"

"github.com/didi/nightingale/v5/src/pkg/prom"
"github.com/didi/nightingale/v5/src/server/config"
"github.com/prometheus/client_golang/api"
)

type ReaderType struct {
Opts config.ReaderOptions
Client API
Client prom.API
}

var Reader ReaderType
Expand Down Expand Up @@ -41,8 +42,12 @@ func Init(opts config.ReaderOptions) error {
}

Reader = ReaderType{
Opts: opts,
Client: NewAPI(cli),
Opts: opts,
Client: prom.NewAPI(cli, prom.ClientOptions{
BasicAuthUser: opts.BasicAuthUser,
BasicAuthPass: opts.BasicAuthPass,
Headers: opts.Headers,
}),
}

return nil
Expand Down
65 changes: 39 additions & 26 deletions src/webapi/prom/prom.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,17 @@ import (
"sync"
"time"

"github.com/didi/nightingale/v5/src/pkg/prom"
"github.com/didi/nightingale/v5/src/webapi/config"
"github.com/prometheus/client_golang/api"
"github.com/toolkits/pkg/logger"
"github.com/toolkits/pkg/net/httplib"
)

type ClusterType struct {
Opts config.ClusterOptions
Transport *http.Transport
Opts config.ClusterOptions
Transport *http.Transport
PromClient prom.API
}

type ClustersType struct {
Expand Down Expand Up @@ -61,18 +64,7 @@ func initClustersFromConfig() error {
opts := config.C.Clusters

for i := 0; i < len(opts); i++ {
cluster := &ClusterType{
Opts: opts[i],
Transport: &http.Transport{
// TLSClientConfig: tlsConfig,
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: time.Duration(opts[i].DialTimeout) * time.Millisecond,
}).DialContext,
ResponseHeaderTimeout: time.Duration(opts[i].Timeout) * time.Millisecond,
MaxIdleConnsPerHost: opts[i].MaxIdleConnsPerHost,
},
}
cluster := newClusterTypeByOption(opts[i])
Clusters.Put(opts[i].Name, cluster)
}

Expand Down Expand Up @@ -173,21 +165,42 @@ func loadClustersFromAPI() {
MaxIdleConnsPerHost: 32,
}

cluster := &ClusterType{
Opts: opt,
Transport: &http.Transport{
// TLSClientConfig: tlsConfig,
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: time.Duration(opt.DialTimeout) * time.Millisecond,
}).DialContext,
ResponseHeaderTimeout: time.Duration(opt.Timeout) * time.Millisecond,
MaxIdleConnsPerHost: opt.MaxIdleConnsPerHost,
},
}
cluster := newClusterTypeByOption(opt)

Clusters.Put(item.Name, cluster)
continue
}
}
}

func newClusterTypeByOption(opt config.ClusterOptions) *ClusterType {
transport := &http.Transport{
// TLSClientConfig: tlsConfig,
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: time.Duration(opt.DialTimeout) * time.Millisecond,
}).DialContext,
ResponseHeaderTimeout: time.Duration(opt.Timeout) * time.Millisecond,
MaxIdleConnsPerHost: opt.MaxIdleConnsPerHost,
}

cli, err := api.NewClient(api.Config{
Address: opt.Prom,
RoundTripper: transport,
})

if err != nil {
logger.Errorf("new client fail: %v", err)
}

cluster := &ClusterType{
Opts: opt,
Transport: transport,
PromClient: prom.NewAPI(cli, prom.ClientOptions{
BasicAuthUser: opt.BasicAuthUser,
BasicAuthPass: opt.BasicAuthPass,
Headers: opt.Headers,
}),
}
return cluster
}
5 changes: 5 additions & 0 deletions src/webapi/router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,10 +98,15 @@ func configRoute(r *gin.Engine, version string) {

pages := r.Group(pagesPrefix)
{

if config.C.AnonymousAccess.PromQuerier {
pages.Any("/prometheus/*url", prometheusProxy)

pages.POST("/query-range-batch", promBatchQueryRange)
} else {
pages.Any("/prometheus/*url", auth(), prometheusProxy)

pages.POST("/query-range-batch", auth(), promBatchQueryRange)
}

pages.GET("/version", func(c *gin.Context) {
Expand Down
65 changes: 65 additions & 0 deletions src/webapi/router/router_prometheus.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,59 @@
package router

import (
"context"
"errors"

"net/http"
"net/http/httputil"
"net/url"
"strings"
"time"

"github.com/gin-gonic/gin"
"github.com/toolkits/pkg/ginx"

. "github.com/didi/nightingale/v5/src/pkg/prom"
"github.com/didi/nightingale/v5/src/webapi/config"
"github.com/didi/nightingale/v5/src/webapi/prom"
"github.com/prometheus/common/model"
)

type queryFormItem struct {
Start int64 `json:"start" binding:"required"`
End int64 `json:"end" binding:"required"`
Step int64 `json:"step" binding:"required"`
Query string `json:"query" binding:"required"`
}

type batchQueryForm struct {
Queries []queryFormItem `json:"queries" binding:"required"`
}

type batchQueryRes struct {
Data []model.Value `json:"data"`
}

func promBatchQueryRange(c *gin.Context) {

xcluster := c.GetHeader("X-Cluster")

if xcluster == "" {
c.String(500, "X-Cluster is blank")
return
}

var f batchQueryForm
err := c.BindJSON(&f)
if err != nil {
c.String(500, "%s", err.Error())
return
}
res, err := batchQueryRange(xcluster, f.Queries)

ginx.NewRender(c).Data(res, err)
}

func prometheusProxy(c *gin.Context) {
xcluster := c.GetHeader("X-Cluster")
if xcluster == "" {
Expand Down Expand Up @@ -93,3 +134,27 @@ func clustersGets(c *gin.Context) {
}
ginx.NewRender(c).Data(names, nil)
}

func batchQueryRange(clusterName string, data []queryFormItem) (batchQueryRes, error) {

var res batchQueryRes

clusterType, exist := prom.Clusters.Get(clusterName)
if !exist {
return batchQueryRes{}, errors.New("cluster client not exist")
}
for _, item := range data {

r := Range{
Start: time.Unix(item.Start, 0),
End: time.Unix(item.End, 0),
Step: time.Duration(item.Step) * time.Second,
}
resp, _, err := clusterType.PromClient.QueryRange(context.Background(), item.Query, r)
if err != nil {
return res, err
}
res.Data = append(res.Data, resp)
}
return res, nil
}

0 comments on commit 05651ad

Please sign in to comment.