Skip to content

Commit

Permalink
[Fix](Variant) use uinque id to access column reader (apache#39841)
Browse files Browse the repository at this point in the history
Currently, the variant type is not supported rename column because its
column reader accesses columns by path rather than by unique ID. If the
name is modified, the column reader may not locate the column
correctly.So we should access by unique id
  • Loading branch information
eldenmoon committed Sep 2, 2024
1 parent 8ff068e commit 7b1ced6
Show file tree
Hide file tree
Showing 10 changed files with 175 additions and 54 deletions.
4 changes: 3 additions & 1 deletion be/src/olap/rowset/segment_v2/hierarchical_data_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

#include "olap/rowset/segment_v2/hierarchical_data_reader.h"

#include <memory>

#include "common/status.h"
#include "io/io_common.h"
#include "olap/rowset/segment_v2/column_reader.h"
Expand All @@ -41,7 +43,7 @@ Status HierarchicalDataReader::create(std::unique_ptr<ColumnIterator>* reader,
vectorized::PathsInData leaves_paths;
SubcolumnColumnReaders::get_leaves_of_node(node, leaves, leaves_paths);
for (size_t i = 0; i < leaves_paths.size(); ++i) {
if (leaves_paths[i] == root->path) {
if (leaves_paths[i].empty()) {
// use set_root to share instead
continue;
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/rowset/segment_v2/hierarchical_data_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ class HierarchicalDataReader : public ColumnIterator {
auto& container_variant = assert_cast<vectorized::ColumnObject&>(*container);

// add root first
if (_path.get_parts().size() == 1) {
if (_path.get_parts().empty() && _root_reader) {
auto& root_var =
_root_reader->column->is_nullable()
? assert_cast<vectorized::ColumnObject&>(
Expand Down
117 changes: 77 additions & 40 deletions be/src/olap/rowset/segment_v2/segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,9 @@ Status Segment::new_iterator(SchemaSPtr schema, const StorageReadOptions& read_o
const TabletColumn& col = read_options.tablet_schema->column(column_id);
ColumnReader* reader = nullptr;
if (col.is_extracted_column()) {
const auto* node = _sub_column_tree.find_exact(*col.path_info_ptr());
auto relative_path = col.path_info_ptr()->copy_pop_front();
int32_t unique_id = col.unique_id() > 0 ? col.unique_id() : col.parent_unique_id();
const auto* node = _sub_column_tree[unique_id].find_exact(relative_path);
reader = node != nullptr ? node->data.reader.get() : nullptr;
} else {
reader = _column_readers.contains(col.unique_id())
Expand Down Expand Up @@ -381,19 +383,27 @@ Status Segment::_load_index_impl() {

// Return the storage datatype of related column to field.
// Return nullptr meaning no such storage infomation for this column
vectorized::DataTypePtr Segment::get_data_type_of(vectorized::PathInDataPtr path, bool is_nullable,
bool ignore_children) const {
vectorized::DataTypePtr Segment::get_data_type_of(const ColumnIdentifier& identifier,
bool read_flat_leaves) const {
// Path has higher priority
if (path != nullptr && !path->empty()) {
const auto* node = _sub_column_tree.find_leaf(*path);
const auto* sparse_node = _sparse_column_tree.find_exact(*path);
if (identifier.path != nullptr && !identifier.path->empty()) {
auto relative_path = identifier.path->copy_pop_front();
int32_t unique_id =
identifier.unique_id > 0 ? identifier.unique_id : identifier.parent_unique_id;
const auto* node = _sub_column_tree.contains(unique_id)
? _sub_column_tree.at(unique_id).find_leaf(relative_path)
: nullptr;
const auto* sparse_node =
_sparse_column_tree.contains(unique_id)
? _sparse_column_tree.at(unique_id).find_exact(relative_path)
: nullptr;
if (node) {
if (ignore_children || (node->children.empty() && sparse_node == nullptr)) {
if (read_flat_leaves || (node->children.empty() && sparse_node == nullptr)) {
return node->data.file_column_type;
}
}
// it contains children or column missing in storage, so treat it as variant
return is_nullable
return identifier.is_nullable
? vectorized::make_nullable(std::make_shared<vectorized::DataTypeObject>())
: std::make_shared<vectorized::DataTypeObject>();
}
Expand Down Expand Up @@ -450,7 +460,9 @@ Status Segment::_create_column_readers(const SegmentFooterPB& footer) {
if (!column.has_path_info()) {
continue;
}
auto iter = column_path_to_footer_ordinal.find(*column.path_info_ptr());
auto path = column.has_path_info() ? *column.path_info_ptr()
: vectorized::PathInData(column.name_lower_case());
auto iter = column_path_to_footer_ordinal.find(path);
if (iter == column_path_to_footer_ordinal.end()) {
continue;
}
Expand All @@ -460,20 +472,34 @@ Status Segment::_create_column_readers(const SegmentFooterPB& footer) {
std::unique_ptr<ColumnReader> reader;
RETURN_IF_ERROR(
ColumnReader::create(opts, column_pb, footer.num_rows(), _file_reader, &reader));
_sub_column_tree.add(
iter->first,
SubcolumnReader {
std::move(reader),
vectorized::DataTypeFactory::instance().create_data_type(column_pb)});
// root column use unique id, leaf column use parent_unique_id
int32_t unique_id =
column.parent_unique_id() > 0 ? column.parent_unique_id() : column.unique_id();
auto relative_path = path.copy_pop_front();
if (relative_path.empty()) {
// root column
_sub_column_tree[unique_id].create_root(SubcolumnReader {
std::move(reader),
vectorized::DataTypeFactory::instance().create_data_type(column_pb)});
} else {
// check the root is already a leaf node
DCHECK(_sub_column_tree[unique_id].get_leaves()[0]->path.empty());
_sub_column_tree[unique_id].add(
relative_path,
SubcolumnReader {
std::move(reader),
vectorized::DataTypeFactory::instance().create_data_type(column_pb)});
}

// init sparse columns paths and type info
for (uint32_t ordinal = 0; ordinal < column_pb.sparse_columns().size(); ++ordinal) {
const auto& spase_column_pb = column_pb.sparse_columns(ordinal);
if (spase_column_pb.has_column_path_info()) {
vectorized::PathInData path;
path.from_protobuf(spase_column_pb.column_path_info());
// Read from root column, so reader is nullptr
_sparse_column_tree.add(
path,
_sparse_column_tree[column.unique_id()].add(
path.copy_pop_front(),
SubcolumnReader {nullptr,
vectorized::DataTypeFactory::instance().create_data_type(
spase_column_pb)});
Expand Down Expand Up @@ -523,22 +549,22 @@ Status Segment::_new_iterator_with_variant_root(const TabletColumn& tablet_colum
Status Segment::new_column_iterator_with_path(const TabletColumn& tablet_column,
std::unique_ptr<ColumnIterator>* iter,
const StorageReadOptions* opt) {
vectorized::PathInData root_path;
if (!tablet_column.has_path_info()) {
// Missing path info, but need read the whole variant column
root_path = vectorized::PathInData(tablet_column.name_lower_case());
} else {
root_path = vectorized::PathInData({tablet_column.path_info_ptr()->get_parts()[0]});
// root column use unique id, leaf column use parent_unique_id
int32_t unique_id = tablet_column.unique_id() > 0 ? tablet_column.unique_id()
: tablet_column.parent_unique_id();
if (!_sub_column_tree.contains(unique_id)) {
// No such variant column in this segment, get a default one
RETURN_IF_ERROR(new_default_iterator(tablet_column, iter));
return Status::OK();
}
const auto* root = _sub_column_tree.find_leaf(root_path);
auto relative_path = tablet_column.path_info_ptr()->copy_pop_front();
const auto* root = _sub_column_tree[unique_id].get_root();
const auto* node = tablet_column.has_path_info()
? _sub_column_tree.find_exact(*tablet_column.path_info_ptr())
? _sub_column_tree[unique_id].find_exact(relative_path)
: nullptr;
const auto* sparse_node =
tablet_column.has_path_info()
? _sparse_column_tree.find_exact(*tablet_column.path_info_ptr())
: nullptr;

const auto* sparse_node = tablet_column.has_path_info()
? _sparse_column_tree[unique_id].find_exact(relative_path)
: nullptr;
// Currently only compaction and checksum need to read flat leaves
// They both use tablet_schema_with_merged_max_schema_version as read schema
auto type_to_read_flat_leaves = [](ReaderType type) {
Expand All @@ -552,7 +578,7 @@ Status Segment::new_column_iterator_with_path(const TabletColumn& tablet_column,
if (opt != nullptr && type_to_read_flat_leaves(opt->io_ctx.reader_type)) {
// compaction need to read flat leaves nodes data to prevent from amplification
const auto* node = tablet_column.has_path_info()
? _sub_column_tree.find_leaf(*tablet_column.path_info_ptr())
? _sub_column_tree[unique_id].find_leaf(relative_path)
: nullptr;
if (!node) {
// sparse_columns have this path, read from root
Expand All @@ -574,14 +600,14 @@ Status Segment::new_column_iterator_with_path(const TabletColumn& tablet_column,
if (node->is_leaf_node() && sparse_node == nullptr) {
// Node contains column without any child sub columns and no corresponding sparse columns
// Direct read extracted columns
const auto* node = _sub_column_tree.find_leaf(*tablet_column.path_info_ptr());
const auto* node = _sub_column_tree[unique_id].find_leaf(relative_path);
ColumnIterator* it;
RETURN_IF_ERROR(node->data.reader->new_iterator(&it));
iter->reset(it);
} else {
// Node contains column with children columns or has correspoding sparse columns
// Create reader with hirachical data
RETURN_IF_ERROR(HierarchicalDataReader::create(iter, *tablet_column.path_info_ptr(),
RETURN_IF_ERROR(HierarchicalDataReader::create(iter, relative_path,
node, root));
}
} else {
Expand Down Expand Up @@ -648,8 +674,11 @@ Status Segment::new_column_iterator(int32_t unique_id, std::unique_ptr<ColumnIte
ColumnReader* Segment::_get_column_reader(const TabletColumn& col) {
// init column iterator by path info
if (col.has_path_info() || col.is_variant_type()) {
const auto* node =
col.has_path_info() ? _sub_column_tree.find_exact(*col.path_info_ptr()) : nullptr;
auto relative_path = col.path_info_ptr()->copy_pop_front();
int32_t unique_id = col.unique_id() > 0 ? col.unique_id() : col.parent_unique_id();
const auto* node = col.has_path_info()
? _sub_column_tree[unique_id].find_exact(relative_path)
: nullptr;
if (node != nullptr) {
return node->data.reader.get();
}
Expand Down Expand Up @@ -810,14 +839,19 @@ Status Segment::read_key_by_rowid(uint32_t row_id, std::string* key) {
}

bool Segment::same_with_storage_type(int32_t cid, const Schema& schema,
bool ignore_children) const {
auto file_column_type = get_data_type_of(schema.column(cid)->path(),
schema.column(cid)->is_nullable(), ignore_children);
auto expected_type = Schema::get_data_type_ptr(*schema.column(cid));
bool read_flat_leaves) const {
const auto* col = schema.column(cid);
auto file_column_type =
get_data_type_of(ColumnIdentifier {.unique_id = col->unique_id(),
.parent_unique_id = col->parent_unique_id(),
.path = col->path(),
.is_nullable = col->is_nullable()},
read_flat_leaves);
auto expected_type = Schema::get_data_type_ptr(*col);
#ifndef NDEBUG
if (file_column_type && !file_column_type->equals(*expected_type)) {
VLOG_DEBUG << fmt::format("Get column {}, file column type {}, exepected type {}",
schema.column(cid)->name(), file_column_type->get_name(),
col->name(), file_column_type->get_name(),
expected_type->get_name());
}
#endif
Expand All @@ -843,7 +877,10 @@ Status Segment::seek_and_read_by_rowid(const TabletSchema& schema, SlotDescripto
vectorized::PathInDataPtr path = std::make_shared<vectorized::PathInData>(
schema.column_by_uid(slot->col_unique_id()).name_lower_case(),
slot->column_paths());
auto storage_type = get_data_type_of(path, slot->is_nullable(), false);
auto storage_type = get_data_type_of(ColumnIdentifier {.unique_id = slot->col_unique_id(),
.path = path,
.is_nullable = slot->is_nullable()},
false);
vectorized::MutableColumnPtr file_storage_column = storage_type->create_column();
DCHECK(storage_type != nullptr);
TabletColumn column = TabletColumn::create_materialized_variant_column(
Expand Down
27 changes: 19 additions & 8 deletions be/src/olap/rowset/segment_v2/segment.h
Original file line number Diff line number Diff line change
Expand Up @@ -150,14 +150,19 @@ class Segment : public std::enable_shared_from_this<Segment> {

void remove_from_segment_cache() const;

// Identify the column by unique id or path info
struct ColumnIdentifier {
int32_t unique_id = -1;
int32_t parent_unique_id = -1;
vectorized::PathInDataPtr path;
bool is_nullable = false;
};
// Get the inner file column's data type
// ignore_chidren set to false will treat field as variant
// when it contains children with field paths.
// nullptr will returned if storage type does not contains such column
std::shared_ptr<const vectorized::IDataType> get_data_type_of(vectorized::PathInDataPtr path,
bool is_nullable,
bool ignore_children) const;

std::shared_ptr<const vectorized::IDataType> get_data_type_of(
const ColumnIdentifier& identifier, bool read_flat_leaves) const;
// Check is schema read type equals storage column type
bool same_with_storage_type(int32_t cid, const Schema& schema, bool ignore_children) const;

Expand All @@ -166,8 +171,12 @@ class Segment : public std::enable_shared_from_this<Segment> {
bool can_apply_predicate_safely(int cid, Predicate* pred, const Schema& schema,
ReaderType read_type) const {
const Field* col = schema.column(cid);
vectorized::DataTypePtr storage_column_type = get_data_type_of(
col->path(), col->is_nullable(), read_type != ReaderType::READER_QUERY);
vectorized::DataTypePtr storage_column_type =
get_data_type_of(ColumnIdentifier {.unique_id = col->unique_id(),
.parent_unique_id = col->parent_unique_id(),
.path = col->path(),
.is_nullable = col->is_nullable()},
read_type != ReaderType::READER_QUERY);
if (storage_column_type == nullptr) {
// Default column iterator
return true;
Expand Down Expand Up @@ -239,10 +248,12 @@ class Segment : public std::enable_shared_from_this<Segment> {

// Each node in the tree represents the sub column reader and type
// for variants.
SubcolumnColumnReaders _sub_column_tree;
// map column unique id --> it's sub column readers
std::map<int32_t, SubcolumnColumnReaders> _sub_column_tree;

// each sprase column's path and types info
SubcolumnColumnReaders _sparse_column_tree;
// map column unique id --> it's sparse sub column readers
std::map<int32_t, SubcolumnColumnReaders> _sparse_column_tree;

// used to guarantee that short key index will be loaded at most once in a thread-safe way
DorisCallOnce<Status> _load_index_once;
Expand Down
7 changes: 6 additions & 1 deletion be/src/olap/rowset/segment_v2/segment_iterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,12 @@ Status SegmentIterator::_init_impl(const StorageReadOptions& opts) {
const Field* col = _schema->column(i);
if (col) {
auto storage_type = _segment->get_data_type_of(
col->path(), col->is_nullable(),
Segment::ColumnIdentifier {
col->unique_id(),
col->parent_unique_id(),
col->path(),
col->is_nullable(),
},
_opts.io_ctx.reader_type != ReaderType::READER_QUERY);
if (storage_type == nullptr) {
storage_type = vectorized::DataTypeFactory::instance().create_data_type(*col);
Expand Down
7 changes: 6 additions & 1 deletion be/src/olap/rowset/segment_v2/segment_iterator.h
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,12 @@ class SegmentIterator : public RowwiseIterator {
continue;
}
vectorized::DataTypePtr storage_type = _segment->get_data_type_of(
_schema->column(cid)->path(), _schema->column(cid)->is_nullable(), false);
Segment::ColumnIdentifier {
.unique_id = _schema->column(cid)->unique_id(),
.parent_unique_id = _schema->column(cid)->parent_unique_id(),
.path = _schema->column(cid)->path(),
.is_nullable = _schema->column(cid)->is_nullable()},
false);
if (storage_type && !storage_type->equals(*block->get_by_position(block_cid).type)) {
// Do additional cast
vectorized::MutableColumnPtr tmp = storage_type->create_column();
Expand Down
8 changes: 7 additions & 1 deletion be/src/vec/columns/subcolumn_tree.h
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,15 @@ class SubcolumnsTree {
/// flag, which is true if node already exists.
using NodeCreator = std::function<NodePtr(NodeKind, bool)>;

// create root as SCALAR node
void create_root(NodeData&& leaf_data) {
root = std::make_shared<Node>(Node::SCALAR, std::move(leaf_data));
leaves.push_back(root);
}

// create root as SCALAR node
void create_root(const NodeData& leaf_data) {
root = std::make_shared<Node>(Node::SCALAR, leaf_data);
root = std::make_shared<Node>(Node::SCALAR, std::move(leaf_data));
leaves.push_back(root);
}

Expand Down
2 changes: 1 addition & 1 deletion regression-test/data/variant_p0/column_name.out
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ UPPER CASE lower case
\N
\N
\N
\N
""
""
1234566
16
Expand Down
13 changes: 13 additions & 0 deletions regression-test/data/variant_p0/schema_change/rename.out
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !sql --
0 {"k1":1,"k2":"hello world","k3":[1234],"k4":1.1,"k5":[[123]]}

-- !sql --
0 {"k1":1,"k2":"hello world","k3":[1234],"k4":1.1,"k5":[[123]]} \N
2 {"xxxx":1234} {"yyyy":1.1111}

-- !sql --
0 {"k1":1,"k2":"hello world","k3":[1234],"k4":1.1,"k5":[[123]]} \N
2 {"xxxx":1234} \N
2 {"xxxx":1234} \N

Loading

0 comments on commit 7b1ced6

Please sign in to comment.