-
Notifications
You must be signed in to change notification settings - Fork 0
/
BlockingQueue.cpp
97 lines (79 loc) · 1.67 KB
/
BlockingQueue.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
/*
* BlockingQueue.cpp
*
* Created on: Dec 18, 2014
* Author: root
*/
#include "BlockingQueue.h"
/**
 * Constructs the queue and allocates its two backing lists.
 *
 * @param read_lock  Stored in read_lock_ and later handed to the
 *                   MutexLockConditionGuard taken in peek().
 * @param size       Forwarded unchanged to both CommonList constructors
 *                   (presumably the list capacity -- confirm in CommonList).
 */
BlockingQueue::BlockingQueue(bool read_lock, int size)
    : read_mutex_(),
      write_mutex_(),
      listEmpty_(write_mutex_), // condition is bound to the writer mutex
      peek_index_(0),
      read_lock_(read_lock)
{
    // Two lists of identical size: one being read, one being written.
    read_list_ = new CommonList(size);
    write_list_ = new CommonList(size);
}
/**
 * Releases the two lists allocated by the constructor.
 */
BlockingQueue::~BlockingQueue()
{
    delete read_list_;
    delete write_list_;
}
/**
 * Adds an item to the write list while holding write_mutex_.
 *
 * @param item  Item to enqueue; a NULL item is rejected.
 * @return -1 if item is NULL, otherwise whatever append_detail() returns.
 */
int BlockingQueue::append(CommonItem item)
{
    // Reject NULL up front, before the writer mutex is taken.
    if (item == NULL)
        return -1;

    MutexLockGuard guard(write_mutex_);
    const int status = append_detail(item);
    return status;
}
/**
 * Fetches the next item for the reader.
 *
 * A MutexLockConditionGuard is built from read_mutex_ and read_lock_
 * (presumably it only locks when read_lock_ is true -- confirm in the
 * guard's definition), then the work is delegated to peek_detail().
 *
 * @param item  Output: receives the fetched item.
 * @return The status returned by peek_detail().
 */
int BlockingQueue::peek(CommonItem &item)
{
    MutexLockConditionGuard guard(read_mutex_, read_lock_);
    const int status = peek_detail(item);
    return status;
}
/**
 * Exchanges the read and write lists and returns the first item of the
 * newly installed read list.
 *
 * Holds write_mutex_ for the whole operation. While the write list is
 * empty the caller waits on listEmpty_, which append_detail() notifies
 * after a successful add.
 *
 * @param item  Output: receives the item at index 0 of the new read list.
 * @return The status returned by CommonList::get_item().
 */
int BlockingQueue::swap(CommonItem &item)
{
    MutexLockGuard lock(this->write_mutex_);

    // Loop rather than a single wait so the emptiness check is repeated
    // after every wake-up. On exit the write list is known to be non-empty
    // (the mutex is still held), so no further emptiness check is needed.
    while (this->write_list_->cur_size() == 0)
    {
        listEmpty_.wait();
    }

    // Recycle the old read list as the new (reset) write list.
    CommonList *recycled_list = this->read_list_;
    recycled_list->reset();
    this->read_list_ = this->write_list_;
    this->write_list_ = recycled_list;

    // Restart reading from the front; index 0 is consumed immediately.
    this->peek_index_ = 0;
    return this->read_list_->get_item(item, this->peek_index_++);
}
/**
 * Returns the next unread item from the read list, or, when every item
 * of the read list has been handed out, falls back to swap().
 *
 * @param item  Output: receives the fetched item.
 * @return The status from CommonList::get_item() or from swap().
 */
int BlockingQueue::peek_detail(CommonItem &item)
{
    // Read list exhausted: switch lists and take the first item from there.
    if (!(peek_index_ < read_list_->cur_size()))
    {
        return this->swap(item);
    }

    // Hand out the item at the current index, then advance the index.
    return read_list_->get_item(item, peek_index_++);
}
/**
 * Adds an item to the write list and, if the add reported 0, notifies
 * listEmpty_ (the condition swap() waits on).
 *
 * Takes no lock itself.
 *
 * @param item  Item passed straight to CommonList::add_item().
 * @return The value returned by CommonList::add_item().
 */
int BlockingQueue::append_detail(CommonItem item)
{
    const int status = write_list_->add_item(item);
    if (status != 0)
    {
        // Add did not report success: nothing to signal.
        return status;
    }

    listEmpty_.notify();
    return status;
}
int BlockingQueue::appendNull(void)
{
return this->append_detail(NULL);
}