
threadpool: throttled big group #1778

Merged
merged 28 commits into master on Apr 24, 2017

Conversation

Member

@AndreMouche AndreMouche commented Apr 18, 2017

Hi all,

This PR adds a new threadpool which tries to throttle each group's concurrency to a specified number when the pool is busy.

Each task carries a group_id attribute identifying the group it belongs to. When a thread asks for a new task to run, the pool schedules according to the following rules:

  1. Find the groups whose number of running tasks is smaller than group_concurrency_on_busy.
  2. If more than one group meets the first point, run the task that arrived first.

If no group meets the first point, choose according to the following rules:

  1. Select the group with the fewest running tasks.
  2. If more than one group meets the first point, choose the one whose front task arrived first (i.e. with the minimum task ID).
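
The selection rules above can be sketched in a few lines. This is a simplified, hypothetical view: `running`, `front_task_id`, and `pick_group` are illustrative names, not types from the PR itself (the real implementation keeps a heap plus a waiting queue).

```rust
use std::collections::HashMap;

// Hypothetical, simplified scheduler state for illustration only.
fn pick_group(
    running: &HashMap<u64, usize>,     // group_id -> number of running tasks
    front_task_id: &HashMap<u64, u64>, // group_id -> id of its oldest waiting task
    group_concurrency_on_busy: usize,
) -> Option<u64> {
    // Rule set 1: among groups under the busy limit, pick the one
    // whose front task arrived first (smallest id).
    let under_limit = front_task_id
        .iter()
        .map(|(&g, &id)| (g, id))
        .filter(|&(g, _)| running.get(&g).copied().unwrap_or(0) < group_concurrency_on_busy)
        .min_by_key(|&(_, id)| id)
        .map(|(g, _)| g);
    if under_limit.is_some() {
        return under_limit;
    }
    // Rule set 2: otherwise pick the group with the fewest running
    // tasks, breaking ties by the smallest front task id.
    front_task_id
        .iter()
        .map(|(&g, &id)| (running.get(&g).copied().unwrap_or(0), id, g))
        .min()
        .map(|(_, _, g)| g)
}

fn main() {
    let running: HashMap<u64, usize> = [(1, 2), (2, 1)].into_iter().collect();
    let front: HashMap<u64, u64> = [(1, 10), (2, 11)].into_iter().collect();
    // Busy limit 2: group 2 is the only group under the limit.
    assert_eq!(pick_group(&running, &front, 2), Some(2));
    // Busy limit 1: no group is under the limit; group 2 runs the fewest tasks.
    assert_eq!(pick_group(&running, &front, 1), Some(2));
    println!("ok");
}
```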

@BusyJay @zhangjinpeng1987 @disksing PTAL

use std::marker::PhantomData;

pub struct Task<T> {
// The task's number in the pool.Each task has a unique number,
Contributor

space after period


impl<T> Ord for Task<T> {
fn cmp(&self, right: &Task<T>) -> Ordering {
self.id.cmp(&right.id).reverse()
Contributor

reverse ordering?

Member Author
@AndreMouche AndreMouche Apr 18, 2017

We reverse the ordering here because the heap pops the largest item first, while we need to pop the task with the smallest id first. @andelf
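
The same min-heap effect can be seen with a standalone example: `std::cmp::Reverse` gives the behavior that the PR gets by reversing `Ord` on `Task`.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

fn main() {
    // BinaryHeap is a max-heap; wrapping ids in `Reverse` (or reversing
    // `Ord`, as the PR does) makes it pop the smallest id first.
    let mut heap = BinaryHeap::new();
    for id in [3u64, 1, 2] {
        heap.push(Reverse(id));
    }
    assert_eq!(heap.pop(), Some(Reverse(1)));
    assert_eq!(heap.pop(), Some(Reverse(2)));
    assert_eq!(heap.pop(), Some(Reverse(3)));
    println!("ok");
}
```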

BigGroupThrottledQueue {
group_concurrency: HashMap::default(),
waiting_queue: HashMap::default(),
pending_tasks: BinaryHeap::new(),
Contributor

consistency between new() and default().
maybe new() is better.

group_concurrency: HashMap::default(),
waiting_queue: HashMap::default(),
pending_tasks: BinaryHeap::new(),
group_concurrency_on_busy: group_concurrency_on_busy,
Contributor

s/on/when/

Member Author

Since on shares the same meaning as when here, and on is shorter, I prefer on here. @andelf

}
}

// Try push into pending. Return none on success,return Some(task) on failed.
Contributor

logically wrong usage of Option. You may use Result<(), ...> instead.

btw, is this thread-safe?

Member Author

Only one thread can own the lock of BigGroupThrottledQueue at a time. @andelf
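
A minimal sketch of the Result-based signature suggested above, with the queue behind a Mutex as described in the reply. The `ThrottledQueue`, `PushError`, and `capacity` names here are illustrative, not the PR's actual types (though the PR later adopted a similar `PushError`).

```rust
use std::collections::BinaryHeap;
use std::sync::Mutex;

#[derive(Debug, PartialEq)]
struct PushError<T>(T); // returned instead of Some(task) on failure

struct ThrottledQueue {
    pending: Mutex<BinaryHeap<u64>>, // task ids, simplified
    capacity: usize,                 // hypothetical limit for this demo
}

impl ThrottledQueue {
    // Err(PushError(task)) instead of Some(task) on failure: the Result
    // makes the failure case explicit rather than overloading Option.
    fn try_push(&self, id: u64) -> Result<(), PushError<u64>> {
        // Only one thread can own the lock at a time.
        let mut pending = self.pending.lock().unwrap();
        if pending.len() >= self.capacity {
            return Err(PushError(id));
        }
        pending.push(id);
        Ok(())
    }
}

fn main() {
    let q = ThrottledQueue { pending: Mutex::new(BinaryHeap::new()), capacity: 1 };
    assert!(q.try_push(1).is_ok());
    assert_eq!(q.try_push(2), Err(PushError(2)));
    println!("ok");
}
```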

}

/// `ThreadPool` is used to execute tasks in parallel.
/// Each task would be pushed into the pool,and when a thread
Contributor

space after comma

}

ThreadPool {
meta: meta.clone(),
Contributor

is clone() necessary?

// return false when get stop msg
#[inline]
fn wait(&self) -> bool {
// try to receive notify
Contributor

notification


fn run(&mut self) {
// start the worker.
// loop break on receive stop message.
Contributor

breaks

Contributor

receiving

// loop break on receive stop message.
while self.wait() {
// handle task
// since `tikv` would be down on any panic happens,
Contributor

do not format tikv since it's not a function, variable or type

for (group_id, tasks) in &self.waiting_queue {
let front_task_id = tasks[0].id;
assert!(self.group_concurrency.contains_key(group_id));
let count = self.group_concurrency[group_id];
Contributor

can we ensure the group_id exists here?

Member Author

Yes. A task is pushed into pending_tasks only when group_id is not in self.group_concurrency or self.group_concurrency[group_id] < group_concurrency_on_busy, so we can ensure the group_id exists here. @siddontang

}

#[inline]
fn pop_task_from_waiting_queue(&mut self) -> Option<Task<T>> {
Contributor

pop_from_waiting_queue

pub trait ScheduleQueue<T> {
fn pop(&mut self) -> Option<Task<T>>;
fn push(&mut self, task: Task<T>);
fn finish(&mut self, group_id: T);
Member

What does finish mean here?


// each thread has a worker.
struct Worker<Q, T> {
job_rever: Arc<Mutex<Receiver<bool>>>,
Member

Use Mutex + Condvar instead.
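
A minimal sketch of the Mutex + Condvar pattern suggested here, as a replacement for a channel of wakeup messages. The `Shared` struct and field names are illustrative, not the PR's actual code.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

struct Shared {
    queue: Mutex<VecDeque<u64>>, // pending task ids, simplified
    cond: Condvar,
}

fn main() {
    let shared = Arc::new(Shared { queue: Mutex::new(VecDeque::new()), cond: Condvar::new() });
    let worker_shared = Arc::clone(&shared);
    let worker = thread::spawn(move || {
        let mut guard = worker_shared.queue.lock().unwrap();
        // wait() atomically releases the lock and sleeps; re-check the
        // condition in a loop to handle spurious wakeups.
        while guard.is_empty() {
            guard = worker_shared.cond.wait(guard).unwrap();
        }
        guard.pop_front().unwrap()
    });
    shared.queue.lock().unwrap().push_back(42);
    shared.cond.notify_one();
    assert_eq!(worker.join().unwrap(), 42);
    println!("ok");
}
```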

}
}

struct ThreadPoolMeta<Q, T> {
Member

TaskPool seems more appropriate to me.

// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

@Wenting0905 Wenting0905 Apr 18, 2017

The paragraph "Unless required ... under the License" is divided across several lines.

Member Author

We divide it into several lines to keep each line from getting too long. It's fine in source code. @Wenting0905


oh i see

// group_id => running_num+pending num. It means there may
// `group_concurrency[group_id]` tasks of the group are running.
group_concurrency: HashMap<T, usize>,
// max num of threads each group can run on when pool is busy.


The max number of threads that each group can run when the pool is busy.

// `group_concurrency[group_id]` tasks of the group are running.
group_concurrency: HashMap<T, usize>,
// max num of threads each group can run on when pool is busy.
// each value in group_concurrency shouldn't bigger than this value.


Each value in 'group_concurrency' shouldn't be bigger than this value.

}
let group_id = group_id.unwrap();
let task = self.pop_from_waiting_queue_with_group_id(&group_id);
// update group_concurrency since current task is going to run.


since the current task

let task = waiting_tasks.pop_front().unwrap();
(waiting_tasks.is_empty(), task)
};
// if waiting tasks for group is empty, remove from waiting_tasks.


remove it

task
}

// pop_group_id_from_waiting_queue returns next task's group_id.


returns the next

});
}

// push 2 txn3 into pool, each need 2*sleep_duration.


push 2 txn3 into pool and each needs 2*sleep_duration.

}

// txn11,txn12,txn13,txn14,txn21,txn22,txn31,txn32
// first 4 task during [0,sleep_duration] should be


first 4 tasks


// txn11,txn12,txn13,txn14,txn21,txn22,txn31,txn32
// first 4 task during [0,sleep_duration] should be
// {txn11,txn12,txn21,txn22}.Since txn1 finished before than txn2,


  1. space after period
  2. Since txn1 is finished before txn2,

// txn11,txn12,txn13,txn14,txn21,txn22,txn31,txn32
// first 4 task during [0,sleep_duration] should be
// {txn11,txn12,txn21,txn22}.Since txn1 finished before than txn2,
// 4 task during [sleep_duration,2*sleep_duration] should be


4 tasks

fn test_fair_group_queue() {
let max_pending_task_each_group = 2;
let mut queue = BigGroupThrottledQueue::new(max_pending_task_each_group);
// push 4 group1 into queue


delete one space before 4

}
while let Some(t) = self.threads.pop() {
if let Err(e) = t.join() {
return Err(format!("{:?}", e));
Member

What about other threads?
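
One way to address this question (join every thread even after a failure, and report the first error afterwards, rather than returning early and leaving the rest detached) might look like the following sketch; `join_all` is a hypothetical helper, not the PR's code.

```rust
use std::thread;

// Hypothetical helper: join all handles even if one fails, then
// report the first failure (if any).
fn join_all(threads: Vec<thread::JoinHandle<()>>) -> Result<(), String> {
    let mut first_err = None;
    for t in threads {
        if let Err(e) = t.join() {
            // keep only the first error, but keep joining
            first_err.get_or_insert(format!("{:?}", e));
        }
    }
    match first_err {
        Some(e) => Err(e),
        None => Ok(()),
    }
}

fn main() {
    let threads = vec![thread::spawn(|| {}), thread::spawn(|| {})];
    assert!(join_all(threads).is_ok());
    println!("ok");
}
```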

@AndreMouche
Member Author

PTAL

// The task's number in the pool. Each task has a unique number,
// and it's always bigger than preceding ones.
id: u64,
// the task's group_id.
Member

Group which the task belongs to.

waiting_queue: HashMap<T, VecDeque<Task<T>>>,
// group_id => running_num+pending num. It means there may
// `group_concurrency[group_id]` tasks of the group are running.
group_concurrency: HashMap<T, usize>,
Member

s/group_concurrency/group_concurrency_stat

while let Some(task) = self.get_next_task() {
// handle task
// since tikv would be down when any panic happens,
// we do't need to process panic case here.
Member

s/do't/don't/

// handle task
// since tikv would be down when any panic happens,
// we do't need to process panic case here.
task.task.call_box(());
Member

Prefer (task.task)().
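
For context: on the Rust of that era, calling a boxed closure needed workarounds like the FnBox/call_box pattern. In modern Rust (1.35+), a `Box<dyn FnOnce()>` can be invoked directly, which is what the `(task.task)()` form expresses. A standalone sketch (the names are illustrative):

```rust
fn run_task(task: Box<dyn FnOnce() -> u64>) -> u64 {
    // Modern Rust can invoke a boxed FnOnce directly; no call_box needed.
    task()
}

fn main() {
    let task: Box<dyn FnOnce() -> u64> = Box::new(|| 40 + 2);
    assert_eq!(run_task(task), 42);
    println!("ok");
}
```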

// we do't need to process panic case here.
task.task.call_box(());
self.on_task_finished(&task.group_id);
self.task_count.fetch_sub(1, AOrdering::SeqCst);
Member

What's A?

builder = builder.name(name.clone());
let tasks = task_pool.clone();
let task_num = task_count.clone();
let thread = builder.spawn(move || {
Member

Chain L306, L307 with L310.


fn pop(&mut self) -> Option<Task<T>> {
if let Some(task) = self.pending_tasks.pop() {
let count = self.group_concurrency.entry(task.group_id.clone()).or_insert(0);
Member

Seems always exist.

let mut next_group = None;
for (group_id, tasks) in &self.waiting_queue {
let front_task_id = tasks[0].id;
assert!(self.group_concurrency.contains_key(group_id));
Member

Unnecessary.

// (group_id,count,task_id) the best current group's info with it's group_id,
// running tasks count, front task's id in waiting queue.
let mut next_group = None;
for (group_id, tasks) in &self.waiting_queue {
Member

Use Iterator::min instead.


pub struct Task<T> {
// The task's number in the pool. Each task has a unique number,
// and it's always bigger than preceding ones.
Contributor

s/task number/task id/
It would be fine to use just the task id.

}
}

impl<T> Eq for Task<T> {}
Contributor

Why is the implementation empty?


// `BigGroupThrottledQueue` tries to throttle group's concurrency to
// `group_concurrency_on_busy` when it's busy.
// When one worker asks a task to run, it schedules in the following way:
// 1. Find out which group has a running number that is smaller than
Contributor

Please take a look at these comments. @Wenting0905


LGTM

// more than `group_concurrency_on_busy`), the rest of the group's tasks
// would be pushed into `waiting_queue[group_id]`
waiting_queue: HashMap<T, VecDeque<Task<T>>>,
// group_id => running_num+pending num. It means there may
Contributor

there may?

Member Author

Yes. This is a subtle optimization to improve the efficiency of scheduling the next task. A plain implementation would need only waiting_queue, but then we would always have to iterate over all the groups in waiting_queue to find the optimal task.
In this implementation, we add a pending_heap. When a new task comes in:

  1. If the total number of the group's tasks that are in pending_heap or running is smaller than group_concurrency_on_busy, push it into the pending heap.
  2. Otherwise, push the task into waiting_queue.

And when we try to get a new task to run:

  1. If the heap is not empty, pop the front task.
  2. Otherwise, find the optimal task in waiting_queue according to our rules.

Here group_concurrency[group_id] stores the total number of the group's tasks which are in pending_tasks (the pending heap) or running. It also means there may be group_concurrency[group_id] tasks of the group running. @hhkbp2
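
The two-queue scheme described above can be sketched as follows. This is a deliberately simplified model (task ids only, `Sketch` is a hypothetical name, and rule-based selection from waiting_queue is elided):

```rust
use std::cmp::Reverse;
use std::collections::{BinaryHeap, HashMap, VecDeque};

// Simplified sketch of the pending-heap + waiting-queue scheme.
struct Sketch {
    pending_heap: BinaryHeap<Reverse<(u64, u64)>>, // (task_id, group_id), smallest id first
    waiting_queue: HashMap<u64, VecDeque<u64>>,    // group_id -> parked task ids
    group_concurrency: HashMap<u64, usize>,        // running + pending count per group
    group_concurrency_on_busy: usize,
}

impl Sketch {
    fn push(&mut self, group_id: u64, task_id: u64) {
        let count = self.group_concurrency.entry(group_id).or_insert(0);
        if *count < self.group_concurrency_on_busy {
            // Under the busy limit: go straight into the pending heap.
            *count += 1;
            self.pending_heap.push(Reverse((task_id, group_id)));
        } else {
            // Over the limit: park the task in the per-group queue.
            self.waiting_queue.entry(group_id).or_default().push_back(task_id);
        }
    }

    fn pop(&mut self) -> Option<(u64, u64)> {
        // Pop from the heap first; selecting from waiting_queue by the
        // scheduling rules is elided in this sketch.
        self.pending_heap.pop().map(|Reverse((task, group))| (group, task))
    }
}

fn main() {
    let mut s = Sketch {
        pending_heap: BinaryHeap::new(),
        waiting_queue: HashMap::new(),
        group_concurrency: HashMap::new(),
        group_concurrency_on_busy: 2,
    };
    s.push(1, 10);
    s.push(1, 11);
    s.push(1, 12); // third task of group 1 exceeds the limit of 2
    assert_eq!(s.pop(), Some((1, 10)));
    assert_eq!(s.waiting_queue[&1], vec![12]);
    println!("ok");
}
```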

Contributor

Sorry for the crudeness of the previous comment. :)
It was meant to point out that there may be a syntax error in the comment. Try revising it like
"It means at most group_concurrency[group_id] tasks of the group may be running."

@Wenting0905

LGTM

}

// `BigGroupThrottledQueue` tries to throttle group's concurrency to
// `group_concurrency_on_busy` when is busy.


when it's busy

.map(|(group_id, waiting_queue)| {
(self.group_concurrency[group_id], waiting_queue[0].id, group_id)
})
.min();
Contributor

FYI, it could use a data structure like a SkipList/Heap, which is fast for insertion, deletion, and ordered access, to track the relationship of (lowest concurrency, low id) -> group_id and avoid the iteration.

Member

Don't make it more complicated, ordermap can satisfy the need.
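
The tuple-based `.min()` in the quoted code works because Rust tuples compare lexicographically, so mapping each group to (running_count, front_task_id, group_id) picks the least-loaded group with ties broken by the oldest task. A standalone check:

```rust
fn main() {
    // (running_count, front_task_id, group_id): tuples compare field
    // by field, so min() gives the least-loaded group, ties broken by
    // the smallest front task id.
    let groups = vec![(2usize, 10u64, 1u64), (1, 12, 2), (1, 11, 3)];
    let best = groups.into_iter().min();
    assert_eq!(best, Some((1, 11, 3)));
    println!("ok");
}
```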

// `group_concurrency_on_busy`(which means the number of on-going tasks is
// more than `group_concurrency_on_busy`), the rest of the group's tasks
// would be pushed into `waiting_queue[group_id]`
waiting_queue: HashMap<T, VecDeque<Task<T>>>,
Member

Maybe we should rename waiting_queue to big_task_waiting_queue. I am always confused by pending_tasks and waiting_queue.

Member

Maybe queue1 queue2 is more clear.


#[inline]
fn pop_from_waiting_queue_with_group_id(&mut self, group_id: &T) -> Task<T> {
let (waiting_tasks_is_empty, task) = {
Member

s/waiting_tasks_is_empty/empty_after_pop

}

#[inline]
fn pop_from_waiting_queue_with_group_id(&mut self, group_id: &T) -> Task<T> {
Member

s/with/by/


struct TaskPool<Q, T> {
next_task_id: u64,
tasks: Q,
Member

s/tasks/task_queue

}
}


Member

remove blank line

pub fn execute<F>(&mut self, group_id: T, job: F)
where F: FnOnce() + Send + 'static
{
self.task_count.fetch_add(1, AtomicOrdering::SeqCst);
Member

Move this line after L309.

}
if let Some(task) = task_pool.pop_task() {
// to reduce lock's time.
task_pool.on_task_started(&task.group_id);
Member

Please add a comment on why on_task_started is called here and not before L387.

-> Worker<Q, T> {
Worker {
task_pool: task_pool,
task_count: task_count,
Member

Seems task_count is not used, can we drop it?

Member Author

task_count is needed when one task is finished. @zhangjinpeng1987

group_concurrency: HashMap::new(),
waiting_queue: HashMap::new(),
pending_tasks: BinaryHeap::new(),
group_concurrency_on_busy: group_concurrency_on_busy,
Member

s/group_concurrency_on_busy/group_concurrency_limit

Member

Who is busy?

if statistics.total() >= self.group_concurrency_limit {
return Err(PushError(task));
}
statistics.queue1_count += 1;
Member

Move this line below L144

}

#[inline]
fn pop_from_waiting_queue_by_group_id(&mut self, group_id: &T) -> Task<T> {
Member

pop_from_queue2_by_group_id

// Try push into high priority queue. Return none on success,return PushError(task) on failed.
#[inline]
fn try_push_into_high_pri_queue(&mut self, task: Task<T>) -> Result<(), PushError<Task<T>>> {
let statistics = self.group_statistics
Member

let mut statistics.. ?

// If the value of `group_statistics[group_id]` is not big enough, pop
// a task from `low_pri_queue[group_id]` and push it into `high_pri_queue`.
let group_task = self.pop_from_low_pri_queue_by_group_id(group_id);
assert!(self.try_push_into_high_pri_queue(group_task).is_ok());
Member

unwrap

@zhangjinpeng87
Member

LGTM @hhkbp2 PTAL again.


struct GroupStatisticsItem {
running_count: usize,
high_pri_queue_count: usize,
Contributor

s/pri/priority/
Use full name unless it's really too long.

Contributor

= =

@hhkbp2
Contributor

hhkbp2 commented Apr 24, 2017

LGTM

@AndreMouche AndreMouche merged commit 2f116ef into master Apr 24, 2017
@AndreMouche AndreMouche deleted the shirly/big_group_throttled_threadpool branch April 24, 2017 15:00

8 participants