diff --git a/pyproject.toml b/pyproject.toml index 5746b069a..60bac8dd7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -47,7 +47,6 @@ dependencies = [ "xmltodict>=0.12,<0.14", "pytz", "requests>=2.24.0,<3.0.0", - "dnspython<3.0", "sqlalchemy>=1.4,<1.5", "importlib_metadata>=3.6", "packaging>=23.2", diff --git a/sopel/builtins/url.py b/sopel/builtins/url.py index fc121b8c4..bb3398bbe 100644 --- a/sopel/builtins/url.py +++ b/sopel/builtins/url.py @@ -10,22 +10,23 @@ """ from __future__ import annotations +from email.message import EmailMessage from ipaddress import ip_address import logging import re +from socket import getaddrinfo, IPPROTO_TCP from typing import NamedTuple, Optional, TYPE_CHECKING from urllib.parse import urlparse -import dns.resolver import requests from urllib3.exceptions import LocationValueError # type: ignore[import] from sopel import plugin, privileges, tools from sopel.config import types -from sopel.tools import web if TYPE_CHECKING: from collections.abc import Generator + from sopel.bot import Sopel, SopelWrapper from sopel.config import Config from sopel.trigger import Trigger @@ -73,7 +74,14 @@ class UrlSection(types.StaticSection): """If greater than 0, the title fetcher will include a TinyURL version of links longer than this many characters.""" enable_private_resolution = types.BooleanAttribute( 'enable_private_resolution', default=False) - """Enable requests to private and local network IP addresses""" + """Allow all requests to private and loopback networks. + + If disabled (the default), obvious attempts to load pages from loopback and + private IP addresses will be blocked. If this matters for your security you + must use additional protections like a firewall and CSRF tokens, since an + attacker can change which IP address a domain name refers to between when + Sopel checks it and when the HTTP request is made. + """ def configure(config: Config): @@ -84,7 +92,7 @@ def configure(config: Config): | exclude | https?://git\\\\.io/.* | A list of regular expressions for URLs for which the title should not be shown. | | exclusion\\_char | ! | A character (or string) which, when immediately preceding a URL, will stop the URL's title from being shown. | | shorten\\_url\\_length | 72 | If greater than 0, the title fetcher will include a TinyURL version of links longer than this many characters. | - | enable\\_private\\_resolution | False | Enable requests to private and local network IP addresses. | + | enable\\_private\\_resolution | False | Allow all requests to private IP addresses. Leaving this disabled only blocks obvious attempts, use a firewall! | """ config.define_section('url', UrlSection) config.url.configure_setting( @@ -107,7 +115,7 @@ def configure(config: Config): ) config.url.configure_setting( 'enable_private_resolution', - 'Enable requests to private and local network IP addresses?' + 'Allow all requests to private (local network) IP addresses?' ) @@ -288,7 +296,7 @@ def title_command(bot: SopelWrapper, trigger: Trigger): urls = [bot.memory["last_seen_url"][trigger.sender]] else: # needs to be a list so len() can be checked later - urls = list(web.search_urls(trigger)) + urls = list(tools.web.search_urls(trigger)) for url, title, domain, tinyurl, ignored in process_urls( bot, trigger, urls, requested=True @@ -334,21 +342,9 @@ def title_auto(bot: SopelWrapper, trigger: Trigger): if re.match(bot.config.core.prefix + r'\S+', trigger): return - unchecked_urls = web.search_urls( + urls = tools.web.search_urls( trigger, exclusion_char=bot.config.url.exclusion_char, clean=True) - urls = [] - safety_cache = bot.memory.get("safety_cache", {}) - safety_cache_local = bot.memory.get("safety_cache_local", {}) - for url in unchecked_urls: - # Avoid fetching known malicious links - if url in safety_cache and safety_cache[url]["positives"] > 0: - continue - parsed = urlparse(url) - if not parsed.hostname or parsed.hostname.lower() in safety_cache_local: - continue - urls.append(url) - for url, title, domain, tinyurl, ignored in process_urls(bot, trigger, urls): if not ignored: message = '%s | %s' % (title, domain) @@ -418,60 +414,35 @@ def process_urls( if not requested and url.startswith(bot.config.url.exclusion_char): continue - parsed_url = urlparse(url) - if check_callbacks(bot, url, use_excludes=not requested): # URL matches a callback OR is excluded, ignore yield URLInfo(url, None, None, None, True) continue - # Prevent private addresses from being queried if enable_private_resolution is False - # FIXME: This does nothing when an attacker knows how to host a 302 - # FIXME: This whole concept has a TOCTOU issue - if not bot.config.url.enable_private_resolution: - if not parsed_url.hostname: - # URL like file:///path is a valid local path (i.e. private) - LOGGER.debug("Ignoring private URL: %s", url) - continue - - try: - ips = [ip_address(parsed_url.hostname)] - except ValueError: - # Extra try/except here in case the DNS resolution fails, see #2348 - try: - ips = [ip_address(ip) for ip in dns.resolver.resolve(parsed_url.hostname)] - except Exception as exc: - LOGGER.debug( - "Cannot resolve hostname %s, ignoring URL %s" - " (exception was: %r)", - parsed_url.hostname, - url, - exc, - ) - continue - - private = False - for ip in ips: - if ip.is_private or ip.is_loopback: - private = True - break - if private: - LOGGER.debug("Ignoring private URL: %s", url) - continue - # Call the URL to get a title, if possible - title = find_title(url) - if not title: + unsafe_urls = [ + url + for url, data in bot.memory.get("safety_cache", {}).items() + if data.get("positives") + ] + title_results = find_title( + url, + allow_local=bot.config.url.enable_private_resolution, + unsafe_urls=unsafe_urls, + unsafe_domains=bot.memory.get("safety_cache_local", {}).values(), + ) + if not title_results: # No title found: don't handle this URL LOGGER.debug('No title found; ignoring URL: %s', url) continue + title, final_hostname = title_results # If the URL is over bot.config.url.shorten_url_length, shorten the URL tinyurl = None if (shorten_url_length > 0) and (len(url) > shorten_url_length): tinyurl = get_or_create_shorturl(bot, url) - yield URLInfo(url, title, parsed_url.hostname, tinyurl, False) + yield URLInfo(url, title, final_hostname, tinyurl, False) def check_callbacks(bot: SopelWrapper, url: str, use_excludes: bool = True) -> bool: @@ -509,32 +480,106 @@ def check_callbacks(bot: SopelWrapper, url: str, use_excludes: bool = True) -> b ) -def find_title(url: str, verify: bool = True) -> Optional[str]: - """Return the title for the given URL. +def find_title( + url: str, + verify: bool = True, + allow_local: bool = False, + unsafe_urls: list = [], + unsafe_domains: list = [], +) -> Optional[tuple[str, str]]: + """Fetch the title for the given URL. :param verify: Whether to require a valid certificate when using https + :param allow_local: Allow requests to non-global addresses (RFC1918, etc.) + :param unsafe_urls: A list of URLs to consider malicious and ignore + :param unsafe_domains: A list of domains to consider malicious and ignore + :return: A tuple of the (title, final_hostname) that were found, or None """ - try: - response = requests.get(url, stream=True, verify=verify, - headers=DEFAULT_HEADERS) - raw_content = b'' - for byte in response.iter_content(chunk_size=512): - raw_content += byte - if b'' in raw_content or len(raw_content) > MAX_BYTES: - break - content = raw_content.decode('utf-8', errors='ignore') - # Need to close the connection because we have not read all - # the data - response.close() - except requests.exceptions.ConnectionError as e: - LOGGER.debug("Unable to reach URL: %r: %s", url, e) - return None - except ( - requests.exceptions.InvalidURL, # e.g. http:/// - UnicodeError, # e.g. http://.example.com (urllib3<1.26) - LocationValueError, # e.g. http://.example.com (urllib3>=1.26) - ): - LOGGER.debug('Invalid URL: %s', url) + original_url = url + redirects_left = 5 + session = requests.Session() + session.headers = dict(DEFAULT_HEADERS) + while redirects_left > 0: + redirects_left -= 1 + parsed_url = urlparse(url) + if not parsed_url.hostname: + return None + + # Avoid fetching known malicious links + if url in unsafe_urls: + LOGGER.debug("Ignoring unsafe URL: %r", url) + return None + if parsed_url.hostname.lower() in unsafe_domains: + LOGGER.debug("Ignoring unsafe domain: %r", url) + return None + + # Prevent private addresses from being queried + try: + # If link is to an IP + ips = [ip_address(parsed_url.hostname)] + except ValueError: # Nope, hostname + try: + # getaddrinfo instead of dns.resolver so we use normal OS + # name resolution, including hosts files. + addr_info = getaddrinfo(parsed_url.hostname, 443, proto=IPPROTO_TCP) + ips = [ip_address(info[4][0]) for info in addr_info] + except Exception as e: + LOGGER.debug("Failed to get IPs for %r: %s", url, e) + return None + + # is_global excludes RFC1918, loopback, link-local, and v6 equivalents + if not allow_local and not all(ip.is_global for ip in ips): + LOGGER.debug( + "Ignoring private URL %r which resolved to %s", + url, + ", ".join([str(ip) for ip in ips]), + ) + return None + + try: + response = session.get( + url, + stream=True, + verify=verify, + allow_redirects=False, + ) + if response.is_redirect: + LOGGER.debug( + "URL %r redirected to %r", url, response.headers.get("Location") + ) + if "Location" not in response.headers: + return None + url = response.headers["Location"] + continue + + content_bytes = b'' + for chunk in response.iter_content(chunk_size=512): + content_bytes += chunk + if b"" in content_bytes or len(content_bytes) > MAX_BYTES: + break + + encoding = None + if "Content-Type" in response.headers: + msg = EmailMessage() + msg["Content-Type"] = response.headers["Content-Type"] + encoding = msg.get_content_charset() + content = content_bytes.decode(encoding or "utf-8", errors="ignore") + + # Need to close the connection because we haven't read all the data + response.close() + except requests.exceptions.ConnectionError as e: + LOGGER.debug("Unable to reach URL: %r: %s", url, e) + return None + except ( + requests.exceptions.InvalidURL, # e.g. http:/// + UnicodeError, # e.g. http://.example.com (urllib3<1.26) + LocationValueError, # e.g. http://.example.com (urllib3>=1.26) + ): + LOGGER.debug('Invalid URL: %s', url) + return None + break + else: + LOGGER.debug("Redirects exhausted for %r", original_url) return None # Some cleanup that I don't really grok, but was in the original, so @@ -547,12 +592,12 @@ def find_title(url: str, verify: bool = True) -> Optional[str]: if start == -1 or end == -1: return None - title = web.decode(content[start + 7:end]) + title = tools.web.decode(content[start + 7:end]) title = title.strip()[:200] title = ' '.join(title.split()) # cleanly remove multiple spaces - return title or None + return (title, parsed_url.hostname) def get_or_create_shorturl(bot: SopelWrapper, url: str) -> Optional[str]: @@ -579,7 +624,7 @@ def get_or_create_shorturl(bot: SopelWrapper, url: str) -> Optional[str]: def get_tinyurl(url: str) -> Optional[str]: """Returns a shortened tinyURL link of the URL""" base_url = "https://tinyurl.com/api-create.php" - tinyurl = "%s?%s" % (base_url, web.urlencode({'url': url})) + tinyurl = "%s?%s" % (base_url, tools.web.urlencode({'url': url})) try: res = requests.get(tinyurl) res.raise_for_status() diff --git a/test/builtins/test_builtins_url.py b/test/builtins/test_builtins_url.py index 54db06f24..eedce5411 100644 --- a/test/builtins/test_builtins_url.py +++ b/test/builtins/test_builtins_url.py @@ -22,6 +22,12 @@ "http://example..com/", # empty label "http://?", # no host ) +PRIVATE_URLS = ( + # "https://httpbin.org/redirect-to?url=http://127.0.0.1/", # online + "http://127.1.1.1/", + "http://10.1.1.1/", + "http://169.254.1.1/", +) @pytest.fixture @@ -76,6 +82,11 @@ def test_find_title_invalid(site): assert url.find_title(site) is None +@pytest.mark.parametrize("site", PRIVATE_URLS) +def test_find_title_private(site): + assert url.find_title(site) is None + + def test_check_callbacks(mockbot): """Test that check_callbacks works with both new & legacy URL callbacks.""" assert url.check_callbacks(mockbot, 'https://example.com/test')