Skip to content

Commit

Permalink
Fix: Less restrictive content-type header check for google authentica…
Browse files Browse the repository at this point in the history
…tion (ignores charset) (#1382)

* fix: added function for parsing content-type headers

* fix: improved too restrictive content-type check (now ignores charset param)

* test: Added some new tests for the parse_content_type function in _helpers and its use in _metadata

* style: Linted code

---------

Co-authored-by: arithmetic1728 <[email protected]>
  • Loading branch information
DanieleQuasimodo and arithmetic1728 authored Sep 25, 2023
1 parent 3a34c7a commit 7039beb
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 1 deletion.
23 changes: 23 additions & 0 deletions google/auth/_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import base64
import calendar
import datetime
from email.message import Message
import sys
import urllib

Expand Down Expand Up @@ -63,6 +64,28 @@ def decorator(method):
return decorator


def parse_content_type(header_value):
"""Parse a 'content-type' header value to get just the plain media-type (without parameters).
This is done using the class Message from email.message as suggested in PEP 594
(because the cgi is now deprecated and will be removed in python 3.13,
see https://peps.python.org/pep-0594/#cgi).
Args:
header_value (str): The value of a 'content-type' header as a string.
Returns:
str: A string with just the lowercase media-type from the parsed 'content-type' header.
If the provided content-type is not parsable, returns 'text/plain',
the default value for textual files.
"""
m = Message()
m["content-type"] = header_value
return (
m.get_content_type()
) # Despite the name, actually returns just the media-type


def utcnow():
"""Returns the current UTC datetime.
Expand Down
5 changes: 4 additions & 1 deletion google/auth/compute_engine/_metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,10 @@ def get(

if response.status == http_client.OK:
content = _helpers.from_bytes(response.data)
if response.headers["content-type"] == "application/json":
if (
_helpers.parse_content_type(response.headers["content-type"])
== "application/json"
):
try:
return json.loads(content)
except ValueError as caught_exc:
Expand Down
18 changes: 18 additions & 0 deletions tests/compute_engine/test__metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,24 @@ def test_get_success_json():
assert result[key] == value


def test_get_success_json_content_type_charset():
key, value = "foo", "bar"

data = json.dumps({key: value})
request = make_request(
data, headers={"content-type": "application/json; charset=UTF-8"}
)

result = _metadata.get(request, PATH)

request.assert_called_once_with(
method="GET",
url=_metadata._METADATA_ROOT + PATH,
headers=_metadata._METADATA_HEADERS,
)
assert result[key] == value


def test_get_success_retry():
key, value = "foo", "bar"

Expand Down
26 changes: 26 additions & 0 deletions tests/test__helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,32 @@ def func2(): # pragma: NO COVER
_helpers.copy_docstring(SourceClass)(func2)


def test_parse_content_type_plain():
assert _helpers.parse_content_type("text/html") == "text/html"
assert _helpers.parse_content_type("application/xml") == "application/xml"
assert _helpers.parse_content_type("application/json") == "application/json"


def test_parse_content_type_with_parameters():
content_type_html = "text/html; charset=UTF-8"
content_type_xml = "application/xml; charset=UTF-16; version=1.0"
content_type_json = "application/json; charset=UTF-8; indent=2"
assert _helpers.parse_content_type(content_type_html) == "text/html"
assert _helpers.parse_content_type(content_type_xml) == "application/xml"
assert _helpers.parse_content_type(content_type_json) == "application/json"


def test_parse_content_type_missing_or_broken():
content_type_foo = None
content_type_bar = ""
content_type_baz = "1234"
content_type_qux = " ; charset=UTF-8"
assert _helpers.parse_content_type(content_type_foo) == "text/plain"
assert _helpers.parse_content_type(content_type_bar) == "text/plain"
assert _helpers.parse_content_type(content_type_baz) == "text/plain"
assert _helpers.parse_content_type(content_type_qux) == "text/plain"


def test_utcnow():
assert isinstance(_helpers.utcnow(), datetime.datetime)

Expand Down

0 comments on commit 7039beb

Please sign in to comment.