Skip to content

Commit

Permalink
DX-64328 Array types for Gandiva (dremio#58)
Browse files Browse the repository at this point in the history
Add List input and output types for Gandiva functions. Add new reference implementations for array_contains and array_remove, tested via integration with Dremio. int32, int64, double and float list types have been tested.

Support List types in function specification and llvm code generation.
Pass back function type information through the expression registry.
See 1p here: https://docs.google.com/document/d/1exwXdUUnk5FqZLzVZyTdhqgwxTk0u9bL54aLVNM5Tas/edit
  • Loading branch information
lriggs committed Sep 3, 2024
1 parent 21a45e1 commit bc3fc36
Show file tree
Hide file tree
Showing 40 changed files with 2,282 additions and 109 deletions.
21 changes: 19 additions & 2 deletions cpp/src/arrow/buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -521,10 +521,27 @@ class ARROW_EXPORT ResizableBuffer : public MutableBuffer {
return Reserve(sizeof(T) * new_nb_elements);
}

public:
uint8_t* offsetBuffer;
int64_t offsetCapacity;
uint8_t* validityBuffer;
uint8_t* outerValidityBuffer;

protected:
ResizableBuffer(uint8_t* data, int64_t size) : MutableBuffer(data, size) {}
ResizableBuffer(uint8_t* data, int64_t size) : MutableBuffer(data, size) {
offsetBuffer = nullptr;
offsetCapacity = 0;
validityBuffer = nullptr;
outerValidityBuffer = nullptr;

}
ResizableBuffer(uint8_t* data, int64_t size, std::shared_ptr<MemoryManager> mm)
: MutableBuffer(data, size, std::move(mm)) {}
: MutableBuffer(data, size, std::move(mm)) {
offsetBuffer = nullptr;
offsetCapacity = 0;
validityBuffer = nullptr;
outerValidityBuffer = nullptr;
}
};

