Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cast StringArray to LargeStringArray otherwise we will fill when we contenate chunks #105

Merged
merged 1 commit into from
Feb 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions include/gar/utils/convert_to_arrow_type.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,9 @@ CONVERT_TO_ARROW_TYPE(Type::FLOAT, float, arrow::FloatType, arrow::FloatArray,
CONVERT_TO_ARROW_TYPE(Type::DOUBLE, double, arrow::DoubleType,
arrow::DoubleArray, arrow::DoubleBuilder,
arrow::float64(), "double")
CONVERT_TO_ARROW_TYPE(Type::STRING, std::string, arrow::StringType,
arrow::StringArray, arrow::StringBuilder, arrow::utf8(),
"string")
CONVERT_TO_ARROW_TYPE(Type::STRING, std::string, arrow::LargeStringType,
arrow::LargeStringArray, arrow::LargeStringBuilder,
arrow::large_utf8(), "string")

} // namespace GAR_NAMESPACE_INTERNAL

Expand Down
4 changes: 3 additions & 1 deletion src/data_type.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ std::shared_ptr<arrow::DataType> DataType::DataTypeToArrowDataType(
case Type::DOUBLE:
return arrow::float64();
case Type::STRING:
return arrow::utf8();
return arrow::large_utf8();
default:
throw std::runtime_error("Unsupported data type");
}
Expand All @@ -57,6 +57,8 @@ DataType DataType::ArrowDataTypeToDataType(
return DataType(Type::DOUBLE);
case arrow::Type::STRING:
return DataType(Type::STRING);
case arrow::Type::LARGE_STRING:
return DataType(Type::STRING);
default:
throw std::runtime_error("Unsupported data type");
}
Expand Down
81 changes: 81 additions & 0 deletions src/filesystem.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,54 @@ limitations under the License.

namespace GAR_NAMESPACE_INTERNAL {

namespace detail {
template <typename U, typename T>
static Status CastToLargeOffsetArray(
const std::shared_ptr<arrow::Array>& in,
const std::shared_ptr<arrow::DataType>& to_type,
std::shared_ptr<arrow::Array>& out) { // NOLINT(runtime/references)
auto array_data = in->data()->Copy();
auto offset = array_data->buffers[1];
using from_offset_type = typename U::offset_type;
using to_string_offset_type = typename T::offset_type;
auto raw_value_offsets_ =
offset == NULLPTR
? NULLPTR
: reinterpret_cast<const from_offset_type*>(offset->data());
std::vector<to_string_offset_type> to_offset(offset->size() /
sizeof(from_offset_type));
for (size_t i = 0; i < to_offset.size(); ++i) {
to_offset[i] = raw_value_offsets_[i];
}
std::shared_ptr<arrow::Buffer> buffer;
arrow::TypedBufferBuilder<to_string_offset_type> buffer_builder;
RETURN_NOT_ARROW_OK(
buffer_builder.Append(to_offset.data(), to_offset.size()));
RETURN_NOT_ARROW_OK(buffer_builder.Finish(&buffer));
array_data->type = to_type;
array_data->buffers[1] = buffer;
out = arrow::MakeArray(array_data);
RETURN_NOT_ARROW_OK(out->ValidateFull());
return Status::OK();
}

template <typename U, typename T>
static Status CastToLargeOffsetArray(
const std::shared_ptr<arrow::ChunkedArray>& in,
const std::shared_ptr<arrow::DataType>& to_type,
std::shared_ptr<arrow::ChunkedArray>& out) { // NOLINT(runtime/references)
std::vector<std::shared_ptr<arrow::Array>> chunks;
for (auto const& chunk : in->chunks()) {
std::shared_ptr<arrow::Array> array;
auto status = CastToLargeOffsetArray<U, T>(chunk, to_type, array);
GAR_RETURN_NOT_OK(status);
chunks.emplace_back(array);
}
GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(out, arrow::ChunkedArray::Make(chunks));
return Status::OK();
}
} // namespace detail

Result<std::shared_ptr<arrow::Table>> FileSystem::ReadFileToTable(
const std::string& path, FileType file_type) const noexcept {
arrow::MemoryPool* pool = arrow::default_memory_pool();
Expand Down Expand Up @@ -64,6 +112,39 @@ Result<std::shared_ptr<arrow::Table>> FileSystem::ReadFileToTable(
default:
return Status::Invalid("File type is invalid.");
}
// cast string array to large string array as we need concatenate chunks in
// some places, e.g., in vineyard
for (int i = 0; i < table->num_columns(); ++i) {
std::shared_ptr<arrow::DataType> type = table->column(i)->type();
if (type->id() == arrow::Type::STRING) {
type = arrow::large_utf8();
} else if (type->id() == arrow::Type::BINARY) {
type = arrow::large_binary();
}
if (type->Equals(table->column(i)->type())) {
continue;
}
// do casting
auto field = table->field(i)->WithType(type);
std::shared_ptr<arrow::ChunkedArray> chunked_array;
if (type->Equals(arrow::large_utf8())) {
auto status = detail::CastToLargeOffsetArray<arrow::StringArray,
arrow::LargeStringArray>(
table->column(i), type, chunked_array);
GAR_RETURN_NOT_OK(status);
} else if (type->Equals(arrow::large_binary())) {
auto status = detail::CastToLargeOffsetArray<arrow::BinaryArray,
arrow::LargeBinaryArray>(
table->column(i), type, chunked_array);
GAR_RETURN_NOT_OK(status);
} else {
// noop
chunked_array = table->column(i);
}
GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(table, table->RemoveColumn(i));
GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(
table, table->AddColumn(i, field, chunked_array));
}
return table;
}

Expand Down
2 changes: 1 addition & 1 deletion src/utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ Result<const void*> GetArrowArrayData(

std::string ValueGetter<std::string>::Value(const void* data, int64_t offset) {
return std::string(
reinterpret_cast<const arrow::StringArray*>(data)->GetView(offset));
reinterpret_cast<const arrow::LargeStringArray*>(data)->GetView(offset));
}

} // namespace util
Expand Down