Skip to content

Commit

Permalink
Re-write partitioner to use ColumnChunks instead of ValueVectors
Browse files Browse the repository at this point in the history
ValueVectors allocate strings in 256KB chunks for only 2048 strings.
ColumnChunks can have a much larger capacity, and also support string
de-duplication.
  • Loading branch information
benjaminwinger committed Feb 29, 2024
1 parent e76c6ea commit a5b30d6
Show file tree
Hide file tree
Showing 15 changed files with 279 additions and 202 deletions.
16 changes: 10 additions & 6 deletions src/include/processor/operator/partitioner.h
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
#pragma once

#include "common/data_chunk/data_chunk_collection.h"
#include "processor/operator/persistent/copy_node.h"
#include "processor/operator/sink.h"
#include "storage/store/column_chunk.h"

namespace kuzu {
namespace processor {
Expand All @@ -18,11 +18,14 @@ struct PartitionerFunctions {
// partitioning methods. For example, copy of rel tables require partitioning on both FWD and BWD
// direction. Each partitioning method corresponds to a PartitioningState.
struct PartitioningBuffer {
std::vector<std::unique_ptr<common::DataChunkCollection>> partitions;
// One vector of column chunks (one per column) for each node group/partition
std::vector<std::vector<std::unique_ptr<storage::ColumnChunk>>> partitions;

void merge(std::unique_ptr<PartitioningBuffer> localPartitioningStates);
};

struct PartitioningInfo;

// NOTE: Currently, Partitioner is tightly coupled with CopyRel. We should generalize it later when
// necessary. Here, each partition is essentially a node group.
struct PartitionerSharedState {
Expand All @@ -41,16 +44,16 @@ struct PartitionerSharedState {

explicit PartitionerSharedState(storage::MemoryManager* mm) : mm{mm} {}

void initialize();
void initialize(const std::vector<std::unique_ptr<PartitioningInfo>>& partitionInfos);
common::partition_idx_t getNextPartition(common::vector_idx_t partitioningIdx);
void resetState();
void merge(std::vector<std::unique_ptr<PartitioningBuffer>> localPartitioningStates);

inline common::DataChunkCollection* getPartitionBuffer(
inline std::vector<std::unique_ptr<storage::ColumnChunk>>& getPartitionBuffer(
common::vector_idx_t partitioningIdx, common::partition_idx_t partitionIdx) {
KU_ASSERT(partitioningIdx < partitioningBuffers.size());
KU_ASSERT(partitionIdx < partitioningBuffers[partitioningIdx]->partitions.size());
return partitioningBuffers[partitioningIdx]->partitions[partitionIdx].get();
return partitioningBuffers[partitioningIdx]->partitions[partitionIdx];
}
};

Expand Down Expand Up @@ -99,7 +102,8 @@ class Partitioner : public Sink {

static void initializePartitioningStates(
std::vector<std::unique_ptr<PartitioningBuffer>>& partitioningBuffers,
std::vector<common::partition_idx_t> numPartitions, storage::MemoryManager* mm);
std::vector<common::partition_idx_t> numPartitions,
const std::vector<std::unique_ptr<PartitioningInfo>>& partitionInfos);

private:
// TODO: For now, CopyRel will guarantee all data are inside one data chunk. Should be
Expand Down
11 changes: 6 additions & 5 deletions src/include/processor/operator/persistent/copy_rel.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include "processor/operator/partitioner.h"
#include "processor/operator/sink.h"
#include "storage/stats/rels_store_statistics.h"
#include "storage/store/column_chunk.h"
#include "storage/store/node_group.h"
#include "storage/store/rel_table.h"
#include "storage/wal/wal.h"
Expand Down Expand Up @@ -94,18 +95,18 @@ class CopyRel : public Sink {
0;
}

void prepareCSRNodeGroup(common::DataChunkCollection* partition,
common::vector_idx_t offsetVectorIdx, common::offset_t numNodes);
void prepareCSRNodeGroup(std::vector<std::unique_ptr<storage::ColumnChunk>>& partition,
uint32_t offsetChunkIdx, common::offset_t numNodes);

static void populateStartCSROffsetsAndLengths(storage::CSRHeaderChunks& csrHeader,
std::vector<common::offset_t>& gaps, common::offset_t numNodes,
common::DataChunkCollection* partition, common::vector_idx_t offsetVectorIdx);
std::vector<std::unique_ptr<storage::ColumnChunk>>& partition, uint32_t offsetChunkIdx);
static void populateEndCSROffsets(
storage::CSRHeaderChunks& csrHeader, std::vector<common::offset_t>& gaps);
static void setOffsetToWithinNodeGroup(
common::ValueVector* vector, common::offset_t startOffset);
storage::ColumnChunk& offsetChunk, common::offset_t startOffset);
static void setOffsetFromCSROffsets(
common::ValueVector* offsetVector, storage::ColumnChunk* offsetChunk);
storage::ColumnChunk& nodeOffsetChunk, storage::ColumnChunk* csrOffsetChunk);

protected:
std::unique_ptr<CopyRelInfo> info;
Expand Down
9 changes: 5 additions & 4 deletions src/include/storage/store/column_chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include "common/vector/value_vector.h"
#include "storage/buffer_manager/bm_file_handle.h"
#include "storage/compression/compression.h"
#include "storage/local_storage/local_table.h"

namespace kuzu {
namespace storage {
Expand Down Expand Up @@ -55,6 +56,7 @@ class ColumnChunk {
virtual ColumnChunkMetadata getMetadataToFlush() const;

virtual void append(common::ValueVector* vector);
virtual void appendOne(common::ValueVector* vector, common::vector_idx_t pos);
virtual void append(
ColumnChunk* other, common::offset_t startPosInOtherChunk, uint32_t numValuesToAppend);

Expand All @@ -73,8 +75,7 @@ class ColumnChunk {
// `offsetInVector`, we should flatten the vector to pos at `offsetInVector`.
virtual void write(common::ValueVector* vector, common::offset_t offsetInVector,
common::offset_t offsetInChunk);
virtual void write(
common::ValueVector* vector, common::ValueVector* offsetsInChunk, bool isCSR);
virtual void write(ColumnChunk* chunk, ColumnChunk* offsetsInChunk, bool isCSR);
virtual void write(ColumnChunk* srcChunk, common::offset_t srcOffsetInChunk,
common::offset_t dstOffsetInChunk, common::offset_t numValuesToCopy);
virtual void copy(ColumnChunk* srcChunk, common::offset_t srcOffsetInChunk,
Expand Down Expand Up @@ -151,12 +152,12 @@ class BoolColumnChunk : public ColumnChunk {
enableCompression, hasNullChunk) {}

void append(common::ValueVector* vector) final;
void appendOne(common::ValueVector* vector, common::vector_idx_t pos) final;
void append(ColumnChunk* other, common::offset_t startPosInOtherChunk,
uint32_t numValuesToAppend) override;
void write(common::ValueVector* vector, common::offset_t offsetInVector,
common::offset_t offsetInChunk) override;
void write(common::ValueVector* valueVector, common::ValueVector* offsetInChunkVector,
bool isCSR) final;
void write(ColumnChunk* chunk, ColumnChunk* dstOffsets, bool isCSR) final;
void write(ColumnChunk* srcChunk, common::offset_t srcOffsetInChunk,
common::offset_t dstOffsetInChunk, common::offset_t numValuesToCopy) override;
};
Expand Down
14 changes: 6 additions & 8 deletions src/include/storage/store/node_group.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,14 @@ class NodeGroup {
uint64_t append(const std::vector<common::ValueVector*>& columnVectors,
common::DataChunkState* columnState, uint64_t numValuesToAppend);
common::offset_t append(NodeGroup* other, common::offset_t offsetInOtherNodeGroup);
void write(common::DataChunk* dataChunk, common::vector_idx_t offsetVector);
void write(std::vector<std::unique_ptr<ColumnChunk>>& data, common::vector_idx_t offsetVector);

void finalize(uint64_t nodeGroupIdx_);

virtual inline void writeToColumnChunk(common::vector_idx_t chunkIdx,
common::vector_idx_t vectorIdx, common::DataChunk* dataChunk,
common::ValueVector* offsetVector) {
chunks[chunkIdx]->write(
dataChunk->getValueVector(vectorIdx).get(), offsetVector, false /* isCSR */);
common::vector_idx_t vectorIdx, std::vector<std::unique_ptr<ColumnChunk>>& data,
ColumnChunk& offsetChunk) {
chunks[chunkIdx]->write(data[vectorIdx].get(), &offsetChunk, false /*isCSR*/);
}

protected:
Expand Down Expand Up @@ -81,9 +80,8 @@ class CSRNodeGroup : public NodeGroup {
const CSRHeaderChunks& getCSRHeader() const { return csrHeaderChunks; }

inline void writeToColumnChunk(common::vector_idx_t chunkIdx, common::vector_idx_t vectorIdx,
common::DataChunk* dataChunk, common::ValueVector* offsetVector) override {
chunks[chunkIdx]->write(
dataChunk->getValueVector(vectorIdx).get(), offsetVector, true /* isCSR */);
std::vector<std::unique_ptr<ColumnChunk>>& data, ColumnChunk& offsetChunk) override {
chunks[chunkIdx]->write(data[vectorIdx].get(), &offsetChunk, true /* isCSR */);
}

private:
Expand Down
8 changes: 5 additions & 3 deletions src/include/storage/store/string_column_chunk.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
#pragma once

#include "common/assert.h"
#include "common/types/types.h"
#include "storage/store/column_chunk.h"
#include "storage/store/dictionary_chunk.h"

namespace kuzu {
Expand All @@ -12,13 +14,13 @@ class StringColumnChunk : public ColumnChunk {

void resetToEmpty() final;
void append(common::ValueVector* vector) final;
void appendOne(common::ValueVector* vector, common::vector_idx_t pos) final;
void append(ColumnChunk* other, common::offset_t startPosInOtherChunk,
uint32_t numValuesToAppend) final;

void write(common::ValueVector* vector, common::offset_t offsetInVector,
common::offset_t offsetInChunk) final;
void write(common::ValueVector* valueVector, common::ValueVector* offsetInChunkVector,
bool isCSR) final;
void write(ColumnChunk* chunk, ColumnChunk* dstOffsets, bool isCSR) final;
void write(ColumnChunk* srcChunk, common::offset_t srcOffsetInChunk,
common::offset_t dstOffsetInChunk, common::offset_t numValuesToCopy) override;
void copy(ColumnChunk* srcChunk, common::offset_t srcOffsetInChunk,
Expand All @@ -43,7 +45,7 @@ class StringColumnChunk : public ColumnChunk {
void appendStringColumnChunk(StringColumnChunk* other, common::offset_t startPosInOtherChunk,
uint32_t numValuesToAppend);

void setValueFromString(const char* value, uint64_t length, uint64_t pos);
void setValueFromString(std::string_view value, uint64_t pos);

private:
DictionaryChunk dictionaryChunk;
Expand Down
6 changes: 4 additions & 2 deletions src/include/storage/store/struct_column_chunk.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#pragma once

#include "common/types/internal_id_t.h"
#include "common/types/types.h"
#include "storage/store/column_chunk.h"

namespace kuzu {
Expand All @@ -20,11 +22,11 @@ class StructColumnChunk : public ColumnChunk {
void append(ColumnChunk* other, common::offset_t startPosInOtherChunk,
uint32_t numValuesToAppend) final;
void append(common::ValueVector* vector) final;
void appendOne(common::ValueVector* vector, common::vector_idx_t pos) final;

void write(common::ValueVector* vector, common::offset_t offsetInVector,
common::offset_t offsetInChunk) final;
void write(common::ValueVector* valueVector, common::ValueVector* offsetInChunkVector,
bool isCSR) final;
void write(ColumnChunk* chunk, ColumnChunk* dstOffsets, bool isCSR) final;
void write(ColumnChunk* srcChunk, common::offset_t srcOffsetInChunk,
common::offset_t dstOffsetInChunk, common::offset_t numValuesToCopy) override;
void copy(ColumnChunk* srcChunk, common::offset_t srcOffsetInChunk,
Expand Down
4 changes: 2 additions & 2 deletions src/include/storage/store/var_list_column_chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,11 @@ class VarListColumnChunk : public ColumnChunk {
void resetToEmpty() final;

void append(common::ValueVector* vector) final;
void appendOne(common::ValueVector* vector, common::vector_idx_t pos) final;
// Note: `write` assumes that no `append` will be called afterward.
void write(common::ValueVector* vector, common::offset_t offsetInVector,
common::offset_t offsetInChunk) final;
void write(common::ValueVector* valueVector, common::ValueVector* offsetInChunkVector,
bool isCSR) final;
void write(ColumnChunk* chunk, ColumnChunk* dstOffsets, bool isCSR) final;
void write(ColumnChunk* srcChunk, common::offset_t srcOffsetInChunk,
common::offset_t dstOffsetInChunk, common::offset_t numValuesToCopy) override;
void copy(ColumnChunk* srcChunk, common::offset_t srcOffsetInChunk,
Expand Down
67 changes: 47 additions & 20 deletions src/processor/operator/partitioner.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#include "processor/operator/partitioner.h"

#include "storage/store/column_chunk.h"

using namespace kuzu::common;
using namespace kuzu::storage;

Expand Down Expand Up @@ -31,7 +33,8 @@ static common::partition_idx_t getNumPartitions(offset_t maxOffset) {
return (maxOffset + StorageConstants::NODE_GROUP_SIZE) / StorageConstants::NODE_GROUP_SIZE;
}

void PartitionerSharedState::initialize() {
void PartitionerSharedState::initialize(
const std::vector<std::unique_ptr<PartitioningInfo>>& partitionInfos) {
maxNodeOffsets.resize(2);
maxNodeOffsets[0] =
srcNodeTable->getMaxNodeOffset(transaction::Transaction::getDummyWriteTrx().get());
Expand All @@ -40,7 +43,7 @@ void PartitionerSharedState::initialize() {
numPartitions.resize(2);
numPartitions[0] = getNumPartitions(maxNodeOffsets[0]);
numPartitions[1] = getNumPartitions(maxNodeOffsets[1]);
Partitioner::initializePartitioningStates(partitioningBuffers, numPartitions, mm);
Partitioner::initializePartitioningStates(partitioningBuffers, numPartitions, partitionInfos);
}

partition_idx_t PartitionerSharedState::getNextPartition(vector_idx_t partitioningIdx) {
Expand Down Expand Up @@ -69,9 +72,22 @@ void PartitionerSharedState::merge(
void PartitioningBuffer::merge(std::unique_ptr<PartitioningBuffer> localPartitioningState) {
KU_ASSERT(partitions.size() == localPartitioningState->partitions.size());
for (auto partitionIdx = 0u; partitionIdx < partitions.size(); partitionIdx++) {
auto sharedPartition = partitions[partitionIdx].get();
auto localPartition = localPartitioningState->partitions[partitionIdx].get();
sharedPartition->merge(localPartition);
auto& sharedPartition = partitions[partitionIdx];
auto& localPartition = localPartitioningState->partitions[partitionIdx];
if (sharedPartition.empty()) {
sharedPartition = std::move(localPartition);
} else {
KU_ASSERT(localPartition.size() == sharedPartition.size());
for (auto i = 0u; i < sharedPartition.size(); i++) {
auto newValues =
sharedPartition[i]->getNumValues() + localPartition[i]->getNumValues();
if (newValues > sharedPartition[i]->getCapacity()) {
sharedPartition[i]->resize(std::bit_ceil(newValues));
}
sharedPartition[i]->append(
localPartition[i].get(), 0, localPartition[i]->getNumValues());
}
}
}
}

Expand All @@ -86,13 +102,13 @@ Partitioner::Partitioner(std::unique_ptr<ResultSetDescriptor> resultSetDescripto
}

void Partitioner::initGlobalStateInternal(ExecutionContext* /*context*/) {
sharedState->initialize();
sharedState->initialize(infos);
}

void Partitioner::initLocalStateInternal(ResultSet* /*resultSet*/, ExecutionContext* context) {
void Partitioner::initLocalStateInternal(ResultSet* /*resultSet*/, ExecutionContext* /*context*/) {
localState = std::make_unique<PartitionerLocalState>();
initializePartitioningStates(localState->partitioningBuffers, sharedState->numPartitions,
context->clientContext->getMemoryManager());
initializePartitioningStates(
localState->partitioningBuffers, sharedState->numPartitions, infos);
}

static void constructDataChunk(DataChunk* dataChunk, const std::vector<DataPos>& columnPositions,
Expand All @@ -112,14 +128,23 @@ static void constructDataChunk(DataChunk* dataChunk, const std::vector<DataPos>&

void Partitioner::initializePartitioningStates(
std::vector<std::unique_ptr<PartitioningBuffer>>& partitioningBuffers,
std::vector<common::partition_idx_t> numPartitions, MemoryManager* mm) {
std::vector<common::partition_idx_t> numPartitions,
const std::vector<std::unique_ptr<PartitioningInfo>>& partitionInfos) {
partitioningBuffers.resize(numPartitions.size());
for (auto partitioningIdx = 0u; partitioningIdx < numPartitions.size(); partitioningIdx++) {
auto numPartition = numPartitions[partitioningIdx];
auto partitioningBuffer = std::make_unique<PartitioningBuffer>();
partitioningBuffer->partitions.reserve(numPartition);
for (auto i = 0u; i < numPartition; i++) {
partitioningBuffer->partitions.push_back(std::make_unique<DataChunkCollection>(mm));
std::vector<std::unique_ptr<ColumnChunk>> partition;
for (auto& type : partitionInfos[partitioningIdx]->columnTypes) {
// Some default small capacity; it will grow. 2 may be too small...
// Compression is enabled to speed up strings and reduce memory usage by
// de-duplicating values
partition.push_back(ColumnChunkFactory::createColumnChunk(
*type, true /*enableCompression*/, 2 /*capacity=*/));
}
partitioningBuffer->partitions.emplace_back(std::move(partition));
}
partitioningBuffers[partitioningIdx] = std::move(partitioningBuffer);
}
Expand All @@ -144,18 +169,20 @@ void Partitioner::executeInternal(ExecutionContext* context) {

void Partitioner::copyDataToPartitions(
partition_idx_t partitioningIdx, DataChunk* chunkToCopyFrom) {
auto originalChunkState = chunkToCopyFrom->state;
chunkToCopyFrom->state = std::make_shared<DataChunkState>(1 /* capacity */);
chunkToCopyFrom->state->selVector->resetSelectorToValuePosBufferWithSize(1 /* size */);
for (auto i = 0u; i < originalChunkState->selVector->selectedSize; i++) {
auto posToCopyFrom = originalChunkState->selVector->selectedPositions[i];
for (auto i = 0u; i < chunkToCopyFrom->state->selVector->selectedSize; i++) {
auto posToCopyFrom = chunkToCopyFrom->state->selVector->selectedPositions[i];
auto partitionIdx = partitionIdxes->getValue<partition_idx_t>(posToCopyFrom);
KU_ASSERT(
partitionIdx < localState->getPartitioningBuffer(partitioningIdx)->partitions.size());
auto partition =
localState->getPartitioningBuffer(partitioningIdx)->partitions[partitionIdx].get();
chunkToCopyFrom->state->selVector->selectedPositions[0] = posToCopyFrom;
partition->append(*chunkToCopyFrom);
auto& partition =
localState->getPartitioningBuffer(partitioningIdx)->partitions[partitionIdx];
KU_ASSERT(partition.size() == chunkToCopyFrom->getNumValueVectors());
for (auto i = 0u; i < chunkToCopyFrom->getNumValueVectors(); i++) {
if (partition[i]->getNumValues() == partition[i]->getCapacity()) {
partition[i]->resize(partition[i]->getCapacity() * 2);
}
partition[i]->appendOne(chunkToCopyFrom->getValueVector(i).get(), posToCopyFrom);
}
}
}

Expand Down
Loading

0 comments on commit a5b30d6

Please sign in to comment.