Skip to content

Commit

Permalink
Support for CAgg with origin/offset parameter
Browse files Browse the repository at this point in the history
So far, we allowed only CAggs without origin or offset parameters in the
time_bucket definition. This commit adds support for the remaining
time_bucket variants.

Fixes #2265, Fixes #5453, Fixes #5828
  • Loading branch information
jnidzwetzki committed Mar 30, 2024
1 parent 2f1b27c commit f6b0f33
Show file tree
Hide file tree
Showing 27 changed files with 2,910 additions and 254 deletions.
1 change: 1 addition & 0 deletions .unreleased/feature_6382
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Implements: #6382 Support for time_bucket with origin and offset in CAggs
27 changes: 18 additions & 9 deletions src/func_cache.c
Original file line number Diff line number Diff line change
Expand Up @@ -204,20 +204,22 @@ static FuncInfo funcinfo[] = {
.group_estimate = time_bucket_group_estimate,
.sort_transform = time_bucket_sort_transform,
},
/* Interval Bucket with origin */
{
.origin = ORIGIN_TIMESCALE,
.is_bucketing_func = true,
.allowed_in_cagg_definition = false,
.allowed_in_cagg_definition = true,
.funcname = "time_bucket",
.nargs = 3,
.arg_types = { INTERVALOID, TIMESTAMPOID, TIMESTAMPOID },
.group_estimate = time_bucket_group_estimate,
.sort_transform = time_bucket_sort_transform,
},
/* Interval Bucket with offset */
{
.origin = ORIGIN_TIMESCALE,
.is_bucketing_func = true,
.allowed_in_cagg_definition = false,
.allowed_in_cagg_definition = true,
.funcname = "time_bucket",
.nargs = 3,
.arg_types = { INTERVALOID, TIMESTAMPOID, INTERVALOID },
Expand All @@ -234,20 +236,22 @@ static FuncInfo funcinfo[] = {
.group_estimate = time_bucket_group_estimate,
.sort_transform = time_bucket_sort_transform,
},
/* Interval Bucket with origin */
{
.origin = ORIGIN_TIMESCALE,
.is_bucketing_func = true,
.allowed_in_cagg_definition = false,
.allowed_in_cagg_definition = true,
.funcname = "time_bucket",
.nargs = 3,
.arg_types = { INTERVALOID, TIMESTAMPTZOID, TIMESTAMPTZOID },
.group_estimate = time_bucket_group_estimate,
.sort_transform = time_bucket_sort_transform,
},
/* Interval Bucket with offset */
{
.origin = ORIGIN_TIMESCALE,
.is_bucketing_func = true,
.allowed_in_cagg_definition = false,
.allowed_in_cagg_definition = true,
.funcname = "time_bucket",
.nargs = 3,
.arg_types = { INTERVALOID, TIMESTAMPTZOID, INTERVALOID },
Expand All @@ -264,20 +268,22 @@ static FuncInfo funcinfo[] = {
.group_estimate = time_bucket_group_estimate,
.sort_transform = time_bucket_sort_transform,
},
/* Interval Bucket with origin */
{
.origin = ORIGIN_TIMESCALE,
.is_bucketing_func = true,
.allowed_in_cagg_definition = false,
.allowed_in_cagg_definition = true,
.funcname = "time_bucket",
.nargs = 3,
.arg_types = { INTERVALOID, DATEOID, DATEOID },
.group_estimate = time_bucket_group_estimate,
.sort_transform = time_bucket_sort_transform,
},
/* Interval Bucket with offset */
{
.origin = ORIGIN_TIMESCALE,
.is_bucketing_func = true,
.allowed_in_cagg_definition = false,
.allowed_in_cagg_definition = true,
.funcname = "time_bucket",
.nargs = 3,
.arg_types = { INTERVALOID, DATEOID, INTERVALOID },
Expand All @@ -294,10 +300,11 @@ static FuncInfo funcinfo[] = {
.group_estimate = time_bucket_group_estimate,
.sort_transform = time_bucket_sort_transform,
},
/* Int2 Bucket with offset */
{
.origin = ORIGIN_TIMESCALE,
.is_bucketing_func = true,
.allowed_in_cagg_definition = false,
.allowed_in_cagg_definition = true,
.funcname = "time_bucket",
.nargs = 3,
.arg_types = { INT2OID, INT2OID, INT2OID },
Expand All @@ -314,10 +321,11 @@ static FuncInfo funcinfo[] = {
.group_estimate = time_bucket_group_estimate,
.sort_transform = time_bucket_sort_transform,
},
/* Int4 Bucket with offset */
{
.origin = ORIGIN_TIMESCALE,
.is_bucketing_func = true,
.allowed_in_cagg_definition = false,
.allowed_in_cagg_definition = true,
.funcname = "time_bucket",
.nargs = 3,
.arg_types = { INT4OID, INT4OID, INT4OID },
Expand All @@ -334,10 +342,11 @@ static FuncInfo funcinfo[] = {
.group_estimate = time_bucket_group_estimate,
.sort_transform = time_bucket_sort_transform,
},
/* Int8 Bucket with offset */
{
.origin = ORIGIN_TIMESCALE,
.is_bucketing_func = true,
.allowed_in_cagg_definition = false,
.allowed_in_cagg_definition = true,
.funcname = "time_bucket",
.nargs = 3,
.arg_types = { INT8OID, INT8OID, INT8OID },
Expand Down
53 changes: 47 additions & 6 deletions src/time_bucket.c
Original file line number Diff line number Diff line change
Expand Up @@ -459,13 +459,28 @@ ts_date_offset_bucket(PG_FUNCTION_ARGS)
PG_RETURN_DATUM(date);
}

TSDLLEXPORT int64
ts_time_bucket_by_type(int64 interval, int64 timestamp, Oid timestamp_type)
{
NullableDatum null_datum = INIT_NULL_DATUM;
return ts_time_bucket_by_type_extended(interval,
timestamp,
timestamp_type,
null_datum,
null_datum);
}

/* when working with time_buckets stored in our catalog, we may not know ahead of time which
* bucketing function to use, this function dynamically dispatches to the correct time_bucket_<foo>
* based on an inputted timestamp_type
*/
TSDLLEXPORT int64
ts_time_bucket_by_type(int64 interval, int64 timestamp, Oid timestamp_type)
ts_time_bucket_by_type_extended(int64 interval, int64 timestamp, Oid timestamp_type,
NullableDatum offset, NullableDatum origin)
{
/* Defined offset and origin in one function is not supported */
Assert(offset.isnull == true || origin.isnull == true);

Datum timestamp_in_time_type = ts_internal_to_time_value(timestamp, timestamp_type);
Datum interval_in_interval_type;
Datum time_bucketed;
Expand All @@ -487,22 +502,48 @@ ts_time_bucket_by_type(int64 interval, int64 timestamp, Oid timestamp_type)
break;
case TIMESTAMPOID:
interval_in_interval_type = ts_internal_to_interval_value(interval, INTERVALOID);
bucket_function = ts_timestamp_bucket;
if (offset.isnull)
bucket_function = ts_timestamp_bucket; /* handles also origin */
else
bucket_function = ts_timestamp_offset_bucket;
break;
case TIMESTAMPTZOID:
interval_in_interval_type = ts_internal_to_interval_value(interval, INTERVALOID);
bucket_function = ts_timestamptz_bucket;
if (offset.isnull)
bucket_function = ts_timestamptz_bucket; /* handles also origin */
else
bucket_function = ts_timestamptz_offset_bucket;
break;
case DATEOID:
interval_in_interval_type = ts_internal_to_interval_value(interval, INTERVALOID);
bucket_function = ts_date_bucket;
if (offset.isnull)
bucket_function = ts_date_bucket; /* handles also origin */
else
bucket_function = ts_date_offset_bucket;
break;
default:
elog(ERROR, "invalid time_bucket type \"%s\"", format_type_be(timestamp_type));
}

time_bucketed =
DirectFunctionCall2(bucket_function, interval_in_interval_type, timestamp_in_time_type);
if (!offset.isnull)
{
time_bucketed = DirectFunctionCall3(bucket_function,
interval_in_interval_type,
timestamp_in_time_type,
offset.value);
}
else if (!origin.isnull)
{
time_bucketed = DirectFunctionCall3(bucket_function,
interval_in_interval_type,
timestamp_in_time_type,
origin.value);
}
else
{
time_bucketed =
DirectFunctionCall2(bucket_function, interval_in_interval_type, timestamp_in_time_type);
}

return ts_time_value_to_internal(time_bucketed, timestamp_type);
}
Expand Down
8 changes: 8 additions & 0 deletions src/time_bucket.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@

#include "export.h"

#define INIT_NULL_DATUM \
{ \
.value = 0, .isnull = true \
}

extern TSDLLEXPORT Datum ts_int16_bucket(PG_FUNCTION_ARGS);
extern TSDLLEXPORT Datum ts_int32_bucket(PG_FUNCTION_ARGS);
extern TSDLLEXPORT Datum ts_int64_bucket(PG_FUNCTION_ARGS);
Expand All @@ -18,6 +23,9 @@ extern TSDLLEXPORT Datum ts_timestamp_bucket(PG_FUNCTION_ARGS);
extern TSDLLEXPORT Datum ts_timestamptz_bucket(PG_FUNCTION_ARGS);
extern TSDLLEXPORT Datum ts_timestamptz_timezone_bucket(PG_FUNCTION_ARGS);
extern TSDLLEXPORT int64 ts_time_bucket_by_type(int64 interval, int64 timestamp, Oid type);
extern TSDLLEXPORT int64 ts_time_bucket_by_type_extended(int64 interval, int64 timestamp, Oid type,
NullableDatum offset,
NullableDatum origin);
extern TSDLLEXPORT Datum ts_time_bucket_ng_date(PG_FUNCTION_ARGS);
extern TSDLLEXPORT Datum ts_time_bucket_ng_timestamp(PG_FUNCTION_ARGS);
extern TSDLLEXPORT Datum ts_time_bucket_ng_timestamptz(PG_FUNCTION_ARGS);
Expand Down
20 changes: 11 additions & 9 deletions src/ts_catalog/continuous_agg.c
Original file line number Diff line number Diff line change
Expand Up @@ -1383,8 +1383,9 @@ ts_continuous_agg_bucket_on_interval(Oid bucket_function)
}

/*
* Calls one of time_bucket_ng() versions depending on the arguments. This is
* a common procedure used by ts_compute_* below.
* Calls the desired time bucket function depending on the arguments. If the experimental flag is
* set on ContinuousAggsBucketFunction, one of time_bucket_ng() versions is used. This is a common
* procedure used by ts_compute_* below.
*/
static Datum
generic_time_bucket(const ContinuousAggsBucketFunction *bf, Datum timestamp)
Expand Down Expand Up @@ -1524,7 +1525,8 @@ void
ts_compute_inscribed_bucketed_refresh_window_variable(int64 *start, int64 *end,
const ContinuousAggsBucketFunction *bf)
{
Datum start_old, end_old, start_new, end_new;
Datum start_old, end_old, start_aligned, end_aliged;

/*
* It's OK to use TIMESTAMPOID here. Variable-sized buckets can be used
* only for dates, timestamps and timestamptz's. For all these types our
Expand All @@ -1535,16 +1537,16 @@ ts_compute_inscribed_bucketed_refresh_window_variable(int64 *start, int64 *end,
start_old = ts_internal_to_time_value(*start, TIMESTAMPOID);
end_old = ts_internal_to_time_value(*end, TIMESTAMPOID);

start_new = generic_time_bucket(bf, start_old);
end_new = generic_time_bucket(bf, end_old);
start_aligned = generic_time_bucket(bf, start_old);
end_aliged = generic_time_bucket(bf, end_old);

if (DatumGetTimestamp(start_new) != DatumGetTimestamp(start_old))
if (DatumGetTimestamp(start_aligned) != DatumGetTimestamp(start_old))
{
start_new = generic_add_interval(bf, start_new);
start_aligned = generic_add_interval(bf, start_aligned);
}

*start = ts_time_value_to_internal(start_new, TIMESTAMPOID);
*end = ts_time_value_to_internal(end_new, TIMESTAMPOID);
*start = ts_time_value_to_internal(start_aligned, TIMESTAMPOID);
*end = ts_time_value_to_internal(end_aliged, TIMESTAMPOID);
}

/*
Expand Down
Loading

0 comments on commit f6b0f33

Please sign in to comment.