
Commit

Merge branch 'master' into range_null
zddr authored Mar 27, 2024
2 parents 7babbb3 + b29d395 commit d935b90
Showing 289 changed files with 142,454 additions and 1,318 deletions.
2 changes: 1 addition & 1 deletion .asf.yaml
@@ -109,8 +109,8 @@ github:
- shuke987
- wm1581066
- KassieZ
- yujun777
- gavinchou
- yujun777

notifications:
pullrequests_status: [email protected]
1 change: 1 addition & 0 deletions be/src/agent/be_exec_version_manager.h
@@ -69,6 +69,7 @@ class BeExecVersionManager {
* e. change shuffle serialize/deserialize way
* f. shrink some function's nullable mode.
* g. do local merge of remote runtime filter
* h. "now": ALWAYS_NOT_NULLABLE -> DEPEND_ON_ARGUMENTS
*/
constexpr inline int BeExecVersionManager::max_be_exec_version = 4;
constexpr inline int BeExecVersionManager::min_be_exec_version = 0;
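The added note (h) documents that the nullable mode of "now" changes from ALWAYS_NOT_NULLABLE to DEPEND_ON_ARGUMENTS, one of the behavior changes gated by the BE exec version listed in this comment block. A minimal sketch of how such version-gated selection can work; NullableMode, now_nullable_mode, and the threshold of 4 are illustrative assumptions, not the actual Doris API:

#include <iostream>

// Illustrative constants mirroring the versions named in the header above.
constexpr int min_be_exec_version = 0;
constexpr int max_be_exec_version = 4;

// Hypothetical nullable modes; Doris's real enum lives elsewhere.
enum class NullableMode { ALWAYS_NOT_NULLABLE, DEPEND_ON_ARGUMENTS };

// Pick the nullable mode for "now" from the exec version negotiated between
// FE and BE: older plans keep the old behavior, newer plans get the new one,
// so a rolling upgrade never mixes semantics within a single query.
NullableMode now_nullable_mode(int be_exec_version) {
    return be_exec_version >= 4 ? NullableMode::DEPEND_ON_ARGUMENTS
                                : NullableMode::ALWAYS_NOT_NULLABLE;
}

int main() {
    std::cout << (now_nullable_mode(3) == NullableMode::ALWAYS_NOT_NULLABLE) << '\n'; // 1
    std::cout << (now_nullable_mode(4) == NullableMode::DEPEND_ON_ARGUMENTS) << '\n'; // 1
}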
4 changes: 4 additions & 0 deletions be/src/agent/task_worker_pool.cpp
@@ -714,6 +714,7 @@ void alter_inverted_index_callback(StorageEngine& engine, const TAgentTaskReques
auto tablet_ptr = engine.tablet_manager()->get_tablet(alter_inverted_index_rq.tablet_id);
if (tablet_ptr != nullptr) {
EngineIndexChangeTask engine_task(engine, alter_inverted_index_rq);
SCOPED_ATTACH_TASK(engine_task.mem_tracker());
status = engine_task.execute();
} else {
status = Status::NotFound("could not find tablet {}", alter_inverted_index_rq.tablet_id);
@@ -940,6 +941,7 @@ void check_consistency_callback(StorageEngine& engine, const TAgentTaskRequest&
EngineChecksumTask engine_task(engine, check_consistency_req.tablet_id,
check_consistency_req.schema_hash, check_consistency_req.version,
&checksum);
SCOPED_ATTACH_TASK(engine_task.mem_tracker());
Status status = engine_task.execute();
if (!status.ok()) {
LOG_WARNING("failed to check consistency")
@@ -1596,6 +1598,7 @@ void push_callback(StorageEngine& engine, const TAgentTaskRequest& req) {
std::vector<TTabletInfo> tablet_infos;

EngineBatchLoadTask engine_task(engine, const_cast<TPushReq&>(push_req), &tablet_infos);
SCOPED_ATTACH_TASK(engine_task.mem_tracker());
auto status = engine_task.execute();

// Return result to fe
@@ -1928,6 +1931,7 @@ void clone_callback(StorageEngine& engine, const TMasterInfo& master_info,

std::vector<TTabletInfo> tablet_infos;
EngineCloneTask engine_task(engine, clone_req, master_info, req.signature, &tablet_infos);
SCOPED_ATTACH_TASK(engine_task.mem_tracker());
auto status = engine_task.execute();
// Return result to fe
TFinishTaskRequest finish_task_request;
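Each of these callbacks now wraps engine_task.execute() with SCOPED_ATTACH_TASK(engine_task.mem_tracker()), so allocations made while the task runs are charged to the task's own memory tracker instead of the worker thread's default one. A rough RAII sketch of what such a scope guard can look like; the guard and tracker types below are simplified stand-ins, not Doris's real implementation:

#include <atomic>
#include <cstdint>
#include <iostream>

// Simplified tracker: just an allocation counter per task.
struct MemTracker {
    std::atomic<int64_t> consumed{0};
    void consume(int64_t bytes) { consumed += bytes; }
};

// Thread-local pointer to whichever tracker is currently attached.
thread_local MemTracker* tls_tracker = nullptr;

// RAII guard: attach a tracker on entry, restore the previous one on exit,
// even if execute() returns early.
class ScopedAttachTask {
public:
    explicit ScopedAttachTask(MemTracker* t) : _prev(tls_tracker) { tls_tracker = t; }
    ~ScopedAttachTask() { tls_tracker = _prev; }

private:
    MemTracker* _prev;
};

void do_work() {
    if (tls_tracker != nullptr) tls_tracker->consume(1024); // pretend allocation
}

int main() {
    MemTracker task_tracker;
    {
        ScopedAttachTask guard(&task_tracker);
        do_work();
    }
    std::cout << task_tracker.consumed << '\n'; // 1024, billed to this task
}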
16 changes: 3 additions & 13 deletions be/src/cloud/cloud_meta_mgr.cpp
@@ -801,8 +801,7 @@ Status CloudMetaMgr::precommit_txn(const StreamLoadContext& ctx) {
return retry_rpc("precommit txn", req, &res, &MetaService_Stub::precommit_txn);
}

Status CloudMetaMgr::get_storage_vault_info(
std::vector<std::tuple<std::string, std::variant<S3Conf, THdfsParams>>>* vault_infos) {
Status CloudMetaMgr::get_storage_vault_info(StorageVaultInfos* vault_infos) {
GetObjStoreInfoRequest req;
GetObjStoreInfoResponse resp;
req.set_cloud_unique_id(config::cloud_unique_id);
@@ -826,18 +825,9 @@ Status CloudMetaMgr::get_storage_vault_info(
});
}
for (const auto& vault : resp.storage_vault()) {
THdfsParams params;
params.fs_name = vault.hdfs_info().build_conf().fs_name();
params.user = vault.hdfs_info().build_conf().user();
params.hdfs_kerberos_keytab = vault.hdfs_info().build_conf().hdfs_kerberos_keytab();
params.hdfs_kerberos_principal = vault.hdfs_info().build_conf().hdfs_kerberos_principal();
for (const auto& confs : vault.hdfs_info().build_conf().hdfs_confs()) {
THdfsConf conf;
conf.key = confs.key();
conf.value = confs.value();
params.hdfs_conf.emplace_back(std::move(conf));
if (vault.has_hdfs_info()) {
vault_infos->emplace_back(vault.id(), vault.hdfs_info());
}
vault_infos->emplace_back(vault.id(), std::move(params));
}
return Status::OK();
}
5 changes: 3 additions & 2 deletions be/src/cloud/cloud_meta_mgr.h
@@ -43,6 +43,8 @@ class TabletJobInfoPB;
class TabletStatsPB;
class TabletIndexPB;

using StorageVaultInfos = std::vector<std::tuple<std::string, std::variant<S3Conf, HdfsVaultInfo>>>;

Status bthread_fork_join(const std::vector<std::function<Status()>>& tasks, int concurrency);

class CloudMetaMgr {
@@ -70,8 +72,7 @@ class CloudMetaMgr {

Status precommit_txn(const StreamLoadContext& ctx);

Status get_storage_vault_info(
std::vector<std::tuple<std::string, std::variant<S3Conf, THdfsParams>>>* vault_infos);
Status get_storage_vault_info(StorageVaultInfos* vault_infos);

Status prepare_tablet_job(const TabletJobInfoPB& job, StartTabletJobResponse* res);

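The refactor stops converting HDFS vault metadata into THdfsParams inside the RPC layer: get_storage_vault_info now appends the raw HdfsVaultInfo (or S3Conf) into the StorageVaultInfos vector declared above and leaves the conversion to callers. A compressed sketch of the shape of that container, using placeholder structs in place of the real S3Conf and protobuf HdfsVaultInfo types:

#include <iostream>
#include <string>
#include <tuple>
#include <variant>
#include <vector>

// Placeholder stand-ins for doris::S3Conf and doris::cloud::HdfsVaultInfo.
struct S3Conf { std::string endpoint; };
struct HdfsVaultInfo { std::string fs_name; };

// Mirrors the StorageVaultInfos alias added in cloud_meta_mgr.h:
// one entry per vault, keyed by vault id, holding either backend's config.
using StorageVaultInfos =
        std::vector<std::tuple<std::string, std::variant<S3Conf, HdfsVaultInfo>>>;

int main() {
    StorageVaultInfos vaults;
    vaults.emplace_back("vault-s3-1", S3Conf{"s3.example.com"});
    vaults.emplace_back("vault-hdfs-1", HdfsVaultInfo{"hdfs://nameservice1"});

    for (const auto& [id, info] : vaults) {
        // index() tells us which backend this vault uses.
        std::cout << id << " -> " << (info.index() == 0 ? "S3" : "HDFS") << '\n';
    }
}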
17 changes: 10 additions & 7 deletions be/src/cloud/cloud_storage_engine.cpp
@@ -18,6 +18,7 @@
#include "cloud/cloud_storage_engine.h"

#include <gen_cpp/PlanNodes_types.h>
#include <gen_cpp/cloud.pb.h>
#include <rapidjson/document.h>
#include <rapidjson/encodings.h>
#include <rapidjson/prettywriter.h>
@@ -34,6 +35,7 @@
#include "io/fs/file_system.h"
#include "io/fs/hdfs_file_system.h"
#include "io/fs/s3_file_system.h"
#include "io/hdfs_util.h"
#include "olap/cumulative_compaction_policy.h"
#include "olap/memtable_flush_executor.h"
#include "olap/storage_policy.h"
@@ -82,9 +84,11 @@ struct VaultCreateFSVisitor {
}

// TODO(ByteYue): Make sure enable_java_support is on
Status operator()(const THdfsParams& hdfs_params) const {
auto fs = DORIS_TRY(
io::HdfsFileSystem::create(hdfs_params, hdfs_params.fs_name, id, nullptr));
Status operator()(const cloud::HdfsVaultInfo& vault) const {
auto hdfs_params = io::to_hdfs_params(vault);
auto fs =
DORIS_TRY(io::HdfsFileSystem::create(hdfs_params, hdfs_params.fs_name, id, nullptr,
vault.has_prefix() ? vault.prefix() : ""));
put_storage_resource(id, {std::move(fs), 0});
LOG_INFO("successfully create hdfs vault, vault id {}", id);
return Status::OK();
@@ -107,7 +111,7 @@ struct RefreshFSVaultVisitor {
return st;
}

Status operator()(const THdfsParams& hdfs_params) const {
Status operator()(const cloud::HdfsVaultInfo& vault_info) const {
// TODO(ByteYue): Implement the hdfs fs refresh logic
return Status::OK();
}
@@ -117,7 +121,7 @@
};

Status CloudStorageEngine::open() {
std::vector<std::tuple<std::string, std::variant<S3Conf, THdfsParams>>> vault_infos;
cloud::StorageVaultInfos vault_infos;
do {
auto st = _meta_mgr->get_storage_vault_info(&vault_infos);
if (st.ok()) {
@@ -133,7 +137,6 @@ Status CloudStorageEngine::open() {
for (auto& [id, vault_info] : vault_infos) {
RETURN_IF_ERROR(std::visit(VaultCreateFSVisitor {id}, vault_info));
}

set_latest_fs(get_filesystem(std::get<0>(vault_infos.back())));

// TODO(plat1ko): DeleteBitmapTxnManager
@@ -239,7 +242,7 @@ Status CloudStorageEngine::start_bg_threads() {
void CloudStorageEngine::_refresh_storage_vault_info_thread_callback() {
while (!_stop_background_threads_latch.wait_for(
std::chrono::seconds(config::refresh_s3_info_interval_s))) {
std::vector<std::tuple<std::string, std::variant<S3Conf, THdfsParams>>> vault_infos;
cloud::StorageVaultInfos vault_infos;
auto st = _meta_mgr->get_storage_vault_info(&vault_infos);
if (!st.ok()) {
LOG(WARNING) << "failed to get storage vault info. err=" << st;
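CloudStorageEngine::open() then dispatches on that variant with std::visit: VaultCreateFSVisitor has one operator() overload per alternative, so supporting a new vault backend only requires a new overload. A small sketch of the visitor dispatch as used here, with the same placeholder types as above; the real visitor builds actual file systems and registers them as storage resources:

#include <iostream>
#include <string>
#include <utility>
#include <variant>
#include <vector>

struct S3Conf { std::string endpoint; };
struct HdfsVaultInfo { std::string fs_name; };

// One overload per vault kind, mirroring VaultCreateFSVisitor's layout.
struct VaultCreateVisitor {
    const std::string& id;

    void operator()(const S3Conf& conf) const {
        std::cout << "create S3 fs for vault " << id << " at " << conf.endpoint << '\n';
    }
    void operator()(const HdfsVaultInfo& vault) const {
        std::cout << "create HDFS fs for vault " << id << " at " << vault.fs_name << '\n';
    }
};

int main() {
    std::vector<std::pair<std::string, std::variant<S3Conf, HdfsVaultInfo>>> vaults = {
            {"v1", S3Conf{"s3.example.com"}},
            {"v2", HdfsVaultInfo{"hdfs://nameservice1"}},
    };
    for (const auto& [id, info] : vaults) {
        std::visit(VaultCreateVisitor{id}, info); // picks the matching overload
    }
}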
51 changes: 51 additions & 0 deletions be/src/cloud/cloud_stream_load_executor.cpp
@@ -0,0 +1,51 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "cloud/cloud_stream_load_executor.h"

#include "cloud/config.h"
#include "common/logging.h"
#include "common/status.h"
#include "runtime/stream_load/stream_load_context.h"

namespace doris {

CloudStreamLoadExecutor::CloudStreamLoadExecutor(ExecEnv* exec_env)
: StreamLoadExecutor(exec_env) {}

CloudStreamLoadExecutor::~CloudStreamLoadExecutor() = default;

Status CloudStreamLoadExecutor::commit_txn(StreamLoadContext* ctx) {
// forward to fe to execute commit transaction for MoW table
Status st;
int retry_times = 0;
// mow table will retry when DELETE_BITMAP_LOCK_ERROR occurs
do {
st = StreamLoadExecutor::commit_txn(ctx);
if (st.ok() || !st.is<ErrorCode::DELETE_BITMAP_LOCK_ERROR>()) {
break;
}
LOG_WARNING("Failed to commit txn")
.tag("txn_id", ctx->txn_id)
.tag("retry_times", retry_times)
.error(st);
retry_times++;
} while (retry_times < config::mow_stream_load_commit_retry_times);
return st;
}

} // namespace doris
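CloudStreamLoadExecutor::commit_txn retries only when the commit fails with DELETE_BITMAP_LOCK_ERROR, a transient lock conflict on merge-on-write tables, and gives up after config::mow_stream_load_commit_retry_times attempts. A generic sketch of that bounded retry-on-one-error pattern; the error codes, counter, and retry limit below are illustrative:

#include <iostream>

enum class Code { OK, DELETE_BITMAP_LOCK_ERROR, OTHER_ERROR };

// Hypothetical stand-in for config::mow_stream_load_commit_retry_times.
constexpr int kCommitRetryTimes = 10;

// Fails with a lock conflict twice, then succeeds.
Code try_commit() {
    static int calls = 0;
    return ++calls <= 2 ? Code::DELETE_BITMAP_LOCK_ERROR : Code::OK;
}

Code commit_with_retry() {
    Code st = Code::OTHER_ERROR;
    int retry_times = 0;
    do {
        st = try_commit();
        // Retry only the one transient error; anything else surfaces immediately.
        if (st == Code::OK || st != Code::DELETE_BITMAP_LOCK_ERROR) break;
        std::cout << "commit failed with lock error, retry " << retry_times << '\n';
        ++retry_times;
    } while (retry_times < kCommitRetryTimes);
    return st;
}

int main() {
    std::cout << (commit_with_retry() == Code::OK ? "committed" : "failed") << '\n';
}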
30 changes: 30 additions & 0 deletions be/src/cloud/cloud_stream_load_executor.h
@@ -0,0 +1,30 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once
#include "runtime/stream_load/stream_load_executor.h"

namespace doris {
class CloudStreamLoadExecutor final : public StreamLoadExecutor {
public:
CloudStreamLoadExecutor(ExecEnv* exec_env);

~CloudStreamLoadExecutor() override;

Status commit_txn(StreamLoadContext* ctx) override;
};
} // namespace doris
5 changes: 5 additions & 0 deletions be/src/cloud/cloud_tablet.cpp
@@ -38,6 +38,7 @@
#include "olap/rowset/rowset_writer.h"
#include "olap/rowset/segment_v2/inverted_index_desc.h"
#include "olap/txn_manager.h"
#include "util/debug_points.h"

namespace doris {
using namespace ErrorCode;
@@ -75,6 +76,10 @@ Status CloudTablet::capture_consistent_rowsets_unlocked(
Status CloudTablet::capture_rs_readers(const Version& spec_version,
std::vector<RowSetSplits>* rs_splits,
bool skip_missing_version) {
DBUG_EXECUTE_IF("CloudTablet.capture_rs_readers.return.e-230", {
LOG_WARNING("CloudTablet.capture_rs_readers.return e-230").tag("tablet_id", tablet_id());
return Status::Error<false>(-230, "injected error");
});
Versions version_path;
std::shared_lock rlock(_meta_lock);
auto st = _timestamped_version_tracker.capture_consistent_versions(spec_version, &version_path);
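The new DBUG_EXECUTE_IF block in capture_rs_readers is a debug point: in tests a named switch can be enabled at runtime, and the injected block then forces an early error return, while production builds with the switch off take the normal path. A stripped-down sketch of how such a fault-injection hook can be wired; the registry and macro here are simplified stand-ins for Doris's debug-point utility:

#include <iostream>
#include <set>
#include <string>

// Minimal registry of enabled debug points (Doris keeps this behind a config flag).
std::set<std::string> g_enabled_debug_points;

bool debug_point_enabled(const std::string& name) {
    return g_enabled_debug_points.count(name) > 0;
}

// Simplified stand-in for DBUG_EXECUTE_IF: run the block only if the point is on.
#define DBUG_EXECUTE_IF(name, block)            \
    if (debug_point_enabled(name)) {            \
        block                                   \
    }

int capture_rs_readers() {
    DBUG_EXECUTE_IF("CloudTablet.capture_rs_readers.return.e-230", {
        std::cout << "injected error -230\n";
        return -230;
    });
    return 0; // normal path
}

int main() {
    std::cout << capture_rs_readers() << '\n';   // 0: point disabled
    g_enabled_debug_points.insert("CloudTablet.capture_rs_readers.return.e-230");
    std::cout << capture_rs_readers() << '\n';   // -230: point enabled in a test
}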
2 changes: 2 additions & 0 deletions be/src/cloud/config.cpp
@@ -53,4 +53,6 @@ DEFINE_mInt32(refresh_s3_info_interval_s, "60");
DEFINE_mInt32(vacuum_stale_rowsets_interval_s, "300");
DEFINE_mInt32(schedule_sync_tablets_interval_s, "600");

DEFINE_mInt32(mow_stream_load_commit_retry_times, "10");

} // namespace doris::config
3 changes: 3 additions & 0 deletions be/src/cloud/config.h
@@ -85,4 +85,7 @@ DECLARE_mInt32(refresh_s3_info_interval_s);
DECLARE_mInt32(vacuum_stale_rowsets_interval_s);
DECLARE_mInt32(schedule_sync_tablets_interval_s);

// Cloud mow
DECLARE_mInt32(mow_stream_load_commit_retry_times);

} // namespace doris::config
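The retry limit is exposed as a mutable config: DECLARE_mInt32 in config.h publishes the symbol, DEFINE_mInt32 in config.cpp gives it its default of 10, and the m prefix marks it as modifiable at runtime. A bare-bones sketch of that declare/define split; the macros below are illustrative only and deliberately simpler than Doris's real config machinery, which takes string defaults and supports hot updates:

#include <atomic>
#include <cstdint>
#include <iostream>

// --- config.h equivalent: declare the symbol so any .cpp can read it. ---
#define DECLARE_mInt32(name) extern std::atomic<int32_t> name
// --- config.cpp equivalent: define it once with a default value. ---
#define DEFINE_mInt32(name, defval) std::atomic<int32_t> name{defval}

namespace config {
DECLARE_mInt32(mow_stream_load_commit_retry_times);
DEFINE_mInt32(mow_stream_load_commit_retry_times, 10);
} // namespace config

int main() {
    std::cout << config::mow_stream_load_commit_retry_times << '\n'; // 10
    // "m" configs can be changed while the process is running.
    config::mow_stream_load_commit_retry_times = 5;
    std::cout << config::mow_stream_load_commit_retry_times << '\n'; // 5
}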
2 changes: 1 addition & 1 deletion be/src/io/fs/hdfs_file_system.cpp
@@ -77,7 +77,7 @@ Result<std::shared_ptr<HdfsFileSystem>> HdfsFileSystem::create(const THdfsParams

HdfsFileSystem::HdfsFileSystem(const THdfsParams& hdfs_params, std::string fs_name, std::string id,
RuntimeProfile* profile, std::string root_path)
: RemoteFileSystem(root_path, std::move(id), FileSystemType::HDFS),
: RemoteFileSystem(std::move(root_path), std::move(id), FileSystemType::HDFS),
_hdfs_params(hdfs_params),
_fs_name(std::move(fs_name)),
_profile(profile) {
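The constructor takes root_path by value, so forwarding it with std::move lets the base class steal the string's buffer instead of copying it a second time. A minimal illustration of why moving a by-value string parameter matters; the types below are simplified stand-ins:

#include <iostream>
#include <string>
#include <utility>

struct RemoteFileSystem {
    std::string root_path;
    explicit RemoteFileSystem(std::string p) : root_path(std::move(p)) {}
};

// Before: copies p into the base even though the caller's copy already belongs to us.
struct FsCopy : RemoteFileSystem {
    explicit FsCopy(std::string p) : RemoteFileSystem(p) {}
};

// After: moves p, so the heap buffer is handed through without reallocation.
struct FsMove : RemoteFileSystem {
    explicit FsMove(std::string p) : RemoteFileSystem(std::move(p)) {}
};

int main() {
    std::string path(1024, 'x');              // big enough to be heap-allocated
    FsMove fs(std::move(path));
    std::cout << fs.root_path.size() << '\n';  // 1024, transferred without copying
}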
26 changes: 26 additions & 0 deletions be/src/io/hdfs_util.cpp
@@ -17,6 +17,8 @@

#include "io/hdfs_util.h"

#include <gen_cpp/cloud.pb.h>

#include <ostream>

#include "common/logging.h"
@@ -140,4 +142,28 @@ bool is_hdfs(const std::string& path_or_fs) {
return path_or_fs.rfind("hdfs://") == 0;
}

THdfsParams to_hdfs_params(const cloud::HdfsVaultInfo& vault) {
THdfsParams params;
auto build_conf = vault.build_conf();
params.__set_fs_name(build_conf.fs_name());
if (build_conf.has_user()) {
params.__set_user(build_conf.user());
}
if (build_conf.has_hdfs_kerberos_principal()) {
params.__set_hdfs_kerberos_principal(build_conf.hdfs_kerberos_principal());
}
if (build_conf.has_hdfs_kerberos_keytab()) {
params.__set_hdfs_kerberos_keytab(build_conf.hdfs_kerberos_keytab());
}
std::vector<THdfsConf> tconfs;
for (const auto& confs : vault.build_conf().hdfs_confs()) {
THdfsConf conf;
conf.__set_key(confs.key());
conf.__set_value(confs.value());
tconfs.emplace_back(conf);
}
params.__set_hdfs_conf(tconfs);
return params;
}

} // namespace doris::io
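to_hdfs_params copies only the protobuf fields that are actually present (has_user(), has_hdfs_kerberos_principal(), and so on) and uses the Thrift __set_* setters so the corresponding __isset flags are raised; plain member assignment would leave optional fields looking unset on the wire. A tiny self-contained illustration of that guarded-copy pattern, with hand-written stand-ins for the generated protobuf and Thrift types:

#include <iostream>
#include <string>
#include <utility>

// Stand-in for a protobuf message with an optional field.
struct PbConf {
    bool has_user() const { return _has_user; }
    const std::string& user() const { return _user; }
    void set_user(std::string u) { _user = std::move(u); _has_user = true; }

private:
    std::string _user;
    bool _has_user = false;
};

// Stand-in for a Thrift struct: __set_* both assigns and marks the field as set.
struct ThriftParams {
    std::string user;
    struct { bool user = false; } __isset;
    void __set_user(const std::string& u) { user = u; __isset.user = true; }
};

ThriftParams convert(const PbConf& conf) {
    ThriftParams params;
    if (conf.has_user()) {          // copy only fields that were actually provided
        params.__set_user(conf.user());
    }
    return params;
}

int main() {
    PbConf with_user;
    with_user.set_user("hadoop");
    std::cout << convert(with_user).__isset.user << '\n'; // 1: field travels on the wire
    std::cout << convert(PbConf{}).__isset.user << '\n';  // 0: stays unset
}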
6 changes: 6 additions & 0 deletions be/src/io/hdfs_util.h
@@ -27,6 +27,10 @@
#include "io/fs/hdfs.h"
#include "io/fs/path.h"

namespace cloud {
class HdfsVaultInfo;
}

namespace doris {
class HDFSCommonBuilder;
class THdfsParams;
@@ -125,5 +129,7 @@ std::string get_fs_name(const std::string& path);
// return true if path_or_fs contains "hdfs://"
bool is_hdfs(const std::string& path_or_fs);

THdfsParams to_hdfs_params(const cloud::HdfsVaultInfo& vault);

} // namespace io
} // namespace doris