Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: splunk hec token handling #803

Closed
7 changes: 3 additions & 4 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ version: "3.7"
services:

sc4s:
platform: linux/amd64
image: ghcr.io/splunk/splunk-connect-for-syslog/container2:latest
hostname: sc4s
#When this is enabled test_common will fail
Expand All @@ -38,8 +39,6 @@ services:
- "6514"
stdin_open: true
tty: true
links:
- splunk
environment:
- SPLUNK_HEC_URL=https://splunk:8088
- SPLUNK_HEC_TOKEN=${SPLUNK_HEC_TOKEN}
Expand All @@ -62,6 +61,7 @@ services:
- SC4S_LISTEN_CHECKPOINT_SPLUNK_NOISE_CONTROL=yes

splunk:
platform: linux/amd64
build:
context: .
dockerfile: Dockerfile.splunk
Expand All @@ -81,6 +81,7 @@ services:
- TEST_SC4S_ACTIVATE_EXAMPLES=yes

uf:
platform: linux/amd64
build:
context: .
dockerfile: Dockerfile.uf
Expand All @@ -92,8 +93,6 @@ services:
ports:
- "9997"
- "8089"
links:
- splunk
environment:
- SPLUNK_PASSWORD=Chang3d!
- SPLUNK_START_ARGS=--accept-license
Expand Down
19 changes: 19 additions & 0 deletions pytest_splunk_addon/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@

test_generator = None

EXC_MAP = {Exception: 1}


