From 20e237c0a6a2a012f3d0b95ba37215f521c29590 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Erik=20Nordstr=C3=B6m?= Date: Thu, 14 Nov 2024 18:38:15 +0100 Subject: [PATCH 1/5] Fix notice when recompressing hypercore TAM In a previous change, the parameter for using hypercore TAM when compressing a chunk changed, which also caused the compress_chunk() function to start emitting a notice when recompressing a chunk using Hypercore TAM. This message was meant to be emitted when recompressing and specifying an incompatible parameter option, but now it is emitted even when not specifying an option. This change ensures the message is no longer emitted when recompressing, unless an incompatible option is specified. Also update the message to be an error instead of a notice, and ensure that it matches the new parameter name. --- tsl/src/compression/api.c | 24 +++++++++++++++------ tsl/test/expected/hypercore.out | 2 -- tsl/test/expected/hypercore_create.out | 30 ++++---------------------- tsl/test/expected/hypercore_vacuum.out | 2 -- tsl/test/sql/hypercore_create.sql | 7 +++--- 5 files changed, 24 insertions(+), 41 deletions(-) diff --git a/tsl/src/compression/api.c b/tsl/src/compression/api.c index 375e9b52769..cb1ef4781d5 100644 --- a/tsl/src/compression/api.c +++ b/tsl/src/compression/api.c @@ -814,10 +814,16 @@ compress_hypercore(Chunk *chunk, bool rel_is_hypercore, UseAccessMethod useam, switch (useam) { case USE_AM_FALSE: - elog(NOTICE, - "cannot compress hypercore \"%s\" using heap, recompressing instead", - get_rel_name(chunk->table_id)); - TS_FALLTHROUGH; + /* Converting from Hypercore to "regular" compressed is currently + * not supported */ + Assert(rel_is_hypercore); + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot compress \"%s\" without using Hypercore access method", + get_rel_name(chunk->table_id)), + errhint( + "Decompress first and then compress without Hypercore access method."))); + break; case USE_AM_NULL: Assert(rel_is_hypercore); relid = tsl_compress_chunk_wrapper(chunk, if_not_compressed, recompress); @@ -843,9 +849,12 @@ compress_hypercore(Chunk *chunk, bool rel_is_hypercore, UseAccessMethod useam, * otherwise. */ static UseAccessMethod -check_useam(UseAccessMethod arg) +check_useam(UseAccessMethod arg, bool is_hypercore) { - return arg == USE_AM_NULL ? (UseAccessMethod) ts_guc_default_hypercore_use_access_method : arg; + if (arg == USE_AM_NULL) + return is_hypercore ? USE_AM_TRUE : + (UseAccessMethod) ts_guc_default_hypercore_use_access_method; + return arg; } Datum @@ -854,13 +863,14 @@ tsl_compress_chunk(PG_FUNCTION_ARGS) Oid uncompressed_chunk_id = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0); bool if_not_compressed = PG_ARGISNULL(1) ? true : PG_GETARG_BOOL(1); bool recompress = PG_ARGISNULL(2) ? false : PG_GETARG_BOOL(2); - UseAccessMethod useam = check_useam(PG_ARGISNULL(3) ? USE_AM_NULL : PG_GETARG_BOOL(3)); + UseAccessMethod useam; ts_feature_flag_check(FEATURE_HYPERTABLE_COMPRESSION); TS_PREVENT_FUNC_IF_READ_ONLY(); Chunk *chunk = ts_chunk_get_by_relid(uncompressed_chunk_id, true); bool rel_is_hypercore = get_table_am_oid(TS_HYPERCORE_TAM_NAME, false) == chunk->amoid; + useam = check_useam(PG_ARGISNULL(3) ? 
USE_AM_NULL : PG_GETARG_BOOL(3), rel_is_hypercore); if (rel_is_hypercore || useam == USE_AM_TRUE) uncompressed_chunk_id = diff --git a/tsl/test/expected/hypercore.out b/tsl/test/expected/hypercore.out index 985be96d57c..03e5983b7dd 100644 --- a/tsl/test/expected/hypercore.out +++ b/tsl/test/expected/hypercore.out @@ -521,7 +521,6 @@ SELECT sum(_ts_meta_count) FROM :cchunk; CALL recompress_chunk(:'chunk'); WARNING: procedure public.recompress_chunk(regclass,boolean) is deprecated and the functionality is now included in public.compress_chunk. this compatibility function will be removed in a future version. -NOTICE: cannot compress hypercore "_hyper_1_1_chunk" using heap, recompressing instead -- Data should be returned even after recompress, but now from the -- compressed relation. Still using index scan. EXPLAIN (verbose, costs off) @@ -576,7 +575,6 @@ SELECT * FROM :chunk WHERE time = '2022-06-01 00:06:15'::timestamptz; -- Can't recompress twice without new non-compressed rows CALL recompress_chunk(:'chunk'); WARNING: procedure public.recompress_chunk(regclass,boolean) is deprecated and the functionality is now included in public.compress_chunk. this compatibility function will be removed in a future version. -NOTICE: cannot compress hypercore "_hyper_1_1_chunk" using heap, recompressing instead NOTICE: chunk "_hyper_1_1_chunk" is already compressed \set ON_ERROR_STOP 1 -- Compressed count after recompression diff --git a/tsl/test/expected/hypercore_create.out b/tsl/test/expected/hypercore_create.out index 78ed60ec070..6dc829c549b 100644 --- a/tsl/test/expected/hypercore_create.out +++ b/tsl/test/expected/hypercore_create.out @@ -646,33 +646,12 @@ commit; select compress_chunk(ch, hypercore_use_access_method => true, if_not_compressed => false) from show_chunks('test2') ch; ERROR: chunk "_hyper_1_1_chunk" is already compressed -\set ON_ERROR_STOP 1 --- Compressing from hypercore not using access method should lead to --- recompression of hypercore with a notice. +-- Compressing from hypercore and not using access method should lead +-- to an error since it is not supported. 
select compress_chunk(ch, hypercore_use_access_method => false) from show_chunks('test2') ch; -NOTICE: cannot compress hypercore "_hyper_1_1_chunk" using heap, recompressing instead -NOTICE: chunk "_hyper_1_1_chunk" is already compressed -NOTICE: cannot compress hypercore "_hyper_1_3_chunk" using heap, recompressing instead -NOTICE: chunk "_hyper_1_3_chunk" is already compressed -NOTICE: cannot compress hypercore "_hyper_1_5_chunk" using heap, recompressing instead -NOTICE: chunk "_hyper_1_5_chunk" is already compressed -NOTICE: cannot compress hypercore "_hyper_1_7_chunk" using heap, recompressing instead -NOTICE: chunk "_hyper_1_7_chunk" is already compressed -NOTICE: cannot compress hypercore "_hyper_1_9_chunk" using heap, recompressing instead -NOTICE: chunk "_hyper_1_9_chunk" is already compressed -NOTICE: cannot compress hypercore "_hyper_1_11_chunk" using heap, recompressing instead -NOTICE: chunk "_hyper_1_11_chunk" is already compressed - compress_chunk ------------------------------------------ - _timescaledb_internal._hyper_1_1_chunk - _timescaledb_internal._hyper_1_3_chunk - _timescaledb_internal._hyper_1_5_chunk - _timescaledb_internal._hyper_1_7_chunk - _timescaledb_internal._hyper_1_9_chunk - _timescaledb_internal._hyper_1_11_chunk -(6 rows) - +ERROR: cannot compress "_hyper_1_1_chunk" without using Hypercore access method +\set ON_ERROR_STOP 1 -- Compressing a hypercore should by default lead to -- recompression. First check that :chunk is a hypercore. select ch as chunk from show_chunks('test2') ch limit 1 \gset @@ -691,7 +670,6 @@ select ctid from :chunk where created_at = '2022-06-01 10:01' and device_id = 6; (1 row) select compress_chunk(:'chunk'); -NOTICE: cannot compress hypercore "_hyper_1_1_chunk" using heap, recompressing instead compress_chunk ---------------------------------------- _timescaledb_internal._hyper_1_1_chunk diff --git a/tsl/test/expected/hypercore_vacuum.out b/tsl/test/expected/hypercore_vacuum.out index 0f2382b1412..eab589787e3 100644 --- a/tsl/test/expected/hypercore_vacuum.out +++ b/tsl/test/expected/hypercore_vacuum.out @@ -227,7 +227,6 @@ insert into hystable (time, location, device, temp) values ('2022-06-01 00:01', 1, 1, 1.0); -- Recompress to get data back into compressed form select compress_chunk(:'hystable_chunk'); -NOTICE: cannot compress hypercore "_hyper_1_2_chunk" using heap, recompressing instead compress_chunk ---------------------------------------- _timescaledb_internal._hyper_1_2_chunk @@ -311,7 +310,6 @@ select tuple_count from pgstattuple(:'hystable_device_chunk_idx'); -- rebuild indexes. It will optimize the structure of the index and -- thus reduce the number of index tuples select compress_chunk(:'hystable_chunk'); -NOTICE: cannot compress hypercore "_hyper_1_2_chunk" using heap, recompressing instead compress_chunk ---------------------------------------- _timescaledb_internal._hyper_1_2_chunk diff --git a/tsl/test/sql/hypercore_create.sql b/tsl/test/sql/hypercore_create.sql index db179cd4488..95f39da7725 100644 --- a/tsl/test/sql/hypercore_create.sql +++ b/tsl/test/sql/hypercore_create.sql @@ -319,12 +319,11 @@ commit; select compress_chunk(ch, hypercore_use_access_method => true, if_not_compressed => false) from show_chunks('test2') ch; -\set ON_ERROR_STOP 1 - --- Compressing from hypercore not using access method should lead to --- recompression of hypercore with a notice. +-- Compressing from hypercore and not using access method should lead +-- to an error since it is not supported. 
select compress_chunk(ch, hypercore_use_access_method => false) from show_chunks('test2') ch;
+\set ON_ERROR_STOP 1
 
 -- Compressing a hypercore should by default lead to
 -- recompression. First check that :chunk is a hypercore.

From 7740ae8f9c46875e88788e0c495ccd8c187d0d65 Mon Sep 17 00:00:00 2001
From: Mats Kindahl
Date: Fri, 15 Nov 2024 07:46:06 +0100
Subject: [PATCH 2/5] Remove unnecessary TimescaleDB functions

Functions `ts_catalog_open_indexes` and friends are just copies of the
corresponding PostgreSQL functions, such as `CatalogOpenIndexes`, so
remove them and use the PostgreSQL functions instead.

---
 src/ts_catalog/catalog.c          | 22 ----------------------
 src/ts_catalog/catalog.h          |  2 --
 tsl/src/compression/api.c         |  6 +++---
 tsl/src/compression/compression.c |  9 +++++----
 tsl/src/compression/compression.h |  3 ++-
 5 files changed, 10 insertions(+), 32 deletions(-)

diff --git a/src/ts_catalog/catalog.c b/src/ts_catalog/catalog.c
index cb519e2f9da..4278ac763c0 100644
--- a/src/ts_catalog/catalog.c
+++ b/src/ts_catalog/catalog.c
@@ -789,28 +789,6 @@ ts_catalog_scan_all(CatalogTable table, int indexid, ScanKeyData *scankey, int n
 	ts_scanner_scan(&scanctx);
 }
-extern TSDLLEXPORT ResultRelInfo *
-ts_catalog_open_indexes(Relation heapRel)
-{
-	ResultRelInfo *resultRelInfo;
-
-	resultRelInfo = makeNode(ResultRelInfo);
-	resultRelInfo->ri_RangeTableIndex = 0; /* dummy */
-	resultRelInfo->ri_RelationDesc = heapRel;
-	resultRelInfo->ri_TrigDesc = NULL; /* we don't fire triggers */
-
-	ExecOpenIndices(resultRelInfo, false);
-
-	return resultRelInfo;
-}
-
-extern TSDLLEXPORT void
-ts_catalog_close_indexes(ResultRelInfo *indstate)
-{
-	ExecCloseIndices(indstate);
-	pfree(indstate);
-}
-
 /*
  * Copied verbatim from postgres source CatalogIndexInsert which is static
  * in postgres source code.
  */
diff --git a/src/ts_catalog/catalog.h b/src/ts_catalog/catalog.h
index 49d77b03ccb..36d925acef4 100644
--- a/src/ts_catalog/catalog.h
+++ b/src/ts_catalog/catalog.h
@@ -1356,8 +1356,6 @@ extern TSDLLEXPORT void ts_catalog_update(Relation rel, HeapTuple tuple);
 extern TSDLLEXPORT void ts_catalog_delete_tid_only(Relation rel, ItemPointer tid);
 extern TSDLLEXPORT void ts_catalog_delete_tid(Relation rel, ItemPointer tid);
 extern TSDLLEXPORT void ts_catalog_invalidate_cache(Oid catalog_relid, CmdType operation);
-extern TSDLLEXPORT ResultRelInfo *ts_catalog_open_indexes(Relation heapRel);
-extern TSDLLEXPORT void ts_catalog_close_indexes(ResultRelInfo *indstate);
 extern TSDLLEXPORT void ts_catalog_index_insert(ResultRelInfo *indstate, HeapTuple heapTuple);
 
 bool TSDLLEXPORT ts_catalog_scan_one(CatalogTable table, int indexid, ScanKeyData *scankey,
diff --git a/tsl/src/compression/api.c b/tsl/src/compression/api.c
index cb1ef4781d5..9f9e25c6c62 100644
--- a/tsl/src/compression/api.c
+++ b/tsl/src/compression/api.c
@@ -13,6 +13,7 @@
 #include
 #include
 #include
+#include
 #include
 #include
 #include
@@ -1092,10 +1093,9 @@ get_compressed_chunk_index_for_recompression(Chunk *uncompressed_chunk)
 
 	CompressionSettings *settings = ts_compression_settings_get(compressed_chunk->table_id);
 
-	ResultRelInfo *indstate = ts_catalog_open_indexes(compressed_chunk_rel);
+	CatalogIndexState indstate = CatalogOpenIndexes(compressed_chunk_rel);
 	Oid index_oid = get_compressed_chunk_index(indstate, settings);
-
-	ts_catalog_close_indexes(indstate);
+	CatalogCloseIndexes(indstate);
 
 	table_close(compressed_chunk_rel, NoLock);
 	table_close(uncompressed_chunk_rel, NoLock);
diff --git a/tsl/src/compression/compression.c b/tsl/src/compression/compression.c
index bede1ae062e..222c08f71de
100644
--- a/tsl/src/compression/compression.c
+++ b/tsl/src/compression/compression.c
@@ -5,6 +5,7 @@
  */
 #include
 #include
+#include
 #include
 #include
 #include
@@ -793,7 +794,7 @@ row_compressor_init(CompressionSettings *settings, RowCompressor *row_compressor
 						   ALLOCSET_DEFAULT_SIZES),
 		.compressed_table = compressed_table,
 		.bistate = need_bistate ? GetBulkInsertState() : NULL,
-		.resultRelInfo = ts_catalog_open_indexes(compressed_table),
+		.resultRelInfo = CatalogOpenIndexes(compressed_table),
 		.n_input_columns = RelationGetDescr(uncompressed_table)->natts,
 		.count_metadata_column_offset = AttrNumberGetAttrOffset(count_metadata_column_num),
 		.compressed_values = palloc(sizeof(Datum) * num_columns_in_compressed_table),
@@ -1124,7 +1125,7 @@ row_compressor_close(RowCompressor *row_compressor)
 {
 	if (row_compressor->bistate)
 		FreeBulkInsertState(row_compressor->bistate);
-	ts_catalog_close_indexes(row_compressor->resultRelInfo);
+	CatalogCloseIndexes(row_compressor->resultRelInfo);
 }
 
 /******************
@@ -1216,7 +1217,7 @@ build_decompressor(Relation in_rel, Relation out_rel)
 		.out_desc = out_desc,
 		.out_rel = out_rel,
 
-		.indexstate = ts_catalog_open_indexes(out_rel),
+		.indexstate = CatalogOpenIndexes(out_rel),
 		.mycid = GetCurrentCommandId(true),
 		.bistate = GetBulkInsertState(),
@@ -1266,7 +1267,7 @@ row_decompressor_close(RowDecompressor *decompressor)
 {
 	FreeBulkInsertState(decompressor->bistate);
 	MemoryContextDelete(decompressor->per_compressed_row_ctx);
-	ts_catalog_close_indexes(decompressor->indexstate);
+	CatalogCloseIndexes(decompressor->indexstate);
 	FreeExecutorState(decompressor->estate);
 	detoaster_close(&decompressor->detoaster);
 }
diff --git a/tsl/src/compression/compression.h b/tsl/src/compression/compression.h
index 8e991ce9d22..0ede01bdf0e 100644
--- a/tsl/src/compression/compression.h
+++ b/tsl/src/compression/compression.h
@@ -6,6 +6,7 @@
 #pragma once
 
 #include
+#include
 #include
 #include
 #include
@@ -130,7 +131,7 @@ typedef struct RowDecompressor
 	TupleDesc out_desc;
 	Relation out_rel;
 
-	ResultRelInfo *indexstate;
+	CatalogIndexState indexstate;
 	EState *estate;
 	CommandId mycid;
 

From 3583b124dbf216aaa563fc898f802edfbbd86b25 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Erik=20Nordstr=C3=B6m?=
Date: Thu, 17 Oct 2024 14:57:27 +0200
Subject: [PATCH 3/5] Fix issues with Hypercore TAM recompression

A truncate on a hypercore TAM table is executed across both compressed
and non-compressed data. This caused an issue when recompressing
because it also tries to truncate the compressed data. Fix this issue
by introducing a flag that allows truncating only the non-compressed
data.

Another issue related to cache invalidation is also fixed. Since a
recompression sometimes creates a new compressed relation, and the
compressed relid is cached in the Hypercore TAM's relcache entry, the
cache needs to be invalidated during recompression. However, this
wasn't done previously, leading to an error. This is fixed by adding a
relcache invalidation during recompression.

Finally, compression using an index scan is disabled for Hypercore TAM
since the index also covers compressed data (in the recompression
case). While the index could be used when compressing the first time
(when only non-compressed data is indexed), it is still disabled
completely for Hypercore TAM given that index scans are not used by
default anyway.

Tests are added to cover all of the issues described above.
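
For illustration, the recompression path now wraps the compression call
in a save/restore of the new flag. This is only a sketch of the pattern
used in compress_hypercore() in this patch; any error path that exits
between the two calls would also need to restore the flag:

    /* Don't forward the truncate to the compressed data during
     * recompression. The flag is backend-local and sticky, so the
     * caller is responsible for restoring the previous value. */
    bool truncate_compressed = hypercore_set_truncate_compressed(false);
    relid = tsl_compress_chunk_wrapper(chunk, if_not_compressed, recompress);
    hypercore_set_truncate_compressed(truncate_compressed);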
--- tsl/src/compression/api.c | 22 ++++ tsl/src/compression/compression.c | 26 +++- tsl/src/hypercore/hypercore_handler.c | 48 +++++++- tsl/src/hypercore/hypercore_handler.h | 10 ++ tsl/test/expected/hypercore.out | 164 +++++++++++++++++++++++++ tsl/test/expected/hypercore_create.out | 4 +- tsl/test/sql/CMakeLists.txt | 3 +- tsl/test/sql/hypercore.sql | 93 ++++++++++++++ tsl/test/src/CMakeLists.txt | 9 +- tsl/test/src/test_hypercore.c | 125 +++++++++++++++++++ 10 files changed, 488 insertions(+), 16 deletions(-) create mode 100644 tsl/test/src/test_hypercore.c diff --git a/tsl/src/compression/api.c b/tsl/src/compression/api.c index 9f9e25c6c62..f4adac8071a 100644 --- a/tsl/src/compression/api.c +++ b/tsl/src/compression/api.c @@ -459,6 +459,20 @@ compress_chunk_impl(Oid hypertable_relid, Oid chunk_relid) (errmsg("new compressed chunk \"%s.%s\" created", NameStr(compress_ht_chunk->fd.schema_name), NameStr(compress_ht_chunk->fd.table_name)))); + + /* Since a new compressed relation was created it is necessary to + * invalidate the relcache entry for the chunk because Hypercore TAM + * caches information about the compressed relation in the + * relcache. */ + if (ts_is_hypercore_am(cxt.srcht_chunk->amoid)) + { + /* Tell other backends */ + CacheInvalidateRelcacheByRelid(cxt.srcht_chunk->table_id); + + /* Immediately invalidate our own cache */ + RelationCacheInvalidateEntry(cxt.srcht_chunk->table_id); + } + EventTriggerAlterTableEnd(); } else @@ -827,11 +841,19 @@ compress_hypercore(Chunk *chunk, bool rel_is_hypercore, UseAccessMethod useam, break; case USE_AM_NULL: Assert(rel_is_hypercore); + /* Don't forward the truncate to the compressed data during recompression */ + bool truncate_compressed = hypercore_set_truncate_compressed(false); relid = tsl_compress_chunk_wrapper(chunk, if_not_compressed, recompress); + hypercore_set_truncate_compressed(truncate_compressed); break; case USE_AM_TRUE: if (rel_is_hypercore) + { + /* Don't forward the truncate to the compressed data during recompression */ + bool truncate_compressed = hypercore_set_truncate_compressed(false); relid = tsl_compress_chunk_wrapper(chunk, if_not_compressed, recompress); + hypercore_set_truncate_compressed(truncate_compressed); + } else { /* Convert to a compressed hypercore by simply calling ALTER TABLE diff --git a/tsl/src/compression/compression.c b/tsl/src/compression/compression.c index 222c08f71de..d336c19eadf 100644 --- a/tsl/src/compression/compression.c +++ b/tsl/src/compression/compression.c @@ -4,6 +4,7 @@ * LICENSE-TIMESCALE for a copy of the license. */ #include +#include #include #include #include @@ -28,6 +29,7 @@ #include "debug_assert.h" #include "debug_point.h" #include "guc.h" +#include "hypercore/hypercore_handler.h" #include "nodes/chunk_dispatch/chunk_insert_state.h" #include "segment_meta.h" #include "ts_catalog/array_utils.h" @@ -181,7 +183,12 @@ static void RelationDeleteAllRows(Relation rel, Snapshot snap) { TupleTableSlot *slot = table_slot_create(rel, NULL); - TableScanDesc scan = table_beginscan(rel, snap, 0, (ScanKey) NULL); + ScanKeyData scankey = { + /* Let compression TAM know it should only return tuples from the + * non-compressed relation. 
No actual scankey necessary */ + .sk_flags = SK_NO_COMPRESSED, + }; + TableScanDesc scan = table_beginscan(rel, snap, 0, &scankey); while (table_scan_getnextslot(scan, ForwardScanDirection, slot)) { @@ -292,8 +299,14 @@ compress_chunk(Oid in_table, Oid out_table, int insert_options) * The following code is trying to find an existing index that * matches the configuration so that we can skip sequential scan and * tuplesort. + * + * Note that Hypercore TAM doesn't support (re-)compression via index at + * this point because the index covers also existing compressed tuples. It + * could be supported for initial compression when there is no compressed + * data, but for now just avoid it altogether since compression indexscan + * isn't enabled by default anyway. */ - if (ts_guc_enable_compression_indexscan) + if (ts_guc_enable_compression_indexscan && !REL_IS_HYPERCORE(in_rel)) { List *in_rel_index_oids = RelationGetIndexList(in_rel); foreach (lc, in_rel_index_oids) @@ -443,6 +456,7 @@ compress_chunk(Oid in_table, Oid out_table, int insert_options) { int64 nrows_processed = 0; + Assert(!REL_IS_HYPERCORE(in_rel)); elog(ts_guc_debug_compression_path_info ? INFO : DEBUG1, "using index \"%s\" to scan rows for compression", get_rel_name(matched_index_rel->rd_id)); @@ -563,9 +577,13 @@ compress_chunk_sort_relation(CompressionSettings *settings, Relation in_rel) Tuplesortstate *tuplesortstate; TableScanDesc scan; TupleTableSlot *slot; - + ScanKeyData scankey = { + /* Let compression TAM know it should only return tuples from the + * non-compressed relation. No actual scankey necessary */ + .sk_flags = SK_NO_COMPRESSED, + }; tuplesortstate = compression_create_tuplesort_state(settings, in_rel); - scan = table_beginscan(in_rel, GetLatestSnapshot(), 0, (ScanKey) NULL); + scan = table_beginscan(in_rel, GetLatestSnapshot(), 0, &scankey); slot = table_slot_create(in_rel, NULL); while (table_scan_getnextslot(scan, ForwardScanDirection, slot)) diff --git a/tsl/src/hypercore/hypercore_handler.c b/tsl/src/hypercore/hypercore_handler.c index 50cac0ea4b4..a5a47436abd 100644 --- a/tsl/src/hypercore/hypercore_handler.c +++ b/tsl/src/hypercore/hypercore_handler.c @@ -64,7 +64,6 @@ #include "debug_assert.h" #include "guc.h" #include "hypercore_handler.h" -#include "process_utility.h" #include "relstats.h" #include "trigger.h" #include "ts_catalog/array_utils.h" @@ -82,6 +81,28 @@ static List *partially_compressed_relids = NIL; /* Relids that needs to have * updated status set at end of * transaction */ +static bool hypercore_truncate_compressed = true; + +/* + * Configure whether a TRUNCATE on Hypercore TAM should truncate all data + * (both compressed and non-compressed) or only non-compressed data. + * + * This is used during re-compression where non-compressed data gets folded + * into existing compressed data. In that case, the existing compressed data + * should remain, but the non-compressed data that got compressed should be + * truncated. + * + * Note that this setting is sticky so it needs to be reset after the truncate + * operation completes. + */ +bool +hypercore_set_truncate_compressed(bool onoff) +{ + bool old_value = hypercore_truncate_compressed; + hypercore_truncate_compressed = onoff; + return old_value; +} + #define HYPERCORE_AM_INFO_SIZE(natts) \ (sizeof(HypercoreInfo) + (sizeof(ColumnCompressionSettings) * (natts))) @@ -516,6 +537,7 @@ hypercore_beginscan(Relation relation, Snapshot snapshot, int nkeys, ScanKey key * from the compressed chunk, so avoid reading it again here. 
*/ scan->hs_scan_state = HYPERCORE_SCAN_NON_COMPRESSED; + scan->rs_base.rs_flags |= SO_HYPERCORE_SKIP_COMPRESSED; } initscan(scan, keys, nkeys); @@ -559,9 +581,23 @@ hypercore_rescan(TableScanDesc sscan, ScanKey key, bool set_params, bool allow_s initscan(scan, key, scan->rs_base.rs_nkeys); scan->reset = true; - scan->hs_scan_state = HYPERCORE_SCAN_START; - table_rescan(scan->cscan_desc, key); + /* Check if there's a change in "skip compressed" */ + if (key) + { + if (key->sk_flags & SK_NO_COMPRESSED) + scan->rs_base.rs_flags = SO_HYPERCORE_SKIP_COMPRESSED; + else + scan->rs_base.rs_flags &= ~SO_HYPERCORE_SKIP_COMPRESSED; + } + + if (scan->rs_base.rs_flags & SO_HYPERCORE_SKIP_COMPRESSED) + scan->hs_scan_state = HYPERCORE_SCAN_NON_COMPRESSED; + else + scan->hs_scan_state = HYPERCORE_SCAN_START; + + if (scan->cscan_desc) + table_rescan(scan->cscan_desc, key); Relation relation = scan->uscan_desc->rs_rd; const TableAmRoutine *oldtam = switch_to_heapam(relation); @@ -1644,7 +1680,7 @@ hypercore_tuple_delete(Relation relation, ItemPointer tid, CommandId cid, Snapsh { TM_Result result = TM_Ok; - if (is_compressed_tid(tid)) + if (is_compressed_tid(tid) && hypercore_truncate_compressed) { HypercoreInfo *caminfo = RelationGetHypercoreInfo(relation); Relation crel = table_open(caminfo->compressed_relid, RowExclusiveLock); @@ -1828,7 +1864,7 @@ hypercore_relation_set_new_filelocator(Relation rel, const RelFileLocator *newrl * change the rel file number for it as well. This can happen if you, for * example, execute a transactional TRUNCATE. */ Oid compressed_relid = chunk_get_compressed_chunk_relid(RelationGetRelid(rel)); - if (OidIsValid(compressed_relid)) + if (OidIsValid(compressed_relid) && hypercore_truncate_compressed) { Relation compressed_rel = table_open(compressed_relid, AccessExclusiveLock); #if PG16_GE @@ -1848,7 +1884,7 @@ hypercore_relation_nontransactional_truncate(Relation rel) rel->rd_tableam = oldtam; Oid compressed_relid = chunk_get_compressed_chunk_relid(RelationGetRelid(rel)); - if (OidIsValid(compressed_relid)) + if (OidIsValid(compressed_relid) && hypercore_truncate_compressed) { Relation crel = table_open(compressed_relid, AccessShareLock); crel->rd_tableam->relation_nontransactional_truncate(crel); diff --git a/tsl/src/hypercore/hypercore_handler.h b/tsl/src/hypercore/hypercore_handler.h index 3cc5d5a68fe..cc9fabfa103 100644 --- a/tsl/src/hypercore/hypercore_handler.h +++ b/tsl/src/hypercore/hypercore_handler.h @@ -18,6 +18,13 @@ * individual access methods, so use bit 16. */ #define SK_NO_COMPRESSED 0x8000 +typedef enum HypercoreScanOptions +{ + /* Normal scan options stretch to 9th bit. Start at bit 15 out of 32 to be + * safe. 
*/ + SO_HYPERCORE_SKIP_COMPRESSED = 1 << 15, +} HypercoreScanOptions; + extern void hypercore_set_analyze_relid(Oid relid); extern const TableAmRoutine *hypercore_routine(void); extern void hypercore_set_rel_pathlist(PlannerInfo *root, RelOptInfo *rel, Hypertable *ht); @@ -25,6 +32,7 @@ extern void hypercore_alter_access_method_begin(Oid relid, bool to_other_am); extern void hypercore_alter_access_method_finish(Oid relid, bool to_other_am); extern Datum hypercore_handler(PG_FUNCTION_ARGS); extern void hypercore_xact_event(XactEvent event, void *arg); +extern bool hypercore_set_truncate_compressed(bool onoff); typedef struct ColumnCompressionSettings { @@ -56,4 +64,6 @@ typedef struct HypercoreInfo ColumnCompressionSettings columns[FLEXIBLE_ARRAY_MEMBER]; } HypercoreInfo; +#define REL_IS_HYPERCORE(rel) ((rel)->rd_tableam == hypercore_routine()) + extern HypercoreInfo *RelationGetHypercoreInfo(Relation rel); diff --git a/tsl/test/expected/hypercore.out b/tsl/test/expected/hypercore.out index 03e5983b7dd..abb73ea1aa4 100644 --- a/tsl/test/expected/hypercore.out +++ b/tsl/test/expected/hypercore.out @@ -592,3 +592,167 @@ SELECT count(*) FROM :chunk; (1 row) drop table readings; +--------------------------------------------- +-- Test recompression via compress_chunk() -- +--------------------------------------------- +show timescaledb.enable_transparent_decompression; + timescaledb.enable_transparent_decompression +---------------------------------------------- + off +(1 row) + +create table recompress (time timestamptz, value int); +select create_hypertable('recompress', 'time', create_default_indexes => false); +NOTICE: adding not-null constraint to column "time" + create_hypertable +------------------------- + (3,public,recompress,t) +(1 row) + +insert into recompress values ('2024-01-01 01:00', 1), ('2024-01-01 02:00', 2); +select format('%I.%I', chunk_schema, chunk_name)::regclass as unique_chunk + from timescaledb_information.chunks + where format('%I.%I', hypertable_schema, hypertable_name)::regclass = 'recompress'::regclass + order by unique_chunk asc + limit 1 \gset +alter table recompress set (timescaledb.compress_orderby='time'); +WARNING: there was some uncertainty picking the default segment by for the hypertable: You do not have any indexes on columns that can be used for segment_by and thus we are not using segment_by for compression. Please make sure you are not missing any indexes +NOTICE: default segment by for hypertable "recompress" is set to "" +alter table :unique_chunk set access method hypercore; +-- Should already be compressed +select compress_chunk(:'unique_chunk'); +NOTICE: chunk "_hyper_3_34_chunk" is already compressed + compress_chunk +----------------------------------------- + _timescaledb_internal._hyper_3_34_chunk +(1 row) + +-- Insert something to compress +insert into recompress values ('2024-01-01 03:00', 3); +select compress_chunk(:'unique_chunk'); + compress_chunk +----------------------------------------- + _timescaledb_internal._hyper_3_34_chunk +(1 row) + +-- Make sure we see the data after recompression and everything is +-- compressed +select _timescaledb_debug.is_compressed_tid(ctid), * from recompress order by time; + is_compressed_tid | time | value +-------------------+------------------------------+------- + t | Mon Jan 01 01:00:00 2024 PST | 1 + t | Mon Jan 01 02:00:00 2024 PST | 2 + t | Mon Jan 01 03:00:00 2024 PST | 3 +(3 rows) + +-- Add a time index to test recompression with index scan. 
Index scans +-- during compression is actually disabled for Hypercore TAM since the +-- index covers also compressed data, so this is only a check that the +-- GUC can be set without negative consequences. +create index on recompress (time); +set timescaledb.enable_compression_indexscan=true; +-- Insert another value to compress +insert into recompress values ('2024-01-02 04:00', 4); +select compress_chunk(:'unique_chunk'); + compress_chunk +----------------------------------------- + _timescaledb_internal._hyper_3_34_chunk +(1 row) + +select _timescaledb_debug.is_compressed_tid(ctid), * from recompress order by time; + is_compressed_tid | time | value +-------------------+------------------------------+------- + t | Mon Jan 01 01:00:00 2024 PST | 1 + t | Mon Jan 01 02:00:00 2024 PST | 2 + t | Mon Jan 01 03:00:00 2024 PST | 3 + t | Tue Jan 02 04:00:00 2024 PST | 4 +(4 rows) + +-- Test using delete instead of truncate when compressing +set timescaledb.enable_delete_after_compression=true; +-- Insert another value to compress +insert into recompress values ('2024-01-02 05:00', 5); +select compress_chunk(:'unique_chunk'); + compress_chunk +----------------------------------------- + _timescaledb_internal._hyper_3_34_chunk +(1 row) + +select _timescaledb_debug.is_compressed_tid(ctid), * from recompress order by time; + is_compressed_tid | time | value +-------------------+------------------------------+------- + t | Mon Jan 01 01:00:00 2024 PST | 1 + t | Mon Jan 01 02:00:00 2024 PST | 2 + t | Mon Jan 01 03:00:00 2024 PST | 3 + t | Tue Jan 02 04:00:00 2024 PST | 4 + t | Tue Jan 02 05:00:00 2024 PST | 5 +(5 rows) + +-- Add a segmentby key to test segmentwise recompression +-- Insert another value to compress that goes into same segment +alter table :unique_chunk set access method heap; +alter table recompress set (timescaledb.compress_orderby='time', timescaledb.compress_segmentby='value'); +alter table :unique_chunk set access method hypercore; +insert into recompress values ('2024-01-02 06:00', 5); +select compress_chunk(:'unique_chunk'); + compress_chunk +----------------------------------------- + _timescaledb_internal._hyper_3_34_chunk +(1 row) + +select _timescaledb_debug.is_compressed_tid(ctid), * from recompress order by time; + is_compressed_tid | time | value +-------------------+------------------------------+------- + t | Mon Jan 01 01:00:00 2024 PST | 1 + t | Mon Jan 01 02:00:00 2024 PST | 2 + t | Mon Jan 01 03:00:00 2024 PST | 3 + t | Tue Jan 02 04:00:00 2024 PST | 4 + t | Tue Jan 02 05:00:00 2024 PST | 5 + t | Tue Jan 02 06:00:00 2024 PST | 5 +(6 rows) + +-------------------------------------- +-- C-native tests for hypercore TAM -- +-------------------------------------- +-- Test rescan functionality and ability to return only non-compressed data +create table rescan (time timestamptz, device int, temp float); +select create_hypertable('rescan', 'time'); +NOTICE: adding not-null constraint to column "time" + create_hypertable +--------------------- + (5,public,rescan,t) +(1 row) + +alter table rescan set (timescaledb.compress_orderby='time', timescaledb.compress_segmentby='device'); +insert into rescan values ('2024-11-01 01:00', 1, 1.0), ('2024-11-01 02:00', 1, 2.0), ('2024-11-01 03:00', 1, 3.0), ('2024-11-01 06:00', 1, 4.0), ('2024-11-01 05:00', 1, 5.0); +select format('%I.%I', chunk_schema, chunk_name)::regclass as rescan_chunk + from timescaledb_information.chunks + where format('%I.%I', hypertable_schema, hypertable_name)::regclass = 'rescan'::regclass + order by 
rescan_chunk asc + limit 1 \gset +select compress_chunk(:'rescan_chunk', hypercore_use_access_method => true); + compress_chunk +----------------------------------------- + _timescaledb_internal._hyper_5_40_chunk +(1 row) + +select relname, amname + from show_chunks('rescan') as chunk + join pg_class on (pg_class.oid = chunk) + join pg_am on (relam = pg_am.oid); + relname | amname +-------------------+----------- + _hyper_5_40_chunk | hypercore +(1 row) + +insert into rescan values ('2024-11-02 01:00', 2, 1.0), ('2024-11-02 02:00', 2, 2.0), ('2024-11-02 03:00', 1, 3.0), ('2024-11-02 05:00', 2, 4.0); +reset role; +create function test_hypercore(relid regclass) +returns void as :TSL_MODULE_PATHNAME, 'ts_test_hypercore' language c; +set role :ROLE_DEFAULT_PERM_USER; +select test_hypercore(:'rescan_chunk'); + test_hypercore +---------------- + +(1 row) + diff --git a/tsl/test/expected/hypercore_create.out b/tsl/test/expected/hypercore_create.out index 6dc829c549b..e2f024334b4 100644 --- a/tsl/test/expected/hypercore_create.out +++ b/tsl/test/expected/hypercore_create.out @@ -438,7 +438,7 @@ select * from compressed_rel_size_stats order by rel; _timescaledb_internal._hyper_1_7_chunk | hypercore | test2 | 2016 | 10 | 10 _timescaledb_internal._hyper_1_9_chunk | hypercore | test2 | 2016 | 10 | 10 _timescaledb_internal._hyper_1_11_chunk | hypercore | test2 | 373 | 10 | 10 - _timescaledb_internal._hyper_4_13_chunk | hypercore | test3 | 0 | 0 | 0 + _timescaledb_internal._hyper_4_13_chunk | hypercore | test3 | 1 | 1 | 1 _timescaledb_internal._hyper_4_17_chunk | hypercore | test3 | 1 | 1 | 1 _timescaledb_internal._hyper_4_18_chunk | hypercore | test3 | 1 | 1 | 1 (9 rows) @@ -476,7 +476,7 @@ select * from compressed_rel_size_stats order by rel; _timescaledb_internal._hyper_1_7_chunk | heap | test2 | 2016 | 10 | 10 _timescaledb_internal._hyper_1_9_chunk | heap | test2 | 2016 | 10 | 10 _timescaledb_internal._hyper_1_11_chunk | heap | test2 | 373 | 10 | 10 - _timescaledb_internal._hyper_4_13_chunk | heap | test3 | 0 | 0 | 0 + _timescaledb_internal._hyper_4_13_chunk | heap | test3 | 1 | 1 | 1 _timescaledb_internal._hyper_4_17_chunk | heap | test3 | 1 | 1 | 1 _timescaledb_internal._hyper_4_18_chunk | heap | test3 | 1 | 1 | 1 (9 rows) diff --git a/tsl/test/sql/CMakeLists.txt b/tsl/test/sql/CMakeLists.txt index 451e8f59945..9cc7e62b84a 100644 --- a/tsl/test/sql/CMakeLists.txt +++ b/tsl/test/sql/CMakeLists.txt @@ -135,14 +135,13 @@ endif(CMAKE_BUILD_TYPE MATCHES Debug) if((${PG_VERSION_MAJOR} GREATER_EQUAL "15")) if(CMAKE_BUILD_TYPE MATCHES Debug) - list(APPEND TEST_FILES bgw_scheduler_control.sql) + list(APPEND TEST_FILES bgw_scheduler_control.sql hypercore.sql) endif() list( APPEND TEST_FILES merge_compress.sql cagg_refresh_using_merge.sql - hypercore.sql hypercore_columnar.sql hypercore_copy.sql hypercore_create.sql diff --git a/tsl/test/sql/hypercore.sql b/tsl/test/sql/hypercore.sql index 3eba3ae24f8..1a77833d023 100644 --- a/tsl/test/sql/hypercore.sql +++ b/tsl/test/sql/hypercore.sql @@ -310,3 +310,96 @@ SELECT sum(_ts_meta_count) FROM :cchunk; SELECT count(*) FROM :chunk; drop table readings; + +--------------------------------------------- +-- Test recompression via compress_chunk() -- +--------------------------------------------- +show timescaledb.enable_transparent_decompression; + +create table recompress (time timestamptz, value int); +select create_hypertable('recompress', 'time', create_default_indexes => false); +insert into recompress values ('2024-01-01 01:00', 1), ('2024-01-01 02:00', 2); + 
+select format('%I.%I', chunk_schema, chunk_name)::regclass as unique_chunk + from timescaledb_information.chunks + where format('%I.%I', hypertable_schema, hypertable_name)::regclass = 'recompress'::regclass + order by unique_chunk asc + limit 1 \gset + +alter table recompress set (timescaledb.compress_orderby='time'); +alter table :unique_chunk set access method hypercore; + +-- Should already be compressed +select compress_chunk(:'unique_chunk'); + +-- Insert something to compress +insert into recompress values ('2024-01-01 03:00', 3); + +select compress_chunk(:'unique_chunk'); + +-- Make sure we see the data after recompression and everything is +-- compressed +select _timescaledb_debug.is_compressed_tid(ctid), * from recompress order by time; + +-- Add a time index to test recompression with index scan. Index scans +-- during compression is actually disabled for Hypercore TAM since the +-- index covers also compressed data, so this is only a check that the +-- GUC can be set without negative consequences. +create index on recompress (time); +set timescaledb.enable_compression_indexscan=true; + +-- Insert another value to compress +insert into recompress values ('2024-01-02 04:00', 4); +select compress_chunk(:'unique_chunk'); +select _timescaledb_debug.is_compressed_tid(ctid), * from recompress order by time; + +-- Test using delete instead of truncate when compressing +set timescaledb.enable_delete_after_compression=true; + +-- Insert another value to compress +insert into recompress values ('2024-01-02 05:00', 5); + +select compress_chunk(:'unique_chunk'); +select _timescaledb_debug.is_compressed_tid(ctid), * from recompress order by time; + +-- Add a segmentby key to test segmentwise recompression +-- Insert another value to compress that goes into same segment +alter table :unique_chunk set access method heap; +alter table recompress set (timescaledb.compress_orderby='time', timescaledb.compress_segmentby='value'); +alter table :unique_chunk set access method hypercore; +insert into recompress values ('2024-01-02 06:00', 5); + +select compress_chunk(:'unique_chunk'); +select _timescaledb_debug.is_compressed_tid(ctid), * from recompress order by time; + +-------------------------------------- +-- C-native tests for hypercore TAM -- +-------------------------------------- + +-- Test rescan functionality and ability to return only non-compressed data +create table rescan (time timestamptz, device int, temp float); +select create_hypertable('rescan', 'time'); +alter table rescan set (timescaledb.compress_orderby='time', timescaledb.compress_segmentby='device'); +insert into rescan values ('2024-11-01 01:00', 1, 1.0), ('2024-11-01 02:00', 1, 2.0), ('2024-11-01 03:00', 1, 3.0), ('2024-11-01 06:00', 1, 4.0), ('2024-11-01 05:00', 1, 5.0); + +select format('%I.%I', chunk_schema, chunk_name)::regclass as rescan_chunk + from timescaledb_information.chunks + where format('%I.%I', hypertable_schema, hypertable_name)::regclass = 'rescan'::regclass + order by rescan_chunk asc + limit 1 \gset + +select compress_chunk(:'rescan_chunk', hypercore_use_access_method => true); + +select relname, amname + from show_chunks('rescan') as chunk + join pg_class on (pg_class.oid = chunk) + join pg_am on (relam = pg_am.oid); + +insert into rescan values ('2024-11-02 01:00', 2, 1.0), ('2024-11-02 02:00', 2, 2.0), ('2024-11-02 03:00', 1, 3.0), ('2024-11-02 05:00', 2, 4.0); + +reset role; +create function test_hypercore(relid regclass) +returns void as :TSL_MODULE_PATHNAME, 'ts_test_hypercore' language c; +set 
role :ROLE_DEFAULT_PERM_USER; + +select test_hypercore(:'rescan_chunk'); diff --git a/tsl/test/src/CMakeLists.txt b/tsl/test/src/CMakeLists.txt index 7591f271aaa..e5b232e9447 100644 --- a/tsl/test/src/CMakeLists.txt +++ b/tsl/test/src/CMakeLists.txt @@ -1,6 +1,11 @@ set(SOURCES - test_chunk_stats.c test_merge_chunk.c compression_unit_test.c - compression_sql_test.c decompress_text_test_impl.c test_continuous_agg.c) + test_chunk_stats.c + test_merge_chunk.c + compression_unit_test.c + compression_sql_test.c + decompress_text_test_impl.c + test_continuous_agg.c + test_hypercore.c) include(${PROJECT_SOURCE_DIR}/tsl/src/build-defs.cmake) diff --git a/tsl/test/src/test_hypercore.c b/tsl/test/src/test_hypercore.c new file mode 100644 index 00000000000..202ff070076 --- /dev/null +++ b/tsl/test/src/test_hypercore.c @@ -0,0 +1,125 @@ +/* + * This file and its contents are licensed under the Timescale License. + * Please see the included NOTICE for copyright information and + * LICENSE-TIMESCALE for a copy of the license. + */ +#include +#include +#include +#include + +#include "export.h" +#include "hypercore/arrow_tts.h" +#include "hypercore/hypercore_handler.h" +#include "test_utils.h" + +/* + * Test that Hypercore rescan API works correctly. + * + * In particular, test that scanning only non-compressed data across rescans + * work. + */ +static void +test_rescan_hypercore(Oid relid) +{ + Relation rel = table_open(relid, AccessShareLock); + TupleTableSlot *slot = table_slot_create(rel, NULL); + TableScanDesc scan; + Snapshot snapshot = GetTransactionSnapshot(); + ScanKeyData scankey = { + /* Let compression TAM know it should only return tuples from the + * non-compressed relation. No actual scankey necessary */ + .sk_flags = SK_NO_COMPRESSED, + }; + unsigned int compressed_tuple_count = 0; + unsigned int noncompressed_tuple_count = 0; + unsigned int prev_noncompressed_tuple_count = 0; + unsigned int prev_compressed_tuple_count = 0; + + TestAssertTrue(TTS_IS_ARROWTUPLE(slot)); + + /* Scan only non-compressed data */ + scan = table_beginscan(rel, snapshot, 0, &scankey); + + while (table_scan_getnextslot(scan, ForwardScanDirection, slot)) + { + if (is_compressed_tid(&slot->tts_tid)) + compressed_tuple_count++; + else + noncompressed_tuple_count++; + } + + TestAssertTrue(compressed_tuple_count == 0); + TestAssertTrue(noncompressed_tuple_count > 0); + prev_noncompressed_tuple_count = noncompressed_tuple_count; + prev_compressed_tuple_count = compressed_tuple_count; + compressed_tuple_count = 0; + noncompressed_tuple_count = 0; + + /* Rescan only non-compressed data */ + table_rescan(scan, &scankey); + + while (table_scan_getnextslot(scan, ForwardScanDirection, slot)) + { + if (is_compressed_tid(&slot->tts_tid)) + compressed_tuple_count++; + else + noncompressed_tuple_count++; + } + + TestAssertTrue(compressed_tuple_count == 0); + TestAssertTrue(noncompressed_tuple_count == prev_noncompressed_tuple_count); + TestAssertTrue(compressed_tuple_count == prev_compressed_tuple_count); + prev_noncompressed_tuple_count = noncompressed_tuple_count; + prev_compressed_tuple_count = compressed_tuple_count; + compressed_tuple_count = 0; + noncompressed_tuple_count = 0; + + /* Rescan only non-compressed data even though giving no new scan key */ + table_rescan(scan, NULL); + + while (table_scan_getnextslot(scan, ForwardScanDirection, slot)) + { + if (is_compressed_tid(&slot->tts_tid)) + compressed_tuple_count++; + else + noncompressed_tuple_count++; + } + + TestAssertTrue(compressed_tuple_count == 0); + 
TestAssertTrue(noncompressed_tuple_count == prev_noncompressed_tuple_count);
+	TestAssertTrue(compressed_tuple_count == prev_compressed_tuple_count);
+	prev_noncompressed_tuple_count = noncompressed_tuple_count;
+	prev_compressed_tuple_count = compressed_tuple_count;
+	compressed_tuple_count = 0;
+	noncompressed_tuple_count = 0;
+
+	/* Rescan both compressed and non-compressed data by specifying new flag */
+	scankey.sk_flags = 0;
+	table_rescan(scan, &scankey);
+
+	while (table_scan_getnextslot(scan, ForwardScanDirection, slot))
+	{
+		if (is_compressed_tid(&slot->tts_tid))
+			compressed_tuple_count++;
+		else
+			noncompressed_tuple_count++;
+	}
+
+	TestAssertTrue(noncompressed_tuple_count == prev_noncompressed_tuple_count);
+	TestAssertTrue(compressed_tuple_count > 0);
+
+	table_endscan(scan);
+	table_close(rel, NoLock);
+	ExecDropSingleTupleTableSlot(slot);
+}
+
+TS_FUNCTION_INFO_V1(ts_test_hypercore);
+
+Datum
+ts_test_hypercore(PG_FUNCTION_ARGS)
+{
+	Oid relid = PG_GETARG_OID(0);
+	test_rescan_hypercore(relid);
+	PG_RETURN_VOID();
+}

From 7e30ed65651ab5b0adee835f17829fa69a52382e Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Erik=20Nordstr=C3=B6m?=
Date: Thu, 14 Nov 2024 17:56:28 +0100
Subject: [PATCH 4/5] Add syscache tuple lock for pg17.1 and backports

PostgreSQL 17.1 introduced a fix for potential data loss when updating
the pg_class and pg_database catalog tables. The catalog update can be
lost because PG in some places performs non-MVCC-compliant inplace
updates of tuples in those catalogs, which requires extra locking to
prevent overwrites. See the following commit for more information:

https://github.com/postgres/postgres/commit/3b7a689e1a805c4dac2f35ff14fd5c9fdbddf150

This fix was also backported to the older PG versions 14.14, 15.9, and
16.5, which TimescaleDB supports.

To be safe against inplace updates, the relation that is updated in
pg_class via heap_update() needs to be locked with a
ShareUpdateExclusiveLock on the relation, or a ShareRowExclusiveLock or
stricter on the relation. Otherwise, the update code needs to take a
tuple-level lock.

TimescaleDB is affected by this change in the function that performs
"quick migration" from a compressed chunk to a Hypercore TAM chunk. In
that case, the regular ALTER TABLE ... SET ACCESS METHOD handling is
bypassed and the pg_class catalog table is updated directly to avoid
having to rewrite the compressed data. Since this migration doesn't
take a heavy lock on the chunk, it needs the tuple-level lock in the
update of pg_class.

Other places in our code that update pg_class tuples may be protected
by locks on the relation and require no extra tuple-level lock. A macro
is added to check that locks of the appropriate level are held on the
relation prior to the catalog changes.
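
For illustration, a pg_class update under the new locking rules follows
roughly this pattern (a sketch only: relid and new_amoid are
placeholder variables, and the CatalogTupleUpdate() write-back is an
assumption for the example rather than part of this patch):

    Relation class_rel = table_open(RelationRelationId, RowExclusiveLock);
    HeapTuple tp = SearchSysCacheLockedCopy1(RELOID, ObjectIdGetDatum(relid));
    Form_pg_class reltup = (Form_pg_class) GETSTRUCT(tp);
    #ifdef SYSCACHE_TUPLE_LOCK_NEEDED
    ItemPointerData otid = tp->t_self; /* the tuple lock is held on this tid */
    #endif
    reltup->relam = new_amoid; /* modify the copied tuple */
    CatalogTupleUpdate(class_rel, &tp->t_self, tp); /* assumed write-back */
    UnlockSysCacheTuple(class_rel, &otid); /* no-op on versions without the fix */
    heap_freetuple(tp);
    table_close(class_rel, RowExclusiveLock);

On older versions, SearchSysCacheLockedCopy1() maps to
SearchSysCacheCopy1() and the unlock expands to nothing, so the same
code compiles across all supported PG versions.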
--- src/compat/compat.h | 42 +++++++++++++++++++++++++++ src/utils.c | 8 +++++ tsl/src/hypercore/hypercore_handler.c | 9 +++++- tsl/src/hypercore/utils.c | 16 ++++++---- tsl/src/reorder.c | 3 ++ 5 files changed, 72 insertions(+), 6 deletions(-) diff --git a/src/compat/compat.h b/src/compat/compat.h index e5266825d18..d811ba46c06 100644 --- a/src/compat/compat.h +++ b/src/compat/compat.h @@ -17,6 +17,7 @@ #include #include #include +#include #include #include #include @@ -56,6 +57,47 @@ #error "Unsupported PostgreSQL version" #endif +#if ((PG_VERSION_NUM >= 140014 && PG_VERSION_NUM < 150000) || \ + (PG_VERSION_NUM >= 150009 && PG_VERSION_NUM < 160000) || \ + (PG_VERSION_NUM >= 160005 && PG_VERSION_NUM < 170000) || (PG_VERSION_NUM >= 170001)) +/* + * The above versions introduced a fix for potentially losing updates to + * pg_class and pg_database due to inplace updates done to those catalog + * tables by PostgreSQL. The fix requires taking a lock on the tuple via + * SearchSysCacheLocked1(). For older PG versions, we just map the new + * function to the unlocked version and the unlocking of the tuple is a noop. + * + * https://github.com/postgres/postgres/commit/3b7a689e1a805c4dac2f35ff14fd5c9fdbddf150 + * + * Here's an excerpt from README.tuplock that explains the need for additional + * tuple locks: + * + * If IsInplaceUpdateRelation() returns true for a table, the table is a + * system catalog that receives systable_inplace_update_begin() calls. + * Preparing a heap_update() of these tables follows additional locking rules, + * to ensure we don't lose the effects of an inplace update. In particular, + * consider a moment when a backend has fetched the old tuple to modify, not + * yet having called heap_update(). Another backend's inplace update starting + * then can't conclude until the heap_update() places its new tuple in a + * buffer. We enforce that using locktags as follows. While DDL code is the + * main audience, the executor follows these rules to make e.g. "MERGE INTO + * pg_class" safer. Locking rules are per-catalog: + * + * pg_class heap_update() callers: before copying the tuple to modify, take a + * lock on the tuple, a ShareUpdateExclusiveLock on the relation, or a + * ShareRowExclusiveLock or stricter on the relation. + */ +#define SYSCACHE_TUPLE_LOCK_NEEDED 1 +#define AssertSufficientPgClassUpdateLockHeld(relid) \ + Assert(CheckRelationOidLockedByMe(relid, ShareUpdateExclusiveLock, false) || \ + CheckRelationOidLockedByMe(relid, ShareRowExclusiveLock, true)); +#define UnlockSysCacheTuple(rel, tid) UnlockTuple(rel, tid, InplaceUpdateTupleLock); +#else +#define SearchSysCacheLockedCopy1(rel, datum) SearchSysCacheCopy1(rel, datum) +#define UnlockSysCacheTuple(rel, tid) +#define AssertSufficientPgClassUpdateLockHeld(relid) +#endif + /* * The following are compatibility functions for different versions of * PostgreSQL. Each compatibility function (or group) has its own logic for diff --git a/src/utils.c b/src/utils.c index 61e695b9bbe..ff4c3580d2e 100644 --- a/src/utils.c +++ b/src/utils.c @@ -1534,6 +1534,14 @@ ts_copy_relation_acl(const Oid source_relid, const Oid target_relid, const Oid o new_repl[AttrNumberGetAttrOffset(Anum_pg_class_relacl)] = true; new_val[AttrNumberGetAttrOffset(Anum_pg_class_relacl)] = PointerGetDatum(acl); + /* + * ts_copy_relation_acl() is typically used to copy ACLs from the hypertable + * to a newly created chunk. 
The creation is done via DefineRelation(), + * which takes an AccessExclusiveLock and should be enough to handle any + * inplace update issues. + */ + AssertSufficientPgClassUpdateLockHeld(target_relid); + /* Find the tuple for the target in `pg_class` */ target_tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(target_relid)); Assert(HeapTupleIsValid(target_tuple)); diff --git a/tsl/src/hypercore/hypercore_handler.c b/tsl/src/hypercore/hypercore_handler.c index a5a47436abd..617616ea18a 100644 --- a/tsl/src/hypercore/hypercore_handler.c +++ b/tsl/src/hypercore/hypercore_handler.c @@ -1958,7 +1958,14 @@ compress_and_swap_heap(Relation rel, Tuplesortstate *tuplesort, TransactionId *x table_close(new_compressed_rel, NoLock); table_close(old_compressed_rel, NoLock); - /* Update stats for the compressed relation */ + /* + * Update stats for the compressed relation. + * + * We have an AccessExclusivelock from above so no tuple lock is needed + * during update of the pg_class catalog table. + */ + AssertSufficientPgClassUpdateLockHeld(new_compressed_relid); + Relation relRelation = table_open(RelationRelationId, RowExclusiveLock); HeapTuple reltup = SearchSysCacheCopy1(RELOID, ObjectIdGetDatum(new_compressed_relid)); if (!HeapTupleIsValid(reltup)) diff --git a/tsl/src/hypercore/utils.c b/tsl/src/hypercore/utils.c index f8a716e8ece..391d0005315 100644 --- a/tsl/src/hypercore/utils.c +++ b/tsl/src/hypercore/utils.c @@ -12,10 +12,12 @@ #include #include #include +#include #include #include #include +#include "compat/compat.h" #include "extension_constants.h" #include "utils.h" #include @@ -30,14 +32,17 @@ hypercore_set_am(const RangeVar *rv) { HeapTuple tp; Oid relid = RangeVarGetRelid(rv, NoLock, false); + Relation class_rel = table_open(RelationRelationId, RowExclusiveLock); + + tp = SearchSysCacheLockedCopy1(RELOID, ObjectIdGetDatum(relid)); - tp = SearchSysCache1(RELOID, ObjectIdGetDatum(relid)); if (HeapTupleIsValid(tp)) { Form_pg_class reltup = (Form_pg_class) GETSTRUCT(tp); Oid hypercore_amoid = get_table_am_oid(TS_HYPERCORE_TAM_NAME, false); - Relation class_rel = table_open(RelationRelationId, RowExclusiveLock); - +#ifdef SYSCACHE_TUPLE_LOCK_NEEDED + ItemPointerData otid = tp->t_self; +#endif elog(DEBUG1, "migrating table \"%s\" to hypercore", get_rel_name(relid)); reltup->relam = hypercore_amoid; @@ -54,8 +59,7 @@ hypercore_set_am(const RangeVar *rv) }; recordDependencyOn(&depender, &referenced, DEPENDENCY_NORMAL); - table_close(class_rel, RowExclusiveLock); - ReleaseSysCache(tp); + UnlockSysCacheTuple(class_rel, &otid); /* * On compressed tables, indexes only contain non-compressed data, so @@ -75,4 +79,6 @@ hypercore_set_am(const RangeVar *rv) reindex_relation(relid, 0, ¶ms); #endif } + + table_close(class_rel, RowExclusiveLock); } diff --git a/tsl/src/reorder.c b/tsl/src/reorder.c index f9c2b4e3c61..514a72b080c 100644 --- a/tsl/src/reorder.c +++ b/tsl/src/reorder.c @@ -926,6 +926,9 @@ swap_relation_files(Oid r1, Oid r2, bool swap_toast_by_content, bool is_internal Oid swaptemp; char swptmpchr; + AssertSufficientPgClassUpdateLockHeld(r1); + AssertSufficientPgClassUpdateLockHeld(r2); + /* We need writable copies of both pg_class tuples. 
*/ relRelation = table_open(RelationRelationId, RowExclusiveLock); From f7cf16745cee2fbef2f86ee34566a7a83e6e4169 Mon Sep 17 00:00:00 2001 From: Alexander Kuzmenkov <36882414+akuzm@users.noreply.github.com> Date: Mon, 18 Nov 2024 18:11:16 +0100 Subject: [PATCH 5/5] Add vacuum to fix flaky tests (#7452) --- tsl/test/expected/compression.out | 3 ++- tsl/test/expected/compression_ddl.out | 1 + tsl/test/sql/compression.sql | 2 ++ tsl/test/sql/compression_ddl.sql | 1 + 4 files changed, 6 insertions(+), 1 deletion(-) diff --git a/tsl/test/expected/compression.out b/tsl/test/expected/compression.out index 05620c4ddea..fc5e0ec56db 100644 --- a/tsl/test/expected/compression.out +++ b/tsl/test/expected/compression.out @@ -1764,6 +1764,7 @@ SELECT compress_chunk(i) FROM show_chunks('f_sensor_data') i; (1 row) CALL reindex_compressed_hypertable('f_sensor_data'); +VACUUM ANALYZE f_sensor_data; -- Encourage use of parallel plans SET parallel_setup_cost = 0; SET parallel_tuple_cost = 0; @@ -1827,7 +1828,7 @@ SELECT * FROM f_sensor_data WHERE sensor_id > 100; -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- Gather Output: _hyper_37_73_chunk."time", _hyper_37_73_chunk.sensor_id, _hyper_37_73_chunk.cpu, _hyper_37_73_chunk.temperature - Workers Planned: 2 + Workers Planned: 3 -> Custom Scan (DecompressChunk) on _timescaledb_internal._hyper_37_73_chunk Output: _hyper_37_73_chunk."time", _hyper_37_73_chunk.sensor_id, _hyper_37_73_chunk.cpu, _hyper_37_73_chunk.temperature -> Parallel Index Scan using compress_hyper_38_74_chunk_sensor_id__ts_meta_min_1__ts_met_idx on _timescaledb_internal.compress_hyper_38_74_chunk diff --git a/tsl/test/expected/compression_ddl.out b/tsl/test/expected/compression_ddl.out index 3a6d2c63c66..60f6d102288 100644 --- a/tsl/test/expected/compression_ddl.out +++ b/tsl/test/expected/compression_ddl.out @@ -1901,6 +1901,7 @@ SELECT compress_chunk(show_chunks('test_partials')); _timescaledb_internal._hyper_33_122_chunk (3 rows) +VACUUM ANALYZE test_partials; -- fully compressed EXPLAIN (costs off) SELECT * FROM test_partials ORDER BY time; QUERY PLAN diff --git a/tsl/test/sql/compression.sql b/tsl/test/sql/compression.sql index f12519f0e7b..adb7b81f07b 100644 --- a/tsl/test/sql/compression.sql +++ b/tsl/test/sql/compression.sql @@ -794,6 +794,8 @@ ALTER TABLE f_sensor_data SET (timescaledb.compress, timescaledb.compress_segmen SELECT compress_chunk(i) FROM show_chunks('f_sensor_data') i; CALL reindex_compressed_hypertable('f_sensor_data'); +VACUUM ANALYZE f_sensor_data; + -- Encourage use of parallel plans SET parallel_setup_cost = 0; SET parallel_tuple_cost = 0; diff --git a/tsl/test/sql/compression_ddl.sql b/tsl/test/sql/compression_ddl.sql index af872977be5..8b8c098591d 100644 --- a/tsl/test/sql/compression_ddl.sql +++ b/tsl/test/sql/compression_ddl.sql @@ -840,6 +840,7 @@ VALUES -- chunk1 -- enable compression, compress all chunks ALTER TABLE test_partials SET (timescaledb.compress); SELECT compress_chunk(show_chunks('test_partials')); +VACUUM ANALYZE test_partials; -- fully compressed EXPLAIN (costs off) SELECT * FROM test_partials ORDER BY time; -- test P, F, F