diff --git a/plugin/thread_pool/threadpool_unix.cc b/plugin/thread_pool/threadpool_unix.cc
index 549a5c5dcdf2..c81bf7dd1e19 100644
--- a/plugin/thread_pool/threadpool_unix.cc
+++ b/plugin/thread_pool/threadpool_unix.cc
@@ -1065,3 +1065,500 @@ static connection_t *get_event(worker_thread_t *current_thread,
   DBUG_RETURN(connection);
 }
+
+/**
+  Tells the pool that a worker starts waiting on IO, lock, condition,
+  sleep() or similar.
+*/
+
+static void wait_begin(thread_group_t *thread_group) noexcept {
+  DBUG_ENTER("wait_begin");
+  mysql_mutex_lock(&thread_group->mutex);
+  thread_group->active_thread_count--;
+  thread_group->waiting_thread_count++;
+
+  assert(thread_group->active_thread_count >= 0);
+  assert(thread_group->connection_count > 0);
+
+#ifdef THREADPOOL_CREATE_THREADS_ON_WAIT
+  if ((thread_group->active_thread_count == 0) &&
+      (!queues_are_empty(*thread_group) || !thread_group->listener)) {
+    /*
+      Group might stall while this thread waits, thus wake
+      or create a worker to prevent stall.
+    */
+    wake_or_create_thread(thread_group);
+  }
+#endif
+
+  mysql_mutex_unlock(&thread_group->mutex);
+  DBUG_VOID_RETURN;
+}
+
+/**
+  Tells the pool that a worker has finished waiting.
+*/
+
+static void wait_end(thread_group_t *thread_group) noexcept {
+  DBUG_ENTER("wait_end");
+  mysql_mutex_lock(&thread_group->mutex);
+  thread_group->active_thread_count++;
+  thread_group->waiting_thread_count--;
+  mysql_mutex_unlock(&thread_group->mutex);
+  DBUG_VOID_RETURN;
+}
+
+/**
+  Allocate/initialize a new connection structure.
+*/
+
+static connection_t *alloc_connection(THD *thd) noexcept {
+  DBUG_ENTER("alloc_connection");
+  DBUG_EXECUTE_IF("simulate_tp_alloc_connection_oom", DBUG_RETURN(nullptr););
+
+  connection_t *connection = (connection_t *)my_malloc(
+      PSI_NOT_INSTRUMENTED /*key_memory_thread_pool_connection*/,
+      sizeof(connection_t), 0);  // !! todo
+  if (connection) {
+    connection->thd = thd;
+    connection->waiting = false;
+    connection->logged_in = false;
+    connection->bound_to_poll_descriptor = false;
+    connection->abs_wait_timeout = ULLONG_MAX;
+    connection->tickets = 0;
+  }
+  DBUG_RETURN(connection);
+}
+
+/**
+  Add a new connection to the thread pool.
+*/
+
+bool tp_add_connection(Channel_info *channel_info) {
+  DBUG_ENTER("Thread_pool_connection_handler::add_connection");
+
+  THD *const thd = channel_info->create_thd();
+
+  if (unlikely(!thd)) {
+    channel_info->send_error_and_close_channel(ER_OUT_OF_RESOURCES, 0, false);
+    DBUG_RETURN(true);
+  }
+
+  connection_t *const connection = alloc_connection(thd);
+
+  if (unlikely(!connection)) {
+    // thd->get_protocol_classic()->end_net(); // !! todo
+    delete thd;
+    channel_info->send_error_and_close_channel(ER_OUT_OF_RESOURCES, 0, false);
+    DBUG_RETURN(true);
+  }
+
+  delete channel_info;
+
+  thd->set_new_thread_id();
+  thd->start_utime = my_micro_time();
+
+  Global_THD_manager::get_instance()->add_thd(thd);
+
+  thd->scheduler.data = connection;
+
+  /* Assign connection to a group. */
+  thread_group_t *group = &all_groups[thd->thread_id() % group_count];
+
+  connection->thread_group = group;
+
+  mysql_mutex_lock(&group->mutex);
+  group->connection_count++;
+  mysql_mutex_unlock(&group->mutex);
+
+  /*
+    Add connection to the work queue. Actual login
+    will be done by a worker thread.
+  */
+  queue_put(group, connection);
+  DBUG_RETURN(false);
+}
+
+/**
+  Terminate connection.
+*/
+
+static void connection_abort(connection_t *connection) {
+  DBUG_ENTER("connection_abort");
+  thread_group_t *group = connection->thread_group;
+
+  threadpool_remove_connection(connection->thd);
+
+  mysql_mutex_lock(&group->mutex);
+  group->connection_count--;
+  mysql_mutex_unlock(&group->mutex);
+
+  my_free(connection);
+  DBUG_VOID_RETURN;
+}
+
+/**
+  MySQL scheduler callback: kill connection
+*/
+
+void tp_post_kill_notification(THD *thd) noexcept {
+  DBUG_ENTER("tp_post_kill_notification");
+  if (current_thd == thd || thd->system_thread) DBUG_VOID_RETURN;
+
+  Vio *vio = thd->get_protocol_classic()->get_vio();
+  if (vio) vio_cancel(vio, SHUT_RD);
+  DBUG_VOID_RETURN;
+}
+
+alignas(CPU_LEVEL1_DCACHE_LINESIZE) std::atomic<uint64> tp_waits[THD_WAIT_LAST];
+
+/**
+  MySQL scheduler callback: wait begin
+*/
+
+void tp_wait_begin(THD *thd, int type MY_ATTRIBUTE((unused))) {
+  DBUG_ENTER("tp_wait_begin");
+
+  if (thd == nullptr) {
+    DBUG_VOID_RETURN;
+  }
+
+  connection_t *connection = (connection_t *)thd->scheduler.data;
+  if (connection) {
+    assert(!connection->waiting);
+    connection->waiting = true;
+    assert(type > 0 && type < THD_WAIT_LAST);
+    tp_waits[type]++;
+    wait_begin(connection->thread_group);
+  }
+  DBUG_VOID_RETURN;
+}
+
+/**
+  MySQL scheduler callback: wait end
+*/
+
+void tp_wait_end(THD *thd) {
+  DBUG_ENTER("tp_wait_end");
+
+  if (thd == nullptr) {
+    DBUG_VOID_RETURN;
+  }
+  connection_t *connection = (connection_t *)thd->scheduler.data;
+  if (connection) {
+    assert(connection->waiting);
+    connection->waiting = false;
+    wait_end(connection->thread_group);
+  }
+  DBUG_VOID_RETURN;
+}
+
+static void set_next_timeout_check(ulonglong abstime) {
+  DBUG_ENTER("set_next_timeout_check");
+  while (abstime < pool_timer.next_timeout_check.load()) {
+    uint64 old = pool_timer.next_timeout_check.load();
+    pool_timer.next_timeout_check.compare_exchange_weak(old, abstime);
+  }
+  DBUG_VOID_RETURN;
+}
+
+inline ulong get_wait_timeout(THD *thd) noexcept {
+  return thd->variables.net_wait_timeout;
+}
+
+/**
+  Set wait timeout for connection.
+*/
+
+static void set_wait_timeout(connection_t *c) noexcept {
+  DBUG_ENTER("set_wait_timeout");
+  /*
+    Calculate wait deadline for this connection.
+    Instead of using my_microsecond_getsystime() which has a syscall
+    overhead, use pool_timer.current_microtime and take
+    into account that its value could be off by at most
+    one tick interval.
+  */
+
+  c->abs_wait_timeout =
+      pool_timer.current_microtime.load(std::memory_order_relaxed) +
+      1000LL * pool_timer.tick_interval +
+      1000000LL * get_wait_timeout(c->thd);
+
+  set_next_timeout_check(c->abs_wait_timeout);
+  DBUG_VOID_RETURN;
+}
+
+/**
+  Handle a (rare) special case, where a connection needs to
+  migrate to a different group because group_count has changed
+  after the thread_pool_size setting was modified.
+*/
+
+static int change_group(connection_t *c, thread_group_t *old_group,
+                        thread_group_t *new_group) {
+  assert(c->thread_group == old_group);
+
+  /* Remove connection from the old group. */
+  if (c->bound_to_poll_descriptor) {
+    Vio *const vio = c->thd->get_protocol_classic()->get_vio();
+    const int fd = mysql_socket_getfd(vio->mysql_socket);
+    mysql_mutex_lock(&old_group->mutex);
+    io_poll_disassociate_fd(old_group->pollfd, fd);
+    c->bound_to_poll_descriptor = false;
+  } else {
+    mysql_mutex_lock(&old_group->mutex);
+  }
+  c->thread_group->connection_count--;
+  mysql_mutex_unlock(&old_group->mutex);
+
+  /* Add connection to the new group. */
+  mysql_mutex_lock(&new_group->mutex);
+  c->thread_group = new_group;
+  new_group->connection_count++;
+  /* Ensure that there is a listener in the new group. */
+  int ret = 0;
+  if (!new_group->thread_count) ret = create_worker(new_group, false);
+  mysql_mutex_unlock(&new_group->mutex);
+  return ret;
+}
+
+static int start_io(connection_t *connection) {
+  /*
+    Usually, a connection will stay in the same group for its entire
+    lifetime. However, we do allow group_count to change at runtime,
+    which means that in rare cases, when it changes, a connection may
+    need to migrate to another group to keep the load between groups
+    equal.
+
+    So we recalculate in which group the connection should be, based
+    on thread_id and the current group count, and migrate if necessary.
+  */
+  thread_group_t *const group =
+      &all_groups[connection->thd->thread_id() % group_count];
+
+  if (group != connection->thread_group) {
+    if (change_group(connection, connection->thread_group, group)) return -1;
+  }
+
+  /*
+    Bind to poll descriptor if not yet done.
+  */
+  Vio *vio = connection->thd->get_protocol_classic()->get_vio();
+  int fd = mysql_socket_getfd(vio->mysql_socket);
+  if (!connection->bound_to_poll_descriptor) {
+    connection->bound_to_poll_descriptor = true;
+    return io_poll_associate_fd(group->pollfd, fd, connection);
+  }
+
+  return io_poll_start_read(group->pollfd, fd, connection);
+}
+
+static void handle_event(connection_t *connection) {
+  DBUG_ENTER("handle_event");
+  int err;
+
+  if (!connection->logged_in) {
+    err = threadpool_add_connection(connection->thd);
+    connection->logged_in = true;
+  } else {
+    err = threadpool_process_request(connection->thd);
+  }
+
+  if (err) goto end;
+
+  set_wait_timeout(connection);
+  err = start_io(connection);
+
+end:
+  if (err) connection_abort(connection);
+
+  DBUG_VOID_RETURN;
+}
+
+/**
+  Worker thread's main loop.
+*/
+
+static void *worker_main(void *param) {
+  my_thread_init();
+
+  DBUG_ENTER("worker_main");
+
+  thread_group_t *thread_group = (thread_group_t *)param;
+
+  /* Init per-thread structure */
+  worker_thread_t this_thread;
+  mysql_cond_init(key_worker_cond, &this_thread.cond);
+  this_thread.thread_group = thread_group;
+  this_thread.event_count = 0;
+
+#ifdef HAVE_PSI_THREAD_INTERFACE
+  PSI_THREAD_CALL(set_thread_account)
+  (NULL, 0, NULL, 0);
+#endif
+
+  /* Run event loop */
+  for (;;) {
+    connection_t *connection;
+    struct timespec ts;
+    set_timespec(&ts, threadpool_idle_timeout);
+    connection = get_event(&this_thread, thread_group, &ts);
+    if (!connection) break;
+    this_thread.event_count++;
+    handle_event(connection);
+  }
+
+  /* Thread shutdown: cleanup per-worker-thread structure. */
+  mysql_cond_destroy(&this_thread.cond);
+
+  mysql_mutex_lock(&thread_group->mutex);
+  add_thread_count(thread_group, -1);
+  mysql_mutex_unlock(&thread_group->mutex);
+
+  my_thread_end();
+  return nullptr;
+}
+
+bool tp_init() {
+  DBUG_ENTER("tp_init");
+  threadpool_started = true;
+
+  for (uint i = 0; i < array_elements(all_groups); i++) {
+    thread_group_init(&all_groups[i], get_connection_attrib());
+  }
+  tp_set_threadpool_size(threadpool_size);
+  if (group_count == 0) {
+    /* Something went wrong */
+    sql_print_error("Can't set threadpool size to %d", threadpool_size);
+    DBUG_RETURN(true);
+  }
+#ifdef HAVE_PSI_INTERFACE
+  mysql_mutex_register("threadpool", mutex_list, array_elements(mutex_list));
+  mysql_cond_register("threadpool", cond_list, array_elements(cond_list));
+  mysql_thread_register("threadpool", thread_list, array_elements(thread_list));
+#endif
+
+  pool_timer.tick_interval = threadpool_stall_limit;
+  start_timer(&pool_timer);
+  DBUG_RETURN(false);
+}
+
+void tp_end() {
+  DBUG_ENTER("tp_end");
+
+  if (!threadpool_started) DBUG_VOID_RETURN;
+
+  stop_timer(&pool_timer);
+  for (uint i = 0; i < array_elements(all_groups); i++) {
+    thread_group_close(&all_groups[i]);
+  }
+  threadpool_started = false;
+  DBUG_VOID_RETURN;
+}
+
+/** Ensure that poll descriptors are created when threadpool_size changes */
+
+void tp_set_threadpool_size(uint size) noexcept {
+  if (!threadpool_started) return;
+
+  bool success = true;
+  for (uint i = 0; i < size; i++) {
+    thread_group_t *group = &all_groups[i];
+    mysql_mutex_lock(&group->mutex);
+    if (group->pollfd == -1) {
+      group->pollfd = io_poll_create();
+      success = (group->pollfd >= 0);
+      if (!success) {
+        sql_print_error("io_poll_create() failed, errno=%d\n", errno);
+      }
+    }
+    mysql_mutex_unlock(&all_groups[i].mutex);
+    if (!success) {
+      group_count = i;
+      return;
+    }
+  }
+  group_count = size;
+}
+
+void tp_set_threadpool_stall_limit(uint limit) noexcept {
+  if (!threadpool_started) return;
+  mysql_mutex_lock(&(pool_timer.mutex));
+  pool_timer.tick_interval = limit;
+  mysql_mutex_unlock(&(pool_timer.mutex));
+  mysql_cond_signal(&(pool_timer.cond));
+}
+
+/**
+  Calculate number of idle/waiting threads in the pool.
+
+  Sum idle threads over all groups.
+  Don't do any locking, it is not required for stats.
+*/
+
+int tp_get_idle_thread_count() noexcept {
+  int sum = 0;
+  for (uint i = 0;
+       i < array_elements(all_groups) && (all_groups[i].pollfd >= 0); i++) {
+    sum += (all_groups[i].thread_count - all_groups[i].active_thread_count);
+  }
+  return sum;
+}
+
+/* Report threadpool problems */
+
+/**
+  Delay in microseconds, after which "pool blocked" message is printed.
+  (30 sec == 30 Mio usec)
+*/
+#define BLOCK_MSG_DELAY (30 * 1000000)
+
+#define MAX_THREADS_REACHED_MSG \
+  "Threadpool could not create additional thread to handle queries, because the \
+number of allowed threads was reached. Increasing 'thread_pool_max_threads' \
+parameter can help in this situation.\n \
+If 'admin_port' parameter is set, you can still connect to the database with \
+superuser account (it must be TCP connection using admin_port as TCP port) \
+and troubleshoot the situation. \
+A likely cause of pool blocks is clients that lock resources for a long time. \
+'show processlist' or 'show engine innodb status' can give additional hints."
+
+#define CREATE_THREAD_ERROR_MSG "Can't create threads in threadpool (errno=%d)."
+
+/**
+  Write a message when a blocking situation in the threadpool occurs.
+  The message is written only when the pool blocks for BLOCK_MSG_DELAY (30) seconds.
+  It will be just a single message for each blocking situation (to prevent
+  log flood).
+*/
+
+static void print_pool_blocked_message(bool max_threads_reached) noexcept {
+  ulonglong now = my_microsecond_getsystime();
+  static bool msg_written;
+
+  if (pool_block_start == 0) {
+    pool_block_start = now;
+    msg_written = false;
+  }
+
+  if (!msg_written && ((now > pool_block_start + BLOCK_MSG_DELAY) ||
+                       (now == pool_block_start))) {
+    if (max_threads_reached)
+      sql_print_error(MAX_THREADS_REACHED_MSG);
+    else
+      sql_print_error(CREATE_THREAD_ERROR_MSG, my_errno);
+
+    if (now > pool_block_start) {
+      sql_print_information("Threadpool has been blocked for %u seconds\n",
+                            (uint)((now - pool_block_start) / 1000000));
+    }
+    /* avoid repeated messages for the same blocking situation */
+    msg_written = true;
+  }
+}
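
For context (not part of the patch): wait_begin()/wait_end() in the hunk above only adjust a group's active/waiting counters around a blocking call, under the group mutex, so the pool can tell how many workers are doing useful work. The standalone C++ sketch below models just that bookkeeping outside the server; ToyGroup, toy_wait_begin() and toy_wait_end() are illustrative stand-ins, not names from this file.

// Standalone model of the active/waiting accounting performed by
// wait_begin()/wait_end(); compiles on its own, no server headers needed.
#include <cassert>
#include <chrono>
#include <mutex>
#include <thread>

struct ToyGroup {                 // stand-in for thread_group_t
  std::mutex mutex;
  int active_thread_count = 0;
  int waiting_thread_count = 0;
};

static void toy_wait_begin(ToyGroup &g) {
  std::lock_guard<std::mutex> lock(g.mutex);
  g.active_thread_count--;        // this worker no longer counts as active
  g.waiting_thread_count++;
  assert(g.active_thread_count >= 0);
}

static void toy_wait_end(ToyGroup &g) {
  std::lock_guard<std::mutex> lock(g.mutex);
  g.active_thread_count++;        // worker resumes useful work
  g.waiting_thread_count--;
}

int main() {
  ToyGroup group;
  group.active_thread_count = 1;  // pretend one worker is running

  toy_wait_begin(group);          // analogue of tp_wait_begin() before a blocking call
  std::this_thread::sleep_for(std::chrono::milliseconds(10));  // the "wait"
  toy_wait_end(group);            // analogue of tp_wait_end() after it returns

  assert(group.active_thread_count == 1 && group.waiting_thread_count == 0);
  return 0;
}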
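
Similarly, set_next_timeout_check() lowers the shared pool_timer.next_timeout_check deadline with a CAS retry loop instead of a lock. A compact standalone sketch of the same atomic-minimum pattern (names here are illustrative, not taken from the patch):

// Atomic-minimum update: lower a shared deadline without taking a lock.
#include <atomic>
#include <cassert>
#include <cstdint>

static std::atomic<uint64_t> next_check{UINT64_MAX};  // stand-in for next_timeout_check

static void lower_deadline(uint64_t abstime) {
  uint64_t cur = next_check.load();
  // Retry until the stored value is already <= abstime or our CAS wins;
  // compare_exchange_weak reloads cur on failure.
  while (abstime < cur && !next_check.compare_exchange_weak(cur, abstime)) {
  }
}

int main() {
  lower_deadline(500);
  lower_deadline(700);   // larger deadline: leaves the stored minimum alone
  lower_deadline(300);
  assert(next_check.load() == 300);
  return 0;
}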