diff --git a/.unreleased/feature_5809 b/.unreleased/feature_5809 new file mode 100644 index 00000000000..51a1b30a450 --- /dev/null +++ b/.unreleased/feature_5809 @@ -0,0 +1 @@ +Implements: #5809 Relax invalidation threshold table-level lock to row-level when refreshing a Continuous Aggregate diff --git a/tsl/src/continuous_aggs/create.c b/tsl/src/continuous_aggs/create.c index c6555799565..a792967f93a 100644 --- a/tsl/src/continuous_aggs/create.c +++ b/tsl/src/continuous_aggs/create.c @@ -56,6 +56,7 @@ #include "finalize.h" #include "common.h" #include "create.h" +#include "invalidation_threshold.h" #include "debug_assert.h" #include "ts_catalog/catalog.h" @@ -929,9 +930,10 @@ tsl_process_continuous_agg_viewstmt(Node *node, const char *query_string, void * mat_ht = ts_hypertable_get_by_id(cagg->data.mat_hypertable_id); Ensure(NULL != mat_ht, "materialization hypertable %d not found", cagg->data.mat_hypertable_id); - ts_cagg_watermark_insert(mat_ht, 0, true); + invalidation_threshold_insert(cagg, 0, true); + if (!stmt->into->skipData) { InternalTimeRange refresh_window = { diff --git a/tsl/src/continuous_aggs/invalidation_threshold.c b/tsl/src/continuous_aggs/invalidation_threshold.c index 964b9631699..567110dac0c 100644 --- a/tsl/src/continuous_aggs/invalidation_threshold.c +++ b/tsl/src/continuous_aggs/invalidation_threshold.c @@ -20,6 +20,7 @@ #include #include #include +#include #include "ts_catalog/continuous_agg.h" #include "continuous_aggs/materialize.h" @@ -66,47 +67,67 @@ typedef struct InvalidationThresholdData { - int64 threshold; - bool was_updated; + const ContinuousAgg *cagg; + const InternalTimeRange *refresh_window; + int64 computed_invalidation_threshold; } InvalidationThresholdData; -static ScanTupleResult scan_update_invalidation_threshold(TupleInfo *ti, void *data); -static ScanTupleResult invalidation_threshold_tuple_found(TupleInfo *ti, void *data); - static ScanTupleResult -scan_update_invalidation_threshold(TupleInfo *ti, void *data) +invalidation_threshold_scan_update(TupleInfo *ti, void *const data) { - InvalidationThresholdData *invthresh = data; - bool should_free; - HeapTuple tuple = ts_scanner_fetch_heap_tuple(ti, false, &should_free); - Form_continuous_aggs_invalidation_threshold form = - (Form_continuous_aggs_invalidation_threshold) GETSTRUCT(tuple); + InvalidationThresholdData *invthresh = (InvalidationThresholdData *) data; + + if (ti->lockresult != TM_Ok) + return SCAN_CONTINUE; + + bool isnull; + Datum datum = + slot_getattr(ti->slot, Anum_continuous_aggs_invalidation_threshold_watermark, &isnull); + int64 current_invalidation_threshold = DatumGetInt64(datum); + + /* Compute new invalidation threshold. Note that this computation caps the + * threshold at the end of the last bucket that holds data in the + * underlying hypertable. */ + invthresh->computed_invalidation_threshold = + invalidation_threshold_compute(invthresh->cagg, invthresh->refresh_window); - if (invthresh->threshold > form->watermark) + if (invthresh->computed_invalidation_threshold > current_invalidation_threshold) { - HeapTuple new_tuple = heap_copytuple(tuple); - form = (Form_continuous_aggs_invalidation_threshold) GETSTRUCT(new_tuple); + bool nulls[Natts_continuous_agg]; + Datum values[Natts_continuous_agg]; + bool doReplace[Natts_continuous_agg] = { false }; + bool should_free; + HeapTuple tuple = ts_scanner_fetch_heap_tuple(ti, false, &should_free); + HeapTuple new_tuple; + TupleDesc tupdesc = ts_scanner_get_tupledesc(ti); + + heap_deform_tuple(tuple, tupdesc, values, nulls); + + doReplace[AttrNumberGetAttrOffset(Anum_continuous_aggs_invalidation_threshold_watermark)] = + true; + values[AttrNumberGetAttrOffset(Anum_continuous_aggs_invalidation_threshold_watermark)] = + Int64GetDatum(invthresh->computed_invalidation_threshold); + + new_tuple = heap_modify_tuple(tuple, tupdesc, values, nulls, doReplace); - form->watermark = invthresh->threshold; ts_catalog_update(ti->scanrel, new_tuple); heap_freetuple(new_tuple); - invthresh->was_updated = true; + + if (should_free) + heap_freetuple(tuple); } else { elog(DEBUG1, "hypertable %d existing watermark >= new invalidation threshold " INT64_FORMAT " " INT64_FORMAT, - form->hypertable_id, - form->watermark, - invthresh->threshold); - invthresh->threshold = form->watermark; + invthresh->cagg->data.raw_hypertable_id, + current_invalidation_threshold, + invthresh->computed_invalidation_threshold); + invthresh->computed_invalidation_threshold = current_invalidation_threshold; } - if (should_free) - heap_freetuple(tuple); - - return SCAN_DONE; + return SCAN_CONTINUE; } /* @@ -119,102 +140,48 @@ scan_update_invalidation_threshold(TupleInfo *ti, void *data) * is returned instead. */ int64 -invalidation_threshold_set_or_get(int32 raw_hypertable_id, int64 invalidation_threshold) +invalidation_threshold_set_or_get(const ContinuousAgg *cagg, + const InternalTimeRange *refresh_window) { - bool threshold_found; - InvalidationThresholdData data = { - .threshold = invalidation_threshold, - .was_updated = false, - }; + bool found = false; ScanKeyData scankey[1]; + Catalog *catalog = ts_catalog_get(); + ScanTupLock scantuplock = { + .waitpolicy = LockWaitBlock, + .lockmode = LockTupleExclusive, + }; + InvalidationThresholdData updatectx = { + .cagg = cagg, + .refresh_window = refresh_window, + }; + ScannerCtx scanctx = { + .table = catalog_get_table_id(catalog, CONTINUOUS_AGGS_INVALIDATION_THRESHOLD), + .index = + catalog_get_index(catalog, CONTINUOUS_AGGS_INVALIDATION_THRESHOLD, BGW_JOB_PKEY_IDX), + .nkeys = 1, + .scankey = scankey, + .data = &updatectx, + .limit = 1, + .tuple_found = invalidation_threshold_scan_update, + .lockmode = RowExclusiveLock, + .scandirection = ForwardScanDirection, + .result_mctx = CurrentMemoryContext, + .tuplock = &scantuplock, + .flags = SCANNER_F_KEEPLOCK, + }; ScanKeyInit(&scankey[0], - Anum_continuous_aggs_invalidation_threshold_pkey_hypertable_id, + Anum_continuous_aggs_invalidation_threshold_hypertable_id, BTEqualStrategyNumber, F_INT4EQ, - Int32GetDatum(raw_hypertable_id)); - - /* NOTE: this function deliberately takes an AccessExclusiveLock when updating the invalidation - * threshold, instead of the weaker RowExclusiveLock lock normally held for such operations: in - * order to ensure we do not lose invalidations from concurrent mutations, we must ensure that - * all transactions which read the invalidation threshold have either completed, or not yet read - * the value; if we used a RowExclusiveLock we could race such a transaction and update the - * threshold between the time it is read but before the other transaction commits. This would - * cause us to lose the updates. The AccessExclusiveLock ensures no one else can possibly be - * reading the threshold. - */ - threshold_found = - ts_catalog_scan_one(CONTINUOUS_AGGS_INVALIDATION_THRESHOLD /*=table*/, - CONTINUOUS_AGGS_INVALIDATION_THRESHOLD_PKEY /*=indexid*/, - scankey /*=scankey*/, - 1 /*=num_keys*/, - scan_update_invalidation_threshold /*=tuple_found*/, - AccessExclusiveLock /*=lockmode*/, - CONTINUOUS_AGGS_INVALIDATION_THRESHOLD_TABLE_NAME /*=table_name*/, - &data /*=data*/); - - if (!threshold_found) - { - Catalog *catalog = ts_catalog_get(); - /* NOTE: this function deliberately takes a stronger lock than RowExclusive, see the comment - * above for the rationale - */ - Relation rel = - table_open(catalog_get_table_id(catalog, CONTINUOUS_AGGS_INVALIDATION_THRESHOLD), - AccessExclusiveLock); - TupleDesc desc = RelationGetDescr(rel); - Datum values[Natts_continuous_aggs_invalidation_threshold]; - bool nulls[Natts_continuous_aggs_invalidation_threshold] = { false }; - - values[AttrNumberGetAttrOffset(Anum_continuous_aggs_invalidation_threshold_hypertable_id)] = - Int32GetDatum(raw_hypertable_id); - values[AttrNumberGetAttrOffset(Anum_continuous_aggs_invalidation_threshold_watermark)] = - Int64GetDatum(invalidation_threshold); - - ts_catalog_insert_values(rel, desc, values, nulls); - table_close(rel, NoLock); - } - - return data.threshold; -} - -static ScanTupleResult -invalidation_threshold_tuple_found(TupleInfo *ti, void *data) -{ - int64 *threshold = data; - bool isnull; - Datum datum = - slot_getattr(ti->slot, Anum_continuous_aggs_invalidation_threshold_watermark, &isnull); + Int32GetDatum(cagg->data.raw_hypertable_id)); - Assert(!isnull); - *threshold = DatumGetInt64(datum); + found = ts_scanner_scan_one(&scanctx, false, "invalidation threshold"); + Ensure(found, + "invalidation threshold for hypertable %d not found", + cagg->data.raw_hypertable_id); - return SCAN_CONTINUE; -} - -int64 -invalidation_threshold_get(int32 hypertable_id) -{ - int64 threshold = 0; - ScanKeyData scankey[1]; - - ScanKeyInit(&scankey[0], - Anum_continuous_aggs_invalidation_threshold_pkey_hypertable_id, - BTEqualStrategyNumber, - F_INT4EQ, - Int32GetDatum(hypertable_id)); - - if (!ts_catalog_scan_one(CONTINUOUS_AGGS_INVALIDATION_THRESHOLD /*=table*/, - CONTINUOUS_AGGS_INVALIDATION_THRESHOLD_PKEY /*=indexid*/, - scankey /*=scankey*/, - 1 /*=num_keys*/, - invalidation_threshold_tuple_found /*=tuple_found*/, - AccessShareLock /*=lockmode*/, - CONTINUOUS_AGGS_INVALIDATION_THRESHOLD_TABLE_NAME /*=table_name*/, - &threshold /*=data*/)) - elog(ERROR, "could not find invalidation threshold for hypertable %d", hypertable_id); - - return threshold; + return updatectx.computed_invalidation_threshold; } /* @@ -284,3 +251,63 @@ invalidation_threshold_compute(const ContinuousAgg *cagg, const InternalTimeRang return refresh_window->end; } + +void +invalidation_threshold_insert(const ContinuousAgg *cagg, int64 threshold, bool threshold_isnull) +{ + bool found = false; + ScanKeyData scankey[1]; + Catalog *catalog = ts_catalog_get(); + ScannerCtx scanctx = { + .table = catalog_get_table_id(catalog, CONTINUOUS_AGGS_INVALIDATION_THRESHOLD), + .index = + catalog_get_index(catalog, CONTINUOUS_AGGS_INVALIDATION_THRESHOLD, BGW_JOB_PKEY_IDX), + .nkeys = 1, + .scankey = scankey, + .limit = 1, + .lockmode = ShareUpdateExclusiveLock, + .scandirection = ForwardScanDirection, + .result_mctx = CurrentMemoryContext, + .flags = SCANNER_F_KEEPLOCK, + }; + + ScanKeyInit(&scankey[0], + Anum_continuous_aggs_invalidation_threshold_hypertable_id, + BTEqualStrategyNumber, + F_INT4EQ, + Int32GetDatum(cagg->data.raw_hypertable_id)); + + found = ts_scanner_scan_one(&scanctx, false, "invalidation threshold"); + + DEBUG_WAITPOINT("invalidation_threshold_insert"); + + if (!found) + { + Relation rel = + table_open(catalog_get_table_id(catalog, CONTINUOUS_AGGS_INVALIDATION_THRESHOLD), + ShareUpdateExclusiveLock); + TupleDesc desc = RelationGetDescr(rel); + Datum values[Natts_continuous_aggs_invalidation_threshold]; + bool nulls[Natts_continuous_aggs_invalidation_threshold] = { false }; + CatalogSecurityContext sec_ctx; + + values[AttrNumberGetAttrOffset(Anum_continuous_aggs_invalidation_threshold_hypertable_id)] = + Int32GetDatum(cagg->data.raw_hypertable_id); + + /* if trying to insert a NULL threshold then get the MIN value for the time dimension */ + if (threshold_isnull) + { + threshold = ts_continuous_agg_bucket_width_variable(cagg) ? + ts_time_get_nobegin(cagg->partition_type) : + ts_time_get_min(cagg->partition_type); + } + + values[AttrNumberGetAttrOffset(Anum_continuous_aggs_invalidation_threshold_watermark)] = + Int64GetDatum(threshold); + + ts_catalog_database_info_become_owner(ts_catalog_database_info_get(), &sec_ctx); + ts_catalog_insert_values(rel, desc, values, nulls); + ts_catalog_restore_user(&sec_ctx); + table_close(rel, NoLock); + } +} diff --git a/tsl/src/continuous_aggs/invalidation_threshold.h b/tsl/src/continuous_aggs/invalidation_threshold.h index 7e935e933db..df7ea0aa3f5 100644 --- a/tsl/src/continuous_aggs/invalidation_threshold.h +++ b/tsl/src/continuous_aggs/invalidation_threshold.h @@ -10,11 +10,13 @@ typedef struct InternalTimeRange InternalTimeRange; typedef struct ContinuousAgg ContinuousAgg; +typedef struct Hypertable Hypertable; -extern int64 invalidation_threshold_get(int32 hypertable_id); -extern int64 invalidation_threshold_set_or_get(int32 raw_hypertable_id, - int64 invalidation_threshold); +extern int64 invalidation_threshold_set_or_get(const ContinuousAgg *cagg, + const InternalTimeRange *refresh_window); extern int64 invalidation_threshold_compute(const ContinuousAgg *cagg, const InternalTimeRange *refresh_window); +extern void invalidation_threshold_insert(const ContinuousAgg *cagg, int64 threshold, + bool threshold_isnull); #endif /* TIMESCALEDB_TSL_CONTINUOUS_AGGS_INVALIDATION_THRESHOLD_H */ diff --git a/tsl/src/continuous_aggs/refresh.c b/tsl/src/continuous_aggs/refresh.c index a6298c7478a..64d60803418 100644 --- a/tsl/src/continuous_aggs/refresh.c +++ b/tsl/src/continuous_aggs/refresh.c @@ -760,10 +760,8 @@ continuous_agg_refresh_internal(const ContinuousAgg *cagg, const CaggRefreshCallContext callctx, const bool start_isnull, const bool end_isnull) { - Catalog *catalog = ts_catalog_get(); int32 mat_id = cagg->data.mat_hypertable_id; InternalTimeRange refresh_window = *refresh_window_arg; - int64 computed_invalidation_threshold; int64 invalidation_threshold; bool is_raw_ht_distributed; int rc; @@ -828,35 +826,26 @@ continuous_agg_refresh_internal(const ContinuousAgg *cagg, &refresh_window, "refreshing continuous aggregate"); - /* Perform the refresh across two transactions. + /* + * Perform the refresh across two transactions. * * The first transaction moves the invalidation threshold (if needed) and * copies over invalidations from the hypertable log to the cagg * invalidation log. Doing the threshold and copying as part of the first * transaction ensures that the threshold and new invalidations will be * visible as soon as possible to concurrent refreshes and that we keep - * locks for only a short period. Note that the first transaction - * serializes around the threshold table lock, which protects both the - * threshold and the invalidation processing against concurrent refreshes. + * locks for only a short period. * * The second transaction processes the cagg invalidation log and then * performs the actual refresh (materialization of data). This transaction * serializes around a lock on the materialized hypertable for the * continuous aggregate that gets refreshed. */ - LockRelationOid(catalog_get_table_id(catalog, CONTINUOUS_AGGS_INVALIDATION_THRESHOLD), - AccessExclusiveLock); - - /* Compute new invalidation threshold. Note that this computation caps the - * threshold at the end of the last bucket that holds data in the - * underlying hypertable. */ - computed_invalidation_threshold = invalidation_threshold_compute(cagg, &refresh_window); /* Set the new invalidation threshold. Note that this only updates the * threshold if the new value is greater than the old one. Otherwise, the * existing threshold is returned. */ - invalidation_threshold = invalidation_threshold_set_or_get(cagg->data.raw_hypertable_id, - computed_invalidation_threshold); + invalidation_threshold = invalidation_threshold_set_or_get(cagg, &refresh_window); /* We must also cap the refresh window at the invalidation threshold. If * we process invalidations after the threshold, the continuous aggregates diff --git a/tsl/test/expected/cagg_invalidation.out b/tsl/test/expected/cagg_invalidation.out index 4882e496399..520a670893d 100644 --- a/tsl/test/expected/cagg_invalidation.out +++ b/tsl/test/expected/cagg_invalidation.out @@ -224,13 +224,15 @@ $$; CREATE VIEW hyper_invals AS SELECT * FROM get_hyper_invals(); CREATE VIEW cagg_invals AS SELECT * FROM get_cagg_invals(); -- Must refresh to move the invalidation threshold, or no --- invalidations will be generated. Initially, there is no threshold --- set: +-- invalidations will be generated. Initially, threshold is the +-- MIN of the time dimension data type: SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold ORDER BY 1,2; - hypertable_id | watermark ----------------+----------- -(0 rows) + hypertable_id | watermark +---------------+---------------------- + 1 | -9223372036854775808 + 2 | -2147483648 +(2 rows) -- There should be only "infinite" invalidations in the cagg -- invalidation log: @@ -246,10 +248,11 @@ SELECT * FROM cagg_invals; CALL refresh_continuous_aggregate('cond_10', 1, 50); SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold ORDER BY 1,2; - hypertable_id | watermark ----------------+----------- - 1 | 50 -(1 row) + hypertable_id | watermark +---------------+------------- + 1 | 50 + 2 | -2147483648 +(2 rows) -- Invalidations should be cleared inside the refresh window: SELECT * FROM cagg_invals; @@ -277,10 +280,11 @@ CALL refresh_continuous_aggregate('cond_10', 20, 49); psql:include/cagg_invalidation_common.sql:207: NOTICE: continuous aggregate "cond_10" is already up-to-date SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold ORDER BY 1,2; - hypertable_id | watermark ----------------+----------- - 1 | 50 -(1 row) + hypertable_id | watermark +---------------+------------- + 1 | 50 + 2 | -2147483648 +(2 rows) -- Nothing changes with invalidations either since the region was -- already refreshed and no new invalidations have been generated: @@ -991,9 +995,10 @@ WHERE user_view_name = 'thresh_2' \gset SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold WHERE hypertable_id = :thresh_hyper_id ORDER BY 1,2; - hypertable_id | watermark ----------------+----------- -(0 rows) + hypertable_id | watermark +---------------+------------- + 7 | -2147483648 +(1 row) -- Test manual invalidation error \if :IS_DISTRIBUTED diff --git a/tsl/test/expected/cagg_invalidation_dist_ht-13.out b/tsl/test/expected/cagg_invalidation_dist_ht-13.out index fb54f224464..6120c85883a 100644 --- a/tsl/test/expected/cagg_invalidation_dist_ht-13.out +++ b/tsl/test/expected/cagg_invalidation_dist_ht-13.out @@ -260,13 +260,15 @@ $$; CREATE VIEW hyper_invals AS SELECT * FROM get_hyper_invals(); CREATE VIEW cagg_invals AS SELECT * FROM get_cagg_invals(); -- Must refresh to move the invalidation threshold, or no --- invalidations will be generated. Initially, there is no threshold --- set: +-- invalidations will be generated. Initially, threshold is the +-- MIN of the time dimension data type: SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold ORDER BY 1,2; - hypertable_id | watermark ----------------+----------- -(0 rows) + hypertable_id | watermark +---------------+---------------------- + 1 | -9223372036854775808 + 2 | -2147483648 +(2 rows) -- There should be only "infinite" invalidations in the cagg -- invalidation log: @@ -282,10 +284,11 @@ SELECT * FROM cagg_invals; CALL refresh_continuous_aggregate('cond_10', 1, 50); SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold ORDER BY 1,2; - hypertable_id | watermark ----------------+----------- - 1 | 50 -(1 row) + hypertable_id | watermark +---------------+------------- + 1 | 50 + 2 | -2147483648 +(2 rows) -- Invalidations should be cleared inside the refresh window: SELECT * FROM cagg_invals; @@ -313,10 +316,11 @@ CALL refresh_continuous_aggregate('cond_10', 20, 49); psql:include/cagg_invalidation_common.sql:207: NOTICE: continuous aggregate "cond_10" is already up-to-date SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold ORDER BY 1,2; - hypertable_id | watermark ----------------+----------- - 1 | 50 -(1 row) + hypertable_id | watermark +---------------+------------- + 1 | 50 + 2 | -2147483648 +(2 rows) -- Nothing changes with invalidations either since the region was -- already refreshed and no new invalidations have been generated: @@ -1054,9 +1058,10 @@ WHERE user_view_name = 'thresh_2' \gset SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold WHERE hypertable_id = :thresh_hyper_id ORDER BY 1,2; - hypertable_id | watermark ----------------+----------- -(0 rows) + hypertable_id | watermark +---------------+------------- + 7 | -2147483648 +(1 row) -- Test manual invalidation error \if :IS_DISTRIBUTED diff --git a/tsl/test/expected/cagg_invalidation_dist_ht-14.out b/tsl/test/expected/cagg_invalidation_dist_ht-14.out index dd0640a0cec..889d99fbb1c 100644 --- a/tsl/test/expected/cagg_invalidation_dist_ht-14.out +++ b/tsl/test/expected/cagg_invalidation_dist_ht-14.out @@ -260,13 +260,15 @@ $$; CREATE VIEW hyper_invals AS SELECT * FROM get_hyper_invals(); CREATE VIEW cagg_invals AS SELECT * FROM get_cagg_invals(); -- Must refresh to move the invalidation threshold, or no --- invalidations will be generated. Initially, there is no threshold --- set: +-- invalidations will be generated. Initially, threshold is the +-- MIN of the time dimension data type: SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold ORDER BY 1,2; - hypertable_id | watermark ----------------+----------- -(0 rows) + hypertable_id | watermark +---------------+---------------------- + 1 | -9223372036854775808 + 2 | -2147483648 +(2 rows) -- There should be only "infinite" invalidations in the cagg -- invalidation log: @@ -282,10 +284,11 @@ SELECT * FROM cagg_invals; CALL refresh_continuous_aggregate('cond_10', 1, 50); SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold ORDER BY 1,2; - hypertable_id | watermark ----------------+----------- - 1 | 50 -(1 row) + hypertable_id | watermark +---------------+------------- + 1 | 50 + 2 | -2147483648 +(2 rows) -- Invalidations should be cleared inside the refresh window: SELECT * FROM cagg_invals; @@ -313,10 +316,11 @@ CALL refresh_continuous_aggregate('cond_10', 20, 49); psql:include/cagg_invalidation_common.sql:207: NOTICE: continuous aggregate "cond_10" is already up-to-date SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold ORDER BY 1,2; - hypertable_id | watermark ----------------+----------- - 1 | 50 -(1 row) + hypertable_id | watermark +---------------+------------- + 1 | 50 + 2 | -2147483648 +(2 rows) -- Nothing changes with invalidations either since the region was -- already refreshed and no new invalidations have been generated: @@ -1054,9 +1058,10 @@ WHERE user_view_name = 'thresh_2' \gset SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold WHERE hypertable_id = :thresh_hyper_id ORDER BY 1,2; - hypertable_id | watermark ----------------+----------- -(0 rows) + hypertable_id | watermark +---------------+------------- + 7 | -2147483648 +(1 row) -- Test manual invalidation error \if :IS_DISTRIBUTED diff --git a/tsl/test/expected/cagg_invalidation_dist_ht-15.out b/tsl/test/expected/cagg_invalidation_dist_ht-15.out index dd0640a0cec..889d99fbb1c 100644 --- a/tsl/test/expected/cagg_invalidation_dist_ht-15.out +++ b/tsl/test/expected/cagg_invalidation_dist_ht-15.out @@ -260,13 +260,15 @@ $$; CREATE VIEW hyper_invals AS SELECT * FROM get_hyper_invals(); CREATE VIEW cagg_invals AS SELECT * FROM get_cagg_invals(); -- Must refresh to move the invalidation threshold, or no --- invalidations will be generated. Initially, there is no threshold --- set: +-- invalidations will be generated. Initially, threshold is the +-- MIN of the time dimension data type: SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold ORDER BY 1,2; - hypertable_id | watermark ----------------+----------- -(0 rows) + hypertable_id | watermark +---------------+---------------------- + 1 | -9223372036854775808 + 2 | -2147483648 +(2 rows) -- There should be only "infinite" invalidations in the cagg -- invalidation log: @@ -282,10 +284,11 @@ SELECT * FROM cagg_invals; CALL refresh_continuous_aggregate('cond_10', 1, 50); SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold ORDER BY 1,2; - hypertable_id | watermark ----------------+----------- - 1 | 50 -(1 row) + hypertable_id | watermark +---------------+------------- + 1 | 50 + 2 | -2147483648 +(2 rows) -- Invalidations should be cleared inside the refresh window: SELECT * FROM cagg_invals; @@ -313,10 +316,11 @@ CALL refresh_continuous_aggregate('cond_10', 20, 49); psql:include/cagg_invalidation_common.sql:207: NOTICE: continuous aggregate "cond_10" is already up-to-date SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold ORDER BY 1,2; - hypertable_id | watermark ----------------+----------- - 1 | 50 -(1 row) + hypertable_id | watermark +---------------+------------- + 1 | 50 + 2 | -2147483648 +(2 rows) -- Nothing changes with invalidations either since the region was -- already refreshed and no new invalidations have been generated: @@ -1054,9 +1058,10 @@ WHERE user_view_name = 'thresh_2' \gset SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold WHERE hypertable_id = :thresh_hyper_id ORDER BY 1,2; - hypertable_id | watermark ----------------+----------- -(0 rows) + hypertable_id | watermark +---------------+------------- + 7 | -2147483648 +(1 row) -- Test manual invalidation error \if :IS_DISTRIBUTED diff --git a/tsl/test/expected/cagg_watermark.out b/tsl/test/expected/cagg_watermark.out index 146aa5400b4..53ce6fba7e4 100644 --- a/tsl/test/expected/cagg_watermark.out +++ b/tsl/test/expected/cagg_watermark.out @@ -233,9 +233,10 @@ CREATE MATERIALIZED VIEW cit_view GROUP BY 1 WITH NO DATA; INSERT INTO ca_inval_test SELECT generate_series(0, 5); SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold; - hypertable_id | watermark ----------------+----------- -(0 rows) + hypertable_id | watermark +---------------+------------- + 3 | -2147483648 +(1 row) SELECT * from _timescaledb_catalog.continuous_aggs_hypertable_invalidation_log; hypertable_id | lowest_modified_value | greatest_modified_value @@ -243,7 +244,9 @@ SELECT * from _timescaledb_catalog.continuous_aggs_hypertable_invalidation_log; (0 rows) \c :TEST_DBNAME :ROLE_SUPERUSER -INSERT INTO _timescaledb_catalog.continuous_aggs_invalidation_threshold VALUES (3, 15); +UPDATE _timescaledb_catalog.continuous_aggs_invalidation_threshold +SET watermark = 15 +WHERE hypertable_id = 3; \c :TEST_DBNAME :ROLE_DEFAULT_PERM_USER INSERT INTO ca_inval_test SELECT generate_series(5, 15); SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold; @@ -328,9 +331,10 @@ CREATE MATERIALIZED VIEW continuous_view FROM ts_continuous_test GROUP BY 1 WITH NO DATA; SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold; - hypertable_id | watermark ----------------+----------- -(0 rows) + hypertable_id | watermark +---------------+------------- + 5 | -2147483648 +(1 row) SELECT * from _timescaledb_catalog.continuous_aggs_hypertable_invalidation_log; hypertable_id | lowest_modified_value | greatest_modified_value @@ -338,7 +342,9 @@ SELECT * from _timescaledb_catalog.continuous_aggs_hypertable_invalidation_log; (0 rows) \c :TEST_DBNAME :ROLE_SUPERUSER -INSERT INTO _timescaledb_catalog.continuous_aggs_invalidation_threshold VALUES (5, 2); +UPDATE _timescaledb_catalog.continuous_aggs_invalidation_threshold +SET watermark = 2 +WHERE hypertable_id = 5; \c :TEST_DBNAME :ROLE_DEFAULT_PERM_USER INSERT INTO ts_continuous_test VALUES (1, 1); SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold; diff --git a/tsl/test/expected/exp_cagg_monthly.out b/tsl/test/expected/exp_cagg_monthly.out index 92d7b726ddf..42d67cf0ab8 100644 --- a/tsl/test/expected/exp_cagg_monthly.out +++ b/tsl/test/expected/exp_cagg_monthly.out @@ -78,13 +78,14 @@ WHERE mat_hypertable_id = :cagg_id; t | time_bucket_ng | @ 1 mon | | (1 row) --- Check that there is no saved invalidation threshold before any refreshes +-- Check that the saved invalidation threshold is -infinity SELECT _timescaledb_functions.to_timestamp(watermark) FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold WHERE hypertable_id = :ht_id; to_timestamp -------------- -(0 rows) + -infinity +(1 row) -- Make sure truncating of the refresh window works \set ON_ERROR_STOP 0 diff --git a/tsl/test/expected/exp_cagg_timezone.out b/tsl/test/expected/exp_cagg_timezone.out index c53c0964eee..5fd8ad058f6 100644 --- a/tsl/test/expected/exp_cagg_timezone.out +++ b/tsl/test/expected/exp_cagg_timezone.out @@ -157,13 +157,14 @@ WHERE mat_hypertable_id = :cagg_id_1w; t | time_bucket_ng | @ 7 days | | MSK (1 row) --- Check the invalidation threshold -SELECT to_char(_timescaledb_functions.to_timestamp(watermark) at time zone 'MSK', 'YYYY-MM-DD HH24:MI:SS') +-- Check the invalidation threshold is -infinity +SELECT _timescaledb_functions.to_timestamp(watermark) at time zone 'MSK' FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold WHERE hypertable_id = :ht_id; - to_char ---------- -(0 rows) + timezone +----------- + -infinity +(1 row) -- Make sure the invalidation log is empty SELECT diff --git a/tsl/test/isolation/expected/cagg_concurrent_refresh.out b/tsl/test/isolation/expected/cagg_concurrent_refresh.out index 7c9f701a8d8..2f2d64ce8bf 100644 --- a/tsl/test/isolation/expected/cagg_concurrent_refresh.out +++ b/tsl/test/isolation/expected/cagg_concurrent_refresh.out @@ -1,4 +1,4 @@ -Parsed test spec with 8 sessions +Parsed test spec with 9 sessions starting permutation: R1_refresh S1_select R3_refresh S1_select L2_read_unlock_threshold_table L3_unlock_cagg_table L1_unlock_threshold_table step R1_refresh: @@ -13,7 +13,8 @@ step S1_select: SELECT h.table_name AS hypertable, it.watermark AS threshold FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold it, _timescaledb_catalog.hypertable h - WHERE it.hypertable_id = h.id; + WHERE it.hypertable_id = h.id + ORDER BY 1; bucket| avg_temp ------+---------------- @@ -31,10 +32,11 @@ cagg_bucket_count 7 (1 row) -hypertable|threshold -----------+--------- -conditions| 70 -(1 row) +hypertable | threshold +-----------+----------- +conditions | 70 +conditions2|-2147483648 +(2 rows) step R3_refresh: CALL refresh_continuous_aggregate('cond_10', 70, 107); @@ -48,7 +50,8 @@ step S1_select: SELECT h.table_name AS hypertable, it.watermark AS threshold FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold it, _timescaledb_catalog.hypertable h - WHERE it.hypertable_id = h.id; + WHERE it.hypertable_id = h.id + ORDER BY 1; bucket| avg_temp ------+---------------- @@ -69,10 +72,11 @@ cagg_bucket_count 10 (1 row) -hypertable|threshold -----------+--------- -conditions| 100 -(1 row) +hypertable | threshold +-----------+----------- +conditions | 100 +conditions2|-2147483648 +(2 rows) step L2_read_unlock_threshold_table: ROLLBACK; @@ -91,11 +95,10 @@ step L2_read_lock_threshold_table: step R3_refresh: CALL refresh_continuous_aggregate('cond_10', 70, 107); - + step L2_read_unlock_threshold_table: ROLLBACK; -step R3_refresh: <... completed> step S1_select: SELECT bucket, avg_temp FROM cond_10 @@ -105,7 +108,8 @@ step S1_select: SELECT h.table_name AS hypertable, it.watermark AS threshold FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold it, _timescaledb_catalog.hypertable h - WHERE it.hypertable_id = h.id; + WHERE it.hypertable_id = h.id + ORDER BY 1; bucket| avg_temp ------+---------------- @@ -122,10 +126,11 @@ cagg_bucket_count 6 (1 row) -hypertable|threshold -----------+--------- -conditions| 100 -(1 row) +hypertable | threshold +-----------+----------- +conditions | 100 +conditions2|-2147483648 +(2 rows) step L3_unlock_cagg_table: ROLLBACK; @@ -144,11 +149,10 @@ step L2_read_lock_threshold_table: step R3_refresh: CALL refresh_continuous_aggregate('cond_10', 70, 107); - + step L2_read_unlock_threshold_table: ROLLBACK; -step R3_refresh: <... completed> step S1_select: SELECT bucket, avg_temp FROM cond_10 @@ -158,7 +162,8 @@ step S1_select: SELECT h.table_name AS hypertable, it.watermark AS threshold FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold it, _timescaledb_catalog.hypertable h - WHERE it.hypertable_id = h.id; + WHERE it.hypertable_id = h.id + ORDER BY 1; bucket| avg_temp ------+---------------- @@ -179,10 +184,11 @@ cagg_bucket_count 10 (1 row) -hypertable|threshold -----------+--------- -conditions| 100 -(1 row) +hypertable | threshold +-----------+----------- +conditions | 100 +conditions2|-2147483648 +(2 rows) step L3_unlock_cagg_table: ROLLBACK; @@ -201,11 +207,10 @@ step L2_read_lock_threshold_table: step R1_refresh: CALL refresh_continuous_aggregate('cond_10', 25, 70); - + step L2_read_unlock_threshold_table: ROLLBACK; -step R1_refresh: <... completed> step S1_select: SELECT bucket, avg_temp FROM cond_10 @@ -215,7 +220,8 @@ step S1_select: SELECT h.table_name AS hypertable, it.watermark AS threshold FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold it, _timescaledb_catalog.hypertable h - WHERE it.hypertable_id = h.id; + WHERE it.hypertable_id = h.id + ORDER BY 1; bucket| avg_temp ------+---------------- @@ -236,10 +242,11 @@ cagg_bucket_count 10 (1 row) -hypertable|threshold -----------+--------- -conditions| 100 -(1 row) +hypertable | threshold +-----------+----------- +conditions | 100 +conditions2|-2147483648 +(2 rows) step L3_unlock_cagg_table: ROLLBACK; @@ -273,7 +280,8 @@ step S1_select: SELECT h.table_name AS hypertable, it.watermark AS threshold FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold it, _timescaledb_catalog.hypertable h - WHERE it.hypertable_id = h.id; + WHERE it.hypertable_id = h.id + ORDER BY 1; bucket| avg_temp ------+---------------- @@ -291,10 +299,11 @@ cagg_bucket_count 7 (1 row) -hypertable|threshold -----------+--------- -conditions| 70 -(1 row) +hypertable | threshold +-----------+----------- +conditions | 70 +conditions2|-2147483648 +(2 rows) step L1_unlock_threshold_table: ROLLBACK; @@ -333,7 +342,8 @@ step S1_select: SELECT h.table_name AS hypertable, it.watermark AS threshold FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold it, _timescaledb_catalog.hypertable h - WHERE it.hypertable_id = h.id; + WHERE it.hypertable_id = h.id + ORDER BY 1; bucket| avg_temp ------+---------------- @@ -351,10 +361,11 @@ cagg_bucket_count 7 (1 row) -hypertable|threshold -----------+--------- -conditions| 70 -(1 row) +hypertable | threshold +-----------+----------- +conditions | 70 +conditions2|-2147483648 +(2 rows) step L1_unlock_threshold_table: ROLLBACK; @@ -392,7 +403,8 @@ step S1_select: SELECT h.table_name AS hypertable, it.watermark AS threshold FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold it, _timescaledb_catalog.hypertable h - WHERE it.hypertable_id = h.id; + WHERE it.hypertable_id = h.id + ORDER BY 1; bucket| avg_temp ------+---------------- @@ -413,10 +425,11 @@ cagg_bucket_count 10 (1 row) -hypertable|threshold -----------+--------- -conditions| 100 -(1 row) +hypertable | threshold +-----------+----------- +conditions | 100 +conditions2|-2147483648 +(2 rows) step L1_unlock_threshold_table: ROLLBACK; @@ -453,7 +466,8 @@ step S1_select: SELECT h.table_name AS hypertable, it.watermark AS threshold FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold it, _timescaledb_catalog.hypertable h - WHERE it.hypertable_id = h.id; + WHERE it.hypertable_id = h.id + ORDER BY 1; bucket| avg_temp ------+---------------- @@ -470,10 +484,11 @@ cagg_bucket_count 6 (1 row) -hypertable|threshold -----------+--------- -conditions| 100 -(1 row) +hypertable | threshold +-----------+----------- +conditions | 100 +conditions2|-2147483648 +(2 rows) step L1_unlock_threshold_table: ROLLBACK; @@ -481,3 +496,11 @@ step L1_unlock_threshold_table: step L2_read_unlock_threshold_table: ROLLBACK; + +starting permutation: R1_refresh R12_refresh +step R1_refresh: + CALL refresh_continuous_aggregate('cond_10', 25, 70); + +step R12_refresh: + CALL refresh_continuous_aggregate('cond2_10', 25, 70); + diff --git a/tsl/test/isolation/expected/cagg_concurrent_refresh_dist_ht.out b/tsl/test/isolation/expected/cagg_concurrent_refresh_dist_ht.out index 7271cbd3715..c4d3b3d9a17 100644 --- a/tsl/test/isolation/expected/cagg_concurrent_refresh_dist_ht.out +++ b/tsl/test/isolation/expected/cagg_concurrent_refresh_dist_ht.out @@ -121,11 +121,10 @@ step L2_read_lock_threshold_table: step R3_refresh: CALL refresh_continuous_aggregate('cond_10', 70, 107); - + step L2_read_unlock_threshold_table: ROLLBACK; -step R3_refresh: <... completed> step S1_select: SELECT bucket, avg_temp FROM cond_10 @@ -189,11 +188,10 @@ step L2_read_lock_threshold_table: step R3_refresh: CALL refresh_continuous_aggregate('cond_10', 70, 107); - + step L2_read_unlock_threshold_table: ROLLBACK; -step R3_refresh: <... completed> step S1_select: SELECT bucket, avg_temp FROM cond_10 @@ -261,11 +259,10 @@ step L2_read_lock_threshold_table: step R1_refresh: CALL refresh_continuous_aggregate('cond_10', 25, 70); - + step L2_read_unlock_threshold_table: ROLLBACK; -step R1_refresh: <... completed> step S1_select: SELECT bucket, avg_temp FROM cond_10 diff --git a/tsl/test/isolation/expected/cagg_insert.out b/tsl/test/isolation/expected/cagg_insert.out index d5f3fccab1e..6b7f716672a 100644 --- a/tsl/test/isolation/expected/cagg_insert.out +++ b/tsl/test/isolation/expected/cagg_insert.out @@ -1,141 +1,168 @@ -Parsed test spec with 12 sessions +Parsed test spec with 15 sessions -starting permutation: LockInvalThrEx Refresh2 Refresh UnlockInvalThrEx +starting permutation: LockInvalThrEx Refresh Refresh2 Refresh3 UnlockInvalThrEx step LockInvalThrEx: BEGIN; LOCK TABLE _timescaledb_catalog.continuous_aggs_invalidation_threshold ; -step Refresh2: CALL refresh_continuous_aggregate('continuous_view', NULL, 15); -step Refresh: CALL refresh_continuous_aggregate('continuous_view', NULL, 15); +step Refresh: CALL refresh_continuous_aggregate('continuous_view_1', NULL, 15); +step Refresh2: CALL refresh_continuous_aggregate('continuous_view_1', NULL, 15); +step Refresh3: CALL refresh_continuous_aggregate('continuous_view_2', NULL, 15); step UnlockInvalThrEx: ROLLBACK; -step Refresh2: <... completed> -R: NOTICE: continuous aggregate "continuous_view" is already up-to-date step Refresh: <... completed> +step Refresh2: <... completed> +step Refresh3: <... completed> -starting permutation: Ib LockCagg I1 Refresh Ic UnlockCagg +starting permutation: Ib LockCagg1 I1 Refresh Ic UnLockCagg1 step Ib: BEGIN; SET LOCAL lock_timeout = '500ms'; SET LOCAL deadlock_timeout = '10ms'; -step LockCagg: BEGIN; SELECT lock_cagg('continuous_view'); +step LockCagg1: BEGIN; SELECT lock_cagg('continuous_view_1'); lock_cagg --------- (1 row) -step I1: INSERT INTO ts_continuous_test VALUES (1, 1); -step Refresh: CALL refresh_continuous_aggregate('continuous_view', NULL, 15); +step I1: INSERT INTO ts_continuous_test_1 VALUES (1, 1); +step Refresh: CALL refresh_continuous_aggregate('continuous_view_1', NULL, 15); step Ic: COMMIT; -step UnlockCagg: ROLLBACK; +step UnLockCagg1: ROLLBACK; step Refresh: <... completed> -starting permutation: Ib LockCagg Refresh I1 Ic UnlockCagg +starting permutation: Ib LockCagg1 Refresh I1 Ic UnLockCagg1 step Ib: BEGIN; SET LOCAL lock_timeout = '500ms'; SET LOCAL deadlock_timeout = '10ms'; -step LockCagg: BEGIN; SELECT lock_cagg('continuous_view'); +step LockCagg1: BEGIN; SELECT lock_cagg('continuous_view_1'); lock_cagg --------- (1 row) -step Refresh: CALL refresh_continuous_aggregate('continuous_view', NULL, 15); -step I1: INSERT INTO ts_continuous_test VALUES (1, 1); +step Refresh: CALL refresh_continuous_aggregate('continuous_view_1', NULL, 15); +step I1: INSERT INTO ts_continuous_test_1 VALUES (1, 1); step Ic: COMMIT; -step UnlockCagg: ROLLBACK; +step UnLockCagg1: ROLLBACK; step Refresh: <... completed> -starting permutation: Sb LockCagg Refresh S1 Sc UnlockCagg +starting permutation: I2b LockCagg2 I21 Refresh Refresh3 I2c UnLockCagg2 +step I2b: BEGIN; SET LOCAL lock_timeout = '500ms'; SET LOCAL deadlock_timeout = '10ms'; +step LockCagg2: BEGIN; SELECT lock_cagg('continuous_view_2'); +lock_cagg +--------- + +(1 row) + +step I21: INSERT INTO ts_continuous_test_2 VALUES (1, 1); +step Refresh: CALL refresh_continuous_aggregate('continuous_view_1', NULL, 15); +step Refresh3: CALL refresh_continuous_aggregate('continuous_view_2', NULL, 15); +step I2c: COMMIT; +step UnLockCagg2: ROLLBACK; +step Refresh3: <... completed> + +starting permutation: I2b LockCagg2 Refresh3 Refresh I21 I2c UnLockCagg2 +step I2b: BEGIN; SET LOCAL lock_timeout = '500ms'; SET LOCAL deadlock_timeout = '10ms'; +step LockCagg2: BEGIN; SELECT lock_cagg('continuous_view_2'); +lock_cagg +--------- + +(1 row) + +step Refresh3: CALL refresh_continuous_aggregate('continuous_view_2', NULL, 15); +step Refresh: CALL refresh_continuous_aggregate('continuous_view_1', NULL, 15); +step I21: INSERT INTO ts_continuous_test_2 VALUES (1, 1); +step I2c: COMMIT; +step UnLockCagg2: ROLLBACK; +step Refresh3: <... completed> + +starting permutation: Sb LockCagg1 Refresh S1 Sc UnLockCagg1 step Sb: BEGIN; SET LOCAL lock_timeout = '500ms'; SET LOCAL deadlock_timeout = '10ms'; -step LockCagg: BEGIN; SELECT lock_cagg('continuous_view'); +step LockCagg1: BEGIN; SELECT lock_cagg('continuous_view_1'); lock_cagg --------- (1 row) -step Refresh: CALL refresh_continuous_aggregate('continuous_view', NULL, 15); -step S1: SELECT count(*) FROM ts_continuous_test; +step Refresh: CALL refresh_continuous_aggregate('continuous_view_1', NULL, 15); +step S1: SELECT count(*) FROM ts_continuous_test_1; count ----- 30 (1 row) step Sc: COMMIT; -step UnlockCagg: ROLLBACK; +step UnLockCagg1: ROLLBACK; step Refresh: <... completed> -starting permutation: Sb LockCagg S1 Refresh Sc UnlockCagg +starting permutation: Sb LockCagg1 S1 Refresh Sc UnLockCagg1 step Sb: BEGIN; SET LOCAL lock_timeout = '500ms'; SET LOCAL deadlock_timeout = '10ms'; -step LockCagg: BEGIN; SELECT lock_cagg('continuous_view'); +step LockCagg1: BEGIN; SELECT lock_cagg('continuous_view_1'); lock_cagg --------- (1 row) -step S1: SELECT count(*) FROM ts_continuous_test; +step S1: SELECT count(*) FROM ts_continuous_test_1; count ----- 30 (1 row) -step Refresh: CALL refresh_continuous_aggregate('continuous_view', NULL, 15); +step Refresh: CALL refresh_continuous_aggregate('continuous_view_1', NULL, 15); step Sc: COMMIT; -step UnlockCagg: ROLLBACK; +step UnLockCagg1: ROLLBACK; step Refresh: <... completed> starting permutation: Ib LockInvalThr Refresh I1 Ic UnlockInvalThr step Ib: BEGIN; SET LOCAL lock_timeout = '500ms'; SET LOCAL deadlock_timeout = '10ms'; step LockInvalThr: BEGIN; LOCK TABLE _timescaledb_catalog.continuous_aggs_invalidation_threshold IN SHARE MODE; -step Refresh: CALL refresh_continuous_aggregate('continuous_view', NULL, 15); -step I1: INSERT INTO ts_continuous_test VALUES (1, 1); -step Ic: COMMIT; +step Refresh: CALL refresh_continuous_aggregate('continuous_view_1', NULL, 15); +step I1: INSERT INTO ts_continuous_test_1 VALUES (1, 1); +step Ic: COMMIT; step UnlockInvalThr: ROLLBACK; step Refresh: <... completed> -step Ic: <... completed> starting permutation: Ib LockInvalThr I1 Refresh Ic UnlockInvalThr step Ib: BEGIN; SET LOCAL lock_timeout = '500ms'; SET LOCAL deadlock_timeout = '10ms'; step LockInvalThr: BEGIN; LOCK TABLE _timescaledb_catalog.continuous_aggs_invalidation_threshold IN SHARE MODE; -step I1: INSERT INTO ts_continuous_test VALUES (1, 1); -step Refresh: CALL refresh_continuous_aggregate('continuous_view', NULL, 15); -step Ic: COMMIT; +step I1: INSERT INTO ts_continuous_test_1 VALUES (1, 1); +step Refresh: CALL refresh_continuous_aggregate('continuous_view_1', NULL, 15); +step Ic: COMMIT; step UnlockInvalThr: ROLLBACK; step Refresh: <... completed> -step Ic: <... completed> starting permutation: Ib LockInval I1 Ic Refresh UnlockInval step Ib: BEGIN; SET LOCAL lock_timeout = '500ms'; SET LOCAL deadlock_timeout = '10ms'; step LockInval: BEGIN; LOCK TABLE _timescaledb_catalog.continuous_aggs_hypertable_invalidation_log; -step I1: INSERT INTO ts_continuous_test VALUES (1, 1); +step I1: INSERT INTO ts_continuous_test_1 VALUES (1, 1); step Ic: COMMIT; -step Refresh: CALL refresh_continuous_aggregate('continuous_view', NULL, 15); +step Refresh: CALL refresh_continuous_aggregate('continuous_view_1', NULL, 15); step UnlockInval: ROLLBACK; step Refresh: <... completed> starting permutation: Ipb LockInval Refresh Ip1 Ipc UnlockInval step Ipb: BEGIN; SET LOCAL lock_timeout = '500ms'; SET LOCAL deadlock_timeout = '10ms'; step LockInval: BEGIN; LOCK TABLE _timescaledb_catalog.continuous_aggs_hypertable_invalidation_log; -step Refresh: CALL refresh_continuous_aggregate('continuous_view', NULL, 15); -step Ip1: INSERT INTO ts_continuous_test VALUES (29, 29); -step Ipc: COMMIT; +step Refresh: CALL refresh_continuous_aggregate('continuous_view_1', NULL, 15); +step Ip1: INSERT INTO ts_continuous_test_1 VALUES (29, 29); +step Ipc: COMMIT; step UnlockInval: ROLLBACK; step Refresh: <... completed> -step Ipc: <... completed> starting permutation: Ipb LockInval Ip1 Refresh Ipc UnlockInval step Ipb: BEGIN; SET LOCAL lock_timeout = '500ms'; SET LOCAL deadlock_timeout = '10ms'; step LockInval: BEGIN; LOCK TABLE _timescaledb_catalog.continuous_aggs_hypertable_invalidation_log; -step Ip1: INSERT INTO ts_continuous_test VALUES (29, 29); -step Refresh: CALL refresh_continuous_aggregate('continuous_view', NULL, 15); -step Ipc: COMMIT; +step Ip1: INSERT INTO ts_continuous_test_1 VALUES (29, 29); +step Refresh: CALL refresh_continuous_aggregate('continuous_view_1', NULL, 15); +step Ipc: COMMIT; step UnlockInval: ROLLBACK; step Refresh: <... completed> -step Ipc: <... completed> starting permutation: Ipb LockInval Ip1 Ipc Refresh UnlockInval step Ipb: BEGIN; SET LOCAL lock_timeout = '500ms'; SET LOCAL deadlock_timeout = '10ms'; step LockInval: BEGIN; LOCK TABLE _timescaledb_catalog.continuous_aggs_hypertable_invalidation_log; -step Ip1: INSERT INTO ts_continuous_test VALUES (29, 29); +step Ip1: INSERT INTO ts_continuous_test_1 VALUES (29, 29); step Ipc: COMMIT; -step Refresh: CALL refresh_continuous_aggregate('continuous_view', NULL, 15); +step Refresh: CALL refresh_continuous_aggregate('continuous_view_1', NULL, 15); step UnlockInval: ROLLBACK; step Refresh: <... completed> starting permutation: Refresh SV1 LockMatInval Refresh1 Ib I1 LockInvalThrEx Ic UnlockMatInval UnlockInvalThrEx SV1 -step Refresh: CALL refresh_continuous_aggregate('continuous_view', NULL, 15); -step SV1: SELECT * FROM continuous_view order by 1; +step Refresh: CALL refresh_continuous_aggregate('continuous_view_1', NULL, 15); +step SV1: SELECT * FROM continuous_view_1 ORDER BY 1; time_bucket|count -----------+----- 0| 5 @@ -144,18 +171,17 @@ time_bucket|count (3 rows) step LockMatInval: BEGIN; LOCK TABLE _timescaledb_catalog.continuous_aggs_materialization_invalidation_log; -step Refresh1: CALL refresh_continuous_aggregate('continuous_view', NULL, 15); +step Refresh1: CALL refresh_continuous_aggregate('continuous_view_1', NULL, 15); step Ib: BEGIN; SET LOCAL lock_timeout = '500ms'; SET LOCAL deadlock_timeout = '10ms'; -step I1: INSERT INTO ts_continuous_test VALUES (1, 1); +step I1: INSERT INTO ts_continuous_test_1 VALUES (1, 1); step LockInvalThrEx: BEGIN; LOCK TABLE _timescaledb_catalog.continuous_aggs_invalidation_threshold ; step Ic: COMMIT; step UnlockMatInval: ROLLBACK; -R1: NOTICE: continuous aggregate "continuous_view" is already up-to-date step Refresh1: <... completed> step LockInvalThrEx: <... completed> step UnlockInvalThrEx: ROLLBACK; step Ic: <... completed> -step SV1: SELECT * FROM continuous_view order by 1; +step SV1: SELECT * FROM continuous_view_1 ORDER BY 1; time_bucket|count -----------+----- 0| 5 @@ -165,12 +191,12 @@ time_bucket|count starting permutation: I1 Refresh LockInval Refresh Sb S1 Sc UnlockInval -step I1: INSERT INTO ts_continuous_test VALUES (1, 1); -step Refresh: CALL refresh_continuous_aggregate('continuous_view', NULL, 15); +step I1: INSERT INTO ts_continuous_test_1 VALUES (1, 1); +step Refresh: CALL refresh_continuous_aggregate('continuous_view_1', NULL, 15); step LockInval: BEGIN; LOCK TABLE _timescaledb_catalog.continuous_aggs_hypertable_invalidation_log; -step Refresh: CALL refresh_continuous_aggregate('continuous_view', NULL, 15); +step Refresh: CALL refresh_continuous_aggregate('continuous_view_1', NULL, 15); step Sb: BEGIN; SET LOCAL lock_timeout = '500ms'; SET LOCAL deadlock_timeout = '10ms'; -step S1: SELECT count(*) FROM ts_continuous_test; +step S1: SELECT count(*) FROM ts_continuous_test_1; count ----- 31 @@ -178,22 +204,36 @@ count step Sc: COMMIT; step UnlockInval: ROLLBACK; -R: NOTICE: continuous aggregate "continuous_view" is already up-to-date step Refresh: <... completed> starting permutation: I1 Refresh LockInval Sb S1 Refresh Sc UnlockInval -step I1: INSERT INTO ts_continuous_test VALUES (1, 1); -step Refresh: CALL refresh_continuous_aggregate('continuous_view', NULL, 15); +step I1: INSERT INTO ts_continuous_test_1 VALUES (1, 1); +step Refresh: CALL refresh_continuous_aggregate('continuous_view_1', NULL, 15); step LockInval: BEGIN; LOCK TABLE _timescaledb_catalog.continuous_aggs_hypertable_invalidation_log; step Sb: BEGIN; SET LOCAL lock_timeout = '500ms'; SET LOCAL deadlock_timeout = '10ms'; -step S1: SELECT count(*) FROM ts_continuous_test; +step S1: SELECT count(*) FROM ts_continuous_test_1; count ----- 31 (1 row) -step Refresh: CALL refresh_continuous_aggregate('continuous_view', NULL, 15); +step Refresh: CALL refresh_continuous_aggregate('continuous_view_1', NULL, 15); step Sc: COMMIT; step UnlockInval: ROLLBACK; -R: NOTICE: continuous aggregate "continuous_view" is already up-to-date step Refresh: <... completed> + +starting permutation: I1 I21 Refresh1 Refresh2 Refresh3 +step I1: INSERT INTO ts_continuous_test_1 VALUES (1, 1); +step I21: INSERT INTO ts_continuous_test_2 VALUES (1, 1); +step Refresh1: CALL refresh_continuous_aggregate('continuous_view_1', NULL, 15); +step Refresh2: CALL refresh_continuous_aggregate('continuous_view_1', NULL, 15); +step Refresh3: CALL refresh_continuous_aggregate('continuous_view_2', NULL, 15); + +starting permutation: I1 I2b I21 Refresh2 Refresh3 I2c Refresh3 +step I1: INSERT INTO ts_continuous_test_1 VALUES (1, 1); +step I2b: BEGIN; SET LOCAL lock_timeout = '500ms'; SET LOCAL deadlock_timeout = '10ms'; +step I21: INSERT INTO ts_continuous_test_2 VALUES (1, 1); +step Refresh2: CALL refresh_continuous_aggregate('continuous_view_1', NULL, 15); +step Refresh3: CALL refresh_continuous_aggregate('continuous_view_2', NULL, 15); +step I2c: COMMIT; +step Refresh3: CALL refresh_continuous_aggregate('continuous_view_2', NULL, 15); diff --git a/tsl/test/isolation/specs/cagg_concurrent_refresh.spec b/tsl/test/isolation/specs/cagg_concurrent_refresh.spec index a4936e76ac8..87b897dd03a 100644 --- a/tsl/test/isolation/specs/cagg_concurrent_refresh.spec +++ b/tsl/test/isolation/specs/cagg_concurrent_refresh.spec @@ -15,18 +15,23 @@ setup { SELECT _timescaledb_internal.stop_background_workers(); + CREATE TABLE conditions(time int, temp float); SELECT create_hypertable('conditions', 'time', chunk_time_interval => 20); + INSERT INTO conditions SELECT t, abs(timestamp_hash(to_timestamp(t)::timestamp))%40 FROM generate_series(1, 100, 1) t; + CREATE OR REPLACE FUNCTION cond_now() RETURNS int LANGUAGE SQL STABLE AS $$ SELECT coalesce(max(time), 0) FROM conditions $$; + SELECT set_integer_now_func('conditions', 'cond_now'); + CREATE MATERIALIZED VIEW cond_10 WITH (timescaledb.continuous, timescaledb.materialized_only=true) @@ -34,6 +39,7 @@ setup SELECT time_bucket(10, time) AS bucket, avg(temp) AS avg_temp FROM conditions GROUP BY 1 WITH NO DATA; + CREATE MATERIALIZED VIEW cond_20 WITH (timescaledb.continuous, timescaledb.materialized_only=true) @@ -42,6 +48,31 @@ setup FROM conditions GROUP BY 1 WITH NO DATA; + CREATE TABLE conditions2(time int, temp float); + + SELECT create_hypertable('conditions2', 'time', chunk_time_interval => 20); + + INSERT INTO conditions2 + SELECT t, abs(timestamp_hash(to_timestamp(t)::timestamp))%40 + FROM generate_series(1, 100, 1) t; + + CREATE OR REPLACE FUNCTION cond2_now() + RETURNS int LANGUAGE SQL STABLE AS + $$ + SELECT coalesce(max(time), 0) + FROM conditions2 + $$; + + SELECT set_integer_now_func('conditions2', 'cond2_now'); + + CREATE MATERIALIZED VIEW cond2_10 + WITH (timescaledb.continuous, + timescaledb.materialized_only=true) + AS + SELECT time_bucket(10, time) AS bucket, avg(temp) AS avg_temp + FROM conditions2 + GROUP BY 1 WITH NO DATA; + CREATE OR REPLACE FUNCTION cagg_bucket_count(cagg regclass) RETURNS int AS $$ @@ -80,6 +111,7 @@ setup BEGIN SELECT format('%I.%I', user_view_schema, user_view_name) FROM _timescaledb_catalog.continuous_agg + WHERE user_view_name = cagg INTO mattable; EXECUTE format('LOCK table %s IN EXCLUSIVE MODE', mattable); END; $$ LANGUAGE plpgsql; @@ -117,6 +149,7 @@ setup teardown { DROP TABLE conditions CASCADE; + DROP TABLE conditions2 CASCADE; } # Session to refresh the cond_10 continuous aggregate @@ -131,6 +164,16 @@ step "R1_refresh" CALL refresh_continuous_aggregate('cond_10', 25, 70); } +session "R12" +setup +{ + SET SESSION lock_timeout = '500ms'; + SET SESSION deadlock_timeout = '500ms'; +} +step "R12_refresh" +{ + CALL refresh_continuous_aggregate('cond2_10', 25, 70); +} # Refresh that overlaps with R1 session "R2" @@ -249,7 +292,8 @@ step "S1_select" SELECT h.table_name AS hypertable, it.watermark AS threshold FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold it, _timescaledb_catalog.hypertable h - WHERE it.hypertable_id = h.id; + WHERE it.hypertable_id = h.id + ORDER BY 1; } #################################################################### @@ -297,3 +341,6 @@ permutation "L3_lock_cagg_table" "R1_refresh" "R3_refresh" "L3_unlock_cagg_table # Concurrent refreshing across two different aggregates on same # hypertable does not block permutation "L3_lock_cagg_table" "R3_refresh" "R4_refresh" "L3_unlock_cagg_table" "S1_select" "L1_unlock_threshold_table" "L2_read_unlock_threshold_table" + +# Concurrent refresh of caggs on different hypertables should not block each other +permutation "R1_refresh" "R12_refresh" diff --git a/tsl/test/isolation/specs/cagg_insert.spec b/tsl/test/isolation/specs/cagg_insert.spec index 5aa28292830..b0dbd3924fb 100644 --- a/tsl/test/isolation/specs/cagg_insert.spec +++ b/tsl/test/isolation/specs/cagg_insert.spec @@ -5,17 +5,36 @@ setup { SELECT _timescaledb_internal.stop_background_workers(); - CREATE TABLE ts_continuous_test(time INTEGER, location INTEGER); - SELECT create_hypertable('ts_continuous_test', 'time', chunk_time_interval => 10); - CREATE OR REPLACE FUNCTION integer_now_test() returns INT LANGUAGE SQL STABLE as $$ SELECT coalesce(max(time), 0) FROM ts_continuous_test $$; - SELECT set_integer_now_func('ts_continuous_test', 'integer_now_test'); - INSERT INTO ts_continuous_test SELECT i, i FROM - (SELECT generate_series(0, 29) AS i) AS i; - CREATE MATERIALIZED VIEW continuous_view - WITH ( timescaledb.continuous, timescaledb.materialized_only=true) - AS SELECT time_bucket('5', time), COUNT(location) - FROM ts_continuous_test - GROUP BY 1 WITH NO DATA; + + CREATE TABLE ts_continuous_test_1(time INTEGER, location INTEGER); + SELECT create_hypertable('ts_continuous_test_1', 'time', chunk_time_interval => 10); + + CREATE OR REPLACE FUNCTION integer_now_test_1() RETURNS INTEGER LANGUAGE SQL STABLE AS $$ SELECT coalesce(max(time), 0) FROM ts_continuous_test_1 $$; + SELECT set_integer_now_func('ts_continuous_test_1', 'integer_now_test_1'); + + INSERT INTO ts_continuous_test_1 SELECT i, i FROM generate_series(0, 29) AS i; + + CREATE MATERIALIZED VIEW continuous_view_1 + WITH (timescaledb.continuous, timescaledb.materialized_only=true) AS + SELECT time_bucket('5', time), COUNT(location) + FROM ts_continuous_test_1 + GROUP BY 1 + WITH NO DATA; + + CREATE TABLE ts_continuous_test_2(time INTEGER, location INTEGER); + SELECT create_hypertable('ts_continuous_test_2', 'time', chunk_time_interval => 10); + + CREATE OR REPLACE FUNCTION integer_now_test_2() RETURNS INTEGER LANGUAGE SQL STABLE AS $$ SELECT coalesce(max(time), 0) FROM ts_continuous_test_2 $$; + SELECT set_integer_now_func('ts_continuous_test_2', 'integer_now_test_2'); + + INSERT INTO ts_continuous_test_2 SELECT i, i FROM generate_series(0, 29) AS i; + + CREATE MATERIALIZED VIEW continuous_view_2 + WITH (timescaledb.continuous, timescaledb.materialized_only=true) AS + SELECT time_bucket('5', time), COUNT(location) + FROM ts_continuous_test_2 + GROUP BY 1 + WITH NO DATA; CREATE OR REPLACE FUNCTION lock_cagg(cagg name) RETURNS void AS $$ DECLARE @@ -23,47 +42,57 @@ setup BEGIN SELECT format('%I.%I', user_view_schema, user_view_name) FROM _timescaledb_catalog.continuous_agg + WHERE user_view_name = cagg INTO mattable; - EXECUTE format('LOCK table %s IN EXCLUSIVE MODE', mattable); + EXECUTE format('LOCK TABLE %s IN EXCLUSIVE MODE', mattable); END; $$ LANGUAGE plpgsql; } teardown { - DROP TABLE ts_continuous_test CASCADE; + DROP TABLE ts_continuous_test_1 CASCADE; + DROP TABLE ts_continuous_test_2 CASCADE; } session "I" step "Ib" { BEGIN; SET LOCAL lock_timeout = '500ms'; SET LOCAL deadlock_timeout = '10ms';} -step "I1" { INSERT INTO ts_continuous_test VALUES (1, 1); } +step "I1" { INSERT INTO ts_continuous_test_1 VALUES (1, 1); } step "Ic" { COMMIT; } session "Ip" step "Ipb" { BEGIN; SET LOCAL lock_timeout = '500ms'; SET LOCAL deadlock_timeout = '10ms';} -step "Ip1" { INSERT INTO ts_continuous_test VALUES (29, 29); } +step "Ip1" { INSERT INTO ts_continuous_test_1 VALUES (29, 29); } step "Ipc" { COMMIT; } +session "I2" +step "I2b" { BEGIN; SET LOCAL lock_timeout = '500ms'; SET LOCAL deadlock_timeout = '10ms';} +step "I21" { INSERT INTO ts_continuous_test_2 VALUES (1, 1); } +step "I2c" { COMMIT; } + session "S" step "Sb" { BEGIN; SET LOCAL lock_timeout = '500ms'; SET LOCAL deadlock_timeout = '10ms';} -step "S1" { SELECT count(*) FROM ts_continuous_test; } +step "S1" { SELECT count(*) FROM ts_continuous_test_1; } step "Sc" { COMMIT; } session "SV" -step "SV1" { SELECT * FROM continuous_view order by 1; } +step "SV1" { SELECT * FROM continuous_view_1 ORDER BY 1; } session "R" -setup { SET client_min_messages TO NOTICE; } -step "Refresh" { CALL refresh_continuous_aggregate('continuous_view', NULL, 15); } +setup { SET client_min_messages TO WARNING; } +step "Refresh" { CALL refresh_continuous_aggregate('continuous_view_1', NULL, 15); } session "R1" -setup { SET client_min_messages TO NOTICE; } -step "Refresh1" { CALL refresh_continuous_aggregate('continuous_view', NULL, 15); } +setup { SET client_min_messages TO WARNING; } +step "Refresh1" { CALL refresh_continuous_aggregate('continuous_view_1', NULL, 15); } session "R2" -setup { SET lock_timeout = '500ms'; SET deadlock_timeout = '10ms'; } -step "Refresh2" { CALL refresh_continuous_aggregate('continuous_view', NULL, 15); } -teardown { SET lock_timeout TO default; SET deadlock_timeout to default; } +setup { SET client_min_messages TO WARNING; } +step "Refresh2" { CALL refresh_continuous_aggregate('continuous_view_1', NULL, 15); } + +session "R3" +setup { SET client_min_messages TO WARNING; } +step "Refresh3" { CALL refresh_continuous_aggregate('continuous_view_2', NULL, 15); } -# the invalidation log is copied in the first materialization tranasction +# the invalidation log is copied in the first materialization transaction session "L" step "LockInval" { BEGIN; LOCK TABLE _timescaledb_catalog.continuous_aggs_hypertable_invalidation_log; } step "UnlockInval" { ROLLBACK; } @@ -80,26 +109,34 @@ step "UnlockInvalThrEx" { ROLLBACK; } # locking a cagg's materialized hypertable will block the REFRESH in # the second transaction, but not INSERTs. -session "LC" -step "LockCagg" { BEGIN; SELECT lock_cagg('continuous_view'); } -step "UnlockCagg" { ROLLBACK; } +session "LC1" +step "LockCagg1" { BEGIN; SELECT lock_cagg('continuous_view_1'); } +step "UnLockCagg1" { ROLLBACK; } + +session "LC2" +step "LockCagg2" { BEGIN; SELECT lock_cagg('continuous_view_2'); } +step "UnLockCagg2" { ROLLBACK; } # the materialization invalidation log session "LM" step "LockMatInval" { BEGIN; LOCK TABLE _timescaledb_catalog.continuous_aggs_materialization_invalidation_log; } step "UnlockMatInval" { ROLLBACK; } + #only one refresh -permutation "LockInvalThrEx" "Refresh2" "Refresh" "UnlockInvalThrEx" +permutation "LockInvalThrEx" "Refresh" "Refresh2" "Refresh3" "UnlockInvalThrEx" #refresh and insert do not block each other once refresh is out of the #first transaction where it moves the invalidation threshold -permutation "Ib" "LockCagg" "I1" "Refresh" "Ic" "UnlockCagg" -permutation "Ib" "LockCagg" "Refresh" "I1" "Ic" "UnlockCagg" +permutation "Ib" "LockCagg1" "I1" "Refresh" "Ic" "UnLockCagg1" +permutation "Ib" "LockCagg1" "Refresh" "I1" "Ic" "UnLockCagg1" + +permutation "I2b" "LockCagg2" "I21" "Refresh" "Refresh3" "I2c" "UnLockCagg2" +permutation "I2b" "LockCagg2" "Refresh3" "Refresh" "I21" "I2c" "UnLockCagg2" #refresh and select can run concurrently. Refresh blocked only by lock on # cagg's materialized hypertable. Needs RowExclusive for 2nd txn. -permutation "Sb" "LockCagg" "Refresh" "S1" "Sc" "UnlockCagg" -permutation "Sb" "LockCagg" "S1" "Refresh" "Sc" "UnlockCagg" +permutation "Sb" "LockCagg1" "Refresh" "S1" "Sc" "UnLockCagg1" +permutation "Sb" "LockCagg1" "S1" "Refresh" "Sc" "UnLockCagg1" #refresh will see new invalidations (you can tell since they are waiting on the invalidation log lock) permutation "Ib" "LockInvalThr" "Refresh" "I1" "Ic" "UnlockInvalThr" @@ -116,9 +153,11 @@ permutation "Ipb" "LockInval" "Refresh" "Ip1" "Ipc" "UnlockInval" permutation "Ipb" "LockInval" "Ip1" "Refresh" "Ipc" "UnlockInval" permutation "Ipb" "LockInval" "Ip1" "Ipc" "Refresh" "UnlockInval" - #refresh and insert/select do not block each other -#refresh1 is blocked on LockMatInval , insert is blocked on invalidation threshold. so refresh1 does not see the insert from I1 +#refresh1 is blocked on LockMatInval, insert is blocked on invalidation threshold. so refresh1 does not see the insert from I1 permutation "Refresh" "SV1" "LockMatInval" "Refresh1" "Ib" "I1" "LockInvalThrEx" "Ic" "UnlockMatInval" "UnlockInvalThrEx" "SV1" permutation "I1" "Refresh" "LockInval" "Refresh" "Sb" "S1" "Sc" "UnlockInval" permutation "I1" "Refresh" "LockInval" "Sb" "S1" "Refresh" "Sc" "UnlockInval" + +permutation "I1" "I21" "Refresh1" "Refresh2" "Refresh3" +permutation "I1" "I2b" "I21" "Refresh2" "Refresh3" "I2c" "Refresh3" diff --git a/tsl/test/sql/cagg_watermark.sql b/tsl/test/sql/cagg_watermark.sql index d1f94d852e5..81f72bf9153 100644 --- a/tsl/test/sql/cagg_watermark.sql +++ b/tsl/test/sql/cagg_watermark.sql @@ -120,7 +120,9 @@ SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold; SELECT * from _timescaledb_catalog.continuous_aggs_hypertable_invalidation_log; \c :TEST_DBNAME :ROLE_SUPERUSER -INSERT INTO _timescaledb_catalog.continuous_aggs_invalidation_threshold VALUES (3, 15); +UPDATE _timescaledb_catalog.continuous_aggs_invalidation_threshold +SET watermark = 15 +WHERE hypertable_id = 3; \c :TEST_DBNAME :ROLE_DEFAULT_PERM_USER INSERT INTO ca_inval_test SELECT generate_series(5, 15); @@ -174,7 +176,9 @@ SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold; SELECT * from _timescaledb_catalog.continuous_aggs_hypertable_invalidation_log; \c :TEST_DBNAME :ROLE_SUPERUSER -INSERT INTO _timescaledb_catalog.continuous_aggs_invalidation_threshold VALUES (5, 2); +UPDATE _timescaledb_catalog.continuous_aggs_invalidation_threshold +SET watermark = 2 +WHERE hypertable_id = 5; \c :TEST_DBNAME :ROLE_DEFAULT_PERM_USER INSERT INTO ts_continuous_test VALUES (1, 1); diff --git a/tsl/test/sql/exp_cagg_monthly.sql b/tsl/test/sql/exp_cagg_monthly.sql index a6984a61e47..b32a34ea32a 100644 --- a/tsl/test/sql/exp_cagg_monthly.sql +++ b/tsl/test/sql/exp_cagg_monthly.sql @@ -74,7 +74,7 @@ SELECT experimental, name, bucket_width, origin, timezone FROM _timescaledb_catalog.continuous_aggs_bucket_function WHERE mat_hypertable_id = :cagg_id; --- Check that there is no saved invalidation threshold before any refreshes +-- Check that the saved invalidation threshold is -infinity SELECT _timescaledb_functions.to_timestamp(watermark) FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold WHERE hypertable_id = :ht_id; diff --git a/tsl/test/sql/exp_cagg_timezone.sql b/tsl/test/sql/exp_cagg_timezone.sql index d0bfe8d681b..5e23a163037 100644 --- a/tsl/test/sql/exp_cagg_timezone.sql +++ b/tsl/test/sql/exp_cagg_timezone.sql @@ -149,8 +149,8 @@ SELECT experimental, name, bucket_width, origin, timezone FROM _timescaledb_catalog.continuous_aggs_bucket_function WHERE mat_hypertable_id = :cagg_id_1w; --- Check the invalidation threshold -SELECT to_char(_timescaledb_functions.to_timestamp(watermark) at time zone 'MSK', 'YYYY-MM-DD HH24:MI:SS') +-- Check the invalidation threshold is -infinity +SELECT _timescaledb_functions.to_timestamp(watermark) at time zone 'MSK' FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold WHERE hypertable_id = :ht_id; @@ -175,7 +175,6 @@ FROM conditions_summary_tz ORDER by month, city; -- Check the invalidation threshold - SELECT to_char(_timescaledb_functions.to_timestamp(watermark) at time zone 'MSK', 'YYYY-MM-DD HH24:MI:SS') FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold WHERE hypertable_id = :ht_id; diff --git a/tsl/test/sql/include/cagg_invalidation_common.sql b/tsl/test/sql/include/cagg_invalidation_common.sql index 77fe40e9593..f8727bdb11a 100644 --- a/tsl/test/sql/include/cagg_invalidation_common.sql +++ b/tsl/test/sql/include/cagg_invalidation_common.sql @@ -182,8 +182,8 @@ CREATE VIEW hyper_invals AS SELECT * FROM get_hyper_invals(); CREATE VIEW cagg_invals AS SELECT * FROM get_cagg_invals(); -- Must refresh to move the invalidation threshold, or no --- invalidations will be generated. Initially, there is no threshold --- set: +-- invalidations will be generated. Initially, threshold is the +-- MIN of the time dimension data type: SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold ORDER BY 1,2;