Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Vacuum large blocks #220

Merged
merged 3 commits into from
Jan 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 48 additions & 1 deletion columnar/src/backend/columnar/columnar_metadata.c
Original file line number Diff line number Diff line change
Expand Up @@ -1402,7 +1402,8 @@ ReadChunkGroupRowCounts(uint64 storageId, uint64 stripe, uint32 chunkGroupCount,

if (tupleChunkGroupIndex > chunkGroupCount)
{
elog(ERROR, "Tuple chunk group higher than chunk group count");
elog(WARNING, "Tuple chunk group higher than chunk group count: %d, %d (storage_id = %ld, stripe_id = %ld)", tupleChunkGroupIndex, chunkGroupCount, UInt64GetDatum(storageId), Int64GetDatum(stripe));
tupleChunkGroupIndex = chunkGroupCount;
}

(*chunkGroupRowCounts)[tupleChunkGroupIndex] =
Expand Down Expand Up @@ -1561,6 +1562,52 @@ DeletedRowsForStripe(RelFileLocator relfilelocator, uint32 chunkCount, uint64 st
return deletedRows;
}

/*
 * DecompressedLengthForStripe walks every columnar chunk metadata row that
 * matches the storage id of the given relation and the given stripe id, and
 * returns the sum of their value_decompressed_size attributes.
 *
 * relfilelocator: identifies the relation; it is resolved to a storage id
 *                 through LookupStorageId().
 * stripeId:       stripe whose chunk rows are summed.
 *
 * Returns 0 when the scan finds no matching chunk rows.
 */
Size
DecompressedLengthForStripe(RelFileLocator relfilelocator, uint64 stripeId)
{
	Size totalDecompressedLength = 0;
	ScanKeyData chunkScanKeys[2];

	uint64 storageId = LookupStorageId(relfilelocator);

	/* open the chunk metadata table and its index for reading */
	Oid chunkRelationOid = ColumnarChunkRelationId();
	Relation chunkRelation = table_open(chunkRelationOid, AccessShareLock);
	Relation chunkIndex = index_open(ColumnarChunkIndexRelationId(), AccessShareLock);

	/*
	 * Restrict the scan to (storage id, stripe id).
	 *
	 * NOTE(review): both keys compare with F_OIDEQ while the datums are built
	 * with UInt64GetDatum / Int64GetDatum -- confirm this matches the type of
	 * the indexed columns.
	 */
	ScanKeyInit(&chunkScanKeys[0], Anum_columnar_chunk_storageid,
				BTEqualStrategyNumber, F_OIDEQ, UInt64GetDatum(storageId));
	ScanKeyInit(&chunkScanKeys[1], Anum_columnar_chunk_stripe,
				BTEqualStrategyNumber, F_OIDEQ, Int64GetDatum(stripeId));

	SysScanDesc chunkScan = systable_beginscan_ordered(chunkRelation, chunkIndex,
													   GetTransactionSnapshot(),
													   2, chunkScanKeys);

	for (;;)
	{
		Datum values[Natts_columnar_chunk];
		bool nulls[Natts_columnar_chunk];

		HeapTuple chunkTuple = systable_getnext_ordered(chunkScan,
														ForwardScanDirection);
		if (!HeapTupleIsValid(chunkTuple))
		{
			/* no more chunk rows for this stripe */
			break;
		}

		heap_deform_tuple(chunkTuple, RelationGetDescr(chunkRelation), values,
						  nulls);

		/* attribute numbers are 1-based, the deformed arrays are 0-based */
		totalDecompressedLength +=
			DatumGetInt64(values[Anum_columnar_chunk_value_decompressed_size - 1]);
	}

	/* release everything in reverse order of acquisition */
	systable_endscan_ordered(chunkScan);
	index_close(chunkIndex, AccessShareLock);
	table_close(chunkRelation, AccessShareLock);

	return totalDecompressedLength;
}


/*
* GetHighestUsedAddress returns the highest used address for the given
Expand Down
26 changes: 25 additions & 1 deletion columnar/src/backend/columnar/columnar_tableam.c
Original file line number Diff line number Diff line change
Expand Up @@ -1459,19 +1459,30 @@ TruncateAndCombineColumnarStripes(Relation rel, int elevel)

uint32 lastStripeDeletedRows = 0;

Size totalDecompressedStripeLength = 0;

foreach(lc, stripeMetadataList)
{
StripeMetadata * stripeMetadata = lfirst(lc);
#if PG_VERSION_NUM >= PG_VERSION_16
lastStripeDeletedRows = DeletedRowsForStripe(rel->rd_locator,
stripeMetadata->chunkCount,
stripeMetadata->id);
totalDecompressedStripeLength +=
DecompressedLengthForStripe(rel->rd_locator, stripeMetadata->id);
#else
lastStripeDeletedRows = DeletedRowsForStripe(rel->rd_node,
stripeMetadata->chunkCount,
stripeMetadata->id);
totalDecompressedStripeLength +=
DecompressedLengthForStripe(rel->rd_node, stripeMetadata->id);
#endif

if (totalDecompressedStripeLength >= 1024000000)
{
break;
}

uint64 stripeRowCount = stripeMetadata->rowCount - lastStripeDeletedRows;

if ((totalRowNumberCount + stripeRowCount >= columnarOptions.stripeRowCount))
Expand Down Expand Up @@ -1651,7 +1662,10 @@ columnar_vacuum_rel(Relation rel, VacuumParams *params,
/* this should have been resolved by vacuum.c until now */
Assert(params->truncate != VACOPTVALUE_UNSPECIFIED);

LogRelationStats(rel, elevel);
if (params->options & VACOPT_VERBOSE)
{
LogRelationStats(rel, elevel);
}

/*
* We don't have updates, deletes, or concurrent updates, so all we
Expand Down Expand Up @@ -1789,6 +1803,12 @@ LogRelationStats(Relation rel, int elevel)
List *stripeList = StripesForRelfilenode(relfilelocator, ForwardScanDirection);
int stripeCount = list_length(stripeList);

MemoryContext relation_stats_ctx =
AllocSetContextCreate(CurrentMemoryContext, "Vacuum Relation Stats Context",
ALLOCSET_SMALL_SIZES);

MemoryContext oldcontext = MemoryContextSwitchTo(relation_stats_ctx);

foreach(stripeMetadataCell, stripeList)
{
StripeMetadata *stripe = lfirst(stripeMetadataCell);
Expand Down Expand Up @@ -1827,8 +1847,12 @@ LogRelationStats(Relation rel, int elevel)

tupleCount += stripe->rowCount;
totalStripeLength += stripe->dataLength;

MemoryContextReset(relation_stats_ctx);
}

MemoryContextSwitchTo(oldcontext);

if (unlikely(rel->rd_smgr == NULL))
{
#if PG_VERSION_NUM >= PG_VERSION_16
Expand Down
60 changes: 57 additions & 3 deletions columnar/src/backend/columnar/columnar_writer.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "storage/smgr.h"
#include "utils/guc.h"
#include "utils/memutils.h"
#include "utils/palloc.h"
#include "utils/rel.h"

#if PG_VERSION_NUM >= PG_VERSION_16
Expand Down Expand Up @@ -148,7 +149,6 @@ ColumnarBeginWrite(RelFileLocator relfilelocator,
return writeState;
}


/*
* ColumnarWriteRow adds a row to the columnar table. If the stripe is not initialized,
* we create structures to hold stripe data and skip list. Then, we serialize and
Expand All @@ -171,6 +171,58 @@ ColumnarWriteRow(ColumnarWriteState *writeState, Datum *columnValues, bool *colu
ChunkData *chunkData = writeState->chunkData;
MemoryContext oldContext = MemoryContextSwitchTo(writeState->stripeWriteContext);

uint32 chunkIndex;
uint32 chunkRowIndex;

if (stripeBuffers)
{
chunkIndex = stripeBuffers->rowCount / chunkRowCount;
chunkRowIndex = stripeBuffers->rowCount % chunkRowCount;
/*
* For each column, we first need to check to see if the next row will fit
* inside the chunk buffer. If it does not fit, then we need to serialize
* the stripe and make a new stripe for insertion.
*/
bool fits = true;

for (columnIndex = 0; columnIndex < columnCount; columnIndex++)
{

/* Check for nulls, skip if null. */
if (columnNulls[columnIndex])
{
continue;
}

Form_pg_attribute attributeForm =
TupleDescAttr(writeState->tupleDescriptor, columnIndex);

int columnTypeLength = attributeForm->attlen;
char columnTypeAlign = attributeForm->attalign;

uint32 datumLength = att_addlength_datum(0, columnTypeLength, columnValues[columnIndex]);
uint32 datumLengthAligned = att_align_nominal(datumLength, columnTypeAlign);

/* Check to see if we are within the 1 gigabyte value. */
if ((long) chunkData->valueBufferArray[columnIndex]->len + (long) datumLengthAligned > 1024000000)
{
fits = false;
break;
}
}

if (!fits)
{
/* Flush the stripe. */
ColumnarFlushPendingWrites(writeState);

/* Then set up for new stripeBuffers. */
stripeBuffers = NULL;

chunkData->rowCount = 0;
}
}

