-
Notifications
You must be signed in to change notification settings - Fork 6
/
CloudBot.py
147 lines (119 loc) · 5.24 KB
/
CloudBot.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
import argparse
import json
import logging
import subprocess
import sys
import time
from urllib.parse import urlencode

import requests
class botCli(object):
    """Telegram bot client that runs Prowler and Nmap on request.

    The bot repeatedly asks the Telegram Bot API for pending updates, looks
    at the most recent one and, when it comes from an allowed chat, runs the
    requested tool and sends the tool's output back one line per message.
    """

    # Seconds to wait for an HTTP response. Must be larger than the 5 s
    # long-poll timeout requested by get_updates().
    REQUEST_TIMEOUT = 30

    # Seconds to pause after a failed poll so that a broken connection does
    # not turn the main loop into a busy loop.
    ERROR_BACKOFF = 1

    def __init__(self, path, url, token):
        """Store the Prowler path and build the API base URL.

        Args:
            path: Path of the Prowler executable.
            url: Base URL template with one ``{}`` placeholder for the token.
            token: Telegram bot API token substituted into ``url``.
        """
        self.path = path
        self.url = url.format(token)

    def get_url(self, url):
        """Fetch ``url`` with HTTP GET and return the body decoded as UTF-8.

        Returns:
            The response body as ``str``, or ``None`` when the request fails
            (the failure is logged, not raised).
        """
        try:
            response = requests.get(url, timeout=self.REQUEST_TIMEOUT)
        except requests.exceptions.RequestException:
            # RequestException also covers timeouts, not only refused
            # connections, so a stalled request cannot kill the bot.
            logging.error("Problem with Telegram Connection")
            return None
        return response.content.decode("utf8")

    def send_message(self, text, chat_id):
        """Send ``text`` to the chat identified by ``chat_id``."""
        # urlencode escapes characters such as '&', '#', spaces and newlines
        # that would otherwise truncate or corrupt the query string.
        query = urlencode([("text", text), ("chat_id", chat_id)])
        self.get_url(self.url + "sendMessage?" + query)

    def get_json_from_url(self, url):
        """Fetch ``url`` and parse the body as JSON.

        Returns:
            The decoded JSON value, or an empty dict when the request failed
            or the body is not valid JSON.
        """
        content = self.get_url(url)
        try:
            return json.loads(content)
        except (TypeError, ValueError):
            # TypeError: get_url returned None.
            # ValueError: the body was not valid JSON (JSONDecodeError).
            logging.error("Problem parsing JSON")
            return {}

    def get_updates(self):
        """Return the parsed ``getUpdates`` response (long poll of 5 s)."""
        # NOTE(review): no ``offset`` is sent, so already-seen updates are
        # returned again on every call -- confirm this is intended.
        return self.get_json_from_url(self.url + "getUpdates?timeout=5")

    def get_last_chat_id_and_text(self, updates):
        """Extract chat id, text and update id from the newest update.

        Args:
            updates: Parsed ``getUpdates`` response.

        Returns:
            Tuple ``(chat_id, text, update_id)`` of the last entry in
            ``updates["result"]``.

        Raises:
            KeyError: An expected key is missing.
            IndexError: ``updates["result"]`` is empty.
        """
        last_update = updates["result"][-1]
        message = last_update["message"]
        return (message["chat"]["id"], message["text"], last_update["update_id"])

    def _stream_command(self, cmd, chat_id, not_found_message):
        """Run ``cmd`` and forward each line of its output to ``chat_id``.

        stderr is merged into stdout. If the executable does not exist,
        ``not_found_message`` is logged instead.
        """
        try:
            # The context manager closes the pipe and waits for the child,
            # so no zombie process or open descriptor is left behind.
            with subprocess.Popen(cmd,
                                  stdout=subprocess.PIPE,
                                  stderr=subprocess.STDOUT) as process:
                for raw_line in iter(process.stdout.readline, b''):
                    # Tool output is not guaranteed to be valid UTF-8.
                    line = raw_line.decode('utf-8', errors='replace')
                    logging.info(line)
                    self.send_message(line, chat_id)
        except FileNotFoundError:
            logging.error(not_found_message)

    def execute_analysis_aws(self, chat_id, args):
        """Run Prowler and stream its output to ``chat_id``.

        Args:
            chat_id: Chat that receives the output.
            args: Tokens of the user's message; the first one (the bot
                command) is dropped and the rest are passed to Prowler.
                The list is not modified.
        """
        cmd = [self.path, "-M", "mono"] + list(args[1:])
        self._stream_command(cmd, chat_id, "Prowler not found")

    def execute_nmap(self, chat_id, args):
        """Run nmap and stream its output to ``chat_id``.

        Args:
            chat_id: Chat that receives the output.
            args: Tokens of the user's message; the first one (the bot
                command) is dropped and the rest are passed to nmap.
                The list is not modified.
        """
        # NOTE(review): the remaining tokens go to nmap unfiltered (as an
        # argument list, never through a shell). Confirm that allowed users
        # are trusted with every nmap option.
        cmd = ["nmap"] + list(args[1:])
        self._stream_command(cmd, chat_id, "Nmap not found")

    def get_initID(self):
        """Return the update id of the newest pending update, or 0 if none.

        Used at start-up so that messages received before the bot was
        launched are not executed.
        """
        updates = self.get_json_from_url(self.url + "getUpdates?")
        results = updates.get("result") or []
        if not results:
            # Also reached when the request failed and ``updates`` is {}.
            return 0
        # Read only ``update_id`` so that a non-text update cannot abort
        # start-up with a KeyError.
        return results[-1].get("update_id", 0)

    def _handle_command(self, user_id, msg):
        """Dispatch one message from an allowed user to the matching tool."""
        command = msg.split(' ', 1)[0]
        tokens = msg.split()
        if command == "/ScanAWS":
            if len(tokens) == 1:
                self.send_message(
                    "Unable to locate credentials. You can configure credentials by running \"aws configure\"",
                    user_id)
            else:
                self.execute_analysis_aws(user_id, tokens)
                logging.info("Request of %i User to execute ScanAWS", user_id)
        elif command == "/Nmap":
            # ``elif`` matters: with a plain ``if`` every /ScanAWS request
            # would also fall into the help branch below.
            self.execute_nmap(user_id, tokens)
            logging.info("Request of %i User to execute Nmap", user_id)
        else:
            self.send_message("To scan your AWS Account use /ScanAWS -p \"profileAWS\", to scan another network could use /Nmap with {nmap args}", user_id)
            logging.info("Request Failed of %i User without corrects parameters", user_id)

    def run(self, chat_id):
        """Poll Telegram forever and serve requests from allowed chats.

        Args:
            chat_id: Collection of chat ids allowed to use the bot.
        """
        last_id = self.get_initID()
        logging.info("[*][*][*][*][*] Running Telegram Server Bot ... [*][*][*][*][*]")
        while True:
            try:
                user_id, msg, current_id = self.get_last_chat_id_and_text(self.get_updates())
                # Only react to updates newer than the last one handled.
                if user_id in chat_id and current_id > last_id:
                    last_id = current_id
                    self._handle_command(user_id, msg)
            except (IndexError, TypeError, KeyError):
                logging.error("Problem parsing JSON")
                # Avoid spinning at full speed while the API is unreachable
                # or keeps returning unusable data.
                time.sleep(self.ERROR_BACKOFF)
if __name__ == '__main__':
    # The script requires Python 3.5 or newer. Passing the message to
    # sys.exit() writes it to stderr and exits with a non-zero status, so a
    # calling script can detect the failure (the status used to be 0).
    if sys.version_info < (3, 5):
        sys.exit("To run Telegram Bot Server you must use Python 3.5+")

    FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    logging.basicConfig(format=FORMAT, level=logging.INFO)

    # Command-line arguments
    parser = argparse.ArgumentParser(description='[+][+] Telegram Bot Server to audit AWS Security Checks')
    parser.add_argument('--token', '-t', type=str, required=True, help='Token API Telegram Bot')
    parser.add_argument('--path', "-p", type=str, required=True, help='Prowler Path')
    parser.add_argument('--users', '-u', type=int, required=True, nargs='+', help='Users allowed')
    args = parser.parse_args()

    # Base URL template; the bot token replaces the placeholder.
    url = "https://api.telegram.org/bot{}/"

    # Create the client
    botcli = botCli(args.path, url, args.token)

    # Serve only the user ids given with --users
    botcli.run(args.users)