Skip to content

Commit

Permalink
Fix timezone differences in date_histogram (#688)
Browse files Browse the repository at this point in the history
#307

edit: one case is still wrong... Comment below `(that's also why we
don't need to worry about the field being in different timezone in
Clickhouse, it's translated to `UTC` anyway)` works in the second case,
need to handle it ourselves in the first one.

For `date_histogram` we have 2 ways we generate SQLs:

a) for longer intervals: e.g. 1 month, we group by using this function
`toStartOfMonth(timestamp)`. When taking timezones into consideration,
it becomes `toStartOfMonth(toTimeZone(timestamp, "timezone_name"))`.
Example: We have such a simple table
![Screenshot 2024-09-09 at 19 00
43](https://github.com/user-attachments/assets/204c0810-a30d-4df6-9ff6-2caf2eb174da)

Before, such a request below returned what it returns now if we omit
`time_zone` parameter, so `"key_as_string": "2023-12-01T00:00:00.000"`
But in Warsaw time, it's `2024-01-01 00:05`, so now we return good,
new-year bucket.

![Screenshot 2024-09-09 at 18 59
27](https://github.com/user-attachments/assets/928c2276-fff1-4137-8025-5460160bd4b4)

b) for shorter intervals, like the one in the issue
#307 ,
`toUnixTimestamp64Milli("@timestamp") / 3600000` becomes
`toUnixTimestamp64Milli("@timestamp")+timeZoneOffset(toTimezone("@timestamp",'Europe/Warsaw'))*1000)
/ 3600000`
Here the SQL is a bit more complex, but I think it needs to be, as this
`toUnixTimestamp` function always operates on `UTC`, translates
different timezones to `UTC`, etc, so we need to add the offset
ourselves.
(that's also why we don't need to worry about the field being in
different timezone in Clickhouse, it's translated to `UTC` anyway)

You can see below that the answers now match those from Elastic, from
#307 , the ratio of
`doc_count` for days is 1-4, not 2-3 as before this fix.
![Screenshot 2024-09-09 at 19 12
20](https://github.com/user-attachments/assets/02294d24-53bb-43c9-8bd4-89a469950869)
in pancakes it works too
![Screenshot 2024-09-09 at 19 16
43](https://github.com/user-attachments/assets/4cf537e5-6af4-47f4-8072-3431e86fb27e)
  • Loading branch information
trzysiek authored Sep 10, 2024
1 parent 749fee2 commit c260e3d
Show file tree
Hide file tree
Showing 18 changed files with 577 additions and 488 deletions.
48 changes: 48 additions & 0 deletions quesma/clickhouse/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,3 +156,51 @@ func TimestampGroupBy(timestampField model.Expr, typ DateTimeType, groupByInterv
return model.NewLiteral("invalid") // maybe create new type InvalidExpr?
}
}

func TimestampGroupByWithTimezone(timestampField model.Expr, typ DateTimeType,
groupByInterval time.Duration, timezone string) model.Expr {

// If no timezone, or timezone is default (UTC), we just return TimestampGroupBy(...)
if timezone == "" {
return TimestampGroupBy(timestampField, typ, groupByInterval)
}

createAExp := func(innerFuncName string, interval, offsetMultiplier int64) model.Expr {
var offset model.Expr
offset = model.NewFunction(
"timeZoneOffset",
model.NewFunction(
"toTimezone",
timestampField, model.NewLiteral("'"+timezone+"'"),
),
)
if offsetMultiplier != 1 {
offset = model.NewInfixExpr(offset, "*", model.NewLiteral(offsetMultiplier))
}

unixTsWithOffset := model.NewInfixExpr(
model.NewFunction(innerFuncName, timestampField),
"+",
offset,
)

groupByExpr := model.NewInfixExpr(
model.NewParenExpr(unixTsWithOffset),
" / ", // TODO nasty hack to make our string-based tests pass. Operator should not contain spaces obviously
model.NewLiteral(interval),
)

return model.NewFunction("toInt64", groupByExpr)
}

switch typ {
case DateTime64:
// e.g: (toUnixTimestamp64Milli("timestamp")+timeZoneOffset(toTimezone("timestamp",'Europe/Warsaw'))*1000) / 600000
return createAExp("toUnixTimestamp64Milli", groupByInterval.Milliseconds(), 1000)
case DateTime:
return createAExp("toUnixTimestamp", groupByInterval.Milliseconds()/1000, 1)
default:
logger.Error().Msgf("invalid timestamp fieldname: %s", timestampFieldName)
return model.NewLiteral("invalid") // maybe create new type InvalidExpr?
}
}
39 changes: 29 additions & 10 deletions quesma/model/bucket_aggregations/date_histogram.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package bucket_aggregations

import (
"context"
"fmt"
"quesma/clickhouse"
"quesma/kibana"
"quesma/logger"
Expand All @@ -26,14 +27,15 @@ type DateHistogram struct {
ctx context.Context
field model.Expr // name of the field, e.g. timestamp
interval string
timezone string
minDocCount int
intervalType DateHistogramIntervalType
fieldDateTimeType clickhouse.DateTimeType
}

func NewDateHistogram(ctx context.Context, field model.Expr, interval string,
func NewDateHistogram(ctx context.Context, field model.Expr, interval, timezone string,
minDocCount int, intervalType DateHistogramIntervalType, fieldDateTimeType clickhouse.DateTimeType) *DateHistogram {
return &DateHistogram{ctx: ctx, field: field, interval: interval,
return &DateHistogram{ctx: ctx, field: field, interval: interval, timezone: timezone,
minDocCount: minDocCount, intervalType: intervalType, fieldDateTimeType: fieldDateTimeType}
}

Expand Down Expand Up @@ -66,6 +68,13 @@ func (query *DateHistogram) TranslateSqlResponseToJson(rows []model.QueryResultR
rows = query.NewRowsTransformer().Transform(query.ctx, rows)
}

// key is in `query.timezone` time, and we need it to be UTC
wantedTimezone, err := time.LoadLocation(query.timezone)
if err != nil {
logger.ErrorWithCtx(query.ctx).Msgf("time.LoadLocation error: %v", err)
wantedTimezone = time.UTC
}

var response []model.JsonMap
for _, row := range rows {
var key int64
Expand All @@ -76,11 +85,16 @@ func (query *DateHistogram) TranslateSqlResponseToJson(rows []model.QueryResultR
key = query.getKey(row) * intervalInMilliseconds
}

intervalStart := time.UnixMilli(key).UTC().Format("2006-01-02T15:04:05.000")
ts := time.UnixMilli(key).UTC()
intervalStartNotUTC := time.Date(ts.Year(), ts.Month(), ts.Day(), ts.Hour(), ts.Minute(), ts.Second(), ts.Nanosecond(), wantedTimezone)

_, timezoneOffsetInSeconds := intervalStartNotUTC.Zone()
key -= int64(timezoneOffsetInSeconds * 1000) // seconds -> milliseconds

response = append(response, model.JsonMap{
"key": key,
"doc_count": row.LastColValue(), // used to be [level], but because some columns are duplicated, it doesn't work in 100% cases now
"key_as_string": intervalStart,
"key_as_string": time.UnixMilli(key).UTC().Format("2006-01-02T15:04:05.000"),
})
}

Expand Down Expand Up @@ -134,17 +148,22 @@ func (query *DateHistogram) generateSQLForFixedInterval() model.Expr {
logger.ErrorWithCtx(query.ctx).Msgf("invalid date type for DateHistogram %+v. Using DateTime64 as default.", query)
dateTimeType = defaultDateTimeType
}
return clickhouse.TimestampGroupBy(query.field, dateTimeType, interval)
return clickhouse.TimestampGroupByWithTimezone(query.field, dateTimeType, interval, query.timezone)
}

func (query *DateHistogram) generateSQLForCalendarInterval() model.Expr {
exprForBiggerIntervals := func(toIntervalStartFuncName string) model.Expr {
// returned expr as string:
// "1000 * toInt64(toUnixTimestamp(toStartOf[Week|Month|Quarter|Year](timestamp)))"
toStartOf := model.NewFunction(toIntervalStartFuncName, query.field)
toUnixTimestamp := model.NewFunction("toUnixTimestamp", toStartOf)
toInt64 := model.NewFunction("toInt64", toUnixTimestamp)
return model.NewInfixExpr(toInt64, "*", model.NewLiteral(1000))
// a) "1000 * toInt64(toUnixTimestamp(toStartOf[Week|Month|Quarter|Year](timestamp)))" (with no timezone offset)
// b) as above, but replace timestamp -> toTimeZone(timestamp, timezone) (with timezone present)
timestampFieldWithOffset := query.field
if query.timezone != "" {
timestampFieldWithOffset = model.NewFunction("toTimezone", query.field, model.NewLiteral(fmt.Sprintf("'%s'", query.timezone)))
}
toStartOf := model.NewFunction(toIntervalStartFuncName, timestampFieldWithOffset) // toStartOfMonth(...) or toStartOfWeek(...)
toUnixTimestamp := model.NewFunction("toUnixTimestamp", toStartOf) // toUnixTimestamp(toStartOf...)
toInt64 := model.NewFunction("toInt64", toUnixTimestamp) // toInt64(toUnixTimestamp(...))
return model.NewInfixExpr(toInt64, "*", model.NewLiteral(1000)) // toInt64(...)*1000
}

// calendar_interval: minute/hour/day are the same as fixed_interval: 1m/1h/1d
Expand Down
12 changes: 12 additions & 0 deletions quesma/model/equal.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,18 @@ func PartlyImplementedIsEqual(a, b Expr) bool {
}
return true
}
case ParenExpr:
if bTyped, ok := b.(ParenExpr); ok {
if len(aTyped.Exprs) != len(bTyped.Exprs) {
return false
}
for i := range aTyped.Exprs {
if !PartlyImplementedIsEqual(aTyped.Exprs[i], bTyped.Exprs[i]) {
return false
}
}
return true
}
}
return false
}
14 changes: 1 addition & 13 deletions quesma/model/expr_string_renderer.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,19 +66,7 @@ func (v *renderer) VisitFunction(e FunctionExpr) interface{} {
}

func (v *renderer) VisitLiteral(l LiteralExpr) interface{} {

if l.Value == "*" {
return "*"
}

switch l.Value.(type) {
case string:
return fmt.Sprintf("%s", l.Value)
case float64:
return fmt.Sprintf("%f", l.Value)
default:
return fmt.Sprintf("%v", l.Value)
}
return fmt.Sprintf("%v", l.Value)
}

func (v *renderer) VisitInfix(e InfixExpr) interface{} {
Expand Down
2 changes: 1 addition & 1 deletion quesma/model/expr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ func TestParenExpr(t *testing.T) {
NewInfixExpr(
NewFunction("floor", NewLiteral(1.5)),
"+", NewLiteral(2.5))), "/", NewLiteral(3.5))
assert.Equal(t, "(floor(1.500000)+2.500000)/3.500000", AsString(parenExpr))
assert.Equal(t, "(floor(1.5)+2.5)/3.5", AsString(parenExpr))
}
13 changes: 12 additions & 1 deletion quesma/queryparser/aggregation_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -731,14 +731,15 @@ func (cw *ClickhouseQueryTranslator) tryBucketAggregation(currentAggr *aggrQuery
}
field := cw.parseFieldField(dateHistogram, "date_histogram")
minDocCount := cw.parseMinDocCount(dateHistogram)
timezone := cw.parseStringField(dateHistogram, "time_zone", "")
interval, intervalType := cw.extractInterval(dateHistogram)
dateTimeType := cw.Table.GetDateTimeTypeFromExpr(cw.Ctx, field)

if dateTimeType == clickhouse.Invalid {
logger.WarnWithCtx(cw.Ctx).Msgf("invalid date time type for field %s", field)
}

dateHistogramAggr := bucket_aggregations.NewDateHistogram(cw.Ctx, field, interval, minDocCount, intervalType, dateTimeType)
dateHistogramAggr := bucket_aggregations.NewDateHistogram(cw.Ctx, field, interval, timezone, minDocCount, intervalType, dateTimeType)
currentAggr.Type = dateHistogramAggr

sqlQuery := dateHistogramAggr.GenerateSQL()
Expand Down Expand Up @@ -1080,6 +1081,16 @@ func (cw *ClickhouseQueryTranslator) parseFloatField(queryMap QueryMap, fieldNam
return defaultValue
}

func (cw *ClickhouseQueryTranslator) parseStringField(queryMap QueryMap, fieldName string, defaultValue string) string {
if valueRaw, exists := queryMap[fieldName]; exists {
if asString, ok := valueRaw.(string); ok {
return asString
}
logger.WarnWithCtx(cw.Ctx).Msgf("%s is not a string, but %T, value: %v. Using default: %s", fieldName, valueRaw, valueRaw, defaultValue)
}
return defaultValue
}

// parseFieldFieldMaybeScript is basically almost a copy of parseFieldField above, but it also handles a basic script, if "field" is missing.
func (cw *ClickhouseQueryTranslator) parseFieldFieldMaybeScript(shouldBeMap any, aggregationType string) (field model.Expr, isFromScript bool) {
Map, ok := shouldBeMap.(QueryMap)
Expand Down
61 changes: 34 additions & 27 deletions quesma/queryparser/aggregation_parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package queryparser
import (
"cmp"
"context"
"fmt"
"github.com/jinzhu/copier"
"github.com/stretchr/testify/assert"
"quesma/clickhouse"
Expand Down Expand Up @@ -168,8 +169,7 @@ var aggregationTests = []struct {
"min": 1706881636029
},
"field": "timestamp",
"fixed_interval": "3h",
"time_zone": "Europe/Warsaw"
"fixed_interval": "3h"
}
}
},
Expand Down Expand Up @@ -418,8 +418,7 @@ var aggregationTests = []struct {
"max": 1707818397034,
"min": 1707213597034
},
"field": "order_date",
"time_zone": "Europe/Warsaw"
"field": "order_date"
}
}
},
Expand Down Expand Up @@ -542,8 +541,7 @@ var aggregationTests = []struct {
"date_histogram": {
"field": "order_date",
"fixed_interval": "12h",
"min_doc_count": 1,
"time_zone": "Europe/Warsaw"
"min_doc_count": 1
}
}
},
Expand Down Expand Up @@ -649,9 +647,9 @@ var aggregationTests = []struct {
"size": 0
}`,
[]string{
`SELECT floor("bytes"/1782.000000)*1782.000000, count() FROM ` + tableName + ` ` +
`GROUP BY floor("bytes"/1782.000000)*1782.000000 ` +
`ORDER BY floor("bytes"/1782.000000)*1782.000000`,
`SELECT floor("bytes"/1782)*1782, count() FROM ` + tableName + ` ` +
`GROUP BY floor("bytes"/1782)*1782 ` +
`ORDER BY floor("bytes"/1782)*1782`,
`SELECT count() FROM ` + tableName,
},
},
Expand Down Expand Up @@ -723,17 +721,26 @@ func sortAggregations(aggregations []*model.Query) {
}

func allAggregationTests() []testdata.AggregationTestCase {
const lowerBoundTestNr = 80
const lowerBoundTestNr = 90
allTests := make([]testdata.AggregationTestCase, 0, lowerBoundTestNr)
allTests = append(allTests, testdata.AggregationTests...)
allTests = append(allTests, testdata.AggregationTests2...)
allTests = append(allTests, opensearch_visualize.AggregationTests...)
allTests = append(allTests, dashboard_1.AggregationTests...)
allTests = append(allTests, testdata.PipelineAggregationTests...)
allTests = append(allTests, opensearch_visualize.PipelineAggregationTests...)
allTests = append(allTests, kibana_visualize.AggregationTests...)
allTests = append(allTests, clients.KunkkaTests...)
allTests = append(allTests, clients.OpheliaTests...)

add := func(testsToAdd []testdata.AggregationTestCase, testFilename string) {
for i, test := range testsToAdd {
test.TestName = fmt.Sprintf("%s(file:%s,nr:%d)", test.TestName, testFilename, i)
allTests = append(allTests, test)
}
}

add(testdata.AggregationTests, "agg_req")
add(testdata.AggregationTests2, "agg_req_2")
add(opensearch_visualize.AggregationTests, "opensearch-visualize/agg_req")
add(dashboard_1.AggregationTests, "dashboard-1/agg_req")
add(testdata.PipelineAggregationTests, "pipeline_agg_req")
add(opensearch_visualize.PipelineAggregationTests, "opensearch-visualize/pipeline_agg_req")
add(kibana_visualize.AggregationTests, "kibana-visualize/agg_r")
add(clients.KunkkaTests, "clients/kunkka")
add(clients.OpheliaTests, "clients/ophelia")

return allTests
}

Expand Down Expand Up @@ -778,10 +785,10 @@ func TestAggregationParserExternalTestcases(t *testing.T) {
cw := ClickhouseQueryTranslator{ClickhouseLM: lm, Table: &table, Ctx: context.Background(), SchemaRegistry: s, Config: cfg}
for i, test := range allAggregationTests() {
t.Run(test.TestName+"("+strconv.Itoa(i)+")", func(t *testing.T) {
if test.TestName == "Max/Sum bucket with some null buckets. Reproduce: Visualize -> Vertical Bar: Metrics: Max (Sum) Bucket (Aggregation: Date Histogram, Metric: Min)" {
if test.TestName == "Max/Sum bucket with some null buckets. Reproduce: Visualize -> Vertical Bar: Metrics: Max (Sum) Bucket (Aggregation: Date Histogram, Metric: Min)(file:opensearch-visualize/pipeline_agg_req,nr:18)" {
t.Skip("Needs to be fixed by keeping last key for every aggregation. Now we sometimes don't know it. Hard to reproduce, leaving it for separate PR")
}
if test.TestName == "complex sum_bucket. Reproduce: Visualize -> Vertical Bar: Metrics: Sum Bucket (Bucket: Date Histogram, Metric: Average), Buckets: X-Asis: Histogram" {
if test.TestName == "complex sum_bucket. Reproduce: Visualize -> Vertical Bar: Metrics: Sum Bucket (Bucket: Date Histogram, Metric: Average), Buckets: X-Asis: Histogram(file:opensearch-visualize/pipeline_agg_req,nr:22)" {
t.Skip("Waiting for fix. Now we handle only the case where pipeline agg is at the same nesting level as its parent. Should be quick to fix.")
}
if i == 27 || i == 29 || i == 30 {
Expand All @@ -790,7 +797,7 @@ func TestAggregationParserExternalTestcases(t *testing.T) {
if strings.HasPrefix(test.TestName, "dashboard-1") {
t.Skip("Those 2 tests have nested histograms with min_doc_count=0. Some work done long time ago (Krzysiek)")
}
if test.TestName == "Range with subaggregations. Reproduce: Visualize -> Pie chart -> Aggregation: Top Hit, Buckets: Aggregation: Range" {
if test.TestName == "Range with subaggregations. Reproduce: Visualize -> Pie chart -> Aggregation: Top Hit, Buckets: Aggregation: Range(file:opensearch-visualize/agg_req,nr:1)" {
t.Skip("Need a (most likely) small fix to top_hits.")
}
if i == 20 {
Expand All @@ -801,15 +808,15 @@ func TestAggregationParserExternalTestcases(t *testing.T) {
}
if test.TestName == "it's the same input as in previous test, but with the original output from Elastic."+
"Skipped for now, as our response is different in 2 things: key_as_string date (probably not important) + we don't return 0's (e.g. doc_count: 0)."+
"If we need clients/kunkka/test_0, used to be broken before aggregations merge fix" {
"If we need clients/kunkka/test_0, used to be broken before aggregations merge fix(file:clients/kunkka,nr:1)" {
t.Skip("Unskip and remove the previous test after those fixes.")
}
if test.TestName == "clients/kunkka/test_1, used to be broken before aggregations merge fix" {
if test.TestName == "clients/kunkka/test_1, used to be broken before aggregations merge fix(file:clients/kunkka,nr:2)" {
t.Skip("Small details left for this test to be correct. I'll (Krzysiek) fix soon after returning to work")
}
if test.TestName == "Ophelia Test 3: 5x terms + a lot of other aggregations" ||
test.TestName == "Ophelia Test 6: triple terms + other aggregations + order by another aggregations" ||
test.TestName == "Ophelia Test 7: 5x terms + a lot of other aggregations + different order bys" {
if test.TestName == "Ophelia Test 3: 5x terms + a lot of other aggregations(file:clients/ophelia,nr:2)" ||
test.TestName == "Ophelia Test 6: triple terms + other aggregations + order by another aggregations(file:clients/ophelia,nr:5)" ||
test.TestName == "Ophelia Test 7: 5x terms + a lot of other aggregations + different order bys(file:clients/ophelia,nr:6)" {
t.Skip("Very similar to 2 previous tests, results have like 500-1000 lines. They are almost finished though. Maybe I'll fix soon, but not in this PR")
}

Expand Down
4 changes: 3 additions & 1 deletion quesma/queryparser/pancake_aggregation_parser_buckets.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,14 +72,16 @@ func (cw *ClickhouseQueryTranslator) pancakeTryBucketAggregation(aggregation *pa
}
field := cw.parseFieldField(dateHistogram, "date_histogram")
minDocCount := cw.parseMinDocCount(dateHistogram)
timezone := cw.parseStringField(dateHistogram, "time_zone", "")
interval, intervalType := cw.extractInterval(dateHistogram)
dateTimeType := cw.Table.GetDateTimeTypeFromExpr(cw.Ctx, field)

if dateTimeType == clickhouse.Invalid {
return false, fmt.Errorf("invalid date time type for field %s", field)
}

dateHistogramAggr := bucket_aggregations.NewDateHistogram(cw.Ctx, field, interval, minDocCount, intervalType, dateTimeType)
dateHistogramAggr := bucket_aggregations.NewDateHistogram(
cw.Ctx, field, interval, timezone, minDocCount, intervalType, dateTimeType)
aggregation.queryType = dateHistogramAggr

sqlQuery := dateHistogramAggr.GenerateSQL()
Expand Down
Loading

0 comments on commit c260e3d

Please sign in to comment.