Skip to content

Commit

Permalink
Relax strong table lock when refreshing a CAGG
Browse files Browse the repository at this point in the history
When refreshing a Continuous Aggregate we take a table lock on
_timescaledb_catalog.continuous_aggs_invalidation_threshold when
processing the invalidation logs (the first transaction of the refresh
Continuous Aggregate procedure). This means that even two different
Continuous Aggregates over two different hypertables will wait for each
other in the first phase of the refresh procedure. It also leads to
problems when a pg_dump is running, because pg_dump takes an AccessShareLock
on tables, so the Continuous Aggregate refresh will wait until the
pg_dump finishes.

Improved this by relaxing the strong table-level lock to a row-level lock,
so the Continuous Aggregate refresh procedure can now be executed in
multiple sessions with fewer locks.

Fix #3554
  • Loading branch information
fabriziomello committed Aug 23, 2023
1 parent 0f3d395 commit 5bba74a
Show file tree
Hide file tree
Showing 21 changed files with 579 additions and 376 deletions.
1 change: 1 addition & 0 deletions .unreleased/feature_5809
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Implements: #5809 Relax invalidation threshold table-level lock to row-level when refreshing a Continuous Aggregate
4 changes: 3 additions & 1 deletion tsl/src/continuous_aggs/create.c
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
#include "finalize.h"
#include "common.h"
#include "create.h"
#include "invalidation_threshold.h"