if (stripeBuffers == NULL)
{
stripeBuffers = CreateEmptyStripeBuffers(options->stripeRowCount,
Expand Down Expand Up @@ -204,8 +256,9 @@ ColumnarWriteRow(ColumnarWriteState *writeState, Datum *columnValues, bool *colu
}
}

uint32 chunkIndex = stripeBuffers->rowCount / chunkRowCount;
uint32 chunkRowIndex = stripeBuffers->rowCount % chunkRowCount;
chunkIndex = stripeBuffers->rowCount / chunkRowCount;
chunkRowIndex = stripeBuffers->rowCount % chunkRowCount;


for (columnIndex = 0; columnIndex < columnCount; columnIndex++)
{
Expand Down Expand Up @@ -642,6 +695,7 @@ SerializeChunkData(ColumnarWriteState *writeState, uint32 chunkIndex, uint32 row
bool compressed = CompressBuffer(serializedValueBuffer, compressionBuffer,
requestedCompressionType,
compressionLevel);

if (compressed)
{
serializedValueBuffer = compressionBuffer;
Expand Down
1 change: 1 addition & 0 deletions columnar/src/include/columnar/columnar_metadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ extern List * StripesForRelfilenode(RelFileLocator relfilelocator, ScanDirection
extern uint32 DeletedRowsForStripe(RelFileLocator relfilelocator,
uint32 chunkCount,
uint64 stripeId);
extern Size DecompressedLengthForStripe(RelFileLocator relfilelocator, uint64 stripeId);
extern void ColumnarStorageUpdateIfNeeded(Relation rel, bool isUpgrade);
extern StripeMetadata * RewriteStripeMetadataRowWithNewValues(Relation rel, uint64 stripeId,
uint64 sizeBytes, uint64 fileOffset, uint64 rowCount, uint64 chunkCount);
Expand Down
7 changes: 7 additions & 0 deletions columnar/src/test/regress/expected/columnar_vacuum.out
Original file line number Diff line number Diff line change
Expand Up @@ -441,3 +441,10 @@ SELECT COUNT(*) = (:columnar_row_mask_rows / 2) FROM columnar.row_mask WHERE sto
(1 row)

DROP TABLE t;
-- Verify that we can vacuum humongous fields
CREATE TABLE t (id SERIAL, data TEXT) USING columnar;
INSERT INTO t SELECT 1, repeat('a', 1000000000);
INSERT INTO t SELECT 2, repeat('b', 1000000000);
INSERT INTO t SELECT 3, repeat('c', 1000000000);
VACUUM t;
DROP TABLE t;
12 changes: 11 additions & 1 deletion columnar/src/test/regress/sql/columnar_vacuum.sql
Original file line number Diff line number Diff line change
Expand Up @@ -242,4 +242,14 @@ VACUUM t;
SELECT COUNT(*) = (:columnar_chunk_group_rows / 2) FROM columnar.chunk_group WHERE storage_id = :t_oid;
SELECT COUNT(*) = (:columnar_row_mask_rows / 2) FROM columnar.row_mask WHERE storage_id = :t_oid;

DROP TABLE t;
DROP TABLE t;

-- Verify that we can vacuum humongous fields
CREATE TABLE t (id SERIAL, data TEXT) USING columnar;
INSERT INTO t SELECT 1, repeat('a', 1000000000);
INSERT INTO t SELECT 2, repeat('b', 1000000000);
INSERT INTO t SELECT 3, repeat('c', 1000000000);

VACUUM t;

DROP TABLE t;