def pytest_configure(config):
"""
Expand Down Expand Up @@ -115,6 +117,7 @@ def pytest_sessionstart(session):
SampleXdistGenerator.tokenized_event_source = session.config.getoption(
"tokenized_event_source"
).lower()
session.__exc_limits = EXC_MAP
if (
SampleXdistGenerator.tokenized_event_source == "store_new"
and session.config.getoption("ingest_events").lower()
Expand Down Expand Up @@ -198,5 +201,21 @@ def init_pytest_splunk_addon_logger():
return logger


def pytest_exception_interact(node, call, report):
    """
    Pytest hook invoked when an exception is raised during a test.

    The raised exception's type is looked up in ``session.__exc_limits``,
    a mapping of exception type to the number of occurrences still allowed.
    When the remaining count for that type is already 0 the whole run is
    stopped through ``pytest.exit``; otherwise the count is decremented.
    Types that are not keys of the mapping are ignored. The lookup is a
    plain key membership test on the exception's own type.

    https://docs.pytest.org/en/stable/reference/reference.html#pytest.hookspec.pytest_exception_interact

    Args:
        node: Object exposing the running ``session`` as ``node.session``.
        call: Call information; ``call.excinfo.type`` is the raised type.
        report: Unused by this hook implementation.
    """
    current_session = node.session
    raised_type = call.excinfo.type
    remaining = current_session.__exc_limits

    if raised_type in remaining:
        if remaining[raised_type] == 0:
            # Budget for this exception type is used up: abort the run.
            pytest.exit(f"Reached max exception for type: {raised_type}")
        else:
            remaining[raised_type] -= 1


init_pytest_splunk_addon_logger()
LOGGER = logging.getLogger("pytest-splunk-addon")
154 changes: 129 additions & 25 deletions pytest_splunk_addon/splunk.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,22 @@
from .standard_lib.CIM_Models.datamodel_definition import datamodels
import configparser
from filelock import FileLock
import defusedxml.ElementTree as ET

RESPONSIVE_SPLUNK_TIMEOUT = 300 # seconds
# Default Splunk HEC token name; it matches the name of the token that is created via an environment variable for CI purposes
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we change it accidentally, will it break the CI?

SPLUNK_HEC_TOKEN_NAME = "splunk_hec_token"

LOGGER = logging.getLogger("pytest-splunk-addon")
PYTEST_XDIST_TESTRUNUID = ""


class HecTokenNotExistingError(Exception):
    """Error signalling that a HEC token could not be obtained.

    The human-readable description is kept on ``message`` and is also
    passed to ``Exception`` so that ``str(error)`` yields the same text.
    """

    def __init__(self, message="An error occurred while getting HEC token."):
        super().__init__(message)
        self.message = message


def pytest_addoption(parser):
"""Add options for interaction with Splunk this allows the tool to work in two modes
1) docker mode which is typically used by developers on their workstation
Expand Down Expand Up @@ -109,8 +118,7 @@ def pytest_addoption(parser):
"--splunk-hec-token",
action="store",
dest="splunk_hec_token",
default="9b741d03-43e9-4164-908b-e09102327d22",
help='Splunk HTTP event collector token. default is "9b741d03-43e9-4164-908b-e09102327d22" If an external forwarder is used provide HEC token of forwarder.',
help="Deprecated option. Splunk HTTP event collector token. If an external forwarder is used provide HEC token of forwarder.",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Without a default value, isn't this a breaking change?

)
group.addoption(
"--splunk-port",
Expand Down Expand Up @@ -434,7 +442,7 @@ def splunk(request, file_system_prerequisite):


@pytest.fixture(scope="session")
def sc4s(request):
def sc4s(request, get_hec_token):
"""
This fixture based on the passed option will provide a real fixture
for external or docker sc4s configuration
Expand All @@ -455,7 +463,7 @@ def sc4s(request):


@pytest.fixture(scope="session")
def uf(request):
def uf(request, get_hec_token):
"""
This fixture based on the passed option will provide a real fixture
for external or docker uf configuration
Expand Down Expand Up @@ -541,13 +549,12 @@ def splunk_docker(
os.environ["SPLUNK_APP_ID"] = config["package"]["id"]
except Exception:
os.environ["SPLUNK_APP_ID"] = "TA_package"
os.environ["SPLUNK_HEC_TOKEN"] = request.config.getoption("splunk_hec_token")
os.environ["SPLUNK_USER"] = request.config.getoption("splunk_user")
os.environ["SPLUNK_PASSWORD"] = request.config.getoption("splunk_password")
os.environ["SPLUNK_VERSION"] = request.config.getoption("splunk_version")
os.environ["SC4S_VERSION"] = request.config.getoption("sc4s_version")

LOGGER.info("Starting docker_service=splunk")

if worker_id:
# get the temp directory shared by all workers
root_tmp_dir = tmp_path_factory.getbasetemp().parent
Expand Down Expand Up @@ -576,11 +583,13 @@ def splunk_docker(
docker_services.port_for("splunk", 9997),
)

docker_services.wait_until_responsive(
timeout=180.0,
pause=0.5,
check=lambda: is_responsive_splunk(splunk_info),
)
for _ in range(RESPONSIVE_SPLUNK_TIMEOUT):
if is_responsive_splunk(splunk_info) and is_responsive_hec(
request, splunk_info
):
sleep(20)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why sleep for 20 seconds if both conditions are already met?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Even if both conditions are met, as far as I have observed, the docker container is not fully initialized yet; it still performs a few more steps, including cleanup of existing HEC tokens. This sleep ensures that we don't perform any actions until the docker container is fully spun up. I didn't figure out a better solution (I decided not to parse the container logs waiting for "Ansible playbook complete", as I don't think it's needed here).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This may lead to flaky tests, what if the CI is slower than we expect?

Why can't we improve the already existing code and add 1 more check to lambda?

break
sleep(1)

return splunk_info

Expand Down Expand Up @@ -610,9 +619,9 @@ def splunk_external(request):
)

for _ in range(RESPONSIVE_SPLUNK_TIMEOUT):
if is_responsive_splunk(splunk_info):
break
if is_responsive_hec(request, splunk_info):
if is_responsive_splunk(splunk_info) and is_responsive_hec(
artemrys marked this conversation as resolved.
Show resolved Hide resolved
request, splunk_info
):
break
sleep(1)

Expand Down Expand Up @@ -678,16 +687,110 @@ def splunk_rest_uri(splunk):
@pytest.fixture(scope="session")
def splunk_hec_uri(request, splunk):
"""
Provides a uri to the Splunk hec port
Provides a uri to the Splunk services/collector endpoint
"""
splunk_session = requests.Session()
splunk_session.headers = {
"Authorization": f'Splunk {request.config.getoption("splunk_hec_token")}'
}
uri = f'{request.config.getoption("splunk_hec_scheme")}://{splunk["forwarder_host"]}:{splunk["port_hec"]}/services/collector'
LOGGER.info("Fetched splunk_hec_uri=%s", uri)

return splunk_session, uri
return uri


@pytest.fixture(scope="session")
def splunk_inputs_uri(request, splunk):
    """
    Build the uri of the Splunk services/data/inputs/http endpoint.

    The scheme is read from the ``splunk_hec_scheme`` option, while the host
    and port are taken from the ``forwarder_host`` and ``port`` entries of
    the ``splunk`` fixture value.

    Returns:
        str: The assembled endpoint uri.
    """
    scheme = request.config.getoption("splunk_hec_scheme")
    host = splunk["forwarder_host"]
    port = splunk["port"]
    uri = f"{scheme}://{host}:{port}/services/data/inputs/http"
    LOGGER.info("Fetched splunk_inputs_uri=%s", uri)

    return uri


@pytest.fixture(scope="session")
def get_hec_token(request, splunk_inputs_uri):
    """
    Resolve the HEC token and expose it through a requests session.

    When the ``splunk_hec_token`` option is set, that value is used as is.
    Otherwise the token named ``SPLUNK_HEC_TOKEN_NAME`` is looked up in the
    Splunk instance and, if the lookup fails, created and then looked up
    again. In both cases the token value is exported to the
    SPLUNK_HEC_TOKEN env variable.

    Returns:
        requests.Session: A session with headers containing Authorization: Splunk <HEC token>.
    """
    hec_session = requests.Session()
    configured_token = request.config.getoption("splunk_hec_token")

    if configured_token:
        # An explicitly supplied token takes precedence over lookup/creation.
        os.environ["SPLUNK_HEC_TOKEN"] = configured_token
        hec_session.headers = {"Authorization": f"Splunk {configured_token}"}
        return hec_session

    LOGGER.info(f"Attempting to create HEC token")
    try:
        token_value = _get_existing_token(
            request, splunk_inputs_uri, SPLUNK_HEC_TOKEN_NAME
        )
        # NOTE(review): the token value is written to the log in plain text
        # here and below; confirm that this is acceptable.
        LOGGER.info(f"Retrieved HEC token: {token_value}")
    except HecTokenNotExistingError:
        # Lookup failed: create the token, then read its value back.
        _create_new_token(request, splunk_inputs_uri, SPLUNK_HEC_TOKEN_NAME)
        token_value = _get_existing_token(
            request, splunk_inputs_uri, SPLUNK_HEC_TOKEN_NAME
        )
        LOGGER.info(f"Created HEC token: {token_value}")
    except Exception as error:
        # Any other failure: pause, log and propagate unchanged.
        sleep(5)
        LOGGER.error(f"Failed to create HEC token: {error}")
        raise

    hec_session.headers = {"Authorization": f"Splunk {token_value}"}
    os.environ["SPLUNK_HEC_TOKEN"] = token_value

    return hec_session


def _create_new_token(request, splunk_inputs_uri, splunk_token_name):
    """
    Create a HEC token with the given name and return its value.

    A POST with ``name=<splunk_token_name>`` is sent to ``splunk_inputs_uri``
    using the ``splunk_user`` / ``splunk_password`` options as credentials,
    with certificate verification disabled. An HTTP 409 response is treated
    as "token already exists" and the existing token is fetched instead.

    Returns:
        The token value read from ``entry[0].content.token`` of the JSON
        response, or the value of the already existing token on HTTP 409.

    Raises:
        Exception: On any other HTTP error, or on any other failure while
            sending the request or reading the response.
    """
    try:
        credentials = (
            request.config.getoption("splunk_user"),
            request.config.getoption("splunk_password"),
        )
        creation_response = requests.post(  # nosemgrep: splunk.disabled-cert-validation
            splunk_inputs_uri,
            verify=False,
            auth=credentials,
            data=f"name={splunk_token_name}",
            params={"output_mode": "json"},
        )
        creation_response.raise_for_status()
        return creation_response.json()["entry"][0]["content"]["token"]

    except requests.exceptions.HTTPError as http_error:
        if http_error.response.status_code != 409:
            raise Exception(
                f"HTTP error during token creation: {http_error}"
            ) from http_error
        # Token already exists; attempt to retrieve the existing token
        return _get_existing_token(request, splunk_inputs_uri, splunk_token_name)

    except Exception as error:
        LOGGER.error(f"An error occurred during HEC token creation: {error}")
        raise Exception(f"HEC token creation failed: {error}") from error


def _get_existing_token(request, splunk_inputs_uri, splunk_token_name):
    """
    Fetch the value of the HEC token with the given name.

    A GET is sent to ``<splunk_inputs_uri>/<splunk_token_name>`` using the
    ``splunk_user`` / ``splunk_password`` options as credentials, with
    certificate verification disabled.

    Returns:
        The token value read from ``entry[0].content.token`` of the JSON
        response.

    Raises:
        HecTokenNotExistingError: On any failure while sending the request
            or reading the response; the original error is chained.
    """
    try:
        token_endpoint = f"{splunk_inputs_uri}/{splunk_token_name}"
        lookup_response = requests.get(  # nosemgrep: splunk.disabled-cert-validation
            token_endpoint,
            verify=False,
            auth=(
                request.config.getoption("splunk_user"),
                request.config.getoption("splunk_password"),
            ),
            params={"output_mode": "json"},
        )
        lookup_response.raise_for_status()
        return lookup_response.json()["entry"][0]["content"]["token"]

    except Exception as error:
        failure = f"Failed to retrieve existing HEC token: {error}"
        LOGGER.error(failure)
        raise HecTokenNotExistingError(failure) from error


@pytest.fixture(scope="session")
Expand All @@ -701,7 +804,9 @@ def splunk_web_uri(request, splunk):


@pytest.fixture(scope="session")
def splunk_ingest_data(request, splunk_hec_uri, sc4s, uf, splunk_events_cleanup):
def splunk_ingest_data(
request, splunk_hec_uri, sc4s, uf, splunk_events_cleanup, get_hec_token
):
"""
Generates events for the add-on and ingests into Splunk.
The ingestion can be done using the following methods:
Expand Down Expand Up @@ -732,8 +837,8 @@ def splunk_ingest_data(request, splunk_hec_uri, sc4s, uf, splunk_events_cleanup)
"uf_port": uf.get("uf_port"),
"uf_username": uf.get("uf_username"),
"uf_password": uf.get("uf_password"),
"session_headers": splunk_hec_uri[0].headers,
"splunk_hec_uri": splunk_hec_uri[1],
"session_headers": get_hec_token.headers,
"splunk_hec_uri": splunk_hec_uri,
"sc4s_host": sc4s[0], # for sc4s
"sc4s_port": sc4s[1][514], # for sc4s
}
Expand Down Expand Up @@ -900,8 +1005,7 @@ def is_responsive_hec(request, splunk):
"""
try:
LOGGER.info(
"Trying to connect Splunk HEC... splunk=%s",
json.dumps(splunk),
f"Trying to connect Splunk HEC... splunk={splunk['forwarder_host']}:{splunk['port_hec']}/services/collector/health/1.0"
)
response = requests.get( # nosemgrep: splunk.disabled-cert-validation
f'{request.config.getoption("splunk_hec_scheme")}://{splunk["forwarder_host"]}:{splunk["port_hec"]}/services/collector/health/1.0',
Expand Down
Loading