From 9cae1db9b9d5cfe25d38cd65b6c08a5caffb605e Mon Sep 17 00:00:00 2001 From: mkolasinski-splunk Date: Sat, 25 Nov 2023 13:31:30 +0100 Subject: [PATCH] feat: remove hec token pytest argument, add self handling for hec tokne --- pytest_splunk_addon/splunk.py | 74 ++++++++++++++++++++++++++++++----- 1 file changed, 65 insertions(+), 9 deletions(-) diff --git a/pytest_splunk_addon/splunk.py b/pytest_splunk_addon/splunk.py index 6c0f7607e..f20e4f2d3 100644 --- a/pytest_splunk_addon/splunk.py +++ b/pytest_splunk_addon/splunk.py @@ -29,6 +29,8 @@ from .standard_lib.CIM_Models.datamodel_definition import datamodels import configparser from filelock import FileLock +import sys +import xml.etree.ElementTree as ET RESPONSIVE_SPLUNK_TIMEOUT = 300 # seconds @@ -109,8 +111,8 @@ def pytest_addoption(parser): "--splunk-hec-token", action="store", dest="splunk_hec_token", - default="9b741d03-43e9-4164-908b-e09102327d22", - help='Splunk HTTP event collector token. default is "9b741d03-43e9-4164-908b-e09102327d22" If an external forwarder is used provide HEC token of forwarder.', + help='Splunk HTTP event collector token.', + required=True ) group.addoption( "--splunk-port", @@ -323,7 +325,13 @@ def pytest_addoption(parser): help="Should execute test or not (True|False)", default="True", ) - + group.addoption( + "--sc4s-version", + action="store", + dest="sc4s_version", + default="latest", + help="SC4S version. default is latest", + ) @pytest.fixture(scope="session") def splunk_setup(splunk): @@ -434,7 +442,7 @@ def splunk(request, file_system_prerequisite): @pytest.fixture(scope="session") -def sc4s(request): +def sc4s(request, create_hec_token): """ This fixture based on the passed option will provide a real fixture for external or docker sc4s configuration @@ -455,7 +463,7 @@ def sc4s(request): @pytest.fixture(scope="session") -def uf(request): +def uf(request, create_hec_token): """ This fixture based on the passed option will provide a real fixture for external or docker uf configuration @@ -546,8 +554,8 @@ def splunk_docker( os.environ["SPLUNK_PASSWORD"] = request.config.getoption("splunk_password") os.environ["SPLUNK_VERSION"] = request.config.getoption("splunk_version") os.environ["SC4S_VERSION"] = request.config.getoption("sc4s_version") - LOGGER.info("Starting docker_service=splunk") + if worker_id: # get the temp directory shared by all workers root_tmp_dir = tmp_path_factory.getbasetemp().parent @@ -686,9 +694,56 @@ def splunk_hec_uri(request, splunk): } uri = f'{request.config.getoption("splunk_hec_scheme")}://{splunk["forwarder_host"]}:{splunk["port_hec"]}/services/collector' LOGGER.info("Fetched splunk_hec_uri=%s", uri) - + return splunk_session, uri +@pytest.fixture(scope="session") +def splunk_inputs_uri(request, splunk): + """ + Provides a uri to the Splunk data/inputs/all endpoint + """ + uri = f'{request.config.getoption("splunk_hec_scheme")}://{splunk["forwarder_host"]}:{splunk["port"]}/services/data/inputs/http' + LOGGER.info("Fetched splunk_inputs_uri=%s", uri) + + return uri + +def extract_token_from_xml(xml_content): + root = ET.fromstring(xml_content) + elements_with_name_attrib = [element for element in root.iter() if 'name' in element.attrib] + for element in elements_with_name_attrib: + if element.attrib["name"] == "token": + return element.text + +@pytest.fixture(scope="session") +def create_hec_token(request, splunk_inputs_uri): + splunk_token_name = "splunk_hec_token_psa" + response = requests.post( + splunk_inputs_uri, + verify=False, + auth=(request.config.getoption("splunk_user"), request.config.getoption("splunk_password")), + data=f"name={splunk_token_name}", + ) + try: + response.raise_for_status() + token_value = extract_token_from_xml(response.text) + except requests.exceptions.HTTPError as e: + if e.response.status_code == 409: + existing_token_response = requests.get( + f"{splunk_inputs_uri}/{splunk_token_name}", + verify=False, + auth=(request.config.getoption("splunk_user"), request.config.getoption("splunk_password")), + ) + existing_token_response.raise_for_status() + token_value = extract_token_from_xml(existing_token_response.text) + else: + raise Exception("HEC token creation failed!") + + splunk_session = requests.Session() + splunk_session.headers = { + "Authorization": f'Splunk {token_value}' + } + os.environ["SPLUNK_HEC_TOKEN"] = splunk_session.headers["Authorization"].split(" ")[1] + return splunk_session @pytest.fixture(scope="session") def splunk_web_uri(request, splunk): @@ -701,7 +756,7 @@ def splunk_web_uri(request, splunk): @pytest.fixture(scope="session") -def splunk_ingest_data(request, splunk_hec_uri, sc4s, uf, splunk_events_cleanup): +def splunk_ingest_data(request, splunk_hec_uri, sc4s, uf, splunk_events_cleanup, create_hec_token): """ Generates events for the add-on and ingests into Splunk. The ingestion can be done using the following methods: @@ -720,6 +775,7 @@ def splunk_ingest_data(request, splunk_hec_uri, sc4s, uf, splunk_events_cleanup) """ if request.config.getoption("ingest_events").lower() in ["n", "no", "false", "f"]: return + #create_hec_token(splunk_inputs_uri=splunk_inputs_uri) global PYTEST_XDIST_TESTRUNUID if ( "PYTEST_XDIST_WORKER" not in os.environ @@ -732,7 +788,7 @@ def splunk_ingest_data(request, splunk_hec_uri, sc4s, uf, splunk_events_cleanup) "uf_port": uf.get("uf_port"), "uf_username": uf.get("uf_username"), "uf_password": uf.get("uf_password"), - "session_headers": splunk_hec_uri[0].headers, + "session_headers": create_hec_token.headers, "splunk_hec_uri": splunk_hec_uri[1], "sc4s_host": sc4s[0], # for sc4s "sc4s_port": sc4s[1][514], # for sc4s