Skip to content

Commit

Permalink
pyarrow backend scanning for pandas
Browse files Browse the repository at this point in the history
formatting checks

remove disabled tests

more clang format fixes...

py lint check

clang tidy

more clang tidy and py lint checks

more and more clang tidy

explicit pyarrow scan ctor

possibly fixed tests not running?

CI fixes

fix pytest

non portable type resolution solution

apple clang test fix?

add some requested changes

apply backend switching

remove fixed list

update httpfs version & remove python-debug

clang tidy

fix array arithmetic on void

revert extension version change

apply clang-tidy

fix compiler errors

... more clang-tidy fixes

added requested changes

clang-tidy
  • Loading branch information
mxwli committed Mar 21, 2024
1 parent 752871d commit 94d30b2
Show file tree
Hide file tree
Showing 27 changed files with 1,485 additions and 49 deletions.
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ endif ()
if(${BUILD_KUZU})
add_definitions(-DKUZU_ROOT_DIRECTORY="${PROJECT_SOURCE_DIR}")
add_definitions(-DKUZU_CMAKE_VERSION="${CMAKE_PROJECT_VERSION}")
add_definitions(-DKUZU_EXTENSION_VERSION="0.2.0")
add_definitions(-DKUZU_EXTENSION_VERSION="0.2.2")

include_directories(src/include)
include_directories(third_party/antlr4_cypher/include)
Expand Down
1 change: 0 additions & 1 deletion src/binder/bind/bind_copy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
#include "catalog/catalog_entry/rel_table_catalog_entry.h"
#include "common/enums/table_type.h"
#include "common/exception/binder.h"
#include "common/exception/message.h"
#include "common/string_format.h"
#include "function/table/bind_input.h"
#include "main/client_context.h"
Expand Down
1 change: 0 additions & 1 deletion src/binder/bind_expression/bind_variable_expression.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
#include "binder/binder.h"
#include "binder/expression/literal_expression.h"
#include "binder/expression/variable_expression.h"
#include "binder/expression_binder.h"
#include "common/exception/binder.h"
Expand Down
5 changes: 4 additions & 1 deletion src/common/arrow/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
add_library(kuzu_common_arrow
OBJECT
arrow_array_scan.cpp
arrow_converter.cpp
arrow_null_mask_tree.cpp
arrow_row_batch.cpp
arrow_converter.cpp)
arrow_type.cpp)

