Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

workqueue 去掉timer,使用list排序处理方式 #9817

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion components/drivers/include/ipc/workqueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

#include <rtdef.h>
#include <rtconfig.h>
#include "completion.h"

#ifdef __cplusplus
extern "C" {
Expand Down Expand Up @@ -42,6 +43,7 @@ struct rt_workqueue
struct rt_semaphore sem;
rt_thread_t work_thread;
struct rt_spinlock spinlock;
struct rt_completion wakeup_completion;
};

struct rt_work
Expand All @@ -52,7 +54,7 @@ struct rt_work
void *work_data;
rt_uint16_t flags;
rt_uint16_t type;
struct rt_timer timer;
rt_tick_t timeout_tick;
struct rt_workqueue *workqueue;
};

Expand Down
169 changes: 71 additions & 98 deletions components/drivers/ipc/workqueue.c
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2006-2023, RT-Thread Development Team
* Copyright (c) 2006-2022, RT-Thread Development Team
*
* SPDX-License-Identifier: Apache-2.0
*
Expand All @@ -10,15 +10,14 @@
* 2021-08-14 Jackistang add comments for function interface
* 2022-01-16 Meco Man add rt_work_urgent()
* 2023-09-15 xqyjlj perf rt_hw_interrupt_disable/enable
* 2024-12-21 yuqingli delete timer, using list
*/

#include <rthw.h>
#include <rtdevice.h>

#ifdef RT_USING_HEAP

static void _delayed_work_timeout_handler(void *parameter);

