diff --git a/requirements.txt b/requirements.txt index 1e8559009d..9ed465f04f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,7 +3,6 @@ pytz praw>=4.0.0,<6.0.0 geoip2>=4.0,<5.0 requests>=2.24.0,<3.0.0 -dnspython<3.0 sqlalchemy>=1.4,<1.5 importlib_metadata>=3.6 packaging diff --git a/sopel/modules/url.py b/sopel/modules/url.py index c835d62314..934e15608d 100644 --- a/sopel/modules/url.py +++ b/sopel/modules/url.py @@ -10,19 +10,19 @@ """ from __future__ import annotations +from email.message import EmailMessage from ipaddress import ip_address import logging import re +from socket import getaddrinfo, IPPROTO_TCP from typing import TYPE_CHECKING -from urllib.parse import urlparse +from urllib.parse import urlparse, urlunparse -import dns.resolver import requests from urllib3.exceptions import LocationValueError # type: ignore[import] from sopel import plugin, tools from sopel.config import types -from sopel.tools import web if TYPE_CHECKING: from typing import Generator, List, Optional, Tuple @@ -70,6 +70,20 @@ class UrlSection(types.StaticSection): """Enable requests to private and local network IP addresses""" +class HostAdapter(requests.adapters.HTTPAdapter): + """SNI and certificate validation based on Host header value. + + See also: https://github.com/requests/toolbelt/pull/293 + """ + def send(self, request, *args, **kwargs): + host = request.headers.get("Host") + if host: + self.poolmanager.connection_pool_kw["server_hostname"] = host + elif "server_hostname" in self.poolmanager.connection_pool_kw: + self.poolmanager.connection_pool_kw.pop("server_hostname") + return super().send(request, *args, **kwargs) + + def configure(config: Config): """ | name | example | purpose | @@ -256,7 +270,7 @@ def title_command(bot: SopelWrapper, trigger: Trigger): urls = [bot.memory["last_seen_url"][trigger.sender]] else: # needs to be a list so len() can be checked later - urls = list(web.search_urls(trigger)) + urls = list(tools.web.search_urls(trigger)) for url, title, domain, tinyurl, dispatched in process_urls( bot, trigger, urls, requested=True @@ -297,20 +311,9 @@ def title_auto(bot: SopelWrapper, trigger: Trigger): if re.match(bot.config.core.prefix + r'\S+', trigger): return - unchecked_urls = web.search_urls( + urls = tools.web.search_urls( trigger, exclusion_char=bot.config.url.exclusion_char, clean=True) - urls = [] - safety_cache = bot.memory.get("safety_cache", {}) - safety_cache_local = bot.memory.get("safety_cache_local", {}) - for url in unchecked_urls: - # Avoid fetching known malicious links - if url in safety_cache and safety_cache[url]["positives"] > 0: - continue - if urlparse(url).hostname.lower() in safety_cache_local: - continue - urls.append(url) - for url, title, domain, tinyurl, dispatched in process_urls(bot, trigger, urls): if not dispatched: message = '%s | %s' % (title, domain) @@ -356,49 +359,29 @@ def process_urls( if not requested and url.startswith(bot.config.url.exclusion_char): continue - parsed_url = urlparse(url) - # Check the URL does not match an existing URL callback if check_callbacks(bot, url, use_excludes=not requested): yield (url, None, None, None, True) return - # Prevent private addresses from being queried if enable_private_resolution is False - # FIXME: This does nothing when an attacker knows how to host a 302 - # FIXME: This whole concept has a TOCTOU issue - if not bot.config.url.enable_private_resolution: - if not parsed_url.hostname: - # URL like file:///path is a valid local path (i.e. private) - LOGGER.debug("Ignoring private URL: %s", url) - continue - - try: - ips = [ip_address(parsed_url.hostname)] - except ValueError: - ips = [ip_address(ip) for ip in dns.resolver.resolve(parsed_url.hostname)] - - private = False - for ip in ips: - if ip.is_private or ip.is_loopback: - private = True - break - if private: - LOGGER.debug("Ignoring private URL: %s", url) - continue - # Call the URL to get a title, if possible - title = find_title(url) - if not title: + title_results = find_title( + url, + allow_local=bot.config.url.enable_private_resolution, + memory=bot.memory, + ) + if not title_results: # No title found: don't handle this URL LOGGER.debug('No title found; ignoring URL: %s', url) continue + title, final_hostname = title_results # If the URL is over bot.config.url.shorten_url_length, shorten the URL tinyurl = None if (shorten_url_length > 0) and (len(url) > shorten_url_length): tinyurl = get_or_create_shorturl(bot, url) - yield (url, title, parsed_url.hostname, tinyurl, False) + yield (url, title, final_hostname, tinyurl, False) def check_callbacks(bot: SopelWrapper, url: str, use_excludes: bool = True) -> bool: @@ -436,32 +419,107 @@ def check_callbacks(bot: SopelWrapper, url: str, use_excludes: bool = True) -> b ) -def find_title(url: str, verify: bool = True) -> Optional[str]: - """Return the title for the given URL. +def find_title( + url: str, + verify: bool = True, + allow_local: bool = False, + memory: tools.SopelMemory = {}, +) -> Optional[Tuple[str, str]]: + """Fetch the title for the given URL. :param verify: Whether to require a valid certificate when using https + :param allow_local: Allow requests to non-global addresses (RFC1918, etc.) + :param memory: The bot.memory to search for safety.py caches + :return: A tuple of the (title, final_hostname) that were found, or None """ - try: - response = requests.get(url, stream=True, verify=verify, - headers=DEFAULT_HEADERS) - raw_content = b'' - for byte in response.iter_content(chunk_size=512): - raw_content += byte - if b'' in raw_content or len(raw_content) > MAX_BYTES: - break - content = raw_content.decode('utf-8', errors='ignore') - # Need to close the connection because we have not read all - # the data - response.close() - except requests.exceptions.ConnectionError as e: - LOGGER.debug("Unable to reach URL: %r: %s", url, e) - return None - except ( - requests.exceptions.InvalidURL, # e.g. http:/// - UnicodeError, # e.g. http://.example.com (urllib3<1.26) - LocationValueError, # e.g. http://.example.com (urllib3>=1.26) - ): - LOGGER.debug('Invalid URL: %s', url) + original_url = url + redirects_left = 5 + session = requests.Session() + session.mount("https://", HostAdapter()) + session.headers = dict(DEFAULT_HEADERS) + while redirects_left > 0: + redirects_left -= 1 + parsed_url = urlparse(url) + + # Avoid fetching known malicious links + safety_cache = bot.memory.get("safety_cache", {}) + safety_cache_local = bot.memory.get("safety_cache_local", {}) + if url in safety_cache and safety_cache[url]["positives"] > 0: + LOGGER.debug("Ignoring unsafe URL: %r", url) + return None + if parsed_url.hostname.lower() in safety_cache_local: + LOGGER.debug("Ignoring unsafe domain: %r", url) + return None + + # Prevent private addresses from being queried + try: + # If link is to an IP + ips = [ip_address(parsed_url.hostname)] + except ValueError: # Nope, hostname + try: + # getaddrinfo instead of dns.resolver so we use normal OS + # name resolution. notably, this includes hosts files + addr_info = getaddrinfo(parsed_url.hostname, 443, proto=IPPROTO_TCP) + ips = [ip_address(info[4][0]) for info in addr_info] + # dns.resolver skips hosts file, but is simpler + # answers = dns.resolver.resolve(parsed_url.hostname) + # ips = [ip_address(ip) for ip in answers] + except Exception as e: + LOGGER.debug("Failed to get IPs for %r: %s", url, e) + return None + + if not allow_local and not all(ip.is_global for ip in ips): + LOGGER.debug("Ignoring private URL: %r", url) + return None + + # Prevent DNS TTL=0 TOCTOU from making us talk to unchecked IP + ip_host = ("{}" if ips[0].version == 4 else "[{}]").format(ips[0].compressed) + ip_url = urlunparse((parsed_url.scheme, ip_host) + parsed_url[2:]) + try: + response = session.get( + ip_url, + stream=True, + verify=verify, + headers={"Host": parsed_url.hostname.strip(".")}, # "example.com." + allow_redirects=False, + ) + if response.is_redirect: + LOGGER.debug( + "URL %r redirected to %r", url, response.headers.get("Location") + ) + url = response.headers.get("Location") + if url is None: + return None + continue + + content_bytes = b'' + for chunk in response.iter_content(chunk_size=512): + content_bytes += chunk + if b"" in content_bytes or len(content_bytes) > MAX_BYTES: + break + + encoding = None + if "Content-Type" in response.headers: + msg = EmailMessage() + msg["Content-Type"] = response.headers["Content-Type"] + encoding = msg.get_content_charset() + content = content_bytes.decode(encoding or "utf-8", errors="ignore") + + # Need to close the connection because we haven't read all the data + response.close() + except requests.exceptions.ConnectionError as e: + LOGGER.debug("Unable to reach URL: %r: %s", url, e) + return None + except ( + requests.exceptions.InvalidURL, # e.g. http:/// + UnicodeError, # e.g. http://.example.com (urllib3<1.26) + LocationValueError, # e.g. http://.example.com (urllib3>=1.26) + ): + LOGGER.debug('Invalid URL: %s', url) + return None + break + else: + LOGGER.debug("Redirects exhausted for %r", original_url) return None # Some cleanup that I don't really grok, but was in the original, so @@ -474,12 +532,12 @@ def find_title(url: str, verify: bool = True) -> Optional[str]: if start == -1 or end == -1: return None - title = web.decode(content[start + 7:end]) + title = tools.web.decode(content[start + 7:end]) title = title.strip()[:200] title = ' '.join(title.split()) # cleanly remove multiple spaces - return title or None + return (title, parsed_url.hostname) def get_or_create_shorturl(bot: SopelWrapper, url: str) -> Optional[str]: @@ -506,7 +564,7 @@ def get_or_create_shorturl(bot: SopelWrapper, url: str) -> Optional[str]: def get_tinyurl(url: str) -> Optional[str]: """Returns a shortened tinyURL link of the URL""" base_url = "https://tinyurl.com/api-create.php" - tinyurl = "%s?%s" % (base_url, web.urlencode({'url': url})) + tinyurl = "%s?%s" % (base_url, tools.web.urlencode({'url': url})) try: res = requests.get(tinyurl) res.raise_for_status() diff --git a/test/modules/test_modules_url.py b/test/modules/test_modules_url.py index d8a6d087c1..56b6acf8ae 100644 --- a/test/modules/test_modules_url.py +++ b/test/modules/test_modules_url.py @@ -22,6 +22,12 @@ "http://example..com/", # empty label "http://?", # no host ) +PRIVATE_URLS = ( + # "https://httpbin.org/redirect-to?url=http://127.0.0.1/", # online + "http://127.1.1.1/", + "http://10.1.1.1/", + "http://169.254.1.1/", +) @pytest.fixture @@ -76,6 +82,11 @@ def test_find_title_invalid(site): assert url.find_title(site) is None +@pytest.mark.parametrize("site", PRIVATE_URLS) +def test_find_title_private(site): + assert url.find_title(site) is None + + def test_check_callbacks(mockbot): """Test that check_callbacks works with both new & legacy URL callbacks.""" assert url.check_callbacks(mockbot, 'https://example.com/test')