diff --git a/stix2/test/v21/test_artifact.py b/stix2/test/v21/test_artifact.py index 5c0e2eb4..2c68c0d2 100644 --- a/stix2/test/v21/test_artifact.py +++ b/stix2/test/v21/test_artifact.py @@ -1,7 +1,5 @@ import json -import pytest - import stix2 from stix2.datastore.relational_db.relational_db import RelationalDBStore import stix2.properties @@ -35,6 +33,7 @@ True, ) + def test_basic_artifact(): store.sink.generate_stix_schema() artifact_stix_object = stix2.parse(basic_artifact_dict) @@ -44,6 +43,7 @@ def test_basic_artifact(): for attrib in basic_artifact_dict.keys(): assert basic_artifact_dict[attrib] == read_obj[attrib] + def test_encrypted_artifact(): store.sink.generate_stix_schema() artifact_stix_object = stix2.parse(encrypted_artifact_dict) @@ -51,6 +51,6 @@ def test_encrypted_artifact(): read_obj = json.loads(store.get(artifact_stix_object['id']).serialize()) for attrib in encrypted_artifact_dict.keys(): - if attrib == 'hashes': # TODO hashes are saved to separate table, functionality to retrieve is WIP + if attrib == 'hashes': # TODO hashes are saved to separate table, functionality to retrieve is WIP continue - assert encrypted_artifact_dict[attrib] == read_obj[attrib] \ No newline at end of file + assert encrypted_artifact_dict[attrib] == read_obj[attrib] diff --git a/stix2/test/v21/test_autonomous_system.py b/stix2/test/v21/test_autonomous_system.py index 81399446..82fb4f10 100644 --- a/stix2/test/v21/test_autonomous_system.py +++ b/stix2/test/v21/test_autonomous_system.py @@ -1,7 +1,5 @@ import json -import pytest - import stix2 from stix2.datastore.relational_db.relational_db import RelationalDBStore @@ -21,6 +19,7 @@ True, ) + def test_autonomous_system(): store.sink.generate_stix_schema() as_obj = stix2.parse(as_dict) @@ -28,4 +27,4 @@ def test_autonomous_system(): read_obj = json.loads(store.get(as_obj['id']).serialize()) for attrib in as_dict.keys(): - assert as_dict[attrib] == read_obj[attrib] \ No newline at end of file + assert as_dict[attrib] == read_obj[attrib] diff --git a/stix2/test/v21/test_directory.py b/stix2/test/v21/test_directory.py index 5896f25d..2f413866 100644 --- a/stix2/test/v21/test_directory.py +++ b/stix2/test/v21/test_directory.py @@ -1,7 +1,5 @@ import json -import pytest - import stix2 from stix2.datastore.relational_db.relational_db import RelationalDBStore @@ -26,6 +24,7 @@ True, ) + def test_directory(): store.sink.generate_stix_schema() directory_obj = stix2.parse(directory_dict) @@ -33,10 +32,9 @@ def test_directory(): read_obj = json.loads(store.get(directory_obj['id']).serialize()) for attrib in directory_dict.keys(): - if attrib == "contains_refs": # TODO remove skip once we can pull from table join + if attrib == "contains_refs": # TODO remove skip once we can pull from table join continue - if attrib == "ctime" or attrib == "mtime": # convert both into stix2 date format for consistency + if attrib == "ctime" or attrib == "mtime": # convert both into stix2 date format for consistency assert stix2.utils.parse_into_datetime(directory_dict[attrib]) == stix2.utils.parse_into_datetime(read_obj[attrib]) continue assert directory_dict[attrib] == read_obj[attrib] - diff --git a/stix2/test/v21/test_domain_name.py b/stix2/test/v21/test_domain_name.py index 42c60ae2..93f30d54 100644 --- a/stix2/test/v21/test_domain_name.py +++ b/stix2/test/v21/test_domain_name.py @@ -1,7 +1,5 @@ import json -import pytest - import stix2 from stix2.datastore.relational_db.relational_db import RelationalDBStore @@ -19,6 +17,7 @@ True, ) + def test_autonomous_system(): store.sink.generate_stix_schema() domain_name_obj = stix2.parse(domain_name_dict) @@ -26,4 +25,4 @@ def test_autonomous_system(): read_obj = json.loads(store.get(domain_name_obj['id']).serialize()) for attrib in domain_name_dict.keys(): - assert domain_name_dict[attrib] == read_obj[attrib] \ No newline at end of file + assert domain_name_dict[attrib] == read_obj[attrib] diff --git a/stix2/test/v21/test_email_address.py b/stix2/test/v21/test_email_address.py index a0992979..6a00daef 100644 --- a/stix2/test/v21/test_email_address.py +++ b/stix2/test/v21/test_email_address.py @@ -1,7 +1,5 @@ import json -import pytest - import stix2 from stix2.datastore.relational_db.relational_db import RelationalDBStore import stix2.properties @@ -22,6 +20,7 @@ True, ) + def test_email_addr(): store.sink.generate_stix_schema() email_addr_stix_object = stix2.parse(email_addr_dict) diff --git a/stix2/test/v21/test_email_message.py b/stix2/test/v21/test_email_message.py index 7fad0946..038b3274 100644 --- a/stix2/test/v21/test_email_message.py +++ b/stix2/test/v21/test_email_message.py @@ -1,7 +1,5 @@ import json -import pytest - import stix2 from stix2.datastore.relational_db.relational_db import RelationalDBStore import stix2.properties @@ -63,19 +61,19 @@ }, "body_multipart": [ { - "content_type": "text/plain; charset=utf-8", - "content_disposition": "inline", - "body": "Cats are funny!", + "content_type": "text/plain; charset=utf-8", + "content_disposition": "inline", + "body": "Cats are funny!", }, { - "content_type": "image/png", - "content_disposition": "attachment; filename=\"tabby.png\"", - "body_raw_ref": "artifact--4cce66f8-6eaa-53cb-85d5-3a85fca3a6c5", + "content_type": "image/png", + "content_disposition": "attachment; filename=\"tabby.png\"", + "body_raw_ref": "artifact--4cce66f8-6eaa-53cb-85d5-3a85fca3a6c5", }, { - "content_type": "application/zip", - "content_disposition": "attachment; filename=\"tabby_pics.zip\"", - "body_raw_ref": "file--6ce09d9c-0ad3-5ebf-900c-e3cb288955b5", + "content_type": "application/zip", + "content_disposition": "attachment; filename=\"tabby_pics.zip\"", + "body_raw_ref": "file--6ce09d9c-0ad3-5ebf-900c-e3cb288955b5", }, ], } @@ -87,6 +85,7 @@ True, ) + def test_email_msg(): store.sink.generate_stix_schema() email_msg_stix_object = stix2.parse(email_msg_dict) @@ -95,13 +94,16 @@ def test_email_msg(): for attrib in email_msg_dict.keys(): if attrib == "to_refs" or attrib == "cc_refs" or attrib == "bcc_refs" \ - or attrib == "additional_header_fields": # join multiple tables not implemented yet + or attrib == "additional_header_fields": # join multiple tables not implemented yet continue if attrib == "date": - assert stix2.utils.parse_into_datetime(email_msg_dict[attrib]) == stix2.utils.parse_into_datetime(read_obj[attrib]) + assert stix2.utils.parse_into_datetime(email_msg_dict[attrib]) == stix2.utils.parse_into_datetime( + read_obj[attrib], + ) continue assert email_msg_dict[attrib] == read_obj[attrib] + def test_multipart_email_msg(): store.sink.generate_stix_schema() multipart_email_msg_stix_object = stix2.parse(multipart_email_msg_dict) @@ -110,10 +112,11 @@ def test_multipart_email_msg(): for attrib in multipart_email_msg_dict.keys(): if attrib == "to_refs" or attrib == "cc_refs" or attrib == "bcc_refs" \ - or attrib == "additional_header_fields" or attrib == "body_multipart": # join multiple tables not implemented yet + or attrib == "additional_header_fields" or attrib == "body_multipart": # join multiple tables not implemented yet continue if attrib == "date": - assert stix2.utils.parse_into_datetime(multipart_email_msg_dict[attrib]) == stix2.utils.parse_into_datetime(read_obj[attrib]) + assert stix2.utils.parse_into_datetime(multipart_email_msg_dict[attrib]) == stix2.utils.parse_into_datetime( + read_obj[attrib], + ) continue assert multipart_email_msg_dict[attrib] == read_obj[attrib] - diff --git a/stix2/test/v21/test_file.py b/stix2/test/v21/test_file.py index 12c1d5ee..647f81e5 100644 --- a/stix2/test/v21/test_file.py +++ b/stix2/test/v21/test_file.py @@ -1,7 +1,5 @@ import json -import pytest - import stix2 from stix2.datastore.relational_db.relational_db import RelationalDBStore import stix2.properties @@ -36,6 +34,7 @@ True, ) + def test_file(): store.sink.generate_stix_schema() file_stix_object = stix2.parse(file_dict) @@ -44,10 +43,9 @@ def test_file(): read_obj = json.loads(store.get(file_stix_object['id']).serialize()) for attrib in file_dict.keys(): - if attrib == "contains_refs" or attrib == "hashes": # join multiple tables not implemented yet + if attrib == "contains_refs" or attrib == "hashes": # join multiple tables not implemented yet continue if attrib == "ctime" or attrib == "mtime" or attrib == "atime": assert stix2.utils.parse_into_datetime(file_dict[attrib]) == stix2.utils.parse_into_datetime(read_obj[attrib]) continue assert file_dict[attrib] == read_obj[attrib] - diff --git a/stix2/test/v21/test_ipv4_ipv6.py b/stix2/test/v21/test_ipv4_ipv6.py index a3755b3b..c32197d4 100644 --- a/stix2/test/v21/test_ipv4_ipv6.py +++ b/stix2/test/v21/test_ipv4_ipv6.py @@ -1,7 +1,3 @@ -import json - -import pytest - import stix2 from stix2.datastore.relational_db.relational_db import RelationalDBStore import stix2.properties @@ -27,6 +23,7 @@ True, ) + def test_ipv4(): store.sink.generate_stix_schema() ipv4_stix_object = stix2.parse(ipv4_dict) @@ -45,4 +42,3 @@ def test_ipv6(): for attrib in ipv6_dict.keys(): assert ipv6_dict[attrib] == read_obj[attrib] - diff --git a/stix2/test/v21/test_mutex.py b/stix2/test/v21/test_mutex.py index e7df076b..55fdd5d2 100644 --- a/stix2/test/v21/test_mutex.py +++ b/stix2/test/v21/test_mutex.py @@ -1,7 +1,3 @@ -import json - -import pytest - import stix2 from stix2.datastore.relational_db.relational_db import RelationalDBStore import stix2.properties @@ -20,6 +16,7 @@ True, ) + def test_mutex(): store.sink.generate_stix_schema() mutex_stix_object = stix2.parse(mutex_dict) @@ -28,4 +25,3 @@ def test_mutex(): for attrib in mutex_dict.keys(): assert mutex_dict[attrib] == read_obj[attrib] - diff --git a/stix2/test/v21/test_network_traffic.py b/stix2/test/v21/test_network_traffic.py index f24e594f..ddb47a49 100644 --- a/stix2/test/v21/test_network_traffic.py +++ b/stix2/test/v21/test_network_traffic.py @@ -1,7 +1,3 @@ -import json - -import pytest - import stix2 from stix2.datastore.relational_db.relational_db import RelationalDBStore import stix2.properties @@ -29,7 +25,7 @@ "dst_packets": 100, "src_payload_ref": "artifact--3857f78d-7d16-5092-99fe-ecff58408b02", "dst_payload_ref": "artifact--3857f78d-7d16-5092-99fe-ecff58408b03", - "encapsulates_refs" : [ + "encapsulates_refs": [ "network-traffic--53e0bf48-2eee-5c03-8bde-ed7049d2c0a3", "network-traffic--53e0bf48-2eee-5c03-8bde-ed7049d2c0a4", ], @@ -43,6 +39,7 @@ True, ) + def test_network_traffic(): store.sink.generate_stix_schema() network_traffic_stix_object = stix2.parse(network_traffic_dict) @@ -50,12 +47,9 @@ def test_network_traffic(): read_obj = store.get(network_traffic_stix_object['id']) for attrib in network_traffic_dict.keys(): - if attrib == "encapsulates_refs": # multiple table join not implemented + if attrib == "encapsulates_refs": # multiple table join not implemented continue if attrib == "start" or attrib == "end": assert stix2.utils.parse_into_datetime(network_traffic_dict[attrib]) == stix2.utils.parse_into_datetime(read_obj[attrib]) continue assert network_traffic_dict[attrib] == read_obj[attrib] - - - diff --git a/stix2/test/v21/test_process.py b/stix2/test/v21/test_process.py index c58f9c68..519fd2d1 100644 --- a/stix2/test/v21/test_process.py +++ b/stix2/test/v21/test_process.py @@ -1,7 +1,5 @@ import json -import pytest - import stix2 from stix2.datastore.relational_db.relational_db import RelationalDBStore import stix2.properties @@ -37,6 +35,7 @@ True, ) + def test_process(): store.sink.generate_stix_schema() process_stix_object = stix2.parse(process_dict) @@ -45,10 +44,10 @@ def test_process(): read_obj = json.loads(store.get(process_stix_object['id']).serialize()) for attrib in process_dict.keys(): - if attrib == "child_refs" or attrib == "opened_connection_refs" or attrib == "environment_variables": # join multiple tables not implemented yet + if attrib == "child_refs" or attrib == "opened_connection_refs" or attrib == "environment_variables": + # join multiple tables not implemented yet continue if attrib == "created_time": assert stix2.utils.parse_into_datetime(process_dict[attrib]) == stix2.utils.parse_into_datetime(read_obj[attrib]) continue assert process_dict[attrib] == read_obj[attrib] - diff --git a/stix2/test/v21/test_software.py b/stix2/test/v21/test_software.py index a2a201ac..896e9c2a 100644 --- a/stix2/test/v21/test_software.py +++ b/stix2/test/v21/test_software.py @@ -1,7 +1,5 @@ import json -import pytest - import stix2 from stix2.datastore.relational_db.relational_db import RelationalDBStore import stix2.properties @@ -23,6 +21,7 @@ True, ) + def test_software(): store.sink.generate_stix_schema() software_stix_object = stix2.parse(software_dict) @@ -32,4 +31,3 @@ def test_software(): for attrib in software_dict.keys(): assert software_dict[attrib] == read_obj[attrib] - diff --git a/stix2/test/v21/test_url.py b/stix2/test/v21/test_url.py index d3a331e1..838cbfbb 100644 --- a/stix2/test/v21/test_url.py +++ b/stix2/test/v21/test_url.py @@ -1,8 +1,5 @@ -import datetime as dt import json -import pytest - import stix2 from stix2.datastore.relational_db.relational_db import RelationalDBStore import stix2.properties @@ -10,7 +7,7 @@ url_dict = { "type": "url", "id": "url--a5477287-23ac-5971-a010-5c287877fa60", - "value" : "https://example.com/research/index.html", + "value": "https://example.com/research/index.html", } store = RelationalDBStore( @@ -20,6 +17,7 @@ True, ) + def test_url(): store.sink.generate_stix_schema() url_stix_object = stix2.parse(url_dict) @@ -27,4 +25,4 @@ def test_url(): read_obj = json.loads(store.get(url_stix_object['id']).serialize()) for attrib in url_dict.keys(): - assert url_dict[attrib] == read_obj[attrib] \ No newline at end of file + assert url_dict[attrib] == read_obj[attrib] diff --git a/stix2/test/v21/test_user_account.py b/stix2/test/v21/test_user_account.py index 80c2fd14..374c2377 100644 --- a/stix2/test/v21/test_user_account.py +++ b/stix2/test/v21/test_user_account.py @@ -1,8 +1,5 @@ -import datetime as dt import json -import pytest - import stix2 from stix2.datastore.relational_db.relational_db import RelationalDBStore import stix2.properties @@ -28,12 +25,13 @@ } store = RelationalDBStore( - "postgresql://postgres:admin@localhost/postgres", - False, - None, - True, + "postgresql://postgres:admin@localhost/postgres", + False, + None, + True, ) + def test_user_account(): store.sink.generate_stix_schema() user_account_stix_object = stix2.parse(user_account_dict) @@ -42,9 +40,10 @@ def test_user_account(): for attrib in user_account_dict.keys(): if attrib == "account_created" or attrib == "account_expires" \ - or attrib == "credential_last_changed" or attrib == "account_first_login" \ + or attrib == "credential_last_changed" or attrib == "account_first_login" \ or attrib == "account_last_login": - assert stix2.utils.parse_into_datetime(user_account_dict[attrib]) == stix2.utils.parse_into_datetime(read_obj[attrib]) - continue + assert stix2.utils.parse_into_datetime(user_account_dict[attrib]) == stix2.utils.parse_into_datetime( + read_obj[attrib], + ) + continue assert user_account_dict[attrib] == read_obj[attrib] - diff --git a/stix2/test/v21/test_windows_registry.py b/stix2/test/v21/test_windows_registry.py index 582a1ced..f2864548 100644 --- a/stix2/test/v21/test_windows_registry.py +++ b/stix2/test/v21/test_windows_registry.py @@ -1,8 +1,5 @@ -import datetime as dt import json -import pytest - import stix2 from stix2.datastore.relational_db.relational_db import RelationalDBStore import stix2.properties @@ -14,14 +11,14 @@ "key": "hkey_local_machine\\system\\bar\\foo", "values": [ { - "name": "Foo", - "data": "qwerty", - "data_type": "REG_SZ", + "name": "Foo", + "data": "qwerty", + "data_type": "REG_SZ", }, { - "name": "Bar", - "data": "42", - "data_type": "REG_DWORD", + "name": "Bar", + "data": "42", + "data_type": "REG_DWORD", }, ], "modified_time": "2018-01-20T12:31:12Z", @@ -30,12 +27,13 @@ } store = RelationalDBStore( - "postgresql://postgres:admin@localhost/postgres", - False, - None, - True, + "postgresql://postgres:admin@localhost/postgres", + False, + None, + True, ) + def test_windows_registry(): store.sink.generate_stix_schema() windows_registry_stix_object = stix2.parse(windows_registry_dict) @@ -43,10 +41,11 @@ def test_windows_registry(): read_obj = json.loads(store.get(windows_registry_stix_object['id']).serialize()) for attrib in windows_registry_dict.keys(): - if attrib == "values": # skip multiple table join + if attrib == "values": # skip multiple table join continue if attrib == "modified_time": - assert stix2.utils.parse_into_datetime(windows_registry_dict[attrib]) == stix2.utils.parse_into_datetime(read_obj[attrib]) + assert stix2.utils.parse_into_datetime(windows_registry_dict[attrib]) == stix2.utils.parse_into_datetime( + read_obj[attrib], + ) continue assert windows_registry_dict[attrib] == read_obj[attrib] - diff --git a/stix2/test/v21/test_x509_certificates.py b/stix2/test/v21/test_x509_certificates.py index d3ec5b7f..1847dbc2 100644 --- a/stix2/test/v21/test_x509_certificates.py +++ b/stix2/test/v21/test_x509_certificates.py @@ -1,7 +1,5 @@ import json -import pytest - import stix2 from stix2.datastore.relational_db.relational_db import RelationalDBStore import stix2.properties @@ -20,41 +18,42 @@ } extensions_x509_certificate_dict = { - "type":"x509-certificate", + "type": "x509-certificate", "spec_version": "2.1", "id": "x509-certificate--b595eaf0-0b28-5dad-9e8e-0fab9c1facc9", - "issuer":"C=ZA, ST=Western Cape, L=Cape Town, O=Thawte Consulting cc, OU=Certification \ + "issuer": "C=ZA, ST=Western Cape, L=Cape Town, O=Thawte Consulting cc, OU=Certification \ Services Division, CN=Thawte Server CA/emailAddress=server-certs@thawte.com", - "validity_not_before":"2016-03-12T12:00:00Z", - "validity_not_after":"2016-08-21T12:00:00Z", - "subject":"C=US, ST=Maryland, L=Pasadena, O=Brent Baccala, OU=FreeSoft, \ + "validity_not_before": "2016-03-12T12:00:00Z", + "validity_not_after": "2016-08-21T12:00:00Z", + "subject": "C=US, ST=Maryland, L=Pasadena, O=Brent Baccala, OU=FreeSoft, \ CN=www.freesoft.org/emailAddress=baccala@freesoft.org", "serial_number": "02:08:87:83:f2:13:58:1f:79:52:1e:66:90:0a:02:24:c9:6b:c7:dc", - "x509_v3_extensions":{ - "basic_constraints":"critical,CA:TRUE, pathlen:0", - "name_constraints":"permitted;IP:192.168.0.0/255.255.0.0", - "policy_constraints":"requireExplicitPolicy:3", - "key_usage":"critical, keyCertSign", - "extended_key_usage":"critical,codeSigning,1.2.3.4", - "subject_key_identifier":"hash", - "authority_key_identifier":"keyid,issuer", - "subject_alternative_name":"email:my@other.address,RID:1.2.3.4", - "issuer_alternative_name":"issuer:copy", - "crl_distribution_points":"URI:http://myhost.com/myca.crl", - "inhibit_any_policy":"2", - "private_key_usage_period_not_before":"2016-03-12T12:00:00Z", - "private_key_usage_period_not_after":"2018-03-12T12:00:00Z", - "certificate_policies":"1.2.4.5, 1.1.3.4", + "x509_v3_extensions": { + "basic_constraints": "critical,CA:TRUE, pathlen:0", + "name_constraints": "permitted;IP:192.168.0.0/255.255.0.0", + "policy_constraints": "requireExplicitPolicy:3", + "key_usage": "critical, keyCertSign", + "extended_key_usage": "critical,codeSigning,1.2.3.4", + "subject_key_identifier": "hash", + "authority_key_identifier": "keyid,issuer", + "subject_alternative_name": "email:my@other.address,RID:1.2.3.4", + "issuer_alternative_name": "issuer:copy", + "crl_distribution_points": "URI:http://myhost.com/myca.crl", + "inhibit_any_policy": "2", + "private_key_usage_period_not_before": "2016-03-12T12:00:00Z", + "private_key_usage_period_not_after": "2018-03-12T12:00:00Z", + "certificate_policies": "1.2.4.5, 1.1.3.4", }, } store = RelationalDBStore( - "postgresql://postgres:admin@localhost/postgres", - False, - None, - True, + "postgresql://postgres:admin@localhost/postgres", + False, + None, + True, ) + def test_basic_x509_certificate(): store.sink.generate_stix_schema() basic_x509_certificate_stix_object = stix2.parse(basic_x509_certificate_dict) @@ -63,10 +62,13 @@ def test_basic_x509_certificate(): for attrib in basic_x509_certificate_dict.keys(): if attrib == "validity_not_before" or attrib == "validity_not_after": - assert stix2.utils.parse_into_datetime(basic_x509_certificate_dict[attrib]) == stix2.utils.parse_into_datetime(read_obj[attrib]) + assert stix2.utils.parse_into_datetime( + basic_x509_certificate_dict[attrib], + ) == stix2.utils.parse_into_datetime(read_obj[attrib]) continue assert basic_x509_certificate_dict[attrib] == read_obj[attrib] + def test_x509_certificate_with_extensions(): store.sink.generate_stix_schema() extensions_x509_certificate_stix_object = stix2.parse(extensions_x509_certificate_dict) @@ -74,11 +76,11 @@ def test_x509_certificate_with_extensions(): read_obj = json.loads(store.get(extensions_x509_certificate_stix_object['id']).serialize()) for attrib in extensions_x509_certificate_dict.keys(): - if attrib == "x509_v3_extensions": # skipping multi-table join + if attrib == "x509_v3_extensions": # skipping multi-table join continue if attrib == "validity_not_before" or attrib == "validity_not_after": - assert stix2.utils.parse_into_datetime(extensions_x509_certificate_dict[attrib]) == stix2.utils.parse_into_datetime(read_obj[attrib]) + assert stix2.utils.parse_into_datetime( + extensions_x509_certificate_dict[attrib], + ) == stix2.utils.parse_into_datetime(read_obj[attrib]) continue assert extensions_x509_certificate_dict[attrib] == read_obj[attrib] - -