/// \defgroup buffer-allocation-functions Functions for allocating buffers
Expand Down
3 changes: 3 additions & 0 deletions cpp/src/gandiva/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ set_source_files_properties(${GANDIVA_PRECOMPILED_CC_PATH} PROPERTIES GENERATED

set(SRC_FILES
annotator.cc
array_ops.cc
bitmap_accumulator.cc
cache.cc
cast_time.cc
Expand All @@ -68,6 +69,7 @@ set(SRC_FILES
function_ir_builder.cc
function_registry.cc
function_registry_arithmetic.cc
function_registry_array.cc
function_registry_datetime.cc
function_registry_hash.cc
function_registry_math_ops.cc
Expand Down Expand Up @@ -244,6 +246,7 @@ endfunction()

add_gandiva_test(internals-test
SOURCES
array_ops_test.cc
bitmap_accumulator_test.cc
cache_test.cc
engine_llvm_test.cc
Expand Down
86 changes: 78 additions & 8 deletions cpp/src/gandiva/annotator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,27 @@ FieldDescriptorPtr Annotator::MakeDesc(FieldPtr field, bool is_output) {
int data_idx = buffer_count_++;
int validity_idx = buffer_count_++;
int offsets_idx = FieldDescriptor::kInvalidIdx;
int child_offsets_idx = FieldDescriptor::kInvalidIdx;
if (arrow::is_binary_like(field->type()->id())) {
offsets_idx = buffer_count_++;
}

if (field->type()->id() == arrow::Type::LIST) {
offsets_idx = buffer_count_++;
if (arrow::is_binary_like(field->type()->field(0)->type()->id())) {
child_offsets_idx = buffer_count_++;
}
}
int data_buffer_ptr_idx = FieldDescriptor::kInvalidIdx;
if (is_output) {
data_buffer_ptr_idx = buffer_count_++;
}
int child_valid_buffer_ptr_idx = FieldDescriptor::kInvalidIdx;
if (field->type()->id() == arrow::Type::LIST) {
child_valid_buffer_ptr_idx = buffer_count_++;
}
return std::make_shared<FieldDescriptor>(field, data_idx, validity_idx, offsets_idx,
data_buffer_ptr_idx);
data_buffer_ptr_idx, child_offsets_idx, child_valid_buffer_ptr_idx);
}

int Annotator::AddHolderPointer(void* holder) {
Expand All @@ -80,17 +92,76 @@ void Annotator::PrepareBuffersForField(const FieldDescriptor& desc,
if (desc.HasOffsetsIdx()) {
uint8_t* offsets_buf = const_cast<uint8_t*>(array_data.buffers[buffer_idx]->data());
eval_batch->SetBuffer(desc.offsets_idx(), offsets_buf, array_data.offset);
++buffer_idx;

if (desc.HasChildOffsetsIdx()) {
if (is_output) {
// if list field is output field, we should put buffer pointer into eval batch
// for resizing
uint8_t* child_offsets_buf = reinterpret_cast<uint8_t*>(
array_data.child_data.at(0)->buffers[buffer_idx].get());
eval_batch->SetBuffer(desc.child_data_offsets_idx(), child_offsets_buf,
array_data.child_data.at(0)->offset);

uint8_t* child_valid_buf = reinterpret_cast<uint8_t*>(
array_data.child_data.at(0)->buffers[0].get());
eval_batch->SetBuffer(desc.child_data_validity_idx(), child_valid_buf,
array_data.child_data.at(0)->offset);

} else {
// if list field is input field, just put buffer data into eval batch
uint8_t* child_offsets_buf = const_cast<uint8_t*>(
array_data.child_data.at(0)->buffers[buffer_idx]->data());
eval_batch->SetBuffer(desc.child_data_offsets_idx(), child_offsets_buf,
array_data.child_data.at(0)->offset);

uint8_t* child_valid_buf = const_cast<uint8_t*>(
array_data.child_data.at(0)->buffers[0]->data());
eval_batch->SetBuffer(desc.child_data_offsets_idx(), child_valid_buf,
array_data.child_data.at(0)->offset);
}
}
if (array_data.type->id() != arrow::Type::LIST ||
arrow::is_binary_like(array_data.type->field(0)->type()->id())) {
// primitive type list data buffer index is 1
// binary like type list data buffer index is 2
++buffer_idx;
}
}

int const childDataIndex = 0;
if (array_data.type->id() != arrow::Type::LIST) {
uint8_t* data_buf = const_cast<uint8_t*>(array_data.buffers[buffer_idx]->data());
eval_batch->SetBuffer(desc.data_idx(), data_buf, array_data.offset);
} else {
uint8_t* data_buf =
const_cast<uint8_t*>(array_data.child_data.at(childDataIndex)->buffers[buffer_idx]->data());
eval_batch->SetBuffer(desc.data_idx(), data_buf, array_data.child_data.at(0)->offset);

int const childDataBufferIndex = 0;
if (array_data.child_data.at(childDataIndex)->buffers[childDataBufferIndex] ) {
uint8_t* child_valid_buf = const_cast<uint8_t*>(
array_data.child_data.at(childDataIndex)->buffers[childDataBufferIndex]->data());
eval_batch->SetBuffer(desc.child_data_validity_idx(), child_valid_buf, 0);
}

}

uint8_t* data_buf = const_cast<uint8_t*>(array_data.buffers[buffer_idx]->data());
eval_batch->SetBuffer(desc.data_idx(), data_buf, array_data.offset);
if (is_output) {
// pass in the Buffer object for output data buffers. Can be used for resizing.
uint8_t* data_buf_ptr =
reinterpret_cast<uint8_t*>(array_data.buffers[buffer_idx].get());
eval_batch->SetBuffer(desc.data_buffer_ptr_idx(), data_buf_ptr, array_data.offset);

if (array_data.type->id() != arrow::Type::LIST) {
uint8_t* data_buf_ptr =
reinterpret_cast<uint8_t*>(array_data.buffers[buffer_idx].get());
eval_batch->SetBuffer(desc.data_buffer_ptr_idx(), data_buf_ptr, array_data.offset);
} else {
// list data buffer is in child data buffer
uint8_t* data_buf_ptr = reinterpret_cast<uint8_t*>(
array_data.child_data.at(0)->buffers[buffer_idx].get());
eval_batch->SetBuffer(desc.data_buffer_ptr_idx(), data_buf_ptr,
array_data.child_data.at(0)->offset);
}
}

}

EvalBatchPtr Annotator::PrepareEvalBatch(const arrow::RecordBatch& record_batch,
Expand All @@ -106,7 +177,6 @@ EvalBatchPtr Annotator::PrepareEvalBatch(const arrow::RecordBatch& record_batch,
// skip columns not involved in the expression.
continue;
}

PrepareBuffersForField(*(found->second), *(record_batch.column_data(i)),
eval_batch.get(), false /*is_output*/);
}
Expand Down
Loading

0 comments on commit bc3fc36

Please sign in to comment.