Skip to content

Commit

Permalink
[feature-wip](array-type) Add array type support for vectorized parqu…
Browse files Browse the repository at this point in the history
…et-orc scanner (#9856)

Only support one level array now.
for example:
- nullable(array(nullable(tinyint))) is **support**.
- nullable(array(nullable(array(xx))) is **not support**.
  • Loading branch information
yinzhijian authored Jun 9, 2022
1 parent bf8b4fb commit 19bc14c
Show file tree
Hide file tree
Showing 25 changed files with 511 additions and 37 deletions.
4 changes: 2 additions & 2 deletions be/src/exec/arrow/orc_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
#include "exec/arrow/arrow_reader.h"
namespace doris {

// Reader of orc file
// Reader of ORC file
class ORCReaderWrap final : public ArrowReaderWrap {
public:
ORCReaderWrap(FileReader* file_reader, int64_t batch_size, int32_t num_of_columns_from_file);
Expand All @@ -48,4 +48,4 @@ class ORCReaderWrap final : public ArrowReaderWrap {
bool _cur_file_eof; // is read over?
};

} // namespace doris
} // namespace doris
1 change: 1 addition & 0 deletions be/src/exec/base_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,7 @@ Status BaseScanner::_materialize_dest_block(vectorized::Block* dest_block) {
// PT1 => dest primitive type
RETURN_IF_ERROR(ctx->execute(&_src_block, &result_column_id));
auto column_ptr = _src_block.get_by_position(result_column_id).column;
DCHECK(column_ptr != nullptr);

// because of src_slot_desc is always be nullable, so the column_ptr after do dest_expr
// is likely to be nullable
Expand Down
4 changes: 4 additions & 0 deletions be/src/vec/data_types/data_type.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ std::string IDataType::to_string(const IColumn& column, size_t row_num) const {
LOG(FATAL) << fmt::format("Data type {} to_string not implement.", get_name());
return "";
}
Status IDataType::from_string(ReadBuffer& rb, IColumn* column) const {
LOG(FATAL) << fmt::format("Data type {} from_string not implement.", get_name());
return Status::OK();
}

void IDataType::insert_default_into(IColumn& column) const {
column.insert_default();
Expand Down
2 changes: 2 additions & 0 deletions be/src/vec/data_types/data_type.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "vec/common/cow.h"
#include "vec/common/string_buffer.hpp"
#include "vec/core/types.h"
#include "vec/io/reader_buffer.h"

namespace doris {
class PBlock;
Expand Down Expand Up @@ -70,6 +71,7 @@ class IDataType : private boost::noncopyable {

virtual void to_string(const IColumn& column, size_t row_num, BufferWritable& ostr) const;
virtual std::string to_string(const IColumn& column, size_t row_num) const;
virtual Status from_string(ReadBuffer& rb, IColumn* column) const;

protected:
virtual String do_get_name() const;
Expand Down
91 changes: 90 additions & 1 deletion be/src/vec/data_types/data_type_array.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "vec/data_types/data_type_array.h"

#include "gen_cpp/data.pb.h"
#include "vec/common/string_utils/string_utils.h"
#include "vec/io/io_helper.h"

namespace doris::vectorized {
Expand Down Expand Up @@ -94,4 +95,92 @@ void DataTypeArray::to_pb_column_meta(PColumnMeta* col_meta) const {
get_nested_type()->to_pb_column_meta(children);
}

} // namespace doris::vectorized
void DataTypeArray::to_string(const IColumn& column, size_t row_num, BufferWritable& ostr) const {
auto& data_column =
assert_cast<const ColumnArray&>(*column.convert_to_full_column_if_const().get());
auto& offsets = data_column.get_offsets();

size_t offset = offsets[row_num - 1];
size_t next_offset = offsets[row_num];

const IColumn& nested_column = data_column.get_data();
ostr.write("[", 1);
for (size_t i = offset; i < next_offset; ++i) {
if (i != offset) {
ostr.write(",", 1);
}
nested->to_string(nested_column, i, ostr);
}
ostr.write("]", 1);
}

std::string DataTypeArray::to_string(const IColumn& column, size_t row_num) const {
auto& data_column =
assert_cast<const ColumnArray&>(*column.convert_to_full_column_if_const().get());
auto& offsets = data_column.get_offsets();

size_t offset = offsets[row_num - 1];
size_t next_offset = offsets[row_num];
const IColumn& nested_column = data_column.get_data();
std::stringstream ss;
ss << "[";
for (size_t i = offset; i < next_offset; ++i) {
if (i != offset) {
ss << ",";
}
ss << nested->to_string(nested_column, i);
}
ss << "]";
return ss.str();
}

Status DataTypeArray::from_string(ReadBuffer& rb, IColumn* column) const {
// only support one level now
auto* array_column = assert_cast<ColumnArray*>(column);
auto& offsets = array_column->get_offsets();

IColumn& nested_column = array_column->get_data();
if (*rb.position() != '[') {
return Status::InvalidArgument("Array does not start with '[' character, found '{}'",
*rb.position());
}
++rb.position();
bool first = true;
size_t size = 0;
while (!rb.eof() && *rb.position() != ']') {
if (!first) {
if (*rb.position() == ',') {
++rb.position();
} else {
return Status::InvalidArgument(fmt::format(
"Cannot read array from text, expected comma or end of array, found '{}'",
*rb.position()));
}
}
first = false;
if (*rb.position() == ']') {
break;
}
size_t nested_str_len = 1;
char* temp_char = rb.position() + nested_str_len;
while (*(temp_char) != ']' && *(temp_char) != ',' && temp_char != rb.end()) {
++nested_str_len;
temp_char = rb.position() + nested_str_len;
}

ReadBuffer read_buffer(rb.position(), nested_str_len);
auto st = nested->from_string(read_buffer, &nested_column);
if (!st.ok()) {
// we should do revert if error
array_column->pop_back(size);
return st;
}
rb.position() += nested_str_len;
DCHECK_LE(rb.position(), rb.end());
++size;
}
offsets.push_back(offsets.back() + size);
return Status::OK();
}

} // namespace doris::vectorized
4 changes: 4 additions & 0 deletions be/src/vec/data_types/data_type_array.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@ class DataTypeArray final : public IDataType {
const char* deserialize(const char* buf, IColumn* column) const override;

void to_pb_column_meta(PColumnMeta* col_meta) const override;

std::string to_string(const IColumn& column, size_t row_num) const override;
void to_string(const IColumn& column, size_t row_num, BufferWritable& ostr) const override;
Status from_string(ReadBuffer& rb, IColumn* column) const override;
};

} // namespace doris::vectorized
12 changes: 12 additions & 0 deletions be/src/vec/data_types/data_type_date.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "runtime/datetime_value.h"
#include "util/binary_cast.hpp"
#include "vec/columns/columns_number.h"
#include "vec/io/io_helper.h"
#include "vec/runtime/vdatetime_value.h"
namespace doris::vectorized {
bool DataTypeDate::equals(const IDataType& rhs) const {
Expand Down Expand Up @@ -59,6 +60,17 @@ void DataTypeDate::to_string(const IColumn& column, size_t row_num, BufferWritab
ostr.write(buf, pos - buf - 1);
}

Status DataTypeDate::from_string(ReadBuffer& rb, IColumn* column) const {
auto* column_data = static_cast<ColumnInt64*>(column);
Int64 val = 0;
if (!read_date_text_impl<Int64>(val, rb)) {
return Status::InvalidArgument(fmt::format("parse date fail, string: '{}'",
std::string(rb.position(), rb.count()).c_str()));
}
column_data->insert_value(val);
return Status::OK();
}

void DataTypeDate::cast_to_date(Int64& x) {
auto value = binary_cast<Int64, VecDateTimeValue>(x);
value.cast_to_date();
Expand Down
1 change: 1 addition & 0 deletions be/src/vec/data_types/data_type_date.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class DataTypeDate final : public DataTypeNumberBase<Int64> {
bool equals(const IDataType& rhs) const override;
std::string to_string(const IColumn& column, size_t row_num) const override;
void to_string(const IColumn& column, size_t row_num, BufferWritable& ostr) const override;
Status from_string(ReadBuffer& rb, IColumn* column) const override;

static void cast_to_date(Int64& x);

Expand Down
12 changes: 12 additions & 0 deletions be/src/vec/data_types/data_type_date_time.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "runtime/datetime_value.h"
#include "util/binary_cast.hpp"
#include "vec/columns/columns_number.h"
#include "vec/io/io_helper.h"
#include "vec/runtime/vdatetime_value.h"
namespace doris::vectorized {

Expand Down Expand Up @@ -82,6 +83,17 @@ void DataTypeDateTime::to_string(const IColumn& column, size_t row_num,
ostr.write(buf, pos - buf - 1);
}

Status DataTypeDateTime::from_string(ReadBuffer& rb, IColumn* column) const {
auto* column_data = static_cast<ColumnInt64*>(column);
Int64 val = 0;
if (!read_datetime_text_impl<Int64>(val, rb)) {
return Status::InvalidArgument(fmt::format("parse datetime fail, string: '{}'",
std::string(rb.position(), rb.count()).c_str()));
}
column_data->insert_value(val);
return Status::OK();
}

void DataTypeDateTime::cast_to_date_time(Int64& x) {
auto value = binary_cast<Int64, doris::vectorized::VecDateTimeValue>(x);
value.to_datetime();
Expand Down
2 changes: 2 additions & 0 deletions be/src/vec/data_types/data_type_date_time.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ class DataTypeDateTime final : public DataTypeNumberBase<Int64> {

void to_string(const IColumn& column, size_t row_num, BufferWritable& ostr) const override;

Status from_string(ReadBuffer& rb, IColumn* column) const override;

static void cast_to_date_time(Int64& x);

MutableColumnPtr create_column() const override;
Expand Down
12 changes: 12 additions & 0 deletions be/src/vec/data_types/data_type_decimal.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,18 @@ void DataTypeDecimal<T>::to_string(const IColumn& column, size_t row_num,
ostr.write(str.data(), str.size());
}

template <typename T>
Status DataTypeDecimal<T>::from_string(ReadBuffer& rb, IColumn* column) const {
auto& column_data = static_cast<ColumnType&>(*column).get_data();
T val = 0;
if (!read_decimal_text_impl<T>(val, rb)) {
return Status::InvalidArgument(fmt::format("parse decimal fail, string: '{}'",
std::string(rb.position(), rb.count()).c_str()));
}
column_data.emplace_back(val);
return Status::OK();
}

// binary: row_num | value1 | value2 | ...
template <typename T>
int64_t DataTypeDecimal<T>::get_uncompressed_serialized_bytes(const IColumn& column) const {
Expand Down
1 change: 1 addition & 0 deletions be/src/vec/data_types/data_type_decimal.h
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ class DataTypeDecimal final : public IDataType {
bool can_be_inside_nullable() const override { return true; }
std::string to_string(const IColumn& column, size_t row_num) const override;
void to_string(const IColumn& column, size_t row_num, BufferWritable& ostr) const override;
Status from_string(ReadBuffer& rb, IColumn* column) const override;

/// Decimal specific

Expand Down
5 changes: 5 additions & 0 deletions be/src/vec/data_types/data_type_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,11 @@ DataTypePtr DataTypeFactory::create_data_type(const arrow::DataType* type, bool
case ::arrow::Type::DECIMAL:
nested = std::make_shared<vectorized::DataTypeDecimal<vectorized::Decimal128>>();
break;
case ::arrow::Type::LIST:
DCHECK(type->num_fields() == 1);
nested = std::make_shared<vectorized::DataTypeArray>(
create_data_type(type->field(0)->type().get(), true));
break;
default:
DCHECK(false) << "invalid arrow type:" << (int)(type->id());
break;
Expand Down
43 changes: 27 additions & 16 deletions be/src/vec/data_types/data_type_factory.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,22 +49,33 @@ class DataTypeFactory {
static std::once_flag oc;
static DataTypeFactory instance;
std::call_once(oc, []() {
instance.register_data_type("UInt8", std::make_shared<DataTypeUInt8>());
instance.register_data_type("UInt16", std::make_shared<DataTypeUInt16>());
instance.register_data_type("UInt32", std::make_shared<DataTypeUInt32>());
instance.register_data_type("UInt64", std::make_shared<DataTypeUInt64>());
instance.register_data_type("Int8", std::make_shared<DataTypeInt8>());
instance.register_data_type("Int16", std::make_shared<DataTypeInt16>());
instance.register_data_type("Int32", std::make_shared<DataTypeInt32>());
instance.register_data_type("Int64", std::make_shared<DataTypeInt64>());
instance.register_data_type("Int128", std::make_shared<DataTypeInt128>());
instance.register_data_type("Float32", std::make_shared<DataTypeFloat32>());
instance.register_data_type("Float64", std::make_shared<DataTypeFloat64>());
instance.register_data_type("Date", std::make_shared<DataTypeDate>());
instance.register_data_type("DateTime", std::make_shared<DataTypeDateTime>());
instance.register_data_type("String", std::make_shared<DataTypeString>());
instance.register_data_type("Decimal",
std::make_shared<DataTypeDecimal<Decimal128>>(27, 9));
std::unordered_map<std::string, DataTypePtr> base_type_map {
{"UInt8", std::make_shared<DataTypeUInt8>()},
{"UInt16", std::make_shared<DataTypeUInt16>()},
{"UInt32", std::make_shared<DataTypeUInt32>()},
{"UInt64", std::make_shared<DataTypeUInt64>()},
{"Int8", std::make_shared<DataTypeInt8>()},
{"Int16", std::make_shared<DataTypeInt16>()},
{"Int32", std::make_shared<DataTypeInt32>()},
{"Int64", std::make_shared<DataTypeInt64>()},
{"Int128", std::make_shared<DataTypeInt128>()},
{"Float32", std::make_shared<DataTypeFloat32>()},
{"Float64", std::make_shared<DataTypeFloat64>()},
{"Date", std::make_shared<DataTypeDate>()},
{"DateTime", std::make_shared<DataTypeDateTime>()},
{"String", std::make_shared<DataTypeString>()},
{"Decimal", std::make_shared<DataTypeDecimal<Decimal128>>(27, 9)},

};
for (auto const& [key, val] : base_type_map) {
instance.register_data_type(key, val);
instance.register_data_type("Array(" + key + ")",
std::make_shared<vectorized::DataTypeArray>(val));
instance.register_data_type(
"Array(Nullable(" + key + "))",
std::make_shared<vectorized::DataTypeArray>(
std::make_shared<vectorized::DataTypeNullable>(val)));
}
});
return instance;
}
Expand Down
30 changes: 30 additions & 0 deletions be/src/vec/data_types/data_type_nullable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,36 @@ std::string DataTypeNullable::to_string(const IColumn& column, size_t row_num) c
}
}

void DataTypeNullable::to_string(const IColumn& column, size_t row_num,
BufferWritable& ostr) const {
const ColumnNullable& col =
assert_cast<const ColumnNullable&>(*column.convert_to_full_column_if_const().get());

if (col.is_null_at(row_num)) {
ostr.write("NULL", 4);
} else {
nested_data_type->to_string(col.get_nested_column(), row_num, ostr);
}
}

Status DataTypeNullable::from_string(ReadBuffer& rb, IColumn* column) const {
auto* null_column = assert_cast<ColumnNullable*>(column);
if (rb.count() == 4 && *(rb.position()) == 'N' && *(rb.position() + 1) == 'U' &&
*(rb.position() + 2) == 'L' && *(rb.position() + 3) == 'L') {
null_column->insert_data(nullptr, 0);
return Status::OK();
}
auto st = nested_data_type->from_string(rb, &(null_column->get_nested_column()));
if (!st.ok()) {
// fill null if fail
null_column->insert_data(nullptr, 0); // 0 is meaningless here
return Status::OK();
}
// fill not null if succ
null_column->get_null_map_data().push_back(0);
return Status::OK();
}

// binary: row num | <null array> | <values array>
// <null array>: is_null1 | is_null2 | ...
// <values array>: value1 | value2 | ...>
Expand Down
2 changes: 2 additions & 0 deletions be/src/vec/data_types/data_type_nullable.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ class DataTypeNullable final : public IDataType {
return nested_data_type->can_be_inside_low_cardinality();
}
std::string to_string(const IColumn& column, size_t row_num) const override;
void to_string(const IColumn& column, size_t row_num, BufferWritable& ostr) const override;
Status from_string(ReadBuffer& rb, IColumn* column) const override;

const DataTypePtr& get_nested_type() const { return nested_data_type; }

Expand Down
Loading

0 comments on commit 19bc14c

Please sign in to comment.