"""
基于进程+线程实现多任务的爬虫程序
"""
import time
import uuid
from multiprocessing import Queue, Process
from threading import Thread
from queue import Queue as TQueue
import requests
from lxml import etree
from utils.header import get_ua
headers = {
'User-Agent': get_ua()
}
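
# get_ua() comes from the project-local utils.header module, which is not
# shown here. A minimal sketch of what it might look like, assuming it simply
# returns a random User-Agent string (the pool below is illustrative only):
#
#     import random
#
#     UA_POOL = [
#         'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
#         'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36',
#     ]
#
#     def get_ua():
#         return random.choice(UA_POOL)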


class DownloadThread(Thread):
    """Downloads one URL on a worker thread and stores the response body."""

    def __init__(self, url):
        super().__init__()
        self.url = url
        self.content = None

    def run(self):
        print('start downloading', self.url)
        resp = requests.get(self.url, headers=headers)
        if resp.status_code == 200:
            resp.encoding = 'utf-8'
            self.content = resp.text
            print(self.url, 'download finished')

    def get_content(self):
        return self.content


class DownloadProcess(Process):
    """
    Download process: pulls URLs off url_q and hands each one to a
    download thread, then pushes the result onto the parse queue.
    """

    def __init__(self, url_q, html_q):
        super().__init__()
        self.url_q: Queue = url_q
        self.html_q = html_q

    def run(self):
        while True:
            try:
                url = self.url_q.get(timeout=30)
            except Empty:
                break  # no new URLs for 30s: assume the crawl is done
            # Run the download task on a child thread
            t = DownloadThread(url)
            t.start()
            t.join()
            # Collect the downloaded data
            html = t.get_content()
            # Push the data onto the parse queue
            self.html_q.put((url, html))
        print('-- download process over --')


class ParseThread(Thread):
    """Parses one downloaded page on a worker thread."""

    def __init__(self, html, url_q, parent_url):
        super().__init__()
        self.html = html
        self.parent_url = parent_url
        self.url_q = url_q

    def run(self):
        root = etree.HTML(self.html)
        imgs = root.xpath('//div[contains(@class, "picblock")]//img')
        for img in imgs:
            item = {
                'id': uuid.uuid4().hex,
                'name': img.xpath('./@alt')[0],
            }
            try:
                item['cover'] = img.xpath('./@src2')[0]  # lazy-loaded image URL
            except IndexError:
                item['cover'] = img.xpath('./@src')[0]
            print(item)
            # Write the item into the ES index here
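            # A minimal sketch of that indexing step (hypothetical: assumes
            # the elasticsearch-py client and an index named "images",
            # neither of which appears in this file):
            #
            #     from elasticsearch import Elasticsearch
            #     es = Elasticsearch('http://localhost:9200')
            #     es.index(index='images', id=item['id'], document=item)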
        # Grab the link to the next page
        next_page = root.xpath('//a[@class="nextpage"]/@href')
        if next_page:
            next_url = self.parent_url + next_page[0]
            self.url_q.put(next_url)  # queue the next page as a new download task


class ParseProcess(Process):
    """
    Parse process: pulls (url, html) pairs off html_q and hands each
    page to a parse thread.
    """

    def __init__(self, url_q, html_q):
        super().__init__()
        self.url_q = url_q
        self.html_q = html_q

    def run(self):
        while True:
            try:
                url, html = self.html_q.get(timeout=60)
            except Empty:
                break  # no new pages for 60s: assume the crawl is done
            print('start parsing', url)
            parent_url = url[:url.rindex('/') + 1]
            # Run the parse task on a child thread
            ParseThread(html, self.url_q, parent_url).start()
        print('parse process over')


if __name__ == '__main__':
    task1 = Queue()  # download task queue
    task2 = Queue()  # parse task queue
    # Seed the crawl with the start URL
    task1.put('http://sc.chinaz.com/tupian/shuaigetupian.html')
    p1 = DownloadProcess(task1, task2)
    p2 = ParseProcess(task1, task2)
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    print('---over----')
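
# Design note: the two processes stop via queue timeouts (30s on the download
# queue, 60s on the parse queue) rather than an explicit shutdown signal.
# That is simple but fragile on a slow network. A common alternative (a
# sketch only, not part of the original file) is a sentinel value:
#
#     SENTINEL = None
#     # producer side, once it knows no more work is coming:
#     task1.put(SENTINEL)
#     # consumer side, replacing the timeout-based get():
#     url = self.url_q.get()
#     if url is SENTINEL:
#         break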