Skip to content

Commit

Permalink
Merge pull request #183 from zwang28/kp-5.7-thread-pool-dev
Browse files Browse the repository at this point in the history
Relax the constraints on creating or waking workers.
  • Loading branch information
Bessel-Native authored Oct 20, 2021
2 parents c048096 + a4d6e3a commit f31bae2
Show file tree
Hide file tree
Showing 11 changed files with 43 additions and 65 deletions.
2 changes: 2 additions & 0 deletions mysql-test/suite/perfschema/r/show_sanity.result
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,7 @@ SHOW_MODE SOURCE VARIABLE_NAME
5.6 I_S.SESSION_VARIABLES INNODB_STATS_INCLUDE_DELETE_MARKED
5.6 I_S.SESSION_VARIABLES KEYRING_OPERATIONS
5.6 I_S.SESSION_VARIABLES LOG_STATEMENTS_UNSAFE_FOR_BINLOG
5.6 I_S.SESSION_VARIABLES THREAD_POOL_DEDICATED_LISTENER
5.6 I_S.SESSION_VARIABLES TLS_VERSION

================================================================================
Expand Down Expand Up @@ -442,6 +443,7 @@ SHOW_MODE SOURCE VARIABLE_NAME
5.6 I_S.SESSION_VARIABLES INNODB_STATS_INCLUDE_DELETE_MARKED
5.6 I_S.SESSION_VARIABLES KEYRING_OPERATIONS
5.6 I_S.SESSION_VARIABLES LOG_STATEMENTS_UNSAFE_FOR_BINLOG
5.6 I_S.SESSION_VARIABLES THREAD_POOL_DEDICATED_LISTENER
5.6 I_S.SESSION_VARIABLES TLS_VERSION

================================================================================
Expand Down
2 changes: 2 additions & 0 deletions mysql-test/suite/sys_vars/r/all_vars.result
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ DISABLED_STORAGE_ENGINES
DISABLED_STORAGE_ENGINES
KEYRING_OPERATIONS
KEYRING_OPERATIONS
THREAD_POOL_DEDICATED_LISTENER
THREAD_POOL_DEDICATED_LISTENER
TLS_VERSION
TLS_VERSION
drop table t1;
Expand Down
9 changes: 0 additions & 9 deletions mysql-test/suite/thread_pool/r/threadpool_pool_size.result
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,3 @@ COUNT(*)
SELECT SUM(THREAD_CREATIONS_DUE_TO_STALL) FROM INFORMATION_SCHEMA.THREAD_POOL_STATS;
SUM(THREAD_CREATIONS_DUE_TO_STALL)
0
SELECT SUM(WAKES_DUE_TO_STALL) FROM INFORMATION_SCHEMA.THREAD_POOL_STATS;
SUM(WAKES_DUE_TO_STALL)
0
SELECT SUM(THROTTLES) FROM INFORMATION_SCHEMA.THREAD_POOL_STATS;
SUM(THROTTLES)
0
SELECT SUM(STALLS) FROM INFORMATION_SCHEMA.THREAD_POOL_STATS;
SUM(STALLS)
0
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,4 @@ COUNT(*)
1
SELECT SUM(THREAD_CREATIONS_DUE_TO_STALL) FROM INFORMATION_SCHEMA.THREAD_POOL_STATS;
SUM(THREAD_CREATIONS_DUE_TO_STALL)
2
SELECT SUM(WAKES_DUE_TO_STALL) FROM INFORMATION_SCHEMA.THREAD_POOL_STATS;
SUM(WAKES_DUE_TO_STALL)
1
SELECT SUM(THROTTLES) FROM INFORMATION_SCHEMA.THREAD_POOL_STATS;
SUM(THROTTLES)
0
SELECT SUM(STALLS) FROM INFORMATION_SCHEMA.THREAD_POOL_STATS;
SUM(STALLS)
1
3 changes: 0 additions & 3 deletions mysql-test/suite/thread_pool/t/threadpool_pool_size.test
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,6 @@ SELECT SUM(IS_STALLED) FROM INFORMATION_SCHEMA.THREAD_POOL_GROUPS;
SELECT COUNT(*) FROM INFORMATION_SCHEMA.THREAD_POOL_QUEUES;
# I_S.THREAD_POOL_STATS
SELECT SUM(THREAD_CREATIONS_DUE_TO_STALL) FROM INFORMATION_SCHEMA.THREAD_POOL_STATS;
SELECT SUM(WAKES_DUE_TO_STALL) FROM INFORMATION_SCHEMA.THREAD_POOL_STATS;
SELECT SUM(THROTTLES) FROM INFORMATION_SCHEMA.THREAD_POOL_STATS;
SELECT SUM(STALLS) FROM INFORMATION_SCHEMA.THREAD_POOL_STATS;

