diff --git a/tests/attack/test_mod_wp_enum.py b/tests/attack/test_mod_wp_enum.py index 8dce76740..0513a76b7 100644 --- a/tests/attack/test_mod_wp_enum.py +++ b/tests/attack/test_mod_wp_enum.py @@ -259,6 +259,9 @@ async def test_wp_version(): ) ) + respx.get(url__regex=r"http://perdu.com/wp-content/plugins/.*?/readme.txt").mock(return_value=httpx.Response(404)) + respx.get(url__regex=r"http://perdu.com/wp-content/themes/.*?/readme.txt").mock(return_value=httpx.Response(404)) + persister = AsyncMock() request = Request("http://perdu.com") diff --git a/wapitiCore/attack/cms/mod_wp_enum.py b/wapitiCore/attack/cms/mod_wp_enum.py index c7b30daf2..ce346b436 100644 --- a/wapitiCore/attack/cms/mod_wp_enum.py +++ b/wapitiCore/attack/cms/mod_wp_enum.py @@ -230,7 +230,7 @@ async def attack(self, request: Request, response: Optional[Response] = None): await self.detect_version(self.PAYLOADS_HASH, request_to_root.url) # Call the method on the instance self.versions = sorted(self.versions, key=lambda x: x.split('.')) if self.versions else [] - drupal_detected = { + wp_detected = { "name": "WordPress", "versions": self.versions, "categories": ["CMS WordPress"], @@ -247,12 +247,12 @@ async def attack(self, request: Request, response: Optional[Response] = None): await self.add_info( finding_class=SoftwareVersionDisclosureFinding, request=request_to_root, - info=json.dumps(drupal_detected), + info=json.dumps(wp_detected), ) await self.add_info( finding_class=SoftwareNameDisclosureFinding, request=request_to_root, - info=json.dumps(drupal_detected), + info=json.dumps(wp_detected), ) await self.check_false_positive(request_to_root.url) log_blue("Enumeration of WordPress Plugins :") diff --git a/wapitiCore/attack/mod_wp_enum.py b/wapitiCore/attack/mod_wp_enum.py index 2a6447cdf..1601cefe0 100644 --- a/wapitiCore/attack/mod_wp_enum.py +++ b/wapitiCore/attack/mod_wp_enum.py @@ -23,8 +23,9 @@ import xml.etree.ElementTree as ET from os.path import join as path_join from typing import Match, Optional +from httpx import RequestError -from wapitiCore.attack.attack import Attack +from wapitiCore.attack.attack import Attack, random_string from wapitiCore.definitions.fingerprint import SoftwareNameDisclosureFinding from wapitiCore.definitions.fingerprint_webapp import SoftwareVersionDisclosureFinding from wapitiCore.main.log import log_blue, log_orange, logging @@ -41,6 +42,21 @@ class ModuleWpEnum(Attack): name = "wp_enum" PAYLOADS_FILE_PLUGINS = "wordpress_plugins.txt" PAYLOADS_FILE_THEMES = "wordpress_themes.txt" + false_positive = {"plugins": False, "themes": False} + + async def check_false_positive(self, url): + self.false_positive = {"plugins": False, "themes": False} + rand = random_string() + for wp_type in ["plugins", "themes"]: + request = Request(f'{url}/wp-content/{wp_type}/{rand}/readme.txt', 'GET') + try: + response: Response = await self.crawler.async_send(request) + except RequestError: + self.network_errors += 1 + else: + if response.status == 403 or response.is_success: + logging.warning(f"False positive detected for {wp_type} due to status code {response.status}") + self.false_positive[wp_type] = response.status def get_plugin(self): with open( @@ -123,55 +139,57 @@ async def detect_plugin(self, url): break request = Request(f'{url}/wp-content/plugins/{plugin}/readme.txt', 'GET') - response = await self.crawler.async_send(request) - - if response.is_success: - version = re.search(r'tag:\s*([\d.]+)', response.content) - - # This check was added to detect invalid format of "Readme.txt" who can cause a crashe - if version: - version = version.group(1) - else: - logging.warning("Readme.txt is not in a valid format") - version = "" - - plugin_detected = { - "name": plugin, - "versions": [version], - "categories": ["WordPress plugins"], - "groups": ['Add-ons'] - } - - log_blue( - MSG_TECHNO_VERSIONED, - plugin, - version - ) - - await self.add_info( - finding_class=SoftwareNameDisclosureFinding, - request=request, - info=json.dumps(plugin_detected), - response=response - ) - elif response.status == 403: - plugin_detected = { - "name": plugin, - "versions": [""], - "categories": ["WordPress plugins"], - "groups": ['Add-ons'] - } - log_blue( - MSG_TECHNO_VERSIONED, - plugin, - [""] - ) - await self.add_info( - finding_class=SoftwareNameDisclosureFinding, - request=request, - info=json.dumps(plugin_detected), - response=response - ) + try: + response: Response = await self.crawler.async_send(request) + except RequestError: + self.network_errors += 1 + else: + if response.is_success: + version = re.search(r'tag:\s*([\d.]+)', response.content) + + # This check was added to detect invalid format of "Readme.txt" which can cause a crash + if version: + version = version.group(1) + else: + version = "" + + if version or \ + self.false_positive["plugins"] < 200 or self.false_positive["plugins"] > 299: + plugin_detected = { + "name": plugin, + "versions": [version], + "categories": ["WordPress plugins"], + "groups": ['Add-ons'] + } + log_blue( + MSG_TECHNO_VERSIONED, + plugin, + [version] + ) + await self.add_info( + finding_class=SoftwareNameDisclosureFinding, + request=request, + info=json.dumps(plugin_detected), + response=response + ) + elif response.status == 403 and self.false_positive["plugins"] != 403: + plugin_detected = { + "name": plugin, + "versions": [""], + "categories": ["WordPress plugins"], + "groups": ['Add-ons'] + } + log_blue( + MSG_TECHNO_VERSIONED, + plugin, + [""] + ) + await self.add_info( + finding_class=SoftwareNameDisclosureFinding, + request=request, + info=json.dumps(plugin_detected), + response=response + ) async def detect_theme(self, url): for theme in self.get_theme(): @@ -179,50 +197,57 @@ async def detect_theme(self, url): break request = Request(f'{url}/wp-content/themes/{theme}/readme.txt', 'GET') - response = await self.crawler.async_send(request) - - if response.is_success: - version = re.search(r'tag:\s*([\d.]+)', response.content) - # This check was added to detect invalid format of "Readme.txt" who can cause a crashe - if version: - version = version.group(1) - else: - version = "" - theme_detected = { - "name": theme, - "versions": [version], - "categories": ["WordPress themes"], - "groups": ['Add-ons'] - } - log_blue( - MSG_TECHNO_VERSIONED, - theme, - version - ) - await self.add_info( - finding_class=SoftwareNameDisclosureFinding, - request=request, - info=json.dumps(theme_detected), - response=response - ) - elif response.status == 403: - theme_detected = { - "name": theme, - "versions": [""], - "categories": ["WordPress themes"], - "groups": ['Add-ons'] - } - log_blue( - MSG_TECHNO_VERSIONED, - theme, - [""] - ) - await self.add_info( - finding_class=SoftwareNameDisclosureFinding, - request=request, - info=json.dumps(theme_detected), - response=response - ) + try: + response: Response = await self.crawler.async_send(request) + except RequestError: + self.network_errors += 1 + else: + if response.is_success: + version = re.search(r'tag:\s*([\d.]+)', response.content) + # This check was added to detect invalid format of "Readme.txt" which can cause a crash + if version: + version = version.group(1) + else: + version = "" + + theme_detected = { + "name": theme, + "versions": [version], + "categories": ["WordPress themes"], + "groups": ['Add-ons'] + } + + if version or \ + self.false_positive["themes"] < 200 or self.false_positive["themes"] > 299: + log_blue( + MSG_TECHNO_VERSIONED, + theme, + [version] + ) + await self.add_info( + finding_class=SoftwareNameDisclosureFinding, + request=request, + info=json.dumps(theme_detected), + response=response + ) + elif response.status == 403 and self.false_positive["themes"] != 403: + theme_detected = { + "name": theme, + "versions": [""], + "categories": ["WordPress themes"], + "groups": ['Add-ons'] + } + log_blue( + MSG_TECHNO_VERSIONED, + theme, + [""] + ) + await self.add_info( + finding_class=SoftwareNameDisclosureFinding, + request=request, + info=json.dumps(theme_detected), + response=response + ) @staticmethod def check_wordpress(response: Response): @@ -245,6 +270,7 @@ async def attack(self, request: Request, response: Optional[Response] = None): response = await self.crawler.async_send(request_to_root, follow_redirects=True) if self.check_wordpress(response): await self.detect_version(request_to_root.url) + await self.check_false_positive(request_to_root.url) log_blue("----") log_blue("Enumeration of WordPress Plugins :") await self.detect_plugin(request_to_root.url)