Skip to content

Commit

Permalink
feat: improve behavior of HTTP redirects
Browse files Browse the repository at this point in the history
This commit modifies the Python core so that it will include "safe" headers
when performing a cross-site redirect where both the original and redirected
hosts are within IBM's "cloud.ibm.com" domain.

Signed-off-by: Norbert Biczo <[email protected]>
  • Loading branch information
pyrooka committed Nov 22, 2023
1 parent c552ce3 commit dc1d65d
Show file tree
Hide file tree
Showing 2 changed files with 184 additions and 2 deletions.
40 changes: 38 additions & 2 deletions ibm_cloud_sdk_core/base_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@
from http.cookiejar import CookieJar
from os.path import basename
from typing import Dict, List, Optional, Tuple, Union
from urllib3.util.retry import Retry

import requests
from requests.structures import CaseInsensitiveDict
from requests.exceptions import JSONDecodeError
from urllib.parse import urlparse
from urllib3.exceptions import MaxRetryError
from urllib3.util.retry import Retry

from ibm_cloud_sdk_core.authenticators import Authenticator
from .api_exception import ApiException
Expand All @@ -52,6 +54,10 @@
logger = logging.getLogger(__name__)


MAX_REDIRECTS = 10
SAFE_HEADERS = ['authorization', 'www-authenticate', 'cookie', 'cookie2']


