Skip to content

Commit

Permalink
Implement top_hits in pancakes (#719)
Browse files Browse the repository at this point in the history
The change implements `top_hits` in pancakes.
![Screenshot 2024-09-08 at 17 37
08](https://github.com/user-attachments/assets/de1884c9-bd56-446f-80a5-46abb88c201a)

Left unfinished, will address in next PRs:
- [ ] better way of handling geo
- [ ] test order of top hits
- [ ] filters/range/dataRange
  • Loading branch information
jakozaur authored Sep 10, 2024
1 parent f0867ae commit 749fee2
Show file tree
Hide file tree
Showing 20 changed files with 732 additions and 68 deletions.
7 changes: 5 additions & 2 deletions quesma/clickhouse/quesma_communicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type PerformanceResult struct {
Duration time.Duration
RowsReturned int
ExplainPlan string
Error error
}

// ProcessQuery - only WHERE clause
Expand Down Expand Up @@ -177,12 +178,15 @@ func executeQuery(ctx context.Context, lm *LogManager, query *model.Query, field
}

queryID := getQueryId(ctx)
performanceResult.QueryID = queryID

ctx = clickhouse.Context(ctx, clickhouse.WithSettings(settings), clickhouse.WithQueryID(queryID))

rows, err := lm.Query(ctx, queryAsString)
if err != nil {
span.End(err)
elapsed := span.End(err)
performanceResult.Duration = elapsed
performanceResult.Error = err
return nil, performanceResult, end_user_errors.GuessClickhouseErrorType(err).InternalDetails("clickhouse: query failed. err: %v, query: %v", err, queryAsString)
}

Expand All @@ -191,7 +195,6 @@ func executeQuery(ctx context.Context, lm *LogManager, query *model.Query, field
elapsed := span.End(nil)
performanceResult.Duration = elapsed
performanceResult.RowsReturned = len(res)
performanceResult.QueryID = queryID
if err == nil {
if lm.shouldExplainQuery(elapsed) {
performanceResult.ExplainPlan = lm.explainQuery(ctx, queryAsString, elapsed)
Expand Down
1 change: 1 addition & 0 deletions quesma/model/expr.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ var InvalidExpr = Expr(nil)

// ColumnRef is a reference to a column in a table, we can enrich it with more information (e.g. type used) as we go
type ColumnRef struct {
TableAlias string // used for alias in joins, most of the times empty string.
ColumnName string
}

Expand Down
6 changes: 5 additions & 1 deletion quesma/model/expr_string_renderer.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,11 @@ func (v *renderer) VisitColumnRef(e ColumnRef) interface{} {
name = strings.TrimSuffix(name, "::keyword") // TODO is this needed?
name = strings.TrimSuffix(name, types.MultifieldMapKeysSuffix)
name = strings.TrimSuffix(name, types.MultifieldMapValuesSuffix)
return strconv.Quote(name)
if len(e.TableAlias) > 0 {
return fmt.Sprintf("%s.%s", strconv.Quote(e.TableAlias), strconv.Quote(name))
} else {
return strconv.Quote(name)
}
}

func (v *renderer) VisitPrefixExpr(e PrefixExpr) interface{} {
Expand Down
41 changes: 34 additions & 7 deletions quesma/model/metrics_aggregations/top_hits.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package metrics_aggregations

import (
"context"
"encoding/json"
"quesma/logger"
"quesma/model"
"quesma/schema"
Expand All @@ -12,21 +13,27 @@ import (
)

type TopHits struct {
ctx context.Context
ctx context.Context
Size int
OrderBy []model.OrderByExpr
}

func NewTopHits(ctx context.Context) TopHits {
return TopHits{ctx: ctx}
func NewTopHits(ctx context.Context, size int) TopHits {
return TopHits{ctx: ctx, Size: size}
}

func NewTopHitsWithOrderBy(ctx context.Context, size int, orderBy []model.OrderByExpr) TopHits {
return TopHits{ctx: ctx, Size: size, OrderBy: orderBy}
}

func (query TopHits) AggregationType() model.AggregationType {
return model.MetricsAggregation
}

// TODO implement correct
// TODO: implement correct
func (query TopHits) TranslateSqlResponseToJson(rows []model.QueryResultRow, level int) model.JsonMap {
var topElems []any
if len(rows) > 0 && level >= len(rows[0].Cols)-1 {
if len(rows) > 0 && level >= len(rows[0].Cols) {
// values are [level, len(row.Cols) - 1]
logger.WarnWithCtx(query.ctx).Msgf(
"no columns returned for top_hits aggregation, level: %d, len(rows[0].Cols): %d, len(rows): %d",
Expand Down Expand Up @@ -74,17 +81,37 @@ func (query TopHits) TranslateSqlResponseToJson(rows []model.QueryResultRow, lev
}

} else {
sourceMap[col.ColName] = col.ExtractValue(query.ctx)
value := col.ExtractValue(query.ctx)
// TODO: this is hack, we should not assume this is location
if strings.HasSuffix(col.ColName, "Location") {
if valueStr, ok := value.(string); ok {
var valueJson model.JsonMap
if err := json.Unmarshal([]byte(valueStr), &valueJson); err == nil {
value = valueJson
}
}
}
sourceMap[col.ColName] = value
}
}

elem := model.JsonMap{
"_source": sourceMap,
"_score": 1.0, // placeholder
"_id": "", // TODO: placeholder
"_index": "", // TODO: placeholder
}
topElems = append(topElems, elem)
}
return model.JsonMap{
"hits": topElems,
"hits": model.JsonMap{
"hits": topElems,
"max_score": 1.0, // placeholder
"total": model.JsonMap{ // could be better
"relation": "eq", // TODO: wrong, but let's pass test, it should ge geq
"value": len(topElems),
},
},
}
}

Expand Down
10 changes: 9 additions & 1 deletion quesma/queryparser/aggregation_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type aggrQueryBuilder struct {
type metricsAggregation struct {
AggrType string
Fields []model.Expr // on these fields we're doing aggregation. Array, because e.g. 'top_hits' can have multiple fields
OrderBy []model.OrderByExpr // only for top_hits
FieldType clickhouse.DateTimeType // field type of FieldNames[0]. If it's a date field, a slightly different response is needed
Percentiles map[string]float64 // Only for percentiles aggregation
Keyed bool // Only for percentiles aggregation
Expand Down Expand Up @@ -284,7 +285,7 @@ func (b *aggrQueryBuilder) buildMetricsAggregation(metricsAggr metricsAggregatio
case "quantile":
query.Type = metrics_aggregations.NewQuantile(b.ctx, util.MapKeysSortedByValue(metricsAggr.Percentiles), metricsAggr.Keyed, metricsAggr.FieldType)
case "top_hits":
query.Type = metrics_aggregations.NewTopHits(b.ctx)
query.Type = metrics_aggregations.NewTopHits(b.ctx, metricsAggr.Size)
case "top_metrics":
query.Type = metrics_aggregations.NewTopMetrics(b.ctx, metricsAggr.sortByExists())
case "value_count":
Expand Down Expand Up @@ -574,8 +575,14 @@ func (cw *ClickhouseQueryTranslator) tryMetricsAggregation(queryMap QueryMap) (m

const defaultSize = 1
size := defaultSize
orderBy := []model.OrderByExpr{}
if mapTyped, ok := topHits.(QueryMap); ok {
size = cw.parseSize(mapTyped, defaultSize)
orderBy = cw.parseOrder(mapTyped, queryMap, []model.Expr{})
if len(orderBy) == 1 && orderBy[0].IsCountDesc() { // we don't need count DESC
orderBy = []model.OrderByExpr{}
}

} else {
logger.WarnWithCtx(cw.Ctx).Msgf("top_hits is not a map, but %T, value: %v. Using default size.", topHits, topHits)
}
Expand All @@ -584,6 +591,7 @@ func (cw *ClickhouseQueryTranslator) tryMetricsAggregation(queryMap QueryMap) (m
Fields: exprs,
FieldType: metricsAggregationDefaultFieldType, // don't need to check, it's unimportant for this aggregation
Size: size,
OrderBy: orderBy,
}, true
}

Expand Down
9 changes: 5 additions & 4 deletions quesma/queryparser/pancake_aggregation_parser_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,12 @@ func generateMetricSelectedColumns(ctx context.Context, metricsAggr metricsAggre
model.NewFunction("sumOrNull", expr))

case "top_hits":
// see other buildMetricsAggregation(), we don't implement it now
return nil, errors.New("top_hits is not implemented yet in version una")
innerFieldsAsSelect := make([]model.Expr, len(metricsAggr.Fields))
copy(innerFieldsAsSelect, metricsAggr.Fields)
return innerFieldsAsSelect, nil
case "top_metrics":
// see other buildMetricsAggregation(), we don't implement it now
return nil, errors.New("top_hits is not implemented yet in version una")
return nil, errors.New("top_metrics is not implemented yet in version una")
case "percentile_ranks":
result = make([]model.Expr, 0, len(metricsAggr.CutValues))
for _, cutValueAsString := range metricsAggr.CutValues {
Expand Down Expand Up @@ -138,7 +139,7 @@ func generateMetricsType(ctx context.Context, metricsAggr metricsAggregation) mo
case "quantile":
return metrics_aggregations.NewQuantile(ctx, util.MapKeysSortedByValue(metricsAggr.Percentiles), metricsAggr.Keyed, metricsAggr.FieldType)
case "top_hits":
return metrics_aggregations.NewTopHits(ctx)
return metrics_aggregations.NewTopHitsWithOrderBy(ctx, metricsAggr.Size, metricsAggr.OrderBy)
case "top_metrics":
return metrics_aggregations.NewTopMetrics(ctx, metricsAggr.sortByExists())
case "value_count":
Expand Down
37 changes: 36 additions & 1 deletion quesma/queryparser/pancake_json_rendering.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ import (
"quesma/logger"
"quesma/model"
"quesma/model/bucket_aggregations"
"quesma/model/metrics_aggregations"
"quesma/util"
"strconv"
"strings"
)

Expand Down Expand Up @@ -38,6 +40,34 @@ func (p *pancakeJSONRenderer) selectMetricRows(metricName string, rows []model.Q
return
}

func (p *pancakeJSONRenderer) selectTopHitsRows(topHits *pancakeModelMetricAggregation, rows []model.QueryResultRow) (result []model.QueryResultRow) {
for _, row := range rows {
var newCols []model.QueryResultCol
for _, col := range row.Cols {
if strings.HasPrefix(col.ColName, topHits.InternalNamePrefix()) {
numStr := strings.TrimPrefix(col.ColName, topHits.InternalNamePrefix())
if num, err := strconv.Atoi(numStr); err == nil {
var overrideName string
if num < 0 || num >= len(topHits.selectedColumns) {
logger.WarnWithCtx(p.ctx).Msgf("invalid top_hits column index %d", num)
} else {
selectedColumn := topHits.selectedColumns[num]
if colRef, ok := selectedColumn.(model.ColumnRef); ok {
overrideName = colRef.ColumnName
}
}
if len(overrideName) > 0 {
col.ColName = overrideName
}
newCols = append(newCols, col)
}
}
}
result = append(result, model.QueryResultRow{Index: row.Index, Cols: newCols})
}
return
}

func (p *pancakeJSONRenderer) selectPrefixRows(prefix string, rows []model.QueryResultRow) (result []model.QueryResultRow) {
for _, row := range rows {
var newCols []model.QueryResultCol
Expand Down Expand Up @@ -195,7 +225,12 @@ func (p *pancakeJSONRenderer) layerToJSON(remainingLayers []*pancakeModelLayer,
layer := remainingLayers[0]

for _, metric := range layer.currentMetricAggregations {
metricRows := p.selectMetricRows(metric.internalName+"_col_", rows)
var metricRows []model.QueryResultRow
if _, ok := metric.queryType.(metrics_aggregations.TopHits); ok {
metricRows = p.selectTopHitsRows(metric, rows)
} else {
metricRows = p.selectMetricRows(metric.InternalNamePrefix(), rows)
}
result[metric.name] = metric.queryType.TranslateSqlResponseToJson(metricRows, 0) // TODO: fill level?
// TODO: maybe add metadata also here? probably not needed
}
Expand Down
10 changes: 9 additions & 1 deletion quesma/queryparser/pancake_model.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,15 @@ func newPancakeModelPipelineAggregation(name string, previousAggrNames []string,
const pancakeBucketAggregationNoLimit = 0
const noSampleLimit = 0

// Helper functions
// Naming functions
func (p pancakeModelMetricAggregation) InternalNamePrefix() string {
return p.internalName + "_col_"
}

func (p pancakeModelMetricAggregation) InternalNameForCol(id int) string {
return fmt.Sprintf("%s%d", p.InternalNamePrefix(), id)
}

func (p pancakeModelBucketAggregation) InternalNameForKeyPrefix() string {
return fmt.Sprintf("%skey", p.internalName)
}
Expand Down
Loading

0 comments on commit 749fee2

Please sign in to comment.