Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fleet_executor] Add task loop thread pool #38420

Merged

Conversation

wangxicoding
Copy link
Contributor

@wangxicoding wangxicoding commented Dec 24, 2021

PR types

Others

PR changes

Others

Describe

  1. Fleet executor add task loop thread pool, one loop can be used by many interceptors, one interceptor can use only one loop.
  2. Add Barrier to wait all Carrier ready.

image

@paddle-bot-old
Copy link

Thanks for your contribution!
Please wait for the result of CI firstly. See Paddle CI Manual for details.

messages_.emplace_back(message);
}
if (empty) {
loop_->QueueInLoop([this]() { LoopOnce(); });
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

后续可优化,同线程直接RunInLoop,不过貌似不是很打紧

@wangxicoding wangxicoding force-pushed the add_task_loop_thread_pool branch from 4a854c0 to f62c676 Compare December 24, 2021 05:36
@@ -81,6 +81,16 @@ class BlockingQueue {
std::swap(*empty_queue, q_);
}

std::deque<T> PopAll() {
Copy link
Contributor

@LiYuRio LiYuRio Dec 24, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

上面那个PopAll应该没用了,应该可以直接删了

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

留着吧,还是可以用的

void Start();

TaskLoop* GetLoop(int tid);
std::vector<TaskLoop*> GetAllLoops();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这可以返回个引用吗

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

可以,目前这个还没有用。线程数少,里面保存的指针,应该也还能接收

std::deque<InterceptorMessage> local_mailbox_;
std::mutex mutex_;
std::deque<InterceptorMessage> messages_;
// std::deque<InterceptorMessage> local_messages_;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这行可以直接删了

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Copy link
Contributor

@FeixLiu FeixLiu left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

对一下精度吧
另,stop消息现在还有用么?

std::deque<InterceptorMessage> local_mailbox_;
std::mutex mutex_;
std::deque<InterceptorMessage> messages_;
// std::deque<InterceptorMessage> local_messages_;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Delete this useless annotation?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

void QueueInLoop(Functor cb);

template <class F, class... Args>
auto Enqueue(F&& f, Args&&... args)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

可以试试c++14的自动返回类型推倒,应该可以少点代码

if (stop_) {
// break the pooling thread
VLOG(3) << "Interceptor " << interceptor_id_ << " is quiting.";
break;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个break是不是没用了😂

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

干掉了

@wangxicoding wangxicoding reopened this Dec 27, 2021
@PaddlePaddle PaddlePaddle locked and limited conversation to collaborators Dec 27, 2021
@PaddlePaddle PaddlePaddle unlocked this conversation Dec 27, 2021
Copy link
Contributor

@FeixLiu FeixLiu left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM with TODO:
Upadte carrier to fit dynamic creating interceptor.
Stop message (posion pill) for each interceptor's quitting.
Thread num configuration.
Running mutex (need discussion).

@wangxicoding wangxicoding merged commit dba59db into PaddlePaddle:develop Dec 27, 2021
@wangxicoding wangxicoding deleted the add_task_loop_thread_pool branch December 27, 2021 13:15
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants