Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Filter regex patterns in addition to prefixes #59

Merged
merged 7 commits into from
Sep 28, 2017
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 34 additions & 17 deletions launch_testing/launch_testing/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@ class InMemoryHandler(LineOutput):
:param filtered_prefixes: A list of byte strings representing prefixes that will cause output
lines to be ignored if they start with one of the prefixes. By default lines starting with
the process ID (`b'pid'`) and return code (`b'rc'`) will be ignored.
:param filtered_patterns: A list of byte strings representing regexes that will cause output
lines to be ignored if they match one of the regexes.
:param filtered_rmw_implementation: RMW implementation for which the output will be ignored
in addition to the default/`filtered_prefixes`.
in addition to the `filtered_prefixes`/`filtered_patterns`.
:param exit_on_match: If True, then when its output is matched, this handler
will terminate; otherwise it will simply keep track of the match.
:raises: :py:class:`UnmatchedOutputError` if :py:meth:`check` does not find that the output
Expand All @@ -40,17 +42,18 @@ class InMemoryHandler(LineOutput):

def __init__(
self, name, launch_descriptor, expected_lines, regex_match=False,
filtered_prefixes=None, filtered_rmw_implementation=None, exit_on_match=True
filtered_prefixes=None, filtered_patterns=None, filtered_rmw_implementation=None,
exit_on_match=True,
):
super(LineOutput, self).__init__()
if filtered_prefixes is None:
self.filtered_prefixes = get_default_filtered_prefixes()
else:
self.filtered_prefixes = filtered_prefixes
self.filtered_prefixes = filtered_prefixes or get_default_filtered_prefixes()
self.filtered_patterns = filtered_patterns or get_default_filtered_patterns()

if filtered_rmw_implementation:
rmw_output_filter = get_rmw_output_filter(filtered_rmw_implementation)
self.filtered_prefixes.extend(rmw_output_filter)
self.filtered_prefixes.extend(
get_rmw_output_filter(filtered_rmw_implementation, 'prefixes'))
self.filtered_patterns.extend(
get_rmw_output_filter(filtered_rmw_implementation, 'patterns'))

self.name = name
self.launch_descriptor = launch_descriptor
Expand All @@ -70,8 +73,11 @@ def on_stdout_lines(self, lines):

for line in lines.splitlines():
# Filter out stdout that comes from underlying DDS implementation
# Note: we do not currently support matching filters across multiple stdout lines.
if any(line.startswith(prefix) for prefix in self.filtered_prefixes):
continue
if any(re.match(pattern, line) for pattern in self.filtered_patterns):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was first worried that using the same pattern without compiling it repeatedly would be slower. But it turns out modern Python already do a great job compiling the pattern and caching it for future use. So all good here 😃

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes I should have mentioned that I checked the same thing, to avoid you looking into it. it might be worthwhile if there are a lot of unique regexes, but for now we don't have that case

continue
self.stdout_data.write(line + b'\n')
if not self.regex_match and not self.matched:
output_lines = self.stdout_data.getvalue().splitlines()
Expand Down Expand Up @@ -112,37 +118,48 @@ def get_default_filtered_prefixes():
]


def get_rmw_output_filter(rmw_implementation):
def get_default_filtered_patterns():
return []


def get_rmw_output_filter(rmw_implementation, filter_type):
supported_filter_types = ['prefixes', 'patterns']
if filter_type not in supported_filter_types:
raise TypeError(
'Unsupported filter_type "{0}". Supported types: {1}'
.format(filter_type, supported_filter_types))
resource_name = 'rmw_output_' + filter_type
prefix_with_resource = ament_index_python.has_resource(
'rmw_output_filter', rmw_implementation)
resource_name, rmw_implementation)
if not prefix_with_resource:
return []

rmw_output_filter, _ = ament_index_python.get_resource('rmw_output_filter', rmw_implementation)
additional_filtered_prefixes = [
str.encode(l) for l in rmw_output_filter.splitlines()]
return additional_filtered_prefixes
# Treat each line of the resource as an independent filter.
rmw_output_filter, _ = ament_index_python.get_resource(resource_name, rmw_implementation)
return [str.encode(l) for l in rmw_output_filter.splitlines()]


def create_handler(
name, launch_descriptor, output_file, exit_on_match=True, filtered_prefixes=None,
filtered_rmw_implementation=None
filtered_patterns=None, filtered_rmw_implementation=None
):
literal_file = output_file + '.txt'
if os.path.isfile(literal_file):
with open(literal_file, 'rb') as f:
expected_output = f.read().splitlines()
return InMemoryHandler(
name, launch_descriptor, expected_output, regex_match=False,
exit_on_match=exit_on_match, filtered_prefixes=filtered_prefixes,
exit_on_match=exit_on_match,
filtered_prefixes=filtered_prefixes, filtered_patterns=filtered_patterns,
filtered_rmw_implementation=filtered_rmw_implementation)
regex_file = output_file + '.regex'
if os.path.isfile(regex_file):
with open(regex_file, 'rb') as f:
expected_output = f.read().splitlines()
return InMemoryHandler(
name, launch_descriptor, expected_output, regex_match=True,
exit_on_match=exit_on_match, filtered_prefixes=filtered_prefixes,
exit_on_match=exit_on_match,
filtered_prefixes=filtered_prefixes, filtered_patterns=filtered_patterns,
filtered_rmw_implementation=filtered_rmw_implementation)
py_file = output_file + '.py'
if os.path.isfile(py_file):
Expand Down