Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added explicit encoding for key_data string. #38

Merged
merged 5 commits into from
Jul 27, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 24 additions & 12 deletions plugins/modules/ec2_win_password.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,12 +115,12 @@
except ImportError:
HAS_CRYPTOGRAPHY = False

from ansible.module_utils.basic import AnsibleModule
from ansible_collections.amazon.aws.plugins.module_utils.core import AnsibleAWSModule
from ansible_collections.amazon.aws.plugins.module_utils.ec2 import HAS_BOTO, ec2_argument_spec, ec2_connect
from ansible.module_utils._text import to_bytes


def main():
def setup_module_object():
argument_spec = ec2_argument_spec()
argument_spec.update(dict(
instance_id=dict(required=True),
Expand All @@ -131,21 +131,21 @@ def main():
wait_timeout=dict(default=120, required=False, type='int'),
)
)
module = AnsibleModule(argument_spec=argument_spec)
module = AnsibleAWSModule(argument_spec=argument_spec)
return module

if not HAS_BOTO:
module.fail_json(msg='Boto required for this module.')

if not HAS_CRYPTOGRAPHY:
module.fail_json(msg='cryptography package required for this module.')

def ec2_win_password(module):
instance_id = module.params.get('instance_id')
key_file = module.params.get('key_file')
key_data = module.params.get('key_data')
if module.params.get('key_passphrase') is None:
b_key_passphrase = None
else:
b_key_passphrase = to_bytes(module.params.get('key_passphrase'), errors='surrogate_or_strict')
if module.params.get('key_data') is None:
b_key_data = None
else:
b_key_data = to_bytes(module.params.get('key_data'), errors='surrogate_or_strict')
wait = module.params.get('wait')
wait_timeout = module.params.get('wait_timeout')

Expand All @@ -169,7 +169,7 @@ def main():
if wait and datetime.datetime.now() >= end:
module.fail_json(msg="wait for password timeout after %d seconds" % wait_timeout)

if key_file is not None and key_data is None:
if key_file is not None and b_key_data is None:
try:
with open(key_file, 'rb') as f:
key = load_pem_private_key(f.read(), b_key_passphrase, default_backend())
Expand All @@ -179,9 +179,9 @@ def main():
except (ValueError, TypeError) as e:
# Handle issues loading key
module.fail_json(msg="unable to parse key file")
elif key_data is not None and key_file is None:
elif b_key_data is not None and key_file is None:
try:
key = load_pem_private_key(key_data, b_key_passphrase, default_backend())
key = load_pem_private_key(b_key_data, b_key_passphrase, default_backend())
except (ValueError, TypeError) as e:
module.fail_json(msg="unable to parse key data")

Expand All @@ -200,5 +200,17 @@ def main():
module.exit_json(win_password=decrypted, changed=True)


def main():
module = setup_module_object()

if not HAS_BOTO:
module.fail_json(msg='Boto required for this module.')

if not HAS_CRYPTOGRAPHY:
module.fail_json(msg='cryptography package required for this module.')

ec2_win_password(module)


if __name__ == '__main__':
main()
15 changes: 15 additions & 0 deletions tests/unit/modules/fixtures/certs/ec2_win_password.pem
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
-----BEGIN RSA PRIVATE KEY-----
MIICXQIBAAKBgQDAt4WXUohebyTXAxEBkfCjuaKBgv5VgGwHeSWomB0IoKszlNHL
itadHg/vDi1gHSeRANw4KccpFAEIy4Oq3bMpI/rFrDdj/otp4wDcZKuIxq8OtU4b
KBXsSJD9vxAMZktaJ28gpv+mSjnmz+uC0QiuticKaO62pWPGdd6RjuylkwIDAQAB
AoGAUNSo069qQzGa4hQHLgFoTUOvRWMMChCzPu8xPGWQx+2b4SaqWBUDryLMzBfG
MGoKDmet9mCPiEs7o9S4hRI38m2dKBPHRjpFJDPrJmsKNyjk9yBrcJf6EysNEPbd
mYt7DxyUHVNQJpLOPXuMFSi/iloXTBRZ0dEzvhCp2nmX9wECQQD8+s89dwIm41QK
laqELxSVDtSkfLkBIYtw4xPEfuXufna7LHXnR6b9CELAD8L5ht5CiXHzVPpiuwz4
AaIvK44tAkEAwwSHaT6AOeXKNnNLTM+UzFW4rKixsSMQVD/7OjU0/IabFOkE+uY/
WTgLrp1OsqhhDRS/F/eN9uj0dXHXgBEavwJAImW77gCTg1QfpjzJbaW1J7tXgHIQ
+a1k91l445vZib8aR8L42RSuCPOpl9HM0f7bk7J6kvp3/Rqv3bzjH4TNlQJBAId1
k+FEqqiMtsLPntRBs+ei+13i51pVMrhyoLyzzJRDo2EI4o6sdAAy79pgJhPu5UrC
yGGLcK667WLOqpOoTd0CQQC/4Bq12KCwk9VEWOzNV+kPFzTb85RuzwH5Tis+Fbp2
CNc26WPeNwOvNxXgzAve4G4CaUNLnmATatr5BKjU8Xkr
-----END RSA PRIVATE KEY-----
50 changes: 50 additions & 0 deletions tests/unit/modules/test_ec2_win_password.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
from __future__ import (absolute_import, division, print_function)

__metaclass__ = type

'''
Commands to encrypt a message that can be decrypted:
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.serialization import load_pem_private_key
from cryptography.hazmat.primitives.asymmetric.padding import PKCS1v15
import base64

path = '/path/to/rsa_public_key.pem'
with open(path, 'r') as f:
rsa_public_key_pem = to_text(f.read())
load_pem_public_key(rsa_public_key_pem = , default_backend())
base64_cipher = public_key.encrypt('Ansible_AWS_EC2_Win_Password', PKCS1v15())
string_cipher = base64.b64encode(base64_cipher)
'''

from ansible.module_utils._text import to_bytes, to_text
from ansible_collections.community.aws.plugins.modules.ec2_win_password import setup_module_object, ec2_win_password
from ansible_collections.community.aws.tests.unit.compat.mock import patch
from ansible_collections.community.aws.tests.unit.modules.utils import AnsibleExitJson, ModuleTestCase, set_module_args

fixture_prefix = 'tests/unit/modules/fixtures/certs'


class TestEc2WinPasswordModule(ModuleTestCase):
@patch('ansible_collections.community.aws.plugins.modules.ec2_win_password.ec2_connect')
def test_decryption(self, mock_connect):

path = fixture_prefix + '/ec2_win_password.pem'
with open(path, 'r') as f:
pem = to_text(f.read())

with self.assertRaises(AnsibleExitJson) as exec_info:
set_module_args({'instance_id': 'i-12345',
'key_data': pem
})
module = setup_module_object()
mock_connect().get_password_data.return_value = 'L2k1iFiu/TRrjGr6Rwco/T3C7xkWxUw4+YPYpGGOmP3KDdy3hT1' \
'8RvdDJ2i0e+y7wUcH43DwbRYSlkSyALY/nzjSV9R5NChUyVs3W5' \
'5oiVuyTKsk0lor8dFJ9z9unq14tScZHvyQ3Nx1ggOtS18S9Pk55q' \
'IaCXfx26ucH76VRho='
ec2_win_password(module)

self.assertEqual(
exec_info.exception.args[0]['win_password'],
to_bytes('Ansible_AWS_EC2_Win_Password'),
)