Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Be refactor #1092

Merged
merged 3 commits into from
May 5, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 28 additions & 23 deletions be/src/agent/task_worker_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -217,15 +217,11 @@ void TaskWorkerPool::submit_task(const TAgentTaskRequest& task) {
if (task.__isset.resource_info) {
user = task.resource_info.user;
}
{
lock_guard<Mutex> worker_thread_lock(_worker_thread_lock);
if (task.__isset.alter_tablet_req && _tasks.size() >= config::alter_tablet_worker_count) {
return;
}
}
bool ret = _record_task_info(task_type, signature, user);
if (ret == true) {
lock_guard<Mutex> worker_thread_lock(_worker_thread_lock);
// set the task receive time
(const_cast<TAgentTaskRequest&>(task)).__set_recv_time(time(nullptr));
_tasks.push_back(task);
_worker_thread_condition_lock.notify();
}
Expand Down Expand Up @@ -536,24 +532,33 @@ void* TaskWorkerPool::_alter_tablet_worker_thread_callback(void* arg_this) {
CgroupsMgr::apply_system_cgroup();
int64_t signatrue = agent_task_req.signature;
LOG(INFO) << "get alter table task, signature: " << agent_task_req.signature;

TFinishTaskRequest finish_task_request;
TTaskType::type task_type = agent_task_req.task_type;
switch (task_type) {
case TTaskType::SCHEMA_CHANGE:
case TTaskType::ROLLUP:
worker_pool_this->_alter_tablet(worker_pool_this,
alter_tablet_request,
signatrue,
task_type,
&finish_task_request);
break;
default:
// pass
break;
bool is_task_timeout = false;
if (agent_task_req.__isset.recv_time) {
int64_t time_elapsed = time(nullptr) - agent_task_req.recv_time;
if (time_elapsed > config::report_task_interval_seconds * 20) {
LOG(INFO) << "task elapsed " << time_elapsed
<< " since it is inserted to queue, it is timeout";
is_task_timeout = true;
}
}
if (!is_task_timeout) {
TFinishTaskRequest finish_task_request;
TTaskType::type task_type = agent_task_req.task_type;
switch (task_type) {
case TTaskType::SCHEMA_CHANGE:
case TTaskType::ROLLUP:
worker_pool_this->_alter_tablet(worker_pool_this,
alter_tablet_request,
signatrue,
task_type,
&finish_task_request);
break;
default:
// pass
break;
}
worker_pool_this->_finish_task(finish_task_request);
}

worker_pool_this->_finish_task(finish_task_request);
worker_pool_this->_remove_task_info(agent_task_req.task_type, agent_task_req.signature, "");
#ifndef BE_TEST
}
Expand Down
5 changes: 4 additions & 1 deletion be/src/olap/txn_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,10 @@ OLAPStatus TxnManager::commit_txn(
OlapMeta* meta, TPartitionId partition_id, TTransactionId transaction_id,
TTabletId tablet_id, SchemaHash schema_hash,
const PUniqueId& load_id, RowsetSharedPtr rowset_ptr, bool is_recovery) {

DCHECK(partition_id > 0);
DCHECK(transaction_id > 0);
DCHECK(tablet_id > 0);
DCHECK(rowset_ptr != nullptr);
pair<int64_t, int64_t> key(partition_id, transaction_id);
TabletInfo tablet_info(tablet_id, schema_hash);
if (rowset_ptr == nullptr) {
Expand Down
2 changes: 2 additions & 0 deletions fe/src/main/java/org/apache/doris/alter/RollupHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -601,8 +601,10 @@ protected void runOneCycle() {
// check previous load job finished
if (rollupJob.checkPreviousLoadFinished()) {
// if all previous load job finished, then send clear alter tasks to all related be
LOG.info("previous txn finished, try to send clear txn task");
int res = rollupJob.checkOrResendClearTasks();
if (res != 0) {
LOG.info("send clear txn task return {}", res);
if (res == -1) {
LOG.warn("rollup job is in finishing state, but could not finished, "
+ "just finish it, maybe a fatal error {}", rollupJob);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -715,6 +715,8 @@ public boolean hasPreviousTransactionsFinished(long endTransactionId, long dbId)
continue;
}
if (entry.getKey() <= endTransactionId) {
LOG.info("find a running txn with txn_id={}, less than schema change txn_id {}",
entry.getKey(), endTransactionId);
return false;
}
}
Expand Down
1 change: 1 addition & 0 deletions gensrc/thrift/AgentService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ struct TAgentTaskRequest {
21: optional TClearTransactionTaskRequest clear_transaction_task_req
22: optional TMoveDirReq move_dir_req
23: optional TRecoverTabletReq recover_tablet_req;
24: optional i64 recv_time; // time the task is inserted to queue
}

struct TAgentResult {
Expand Down