set(ALL_OBJECT_FILES
${ALL_OBJECT_FILES} $<TARGET_OBJECTS:kuzu_common_arrow>
Expand Down
549 changes: 549 additions & 0 deletions src/common/arrow/arrow_array_scan.cpp

Large diffs are not rendered by default.

214 changes: 214 additions & 0 deletions src/common/arrow/arrow_null_mask_tree.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
#include <vector>

#include "common/arrow/arrow.h"
#include "common/arrow/arrow_nullmask_tree.h"

namespace kuzu {
namespace common {

// scans are based on data specification found here
// https://arrow.apache.org/docs/format/Columnar.html

// all offsets are measured by value, not physical size

void ArrowNullMaskTree::copyToValueVector(ValueVector* vec, uint64_t dstOffset, uint64_t count) {
vec->setNullFromBits(mask->getData(), offset, dstOffset, count);
}

ArrowNullMaskTree ArrowNullMaskTree::operator+(int64_t offset) {
// this operation is mostly a special case for dictionary/run-end encoding
ArrowNullMaskTree ret(*this);
ret.offset += offset;
return ret;
}

bool ArrowNullMaskTree::copyFromBuffer(const void* buffer, uint64_t srcOffset, uint64_t count) {
if (buffer == nullptr) {
mask->setAllNonNull();
return false;
}
mask->copyFromNullBits((const uint64_t*)buffer, srcOffset, 0, count, true);
return true;
}

bool ArrowNullMaskTree::applyParentBitmap(const NullMask* parent, uint64_t count) {
if (parent == nullptr) {
return false;
}
const uint64_t* buffer = parent->data;
if (buffer != nullptr) {
for (int64_t i = 0; i < (count >> NullMask::NUM_BITS_PER_NULL_ENTRY_LOG2); i++) {
mask->buffer[i] |= buffer[i];
}
return true;
}
return false;
}

template<typename offsetsT>
void ArrowNullMaskTree::scanListPushDown(
const ArrowSchema* schema, const ArrowArray* array, uint64_t srcOffset, uint64_t count) {
const offsetsT* offsets = ((const offsetsT*)array->buffers[1]) + srcOffset;
offsetsT auxiliaryLength = offsets[count] - offsets[0];
NullMask pushDownMask((auxiliaryLength + NullMask::NUM_BITS_PER_NULL_ENTRY - 1) >>
NullMask::NUM_BITS_PER_NULL_ENTRY_LOG2);
for (int64_t i = 0; i < count; i++) {
pushDownMask.setNullFromRange(offsets[i], offsets[i + 1] - offsets[i], isNull(i));
}
children->push_back(ArrowNullMaskTree(
schema->children[0], array->children[0], offsets[0], auxiliaryLength, &pushDownMask));
}

void ArrowNullMaskTree::scanStructPushDown(
const ArrowSchema* schema, const ArrowArray* array, uint64_t srcOffset, uint64_t count) {
for (int64_t i = 0; i < array->n_children; i++) {
children->push_back(ArrowNullMaskTree(
schema->children[i], array->children[i], srcOffset, count, mask.get()));
}
}

ArrowNullMaskTree::ArrowNullMaskTree(const ArrowSchema* schema, const ArrowArray* array,
uint64_t srcOffset, uint64_t count, const NullMask* parentBitmap)
: offset{0}, mask{std::make_shared<common::NullMask>(
(count + NullMask::NUM_BITS_PER_NULL_ENTRY - 1) >>
NullMask::NUM_BITS_PER_NULL_ENTRY_LOG2)},
children(std::make_shared<std::vector<ArrowNullMaskTree>>()) {
if (schema->dictionary != nullptr) {
copyFromBuffer(array->buffers[0], srcOffset, count);
applyParentBitmap(parentBitmap, count);
dictionary = std::make_shared<ArrowNullMaskTree>(schema->dictionary, array->dictionary,
array->dictionary->offset, array->dictionary->length);
return;
}
const char* arrowType = schema->format;
std::vector<common::StructField> structFields;
switch (arrowType[0]) {
case 'n':
mask->setAllNull();
break;
case 'b':
case 'c':
case 'C':
case 's':
case 'S':
case 'i':
case 'I':
case 'l':
case 'L':
case 'f':
case 'g':
copyFromBuffer(array->buffers[0], srcOffset, count);
break;
case 'z':
case 'Z':
case 'u':
case 'U':
case 'v':
case 'w':
case 't':
copyFromBuffer(array->buffers[0], srcOffset, count);
applyParentBitmap(parentBitmap, count);
break;
case '+':
switch (arrowType[1]) {
case 'l':
copyFromBuffer(array->buffers[0], srcOffset, count);
applyParentBitmap(parentBitmap, count);
scanListPushDown<int32_t>(schema, array, srcOffset, count);
break;
case 'L':
copyFromBuffer(array->buffers[0], srcOffset, count);
applyParentBitmap(parentBitmap, count);
scanListPushDown<int64_t>(schema, array, srcOffset, count);
break;
case 'w':
// TODO manh: array null resolution
KU_UNREACHABLE;
copyFromBuffer(array->buffers[0], srcOffset, count);
applyParentBitmap(parentBitmap, count);
break;
case 's':
copyFromBuffer(array->buffers[0], srcOffset, count);
applyParentBitmap(parentBitmap, count);
scanStructPushDown(schema, array, srcOffset, count);
break;
case 'm':
// TODO maxwell bind map types
KU_UNREACHABLE;
case 'u': {
const int8_t* types = (const int8_t*)array->buffers[0];
if (schema->format[2] == 'd') {
const int32_t* offsets = (const int32_t*)array->buffers[1];
std::vector<int32_t> countChildren(array->n_children),
lowestOffsets(array->n_children);
std::vector<int32_t> highestOffsets(array->n_children);
for (int64_t i = srcOffset; i < srcOffset + count; i++) {
int32_t curOffset = offsets[i];
int32_t curType = types[i];
if (countChildren[curType] == 0) {
lowestOffsets[curType] = curOffset;
}
highestOffsets[curType] = curOffset;
countChildren[curType]++;
}
for (int64_t i = 0; i < array->n_children; i++) {
children->push_back(ArrowNullMaskTree(schema->children[i], array->children[i],
lowestOffsets[i], highestOffsets[i] - lowestOffsets[i]));
}
for (int64_t i = srcOffset; i < srcOffset + count; i++) {
int32_t curOffset = offsets[i];
int8_t curType = types[i];
mask->setNull(i, children->operator[](curType).isNull(curOffset));
}
} else {
for (int64_t i = 0; i < array->n_children; i++) {
children->push_back(
ArrowNullMaskTree(schema->children[i], array->children[i], srcOffset, count));
}
for (int64_t i = srcOffset; i < srcOffset + count; i++) {
int8_t curType = types[i];
mask->setNull(i, children->operator[](curType).isNull(i));
// this isn't specified in the arrow specification, but is it valid to
// compute this using a bitwise OR?
}
}
if (parentBitmap != nullptr) {
for (int64_t i = 0; i < count >> NullMask::NUM_BITS_PER_NULL_ENTRY_LOG2; i++) {
mask->buffer[i] |= parentBitmap->buffer[i];
}
}
} break;
case 'v':
// list views *suck*, especially when trying to write code that can support
// parallelization for this, we generate child NullMaskTrees on the fly, rather than
// attempt any precomputation
if (array->buffers[0] == nullptr) {
mask->setAllNonNull();
} else {
mask->copyFromNullBits((const uint64_t*)array->buffers[0], srcOffset, 0, count, true);
}
if (parentBitmap != nullptr) {
for (int64_t i = 0; i < count >> NullMask::NUM_BITS_PER_NULL_ENTRY_LOG2; i++) {
mask->buffer[i] |= parentBitmap->buffer[i];
}
}
break;
case 'r':
// it's better to resolve validity during the actual scanning for run-end encoded arrays
// so for this, let's just resolve child validities and move on
for (int64_t i = 0; i < array->n_children; i++) {
children->push_back(ArrowNullMaskTree(schema->children[i], array->children[i],
array->children[i]->offset, array->children[i]->length));
}
break;
default:
KU_UNREACHABLE;
}
break;
default:
KU_UNREACHABLE;
}
}

} // namespace common
} // namespace kuzu
149 changes: 149 additions & 0 deletions src/common/arrow/arrow_type.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
#include "common/arrow/arrow_converter.h"
#include "common/exception/not_implemented.h"
#include "common/exception/runtime.h"

namespace kuzu {
namespace common {

// pyarrow format string specifications can be found here
// https://arrow.apache.org/docs/format/CDataInterface.html#data-type-description-format-strings

LogicalType ArrowConverter::fromArrowSchema(const ArrowSchema* schema) {
const char* arrowType = schema->format;
std::vector<StructField> structFields;
// if we have a dictionary, then the logical type of the column is dependent upon the
// logical type of the dict
if (schema->dictionary != nullptr) {
return fromArrowSchema(schema->dictionary);
}
switch (arrowType[0]) {
case 'n':
return LogicalType(LogicalTypeID::ANY);
case 'b':
return LogicalType(LogicalTypeID::BOOL);
case 'c':
return LogicalType(LogicalTypeID::INT8);
case 'C':
return LogicalType(LogicalTypeID::UINT8);
case 's':
return LogicalType(LogicalTypeID::INT16);
case 'S':
return LogicalType(LogicalTypeID::UINT16);
case 'i':
return LogicalType(LogicalTypeID::INT32);
case 'I':
return LogicalType(LogicalTypeID::UINT32);
case 'l':
return LogicalType(LogicalTypeID::INT64);
case 'L':
return LogicalType(LogicalTypeID::UINT64);
case 'f':
return LogicalType(LogicalTypeID::FLOAT);
case 'g':
return LogicalType(LogicalTypeID::DOUBLE);
case 'z':
case 'Z':
return LogicalType(LogicalTypeID::BLOB);
case 'u':
case 'U':
return LogicalType(LogicalTypeID::STRING);
case 'v':
switch (arrowType[1]) {
case 'z':
return LogicalType(LogicalTypeID::BLOB);
case 'u':
return LogicalType(LogicalTypeID::STRING);
default:
KU_UNREACHABLE;
}

case 'd':
throw NotImplementedException("custom bitwidth decimals are not supported");
case 'w':
return LogicalType(LogicalTypeID::BLOB); // fixed width binary
case 't':
switch (arrowType[1]) {
case 'd':
if (arrowType[2] == 'D') {
return LogicalType(LogicalTypeID::DATE);
} else {
return LogicalType(LogicalTypeID::TIMESTAMP_MS);
}
case 't':
// TODO implement pure time type
throw NotImplementedException("Pure time types are not supported");
case 's':
// TODO maxwell: timezone support
switch (arrowType[2]) {
case 's':
return LogicalType(LogicalTypeID::TIMESTAMP_SEC);
case 'm':
return LogicalType(LogicalTypeID::TIMESTAMP_MS);
case 'u':
return LogicalType(LogicalTypeID::TIMESTAMP);
case 'n':
return LogicalType(LogicalTypeID::TIMESTAMP_NS);
default:
KU_UNREACHABLE;
}
case 'D':
// duration
case 'i':
// interval
return LogicalType(LogicalTypeID::INTERVAL);
default:
KU_UNREACHABLE;
}
case '+':
KU_ASSERT(schema->n_children > 0);
switch (arrowType[1]) {
// complex types need a complementary ExtraTypeInfo object
case 'l':
case 'L':
return *LogicalType::VAR_LIST(
std::make_unique<LogicalType>(fromArrowSchema(schema->children[0])));
case 'w':
throw RuntimeException("Fixed list is currently WIP.");
// TODO Manh: Array Binding
// return *LogicalType::FIXED_LIST(
// std::make_unique<LogicalType>(fromArrowSchema(schema->children[0])),
// std::stoi(arrowType+3));
case 's':
for (int64_t i = 0; i < schema->n_children; i++) {
structFields.emplace_back(std::string(schema->children[i]->name),
std::make_unique<LogicalType>(fromArrowSchema(schema->children[i])));
}
return *LogicalType::STRUCT(std::move(structFields));
case 'm':
// TODO maxwell bind map types
throw NotImplementedException("Scanning Arrow Map types is not supported");
case 'u':
throw RuntimeException("Unions are currently WIP.");
for (int64_t i = 0; i < schema->n_children; i++) {
structFields.emplace_back(std::string(schema->children[i]->name),
std::make_unique<LogicalType>(fromArrowSchema(schema->children[i])));
}
return *LogicalType::UNION(std::move(structFields));
case 'v':
switch (arrowType[2]) {
case 'l':
case 'L':
return *LogicalType::VAR_LIST(
std::make_unique<LogicalType>(fromArrowSchema(schema->children[0])));
default:
KU_UNREACHABLE;
}
case 'r':
// logical type corresponds to second child
return fromArrowSchema(schema->children[1]);
default:
KU_UNREACHABLE;
}
default:
KU_UNREACHABLE;
}
// refer to arrow_converted.cpp:65
}

} // namespace common
} // namespace kuzu
Loading

0 comments on commit 94d30b2

Please sign in to comment.