rt_inline rt_err_t _workqueue_work_completion(struct rt_workqueue *queue)
{
rt_err_t result;
Expand Down Expand Up @@ -50,38 +49,61 @@ rt_inline rt_err_t _workqueue_work_completion(struct rt_workqueue *queue)

static void _workqueue_thread_entry(void *parameter)
{
rt_base_t level;
struct rt_work *work;
rt_base_t level;
struct rt_work *work;
struct rt_workqueue *queue;
rt_tick_t current_tick;
rt_int32_t delay_tick;
void (*work_func)(struct rt_work *work, void *work_data);
void *work_data;

queue = (struct rt_workqueue *) parameter;
queue = (struct rt_workqueue *)parameter;
RT_ASSERT(queue != RT_NULL);

while (1)
{
level = rt_spin_lock_irqsave(&(queue->spinlock));
if (rt_list_isempty(&(queue->work_list)))

/* timer check */
current_tick = rt_tick_get();
delay_tick = RT_WAITING_FOREVER;
while (!rt_list_isempty(&(queue->delayed_list)))
{
/* no software timer exist, suspend self. */
rt_thread_suspend_with_flag(rt_thread_self(), RT_UNINTERRUPTIBLE);
work = rt_list_entry(queue->delayed_list.next, struct rt_work, list);
if ((current_tick - work->timeout_tick) < RT_TICK_MAX / 2)
{
rt_list_remove(&(work->list));
rt_list_insert_after(queue->work_list.prev, &(work->list));
work->flags &= ~RT_WORK_STATE_SUBMITTING;
work->flags |= RT_WORK_STATE_PENDING;
}
else
{
delay_tick = work->timeout_tick - current_tick;
break;
}
}

/* release lock after suspend so we will not lost any wakeups */
if (rt_list_isempty(&(queue->work_list)))
{
rt_spin_unlock_irqrestore(&(queue->spinlock), level);

rt_schedule();
/* wait for work completion */
rt_completion_wait(&(queue->wakeup_completion), delay_tick);
continue;
}

/* we have work to do with. */
work = rt_list_entry(queue->work_list.next, struct rt_work, list);
rt_list_remove(&(work->list));
queue->work_current = work;
work->flags &= ~RT_WORK_STATE_PENDING;
work->workqueue = RT_NULL;
queue->work_current = work;
work->flags &= ~RT_WORK_STATE_PENDING;
work->workqueue = RT_NULL;
work_func = work->work_func;
work_data = work->work_data;
rt_spin_unlock_irqrestore(&(queue->spinlock), level);

/* do work */
work->work_func(work, work->work_data);
work_func(work, work_data);
/* clean current work */
queue->work_current = RT_NULL;

Expand All @@ -93,114 +115,68 @@ static void _workqueue_thread_entry(void *parameter)
static rt_err_t _workqueue_submit_work(struct rt_workqueue *queue,
struct rt_work *work, rt_tick_t ticks)
{
rt_base_t level;
rt_err_t err = RT_EOK;
rt_base_t level;
rt_err_t err = RT_EOK;
struct rt_work *work_tmp;
rt_list_t *list_tmp;

level = rt_spin_lock_irqsave(&(queue->spinlock));

/* remove list */
rt_list_remove(&(work->list));
work->flags &= ~RT_WORK_STATE_PENDING;
work->flags = 0;

if (ticks == 0)
{
rt_list_insert_after(queue->work_list.prev, &(work->list));
work->flags |= RT_WORK_STATE_PENDING;
work->workqueue = queue;
work->flags |= RT_WORK_STATE_PENDING;
work->workqueue = queue;

/* whether the workqueue is doing work */
if (queue->work_current == RT_NULL)
{
/* resume work thread, and do a re-schedule if succeed */
rt_thread_resume(queue->work_thread);
}
rt_completion_done(&(queue->wakeup_completion));
err = RT_EOK;
}
else if (ticks < RT_TICK_MAX / 2)
{
/* Timer started */
if (work->flags & RT_WORK_STATE_SUBMITTING)
{
rt_timer_control(&work->timer, RT_TIMER_CTRL_SET_TIME, &ticks);
}
else
/* insert delay work list */
work->flags |= RT_WORK_STATE_SUBMITTING;
work->workqueue = queue;
work->timeout_tick = rt_tick_get() + ticks;

list_tmp = &(queue->delayed_list);
rt_list_for_each_entry(work_tmp, &(queue->delayed_list), list)
{
rt_timer_init(&(work->timer), "work", _delayed_work_timeout_handler,
work, ticks, RT_TIMER_FLAG_ONE_SHOT | RT_TIMER_FLAG_SOFT_TIMER);
work->flags |= RT_WORK_STATE_SUBMITTING;
if ((work_tmp->timeout_tick - work->timeout_tick) < RT_TICK_MAX / 2)
{
list_tmp = &(work_tmp->list);
break;
}
}
work->workqueue = queue;
/* insert delay work list */
rt_list_insert_after(queue->delayed_list.prev, &(work->list));
rt_list_insert_before(list_tmp, &(work->list));

err = rt_timer_start(&(work->timer));
rt_completion_done(&(queue->wakeup_completion));
err = RT_EOK;
}
else
{
err = - RT_ERROR;
err = -RT_ERROR;
}

rt_spin_unlock_irqrestore(&(queue->spinlock), level);
return err;
}

static rt_err_t _workqueue_cancel_work(struct rt_workqueue *queue, struct rt_work *work)
{
rt_base_t level;
rt_err_t err;
rt_err_t err;

level = rt_spin_lock_irqsave(&(queue->spinlock));
rt_list_remove(&(work->list));
work->flags &= ~RT_WORK_STATE_PENDING;
/* Timer started */
if (work->flags & RT_WORK_STATE_SUBMITTING)
{
if ((err = rt_timer_stop(&(work->timer))) != RT_EOK)
{
goto exit;
}
rt_timer_detach(&(work->timer));
work->flags &= ~RT_WORK_STATE_SUBMITTING;
}
err = queue->work_current != work ? RT_EOK : -RT_EBUSY;
work->flags = 0;
err = queue->work_current != work ? RT_EOK : -RT_EBUSY;
work->workqueue = RT_NULL;
exit:
rt_spin_unlock_irqrestore(&(queue->spinlock), level);
return err;
}

static void _delayed_work_timeout_handler(void *parameter)
{
struct rt_work *work;
struct rt_workqueue *queue;
rt_base_t level;

work = (struct rt_work *)parameter;
queue = work->workqueue;

RT_ASSERT(work->flags & RT_WORK_STATE_SUBMITTING);
RT_ASSERT(queue != RT_NULL);

level = rt_spin_lock_irqsave(&(queue->spinlock));
rt_timer_detach(&(work->timer));
work->flags &= ~RT_WORK_STATE_SUBMITTING;
/* remove delay list */
rt_list_remove(&(work->list));
/* insert work queue */
if (queue->work_current != work)
{
rt_list_insert_after(queue->work_list.prev, &(work->list));
work->flags |= RT_WORK_STATE_PENDING;
}
/* whether the workqueue is doing work */
if (queue->work_current == RT_NULL)
{
/* resume work thread, and do a re-schedule if succeed */
rt_thread_resume(queue->work_thread);
}

rt_spin_unlock_irqrestore(&(queue->spinlock), level);
}

/**
* @brief Initialize a work item, binding with a callback function.
*
Expand All @@ -221,8 +197,8 @@ void rt_work_init(struct rt_work *work,
work->work_func = work_func;
work->work_data = work_data;
work->workqueue = RT_NULL;
work->flags = 0;
work->type = 0;
work->flags = 0;
work->type = 0;
}

/**
Expand All @@ -248,6 +224,7 @@ struct rt_workqueue *rt_workqueue_create(const char *name, rt_uint16_t stack_siz
rt_list_init(&(queue->delayed_list));
queue->work_current = RT_NULL;
rt_sem_init(&(queue->sem), "wqueue", 0, RT_IPC_FLAG_FIFO);
rt_completion_init(&(queue->wakeup_completion));

/* create the work thread */
queue->work_thread = rt_thread_create(name, _workqueue_thread_entry, queue, stack_size, priority, 10);
Expand Down Expand Up @@ -346,14 +323,10 @@ rt_err_t rt_workqueue_urgent_work(struct rt_workqueue *queue, struct rt_work *wo
/* NOTE: the work MUST be initialized firstly */
rt_list_remove(&(work->list));
rt_list_insert_after(&queue->work_list, &(work->list));
/* whether the workqueue is doing work */
if (queue->work_current == RT_NULL)
{
/* resume work thread, and do a re-schedule if succeed */
rt_thread_resume(queue->work_thread);
}

rt_completion_done(&(queue->wakeup_completion));
rt_spin_unlock_irqrestore(&(queue->spinlock), level);

return RT_EOK;
}

Expand Down
Loading