Skip to content

Commit

Permalink
[columnar] Running simple Vacuum after INSERT in a columnar table mak…
Browse files Browse the repository at this point in the history
…es it give random errors

* Clear vacuum process status flag
* When deleting metadata tables also use stripe id as scan condition
  • Loading branch information
mkaruza committed Apr 27, 2023
1 parent d306a93 commit 3ef7cf5
Show file tree
Hide file tree
Showing 6 changed files with 89 additions and 60 deletions.
17 changes: 8 additions & 9 deletions columnar/src/backend/columnar/columnar_metadata.c
Original file line number Diff line number Diff line change
Expand Up @@ -559,7 +559,7 @@ SaveChunkGroups(RelFileNode relfilenode, uint64 stripe,
}

FinishModifyRelation(modifyState);
table_close(columnarChunkGroup, NoLock);
table_close(columnarChunkGroup, RowExclusiveLock);
}


Expand Down Expand Up @@ -663,7 +663,7 @@ SaveEmptyRowMask(uint64 storageId, uint64 stripeId,
}

FinishModifyRelation(modifyState);
table_close(columnarRowMask, NoLock);
table_close(columnarRowMask, RowExclusiveLock);

return chunkInserted;
}
Expand Down Expand Up @@ -1960,20 +1960,19 @@ DeleteStripeFromColumnarMetadataTable(Oid metadataTableId,
ScanKeyData scanKey[2];
ScanKeyInit(&scanKey[0], storageIdAtrrNumber, BTEqualStrategyNumber,
F_INT8EQ, UInt64GetDatum(storageId));
ScanKeyInit(&scanKey[0], stripeIdAttrNumber, BTEqualStrategyNumber,
ScanKeyInit(&scanKey[1], stripeIdAttrNumber, BTEqualStrategyNumber,
F_INT8EQ, UInt64GetDatum(stripeId));

Relation metadataTable = try_relation_open(metadataTableId, AccessShareLock);
Relation metadataTable = try_relation_open(metadataTableId, RowShareLock);
if (metadataTable == NULL)
{
/* extension has been dropped */
return;
}

Relation index = index_open(storageIdIndexId, AccessShareLock);

Relation index = index_open(storageIdIndexId, RowShareLock);
SysScanDesc scanDescriptor = systable_beginscan_ordered(metadataTable, index, NULL,
1, scanKey);
2, scanKey);

ModifyState *modifyState = StartModifyRelation(metadataTable);

Expand All @@ -1988,8 +1987,8 @@ DeleteStripeFromColumnarMetadataTable(Oid metadataTableId,

FinishModifyRelation(modifyState);

index_close(index, AccessShareLock);
table_close(metadataTable, AccessShareLock);
index_close(index, RowShareLock);
table_close(metadataTable, RowShareLock);
}


Expand Down
6 changes: 3 additions & 3 deletions columnar/src/backend/columnar/columnar_reader.c
Original file line number Diff line number Diff line change
Expand Up @@ -444,11 +444,11 @@ ColumnarReadRowByRowNumber(ColumnarReadState *readState,
}

/*
* ColumnaSetStripeReadState
* ColumnarSetStripeReadState
*/
bool
ColumnaSetStripeReadState(ColumnarReadState *readState,
StripeMetadata *startStripeMetadata)
ColumnarSetStripeReadState(ColumnarReadState *readState,
StripeMetadata *startStripeMetadata)
{
if (!ColumnarReadIsCurrentStripe(readState, startStripeMetadata->firstRowNumber))
{
Expand Down
25 changes: 20 additions & 5 deletions columnar/src/backend/columnar/columnar_tableam.c
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
#include "storage/bufmgr.h"
#include "storage/lmgr.h"
#include "storage/predicate.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "storage/smgr.h"
#include "tcop/utility.h"
Expand Down Expand Up @@ -1200,6 +1201,20 @@ TruncateAndCombineColumnarStripes(Relation rel, int elevel)
}
}

/*
* We need to clear current proces (VACUUUM) status flag here. Why?
* Current process has flag `PROC_IN_VACUUM` which is problematic because
* we write to from here to other metadata heap tables from here. If concurrent process
* read page in which we inserted tuples these tuples will be considered
* DEAD and will be removed. Concurrent process, when snapshot is constructed, will
* calculate RecentXmin but will not consider this process because of `PROC_IN_VACUUM`
* flag. It looks that invalidating status flag here doesn't affect execution.
*/
LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
MyProc->statusFlags = 0;
ProcGlobal->statusFlags[MyProc->pgxactoff] = MyProc->statusFlags;
LWLockRelease(ProcArrayLock);

ColumnarWriteState *writeState = ColumnarBeginWrite(rel->rd_node,
columnarOptions,
tupleDesc);
Expand All @@ -1222,8 +1237,8 @@ TruncateAndCombineColumnarStripes(Relation rel, int elevel)
randomAccess,
NULL);

