Skip to content
This repository has been archived by the owner on Mar 13, 2022. It is now read-only.

Fix base64 padding for kube config #79

Merged
merged 5 commits into from
Jun 19, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 17 additions & 4 deletions config/kube_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -276,18 +276,31 @@ def _load_oid_token(self, provider):
if 'config' not in provider:
return

parts = provider['config']['id-token'].split('.')
reserved_characters = frozenset(["=", "+", "/"])
token = provider['config']['id-token']

if any(char in token for char in reserved_characters):
# Invalid jwt, as it contains url-unsafe chars
return

parts = token.split('.')
if len(parts) != 3: # Not a valid JWT
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add a check to make sure the JWT doesn't contain url reserved characters (=, + and /)?

return None
return

padding = (4 - len(parts[1]) % 4) * '='
if len(padding) == 3:
# According to spec, 3 padding characters cannot occur
# in a valid jwt
# https://tools.ietf.org/html/rfc7515#appendix-C
return

if PY3:
jwt_attributes = json.loads(
base64.b64decode(parts[1]).decode('utf-8')
base64.b64decode(parts[1] + padding).decode('utf-8')
Copy link
Member

@roycaihw roycaihw Aug 27, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

JWT requires base64.urlsafe_b64decode instead of base64.b64decode (and for encoding as well), as + and / are not url safe

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might be still worthwhile to do safe decode here?

)
else:
jwt_attributes = json.loads(
base64.b64decode(parts[1] + "==")
base64.b64decode(parts[1] + padding)
)

expire = jwt_attributes.get('exp')
Expand Down
101 changes: 97 additions & 4 deletions config/kube_config_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ def _base64(string):
return base64.standard_b64encode(string.encode()).decode()


def _urlsafe_unpadded_b64encode(string):
return base64.urlsafe_b64encode(string.encode()).decode().rstrip('=')


def _format_expiry_datetime(dt):
return dt.strftime(EXPIRY_DATETIME_FORMAT)

Expand Down Expand Up @@ -97,12 +101,33 @@ def _raise_exception(st):

TEST_OIDC_TOKEN = "test-oidc-token"
TEST_OIDC_INFO = "{\"name\": \"test\"}"
TEST_OIDC_BASE = _base64(TEST_OIDC_TOKEN) + "." + _base64(TEST_OIDC_INFO)
TEST_OIDC_LOGIN = TEST_OIDC_BASE + "." + TEST_CLIENT_CERT_BASE64
TEST_OIDC_BASE = ".".join([
_urlsafe_unpadded_b64encode(TEST_OIDC_TOKEN),
_urlsafe_unpadded_b64encode(TEST_OIDC_INFO)
])
TEST_OIDC_LOGIN = ".".join([
TEST_OIDC_BASE,
_urlsafe_unpadded_b64encode(TEST_CLIENT_CERT_BASE64)
])
TEST_OIDC_TOKEN = "Bearer %s" % TEST_OIDC_LOGIN
TEST_OIDC_EXP = "{\"name\": \"test\",\"exp\": 536457600}"
TEST_OIDC_EXP_BASE = _base64(TEST_OIDC_TOKEN) + "." + _base64(TEST_OIDC_EXP)
TEST_OIDC_EXPIRED_LOGIN = TEST_OIDC_EXP_BASE + "." + TEST_CLIENT_CERT_BASE64
TEST_OIDC_EXP_BASE = _urlsafe_unpadded_b64encode(
TEST_OIDC_TOKEN) + "." + _urlsafe_unpadded_b64encode(TEST_OIDC_EXP)
TEST_OIDC_EXPIRED_LOGIN = ".".join([
TEST_OIDC_EXP_BASE,
_urlsafe_unpadded_b64encode(TEST_CLIENT_CERT)
])
TEST_OIDC_CONTAINS_RESERVED_CHARACTERS = ".".join([
_urlsafe_unpadded_b64encode(TEST_OIDC_TOKEN),
_urlsafe_unpadded_b64encode(TEST_OIDC_INFO).replace("a", "+"),
_urlsafe_unpadded_b64encode(TEST_CLIENT_CERT)
])
TEST_OIDC_INVALID_PADDING_LENGTH = ".".join([
_urlsafe_unpadded_b64encode(TEST_OIDC_TOKEN),
"aaaaa",
_urlsafe_unpadded_b64encode(TEST_CLIENT_CERT)
])

TEST_OIDC_CA = _base64(TEST_CERTIFICATE_AUTH)


Expand Down Expand Up @@ -409,6 +434,22 @@ class TestKubeConfigLoader(BaseTestCase):
"user": "expired_oidc_nocert"
}
},
{
"name": "oidc_contains_reserved_character",
"context": {
"cluster": "default",
"user": "oidc_contains_reserved_character"

}
},
{
"name": "oidc_invalid_padding_length",
"context": {
"cluster": "default",
"user": "oidc_invalid_padding_length"

}
},
{
"name": "user_pass",
"context": {
Expand Down Expand Up @@ -595,6 +636,38 @@ class TestKubeConfigLoader(BaseTestCase):
}
}
},
{
"name": "oidc_contains_reserved_character",
"user": {
"auth-provider": {
"name": "oidc",
"config": {
"client-id": "tectonic-kubectl",
"client-secret": "FAKE_SECRET",
"id-token": TEST_OIDC_CONTAINS_RESERVED_CHARACTERS,
"idp-issuer-url": "https://example.org/identity",
"refresh-token":
"lucWJjEhlxZW01cXI3YmVlcYnpxNGhzk"
}
}
}
},
{
"name": "oidc_invalid_padding_length",
"user": {
"auth-provider": {
"name": "oidc",
"config": {
"client-id": "tectonic-kubectl",
"client-secret": "FAKE_SECRET",
"id-token": TEST_OIDC_INVALID_PADDING_LENGTH,
"idp-issuer-url": "https://example.org/identity",
"refresh-token":
"lucWJjEhlxZW01cXI3YmVlcYnpxNGhzk"
}
}
}
},
{
"name": "user_pass",
"user": {
Expand Down Expand Up @@ -793,6 +866,26 @@ def test_oidc_with_refresh_nocert(
self.assertTrue(loader._load_auth_provider_token())
self.assertEqual("Bearer abc123", loader.token)

def test_oidc_fails_if_contains_reserved_chars(self):
loader = KubeConfigLoader(
config_dict=self.TEST_KUBE_CONFIG,
active_context="oidc_contains_reserved_character",
)
self.assertEqual(
loader._load_oid_token("oidc_contains_reserved_character"),
None,
)

def test_oidc_fails_if_invalid_padding_length(self):
loader = KubeConfigLoader(
config_dict=self.TEST_KUBE_CONFIG,
active_context="oidc_invalid_padding_length",
)
self.assertEqual(
loader._load_oid_token("oidc_invalid_padding_length"),
None,
)

def test_user_pass(self):
expected = FakeConfig(host=TEST_HOST, token=TEST_BASIC_TOKEN)
actual = FakeConfig()
Expand Down