Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Query batch feature #1052

Merged
merged 10 commits into from
Jul 17, 2022
20 changes: 11 additions & 9 deletions src/server/reader/api.go → src/pkg/prom/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

// Package v1 provides bindings to the Prometheus HTTP API v1:
// http://prometheus.io/docs/querying/api/
package reader
package prom

import (
"context"
Expand Down Expand Up @@ -558,10 +558,11 @@ func (qr *queryResult) UnmarshalJSON(b []byte) error {
// NewAPI returns a new API for the client.
//
// It is safe to use the returned API from multiple goroutines.
func NewAPI(c api.Client) API {
func NewAPI(c api.Client, opt ClientOptions) API {
return &httpAPI{
client: &apiClientImpl{
client: c,
opt: opt,
},
}
}
Expand Down Expand Up @@ -891,6 +892,7 @@ type apiClient interface {

type apiClientImpl struct {
client api.Client
opt ClientOptions
}

type apiResponse struct {
Expand Down Expand Up @@ -921,16 +923,16 @@ func (h *apiClientImpl) URL(ep string, args map[string]string) *url.URL {
}

func (h *apiClientImpl) Do(ctx context.Context, req *http.Request) (*http.Response, []byte, Warnings, error) {
if Reader.Opts.BasicAuthUser != "" && Reader.Opts.BasicAuthPass != "" {
req.SetBasicAuth(Reader.Opts.BasicAuthUser, Reader.Opts.BasicAuthPass)
if h.opt.BasicAuthUser != "" && h.opt.BasicAuthPass != "" {
req.SetBasicAuth(h.opt.BasicAuthUser, h.opt.BasicAuthPass)
}

headerCount := len(Reader.Opts.Headers)
headerCount := len(h.opt.Headers)
if headerCount > 0 && headerCount%2 == 0 {
for i := 0; i < len(Reader.Opts.Headers); i += 2 {
req.Header.Add(Reader.Opts.Headers[i], Reader.Opts.Headers[i+1])
if Reader.Opts.Headers[i] == "Host" {
req.Host = Reader.Opts.Headers[i+1]
for i := 0; i < len(h.opt.Headers); i += 2 {
req.Header.Add(h.opt.Headers[i], h.opt.Headers[i+1])
if h.opt.Headers[i] == "Host" {
req.Host = h.opt.Headers[i+1]
}
}
}
Expand Down
9 changes: 9 additions & 0 deletions src/pkg/prom/client_option.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package prom

type ClientOptions struct {
BasicAuthUser string

BasicAuthPass string

Headers []string
}
3 changes: 2 additions & 1 deletion src/server/engine/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/toolkits/pkg/str"

"github.com/didi/nightingale/v5/src/models"
"github.com/didi/nightingale/v5/src/pkg/prom"
"github.com/didi/nightingale/v5/src/server/common/conv"
"github.com/didi/nightingale/v5/src/server/config"
"github.com/didi/nightingale/v5/src/server/memsto"
Expand Down Expand Up @@ -111,7 +112,7 @@ func (r RuleEval) Work() {
var value model.Value
var err error
if r.rule.Algorithm == "" {
var warnings reader.Warnings
var warnings prom.Warnings
value, warnings, err = reader.Reader.Client.Query(context.Background(), promql, time.Now())
if err != nil {
logger.Errorf("rule_eval:%d promql:%s, error:%v", r.RuleID(), promql, err)
Expand Down
11 changes: 8 additions & 3 deletions src/server/reader/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@ import (
"net/http"
"time"

"github.com/didi/nightingale/v5/src/pkg/prom"
"github.com/didi/nightingale/v5/src/server/config"
"github.com/prometheus/client_golang/api"
)

type ReaderType struct {
Opts config.ReaderOptions
Client API
Client prom.API
}

var Reader ReaderType
Expand Down Expand Up @@ -41,8 +42,12 @@ func Init(opts config.ReaderOptions) error {
}

Reader = ReaderType{
Opts: opts,
Client: NewAPI(cli),
Opts: opts,
Client: prom.NewAPI(cli, prom.ClientOptions{
BasicAuthUser: opts.BasicAuthUser,
BasicAuthPass: opts.BasicAuthPass,
Headers: opts.Headers,
}),
}

return nil
Expand Down
94 changes: 72 additions & 22 deletions src/webapi/prom/prom.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,17 @@ import (
"sync"
"time"

"github.com/didi/nightingale/v5/src/pkg/prom"
"github.com/didi/nightingale/v5/src/webapi/config"
"github.com/prometheus/client_golang/api"
"github.com/toolkits/pkg/logger"
"github.com/toolkits/pkg/net/httplib"
)

type ClusterType struct {
Opts config.ClusterOptions
Transport *http.Transport
Opts config.ClusterOptions
Transport *http.Transport
PromClient prom.API
}

type ClustersType struct {
Expand All @@ -39,6 +42,19 @@ func (cs *ClustersType) Get(name string) (*ClusterType, bool) {
return c, has
}

func (cs *ClustersType) GetClusters() []*ClusterType {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个方法是不是已经不需要了?

cs.mutex.Lock()
clusterTypes := make([]*ClusterType, 0, len(cs.datas))
for k := range cs.datas {
c, has := cs.datas[k]
if has {
clusterTypes = append(clusterTypes, c)
}
}
cs.mutex.Unlock()
return clusterTypes
}

var Clusters = ClustersType{
datas: make(map[string]*ClusterType),
mutex: new(sync.RWMutex),
Expand All @@ -61,17 +77,34 @@ func initClustersFromConfig() error {
opts := config.C.Clusters

for i := 0; i < len(opts); i++ {
transport := &http.Transport{
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

initClustersFromConfig 和 initClustersFromAPI 中有大片代码重复,看看是否可以抽取出方法?

// TLSClientConfig: tlsConfig,
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: time.Duration(opts[i].DialTimeout) * time.Millisecond,
}).DialContext,
ResponseHeaderTimeout: time.Duration(opts[i].Timeout) * time.Millisecond,
MaxIdleConnsPerHost: opts[i].MaxIdleConnsPerHost,
}

cli, err := api.NewClient(api.Config{
Address: opts[i].Prom,
RoundTripper: transport,
})

if err != nil {
logger.Errorf("new client fail: %v", err)
continue
}

cluster := &ClusterType{
Opts: opts[i],
Transport: &http.Transport{
// TLSClientConfig: tlsConfig,
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: time.Duration(opts[i].DialTimeout) * time.Millisecond,
}).DialContext,
ResponseHeaderTimeout: time.Duration(opts[i].Timeout) * time.Millisecond,
MaxIdleConnsPerHost: opts[i].MaxIdleConnsPerHost,
},
Opts: opts[i],
Transport: transport,
PromClient: prom.NewAPI(cli, prom.ClientOptions{
BasicAuthUser: opts[i].BasicAuthUser,
BasicAuthPass: opts[i].BasicAuthPass,
Headers: opts[i].Headers,
}),
}
Clusters.Put(opts[i].Name, cluster)
}
Expand Down Expand Up @@ -173,17 +206,34 @@ func loadClustersFromAPI() {
MaxIdleConnsPerHost: 32,
}

transport := &http.Transport{
// TLSClientConfig: tlsConfig,
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: time.Duration(opt.DialTimeout) * time.Millisecond,
}).DialContext,
ResponseHeaderTimeout: time.Duration(opt.Timeout) * time.Millisecond,
MaxIdleConnsPerHost: opt.MaxIdleConnsPerHost,
}

cli, err := api.NewClient(api.Config{
Address: opt.Prom,
RoundTripper: transport,
})

if err != nil {
logger.Errorf("new client fail: %v", err)
continue
}

cluster := &ClusterType{
Opts: opt,
Transport: &http.Transport{
// TLSClientConfig: tlsConfig,
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: time.Duration(opt.DialTimeout) * time.Millisecond,
}).DialContext,
ResponseHeaderTimeout: time.Duration(opt.Timeout) * time.Millisecond,
MaxIdleConnsPerHost: opt.MaxIdleConnsPerHost,
},
Opts: opt,
Transport: transport,
PromClient: prom.NewAPI(cli, prom.ClientOptions{
BasicAuthUser: opt.BasicAuthUser,
BasicAuthPass: opt.BasicAuthPass,
Headers: opt.Headers,
}),
}

Clusters.Put(item.Name, cluster)
Expand Down
3 changes: 3 additions & 0 deletions src/webapi/router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,9 @@ func configRoute(r *gin.Engine, version string) {

pages := r.Group(pagesPrefix)
{

SunnyBoy-WYH marked this conversation as resolved.
Show resolved Hide resolved
pages.POST("/query-range-batch", promBatchQueryRange)

if config.C.AnonymousAccess.PromQuerier {
pages.Any("/prometheus/*url", prometheusProxy)
} else {
Expand Down
58 changes: 58 additions & 0 deletions src/webapi/router/router_prometheus.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,52 @@
package router

import (
"context"
"errors"

"net/http"
"net/http/httputil"
"net/url"
"strings"
"time"

"github.com/gin-gonic/gin"
"github.com/toolkits/pkg/ginx"

. "github.com/didi/nightingale/v5/src/pkg/prom"
"github.com/didi/nightingale/v5/src/webapi/config"
"github.com/didi/nightingale/v5/src/webapi/prom"
"github.com/prometheus/common/model"
)

type queryFormItem struct {
Start int64 `json:"start" binding:"required"`
End int64 `json:"end" binding:"required"`
Step int64 `json:"step" binding:"required"`
Query string `json:"query" binding:"required"`
}

type batchQueryForm struct {
Queries []queryFormItem `json:"queries" binding:"required"`
}

type batchQueryRes struct {
SunnyBoy-WYH marked this conversation as resolved.
Show resolved Hide resolved
Data []model.Value `json:"data"`
}

func promBatchQueryRange(c *gin.Context) {

xcluster := c.GetHeader("X-Cluster")
SunnyBoy-WYH marked this conversation as resolved.
Show resolved Hide resolved
var f batchQueryForm
err := c.BindJSON(&f)
if err != nil {
c.String(500, "%s", err.Error())
SunnyBoy-WYH marked this conversation as resolved.
Show resolved Hide resolved
}
res, err := batchQueryRange(xcluster, f.Queries)

ginx.NewRender(c).Data(res, err)
}

func prometheusProxy(c *gin.Context) {
xcluster := c.GetHeader("X-Cluster")
if xcluster == "" {
Expand Down Expand Up @@ -93,3 +127,27 @@ func clustersGets(c *gin.Context) {
}
ginx.NewRender(c).Data(names, nil)
}

func batchQueryRange(clusterName string, data []queryFormItem) (batchQueryRes, error) {

var res batchQueryRes

clusterType, exist := prom.Clusters.Get(clusterName)
if !exist {
return batchQueryRes{}, errors.New("cluster client not exist")
}
for _, item := range data {

r := Range{
Start: time.Unix(item.Start, 0),
End: time.Unix(item.End, 0),
Step: time.Duration(item.Step) * time.Second,
}
resp, _, err := clusterType.PromClient.QueryRange(context.Background(), item.Query, r)
if err != nil {
return res, err
}
res.Data = append(res.Data, resp)
SunnyBoy-WYH marked this conversation as resolved.
Show resolved Hide resolved
}
return res, nil
}
4 changes: 4 additions & 0 deletions src/webapi/webapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,10 @@ func (a Webapi) initialize() (func(), error) {
if err = prom.Init(); err != nil {
return nil, err
}
// init reader clients
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

重复了,上面已经有了prom.Init

if err = prom.Init(); err != nil {
return nil, err
}

stat.Init()

Expand Down