Skip to content

Commit

Permalink
[improvement](light-schema-change) Support tablet schema cache
Browse files Browse the repository at this point in the history
  • Loading branch information
Lchangliang committed Jul 31, 2022
1 parent 388db05 commit 6665995
Show file tree
Hide file tree
Showing 47 changed files with 479 additions and 342 deletions.
4 changes: 2 additions & 2 deletions be/src/exec/olap_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ Status OlapScanner::prepare(
LOG(WARNING) << ss.str();
return Status::InternalError(ss.str());
}
_tablet_schema = _tablet->tablet_schema();
_tablet_schema.copy_from(*_tablet->tablet_schema());
if (!_parent->_olap_scan_node.columns_desc.empty() &&
_parent->_olap_scan_node.columns_desc[0].col_unique_id >= 0) {
_tablet_schema.clear_columns();
Expand Down Expand Up @@ -289,7 +289,7 @@ Status OlapScanner::_init_return_columns(bool need_seq_col) {
}

// expand the sequence column
if (_tablet->tablet_schema().has_sequence_col() && need_seq_col) {
if (_tablet_schema.has_sequence_col() && need_seq_col) {
bool has_replace_col = false;
for (auto col : _return_columns) {
if (_tablet_schema.column(col).aggregation() ==
Expand Down
11 changes: 5 additions & 6 deletions be/src/olap/base_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#include "gutil/strings/substitute.h"
#include "olap/data_dir.h"
#include "olap/tablet_schema_cache.h"
#include "util/doris_metrics.h"
#include "util/path_util.h"

Expand All @@ -29,10 +30,8 @@ extern MetricPrototype METRIC_query_scan_rows;
extern MetricPrototype METRIC_query_scan_count;

BaseTablet::BaseTablet(TabletMetaSharedPtr tablet_meta, DataDir* data_dir)
: _state(tablet_meta->tablet_state()),
_tablet_meta(tablet_meta),
_schema(tablet_meta->tablet_schema()),
_data_dir(data_dir) {
: _state(tablet_meta->tablet_state()), _tablet_meta(tablet_meta), _data_dir(data_dir) {
_schema = TabletSchemaCache::instance()->insert(_tablet_meta->tablet_schema().to_key());
_gen_tablet_path();

std::stringstream ss;
Expand Down Expand Up @@ -72,8 +71,8 @@ void BaseTablet::_gen_tablet_path() {
bool BaseTablet::set_tablet_schema_into_rowset_meta() {
bool flag = false;
for (RowsetMetaSharedPtr rowset_meta : _tablet_meta->all_mutable_rs_metas()) {
if (!rowset_meta->get_rowset_pb().has_tablet_schema()) {
rowset_meta->set_tablet_schema(&_schema);
if (!rowset_meta->tablet_schema()) {
rowset_meta->set_tablet_schema(_schema);
flag = true;
}
}
Expand Down
7 changes: 4 additions & 3 deletions be/src/olap/base_tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

#include "olap/olap_define.h"
#include "olap/tablet_meta.h"
#include "olap/tablet_schema.h"
#include "olap/utils.h"
#include "util/metrics.h"

Expand Down Expand Up @@ -64,7 +65,7 @@ class BaseTablet : public std::enable_shared_from_this<BaseTablet> {
void set_storage_policy(const std::string& policy) { _tablet_meta->set_storage_policy(policy); }

// properties encapsulated in TabletSchema
virtual const TabletSchema& tablet_schema() const;
virtual TabletSchemaSPtr tablet_schema() const;

bool set_tablet_schema_into_rowset_meta();

Expand All @@ -74,7 +75,7 @@ class BaseTablet : public std::enable_shared_from_this<BaseTablet> {
protected:
TabletState _state;
TabletMetaSharedPtr _tablet_meta;
const TabletSchema& _schema;
TabletSchemaSPtr _schema;

DataDir* _data_dir;
std::string _tablet_path;
Expand Down Expand Up @@ -145,7 +146,7 @@ inline bool BaseTablet::equal(int64_t id, int32_t hash) {
return (tablet_id() == id) && (schema_hash() == hash);
}

inline const TabletSchema& BaseTablet::tablet_schema() const {
inline TabletSchemaSPtr BaseTablet::tablet_schema() const {
return _schema;
}

Expand Down
15 changes: 10 additions & 5 deletions be/src/olap/compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#include "common/status.h"
#include "gutil/strings/substitute.h"
#include "olap/rowset/rowset.h"
#include "olap/rowset/rowset_meta.h"
#include "olap/tablet.h"
#include "util/time.h"
Expand Down Expand Up @@ -150,9 +151,13 @@ Status Compaction::do_compaction_impl(int64_t permits) {
LOG(INFO) << "start " << merge_type << compaction_name() << ". tablet=" << _tablet->full_name()
<< ", output_version=" << _output_version << ", permits: " << permits;
// get the current schema: if a rowset schema exists, it must be newer than the tablet schema
const TabletSchema cur_tablet_schema = _tablet->tablet_schema();
std::vector<RowsetMetaSharedPtr> rowset_metas(_input_rowsets.size());
std::transform(_input_rowsets.begin(), _input_rowsets.end(), rowset_metas.begin(),
[](const RowsetSharedPtr& rowset) { return rowset->rowset_meta(); });
TabletSchemaSPtr cur_tablet_schema =
_tablet->rowset_meta_with_max_schema_version(rowset_metas)->tablet_schema();

RETURN_NOT_OK(construct_output_rowset_writer(&cur_tablet_schema));
RETURN_NOT_OK(construct_output_rowset_writer(cur_tablet_schema));
RETURN_NOT_OK(construct_input_rowset_readers());
TRACE("prepare finished");

Expand All @@ -166,10 +171,10 @@ Status Compaction::do_compaction_impl(int64_t permits) {
}

if (use_vectorized_compaction) {
res = Merger::vmerge_rowsets(_tablet, compaction_type(), &cur_tablet_schema,
res = Merger::vmerge_rowsets(_tablet, compaction_type(), cur_tablet_schema.get(),
_input_rs_readers, _output_rs_writer.get(), &stats);
} else {
res = Merger::merge_rowsets(_tablet, compaction_type(), &cur_tablet_schema,
res = Merger::merge_rowsets(_tablet, compaction_type(), cur_tablet_schema.get(),
_input_rs_readers, _output_rs_writer.get(), &stats);
}

Expand Down Expand Up @@ -233,7 +238,7 @@ Status Compaction::do_compaction_impl(int64_t permits) {
return Status::OK();
}

Status Compaction::construct_output_rowset_writer(const TabletSchema* schema) {
Status Compaction::construct_output_rowset_writer(TabletSchemaSPtr schema) {
return _tablet->create_rowset_writer(_output_version, VISIBLE, NONOVERLAPPING, schema,
_oldest_write_timestamp, _newest_write_timestamp,
&_output_rs_writer);
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/compaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ class Compaction {
Status modify_rowsets();
void gc_output_rowset();

Status construct_output_rowset_writer(const TabletSchema* schema);
Status construct_output_rowset_writer(TabletSchemaSPtr schema);
Status construct_input_rowset_readers();

Status check_version_continuity(const std::vector<RowsetSharedPtr>& rowsets);
Expand Down
9 changes: 5 additions & 4 deletions be/src/olap/data_dir.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "olap/data_dir.h"

#include <ctype.h>
#include <gen_cpp/olap_file.pb.h>
#include <mntent.h>
#include <stdio.h>
#include <sys/file.h>
Expand Down Expand Up @@ -475,8 +476,8 @@ Status DataDir::load() {
}
if (rowset_meta->rowset_state() == RowsetStatePB::COMMITTED &&
rowset_meta->tablet_uid() == tablet->tablet_uid()) {
if (!rowset_meta->get_rowset_pb().has_tablet_schema()) {
rowset_meta->set_tablet_schema(&tablet->tablet_schema());
if (!rowset_meta->tablet_schema()) {
rowset_meta->set_tablet_schema(tablet->tablet_schema());
RowsetMetaManager::save(_meta, rowset_meta->tablet_uid(), rowset_meta->rowset_id(),
rowset_meta->get_rowset_pb());
}
Expand All @@ -498,8 +499,8 @@ Status DataDir::load() {
}
} else if (rowset_meta->rowset_state() == RowsetStatePB::VISIBLE &&
rowset_meta->tablet_uid() == tablet->tablet_uid()) {
if (!rowset_meta->get_rowset_pb().has_tablet_schema()) {
rowset_meta->set_tablet_schema(&tablet->tablet_schema());
if (!rowset_meta->tablet_schema()) {
rowset_meta->set_tablet_schema(tablet->tablet_schema());
RowsetMetaManager::save(_meta, rowset_meta->tablet_uid(), rowset_meta->rowset_id(),
rowset_meta->get_rowset_pb());
}
Expand Down
7 changes: 4 additions & 3 deletions be/src/olap/delta_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,10 +123,11 @@ Status DeltaWriter::init() {
_req.txn_id, _req.load_id));
}
// build the tablet schema at request level
_build_current_tablet_schema(_req.index_id, _req.ptable_schema_param, _tablet->tablet_schema());
_build_current_tablet_schema(_req.index_id, _req.ptable_schema_param,
*_tablet->tablet_schema());

RETURN_NOT_OK(_tablet->create_rowset_writer(_req.txn_id, _req.load_id, PREPARED, OVERLAPPING,
_tablet_schema.get(), &_rowset_writer));
_tablet_schema, &_rowset_writer));
_schema.reset(new Schema(*_tablet_schema));
_reset_mem_table();

Expand Down Expand Up @@ -379,7 +380,7 @@ int64_t DeltaWriter::partition_id() const {
void DeltaWriter::_build_current_tablet_schema(int64_t index_id,
const POlapTableSchemaParam& ptable_schema_param,
const TabletSchema& ori_tablet_schema) {
*_tablet_schema = ori_tablet_schema;
_tablet_schema->copy_from(ori_tablet_schema);
// new tablet schema if new table
if (ptable_schema_param.indexes_size() > 0 &&
ptable_schema_param.indexes(0).columns_desc_size() != 0 &&
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/delta_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ class DeltaWriter {
// tablet schema owned by the delta writer; all writes will use this tablet schema.
// It is built from tablet_schema (stored when the tablet is created) and OlapTableSchema.
// Every request has its own tablet schema so that simple schema change can work.
std::unique_ptr<TabletSchema> _tablet_schema;
TabletSchemaSPtr _tablet_schema;
bool _delta_written_success;

StorageEngine* _storage_engine;
Expand Down
24 changes: 13 additions & 11 deletions be/src/olap/push_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "olap/schema_change.h"
#include "olap/storage_engine.h"
#include "olap/tablet.h"
#include "olap/tablet_schema.h"
#include "runtime/exec_env.h"

namespace doris {
Expand Down Expand Up @@ -115,7 +116,8 @@ Status PushHandler::_do_streaming_ingestion(TabletSharedPtr tablet, const TPushR
}

DeletePredicatePB del_pred;
auto tablet_schema = tablet_var.tablet->tablet_schema();
TabletSchema tablet_schema;
tablet_schema.copy_from(*tablet_var.tablet->tablet_schema());
if (!request.columns_desc.empty() && request.columns_desc[0].col_unique_id >= 0) {
tablet_schema.clear_columns();
for (const auto& column_desc : request.columns_desc) {
Expand All @@ -141,25 +143,25 @@ Status PushHandler::_do_streaming_ingestion(TabletSharedPtr tablet, const TPushR
<< ". tablet: " << tablet_vars->at(0).tablet->full_name();
return Status::OLAPInternalError(OLAP_ERR_TOO_MANY_VERSION);
}

auto tablet_schema = tablet_vars->at(0).tablet->tablet_schema();
auto tablet_schema = std::make_shared<TabletSchema>();
tablet_schema->copy_from(*tablet_vars->at(0).tablet->tablet_schema());
if (!request.columns_desc.empty() && request.columns_desc[0].col_unique_id >= 0) {
tablet_schema.clear_columns();
tablet_schema->clear_columns();
for (const auto& column_desc : request.columns_desc) {
tablet_schema.append_column(TabletColumn(column_desc));
tablet_schema->append_column(TabletColumn(column_desc));
}
}

// writes
if (push_type == PUSH_NORMAL_V2) {
res = _convert_v2(tablet_vars->at(0).tablet, tablet_vars->at(1).tablet,
&(tablet_vars->at(0).rowset_to_add), &(tablet_vars->at(1).rowset_to_add),
&tablet_schema);
tablet_schema);

} else {
res = _convert(tablet_vars->at(0).tablet, tablet_vars->at(1).tablet,
&(tablet_vars->at(0).rowset_to_add), &(tablet_vars->at(1).rowset_to_add),
&tablet_schema);
tablet_schema);
}
if (!res.ok()) {
LOG(WARNING) << "fail to convert tmp file when realtime push. res=" << res
Expand Down Expand Up @@ -219,7 +221,7 @@ void PushHandler::_get_tablet_infos(const std::vector<TabletVars>& tablet_vars,

Status PushHandler::_convert_v2(TabletSharedPtr cur_tablet, TabletSharedPtr new_tablet,
RowsetSharedPtr* cur_rowset, RowsetSharedPtr* new_rowset,
const TabletSchema* tablet_schema) {
TabletSchemaSPtr tablet_schema) {
Status res = Status::OK();
uint32_t num_rows = 0;
PUniqueId load_id;
Expand Down Expand Up @@ -344,7 +346,7 @@ Status PushHandler::_convert_v2(TabletSharedPtr cur_tablet, TabletSharedPtr new_

Status PushHandler::_convert(TabletSharedPtr cur_tablet, TabletSharedPtr new_tablet,
RowsetSharedPtr* cur_rowset, RowsetSharedPtr* new_rowset,
const TabletSchema* tablet_schema) {
TabletSchemaSPtr tablet_schema) {
Status res = Status::OK();
RowCursor row;
BinaryFile raw_file;
Expand Down Expand Up @@ -515,7 +517,7 @@ IBinaryReader* IBinaryReader::create(bool need_decompress) {

// Constructs a reader with no row buffer yet: the buffer pointer starts null and its size is zero.
BinaryReader::BinaryReader() : _row_buf(nullptr), _row_buf_size(0) {}

Status BinaryReader::init(const TabletSchema* tablet_schema, BinaryFile* file) {
Status BinaryReader::init(TabletSchemaSPtr tablet_schema, BinaryFile* file) {
Status res = Status::OK();

do {
Expand Down Expand Up @@ -657,7 +659,7 @@ LzoBinaryReader::LzoBinaryReader()
_row_num(0),
_next_row_start(0) {}

Status LzoBinaryReader::init(const TabletSchema* tablet_schema, BinaryFile* file) {
Status LzoBinaryReader::init(TabletSchemaSPtr tablet_schema, BinaryFile* file) {
Status res = Status::OK();

do {
Expand Down
13 changes: 7 additions & 6 deletions be/src/olap/push_handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "olap/olap_common.h"
#include "olap/row_cursor.h"
#include "olap/rowset/rowset.h"
#include "olap/tablet_schema.h"

namespace doris {

Expand Down Expand Up @@ -61,12 +62,12 @@ class PushHandler {
private:
Status _convert_v2(TabletSharedPtr cur_tablet, TabletSharedPtr new_tablet_vec,
RowsetSharedPtr* cur_rowset, RowsetSharedPtr* new_rowset,
const TabletSchema* tablet_schema);
TabletSchemaSPtr tablet_schema);
// Convert local data file to internal formatted delta,
// return new delta's SegmentGroup
Status _convert(TabletSharedPtr cur_tablet, TabletSharedPtr new_tablet_vec,
RowsetSharedPtr* cur_rowset, RowsetSharedPtr* new_rowset,
const TabletSchema* tablet_schema);
TabletSchemaSPtr tablet_schema);

// Only for debug
std::string _debug_version_list(const Versions& versions) const;
Expand Down Expand Up @@ -114,7 +115,7 @@ class IBinaryReader {
static IBinaryReader* create(bool need_decompress);
virtual ~IBinaryReader() = default;

virtual Status init(const TabletSchema* tablet_schema, BinaryFile* file) = 0;
virtual Status init(TabletSchemaSPtr tablet_schema, BinaryFile* file) = 0;
virtual Status finalize() = 0;

virtual Status next(RowCursor* row) = 0;
Expand All @@ -133,7 +134,7 @@ class IBinaryReader {
_ready(false) {}

BinaryFile* _file;
const TabletSchema* _tablet_schema;
TabletSchemaSPtr _tablet_schema;
size_t _content_len;
size_t _curr;
uint32_t _adler_checksum;
Expand All @@ -146,7 +147,7 @@ class BinaryReader : public IBinaryReader {
explicit BinaryReader();
~BinaryReader() override { finalize(); }

Status init(const TabletSchema* tablet_schema, BinaryFile* file) override;
Status init(TabletSchemaSPtr tablet_schema, BinaryFile* file) override;
Status finalize() override;

Status next(RowCursor* row) override;
Expand All @@ -163,7 +164,7 @@ class LzoBinaryReader : public IBinaryReader {
explicit LzoBinaryReader();
~LzoBinaryReader() override { finalize(); }

Status init(const TabletSchema* tablet_schema, BinaryFile* file) override;
Status init(TabletSchemaSPtr tablet_schema, BinaryFile* file) override;
Status finalize() override;

Status next(RowCursor* row) override;
Expand Down
3 changes: 2 additions & 1 deletion be/src/olap/rowset/beta_rowset.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "io/fs/s3_file_system.h"
#include "olap/olap_define.h"
#include "olap/rowset/beta_rowset_reader.h"
#include "olap/tablet_schema.h"
#include "olap/utils.h"
#include "util/doris_metrics.h"

Expand Down Expand Up @@ -59,7 +60,7 @@ std::string BetaRowset::remote_segment_path(int64_t tablet_id, const RowsetId& r
segment_id);
}

BetaRowset::BetaRowset(const TabletSchema* schema, const std::string& tablet_path,
BetaRowset::BetaRowset(TabletSchemaSPtr schema, const std::string& tablet_path,
RowsetMetaSharedPtr rowset_meta)
: Rowset(schema, tablet_path, std::move(rowset_meta)) {}

Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/rowset/beta_rowset.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ class BetaRowset : public Rowset {
Status load_segments(std::vector<segment_v2::SegmentSharedPtr>* segments);

protected:
BetaRowset(const TabletSchema* schema, const std::string& tablet_path,
BetaRowset(TabletSchemaSPtr schema, const std::string& tablet_path,
RowsetMetaSharedPtr rowset_meta);

// init segment groups
Expand Down
6 changes: 4 additions & 2 deletions be/src/olap/rowset/rowset.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@

#include "olap/rowset/rowset.h"

#include "olap/tablet_schema.h"
#include "olap/tablet_schema_cache.h"
#include "util/time.h"

namespace doris {

Rowset::Rowset(const TabletSchema* schema, const std::string& tablet_path,
Rowset::Rowset(TabletSchemaSPtr schema, const std::string& tablet_path,
RowsetMetaSharedPtr rowset_meta)
: _tablet_path(tablet_path), _rowset_meta(std::move(rowset_meta)), _refs_by_reader(0) {
_is_pending = !_rowset_meta->has_version();
Expand All @@ -32,7 +34,7 @@ Rowset::Rowset(const TabletSchema* schema, const std::string& tablet_path,
_is_cumulative = version.first != version.second;
}
// build schema from RowsetMeta.tablet_schema or Tablet.tablet_schema
_schema = _rowset_meta->tablet_schema() != nullptr ? _rowset_meta->tablet_schema() : schema;
_schema = _rowset_meta->tablet_schema() ? _rowset_meta->tablet_schema() : schema;
}

Status Rowset::load(bool use_cache) {
Expand Down
Loading

0 comments on commit 6665995

Please sign in to comment.