-
Notifications
You must be signed in to change notification settings - Fork 1
/
dirsearch_scan.py
115 lines (97 loc) · 5.36 KB
/
dirsearch_scan.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
import os
import subprocess
import socket
import requests
from requests.exceptions import RequestException
import concurrent.futures
import threading
import shutil
# Shared by every caller of synchronized_print so that output produced by
# scanner threads is written one message at a time.
print_lock = threading.Lock()


def synchronized_print(*args, **kwargs):
    """Call ``print`` with the given arguments while holding ``print_lock``.

    All positional and keyword arguments are forwarded to ``print`` unchanged.
    The lock is released even if ``print`` raises.
    """
    print_lock.acquire()
    try:
        print(*args, **kwargs)
    finally:
        print_lock.release()
def is_website_up(ip_address, port, protocol):
    """Report whether a HEAD request to the given endpoint gets any response.

    Args:
        ip_address: Host or IP address placed in the URL.
        port: Port placed in the URL.
        protocol: URL scheme, e.g. "http" or "https".

    Returns:
        True if ``requests.head`` returned (whatever the HTTP status), False
        if it raised a ``RequestException`` (the request uses ``timeout=5``).
    """
    url = f"{protocol}://{ip_address}:{port}"
    try:
        # Only reachability matters, so the response object is not kept.
        requests.head(url, timeout=5)
    except RequestException:
        # NOTE(review): certificate verification is left at the requests
        # default, so an HTTPS endpoint with a self-signed or mismatched
        # certificate presumably ends up here as well - confirm this is the
        # intended behaviour for scan targets addressed by IP.
        return False
    return True
def run_gobuster(ip_address, port, protocol, output_file, colors):
    """Run a Gobuster directory scan against one endpoint and print its report.

    Args:
        ip_address: Host or IP address of the target.
        port: Port of the target.
        protocol: URL scheme, e.g. "http" or "https".
        output_file: Path handed to Gobuster via ``-o`` and read back afterwards.
        colors: Mapping of colour names to escape sequences; ``'red'`` and
            ``'reset'`` are read here.

    Returns:
        True if Gobuster exited with status 0 and its report was read and
        printed, False if it failed, could not be started, or the report
        could not be read.
    """
    # An argument list (no shell) keeps the URL and the output path as single
    # arguments even if they contain spaces or shell metacharacters.
    command = [
        "sudo", "gobuster", "dir",
        "-u", f"{protocol}://{ip_address}:{port}",
        "-w", "/usr/share/dirbuster/wordlists/directory-list-2.3-medium.txt",
        "-o", str(output_file),
    ]
    try:
        subprocess.run(command, check=True)
    except subprocess.CalledProcessError as e:
        # This function is also reached from worker threads (as the dirsearch
        # fallback), so errors go through the locked printer as well.
        synchronized_print(f"{colors['red']}[-] Gobuster failed with error code {e.returncode}{colors['reset']}")
        return False
    except OSError as e:
        # Without a shell, a missing executable raises instead of exiting 127.
        synchronized_print(f"{colors['red']}[-] Gobuster could not be started: {e}{colors['reset']}")
        return False

    try:
        with open(output_file, 'r') as f:
            report = f.read()
    except OSError as e:
        synchronized_print(f"{colors['red']}[-] Could not read Gobuster output {output_file}: {e}{colors['reset']}")
        return False

    synchronized_print(report)
    return True
def dirsearch_scan(ip_address, port, protocol, output_file, colors, extensions):
    """Run dirsearch against one endpoint, falling back to Gobuster on failure.

    dirsearch is told to write its plain-text report straight to
    ``output_file``, so each scan has its own report file.

    Args:
        ip_address: Host or IP address of the target.
        port: Port of the target.
        protocol: URL scheme, e.g. "http" or "https".
        output_file: Path where the report is stored (also passed on to
            ``run_gobuster`` when falling back).
        colors: Mapping of colour names to escape sequences; ``'red'``,
            ``'yellow'`` and ``'reset'`` are read here.
        extensions: Comma-separated extension list passed to dirsearch ``-e``.

    Returns:
        True when dirsearch exited with status 0 (a missing report file is
        only reported, not treated as failure); otherwise whatever
        ``run_gobuster`` returns for the fallback scan.
    """
    # The previous code collected the report from dirsearch's install
    # directory, which suggests relative output paths are resolved there;
    # an absolute path avoids depending on that.
    report_path = os.path.abspath(output_file)

    # NOTE(review): the earlier "-format plain" spelling is not a dirsearch
    # option; it was presumably parsed as the short-option cluster
    # "-f -o rmat" (hence the old hard-coded ".../dirsearch/rmat" report
    # path, shared by concurrent scans). "-f" is kept explicitly so scan
    # coverage does not change - TODO confirm it is actually wanted.
    command = [
        "sudo", "dirsearch",
        "-u", f"{protocol}://{ip_address}:{port}",
        "-e", extensions,
        "-f",
        "--format=plain",
        "-o", report_path,
        "--timeout=5",
        "--retries=2",
    ]

    failure = None
    try:
        # Argument list instead of a shell string: nothing in the URL or the
        # paths is interpreted by a shell.
        subprocess.run(command, check=True)
    except subprocess.CalledProcessError as e:
        failure = f"Dirsearch failed with error code {e.returncode}"
    except OSError as e:
        # Without a shell, a missing executable raises instead of exiting 127.
        failure = f"Dirsearch could not be started: {e}"

    if failure is not None:
        synchronized_print(f"\n{colors['red']}[-] {failure}{colors['reset']}")
        synchronized_print(f"\n\n\033[1m{colors['yellow']}[>>] Running {colors['yellow']}Gobuster{colors['reset']} as a backup scanner on {protocol.upper()}...{colors['reset']}\033[0m\n")
        return run_gobuster(ip_address, port, protocol, output_file, colors)

    if not os.path.isfile(report_path):
        # Best effort, as before: the scan itself succeeded, so only warn.
        synchronized_print(f"{colors['red']}[-] Dirsearch did not write its report to {report_path}{colors['reset']}")
    return True
