Merge branch 'master' into window_rewrite
feiniaofeiafei authored Apr 15, 2024
2 parents 4be8de6 + 5137342 commit 549420d
Showing 614 changed files with 23,717 additions and 2,266 deletions.
4 changes: 2 additions & 2 deletions README.md
@@ -35,10 +35,10 @@ Apache Doris is an easy-to-use, high-performance and real-time analytical databa

All this makes Apache Doris an ideal tool for scenarios including report analysis, ad-hoc query, unified data warehouse, and data lake query acceleration. On Apache Doris, users can build various applications, such as user behavior analysis, AB test platform, log retrieval analysis, user portrait analysis, and order analysis.

🎉 Version 2.1.0 released now. Check out the 🔗[Release Notes](https://doris.apache.org/docs/releasenotes/release-2.1.0) here. The 2.1 version delivers exceptional performance, with 100% faster out-of-the-box queries proven by TPC-DS 1TB tests, enhanced data lake analytics 4-6 times faster than Trino and Spark, solid support for semi-structured data analysis with the new Variant data type and a suite of analytical functions, asynchronous materialized views for query acceleration, optimized real-time writing at scale, and better workload management with stability and runtime SQL resource tracking.
🎉 Version 2.1.1 released now. Check out the 🔗[Release Notes](https://doris.apache.org/docs/releasenotes/release-2.1.1) here. The 2.1 version delivers exceptional performance, with 100% faster out-of-the-box queries proven by TPC-DS 1TB tests, enhanced data lake analytics 4-6 times faster than Trino and Spark, solid support for semi-structured data analysis with the new Variant data type and a suite of analytical functions, asynchronous materialized views for query acceleration, optimized real-time writing at scale, and better workload management with stability and runtime SQL resource tracking.


🎉 Version 2.0.6 is now released! This fully evolved and stable release is ready for all users to upgrade. Check out the 🔗[Release Notes](https://doris.apache.org/docs/releasenotes/release-2.0.6) here.
🎉 Version 2.0.8 is now released! This fully evolved and stable release is ready for all users to upgrade. Check out the 🔗[Release Notes](https://doris.apache.org/docs/releasenotes/release-2.0.8) here.

👀 Have a look at the 🔗[Official Website](https://doris.apache.org/) for a comprehensive list of Apache Doris's core features, blogs, and use cases.

10 changes: 10 additions & 0 deletions be/src/cloud/cloud_internal_service.cpp
@@ -17,11 +17,21 @@

#include "cloud/cloud_internal_service.h"

#include "cloud/cloud_storage_engine.h"

namespace doris {

CloudInternalServiceImpl::CloudInternalServiceImpl(CloudStorageEngine& engine, ExecEnv* exec_env)
: PInternalService(exec_env), _engine(engine) {}

CloudInternalServiceImpl::~CloudInternalServiceImpl() = default;

void CloudInternalServiceImpl::alter_vault_sync(google::protobuf::RpcController* controller,
const doris::PAlterVaultSyncRequest* request,
PAlterVaultSyncResponse* response,
google::protobuf::Closure* done) {
LOG(INFO) << "alter be to sync vault info from Meta Service";
_engine.sync_storage_vault();
}

} // namespace doris
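A side note on the handler above: this hunk does not show the RPC's done closure being completed, which may well happen in code outside the visible lines. For reference, the conventional brpc handler shape wraps the closure in a guard so the response is always sent; this is an illustrative sketch with hypothetical names, not part of this commit:

#include <brpc/closure_guard.h>
#include <google/protobuf/service.h>

// Hypothetical handler body; request/response types elided for brevity.
void handle_rpc(google::protobuf::Closure* done) {
    // ClosureGuard calls done->Run() on every exit path, which is what tells
    // brpc to send the response; without it the client-side call would hang.
    brpc::ClosureGuard closure_guard(done);
    // ... do the actual work, e.g. trigger a storage vault sync ...
}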
5 changes: 5 additions & 0 deletions be/src/cloud/cloud_internal_service.h
@@ -31,6 +31,11 @@ class CloudInternalServiceImpl final : public PInternalService {

// TODO(plat1ko): cloud internal service functions

void alter_vault_sync(google::protobuf::RpcController* controller,
const doris::PAlterVaultSyncRequest* request,
PAlterVaultSyncResponse* response,
google::protobuf::Closure* done) override;

private:
[[maybe_unused]] CloudStorageEngine& _engine;
};
14 changes: 10 additions & 4 deletions be/src/cloud/cloud_meta_mgr.cpp
@@ -24,6 +24,7 @@
#include <gen_cpp/PlanNodes_types.h>
#include <glog/logging.h>

#include <algorithm>
#include <atomic>
#include <chrono>
#include <memory>
@@ -811,7 +812,7 @@ Status CloudMetaMgr::get_storage_vault_info(StorageVaultInfos* vault_infos) {
return s;
}

for (const auto& obj_store : resp.obj_info()) {
auto add_obj_store = [&vault_infos](const auto& obj_store) {
vault_infos->emplace_back(obj_store.id(),
S3Conf {
.bucket = obj_store.bucket(),
@@ -823,12 +824,17 @@
.sse_enabled = obj_store.sse_enabled(),
.provider = obj_store.provider(),
});
}
for (const auto& vault : resp.storage_vault()) {
};

std::ranges::for_each(resp.obj_info(), add_obj_store);
std::ranges::for_each(resp.storage_vault(), [&](const auto& vault) {
if (vault.has_hdfs_info()) {
vault_infos->emplace_back(vault.id(), vault.hdfs_info());
}
}
if (vault.has_obj_info()) {
add_obj_store(vault.obj_info());
}
});
return Status::OK();
}
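The change above replaces two raw loops with a named generic lambda (add_obj_store) that is reused for both the top-level obj_info list and the object stores embedded in storage vaults, with std::ranges::for_each driving both passes. A self-contained sketch of the same shape, using hypothetical stand-in types for the protobuf messages:

#include <algorithm>
#include <iostream>
#include <string>
#include <vector>

struct ObjInfo { std::string id; };
struct Vault { std::string id; bool has_obj = false; ObjInfo obj; };

int main() {
    std::vector<std::string> out;
    // One lambda handles both plain object stores and those embedded in vaults.
    auto add_obj_store = [&out](const auto& obj) { out.push_back(obj.id); };

    std::vector<ObjInfo> obj_infos {{"s3-1"}, {"s3-2"}};
    std::vector<Vault> vaults {{"v1", true, {"s3-3"}}, {"v2"}};

    std::ranges::for_each(obj_infos, add_obj_store);
    std::ranges::for_each(vaults, [&](const auto& v) {
        if (v.has_obj) add_obj_store(v.obj);
    });

    for (const auto& id : out) std::cout << id << '\n'; // s3-1, s3-2, s3-3
}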

52 changes: 30 additions & 22 deletions be/src/cloud/cloud_storage_engine.cpp
@@ -267,32 +267,40 @@ Status CloudStorageEngine::start_bg_threads() {
return Status::OK();
}

void CloudStorageEngine::sync_storage_vault() {
cloud::StorageVaultInfos vault_infos;
auto st = _meta_mgr->get_storage_vault_info(&vault_infos);
if (!st.ok()) {
LOG(WARNING) << "failed to get storage vault info. err=" << st;
return;
}

if (vault_infos.empty()) {
LOG(WARNING) << "no storage vault info";
return;
}

for (auto& [id, vault_info] : vault_infos) {
auto fs = get_filesystem(id);
auto st = (fs == nullptr)
? std::visit(VaultCreateFSVisitor {id}, vault_info)
: std::visit(RefreshFSVaultVisitor {id, std::move(fs)}, vault_info);
if (!st.ok()) [[unlikely]] {
LOG(WARNING) << vault_process_error(id, vault_info, std::move(st));
}
}

if (auto& id = std::get<0>(vault_infos.back());
latest_fs() == nullptr || latest_fs()->id() != id) {
set_latest_fs(get_filesystem(id));
}
}

// We should enable_java_support if we want to use hdfs vault
void CloudStorageEngine::_refresh_storage_vault_info_thread_callback() {
while (!_stop_background_threads_latch.wait_for(
std::chrono::seconds(config::refresh_s3_info_interval_s))) {
cloud::StorageVaultInfos vault_infos;
auto st = _meta_mgr->get_storage_vault_info(&vault_infos);
if (!st.ok()) {
LOG(WARNING) << "failed to get storage vault info. err=" << st;
continue;
}

CHECK(!vault_infos.empty()) << "no s3 infos";
for (auto& [id, vault_info] : vault_infos) {
auto fs = get_filesystem(id);
auto st = (fs == nullptr)
? std::visit(VaultCreateFSVisitor {id}, vault_info)
: std::visit(RefreshFSVaultVisitor {id, std::move(fs)}, vault_info);
if (!st.ok()) [[unlikely]] {
LOG(WARNING) << vault_process_error(id, vault_info, std::move(st));
}
}

if (auto& id = std::get<0>(vault_infos.back());
latest_fs() == nullptr || latest_fs()->id() != id) {
set_latest_fs(get_filesystem(id));
}
sync_storage_vault();
}
}
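Extracting sync_storage_vault() out of the thread callback gives the periodic refresher and the new alter_vault_sync RPC a single shared code path, so an on-demand sync no longer has to wait for the next refresh_s3_info_interval_s tick. A minimal sketch of this stop-latch loop pattern, using standard C++ stand-ins rather than Doris's CountDownLatch:

#include <chrono>
#include <condition_variable>
#include <mutex>

class VaultSyncer {
public:
    // Shared entry point: called by the background loop and by an RPC handler.
    void sync() { /* fetch vault infos, create or refresh filesystems */ }

    void background_loop() {
        std::unique_lock<std::mutex> lock(_mu);
        // wait_for returns false on timeout (stop flag still unset): sync and
        // loop again. It returns true once stop() flips the flag: exit.
        while (!_cv.wait_for(lock, std::chrono::seconds(60), [this] { return _stopped; })) {
            sync();
        }
    }

    void stop() {
        { std::lock_guard<std::mutex> g(_mu); _stopped = true; }
        _cv.notify_all();
    }

private:
    std::mutex _mu;
    std::condition_variable _cv;
    bool _stopped = false;
};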

2 changes: 2 additions & 0 deletions be/src/cloud/cloud_storage_engine.h
@@ -109,6 +109,8 @@ class CloudStorageEngine final : public BaseStorageEngine {
std::shared_ptr<CloudCumulativeCompactionPolicy> cumu_compaction_policy(
std::string_view compaction_policy);

void sync_storage_vault();

private:
void _refresh_storage_vault_info_thread_callback();
void _vacuum_stale_rowsets_thread_callback();
13 changes: 7 additions & 6 deletions be/src/common/config.cpp
@@ -1181,14 +1181,15 @@ DEFINE_mString(ca_cert_file_paths,
"/etc/ssl/ca-bundle.pem");

/** Table sink configurations(currently contains only external table types) **/
// Minimum data processed to scale writers when non partition writing
// Minimum data processed to scale writers in exchange when non partition writing
DEFINE_mInt64(table_sink_non_partition_write_scaling_data_processed_threshold,
"125829120"); // 120MB
// Minimum data processed to start rebalancing in exchange when partition writing
DEFINE_mInt64(table_sink_partition_write_data_processed_threshold, "209715200"); // 200MB
"26214400"); // 25MB
// Minimum data processed to trigger skewed partition rebalancing in exchange when partition writing
DEFINE_mInt64(table_sink_partition_write_skewed_data_processed_rebalance_threshold,
"209715200"); // 200MB
DEFINE_mInt64(table_sink_partition_write_min_data_processed_rebalance_threshold,
"26214400"); // 25MB
// Minimum partition data processed to rebalance writers in exchange when partition writing
DEFINE_mInt64(table_sink_partition_write_min_partition_data_processed_rebalance_threshold,
"15728640"); // 15MB
// Maximum processed partition nums of per writer when partition writing
DEFINE_mInt32(table_sink_partition_write_max_partition_nums_per_writer, "128");

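The threshold strings above are raw byte counts; the trailing comments give the MiB equivalents. A quick check of the arithmetic (1 MiB = 1024 * 1024 bytes):

#include <cstdint>

constexpr std::int64_t MiB = 1024 * 1024;
static_assert(25 * MiB == 26214400);    // the new scaling / rebalance thresholds
static_assert(15 * MiB == 15728640);    // min partition data processed threshold
static_assert(200 * MiB == 209715200);  // the old 200MB defaults being replaced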
8 changes: 4 additions & 4 deletions be/src/common/config.h
@@ -1265,12 +1265,12 @@ DECLARE_String(trino_connector_plugin_dir);
DECLARE_mString(ca_cert_file_paths);

/** Table sink configurations(currently contains only external table types) **/
// Minimum data processed to scale writers when non partition writing
// Minimum data processed to scale writers in exchange when non partition writing
DECLARE_mInt64(table_sink_non_partition_write_scaling_data_processed_threshold);
// Minimum data processed to start rebalancing in exchange when partition writing
DECLARE_mInt64(table_sink_partition_write_data_processed_threshold);
// Minimum data processed to trigger skewed partition rebalancing in exchange when partition writing
DECLARE_mInt64(table_sink_partition_write_skewed_data_processed_rebalance_threshold);
DECLARE_mInt64(table_sink_partition_write_min_data_processed_rebalance_threshold);
// Minimum partition data processed to rebalance writers in exchange when partition writing
DECLARE_mInt64(table_sink_partition_write_min_partition_data_processed_rebalance_threshold);
// Maximum processed partition nums of per writer when partition writing
DECLARE_mInt32(table_sink_partition_write_max_partition_nums_per_writer);

2 changes: 0 additions & 2 deletions be/src/exec/rowid_fetcher.cpp
@@ -327,8 +327,6 @@ struct IteratorItem {

Status RowIdStorageReader::read_by_rowids(const PMultiGetRequest& request,
PMultiGetResponse* response) {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
ExecEnv::GetInstance()->rowid_storage_reader_tracker());
// read from storage engine row id by row id
OlapReaderStatistics stats;
vectorized::Block result_block;
3 changes: 3 additions & 0 deletions be/src/http/action/stream_load.cpp
@@ -475,8 +475,11 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req,
return Status::InvalidArgument("Invalid strict mode format. Must be bool type");
}
}
// timezone first. if not, try time_zone
if (!http_req->header(HTTP_TIMEZONE).empty()) {
request.__set_timezone(http_req->header(HTTP_TIMEZONE));
} else if (!http_req->header(HTTP_TIME_ZONE).empty()) {
request.__set_timezone(http_req->header(HTTP_TIME_ZONE));
}
if (!http_req->header(HTTP_EXEC_MEM_LIMIT).empty()) {
try {
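With the new header constant, stream load accepts either `timezone` or `time_zone`, preferring the former. The same fallback could be expressed as a small first-non-empty helper; this is a hypothetical refactor for illustration, not what the patch adds:

#include <initializer_list>
#include <string>

// Returns the first non-empty header among the given aliases.
template <typename Request>
std::string first_header(const Request& req, std::initializer_list<std::string> names) {
    for (const auto& name : names) {
        std::string value = req.header(name);
        if (!value.empty()) {
            return value; // earlier aliases win: "timezone" before "time_zone"
        }
    }
    return {};
}

// Usage sketch:
//   if (auto tz = first_header(*http_req, {HTTP_TIMEZONE, HTTP_TIME_ZONE}); !tz.empty()) {
//       request.__set_timezone(tz);
//   }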
1 change: 1 addition & 0 deletions be/src/http/http_common.h
@@ -38,6 +38,7 @@ static const std::string HTTP_TEMP_PARTITIONS = "temporary_partitions";
static const std::string HTTP_NEGATIVE = "negative";
static const std::string HTTP_STRICT_MODE = "strict_mode";
static const std::string HTTP_TIMEZONE = "timezone";
static const std::string HTTP_TIME_ZONE = "time_zone";
static const std::string HTTP_EXEC_MEM_LIMIT = "exec_mem_limit";
static const std::string HTTP_JSONPATHS = "jsonpaths";
static const std::string HTTP_JSONROOT = "json_root";
3 changes: 1 addition & 2 deletions be/src/io/fs/hdfs_file_system.cpp
@@ -320,8 +320,7 @@ Status HdfsFileSystem::download_impl(const Path& remote_file, const Path& local_

RETURN_IF_ERROR(local_writer->append({read_buf.get(), read_len}));
}

return Status::OK();
return local_writer->close();
}

} // namespace doris::io
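The one-line fix above matters because close() is typically where buffered bytes get flushed; returning Status::OK() unconditionally discarded any flush error. The same bug class in plain C I/O, for illustration:

#include <cstddef>
#include <cstdio>

// fclose() flushes buffered data and can fail (e.g. disk full); a download
// helper that ignores its return value can report success for a corrupt file.
bool write_file(const char* path, const char* data, std::size_t len) {
    std::FILE* f = std::fopen(path, "wb");
    if (f == nullptr) return false;
    bool ok = std::fwrite(data, 1, len, f) == len;
    ok = (std::fclose(f) == 0) && ok; // propagate the close/flush result
    return ok;
}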
10 changes: 9 additions & 1 deletion be/src/olap/compaction.cpp
@@ -67,6 +67,7 @@
#include "olap/txn_manager.h"
#include "olap/utils.h"
#include "runtime/memory/mem_tracker_limiter.h"
#include "runtime/thread_context.h"
#include "util/time.h"
#include "util/trace.h"

@@ -120,7 +121,14 @@ Compaction::Compaction(BaseTabletSPtr tablet, const std::string& label)
init_profile(label);
}

Compaction::~Compaction() = default;
Compaction::~Compaction() {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_mem_tracker);
_output_rs_writer.reset();
_tablet.reset();
_input_rowsets.clear();
_output_rowset.reset();
_cur_tablet_schema.reset();
}

void Compaction::init_profile(const std::string& label) {
_profile = std::make_unique<RuntimeProfile>(label);
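The destructor change releases the rowset writer, tablet, and schema members while the compaction's own MemTrackerLimiter is attached, so the freed memory is attributed to the compaction rather than to whichever thread happens to run the destructor. A generic sketch of that scoped-attribution pattern, with illustrative types rather than Doris's real ones:

#include <utility>

struct Tracker { /* memory accounting hooks */ };
thread_local Tracker* g_current_tracker = nullptr;

class ScopedSwitchTracker {
public:
    explicit ScopedSwitchTracker(Tracker* t) : _prev(std::exchange(g_current_tracker, t)) {}
    ~ScopedSwitchTracker() { g_current_tracker = _prev; } // restore on scope exit
private:
    Tracker* _prev;
};

// In a destructor, attach the object's tracker before resetting large members:
//   Foo::~Foo() {
//       ScopedSwitchTracker guard(_own_tracker);
//       _big_member.reset(); // deallocation counted against _own_tracker
//   }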
2 changes: 1 addition & 1 deletion be/src/olap/olap_server.cpp
@@ -803,7 +803,7 @@ void StorageEngine::get_tablet_rowset_versions(const PGetTabletVersionsRequest*
response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
return;
}
std::vector<Version> local_versions = tablet->get_all_versions();
std::vector<Version> local_versions = tablet->get_all_local_versions();
for (const auto& local_version : local_versions) {
auto version = response->add_versions();
version->set_first(local_version.first);
49 changes: 33 additions & 16 deletions be/src/olap/rowset/beta_rowset.cpp
@@ -576,24 +576,41 @@ Status BetaRowset::add_to_binlog() {
}
linked_success_files.push_back(binlog_file);

for (const auto& index : _schema->indexes()) {
if (index.index_type() != IndexType::INVERTED) {
continue;
if (_schema->get_inverted_index_storage_format() == InvertedIndexStorageFormatPB::V1) {
for (const auto& index : _schema->indexes()) {
if (index.index_type() != IndexType::INVERTED) {
continue;
}
auto index_id = index.index_id();
auto index_file = InvertedIndexDescriptor::get_index_file_name(
seg_file, index_id, index.get_index_suffix());
auto binlog_index_file = (std::filesystem::path(binlog_dir) /
std::filesystem::path(index_file).filename())
.string();
VLOG_DEBUG << "link " << index_file << " to " << binlog_index_file;
if (!local_fs->link_file(index_file, binlog_index_file).ok()) {
status = Status::Error<OS_ERROR>(
"fail to create hard link. from={}, to={}, errno={}", index_file,
binlog_index_file, Errno::no());
return status;
}
linked_success_files.push_back(binlog_index_file);
}
auto index_id = index.index_id();
auto index_file = InvertedIndexDescriptor::get_index_file_name(
seg_file, index_id, index.get_index_suffix());
auto binlog_index_file = (std::filesystem::path(binlog_dir) /
std::filesystem::path(index_file).filename())
.string();
VLOG_DEBUG << "link " << index_file << " to " << binlog_index_file;
if (!local_fs->link_file(index_file, binlog_index_file).ok()) {
status = Status::Error<OS_ERROR>(
"fail to create hard link. from={}, to={}, errno={}", index_file,
binlog_index_file, Errno::no());
return status;
} else {
if (_schema->has_inverted_index()) {
auto index_file = InvertedIndexDescriptor::get_index_file_name(seg_file);
auto binlog_index_file = (std::filesystem::path(binlog_dir) /
std::filesystem::path(index_file).filename())
.string();
VLOG_DEBUG << "link " << index_file << " to " << binlog_index_file;
if (!local_fs->link_file(index_file, binlog_index_file).ok()) {
status = Status::Error<OS_ERROR>(
"fail to create hard link. from={}, to={}, errno={}", index_file,
binlog_index_file, Errno::no());
return status;
}
linked_success_files.push_back(binlog_index_file);
}
linked_success_files.push_back(binlog_index_file);
}
}

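The branch above exists because inverted index storage format V1 writes one index file per index per segment (the file name carries the index id and an optional suffix), while V2 merges all of a segment's indexes into a single file, so only has_inverted_index() needs checking before creating one hard link. An illustrative sketch of the naming difference; the paths here are made up, not Doris's real scheme:

#include <cstdint>
#include <string>

std::string index_file_v1(const std::string& seg, std::int64_t index_id,
                          const std::string& suffix) {
    // V1: one file per inverted index; hard-link each one into the binlog dir.
    return seg + "_" + std::to_string(index_id) +
           (suffix.empty() ? "" : "@" + suffix) + ".idx";
}

std::string index_file_v2(const std::string& seg) {
    // V2: a single merged index file per segment; one hard link suffices.
    return seg + ".idx";
}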
[The remaining changed files are not shown.]
