-
-
Notifications
You must be signed in to change notification settings - Fork 1.2k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #664 from dadav/feature/wpa-sec-download
Add wpa-sec password download
- Loading branch information
Showing
2 changed files
with
77 additions
and
29 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,19 +1,27 @@ | ||
import os | ||
import logging | ||
import threading | ||
import requests | ||
from datetime import datetime | ||
from pwnagotchi.utils import StatusFile | ||
import pwnagotchi.plugins as plugins | ||
from pwnagotchi import plugins | ||
from json.decoder import JSONDecodeError | ||
|
||
|
||
class WpaSec(plugins.Plugin): | ||
__author__ = '[email protected]' | ||
__version__ = '2.0.1' | ||
__version__ = '2.1.0' | ||
__license__ = 'GPL3' | ||
__description__ = 'This plugin automatically uploads handshakes to https://wpa-sec.stanev.org' | ||
|
||
def __init__(self): | ||
self.ready = False | ||
self.report = StatusFile('/root/.wpa_sec_uploads', data_format='json') | ||
self.lock = threading.Lock() | ||
try: | ||
self.report = StatusFile('/root/.wpa_sec_uploads', data_format='json') | ||
except JSONDecodeError as json_err: | ||
os.remove("/root/.wpa_sec_uploads") | ||
self.report = StatusFile('/root/.wpa_sec_uploads', data_format='json') | ||
self.options = dict() | ||
self.skip = list() | ||
|
||
|
@@ -35,6 +43,29 @@ def _upload_to_wpasec(self, path, timeout=30): | |
except requests.exceptions.RequestException as req_e: | ||
raise req_e | ||
|
||
|
||
def _download_from_wpasec(self, output, timeout=30): | ||
""" | ||
Downloads the results from wpasec and safes them to output | ||
Output-Format: bssid, station_mac, ssid, password | ||
""" | ||
api_url = self.options['api_url'] | ||
if not api_url.endswith('/'): | ||
api_url = f"{api_url}/" | ||
api_url = f"{api_url}?api&dl=1" | ||
|
||
cookie = {'key': self.options['api_key']} | ||
try: | ||
result = requests.get(api_url, cookies=cookie, timeout=timeout) | ||
with open(output, 'wb') as output_file: | ||
output_file.write(result.content) | ||
except requests.exceptions.RequestException as req_e: | ||
raise req_e | ||
except OSError as os_e: | ||
raise os_e | ||
|
||
|
||
def on_loaded(self): | ||
""" | ||
Gets called when the plugin gets loaded | ||
|
@@ -53,32 +84,48 @@ def on_internet_available(self, agent): | |
""" | ||
Called in manual mode when there's internet connectivity | ||
""" | ||
if self.ready: | ||
config = agent.config() | ||
display = agent.view() | ||
reported = self.report.data_field_or('reported', default=list()) | ||
|
||
handshake_dir = config['bettercap']['handshakes'] | ||
handshake_filenames = os.listdir(handshake_dir) | ||
handshake_paths = [os.path.join(handshake_dir, filename) for filename in handshake_filenames if | ||
filename.endswith('.pcap')] | ||
handshake_new = set(handshake_paths) - set(reported) - set(self.skip) | ||
|
||
if handshake_new: | ||
logging.info("WPA_SEC: Internet connectivity detected. Uploading new handshakes to wpa-sec.stanev.org") | ||
|
||
for idx, handshake in enumerate(handshake_new): | ||
display.set('status', f"Uploading handshake to wpa-sec.stanev.org ({idx + 1}/{len(handshake_new)})") | ||
display.update(force=True) | ||
with self.lock: | ||
if self.ready: | ||
config = agent.config() | ||
display = agent.view() | ||
reported = self.report.data_field_or('reported', default=list()) | ||
|
||
handshake_dir = config['bettercap']['handshakes'] | ||
handshake_filenames = os.listdir(handshake_dir) | ||
handshake_paths = [os.path.join(handshake_dir, filename) for filename in handshake_filenames if | ||
filename.endswith('.pcap')] | ||
handshake_new = set(handshake_paths) - set(reported) - set(self.skip) | ||
|
||
if handshake_new: | ||
logging.info("WPA_SEC: Internet connectivity detected. Uploading new handshakes to wpa-sec.stanev.org") | ||
|
||
for idx, handshake in enumerate(handshake_new): | ||
display.set('status', f"Uploading handshake to wpa-sec.stanev.org ({idx + 1}/{len(handshake_new)})") | ||
display.update(force=True) | ||
try: | ||
self._upload_to_wpasec(handshake) | ||
reported.append(handshake) | ||
self.report.update(data={'reported': reported}) | ||
logging.info("WPA_SEC: Successfully uploaded %s", handshake) | ||
except requests.exceptions.RequestException as req_e: | ||
self.skip.append(handshake) | ||
logging.error("WPA_SEC: %s", req_e) | ||
continue | ||
except OSError as os_e: | ||
logging.error("WPA_SEC: %s", os_e) | ||
continue | ||
|
||
if 'download_results' in self.options and self.options['download_results']: | ||
cracked_file = os.path.join(handshake_dir, 'wpa-sec.cracked.potfile') | ||
if os.path.exists(cracked_file): | ||
last_check = datetime.fromtimestamp(os.path.getmtime(cracked_file)) | ||
if last_check is not None and ((datetime.now() - last_check).seconds / (60 * 60)) < 1: | ||
return | ||
|
||
try: | ||
self._upload_to_wpasec(handshake) | ||
reported.append(handshake) | ||
self.report.update(data={'reported': reported}) | ||
logging.info("WPA_SEC: Successfully uploaded %s", handshake) | ||
self._download_from_wpasec(os.path.join(handshake_dir, 'wpa-sec.cracked.potfile')) | ||
logging.info("WPA_SEC: Downloaded cracked passwords.") | ||
except requests.exceptions.RequestException as req_e: | ||
self.skip.append(handshake) | ||
logging.error("WPA_SEC: %s", req_e) | ||
continue | ||
logging.debug("WPA_SEC: %s", req_e) | ||
except OSError as os_e: | ||
logging.error("WPA_SEC: %s", os_e) | ||
continue | ||
logging.debug("WPA_SEC: %s", os_e) |