Vacuum large blocks (#220)
* [columnar] Vacuum stripes until memory limit is reached

* Limit stripe vacuuming until 1GB decompressed size is reached
* Output relation stats (in vacuum) only if VERBOSE flag is set

* Add test for vacuum completion

---------

Co-authored-by: mkaruza <[email protected]>
JerrySievert and mkaruza authored Jan 9, 2024
1 parent cb89681 commit 9cf098a
Showing 6 changed files with 149 additions and 6 deletions.
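As a minimal usage sketch of the new behaviour (illustrative only — the table name, data sizes, and statements mirror the regress test added below and are not part of the diff itself): after this change, per-relation stats are printed only when VACUUM is run with VERBOSE, and vacuum completes even when individual field values approach the 1GB limit.

-- Illustrative sketch; mirrors the regress test added in this commit.
CREATE TABLE t (id SERIAL, data TEXT) USING columnar;
INSERT INTO t SELECT 1, repeat('a', 1000000000);
INSERT INTO t SELECT 2, repeat('b', 1000000000);
-- Plain VACUUM no longer logs relation stats; request them explicitly:
VACUUM (VERBOSE) t;
DROP TABLE t;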
49 changes: 48 additions & 1 deletion columnar/src/backend/columnar/columnar_metadata.c
@@ -1402,7 +1402,8 @@ ReadChunkGroupRowCounts(uint64 storageId, uint64 stripe, uint32 chunkGroupCount,

if (tupleChunkGroupIndex > chunkGroupCount)
{
elog(ERROR, "Tuple chunk group higher than chunk group count");
elog(WARNING, "Tuple chunk group higher than chunk group count: %d, %d (storage_id = %ld, stripe_id = %ld)", tupleChunkGroupIndex, chunkGroupCount, UInt64GetDatum(storageId), Int64GetDatum(stripe));
tupleChunkGroupIndex = chunkGroupCount;
}

(*chunkGroupRowCounts)[tupleChunkGroupIndex] =
@@ -1561,6 +1562,52 @@ DeletedRowsForStripe(RelFileLocator relfilelocator, uint32 chunkCount, uint64 st
return deletedRows;
}

/*
 * DecompressedLengthForStripe returns the total decompressed size of all chunks
 * for the given stripe.
 */
Size
DecompressedLengthForStripe(RelFileLocator relfilelocator, uint64 stripeId)
{
HeapTuple heapTuple = NULL;
ScanKeyData scanKey[2];

uint64 storageId = LookupStorageId(relfilelocator);

Oid columnarChunkOid = ColumnarChunkRelationId();
Relation columnarChunk = table_open(columnarChunkOid, AccessShareLock);
Relation index = index_open(ColumnarChunkIndexRelationId(), AccessShareLock);

ScanKeyInit(&scanKey[0], Anum_columnar_chunk_storageid,
BTEqualStrategyNumber, F_OIDEQ, UInt64GetDatum(storageId));
ScanKeyInit(&scanKey[1], Anum_columnar_chunk_stripe,
BTEqualStrategyNumber, F_OIDEQ, Int64GetDatum(stripeId));

SysScanDesc scanDescriptor = systable_beginscan_ordered(columnarChunk, index,
GetTransactionSnapshot(), 2, scanKey);

Size decompressedChunkSize = 0;

while (HeapTupleIsValid(heapTuple = systable_getnext_ordered(scanDescriptor,
ForwardScanDirection)))
{
Datum datumArray[Natts_columnar_chunk];
bool isNullArray[Natts_columnar_chunk];

heap_deform_tuple(heapTuple, RelationGetDescr(columnarChunk), datumArray,
isNullArray);

decompressedChunkSize +=
DatumGetInt64(datumArray[Anum_columnar_chunk_value_decompressed_size - 1]);
}

systable_endscan_ordered(scanDescriptor);
index_close(index, AccessShareLock);
table_close(columnarChunk, AccessShareLock);

return decompressedChunkSize;
}


/*
* GetHighestUsedAddress returns the highest used address for the given
26 changes: 25 additions & 1 deletion columnar/src/backend/columnar/columnar_tableam.c
@@ -1459,19 +1459,30 @@ TruncateAndCombineColumnarStripes(Relation rel, int elevel)

uint32 lastStripeDeletedRows = 0;

Size totalDecompressedStripeLength = 0;

foreach(lc, stripeMetadataList)
{
StripeMetadata * stripeMetadata = lfirst(lc);
#if PG_VERSION_NUM >= PG_VERSION_16
lastStripeDeletedRows = DeletedRowsForStripe(rel->rd_locator,
stripeMetadata->chunkCount,
stripeMetadata->id);
totalDecompressedStripeLength +=
DecompressedLengthForStripe(rel->rd_locator, stripeMetadata->id);
#else
lastStripeDeletedRows = DeletedRowsForStripe(rel->rd_node,
stripeMetadata->chunkCount,
stripeMetadata->id);
totalDecompressedStripeLength +=
DecompressedLengthForStripe(rel->rd_node, stripeMetadata->id);
#endif

if (totalDecompressedStripeLength >= 1024000000)
{
break;
}

uint64 stripeRowCount = stripeMetadata->rowCount - lastStripeDeletedRows;

if ((totalRowNumberCount + stripeRowCount >= columnarOptions.stripeRowCount))
@@ -1651,7 +1662,10 @@ columnar_vacuum_rel(Relation rel, VacuumParams *params,
/* this should have been resolved by vacuum.c until now */
Assert(params->truncate != VACOPTVALUE_UNSPECIFIED);

LogRelationStats(rel, elevel);
if (params->options & VACOPT_VERBOSE)
{
LogRelationStats(rel, elevel);
}

/*
* We don't have updates, deletes, or concurrent updates, so all we
@@ -1789,6 +1803,12 @@ LogRelationStats(Relation rel, int elevel)
List *stripeList = StripesForRelfilenode(relfilelocator, ForwardScanDirection);
int stripeCount = list_length(stripeList);

MemoryContext relation_stats_ctx =
AllocSetContextCreate(CurrentMemoryContext, "Vacuum Relation Stats Context",
ALLOCSET_SMALL_SIZES);

MemoryContext oldcontext = MemoryContextSwitchTo(relation_stats_ctx);

foreach(stripeMetadataCell, stripeList)
{
StripeMetadata *stripe = lfirst(stripeMetadataCell);
@@ -1827,8 +1847,12 @@ LogRelationStats(Relation rel, int elevel)

tupleCount += stripe->rowCount;
totalStripeLength += stripe->dataLength;

MemoryContextReset(relation_stats_ctx);
}

MemoryContextSwitchTo(oldcontext);

if (unlikely(rel->rd_smgr == NULL))
{
#if PG_VERSION_NUM >= PG_VERSION_16
60 changes: 57 additions & 3 deletions columnar/src/backend/columnar/columnar_writer.c
@@ -27,6 +27,7 @@
#include "storage/smgr.h"
#include "utils/guc.h"
#include "utils/memutils.h"
#include "utils/palloc.h"
#include "utils/rel.h"

#if PG_VERSION_NUM >= PG_VERSION_16
@@ -148,7 +149,6 @@ ColumnarBeginWrite(RelFileLocator relfilelocator,
return writeState;
}


/*
* ColumnarWriteRow adds a row to the columnar table. If the stripe is not initialized,
* we create structures to hold stripe data and skip list. Then, we serialize and
@@ -171,6 +171,58 @@ ColumnarWriteRow(ColumnarWriteState *writeState, Datum *columnValues, bool *colu
ChunkData *chunkData = writeState->chunkData;
MemoryContext oldContext = MemoryContextSwitchTo(writeState->stripeWriteContext);

uint32 chunkIndex;
uint32 chunkRowIndex;

if (stripeBuffers)
{
chunkIndex = stripeBuffers->rowCount / chunkRowCount;
chunkRowIndex = stripeBuffers->rowCount % chunkRowCount;
/*
* For each column, we first need to check to see if the next row will fit
* inside the chunk buffer. If it does not fit, then we need to serialize
* the stripe and make a new stripe for insertion.
*/
bool fits = true;

for (columnIndex = 0; columnIndex < columnCount; columnIndex++)
{

/* Check for nulls, skip if null. */
if (columnNulls[columnIndex])
{
continue;
}

Form_pg_attribute attributeForm =
TupleDescAttr(writeState->tupleDescriptor, columnIndex);

int columnTypeLength = attributeForm->attlen;
char columnTypeAlign = attributeForm->attalign;

uint32 datumLength = att_addlength_datum(0, columnTypeLength, columnValues[columnIndex]);
uint32 datumLengthAligned = att_align_nominal(datumLength, columnTypeAlign);

/* Check to see if we are within the 1 gigabyte value. */
if ((long) chunkData->valueBufferArray[columnIndex]->len + (long) datumLengthAligned > 1024000000)
{
fits = false;
break;
}
}

if (!fits)
{
/* Flush the stripe. */
ColumnarFlushPendingWrites(writeState);

/* Then set up for new stripeBuffers. */
stripeBuffers = NULL;

chunkData->rowCount = 0;
}
}

if (stripeBuffers == NULL)
{
stripeBuffers = CreateEmptyStripeBuffers(options->stripeRowCount,
@@ -204,8 +256,9 @@ ColumnarWriteRow(ColumnarWriteState *writeState, Datum *columnValues, bool *colu
}
}

uint32 chunkIndex = stripeBuffers->rowCount / chunkRowCount;
uint32 chunkRowIndex = stripeBuffers->rowCount % chunkRowCount;
chunkIndex = stripeBuffers->rowCount / chunkRowCount;
chunkRowIndex = stripeBuffers->rowCount % chunkRowCount;


for (columnIndex = 0; columnIndex < columnCount; columnIndex++)
{
@@ -642,6 +695,7 @@ SerializeChunkData(ColumnarWriteState *writeState, uint32 chunkIndex, uint32 row
bool compressed = CompressBuffer(serializedValueBuffer, compressionBuffer,
requestedCompressionType,
compressionLevel);

if (compressed)
{
serializedValueBuffer = compressionBuffer;
1 change: 1 addition & 0 deletions columnar/src/include/columnar/columnar_metadata.h
@@ -55,6 +55,7 @@ extern List * StripesForRelfilenode(RelFileLocator relfilelocator, ScanDirection
extern uint32 DeletedRowsForStripe(RelFileLocator relfilelocator,
uint32 chunkCount,
uint64 stripeId);
extern Size DecompressedLengthForStripe(RelFileLocator relfilelocator, uint64 stripeId);
extern void ColumnarStorageUpdateIfNeeded(Relation rel, bool isUpgrade);
extern StripeMetadata * RewriteStripeMetadataRowWithNewValues(Relation rel, uint64 stripeId,
uint64 sizeBytes, uint64 fileOffset, uint64 rowCount, uint64 chunkCount);
7 changes: 7 additions & 0 deletions columnar/src/test/regress/expected/columnar_vacuum.out
@@ -441,3 +441,10 @@ SELECT COUNT(*) = (:columnar_row_mask_rows / 2) FROM columnar.row_mask WHERE sto
(1 row)

DROP TABLE t;
-- Verify that we can vacuum humongous fields
CREATE TABLE t (id SERIAL, data TEXT) USING columnar;
INSERT INTO t SELECT 1, repeat('a', 1000000000);
INSERT INTO t SELECT 2, repeat('b', 1000000000);
INSERT INTO t SELECT 3, repeat('c', 1000000000);
VACUUM t;
DROP TABLE t;
12 changes: 11 additions & 1 deletion columnar/src/test/regress/sql/columnar_vacuum.sql
@@ -242,4 +242,14 @@ VACUUM t;
SELECT COUNT(*) = (:columnar_chunk_group_rows / 2) FROM columnar.chunk_group WHERE storage_id = :t_oid;
SELECT COUNT(*) = (:columnar_row_mask_rows / 2) FROM columnar.row_mask WHERE storage_id = :t_oid;

DROP TABLE t;
DROP TABLE t;

-- Verify that we can vacuum humongous fields
CREATE TABLE t (id SERIAL, data TEXT) USING columnar;
INSERT INTO t SELECT 1, repeat('a', 1000000000);
INSERT INTO t SELECT 2, repeat('b', 1000000000);
INSERT INTO t SELECT 3, repeat('c', 1000000000);

VACUUM t;

DROP TABLE t;