# Clean up
--disable_query_log
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@ connect(conn2,127.0.0.1,root);
connection conn1;
--let $conn1_id = `SELECT connection_id()`
SEND SELECT benchmark(9999999999, md5('very long command 1'));

--sleep 1

connection conn2;
--let $conn2_id = `SELECT connection_id()`
SEND SELECT benchmark(9999999999, md5('very long command 2'));
--sleep 1

# Test that new connection cannot be established
--disable_abort_on_error
Expand All @@ -37,9 +37,6 @@ SELECT SUM(IS_STALLED) FROM INFORMATION_SCHEMA.THREAD_POOL_GROUPS;
SELECT COUNT(*) FROM INFORMATION_SCHEMA.THREAD_POOL_QUEUES;
# I_S.THREAD_POOL_STATS
SELECT SUM(THREAD_CREATIONS_DUE_TO_STALL) FROM INFORMATION_SCHEMA.THREAD_POOL_STATS;
SELECT SUM(WAKES_DUE_TO_STALL) FROM INFORMATION_SCHEMA.THREAD_POOL_STATS;
SELECT SUM(THROTTLES) FROM INFORMATION_SCHEMA.THREAD_POOL_STATS;
SELECT SUM(STALLS) FROM INFORMATION_SCHEMA.THREAD_POOL_STATS;

--disable_query_log
--eval KILL QUERY $conn1_id
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ connect(conn2,127.0.0.1,root,,db1);
connection conn0;
START TRANSACTION;
SELECT * FROM db1.t1 WHERE c1=2 FOR UPDATE;
--sleep 1

connection conn1;
START TRANSACTION;
Expand Down
7 changes: 7 additions & 0 deletions sql/sys_vars.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4192,6 +4192,13 @@ static Sys_var_enum Sys_threadpool_high_prio_mode(
SESSION_VAR(threadpool_high_prio_mode), CMD_LINE(REQUIRED_ARG),
threadpool_high_prio_mode_names, DEFAULT(TP_HIGH_PRIO_MODE_TRANSACTIONS));

static Sys_var_mybool Sys_threadpool_dedicated_listener(
"thread_pool_dedicated_listener",
"If set to 1, listener thread will not pick up queries.",
GLOBAL_VAR(threadpool_dedicated_listener), CMD_LINE(OPT_ARG), DEFAULT(FALSE),
NO_MUTEX_GUARD, NOT_IN_BINLOG
);

