Skip to content

Commit

Permalink
feat: support same pron in extra-filters
Browse files Browse the repository at this point in the history
  • Loading branch information
taloric committed Oct 18, 2024
1 parent 2812b2f commit 312dab1
Show file tree
Hide file tree
Showing 3 changed files with 221 additions and 33 deletions.
1 change: 1 addition & 0 deletions server/querier/app/prometheus/router/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ func promQueryRange(svc *service.PrometheusService) gin.HandlerFunc {
debug := c.Request.FormValue("debug")
block_team_id := c.Request.FormValue("block-team-id")
offloading := c.Request.FormValue("operator-offloading")
args.ExtraFilters = c.Request.FormValue("extra-filters")
setRouterArgs(slimit, &args.Slimit, config.Cfg.Prometheus.SeriesLimit, strconv.Atoi)
setRouterArgs(debug, &args.Debug, config.Cfg.Prometheus.RequestQueryWithDebug, strconv.ParseBool)
setRouterArgs(offloading, &args.Offloading, config.Cfg.Prometheus.OperatorOffloading, strconv.ParseBool)
Expand Down
221 changes: 188 additions & 33 deletions server/querier/app/prometheus/service/converters.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/prompb"
"github.com/prometheus/prometheus/promql/parser"
"github.com/xwb1989/sqlparser"

"github.com/deepflowio/deepflow/server/querier/app/prometheus/model"
"github.com/deepflowio/deepflow/server/querier/common"
Expand Down Expand Up @@ -300,37 +301,11 @@ func (p *prometheusReader) promReaderTransToSQL(ctx context.Context, req *prompb
filters := make([]string, 0, len(q.Matchers)+1)
filters = append(filters, fmt.Sprintf("(time >= %d AND time <= %d)", startTime, endTime))
for _, matcher := range q.Matchers {
if matcher.Name == PROMETHEUS_METRICS_NAME {
tagName, tagAlias, isDeepFlowTag, newFilter := p.parseMatchers(matcher, prefixType, db)
if newFilter == "" {
continue
}
tagName, tagAlias, isDeepFlowTag := p.parsePromQLTag(prefixType, db, matcher.Name)
operation, value := getLabelMatcher(matcher.Type, matcher.Value, isDeepFlowTag)
if operation == "" {
return ctx, "", "", "", "", fmt.Errorf("unknown match type %v", matcher.Type)
}

// for normal query & DeepFlow metrics, query enum tag can only use tag name(Enum(x)) in filter clause
tagMatcher := tagName
if prefixType != prefixNone && isDeepFlowTag && tagAlias != "" {
// for Prometheus metrics, query DeepFlow enum tag can only use tag alias(x_enum) in filter clause
tagMatcher = tagAlias
}

if len(value) > 1 {
tmpFilters := make([]string, 0, len(value))
for _, v := range value {
tmpFilters = append(tmpFilters, fmt.Sprintf("%s %s '%s'", tagMatcher, operation, escapeSingleQuote(v)))
}
filters = append(filters, fmt.Sprintf("(%s)", strings.Join(tmpFilters, " OR ")))
} else {
// () with only ONE condition in it will cause error
if value[0] == "" && isDeepFlowTag {
// only for DeepFlow Tag, when value is empty, use [not] exist(`tag`) for query
filters = append(filters, fmt.Sprintf("%s(%s)", operation, tagMatcher))
} else {
filters = append(filters, fmt.Sprintf("%s %s '%s'", tagMatcher, operation, escapeSingleQuote(value[0])))
}
}
filters = append(filters, newFilter)

if db == "" || db == chCommon.DB_NAME_PROMETHEUS || db == chCommon.DB_NAME_EXT_METRICS {
if isDeepFlowTag && (len(q.Hints.Grouping) == 0 || tagAlias != "") {
Expand All @@ -344,6 +319,35 @@ func (p *prometheusReader) promReaderTransToSQL(ctx context.Context, req *prompb
}
}

if len(p.extraFilters) > 0 {
// to support same pron like promql filters, here we need to parse and extract `where` clause
// it can't be use in querier where directly
extraLabelMatchers, err := parseExtraFiltersToMatchers(p.extraFilters)
if err == nil {
// outside matchers use 'OR' for connected
outerFilters := make([]string, 0, len(extraLabelMatchers))
for i := 0; i < len(extraLabelMatchers); i++ {
// inside matchers use 'AND' for connected
innerFilters := make([]string, 0, len(extraLabelMatchers[i]))
for j := 0; j < len(extraLabelMatchers[i]); j++ {
matcher := extraLabelMatchers[i][j]
tagName, tagAlias, isDeepFlowTag, newFilter := p.parseMatchers(matcher, prefixType, db)
if newFilter == "" {
continue
}
innerFilters = append(innerFilters, newFilter)
if db == "" || db == chCommon.DB_NAME_PROMETHEUS || db == chCommon.DB_NAME_EXT_METRICS {
if isDeepFlowTag && (len(q.Hints.Grouping) == 0 || tagAlias != "") {
expectedDeepFlowNativeTags[tagName] = tagAlias
}
}
}
outerFilters = append(outerFilters, fmt.Sprintf("(%s)", strings.Join(innerFilters, " AND ")))
}
filters = append(filters, fmt.Sprintf("(%s)", strings.Join(outerFilters, " OR ")))
}
}

// append query field: 4. append DeepFlow native tags for Prometheus metrics
for tagName, tagAlias := range expectedDeepFlowNativeTags {
// reduce Prometheus query DeepFlow tags
Expand All @@ -362,14 +366,45 @@ func (p *prometheusReader) promReaderTransToSQL(ctx context.Context, req *prompb
if len(p.blockTeamID) > 0 {
filters = append(filters, fmt.Sprintf("team_id not in (%s)", strings.Join(p.blockTeamID, ",")))
}
if len(p.extraFilters) > 0 {
filters = append(filters, fmt.Sprintf("(%s)", p.extraFilters))
}

sql := parseToQuerierSQL(ctx, db, table, metricsArray, filters, groupBy, orderBy)
return ctx, sql, db, dataPrecision, queryMetric, err
}

func (p *prometheusReader) parseMatchers(matcher *prompb.LabelMatcher, prefixType prefix, db string) (string, string, bool, string) {
if matcher.Name == labels.MetricName {
return "", "", false, ""
}
tagName, tagAlias, isDeepFlowTag := p.parsePromQLTag(prefixType, db, matcher.Name)
operation, value := getLabelMatcher(matcher.Type, matcher.Value, isDeepFlowTag)
if operation == "" {
return "", "", false, ""
}

// for normal query & DeepFlow metrics, query enum tag can only use tag name(Enum(x)) in filter clause
tagMatcher := tagName
if prefixType != prefixNone && isDeepFlowTag && tagAlias != "" {
// for Prometheus metrics, query DeepFlow enum tag can only use tag alias(x_enum) in filter clause
tagMatcher = tagAlias
}

if len(value) > 1 {
tmpFilters := make([]string, 0, len(value))
for _, v := range value {
tmpFilters = append(tmpFilters, fmt.Sprintf("%s %s '%s'", tagMatcher, operation, escapeSingleQuote(v)))
}
return tagName, tagAlias, isDeepFlowTag, fmt.Sprintf("(%s)", strings.Join(tmpFilters, " OR "))
} else {
// () with only ONE condition in it will cause error
if value[0] == "" && isDeepFlowTag {
// only for DeepFlow Tag, when value is empty, use [not] exist(`tag`) for query
return tagName, tagAlias, isDeepFlowTag, fmt.Sprintf("%s(%s)", operation, tagMatcher)
} else {
return tagName, tagAlias, isDeepFlowTag, fmt.Sprintf("%s %s '%s'", tagMatcher, operation, escapeSingleQuote(value[0]))
}
}
}

// return: prefixType, metricName, db, table, dataPrecision, metricAlias
// prefixType: identified if use `tag_` or `df_` prefix in labels for prometheus native metrics
// metricName: real metric in database
Expand Down Expand Up @@ -1101,7 +1136,28 @@ func (p *prometheusReader) parseQueryRequestToSQL(ctx context.Context, queryReq
filters = append(filters, fmt.Sprintf("team_id not in (%s)", strings.Join(p.blockTeamID, ",")))
}
if len(p.extraFilters) > 0 {
filters = append(filters, fmt.Sprintf("(%s)", p.extraFilters))
extraLabelMatchers, err := parseExtraFiltersToMatchers(p.extraFilters)
if err == nil {
// outside matchers use 'OR' for connected
outerFilters := make([]string, 0, len(extraLabelMatchers))
for i := 0; i < len(extraLabelMatchers); i++ {
// inside matchers use 'AND' for connected
innerFilters := make([]string, 0, len(extraLabelMatchers[i]))
for j := 0; j < len(extraLabelMatchers[i]); j++ {
matcher := extraLabelMatchers[i][j]
tagName, tagAlias, isDeepFlowTag, newFilter := p.parseMatchers(matcher, prefixDeepFlow, chCommon.DB_NAME_PROMETHEUS)
if newFilter == "" {
continue
}
if isDeepFlowTag && cap(groupBy) == 0 {
expectedQueryTags[tagName] = tagAlias
}
innerFilters = append(innerFilters, newFilter)
}
outerFilters = append(outerFilters, fmt.Sprintf("(%s)", strings.Join(innerFilters, " AND ")))
}
filters = append(filters, fmt.Sprintf("(%s)", strings.Join(outerFilters, " OR ")))
}
}
sql := parseToQuerierSQL(ctx, chCommon.DB_NAME_PROMETHEUS, queryReq.GetMetric(), selection, filters, groupBy, orderBy)
return sql
Expand Down Expand Up @@ -1395,3 +1451,102 @@ func removeTagPrefix(tag string) string {
func escapeSingleQuote(v string) string {
return strings.Replace(v, "'", "''", -1)
}

func parseOperator(op string) prompb.LabelMatcher_Type {
switch op {
case "=":
return prompb.LabelMatcher_EQ
case "!=":
return prompb.LabelMatcher_NEQ
default:
return prompb.LabelMatcher_EQ
}
}

func parseExtraFiltersToMatchers(filters string) ([][]*prompb.LabelMatcher, error) {
fakeSQL := fmt.Sprintf("select 1 from t where %s", filters)
stmt, err := sqlparser.Parse(fakeSQL)
if err != nil {
return nil, err
}
selectStmt := stmt.(*sqlparser.Select)
labelMatchers := make([][]*prompb.LabelMatcher, 0)
_, err = iterateExprs(selectStmt.Where.Expr, &labelMatchers)
if err != nil {
return nil, err
}
return labelMatchers, nil
}

func iterateExprs(node sqlparser.Expr, labelMatchers *[][]*prompb.LabelMatcher) (sqlparser.Expr, error) {
switch node := node.(type) {
case *sqlparser.AndExpr:
left, err := iterateExprs(node.Left, labelMatchers)
if err != nil {
return left, err
}
right, err := iterateExprs(node.Right, labelMatchers)
if err != nil {
return right, err
}
if left == nil {
return right, nil
} else if right == nil {
return left, nil
}
return node, nil
case *sqlparser.OrExpr:
left, err := iterateExprs(node.Left, labelMatchers)
if err != nil {
return left, err
}
right, err := iterateExprs(node.Right, labelMatchers)
if err != nil {
return right, err
}
if left == nil {
return right, nil
} else if right == nil {
return left, nil
}
return node, nil
case *sqlparser.ParenExpr:
(*labelMatchers) = append((*labelMatchers), []*prompb.LabelMatcher{})
expr, err := iterateExprs(node.Expr, labelMatchers)
if err != nil {
return expr, err
}
return expr, nil
case *sqlparser.ComparisonExpr:
var comparExpr sqlparser.Expr
if parenExpr, ok := node.Left.(*sqlparser.ParenExpr); ok {
comparExpr = parenExpr.Expr
} else {
comparExpr = node.Left
}
var colName, colValue, op string
switch comparExpr.(type) {
case *sqlparser.SQLVal:
colValue = sqlparser.String(comparExpr)
colName = sqlparser.String(node.Right)
op = node.Operator
case *sqlparser.ColName:
colName = sqlparser.String(comparExpr)
colValue = sqlparser.String(node.Right)
op = node.Operator
}
lastIndex := len(*labelMatchers) - 1
if lastIndex < 0 {
(*labelMatchers) = append((*labelMatchers), []*prompb.LabelMatcher{})
}

(*labelMatchers)[lastIndex] = append((*labelMatchers)[lastIndex], &prompb.LabelMatcher{
Type: parseOperator(op),
Name: colName,
Value: colValue,
})
return node, nil
default:
return node, nil
}
}
32 changes: 32 additions & 0 deletions server/querier/app/prometheus/service/converters_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -907,3 +907,35 @@ func TestParseQueryRequestToSQL(t *testing.T) {
}

}

func TestParseExFilters(t *testing.T) {
var testCases = []struct {
input string
output [][]*prompb.LabelMatcher
}{{
input: "(pod=1 and pod_ns=2) or (pod=1 and pod_ns=3) or (pod=5 and pod_ns=3)",
output: [][]*prompb.LabelMatcher{
{{Name: "pod", Type: prompb.LabelMatcher_EQ, Value: "1"}, {Name: "pod_ns", Type: prompb.LabelMatcher_EQ, Value: "2"}},
{{Name: "pod", Type: prompb.LabelMatcher_EQ, Value: "1"}, {Name: "pod_ns", Type: prompb.LabelMatcher_EQ, Value: "3"}},
{{Name: "pod", Type: prompb.LabelMatcher_EQ, Value: "5"}, {Name: "pod_ns", Type: prompb.LabelMatcher_EQ, Value: "3"}},
},
}}
t.Run("ParseExFilters", func(t *testing.T) {
for i := 0; i < len(testCases); i++ {
output, err := parseExtraFiltersToMatchers(testCases[i].input)
if err != nil {
t.Errorf("Expected output: %v, got: %v", testCases[i].output, err)
} else {
outerIndex := len(testCases[i].output)
for j := 0; j < outerIndex; j++ {
innerIndex := len(testCases[i].output[j])
for k := 0; k < innerIndex; k++ {
assert.Equal(t, testCases[i].output[j][k].Name, output[j][k].Name)
assert.Equal(t, testCases[i].output[j][k].Value, output[j][k].Value)
assert.Equal(t, testCases[i].output[j][k].Type, output[j][k].Type)
}
}
}
}
})
}

0 comments on commit 312dab1

Please sign in to comment.