Skip to content

Commit

Permalink
feat: remove hec token pytest argument, add self handling for hec tokne
Browse files Browse the repository at this point in the history
  • Loading branch information
mkolasinski-splunk committed Nov 25, 2023
1 parent 1557b3d commit 9cae1db
Showing 1 changed file with 65 additions and 9 deletions.
74 changes: 65 additions & 9 deletions pytest_splunk_addon/splunk.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
from .standard_lib.CIM_Models.datamodel_definition import datamodels
import configparser
from filelock import FileLock
import sys
import xml.etree.ElementTree as ET

RESPONSIVE_SPLUNK_TIMEOUT = 300 # seconds

Expand Down Expand Up @@ -109,8 +111,8 @@ def pytest_addoption(parser):
"--splunk-hec-token",
action="store",
dest="splunk_hec_token",
default="9b741d03-43e9-4164-908b-e09102327d22",
help='Splunk HTTP event collector token. default is "9b741d03-43e9-4164-908b-e09102327d22" If an external forwarder is used provide HEC token of forwarder.',
help='Splunk HTTP event collector token.',
required=True
)
group.addoption(
"--splunk-port",
Expand Down Expand Up @@ -323,7 +325,13 @@ def pytest_addoption(parser):
help="Should execute test or not (True|False)",
default="True",
)

group.addoption(
"--sc4s-version",
action="store",
dest="sc4s_version",
default="latest",
help="SC4S version. default is latest",
)

@pytest.fixture(scope="session")
def splunk_setup(splunk):
Expand Down Expand Up @@ -434,7 +442,7 @@ def splunk(request, file_system_prerequisite):


@pytest.fixture(scope="session")
def sc4s(request):
def sc4s(request, create_hec_token):
"""
This fixture based on the passed option will provide a real fixture
for external or docker sc4s configuration
Expand All @@ -455,7 +463,7 @@ def sc4s(request):


@pytest.fixture(scope="session")
def uf(request):
def uf(request, create_hec_token):
"""
This fixture based on the passed option will provide a real fixture
for external or docker uf configuration
Expand Down Expand Up @@ -546,8 +554,8 @@ def splunk_docker(
os.environ["SPLUNK_PASSWORD"] = request.config.getoption("splunk_password")
os.environ["SPLUNK_VERSION"] = request.config.getoption("splunk_version")
os.environ["SC4S_VERSION"] = request.config.getoption("sc4s_version")

LOGGER.info("Starting docker_service=splunk")

if worker_id:
# get the temp directory shared by all workers
root_tmp_dir = tmp_path_factory.getbasetemp().parent
Expand Down Expand Up @@ -686,9 +694,56 @@ def splunk_hec_uri(request, splunk):
}
uri = f'{request.config.getoption("splunk_hec_scheme")}://{splunk["forwarder_host"]}:{splunk["port_hec"]}/services/collector'
LOGGER.info("Fetched splunk_hec_uri=%s", uri)

return splunk_session, uri

@pytest.fixture(scope="session")
def splunk_inputs_uri(request, splunk):
"""
Provides a uri to the Splunk data/inputs/all endpoint
"""
uri = f'{request.config.getoption("splunk_hec_scheme")}://{splunk["forwarder_host"]}:{splunk["port"]}/services/data/inputs/http'
LOGGER.info("Fetched splunk_inputs_uri=%s", uri)

return uri

def extract_token_from_xml(xml_content):
root = ET.fromstring(xml_content)
elements_with_name_attrib = [element for element in root.iter() if 'name' in element.attrib]
for element in elements_with_name_attrib:
if element.attrib["name"] == "token":
return element.text

@pytest.fixture(scope="session")
def create_hec_token(request, splunk_inputs_uri):
splunk_token_name = "splunk_hec_token_psa"
response = requests.post(
splunk_inputs_uri,
verify=False,
auth=(request.config.getoption("splunk_user"), request.config.getoption("splunk_password")),
data=f"name={splunk_token_name}",
)
try:
response.raise_for_status()
token_value = extract_token_from_xml(response.text)
except requests.exceptions.HTTPError as e:
if e.response.status_code == 409:
existing_token_response = requests.get(
f"{splunk_inputs_uri}/{splunk_token_name}",
verify=False,
auth=(request.config.getoption("splunk_user"), request.config.getoption("splunk_password")),
)
existing_token_response.raise_for_status()
token_value = extract_token_from_xml(existing_token_response.text)
else:
raise Exception("HEC token creation failed!")

splunk_session = requests.Session()
splunk_session.headers = {
"Authorization": f'Splunk {token_value}'
}
os.environ["SPLUNK_HEC_TOKEN"] = splunk_session.headers["Authorization"].split(" ")[1]
return splunk_session

@pytest.fixture(scope="session")
def splunk_web_uri(request, splunk):
Expand All @@ -701,7 +756,7 @@ def splunk_web_uri(request, splunk):


@pytest.fixture(scope="session")
def splunk_ingest_data(request, splunk_hec_uri, sc4s, uf, splunk_events_cleanup):
def splunk_ingest_data(request, splunk_hec_uri, sc4s, uf, splunk_events_cleanup, create_hec_token):
"""
Generates events for the add-on and ingests into Splunk.
The ingestion can be done using the following methods:
Expand All @@ -720,6 +775,7 @@ def splunk_ingest_data(request, splunk_hec_uri, sc4s, uf, splunk_events_cleanup)
"""
if request.config.getoption("ingest_events").lower() in ["n", "no", "false", "f"]:
return
#create_hec_token(splunk_inputs_uri=splunk_inputs_uri)
global PYTEST_XDIST_TESTRUNUID
if (
"PYTEST_XDIST_WORKER" not in os.environ
Expand All @@ -732,7 +788,7 @@ def splunk_ingest_data(request, splunk_hec_uri, sc4s, uf, splunk_events_cleanup)
"uf_port": uf.get("uf_port"),
"uf_username": uf.get("uf_username"),
"uf_password": uf.get("uf_password"),
"session_headers": splunk_hec_uri[0].headers,
"session_headers": create_hec_token.headers,
"splunk_hec_uri": splunk_hec_uri[1],
"sc4s_host": sc4s[0], # for sc4s
"sc4s_port": sc4s[1][514], # for sc4s
Expand Down

0 comments on commit 9cae1db

Please sign in to comment.