From 38e4398e06cc37bc212e62b352b5740c233e199f Mon Sep 17 00:00:00 2001 From: Benjamin Winger Date: Tue, 27 Feb 2024 09:26:32 -0500 Subject: [PATCH] Re-write partitioner to use ColumnChunks instead of ValueVectors ValueVectors have high memory fragmentation, and allocate strings in 256KB chunks for only 2048 strings. ColumnChunks can have a much larger capacity, and also support string de-duplication. --- src/include/processor/operator/partitioner.h | 20 ++- .../operator/persistent/rel_batch_insert.h | 9 +- src/include/storage/store/column_chunk.h | 13 +- src/include/storage/store/node_group.h | 15 +-- .../storage/store/string_column_chunk.h | 11 +- .../storage/store/struct_column_chunk.h | 9 +- .../storage/store/var_list_column_chunk.h | 7 +- src/processor/operator/partitioner.cpp | 54 ++++++--- .../operator/persistent/rel_batch_insert.cpp | 57 +++++---- src/storage/store/column_chunk.cpp | 114 +++++++++--------- src/storage/store/dictionary_chunk.cpp | 8 +- src/storage/store/node_group.cpp | 15 +-- src/storage/store/string_column_chunk.cpp | 67 +++++----- src/storage/store/struct_column_chunk.cpp | 37 ++++-- src/storage/store/var_list_column_chunk.cpp | 50 ++++++-- 15 files changed, 283 insertions(+), 203 deletions(-) diff --git a/src/include/processor/operator/partitioner.h b/src/include/processor/operator/partitioner.h index 65800727cc6..c6651840944 100644 --- a/src/include/processor/operator/partitioner.h +++ b/src/include/processor/operator/partitioner.h @@ -1,7 +1,7 @@ #pragma once -#include "common/data_chunk/data_chunk_collection.h" #include "processor/operator/sink.h" +#include "storage/store/column_chunk.h" namespace kuzu { namespace storage { @@ -20,7 +20,14 @@ struct PartitionerFunctions { // partitioning methods. For example, copy of rel tables require partitioning on both FWD and BWD // direction. Each partitioning method corresponds to a PartitioningState. struct PartitioningBuffer { - std::vector> partitions; + using ColumnChunkCollection = std::vector>; + struct Partition { + // One chunk for each column, grouped into a list + // so that groups from different threads can be quickly merged without copying + // E.g. [(a,b,c), (a,b,c)] where a is a chunk for column a, b for column b, etc. + std::vector chunks; + }; + std::vector partitions; void merge(std::unique_ptr localPartitioningStates); }; @@ -49,11 +56,11 @@ struct PartitionerSharedState { void resetState(); void merge(std::vector> localPartitioningStates); - inline common::DataChunkCollection* getPartitionBuffer( + inline PartitioningBuffer::Partition& getPartitionBuffer( common::vector_idx_t partitioningIdx, common::partition_idx_t partitionIdx) { KU_ASSERT(partitioningIdx < partitioningBuffers.size()); KU_ASSERT(partitionIdx < partitioningBuffers[partitioningIdx]->partitions.size()); - return partitioningBuffers[partitioningIdx]->partitions[partitionIdx].get(); + return partitioningBuffers[partitioningIdx]->partitions[partitionIdx]; } }; @@ -102,7 +109,7 @@ class Partitioner : public Sink { static void initializePartitioningStates( std::vector>& partitioningBuffers, - std::vector numPartitions, storage::MemoryManager* mm); + std::vector numPartitions); private: // TODO: For now, RelBatchInsert will guarantee all data are inside one data chunk. Should be @@ -111,6 +118,9 @@ class Partitioner : public Sink { common::partition_idx_t partitioningIdx, common::DataChunk* chunkToCopyFrom); private: + // Same size as a value vector. 
Each thread will allocate a chunk for each node group, + // so this should be kept relatively small to avoid allocating more memory than is needed + static const uint64_t CHUNK_SIZE = 2048; std::vector> infos; std::shared_ptr sharedState; std::unique_ptr localState; diff --git a/src/include/processor/operator/persistent/rel_batch_insert.h b/src/include/processor/operator/persistent/rel_batch_insert.h index ed10d2091f7..8df5ab2000f 100644 --- a/src/include/processor/operator/persistent/rel_batch_insert.h +++ b/src/include/processor/operator/persistent/rel_batch_insert.h @@ -3,6 +3,7 @@ #include "common/enums/rel_direction.h" #include "processor/operator/partitioner.h" #include "processor/operator/persistent/batch_insert.h" +#include "storage/store/column_chunk.h" #include "storage/store/node_group.h" namespace kuzu { @@ -60,20 +61,20 @@ class RelBatchInsert final : public BatchInsert { } private: - void prepareCSRNodeGroup(common::DataChunkCollection* partition, + void prepareCSRNodeGroup(PartitioningBuffer::Partition& partition, common::offset_t startNodeOffset, common::vector_idx_t offsetVectorIdx, common::offset_t numNodes); static common::length_t getGapSize(common::length_t length); static std::vector populateStartCSROffsetsAndLengths( storage::CSRHeaderChunks& csrHeader, common::offset_t numNodes, - common::DataChunkCollection* partition, common::vector_idx_t offsetVectorIdx); + PartitioningBuffer::Partition& partition, common::vector_idx_t offsetVectorIdx); static void populateEndCSROffsets( storage::CSRHeaderChunks& csrHeader, std::vector& gaps); static void setOffsetToWithinNodeGroup( - common::ValueVector* vector, common::offset_t startOffset); + storage::ColumnChunk& chunk, common::offset_t startOffset); static void setOffsetFromCSROffsets( - common::ValueVector* offsetVector, storage::ColumnChunk* offsetChunk); + storage::ColumnChunk* nodeOffsetChunk, storage::ColumnChunk* csrOffsetChunk); // We only check rel multiplcity constraint (MANY_ONE, ONE_ONE) for now. std::optional checkRelMultiplicityConstraint( diff --git a/src/include/storage/store/column_chunk.h b/src/include/storage/store/column_chunk.h index 4cff129b379..af24d378a88 100644 --- a/src/include/storage/store/column_chunk.h +++ b/src/include/storage/store/column_chunk.h @@ -55,6 +55,7 @@ class ColumnChunk { virtual ColumnChunkMetadata getMetadataToFlush() const; virtual void append(common::ValueVector* vector); + virtual void appendOne(common::ValueVector* vector, common::vector_idx_t pos); virtual void append( ColumnChunk* other, common::offset_t startPosInOtherChunk, uint32_t numValuesToAppend); @@ -73,8 +74,7 @@ class ColumnChunk { // `offsetInVector`, we should flatten the vector to pos at `offsetInVector`. 
virtual void write(common::ValueVector* vector, common::offset_t offsetInVector, common::offset_t offsetInChunk); - virtual void write( - common::ValueVector* vector, common::ValueVector* offsetsInChunk, bool isCSR); + virtual void write(ColumnChunk* chunk, ColumnChunk* offsetsInChunk, bool isCSR); virtual void write(ColumnChunk* srcChunk, common::offset_t srcOffsetInChunk, common::offset_t dstOffsetInChunk, common::offset_t numValuesToCopy); virtual void copy(ColumnChunk* srcChunk, common::offset_t srcOffsetInChunk, @@ -151,12 +151,12 @@ class BoolColumnChunk : public ColumnChunk { enableCompression, hasNullChunk) {} void append(common::ValueVector* vector) final; + void appendOne(common::ValueVector* vector, common::vector_idx_t pos) final; void append(ColumnChunk* other, common::offset_t startPosInOtherChunk, uint32_t numValuesToAppend) override; void write(common::ValueVector* vector, common::offset_t offsetInVector, common::offset_t offsetInChunk) override; - void write(common::ValueVector* valueVector, common::ValueVector* offsetInChunkVector, - bool isCSR) final; + void write(ColumnChunk* chunk, ColumnChunk* dstOffsets, bool isCSR) final; void write(ColumnChunk* srcChunk, common::offset_t srcOffsetInChunk, common::offset_t dstOffsetInChunk, common::offset_t numValuesToCopy) override; }; @@ -206,8 +206,11 @@ class NullColumnChunk final : public BoolColumnChunk { }; struct ColumnChunkFactory { + // inMemory starts string column chunk dictionaries at zero instead of reserving space for + // values to grow static std::unique_ptr createColumnChunk(common::LogicalType dataType, - bool enableCompression, uint64_t capacity = common::StorageConstants::NODE_GROUP_SIZE); + bool enableCompression, uint64_t capacity = common::StorageConstants::NODE_GROUP_SIZE, + bool inMemory = false); static std::unique_ptr createNullColumnChunk( bool enableCompression, uint64_t capacity = common::StorageConstants::NODE_GROUP_SIZE) { diff --git a/src/include/storage/store/node_group.h b/src/include/storage/store/node_group.h index 1343b10a6fd..755cfb98d13 100644 --- a/src/include/storage/store/node_group.h +++ b/src/include/storage/store/node_group.h @@ -1,7 +1,6 @@ #pragma once #include "common/column_data_format.h" -#include "common/data_chunk/data_chunk.h" #include "storage/store/column_chunk.h" namespace kuzu { @@ -32,15 +31,14 @@ class NodeGroup { uint64_t append(const std::vector& columnVectors, common::DataChunkState* columnState, uint64_t numValuesToAppend); common::offset_t append(NodeGroup* other, common::offset_t offsetInOtherNodeGroup); - void write(common::DataChunk* dataChunk, common::vector_idx_t offsetVector); + void write(std::vector>& data, common::vector_idx_t offsetVector); void finalize(uint64_t nodeGroupIdx_); virtual inline void writeToColumnChunk(common::vector_idx_t chunkIdx, - common::vector_idx_t vectorIdx, common::DataChunk* dataChunk, - common::ValueVector* offsetVector) { - chunks[chunkIdx]->write( - dataChunk->getValueVector(vectorIdx).get(), offsetVector, false /* isCSR */); + common::vector_idx_t vectorIdx, std::vector>& data, + ColumnChunk& offsetChunk) { + chunks[chunkIdx]->write(data[vectorIdx].get(), &offsetChunk, false /*isCSR*/); } protected: @@ -81,9 +79,8 @@ class CSRNodeGroup : public NodeGroup { const CSRHeaderChunks& getCSRHeader() const { return csrHeaderChunks; } inline void writeToColumnChunk(common::vector_idx_t chunkIdx, common::vector_idx_t vectorIdx, - common::DataChunk* dataChunk, common::ValueVector* offsetVector) override { - chunks[chunkIdx]->write( - 
dataChunk->getValueVector(vectorIdx).get(), offsetVector, true /* isCSR */); + std::vector>& data, ColumnChunk& offsetChunk) override { + chunks[chunkIdx]->write(data[vectorIdx].get(), &offsetChunk, true /* isCSR */); } private: diff --git a/src/include/storage/store/string_column_chunk.h b/src/include/storage/store/string_column_chunk.h index 6d7fd5e971d..2f999914d7d 100644 --- a/src/include/storage/store/string_column_chunk.h +++ b/src/include/storage/store/string_column_chunk.h @@ -1,6 +1,8 @@ #pragma once #include "common/assert.h" +#include "common/types/types.h" +#include "storage/store/column_chunk.h" #include "storage/store/dictionary_chunk.h" namespace kuzu { @@ -8,17 +10,18 @@ namespace storage { class StringColumnChunk : public ColumnChunk { public: - StringColumnChunk(common::LogicalType dataType, uint64_t capacity, bool enableCompression); + StringColumnChunk( + common::LogicalType dataType, uint64_t capacity, bool enableCompression, bool inMemory); void resetToEmpty() final; void append(common::ValueVector* vector) final; + void appendOne(common::ValueVector* vector, common::vector_idx_t pos) final; void append(ColumnChunk* other, common::offset_t startPosInOtherChunk, uint32_t numValuesToAppend) final; void write(common::ValueVector* vector, common::offset_t offsetInVector, common::offset_t offsetInChunk) final; - void write(common::ValueVector* valueVector, common::ValueVector* offsetInChunkVector, - bool isCSR) final; + void write(ColumnChunk* chunk, ColumnChunk* dstOffsets, bool isCSR) final; void write(ColumnChunk* srcChunk, common::offset_t srcOffsetInChunk, common::offset_t dstOffsetInChunk, common::offset_t numValuesToCopy) override; void copy(ColumnChunk* srcChunk, common::offset_t srcOffsetInChunk, @@ -43,7 +46,7 @@ class StringColumnChunk : public ColumnChunk { void appendStringColumnChunk(StringColumnChunk* other, common::offset_t startPosInOtherChunk, uint32_t numValuesToAppend); - void setValueFromString(const char* value, uint64_t length, uint64_t pos); + void setValueFromString(std::string_view value, uint64_t pos); private: std::unique_ptr dictionaryChunk; diff --git a/src/include/storage/store/struct_column_chunk.h b/src/include/storage/store/struct_column_chunk.h index f24628f1abc..110c424f095 100644 --- a/src/include/storage/store/struct_column_chunk.h +++ b/src/include/storage/store/struct_column_chunk.h @@ -1,5 +1,7 @@ #pragma once +#include "common/types/internal_id_t.h" +#include "common/types/types.h" #include "storage/store/column_chunk.h" namespace kuzu { @@ -7,7 +9,8 @@ namespace storage { class StructColumnChunk : public ColumnChunk { public: - StructColumnChunk(common::LogicalType dataType, uint64_t capacity, bool enableCompression); + StructColumnChunk( + common::LogicalType dataType, uint64_t capacity, bool enableCompression, bool inMemory); inline ColumnChunk* getChild(common::vector_idx_t childIdx) { KU_ASSERT(childIdx < childChunks.size()); @@ -20,11 +23,11 @@ class StructColumnChunk : public ColumnChunk { void append(ColumnChunk* other, common::offset_t startPosInOtherChunk, uint32_t numValuesToAppend) final; void append(common::ValueVector* vector) final; + void appendOne(common::ValueVector* vector, common::vector_idx_t pos) final; void write(common::ValueVector* vector, common::offset_t offsetInVector, common::offset_t offsetInChunk) final; - void write(common::ValueVector* valueVector, common::ValueVector* offsetInChunkVector, - bool isCSR) final; + void write(ColumnChunk* chunk, ColumnChunk* dstOffsets, bool isCSR) final; void 
write(ColumnChunk* srcChunk, common::offset_t srcOffsetInChunk, common::offset_t dstOffsetInChunk, common::offset_t numValuesToCopy) override; void copy(ColumnChunk* srcChunk, common::offset_t srcOffsetInChunk, diff --git a/src/include/storage/store/var_list_column_chunk.h b/src/include/storage/store/var_list_column_chunk.h index bc2c9986100..d2b896aeffe 100644 --- a/src/include/storage/store/var_list_column_chunk.h +++ b/src/include/storage/store/var_list_column_chunk.h @@ -27,7 +27,8 @@ struct VarListDataColumnChunk { class VarListColumnChunk : public ColumnChunk { public: - VarListColumnChunk(common::LogicalType dataType, uint64_t capacity, bool enableCompression); + VarListColumnChunk( + common::LogicalType dataType, uint64_t capacity, bool enableCompression, bool inMemory); inline ColumnChunk* getDataColumnChunk() const { return varListDataColumnChunk->dataColumnChunk.get(); @@ -36,11 +37,11 @@ class VarListColumnChunk : public ColumnChunk { void resetToEmpty() final; void append(common::ValueVector* vector) final; + void appendOne(common::ValueVector* vector, common::vector_idx_t pos) final; // Note: `write` assumes that no `append` will be called afterward. void write(common::ValueVector* vector, common::offset_t offsetInVector, common::offset_t offsetInChunk) final; - void write(common::ValueVector* valueVector, common::ValueVector* offsetInChunkVector, - bool isCSR) final; + void write(ColumnChunk* chunk, ColumnChunk* dstOffsets, bool isCSR) final; void write(ColumnChunk* srcChunk, common::offset_t srcOffsetInChunk, common::offset_t dstOffsetInChunk, common::offset_t numValuesToCopy) override; void copy(ColumnChunk* srcChunk, common::offset_t srcOffsetInChunk, diff --git a/src/processor/operator/partitioner.cpp b/src/processor/operator/partitioner.cpp index 8db8df79e23..536d1af731c 100644 --- a/src/processor/operator/partitioner.cpp +++ b/src/processor/operator/partitioner.cpp @@ -1,5 +1,10 @@ #include "processor/operator/partitioner.h" +#include + +#include "common/constants.h" +#include "processor/execution_context.h" +#include "storage/store/column_chunk.h" #include "storage/store/node_table.h" using namespace kuzu::common; @@ -42,7 +47,7 @@ void PartitionerSharedState::initialize() { numPartitions.resize(2); numPartitions[0] = getNumPartitions(maxNodeOffsets[0]); numPartitions[1] = getNumPartitions(maxNodeOffsets[1]); - Partitioner::initializePartitioningStates(partitioningBuffers, numPartitions, mm); + Partitioner::initializePartitioningStates(partitioningBuffers, numPartitions); } partition_idx_t PartitionerSharedState::getNextPartition(vector_idx_t partitioningIdx) { @@ -71,9 +76,13 @@ void PartitionerSharedState::merge( void PartitioningBuffer::merge(std::unique_ptr localPartitioningState) { KU_ASSERT(partitions.size() == localPartitioningState->partitions.size()); for (auto partitionIdx = 0u; partitionIdx < partitions.size(); partitionIdx++) { - auto sharedPartition = partitions[partitionIdx].get(); - auto localPartition = localPartitioningState->partitions[partitionIdx].get(); - sharedPartition->merge(localPartition); + auto& sharedPartition = partitions[partitionIdx]; + auto& localPartition = localPartitioningState->partitions[partitionIdx]; + sharedPartition.chunks.reserve( + sharedPartition.chunks.size() + localPartition.chunks.size()); + for (auto j = 0u; j < localPartition.chunks.size(); j++) { + sharedPartition.chunks.push_back(std::move(localPartition.chunks[j])); + } } } @@ -91,10 +100,9 @@ void Partitioner::initGlobalStateInternal(ExecutionContext* /*context*/) 
{ sharedState->initialize(); } -void Partitioner::initLocalStateInternal(ResultSet* /*resultSet*/, ExecutionContext* context) { +void Partitioner::initLocalStateInternal(ResultSet* /*resultSet*/, ExecutionContext* /*context*/) { localState = std::make_unique(); - initializePartitioningStates(localState->partitioningBuffers, sharedState->numPartitions, - context->clientContext->getMemoryManager()); + initializePartitioningStates(localState->partitioningBuffers, sharedState->numPartitions); } static void constructDataChunk(DataChunk* dataChunk, const std::vector& columnPositions, @@ -114,14 +122,14 @@ static void constructDataChunk(DataChunk* dataChunk, const std::vector& void Partitioner::initializePartitioningStates( std::vector>& partitioningBuffers, - std::vector numPartitions, MemoryManager* mm) { + std::vector numPartitions) { partitioningBuffers.resize(numPartitions.size()); for (auto partitioningIdx = 0u; partitioningIdx < numPartitions.size(); partitioningIdx++) { auto numPartition = numPartitions[partitioningIdx]; auto partitioningBuffer = std::make_unique(); partitioningBuffer->partitions.reserve(numPartition); for (auto i = 0u; i < numPartition; i++) { - partitioningBuffer->partitions.push_back(std::make_unique(mm)); + partitioningBuffer->partitions.emplace_back(); } partitioningBuffers[partitioningIdx] = std::move(partitioningBuffer); } @@ -146,18 +154,28 @@ void Partitioner::executeInternal(ExecutionContext* context) { void Partitioner::copyDataToPartitions( partition_idx_t partitioningIdx, DataChunk* chunkToCopyFrom) { - auto originalChunkState = chunkToCopyFrom->state; - chunkToCopyFrom->state = std::make_shared(1 /* capacity */); - chunkToCopyFrom->state->selVector->resetSelectorToValuePosBufferWithSize(1 /* size */); - for (auto i = 0u; i < originalChunkState->selVector->selectedSize; i++) { - auto posToCopyFrom = originalChunkState->selVector->selectedPositions[i]; + for (auto i = 0u; i < chunkToCopyFrom->state->selVector->selectedSize; i++) { + auto posToCopyFrom = chunkToCopyFrom->state->selVector->selectedPositions[i]; auto partitionIdx = partitionIdxes->getValue(posToCopyFrom); KU_ASSERT( partitionIdx < localState->getPartitioningBuffer(partitioningIdx)->partitions.size()); - auto partition = - localState->getPartitioningBuffer(partitioningIdx)->partitions[partitionIdx].get(); - chunkToCopyFrom->state->selVector->selectedPositions[0] = posToCopyFrom; - partition->append(*chunkToCopyFrom); + auto& partition = + localState->getPartitioningBuffer(partitioningIdx)->partitions[partitionIdx]; + if (partition.chunks.empty() || partition.chunks.back()[0]->getNumValues() + 1 > + partition.chunks.back()[0]->getCapacity()) { + partition.chunks.emplace_back(); + partition.chunks.back().reserve(chunkToCopyFrom->getNumValueVectors()); + for (auto i = 0u; i < chunkToCopyFrom->getNumValueVectors(); i++) { + partition.chunks.back().emplace_back(ColumnChunkFactory::createColumnChunk( + chunkToCopyFrom->getValueVector(i)->dataType, false /*enableCompression*/, + Partitioner::CHUNK_SIZE)); + } + } + KU_ASSERT(partition.chunks.back().size() == chunkToCopyFrom->getNumValueVectors()); + for (auto i = 0u; i < chunkToCopyFrom->getNumValueVectors(); i++) { + partition.chunks.back()[i]->appendOne( + chunkToCopyFrom->getValueVector(i).get(), posToCopyFrom); + } } } diff --git a/src/processor/operator/persistent/rel_batch_insert.cpp b/src/processor/operator/persistent/rel_batch_insert.cpp index 0b83ab14a9a..8fad55e9d20 100644 --- a/src/processor/operator/persistent/rel_batch_insert.cpp +++ 
b/src/processor/operator/persistent/rel_batch_insert.cpp @@ -3,7 +3,9 @@ #include "common/exception/copy.h" #include "common/exception/message.h" #include "common/string_format.h" +#include "processor/operator/partitioner.h" #include "processor/result/factorized_table.h" +#include "storage/store/column_chunk.h" #include "storage/store/rel_table.h" using namespace kuzu::common; @@ -37,12 +39,11 @@ void RelBatchInsert::executeInternal(ExecutionContext* /*context*/) { break; } // Read the whole partition, and set to node group. - auto partitioningBuffer = partitionerSharedState->getPartitionBuffer( + auto& partitioningBuffer = partitionerSharedState->getPartitionBuffer( relInfo->partitioningIdx, relLocalState->nodeGroupIdx); auto startNodeOffset = StorageUtils::getStartOffsetOfNodeGroup(relLocalState->nodeGroupIdx); - for (auto dataChunk : partitioningBuffer->getChunks()) { - setOffsetToWithinNodeGroup( - dataChunk->getValueVector(relInfo->offsetVectorIdx).get(), startNodeOffset); + for (auto& columns : partitioningBuffer.chunks) { + setOffsetToWithinNodeGroup(*columns[relInfo->offsetVectorIdx], startNodeOffset); } // Calculate num of source nodes in this node group. // This will be used to set the num of values of the node group. @@ -50,8 +51,8 @@ void RelBatchInsert::executeInternal(ExecutionContext* /*context*/) { partitionerSharedState->maxNodeOffsets[relInfo->partitioningIdx] - startNodeOffset + 1); prepareCSRNodeGroup( partitioningBuffer, startNodeOffset, relInfo->offsetVectorIdx, numNodes); - for (auto dataChunk : partitioningBuffer->getChunks()) { - localState->nodeGroup->write(dataChunk, relInfo->offsetVectorIdx); + for (auto& chunk : partitioningBuffer.chunks) { + localState->nodeGroup->write(chunk, relInfo->offsetVectorIdx); } localState->nodeGroup->finalize(relLocalState->nodeGroupIdx); // Flush node group to table. 
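A minimal stand-alone sketch (simplified names and made-up data; not Kuzu code) of the bookkeeping the new setOffsetFromCSROffsets performs on the partitioned chunks: each tuple takes the next free slot in its source node's CSR region, and the running start offset for that node is advanced.

#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

int main() {
    // Start offset of each node's CSR region (gaps already included), and the
    // source-node offset of each tuple in one partitioned chunk group.
    std::vector<uint64_t> csrStart = {0, 3, 4, 4};
    std::vector<uint64_t> tupleNode = {0, 1, 0, 3, 3};
    std::vector<uint64_t> dstSlot(tupleNode.size());
    for (std::size_t i = 0; i < tupleNode.size(); i++) {
        auto node = tupleNode[i];
        dstSlot[i] = csrStart[node]; // tuple i lands in its node's next free slot
        csrStart[node] += 1;         // advance that node's running offset
    }
    // Node 0's two tuples fill slots 0 and 1; node 3's region starts at slot 4.
    assert(dstSlot[0] == 0 && dstSlot[2] == 1);
    assert(dstSlot[3] == 4 && dstSlot[4] == 5);
    return 0;
}
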
@@ -61,7 +62,7 @@ void RelBatchInsert::executeInternal(ExecutionContext* /*context*/) { } } -void RelBatchInsert::prepareCSRNodeGroup(DataChunkCollection* partition, +void RelBatchInsert::prepareCSRNodeGroup(PartitioningBuffer::Partition& partition, common::offset_t startNodeOffset, vector_idx_t offsetVectorIdx, offset_t numNodes) { auto relInfo = ku_dynamic_cast(info.get()); auto csrNodeGroup = ku_dynamic_cast(localState->nodeGroup.get()); @@ -79,9 +80,9 @@ void RelBatchInsert::prepareCSRNodeGroup(DataChunkCollection* partition, offset_t csrChunkCapacity = csrHeader.getEndCSROffset(numNodes - 1) + csrHeader.getCSRLength(numNodes - 1); localState->nodeGroup->resizeChunks(csrChunkCapacity); - for (auto dataChunk : partition->getChunks()) { - auto offsetVector = dataChunk->getValueVector(offsetVectorIdx).get(); - setOffsetFromCSROffsets(offsetVector, csrHeader.offset.get()); + for (auto& dataChunk : partition.chunks) { + auto offsetChunk = dataChunk[offsetVectorIdx].get(); + setOffsetFromCSROffsets(offsetChunk, csrHeader.offset.get()); } populateEndCSROffsets(csrHeader, gaps); } @@ -104,7 +105,7 @@ length_t RelBatchInsert::getGapSize(length_t length) { } std::vector RelBatchInsert::populateStartCSROffsetsAndLengths(CSRHeaderChunks& csrHeader, - offset_t numNodes, DataChunkCollection* partition, vector_idx_t offsetVectorIdx) { + offset_t numNodes, PartitioningBuffer::Partition& partition, vector_idx_t offsetVectorIdx) { KU_ASSERT(numNodes == csrHeader.length->getNumValues() && numNodes == csrHeader.offset->getNumValues()); std::vector gaps; @@ -113,11 +114,10 @@ std::vector RelBatchInsert::populateStartCSROffsetsAndLengths(CSRHeade auto csrLengths = (length_t*)csrHeader.length->getData(); std::fill(csrLengths, csrLengths + numNodes, 0); // Calculate length for each node. Store the num of tuples of node i at csrLengths[i]. 
- for (auto chunk : partition->getChunks()) { - auto offsetVector = chunk->getValueVector(offsetVectorIdx); - for (auto i = 0u; i < offsetVector->state->selVector->selectedSize; i++) { - auto pos = offsetVector->state->selVector->selectedPositions[i]; - auto nodeOffset = offsetVector->getValue(pos); + for (auto& chunk : partition.chunks) { + auto& offsetChunk = chunk[offsetVectorIdx]; + for (auto i = 0u; i < offsetChunk->getNumValues(); i++) { + auto nodeOffset = offsetChunk->getValue(i); KU_ASSERT(nodeOffset < numNodes); csrLengths[nodeOffset]++; } @@ -134,23 +134,22 @@ std::vector RelBatchInsert::populateStartCSROffsetsAndLengths(CSRHeade return gaps; } -void RelBatchInsert::setOffsetToWithinNodeGroup(ValueVector* vector, offset_t startOffset) { - KU_ASSERT(vector->dataType.getPhysicalType() == PhysicalTypeID::INT64 && - vector->state->selVector->isUnfiltered()); - auto offsets = (offset_t*)vector->getData(); - for (auto i = 0u; i < vector->state->selVector->selectedSize; i++) { +void RelBatchInsert::setOffsetToWithinNodeGroup(ColumnChunk& chunk, offset_t startOffset) { + KU_ASSERT(chunk.getDataType().getPhysicalType() == PhysicalTypeID::INT64); + auto offsets = (offset_t*)chunk.getData(); + for (auto i = 0u; i < chunk.getNumValues(); i++) { offsets[i] -= startOffset; } } -void RelBatchInsert::setOffsetFromCSROffsets(ValueVector* offsetVector, ColumnChunk* offsetChunk) { - KU_ASSERT(offsetVector->dataType.getPhysicalType() == PhysicalTypeID::INT64 && - offsetVector->state->selVector->isUnfiltered()); - for (auto i = 0u; i < offsetVector->state->selVector->selectedSize; i++) { - auto nodeOffset = offsetVector->getValue(i); - auto csrOffset = offsetChunk->getValue(nodeOffset); - offsetVector->setValue(i, csrOffset); - offsetChunk->setValue(csrOffset + 1, nodeOffset); +void RelBatchInsert::setOffsetFromCSROffsets( + ColumnChunk* nodeOffsetChunk, ColumnChunk* csrOffsetChunk) { + KU_ASSERT(nodeOffsetChunk->getDataType().getPhysicalType() == PhysicalTypeID::INT64); + for (auto i = 0u; i < nodeOffsetChunk->getNumValues(); i++) { + auto nodeOffset = nodeOffsetChunk->getValue(i); + auto csrOffset = csrOffsetChunk->getValue(nodeOffset); + nodeOffsetChunk->setValue(csrOffset, i); + csrOffsetChunk->setValue(csrOffset + 1, nodeOffset); } } diff --git a/src/storage/store/column_chunk.cpp b/src/storage/store/column_chunk.cpp index 60445d41fe1..78de045f063 100644 --- a/src/storage/store/column_chunk.cpp +++ b/src/storage/store/column_chunk.cpp @@ -1,6 +1,8 @@ #include "storage/store/column_chunk.h" #include "common/exception/copy.h" +#include "common/types/internal_id_t.h" +#include "common/types/types.h" #include "storage/compression/compression.h" #include "storage/storage_utils.h" #include "storage/store/string_column_chunk.h" @@ -211,6 +213,16 @@ void ColumnChunk::append(ValueVector* vector) { numValues += vector->state->selVector->selectedSize; } +void ColumnChunk::appendOne(common::ValueVector* vector, common::vector_idx_t pos) { + KU_ASSERT(vector->dataType.getPhysicalType() == dataType.getPhysicalType()); + KU_ASSERT(numValues < capacity); + memcpy(buffer.get() + numValues * numBytesPerValue, vector->getData() + pos * numBytesPerValue, + 1 * numBytesPerValue); + // TODO(Guodong): Should be wrapped into nullChunk->appendOne(vector); + nullChunk->setNull(this->numValues, vector->isNull(pos)); + numValues += 1; +} + void ColumnChunk::append( ColumnChunk* other, offset_t startPosInOtherChunk, uint32_t numValuesToAppend) { KU_ASSERT(other->dataType.getPhysicalType() == dataType.getPhysicalType()); @@ 
-224,29 +236,24 @@ void ColumnChunk::append( numValues += numValuesToAppend; } -void ColumnChunk::write(ValueVector* vector, ValueVector* offsetsInChunk, bool isCSR) { - KU_ASSERT( - vector->dataType.getPhysicalType() == dataType.getPhysicalType() && - offsetsInChunk->dataType.getPhysicalType() == PhysicalTypeID::INT64 && - vector->state->selVector->selectedSize == offsetsInChunk->state->selVector->selectedSize); - auto offsets = (offset_t*)offsetsInChunk->getData(); - for (auto i = 0u; i < offsetsInChunk->state->selVector->selectedSize; i++) { - auto offsetInChunk = offsets[offsetsInChunk->state->selVector->selectedPositions[i]]; - KU_ASSERT(offsetInChunk < capacity); - if (!isCSR && !nullChunk->isNull(offsetInChunk)) { +void ColumnChunk::write(ColumnChunk* chunk, ColumnChunk* dstOffsets, bool isCSR) { + KU_ASSERT(chunk->dataType.getPhysicalType() == dataType.getPhysicalType() && + dstOffsets->dataType.getPhysicalType() == PhysicalTypeID::INT64 && + chunk->getNumValues() == dstOffsets->getNumValues()); + for (auto i = 0u; i < dstOffsets->getNumValues(); i++) { + auto dstOffset = dstOffsets->getValue(i); + KU_ASSERT(dstOffset < capacity); + if (!isCSR && !nullChunk->isNull(dstOffset)) { throw CopyException(stringFormat("Node with offset: {} can only have one neighbour due " "to the MANY-ONE/ONE-ONE relationship constraint.", - offsetInChunk)); + dstOffset)); } - auto offsetInVector = vector->state->selVector->selectedPositions[i]; - if (!vector->isNull(offsetInVector)) { - memcpy(buffer.get() + offsetInChunk * numBytesPerValue, - vector->getData() + offsetInVector * numBytesPerValue, numBytesPerValue); - } - nullChunk->setNull(offsetInChunk, vector->isNull(offsetInVector)); - if (offsetInChunk >= numValues) { - numValues = offsetInChunk + 1; + if (!chunk->getNullChunk()->isNull(i)) { + memcpy(buffer.get() + dstOffset * numBytesPerValue, + chunk->getData() + i * numBytesPerValue, numBytesPerValue); } + nullChunk->setNull(dstOffset, chunk->getNullChunk()->isNull(i)); + numValues = dstOffset >= numValues ? 
dstOffset + 1 : numValues; } } @@ -335,6 +342,7 @@ offset_t ColumnChunk::getOffsetInBuffer(offset_t pos) const { void ColumnChunk::copyVectorToBuffer(ValueVector* vector, offset_t startPosInChunk) { auto bufferToWrite = buffer.get() + startPosInChunk * numBytesPerValue; + KU_ASSERT(startPosInChunk + vector->state->selVector->selectedSize <= capacity); auto vectorDataToWriteFrom = vector->getData(); if (vector->state->selVector->isUnfiltered()) { memcpy(bufferToWrite, vectorDataToWriteFrom, @@ -418,6 +426,13 @@ void BoolColumnChunk::append(ValueVector* vector) { numValues += vector->state->selVector->selectedSize; } +void BoolColumnChunk::appendOne(ValueVector* vector, vector_idx_t pos) { + KU_ASSERT(vector->dataType.getPhysicalType() == PhysicalTypeID::BOOL); + nullChunk->setNull(numValues, vector->isNull(pos)); + NullMask::setNull((uint64_t*)buffer.get(), numValues, vector->getValue(pos)); + numValues += 1; +} + void BoolColumnChunk::append( ColumnChunk* other, offset_t startPosInOtherChunk, uint32_t numValuesToAppend) { NullMask::copyNullMask((uint64_t*)static_cast(other)->buffer.get(), @@ -428,23 +443,18 @@ void BoolColumnChunk::append( numValues += numValuesToAppend; } -void BoolColumnChunk::write( - ValueVector* valueVector, ValueVector* offsetInChunkVector, bool /*isCSR*/) { - KU_ASSERT(valueVector->dataType.getPhysicalType() == PhysicalTypeID::BOOL && - offsetInChunkVector->dataType.getPhysicalType() == PhysicalTypeID::INT64 && - valueVector->state->selVector->selectedSize == - offsetInChunkVector->state->selVector->selectedSize); - auto offsets = (offset_t*)offsetInChunkVector->getData(); - for (auto i = 0u; i < offsetInChunkVector->state->selVector->selectedSize; i++) { - auto offsetInChunk = offsets[offsetInChunkVector->state->selVector->selectedPositions[i]]; - KU_ASSERT(offsetInChunk < capacity); - auto offsetInVector = valueVector->state->selVector->selectedPositions[i]; - NullMask::setNull( - (uint64_t*)buffer.get(), offsetInChunk, valueVector->getValue(offsetInVector)); +void BoolColumnChunk::write(ColumnChunk* chunk, ColumnChunk* dstOffsets, bool /*isCSR*/) { + KU_ASSERT(chunk->getDataType().getPhysicalType() == PhysicalTypeID::BOOL && + dstOffsets->getDataType().getPhysicalType() == PhysicalTypeID::INT64 && + chunk->getNumValues() == dstOffsets->getNumValues()); + for (auto i = 0u; i < dstOffsets->getNumValues(); i++) { + auto dstOffset = dstOffsets->getValue(i); + KU_ASSERT(dstOffset < capacity); + NullMask::setNull((uint64_t*)buffer.get(), dstOffset, chunk->getValue(i)); if (nullChunk) { - nullChunk->write(valueVector, offsetInVector, offsetInChunk); + nullChunk->setNull(dstOffset, chunk->getNullChunk()->isNull(i)); } - numValues = offsetInChunk >= numValues ? offsetInChunk + 1 : numValues; + numValues = dstOffset >= numValues ? 
dstOffset + 1 : numValues; } } @@ -516,23 +526,19 @@ class FixedListColumnChunk : public ColumnChunk { numValues += numValuesToAppend; } - void write(ValueVector* valueVector, ValueVector* offsetInChunkVector, bool /*isCSR*/) final { - KU_ASSERT(valueVector->dataType.getPhysicalType() == PhysicalTypeID::FIXED_LIST && - offsetInChunkVector->dataType.getPhysicalType() == PhysicalTypeID::INT64); - auto offsets = (offset_t*)offsetInChunkVector->getData(); - KU_ASSERT(valueVector->state->selVector->selectedSize == - offsetInChunkVector->state->selVector->selectedSize); - for (auto i = 0u; i < offsetInChunkVector->state->selVector->selectedSize; i++) { - auto offsetInChunk = - offsets[offsetInChunkVector->state->selVector->selectedPositions[i]]; - KU_ASSERT(offsetInChunk < capacity); - auto offsetInVector = valueVector->state->selVector->selectedPositions[i]; - nullChunk->write(valueVector, offsetInVector, offsetInChunk); - if (!valueVector->isNull(offsetInVector)) { - memcpy(buffer.get() + getOffsetInBuffer(offsetInChunk), - valueVector->getData() + offsetInVector * numBytesPerValue, numBytesPerValue); + void write(ColumnChunk* chunk, ColumnChunk* dstOffsets, bool /*isCSR*/) final { + KU_ASSERT(chunk->getDataType().getPhysicalType() == PhysicalTypeID::FIXED_LIST && + dstOffsets->getDataType().getPhysicalType() == PhysicalTypeID::INT64); + KU_ASSERT(chunk->getNumValues() == dstOffsets->getNumValues()); + for (auto i = 0u; i < dstOffsets->getNumValues(); i++) { + auto dstOffset = dstOffsets->getValue(i); + KU_ASSERT(dstOffset < capacity); + nullChunk->setNull(dstOffset, chunk->getNullChunk()->isNull(i)); + if (!chunk->getNullChunk()->isNull(i)) { + memcpy(buffer.get() + getOffsetInBuffer(dstOffset), + chunk->getData() + i * numBytesPerValue, numBytesPerValue); } - numValues = offsetInChunk >= numValues ? offsetInChunk + 1 : numValues; + numValues = dstOffset >= numValues ? dstOffset + 1 : numValues; } } @@ -595,7 +601,7 @@ class InternalIDColumnChunk final : public ColumnChunk { }; std::unique_ptr ColumnChunkFactory::createColumnChunk( - LogicalType dataType, bool enableCompression, uint64_t capacity) { + LogicalType dataType, bool enableCompression, uint64_t capacity, bool inMemory) { switch (dataType.getPhysicalType()) { case PhysicalTypeID::BOOL: { return std::make_unique(capacity, enableCompression); @@ -629,15 +635,15 @@ std::unique_ptr ColumnChunkFactory::createColumnChunk( } case PhysicalTypeID::STRING: { return std::make_unique( - std::move(dataType), capacity, enableCompression); + std::move(dataType), capacity, enableCompression, inMemory); } case PhysicalTypeID::VAR_LIST: { return std::make_unique( - std::move(dataType), capacity, enableCompression); + std::move(dataType), capacity, enableCompression, inMemory); } case PhysicalTypeID::STRUCT: { return std::make_unique( - std::move(dataType), capacity, enableCompression); + std::move(dataType), capacity, enableCompression, inMemory); } default: KU_UNREACHABLE; diff --git a/src/storage/store/dictionary_chunk.cpp b/src/storage/store/dictionary_chunk.cpp index 2470aeaad7a..69dc48ffda7 100644 --- a/src/storage/store/dictionary_chunk.cpp +++ b/src/storage/store/dictionary_chunk.cpp @@ -16,7 +16,7 @@ namespace storage { // chunk to be greater than the node group size since they remove unused entries. // So the chunk is initialized with a size equal to 3/4 the node group size, making sure there // is always extra space for updates. 
-static const uint64_t OFFSET_CHUNK_INITIAL_CAPACITY = StorageConstants::NODE_GROUP_SIZE * 0.75; +static const double OFFSET_CHUNK_CAPACITY_FACTOR = 0.75; DictionaryChunk::DictionaryChunk(uint64_t capacity, bool enableCompression) : enableCompression{enableCompression}, @@ -25,7 +25,7 @@ DictionaryChunk::DictionaryChunk(uint64_t capacity, bool enableCompression) stringDataChunk = ColumnChunkFactory::createColumnChunk( *LogicalType::UINT8(), false /*enableCompression*/, capacity); offsetChunk = ColumnChunkFactory::createColumnChunk( - *LogicalType::UINT64(), enableCompression, OFFSET_CHUNK_INITIAL_CAPACITY); + *LogicalType::UINT64(), enableCompression, capacity * OFFSET_CHUNK_CAPACITY_FACTOR); } void DictionaryChunk::resetToEmpty() { @@ -59,7 +59,9 @@ DictionaryChunk::string_index_t DictionaryChunk::appendString(std::string_view v stringDataChunk->setNumValues(startOffset + val.size()); auto index = offsetChunk->getNumValues(); if (index >= offsetChunk->getCapacity()) { - offsetChunk->resize(std::bit_ceil(offsetChunk->getCapacity() * CHUNK_RESIZE_RATIO)); + offsetChunk->resize(offsetChunk->getCapacity() == 0 ? + 2 : + (offsetChunk->getCapacity() * CHUNK_RESIZE_RATIO)); } offsetChunk->setValue(startOffset, index); offsetChunk->setNumValues(index + 1); diff --git a/src/storage/store/node_group.cpp b/src/storage/store/node_group.cpp index d50c64a0d53..2b4582503c8 100644 --- a/src/storage/store/node_group.cpp +++ b/src/storage/store/node_group.cpp @@ -125,21 +125,22 @@ offset_t NodeGroup::append(NodeGroup* other, offset_t offsetInOtherNodeGroup) { return numNodesToAppend; } -void NodeGroup::write(DataChunk* dataChunk, vector_idx_t offsetVectorIdx) { - KU_ASSERT(dataChunk->getNumValueVectors() == chunks.size() + 1); - auto offsetVector = dataChunk->getValueVector(offsetVectorIdx).get(); +void NodeGroup::write( + std::vector>& data, vector_idx_t offsetVectorIdx) { + KU_ASSERT(data.size() == chunks.size() + 1); + auto& offsetChunk = data[offsetVectorIdx]; vector_idx_t vectorIdx = 0, chunkIdx = 0; - for (auto i = 0u; i < dataChunk->getNumValueVectors(); i++) { + for (auto i = 0u; i < data.size(); i++) { if (i == offsetVectorIdx) { vectorIdx++; continue; } - KU_ASSERT(vectorIdx < dataChunk->getNumValueVectors()); - writeToColumnChunk(chunkIdx, vectorIdx, dataChunk, offsetVector); + KU_ASSERT(vectorIdx < data.size()); + writeToColumnChunk(chunkIdx, vectorIdx, data, *offsetChunk); chunkIdx++; vectorIdx++; } - numRows += offsetVector->state->selVector->selectedSize; + numRows += offsetChunk->getNumValues(); } void NodeGroup::finalize(uint64_t nodeGroupIdx_) { diff --git a/src/storage/store/string_column_chunk.cpp b/src/storage/store/string_column_chunk.cpp index f187b905b9f..a392f6bf358 100644 --- a/src/storage/store/string_column_chunk.cpp +++ b/src/storage/store/string_column_chunk.cpp @@ -1,5 +1,6 @@ #include "storage/store/string_column_chunk.h" +#include "storage/store/column_chunk.h" #include "storage/store/dictionary_chunk.h" using namespace kuzu::common; @@ -8,10 +9,11 @@ namespace kuzu { namespace storage { StringColumnChunk::StringColumnChunk( - LogicalType dataType, uint64_t capacity, bool enableCompression) + LogicalType dataType, uint64_t capacity, bool enableCompression, bool inMemory) : ColumnChunk{std::move(dataType), capacity, enableCompression}, - dictionaryChunk{std::make_unique(capacity, enableCompression)}, needFinalize{ - false} {} + dictionaryChunk{ + std::make_unique(inMemory ? 
0 : capacity, enableCompression)}, + needFinalize{false} {} void StringColumnChunk::resetToEmpty() { ColumnChunk::resetToEmpty(); @@ -19,20 +21,23 @@ void StringColumnChunk::resetToEmpty() { } void StringColumnChunk::append(ValueVector* vector) { - KU_ASSERT(vector->dataType.getPhysicalType() == PhysicalTypeID::STRING); for (auto i = 0u; i < vector->state->selVector->selectedSize; i++) { // index is stored in main chunk, data is stored in the data chunk auto pos = vector->state->selVector->selectedPositions[i]; - nullChunk->setNull(numValues, vector->isNull(pos)); - if (vector->isNull(pos)) { - numValues++; - continue; - } - auto kuString = vector->getValue(pos); - auto dstPos = numValues; - numValues++; - setValueFromString(kuString.getAsString().c_str(), kuString.len, dstPos); + appendOne(vector, pos); + } +} + +void StringColumnChunk::appendOne(common::ValueVector* vector, common::vector_idx_t pos) { + KU_ASSERT(vector->dataType.getPhysicalType() == PhysicalTypeID::STRING); + // index is stored in main chunk, data is stored in the data chunk + nullChunk->setNull(numValues, vector->isNull(pos)); + auto dstPos = numValues++; + if (vector->isNull(pos)) { + return; } + auto kuString = vector->getValue(pos); + setValueFromString(kuString.getAsStringView(), dstPos); } void StringColumnChunk::append( @@ -62,30 +67,26 @@ void StringColumnChunk::write( } if (!vector->isNull(offsetInVector)) { auto kuStr = vector->getValue(offsetInVector); - setValueFromString(kuStr.getAsString().c_str(), kuStr.len, offsetInChunk); + setValueFromString(kuStr.getAsStringView(), offsetInChunk); } } -void StringColumnChunk::write( - ValueVector* valueVector, ValueVector* offsetInChunkVector, bool /*isCSR*/) { - KU_ASSERT(valueVector->dataType.getPhysicalType() == PhysicalTypeID::STRING && - offsetInChunkVector->dataType.getPhysicalType() == PhysicalTypeID::INT64 && - valueVector->state->selVector->selectedSize == - offsetInChunkVector->state->selVector->selectedSize); - for (auto i = 0u; i < valueVector->state->selVector->selectedSize; i++) { - auto pos = valueVector->state->selVector->selectedPositions[i]; - auto offsetInChunk = offsetInChunkVector->getValue(pos); +void StringColumnChunk::write(ColumnChunk* chunk, ColumnChunk* dstOffsets, bool /*isCSR*/) { + KU_ASSERT(chunk->getDataType().getPhysicalType() == PhysicalTypeID::STRING && + dstOffsets->getDataType().getPhysicalType() == PhysicalTypeID::INT64 && + chunk->getNumValues() == dstOffsets->getNumValues()); + for (auto i = 0u; i < chunk->getNumValues(); i++) { + auto offsetInChunk = dstOffsets->getValue(i); if (!needFinalize && offsetInChunk < numValues) [[unlikely]] { needFinalize = true; } - auto offsetInVector = valueVector->state->selVector->selectedPositions[i]; - nullChunk->setNull(offsetInChunk, valueVector->isNull(offsetInVector)); + nullChunk->setNull(offsetInChunk, chunk->getNullChunk()->isNull(i)); if (offsetInChunk >= numValues) { numValues = offsetInChunk + 1; } - if (!valueVector->isNull(offsetInVector)) { - auto kuStr = valueVector->getValue(offsetInVector); - setValueFromString((const char*)kuStr.getData(), kuStr.len, offsetInChunk); + if (!chunk->getNullChunk()->isNull(i)) { + auto stringChunk = ku_dynamic_cast(chunk); + setValueFromString(stringChunk->getValue(i), offsetInChunk); } } } @@ -101,8 +102,7 @@ void StringColumnChunk::write(ColumnChunk* srcChunk, offset_t srcOffsetInChunk, continue; } auto srcStringChunk = ku_dynamic_cast(srcChunk); - auto stringInOtherChunk = srcStringChunk->getValue(srcPos); - 
setValueFromString(stringInOtherChunk.data(), stringInOtherChunk.size(), dstPos); + setValueFromString(srcStringChunk->getValue(srcPos), dstPos); } } @@ -131,14 +131,13 @@ void StringColumnChunk::appendStringColumnChunk( indices[posInChunk] = 0; continue; } - auto stringInOtherChunk = other->getValue(posInOtherChunk); - setValueFromString(stringInOtherChunk.data(), stringInOtherChunk.size(), posInChunk); + setValueFromString(other->getValue(posInOtherChunk), posInChunk); } } -void StringColumnChunk::setValueFromString(const char* value, uint64_t length, uint64_t pos) { +void StringColumnChunk::setValueFromString(std::string_view value, uint64_t pos) { KU_ASSERT(pos < numValues); - auto index = dictionaryChunk->appendString(std::string_view(value, length)); + auto index = dictionaryChunk->appendString(value); ColumnChunk::setValue(index, pos); } diff --git a/src/storage/store/struct_column_chunk.cpp b/src/storage/store/struct_column_chunk.cpp index 2c83e40308b..cafd0e26f2d 100644 --- a/src/storage/store/struct_column_chunk.cpp +++ b/src/storage/store/struct_column_chunk.cpp @@ -1,5 +1,9 @@ #include "storage/store/struct_column_chunk.h" +#include "common/types/internal_id_t.h" +#include "common/types/types.h" +#include "storage/store/column_chunk.h" + using namespace kuzu::common; namespace kuzu { @@ -8,13 +12,13 @@ namespace storage { // TODO: need to handle this case, when the whole struct entry is null, should set all fields to // null too. StructColumnChunk::StructColumnChunk( - LogicalType dataType, uint64_t capacity, bool enableCompression) + LogicalType dataType, uint64_t capacity, bool enableCompression, bool inMemory) : ColumnChunk{std::move(dataType), capacity} { auto fieldTypes = StructType::getFieldTypes(&this->dataType); childChunks.resize(fieldTypes.size()); for (auto i = 0u; i < fieldTypes.size(); i++) { childChunks[i] = ColumnChunkFactory::createColumnChunk( - *fieldTypes[i]->copy(), enableCompression, capacity); + *fieldTypes[i]->copy(), enableCompression, capacity, inMemory); } } @@ -49,6 +53,17 @@ void StructColumnChunk::append(ValueVector* vector) { numValues += vector->state->selVector->selectedSize; } +void StructColumnChunk::appendOne(ValueVector* vector, vector_idx_t pos) { + auto numFields = StructType::getNumFields(&dataType); + for (auto i = 0u; i < numFields; i++) { + childChunks[i]->appendOne(StructVector::getFieldVector(vector, i).get(), pos); + } + for (auto i = 0u; i < vector->state->selVector->selectedSize; i++) { + nullChunk->setNull(numValues + i, vector->isNull(pos)); + } + numValues += 1; +} + void StructColumnChunk::resize(uint64_t newCapacity) { ColumnChunk::resize(newCapacity); capacity = newCapacity; @@ -77,18 +92,16 @@ void StructColumnChunk::write( } } -void StructColumnChunk::write( - ValueVector* valueVector, ValueVector* offsetInChunkVector, bool isCSR) { - KU_ASSERT(valueVector->dataType.getPhysicalType() == PhysicalTypeID::STRUCT); - auto offsets = reinterpret_cast(offsetInChunkVector->getData()); - for (auto i = 0u; i < offsetInChunkVector->state->selVector->selectedSize; i++) { - auto offsetInChunk = offsets[offsetInChunkVector->state->selVector->selectedPositions[i]]; +void StructColumnChunk::write(ColumnChunk* chunk, ColumnChunk* dstOffsets, bool isCSR) { + KU_ASSERT(chunk->getDataType().getPhysicalType() == PhysicalTypeID::STRUCT); + for (auto i = 0u; i < dstOffsets->getNumValues(); i++) { + auto offsetInChunk = dstOffsets->getValue(i); KU_ASSERT(offsetInChunk < capacity); - nullChunk->setNull(offsetInChunk, valueVector->isNull(i)); + 
nullChunk->setNull(offsetInChunk, chunk->getNullChunk()->isNull(i)); } - auto fields = StructVector::getFieldVectors(valueVector); - for (auto i = 0u; i < fields.size(); i++) { - childChunks[i]->write(fields[i].get(), offsetInChunkVector, isCSR); + auto structChunk = ku_dynamic_cast(chunk); + for (auto i = 0u; i < childChunks.size(); i++) { + childChunks[i]->write(structChunk->getChild(i), dstOffsets, isCSR); } } diff --git a/src/storage/store/var_list_column_chunk.cpp b/src/storage/store/var_list_column_chunk.cpp index 058dfda7ca6..733eddeb0f2 100644 --- a/src/storage/store/var_list_column_chunk.cpp +++ b/src/storage/store/var_list_column_chunk.cpp @@ -2,6 +2,7 @@ #include "common/cast.h" #include "common/types/value/value.h" +#include "storage/store/column_chunk.h" using namespace kuzu::common; @@ -24,12 +25,12 @@ void VarListDataColumnChunk::resizeBuffer(uint64_t numValues) { } VarListColumnChunk::VarListColumnChunk( - LogicalType dataType, uint64_t capacity, bool enableCompression) + LogicalType dataType, uint64_t capacity, bool enableCompression, bool inMemory) : ColumnChunk{std::move(dataType), capacity, enableCompression, true /* hasNullChunk */}, needFinalize{false} { varListDataColumnChunk = std::make_unique( ColumnChunkFactory::createColumnChunk(*VarListType::getChildType(&this->dataType)->copy(), - enableCompression, 0 /* capacity */)); + enableCompression, 0 /* capacity */, inMemory)); KU_ASSERT(this->dataType.getPhysicalType() == PhysicalTypeID::VAR_LIST); } @@ -88,7 +89,33 @@ void VarListColumnChunk::append(ValueVector* vector) { } copyListValues(vector->getValue(pos), dataVector); } - numValues += vector->state->selVector->selectedSize; + numValues += numToAppend; +} + +void VarListColumnChunk::appendOne(common::ValueVector* vector, common::vector_idx_t pos) { + auto numToAppend = 1; + auto newCapacity = capacity; + while (numValues + numToAppend >= newCapacity) { + newCapacity *= 1.5; + } + if (capacity != newCapacity) { + resize(newCapacity); + } + auto nextListOffsetInChunk = getListOffset(numValues); + auto offsetBufferToWrite = (offset_t*)(buffer.get()); + uint64_t listLen = vector->isNull(pos) ? 
0 : vector->getValue(pos).size; + nullChunk->setNull(numValues, vector->isNull(pos)); + nextListOffsetInChunk += listLen; + offsetBufferToWrite[numValues] = nextListOffsetInChunk; + + varListDataColumnChunk->resizeBuffer(nextListOffsetInChunk); + auto dataVector = ListVector::getDataVector(vector); + dataVector->setState(std::make_unique()); + dataVector->state->selVector->resetSelectorToValuePosBuffer(); + if (!vector->isNull(pos)) { + copyListValues(vector->getValue(pos), dataVector); + } + numValues += numToAppend; } void VarListColumnChunk::appendNullList() { @@ -99,21 +126,18 @@ void VarListColumnChunk::appendNullList() { numValues++; } -void VarListColumnChunk::write( - ValueVector* valueVector, ValueVector* offsetInChunkVector, bool /*isCSR*/) { +void VarListColumnChunk::write(ColumnChunk* chunk, ColumnChunk* dstOffsets, bool /*isCSR*/) { needFinalize = true; if (!indicesColumnChunk) { initializeIndices(); } - KU_ASSERT(valueVector->dataType.getPhysicalType() == dataType.getPhysicalType() && - offsetInChunkVector->dataType.getPhysicalType() == PhysicalTypeID::INT64 && - valueVector->state->selVector->selectedSize == - offsetInChunkVector->state->selVector->selectedSize); + KU_ASSERT(chunk->getDataType().getPhysicalType() == dataType.getPhysicalType() && + dstOffsets->getDataType().getPhysicalType() == PhysicalTypeID::INT64 && + chunk->getNumValues() == dstOffsets->getNumValues()); auto currentIndex = numValues; - append(valueVector); - for (auto i = 0u; i < offsetInChunkVector->state->selVector->selectedSize; i++) { - auto posInChunk = offsetInChunkVector->getValue( - offsetInChunkVector->state->selVector->selectedPositions[i]); + append(chunk, 0, chunk->getNumValues()); + for (auto i = 0u; i < dstOffsets->getNumValues(); i++) { + auto posInChunk = dstOffsets->getValue(i); KU_ASSERT(posInChunk < capacity); indicesColumnChunk->setValue(currentIndex++, posInChunk); indicesColumnChunk->getNullChunk()->setNull(posInChunk, false);
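
To make the new buffer layout concrete: a minimal stand-alone sketch (simplified stand-in types, not Kuzu code) of the [(a,b,c), (a,b,c)] chunk grouping described in the PartitioningBuffer::Partition comment, showing why merging thread-local buffers into the shared state only moves chunk pointers instead of copying values.

#include <cstdint>
#include <memory>
#include <vector>

// Stand-in for storage::ColumnChunk: owns a flat buffer of values.
struct ChunkStub {
    std::vector<int64_t> values;
};
// One chunk per column, i.e. the (a, b, c) group in the Partition comment.
using ChunkGroup = std::vector<std::unique_ptr<ChunkStub>>;
// A partition holds a list of such groups, appended by different threads.
struct PartitionStub {
    std::vector<ChunkGroup> chunkGroups;
};

// Merging a thread-local partition into the shared one moves unique_ptrs only,
// mirroring what PartitioningBuffer::merge does in the patch.
void merge(PartitionStub& shared, PartitionStub&& local) {
    shared.chunkGroups.reserve(shared.chunkGroups.size() + local.chunkGroups.size());
    for (auto& group : local.chunkGroups) {
        shared.chunkGroups.push_back(std::move(group));
    }
}

int main() {
    PartitionStub shared, local;
    ChunkGroup group;
    group.push_back(std::make_unique<ChunkStub>());
    group.back()->values = {1, 2, 3}; // pretend this thread appended three rows
    local.chunkGroups.push_back(std::move(group));
    merge(shared, std::move(local));
    return shared.chunkGroups.size() == 1 ? 0 : 1;
}

In the patch itself each thread fills fixed-capacity chunk groups (CHUNK_SIZE = 2048 values) per partition, so the cost of a merge is proportional to the number of chunk groups rather than the number of tuples.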