Commit dcaefee
Merge remote-tracking branch 'origin/main' into HEAD
akuzm committed Nov 18, 2024
2 parents c2b5e18 + f7cf167 commit dcaefee
Showing 23 changed files with 600 additions and 96 deletions.
42 changes: 42 additions & 0 deletions src/compat/compat.h
@@ -17,6 +17,7 @@
#include <nodes/nodes.h>
#include <optimizer/restrictinfo.h>
#include <pgstat.h>
#include <storage/lmgr.h>
#include <utils/jsonb.h>
#include <utils/lsyscache.h>
#include <utils/rel.h>
@@ -56,6 +57,47 @@
#error "Unsupported PostgreSQL version"
#endif

#if ((PG_VERSION_NUM >= 140014 && PG_VERSION_NUM < 150000) || \
(PG_VERSION_NUM >= 150009 && PG_VERSION_NUM < 160000) || \
(PG_VERSION_NUM >= 160005 && PG_VERSION_NUM < 170000) || (PG_VERSION_NUM >= 170001))
/*
* The above versions (14.14, 15.9, 16.5, and 17.1, respectively) introduced
* a fix for potentially losing updates to pg_class and pg_database due to
* inplace updates done to those catalog tables by PostgreSQL. The fix
* requires taking a lock on the tuple via SearchSysCacheLocked1(). For older
* PG versions, we simply map the new function to the unlocked version, and
* unlocking the tuple is a no-op.
*
* https://github.com/postgres/postgres/commit/3b7a689e1a805c4dac2f35ff14fd5c9fdbddf150
*
* Here's an excerpt from README.tuplock that explains the need for additional
* tuple locks:
*
* If IsInplaceUpdateRelation() returns true for a table, the table is a
* system catalog that receives systable_inplace_update_begin() calls.
* Preparing a heap_update() of these tables follows additional locking rules,
* to ensure we don't lose the effects of an inplace update. In particular,
* consider a moment when a backend has fetched the old tuple to modify, not
* yet having called heap_update(). Another backend's inplace update starting
* then can't conclude until the heap_update() places its new tuple in a
* buffer. We enforce that using locktags as follows. While DDL code is the
* main audience, the executor follows these rules to make e.g. "MERGE INTO
* pg_class" safer. Locking rules are per-catalog:
*
* pg_class heap_update() callers: before copying the tuple to modify, take a
* lock on the tuple, a ShareUpdateExclusiveLock on the relation, or a
* ShareRowExclusiveLock or stricter on the relation.
*/
#define SYSCACHE_TUPLE_LOCK_NEEDED 1
#define AssertSufficientPgClassUpdateLockHeld(relid) \
Assert(CheckRelationOidLockedByMe(relid, ShareUpdateExclusiveLock, false) || \
CheckRelationOidLockedByMe(relid, ShareRowExclusiveLock, true));
#define UnlockSysCacheTuple(rel, tid) UnlockTuple(rel, tid, InplaceUpdateTupleLock);
#else
#define SearchSysCacheLockedCopy1(rel, datum) SearchSysCacheCopy1(rel, datum)
#define UnlockSysCacheTuple(rel, tid)
#define AssertSufficientPgClassUpdateLockHeld(relid)
#endif

/*
* The following are compatibility functions for different versions of
* PostgreSQL. Each compatibility function (or group) has its own logic for
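To make the compat block above concrete, here is a minimal sketch (not code from this commit) of the locked pg_class update pattern these macros enable, assuming compat.h is included. The helper update_relacl_example() is hypothetical; SearchSysCacheLockedCopy1() (mapped to SearchSysCacheCopy1() on unfixed versions), heap_modify_tuple(), and CatalogTupleUpdate() are stock PostgreSQL APIs.

#include <postgres.h>
#include <access/htup_details.h>
#include <access/table.h>
#include <catalog/indexing.h>
#include <catalog/pg_class.h>
#include <utils/rel.h>
#include <utils/syscache.h>

/* Hypothetical example: replace the relacl of a relation's pg_class row. */
static void
update_relacl_example(Oid relid, Datum new_acl)
{
	Relation pg_class = table_open(RelationRelationId, RowExclusiveLock);
	Datum values[Natts_pg_class] = { 0 };
	bool nulls[Natts_pg_class] = { false };
	bool replaces[Natts_pg_class] = { false };

	/* On fixed versions this takes InplaceUpdateTupleLock on the tuple;
	 * on older versions it is plain SearchSysCacheCopy1(). */
	HeapTuple tuple = SearchSysCacheLockedCopy1(RELOID, ObjectIdGetDatum(relid));
	Assert(HeapTupleIsValid(tuple));

	values[AttrNumberGetAttrOffset(Anum_pg_class_relacl)] = new_acl;
	replaces[AttrNumberGetAttrOffset(Anum_pg_class_relacl)] = true;

	HeapTuple newtuple =
		heap_modify_tuple(tuple, RelationGetDescr(pg_class), values, nulls, replaces);
	CatalogTupleUpdate(pg_class, &newtuple->t_self, newtuple);

	/* Expands to UnlockTuple(..., InplaceUpdateTupleLock) where the fix
	 * exists, and to nothing otherwise. */
	UnlockSysCacheTuple(pg_class, &tuple->t_self);
	heap_freetuple(newtuple);
	heap_freetuple(tuple);
	table_close(pg_class, RowExclusiveLock);
}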
22 changes: 0 additions & 22 deletions src/ts_catalog/catalog.c
@@ -789,28 +789,6 @@ ts_catalog_scan_all(CatalogTable table, int indexid, ScanKeyData *scankey, int n
ts_scanner_scan(&scanctx);
}

extern TSDLLEXPORT ResultRelInfo *
ts_catalog_open_indexes(Relation heapRel)
{
ResultRelInfo *resultRelInfo;

resultRelInfo = makeNode(ResultRelInfo);
resultRelInfo->ri_RangeTableIndex = 0; /* dummy */
resultRelInfo->ri_RelationDesc = heapRel;
resultRelInfo->ri_TrigDesc = NULL; /* we don't fire triggers */

ExecOpenIndices(resultRelInfo, false);

return resultRelInfo;
}

extern TSDLLEXPORT void
ts_catalog_close_indexes(ResultRelInfo *indstate)
{
ExecCloseIndices(indstate);
pfree(indstate);
}

/*
* Copied verbatim from postgres source CatalogIndexInsert which is static
* in postgres source code.
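The two wrappers removed here duplicated PostgreSQL's own catalog indexing API, and the later hunks in this commit switch callers over to it. A minimal sketch of that replacement pattern follows (the helper name is hypothetical; rel is assumed to be an open catalog relation and tuple a tuple to insert into it); CatalogOpenIndexes(), CatalogTupleInsertWithInfo(), and CatalogCloseIndexes() are the stock PostgreSQL functions from catalog/indexing.h.

#include <postgres.h>
#include <catalog/indexing.h>

/* Hypothetical helper: insert a catalog tuple and maintain all its indexes. */
static void
catalog_insert_example(Relation rel, HeapTuple tuple)
{
	CatalogIndexState indstate = CatalogOpenIndexes(rel);

	/* Performs the heap insert and adds entries to every catalog index. */
	CatalogTupleInsertWithInfo(rel, tuple, indstate);
	CatalogCloseIndexes(indstate);
}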
2 changes: 0 additions & 2 deletions src/ts_catalog/catalog.h
@@ -1356,8 +1356,6 @@ extern TSDLLEXPORT void ts_catalog_update(Relation rel, HeapTuple tuple);
extern TSDLLEXPORT void ts_catalog_delete_tid_only(Relation rel, ItemPointer tid);
extern TSDLLEXPORT void ts_catalog_delete_tid(Relation rel, ItemPointer tid);
extern TSDLLEXPORT void ts_catalog_invalidate_cache(Oid catalog_relid, CmdType operation);
extern TSDLLEXPORT ResultRelInfo *ts_catalog_open_indexes(Relation heapRel);
extern TSDLLEXPORT void ts_catalog_close_indexes(ResultRelInfo *indstate);
extern TSDLLEXPORT void ts_catalog_index_insert(ResultRelInfo *indstate, HeapTuple heapTuple);

bool TSDLLEXPORT ts_catalog_scan_one(CatalogTable table, int indexid, ScanKeyData *scankey,
8 changes: 8 additions & 0 deletions src/utils.c
@@ -1534,6 +1534,14 @@ ts_copy_relation_acl(const Oid source_relid, const Oid target_relid, const Oid o
new_repl[AttrNumberGetAttrOffset(Anum_pg_class_relacl)] = true;
new_val[AttrNumberGetAttrOffset(Anum_pg_class_relacl)] = PointerGetDatum(acl);

/*
* ts_copy_relation_acl() is typically used to copy ACLs from the hypertable
* to a newly created chunk. The creation is done via DefineRelation(),
* which takes an AccessExclusiveLock on the new relation. That lock is
* sufficient to guard against concurrent inplace updates.
*/
AssertSufficientPgClassUpdateLockHeld(target_relid);

/* Find the tuple for the target in `pg_class` */
target_tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(target_relid));
Assert(HeapTupleIsValid(target_tuple));
52 changes: 42 additions & 10 deletions tsl/src/compression/api.c
@@ -13,6 +13,7 @@
#include <access/xact.h>
#include <catalog/dependency.h>
#include <catalog/index.h>
#include <catalog/indexing.h>
#include <commands/event_trigger.h>
#include <commands/tablecmds.h>
#include <commands/trigger.h>
@@ -458,6 +459,20 @@ compress_chunk_impl(Oid hypertable_relid, Oid chunk_relid)
(errmsg("new compressed chunk \"%s.%s\" created",
NameStr(compress_ht_chunk->fd.schema_name),
NameStr(compress_ht_chunk->fd.table_name))));

/* Since a new compressed relation was created, it is necessary to
* invalidate the relcache entry for the chunk, because the Hypercore TAM
* caches information about the compressed relation in the
* relcache. */
if (ts_is_hypercore_am(cxt.srcht_chunk->amoid))
{
/* Tell other backends */
CacheInvalidateRelcacheByRelid(cxt.srcht_chunk->table_id);

/* Immediately invalidate our own cache */
RelationCacheInvalidateEntry(cxt.srcht_chunk->table_id);
}

EventTriggerAlterTableEnd();
}
else
@@ -814,17 +829,31 @@ compress_hypercore(Chunk *chunk, bool rel_is_hypercore, UseAccessMethod useam,
switch (useam)
{
case USE_AM_FALSE:
elog(NOTICE,
"cannot compress hypercore \"%s\" using heap, recompressing instead",
get_rel_name(chunk->table_id));
TS_FALLTHROUGH;
/* Converting from Hypercore to "regular" compressed is currently
* not supported */
Assert(rel_is_hypercore);
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot compress \"%s\" without using Hypercore access method",
get_rel_name(chunk->table_id)),
errhint(
"Decompress first and then compress without Hypercore access method.")));
break;
case USE_AM_NULL:
Assert(rel_is_hypercore);
/* Don't forward the truncate to the compressed data during recompression */
bool truncate_compressed = hypercore_set_truncate_compressed(false);
relid = tsl_compress_chunk_wrapper(chunk, if_not_compressed, recompress);
hypercore_set_truncate_compressed(truncate_compressed);
break;
case USE_AM_TRUE:
if (rel_is_hypercore)
{
/* Don't forward the truncate to the compressed data during recompression */
bool truncate_compressed = hypercore_set_truncate_compressed(false);
relid = tsl_compress_chunk_wrapper(chunk, if_not_compressed, recompress);
hypercore_set_truncate_compressed(truncate_compressed);
}
else
{
/* Convert to a compressed hypercore by simply calling ALTER TABLE
@@ -843,9 +872,12 @@
* otherwise.
*/
static UseAccessMethod
check_useam(UseAccessMethod arg)
check_useam(UseAccessMethod arg, bool is_hypercore)
{
return arg == USE_AM_NULL ? (UseAccessMethod) ts_guc_default_hypercore_use_access_method : arg;
if (arg == USE_AM_NULL)
return is_hypercore ? USE_AM_TRUE :
(UseAccessMethod) ts_guc_default_hypercore_use_access_method;
return arg;
}

Datum
@@ -854,13 +886,14 @@ tsl_compress_chunk(PG_FUNCTION_ARGS)
Oid uncompressed_chunk_id = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0);
bool if_not_compressed = PG_ARGISNULL(1) ? true : PG_GETARG_BOOL(1);
bool recompress = PG_ARGISNULL(2) ? false : PG_GETARG_BOOL(2);
UseAccessMethod useam = check_useam(PG_ARGISNULL(3) ? USE_AM_NULL : PG_GETARG_BOOL(3));
UseAccessMethod useam;

ts_feature_flag_check(FEATURE_HYPERTABLE_COMPRESSION);

TS_PREVENT_FUNC_IF_READ_ONLY();
Chunk *chunk = ts_chunk_get_by_relid(uncompressed_chunk_id, true);
bool rel_is_hypercore = get_table_am_oid(TS_HYPERCORE_TAM_NAME, false) == chunk->amoid;
useam = check_useam(PG_ARGISNULL(3) ? USE_AM_NULL : PG_GETARG_BOOL(3), rel_is_hypercore);

if (rel_is_hypercore || useam == USE_AM_TRUE)
uncompressed_chunk_id =
@@ -1082,10 +1115,9 @@ get_compressed_chunk_index_for_recompression(Chunk *uncompressed_chunk)

CompressionSettings *settings = ts_compression_settings_get(compressed_chunk->table_id);

ResultRelInfo *indstate = ts_catalog_open_indexes(compressed_chunk_rel);
CatalogIndexState indstate = CatalogOpenIndexes(compressed_chunk_rel);
Oid index_oid = get_compressed_chunk_index(indstate, settings);

ts_catalog_close_indexes(indstate);
CatalogCloseIndexes(indstate);

table_close(compressed_chunk_rel, NoLock);
table_close(uncompressed_chunk_rel, NoLock);
35 changes: 27 additions & 8 deletions tsl/src/compression/compression.c
@@ -4,7 +4,9 @@
* LICENSE-TIMESCALE for a copy of the license.
*/
#include <postgres.h>
#include <access/skey.h>
#include <catalog/heap.h>
#include <catalog/indexing.h>
#include <catalog/pg_am.h>
#include <common/base64.h>
#include <libpq/pqformat.h>
@@ -27,6 +29,7 @@
#include "debug_assert.h"
#include "debug_point.h"
#include "guc.h"
#include "hypercore/hypercore_handler.h"
#include "nodes/chunk_dispatch/chunk_insert_state.h"
#include "segment_meta.h"
#include "ts_catalog/array_utils.h"
@@ -180,7 +183,12 @@ static void
RelationDeleteAllRows(Relation rel, Snapshot snap)
{
TupleTableSlot *slot = table_slot_create(rel, NULL);
TableScanDesc scan = table_beginscan(rel, snap, 0, (ScanKey) NULL);
ScanKeyData scankey = {
/* Let the compression TAM know it should only return tuples from the
* non-compressed relation. No actual scan key is necessary. */
.sk_flags = SK_NO_COMPRESSED,
};
TableScanDesc scan = table_beginscan(rel, snap, 0, &scankey);

while (table_scan_getnextslot(scan, ForwardScanDirection, slot))
{
@@ -291,8 +299,14 @@ compress_chunk(Oid in_table, Oid out_table, int insert_options)
* The following code is trying to find an existing index that
* matches the configuration so that we can skip sequential scan and
* tuplesort.
*
* Note that the Hypercore TAM doesn't support (re-)compression via index at
* this point, because the index also covers existing compressed tuples. It
* could be supported for the initial compression, when there is no
* compressed data yet, but for now we avoid it altogether, since compression
* indexscan isn't enabled by default anyway.
*/
if (ts_guc_enable_compression_indexscan)
if (ts_guc_enable_compression_indexscan && !REL_IS_HYPERCORE(in_rel))
{
List *in_rel_index_oids = RelationGetIndexList(in_rel);
foreach (lc, in_rel_index_oids)
@@ -442,6 +456,7 @@ compress_chunk(Oid in_table, Oid out_table, int insert_options)
{
int64 nrows_processed = 0;

Assert(!REL_IS_HYPERCORE(in_rel));
elog(ts_guc_debug_compression_path_info ? INFO : DEBUG1,
"using index \"%s\" to scan rows for compression",
get_rel_name(matched_index_rel->rd_id));
@@ -562,9 +577,13 @@ compress_chunk_sort_relation(CompressionSettings *settings, Relation in_rel)
Tuplesortstate *tuplesortstate;
TableScanDesc scan;
TupleTableSlot *slot;

ScanKeyData scankey = {
/* Let the compression TAM know it should only return tuples from the
* non-compressed relation. No actual scan key is necessary. */
.sk_flags = SK_NO_COMPRESSED,
};
tuplesortstate = compression_create_tuplesort_state(settings, in_rel);
scan = table_beginscan(in_rel, GetLatestSnapshot(), 0, (ScanKey) NULL);
scan = table_beginscan(in_rel, GetLatestSnapshot(), 0, &scankey);
slot = table_slot_create(in_rel, NULL);

while (table_scan_getnextslot(scan, ForwardScanDirection, slot))
@@ -793,7 +812,7 @@ row_compressor_init(CompressionSettings *settings, RowCompressor *row_compressor
ALLOCSET_DEFAULT_SIZES),
.compressed_table = compressed_table,
.bistate = need_bistate ? GetBulkInsertState() : NULL,
.resultRelInfo = ts_catalog_open_indexes(compressed_table),
.resultRelInfo = CatalogOpenIndexes(compressed_table),
.n_input_columns = RelationGetDescr(uncompressed_table)->natts,
.count_metadata_column_offset = AttrNumberGetAttrOffset(count_metadata_column_num),
.compressed_values = palloc(sizeof(Datum) * num_columns_in_compressed_table),
@@ -1124,7 +1143,7 @@ row_compressor_close(RowCompressor *row_compressor)
{
if (row_compressor->bistate)
FreeBulkInsertState(row_compressor->bistate);
ts_catalog_close_indexes(row_compressor->resultRelInfo);
CatalogCloseIndexes(row_compressor->resultRelInfo);
}

/******************
@@ -1216,7 +1235,7 @@ build_decompressor(Relation in_rel, Relation out_rel)

.out_desc = out_desc,
.out_rel = out_rel,
.indexstate = ts_catalog_open_indexes(out_rel),
.indexstate = CatalogOpenIndexes(out_rel),

.mycid = GetCurrentCommandId(true),
.bistate = GetBulkInsertState(),
@@ -1266,7 +1285,7 @@ row_decompressor_close(RowDecompressor *decompressor)
{
FreeBulkInsertState(decompressor->bistate);
MemoryContextDelete(decompressor->per_compressed_row_ctx);
ts_catalog_close_indexes(decompressor->indexstate);
CatalogCloseIndexes(decompressor->indexstate);
FreeExecutorState(decompressor->estate);
detoaster_close(&decompressor->detoaster);
}
3 changes: 2 additions & 1 deletion tsl/src/compression/compression.h
@@ -6,6 +6,7 @@
#pragma once

#include <postgres.h>
#include <catalog/indexing.h>
#include <executor/tuptable.h>
#include <fmgr.h>
#include <lib/stringinfo.h>
@@ -130,7 +131,7 @@ typedef struct RowDecompressor

TupleDesc out_desc;
Relation out_rel;
ResultRelInfo *indexstate;
CatalogIndexState indexstate;
EState *estate;

CommandId mycid;