Skip to content

Commit

Permalink
feat: New spec and parser for"auditctl -l" (#3496)
Browse files Browse the repository at this point in the history
* feat: New spec "auditctl -l"

* Add a new module for all "auditctl" commands

Signed-off-by: Huanhuan Li <[email protected]>

* Replace the parent class "LegacyItemAccess" to "dict"

* Raise "ParseException" when the line isn't in expected format instead of pass it,
  maybe we need to enhance the parser.
  update the module docstring.
  raise SkipException when there is no known status output

Signed-off-by: Huanhuan Li <[email protected]>

* Rename "AuditdStatus" to "AuditStatus"

Signed-off-by: Huanhuan Li <[email protected]>

Signed-off-by: Huanhuan Li <[email protected]>
  • Loading branch information
huali027 authored Aug 18, 2022
1 parent 1547559 commit 00566b0
Show file tree
Hide file tree
Showing 8 changed files with 256 additions and 0 deletions.
3 changes: 3 additions & 0 deletions docs/shared_parsers_catalog/auditctl.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
.. automodule:: insights.parsers.auditctl
:members:
:show-inheritance:
111 changes: 111 additions & 0 deletions insights/parsers/auditctl.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
"""
AuditCtl - command ``auditctl xxx``
===================================
This module contains the following parsers:
AuditRules - command ``auditctl -l``
------------------------------------
AuditStatus - command ``auditctl -s``
-------------------------------------
"""

from insights import parser, CommandParser
from insights.parsers import ParseException, SkipException
from insights.specs import Specs


@parser(Specs.auditctl_rules)
class AuditRules(CommandParser, list):
"""
Class for parsing the `auditctl -l` command.
All lines are stored in a list.
Typical output of the command is::
-w /etc/selinux -p wa -k MAC-policy
-a always,exit -F arch=b32 -S chmod -F exit=-EACCES -F auid>=1000 -F auid!=-1 -F key=access
-a always,exit -F arch=b64 -S chmod -F exit=-EACCES -F auid>=1000 -F auid!=-1 -F key=access
-a always,exit -F arch=b32 -S chmod -F exit=-EPERM -F auid>=1000 -F auid!=-1 -F key=access
-a always,exit -F arch=b64 -S chmod -F exit=-EPERM -F auid>=1000 -F auid!=-1 -F key=access
-a always,exit -F arch=b32 -S chown -F exit=-EACCES -F auid>=1000 -F auid!=-1 -F key=access
-a always,exit -F arch=b64 -S chown -F exit=-EACCES -F auid>=1000 -F auid!=-1 -F key=access
-a always,exit -F arch=b32 -S chown -F exit=-EPERM -F auid>=1000 -F auid!=-1 -F key=access
-a always,exit -F arch=b64 -S chown -F exit=-EPERM -F auid>=1000 -F auid!=-1 -F key=access
Examples:
>>> type(audit_rules)
<class 'insights.parsers.auditctl.AuditRules'>
>>> len(audit_rules)
9
>>> '-w /etc/selinux -p wa -k MAC-policy' in audit_rules
True
Raises:
SkipException: When there are not rules.
"""

def parse_content(self, content):
if len(content) == 1 and content[0].lower().strip() == 'no rules':
raise SkipException
for line in content:
if line.strip():
self.append(line.strip())
if not self:
raise SkipException('No rules found')


@parser(Specs.auditctl_status)
class AuditStatus(CommandParser, dict):
"""
Module for parsing the output of the ``auditctl -s`` command.
Typical output on RHEL6 looks like::
AUDIT_STATUS: enabled=1 flag=1 pid=1483 rate_limit=0 backlog_limit=8192 lost=3 backlog=0
, while on RHEL7 and later, the output changes to::
enabled 1
failure 1
pid 947
rate_limit 0
backlog_limit 320
lost 0
backlog 0
loginuid_immutable 0 unlocked
Example:
>>> type(auds)
<class 'insights.parsers.auditctl.AuditStatus'>
>>> "enabled" in auds
True
>>> auds['enabled']
1
"""
def parse_content(self, content):
if not content:
raise SkipException("Input content is empty.")
if len(content) > 1:
for line in content:
k, v = line.split(None, 1)
# Mind the 'loginuid_immutable' on RHEL7
if k.strip() == "loginuid_immutable":
self[k.strip()] = v.strip()
else:
try:
self[k.strip()] = int(v.strip())
except ValueError:
raise ParseException('Unexpected type in line %s' % line)
if len(content) == 1:
line = list(content)[0].strip()
if line.startswith("AUDIT_STATUS:"):
for item in line.split(None)[1:]:
try:
k, v = item.split('=')
self[k.strip()] = int(v.strip())
except ValueError:
raise ParseException('Unexpected type in line %s ' % line)
if not self:
raise SkipException('There is no content in the status output.')
13 changes: 13 additions & 0 deletions insights/parsers/auditctl_status.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,16 @@
from .. import parser, CommandParser, LegacyItemAccess
from ..parsers import ParseException
from ..specs import Specs
from insights.util import deprecated


@parser(Specs.auditctl_status)
class AuditctlStatus(LegacyItemAccess, CommandParser):
"""
.. warning::
This parser is deprecated, please use
:py:class:`insights.parsers.auditctl.AuditdStatus` instead.
Module for parsing the output of the ``auditctl -s`` command.
Typical output on RHEL6 looks like::
Expand All @@ -36,6 +41,14 @@ class AuditctlStatus(LegacyItemAccess, CommandParser):
>>> auds['enabled']
1
"""
def __init__(self, context):
deprecated(
AuditctlStatus,
"Please use the :class:`insights.parsers.auditctl.AuditdStatus` instead.",
"3.1.25"
)
super(AuditctlStatus, self).__init__(context)

def parse_content(self, content):
if not content:
raise ParseException("Input content is empty.")
Expand Down
1 change: 1 addition & 0 deletions insights/specs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ class Specs(SpecSet):
alternatives_display_python = RegistryPoint()
amq_broker = RegistryPoint(multi_output=True)
ansible_host = RegistryPoint()
auditctl_rules = RegistryPoint()
auditctl_status = RegistryPoint()
auditd_conf = RegistryPoint()
audit_log = RegistryPoint(filterable=True)
Expand Down
1 change: 1 addition & 0 deletions insights/specs/default.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ class DefaultSpecs(Specs):
alternatives_display_python = simple_command("/usr/sbin/alternatives --display python")
amq_broker = glob_file("/var/opt/amq-broker/*/etc/broker.xml")
dse_ldif = glob_file("/etc/dirsrv/*/dse.ldif")
auditctl_rules = simple_command("/sbin/auditctl -l")
auditctl_status = simple_command("/sbin/auditctl -s")
auditd_conf = simple_file("/etc/audit/auditd.conf")
audit_log = simple_file("/var/log/audit/audit.log")
Expand Down
1 change: 1 addition & 0 deletions insights/specs/insights_archive.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ class InsightsArchiveSpecs(Specs):
all_installed_rpms = glob_file("insights_commands/rpm_-qa*")
alternatives_display_python = simple_file("insights_commands/alternatives_--display_python")
ansible_host = simple_file("ansible_host")
auditctl_rules = simple_file("insights_commands/auditctl_-l")
auditctl_status = simple_file("insights_commands/auditctl_-s")
aws_instance_id_doc = simple_file("insights_commands/python_-m_insights.tools.cat_--no-header_aws_instance_id_doc")
aws_instance_id_pkcs7 = simple_file("insights_commands/python_-m_insights.tools.cat_--no-header_aws_instance_id_pkcs7")
Expand Down
1 change: 1 addition & 0 deletions insights/specs/sos_archive.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

class SosSpecs(Specs):
alternatives_display_python = simple_file("sos_commands/alternatives/alternatives_--display_python")
auditctl_rules = simple_file("sos_commands/auditd/auditctl_-l")
auditctl_status = simple_file("sos_commands/auditd/auditctl_-s")
auditd_conf = simple_file("/etc/audit/auditd.conf")
autofs_conf = simple_file("/etc/autofs.conf")
Expand Down
125 changes: 125 additions & 0 deletions insights/tests/parsers/test_auditctl.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
import pytest
import doctest

from insights.tests import context_wrap
from insights.parsers import auditctl
from insights.parsers.auditctl import AuditStatus, AuditRules
from insights.parsers import ParseException, SkipException


NORMAL_AUDS_RHEL6 = """
AUDIT_STATUS: enabled=1 flag=1 pid=1483 rate_limit=0 backlog_limit=8192 lost=3 backlog=0
""".strip()

BAD_AUDS_RHEL6 = """
AUDIT_STATUS: enabled=1 flag=1 pid=1483 rate_limit=0 backlog_limit=8192 lost=3 backlog=0 test=test
""".strip()

NORMAL_AUDS_RHEL7 = """
enabled 1
failure 1
pid 947
rate_limit 0
backlog_limit 320
lost 0
backlog 0
loginuid_immutable 1 locked
""".strip()

BAD_AUDS_RHEL7 = """
enabled 1
failure 1
pid 947
rate_limit 0
backlog_limit 320
lost 0
backlog 0
test test
loginuid_immutable 1 locked
""".strip()

BLANK_INPUT_SAMPLE = """
""".strip()

BAD_INPUT_SAMPLE = """
Unknown: type=0, len=0
""".strip()

BAD_INPUT_MIX = """
Unknown: type=0, len=0
enabled 1
""".strip()

AUDIT_RULES_OUTPUT1 = """
No rules
""".strip()

AUDIT_RULES_OUTPUT2 = """
-w /etc/selinux -p wa -k MAC-policy
-a always,exit -F arch=b32 -S chmod -F exit=-EACCES -F auid>=1000 -F auid!=-1 -F key=access
-a always,exit -F arch=b64 -S chmod -F exit=-EACCES -F auid>=1000 -F auid!=-1 -F key=access
-a always,exit -F arch=b32 -S chmod -F exit=-EPERM -F auid>=1000 -F auid!=-1 -F key=access
-a always,exit -F arch=b64 -S chmod -F exit=-EPERM -F auid>=1000 -F auid!=-1 -F key=access
-a always,exit -F arch=b32 -S chown -F exit=-EACCES -F auid>=1000 -F auid!=-1 -F key=access
-a always,exit -F arch=b64 -S chown -F exit=-EACCES -F auid>=1000 -F auid!=-1 -F key=access
-a always,exit -F arch=b32 -S chown -F exit=-EPERM -F auid>=1000 -F auid!=-1 -F key=access
-a always,exit -F arch=b64 -S chown -F exit=-EPERM -F auid>=1000 -F auid!=-1 -F key=access
""".strip()

AUDIT_RULES_OUTPUT3 = """
"""


def test_normal_auds_rhel6():
auds = AuditStatus(context_wrap(NORMAL_AUDS_RHEL6))
assert "enabled" in auds
assert "loginuid_immutable" not in auds
assert auds['pid'] == 1483


def test_normal_auds_rhel7():
auds = AuditStatus(context_wrap(NORMAL_AUDS_RHEL7))
assert "loginuid_immutable" in auds
assert auds['loginuid_immutable'] == "1 locked"
assert auds['failure'] == 1
assert auds.get('nonexists') is None


def test_auds_blank_input():
ctx = context_wrap(BLANK_INPUT_SAMPLE)
with pytest.raises(SkipException) as sc:
AuditStatus(ctx)
assert "Input content is empty." in str(sc)
with pytest.raises(SkipException):
AuditStatus(context_wrap(BAD_INPUT_SAMPLE))


def test_parse_exception():
with pytest.raises(ParseException):
AuditStatus(context_wrap(BAD_AUDS_RHEL7))
with pytest.raises(ParseException):
AuditStatus(context_wrap(BAD_AUDS_RHEL6))


def test_audit_rules():
audit_rules = AuditRules(context_wrap(AUDIT_RULES_OUTPUT2))
assert len(audit_rules) == 9
assert '-w /etc/selinux -p wa -k MAC-policy' in audit_rules


def test_audit_rules_exception():
with pytest.raises(SkipException):
AuditRules(context_wrap(AUDIT_RULES_OUTPUT1))
with pytest.raises(SkipException):
AuditRules(context_wrap(AUDIT_RULES_OUTPUT3))


def test_doc_examples():
env = {
'audit_rules': AuditRules(context_wrap(AUDIT_RULES_OUTPUT2)),
'auds': AuditStatus(context_wrap(NORMAL_AUDS_RHEL7))
}
failed, total = doctest.testmod(auditctl, globs=env)
assert failed == 0

0 comments on commit 00566b0

Please sign in to comment.