Skip to content

Commit

Permalink
url: fix private IP protection
Browse files Browse the repository at this point in the history
  • Loading branch information
half-duplex committed Jul 10, 2022
1 parent 17bb5b9 commit 2743253
Show file tree
Hide file tree
Showing 3 changed files with 139 additions and 71 deletions.
1 change: 0 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ pytz
praw>=4.0.0,<6.0.0
geoip2>=4.0,<5.0
requests>=2.24.0,<3.0.0
dnspython<3.0
sqlalchemy>=1.4,<1.5
importlib_metadata>=3.6
packaging
198 changes: 128 additions & 70 deletions sopel/modules/url.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,19 @@
"""
from __future__ import annotations

from email.message import EmailMessage
from ipaddress import ip_address
import logging
import re
from socket import getaddrinfo, IPPROTO_TCP
from typing import TYPE_CHECKING
from urllib.parse import urlparse
from urllib.parse import urlparse, urlunparse

import dns.resolver
import requests
from urllib3.exceptions import LocationValueError # type: ignore[import]

from sopel import plugin, tools
from sopel.config import types
from sopel.tools import web

if TYPE_CHECKING:
from typing import Generator, List, Optional, Tuple
Expand Down Expand Up @@ -70,6 +70,20 @@ class UrlSection(types.StaticSection):
"""Enable requests to private and local network IP addresses"""


class HostAdapter(requests.adapters.HTTPAdapter):
"""SNI and certificate validation based on Host header value.
See also: https://github.com/requests/toolbelt/pull/293
"""
def send(self, request, *args, **kwargs):
host = request.headers.get("Host")
if host:
self.poolmanager.connection_pool_kw["server_hostname"] = host
elif "server_hostname" in self.poolmanager.connection_pool_kw:
self.poolmanager.connection_pool_kw.pop("server_hostname")
return super().send(request, *args, **kwargs)


def configure(config: Config):
"""
| name | example | purpose |
Expand Down Expand Up @@ -256,7 +270,7 @@ def title_command(bot: SopelWrapper, trigger: Trigger):
urls = [bot.memory["last_seen_url"][trigger.sender]]
else:
# needs to be a list so len() can be checked later
urls = list(web.search_urls(trigger))
urls = list(tools.web.search_urls(trigger))

