Skip to content

Commit

Permalink
[BugFix][C++] Cast to correct schema when get chunk with property rea…
Browse files Browse the repository at this point in the history
…der (apache#456)

## Proposed changes
as issue apache#219 describe, the chunk table get from arrow chunk reader may not has the same type with schema.
This change help to fix the problem by cast the chunk table to what they should be(schema), if cast failed, raise error.

## Further comments
fixes apache#219 
---------

Signed-off-by: acezen <[email protected]>
  • Loading branch information
acezen authored Apr 19, 2024
1 parent b92da70 commit c4a5f72
Show file tree
Hide file tree
Showing 4 changed files with 152 additions and 1 deletion.
3 changes: 3 additions & 0 deletions cpp/include/gar/reader/arrow_chunk_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
// forward declaration
namespace arrow {
class Array;
class Schema;
class Table;
} // namespace arrow

Expand Down Expand Up @@ -145,6 +146,7 @@ class VertexPropertyArrowChunkReader {
IdType seek_id_;
IdType chunk_num_;
IdType vertex_num_;
std::shared_ptr<arrow::Schema> schema_;
std::shared_ptr<arrow::Table> chunk_table_;
util::FilterOptions filter_options_;
std::shared_ptr<FileSystem> fs_;
Expand Down Expand Up @@ -500,6 +502,7 @@ class AdjListPropertyArrowChunkReader {
std::string prefix_;
IdType vertex_chunk_index_, chunk_index_;
IdType seek_offset_;
std::shared_ptr<arrow::Schema> schema_;
std::shared_ptr<arrow::Table> chunk_table_;
util::FilterOptions filter_options_;
IdType vertex_chunk_num_, chunk_num_;
Expand Down
110 changes: 110 additions & 0 deletions cpp/src/arrow_chunk_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,112 @@
*/

#include "arrow/api.h"
#include "arrow/compute/api.h"

#include "gar/graph_info.h"
#include "gar/reader/arrow_chunk_reader.h"
#include "gar/util/adj_list_type.h"
#include "gar/util/data_type.h"
#include "gar/util/filesystem.h"
#include "gar/util/general_params.h"
#include "gar/util/reader_util.h"
#include "gar/util/result.h"
#include "gar/util/status.h"
#include "gar/util/util.h"

namespace graphar {

namespace {

Result<std::shared_ptr<arrow::Schema>> PropertyGroupToSchema(
const std::shared_ptr<PropertyGroup> pg,
bool contain_index_column = false) {
std::vector<std::shared_ptr<arrow::Field>> fields;
if (contain_index_column) {
fields.push_back(std::make_shared<arrow::Field>(
GeneralParams::kVertexIndexCol, arrow::int64()));
}
for (const auto& prop : pg->GetProperties()) {
fields.push_back(std::make_shared<arrow::Field>(
prop.name, DataType::DataTypeToArrowDataType(prop.type)));
}
return arrow::schema(fields);
}

Status GeneralCast(const std::shared_ptr<arrow::Array>& in,
const std::shared_ptr<arrow::DataType>& to_type,
std::shared_ptr<arrow::Array>* out) {
GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(*out,
arrow::compute::Cast(*in, to_type));
return Status::OK();
}

Status CastStringToLargeString(const std::shared_ptr<arrow::Array>& in,
const std::shared_ptr<arrow::DataType>& to_type,
std::shared_ptr<arrow::Array>* out) {
auto array_data = in->data()->Copy();
auto offset = array_data->buffers[1];
using from_offset_type = typename arrow::StringArray::offset_type;
using to_string_offset_type = typename arrow::LargeStringArray::offset_type;
auto raw_value_offsets_ =
offset == NULLPTR
? NULLPTR
: reinterpret_cast<const from_offset_type*>(offset->data());
std::vector<to_string_offset_type> to_offset(offset->size() /
sizeof(from_offset_type));
for (size_t i = 0; i < to_offset.size(); ++i) {
to_offset[i] = raw_value_offsets_[i];
}
std::shared_ptr<arrow::Buffer> buffer;
arrow::TypedBufferBuilder<to_string_offset_type> buffer_builder;
RETURN_NOT_ARROW_OK(
buffer_builder.Append(to_offset.data(), to_offset.size()));
RETURN_NOT_ARROW_OK(buffer_builder.Finish(&buffer));
array_data->type = to_type;
array_data->buffers[1] = buffer;
*out = arrow::MakeArray(array_data);
RETURN_NOT_ARROW_OK((*out)->ValidateFull());
return Status::OK();
}

// helper function to cast arrow::Table with a schema
Status CastTableWithSchema(const std::shared_ptr<arrow::Table>& table,
const std::shared_ptr<arrow::Schema>& schema,
std::shared_ptr<arrow::Table>* out_table) {
if (table->schema()->Equals(*schema)) {
*out_table = table;
}
std::vector<std::shared_ptr<arrow::ChunkedArray>> columns;
for (int64_t i = 0; i < table->num_columns(); ++i) {
auto column = table->column(i);
if (table->field(i)->type()->Equals(schema->field(i)->type())) {
columns.push_back(column);
continue;
}
auto from_t = table->field(i)->type();
auto to_t = schema->field(i)->type();
std::vector<std::shared_ptr<arrow::Array>> chunks;
// process cast for each chunk
for (int64_t j = 0; j < column->num_chunks(); ++j) {
auto chunk = column->chunk(j);
std::shared_ptr<arrow::Array> out;
if (arrow::compute::CanCast(*from_t, *to_t)) {
GAR_RETURN_NOT_OK(GeneralCast(chunk, to_t, &out));
chunks.push_back(out);
} else if (from_t->Equals(arrow::utf8()) &&
to_t->Equals(arrow::large_utf8())) {
GAR_RETURN_NOT_OK(CastStringToLargeString(chunk, to_t, &out));
chunks.push_back(out);
}
}
columns.push_back(std::make_shared<arrow::ChunkedArray>(chunks, to_t));
}

*out_table = arrow::Table::Make(schema, columns);
return Status::OK();
}
} // namespace

VertexPropertyArrowChunkReader::VertexPropertyArrowChunkReader(
const std::shared_ptr<VertexInfo>& vertex_info,
const std::shared_ptr<PropertyGroup>& property_group,
Expand All @@ -39,6 +132,7 @@ VertexPropertyArrowChunkReader::VertexPropertyArrowChunkReader(
property_group_(std::move(property_group)),
chunk_index_(0),
seek_id_(0),
schema_(nullptr),
chunk_table_(nullptr),
filter_options_(options) {
GAR_ASSIGN_OR_RAISE_ERROR(fs_, FileSystemFromUriOrPath(prefix, &prefix_));
Expand All @@ -49,6 +143,8 @@ VertexPropertyArrowChunkReader::VertexPropertyArrowChunkReader(
util::GetVertexChunkNum(prefix_, vertex_info));
GAR_ASSIGN_OR_RAISE_ERROR(vertex_num_,
util::GetVertexNum(prefix_, vertex_info_));
GAR_ASSIGN_OR_RAISE_ERROR(schema_,
PropertyGroupToSchema(property_group_, true));
}

Status VertexPropertyArrowChunkReader::seek(IdType id) {
Expand Down Expand Up @@ -79,6 +175,11 @@ VertexPropertyArrowChunkReader::GetChunk() {
GAR_ASSIGN_OR_RAISE(
chunk_table_, fs_->ReadFileToTable(path, property_group_->GetFileType(),
filter_options_));
// TODO(acezen): filter pushdown doesn't support cast schema now
if (schema_ != nullptr && filter_options_.filter == nullptr) {
GAR_RETURN_NOT_OK(
CastTableWithSchema(chunk_table_, schema_, &chunk_table_));
}
}
IdType row_offset = seek_id_ - chunk_index_ * vertex_info_->GetChunkSize();
return chunk_table_->Slice(row_offset);
Expand Down Expand Up @@ -469,6 +570,7 @@ AdjListPropertyArrowChunkReader::AdjListPropertyArrowChunkReader(
vertex_chunk_index_(0),
chunk_index_(0),
seek_offset_(0),
schema_(nullptr),
chunk_table_(nullptr),
filter_options_(options),
chunk_num_(-1) /* -1 means uninitialized */ {
Expand All @@ -480,6 +582,8 @@ AdjListPropertyArrowChunkReader::AdjListPropertyArrowChunkReader(
GAR_ASSIGN_OR_RAISE_ERROR(
vertex_chunk_num_,
util::GetVertexChunkNum(prefix_, edge_info_, adj_list_type_));
GAR_ASSIGN_OR_RAISE_ERROR(schema_,
PropertyGroupToSchema(property_group, false));
}

AdjListPropertyArrowChunkReader::AdjListPropertyArrowChunkReader(
Expand All @@ -491,6 +595,7 @@ AdjListPropertyArrowChunkReader::AdjListPropertyArrowChunkReader(
vertex_chunk_index_(other.vertex_chunk_index_),
chunk_index_(other.chunk_index_),
seek_offset_(other.seek_offset_),
schema_(other.schema_),
chunk_table_(nullptr),
filter_options_(other.filter_options_),
vertex_chunk_num_(other.vertex_chunk_num_),
Expand Down Expand Up @@ -593,6 +698,11 @@ AdjListPropertyArrowChunkReader::GetChunk() {
GAR_ASSIGN_OR_RAISE(
chunk_table_, fs_->ReadFileToTable(path, property_group_->GetFileType(),
filter_options_));
// TODO(acezen): filter pushdown doesn't support cast schema now
if (schema_ != nullptr && filter_options_.filter == nullptr) {
GAR_RETURN_NOT_OK(
CastTableWithSchema(chunk_table_, schema_, &chunk_table_));
}
}
IdType row_offset = seek_offset_ - chunk_index_ * edge_info_->GetChunkSize();
return chunk_table_->Slice(row_offset);
Expand Down
4 changes: 3 additions & 1 deletion cpp/src/filesystem.cc
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,9 @@ Result<std::shared_ptr<FileSystem>> FileSystemFromUriOrPath(
auto arrow_fs,
arrow::fs::FileSystemFromUriOrPath(uri_string, out_path));
// arrow would delete the last slash, so use uri string
*out_path = uri_string;
if (out_path != nullptr) {
*out_path = uri_string;
}
return std::make_shared<FileSystem>(arrow_fs);
}

Expand Down
36 changes: 36 additions & 0 deletions cpp/test/test_arrow_chunk_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@
#include "./util.h"
#include "gar/reader/arrow_chunk_reader.h"
#include "gar/util/adj_list_type.h"
#include "gar/util/data_type.h"
#include "gar/util/expression.h"
#include "gar/util/filesystem.h"
#include "gar/util/general_params.h"

#define CATCH_CONFIG_MAIN
Expand Down Expand Up @@ -96,6 +98,40 @@ TEST_CASE("ArrowChunkReader") {
REQUIRE(reader->seek(1024).IsIndexError());
}

SECTION("CastDataType") {
std::string prefix = root + "/modern_graph/";
std::string vertex_info_path = prefix + "person.vertex.yml";
std::cout << "Vertex info path: " << vertex_info_path << std::endl;
auto fs = FileSystemFromUriOrPath(prefix).value();
auto yaml_content =
fs->ReadFileToValue<std::string>(vertex_info_path).value();
std::cout << yaml_content << std::endl;
auto maybe_vertex_info = VertexInfo::Load(yaml_content);
REQUIRE(maybe_vertex_info.status().ok());
auto vertex_info = maybe_vertex_info.value();
std::cout << vertex_info->Dump().value() << std::endl;
auto pg = vertex_info->GetPropertyGroup("id");
REQUIRE(pg != nullptr);
REQUIRE(pg->GetProperties().size() == 1);
auto origin_property = pg->GetProperties()[0];
REQUIRE(origin_property.type->Equals(int64()));

// change to int32_t
Property new_property("id", int32(), origin_property.is_primary,
origin_property.is_nullable);
auto new_pg = CreatePropertyGroup({new_property}, pg->GetFileType(),
pg->GetPrefix());
auto maybe_reader =
VertexPropertyArrowChunkReader::Make(vertex_info, new_pg, prefix);
REQUIRE(maybe_reader.status().ok());
auto reader = maybe_reader.value();
auto result = reader->GetChunk();
REQUIRE(!result.has_error());
auto table = result.value();
REQUIRE(table->schema()->GetFieldByName("id")->type()->id() ==
arrow::Type::INT32);
}

SECTION("PropertyPushDown") {
std::string filter_property = "gender";
auto filter = _Equal(_Property(filter_property), _Literal("female"));
Expand Down

0 comments on commit c4a5f72

Please sign in to comment.