diff --git a/winrm/__init__.py b/winrm/__init__.py index ffe2629..6b76e4f 100644 --- a/winrm/__init__.py +++ b/winrm/__init__.py @@ -7,13 +7,13 @@ from winrm.protocol import Protocol -__version__ = '0.5.0' +__version__ = "0.5.0" # Feature support attributes for multi-version clients. # These values can be easily checked for with hasattr(winrm, "FEATURE_X"), # "'auth_type' in winrm.FEATURE_SUPPORTED_AUTHTYPES", etc for clients to sniff features # supported by a particular version of pywinrm -FEATURE_SUPPORTED_AUTHTYPES = ['basic', 'certificate', 'ntlm', 'kerberos', 'plaintext', 'ssl', 'credssp'] +FEATURE_SUPPORTED_AUTHTYPES = ["basic", "certificate", "ntlm", "kerberos", "plaintext", "ssl", "credssp"] FEATURE_READ_TIMEOUT = True FEATURE_OPERATION_TIMEOUT = True FEATURE_PROXY_SUPPORT = True @@ -21,22 +21,21 @@ class Response(object): """Response from a remote command execution""" + def __init__(self, args): self.std_out, self.std_err, self.status_code = args def __repr__(self): # TODO put tree dots at the end if out/err was truncated - return ''.format( - self.status_code, self.std_out[:20], self.std_err[:20]) + return ''.format(self.status_code, self.std_out[:20], self.std_err[:20]) class Session(object): # TODO implement context manager methods def __init__(self, target, auth, **kwargs): username, password = auth - self.url = self._build_url(target, kwargs.get('transport', 'plaintext')) - self.protocol = Protocol(self.url, - username=username, password=password, **kwargs) + self.url = self._build_url(target, kwargs.get("transport", "plaintext")) + self.protocol = Protocol(self.url, username=username, password=password, **kwargs) def run_cmd(self, command, args=()): # TODO optimize perf. Do not call open/close shell every time @@ -52,8 +51,8 @@ def run_ps(self, script): encoded script command """ # must use utf16 little endian on windows - encoded_ps = b64encode(script.encode('utf_16_le')).decode('ascii') - rs = self.run_cmd('powershell -encodedcommand {0}'.format(encoded_ps)) + encoded_ps = b64encode(script.encode("utf_16_le")).decode("ascii") + rs = self.run_cmd("powershell -encodedcommand {0}".format(encoded_ps)) if len(rs.std_err): # if there was an error message, clean it it up and make it human # readable @@ -61,8 +60,7 @@ def run_ps(self, script): return rs def _clean_error_msg(self, msg): - """converts a Powershell CLIXML message to a more human readable string - """ + """converts a Powershell CLIXML message to a more human readable string""" # TODO prepare unit test, beautify code # if the msg does not start with this, return it as is if msg.startswith(b"#< CLIXML\r\n"): @@ -83,15 +81,13 @@ def _clean_error_msg(self, msg): except Exception as e: # if any of the above fails, the msg was not true xml # print a warning and return the original string - warnings.warn( - "There was a problem converting the Powershell error " - "message: %s" % (e)) + warnings.warn("There was a problem converting the Powershell error " "message: %s" % (e)) else: # if new_msg was populated, that's our error message # otherwise the original error message will be used if len(new_msg): # remove leading and trailing whitespace while we are here - return new_msg.strip().encode('utf-8') + return new_msg.strip().encode("utf-8") # either failed to decode CLIXML or there was nothing to decode # just return the original message @@ -99,7 +95,7 @@ def _clean_error_msg(self, msg): def _strip_namespace(self, xml): """strips any namespaces from an xml string""" - p = re.compile(b"xmlns=*[\"\"][^\"\"]*[\"\"]") + p = re.compile(b'xmlns=*[""][^""]*[""]') allmatches = p.finditer(xml) for match in allmatches: xml = xml.replace(match.group(), b"") @@ -107,17 +103,16 @@ def _strip_namespace(self, xml): @staticmethod def _build_url(target, transport): - match = re.match( - r'(?i)^((?Phttp[s]?)://)?(?P[0-9a-z-_.]+)(:(?P\d+))?(?P(/)?(wsman)?)?', target) # NOQA - scheme = match.group('scheme') + match = re.match(r"(?i)^((?Phttp[s]?)://)?(?P[0-9a-z-_.]+)(:(?P\d+))?(?P(/)?(wsman)?)?", target) # NOQA + scheme = match.group("scheme") if not scheme: # TODO do we have anything other than HTTP/HTTPS - scheme = 'https' if transport == 'ssl' else 'http' - host = match.group('host') - port = match.group('port') + scheme = "https" if transport == "ssl" else "http" + host = match.group("host") + port = match.group("port") if not port: - port = 5986 if transport == 'ssl' else 5985 - path = match.group('path') + port = 5986 if transport == "ssl" else 5985 + path = match.group("path") if not path: - path = 'wsman' - return '{0}://{1}:{2}/{3}'.format(scheme, host, port, path.lstrip('/')) + path = "wsman" + return "{0}://{1}:{2}/{3}".format(scheme, host, port, path.lstrip("/")) diff --git a/winrm/encryption.py b/winrm/encryption.py index 6293afe..0cf02f6 100644 --- a/winrm/encryption.py +++ b/winrm/encryption.py @@ -10,7 +10,7 @@ class Encryption(object): SIXTEN_KB = 16384 - MIME_BOUNDARY = b'--Encrypted Boundary' + MIME_BOUNDARY = b"--Encrypted Boundary" def __init__(self, session, protocol): """ @@ -36,15 +36,15 @@ def __init__(self, session, protocol): self.protocol = protocol self.session = session - if protocol == 'ntlm': # Details under Negotiate [2.2.9.1.1] in MS-WSMV + if protocol == "ntlm": # Details under Negotiate [2.2.9.1.1] in MS-WSMV self.protocol_string = b"application/HTTP-SPNEGO-session-encrypted" self._build_message = self._build_ntlm_message self._decrypt_message = self._decrypt_ntlm_message - elif protocol == 'credssp': # Details under CredSSP [2.2.9.1.3] in MS-WSMV + elif protocol == "credssp": # Details under CredSSP [2.2.9.1.3] in MS-WSMV self.protocol_string = b"application/HTTP-CredSSP-session-encrypted" self._build_message = self._build_credssp_message self._decrypt_message = self._decrypt_credssp_message - elif protocol == 'kerberos': + elif protocol == "kerberos": self.protocol_string = b"application/HTTP-SPNEGO-session-encrypted" self._build_message = self._build_kerberos_message self._decrypt_message = self._decrypt_kerberos_message @@ -63,23 +63,22 @@ def prepare_encrypted_request(self, session, endpoint, message): """ host = urlsplit(endpoint).hostname - if self.protocol == 'credssp' and len(message) > self.SIXTEN_KB: - content_type = 'multipart/x-multi-encrypted' - encrypted_message = b'' - message_chunks = [message[i:i+self.SIXTEN_KB] for i in range(0, len(message), self.SIXTEN_KB)] + if self.protocol == "credssp" and len(message) > self.SIXTEN_KB: + content_type = "multipart/x-multi-encrypted" + encrypted_message = b"" + message_chunks = [message[i : i + self.SIXTEN_KB] for i in range(0, len(message), self.SIXTEN_KB)] for message_chunk in message_chunks: encrypted_chunk = self._encrypt_message(message_chunk, host) encrypted_message += encrypted_chunk else: - content_type = 'multipart/encrypted' + content_type = "multipart/encrypted" encrypted_message = self._encrypt_message(message, host) encrypted_message += self.MIME_BOUNDARY + b"--\r\n" - request = requests.Request('POST', endpoint, data=encrypted_message) + request = requests.Request("POST", endpoint, data=encrypted_message) prepared_request = session.prepare_request(request) - prepared_request.headers['Content-Length'] = str(len(prepared_request.body)) - prepared_request.headers['Content-Type'] = '{0};protocol="{1}";boundary="Encrypted Boundary"'\ - .format(content_type, self.protocol_string.decode()) + prepared_request.headers["Content-Length"] = str(len(prepared_request.body)) + prepared_request.headers["Content-Type"] = '{0};protocol="{1}";boundary="Encrypted Boundary"'.format(content_type, self.protocol_string.decode()) return prepared_request @@ -90,7 +89,7 @@ def parse_encrypted_response(self, response): :param response: The response that needs to be decrypted :return: The unencrypted message from the server """ - content_type = response.headers['Content-Type'] + content_type = response.headers["Content-Type"] if 'protocol="{0}"'.format(self.protocol_string.decode()) in content_type: host = urlsplit(response.request.url).hostname msg = self._decrypt_response(response, host) @@ -103,19 +102,19 @@ def _encrypt_message(self, message, host): message_length = str(len(message)).encode() encrypted_stream = self._build_message(message, host) - message_payload = self.MIME_BOUNDARY + b"\r\n" \ - b"\tContent-Type: " + self.protocol_string + b"\r\n" \ - b"\tOriginalContent: type=application/soap+xml;charset=UTF-8;Length=" + message_length + b"\r\n" + \ - self.MIME_BOUNDARY + b"\r\n" \ - b"\tContent-Type: application/octet-stream\r\n" + \ - encrypted_stream + message_payload = ( + self.MIME_BOUNDARY + b"\r\n" + b"\tContent-Type: " + self.protocol_string + b"\r\n" + b"\tOriginalContent: type=application/soap+xml;charset=UTF-8;Length=" + message_length + b"\r\n" + self.MIME_BOUNDARY + b"\r\n" + b"\tContent-Type: application/octet-stream\r\n" + encrypted_stream + ) return message_payload def _decrypt_response(self, response, host): - parts = response.content.split(self.MIME_BOUNDARY + b'\r\n') + parts = response.content.split(self.MIME_BOUNDARY + b"\r\n") parts = list(filter(None, parts)) # filter out empty parts of the split - message = b'' + message = b"" for i in range(0, len(parts)): if i % 2 == 1: @@ -124,27 +123,26 @@ def _decrypt_response(self, response, host): header = parts[i].strip() payload = parts[i + 1] - expected_length = int(header.split(b'Length=')[1]) + expected_length = int(header.split(b"Length=")[1]) # remove the end MIME block if it exists - if payload.endswith(self.MIME_BOUNDARY + b'--\r\n'): - payload = payload[:len(payload) - 24] + if payload.endswith(self.MIME_BOUNDARY + b"--\r\n"): + payload = payload[: len(payload) - 24] - encrypted_data = payload.replace(b'\tContent-Type: application/octet-stream\r\n', b'') + encrypted_data = payload.replace(b"\tContent-Type: application/octet-stream\r\n", b"") decrypted_message = self._decrypt_message(encrypted_data, host) actual_length = len(decrypted_message) if actual_length != expected_length: - raise WinRMError('Encrypted length from server does not match the ' - 'expected size, message has been tampered with') + raise WinRMError("Encrypted length from server does not match the " "expected size, message has been tampered with") message += decrypted_message return message def _decrypt_ntlm_message(self, encrypted_data, host): signature_length = struct.unpack(" # 0 # - command_done = len([ - node for node in root.findall('.//*') - if node.get('State', '').endswith('CommandState/Done')]) == 1 + command_done = len([node for node in root.findall(".//*") if node.get("State", "").endswith("CommandState/Done")]) == 1 if command_done: - return_code = int( - next(node for node in root.findall('.//*') - if node.tag.endswith('ExitCode')).text) + return_code = int(next(node for node in root.findall(".//*") if node.tag.endswith("ExitCode")).text) return stdout, stderr, return_code, command_done diff --git a/winrm/tests/conftest.py b/winrm/tests/conftest.py index cf59cea..2857d1c 100644 --- a/winrm/tests/conftest.py +++ b/winrm/tests/conftest.py @@ -493,6 +493,7 @@ """ + def sort_dict(ordered_dict): items = sorted(ordered_dict.items(), key=lambda x: x[0]) ordered_dict.clear() @@ -516,19 +517,15 @@ def send_message(self, message): return open_shell_response elif xml_str_compare(message, close_shell_request): return close_shell_response - elif xml_str_compare( - message, run_cmd_with_args_request) or xml_str_compare( - message, run_cmd_wo_args_request): - return run_cmd_ps_response % '1' + elif xml_str_compare(message, run_cmd_with_args_request) or xml_str_compare(message, run_cmd_wo_args_request): + return run_cmd_ps_response % "1" elif xml_str_compare(message, run_ps_request): - return run_cmd_ps_response % '2' - elif xml_str_compare( - message, cleanup_cmd_request % '1') or xml_str_compare( - message, cleanup_cmd_request % '2'): + return run_cmd_ps_response % "2" + elif xml_str_compare(message, cleanup_cmd_request % "1") or xml_str_compare(message, cleanup_cmd_request % "2"): return cleanup_cmd_response - elif xml_str_compare(message, get_cmd_ps_output_request % '1'): + elif xml_str_compare(message, get_cmd_ps_output_request % "1"): return get_cmd_output_response - elif xml_str_compare(message, get_cmd_ps_output_request % '2'): + elif xml_str_compare(message, get_cmd_ps_output_request % "2"): return get_ps_output_response elif xml_str_compare(message, run_cmd_req_input): return run_cmd_req_input_response @@ -539,26 +536,21 @@ def send_message(self, message): elif xml_str_compare(message, stdin_cmd_cleanup): return stdin_cmd_cleanup_response else: - raise Exception('Message was not expected\n\n%s' % message) + raise Exception("Message was not expected\n\n%s" % message) def close_session(self): pass -@fixture(scope='module') +@fixture(scope="module") def protocol_fake(request): - uuid4_patcher = patch('uuid.uuid4') + uuid4_patcher = patch("uuid.uuid4") uuid4_mock = uuid4_patcher.start() - uuid4_mock.return_value = uuid.UUID( - '11111111-1111-1111-1111-111111111111') + uuid4_mock.return_value = uuid.UUID("11111111-1111-1111-1111-111111111111") from winrm.protocol import Protocol - protocol_fake = Protocol( - endpoint='http://windows-host:5985/wsman', - transport='plaintext', - username='john.smith', - password='secret') + protocol_fake = Protocol(endpoint="http://windows-host:5985/wsman", transport="plaintext", username="john.smith", password="secret") protocol_fake.transport = TransportStub() @@ -569,27 +561,24 @@ def uuid4_patch_stop(): return protocol_fake -@fixture(scope='module') +@fixture(scope="module") def protocol_real(): - endpoint = os.environ.get('WINRM_ENDPOINT', None) - transport = os.environ.get('WINRM_TRANSPORT', None) - username = os.environ.get('WINRM_USERNAME', None) - password = os.environ.get('WINRM_PASSWORD', None) + endpoint = os.environ.get("WINRM_ENDPOINT", None) + transport = os.environ.get("WINRM_TRANSPORT", None) + username = os.environ.get("WINRM_USERNAME", None) + password = os.environ.get("WINRM_PASSWORD", None) if endpoint: - settings = dict( - endpoint=endpoint, - operation_timeout_sec=5, - read_timeout_sec=7 - ) + settings = dict(endpoint=endpoint, operation_timeout_sec=5, read_timeout_sec=7) if transport: - settings['transport'] = transport + settings["transport"] = transport if username: - settings['username'] = username + settings["username"] = username if password: - settings['password'] = password + settings["password"] = password from winrm.protocol import Protocol + protocol = Protocol(**settings) return protocol else: - skip('WINRM_ENDPOINT environment variable was not set. Integration tests will be skipped') + skip("WINRM_ENDPOINT environment variable was not set. Integration tests will be skipped") diff --git a/winrm/tests/test_encryption.py b/winrm/tests/test_encryption.py index 47fd13c..bfb1d53 100644 --- a/winrm/tests/test_encryption.py +++ b/winrm/tests/test_encryption.py @@ -9,7 +9,7 @@ def test_init_with_invalid_protocol(): with pytest.raises(WinRMError) as excinfo: - Encryption(None, 'invalid_protocol') + Encryption(None, "invalid_protocol") assert "Encryption for protocol 'invalid_protocol' not supported in pywinrm" in str(excinfo.value) @@ -19,7 +19,7 @@ def test_encrypt_message(): test_message = b"unencrypted message" test_endpoint = b"endpoint" - encryption = Encryption(test_session, 'ntlm') + encryption = Encryption(test_session, "ntlm") actual = encryption.prepare_encrypted_request(test_session, test_endpoint, test_message) expected_encrypted_message = b"dW5lbmNyeXB0ZWQgbWVzc2FnZQ==" @@ -28,24 +28,24 @@ def test_encrypt_message(): assert actual.headers == { "Content-Length": "272", - "Content-Type": 'multipart/encrypted;protocol="application/HTTP-SPNEGO-session-encrypted";boundary="Encrypted Boundary"' + "Content-Type": 'multipart/encrypted;protocol="application/HTTP-SPNEGO-session-encrypted";boundary="Encrypted Boundary"', } - assert actual.body == b"--Encrypted Boundary\r\n" \ - b"\tContent-Type: application/HTTP-SPNEGO-session-encrypted\r\n" \ - b"\tOriginalContent: type=application/soap+xml;charset=UTF-8;Length=19\r\n" \ - b"--Encrypted Boundary\r\n" \ - b"\tContent-Type: application/octet-stream\r\n" + \ - signature_length + expected_signature + expected_encrypted_message + \ - b"--Encrypted Boundary--\r\n" + assert ( + actual.body == b"--Encrypted Boundary\r\n" + b"\tContent-Type: application/HTTP-SPNEGO-session-encrypted\r\n" + b"\tOriginalContent: type=application/soap+xml;charset=UTF-8;Length=19\r\n" + b"--Encrypted Boundary\r\n" + b"\tContent-Type: application/octet-stream\r\n" + signature_length + expected_signature + expected_encrypted_message + b"--Encrypted Boundary--\r\n" + ) def test_encrypt_large_credssp_message(): test_session = SessionTest() test_message = b"unencrypted message " * 2048 test_endpoint = "http://testhost.com" - message_chunks = [test_message[i:i + 16384] for i in range(0, len(test_message), 16384)] + message_chunks = [test_message[i : i + 16384] for i in range(0, len(test_message), 16384)] - encryption = Encryption(test_session, 'credssp') + encryption = Encryption(test_session, "credssp") actual = encryption.prepare_encrypted_request(test_session, test_endpoint, test_message) expected_encrypted_message1 = base64.b64encode(message_chunks[0]) @@ -54,28 +54,24 @@ def test_encrypt_large_credssp_message(): assert actual.headers == { "Content-Length": "55303", - "Content-Type": 'multipart/x-multi-encrypted;protocol="application/HTTP-CredSSP-session-encrypted";boundary="Encrypted Boundary"' + "Content-Type": 'multipart/x-multi-encrypted;protocol="application/HTTP-CredSSP-session-encrypted";boundary="Encrypted Boundary"', } - assert actual.body == b"--Encrypted Boundary\r\n" \ - b"\tContent-Type: application/HTTP-CredSSP-session-encrypted\r\n" \ - b"\tOriginalContent: type=application/soap+xml;charset=UTF-8;Length=16384\r\n" \ - b"--Encrypted Boundary\r\n" \ - b"\tContent-Type: application/octet-stream\r\n" + \ - struct.pack("System.Management.Automation.PSCustomObjectSystem.Object1Preparing modules for first use.0-1-1Completed-1 1Preparing modules for first use.0-1-1Completed-1 fake : The term \'fake\' is not recognized as the name of a cmdlet, function, script file, or operable program. Check _x000D__x000A_the spelling of the name, or if a path was included, verify that the path is correct and try again._x000D__x000A_At line:1 char:1_x000D__x000A_+ fake cmdlet_x000D__x000A_+ ~~~~_x000D__x000A_ + CategoryInfo : ObjectNotFound: (fake:String) [], CommandNotFoundException_x000D__x000A_ + FullyQualifiedErrorId : CommandNotFoundException_x000D__x000A_ _x000D__x000A_' expected = b"fake : The term 'fake' is not recognized as the name of a cmdlet, function, script file, or operable program. Check \nthe spelling of the name, or if a path was included, verify that the path is correct and try again.\nAt line:1 char:1\n+ fake cmdlet\n+ ~~~~\n + CategoryInfo : ObjectNotFound: (fake:String) [], CommandNotFoundException\n + FullyQualifiedErrorId : CommandNotFoundException" actual = s._clean_error_msg(msg) @@ -67,7 +66,7 @@ def test_decode_clixml_error(): def test_decode_clixml_no_clixml(): - s = Session('windows-host.example.com', auth=('john.smith', 'secret')) + s = Session("windows-host.example.com", auth=("john.smith", "secret")) msg = b"stderr line" expected = b"stderr line" actual = s._clean_error_msg(msg) @@ -75,7 +74,7 @@ def test_decode_clixml_no_clixml(): def test_decode_clixml_no_errors(): - s = Session('windows-host.example.com', auth=('john.smith', 'secret')) + s = Session("windows-host.example.com", auth=("john.smith", "secret")) msg = b'#< CLIXML\r\nSystem.Management.Automation.PSCustomObjectSystem.Object1Preparing modules for first use.0-1-1Completed-1 1Preparing modules for first use.0-1-1Completed-1 ' expected = msg actual = s._clean_error_msg(msg) @@ -83,8 +82,8 @@ def test_decode_clixml_no_errors(): def test_decode_clixml_invalid_xml(): - s = Session('windows-host.example.com', auth=('john.smith', 'secret')) - msg = b'#< CLIXML\r\ndasf' + s = Session("windows-host.example.com", auth=("john.smith", "secret")) + msg = b"#< CLIXML\r\ndasf" with pytest.warns(UserWarning, match="There was a problem converting the Powershell error message"): actual = s._clean_error_msg(msg) diff --git a/winrm/tests/test_transport.py b/winrm/tests/test_transport.py index 3f5ab67..c86dec2 100644 --- a/winrm/tests/test_transport.py +++ b/winrm/tests/test_transport.py @@ -15,259 +15,254 @@ class TestTransport(unittest.TestCase): def setUp(self): super(TestTransport, self).setUp() self._old_env = {} - os.environ.pop('REQUESTS_CA_BUNDLE', None) - os.environ.pop('TRAVIS_APT_PROXY', None) - os.environ.pop('CURL_CA_BUNDLE', None) - os.environ.pop('HTTPS_PROXY', None) - os.environ.pop('HTTP_PROXY', None) - os.environ.pop('NO_PROXY', None) + os.environ.pop("REQUESTS_CA_BUNDLE", None) + os.environ.pop("TRAVIS_APT_PROXY", None) + os.environ.pop("CURL_CA_BUNDLE", None) + os.environ.pop("HTTPS_PROXY", None) + os.environ.pop("HTTP_PROXY", None) + os.environ.pop("NO_PROXY", None) transport.DISPLAYED_PROXY_WARNING = False transport.DISPLAYED_CA_TRUST_WARNING = False def tearDown(self): super(TestTransport, self).tearDown() - os.environ.pop('REQUESTS_CA_BUNDLE', None) - os.environ.pop('TRAVIS_APT_PROXY', None) - os.environ.pop('CURL_CA_BUNDLE', None) - os.environ.pop('HTTPS_PROXY', None) - os.environ.pop('HTTP_PROXY', None) - os.environ.pop('NO_PROXY', None) + os.environ.pop("REQUESTS_CA_BUNDLE", None) + os.environ.pop("TRAVIS_APT_PROXY", None) + os.environ.pop("CURL_CA_BUNDLE", None) + os.environ.pop("HTTPS_PROXY", None) + os.environ.pop("HTTP_PROXY", None) + os.environ.pop("NO_PROXY", None) def test_build_session_cert_validate_default(self): - t_default = transport.Transport(endpoint="https://example.com", - username='test', - password='test', - auth_method='basic', - ) + t_default = transport.Transport( + endpoint="https://example.com", + username="test", + password="test", + auth_method="basic", + ) t_default.build_session() self.assertEqual(True, t_default.session.verify) def test_build_session_cert_validate_default_env(self): - os.environ['REQUESTS_CA_BUNDLE'] = 'path_to_REQUESTS_CA_CERT' - - t_default = transport.Transport(endpoint="https://example.com", - username='test', - password='test', - auth_method='basic', - ) + os.environ["REQUESTS_CA_BUNDLE"] = "path_to_REQUESTS_CA_CERT" + + t_default = transport.Transport( + endpoint="https://example.com", + username="test", + password="test", + auth_method="basic", + ) t_default.build_session() - self.assertEqual('path_to_REQUESTS_CA_CERT', t_default.session.verify) + self.assertEqual("path_to_REQUESTS_CA_CERT", t_default.session.verify) def test_build_session_cert_validate_1(self): - os.environ['REQUESTS_CA_BUNDLE'] = 'path_to_REQUESTS_CA_CERT' - - t_default = transport.Transport(endpoint="https://example.com", - server_cert_validation='validate', - username='test', - password='test', - auth_method='basic', - ) + os.environ["REQUESTS_CA_BUNDLE"] = "path_to_REQUESTS_CA_CERT" + + t_default = transport.Transport( + endpoint="https://example.com", + server_cert_validation="validate", + username="test", + password="test", + auth_method="basic", + ) t_default.build_session() - self.assertEqual('path_to_REQUESTS_CA_CERT', t_default.session.verify) + self.assertEqual("path_to_REQUESTS_CA_CERT", t_default.session.verify) def test_build_session_cert_validate_2(self): - os.environ['CURL_CA_BUNDLE'] = 'path_to_CURL_CA_CERT' - - t_default = transport.Transport(endpoint="https://example.com", - server_cert_validation='validate', - username='test', - password='test', - auth_method='basic', - ) + os.environ["CURL_CA_BUNDLE"] = "path_to_CURL_CA_CERT" + + t_default = transport.Transport( + endpoint="https://example.com", + server_cert_validation="validate", + username="test", + password="test", + auth_method="basic", + ) t_default.build_session() - self.assertEqual('path_to_CURL_CA_CERT', t_default.session.verify) + self.assertEqual("path_to_CURL_CA_CERT", t_default.session.verify) def test_build_session_cert_override_1(self): - os.environ['REQUESTS_CA_BUNDLE'] = 'path_to_REQUESTS_CA_CERT' - - t_default = transport.Transport(endpoint="https://example.com", - server_cert_validation='validate', - username='test', - password='test', - auth_method='basic', - ca_trust_path='overridepath', - ) + os.environ["REQUESTS_CA_BUNDLE"] = "path_to_REQUESTS_CA_CERT" + + t_default = transport.Transport( + endpoint="https://example.com", + server_cert_validation="validate", + username="test", + password="test", + auth_method="basic", + ca_trust_path="overridepath", + ) t_default.build_session() - self.assertEqual('overridepath', t_default.session.verify) + self.assertEqual("overridepath", t_default.session.verify) def test_build_session_cert_override_2(self): - os.environ['CURL_CA_BUNDLE'] = 'path_to_CURL_CA_CERT' - - t_default = transport.Transport(endpoint="https://example.com", - server_cert_validation='validate', - username='test', - password='test', - auth_method='basic', - ca_trust_path='overridepath', - ) + os.environ["CURL_CA_BUNDLE"] = "path_to_CURL_CA_CERT" + + t_default = transport.Transport( + endpoint="https://example.com", + server_cert_validation="validate", + username="test", + password="test", + auth_method="basic", + ca_trust_path="overridepath", + ) t_default.build_session() - self.assertEqual('overridepath', t_default.session.verify) + self.assertEqual("overridepath", t_default.session.verify) def test_build_session_cert_override_3(self): - os.environ['CURL_CA_BUNDLE'] = 'path_to_CURL_CA_CERT' - - t_default = transport.Transport(endpoint="https://example.com", - server_cert_validation='validate', - username='test', - password='test', - auth_method='basic', - ca_trust_path=None, - ) + os.environ["CURL_CA_BUNDLE"] = "path_to_CURL_CA_CERT" + + t_default = transport.Transport( + endpoint="https://example.com", + server_cert_validation="validate", + username="test", + password="test", + auth_method="basic", + ca_trust_path=None, + ) t_default.build_session() self.assertEqual(True, t_default.session.verify) def test_build_session_cert_ignore_1(self): - os.environ['REQUESTS_CA_BUNDLE'] = 'path_to_REQUESTS_CA_CERT' - os.environ['CURL_CA_BUNDLE'] = 'path_to_CURL_CA_CERT' + os.environ["REQUESTS_CA_BUNDLE"] = "path_to_REQUESTS_CA_CERT" + os.environ["CURL_CA_BUNDLE"] = "path_to_CURL_CA_CERT" - t_default = transport.Transport(endpoint="https://example.com", - server_cert_validation='ignore', - username='test', - password='test', - auth_method='basic', - ) + t_default = transport.Transport( + endpoint="https://example.com", + server_cert_validation="ignore", + username="test", + password="test", + auth_method="basic", + ) t_default.build_session() self.assertIs(False, t_default.session.verify) def test_build_session_cert_ignore_2(self): - os.environ['REQUESTS_CA_BUNDLE'] = 'path_to_REQUESTS_CA_CERT' - os.environ['CURL_CA_BUNDLE'] = 'path_to_CURL_CA_CERT' + os.environ["REQUESTS_CA_BUNDLE"] = "path_to_REQUESTS_CA_CERT" + os.environ["CURL_CA_BUNDLE"] = "path_to_CURL_CA_CERT" - t_default = transport.Transport(endpoint="https://example.com", - server_cert_validation='ignore', - username='test', - password='test', - auth_method='basic', - ca_trust_path='boguspath' - ) + t_default = transport.Transport( + endpoint="https://example.com", server_cert_validation="ignore", username="test", password="test", auth_method="basic", ca_trust_path="boguspath" + ) t_default.build_session() self.assertIs(False, t_default.session.verify) def test_build_session_proxy_none(self): - os.environ['HTTP_PROXY'] = 'random_proxy' - os.environ['HTTPS_PROXY'] = 'random_proxy_2' + os.environ["HTTP_PROXY"] = "random_proxy" + os.environ["HTTPS_PROXY"] = "random_proxy_2" - t_default = transport.Transport(endpoint="https://example.com", - server_cert_validation='validate', - username='test', - password='test', - auth_method='basic', - proxy=None - ) + t_default = transport.Transport( + endpoint="https://example.com", server_cert_validation="validate", username="test", password="test", auth_method="basic", proxy=None + ) t_default.build_session() - self.assertEqual({'no_proxy': '*'}, t_default.session.proxies) + self.assertEqual({"no_proxy": "*"}, t_default.session.proxies) def test_build_session_proxy_defined(self): - t_default = transport.Transport(endpoint="https://example.com", - server_cert_validation='validate', - username='test', - password='test', - auth_method='basic', - proxy='test_proxy' - ) + t_default = transport.Transport( + endpoint="https://example.com", server_cert_validation="validate", username="test", password="test", auth_method="basic", proxy="test_proxy" + ) t_default.build_session() - self.assertEqual({'http': 'test_proxy', 'https': 'test_proxy'}, t_default.session.proxies) + self.assertEqual({"http": "test_proxy", "https": "test_proxy"}, t_default.session.proxies) def test_build_session_proxy_defined_and_env(self): - os.environ['HTTPS_PROXY'] = 'random_proxy' + os.environ["HTTPS_PROXY"] = "random_proxy" - t_default = transport.Transport(endpoint="https://example.com", - server_cert_validation='validate', - username='test', - password='test', - auth_method='basic', - proxy='test_proxy' - ) + t_default = transport.Transport( + endpoint="https://example.com", server_cert_validation="validate", username="test", password="test", auth_method="basic", proxy="test_proxy" + ) t_default.build_session() - self.assertEqual({'http': 'test_proxy', 'https': 'test_proxy'}, t_default.session.proxies) + self.assertEqual({"http": "test_proxy", "https": "test_proxy"}, t_default.session.proxies) def test_build_session_proxy_with_env_https(self): - os.environ['HTTPS_PROXY'] = 'random_proxy' + os.environ["HTTPS_PROXY"] = "random_proxy" - t_default = transport.Transport(endpoint="https://example.com", - server_cert_validation='validate', - username='test', - password='test', - auth_method='basic', - ) + t_default = transport.Transport( + endpoint="https://example.com", + server_cert_validation="validate", + username="test", + password="test", + auth_method="basic", + ) t_default.build_session() - self.assertEqual({'https': 'random_proxy'}, t_default.session.proxies) + self.assertEqual({"https": "random_proxy"}, t_default.session.proxies) def test_build_session_proxy_with_env_http(self): - os.environ['HTTP_PROXY'] = 'random_proxy' + os.environ["HTTP_PROXY"] = "random_proxy" - t_default = transport.Transport(endpoint="https://example.com", - server_cert_validation='validate', - username='test', - password='test', - auth_method='basic', - ) + t_default = transport.Transport( + endpoint="https://example.com", + server_cert_validation="validate", + username="test", + password="test", + auth_method="basic", + ) t_default.build_session() - self.assertEqual({'http': 'random_proxy'}, t_default.session.proxies) + self.assertEqual({"http": "random_proxy"}, t_default.session.proxies) def test_build_session_server_cert_validation_invalid(self): with self.assertRaises(WinRMError) as exc: - transport.Transport(endpoint="Endpoint", - server_cert_validation='invalid_value', - username='test', - password='test', - auth_method='basic', - ) - self.assertEqual('invalid server_cert_validation mode: invalid_value', str(exc.exception)) + transport.Transport( + endpoint="Endpoint", + server_cert_validation="invalid_value", + username="test", + password="test", + auth_method="basic", + ) + self.assertEqual("invalid server_cert_validation mode: invalid_value", str(exc.exception)) def test_build_session_krb_delegation_as_str(self): - winrm_transport = transport.Transport(endpoint="Endpoint", - server_cert_validation='validate', - username='test', - password='test', - auth_method='kerberos', - kerberos_delegation='True' - ) + winrm_transport = transport.Transport( + endpoint="Endpoint", server_cert_validation="validate", username="test", password="test", auth_method="kerberos", kerberos_delegation="True" + ) self.assertTrue(winrm_transport.kerberos_delegation) def test_build_session_krb_delegation_as_invalid_str(self): with self.assertRaises(ValueError) as exc: - transport.Transport(endpoint="Endpoint", - server_cert_validation='validate', - username='test', - password='test', - auth_method='kerberos', - kerberos_delegation='invalid_value' - ) + transport.Transport( + endpoint="Endpoint", + server_cert_validation="validate", + username="test", + password="test", + auth_method="kerberos", + kerberos_delegation="invalid_value", + ) self.assertEqual("invalid truth value 'invalid_value'", str(exc.exception)) def test_build_session_no_username(self): with self.assertRaises(InvalidCredentialsError) as exc: - transport.Transport(endpoint="Endpoint", - server_cert_validation='validate', - password='test', - auth_method='basic', - ) + transport.Transport( + endpoint="Endpoint", + server_cert_validation="validate", + password="test", + auth_method="basic", + ) self.assertEqual("auth method basic requires a username", str(exc.exception)) def test_build_session_no_password(self): with self.assertRaises(InvalidCredentialsError) as exc: - transport.Transport(endpoint="Endpoint", - server_cert_validation='validate', - username='test', - auth_method='basic', - ) + transport.Transport( + endpoint="Endpoint", + server_cert_validation="validate", + username="test", + auth_method="basic", + ) self.assertEqual("auth method basic requires a password", str(exc.exception)) def test_build_session_invalid_auth(self): - winrm_transport = transport.Transport(endpoint="Endpoint", - server_cert_validation='validate', - username='test', - password='test', - auth_method='invalid_value', - ) + winrm_transport = transport.Transport( + endpoint="Endpoint", + server_cert_validation="validate", + username="test", + password="test", + auth_method="invalid_value", + ) with self.assertRaises(WinRMError) as exc: winrm_transport.build_session() @@ -276,36 +271,39 @@ def test_build_session_invalid_auth(self): def test_build_session_invalid_encryption(self): with self.assertRaises(WinRMError) as exc: - transport.Transport(endpoint="Endpoint", - server_cert_validation='validate', - username='test', - password='test', - auth_method='basic', - message_encryption='invalid_value' - ) + transport.Transport( + endpoint="Endpoint", + server_cert_validation="validate", + username="test", + password="test", + auth_method="basic", + message_encryption="invalid_value", + ) self.assertEqual("invalid message_encryption arg: invalid_value. Should be 'auto', 'always', or 'never'", str(exc.exception)) - @mock.patch('requests.Session') + @mock.patch("requests.Session") def test_close_session(self, mock_session): - t_default = transport.Transport(endpoint="Endpoint", - server_cert_validation='ignore', - username='test', - password='test', - auth_method='basic', - ) + t_default = transport.Transport( + endpoint="Endpoint", + server_cert_validation="ignore", + username="test", + password="test", + auth_method="basic", + ) t_default.build_session() t_default.close_session() mock_session.return_value.close.assert_called_once_with() self.assertIsNone(t_default.session) - @mock.patch('requests.Session') + @mock.patch("requests.Session") def test_close_session_not_built(self, mock_session): - t_default = transport.Transport(endpoint="Endpoint", - server_cert_validation='ignore', - username='test', - password='test', - auth_method='basic', - ) + t_default = transport.Transport( + endpoint="Endpoint", + server_cert_validation="ignore", + username="test", + password="test", + auth_method="basic", + ) t_default.close_session() self.assertFalse(mock_session.return_value.close.called) self.assertIsNone(t_default.session) diff --git a/winrm/transport.py b/winrm/transport.py index a64c960..d432d22 100644 --- a/winrm/transport.py +++ b/winrm/transport.py @@ -37,15 +37,15 @@ except ImportError as ie: pass -__all__ = ['Transport'] +__all__ = ["Transport"] def strtobool(value): value = value.lower() - if value in ('true', 't', 'yes', 'y', 'on', '1'): + if value in ("true", "t", "yes", "y", "on", "1"): return True - elif value in ('false', 'f', 'no', 'n', 'off', '0'): + elif value in ("false", "f", "no", "n", "off", "0"): return False else: @@ -58,18 +58,28 @@ class UnsupportedAuthArgument(Warning): class Transport(object): def __init__( - self, endpoint, username=None, password=None, realm=None, - service=None, keytab=None, ca_trust_path='legacy_requests', cert_pem=None, - cert_key_pem=None, read_timeout_sec=None, server_cert_validation='validate', - kerberos_delegation=False, - kerberos_hostname_override=None, - auth_method='auto', - message_encryption='auto', - credssp_disable_tlsv1_2=False, - credssp_auth_mechanism='auto', - credssp_minimum_version=2, - send_cbt=True, - proxy='legacy_requests'): + self, + endpoint, + username=None, + password=None, + realm=None, + service=None, + keytab=None, + ca_trust_path="legacy_requests", + cert_pem=None, + cert_key_pem=None, + read_timeout_sec=None, + server_cert_validation="validate", + kerberos_delegation=False, + kerberos_hostname_override=None, + auth_method="auto", + message_encryption="auto", + credssp_disable_tlsv1_2=False, + credssp_auth_mechanism="auto", + credssp_minimum_version=2, + send_cbt=True, + proxy="legacy_requests", + ): self.endpoint = endpoint self.username = username self.password = password @@ -89,8 +99,8 @@ def __init__( self.send_cbt = send_cbt self.proxy = proxy - if self.server_cert_validation not in [None, 'validate', 'ignore']: - raise WinRMError('invalid server_cert_validation mode: %s' % self.server_cert_validation) + if self.server_cert_validation not in [None, "validate", "ignore"]: + raise WinRMError("invalid server_cert_validation mode: %s" % self.server_cert_validation) # defensively parse this to a bool if isinstance(kerberos_delegation, bool): @@ -100,41 +110,44 @@ def __init__( self.auth_method = auth_method self.default_headers = { - 'Content-Type': 'application/soap+xml;charset=UTF-8', - 'User-Agent': 'Python WinRM client', + "Content-Type": "application/soap+xml;charset=UTF-8", + "User-Agent": "Python WinRM client", } # try to suppress user-unfriendly warnings from requests' vendored urllib3 try: from requests.packages.urllib3.exceptions import InsecurePlatformWarning - warnings.simplefilter('ignore', category=InsecurePlatformWarning) + + warnings.simplefilter("ignore", category=InsecurePlatformWarning) except Exception: pass # oh well, we tried... try: from requests.packages.urllib3.exceptions import SNIMissingWarning - warnings.simplefilter('ignore', category=SNIMissingWarning) + + warnings.simplefilter("ignore", category=SNIMissingWarning) except Exception: pass # oh well, we tried... # if we're explicitly ignoring validation, try to suppress InsecureRequestWarning, since the user opted-in - if self.server_cert_validation == 'ignore': + if self.server_cert_validation == "ignore": try: from requests.packages.urllib3.exceptions import InsecureRequestWarning - warnings.simplefilter('ignore', category=InsecureRequestWarning) + + warnings.simplefilter("ignore", category=InsecureRequestWarning) except Exception: pass # oh well, we tried... try: from urllib3.exceptions import InsecureRequestWarning - warnings.simplefilter('ignore', category=InsecureRequestWarning) + + warnings.simplefilter("ignore", category=InsecureRequestWarning) except Exception: pass # oh well, we tried... # validate credential requirements for various auth types - if self.auth_method != 'kerberos': - if self.auth_method == 'certificate' or ( - self.auth_method == 'ssl' and (self.cert_pem or self.cert_key_pem)): + if self.auth_method != "kerberos": + if self.auth_method == "certificate" or (self.auth_method == "ssl" and (self.cert_pem or self.cert_key_pem)): if not self.cert_pem or not self.cert_key_pem: raise InvalidCredentialsError("both cert_pem and cert_key_pem must be specified for cert auth") if not os.path.exists(self.cert_pem): @@ -152,63 +165,61 @@ def __init__( # Used for encrypting messages self.encryption = None # The Pywinrm Encryption class used to encrypt/decrypt messages - if self.message_encryption not in ['auto', 'always', 'never']: - raise WinRMError( - "invalid message_encryption arg: %s. Should be 'auto', 'always', or 'never'" % self.message_encryption) + if self.message_encryption not in ["auto", "always", "never"]: + raise WinRMError("invalid message_encryption arg: %s. Should be 'auto', 'always', or 'never'" % self.message_encryption) def build_session(self): session = requests.Session() proxies = dict() if self.proxy is None: - proxies['no_proxy'] = '*' - elif self.proxy != 'legacy_requests': + proxies["no_proxy"] = "*" + elif self.proxy != "legacy_requests": # If there was a proxy specified then use it - proxies['http'] = self.proxy - proxies['https'] = self.proxy + proxies["http"] = self.proxy + proxies["https"] = self.proxy # Merge proxy environment variables - settings = session.merge_environment_settings(url=self.endpoint, - proxies=proxies, stream=None, verify=None, cert=None) + settings = session.merge_environment_settings(url=self.endpoint, proxies=proxies, stream=None, verify=None, cert=None) global DISPLAYED_PROXY_WARNING # We want to eventually stop reading proxy information from the environment. # Also only display the warning once. This method can be called many times during an application's runtime. - if not DISPLAYED_PROXY_WARNING and self.proxy == 'legacy_requests' and ( - 'http' in settings['proxies'] or 'https' in settings['proxies']): - message = "'pywinrm' will use an environment defined proxy. This feature will be disabled in " \ - "the future, please specify it explicitly." - if 'http' in settings['proxies']: - message += " HTTP proxy {proxy} discovered.".format(proxy=settings['proxies']['http']) - if 'https' in settings['proxies']: - message += " HTTPS proxy {proxy} discovered.".format(proxy=settings['proxies']['https']) + if not DISPLAYED_PROXY_WARNING and self.proxy == "legacy_requests" and ("http" in settings["proxies"] or "https" in settings["proxies"]): + message = "'pywinrm' will use an environment defined proxy. This feature will be disabled in " "the future, please specify it explicitly." + if "http" in settings["proxies"]: + message += " HTTP proxy {proxy} discovered.".format(proxy=settings["proxies"]["http"]) + if "https" in settings["proxies"]: + message += " HTTPS proxy {proxy} discovered.".format(proxy=settings["proxies"]["https"]) DISPLAYED_PROXY_WARNING = True warnings.warn(message, DeprecationWarning) - session.proxies = settings['proxies'] + session.proxies = settings["proxies"] # specified validation mode takes precedence - session.verify = self.server_cert_validation == 'validate' + session.verify = self.server_cert_validation == "validate" # patch in CA path override if one was specified in init or env if session.verify: - if self.ca_trust_path == 'legacy_requests' and settings['verify'] is not None: + if self.ca_trust_path == "legacy_requests" and settings["verify"] is not None: # We will - session.verify = settings['verify'] + session.verify = settings["verify"] global DISPLAYED_CA_TRUST_WARNING # We want to eventually stop reading proxy information from the environment. # Also only display the warning once. This method can be called many times during an application's runtime. if not DISPLAYED_CA_TRUST_WARNING and session.verify is not True: - message = "'pywinrm' will use an environment variable defined CA Trust. This feature will be disabled in " \ - "the future, please specify it explicitly." - if os.environ.get('REQUESTS_CA_BUNDLE') is not None: - message += " REQUESTS_CA_BUNDLE contains {ca_path}".format(ca_path=os.environ.get('REQUESTS_CA_BUNDLE')) - elif os.environ.get('CURL_CA_BUNDLE') is not None: - message += " CURL_CA_BUNDLE contains {ca_path}".format(ca_path=os.environ.get('CURL_CA_BUNDLE')) + message = ( + "'pywinrm' will use an environment variable defined CA Trust. This feature will be disabled in " + "the future, please specify it explicitly." + ) + if os.environ.get("REQUESTS_CA_BUNDLE") is not None: + message += " REQUESTS_CA_BUNDLE contains {ca_path}".format(ca_path=os.environ.get("REQUESTS_CA_BUNDLE")) + elif os.environ.get("CURL_CA_BUNDLE") is not None: + message += " CURL_CA_BUNDLE contains {ca_path}".format(ca_path=os.environ.get("CURL_CA_BUNDLE")) DISPLAYED_CA_TRUST_WARNING = True warnings.warn(message, DeprecationWarning) @@ -219,7 +230,7 @@ def build_session(self): encryption_available = False - if self.auth_method == 'kerberos': + if self.auth_method == "kerberos": if not HAVE_KERBEROS: raise WinRMError("requested auth method is kerberos, but pykerberos is not installed") @@ -231,19 +242,18 @@ def build_session(self): hostname_override=self.kerberos_hostname_override, sanitize_mutual_error_response=False, service=self.service, - send_cbt=self.send_cbt + send_cbt=self.send_cbt, ) - encryption_available = hasattr(session.auth, 'winrm_encryption_available') and session.auth.winrm_encryption_available - elif self.auth_method in ['certificate', 'ssl']: - if self.auth_method == 'ssl' and not self.cert_pem and not self.cert_key_pem: + encryption_available = hasattr(session.auth, "winrm_encryption_available") and session.auth.winrm_encryption_available + elif self.auth_method in ["certificate", "ssl"]: + if self.auth_method == "ssl" and not self.cert_pem and not self.cert_key_pem: # 'ssl' was overloaded for HTTPS with optional certificate auth, # fall back to basic auth if no cert specified session.auth = requests.auth.HTTPBasicAuth(username=self.username, password=self.password) else: session.cert = (self.cert_pem, self.cert_key_pem) - session.headers['Authorization'] = \ - "http://schemas.dmtf.org/wbem/wsman/1/wsman/secprofile/https/mutual" - elif self.auth_method == 'ntlm': + session.headers["Authorization"] = "http://schemas.dmtf.org/wbem/wsman/1/wsman/secprofile/https/mutual" + elif self.auth_method == "ntlm": if not HAVE_NTLM: raise WinRMError("requested auth method is ntlm, but requests_ntlm is not installed") @@ -253,11 +263,11 @@ def build_session(self): send_cbt=self.send_cbt, ) # check if requests_ntlm has the session_security attribute available for encryption - encryption_available = hasattr(session.auth, 'session_security') + encryption_available = hasattr(session.auth, "session_security") # TODO: ssl is not exactly right here- should really be client_cert - elif self.auth_method in ['basic', 'plaintext']: + elif self.auth_method in ["basic", "plaintext"]: session.auth = requests.auth.HTTPBasicAuth(username=self.username, password=self.password) - elif self.auth_method == 'credssp': + elif self.auth_method == "credssp": if not HAVE_CREDSSP: raise WinRMError("requests auth method is credssp, but requests-credssp is not installed") @@ -266,7 +276,7 @@ def build_session(self): password=self.password, disable_tlsv1_2=self.credssp_disable_tlsv1_2, auth_mechanism=self.credssp_auth_mechanism, - minimum_version=self.credssp_minimum_version + minimum_version=self.credssp_minimum_version, ) encryption_available = True else: @@ -276,20 +286,19 @@ def build_session(self): self.session = session # Will check the current config and see if we need to setup message encryption - if self.message_encryption == 'always' and not encryption_available: - raise WinRMError( - "message encryption is set to 'always' but the selected auth method %s does not support it" % self.auth_method) + if self.message_encryption == "always" and not encryption_available: + raise WinRMError("message encryption is set to 'always' but the selected auth method %s does not support it" % self.auth_method) elif encryption_available: - if self.message_encryption == 'always': + if self.message_encryption == "always": self.setup_encryption() - elif self.message_encryption == 'auto' and not self.endpoint.lower().startswith('https'): + elif self.message_encryption == "auto" and not self.endpoint.lower().startswith("https"): self.setup_encryption() def setup_encryption(self): # Security context doesn't exist, sending blank message to initialise context - request = requests.Request('POST', self.endpoint, data=None) + request = requests.Request("POST", self.endpoint, data=None) prepared_request = self.session.prepare_request(request) - self._send_message_request(prepared_request, '') + self._send_message_request(prepared_request, "") self.encryption = Encryption(self.session, self.auth_method) def close_session(self): @@ -305,12 +314,12 @@ def send_message(self, message): # urllib3 fails on SSL retries with unicode buffers- must send it a byte string # see https://github.com/shazow/urllib3/issues/717 if isinstance(message, str): - message = message.encode('utf-8') + message = message.encode("utf-8") if self.encryption: prepared_request = self.encryption.prepare_encrypted_request(self.session, self.endpoint, message) else: - request = requests.Request('POST', self.endpoint, data=message) + request = requests.Request("POST", self.endpoint, data=message) prepared_request = self.session.prepare_request(request) response = self._send_message_request(prepared_request, message) @@ -327,9 +336,9 @@ def _send_message_request(self, prepared_request, message): if ex.response.content: response_text = self._get_message_response_text(ex.response) else: - response_text = '' + response_text = "" - raise WinRMTransportError('http', ex.response.status_code, response_text) + raise WinRMTransportError("http", ex.response.status_code, response_text) def _get_message_response_text(self, response): if self.encryption: