Skip to content

Commit

Permalink
Security CWE-312 and CWE-20 Handling (#453)
Browse files Browse the repository at this point in the history
  • Loading branch information
caronc authored Oct 6, 2021
1 parent abb7547 commit e2ebdbd
Show file tree
Hide file tree
Showing 12 changed files with 449 additions and 62 deletions.
56 changes: 37 additions & 19 deletions apprise/Apprise.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
from .utils import is_exclusive_match
from .utils import parse_list
from .utils import parse_urls
from .utils import cwe312_url
from .logger import logger

from .AppriseAsset import AppriseAsset
Expand Down Expand Up @@ -123,9 +124,14 @@ def instantiate(url, asset=None, tag=None, suppress_exceptions=True):
# Initialize our result set
results = None

# Prepare our Asset Object
asset = asset if isinstance(asset, AppriseAsset) else AppriseAsset()

if isinstance(url, six.string_types):
# Acquire our url tokens
results = plugins.url_to_dict(url)
results = plugins.url_to_dict(
url, secure_logging=asset.secure_logging)

if results is None:
# Failed to parse the server URL; detailed logging handled
# inside url_to_dict - nothing to report here.
Expand All @@ -139,25 +145,30 @@ def instantiate(url, asset=None, tag=None, suppress_exceptions=True):
# schema is a mandatory dictionary item as it is the only way
# we can index into our loaded plugins
logger.error('Dictionary does not include a "schema" entry.')
logger.trace('Invalid dictionary unpacked as:{}{}'.format(
os.linesep, os.linesep.join(
['{}="{}"'.format(k, v) for k, v in results.items()])))
logger.trace(
'Invalid dictionary unpacked as:{}{}'.format(
os.linesep, os.linesep.join(
['{}="{}"'.format(k, v)
for k, v in results.items()])))
return None

logger.trace('Dictionary unpacked as:{}{}'.format(
os.linesep, os.linesep.join(
['{}="{}"'.format(k, v) for k, v in results.items()])))
logger.trace(
'Dictionary unpacked as:{}{}'.format(
os.linesep, os.linesep.join(
['{}="{}"'.format(k, v) for k, v in results.items()])))

# Otherwise we handle the invalid input specified
else:
logger.error('Invalid URL specified: {}'.format(url))
logger.error(
'An invalid URL type (%s) was specified for instantiation',
type(url))
return None

# Build a list of tags to associate with the newly added notifications
results['tag'] = set(parse_list(tag))

# Prepare our Asset Object
results['asset'] = \
asset if isinstance(asset, AppriseAsset) else AppriseAsset()
# Set our Asset Object
results['asset'] = asset

if suppress_exceptions:
try:
Expand All @@ -166,14 +177,21 @@ def instantiate(url, asset=None, tag=None, suppress_exceptions=True):
plugin = plugins.SCHEMA_MAP[results['schema']](**results)

# Create log entry of loaded URL
logger.debug('Loaded {} URL: {}'.format(
plugins.SCHEMA_MAP[results['schema']].service_name,
plugin.url()))
logger.debug(
'Loaded {} URL: {}'.format(
plugins.SCHEMA_MAP[results['schema']].service_name,
plugin.url(privacy=asset.secure_logging)))

except Exception:
# CWE-312 (Secure Logging) Handling
loggable_url = url if not asset.secure_logging \
else cwe312_url(url)

# the arguments are invalid or can not be used.
logger.error('Could not load {} URL: {}'.format(
plugins.SCHEMA_MAP[results['schema']].service_name, url))
logger.error(
'Could not load {} URL: {}'.format(
plugins.SCHEMA_MAP[results['schema']].service_name,
loggable_url))
return None

else:
Expand Down Expand Up @@ -402,7 +420,7 @@ def _notifyhandler(server, **kwargs):
except Exception:
# A catch all so we don't have to abort early
# just because one of our plugins has a bug in it.
logger.exception("Notification Exception")
logger.exception("Unhandled Notification Exception")
return False

@staticmethod
Expand Down Expand Up @@ -434,10 +452,10 @@ def _notifyall(self, handler, body, title='', notify_type=NotifyType.INFO,

if len(self) == 0:
# Nothing to notify
raise TypeError
raise TypeError("No service(s) to notify")

if not (title or body):
raise TypeError
raise TypeError("No message content specified to deliver")

if six.PY2:
# Python 2.7.x Unicode Character Handling
Expand Down
11 changes: 11 additions & 0 deletions apprise/AppriseAsset.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,17 @@ class AppriseAsset(object):
# to a new line.
interpret_escapes = False

# For more detail see CWE-312 @
# https://cwe.mitre.org/data/definitions/312.html
#
# By enabling this, the logging output has additional overhead applied to
# it preventing secure password and secret information from being
# displayed in the logging. Since there is overhead involved in performing
# this cleanup; system owners who run in a very isolated environment may
# choose to disable this for a slight performance bump. It is recommended
# that you leave this option as is otherwise.
secure_logging = True

def __init__(self, **kwargs):
"""
Asset Initialization
Expand Down
77 changes: 56 additions & 21 deletions apprise/config/ConfigBase.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
from ..utils import parse_list
from ..utils import parse_bool
from ..utils import parse_urls
from ..utils import cwe312_url
from . import SCHEMA_MAP

# Test whether token is valid or not
Expand Down Expand Up @@ -209,8 +210,8 @@ def servers(self, asset=None, **kwargs):
# Configuration files were detected; recursively populate them
# If we have been configured to do so
for url in configs:
if self.recursion > 0:

if self.recursion > 0:
# Attempt to acquire the schema at the very least to allow
# our configuration based urls.
schema = GET_SCHEMA_RE.match(url)
Expand All @@ -223,6 +224,7 @@ def servers(self, asset=None, **kwargs):
url = os.path.join(self.config_path, url)

url = '{}://{}'.format(schema, URLBase.quote(url))

else:
# Ensure our schema is always in lower case
schema = schema.group('schema').lower()
Expand All @@ -233,13 +235,17 @@ def servers(self, asset=None, **kwargs):
'Unsupported include schema {}.'.format(schema))
continue

# CWE-312 (Secure Logging) Handling
loggable_url = url if not asset.secure_logging \
else cwe312_url(url)

# Parse our url details of the server object as dictionary
# containing all of the information parsed from our URL
results = SCHEMA_MAP[schema].parse_url(url)
if not results:
# Failed to parse the server URL
self.logger.warning(
'Unparseable include URL {}'.format(url))
'Unparseable include URL {}'.format(loggable_url))
continue

# Handle cross inclusion based on allow_cross_includes rules
Expand All @@ -253,7 +259,7 @@ def servers(self, asset=None, **kwargs):
# Prevent the loading if insecure base protocols
ConfigBase.logger.warning(
'Including {}:// based configuration is prohibited. '
'Ignoring URL {}'.format(schema, url))
'Ignoring URL {}'.format(schema, loggable_url))
continue

# Prepare our Asset Object
Expand All @@ -279,7 +285,7 @@ def servers(self, asset=None, **kwargs):
except Exception as e:
# the arguments are invalid or can not be used.
self.logger.warning(
'Could not load include URL: {}'.format(url))
'Could not load include URL: {}'.format(loggable_url))
self.logger.debug('Loading Exception: {}'.format(str(e)))
continue

Expand All @@ -292,16 +298,23 @@ def servers(self, asset=None, **kwargs):
del cfg_plugin

else:
# CWE-312 (Secure Logging) Handling
loggable_url = url if not asset.secure_logging \
else cwe312_url(url)

self.logger.debug(
'Recursion limit reached; ignoring Include URL: %s' % url)
'Recursion limit reached; ignoring Include URL: %s',
loggable_url)

if self._cached_servers:
self.logger.info('Loaded {} entries from {}'.format(
len(self._cached_servers), self.url()))
self.logger.info(
'Loaded {} entries from {}'.format(
len(self._cached_servers),
self.url(privacy=asset.secure_logging)))
else:
self.logger.warning(
'Failed to load Apprise configuration from {}'.format(
self.url()))
self.url(privacy=asset.secure_logging)))

# Set the time our content was cached at
self._cached_time = time.time()
Expand Down Expand Up @@ -531,6 +544,9 @@ def config_parse_text(content, asset=None):
# the include keyword
configs = list()

# Prepare our Asset Object
asset = asset if isinstance(asset, AppriseAsset) else AppriseAsset()

# Define what a valid line should look like
valid_line_re = re.compile(
r'^\s*(?P<line>([;#]+(?P<comment>.*))|'
Expand Down Expand Up @@ -567,41 +583,52 @@ def config_parse_text(content, asset=None):
continue

if config:
ConfigBase.logger.debug('Include URL: {}'.format(config))
# CWE-312 (Secure Logging) Handling
loggable_url = config if not asset.secure_logging \
else cwe312_url(config)

ConfigBase.logger.debug(
'Include URL: {}'.format(loggable_url))

# Store our include line
configs.append(config.strip())
continue

# CWE-312 (Secure Logging) Handling
loggable_url = url if not asset.secure_logging \
else cwe312_url(url)

# Acquire our url tokens
results = plugins.url_to_dict(url)
results = plugins.url_to_dict(
url, secure_logging=asset.secure_logging)
if results is None:
# Failed to parse the server URL
ConfigBase.logger.warning(
'Unparseable URL {} on line {}.'.format(url, line))
'Unparseable URL {} on line {}.'.format(
loggable_url, line))
continue

# Build a list of tags to associate with the newly added
# notifications if any were set
results['tag'] = set(parse_list(result.group('tags')))

# Prepare our Asset Object
results['asset'] = \
asset if isinstance(asset, AppriseAsset) else AppriseAsset()
# Set our Asset Object
results['asset'] = asset

try:
# Attempt to create an instance of our plugin using the
# parsed URL information
plugin = plugins.SCHEMA_MAP[results['schema']](**results)

# Create log entry of loaded URL
ConfigBase.logger.debug('Loaded URL: {}'.format(plugin.url()))
ConfigBase.logger.debug(
'Loaded URL: %s', plugin.url(privacy=asset.secure_logging))

except Exception as e:
# the arguments are invalid or can not be used.
ConfigBase.logger.warning(
'Could not load URL {} on line {}.'.format(
url, line))
loggable_url, line))
ConfigBase.logger.debug('Loading Exception: %s' % str(e))
continue

Expand Down Expand Up @@ -756,6 +783,10 @@ def config_parse_yaml(content, asset=None):
# we can. Reset it to None on each iteration
results = list()

# CWE-312 (Secure Logging) Handling
loggable_url = url if not asset.secure_logging \
else cwe312_url(url)

if isinstance(url, six.string_types):
# We're just a simple URL string...
schema = GET_SCHEMA_RE.match(url)
Expand All @@ -764,16 +795,18 @@ def config_parse_yaml(content, asset=None):
# config file at least has something to take action
# with.
ConfigBase.logger.warning(
'Invalid URL {}, entry #{}'.format(url, no + 1))
'Invalid URL {}, entry #{}'.format(
loggable_url, no + 1))
continue

# We found a valid schema worthy of tracking; store it's
# details:
_results = plugins.url_to_dict(url)
_results = plugins.url_to_dict(
url, secure_logging=asset.secure_logging)
if _results is None:
ConfigBase.logger.warning(
'Unparseable URL {}, entry #{}'.format(
url, no + 1))
loggable_url, no + 1))
continue

# add our results to our global set
Expand Down Expand Up @@ -819,7 +852,8 @@ def config_parse_yaml(content, asset=None):
'Unsupported URL, entry #{}'.format(no + 1))
continue

_results = plugins.url_to_dict(_url)
_results = plugins.url_to_dict(
_url, secure_logging=asset.secure_logging)
if _results is None:
# Setup dictionary
_results = {
Expand Down Expand Up @@ -931,7 +965,8 @@ def config_parse_yaml(content, asset=None):

# Create log entry of loaded URL
ConfigBase.logger.debug(
'Loaded URL: {}'.format(plugin.url()))
'Loaded URL: {}'.format(
plugin.url(privacy=asset.secure_logging)))

except Exception as e:
# the arguments are invalid or can not be used.
Expand Down
2 changes: 1 addition & 1 deletion apprise/plugins/NotifyGoogleChat.py
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ def parse_native_url(url):
"""

result = re.match(
r'^https://chat.googleapis.com/v1/spaces/'
r'^https://chat\.googleapis\.com/v1/spaces/'
r'(?P<workspace>[A-Z0-9_-]+)/messages/*(?P<params>.+)$',
url, re.I)

Expand Down
Loading

0 comments on commit e2ebdbd

Please sign in to comment.