From 09ab2327d7a8168b0b05c9a984b867a50f604114 Mon Sep 17 00:00:00 2001 From: Tao He Date: Fri, 17 Feb 2023 20:14:18 +0800 Subject: [PATCH] Cast StringArray to LargeStringArray otherwise we will fill when we need to contenate chunks Signed-off-by: Tao He --- include/gar/utils/convert_to_arrow_type.h | 6 +- src/data_type.cc | 4 +- src/filesystem.cc | 81 +++++++++++++++++++++++ src/utils.cc | 2 +- 4 files changed, 88 insertions(+), 5 deletions(-) diff --git a/include/gar/utils/convert_to_arrow_type.h b/include/gar/utils/convert_to_arrow_type.h index e4cba9a32..ea3b72c9d 100644 --- a/include/gar/utils/convert_to_arrow_type.h +++ b/include/gar/utils/convert_to_arrow_type.h @@ -53,9 +53,9 @@ CONVERT_TO_ARROW_TYPE(Type::FLOAT, float, arrow::FloatType, arrow::FloatArray, CONVERT_TO_ARROW_TYPE(Type::DOUBLE, double, arrow::DoubleType, arrow::DoubleArray, arrow::DoubleBuilder, arrow::float64(), "double") -CONVERT_TO_ARROW_TYPE(Type::STRING, std::string, arrow::StringType, - arrow::StringArray, arrow::StringBuilder, arrow::utf8(), - "string") +CONVERT_TO_ARROW_TYPE(Type::STRING, std::string, arrow::LargeStringType, + arrow::LargeStringArray, arrow::LargeStringBuilder, + arrow::large_utf8(), "string") } // namespace GAR_NAMESPACE_INTERNAL diff --git a/src/data_type.cc b/src/data_type.cc index 7a4422d1e..da2c11736 100644 --- a/src/data_type.cc +++ b/src/data_type.cc @@ -36,7 +36,7 @@ std::shared_ptr DataType::DataTypeToArrowDataType( case Type::DOUBLE: return arrow::float64(); case Type::STRING: - return arrow::utf8(); + return arrow::large_utf8(); default: throw std::runtime_error("Unsupported data type"); } @@ -57,6 +57,8 @@ DataType DataType::ArrowDataTypeToDataType( return DataType(Type::DOUBLE); case arrow::Type::STRING: return DataType(Type::STRING); + case arrow::Type::LARGE_STRING: + return DataType(Type::STRING); default: throw std::runtime_error("Unsupported data type"); } diff --git a/src/filesystem.cc b/src/filesystem.cc index b1cf86324..1e69b99fd 100644 --- a/src/filesystem.cc +++ b/src/filesystem.cc @@ -27,6 +27,54 @@ limitations under the License. namespace GAR_NAMESPACE_INTERNAL { +namespace detail { +template +static Status CastToLargeOffsetArray( + const std::shared_ptr& in, + const std::shared_ptr& to_type, + std::shared_ptr& out) { // NOLINT(runtime/references) + auto array_data = in->data()->Copy(); + auto offset = array_data->buffers[1]; + using from_offset_type = typename U::offset_type; + using to_string_offset_type = typename T::offset_type; + auto raw_value_offsets_ = + offset == NULLPTR + ? NULLPTR + : reinterpret_cast(offset->data()); + std::vector to_offset(offset->size() / + sizeof(from_offset_type)); + for (size_t i = 0; i < to_offset.size(); ++i) { + to_offset[i] = raw_value_offsets_[i]; + } + std::shared_ptr buffer; + arrow::TypedBufferBuilder buffer_builder; + RETURN_NOT_ARROW_OK( + buffer_builder.Append(to_offset.data(), to_offset.size())); + RETURN_NOT_ARROW_OK(buffer_builder.Finish(&buffer)); + array_data->type = to_type; + array_data->buffers[1] = buffer; + out = arrow::MakeArray(array_data); + RETURN_NOT_ARROW_OK(out->ValidateFull()); + return Status::OK(); +} + +template +static Status CastToLargeOffsetArray( + const std::shared_ptr& in, + const std::shared_ptr& to_type, + std::shared_ptr& out) { // NOLINT(runtime/references) + std::vector> chunks; + for (auto const& chunk : in->chunks()) { + std::shared_ptr array; + auto status = CastToLargeOffsetArray(chunk, to_type, array); + GAR_RETURN_NOT_OK(status); + chunks.emplace_back(array); + } + GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(out, arrow::ChunkedArray::Make(chunks)); + return Status::OK(); +} +} // namespace detail + Result> FileSystem::ReadFileToTable( const std::string& path, FileType file_type) const noexcept { arrow::MemoryPool* pool = arrow::default_memory_pool(); @@ -64,6 +112,39 @@ Result> FileSystem::ReadFileToTable( default: return Status::Invalid("File type is invalid."); } + // cast string array to large string array as we need concatenate chunks in + // some places, e.g., in vineyard + for (int i = 0; i < table->num_columns(); ++i) { + std::shared_ptr type = table->column(i)->type(); + if (type->id() == arrow::Type::STRING) { + type = arrow::large_utf8(); + } else if (type->id() == arrow::Type::BINARY) { + type = arrow::large_binary(); + } + if (type->Equals(table->column(i)->type())) { + continue; + } + // do casting + auto field = table->field(i)->WithType(type); + std::shared_ptr chunked_array; + if (type->Equals(arrow::large_utf8())) { + auto status = detail::CastToLargeOffsetArray( + table->column(i), type, chunked_array); + GAR_RETURN_NOT_OK(status); + } else if (type->Equals(arrow::large_binary())) { + auto status = detail::CastToLargeOffsetArray( + table->column(i), type, chunked_array); + GAR_RETURN_NOT_OK(status); + } else { + // noop + chunked_array = table->column(i); + } + GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(table, table->RemoveColumn(i)); + GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN( + table, table->AddColumn(i, field, chunked_array)); + } return table; } diff --git a/src/utils.cc b/src/utils.cc index 4194e2b7c..eb3a88595 100644 --- a/src/utils.cc +++ b/src/utils.cc @@ -93,7 +93,7 @@ Result GetArrowArrayData( std::string ValueGetter::Value(const void* data, int64_t offset) { return std::string( - reinterpret_cast(data)->GetView(offset)); + reinterpret_cast(data)->GetView(offset)); } } // namespace util