diff --git a/columnar/src/backend/columnar/columnar_metadata.c b/columnar/src/backend/columnar/columnar_metadata.c
index cc2917e..bd9289c 100644
--- a/columnar/src/backend/columnar/columnar_metadata.c
+++ b/columnar/src/backend/columnar/columnar_metadata.c
@@ -1402,7 +1402,8 @@ ReadChunkGroupRowCounts(uint64 storageId, uint64 stripe, uint32 chunkGroupCount,
 		if (tupleChunkGroupIndex > chunkGroupCount)
 		{
-			elog(ERROR, "Tuple chunk group higher than chunk group count");
+			elog(WARNING, "Tuple chunk group higher than chunk group count: %u, %u (storage_id = " UINT64_FORMAT ", stripe_id = " UINT64_FORMAT ")", tupleChunkGroupIndex, chunkGroupCount, storageId, stripe);
+			tupleChunkGroupIndex = chunkGroupCount;
 		}
 
 		(*chunkGroupRowCounts)[tupleChunkGroupIndex] =
@@ -1561,6 +1562,52 @@ DeletedRowsForStripe(RelFileLocator relfilelocator, uint32 chunkCount, uint64 st
 	return deletedRows;
 }
 
+/*
+ * DecompressedLengthForStripe returns the total decompressed size of all
+ * chunks for the given stripe.
+ */
+Size
+DecompressedLengthForStripe(RelFileLocator relfilelocator, uint64 stripeId)
+{
+	HeapTuple heapTuple = NULL;
+	ScanKeyData scanKey[2];
+
+	uint64 storageId = LookupStorageId(relfilelocator);
+
+	Oid columnarChunkOid = ColumnarChunkRelationId();
+	Relation columnarChunk = table_open(columnarChunkOid, AccessShareLock);
+	Relation index = index_open(ColumnarChunkIndexRelationId(), AccessShareLock);
+
+	ScanKeyInit(&scanKey[0], Anum_columnar_chunk_storageid,
+				BTEqualStrategyNumber, F_OIDEQ, UInt64GetDatum(storageId));
+	ScanKeyInit(&scanKey[1], Anum_columnar_chunk_stripe,
+				BTEqualStrategyNumber, F_OIDEQ, Int64GetDatum(stripeId));
+
+	SysScanDesc scanDescriptor = systable_beginscan_ordered(columnarChunk, index,
+															GetTransactionSnapshot(), 2, scanKey);
+
+	Size decompressedChunkSize = 0;
+
+	while (HeapTupleIsValid(heapTuple = systable_getnext_ordered(scanDescriptor,
+																 ForwardScanDirection)))
+	{
+		Datum datumArray[Natts_columnar_chunk];
+		bool isNullArray[Natts_columnar_chunk];
+
+		heap_deform_tuple(heapTuple, RelationGetDescr(columnarChunk), datumArray,
+						  isNullArray);
+
+		decompressedChunkSize +=
+			DatumGetInt64(datumArray[Anum_columnar_chunk_value_decompressed_size - 1]);
+	}
+
+	systable_endscan_ordered(scanDescriptor);
+	index_close(index, AccessShareLock);
+	table_close(columnarChunk, AccessShareLock);
+
+	return decompressedChunkSize;
+}
+
 
 /*
  * GetHighestUsedAddress returns the highest used address for the given
diff --git a/columnar/src/backend/columnar/columnar_tableam.c b/columnar/src/backend/columnar/columnar_tableam.c
index 87213f1..d98108e 100644
--- a/columnar/src/backend/columnar/columnar_tableam.c
+++ b/columnar/src/backend/columnar/columnar_tableam.c
@@ -1459,6 +1459,8 @@ TruncateAndCombineColumnarStripes(Relation rel, int elevel)
 
 	uint32 lastStripeDeletedRows = 0;
 
+	Size totalDecompressedStripeLength = 0;
+
 	foreach(lc, stripeMetadataList)
 	{
 		StripeMetadata * stripeMetadata = lfirst(lc);
@@ -1466,12 +1468,21 @@ TruncateAndCombineColumnarStripes(Relation rel, int elevel)
 		lastStripeDeletedRows = DeletedRowsForStripe(rel->rd_locator,
 													 stripeMetadata->chunkCount,
 													 stripeMetadata->id);
+		totalDecompressedStripeLength +=
+			DecompressedLengthForStripe(rel->rd_locator, stripeMetadata->id);
 #else
 		lastStripeDeletedRows = DeletedRowsForStripe(rel->rd_node,
 													 stripeMetadata->chunkCount,
 													 stripeMetadata->id);
+		totalDecompressedStripeLength +=
+			DecompressedLengthForStripe(rel->rd_node, stripeMetadata->id);
 #endif
 
+		if (totalDecompressedStripeLength >= 1024000000)
+		{
+			break;
+		}
+
 		uint64 stripeRowCount = stripeMetadata->rowCount - lastStripeDeletedRows;
 
 		if ((totalRowNumberCount + stripeRowCount >= columnarOptions.stripeRowCount))
@@ -1651,7 +1662,10 @@ columnar_vacuum_rel(Relation rel, VacuumParams *params,
 	/* this should have been resolved by vacuum.c until now */
 	Assert(params->truncate != VACOPTVALUE_UNSPECIFIED);
 
-	LogRelationStats(rel, elevel);
+	if (params->options & VACOPT_VERBOSE)
+	{
+		LogRelationStats(rel, elevel);
+	}
 
 	/*
 	 * We don't have updates, deletes, or concurrent updates, so all we
@@ -1789,6 +1803,12 @@ LogRelationStats(Relation rel, int elevel)
 	List *stripeList = StripesForRelfilenode(relfilelocator, ForwardScanDirection);
 	int stripeCount = list_length(stripeList);
 
+	MemoryContext relation_stats_ctx =
+		AllocSetContextCreate(CurrentMemoryContext, "Vacuum Relation Stats Context",
+							  ALLOCSET_SMALL_SIZES);
+
+	MemoryContext oldcontext = MemoryContextSwitchTo(relation_stats_ctx);
+
 	foreach(stripeMetadataCell, stripeList)
 	{
 		StripeMetadata *stripe = lfirst(stripeMetadataCell);
@@ -1827,8 +1847,12 @@ LogRelationStats(Relation rel, int elevel)
 
 		tupleCount += stripe->rowCount;
 		totalStripeLength += stripe->dataLength;
+
+		MemoryContextReset(relation_stats_ctx);
 	}
 
+	MemoryContextSwitchTo(oldcontext);
+
 	if (unlikely(rel->rd_smgr == NULL))
 	{
 #if PG_VERSION_NUM >= PG_VERSION_16
diff --git a/columnar/src/backend/columnar/columnar_writer.c b/columnar/src/backend/columnar/columnar_writer.c
index 59d8aee..15e210f 100644
--- a/columnar/src/backend/columnar/columnar_writer.c
+++ b/columnar/src/backend/columnar/columnar_writer.c
@@ -27,6 +27,7 @@
 #include "storage/smgr.h"
 #include "utils/guc.h"
 #include "utils/memutils.h"
+#include "utils/palloc.h"
 #include "utils/rel.h"
 
 #if PG_VERSION_NUM >= PG_VERSION_16
@@ -148,7 +149,6 @@ ColumnarBeginWrite(RelFileLocator relfilelocator,
 	return writeState;
 }
 
-
 /*
  * ColumnarWriteRow adds a row to the columnar table. If the stripe is not initialized,
  * we create structures to hold stripe data and skip list. Then, we serialize and
@@ -171,6 +171,58 @@ ColumnarWriteRow(ColumnarWriteState *writeState, Datum *columnValues, bool *colu
 	ChunkData *chunkData = writeState->chunkData;
 	MemoryContext oldContext = MemoryContextSwitchTo(writeState->stripeWriteContext);
 
+	uint32 chunkIndex;
+	uint32 chunkRowIndex;
+
+	if (stripeBuffers)
+	{
+		chunkIndex = stripeBuffers->rowCount / chunkRowCount;
+		chunkRowIndex = stripeBuffers->rowCount % chunkRowCount;
+		/*
+		 * For each column, we first need to check whether the next row will fit
+		 * inside the chunk buffer. If it does not fit, then we need to serialize
+		 * the stripe and make a new stripe for insertion.
+		 */
+		bool fits = true;
+
+		for (columnIndex = 0; columnIndex < columnCount; columnIndex++)
+		{
+
+			/* Check for nulls, skip if null. */
+			if (columnNulls[columnIndex])
+			{
+				continue;
+			}
+
+			Form_pg_attribute attributeForm =
+				TupleDescAttr(writeState->tupleDescriptor, columnIndex);
+
+			int columnTypeLength = attributeForm->attlen;
+			char columnTypeAlign = attributeForm->attalign;
+
+			uint32 datumLength = att_addlength_datum(0, columnTypeLength, columnValues[columnIndex]);
+			uint32 datumLengthAligned = att_align_nominal(datumLength, columnTypeAlign);
+
+			/* Check that the chunk value buffer stays within the 1 gigabyte limit. */
+			if ((long) chunkData->valueBufferArray[columnIndex]->len + (long) datumLengthAligned > 1024000000)
+			{
+				fits = false;
+				break;
+			}
+		}
+
+		if (!fits)
+		{
+			/* Flush the stripe. */
+			ColumnarFlushPendingWrites(writeState);
+
+			/* Then set up for new stripeBuffers. */
+			stripeBuffers = NULL;
+
+			chunkData->rowCount = 0;
+		}
+	}
+
 	if (stripeBuffers == NULL)
 	{
 		stripeBuffers = CreateEmptyStripeBuffers(options->stripeRowCount,
@@ -204,8 +256,9 @@ ColumnarWriteRow(ColumnarWriteState *writeState, Datum *columnValues, bool *colu
 		}
 	}
 
-	uint32 chunkIndex = stripeBuffers->rowCount / chunkRowCount;
-	uint32 chunkRowIndex = stripeBuffers->rowCount % chunkRowCount;
+	chunkIndex = stripeBuffers->rowCount / chunkRowCount;
+	chunkRowIndex = stripeBuffers->rowCount % chunkRowCount;
+
 	for (columnIndex = 0; columnIndex < columnCount; columnIndex++)
 	{
@@ -642,6 +695,7 @@ SerializeChunkData(ColumnarWriteState *writeState, uint32 chunkIndex, uint32 row
 	bool compressed = CompressBuffer(serializedValueBuffer, compressionBuffer,
 									 requestedCompressionType,
 									 compressionLevel);
+
 	if (compressed)
 	{
 		serializedValueBuffer = compressionBuffer;
diff --git a/columnar/src/include/columnar/columnar_metadata.h b/columnar/src/include/columnar/columnar_metadata.h
index 2c5a715..948e853 100644
--- a/columnar/src/include/columnar/columnar_metadata.h
+++ b/columnar/src/include/columnar/columnar_metadata.h
@@ -55,6 +55,7 @@ extern List * StripesForRelfilenode(RelFileLocator relfilelocator, ScanDirection
 extern uint32 DeletedRowsForStripe(RelFileLocator relfilelocator, uint32 chunkCount,
 								   uint64 stripeId);
+extern Size DecompressedLengthForStripe(RelFileLocator relfilelocator, uint64 stripeId);
 extern void ColumnarStorageUpdateIfNeeded(Relation rel, bool isUpgrade);
 extern StripeMetadata * RewriteStripeMetadataRowWithNewValues(Relation rel,
 															  uint64 stripeId, uint64 sizeBytes, uint64 fileOffset, uint64 rowCount, uint64 chunkCount);
diff --git a/columnar/src/test/regress/expected/columnar_vacuum.out b/columnar/src/test/regress/expected/columnar_vacuum.out
index 16951b9..1485447 100644
--- a/columnar/src/test/regress/expected/columnar_vacuum.out
+++ b/columnar/src/test/regress/expected/columnar_vacuum.out
@@ -441,3 +441,10 @@ SELECT COUNT(*) = (:columnar_row_mask_rows / 2) FROM columnar.row_mask WHERE sto
 (1 row)
 
 DROP TABLE t;
+-- Verify that we can vacuum humongous fields
+CREATE TABLE t (id SERIAL, data TEXT) USING columnar;
+INSERT INTO t SELECT 1, repeat('a', 1000000000);
+INSERT INTO t SELECT 2, repeat('b', 1000000000);
+INSERT INTO t SELECT 3, repeat('c', 1000000000);
+VACUUM t;
+DROP TABLE t;
diff --git a/columnar/src/test/regress/sql/columnar_vacuum.sql b/columnar/src/test/regress/sql/columnar_vacuum.sql
index c8ea3e8..2b4491d 100644
--- a/columnar/src/test/regress/sql/columnar_vacuum.sql
+++ b/columnar/src/test/regress/sql/columnar_vacuum.sql
@@ -242,4 +242,14 @@ VACUUM t;
 SELECT COUNT(*) = (:columnar_chunk_group_rows / 2) FROM columnar.chunk_group WHERE storage_id = :t_oid;
 SELECT COUNT(*) = (:columnar_row_mask_rows / 2) FROM columnar.row_mask WHERE storage_id = :t_oid;
 
-DROP TABLE t;
\ No newline at end of file
+DROP TABLE t;
+
+-- Verify that we can vacuum humongous fields
+CREATE TABLE t (id SERIAL, data TEXT) USING columnar;
+INSERT INTO t SELECT 1, repeat('a', 1000000000);
+INSERT INTO t SELECT 2, repeat('b', 1000000000);
+INSERT INTO t SELECT 3, repeat('c', 1000000000);
+
+VACUUM t;
+
+DROP TABLE t;
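
For illustration: the writer-side change in ColumnarWriteRow flushes the pending stripe once the next datum would push a column's chunk value buffer past 1024000000 bytes, which is meant to keep each per-column buffer below PostgreSQL's MaxAllocSize (~1 GB) single-allocation limit. Below is a minimal standalone sketch of that fit check, assuming plain C types and a hypothetical DatumFitsInChunkBuffer helper in place of PostgreSQL's att_addlength_datum/att_align_nominal; it is not part of the patch.

#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

/* Threshold used by the patch; it stays below PostgreSQL's MaxAllocSize (0x3fffffff). */
#define CHUNK_BUFFER_CAP 1024000000L

/* Round len up to a multiple of alignval, mirroring PostgreSQL's TYPEALIGN macro. */
#define ALIGN_UP(alignval, len) \
	(((uintptr_t) (len) + ((alignval) - 1)) & ~((uintptr_t) ((alignval) - 1)))

/* Hypothetical helper: would appending this datum overflow the chunk value buffer? */
static bool
DatumFitsInChunkBuffer(long currentBufferLen, uint32_t datumLength, int alignBytes)
{
	long alignedLength = (long) ALIGN_UP(alignBytes, datumLength);

	return currentBufferLen + alignedLength <= CHUNK_BUFFER_CAP;
}

int
main(void)
{
	/* 900 MB already buffered: a 200 MB datum would overflow, so the stripe gets flushed... */
	printf("%d\n", DatumFitsInChunkBuffer(900000000L, 200000000U, 4));

	/* ...while a 100 MB datum still fits in the current chunk buffer. */
	printf("%d\n", DatumFitsInChunkBuffer(900000000L, 100000000U, 4));

	return 0;
}

The same 1024000000-byte threshold caps totalDecompressedStripeLength in TruncateAndCombineColumnarStripes, so vacuum stops combining stripes before a rewritten stripe would cross the limit; the new regression test with repeat('a', 1000000000) exercises both paths.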