Skip to content

Commit

Permalink
[columnar] Cache eviction crash (#142)
Browse files Browse the repository at this point in the history
Cache can be evected if still is in use, so it can trigger random memory
problems or crash running process.
Fixed by keeping track of current chunk that is in use so we don't
consider it for eviction.
  • Loading branch information
mkaruza authored Sep 12, 2023
1 parent 5f97011 commit 3c5e13d
Show file tree
Hide file tree
Showing 4 changed files with 117 additions and 20 deletions.
125 changes: 105 additions & 20 deletions columnar/src/backend/columnar/columnar_cache.c
Original file line number Diff line number Diff line change
Expand Up @@ -71,18 +71,35 @@ static uint64 totalAllocationLength = 0;
*/
static ColumnarCacheStatistics statistics = { 0 };

/*
* Housekeeping of current chunk in use - so they are not evicted.
*/
typedef struct ColumarCacheChunkGroupInUse
{
uint64 relId;
uint64 stripeId;
uint64 chunkId;
} ColumarCacheChunkGroupInUse;

static List * ChunkGroupsInUse = NIL;

/*
* ColumnarCacheMemoryContext
*
* Returns the cache MemoryContext, initializing the cache MemoryContext
* as a child of TopMemoryContext if it does not exist, also clears any
* statistics gathered.
*/
MemoryContext ColumnarCacheMemoryContext(void)
MemoryContext
ColumnarCacheMemoryContext(void)
{
if (columnarCacheContext == NULL)
{
columnarCacheContext = AllocSetContextCreate(TopMemoryContext, "Columnar Decompression Cache", 0, (uint64) (columnar_page_cache_size * 1024 * 1024 * .1), columnar_page_cache_size * 1024 * 1024);
columnarCacheContext =
AllocSetContextCreate(TopMemoryContext,
"Columnar Decompression Cache",
0, (uint64) (columnar_page_cache_size * 1024 * 1024 * .1),
columnar_page_cache_size * 1024 * 1024);
memset(&statistics, 0, sizeof(ColumnarCacheStatistics));
head = NULL;
}
Expand All @@ -91,22 +108,24 @@ MemoryContext ColumnarCacheMemoryContext(void)
}

/*
* ColumnarResetCache
*
* Deletes the caching memory context and sets it to NULL, thus removing the
* cache and all of its entries.
*/
void ColumnarResetCache(void)
{
* ColumnarResetCache
*
* Deletes the caching memory context and sets it to NULL, thus removing the
* cache and all of its entries.
*/
void
ColumnarResetCache(void)
{
if (columnarCacheContext != NULL)
{
MemoryContextDelete(columnarCacheContext);
columnarCacheContext = NULL;
ChunkGroupsInUse = NIL;
}

totalAllocationLength = 0U;
head = NULL;
}
}

/*
* ColumnarFindInCache
Expand All @@ -115,7 +134,8 @@ MemoryContext ColumnarCacheMemoryContext(void)
* If found, it increments the readCount, and returns the entry. If
* none are found, NULL is returned instead.
*/
static ColumnarCacheEntry *ColumnarFindInCache(uint64 relId, uint64 stripeId, uint64 chunkId, uint32 columnId)
static ColumnarCacheEntry *
ColumnarFindInCache(uint64 relId, uint64 stripeId, uint64 chunkId, uint32 columnId)
{
if (head == NULL)
{
Expand All @@ -127,7 +147,8 @@ static ColumnarCacheEntry *ColumnarFindInCache(uint64 relId, uint64 stripeId, ui
{
ColumnarCacheEntry *entry = dlist_container(ColumnarCacheEntry, list_node, iter.cur);

if (entry->relId == relId && entry->stripeId == stripeId && entry->chunkId == chunkId && entry->columnId == columnId)
if (entry->relId == relId && entry->stripeId == stripeId &&
entry->chunkId == chunkId && entry->columnId == columnId)
{
entry->readCount++;

Expand All @@ -147,15 +168,17 @@ static ColumnarCacheEntry *ColumnarFindInCache(uint64 relId, uint64 stripeId, ui
*
* Returns boolean.
*/
static bool ColumnarInvalidateCacheEntry(uint64 relId, uint64 stripeId, uint64 chunkId, uint32 columnId)
static bool
ColumnarInvalidateCacheEntry(uint64 relId, uint64 stripeId, uint64 chunkId, uint32 columnId)
{
dlist_mutable_iter miter;

dlist_foreach_modify(miter, head)
{
ColumnarCacheEntry *entry = dlist_container(ColumnarCacheEntry, list_node, miter.cur);

if (entry->relId == relId && entry->stripeId == stripeId && entry->chunkId == chunkId && entry->columnId == columnId)
if (entry->relId == relId && entry->stripeId == stripeId &&
entry->chunkId == chunkId && entry->columnId == columnId)
{
dlist_delete(miter.cur);

Expand All @@ -169,7 +192,8 @@ static bool ColumnarInvalidateCacheEntry(uint64 relId, uint64 stripeId, uint64 c
return true;
}

static void EvictCache(uint64 size)
static void
EvictCache(uint64 size)
{
uint64 lastCount = 0;
uint64 nextLowestCount = PG_UINT64_MAX;
Expand All @@ -189,6 +213,25 @@ static void EvictCache(uint64 size)

if (entry->readCount == lastCount)
{
bool skipCacheEntry = false;
ListCell *lc;
foreach(lc, ChunkGroupsInUse)
{
ColumarCacheChunkGroupInUse *chunkGroupInUse =
(ColumarCacheChunkGroupInUse *) lfirst(lc);

if (chunkGroupInUse->relId == entry->relId &&
chunkGroupInUse->stripeId == entry->stripeId &&
chunkGroupInUse->chunkId == entry->chunkId)
{
skipCacheEntry = true;
break;
}
}

if (skipCacheEntry)
continue;

dlist_delete(miter.cur);

totalAllocationLength -= entry->length;
Expand Down Expand Up @@ -218,12 +261,50 @@ static void EvictCache(uint64 size)
}
}

void
ColumnarMarkChunkGroupInUse(uint64 relId, uint64 stripeId, uint32 chunkId)
{
bool found = false;
ListCell *lc;

MemoryContext ctx = MemoryContextSwitchTo(ColumnarCacheMemoryContext());

foreach(lc, ChunkGroupsInUse)
{
ColumarCacheChunkGroupInUse *chunkGroupInUse =
(ColumarCacheChunkGroupInUse *) lfirst(lc);

if (chunkGroupInUse->relId == relId)
{
chunkGroupInUse->stripeId = stripeId;
chunkGroupInUse->chunkId = chunkId;
found = true;
}
}

if (!found)
{
ColumarCacheChunkGroupInUse *newChunkGroupInUse =
palloc0(sizeof(ColumarCacheChunkGroupInUse));

newChunkGroupInUse->relId = relId;
newChunkGroupInUse->stripeId = stripeId;
newChunkGroupInUse->chunkId = chunkId;

ChunkGroupsInUse = lappend(ChunkGroupsInUse, newChunkGroupInUse);
}

MemoryContextSwitchTo(ctx);
}

/*
* ColumnarAddCacheEntry
*
* Adds a cache entry, or updates an existing entry.
*/
void ColumnarAddCacheEntry(uint64 relId, uint64 stripeId, uint64 chunkId, uint32 columnId, void *data)
void
ColumnarAddCacheEntry(uint64 relId, uint64 stripeId, uint64 chunkId,
uint32 columnId, void *data)
{
if (columnar_enable_page_cache == false)
{
Expand Down Expand Up @@ -288,7 +369,8 @@ void ColumnarAddCacheEntry(uint64 relId, uint64 stripeId, uint64 chunkId, uint32
/* If we are over our cache allocation, clear until we are at 90%. */
if (totalAllocationLength >= (columnar_page_cache_size * 1024 * 1024))
{
EvictCache((columnar_page_cache_size * 1024 * 1024 * .1) + (totalAllocationLength - (columnar_page_cache_size * 1024 * 1024)));
EvictCache((columnar_page_cache_size * 1024 * 1024 * .1) +
(totalAllocationLength - (columnar_page_cache_size * 1024 * 1024)));
}

statistics.writes++;
Expand All @@ -302,7 +384,8 @@ void ColumnarAddCacheEntry(uint64 relId, uint64 stripeId, uint64 chunkId, uint32
* Search for a cache entry, returning NULL if not found. If found,
* make a copy in the current memory context and return it.
*/
void *ColumnarRetrieveCache(uint64 relId, uint64 stripeId, uint64 chunkId, uint32 columnId)
void *
ColumnarRetrieveCache(uint64 relId, uint64 stripeId, uint64 chunkId, uint32 columnId)
{
if (columnar_enable_page_cache == false)
{
Expand Down Expand Up @@ -330,7 +413,8 @@ void *ColumnarRetrieveCache(uint64 relId, uint64 stripeId, uint64 chunkId, uint3
*
* Returns how large our cache is, used for accounting.
*/
static uint64 ColumnarCacheLength()
static uint64
ColumnarCacheLength()
{
uint64 count = 0;

Expand All @@ -348,7 +432,8 @@ static uint64 ColumnarCacheLength()
return count;
}

ColumnarCacheStatistics *ColumnarGetCacheStatistics(void)
ColumnarCacheStatistics *
ColumnarGetCacheStatistics(void)
{
statistics.endingCacheSize = totalAllocationLength;
statistics.entries = ColumnarCacheLength();
Expand Down
5 changes: 5 additions & 0 deletions columnar/src/backend/columnar/columnar_reader.c
Original file line number Diff line number Diff line change
Expand Up @@ -1831,6 +1831,11 @@ DeserializeChunkData(StripeBuffers *stripeBuffers, uint64 chunkIndex,
columnBuffers->chunkBuffersArray[chunkIndex];
bool shouldCache = columnar_enable_page_cache == true && chunkBuffers->valueCompressionType != COMPRESSION_NONE;

if (shouldCache)
{
ColumnarMarkChunkGroupInUse(state->relation->rd_id, stripeId, chunkIndex);
}

/* decompress and deserialize current chunk's data */
StringInfo valueBuffer = NULL;

Expand Down
6 changes: 6 additions & 0 deletions columnar/src/backend/columnar/columnar_tableam.c
Original file line number Diff line number Diff line change
Expand Up @@ -526,6 +526,12 @@ columnar_index_fetch_end(IndexFetchTableData *sscan)
ColumnarEndRead(scan->cs_readState);
scan->cs_readState = NULL;
}

/* clean up any caches. */
if (columnar_enable_page_cache == true)
{
ColumnarResetCache();
}
}


Expand Down
1 change: 1 addition & 0 deletions columnar/src/include/columnar/columnar.h
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,7 @@ extern void CleanupReadStateCache(SubTransactionId currentSubXid);
extern MemoryContext GetColumnarReadStateCache(void);

/* columnar_cache.c */
extern void ColumnarMarkChunkGroupInUse(uint64 relId, uint64 stripeId, uint32 chunkId);
extern void ColumnarAddCacheEntry(uint64, uint64, uint64, uint32, void *);
extern void *ColumnarRetrieveCache(uint64, uint64, uint64, uint32);
extern void ColumnarResetCache(void);
Expand Down

0 comments on commit 3c5e13d

Please sign in to comment.