Add dump/restore support for Hypercore TAM
Add support for dumping and restoring hypertables that have chunks
that use the Hypercore TAM.

Dumping a Hypercore table requires special consideration because its
data is internally stored in two separate relations: one for
compressed data and one for non-compressed data. The TAM returns data
from both relations, but they may be dumped as separate tables. This
risks dumping the compressed data twice: once via the TAM and once via
the compressed table in compressed format.

The `pg_dump` tool uses `COPY TO` to create dumps of each table, and,
to avoid data duplication when used on Hypercore tables, this change
introduces a GUC that allows selecting one of these two behaviors:

1. A `COPY TO` on a Hypercore table returns all data via the TAM,
   including data stored in the compressed relation. A `COPY TO` on
   the internal compressed relation returns no data.

2. A `COPY TO` on a Hypercore table returns only non-compressed data, while
   a `COPY TO` on the compressed relation returns compressed data. A
   `SELECT` still returns all the data as normal.

The second approach is the default because it is consistent with
compression when Hypercore TAM is not used. It will produce a
`pg_dump` archive that includes data in compressed form (if data was
compressed when dumped). Conversely, option (1) will produce an
archive that looks identical to a dump from a non-compressed table.

There are pros and cons of each dump format. A non-compressed archive
is a platform-agnostic logical dump that can be restored to any
platform and architecture, while a compressed archive includes data
that is compressed in a platform-dependent way and needs to be
restored to a compatible system.
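
The two `COPY TO` behaviors described above can be sketched as a small model. This is only an illustration under stated assumptions: the enum mirrors the one added in `src/guc.h`, while `HypercoreModel` and the three functions are hypothetical names that count logical rows instead of performing a real copy. The point it shows is that, with either setting, a dump containing both relations covers every row exactly once.

```c
#include <assert.h>
#include <stddef.h>

/* Same enumerators as the enum this commit adds to src/guc.h. */
typedef enum HypercoreCopyToBehavior
{
	HYPERCORE_COPY_ALL_DATA,
	HYPERCORE_COPY_NO_COMPRESSED_DATA,
} HypercoreCopyToBehavior;

/* Hypothetical model of a Hypercore table: logical row counts only. */
typedef struct HypercoreModel
{
	size_t noncompressed_rows; /* rows in the non-compressed relation */
	size_t compressed_rows;    /* rows stored in the internal compressed relation */
} HypercoreModel;

/* Logical rows emitted by a COPY TO on the Hypercore table itself. */
size_t
copy_to_hypercore(const HypercoreModel *t, HypercoreCopyToBehavior behavior)
{
	assert(t != NULL);
	if (behavior == HYPERCORE_COPY_ALL_DATA)
		return t->noncompressed_rows + t->compressed_rows;
	return t->noncompressed_rows;
}

/* Logical rows covered by a COPY TO on the internal compressed relation. */
size_t
copy_to_compressed_relation(const HypercoreModel *t, HypercoreCopyToBehavior behavior)
{
	assert(t != NULL);
	if (behavior == HYPERCORE_COPY_ALL_DATA)
		return 0;
	return t->compressed_rows;
}

/* Total logical rows in a dump that contains both relations. */
size_t
dumped_rows(const HypercoreModel *t, HypercoreCopyToBehavior behavior)
{
	return copy_to_hypercore(t, behavior) + copy_to_compressed_relation(t, behavior);
}
```

For a table with 3 non-compressed and 7 compressed rows, `dumped_rows` yields 10 under both settings; only the split between the two relations differs.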

A test is added that exercises both settings, along with the
corresponding dump and restore.
erimatnor committed Oct 17, 2024
1 parent 4316f2c commit ab64ed9
Showing 12 changed files with 682 additions and 20 deletions.
23 changes: 23 additions & 0 deletions src/guc.c
@@ -106,6 +106,12 @@ static const struct config_enum_entry transparent_decompression_options[] = {
{ NULL, 0, false }
};

static const struct config_enum_entry hypercore_copy_to_options[] = {
{ "all_data", HYPERCORE_COPY_ALL_DATA, false },
{ "no_compressed_data", HYPERCORE_COPY_NO_COMPRESSED_DATA, false },
{ NULL, 0, false }
};

bool ts_guc_enable_deprecation_warnings = true;
bool ts_guc_enable_optimizations = true;
bool ts_guc_restoring = false;
@@ -147,6 +153,8 @@ bool ts_guc_enable_tss_callbacks = true;
TSDLLEXPORT bool ts_guc_enable_delete_after_compression = false;
TSDLLEXPORT bool ts_guc_enable_merge_on_cagg_refresh = false;
TSDLLEXPORT char *ts_guc_hypercore_indexam_whitelist;
TSDLLEXPORT HypercoreCopyToBehavior ts_guc_hypercore_copy_to_behavior =
HYPERCORE_COPY_NO_COMPRESSED_DATA;

/* default value of ts_guc_max_open_chunks_per_insert and
* ts_guc_max_cached_chunks_per_hypertable will be set as their respective boot-value when the
@@ -163,6 +171,7 @@ char *ts_last_tune_time = NULL;
char *ts_last_tune_version = NULL;

bool ts_guc_debug_require_batch_sorted_merge = false;

bool ts_guc_debug_allow_cagg_with_deprecated_funcs = false;

#ifdef TS_DEBUG
@@ -973,6 +982,20 @@ _guc_init(void)
/* assign_hook= */ NULL,
/* show_hook= */ NULL);

DefineCustomEnumVariable(MAKE_EXTOPTION("hypercore_copy_to_behavior"),
"The behavior of COPY TO on a hypercore table",
"Set to 'all_data' to return both compressed and uncompressed data "
"via the Hypercore table when using COPY TO. Set to "
"'no_compressed_data' to skip compressed data.",
/* valueAddr= */ (int *) &ts_guc_hypercore_copy_to_behavior,
/* bootValue= */ HYPERCORE_COPY_NO_COMPRESSED_DATA,
/* options= */ hypercore_copy_to_options,
/* context= */ PGC_USERSET,
0,
NULL,
NULL,
NULL);
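
The options table registered above is a NULL-terminated array that maps setting names to enum values. A minimal sketch of how such a table can be searched follows. It assumes a local stand-in struct (`enum_entry`) with the same three fields as the entries in the diff, and the names `lookup_enum_option`, `COPY_ALL_DATA`, and `COPY_NO_COMPRESSED_DATA` are hypothetical. It uses a plain case-sensitive `strcmp`, which is a simplification and not a claim about how PostgreSQL matches values.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/* Local stand-in with the same field layout as the entries in the diff. */
struct enum_entry
{
	const char *name;
	int val;
	bool hidden;
};

/* Hypothetical values standing in for the HYPERCORE_COPY_* enumerators. */
enum
{
	COPY_ALL_DATA,
	COPY_NO_COMPRESSED_DATA
};

/* NULL-terminated table, shaped like hypercore_copy_to_options. */
const struct enum_entry copy_to_options[] = {
	{ "all_data", COPY_ALL_DATA, false },
	{ "no_compressed_data", COPY_NO_COMPRESSED_DATA, false },
	{ NULL, 0, false }
};

/*
 * Walk the table until the NULL sentinel. On a match, store the value in
 * *val and return true; otherwise leave *val untouched and return false.
 */
bool
lookup_enum_option(const struct enum_entry *options, const char *name, int *val)
{
	assert(options != NULL && name != NULL && val != NULL);

	for (const struct enum_entry *entry = options; entry->name != NULL; entry++)
	{
		if (strcmp(entry->name, name) == 0)
		{
			*val = entry->val;
			return true;
		}
	}
	return false;
}
```

The sentinel entry is what lets the lookup work without a separate length argument, which is why the table in the diff ends with `{ NULL, 0, false }`.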

#ifdef TS_DEBUG
DefineCustomBoolVariable(/* name= */ MAKE_EXTOPTION("shutdown_bgw_scheduler"),
/* short_desc= */ "immediately shutdown the bgw scheduler",
19 changes: 19 additions & 0 deletions src/guc.h
@@ -100,6 +100,25 @@ extern TSDLLEXPORT bool ts_guc_debug_require_batch_sorted_merge;
extern TSDLLEXPORT bool ts_guc_debug_allow_cagg_with_deprecated_funcs;
extern TSDLLEXPORT char *ts_guc_hypercore_indexam_whitelist;

/*
* Defines the behavior of COPY TO when used on a Hypercore table.
*
* If set to COPY_ALL_DATA, all data is copied from a Hypercore table,
* including compressed data (but in uncompressed form) from the internal
* compressed relation. When doing a COPY TO on the internal compressed
* relation, no data is returned.
*
* If set to COPY_NO_COMPRESSED_DATA, then only uncompressed data is copied
* (if any). This behavior is compatible with compression without hypercore.
*/
typedef enum HypercoreCopyToBehavior
{
HYPERCORE_COPY_ALL_DATA,
HYPERCORE_COPY_NO_COMPRESSED_DATA,
} HypercoreCopyToBehavior;

extern TSDLLEXPORT HypercoreCopyToBehavior ts_guc_hypercore_copy_to_behavior;

void _guc_init(void);

typedef enum
3 changes: 2 additions & 1 deletion test/sql/utils/pg_dump_aux_dump.sh
@@ -1,6 +1,7 @@
DUMPFILE=${DUMPFILE:-$1}
EXTRA_PGOPTIONS=${EXTRA_PGOPTIONS:-$2}
# Override PGOPTIONS to remove verbose output
PGOPTIONS='--client-min-messages=warning'
PGOPTIONS="--client-min-messages=warning $EXTRA_PGOPTIONS"

export PGOPTIONS

18 changes: 9 additions & 9 deletions tsl/src/compression/api.c
@@ -1116,13 +1116,11 @@ fetch_unmatched_uncompressed_chunk_into_tuplesort(Tuplesortstate *segment_tuples
TableScanDesc scan;
TupleTableSlot *slot = table_slot_create(uncompressed_chunk_rel, NULL);
Snapshot snapshot = GetLatestSnapshot();
ScanKeyData scankey = {
/* Let compression TAM know it should only return tuples from the
* non-compressed relation. No actual scankey necessary */
.sk_flags = SK_NO_COMPRESSED,
};

scan = table_beginscan(uncompressed_chunk_rel, snapshot, 0, &scankey);
scan = table_beginscan(uncompressed_chunk_rel, snapshot, 0, NULL);
/* If scan is using Hypercore, configure the scan to only return
* non-compressed data */
hypercore_scan_set_skip_compressed(scan);

while (table_scan_getnextslot(scan, ForwardScanDirection, slot))
{
@@ -1189,10 +1187,12 @@ fetch_matching_uncompressed_chunk_into_tuplesort(Tuplesortstate *segment_tupleso
}

snapshot = GetLatestSnapshot();
/* Let compression TAM know it should only return tuples from the
* non-compressed relation. */
scankey->sk_flags = SK_NO_COMPRESSED;

scan = table_beginscan(uncompressed_chunk_rel, snapshot, nsegbycols_nonnull, scankey);
/* If scan is using Hypercore, configure the scan to only return
* non-compressed data */
hypercore_scan_set_skip_compressed(scan);

TupleTableSlot *slot = table_slot_create(uncompressed_chunk_rel, NULL);

while (table_scan_getnextslot(scan, ForwardScanDirection, slot))
58 changes: 54 additions & 4 deletions tsl/src/hypercore/hypercore_handler.c
@@ -62,6 +62,7 @@
#include "compression/compression.h"
#include "compression/create.h"
#include "debug_assert.h"
#include "extension.h"
#include "guc.h"
#include "hypercore_handler.h"
#include "process_utility.h"
@@ -81,6 +82,20 @@ static void convert_to_hypercore_finish(Oid relid);
static List *partially_compressed_relids = NIL; /* Relids that need to have
* updated status set at end of
* transaction */
/*
* For COPY <hypercore_rel> TO commands, track the relid of the hypercore
* being copied from. It is needed to filter out compressed data in the COPY
* scan so that pg_dump does not dump compressed data twice: once in
* uncompressed format via the hypercore rel and once in compressed format in
* the internal compressed rel that gets dumped separately.
*/
static Oid hypercore_skip_compressed_data_relid = InvalidOid;

void
hypercore_skip_compressed_data_for_relation(Oid relid)
{
hypercore_skip_compressed_data_relid = relid;
}

#define HYPERCORE_AM_INFO_SIZE(natts) \
(sizeof(HypercoreInfo) + (sizeof(ColumnCompressionSettings) * (natts)))
@@ -167,7 +182,7 @@ static HypercoreInfo *
lazy_build_hypercore_info_cache(Relation rel, bool create_chunk_constraints,
bool *compressed_relation_created)
{
Assert(OidIsValid(rel->rd_id) && !ts_is_hypertable(rel->rd_id));
Assert(OidIsValid(rel->rd_id) && (!ts_extension_is_loaded() || !ts_is_hypertable(rel->rd_id)));

HypercoreInfo *hsinfo;
CompressionSettings *settings;
@@ -372,6 +387,18 @@ static bool hypercore_getnextslot_noncompressed(HypercoreScanDesc scan, ScanDire
static bool hypercore_getnextslot_compressed(HypercoreScanDesc scan, ScanDirection direction,
TupleTableSlot *slot);

void
hypercore_scan_set_skip_compressed(TableScanDesc scan)
{
HypercoreScanDesc hscan;

if (scan->rs_rd->rd_tableam != hypercore_routine())
return;

hscan = (HypercoreScanDesc) scan;
hscan->hs_scan_state = HYPERCORE_SCAN_NON_COMPRESSED;
}
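
The function above only touches the scan descriptor when the relation actually uses the Hypercore access method, which is what makes it safe to call on any scan. A minimal sketch of that check-before-downcast pattern follows. All type and function names here are hypothetical stand-ins (`AmRoutineModel`, `ScanDescModel`, `HypercoreScanModel`, `scan_set_skip_compressed`); only the idea of comparing the access method routine pointer before casting is taken from the diff.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-in for a table access method routine table. */
typedef struct AmRoutineModel
{
	const char *name;
} AmRoutineModel;

/* Hypothetical stand-in for the generic scan descriptor. */
typedef struct ScanDescModel
{
	const AmRoutineModel *am; /* access method that started this scan */
} ScanDescModel;

typedef enum ScanStateModel
{
	SCAN_STATE_START,          /* hypothetical initial state */
	SCAN_STATE_NON_COMPRESSED, /* read only the non-compressed relation */
} ScanStateModel;

/*
 * AM-specific descriptor. The generic part is the first member, so a
 * pointer to it can be cast back to the enclosing struct.
 */
typedef struct HypercoreScanModel
{
	ScanDescModel base;
	ScanStateModel state;
} HypercoreScanModel;

const AmRoutineModel hypercore_am_model = { "hypercore" };
const AmRoutineModel heap_am_model = { "heap" };

void
scan_set_skip_compressed(ScanDescModel *scan)
{
	assert(scan != NULL);

	/* Not a Hypercore scan: the cast below would be invalid, so do nothing. */
	if (scan->am != &hypercore_am_model)
		return;

	((HypercoreScanModel *) scan)->state = SCAN_STATE_NON_COMPRESSED;
}
```

Because the call is a no-op for other access methods, callers such as the compression code in `api.c` do not need to know which access method a chunk uses before asking for non-compressed data only.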

#if PG17_GE
static int
compute_targrows(Relation rel)
@@ -468,6 +495,26 @@ get_scan_type(uint32 flags)
}
#endif

static inline bool
should_skip_compressed_data(const Relation rel)
{
/*
* Skip compressed data in a scan if any of these apply:
*
* 1. Transparent decompression (DecompressChunk) is enabled for
* hypercore.
*
* 2. The scan was started with a flag indicating no compressed data
* should be returned.
*
* 3. A COPY <hypercore> TO <file> on the hypercore is executed and we
* want to ensure such commands issued by pg_dump don't lead to
* dumping compressed data twice.
*/
return (ts_guc_enable_transparent_decompression == 2) ||
RelationGetRelid(rel) == hypercore_skip_compressed_data_relid;
}

static TableScanDesc
hypercore_beginscan(Relation relation, Snapshot snapshot, int nkeys, ScanKey keys,
ParallelTableScanDesc parallel_scan, uint32 flags)
@@ -504,8 +551,7 @@ hypercore_beginscan(Relation relation, Snapshot snapshot, int nkeys, ScanKey key
HypercoreInfo *hsinfo = RelationGetHypercoreInfo(relation);
scan->compressed_rel = table_open(hsinfo->compressed_relid, AccessShareLock);

if ((ts_guc_enable_transparent_decompression == 2) ||
(keys && keys->sk_flags & SK_NO_COMPRESSED))
if (should_skip_compressed_data(relation))
{
/*
* Don't read compressed data if transparent decompression is enabled
@@ -514,7 +560,7 @@ hypercore_beginscan(Relation relation, Snapshot snapshot, int nkeys, ScanKey key
* Transparent decompression reads compressed data itself, directly
* from the compressed chunk, so avoid reading it again here.
*/
scan->hs_scan_state = HYPERCORE_SCAN_NON_COMPRESSED;
hypercore_scan_set_skip_compressed(&scan->rs_base);
}

initscan(scan, keys, nkeys);
@@ -606,6 +652,9 @@ hypercore_endscan(TableScanDesc sscan)
pfree(scan->rs_base.rs_key);

pfree(scan);

/* Clear the COPY TO filter state */
hypercore_skip_compressed_data_relid = InvalidOid;
}
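
Taken together, the handler changes form a one-shot filter: a relid is recorded before the scan, consulted when the scan begins, and cleared when a scan ends. The sketch below models only that lifecycle. The function names (`mark_skip_compressed`, `beginscan_skips_compressed`, `model_endscan`) are hypothetical, the transparent-decompression check that the real `should_skip_compressed_data` also performs is passed in as a plain flag, and the `assert` is a model-only sanity check that is not in the commit.

```c
#include <assert.h>
#include <stdbool.h>

typedef unsigned int Oid; /* PostgreSQL defines Oid as unsigned int */
#define InvalidOid ((Oid) 0)

/* Relid whose next scan should skip compressed data; InvalidOid means none. */
static Oid skip_compressed_relid = InvalidOid;

/* Record the relation being copied (models the setter added in this commit). */
void
mark_skip_compressed(Oid relid)
{
	assert(relid != InvalidOid); /* model-only sanity check */
	skip_compressed_relid = relid;
}

/*
 * Decide at scan start whether compressed data is skipped: either the
 * transparent-decompression condition holds or the relation was marked.
 */
bool
beginscan_skips_compressed(Oid relid, bool transparent_decompression_for_hypercore)
{
	return transparent_decompression_for_hypercore || relid == skip_compressed_relid;
}

/* Ending a scan clears the filter so that later scans are unaffected. */
void
model_endscan(void)
{
	skip_compressed_relid = InvalidOid;
}
```

Note that in this model, as in the diff, the filter is cleared by the end of any scan, not only a scan on the marked relation, so the marker is expected to be set immediately before the scan it is meant for.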

static bool
@@ -3374,6 +3423,7 @@ hypercore_xact_event(XactEvent event, void *arg)
Ensure(OidIsValid(hsinfo->compressed_relid),
"hypercore \"%s\" has no compressed data relation",
get_rel_name(relid));

Chunk *chunk = ts_chunk_get_by_relid(relid, true);
ts_chunk_set_partial(chunk);
table_close(rel, NoLock);
7 changes: 2 additions & 5 deletions tsl/src/hypercore/hypercore_handler.h
@@ -13,18 +13,15 @@

#include "hypertable.h"

/* Scan key flag (skey.h) to indicate that a table scan should only return
* tuples from the non-compressed relation. Bits 16-31 are reserved for
* individual access methods, so use bit 16. */
#define SK_NO_COMPRESSED 0x8000

extern void hypercore_set_analyze_relid(Oid relid);
extern const TableAmRoutine *hypercore_routine(void);
extern void hypercore_set_rel_pathlist(PlannerInfo *root, RelOptInfo *rel, Hypertable *ht);
extern void hypercore_alter_access_method_begin(Oid relid, bool to_other_am);
extern void hypercore_alter_access_method_finish(Oid relid, bool to_other_am);
extern Datum hypercore_handler(PG_FUNCTION_ARGS);
extern void hypercore_xact_event(XactEvent event, void *arg);
extern void hypercore_skip_compressed_data_for_relation(Oid relid);
extern void hypercore_scan_set_skip_compressed(TableScanDesc scan);

typedef struct ColumnCompressionSettings
{
19 changes: 19 additions & 0 deletions tsl/src/nodes/columnar_scan/columnar_scan.c
@@ -13,6 +13,7 @@
#include <nodes/execnodes.h>
#include <nodes/extensible.h>
#include <nodes/nodeFuncs.h>
#include <nodes/parsenodes.h>
#include <nodes/pathnodes.h>
#include <nodes/pg_list.h>
#include <optimizer/cost.h>
@@ -29,6 +30,7 @@
#include "columnar_scan.h"
#include "compression/arrow_c_data_interface.h"
#include "compression/compression.h"
#include "guc.h"
#include "hypercore/arrow_tts.h"
#include "hypercore/hypercore_handler.h"
#include "import/ts_explain.h"
@@ -52,6 +54,7 @@ typedef struct ColumnarScanState
List *scankey_quals;
List *vectorized_quals_orig;
SimpleProjInfo sprojinfo;
bool only_scan;
} ColumnarScanState;

static bool
@@ -425,6 +428,10 @@ columnar_scan_exec(CustomScanState *state)
cstate->nscankeys,
cstate->scankeys);
state->ss.ss_currentScanDesc = scandesc;

if (cstate->only_scan &&
(ts_guc_hypercore_copy_to_behavior == HYPERCORE_COPY_NO_COMPRESSED_DATA))
hypercore_scan_set_skip_compressed(scandesc);
}
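
The executor-side check above combines two inputs: the `only_scan` flag, which the plan-creation hunk in this file derives from the rel kind and the expansion marker, and the `hypercore_copy_to_behavior` setting. A small sketch of that decision follows, assuming a hypothetical `ScanPlanModel` struct that holds the two planner facts as booleans; the function names are illustrative and not part of the commit.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Same enumerators as the enum this commit adds to src/guc.h. */
typedef enum HypercoreCopyToBehavior
{
	HYPERCORE_COPY_ALL_DATA,
	HYPERCORE_COPY_NO_COMPRESSED_DATA,
} HypercoreCopyToBehavior;

/* Hypothetical summary of the planner facts the diff consults. */
typedef struct ScanPlanModel
{
	bool is_base_rel;          /* rel->reloptkind == RELOPT_BASEREL */
	bool marked_for_expansion; /* ts_rte_is_marked_for_expansion(rte) */
} ScanPlanModel;

/* Plan time: the flag that the diff stores in custom_private. */
bool
plan_is_only_scan(const ScanPlanModel *plan)
{
	assert(plan != NULL);
	return plan->is_base_rel && !plan->marked_for_expansion;
}

/* Execution time: skip compressed data only for an ONLY scan with the default setting. */
bool
exec_skips_compressed(const ScanPlanModel *plan, HypercoreCopyToBehavior behavior)
{
	return plan_is_only_scan(plan) && behavior == HYPERCORE_COPY_NO_COMPRESSED_DATA;
}
```

In this model a scan that was marked for expansion never skips compressed data, and switching the setting to `HYPERCORE_COPY_ALL_DATA` disables the skip even for an ONLY scan.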

/*
@@ -836,6 +843,7 @@ columnar_scan_state_create(CustomScan *cscan)
#if PG16_GE
cstate->css.slotOps = &TTSOpsArrowTuple;
#endif
cstate->only_scan = linitial_int(cscan->custom_private);

return (Node *) cstate;
}
@@ -897,6 +905,8 @@ columnar_scan_plan_create(PlannerInfo *root, RelOptInfo *rel, CustomPath *best_p
columnar_scan_plan->methods = &columnar_scan_plan_methods;
columnar_scan_plan->scan.scanrelid = rel->relid;

bool only_scan = (rel->reloptkind == RELOPT_BASEREL) && !ts_rte_is_marked_for_expansion(rte);
columnar_scan_plan->custom_private = list_make1_int(only_scan);

/* output target list */
columnar_scan_plan->scan.plan.targetlist = tlist;

@@ -989,6 +999,15 @@ columnar_scan_set_rel_pathlist(PlannerInfo *root, RelOptInfo *rel, Hypertable *h
{
ColumnarScanPath *cspath;
Relids required_outer;
RangeTblEntry *rte = planner_rt_fetch(rel->relid, root);

/* If the rel is NOT marked for expansion, it means this is a SELECT *
* FROM ONLY query and it is necessary to always do a ColumnarScan even if
* it is disabled. Only ColumnarScan has the functionality to tell the TAM
* to only return non-compressed data. */
if (!ts_guc_enable_columnarscan && rel->reloptkind != RELOPT_BASEREL &&
!ts_rte_is_marked_for_expansion(rte))
return;

/*
* We don't support pushing join clauses into the quals of a seqscan, but
2 changes: 1 addition & 1 deletion tsl/src/planner.c
@@ -161,7 +161,7 @@ tsl_set_rel_pathlist_query(PlannerInfo *root, RelOptInfo *rel, Index rti, RangeT
*/
else if (ts_is_hypercore_am(chunk->amoid))
{
if (ts_guc_enable_columnarscan)
if (ts_guc_enable_transparent_decompression != 2)
columnar_scan_set_rel_pathlist(root, rel, ht);

hypercore_set_rel_pathlist(root, rel, ht);
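
After this change, the call site above gates ColumnarScan path generation on the transparent decompression setting instead of on `ts_guc_enable_columnarscan`. The sketch below models just this call site. `PathlistCalls` and `pathlist_calls_for_hypercore_chunk` are hypothetical names, the value 2 is taken directly from the comparison in the diff, and the early return inside `columnar_scan_set_rel_pathlist` itself is deliberately not modeled.

```c
#include <assert.h>
#include <stdbool.h>

/* Which path generators the call site invokes for a Hypercore chunk. */
typedef struct PathlistCalls
{
	bool columnar_scan; /* columnar_scan_set_rel_pathlist() is called */
	bool hypercore;     /* hypercore_set_rel_pathlist() is called */
} PathlistCalls;

/*
 * Model of the new condition: ColumnarScan paths are considered unless the
 * transparent decompression setting has the value 2 compared in the diff.
 */
PathlistCalls
pathlist_calls_for_hypercore_chunk(int transparent_decompression)
{
	PathlistCalls calls = { false, true };

	assert(transparent_decompression >= 0);
	if (transparent_decompression != 2)
		calls.columnar_scan = true;
	return calls;
}
```

In this model `hypercore_set_rel_pathlist` is always reached, and only the ColumnarScan call depends on the setting.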