Skip to content

Commit

Permalink
Merge branch 'apache:master' into init_hive_sink
Browse files Browse the repository at this point in the history
  • Loading branch information
wsjz committed Mar 3, 2024
2 parents ccdb211 + d8b4edb commit dafb315
Show file tree
Hide file tree
Showing 162 changed files with 6,700 additions and 906 deletions.
5 changes: 4 additions & 1 deletion be/src/agent/agent_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -193,12 +193,15 @@ void AgentServer::cloud_start_workers(CloudStorageEngine& engine, ExecEnv* exec_
"PUSH", config::delete_worker_count,
[&engine](auto&& task) { cloud_push_callback(engine, task); });
// TODO(plat1ko): SUBMIT_TABLE_COMPACTION worker
// TODO(plat1ko): CALCULATE_DELETE_BITMAP worker

_workers[TTaskType::ALTER] = std::make_unique<TaskWorkerPool>(
"ALTER_TABLE", config::alter_tablet_worker_count,
[&engine](auto&& task) { return alter_cloud_tablet_callback(engine, task); });

_workers[TTaskType::CALCULATE_DELETE_BITMAP] = std::make_unique<TaskWorkerPool>(
"CALC_DBM_TASK", config::calc_delete_bitmap_worker_count,
[&engine](auto&& task) { return calc_delete_bimtap_callback(engine, task); });

_report_workers.push_back(std::make_unique<ReportWorker>(
"REPORT_TASK", _master_info, config::report_task_interval_seconds,
[&master_info = _master_info] { report_task_callback(master_info); }));
Expand Down
32 changes: 32 additions & 0 deletions be/src/agent/task_worker_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@

#include "agent/utils.h"
#include "cloud/cloud_delete_task.h"
#include "cloud/cloud_engine_calc_delete_bitmap_task.h"
#include "cloud/cloud_schema_change_job.h"
#include "common/config.h"
#include "common/logging.h"
Expand Down Expand Up @@ -1864,4 +1865,35 @@ void storage_medium_migrate_callback(StorageEngine& engine, const TAgentTaskRequ
remove_task_info(req.task_type, req.signature);
}

void calc_delete_bimtap_callback(CloudStorageEngine& engine, const TAgentTaskRequest& req) {
std::vector<TTabletId> error_tablet_ids;
std::vector<TTabletId> succ_tablet_ids;
Status status;
error_tablet_ids.clear();
const auto& calc_delete_bitmap_req = req.calc_delete_bitmap_req;
CloudEngineCalcDeleteBitmapTask engine_task(engine, calc_delete_bitmap_req, &error_tablet_ids,
&succ_tablet_ids);
status = engine_task.execute();

TFinishTaskRequest finish_task_request;
if (!status) {
DorisMetrics::instance()->publish_task_failed_total->increment(1);
LOG_WARNING("failed to calculate delete bitmap")
.tag("signature", req.signature)
.tag("transaction_id", calc_delete_bitmap_req.transaction_id)
.tag("error_tablets_num", error_tablet_ids.size())
.error(status);
}

status.to_thrift(&finish_task_request.task_status);
finish_task_request.__set_backend(BackendOptions::get_local_backend());
finish_task_request.__set_task_type(req.task_type);
finish_task_request.__set_signature(req.signature);
finish_task_request.__set_report_version(s_report_version);
finish_task_request.__set_error_tablet_ids(error_tablet_ids);

finish_task(finish_task_request);
remove_task_info(req.task_type, req.signature);
}

} // namespace doris
2 changes: 2 additions & 0 deletions be/src/agent/task_worker_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -180,4 +180,6 @@ void report_disk_callback(StorageEngine& engine, const TMasterInfo& master_info)

void report_tablet_callback(StorageEngine& engine, const TMasterInfo& master_info);

void calc_delete_bimtap_callback(CloudStorageEngine& engine, const TAgentTaskRequest& req);

} // namespace doris
Loading

0 comments on commit dafb315

Please sign in to comment.