#endif /* !WIN32 */
static Sys_var_uint Sys_threadpool_max_threads(
"thread_pool_max_threads",
Expand Down
3 changes: 2 additions & 1 deletion sql/threadpool.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ extern uint threadpool_stall_limit; /* time interval in 10 ms units for stall c
extern uint threadpool_max_threads; /* Maximum threads in pool */
extern uint threadpool_oversubscribe; /* Maximum active threads in group */
extern uint threadpool_toobusy; /* Maximum active and waiting threads in group */
extern my_bool threadpool_dedicated_listener;

/* Possible values for thread_pool_high_prio_mode */
extern const char *threadpool_high_prio_mode_names[];
Expand Down Expand Up @@ -76,7 +77,7 @@ extern TP_STATISTICS tp_stats;


/* Functions to set threadpool parameters */
extern void tp_set_threadpool_size(uint val);
extern void tp_set_threadpool_size(uint size);
extern void tp_set_threadpool_stall_limit(uint val);

#endif /* THREADPOOL_INCLUDED */
1 change: 1 addition & 0 deletions sql/threadpool_common.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ uint threadpool_stall_limit;
uint threadpool_max_threads;
uint threadpool_oversubscribe;
uint threadpool_toobusy;
my_bool threadpool_dedicated_listener;

/* Stats */
TP_STATISTICS tp_stats;
Expand Down
66 changes: 27 additions & 39 deletions sql/threadpool_unix.cc
Original file line number Diff line number Diff line change
Expand Up @@ -735,21 +735,11 @@ static connection_t * listener(worker_thread_t *current_thread,
Q2: If queue is not empty, how many workers to wake?
Solution:
We generally try to keep one thread per group active (threads handling
queries are considered active, unless they stuck in inside some "wait")
Thus, we will wake only one worker, and only if there is not active
threads currently,and listener is not going to handle a query. When we
don't wake, we hope that currently active threads will finish fast and
handle the queue. If this does not happen, timer thread will detect stall
and wake a worker.
NOTE: Currently nothing is done to detect or prevent long queuing times.
A solutionc for the future would be to give up "one active thread per
group" principle, if events stay in the queue for too long, and just wake
more workers.
We will wake up as many workers as possible and conform to threadpool_toobusy.
*/

bool listener_picks_event= thread_group->high_prio_queue.is_empty() &&
bool listener_picks_event= !threadpool_dedicated_listener &&
thread_group->high_prio_queue.is_empty() &&
thread_group->queue.is_empty();

/*
Expand All @@ -771,17 +761,18 @@ static connection_t * listener(worker_thread_t *current_thread,
queue_push(thread_group, c);
}
}

if (listener_picks_event)

int workers_can_afford = (int)threadpool_toobusy -
thread_group->active_thread_count - thread_group->waiting_thread_count - (listener_picks_event ? 1 : 0);

if (workers_can_afford <= 0 && !listener_picks_event && thread_group->active_thread_count == 0)
{
/* Handle the first event. */
retval= (connection_t *)native_event_get_userdata(&ev[0]);
TP_INCREMENT_GROUP_COUNTER(thread_group, dequeues[LISTENER]);
mysql_mutex_unlock(&thread_group->mutex);
break;
workers_can_afford = 1;
}

if(thread_group->active_thread_count==0)
int workers_in_need = std::min(workers_can_afford, listener_picks_event ? (cnt - 1) : cnt);

for (int i = 0; i < workers_in_need; i++)
{
/* We added some work items to queue, now wake a worker. */
if(wake_thread(thread_group, false))
Expand All @@ -790,22 +781,19 @@ static connection_t * listener(worker_thread_t *current_thread,
Wake failed, hence groups has no idle threads. Now check if there are
any threads in the group except listener.
*/
if(thread_group->thread_count == 1)
{
/*
Currently there is no worker thread in the group, as indicated by
thread_count == 1 (this means listener is the only one thread in
the group).
The queue is not empty, and listener is not going to handle
events. In order to drain the queue, we create a worker here.
Alternatively, we could just rely on timer to detect stall, and
create thread, but waiting for timer would be an inefficient and
pointless delay.
*/
create_worker(thread_group, false);
}
create_worker(thread_group, false);
}
}

if (listener_picks_event)
{
/* Handle the first event. */
retval= (connection_t *)native_event_get_userdata(&ev[0]);
TP_INCREMENT_GROUP_COUNTER(thread_group, dequeues[LISTENER]);
mysql_mutex_unlock(&thread_group->mutex);
break;
}

mysql_mutex_unlock(&thread_group->mutex);
}

Expand Down Expand Up @@ -930,7 +918,7 @@ static int wake_or_create_thread(thread_group_t *thread_group, bool due_to_stall
DBUG_RETURN(-1);


if (thread_group->active_thread_count == 0)
if (thread_group->active_thread_count < (int)threadpool_oversubscribe)
{
/*
We're better off creating a new thread here with no delay, either there
Expand Down Expand Up @@ -1084,7 +1072,7 @@ static void queue_put(thread_group_t *thread_group, connection_t *connection)
connection->tickets= connection->thd->variables.threadpool_high_prio_tickets;
queue_push(thread_group, connection);

if (thread_group->active_thread_count == 0)
if (thread_group->active_thread_count < (int)threadpool_oversubscribe)
wake_or_create_thread(thread_group);

mysql_mutex_unlock(&thread_group->mutex);
Expand Down Expand Up @@ -1209,7 +1197,7 @@ static connection_t *get_event(worker_thread_t *current_thread,
if (abstime)
{
err = mysql_cond_timedwait(&current_thread->cond, &thread_group->mutex,
abstime);
abstime);
}
else
{
Expand Down Expand Up @@ -1255,7 +1243,7 @@ static void wait_begin(thread_group_t *thread_group)
DBUG_ASSERT(thread_group->connection_count > 0);

#ifdef THREADPOOL_CREATE_THREADS_ON_WAIT
if ((thread_group->active_thread_count == 0) &&
if ((thread_group->active_thread_count < (int)threadpool_oversubscribe) &&
(!queues_are_empty(thread_group) || !thread_group->listener))
{
/*
Expand Down

0 comments on commit f31bae2

Please sign in to comment.