Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Re-write partitioner to use ColumnChunks instead of ValueVectors #2979

Merged
merged 1 commit into from
Mar 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 15 additions & 5 deletions src/include/processor/operator/partitioner.h
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#pragma once

#include "common/data_chunk/data_chunk_collection.h"
#include "processor/operator/sink.h"
#include "storage/store/column_chunk.h"

namespace kuzu {
namespace storage {
Expand All @@ -20,7 +20,14 @@ struct PartitionerFunctions {
// partitioning methods. For example, copy of rel tables require partitioning on both FWD and BWD
// direction. Each partitioning method corresponds to a PartitioningState.
struct PartitioningBuffer {
std::vector<std::unique_ptr<common::DataChunkCollection>> partitions;
using ColumnChunkCollection = std::vector<std::unique_ptr<storage::ColumnChunk>>;
struct Partition {
// One chunk for each column, grouped into a list
// so that groups from different threads can be quickly merged without copying
// E.g. [(a,b,c), (a,b,c)] where a is a chunk for column a, b for column b, etc.
std::vector<ColumnChunkCollection> chunks;
};
std::vector<Partition> partitions;

void merge(std::unique_ptr<PartitioningBuffer> localPartitioningStates);
};
Expand Down Expand Up @@ -49,11 +56,11 @@ struct PartitionerSharedState {
void resetState();
void merge(std::vector<std::unique_ptr<PartitioningBuffer>> localPartitioningStates);

inline common::DataChunkCollection* getPartitionBuffer(
inline PartitioningBuffer::Partition& getPartitionBuffer(
common::vector_idx_t partitioningIdx, common::partition_idx_t partitionIdx) {
KU_ASSERT(partitioningIdx < partitioningBuffers.size());
KU_ASSERT(partitionIdx < partitioningBuffers[partitioningIdx]->partitions.size());
return partitioningBuffers[partitioningIdx]->partitions[partitionIdx].get();
return partitioningBuffers[partitioningIdx]->partitions[partitionIdx];
}
};

Expand Down Expand Up @@ -102,7 +109,7 @@ class Partitioner : public Sink {

static void initializePartitioningStates(
std::vector<std::unique_ptr<PartitioningBuffer>>& partitioningBuffers,
std::vector<common::partition_idx_t> numPartitions, storage::MemoryManager* mm);
std::vector<common::partition_idx_t> numPartitions);

private:
// TODO: For now, RelBatchInsert will guarantee all data are inside one data chunk. Should be
Expand All @@ -111,6 +118,9 @@ class Partitioner : public Sink {
common::partition_idx_t partitioningIdx, common::DataChunk* chunkToCopyFrom);

private:
// Same size as a value vector. Each thread will allocate a chunk for each node group,
// so this should be kept relatively small to avoid allocating more memory than is needed
static const uint64_t CHUNK_SIZE = 2048;
std::vector<std::unique_ptr<PartitioningInfo>> infos;
std::shared_ptr<PartitionerSharedState> sharedState;
std::unique_ptr<PartitionerLocalState> localState;
Expand Down
9 changes: 5 additions & 4 deletions src/include/processor/operator/persistent/rel_batch_insert.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include "common/enums/rel_direction.h"
#include "processor/operator/partitioner.h"
#include "processor/operator/persistent/batch_insert.h"
#include "storage/store/column_chunk.h"
#include "storage/store/node_group.h"

namespace kuzu {
Expand Down Expand Up @@ -60,20 +61,20 @@ class RelBatchInsert final : public BatchInsert {
}

private:
void prepareCSRNodeGroup(common::DataChunkCollection* partition,
void prepareCSRNodeGroup(PartitioningBuffer::Partition& partition,
common::offset_t startNodeOffset, common::vector_idx_t offsetVectorIdx,
common::offset_t numNodes);

static common::length_t getGapSize(common::length_t length);
static std::vector<common::offset_t> populateStartCSROffsetsAndLengths(
storage::CSRHeaderChunks& csrHeader, common::offset_t numNodes,
common::DataChunkCollection* partition, common::vector_idx_t offsetVectorIdx);
PartitioningBuffer::Partition& partition, common::vector_idx_t offsetVectorIdx);
static void populateEndCSROffsets(
storage::CSRHeaderChunks& csrHeader, std::vector<common::offset_t>& gaps);
static void setOffsetToWithinNodeGroup(
common::ValueVector* vector, common::offset_t startOffset);
storage::ColumnChunk& chunk, common::offset_t startOffset);
static void setOffsetFromCSROffsets(
common::ValueVector* offsetVector, storage::ColumnChunk* offsetChunk);
storage::ColumnChunk* nodeOffsetChunk, storage::ColumnChunk* csrOffsetChunk);

// We only check rel multiplcity constraint (MANY_ONE, ONE_ONE) for now.
std::optional<common::offset_t> checkRelMultiplicityConstraint(
Expand Down
13 changes: 8 additions & 5 deletions src/include/storage/store/column_chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ class ColumnChunk {
virtual ColumnChunkMetadata getMetadataToFlush() const;

virtual void append(common::ValueVector* vector);
virtual void appendOne(common::ValueVector* vector, common::vector_idx_t pos);
virtual void append(
ColumnChunk* other, common::offset_t startPosInOtherChunk, uint32_t numValuesToAppend);

Expand All @@ -73,8 +74,7 @@ class ColumnChunk {
// `offsetInVector`, we should flatten the vector to pos at `offsetInVector`.
virtual void write(common::ValueVector* vector, common::offset_t offsetInVector,
common::offset_t offsetInChunk);
virtual void write(
common::ValueVector* vector, common::ValueVector* offsetsInChunk, bool isCSR);
virtual void write(ColumnChunk* chunk, ColumnChunk* offsetsInChunk, bool isCSR);
virtual void write(ColumnChunk* srcChunk, common::offset_t srcOffsetInChunk,
common::offset_t dstOffsetInChunk, common::offset_t numValuesToCopy);
virtual void copy(ColumnChunk* srcChunk, common::offset_t srcOffsetInChunk,
Expand Down Expand Up @@ -151,12 +151,12 @@ class BoolColumnChunk : public ColumnChunk {
enableCompression, hasNullChunk) {}

void append(common::ValueVector* vector) final;
void appendOne(common::ValueVector* vector, common::vector_idx_t pos) final;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is fine temporarily, but we should refactor the two interfaces append(common::ValueVector* vector) and appendOne(common::ValueVector* vector, common::vector_idx_t pos) into a single one by passing SelVector as an argument,

void append(common::ValueVector* vector, SelVector& sel);

void append(ColumnChunk* other, common::offset_t startPosInOtherChunk,
uint32_t numValuesToAppend) override;
void write(common::ValueVector* vector, common::offset_t offsetInVector,
common::offset_t offsetInChunk) override;
void write(common::ValueVector* valueVector, common::ValueVector* offsetInChunkVector,
bool isCSR) final;
void write(ColumnChunk* chunk, ColumnChunk* dstOffsets, bool isCSR) final;
void write(ColumnChunk* srcChunk, common::offset_t srcOffsetInChunk,
common::offset_t dstOffsetInChunk, common::offset_t numValuesToCopy) override;
};
Expand Down Expand Up @@ -206,8 +206,11 @@ class NullColumnChunk final : public BoolColumnChunk {
};

struct ColumnChunkFactory {
// inMemory starts string column chunk dictionaries at zero instead of reserving space for
// values to grow
static std::unique_ptr<ColumnChunk> createColumnChunk(common::LogicalType dataType,
bool enableCompression, uint64_t capacity = common::StorageConstants::NODE_GROUP_SIZE);
bool enableCompression, uint64_t capacity = common::StorageConstants::NODE_GROUP_SIZE,
bool inMemory = false);

static std::unique_ptr<ColumnChunk> createNullColumnChunk(
bool enableCompression, uint64_t capacity = common::StorageConstants::NODE_GROUP_SIZE) {
Expand Down
15 changes: 6 additions & 9 deletions src/include/storage/store/node_group.h
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
#pragma once

#include "common/column_data_format.h"
#include "common/data_chunk/data_chunk.h"
#include "storage/store/column_chunk.h"

namespace kuzu {
Expand Down Expand Up @@ -32,15 +31,14 @@
uint64_t append(const std::vector<common::ValueVector*>& columnVectors,
common::DataChunkState* columnState, uint64_t numValuesToAppend);
common::offset_t append(NodeGroup* other, common::offset_t offsetInOtherNodeGroup);
void write(common::DataChunk* dataChunk, common::vector_idx_t offsetVector);
void write(std::vector<std::unique_ptr<ColumnChunk>>& data, common::vector_idx_t offsetVector);

void finalize(uint64_t nodeGroupIdx_);

virtual inline void writeToColumnChunk(common::vector_idx_t chunkIdx,
common::vector_idx_t vectorIdx, common::DataChunk* dataChunk,
common::ValueVector* offsetVector) {
chunks[chunkIdx]->write(
dataChunk->getValueVector(vectorIdx).get(), offsetVector, false /* isCSR */);
common::vector_idx_t vectorIdx, std::vector<std::unique_ptr<ColumnChunk>>& data,
ColumnChunk& offsetChunk) {
chunks[chunkIdx]->write(data[vectorIdx].get(), &offsetChunk, false /*isCSR*/);

Check warning on line 41 in src/include/storage/store/node_group.h

View check run for this annotation

Codecov / codecov/patch

src/include/storage/store/node_group.h#L41

Added line #L41 was not covered by tests
}

protected:
Expand Down Expand Up @@ -81,9 +79,8 @@
const CSRHeaderChunks& getCSRHeader() const { return csrHeaderChunks; }

inline void writeToColumnChunk(common::vector_idx_t chunkIdx, common::vector_idx_t vectorIdx,
common::DataChunk* dataChunk, common::ValueVector* offsetVector) override {
chunks[chunkIdx]->write(
dataChunk->getValueVector(vectorIdx).get(), offsetVector, true /* isCSR */);
std::vector<std::unique_ptr<ColumnChunk>>& data, ColumnChunk& offsetChunk) override {
chunks[chunkIdx]->write(data[vectorIdx].get(), &offsetChunk, true /* isCSR */);
}

private:
Expand Down
11 changes: 7 additions & 4 deletions src/include/storage/store/string_column_chunk.h
Original file line number Diff line number Diff line change
@@ -1,24 +1,27 @@
#pragma once

#include "common/assert.h"
#include "common/types/types.h"
#include "storage/store/column_chunk.h"
#include "storage/store/dictionary_chunk.h"

namespace kuzu {
namespace storage {

class StringColumnChunk : public ColumnChunk {
public:
StringColumnChunk(common::LogicalType dataType, uint64_t capacity, bool enableCompression);
StringColumnChunk(
common::LogicalType dataType, uint64_t capacity, bool enableCompression, bool inMemory);

void resetToEmpty() final;
void append(common::ValueVector* vector) final;
void appendOne(common::ValueVector* vector, common::vector_idx_t pos) final;
void append(ColumnChunk* other, common::offset_t startPosInOtherChunk,
uint32_t numValuesToAppend) final;

void write(common::ValueVector* vector, common::offset_t offsetInVector,
common::offset_t offsetInChunk) final;
void write(common::ValueVector* valueVector, common::ValueVector* offsetInChunkVector,
bool isCSR) final;
void write(ColumnChunk* chunk, ColumnChunk* dstOffsets, bool isCSR) final;
void write(ColumnChunk* srcChunk, common::offset_t srcOffsetInChunk,
common::offset_t dstOffsetInChunk, common::offset_t numValuesToCopy) override;
void copy(ColumnChunk* srcChunk, common::offset_t srcOffsetInChunk,
Expand All @@ -43,7 +46,7 @@ class StringColumnChunk : public ColumnChunk {
void appendStringColumnChunk(StringColumnChunk* other, common::offset_t startPosInOtherChunk,
uint32_t numValuesToAppend);

void setValueFromString(const char* value, uint64_t length, uint64_t pos);
void setValueFromString(std::string_view value, uint64_t pos);

private:
std::unique_ptr<DictionaryChunk> dictionaryChunk;
Expand Down
9 changes: 6 additions & 3 deletions src/include/storage/store/struct_column_chunk.h
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
#pragma once

#include "common/types/internal_id_t.h"
#include "common/types/types.h"
#include "storage/store/column_chunk.h"

namespace kuzu {
namespace storage {

class StructColumnChunk : public ColumnChunk {
public:
StructColumnChunk(common::LogicalType dataType, uint64_t capacity, bool enableCompression);
StructColumnChunk(
common::LogicalType dataType, uint64_t capacity, bool enableCompression, bool inMemory);

inline ColumnChunk* getChild(common::vector_idx_t childIdx) {
KU_ASSERT(childIdx < childChunks.size());
Expand All @@ -20,11 +23,11 @@ class StructColumnChunk : public ColumnChunk {
void append(ColumnChunk* other, common::offset_t startPosInOtherChunk,
uint32_t numValuesToAppend) final;
void append(common::ValueVector* vector) final;
void appendOne(common::ValueVector* vector, common::vector_idx_t pos) final;

void write(common::ValueVector* vector, common::offset_t offsetInVector,
common::offset_t offsetInChunk) final;
void write(common::ValueVector* valueVector, common::ValueVector* offsetInChunkVector,
bool isCSR) final;
void write(ColumnChunk* chunk, ColumnChunk* dstOffsets, bool isCSR) final;
void write(ColumnChunk* srcChunk, common::offset_t srcOffsetInChunk,
common::offset_t dstOffsetInChunk, common::offset_t numValuesToCopy) override;
void copy(ColumnChunk* srcChunk, common::offset_t srcOffsetInChunk,
Expand Down
7 changes: 4 additions & 3 deletions src/include/storage/store/var_list_column_chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ struct VarListDataColumnChunk {
class VarListColumnChunk : public ColumnChunk {

public:
VarListColumnChunk(common::LogicalType dataType, uint64_t capacity, bool enableCompression);
VarListColumnChunk(
common::LogicalType dataType, uint64_t capacity, bool enableCompression, bool inMemory);

inline ColumnChunk* getDataColumnChunk() const {
return varListDataColumnChunk->dataColumnChunk.get();
Expand All @@ -36,11 +37,11 @@ class VarListColumnChunk : public ColumnChunk {
void resetToEmpty() final;

void append(common::ValueVector* vector) final;
void appendOne(common::ValueVector* vector, common::vector_idx_t pos) final;
// Note: `write` assumes that no `append` will be called afterward.
void write(common::ValueVector* vector, common::offset_t offsetInVector,
common::offset_t offsetInChunk) final;
void write(common::ValueVector* valueVector, common::ValueVector* offsetInChunkVector,
bool isCSR) final;
void write(ColumnChunk* chunk, ColumnChunk* dstOffsets, bool isCSR) final;
void write(ColumnChunk* srcChunk, common::offset_t srcOffsetInChunk,
common::offset_t dstOffsetInChunk, common::offset_t numValuesToCopy) override;
void copy(ColumnChunk* srcChunk, common::offset_t srcOffsetInChunk,
Expand Down
54 changes: 36 additions & 18 deletions src/processor/operator/partitioner.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
#include "processor/operator/partitioner.h"

#include <cstdint>

#include "common/constants.h"
#include "processor/execution_context.h"
#include "storage/store/column_chunk.h"
#include "storage/store/node_table.h"

using namespace kuzu::common;
Expand Down Expand Up @@ -42,7 +47,7 @@ void PartitionerSharedState::initialize() {
numPartitions.resize(2);
numPartitions[0] = getNumPartitions(maxNodeOffsets[0]);
numPartitions[1] = getNumPartitions(maxNodeOffsets[1]);
Partitioner::initializePartitioningStates(partitioningBuffers, numPartitions, mm);
Partitioner::initializePartitioningStates(partitioningBuffers, numPartitions);
}

partition_idx_t PartitionerSharedState::getNextPartition(vector_idx_t partitioningIdx) {
Expand Down Expand Up @@ -71,9 +76,13 @@ void PartitionerSharedState::merge(
void PartitioningBuffer::merge(std::unique_ptr<PartitioningBuffer> localPartitioningState) {
KU_ASSERT(partitions.size() == localPartitioningState->partitions.size());
for (auto partitionIdx = 0u; partitionIdx < partitions.size(); partitionIdx++) {
auto sharedPartition = partitions[partitionIdx].get();
auto localPartition = localPartitioningState->partitions[partitionIdx].get();
sharedPartition->merge(localPartition);
auto& sharedPartition = partitions[partitionIdx];
auto& localPartition = localPartitioningState->partitions[partitionIdx];
sharedPartition.chunks.reserve(
sharedPartition.chunks.size() + localPartition.chunks.size());
for (auto j = 0u; j < localPartition.chunks.size(); j++) {
sharedPartition.chunks.push_back(std::move(localPartition.chunks[j]));
}
}
}

Expand All @@ -91,10 +100,9 @@ void Partitioner::initGlobalStateInternal(ExecutionContext* /*context*/) {
sharedState->initialize();
}

void Partitioner::initLocalStateInternal(ResultSet* /*resultSet*/, ExecutionContext* context) {
void Partitioner::initLocalStateInternal(ResultSet* /*resultSet*/, ExecutionContext* /*context*/) {
localState = std::make_unique<PartitionerLocalState>();
initializePartitioningStates(localState->partitioningBuffers, sharedState->numPartitions,
context->clientContext->getMemoryManager());
initializePartitioningStates(localState->partitioningBuffers, sharedState->numPartitions);
}

static void constructDataChunk(DataChunk* dataChunk, const std::vector<DataPos>& columnPositions,
Expand All @@ -114,14 +122,14 @@ static void constructDataChunk(DataChunk* dataChunk, const std::vector<DataPos>&

void Partitioner::initializePartitioningStates(
std::vector<std::unique_ptr<PartitioningBuffer>>& partitioningBuffers,
std::vector<common::partition_idx_t> numPartitions, MemoryManager* mm) {
std::vector<common::partition_idx_t> numPartitions) {
partitioningBuffers.resize(numPartitions.size());
for (auto partitioningIdx = 0u; partitioningIdx < numPartitions.size(); partitioningIdx++) {
auto numPartition = numPartitions[partitioningIdx];
auto partitioningBuffer = std::make_unique<PartitioningBuffer>();
partitioningBuffer->partitions.reserve(numPartition);
for (auto i = 0u; i < numPartition; i++) {
partitioningBuffer->partitions.push_back(std::make_unique<DataChunkCollection>(mm));
partitioningBuffer->partitions.emplace_back();
}
partitioningBuffers[partitioningIdx] = std::move(partitioningBuffer);
}
Expand All @@ -146,18 +154,28 @@ void Partitioner::executeInternal(ExecutionContext* context) {

void Partitioner::copyDataToPartitions(
partition_idx_t partitioningIdx, DataChunk* chunkToCopyFrom) {
auto originalChunkState = chunkToCopyFrom->state;
chunkToCopyFrom->state = std::make_shared<DataChunkState>(1 /* capacity */);
chunkToCopyFrom->state->selVector->resetSelectorToValuePosBufferWithSize(1 /* size */);
for (auto i = 0u; i < originalChunkState->selVector->selectedSize; i++) {
auto posToCopyFrom = originalChunkState->selVector->selectedPositions[i];
for (auto i = 0u; i < chunkToCopyFrom->state->selVector->selectedSize; i++) {
auto posToCopyFrom = chunkToCopyFrom->state->selVector->selectedPositions[i];
auto partitionIdx = partitionIdxes->getValue<partition_idx_t>(posToCopyFrom);
KU_ASSERT(
partitionIdx < localState->getPartitioningBuffer(partitioningIdx)->partitions.size());
auto partition =
localState->getPartitioningBuffer(partitioningIdx)->partitions[partitionIdx].get();
chunkToCopyFrom->state->selVector->selectedPositions[0] = posToCopyFrom;
partition->append(*chunkToCopyFrom);
auto& partition =
localState->getPartitioningBuffer(partitioningIdx)->partitions[partitionIdx];
if (partition.chunks.empty() || partition.chunks.back()[0]->getNumValues() + 1 >
partition.chunks.back()[0]->getCapacity()) {
partition.chunks.emplace_back();
partition.chunks.back().reserve(chunkToCopyFrom->getNumValueVectors());
for (auto i = 0u; i < chunkToCopyFrom->getNumValueVectors(); i++) {
partition.chunks.back().emplace_back(ColumnChunkFactory::createColumnChunk(
chunkToCopyFrom->getValueVector(i)->dataType, false /*enableCompression*/,
Partitioner::CHUNK_SIZE));
}
}
KU_ASSERT(partition.chunks.back().size() == chunkToCopyFrom->getNumValueVectors());
for (auto i = 0u; i < chunkToCopyFrom->getNumValueVectors(); i++) {
partition.chunks.back()[i]->appendOne(
chunkToCopyFrom->getValueVector(i).get(), posToCopyFrom);
}
}
}

Expand Down
Loading
Loading