Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature] Lightweight schema change of add/drop column #10136

Merged
merged 4 commits into from
Jul 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 20 additions & 8 deletions be/src/exec/olap_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@
#include "exprs/expr_context.h"
#include "gen_cpp/PaloInternalService_types.h"
#include "olap/decimal12.h"
#include "olap/field.h"
#include "olap/storage_engine.h"
#include "olap/tablet_schema.h"
#include "olap/uint24.h"
#include "olap_scan_node.h"
#include "olap_utils.h"
Expand Down Expand Up @@ -86,6 +88,14 @@ Status OlapScanner::prepare(
LOG(WARNING) << ss.str();
return Status::InternalError(ss.str());
}
_tablet_schema = _tablet->tablet_schema();
Lchangliang marked this conversation as resolved.
Show resolved Hide resolved
if (!_parent->_olap_scan_node.columns_desc.empty() &&
_parent->_olap_scan_node.columns_desc[0].col_unique_id >= 0) {
_tablet_schema.clear_columns();
for (const auto& column_desc : _parent->_olap_scan_node.columns_desc) {
_tablet_schema.append_column(TabletColumn(column_desc));
}
}
Lchangliang marked this conversation as resolved.
Show resolved Hide resolved
{
std::shared_lock rdlock(_tablet->get_header_lock());
const RowsetSharedPtr rowset = _tablet->rowset_with_max_version();
Expand Down Expand Up @@ -170,6 +180,7 @@ Status OlapScanner::_init_tablet_reader_params(
RETURN_IF_ERROR(_init_return_columns(!_tablet_reader_params.direct_mode));

_tablet_reader_params.tablet = _tablet;
_tablet_reader_params.tablet_schema = &_tablet_schema;
_tablet_reader_params.reader_type = READER_QUERY;
_tablet_reader_params.aggregation = _aggregation;
_tablet_reader_params.version = Version(0, _version);
Expand Down Expand Up @@ -210,7 +221,7 @@ Status OlapScanner::_init_tablet_reader_params(
_tablet_reader_params.return_columns.push_back(i);
}
for (auto index : _return_columns) {
if (_tablet->tablet_schema().column(index).is_key()) {
if (_tablet_schema.column(index).is_key()) {
continue;
} else {
_tablet_reader_params.return_columns.push_back(index);
Expand All @@ -219,13 +230,12 @@ Status OlapScanner::_init_tablet_reader_params(
}

// use _tablet_reader_params.return_columns, because the reader uses it for merge sort
Status res =
_read_row_cursor.init(_tablet->tablet_schema(), _tablet_reader_params.return_columns);
Status res = _read_row_cursor.init(_tablet_schema, _tablet_reader_params.return_columns);
if (!res.ok()) {
LOG(WARNING) << "fail to init row cursor.res = " << res;
return Status::InternalError("failed to initialize storage read row cursor");
}
_read_row_cursor.allocate_memory_for_string_type(_tablet->tablet_schema());
_read_row_cursor.allocate_memory_for_string_type(_tablet_schema);

// If an agg node is this scan node's direct parent
// we will not call agg object finalize method in scan node,
Expand All @@ -244,15 +254,17 @@ Status OlapScanner::_init_return_columns(bool need_seq_col) {
if (!slot->is_materialized()) {
continue;
}
int32_t index = _tablet->field_index(slot->col_name());
int32_t index = slot->col_unique_id() >= 0
? _tablet_schema.field_index(slot->col_unique_id())
: _tablet_schema.field_index(slot->col_name());
if (index < 0) {
std::stringstream ss;
ss << "field name is invalid. field=" << slot->col_name();
LOG(WARNING) << ss.str();
return Status::InternalError(ss.str());
}
_return_columns.push_back(index);
if (slot->is_nullable() && !_tablet->tablet_schema().column(index).is_nullable())
if (slot->is_nullable() && !_tablet_schema.column(index).is_nullable())
_tablet_columns_convert_to_null_set.emplace(index);
_query_slots.push_back(slot);
}
Expand All @@ -261,13 +273,13 @@ Status OlapScanner::_init_return_columns(bool need_seq_col) {
if (_tablet->tablet_schema().has_sequence_col() && need_seq_col) {
bool has_replace_col = false;
for (auto col : _return_columns) {
if (_tablet->tablet_schema().column(col).aggregation() ==
if (_tablet_schema.column(col).aggregation() ==
FieldAggregationMethod::OLAP_FIELD_AGGREGATION_REPLACE) {
has_replace_col = true;
break;
}
}
if (auto sequence_col_idx = _tablet->tablet_schema().sequence_col_idx();
if (auto sequence_col_idx = _tablet_schema.sequence_col_idx();
has_replace_col && std::find(_return_columns.begin(), _return_columns.end(),
sequence_col_idx) == _return_columns.end()) {
_return_columns.push_back(sequence_col_idx);
Expand Down
2 changes: 2 additions & 0 deletions be/src/exec/olap_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,8 @@ class OlapScanner {
MonotonicStopWatch _watcher;

std::shared_ptr<MemTracker> _mem_tracker;

TabletSchema _tablet_schema;
};

} // namespace doris
13 changes: 13 additions & 0 deletions be/src/exec/tablet_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ void OlapTableIndexSchema::to_protobuf(POlapTableIndexSchema* pindex) const {
for (auto slot : slots) {
pindex->add_columns(slot->col_name());
}
for (auto column : columns) {
column->to_schema_pb(pindex->add_columns_desc());
}
}

Status OlapTableSchemaParam::init(const POlapTableSchemaParam& pschema) {
Expand All @@ -57,6 +60,11 @@ Status OlapTableSchemaParam::init(const POlapTableSchemaParam& pschema) {
}
index->slots.emplace_back(it->second);
}
for (auto& pcolumn_desc : p_index.columns_desc()) {
TabletColumn* tc = _obj_pool.add(new TabletColumn());
tc->init_from_pb(pcolumn_desc);
index->columns.emplace_back(tc);
}
_indexes.emplace_back(index);
}

Expand Down Expand Up @@ -90,6 +98,11 @@ Status OlapTableSchemaParam::init(const TOlapTableSchemaParam& tschema) {
}
index->slots.emplace_back(it->second);
}
for (auto& tcolumn_desc : t_index.columns_desc) {
TabletColumn* tc = _obj_pool.add(new TabletColumn());
tc->init_from_thrift(tcolumn_desc);
index->columns.emplace_back(tc);
}
_indexes.emplace_back(index);
}

Expand Down
2 changes: 2 additions & 0 deletions be/src/exec/tablet_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "common/status.h"
#include "gen_cpp/Descriptors_types.h"
#include "gen_cpp/descriptors.pb.h"
#include "olap/tablet_schema.h"
#include "runtime/descriptors.h"
#include "runtime/raw_value.h"
#include "runtime/tuple.h"
Expand All @@ -41,6 +42,7 @@ struct OlapTableIndexSchema {
int64_t index_id;
std::vector<SlotDescriptor*> slots;
int32_t schema_hash;
std::vector<TabletColumn*> columns;

void to_protobuf(POlapTableIndexSchema* pindex) const;
};
Expand Down
11 changes: 11 additions & 0 deletions be/src/olap/base_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,4 +72,15 @@ void BaseTablet::_gen_tablet_path() {
}
}

bool BaseTablet::set_tablet_schema_into_rowset_meta() {
bool flag = false;
for (RowsetMetaSharedPtr rowset_meta : _tablet_meta->all_mutable_rs_metas()) {
if (!rowset_meta->get_rowset_pb().has_tablet_schema()) {
rowset_meta->set_tablet_schema(&_schema);
flag = true;
}
}
return flag;
}

} /* namespace doris */
4 changes: 3 additions & 1 deletion be/src/olap/base_tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,9 @@ class BaseTablet : public std::enable_shared_from_this<BaseTablet> {
}

// properties encapsulated in TabletSchema
const TabletSchema& tablet_schema() const;
virtual const TabletSchema& tablet_schema() const;

bool set_tablet_schema_into_rowset_meta();

protected:
void _gen_tablet_path();
Expand Down
8 changes: 4 additions & 4 deletions be/src/olap/collect_iterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,9 @@ Status CollectIterator::add_child(RowsetReaderSharedPtr rs_reader) {
// then merged with the base rowset.
void CollectIterator::build_heap(const std::vector<RowsetReaderSharedPtr>& rs_readers) {
DCHECK(rs_readers.size() == _children.size());
_reverse = _reader->_tablet->tablet_schema().keys_type() == KeysType::UNIQUE_KEYS;
SortType sort_type = _reader->_tablet->tablet_schema().sort_type();
int sort_col_num = _reader->_tablet->tablet_schema().sort_col_num();
_reverse = _reader->_tablet_schema->keys_type() == KeysType::UNIQUE_KEYS;
SortType sort_type = _reader->_tablet_schema->sort_type();
int sort_col_num = _reader->_tablet_schema->sort_col_num();
if (_children.empty()) {
_inner_iter.reset(nullptr);
return;
Expand Down Expand Up @@ -200,7 +200,7 @@ CollectIterator::Level0Iterator::Level0Iterator(RowsetReaderSharedPtr rs_reader,
CollectIterator::Level0Iterator::~Level0Iterator() = default;

Status CollectIterator::Level0Iterator::init() {
RETURN_NOT_OK_LOG(_row_cursor.init(_reader->_tablet->tablet_schema(), _reader->_seek_columns),
RETURN_NOT_OK_LOG(_row_cursor.init(*_reader->_tablet_schema, _reader->_seek_columns),
"failed to init row cursor");
return (this->*_refresh_current_row)();
}
Expand Down
19 changes: 12 additions & 7 deletions be/src/olap/compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@

#include "olap/compaction.h"

#include "common/status.h"
#include "gutil/strings/substitute.h"
#include "olap/rowset/rowset_meta.h"
#include "olap/tablet.h"
#include "util/time.h"
#include "util/trace.h"

Expand Down Expand Up @@ -141,8 +144,10 @@ Status Compaction::do_compaction_impl(int64_t permits) {

LOG(INFO) << "start " << merge_type << compaction_name() << ". tablet=" << _tablet->full_name()
<< ", output_version=" << _output_version << ", permits: " << permits;
// get cur schema if rowset schema exist, rowset schema must be newer than tablet schema
const TabletSchema cur_tablet_schema = _tablet->tablet_schema();

RETURN_NOT_OK(construct_output_rowset_writer());
RETURN_NOT_OK(construct_output_rowset_writer(&cur_tablet_schema));
RETURN_NOT_OK(construct_input_rowset_readers());
TRACE("prepare finished");

Expand All @@ -152,11 +157,11 @@ Status Compaction::do_compaction_impl(int64_t permits) {
Status res;

if (use_vectorized_compaction) {
res = Merger::vmerge_rowsets(_tablet, compaction_type(), _input_rs_readers,
_output_rs_writer.get(), &stats);
res = Merger::vmerge_rowsets(_tablet, compaction_type(), &cur_tablet_schema,
_input_rs_readers, _output_rs_writer.get(), &stats);
} else {
res = Merger::merge_rowsets(_tablet, compaction_type(), _input_rs_readers,
_output_rs_writer.get(), &stats);
res = Merger::merge_rowsets(_tablet, compaction_type(), &cur_tablet_schema,
_input_rs_readers, _output_rs_writer.get(), &stats);
}

if (!res.ok()) {
Expand Down Expand Up @@ -219,8 +224,8 @@ Status Compaction::do_compaction_impl(int64_t permits) {
return Status::OK();
}

Status Compaction::construct_output_rowset_writer() {
return _tablet->create_rowset_writer(_output_version, VISIBLE, NONOVERLAPPING,
Status Compaction::construct_output_rowset_writer(const TabletSchema* schema) {
return _tablet->create_rowset_writer(_output_version, VISIBLE, NONOVERLAPPING, schema,
_oldest_write_timestamp, _newest_write_timestamp,
&_output_rs_writer);
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/compaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ class Compaction {
Status modify_rowsets();
void gc_output_rowset();

Status construct_output_rowset_writer();
Status construct_output_rowset_writer(const TabletSchema* schema);
Status construct_input_rowset_readers();

Status check_version_continuity(const std::vector<RowsetSharedPtr>& rowsets);
Expand Down
14 changes: 14 additions & 0 deletions be/src/olap/data_dir.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,11 @@ Status DataDir::load() {
<< " schema hash: " << rowset_meta->tablet_schema_hash()
<< " for txn: " << rowset_meta->txn_id();
}
if (!rowset_meta->get_rowset_pb().has_tablet_schema()) {
rowset_meta->set_tablet_schema(&tablet->tablet_schema());
RowsetMetaManager::save(_meta, rowset_meta->tablet_uid(), rowset_meta->rowset_id(),
rowset_meta->get_rowset_pb());
}
} else if (rowset_meta->rowset_state() == RowsetStatePB::VISIBLE &&
rowset_meta->tablet_uid() == tablet->tablet_uid()) {
Status publish_status = tablet->add_rowset(rowset);
Expand All @@ -506,6 +511,15 @@ Status DataDir::load() {
++invalid_rowset_counter;
}
}

for (int64_t tablet_id : tablet_ids) {
TabletSharedPtr tablet = _tablet_manager->get_tablet(tablet_id);
if (tablet && tablet->set_tablet_schema_into_rowset_meta()) {
TabletMetaManager::save(this, tablet->tablet_id(), tablet->schema_hash(),
tablet->tablet_meta());
}
}

// At startup, we only count these invalid rowset, but do not actually delete it.
// The actual delete operation is in StorageEngine::_clean_unused_rowset_metas,
// which is cleaned up uniformly by the background cleanup thread.
Expand Down
27 changes: 20 additions & 7 deletions be/src/olap/delta_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ DeltaWriter::DeltaWriter(WriteRequest* req, StorageEngine* storage_engine, bool
_tablet(nullptr),
_cur_rowset(nullptr),
_rowset_writer(nullptr),
_tablet_schema(nullptr),
_tablet_schema(new TabletSchema),
_delta_written_success(false),
_storage_engine(storage_engine),
_is_vec(is_vec) {}
Expand Down Expand Up @@ -121,10 +121,11 @@ Status DeltaWriter::init() {
RETURN_NOT_OK(_storage_engine->txn_manager()->prepare_txn(_req.partition_id, _tablet,
_req.txn_id, _req.load_id));
}
// build tablet schema in request level
_build_current_tablet_schema(_req.index_id, _req.ptable_schema_param, _tablet->tablet_schema());
Lchangliang marked this conversation as resolved.
Show resolved Hide resolved

RETURN_NOT_OK(_tablet->create_rowset_writer(_req.txn_id, _req.load_id, PREPARED, OVERLAPPING,
&_rowset_writer));
_tablet_schema = &(_tablet->tablet_schema());
_tablet_schema.get(), &_rowset_writer));
_schema.reset(new Schema(*_tablet_schema));
_reset_mem_table();

Expand Down Expand Up @@ -172,7 +173,6 @@ Status DeltaWriter::write(const RowBatch* row_batch, const std::vector<int>& row
if (_is_cancelled) {
return Status::OLAPInternalError(OLAP_ERR_ALREADY_CANCELLED);
}

for (const auto& row_idx : row_idxs) {
_mem_table->insert(row_batch->get_row(row_idx)->get_tuple(0));
}
Expand Down Expand Up @@ -266,9 +266,9 @@ Status DeltaWriter::wait_flush() {
}

void DeltaWriter::_reset_mem_table() {
_mem_table.reset(new MemTable(_tablet->tablet_id(), _schema.get(), _tablet_schema, _req.slots,
_req.tuple_desc, _tablet->keys_type(), _rowset_writer.get(),
_mem_tracker, _is_vec));
_mem_table.reset(new MemTable(_tablet->tablet_id(), _schema.get(), _tablet_schema.get(),
_req.slots, _req.tuple_desc, _tablet->keys_type(),
_rowset_writer.get(), _mem_tracker, _is_vec));
}

Status DeltaWriter::close() {
Expand Down Expand Up @@ -367,4 +367,17 @@ int64_t DeltaWriter::partition_id() const {
return _req.partition_id;
}

// Builds the request-level tablet schema held in _tablet_schema.
//
// The schema always starts as a copy of ori_tablet_schema. It is rebuilt via
// _tablet_schema->build_current_tablet_schema() only when ptable_schema_param
// has at least one index, that first index carries column descriptors, and the
// first descriptor has a non-negative unique id.
// NOTE(review): a non-negative unique id presumably marks a table that carries
// per-column unique ids (new-style table) -- confirm against the sender.
void DeltaWriter::_build_current_tablet_schema(int64_t index_id,
                                               const POlapTableSchemaParam& ptable_schema_param,
                                               const TabletSchema& ori_tablet_schema) {
    // Default: write with the schema passed in by the caller.
    *_tablet_schema = ori_tablet_schema;

    // Nothing to rebuild from when the request carries no index at all.
    if (ptable_schema_param.indexes_size() <= 0) {
        return;
    }

    const auto& first_index = ptable_schema_param.indexes(0);
    // No column descriptors: keep the copied schema.
    if (first_index.columns_desc_size() == 0) {
        return;
    }
    // A negative unique id on the first column: keep the copied schema.
    if (first_index.columns_desc(0).unique_id() < 0) {
        return;
    }

    _tablet_schema->build_current_tablet_schema(index_id, ptable_schema_param, ori_tablet_schema);
}

} // namespace doris
12 changes: 11 additions & 1 deletion be/src/olap/delta_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ struct WriteRequest {
// slots are in order of tablet's schema
const std::vector<SlotDescriptor*>* slots;
bool is_high_priority = false;
POlapTableSchemaParam ptable_schema_param;
int64_t index_id;
};

// Writer for a particular (load, index, tablet).
Expand Down Expand Up @@ -107,6 +109,10 @@ class DeltaWriter {

void _reset_mem_table();

void _build_current_tablet_schema(int64_t index_id,
const POlapTableSchemaParam& table_schema_param,
const TabletSchema& ori_tablet_schema);

bool _is_init = false;
bool _is_cancelled = false;
WriteRequest _req;
Expand All @@ -116,7 +122,11 @@ class DeltaWriter {
// TODO: Recheck the lifetime of _mem_table, Look should use unique_ptr
std::shared_ptr<MemTable> _mem_table;
std::unique_ptr<Schema> _schema;
const TabletSchema* _tablet_schema;
//const TabletSchema* _tablet_schema;
// Tablet schema owned by the delta writer; all writes use this tablet schema.
// It is built from tablet_schema (stored when the tablet is created) and OlapTableSchema.
// Every request has its own tablet schema so that simple schema changes can work.
std::unique_ptr<TabletSchema> _tablet_schema;
bool _delta_written_success;

StorageEngine* _storage_engine;
Expand Down
Loading