ColumnaSetStripeReadState(readState,
list_nth(stripeMetadataList, startingStripeListPosition - 1));
ColumnarSetStripeReadState(readState,
list_nth(stripeMetadataList, startingStripeListPosition - 1));

Datum *values = palloc0(tupleDesc->natts * sizeof(Datum));
bool *nulls = palloc0(tupleDesc->natts * sizeof(bool));
Expand All @@ -1249,15 +1264,15 @@ TruncateAndCombineColumnarStripes(Relation rel, int elevel)

ColumnarStorageTruncate(rel, newDataReservation);

ColumnarEndWrite(writeState);
ColumnarEndRead(readState);

for (int i = 0; i < startingStripeListPosition; i++)
{
StripeMetadata *metadata = list_nth(stripeMetadataList, i);
DeleteMetadataRowsForStripeId(rel->rd_node, metadata->id);
}

ColumnarEndWrite(writeState);
ColumnarEndRead(readState);

return true;
}

Expand Down
4 changes: 2 additions & 2 deletions columnar/src/include/columnar/columnar.h
Original file line number Diff line number Diff line change
Expand Up @@ -292,8 +292,8 @@ extern void ColumnarReadRowByRowNumberOrError(ColumnarReadState *readState,
extern bool ColumnarReadRowByRowNumber(ColumnarReadState *readState,
uint64 rowNumber, Datum *columnValues,
bool *columnNulls);
extern bool ColumnaSetStripeReadState(ColumnarReadState *readState,
StripeMetadata *startStripeMetadata);
extern bool ColumnarSetStripeReadState(ColumnarReadState *readState,
StripeMetadata *startStripeMetadata);

/* Function declarations for common functions */
extern FmgrInfo * GetFunctionInfoOrNull(Oid typeId, Oid accessMethodId,
Expand Down
47 changes: 27 additions & 20 deletions columnar/src/test/regress/expected/columnar_vacuum.out
Original file line number Diff line number Diff line change
Expand Up @@ -294,9 +294,10 @@ SET columnar.compression TO default;
SET columnar.stripe_row_limit TO default;
SET columnar.chunk_group_row_limit TO default;
CREATE TABLE t(a INT, b INT) USING columnar;
SELECT columnar_test_helpers.columnar_relation_storageid(pg_class.oid) AS t_oid FROM pg_class WHERE relname='t' \gset
INSERT INTO t SELECT g, g % 10 from generate_series(1,100000) g;
INSERT INTO t SELECT g, g % 10 from generate_series(1,100000) g;
SELECT COUNT(*) = 2 FROM columnar.stripe;
SELECT COUNT(*) = 2 FROM columnar.stripe WHERE storage_id = :t_oid;
?column?
----------
t
Expand All @@ -305,20 +306,20 @@ SELECT COUNT(*) = 2 FROM columnar.stripe;
VACUUM t;
-- No change since we can't combine 2 stripe because row_number is higher
-- than maximum row number per stripe
SELECT COUNT(*) = 2 FROM columnar.stripe;
SELECT COUNT(*) = 2 FROM columnar.stripe WHERE storage_id = :t_oid;
?column?
----------
t
(1 row)

DELETE FROM t WHERE a % 2 = 0;
SELECT COUNT(*) = 0 FROM columnar.chunk_group WHERE deleted_rows = 0;
SELECT COUNT(*) = 0 FROM columnar.chunk_group WHERE deleted_rows = 0 AND storage_id = :t_oid;
?column?
----------
t
(1 row)

SELECT COUNT(*) = 0 FROM columnar.row_mask WHERE deleted_rows = 0;
SELECT COUNT(*) = 0 FROM columnar.row_mask WHERE deleted_rows = 0 AND storage_id = :t_oid;
?column?
----------
t
Expand All @@ -327,19 +328,19 @@ SELECT COUNT(*) = 0 FROM columnar.row_mask WHERE deleted_rows = 0;
VACUUM t;
-- Stripes are merged into one stripe because total number of non-deleted rows
-- is less than maximum stripe row number
SELECT COUNT(*) = 1 FROM columnar.stripe;
SELECT COUNT(*) = 1 FROM columnar.stripe WHERE storage_id = :t_oid;
?column?
----------
t
(1 row)

SELECT COUNT(*) FROM columnar.chunk_group WHERE deleted_rows = 0;
SELECT COUNT(*) FROM columnar.chunk_group WHERE deleted_rows = 0 AND storage_id = :t_oid;
count
-------
10
(1 row)

SELECT COUNT(*) FROM columnar.row_mask WHERE deleted_rows = 0;
SELECT COUNT(*) FROM columnar.row_mask WHERE deleted_rows = 0 AND storage_id = :t_oid;
count
-------
10
Expand All @@ -348,18 +349,22 @@ SELECT COUNT(*) FROM columnar.row_mask WHERE deleted_rows = 0;
DROP TABLE t;
-- Vacuum on single stripe
CREATE TABLE t(a INT, b INT) USING columnar;
SELECT columnar_test_helpers.columnar_relation_storageid(pg_class.oid) AS t_oid FROM pg_class WHERE relname='t' \gset
INSERT INTO t SELECT g, g % 10 from generate_series(1,100000) g;
SELECT COUNT(*) = (SELECT COUNT(*) FROM columnar.row_mask) FROM columnar.chunk_group;
SELECT COUNT(*) = (SELECT COUNT(*) FROM columnar.row_mask WHERE storage_id = :t_oid)
FROM columnar.chunk_group WHERE storage_id = :t_oid;
?column?
----------
t
(1 row)

SELECT COUNT(*) AS columnar_chunk_group_rows FROM columnar.chunk_group \gset
SELECT COUNT(*) AS columnar_row_mask_rows FROM columnar.row_mask \gset
SELECT COUNT(*) AS columnar_chunk_group_rows FROM columnar.chunk_group WHERE storage_id = :t_oid \gset
SELECT COUNT(*) AS columnar_row_mask_rows FROM columnar.row_mask WHERE storage_id = :t_oid \gset
DELETE FROM t WHERE a % 2 = 0;
SELECT COUNT(*) AS columnar_chunk_group_after_delete_rows FROM columnar.chunk_group WHERE deleted_rows >= 1 \gset
SELECT COUNT(*) AS columnar_row_mask_after_delete_rows FROM columnar.row_mask WHERE deleted_rows >= 1 \gset
SELECT COUNT(*) AS columnar_chunk_group_after_delete_rows FROM columnar.chunk_group
WHERE deleted_rows >= 1 AND storage_id = :t_oid \gset
SELECT COUNT(*) AS columnar_row_mask_after_delete_rows FROM columnar.row_mask
WHERE deleted_rows >= 1 AND storage_id = :t_oid \gset
SELECT :columnar_chunk_group_after_delete_rows = :columnar_chunk_group_rows;
?column?
----------
Expand All @@ -372,39 +377,41 @@ SELECT :columnar_row_mask_after_delete_rows = :columnar_row_mask_rows;
t
(1 row)

SELECT SUM(deleted_rows) FROM columnar.row_mask;
SELECT SUM(deleted_rows) FROM columnar.row_mask WHERE storage_id = :t_oid;
sum
-------
50000
(1 row)

SELECT SUM(deleted_rows) FROM columnar.chunk_group;
SELECT SUM(deleted_rows) FROM columnar.chunk_group WHERE storage_id = :t_oid;
sum
-------
50000
(1 row)

VACUUM t;
-- No more deleted_rows after vacuum
SELECT COUNT(*) = 0 FROM columnar.chunk_group WHERE deleted_rows >= 1;
SELECT COUNT(*) = 0 FROM columnar.chunk_group WHERE deleted_rows >= 1 AND storage_id = :t_oid;
?column?
----------
t
(1 row)

SELECT COUNT(*) = 0 FROM columnar.row_mask WHERE deleted_rows >= 1;
SELECT COUNT(*) = 0 FROM columnar.row_mask WHERE deleted_rows >= 1 AND storage_id = :t_oid;
?column?
----------
t
(1 row)

SELECT COUNT(*) = (:columnar_chunk_group_rows / 2) FROM columnar.chunk_group WHERE deleted_rows = 0;
SELECT COUNT(*) = (:columnar_chunk_group_rows / 2) FROM columnar.chunk_group
WHERE deleted_rows = 0 AND storage_id = :t_oid;
?column?
----------
t
(1 row)

SELECT COUNT(*) = (:columnar_row_mask_rows / 2) FROM columnar.row_mask WHERE deleted_rows = 0;
SELECT COUNT(*) = (:columnar_row_mask_rows / 2) FROM columnar.row_mask
WHERE deleted_rows = 0 AND storage_id = :t_oid;
?column?
----------
t
Expand All @@ -421,13 +428,13 @@ SELECT (:table_count_mod_7 / :table_count) < 0.2;
DELETE FROM t WHERE a % 7 = 0;
VACUUM t;
-- Vacuuming will not be done because number of deleted rows / total_rows is less than 20%
SELECT COUNT(*) = (:columnar_chunk_group_rows / 2) FROM columnar.chunk_group;
SELECT COUNT(*) = (:columnar_chunk_group_rows / 2) FROM columnar.chunk_group WHERE storage_id = :t_oid;
?column?
----------
t
(1 row)

SELECT COUNT(*) = (:columnar_row_mask_rows / 2) FROM columnar.row_mask ;
SELECT COUNT(*) = (:columnar_row_mask_rows / 2) FROM columnar.row_mask WHERE storage_id = :t_oid;
?column?
----------
t
Expand Down
50 changes: 29 additions & 21 deletions columnar/src/test/regress/sql/columnar_vacuum.sql
Original file line number Diff line number Diff line change
Expand Up @@ -156,69 +156,77 @@ SET columnar.compression TO default;
SET columnar.stripe_row_limit TO default;
SET columnar.chunk_group_row_limit TO default;


CREATE TABLE t(a INT, b INT) USING columnar;

SELECT columnar_test_helpers.columnar_relation_storageid(pg_class.oid) AS t_oid FROM pg_class WHERE relname='t' \gset

INSERT INTO t SELECT g, g % 10 from generate_series(1,100000) g;
INSERT INTO t SELECT g, g % 10 from generate_series(1,100000) g;

SELECT COUNT(*) = 2 FROM columnar.stripe;
SELECT COUNT(*) = 2 FROM columnar.stripe WHERE storage_id = :t_oid;

VACUUM t;

-- No change since we can't combine 2 stripe because row_number is higher
-- than maximum row number per stripe

SELECT COUNT(*) = 2 FROM columnar.stripe;
SELECT COUNT(*) = 2 FROM columnar.stripe WHERE storage_id = :t_oid;

DELETE FROM t WHERE a % 2 = 0;

SELECT COUNT(*) = 0 FROM columnar.chunk_group WHERE deleted_rows = 0;
SELECT COUNT(*) = 0 FROM columnar.row_mask WHERE deleted_rows = 0;
SELECT COUNT(*) = 0 FROM columnar.chunk_group WHERE deleted_rows = 0 AND storage_id = :t_oid;
SELECT COUNT(*) = 0 FROM columnar.row_mask WHERE deleted_rows = 0 AND storage_id = :t_oid;

VACUUM t;

-- Stripes are merged into one stripe because total number of non-deleted rows
-- is less than maximum stripe row number

SELECT COUNT(*) = 1 FROM columnar.stripe;
SELECT COUNT(*) = 1 FROM columnar.stripe WHERE storage_id = :t_oid;

SELECT COUNT(*) FROM columnar.chunk_group WHERE deleted_rows = 0;
SELECT COUNT(*) FROM columnar.row_mask WHERE deleted_rows = 0;
SELECT COUNT(*) FROM columnar.chunk_group WHERE deleted_rows = 0 AND storage_id = :t_oid;
SELECT COUNT(*) FROM columnar.row_mask WHERE deleted_rows = 0 AND storage_id = :t_oid;

DROP TABLE t;

-- Vacuum on single stripe

CREATE TABLE t(a INT, b INT) USING columnar;

SELECT columnar_test_helpers.columnar_relation_storageid(pg_class.oid) AS t_oid FROM pg_class WHERE relname='t' \gset

INSERT INTO t SELECT g, g % 10 from generate_series(1,100000) g;

SELECT COUNT(*) = (SELECT COUNT(*) FROM columnar.row_mask) FROM columnar.chunk_group;
SELECT COUNT(*) = (SELECT COUNT(*) FROM columnar.row_mask WHERE storage_id = :t_oid)
FROM columnar.chunk_group WHERE storage_id = :t_oid;

SELECT COUNT(*) AS columnar_chunk_group_rows FROM columnar.chunk_group \gset
SELECT COUNT(*) AS columnar_row_mask_rows FROM columnar.row_mask \gset
SELECT COUNT(*) AS columnar_chunk_group_rows FROM columnar.chunk_group WHERE storage_id = :t_oid \gset
SELECT COUNT(*) AS columnar_row_mask_rows FROM columnar.row_mask WHERE storage_id = :t_oid \gset

DELETE FROM t WHERE a % 2 = 0;

SELECT COUNT(*) AS columnar_chunk_group_after_delete_rows FROM columnar.chunk_group WHERE deleted_rows >= 1 \gset
SELECT COUNT(*) AS columnar_row_mask_after_delete_rows FROM columnar.row_mask WHERE deleted_rows >= 1 \gset
SELECT COUNT(*) AS columnar_chunk_group_after_delete_rows FROM columnar.chunk_group
WHERE deleted_rows >= 1 AND storage_id = :t_oid \gset
SELECT COUNT(*) AS columnar_row_mask_after_delete_rows FROM columnar.row_mask
WHERE deleted_rows >= 1 AND storage_id = :t_oid \gset

SELECT :columnar_chunk_group_after_delete_rows = :columnar_chunk_group_rows;
SELECT :columnar_row_mask_after_delete_rows = :columnar_row_mask_rows;

SELECT SUM(deleted_rows) FROM columnar.row_mask;
SELECT SUM(deleted_rows) FROM columnar.chunk_group;
SELECT SUM(deleted_rows) FROM columnar.row_mask WHERE storage_id = :t_oid;
SELECT SUM(deleted_rows) FROM columnar.chunk_group WHERE storage_id = :t_oid;

VACUUM t;

-- No more deleted_rows after vacuum

SELECT COUNT(*) = 0 FROM columnar.chunk_group WHERE deleted_rows >= 1;
SELECT COUNT(*) = 0 FROM columnar.row_mask WHERE deleted_rows >= 1;
SELECT COUNT(*) = 0 FROM columnar.chunk_group WHERE deleted_rows >= 1 AND storage_id = :t_oid;
SELECT COUNT(*) = 0 FROM columnar.row_mask WHERE deleted_rows >= 1 AND storage_id = :t_oid;

SELECT COUNT(*) = (:columnar_chunk_group_rows / 2) FROM columnar.chunk_group WHERE deleted_rows = 0;
SELECT COUNT(*) = (:columnar_row_mask_rows / 2) FROM columnar.row_mask WHERE deleted_rows = 0;
SELECT COUNT(*) = (:columnar_chunk_group_rows / 2) FROM columnar.chunk_group
WHERE deleted_rows = 0 AND storage_id = :t_oid;
SELECT COUNT(*) = (:columnar_row_mask_rows / 2) FROM columnar.row_mask
WHERE deleted_rows = 0 AND storage_id = :t_oid;

SELECT COUNT(*) AS table_count FROM t \gset
SELECT COUNT(*) AS table_count_mod_7 FROM t WHERE a % 7 = 0 \gset
Expand All @@ -231,7 +239,7 @@ VACUUM t;

-- Vacuuming will not be done because number of deleted rows / total_rows is less than 20%

SELECT COUNT(*) = (:columnar_chunk_group_rows / 2) FROM columnar.chunk_group;
SELECT COUNT(*) = (:columnar_row_mask_rows / 2) FROM columnar.row_mask ;
SELECT COUNT(*) = (:columnar_chunk_group_rows / 2) FROM columnar.chunk_group WHERE storage_id = :t_oid;
SELECT COUNT(*) = (:columnar_row_mask_rows / 2) FROM columnar.row_mask WHERE storage_id = :t_oid;

DROP TABLE t;

0 comments on commit 3ef7cf5

Please sign in to comment.