From 6dbad5e7ee27b5930203c5bb9b5c04856e592556 Mon Sep 17 00:00:00 2001 From: Noam Rathaus Date: Mon, 23 Oct 2023 11:40:51 +0300 Subject: [PATCH] Change order of things to make easier to merge Create a single call for build_ext_aes128_cmm and build_ext_aes128_ccm_8 Combine more code together More code reuse Combine all the code into a func Combine the build_ext_aes128_ccm into one application_data_length is the real value, the -28 is based on the HTTP request part --- scripts/test-aesccm.py | 502 ++++++++++------------------------------- 1 file changed, 125 insertions(+), 377 deletions(-) diff --git a/scripts/test-aesccm.py b/scripts/test-aesccm.py index 7b2d1e0ec..d270689ee 100644 --- a/scripts/test-aesccm.py +++ b/scripts/test-aesccm.py @@ -50,6 +50,69 @@ def help_msg(): print(" --help this message") +def build_ext_aes128_ccm(dhe, type_8): + """ Build the AES ext with CCM 8 and ciphers """ + ext = {} + if dhe: + groups = [GroupName.secp256r1, + GroupName.ffdhe2048] + ext[ExtensionType.supported_groups] = SupportedGroupsExtension()\ + .create(groups) + ext[ExtensionType.signature_algorithms] = \ + SignatureAlgorithmsExtension().create(SIG_ALL) + ext[ExtensionType.signature_algorithms_cert] = \ + SignatureAlgorithmsCertExtension().create(SIG_ALL) + + if type_8: + ciphers = [CipherSuite.TLS_ECDHE_ECDSA_WITH_AES_128_CCM_8, + CipherSuite.TLS_DHE_RSA_WITH_AES_128_CCM_8, + CipherSuite.TLS_EMPTY_RENEGOTIATION_INFO_SCSV] + else: + ciphers = [CipherSuite.TLS_ECDHE_ECDSA_WITH_AES_128_CCM, + CipherSuite.TLS_DHE_RSA_WITH_AES_128_CCM, + CipherSuite.TLS_EMPTY_RENEGOTIATION_INFO_SCSV] + else: + ext = None + if type_8: + ciphers = [CipherSuite.TLS_RSA_WITH_AES_128_CCM_8, + CipherSuite.TLS_EMPTY_RENEGOTIATION_INFO_SCSV] + else: + ciphers = [CipherSuite.TLS_RSA_WITH_AES_128_CCM, + CipherSuite.TLS_EMPTY_RENEGOTIATION_INFO_SCSV] + + return (ext, ciphers) + + +def build_conn_graph(host, port, ccm_type_8, dhe, application_data_length): + """ Use one function to build all the graphs to make it easier to understand """ + if ccm_type_8: + (ext, ciphers) = build_ext_aes128_ccm(dhe, type_8=True) + else: + (ext, ciphers) = build_ext_aes128_ccm(dhe, type_8=False) + + conversation = Connect(host, port) + node = conversation + node = node.add_child(SetMaxRecordSize(2**16-1)) + + node = node.add_child(ClientHelloGenerator(ciphers, extensions=ext)) + node = node.add_child(ExpectServerHello()) + node = node.add_child(ExpectCertificate()) + if dhe: + node = node.add_child(ExpectServerKeyExchange()) + node = node.add_child(ExpectServerHelloDone()) + node = node.add_child(ClientKeyExchangeGenerator()) + node = node.add_child(ChangeCipherSpecGenerator()) + node = node.add_child(FinishedGenerator()) + node = node.add_child(ExpectChangeCipherSpec()) + node = node.add_child(ExpectFinished()) + data = bytearray(b"GET / HTTP/1.0\r\n" + + b"X-test: " + b"A" * (application_data_length - 28) + + b"\r\n\r\n") + assert len(data) == (application_data_length) + node = node.add_child(ApplicationDataGenerator(data)) + + return (conversation, node) + def main(): host = "localhost" port = 4433 @@ -92,8 +155,6 @@ def main(): conversations = {} - conversation = Connect(host, port) - node = conversation ext = {} if dhe: groups = [GroupName.secp256r1, @@ -120,6 +181,10 @@ def main(): CipherSuite.TLS_RSA_WITH_AES_128_CCM_8, CipherSuite.TLS_RSA_WITH_AES_256_CCM_8, CipherSuite.TLS_EMPTY_RENEGOTIATION_INFO_SCSV] + + conversation = Connect(host, port) + node = conversation + node = node.add_child(ClientHelloGenerator(ciphers, extensions=ext)) node = node.add_child(ExpectServerHello()) node = node.add_child(ExpectCertificate()) @@ -141,8 +206,6 @@ def main(): conversations["sanity"] = conversation # reject ciphers in TLS1.1 - conversation = Connect(host, port) - node = conversation ext = {} if dhe: groups = [GroupName.secp256r1, @@ -169,6 +232,10 @@ def main(): CipherSuite.TLS_RSA_WITH_AES_128_CCM_8, CipherSuite.TLS_RSA_WITH_AES_256_CCM_8, CipherSuite.TLS_EMPTY_RENEGOTIATION_INFO_SCSV] + + conversation = Connect(host, port) + node = conversation + node = node.add_child(ClientHelloGenerator(ciphers, version=(3, 2))) node = node.add_child(ExpectAlert(AlertLevel.fatal, AlertDescription.handshake_failure)) @@ -176,25 +243,11 @@ def main(): conversations["AES-CCM in TLS1.1"] = conversation # empty application data message acceptance + (ext, ciphers) = build_ext_aes128_ccm(dhe, type_8=False) + conversation = Connect(host, port) node = conversation - ext = {} - if dhe: - groups = [GroupName.secp256r1, - GroupName.ffdhe2048] - ext[ExtensionType.supported_groups] = SupportedGroupsExtension()\ - .create(groups) - ext[ExtensionType.signature_algorithms] = \ - SignatureAlgorithmsExtension().create(SIG_ALL) - ext[ExtensionType.signature_algorithms_cert] = \ - SignatureAlgorithmsCertExtension().create(SIG_ALL) - ciphers = [CipherSuite.TLS_ECDHE_ECDSA_WITH_AES_128_CCM, - CipherSuite.TLS_DHE_RSA_WITH_AES_128_CCM, - CipherSuite.TLS_EMPTY_RENEGOTIATION_INFO_SCSV] - else: - ext = None - ciphers = [CipherSuite.TLS_RSA_WITH_AES_128_CCM, - CipherSuite.TLS_EMPTY_RENEGOTIATION_INFO_SCSV] + node = node.add_child(ClientHelloGenerator(ciphers, extensions=ext)) node = node.add_child(ExpectServerHello()) node = node.add_child(ExpectCertificate()) @@ -218,25 +271,11 @@ def main(): conversations["empty app data"] = conversation # empty application data message acceptance with _8 ciphers + (ext, ciphers) = build_ext_aes128_ccm(dhe, type_8=True) + conversation = Connect(host, port) node = conversation - ext = {} - if dhe: - groups = [GroupName.secp256r1, - GroupName.ffdhe2048] - ext[ExtensionType.supported_groups] = SupportedGroupsExtension()\ - .create(groups) - ext[ExtensionType.signature_algorithms] = \ - SignatureAlgorithmsExtension().create(SIG_ALL) - ext[ExtensionType.signature_algorithms_cert] = \ - SignatureAlgorithmsCertExtension().create(SIG_ALL) - ciphers = [CipherSuite.TLS_ECDHE_ECDSA_WITH_AES_128_CCM_8, - CipherSuite.TLS_DHE_RSA_WITH_AES_128_CCM_8, - CipherSuite.TLS_EMPTY_RENEGOTIATION_INFO_SCSV] - else: - ext = None - ciphers = [CipherSuite.TLS_RSA_WITH_AES_128_CCM_8, - CipherSuite.TLS_EMPTY_RENEGOTIATION_INFO_SCSV] + node = node.add_child(ClientHelloGenerator(ciphers, extensions=ext)) node = node.add_child(ExpectServerHello()) node = node.add_child(ExpectCertificate()) @@ -260,25 +299,11 @@ def main(): conversations["empty app data with _8 ciphers"] = conversation # 1/n-1 message splitting + (ext, ciphers) = build_ext_aes128_ccm(dhe, type_8=False) + conversation = Connect(host, port) node = conversation - ext = {} - if dhe: - groups = [GroupName.secp256r1, - GroupName.ffdhe2048] - ext[ExtensionType.supported_groups] = SupportedGroupsExtension()\ - .create(groups) - ext[ExtensionType.signature_algorithms] = \ - SignatureAlgorithmsExtension().create(SIG_ALL) - ext[ExtensionType.signature_algorithms_cert] = \ - SignatureAlgorithmsCertExtension().create(SIG_ALL) - ciphers = [CipherSuite.TLS_ECDHE_ECDSA_WITH_AES_128_CCM, - CipherSuite.TLS_DHE_RSA_WITH_AES_128_CCM, - CipherSuite.TLS_EMPTY_RENEGOTIATION_INFO_SCSV] - else: - ext = None - ciphers = [CipherSuite.TLS_RSA_WITH_AES_128_CCM, - CipherSuite.TLS_EMPTY_RENEGOTIATION_INFO_SCSV] + node = node.add_child(ClientHelloGenerator(ciphers, extensions=ext)) node = node.add_child(ExpectServerHello()) node = node.add_child(ExpectCertificate()) @@ -302,42 +327,10 @@ def main(): conversations["1/n-1 record splitting"] = conversation # plaintext just under the maximum permissible - conversation = Connect(host, port) - node = conversation - node = node.add_child(SetMaxRecordSize(2**16-1)) - ext = {} - if dhe: - groups = [GroupName.secp256r1, - GroupName.ffdhe2048] - ext[ExtensionType.supported_groups] = SupportedGroupsExtension()\ - .create(groups) - ext[ExtensionType.signature_algorithms] = \ - SignatureAlgorithmsExtension().create(SIG_ALL) - ext[ExtensionType.signature_algorithms_cert] = \ - SignatureAlgorithmsCertExtension().create(SIG_ALL) - ciphers = [CipherSuite.TLS_ECDHE_ECDSA_WITH_AES_128_CCM, - CipherSuite.TLS_DHE_RSA_WITH_AES_128_CCM, - CipherSuite.TLS_EMPTY_RENEGOTIATION_INFO_SCSV] - else: - ext = None - ciphers = [CipherSuite.TLS_RSA_WITH_AES_128_CCM, - CipherSuite.TLS_EMPTY_RENEGOTIATION_INFO_SCSV] - node = node.add_child(ClientHelloGenerator(ciphers, extensions=ext)) - node = node.add_child(ExpectServerHello()) - node = node.add_child(ExpectCertificate()) - if dhe: - node = node.add_child(ExpectServerKeyExchange()) - node = node.add_child(ExpectServerHelloDone()) - node = node.add_child(ClientKeyExchangeGenerator()) - node = node.add_child(ChangeCipherSpecGenerator()) - node = node.add_child(FinishedGenerator()) - node = node.add_child(ExpectChangeCipherSpec()) - node = node.add_child(ExpectFinished()) - data = bytearray(b"GET / HTTP/1.0\r\n" + - b"X-test: " + b"A" * (2**14 - 28) + - b"\r\n\r\n") - assert len(data) == 2**14 - node = node.add_child(ApplicationDataGenerator(data)) + (conversation, node) = build_conn_graph(host, port, ccm_type_8=False, + dhe=dhe, + application_data_length=(2**14)) + node = node.add_child(ExpectApplicationData()) node = node.add_child(AlertGenerator(AlertLevel.warning, AlertDescription.close_notify)) @@ -351,45 +344,14 @@ def main(): conversations["max size plaintext"] = conversation # plaintext over the maximum permissible - conversation = Connect(host, port) - node = conversation - node = node.add_child(SetMaxRecordSize(2**16-1)) - ext = {} - if dhe: - groups = [GroupName.secp256r1, - GroupName.ffdhe2048] - ext[ExtensionType.supported_groups] = SupportedGroupsExtension()\ - .create(groups) - ext[ExtensionType.signature_algorithms] = \ - SignatureAlgorithmsExtension().create(SIG_ALL) - ext[ExtensionType.signature_algorithms_cert] = \ - SignatureAlgorithmsCertExtension().create(SIG_ALL) - ciphers = [CipherSuite.TLS_ECDHE_ECDSA_WITH_AES_128_CCM_8, - CipherSuite.TLS_DHE_RSA_WITH_AES_128_CCM_8, - CipherSuite.TLS_EMPTY_RENEGOTIATION_INFO_SCSV] - else: - ext = None - ciphers = [CipherSuite.TLS_RSA_WITH_AES_128_CCM_8, - CipherSuite.TLS_EMPTY_RENEGOTIATION_INFO_SCSV] - node = node.add_child(ClientHelloGenerator(ciphers, extensions=ext)) - node = node.add_child(ExpectServerHello()) - node = node.add_child(ExpectCertificate()) - if dhe: - node = node.add_child(ExpectServerKeyExchange()) - node = node.add_child(ExpectServerHelloDone()) - node = node.add_child(ClientKeyExchangeGenerator()) - node = node.add_child(ChangeCipherSpecGenerator()) - node = node.add_child(FinishedGenerator()) - node = node.add_child(ExpectChangeCipherSpec()) - node = node.add_child(ExpectFinished()) - data = bytearray(b"GET / HTTP/1.0\r\n" + - b"X-test: " + b"A" * (2**14 - 28) + - b"\r\n\r\n") - assert len(data) == 2**14 - node = node.add_child(ApplicationDataGenerator(data)) + (conversation, node) = build_conn_graph(host, port, ccm_type_8=True, + dhe=dhe, + application_data_length=(2**14)) + node = node.add_child(ExpectApplicationData()) node = node.add_child(AlertGenerator(AlertLevel.warning, AlertDescription.close_notify)) + # allow for multiple application data records in response node = node.add_child(ExpectApplicationData()) loop = node @@ -399,251 +361,65 @@ def main(): node.next_sibling = ExpectClose() conversations["max size plaintext with _8 ciphers"] = conversation - conversation = Connect(host, port) - node = conversation - node = node.add_child(SetMaxRecordSize(2**16-1)) - ext = {} - if dhe: - groups = [GroupName.secp256r1, - GroupName.ffdhe2048] - ext[ExtensionType.supported_groups] = SupportedGroupsExtension()\ - .create(groups) - ext[ExtensionType.signature_algorithms] = \ - SignatureAlgorithmsExtension().create(SIG_ALL) - ext[ExtensionType.signature_algorithms_cert] = \ - SignatureAlgorithmsCertExtension().create(SIG_ALL) - ciphers = [CipherSuite.TLS_ECDHE_ECDSA_WITH_AES_128_CCM, - CipherSuite.TLS_DHE_RSA_WITH_AES_128_CCM, - CipherSuite.TLS_EMPTY_RENEGOTIATION_INFO_SCSV] - else: - ext = None - ciphers = [CipherSuite.TLS_RSA_WITH_AES_128_CCM, - CipherSuite.TLS_EMPTY_RENEGOTIATION_INFO_SCSV] - node = node.add_child(ClientHelloGenerator(ciphers, extensions=ext)) - node = node.add_child(ExpectServerHello()) - node = node.add_child(ExpectCertificate()) - if dhe: - node = node.add_child(ExpectServerKeyExchange()) - node = node.add_child(ExpectServerHelloDone()) - node = node.add_child(ClientKeyExchangeGenerator()) - node = node.add_child(ChangeCipherSpecGenerator()) - node = node.add_child(FinishedGenerator()) - node = node.add_child(ExpectChangeCipherSpec()) - node = node.add_child(ExpectFinished()) - data = bytearray(b"GET / HTTP/1.0\r\n" + - b"X-test: " + b"A" * (2**14 - 28 + 1) + - b"\r\n\r\n") - assert len(data) == 2**14 + 1 - node = node.add_child(ApplicationDataGenerator(data)) + # too big plaintext + (conversation, node) = build_conn_graph(host, port, ccm_type_8=False, + dhe=dhe, + application_data_length=(2**14 + 1)) + node = node.add_child(ExpectAlert(AlertLevel.fatal, [AlertDescription.decompression_failure, AlertDescription.record_overflow])) node.add_child(ExpectClose()) conversations["too big plaintext"] = conversation - conversation = Connect(host, port) - node = conversation - node = node.add_child(SetMaxRecordSize(2**16-1)) - ext = {} - if dhe: - groups = [GroupName.secp256r1, - GroupName.ffdhe2048] - ext[ExtensionType.supported_groups] = SupportedGroupsExtension()\ - .create(groups) - ext[ExtensionType.signature_algorithms] = \ - SignatureAlgorithmsExtension().create(SIG_ALL) - ext[ExtensionType.signature_algorithms_cert] = \ - SignatureAlgorithmsCertExtension().create(SIG_ALL) - ciphers = [CipherSuite.TLS_ECDHE_ECDSA_WITH_AES_128_CCM_8, - CipherSuite.TLS_DHE_RSA_WITH_AES_128_CCM_8, - CipherSuite.TLS_EMPTY_RENEGOTIATION_INFO_SCSV] - else: - ext = None - ciphers = [CipherSuite.TLS_RSA_WITH_AES_128_CCM_8, - CipherSuite.TLS_EMPTY_RENEGOTIATION_INFO_SCSV] - node = node.add_child(ClientHelloGenerator(ciphers, extensions=ext)) - node = node.add_child(ExpectServerHello()) - node = node.add_child(ExpectCertificate()) - if dhe: - node = node.add_child(ExpectServerKeyExchange()) - node = node.add_child(ExpectServerHelloDone()) - node = node.add_child(ClientKeyExchangeGenerator()) - node = node.add_child(ChangeCipherSpecGenerator()) - node = node.add_child(FinishedGenerator()) - node = node.add_child(ExpectChangeCipherSpec()) - node = node.add_child(ExpectFinished()) - data = bytearray(b"GET / HTTP/1.0\r\n" + - b"X-test: " + b"A" * (2**14 - 28 + 1) + - b"\r\n\r\n") - assert len(data) == 2**14 + 1 - node = node.add_child(ApplicationDataGenerator(data)) + # too big plaintext with _8 ciphers + (conversation, node) = build_conn_graph(host, port, ccm_type_8=True, + dhe=dhe, + application_data_length=(2**14 + 1)) + node = node.add_child(ExpectAlert(AlertLevel.fatal, [AlertDescription.decompression_failure, AlertDescription.record_overflow])) node.add_child(ExpectClose()) conversations["too big plaintext with _8 ciphers"] = conversation - conversation = Connect(host, port) - node = conversation - node = node.add_child(SetMaxRecordSize(2**16-1)) - ext = {} - if dhe: - groups = [GroupName.secp256r1, - GroupName.ffdhe2048] - ext[ExtensionType.supported_groups] = SupportedGroupsExtension()\ - .create(groups) - ext[ExtensionType.signature_algorithms] = \ - SignatureAlgorithmsExtension().create(SIG_ALL) - ext[ExtensionType.signature_algorithms_cert] = \ - SignatureAlgorithmsCertExtension().create(SIG_ALL) - ciphers = [CipherSuite.TLS_ECDHE_ECDSA_WITH_AES_128_CCM, - CipherSuite.TLS_DHE_RSA_WITH_AES_128_CCM, - CipherSuite.TLS_EMPTY_RENEGOTIATION_INFO_SCSV] - else: - ext = None - ciphers = [CipherSuite.TLS_RSA_WITH_AES_128_CCM, - CipherSuite.TLS_EMPTY_RENEGOTIATION_INFO_SCSV] - node = node.add_child(ClientHelloGenerator(ciphers, extensions=ext)) - node = node.add_child(ExpectServerHello()) - node = node.add_child(ExpectCertificate()) - if dhe: - node = node.add_child(ExpectServerKeyExchange()) - node = node.add_child(ExpectServerHelloDone()) - node = node.add_child(ClientKeyExchangeGenerator()) - node = node.add_child(ChangeCipherSpecGenerator()) - node = node.add_child(FinishedGenerator()) - node = node.add_child(ExpectChangeCipherSpec()) - node = node.add_child(ExpectFinished()) - data = bytearray(b"GET / HTTP/1.0\r\n" + - b"X-test: " + b"A" * (2**14 + 1024 - 28) + - b"\r\n\r\n") - assert len(data) == 2**14 + 1024 - node = node.add_child(ApplicationDataGenerator(data)) + # too big plaintext - max compress + (conversation, node) = build_conn_graph(host, port, ccm_type_8=False, + dhe=dhe, + application_data_length=(2**14 + 1024)) + node = node.add_child(ExpectAlert(AlertLevel.fatal, [AlertDescription.decompression_failure, AlertDescription.record_overflow])) node.add_child(ExpectClose()) conversations["too big plaintext - max compress"] = conversation - conversation = Connect(host, port) - node = conversation - node = node.add_child(SetMaxRecordSize(2**16-1)) - ext = {} - if dhe: - groups = [GroupName.secp256r1, - GroupName.ffdhe2048] - ext[ExtensionType.supported_groups] = SupportedGroupsExtension()\ - .create(groups) - ext[ExtensionType.signature_algorithms] = \ - SignatureAlgorithmsExtension().create(SIG_ALL) - ext[ExtensionType.signature_algorithms_cert] = \ - SignatureAlgorithmsCertExtension().create(SIG_ALL) - ciphers = [CipherSuite.TLS_ECDHE_ECDSA_WITH_AES_128_CCM_8, - CipherSuite.TLS_DHE_RSA_WITH_AES_128_CCM_8, - CipherSuite.TLS_EMPTY_RENEGOTIATION_INFO_SCSV] - else: - ext = None - ciphers = [CipherSuite.TLS_RSA_WITH_AES_128_CCM_8, - CipherSuite.TLS_EMPTY_RENEGOTIATION_INFO_SCSV] - node = node.add_child(ClientHelloGenerator(ciphers, extensions=ext)) - node = node.add_child(ExpectServerHello()) - node = node.add_child(ExpectCertificate()) - if dhe: - node = node.add_child(ExpectServerKeyExchange()) - node = node.add_child(ExpectServerHelloDone()) - node = node.add_child(ClientKeyExchangeGenerator()) - node = node.add_child(ChangeCipherSpecGenerator()) - node = node.add_child(FinishedGenerator()) - node = node.add_child(ExpectChangeCipherSpec()) - node = node.add_child(ExpectFinished()) - data = bytearray(b"GET / HTTP/1.0\r\n" + - b"X-test: " + b"A" * (2**14 + 1024 - 28) + - b"\r\n\r\n") - assert len(data) == 2**14 + 1024 - node = node.add_child(ApplicationDataGenerator(data)) + # too big plaintext - max compress with _8 ciphers + (conversation, node) = build_conn_graph(host, port, ccm_type_8=True, + dhe=dhe, + application_data_length=(2**14 + 1024)) + node = node.add_child(ExpectAlert(AlertLevel.fatal, [AlertDescription.decompression_failure, AlertDescription.record_overflow])) node.add_child(ExpectClose()) conversations["too big plaintext - max compress with _8 ciphers"] = conversation - conversation = Connect(host, port) - node = conversation - node = node.add_child(SetMaxRecordSize(2**16-1)) - ext = {} - if dhe: - groups = [GroupName.secp256r1, - GroupName.ffdhe2048] - ext[ExtensionType.supported_groups] = SupportedGroupsExtension()\ - .create(groups) - ext[ExtensionType.signature_algorithms] = \ - SignatureAlgorithmsExtension().create(SIG_ALL) - ext[ExtensionType.signature_algorithms_cert] = \ - SignatureAlgorithmsCertExtension().create(SIG_ALL) - ciphers = [CipherSuite.TLS_ECDHE_ECDSA_WITH_AES_128_CCM, - CipherSuite.TLS_DHE_RSA_WITH_AES_128_CCM, - CipherSuite.TLS_EMPTY_RENEGOTIATION_INFO_SCSV] - else: - ext = None - ciphers = [CipherSuite.TLS_RSA_WITH_AES_128_CCM, - CipherSuite.TLS_EMPTY_RENEGOTIATION_INFO_SCSV] - node = node.add_child(ClientHelloGenerator(ciphers, extensions=ext)) - node = node.add_child(ExpectServerHello()) - node = node.add_child(ExpectCertificate()) - if dhe: - node = node.add_child(ExpectServerKeyExchange()) - node = node.add_child(ExpectServerHelloDone()) - node = node.add_child(ClientKeyExchangeGenerator()) - node = node.add_child(ChangeCipherSpecGenerator()) - node = node.add_child(FinishedGenerator()) - node = node.add_child(ExpectChangeCipherSpec()) - node = node.add_child(ExpectFinished()) - data = bytearray(b"GET / HTTP/1.0\r\n" + - b"X-test: " + b"A" * (2**14 + 1024 - 28 + 1) + - b"\r\n\r\n") - assert len(data) == 2**14 + 1024 + 1 - node = node.add_child(ApplicationDataGenerator(data)) + # too big plaintext - above TLSCompressed max + (conversation, node) = build_conn_graph(host, port, ccm_type_8=False, + dhe=dhe, + application_data_length=(2**14 + 1024 + 1)) + node = node.add_child(ExpectAlert(AlertLevel.fatal, AlertDescription.record_overflow)) node.add_child(ExpectClose()) conversations["too big plaintext - above TLSCompressed max"] = conversation - conversation = Connect(host, port) - node = conversation - node = node.add_child(SetMaxRecordSize(2**16-1)) - ext = {} - if dhe: - groups = [GroupName.secp256r1, - GroupName.ffdhe2048] - ext[ExtensionType.supported_groups] = SupportedGroupsExtension()\ - .create(groups) - ext[ExtensionType.signature_algorithms] = \ - SignatureAlgorithmsExtension().create(SIG_ALL) - ext[ExtensionType.signature_algorithms_cert] = \ - SignatureAlgorithmsCertExtension().create(SIG_ALL) - ciphers = [CipherSuite.TLS_ECDHE_ECDSA_WITH_AES_128_CCM_8, - CipherSuite.TLS_DHE_RSA_WITH_AES_128_CCM_8, - CipherSuite.TLS_EMPTY_RENEGOTIATION_INFO_SCSV] - else: - ext = None - ciphers = [CipherSuite.TLS_RSA_WITH_AES_128_CCM_8, - CipherSuite.TLS_EMPTY_RENEGOTIATION_INFO_SCSV] - node = node.add_child(ClientHelloGenerator(ciphers, extensions=ext)) - node = node.add_child(ExpectServerHello()) - node = node.add_child(ExpectCertificate()) - if dhe: - node = node.add_child(ExpectServerKeyExchange()) - node = node.add_child(ExpectServerHelloDone()) - node = node.add_child(ClientKeyExchangeGenerator()) - node = node.add_child(ChangeCipherSpecGenerator()) - node = node.add_child(FinishedGenerator()) - node = node.add_child(ExpectChangeCipherSpec()) - node = node.add_child(ExpectFinished()) - data = bytearray(b"GET / HTTP/1.0\r\n" + - b"X-test: " + b"A" * (2**14 + 1024 - 28 + 1) + - b"\r\n\r\n") - assert len(data) == 2**14 + 1024 + 1 - node = node.add_child(ApplicationDataGenerator(data)) + # too big plaintext - above TLSCompressed max with _8 ciphers + (conversation, node) = build_conn_graph(host, port, ccm_type_8=True, + dhe=dhe, + application_data_length=(2**14 + 1024 + 1)) + node = node.add_child(ExpectAlert(AlertLevel.fatal, AlertDescription.record_overflow)) node.add_child(ExpectClose()) @@ -699,25 +475,11 @@ def main(): # too small message handling for val in range(16): + (ext, ciphers) = build_ext_aes128_ccm(dhe, type_8=False) + conversation = Connect(host, port) node = conversation - ext = {} - if dhe: - groups = [GroupName.secp256r1, - GroupName.ffdhe2048] - ext[ExtensionType.supported_groups] = SupportedGroupsExtension()\ - .create(groups) - ext[ExtensionType.signature_algorithms] = \ - SignatureAlgorithmsExtension().create(SIG_ALL) - ext[ExtensionType.signature_algorithms_cert] = \ - SignatureAlgorithmsCertExtension().create(SIG_ALL) - ciphers = [CipherSuite.TLS_ECDHE_ECDSA_WITH_AES_128_CCM, - CipherSuite.TLS_DHE_RSA_WITH_AES_128_CCM, - CipherSuite.TLS_EMPTY_RENEGOTIATION_INFO_SCSV] - else: - ext = None - ciphers = [CipherSuite.TLS_RSA_WITH_AES_128_CCM, - CipherSuite.TLS_EMPTY_RENEGOTIATION_INFO_SCSV] + node = node.add_child(ClientHelloGenerator(ciphers, extensions=ext)) node = node.add_child(ExpectServerHello()) node = node.add_child(ExpectCertificate()) @@ -743,23 +505,9 @@ def main(): for val in range(8): conversation = Connect(host, port) node = conversation - ext = {} - if dhe: - groups = [GroupName.secp256r1, - GroupName.ffdhe2048] - ext[ExtensionType.supported_groups] = SupportedGroupsExtension()\ - .create(groups) - ext[ExtensionType.signature_algorithms] = \ - SignatureAlgorithmsExtension().create(SIG_ALL) - ext[ExtensionType.signature_algorithms_cert] = \ - SignatureAlgorithmsCertExtension().create(SIG_ALL) - ciphers = [CipherSuite.TLS_ECDHE_ECDSA_WITH_AES_128_CCM_8, - CipherSuite.TLS_DHE_RSA_WITH_AES_128_CCM_8, - CipherSuite.TLS_EMPTY_RENEGOTIATION_INFO_SCSV] - else: - ext = None - ciphers = [CipherSuite.TLS_RSA_WITH_AES_128_CCM_8, - CipherSuite.TLS_EMPTY_RENEGOTIATION_INFO_SCSV] + + (ext, ciphers) = build_ext_aes128_ccm(dhe, type_8=True) + node = node.add_child(ClientHelloGenerator(ciphers, extensions=ext)) node = node.add_child(ExpectServerHello()) node = node.add_child(ExpectCertificate())