for url, title, domain, tinyurl, dispatched in process_urls(
bot, trigger, urls, requested=True
Expand Down Expand Up @@ -297,20 +311,9 @@ def title_auto(bot: SopelWrapper, trigger: Trigger):
if re.match(bot.config.core.prefix + r'\S+', trigger):
return

unchecked_urls = web.search_urls(
urls = tools.web.search_urls(
trigger, exclusion_char=bot.config.url.exclusion_char, clean=True)

urls = []
safety_cache = bot.memory.get("safety_cache", {})
safety_cache_local = bot.memory.get("safety_cache_local", {})
for url in unchecked_urls:
# Avoid fetching known malicious links
if url in safety_cache and safety_cache[url]["positives"] > 0:
continue
if urlparse(url).hostname.lower() in safety_cache_local:
continue
urls.append(url)

for url, title, domain, tinyurl, dispatched in process_urls(bot, trigger, urls):
if not dispatched:
message = '%s | %s' % (title, domain)
Expand Down Expand Up @@ -356,49 +359,29 @@ def process_urls(
if not requested and url.startswith(bot.config.url.exclusion_char):
continue

parsed_url = urlparse(url)

# Check the URL does not match an existing URL callback
if check_callbacks(bot, url, use_excludes=not requested):
yield (url, None, None, None, True)
return

# Prevent private addresses from being queried if enable_private_resolution is False
# FIXME: This does nothing when an attacker knows how to host a 302
# FIXME: This whole concept has a TOCTOU issue
if not bot.config.url.enable_private_resolution:
if not parsed_url.hostname:
# URL like file:///path is a valid local path (i.e. private)
LOGGER.debug("Ignoring private URL: %s", url)
continue

try:
ips = [ip_address(parsed_url.hostname)]
except ValueError:
ips = [ip_address(ip) for ip in dns.resolver.resolve(parsed_url.hostname)]

private = False
for ip in ips:
if ip.is_private or ip.is_loopback:
private = True
break
if private:
LOGGER.debug("Ignoring private URL: %s", url)
continue

# Call the URL to get a title, if possible
title = find_title(url)
if not title:
title_results = find_title(
url,
allow_local=bot.config.url.enable_private_resolution,
memory=bot.memory,
)
if not title_results:
# No title found: don't handle this URL
LOGGER.debug('No title found; ignoring URL: %s', url)
continue
title, final_hostname = title_results

# If the URL is over bot.config.url.shorten_url_length, shorten the URL
tinyurl = None
if (shorten_url_length > 0) and (len(url) > shorten_url_length):
tinyurl = get_or_create_shorturl(bot, url)

yield (url, title, parsed_url.hostname, tinyurl, False)
yield (url, title, final_hostname, tinyurl, False)


def check_callbacks(bot: SopelWrapper, url: str, use_excludes: bool = True) -> bool:
Expand Down Expand Up @@ -436,32 +419,107 @@ def check_callbacks(bot: SopelWrapper, url: str, use_excludes: bool = True) -> b
)


def find_title(url: str, verify: bool = True) -> Optional[str]:
"""Return the title for the given URL.
def find_title(
url: str,
verify: bool = True,
allow_local: bool = False,
memory: tools.SopelMemory = {},
) -> Optional[Tuple[str, str]]:
"""Fetch the title for the given URL.
:param verify: Whether to require a valid certificate when using https
:param allow_local: Allow requests to non-global addresses (RFC1918, etc.)
:param memory: The bot.memory to search for safety.py caches
:return: A tuple of the (title, final_hostname) that were found, or None
"""
try:
response = requests.get(url, stream=True, verify=verify,
headers=DEFAULT_HEADERS)
raw_content = b''
for byte in response.iter_content(chunk_size=512):
raw_content += byte
if b'</title>' in raw_content or len(raw_content) > MAX_BYTES:
break
content = raw_content.decode('utf-8', errors='ignore')
# Need to close the connection because we have not read all
# the data
response.close()
except requests.exceptions.ConnectionError as e:
LOGGER.debug("Unable to reach URL: %r: %s", url, e)
return None
except (
requests.exceptions.InvalidURL, # e.g. http:///
UnicodeError, # e.g. http://.example.com (urllib3<1.26)
LocationValueError, # e.g. http://.example.com (urllib3>=1.26)
):
LOGGER.debug('Invalid URL: %s', url)
original_url = url
redirects_left = 5
session = requests.Session()
session.mount("https://", HostAdapter())
session.headers = dict(DEFAULT_HEADERS)
while redirects_left > 0:
redirects_left -= 1
parsed_url = urlparse(url)

# Avoid fetching known malicious links
safety_cache = bot.memory.get("safety_cache", {})
safety_cache_local = bot.memory.get("safety_cache_local", {})
if url in safety_cache and safety_cache[url]["positives"] > 0:
LOGGER.debug("Ignoring unsafe URL: %r", url)
return None
if parsed_url.hostname.lower() in safety_cache_local:
LOGGER.debug("Ignoring unsafe domain: %r", url)
return None

# Prevent private addresses from being queried
try:
# If link is to an IP
ips = [ip_address(parsed_url.hostname)]
except ValueError: # Nope, hostname
try:
# getaddrinfo instead of dns.resolver so we use normal OS
# name resolution. notably, this includes hosts files
addr_info = getaddrinfo(parsed_url.hostname, 443, proto=IPPROTO_TCP)
ips = [ip_address(info[4][0]) for info in addr_info]
# dns.resolver skips hosts file, but is simpler
# answers = dns.resolver.resolve(parsed_url.hostname)
# ips = [ip_address(ip) for ip in answers]
except Exception as e:
LOGGER.debug("Failed to get IPs for %r: %s", url, e)
return None

if not allow_local and not all(ip.is_global for ip in ips):
LOGGER.debug("Ignoring private URL: %r", url)
return None

# Prevent DNS TTL=0 TOCTOU from making us talk to unchecked IP
ip_host = ("{}" if ips[0].version == 4 else "[{}]").format(ips[0].compressed)
ip_url = urlunparse((parsed_url.scheme, ip_host) + parsed_url[2:])
try:
response = session.get(
ip_url,
stream=True,
verify=verify,
headers={"Host": parsed_url.hostname.strip(".")}, # "example.com."
allow_redirects=False,
)
if response.is_redirect:
LOGGER.debug(
"URL %r redirected to %r", url, response.headers.get("Location")
)
url = response.headers.get("Location")
if url is None:
return None
continue

content_bytes = b''
for chunk in response.iter_content(chunk_size=512):
content_bytes += chunk
if b"</title>" in content_bytes or len(content_bytes) > MAX_BYTES:
break

encoding = None
if "Content-Type" in response.headers:
msg = EmailMessage()
msg["Content-Type"] = response.headers["Content-Type"]
encoding = msg.get_content_charset()
content = content_bytes.decode(encoding or "utf-8", errors="ignore")

# Need to close the connection because we haven't read all the data
response.close()
except requests.exceptions.ConnectionError as e:
LOGGER.debug("Unable to reach URL: %r: %s", url, e)
return None
except (
requests.exceptions.InvalidURL, # e.g. http:///
UnicodeError, # e.g. http://.example.com (urllib3<1.26)
LocationValueError, # e.g. http://.example.com (urllib3>=1.26)
):
LOGGER.debug('Invalid URL: %s', url)
return None
break
else:
LOGGER.debug("Redirects exhausted for %r", original_url)
return None

# Some cleanup that I don't really grok, but was in the original, so
Expand All @@ -474,12 +532,12 @@ def find_title(url: str, verify: bool = True) -> Optional[str]:
if start == -1 or end == -1:
return None

title = web.decode(content[start + 7:end])
title = tools.web.decode(content[start + 7:end])
title = title.strip()[:200]

title = ' '.join(title.split()) # cleanly remove multiple spaces

return title or None
return (title, parsed_url.hostname)


def get_or_create_shorturl(bot: SopelWrapper, url: str) -> Optional[str]:
Expand All @@ -506,7 +564,7 @@ def get_or_create_shorturl(bot: SopelWrapper, url: str) -> Optional[str]:
def get_tinyurl(url: str) -> Optional[str]:
"""Returns a shortened tinyURL link of the URL"""
base_url = "https://tinyurl.com/api-create.php"
tinyurl = "%s?%s" % (base_url, web.urlencode({'url': url}))
tinyurl = "%s?%s" % (base_url, tools.web.urlencode({'url': url}))
try:
res = requests.get(tinyurl)
res.raise_for_status()
Expand Down
11 changes: 11 additions & 0 deletions test/modules/test_modules_url.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,12 @@
"http://example..com/", # empty label
"http://?", # no host
)
PRIVATE_URLS = (
# "https://httpbin.org/redirect-to?url=http://127.0.0.1/", # online
"http://127.1.1.1/",
"http://10.1.1.1/",
"http://169.254.1.1/",
)


@pytest.fixture
Expand Down Expand Up @@ -76,6 +82,11 @@ def test_find_title_invalid(site):
assert url.find_title(site) is None


@pytest.mark.parametrize("site", PRIVATE_URLS)
def test_find_title_private(site):
assert url.find_title(site) is None


def test_check_callbacks(mockbot):
"""Test that check_callbacks works with both new & legacy URL callbacks."""
assert url.check_callbacks(mockbot, 'https://example.com/test')
Expand Down

0 comments on commit 2743253

Please sign in to comment.