def get_server_type(ip_address, port, protocol):
    """Fetch the lower-cased ``Server`` response header of an endpoint.

    Sends a HEAD request (``timeout=5``) to ``protocol://ip_address:port``.

    Returns:
        The ``Server`` header value in lower case, an empty string when the
        response carries no such header, or None when the request raised a
        ``RequestException``.
    """
    target = f"{protocol}://{ip_address}:{port}"
    try:
        reply = requests.head(target, timeout=5)
    except RequestException:
        return None
    banner = reply.headers.get('Server', '')
    return banner.lower()
# Ordered (server keywords, dirsearch extension list) pairs; the first pair
# with a keyword contained in the Server banner wins.
_SERVER_EXTENSIONS = (
    (("apache", "nginx"), "js,txt,html,php,php3,php4,php5,php7,phtml,sh,pl,cgi"),
    (("iis",), "js,txt,html,cs,dll,config,cshtml,asp,net,asax,aspx,ascx,ashx,asmx,axd,asp,sh,pl,cgi"),
    (("python",), "js,txt,html,py,pyc,pyo,pyd,wsgi,log,xml,json,conf,inc,sql,bak,sh,pl,cgi"),
    (("ruby",), "js,txt,html,rb,erb,rhtml,rake,rails,log,xml,json,conf,inc,sql,bak,yml,gem,sh,pl,cgi"),
    (("node",), "js,txt,html,json,ejs,jade,log,conf,inc,sql,bak,md,yml,cgi,pl,sh"),
)

# Used when the server banner is missing, empty, or matches no keyword above.
_DEFAULT_EXTENSIONS = "php,html,js,txt,html,cs,dll,config,cshtml,asp,net,asax,aspx,ascx,ashx,asmx,axd,asp,sh,pl,cgi"


def _extensions_for_server(server):
    """Return the extension list for a Server banner (None/empty -> default)."""
    banner = (server or "").lower()
    for keywords, extensions in _SERVER_EXTENSIONS:
        if any(keyword in banner for keyword in keywords):
            return extensions
    return _DEFAULT_EXTENSIONS


def run_dirsearch(ip_address, port, output_dir, initiator, colors):
    """Scan one port over both http and https for directories.

    When ``initiator`` is ``"web_recon"`` Gobuster is run for each protocol,
    one after the other, in the calling thread. Otherwise each protocol that
    answers ``is_website_up`` gets a dirsearch scan submitted to a thread
    pool, with the extension list chosen from the detected Server banner.

    Args:
        ip_address: Host or IP address of the target.
        port: Port of the target.
        output_dir: Directory for the report files; created if missing.
        initiator: ``"web_recon"`` selects Gobuster; any other value selects
            dirsearch.
        colors: Mapping of colour names to escape sequences; ``'yellow'``,
            ``'cyan'``, ``'green'``, ``'red'`` and ``'reset'`` are read here.

    Returns:
        None. Exceptions raised inside submitted scans are printed, not
        re-raised.
    """
    use_gobuster = initiator == "web_recon"
    tool_name = "Gobuster" if use_gobuster else "Dirsearch"
    print(f"\n\n\033[1m{colors['yellow']}[>>] Running {colors['cyan']}{tool_name}{colors['reset']}{colors['reset']}\033[0m")

    os.makedirs(output_dir, exist_ok=True)

    with concurrent.futures.ThreadPoolExecutor() as executor:
        futures = []
        for protocol in ("http", "https"):
            if use_gobuster:
                output_file = os.path.join(output_dir, f'gobuster_port_{port}_{protocol}.txt')
                run_gobuster(ip_address, port, protocol, output_file, colors)
                continue

            if not is_website_up(ip_address, port, protocol):
                continue

            output_file = os.path.join(output_dir, f'dirsearch_port_{port}_{protocol}.txt')
            server = get_server_type(ip_address, port, protocol)
            synchronized_print(f"\n{colors['green']}[\u2713] Detected server type for {protocol.upper()} on port {port}: {server if server else 'Unknown'}{colors['reset']}")

            # get_server_type returns None when its own request fails; the
            # helper maps that (and an empty banner) to the default list
            # instead of calling .lower() on None.
            extensions = _extensions_for_server(server)
            futures.append(executor.submit(dirsearch_scan, ip_address, port, protocol, output_file, colors, extensions))

        for future in concurrent.futures.as_completed(futures):
            try:
                future.result()
            except Exception as e:
                # Boundary handler: one failed scan must not abort the other.
                synchronized_print(f"{colors['red']}[-] An error occurred while scanning: {e}{colors['reset']}")