-
Notifications
You must be signed in to change notification settings - Fork 0
/
ThreadPool.h
143 lines (135 loc) · 3.49 KB
/
ThreadPool.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
#pragma once
#include<pthread.h>
#include<atomic>
#include<cstdio>
#include<exception>
#include<memory>
#include<queue>
#include<vector>
#include"locker.h"
#include"treatRequest.h"
/* *
* 此处为线程池头文件
* 实现:
* 通过预先创建指定数量的线程,并使用条件变量进行阻塞,
* 再创建任务队列来存储任务和分配任务。
* */
/* Bring the standard names that the code below uses unqualified into scope. */
using std::vector;
using std::queue;
using std::exception;
/* Forward declaration only; nothing else in the visible part of this header refers to http_conn. */
class http_conn;
/*代码复用采用模板类*/
/**
 * Fixed-size worker pool built directly on pthreads.
 *
 * The constructor starts the worker threads, thread_add() queues a request
 * and signals the condition variable, and every worker executes run(), which
 * takes requests off the queue and calls process() on them.
 *
 * T is used like a pointer: the member functions apply operator! and
 * operator-> to it and call getFd() and process() through it.
 *
 * The pool holds a mutex, a condition variable and the worker thread ids by
 * value, and the workers are handed the address of the pool itself, so the
 * object must not be copied; the copy operations are deleted.
 */
template< typename T >
class threadpool{
private:
    /* Number of worker threads. */
    int m_thread_number;
    /* Upper bound on the number of queued requests. */
    int m_max_requests;
    /* Ids of the worker threads. */
    std::vector<pthread_t> threads;
    /* Pending requests, guarded by `lock`. */
    std::queue<T> taskqueue;
    /* Mutex protecting the queue. */
    pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
    /* Condition variable signalled when a request is queued. */
    pthread_cond_t notify = PTHREAD_COND_INITIALIZER;
    /* Set when the pool is shutting down; atomic because the workers read it
       while another thread may be writing it. */
    std::atomic<bool> m_stop;
public:
    /* Starts the workers; see the definition for argument handling. */
    threadpool( int thread_number, int max_requests );
    ~threadpool();
    /* Copying would duplicate the mutex/condition variable and the thread ids. */
    threadpool( const threadpool& ) = delete;
    threadpool& operator=( const threadpool& ) = delete;
    /* Declared here; no definition appears in the visible part of this header. */
    int thread_destroy();
    /* Queues one request; returns false when it could not be accepted. */
    bool thread_add(T request);
    /* pthread entry point; `arg` is the owning threadpool. */
    static void* worker( void* arg );
    /* Worker loop executed by every pool thread. */
    void run();
};
/* Largest worker count the threadpool constructor accepts.
   constexpr so that every translation unit including this header sees the
   same immutable value instead of its own writable copy. */
static constexpr int MAX_THREAD_NUM = 8;
/* Largest queue bound the threadpool constructor accepts. */
static constexpr int MAX_REQUESTS = 10000;
/**
 * Builds the pool and starts its detached worker threads.
 *
 * @param thread_number  Requested number of workers. A value outside
 *                       [1, MAX_THREAD_NUM] falls back to MAX_THREAD_NUM.
 * @param max_requests   Requested queue bound. A value outside
 *                       [1, MAX_REQUESTS] falls back to MAX_REQUESTS.
 * @throws std::exception when pthread_create or pthread_detach fails.
 *
 * NOTE(review): workers started before a failure are already detached and
 * keep the pointer to this object; they are not stopped before the throw.
 */