# pylint: disable=too-many-instance-attributes
# pylint: disable=too-many-locals
class BaseService:
Expand Down Expand Up @@ -294,7 +300,9 @@ def send(self, request: requests.Request, **kwargs) -> DetailedResponse:
"""
# Use a one minute timeout when our caller doesn't give a timeout.
# http://docs.python-requests.org/en/master/user/quickstart/#timeouts
kwargs = dict({"timeout": 60}, **kwargs)
# We also disable the default redirection, to have more granular control
# over the headers sent in each request.
kwargs = dict({'timeout': 60, 'allow_redirects': False}, **kwargs)
kwargs = dict(kwargs, **self.http_config)

if self.disable_ssl_verification:
Expand All @@ -314,6 +322,34 @@ def send(self, request: requests.Request, **kwargs) -> DetailedResponse:
try:
response = self.http_client.request(**request, cookies=self.jar, **kwargs)

# Handle HTTP redirects.
redirects_count = 0
# Check if the response is a redirect to another host.
while response.is_redirect and response.next is not None and redirects_count < MAX_REDIRECTS:
redirects_count += 1

# urllib3 has already prepared a request that can almost be used as-is.
next_request = response.next

# If both the original and the redirected URL are under the `.cloud.ibm.com` domain,
# copy the safe headers that are used for authentication purposes,
if self.service_url.endswith('.cloud.ibm.com') and urlparse(next_request.url).netloc.endswith('.cloud.ibm.com'):
original_headers = request.get('headers')
for header, value in original_headers.items():
if header.lower() in SAFE_HEADERS:
next_request.headers[header] = value
# otherwise remove them manually, because `urllib3` doesn't strip all of them.
else:
for header in SAFE_HEADERS:
next_request.headers.pop(header, None)

response = self.http_client.send(next_request, **kwargs)

# If we reached the max number of redirects and the last response is still a redirect
# stop processing the response and return an error to the user.
if redirects_count == MAX_REDIRECTS and response.is_redirect:
raise MaxRetryError(None, response.url, reason=f'reached the maximum number of redirects: {MAX_REDIRECTS}')

# Process a "success" response.
if 200 <= response.status_code <= 299:
if response.status_code == 204 or request['method'] == 'HEAD':
Expand Down
146 changes: 146 additions & 0 deletions test/test_base_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -788,6 +788,152 @@ def test_retry_config_external():
assert retry_err.value.reason == error


@responses.activate
def test_redirect_ibm_to_ibm_success():
url_from = 'http://region1.cloud.ibm.com/'
url_to = 'http://region2.cloud.ibm.com/'

safe_headers = {
'Authorization': 'foo',
'WWW-Authenticate': 'bar',
'Cookie': 'baz',
'Cookie2': 'baz2',
}

responses.add(responses.GET, url_from, status=302, adding_headers={'Location': url_to}, body='just about to redirect')
responses.add(responses.GET, url_to, status=200, body='successfully redirected')

service = BaseService(service_url=url_from, authenticator=NoAuthAuthenticator())

prepped = service.prepare_request('GET', '', headers=safe_headers)
response = service.send(prepped)
result = response.get_result()

assert result.status_code == 200
assert result.url == url_to
assert result.text == 'successfully redirected'

# Make sure the headers are included in the 2nd, redirected request.
redirected_headers = responses.calls[1].request.headers
for key in safe_headers:
assert key in redirected_headers


@responses.activate
def test_redirect_not_ibm_to_ibm_fail():
url_from = 'http://region1.notcloud.ibm.com/'
url_to = 'http://region2.cloud.ibm.com/'

safe_headers = {
'Authorization': 'foo',
'WWW-Authenticate': 'bar',
'Cookie': 'baz',
'Cookie2': 'baz2',
}

responses.add(responses.GET, url_from, status=302, adding_headers={'Location': url_to}, body='just about to redirect')
responses.add(responses.GET, url_to, status=200, body='successfully redirected')

service = BaseService(service_url=url_from, authenticator=NoAuthAuthenticator())

prepped = service.prepare_request('GET', '', headers=safe_headers)
response = service.send(prepped)
result = response.get_result()

assert result.status_code == 200
assert result.url == url_to
assert result.text == 'successfully redirected'

# Make sure the headers have been excluded from the 2nd, redirected request.
redirected_headers = responses.calls[1].request.headers
for key in safe_headers:
assert key not in redirected_headers


@responses.activate
def test_redirect_ibm_to_not_ibm_fail():
url_from = 'http://region1.cloud.ibm.com/'
url_to = 'http://region2.notcloud.ibm.com/'

safe_headers = {
'Authorization': 'foo',
'WWW-Authenticate': 'bar',
'Cookie': 'baz',
'Cookie2': 'baz2',
}

responses.add(responses.GET, url_from, status=302, adding_headers={'Location': url_to}, body='just about to redirect')
responses.add(responses.GET, url_to, status=200, body='successfully redirected')

service = BaseService(service_url=url_from, authenticator=NoAuthAuthenticator())

prepped = service.prepare_request('GET', '', headers=safe_headers)
response = service.send(prepped)
result = response.get_result()

assert result.status_code == 200
assert result.url == url_to
assert result.text == 'successfully redirected'

# Make sure the headers have been excluded from the 2nd, redirected request.
redirected_headers = responses.calls[1].request.headers
for key in safe_headers:
assert key not in redirected_headers


@responses.activate
def test_redirect_not_ibm_to_not_ibm_fail():
url_from = 'http://region1.notcloud.ibm.com/'
url_to = 'http://region2.notcloud.ibm.com/'

safe_headers = {
'Authorization': 'foo',
'WWW-Authenticate': 'bar',
'Cookie': 'baz',
'Cookie2': 'baz2',
}

responses.add(responses.GET, url_from, status=302, adding_headers={'Location': url_to}, body='just about to redirect')
responses.add(responses.GET, url_to, status=200, body='successfully redirected')

service = BaseService(service_url=url_from, authenticator=NoAuthAuthenticator())

prepped = service.prepare_request('GET', '', headers=safe_headers)
response = service.send(prepped)
result = response.get_result()

assert result.status_code == 200
assert result.url == url_to
assert result.text == 'successfully redirected'

# Make sure the headers have been excluded from the 2nd, redirected request.
redirected_headers = responses.calls[1].request.headers
for key in safe_headers:
assert key not in redirected_headers


@responses.activate
def test_redirect_ibm_to_ibm_exhausted_fail():
redirects = 11
safe_headers = {
'Authorization': 'foo',
'WWW-Authenticate': 'bar',
'Cookie': 'baz',
'Cookie2': 'baz2',
}

for i in range(redirects):
responses.add(responses.GET, f'http://region{i+1}.cloud.ibm.com/', status=302, adding_headers={'Location': f'http://region{i+2}.cloud.ibm.com/'}, body='just about to redirect')

service = BaseService(service_url=f'http://region1.cloud.ibm.com/', authenticator=NoAuthAuthenticator())

with pytest.raises(MaxRetryError) as ex:
prepped = service.prepare_request('GET', '', headers=safe_headers)
service.send(prepped)

assert ex.value.reason == 'reached the maximum number of redirects: 10'


@responses.activate
def test_user_agent_header():
service = AnyServiceV1('2018-11-20', authenticator=NoAuthAuthenticator())
Expand Down

0 comments on commit dc1d65d

Please sign in to comment.