diff --git a/src/nodes/chunk_dispatch/chunk_insert_state.c b/src/nodes/chunk_dispatch/chunk_insert_state.c
index 3ef810d1a63..7738994ba6a 100644
--- a/src/nodes/chunk_dispatch/chunk_insert_state.c
+++ b/src/nodes/chunk_dispatch/chunk_insert_state.c
@@ -19,6 +19,7 @@
 #include
 #include
 #include
+#include
 #include
 #include
 #include
@@ -28,10 +29,10 @@
 #include "errors.h"
 #include "chunk_dispatch.h"
 #include "chunk_insert_state.h"
+#include "debug_point.h"
 #include "ts_catalog/continuous_agg.h"
 #include "chunk_index.h"
 #include "indexing.h"
-#include
 
 /* Just like ExecPrepareExpr except that it doesn't switch to the query memory context */
 static inline ExprState *
@@ -582,6 +583,7 @@ ts_chunk_insert_state_create(const Chunk *chunk, ChunkDispatch *dispatch)
 
 	ts_chunk_validate_chunk_status_for_operation(chunk, CHUNK_INSERT, true);
 
+	DEBUG_WAITPOINT("chunk_insert_before_lock");
 	rel = table_open(chunk->table_id, RowExclusiveLock);
 
 	MemoryContext old_mcxt = MemoryContextSwitchTo(cis_context);
diff --git a/tsl/test/isolation/expected/compression_recompress.out b/tsl/test/isolation/expected/compression_recompress.out
index c3a6b163830..29ad88bb01c 100644
--- a/tsl/test/isolation/expected/compression_recompress.out
+++ b/tsl/test/isolation/expected/compression_recompress.out
@@ -1,4 +1,4 @@
-Parsed test spec with 2 sessions
+Parsed test spec with 3 sessions
 
 starting permutation: s2_block_on_compressed_chunk_size s1_begin s1_recompress_chunk s2_select_from_compressed_chunk s2_wait_for_select_to_finish s2_unblock s1_rollback
 step s2_block_on_compressed_chunk_size:
@@ -35,3 +35,100 @@ recompress
 
 step s1_rollback: 
 	ROLLBACK;
+
+starting permutation: s1_compress s3_block_chunk_insert s2_insert s1_decompress s1_compress s3_release_chunk_insert
+step s1_compress: 
+	SELECT compression_status FROM chunk_compression_stats('sensor_data');
+	SELECT count(*) FROM (SELECT compress_chunk(i, if_not_compressed => true) FROM show_chunks('sensor_data') i) i;
+	SELECT compression_status FROM chunk_compression_stats('sensor_data');
+	SELECT count(*) FROM sensor_data;
+
+compression_status
+------------------
+Compressed
+(1 row)
+
+s1: NOTICE:  chunk "_hyper_X_X_chunk" is already compressed
+count
+-----
+    1
+(1 row)
+
+compression_status
+------------------
+Compressed
+(1 row)
+
+count
+-----
+15840
+(1 row)
+
+step s3_block_chunk_insert: 
+	SELECT debug_waitpoint_enable('chunk_insert_before_lock');
+
+debug_waitpoint_enable
+----------------------
+
+(1 row)
+
+step s2_insert: 
+	INSERT INTO sensor_data VALUES ('2022-01-01 20:00'::timestamptz, 1, 1.0, 1.0), ('2022-01-01 21:00'::timestamptz, 2, 2.0, 2.0) ON CONFLICT (time, sensor_id) DO NOTHING;
+ <waiting ...>
+s1: NOTICE:  decompressing chunk _hyper_X_X_chunk compress_hyper_X_X_chunk
+step s1_decompress: 
+	SELECT count(*) FROM (SELECT decompress_chunk(i) FROM show_chunks('sensor_data') i) i;
+	SELECT compression_status FROM chunk_compression_stats('sensor_data');
+	SELECT count(*) FROM sensor_data;
+
+count
+-----
+    1
+(1 row)
+
+compression_status
+------------------
+Uncompressed
+(1 row)
+
+count
+-----
+15840
+(1 row)
+
+step s1_compress: 
+	SELECT compression_status FROM chunk_compression_stats('sensor_data');
+	SELECT count(*) FROM (SELECT compress_chunk(i, if_not_compressed => true) FROM show_chunks('sensor_data') i) i;
+	SELECT compression_status FROM chunk_compression_stats('sensor_data');
+	SELECT count(*) FROM sensor_data;
+
+compression_status
+------------------
+Uncompressed
+(1 row)
+
+count
+-----
+    1
+(1 row)
+
+compression_status
+------------------
+Compressed
+(1 row)
+
+count
+-----
+15840
+(1 row)
+
+step s3_release_chunk_insert: 
+	SELECT debug_waitpoint_release('chunk_insert_before_lock');
+
+debug_waitpoint_release
+-----------------------
+
+(1 row)
+
+step s2_insert: <... completed>
+ERROR:  chunk not found
diff --git a/tsl/test/isolation/specs/compression_recompress.spec b/tsl/test/isolation/specs/compression_recompress.spec
index c3207512584..64c15c69c27 100644
--- a/tsl/test/isolation/specs/compression_recompress.spec
+++ b/tsl/test/isolation/specs/compression_recompress.spec
@@ -24,8 +24,10 @@ setup {
 		generate_series(1, 5, 1) AS g2(sensor_id)
 	ORDER BY time;
 
+	CREATE UNIQUE INDEX ON sensor_data (time, sensor_id);
+
 	ALTER TABLE sensor_data SET (
-		timescaledb.compress,
+		timescaledb.compress,
 		timescaledb.compress_segmentby = 'sensor_id',
 		timescaledb.compress_orderby = 'time');
 
@@ -59,6 +61,19 @@ step "s1_rollback" {
 	ROLLBACK;
 }
 
+step "s1_compress" {
+	SELECT compression_status FROM chunk_compression_stats('sensor_data');
+	SELECT count(*) FROM (SELECT compress_chunk(i, if_not_compressed => true) FROM show_chunks('sensor_data') i) i;
+	SELECT compression_status FROM chunk_compression_stats('sensor_data');
+	SELECT count(*) FROM sensor_data;
+}
+
+step "s1_decompress" {
+	SELECT count(*) FROM (SELECT decompress_chunk(i) FROM show_chunks('sensor_data') i) i;
+	SELECT compression_status FROM chunk_compression_stats('sensor_data');
+	SELECT count(*) FROM sensor_data;
+}
+
 session "s2"
 ## locking up the catalog table will block the recompression from releasing the index lock
 ## we should not be deadlocking since the index lock has been reduced to ExclusiveLock
@@ -76,5 +91,21 @@ step "s2_select_from_compressed_chunk" {
 
 step "s2_wait_for_select_to_finish" {
 }
 
+step "s2_insert" {
+	INSERT INTO sensor_data VALUES ('2022-01-01 20:00'::timestamptz, 1, 1.0, 1.0), ('2022-01-01 21:00'::timestamptz, 2, 2.0, 2.0) ON CONFLICT (time, sensor_id) DO NOTHING;
+}
+
+session "s3"
+
+step "s3_block_chunk_insert" {
+	SELECT debug_waitpoint_enable('chunk_insert_before_lock');
+}
+
+step "s3_release_chunk_insert" {
+	SELECT debug_waitpoint_release('chunk_insert_before_lock');
+}
+
 permutation "s2_block_on_compressed_chunk_size" "s1_begin" "s1_recompress_chunk" "s2_select_from_compressed_chunk" "s2_wait_for_select_to_finish" "s2_unblock" "s1_rollback"
+
+permutation "s1_compress" "s3_block_chunk_insert" "s2_insert" "s1_decompress" "s1_compress" "s3_release_chunk_insert"
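
For reviewers who want to reproduce the race by hand, below is a rough two-session equivalent of the new permutation. It is a sketch, not part of the patch: it assumes a debug build where the debug waitpoint machinery (debug_waitpoint_enable / debug_waitpoint_release, the functions the spec itself calls) is compiled in, plus the sensor_data hypertable and unique index created in the spec's setup block.

-- Session A: arm the waitpoint added in ts_chunk_insert_state_create(),
-- which sits just before the chunk is opened with RowExclusiveLock.
SELECT debug_waitpoint_enable('chunk_insert_before_lock');

-- Session B: resolves its target chunk, then parks on the waitpoint
-- before locking the chunk relation (mirrors step s2_insert).
INSERT INTO sensor_data
VALUES ('2022-01-01 20:00'::timestamptz, 1, 1.0, 1.0)
ON CONFLICT (time, sensor_id) DO NOTHING;

-- Session A: while B is parked, decompress and recompress the chunk
-- (mirrors steps s1_decompress and s1_compress).
SELECT decompress_chunk(i) FROM show_chunks('sensor_data') i;
SELECT compress_chunk(i) FROM show_chunks('sensor_data') i;

-- Session A: let B continue; per the expected output above, the blocked
-- INSERT resumes with stale chunk state and fails with
-- "ERROR: chunk not found".
SELECT debug_waitpoint_release('chunk_insert_before_lock');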