template< typename T >
threadpool< T >::threadpool( int thread_number, int max_requests ) :
    m_thread_number(thread_number),
    m_max_requests(max_requests),
    m_stop(false)
{
    /* Validate before sizing the vector: a negative count converted to the
       vector's unsigned size type would request an enormous allocation. */
    if( (m_thread_number <= 0) || (m_thread_number > MAX_THREAD_NUM) )
    {
        m_thread_number = MAX_THREAD_NUM;
    }
    if( (m_max_requests <= 0) || (m_max_requests > MAX_REQUESTS) )
    {
        m_max_requests = MAX_REQUESTS;
    }
    /* The vector length and the loop bound below now always agree. */
    threads.resize(m_thread_number);

    /* Create the workers and detach each one, so its resources are released
       automatically when its thread function returns. */
    for(int i = 0; i < m_thread_number; i++)
    {
        printf("create the %dth thread\n", i );
        if( pthread_create( &threads[i], nullptr, worker, this ) != 0 )
        {
            threads.clear();
            throw std::exception();
        }
        if( pthread_detach( threads[i] ) != 0 )
        {
            threads.clear();
            throw std::exception();
        }
    }
}
/**
 * Drops the stored thread ids and raises the stop flag.
 *
 * Nothing here signals the condition variable or waits for the workers.
 * NOTE(review): a worker blocked in pthread_cond_wait() will not observe the
 * flag until some other call signals `notify` - confirm this is acceptable
 * for the way the pool is torn down.
 */
template< typename T >
threadpool< T >::~threadpool()
{
    this->threads.clear();
    this->m_stop = true;
}
/**
 * Queues one request and wakes a worker.
 *
 * @param request  Pointer-like handle to the work item; must be non-null.
 * @return true when the request was queued and a worker was signalled;
 *         false when the request is null, the queue already holds
 *         m_max_requests items, or pthread_cond_signal reports an error.
 *
 * NOTE(review): when only the signal fails the request has already been
 * pushed and stays in the queue even though false is returned.
 */
template< typename T >
bool threadpool<T>::thread_add(T request )
{
    printf("thread_add\n");
    /* Reject a null handle up front; it would otherwise be dereferenced by
       the getFd() call below. */
    if( !request )
    {
        return false;
    }
    pthread_mutex_lock(&lock);
    /* ">=" so the queue never grows beyond m_max_requests entries. */
    if( static_cast<int>(taskqueue.size()) >= m_max_requests )
    {
        pthread_mutex_unlock(&lock);
        return false;
    }
    printf("add_m_sockfd-%d\n",request->getFd());
    taskqueue.push( request );
    const int signal_rc = pthread_cond_signal(&notify);
    pthread_mutex_unlock(&lock);
    return signal_rc == 0;
}
/**
 * Thread entry point passed to pthread_create().
 *
 * @param arg  The threadpool that started this thread, passed as void*.
 * @return The same pool pointer, once run() has returned.
 */
template< typename T >
void* threadpool< T >::worker( void* arg )
{
    threadpool< T >* const self = static_cast< threadpool< T >* >( arg );
    self->run();
    return self;
}
/**
 * Worker loop: waits for queued requests and processes them one at a time
 * until m_stop becomes true.
 *
 * The threads are detached, so returning from this function is all that is
 * needed for a worker's resources to be reclaimed.
 */
template< typename T >
void threadpool< T >::run()
{
    while( ! m_stop )
    {
        printf("run\n");
        pthread_mutex_lock(&lock);
        /* Wait only while there is nothing to do. Re-checking the condition
           in a loop handles spurious wakeups and, more importantly, lets a
           worker that was busy when thread_add() signalled pick up the
           queued request instead of sleeping on top of it. */
        while( !m_stop && taskqueue.empty() )
        {
            pthread_cond_wait(&notify,&lock);
        }
        if( m_stop )
        {
            pthread_mutex_unlock(&lock);
            break;
        }
        T request = taskqueue.front();
        taskqueue.pop();
        pthread_mutex_unlock(&lock);
        /* Check for a null handle before touching it. */
        if( !request )
        {
            printf("request_error\n");
            continue;
        }
        printf("m_sockfd-%d\n",request->getFd());
        /* Run the request outside the lock so other workers can dequeue. */
        request->process();
    }
}