#include "debug_assert.h"
#include "ts_catalog/catalog.h"
Expand Down Expand Up @@ -929,9 +930,10 @@ tsl_process_continuous_agg_viewstmt(Node *node, const char *query_string, void *

mat_ht = ts_hypertable_get_by_id(cagg->data.mat_hypertable_id);
Ensure(NULL != mat_ht, "materialization hypertable %d not found", cagg->data.mat_hypertable_id);

ts_cagg_watermark_insert(mat_ht, 0, true);

invalidation_threshold_initialize(cagg);

if (!stmt->into->skipData)
{
InternalTimeRange refresh_window = {
Expand Down
257 changes: 145 additions & 112 deletions tsl/src/continuous_aggs/invalidation_threshold.c
Original file line number Diff line number Diff line change
Expand Up @@ -66,47 +66,74 @@

typedef struct InvalidationThresholdData
{
int64 threshold;
bool was_updated;
const ContinuousAgg *cagg;
const InternalTimeRange *refresh_window;
int64 computed_invalidation_threshold;
} InvalidationThresholdData;

static ScanTupleResult scan_update_invalidation_threshold(TupleInfo *ti, void *data);
static ScanTupleResult invalidation_threshold_tuple_found(TupleInfo *ti, void *data);

static ScanTupleResult
scan_update_invalidation_threshold(TupleInfo *ti, void *data)
invalidation_threshold_scan_update(TupleInfo *ti, void *const data)
{
InvalidationThresholdData *invthresh = data;
bool should_free;
HeapTuple tuple = ts_scanner_fetch_heap_tuple(ti, false, &should_free);
Form_continuous_aggs_invalidation_threshold form =
(Form_continuous_aggs_invalidation_threshold) GETSTRUCT(tuple);
InvalidationThresholdData *invthresh = (InvalidationThresholdData *) data;

if (ti->lockresult != TM_Ok)
return SCAN_CONTINUE;

bool isnull;
Datum datum =
slot_getattr(ti->slot, Anum_continuous_aggs_invalidation_threshold_watermark, &isnull);

/* NULL should never happen because we always initialize the threshold with the MIN
* value of the partition type */
Ensure(!isnull,
"invalidation threshold for hypertable %d is null",
invthresh->cagg->data.raw_hypertable_id);

int64 current_invalidation_threshold = DatumGetInt64(datum);

if (invthresh->threshold > form->watermark)
/* Compute new invalidation threshold. Note that this computation caps the
* threshold at the end of the last bucket that holds data in the
* underlying hypertable. */
invthresh->computed_invalidation_threshold =
invalidation_threshold_compute(invthresh->cagg, invthresh->refresh_window);

if (invthresh->computed_invalidation_threshold > current_invalidation_threshold)
{
HeapTuple new_tuple = heap_copytuple(tuple);
form = (Form_continuous_aggs_invalidation_threshold) GETSTRUCT(new_tuple);
bool nulls[Natts_continuous_agg];
Datum values[Natts_continuous_agg];
bool do_replace[Natts_continuous_agg] = { false };
bool should_free;
HeapTuple tuple = ts_scanner_fetch_heap_tuple(ti, false, &should_free);
HeapTuple new_tuple;
TupleDesc tupdesc = ts_scanner_get_tupledesc(ti);

heap_deform_tuple(tuple, tupdesc, values, nulls);

do_replace[AttrNumberGetAttrOffset(Anum_continuous_aggs_invalidation_threshold_watermark)] =
true;
values[AttrNumberGetAttrOffset(Anum_continuous_aggs_invalidation_threshold_watermark)] =
Int64GetDatum(invthresh->computed_invalidation_threshold);

new_tuple = heap_modify_tuple(tuple, tupdesc, values, nulls, do_replace);

form->watermark = invthresh->threshold;
ts_catalog_update(ti->scanrel, new_tuple);
heap_freetuple(new_tuple);
invthresh->was_updated = true;

if (should_free)
heap_freetuple(tuple);
}
else
{
elog(DEBUG1,
"hypertable %d existing watermark >= new invalidation threshold " INT64_FORMAT
" " INT64_FORMAT,
form->hypertable_id,
form->watermark,
invthresh->threshold);
invthresh->threshold = form->watermark;
invthresh->cagg->data.raw_hypertable_id,
current_invalidation_threshold,
invthresh->computed_invalidation_threshold);
invthresh->computed_invalidation_threshold = current_invalidation_threshold;
}

if (should_free)
heap_freetuple(tuple);

return SCAN_DONE;
return SCAN_CONTINUE;
}

/*
Expand All @@ -119,102 +146,48 @@ scan_update_invalidation_threshold(TupleInfo *ti, void *data)
* is returned instead.
*/
int64
invalidation_threshold_set_or_get(int32 raw_hypertable_id, int64 invalidation_threshold)
invalidation_threshold_set_or_get(const ContinuousAgg *cagg,
const InternalTimeRange *refresh_window)
{
bool threshold_found;
InvalidationThresholdData data = {
.threshold = invalidation_threshold,
.was_updated = false,
};
bool found = false;
ScanKeyData scankey[1];
Catalog *catalog = ts_catalog_get();
ScanTupLock scantuplock = {
.waitpolicy = LockWaitBlock,
.lockmode = LockTupleExclusive,
};
InvalidationThresholdData updatectx = {
.cagg = cagg,
.refresh_window = refresh_window,
};
ScannerCtx scanctx = {
.table = catalog_get_table_id(catalog, CONTINUOUS_AGGS_INVALIDATION_THRESHOLD),
.index =
catalog_get_index(catalog, CONTINUOUS_AGGS_INVALIDATION_THRESHOLD, BGW_JOB_PKEY_IDX),
.nkeys = 1,
.scankey = scankey,
.data = &updatectx,
.limit = 1,
.tuple_found = invalidation_threshold_scan_update,
.lockmode = RowExclusiveLock,
.scandirection = ForwardScanDirection,
.result_mctx = CurrentMemoryContext,
.tuplock = &scantuplock,
.flags = SCANNER_F_KEEPLOCK,
};

ScanKeyInit(&scankey[0],
Anum_continuous_aggs_invalidation_threshold_pkey_hypertable_id,
Anum_continuous_aggs_invalidation_threshold_hypertable_id,
BTEqualStrategyNumber,
F_INT4EQ,
Int32GetDatum(raw_hypertable_id));

/* NOTE: this function deliberately takes an AccessExclusiveLock when updating the invalidation
* threshold, instead of the weaker RowExclusiveLock lock normally held for such operations: in
* order to ensure we do not lose invalidations from concurrent mutations, we must ensure that
* all transactions which read the invalidation threshold have either completed, or not yet read
* the value; if we used a RowExclusiveLock we could race such a transaction and update the
* threshold between the time it is read but before the other transaction commits. This would
* cause us to lose the updates. The AccessExclusiveLock ensures no one else can possibly be
* reading the threshold.
*/
threshold_found =
ts_catalog_scan_one(CONTINUOUS_AGGS_INVALIDATION_THRESHOLD /*=table*/,
CONTINUOUS_AGGS_INVALIDATION_THRESHOLD_PKEY /*=indexid*/,
scankey /*=scankey*/,
1 /*=num_keys*/,
scan_update_invalidation_threshold /*=tuple_found*/,
AccessExclusiveLock /*=lockmode*/,
CONTINUOUS_AGGS_INVALIDATION_THRESHOLD_TABLE_NAME /*=table_name*/,
&data /*=data*/);

if (!threshold_found)
{
Catalog *catalog = ts_catalog_get();
/* NOTE: this function deliberately takes a stronger lock than RowExclusive, see the comment
* above for the rationale
*/
Relation rel =
table_open(catalog_get_table_id(catalog, CONTINUOUS_AGGS_INVALIDATION_THRESHOLD),
AccessExclusiveLock);
TupleDesc desc = RelationGetDescr(rel);
Datum values[Natts_continuous_aggs_invalidation_threshold];
bool nulls[Natts_continuous_aggs_invalidation_threshold] = { false };
Int32GetDatum(cagg->data.raw_hypertable_id));

values[AttrNumberGetAttrOffset(Anum_continuous_aggs_invalidation_threshold_hypertable_id)] =
Int32GetDatum(raw_hypertable_id);
values[AttrNumberGetAttrOffset(Anum_continuous_aggs_invalidation_threshold_watermark)] =
Int64GetDatum(invalidation_threshold);

ts_catalog_insert_values(rel, desc, values, nulls);
table_close(rel, NoLock);
}
found = ts_scanner_scan_one(&scanctx, false, "invalidation threshold");
Ensure(found,
"invalidation threshold for hypertable %d not found",
cagg->data.raw_hypertable_id);

return data.threshold;
}

static ScanTupleResult
invalidation_threshold_tuple_found(TupleInfo *ti, void *data)
{
int64 *threshold = data;
bool isnull;
Datum datum =
slot_getattr(ti->slot, Anum_continuous_aggs_invalidation_threshold_watermark, &isnull);

Assert(!isnull);
*threshold = DatumGetInt64(datum);

return SCAN_CONTINUE;
}

int64
invalidation_threshold_get(int32 hypertable_id)
{
int64 threshold = 0;
ScanKeyData scankey[1];

ScanKeyInit(&scankey[0],
Anum_continuous_aggs_invalidation_threshold_pkey_hypertable_id,
BTEqualStrategyNumber,
F_INT4EQ,
Int32GetDatum(hypertable_id));

if (!ts_catalog_scan_one(CONTINUOUS_AGGS_INVALIDATION_THRESHOLD /*=table*/,
CONTINUOUS_AGGS_INVALIDATION_THRESHOLD_PKEY /*=indexid*/,
scankey /*=scankey*/,
1 /*=num_keys*/,
invalidation_threshold_tuple_found /*=tuple_found*/,
AccessShareLock /*=lockmode*/,
CONTINUOUS_AGGS_INVALIDATION_THRESHOLD_TABLE_NAME /*=table_name*/,
&threshold /*=data*/))
elog(ERROR, "could not find invalidation threshold for hypertable %d", hypertable_id);

return threshold;
return updatectx.computed_invalidation_threshold;
}

/*
Expand Down Expand Up @@ -284,3 +257,63 @@ invalidation_threshold_compute(const ContinuousAgg *cagg, const InternalTimeRang

return refresh_window->end;
}

/*
* Initialize the invalidation threshold.
*
* The initial value of the invalidation threshold should be the MIN
* value for the Continuous Aggregate partition type.
*/
void
invalidation_threshold_initialize(const ContinuousAgg *cagg)
{
bool found = false;
ScanKeyData scankey[1];
Catalog *catalog = ts_catalog_get();
ScannerCtx scanctx = {
.table = catalog_get_table_id(catalog, CONTINUOUS_AGGS_INVALIDATION_THRESHOLD),
.index =
catalog_get_index(catalog, CONTINUOUS_AGGS_INVALIDATION_THRESHOLD, BGW_JOB_PKEY_IDX),
.nkeys = 1,
.scankey = scankey,
.limit = 1,
.lockmode = ShareUpdateExclusiveLock,
.scandirection = ForwardScanDirection,
.result_mctx = CurrentMemoryContext,
.flags = SCANNER_F_KEEPLOCK,
};

ScanKeyInit(&scankey[0],
Anum_continuous_aggs_invalidation_threshold_hypertable_id,
BTEqualStrategyNumber,
F_INT4EQ,
Int32GetDatum(cagg->data.raw_hypertable_id));

found = ts_scanner_scan_one(&scanctx, false, "invalidation threshold");

if (!found)
{
Relation rel =
table_open(catalog_get_table_id(catalog, CONTINUOUS_AGGS_INVALIDATION_THRESHOLD),
ShareUpdateExclusiveLock);
TupleDesc desc = RelationGetDescr(rel);
Datum values[Natts_continuous_aggs_invalidation_threshold];
bool nulls[Natts_continuous_aggs_invalidation_threshold] = { false };
CatalogSecurityContext sec_ctx;
/* get the MIN value for the partition type */
int64 min_value = ts_continuous_agg_bucket_width_variable(cagg) ?
ts_time_get_nobegin(cagg->partition_type) :
ts_time_get_min(cagg->partition_type);

values[AttrNumberGetAttrOffset(Anum_continuous_aggs_invalidation_threshold_hypertable_id)] =
Int32GetDatum(cagg->data.raw_hypertable_id);

values[AttrNumberGetAttrOffset(Anum_continuous_aggs_invalidation_threshold_watermark)] =
Int64GetDatum(min_value);

ts_catalog_database_info_become_owner(ts_catalog_database_info_get(), &sec_ctx);
ts_catalog_insert_values(rel, desc, values, nulls);
ts_catalog_restore_user(&sec_ctx);
table_close(rel, NoLock);
}
}
7 changes: 4 additions & 3 deletions tsl/src/continuous_aggs/invalidation_threshold.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,12 @@

typedef struct InternalTimeRange InternalTimeRange;
typedef struct ContinuousAgg ContinuousAgg;
typedef struct Hypertable Hypertable;

extern int64 invalidation_threshold_get(int32 hypertable_id);
extern int64 invalidation_threshold_set_or_get(int32 raw_hypertable_id,
int64 invalidation_threshold);
extern int64 invalidation_threshold_set_or_get(const ContinuousAgg *cagg,
const InternalTimeRange *refresh_window);
extern int64 invalidation_threshold_compute(const ContinuousAgg *cagg,
const InternalTimeRange *refresh_window);
extern void invalidation_threshold_initialize(const ContinuousAgg *cagg);

#endif /* TIMESCALEDB_TSL_CONTINUOUS_AGGS_INVALIDATION_THRESHOLD_H */
19 changes: 4 additions & 15 deletions tsl/src/continuous_aggs/refresh.c
Original file line number Diff line number Diff line change
Expand Up @@ -760,10 +760,8 @@ continuous_agg_refresh_internal(const ContinuousAgg *cagg,
const CaggRefreshCallContext callctx, const bool start_isnull,
const bool end_isnull)
{
Catalog *catalog = ts_catalog_get();
int32 mat_id = cagg->data.mat_hypertable_id;
InternalTimeRange refresh_window = *refresh_window_arg;
int64 computed_invalidation_threshold;
int64 invalidation_threshold;
bool is_raw_ht_distributed;
int rc;
Expand Down Expand Up @@ -828,35 +826,26 @@ continuous_agg_refresh_internal(const ContinuousAgg *cagg,
&refresh_window,
"refreshing continuous aggregate");

/* Perform the refresh across two transactions.
/*
* Perform the refresh across two transactions.
*
* The first transaction moves the invalidation threshold (if needed) and
* copies over invalidations from the hypertable log to the cagg
* invalidation log. Doing the threshold and copying as part of the first
* transaction ensures that the threshold and new invalidations will be
* visible as soon as possible to concurrent refreshes and that we keep
* locks for only a short period. Note that the first transaction
* serializes around the threshold table lock, which protects both the
* threshold and the invalidation processing against concurrent refreshes.
* locks for only a short period.
*
* The second transaction processes the cagg invalidation log and then
* performs the actual refresh (materialization of data). This transaction
* serializes around a lock on the materialized hypertable for the
* continuous aggregate that gets refreshed.
*/
LockRelationOid(catalog_get_table_id(catalog, CONTINUOUS_AGGS_INVALIDATION_THRESHOLD),
AccessExclusiveLock);

/* Compute new invalidation threshold. Note that this computation caps the
* threshold at the end of the last bucket that holds data in the
* underlying hypertable. */
computed_invalidation_threshold = invalidation_threshold_compute(cagg, &refresh_window);

/* Set the new invalidation threshold. Note that this only updates the
* threshold if the new value is greater than the old one. Otherwise, the
* existing threshold is returned. */
invalidation_threshold = invalidation_threshold_set_or_get(cagg->data.raw_hypertable_id,
computed_invalidation_threshold);
invalidation_threshold = invalidation_threshold_set_or_get(cagg, &refresh_window);

/* We must also cap the refresh window at the invalidation threshold. If
* we process invalidations after the threshold, the continuous aggregates
Expand Down
Loading

0 comments on commit 5bba74a

Please sign in to comment.