-
Notifications
You must be signed in to change notification settings - Fork 1
/
message_queue.c
136 lines (132 loc) · 4.11 KB
/
message_queue.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
/*! \brief Message Queue */
#include <cmsis_os2.h>
#include <stdlib.h>
#include <string.h>
#include <semaphore.h>
#include <sys/thread.h>
#include <svc.h>
typedef struct _thread osThread_t;
extern volatile osThread_t* current_thread;
typedef struct _Event osEvent_t;
typedef struct _osMessageQueue osMessageQueue_t;
typedef struct _PriorityList PriorityList_t;
struct _PriorityList {
PriorityList_t* next;
uint8_t prio;
uint8_t data[0];
};
struct _osMessageQueue {
volatile int count;// счетчик семафора
osMemoryPoolId_t pool_id;
uint32_t msg_size;
PriorityList_t *tail;
PriorityList_t *head;
const char * name;
// uint16_t block_size;
};
/*! \defgroup _messages2 RTOSv2: Message Queue
\ingroup _rtos2
\brief Exchange messages between threads in a FIFO-like operation.
\{
*/
/*
\param [in] msg_count maximum number of messages in queue.
\param [in] msg_size maximum message size in bytes.
\param [in] attr message queue attributes; NULL: default values.
*/
osMessageQueueId_t osMessageQueueNew ( uint32_t msg_count,
uint32_t msg_size,
const osMessageQueueAttr_t * attr
)
{
osMessageQueue_t * queue = malloc(sizeof(osMessageQueue_t));
queue->pool_id = osMemoryPoolNew(msg_count, msg_size + sizeof(PriorityList_t),
(const osMemoryPoolAttr_t*)attr);
semaphore_init(&queue->count, 0);
queue->head=NULL;
queue->tail=NULL;
queue->msg_size = msg_size;
return queue;
}
uint32_t osMessageQueueGetCount ( osMessageQueueId_t mq_id )
{
osMessageQueue_t* queue = mq_id;
return queue->count;
}
const char * osMessageQueueGetName ( osMessageQueueId_t mq_id )
{
osMessageQueue_t* queue = mq_id;
return queue->name;
}
/*!
\return number of available slots for messages.
*/
uint32_t osMessageQueueGetSpace ( osMessageQueueId_t mq_id )
{
osMessageQueue_t* queue = mq_id;
return osMemoryPoolGetSpace(queue->pool_id);
}
osStatus_t osMessageQueueGet ( osMessageQueueId_t mq_id,
void * msg_ptr,
uint8_t * msg_prio,
uint32_t timeout
)
{
osMessageQueue_t* queue = mq_id;
int count = semaphore_enter(&queue->count);// ожидаем сообщение
if(count==0) {
// osEvent_t event = {.status = osEventSemaphore, .value={.p = (void*)&queue->count}};
svc3(SVC_EVENT_WAIT, osEventSemaphore, (void*)&queue->count, timeout*1000);
int status = current_thread->process.event.status;
if (status & osEventTimeout){
return osErrorTimeout;
}
}
{
PriorityList_t* msg = (PriorityList_t*)atomic_pointer_exchange(&queue->tail, NULL);// снимаем шляпу
// если используются приоритеты, следует использовать сортировку
while(msg){
PriorityList_t *msg_next = msg->next;
PriorityList_t *list;
PriorityList_t **next = &queue->head;
while ((list = *next)!=NULL && list->prio >= msg->prio) {
next = &list->next;
}
msg->next = *next;// вставка после указанного
*next = msg;
msg = msg_next;
}
msg = queue->head;
queue->head = msg->next;
memcpy(msg_ptr, msg->data, queue->msg_size);
*msg_prio = msg->prio;
osMemoryPoolFree(queue->pool_id, msg);
}
return osOK;
}
osStatus_t osMessageQueuePut ( osMessageQueueId_t mq_id,
const void * msg_ptr,
uint8_t msg_prio,
uint32_t timeout
)
{
osStatus_t res;
PriorityList_t* msg;
osMessageQueue_t* queue = mq_id;
msg = osMemoryPoolAlloc(queue->pool_id, timeout);
if(msg != NULL) {
memcpy(msg->data, msg_ptr, queue->msg_size);// копирование можно было исключить
msg->prio = msg_prio;
// атомарно подменяем вершину стека
volatile void**ptr = (volatile void**)&queue->tail;
do {
msg->next = atomic_pointer_get(ptr);
atomic_mb();
} while (!atomic_pointer_compare_and_exchange(ptr, msg->next, msg));
semaphore_leave(&queue->count);// увеличиваем число сообщений в очереди
res = osOK;
} else
res = osErrorTimeout;
return res;
}
//!\}