Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Improvement](light-schema-change) support tablet schema cache #11131

Merged
merged 1 commit into from
Aug 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions be/src/exec/olap_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ Status OlapScanner::prepare(
LOG(WARNING) << ss.str();
return Status::InternalError(ss.str());
}
_tablet_schema = _tablet->tablet_schema();
Lchangliang marked this conversation as resolved.
Show resolved Hide resolved
_tablet_schema.copy_from(*_tablet->tablet_schema());
if (!_parent->_olap_scan_node.columns_desc.empty() &&
_parent->_olap_scan_node.columns_desc[0].col_unique_id >= 0) {
_tablet_schema.clear_columns();
Expand Down Expand Up @@ -289,7 +289,7 @@ Status OlapScanner::_init_return_columns(bool need_seq_col) {
}

// expand the sequence column
if (_tablet->tablet_schema().has_sequence_col() && need_seq_col) {
if (_tablet_schema.has_sequence_col() && need_seq_col) {
bool has_replace_col = false;
for (auto col : _return_columns) {
if (_tablet_schema.column(col).aggregation() ==
Expand Down
11 changes: 5 additions & 6 deletions be/src/olap/base_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#include "gutil/strings/substitute.h"
#include "olap/data_dir.h"
#include "olap/tablet_schema_cache.h"
#include "util/doris_metrics.h"
#include "util/path_util.h"

Expand All @@ -29,10 +30,8 @@ extern MetricPrototype METRIC_query_scan_rows;
extern MetricPrototype METRIC_query_scan_count;

BaseTablet::BaseTablet(TabletMetaSharedPtr tablet_meta, DataDir* data_dir)
: _state(tablet_meta->tablet_state()),
_tablet_meta(tablet_meta),
_schema(tablet_meta->tablet_schema()),
_data_dir(data_dir) {
: _state(tablet_meta->tablet_state()), _tablet_meta(tablet_meta), _data_dir(data_dir) {
_schema = TabletSchemaCache::instance()->insert(_tablet_meta->tablet_schema().to_key());
_gen_tablet_path();

std::stringstream ss;
Expand Down Expand Up @@ -72,8 +71,8 @@ void BaseTablet::_gen_tablet_path() {
bool BaseTablet::set_tablet_schema_into_rowset_meta() {
bool flag = false;
for (RowsetMetaSharedPtr rowset_meta : _tablet_meta->all_mutable_rs_metas()) {
if (!rowset_meta->get_rowset_pb().has_tablet_schema()) {
rowset_meta->set_tablet_schema(&_schema);
if (!rowset_meta->tablet_schema()) {
rowset_meta->set_tablet_schema(_schema);
flag = true;
}
}
Expand Down
7 changes: 4 additions & 3 deletions be/src/olap/base_tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

#include "olap/olap_define.h"
#include "olap/tablet_meta.h"
#include "olap/tablet_schema.h"
#include "olap/utils.h"
#include "util/metrics.h"

Expand Down Expand Up @@ -64,7 +65,7 @@ class BaseTablet : public std::enable_shared_from_this<BaseTablet> {
void set_storage_policy(const std::string& policy) { _tablet_meta->set_storage_policy(policy); }

// properties encapsulated in TabletSchema
virtual const TabletSchema& tablet_schema() const;
virtual TabletSchemaSPtr tablet_schema() const;

bool set_tablet_schema_into_rowset_meta();

Expand All @@ -74,7 +75,7 @@ class BaseTablet : public std::enable_shared_from_this<BaseTablet> {
protected:
TabletState _state;
TabletMetaSharedPtr _tablet_meta;
const TabletSchema& _schema;
TabletSchemaSPtr _schema;

DataDir* _data_dir;
std::string _tablet_path;
Expand Down Expand Up @@ -145,7 +146,7 @@ inline bool BaseTablet::equal(int64_t id, int32_t hash) {
return (tablet_id() == id) && (schema_hash() == hash);
}

inline const TabletSchema& BaseTablet::tablet_schema() const {
inline TabletSchemaSPtr BaseTablet::tablet_schema() const {
return _schema;
}

Expand Down
15 changes: 10 additions & 5 deletions be/src/olap/compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#include "common/status.h"
#include "gutil/strings/substitute.h"
#include "olap/rowset/rowset.h"
#include "olap/rowset/rowset_meta.h"
#include "olap/tablet.h"
#include "util/time.h"
Expand Down Expand Up @@ -150,9 +151,13 @@ Status Compaction::do_compaction_impl(int64_t permits) {
LOG(INFO) << "start " << merge_type << compaction_name() << ". tablet=" << _tablet->full_name()
<< ", output_version=" << _output_version << ", permits: " << permits;
// get cur schema; if rowset schema exists, rowset schema must be newer than tablet schema
const TabletSchema cur_tablet_schema = _tablet->tablet_schema();
std::vector<RowsetMetaSharedPtr> rowset_metas(_input_rowsets.size());
std::transform(_input_rowsets.begin(), _input_rowsets.end(), rowset_metas.begin(),
[](const RowsetSharedPtr& rowset) { return rowset->rowset_meta(); });
TabletSchemaSPtr cur_tablet_schema =
_tablet->rowset_meta_with_max_schema_version(rowset_metas)->tablet_schema();

RETURN_NOT_OK(construct_output_rowset_writer(&cur_tablet_schema));
RETURN_NOT_OK(construct_output_rowset_writer(cur_tablet_schema));
RETURN_NOT_OK(construct_input_rowset_readers());
TRACE("prepare finished");

Expand All @@ -166,10 +171,10 @@ Status Compaction::do_compaction_impl(int64_t permits) {
}

if (use_vectorized_compaction) {
res = Merger::vmerge_rowsets(_tablet, compaction_type(), &cur_tablet_schema,
res = Merger::vmerge_rowsets(_tablet, compaction_type(), cur_tablet_schema.get(),
_input_rs_readers, _output_rs_writer.get(), &stats);
} else {
res = Merger::merge_rowsets(_tablet, compaction_type(), &cur_tablet_schema,
res = Merger::merge_rowsets(_tablet, compaction_type(), cur_tablet_schema.get(),
_input_rs_readers, _output_rs_writer.get(), &stats);
}

Expand Down Expand Up @@ -233,7 +238,7 @@ Status Compaction::do_compaction_impl(int64_t permits) {
return Status::OK();
}

Status Compaction::construct_output_rowset_writer(const TabletSchema* schema) {
Status Compaction::construct_output_rowset_writer(TabletSchemaSPtr schema) {
return _tablet->create_rowset_writer(_output_version, VISIBLE, NONOVERLAPPING, schema,
_oldest_write_timestamp, _newest_write_timestamp,
&_output_rs_writer);
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/compaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ class Compaction {
Status modify_rowsets();
void gc_output_rowset();

Status construct_output_rowset_writer(const TabletSchema* schema);
Status construct_output_rowset_writer(TabletSchemaSPtr schema);
Status construct_input_rowset_readers();

Status check_version_continuity(const std::vector<RowsetSharedPtr>& rowsets);
Expand Down
9 changes: 5 additions & 4 deletions be/src/olap/data_dir.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "olap/data_dir.h"

#include <ctype.h>
#include <gen_cpp/olap_file.pb.h>
#include <mntent.h>
#include <stdio.h>
#include <sys/file.h>
Expand Down Expand Up @@ -475,8 +476,8 @@ Status DataDir::load() {
}
if (rowset_meta->rowset_state() == RowsetStatePB::COMMITTED &&
rowset_meta->tablet_uid() == tablet->tablet_uid()) {
if (!rowset_meta->get_rowset_pb().has_tablet_schema()) {
rowset_meta->set_tablet_schema(&tablet->tablet_schema());
if (!rowset_meta->tablet_schema()) {
Lchangliang marked this conversation as resolved.
Show resolved Hide resolved
rowset_meta->set_tablet_schema(tablet->tablet_schema());
RowsetMetaManager::save(_meta, rowset_meta->tablet_uid(), rowset_meta->rowset_id(),
rowset_meta->get_rowset_pb());
}
Expand All @@ -498,8 +499,8 @@ Status DataDir::load() {
}
} else if (rowset_meta->rowset_state() == RowsetStatePB::VISIBLE &&
rowset_meta->tablet_uid() == tablet->tablet_uid()) {
if (!rowset_meta->get_rowset_pb().has_tablet_schema()) {
rowset_meta->set_tablet_schema(&tablet->tablet_schema());
if (!rowset_meta->tablet_schema()) {
rowset_meta->set_tablet_schema(tablet->tablet_schema());
RowsetMetaManager::save(_meta, rowset_meta->tablet_uid(), rowset_meta->rowset_id(),
rowset_meta->get_rowset_pb());
}
Expand Down
7 changes: 4 additions & 3 deletions be/src/olap/delta_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,10 +123,11 @@ Status DeltaWriter::init() {
_req.txn_id, _req.load_id));
}
// build tablet schema in request level
_build_current_tablet_schema(_req.index_id, _req.ptable_schema_param, _tablet->tablet_schema());
_build_current_tablet_schema(_req.index_id, _req.ptable_schema_param,
Lchangliang marked this conversation as resolved.
Show resolved Hide resolved
*_tablet->tablet_schema());

RETURN_NOT_OK(_tablet->create_rowset_writer(_req.txn_id, _req.load_id, PREPARED, OVERLAPPING,
_tablet_schema.get(), &_rowset_writer));
_tablet_schema, &_rowset_writer));
_schema.reset(new Schema(*_tablet_schema));
_reset_mem_table();

Expand Down Expand Up @@ -379,7 +380,7 @@ int64_t DeltaWriter::partition_id() const {
void DeltaWriter::_build_current_tablet_schema(int64_t index_id,
const POlapTableSchemaParam& ptable_schema_param,
const TabletSchema& ori_tablet_schema) {
*_tablet_schema = ori_tablet_schema;
_tablet_schema->copy_from(ori_tablet_schema);
// new tablet schema if new table
if (ptable_schema_param.indexes_size() > 0 &&
ptable_schema_param.indexes(0).columns_desc_size() != 0 &&
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/delta_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ class DeltaWriter {
// tablet schema owned by delta writer, all writes will use this tablet schema
// it's built from tablet_schema (stored when the tablet is created) and OlapTableSchema
// every request will have its own tablet schema so simple schema change can work
std::unique_ptr<TabletSchema> _tablet_schema;
TabletSchemaSPtr _tablet_schema;
bool _delta_written_success;

StorageEngine* _storage_engine;
Expand Down
24 changes: 13 additions & 11 deletions be/src/olap/push_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "olap/schema_change.h"
#include "olap/storage_engine.h"
#include "olap/tablet.h"
#include "olap/tablet_schema.h"
#include "runtime/exec_env.h"

namespace doris {
Expand Down Expand Up @@ -115,7 +116,8 @@ Status PushHandler::_do_streaming_ingestion(TabletSharedPtr tablet, const TPushR
}

DeletePredicatePB del_pred;
auto tablet_schema = tablet_var.tablet->tablet_schema();
TabletSchema tablet_schema;
tablet_schema.copy_from(*tablet_var.tablet->tablet_schema());
if (!request.columns_desc.empty() && request.columns_desc[0].col_unique_id >= 0) {
tablet_schema.clear_columns();
for (const auto& column_desc : request.columns_desc) {
Expand All @@ -141,25 +143,25 @@ Status PushHandler::_do_streaming_ingestion(TabletSharedPtr tablet, const TPushR
<< ". tablet: " << tablet_vars->at(0).tablet->full_name();
return Status::OLAPInternalError(OLAP_ERR_TOO_MANY_VERSION);
}

auto tablet_schema = tablet_vars->at(0).tablet->tablet_schema();
auto tablet_schema = std::make_shared<TabletSchema>();
tablet_schema->copy_from(*tablet_vars->at(0).tablet->tablet_schema());
if (!request.columns_desc.empty() && request.columns_desc[0].col_unique_id >= 0) {
tablet_schema.clear_columns();
tablet_schema->clear_columns();
for (const auto& column_desc : request.columns_desc) {
tablet_schema.append_column(TabletColumn(column_desc));
tablet_schema->append_column(TabletColumn(column_desc));
}
}

// writes
if (push_type == PUSH_NORMAL_V2) {
res = _convert_v2(tablet_vars->at(0).tablet, tablet_vars->at(1).tablet,
&(tablet_vars->at(0).rowset_to_add), &(tablet_vars->at(1).rowset_to_add),
&tablet_schema);
tablet_schema);

} else {
res = _convert(tablet_vars->at(0).tablet, tablet_vars->at(1).tablet,
&(tablet_vars->at(0).rowset_to_add), &(tablet_vars->at(1).rowset_to_add),
&tablet_schema);
tablet_schema);
}
if (!res.ok()) {
LOG(WARNING) << "fail to convert tmp file when realtime push. res=" << res
Expand Down Expand Up @@ -219,7 +221,7 @@ void PushHandler::_get_tablet_infos(const std::vector<TabletVars>& tablet_vars,

Status PushHandler::_convert_v2(TabletSharedPtr cur_tablet, TabletSharedPtr new_tablet,
RowsetSharedPtr* cur_rowset, RowsetSharedPtr* new_rowset,
const TabletSchema* tablet_schema) {
TabletSchemaSPtr tablet_schema) {
Status res = Status::OK();
uint32_t num_rows = 0;
PUniqueId load_id;
Expand Down Expand Up @@ -344,7 +346,7 @@ Status PushHandler::_convert_v2(TabletSharedPtr cur_tablet, TabletSharedPtr new_

Status PushHandler::_convert(TabletSharedPtr cur_tablet, TabletSharedPtr new_tablet,
RowsetSharedPtr* cur_rowset, RowsetSharedPtr* new_rowset,
const TabletSchema* tablet_schema) {
TabletSchemaSPtr tablet_schema) {
Status res = Status::OK();
RowCursor row;
BinaryFile raw_file;
Expand Down Expand Up @@ -515,7 +517,7 @@ IBinaryReader* IBinaryReader::create(bool need_decompress) {

BinaryReader::BinaryReader() : _row_buf(nullptr), _row_buf_size(0) {}

Status BinaryReader::init(const TabletSchema* tablet_schema, BinaryFile* file) {
Status BinaryReader::init(TabletSchemaSPtr tablet_schema, BinaryFile* file) {
Status res = Status::OK();

do {
Expand Down Expand Up @@ -657,7 +659,7 @@ LzoBinaryReader::LzoBinaryReader()
_row_num(0),
_next_row_start(0) {}

Status LzoBinaryReader::init(const TabletSchema* tablet_schema, BinaryFile* file) {
Status LzoBinaryReader::init(TabletSchemaSPtr tablet_schema, BinaryFile* file) {
Status res = Status::OK();

do {
Expand Down
13 changes: 7 additions & 6 deletions be/src/olap/push_handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "olap/olap_common.h"
#include "olap/row_cursor.h"
#include "olap/rowset/rowset.h"
#include "olap/tablet_schema.h"

namespace doris {

Expand Down Expand Up @@ -61,12 +62,12 @@ class PushHandler {
private:
Status _convert_v2(TabletSharedPtr cur_tablet, TabletSharedPtr new_tablet_vec,
RowsetSharedPtr* cur_rowset, RowsetSharedPtr* new_rowset,
const TabletSchema* tablet_schema);
TabletSchemaSPtr tablet_schema);
// Convert local data file to internal formatted delta,
// return new delta's SegmentGroup
Status _convert(TabletSharedPtr cur_tablet, TabletSharedPtr new_tablet_vec,
RowsetSharedPtr* cur_rowset, RowsetSharedPtr* new_rowset,
const TabletSchema* tablet_schema);
TabletSchemaSPtr tablet_schema);

// Only for debug
std::string _debug_version_list(const Versions& versions) const;
Expand Down Expand Up @@ -114,7 +115,7 @@ class IBinaryReader {
static IBinaryReader* create(bool need_decompress);
virtual ~IBinaryReader() = default;

virtual Status init(const TabletSchema* tablet_schema, BinaryFile* file) = 0;
virtual Status init(TabletSchemaSPtr tablet_schema, BinaryFile* file) = 0;
virtual Status finalize() = 0;

virtual Status next(RowCursor* row) = 0;
Expand All @@ -133,7 +134,7 @@ class IBinaryReader {
_ready(false) {}

BinaryFile* _file;
const TabletSchema* _tablet_schema;
TabletSchemaSPtr _tablet_schema;
size_t _content_len;
size_t _curr;
uint32_t _adler_checksum;
Expand All @@ -146,7 +147,7 @@ class BinaryReader : public IBinaryReader {
explicit BinaryReader();
~BinaryReader() override { finalize(); }

Status init(const TabletSchema* tablet_schema, BinaryFile* file) override;
Status init(TabletSchemaSPtr tablet_schema, BinaryFile* file) override;
Status finalize() override;

Status next(RowCursor* row) override;
Expand All @@ -163,7 +164,7 @@ class LzoBinaryReader : public IBinaryReader {
explicit LzoBinaryReader();
~LzoBinaryReader() override { finalize(); }

Status init(const TabletSchema* tablet_schema, BinaryFile* file) override;
Status init(TabletSchemaSPtr tablet_schema, BinaryFile* file) override;
Status finalize() override;

Status next(RowCursor* row) override;
Expand Down
3 changes: 2 additions & 1 deletion be/src/olap/rowset/beta_rowset.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "io/fs/s3_file_system.h"
#include "olap/olap_define.h"
#include "olap/rowset/beta_rowset_reader.h"
#include "olap/tablet_schema.h"
#include "olap/utils.h"
#include "util/doris_metrics.h"

Expand Down Expand Up @@ -59,7 +60,7 @@ std::string BetaRowset::remote_segment_path(int64_t tablet_id, const RowsetId& r
segment_id);
}

BetaRowset::BetaRowset(const TabletSchema* schema, const std::string& tablet_path,
BetaRowset::BetaRowset(TabletSchemaSPtr schema, const std::string& tablet_path,
RowsetMetaSharedPtr rowset_meta)
: Rowset(schema, tablet_path, std::move(rowset_meta)) {}

Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/rowset/beta_rowset.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ class BetaRowset : public Rowset {
Status load_segments(std::vector<segment_v2::SegmentSharedPtr>* segments);

protected:
BetaRowset(const TabletSchema* schema, const std::string& tablet_path,
BetaRowset(TabletSchemaSPtr schema, const std::string& tablet_path,
RowsetMetaSharedPtr rowset_meta);

// init segment groups
Expand Down
6 changes: 4 additions & 2 deletions be/src/olap/rowset/rowset.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@

#include "olap/rowset/rowset.h"

#include "olap/tablet_schema.h"
#include "olap/tablet_schema_cache.h"
#include "util/time.h"

namespace doris {

Rowset::Rowset(const TabletSchema* schema, const std::string& tablet_path,
Rowset::Rowset(TabletSchemaSPtr schema, const std::string& tablet_path,
RowsetMetaSharedPtr rowset_meta)
: _tablet_path(tablet_path), _rowset_meta(std::move(rowset_meta)), _refs_by_reader(0) {
_is_pending = !_rowset_meta->has_version();
Expand All @@ -32,7 +34,7 @@ Rowset::Rowset(const TabletSchema* schema, const std::string& tablet_path,
_is_cumulative = version.first != version.second;
}
// build schema from RowsetMeta.tablet_schema or Tablet.tablet_schema
_schema = _rowset_meta->tablet_schema() != nullptr ? _rowset_meta->tablet_schema() : schema;
_schema = _rowset_meta->tablet_schema() ? _rowset_meta->tablet_schema() : schema;
}

Status Rowset::load(bool use_cache) {
Expand Down
Loading