Skip to content

Commit

Permalink
Cast StringArray to LargeStringArray otherwise we will fill when we n…
Browse files Browse the repository at this point in the history
…eed to contenate chunks

Signed-off-by: Tao He <[email protected]>
  • Loading branch information
sighingnow committed Feb 17, 2023
1 parent d2c0818 commit 09ab232
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 5 deletions.
6 changes: 3 additions & 3 deletions include/gar/utils/convert_to_arrow_type.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,9 @@ CONVERT_TO_ARROW_TYPE(Type::FLOAT, float, arrow::FloatType, arrow::FloatArray,
CONVERT_TO_ARROW_TYPE(Type::DOUBLE, double, arrow::DoubleType,
arrow::DoubleArray, arrow::DoubleBuilder,
arrow::float64(), "double")
CONVERT_TO_ARROW_TYPE(Type::STRING, std::string, arrow::StringType,
arrow::StringArray, arrow::StringBuilder, arrow::utf8(),
"string")
CONVERT_TO_ARROW_TYPE(Type::STRING, std::string, arrow::LargeStringType,
arrow::LargeStringArray, arrow::LargeStringBuilder,
arrow::large_utf8(), "string")

} // namespace GAR_NAMESPACE_INTERNAL

Expand Down
4 changes: 3 additions & 1 deletion src/data_type.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ std::shared_ptr<arrow::DataType> DataType::DataTypeToArrowDataType(
case Type::DOUBLE:
return arrow::float64();
case Type::STRING:
return arrow::utf8();
return arrow::large_utf8();
default:
throw std::runtime_error("Unsupported data type");
}
Expand All @@ -57,6 +57,8 @@ DataType DataType::ArrowDataTypeToDataType(
return DataType(Type::DOUBLE);
case arrow::Type::STRING:
return DataType(Type::STRING);
case arrow::Type::LARGE_STRING:
return DataType(Type::STRING);
default:
throw std::runtime_error("Unsupported data type");
}
Expand Down
81 changes: 81 additions & 0 deletions src/filesystem.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,54 @@ limitations under the License.

namespace GAR_NAMESPACE_INTERNAL {

namespace detail {
template <typename U, typename T>
static Status CastToLargeOffsetArray(
const std::shared_ptr<arrow::Array>& in,
const std::shared_ptr<arrow::DataType>& to_type,
std::shared_ptr<arrow::Array>& out) { // NOLINT(runtime/references)
auto array_data = in->data()->Copy();
auto offset = array_data->buffers[1];
using from_offset_type = typename U::offset_type;
using to_string_offset_type = typename T::offset_type;
auto raw_value_offsets_ =
offset == NULLPTR
? NULLPTR
: reinterpret_cast<const from_offset_type*>(offset->data());
std::vector<to_string_offset_type> to_offset(offset->size() /
sizeof(from_offset_type));
for (size_t i = 0; i < to_offset.size(); ++i) {
to_offset[i] = raw_value_offsets_[i];
}
std::shared_ptr<arrow::Buffer> buffer;
arrow::TypedBufferBuilder<to_string_offset_type> buffer_builder;
RETURN_NOT_ARROW_OK(
buffer_builder.Append(to_offset.data(), to_offset.size()));
RETURN_NOT_ARROW_OK(buffer_builder.Finish(&buffer));
array_data->type = to_type;
array_data->buffers[1] = buffer;
out = arrow::MakeArray(array_data);
RETURN_NOT_ARROW_OK(out->ValidateFull());
return Status::OK();
}

template <typename U, typename T>
static Status CastToLargeOffsetArray(
const std::shared_ptr<arrow::ChunkedArray>& in,
const std::shared_ptr<arrow::DataType>& to_type,
std::shared_ptr<arrow::ChunkedArray>& out) { // NOLINT(runtime/references)
std::vector<std::shared_ptr<arrow::Array>> chunks;
for (auto const& chunk : in->chunks()) {
std::shared_ptr<arrow::Array> array;
auto status = CastToLargeOffsetArray<U, T>(chunk, to_type, array);
GAR_RETURN_NOT_OK(status);
chunks.emplace_back(array);
}
GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(out, arrow::ChunkedArray::Make(chunks));
return Status::OK();
}
} // namespace detail

Result<std::shared_ptr<arrow::Table>> FileSystem::ReadFileToTable(
const std::string& path, FileType file_type) const noexcept {
arrow::MemoryPool* pool = arrow::default_memory_pool();
Expand Down Expand Up @@ -64,6 +112,39 @@ Result<std::shared_ptr<arrow::Table>> FileSystem::ReadFileToTable(
default:
return Status::Invalid("File type is invalid.");
}
// cast string array to large string array as we need concatenate chunks in
// some places, e.g., in vineyard
for (int i = 0; i < table->num_columns(); ++i) {
std::shared_ptr<arrow::DataType> type = table->column(i)->type();
if (type->id() == arrow::Type::STRING) {
type = arrow::large_utf8();
} else if (type->id() == arrow::Type::BINARY) {
type = arrow::large_binary();
}
if (type->Equals(table->column(i)->type())) {
continue;
}
// do casting
auto field = table->field(i)->WithType(type);
std::shared_ptr<arrow::ChunkedArray> chunked_array;
if (type->Equals(arrow::large_utf8())) {
auto status = detail::CastToLargeOffsetArray<arrow::StringArray,
arrow::LargeStringArray>(
table->column(i), type, chunked_array);
GAR_RETURN_NOT_OK(status);
} else if (type->Equals(arrow::large_binary())) {
auto status = detail::CastToLargeOffsetArray<arrow::BinaryArray,
arrow::LargeBinaryArray>(
table->column(i), type, chunked_array);
GAR_RETURN_NOT_OK(status);
} else {
// noop
chunked_array = table->column(i);
}
GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(table, table->RemoveColumn(i));
GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(
table, table->AddColumn(i, field, chunked_array));
}
return table;
}

Expand Down
2 changes: 1 addition & 1 deletion src/utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ Result<const void*> GetArrowArrayData(

std::string ValueGetter<std::string>::Value(const void* data, int64_t offset) {
return std::string(
reinterpret_cast<const arrow::StringArray*>(data)->GetView(offset));
reinterpret_cast<const arrow::LargeStringArray*>(data)->GetView(offset));
}

} // namespace util
Expand Down

0 comments on commit 09ab232

Please sign in to comment.