diff --git a/test/test_utils.py b/test/test_utils.py index 259c4763e1e..fb04e3f6f50 100644 --- a/test/test_utils.py +++ b/test/test_utils.py @@ -65,6 +65,8 @@ sanitize_filename, sanitize_path, sanitize_url, + extract_user_pass, + sanitized_Request, expand_path, prepend_extension, replace_extension, @@ -237,6 +239,26 @@ def test_sanitize_url(self): self.assertEqual(sanitize_url('rmtps://foo.bar'), 'rtmps://foo.bar') self.assertEqual(sanitize_url('https://foo.bar'), 'https://foo.bar') + def test_extract_user_pass(self): + self.assertEqual(extract_user_pass('http://foo.bar'), ('http://foo.bar', None, None)) + self.assertEqual(extract_user_pass('http://:foo.bar'), ('http://:foo.bar', None, None)) + self.assertEqual(extract_user_pass('http://@foo.bar'), ('http://foo.bar', '', '')) + self.assertEqual(extract_user_pass('http://:pass@foo.bar'), ('http://foo.bar', '', 'pass')) + self.assertEqual(extract_user_pass('http://user:@foo.bar'), ('http://foo.bar', 'user', '')) + self.assertEqual(extract_user_pass('http://user:pass@foo.bar'), ('http://foo.bar', 'user', 'pass')) + + def test_sanitized_Request(self): + self.assertFalse(sanitized_Request('http://foo.bar').has_header('Authorization')) + self.assertFalse(sanitized_Request('http://:foo.bar').has_header('Authorization')) + self.assertEqual(sanitized_Request('http://@foo.bar').get_header('Authorization'), + 'Basic Og==') + self.assertEqual(sanitized_Request('http://:pass@foo.bar').get_header('Authorization'), + 'Basic OnBhc3M=') + self.assertEqual(sanitized_Request('http://user:@foo.bar').get_header('Authorization'), + 'Basic dXNlcjo=') + self.assertEqual(sanitized_Request('http://user:pass@foo.bar').get_header('Authorization'), + 'Basic dXNlcjpwYXNz') + def test_expand_path(self): def env(var): return '%{0}%'.format(var) if sys.platform == 'win32' else '${0}'.format(var) diff --git a/youtube_dl/utils.py b/youtube_dl/utils.py index e722eed58de..b5e40dfdc62 100644 --- a/youtube_dl/utils.py +++ b/youtube_dl/utils.py @@ -2154,8 +2154,32 @@ def sanitize_url(url): return url +def extract_user_pass(url): + parts = compat_urlparse.urlsplit(url) + username = parts.username + password = parts.password + if username is not None: + if password is None: + password = '' + netloc = parts.hostname + if parts.port is not None: + netloc = parts.hostname + ':' + parts.port + parts = parts._replace(netloc=netloc) + url = compat_urlparse.urlunsplit(parts) + return url, username, password + + def sanitized_Request(url, *args, **kwargs): - return compat_urllib_request.Request(sanitize_url(url), *args, **kwargs) + url = sanitize_url(url) + url, username, password = extract_user_pass(url) + if username is not None: + # password is not None + auth_payload = username + ':' + password + auth_payload = base64.b64encode(auth_payload.encode('utf-8')).decode('utf-8') + auth_header = 'Basic ' + auth_payload + headers = args[1] if len(args) >= 2 else kwargs.setdefault('headers', {}) + headers['Authorization'] = auth_header + return compat_urllib_request.Request(url, *args, **kwargs) def expand_path(s):