forked from oasis-open/cti-python-stix2
-
Notifications
You must be signed in to change notification settings - Fork 6
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge branch 'master' of github.com:oasis-open/cti-python-stix2 into …
…1.1.0-release
- Loading branch information
Showing
4 changed files
with
419 additions
and
19 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,6 +2,7 @@ | |
skip = workbench.py | ||
not_skip = __init__.py | ||
known_third_party = | ||
antlr4, | ||
dateutil, | ||
medallion, | ||
pytest, | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,349 @@ | ||
import importlib | ||
import inspect | ||
|
||
from antlr4 import CommonTokenStream, InputStream | ||
import six | ||
from stix2patterns.grammars.STIXPatternLexer import STIXPatternLexer | ||
from stix2patterns.grammars.STIXPatternParser import (STIXPatternParser, | ||
TerminalNode) | ||
from stix2patterns.grammars.STIXPatternVisitor import STIXPatternVisitor | ||
from stix2patterns.validator import STIXPatternErrorListener | ||
|
||
from .patterns import * | ||
from .patterns import _BooleanExpression | ||
|
||
# flake8: noqa F405 | ||
|
||
|
||
def collapse_lists(lists): | ||
result = [] | ||
for c in lists: | ||
if isinstance(c, list): | ||
result.extend(c) | ||
else: | ||
result.append(c) | ||
return result | ||
|
||
|
||
def remove_terminal_nodes(parse_tree_nodes): | ||
values = [] | ||
for x in parse_tree_nodes: | ||
if not isinstance(x, TerminalNode): | ||
values.append(x) | ||
return values | ||
|
||
|
||
# This class defines a complete generic visitor for a parse tree produced by STIXPatternParser. | ||
|
||
|
||
class STIXPatternVisitorForSTIX2(STIXPatternVisitor): | ||
classes = {} | ||
|
||
def __init__(self, module_suffix, module_name): | ||
if module_suffix and module_name: | ||
self.module_suffix = module_suffix | ||
if not STIXPatternVisitorForSTIX2.classes: | ||
module = importlib.import_module(module_name) | ||
for k, c in inspect.getmembers(module, inspect.isclass): | ||
STIXPatternVisitorForSTIX2.classes[k] = c | ||
else: | ||
self.module_suffix = None | ||
super(STIXPatternVisitor, self).__init__() | ||
|
||
def get_class(self, class_name): | ||
if class_name in STIXPatternVisitorForSTIX2.classes: | ||
return STIXPatternVisitorForSTIX2.classes[class_name] | ||
else: | ||
return None | ||
|
||
def instantiate(self, klass_name, *args): | ||
klass_to_instantiate = None | ||
if self.module_suffix: | ||
klass_to_instantiate = self.get_class(klass_name + "For" + self.module_suffix) | ||
if not klass_to_instantiate: | ||
# use the classes in python_stix2 | ||
klass_to_instantiate = globals()[klass_name] | ||
return klass_to_instantiate(*args) | ||
|
||
# Visit a parse tree produced by STIXPatternParser#pattern. | ||
def visitPattern(self, ctx): | ||
children = self.visitChildren(ctx) | ||
return children[0] | ||
|
||
# Visit a parse tree produced by STIXPatternParser#observationExpressions. | ||
def visitObservationExpressions(self, ctx): | ||
children = self.visitChildren(ctx) | ||
if len(children) == 1: | ||
return children[0] | ||
else: | ||
return FollowedByObservationExpression([children[0], children[2]]) | ||
|
||
# Visit a parse tree produced by STIXPatternParser#observationExpressionOr. | ||
def visitObservationExpressionOr(self, ctx): | ||
children = self.visitChildren(ctx) | ||
if len(children) == 1: | ||
return children[0] | ||
else: | ||
return self.instantiate("OrObservationExpression", [children[0], children[2]]) | ||
|
||
# Visit a parse tree produced by STIXPatternParser#observationExpressionAnd. | ||
def visitObservationExpressionAnd(self, ctx): | ||
children = self.visitChildren(ctx) | ||
if len(children) == 1: | ||
return children[0] | ||
else: | ||
return self.instantiate("AndObservationExpression", [children[0], children[2]]) | ||
|
||
# Visit a parse tree produced by STIXPatternParser#observationExpressionRepeated. | ||
def visitObservationExpressionRepeated(self, ctx): | ||
children = self.visitChildren(ctx) | ||
return self.instantiate("QualifiedObservationExpression", children[0], children[1]) | ||
|
||
# Visit a parse tree produced by STIXPatternParser#observationExpressionSimple. | ||
def visitObservationExpressionSimple(self, ctx): | ||
children = self.visitChildren(ctx) | ||
return self.instantiate("ObservationExpression", children[1]) | ||
|
||
# Visit a parse tree produced by STIXPatternParser#observationExpressionCompound. | ||
def visitObservationExpressionCompound(self, ctx): | ||
children = self.visitChildren(ctx) | ||
return self.instantiate("ObservationExpression", children[1]) | ||
|
||
# Visit a parse tree produced by STIXPatternParser#observationExpressionWithin. | ||
def visitObservationExpressionWithin(self, ctx): | ||
children = self.visitChildren(ctx) | ||
return self.instantiate("QualifiedObservationExpression", children[0], children[1]) | ||
|
||
# Visit a parse tree produced by STIXPatternParser#observationExpressionStartStop. | ||
def visitObservationExpressionStartStop(self, ctx): | ||
children = self.visitChildren(ctx) | ||
return self.instantiate("QualifiedObservationExpression", children[0], children[1]) | ||
|
||
# Visit a parse tree produced by STIXPatternParser#comparisonExpression. | ||
def visitComparisonExpression(self, ctx): | ||
children = self.visitChildren(ctx) | ||
if len(children) == 1: | ||
return children[0] | ||
else: | ||
if isinstance(children[0], _BooleanExpression): | ||
children[0].operands.append(children[2]) | ||
return children[0] | ||
else: | ||
return self.instantiate("OrBooleanExpression", [children[0], children[2]]) | ||
|
||
# Visit a parse tree produced by STIXPatternParser#comparisonExpressionAnd. | ||
def visitComparisonExpressionAnd(self, ctx): | ||
# TODO: NOT | ||
children = self.visitChildren(ctx) | ||
if len(children) == 1: | ||
return children[0] | ||
else: | ||
if isinstance(children[0], _BooleanExpression): | ||
children[0].operands.append(children[2]) | ||
return children[0] | ||
else: | ||
return self.instantiate("AndBooleanExpression", [children[0], children[2]]) | ||
|
||
# Visit a parse tree produced by STIXPatternParser#propTestEqual. | ||
def visitPropTestEqual(self, ctx): | ||
children = self.visitChildren(ctx) | ||
operator = children[1].symbol.type | ||
negated = operator != STIXPatternParser.EQ | ||
return self.instantiate("EqualityComparisonExpression", children[0], children[3 if len(children) > 3 else 2], | ||
negated) | ||
|
||
# Visit a parse tree produced by STIXPatternParser#propTestOrder. | ||
def visitPropTestOrder(self, ctx): | ||
children = self.visitChildren(ctx) | ||
operator = children[1].symbol.type | ||
if operator == STIXPatternParser.GT: | ||
return self.instantiate("GreaterThanComparisonExpression", children[0], | ||
children[3 if len(children) > 3 else 2], False) | ||
elif operator == STIXPatternParser.LT: | ||
return self.instantiate("LessThanComparisonExpression", children[0], | ||
children[3 if len(children) > 3 else 2], False) | ||
elif operator == STIXPatternParser.GE: | ||
return self.instantiate("GreaterThanEqualComparisonExpression", children[0], | ||
children[3 if len(children) > 3 else 2], False) | ||
elif operator == STIXPatternParser.LE: | ||
return self.instantiate("LessThanEqualComparisonExpression", children[0], | ||
children[3 if len(children) > 3 else 2], False) | ||
|
||
# Visit a parse tree produced by STIXPatternParser#propTestSet. | ||
def visitPropTestSet(self, ctx): | ||
children = self.visitChildren(ctx) | ||
return self.instantiate("InComparisonExpression", children[0], children[3 if len(children) > 3 else 2], False) | ||
|
||
# Visit a parse tree produced by STIXPatternParser#propTestLike. | ||
def visitPropTestLike(self, ctx): | ||
children = self.visitChildren(ctx) | ||
return self.instantiate("LikeComparisonExpression", children[0], children[3 if len(children) > 3 else 2], False) | ||
|
||
# Visit a parse tree produced by STIXPatternParser#propTestRegex. | ||
def visitPropTestRegex(self, ctx): | ||
children = self.visitChildren(ctx) | ||
return self.instantiate("MatchesComparisonExpression", children[0], children[3 if len(children) > 3 else 2], | ||
False) | ||
|
||
# Visit a parse tree produced by STIXPatternParser#propTestIsSubset. | ||
def visitPropTestIsSubset(self, ctx): | ||
children = self.visitChildren(ctx) | ||
return self.instantiate("IsSubsetComparisonExpression", children[0], children[3 if len(children) > 3 else 2]) | ||
|
||
# Visit a parse tree produced by STIXPatternParser#propTestIsSuperset. | ||
def visitPropTestIsSuperset(self, ctx): | ||
children = self.visitChildren(ctx) | ||
return self.instantiate("IsSupersetComparisonExpression", children[0], children[3 if len(children) > 3 else 2]) | ||
|
||
# Visit a parse tree produced by STIXPatternParser#propTestParen. | ||
def visitPropTestParen(self, ctx): | ||
children = self.visitChildren(ctx) | ||
return self.instantiate("ParentheticalExpression", children[1]) | ||
|
||
# Visit a parse tree produced by STIXPatternParser#startStopQualifier. | ||
def visitStartStopQualifier(self, ctx): | ||
children = self.visitChildren(ctx) | ||
return StartStopQualifier(children[1], children[3]) | ||
|
||
# Visit a parse tree produced by STIXPatternParser#withinQualifier. | ||
def visitWithinQualifier(self, ctx): | ||
children = self.visitChildren(ctx) | ||
return WithinQualifier(children[1]) | ||
|
||
# Visit a parse tree produced by STIXPatternParser#repeatedQualifier. | ||
def visitRepeatedQualifier(self, ctx): | ||
children = self.visitChildren(ctx) | ||
return RepeatQualifier(children[1]) | ||
|
||
# Visit a parse tree produced by STIXPatternParser#objectPath. | ||
def visitObjectPath(self, ctx): | ||
children = self.visitChildren(ctx) | ||
flat_list = collapse_lists(children[2:]) | ||
property_path = [] | ||
i = 0 | ||
while i < len(flat_list): | ||
current = flat_list[i] | ||
if i == len(flat_list)-1: | ||
property_path.append(current) | ||
break | ||
next = flat_list[i+1] | ||
if isinstance(next, TerminalNode): | ||
property_path.append(self.instantiate("ListObjectPathComponent", current.property_name, next.getText())) | ||
i += 2 | ||
else: | ||
property_path.append(current) | ||
i += 1 | ||
return self.instantiate("ObjectPath", children[0].getText(), property_path) | ||
|
||
# Visit a parse tree produced by STIXPatternParser#objectType. | ||
def visitObjectType(self, ctx): | ||
children = self.visitChildren(ctx) | ||
return children[0] | ||
|
||
# Visit a parse tree produced by STIXPatternParser#firstPathComponent. | ||
def visitFirstPathComponent(self, ctx): | ||
children = self.visitChildren(ctx) | ||
step = children[0].getText() | ||
# if step.endswith("_ref"): | ||
# return stix2.ReferenceObjectPathComponent(step) | ||
# else: | ||
return self.instantiate("BasicObjectPathComponent", step, False) | ||
|
||
# Visit a parse tree produced by STIXPatternParser#indexPathStep. | ||
def visitIndexPathStep(self, ctx): | ||
children = self.visitChildren(ctx) | ||
return children[1] | ||
|
||
# Visit a parse tree produced by STIXPatternParser#pathStep. | ||
def visitPathStep(self, ctx): | ||
return collapse_lists(self.visitChildren(ctx)) | ||
|
||
# Visit a parse tree produced by STIXPatternParser#keyPathStep. | ||
def visitKeyPathStep(self, ctx): | ||
children = self.visitChildren(ctx) | ||
if isinstance(children[1], StringConstant): | ||
# special case for hashes | ||
return children[1].value | ||
else: | ||
return self.instantiate("BasicObjectPathComponent", children[1].getText(), True) | ||
|
||
# Visit a parse tree produced by STIXPatternParser#setLiteral. | ||
def visitSetLiteral(self, ctx): | ||
children = self.visitChildren(ctx) | ||
return self.instantiate("ListConstant", remove_terminal_nodes(children)) | ||
|
||
# Visit a parse tree produced by STIXPatternParser#primitiveLiteral. | ||
def visitPrimitiveLiteral(self, ctx): | ||
children = self.visitChildren(ctx) | ||
return children[0] | ||
|
||
# Visit a parse tree produced by STIXPatternParser#orderableLiteral. | ||
def visitOrderableLiteral(self, ctx): | ||
children = self.visitChildren(ctx) | ||
return children[0] | ||
|
||
def visitTerminal(self, node): | ||
if node.symbol.type == STIXPatternParser.IntPosLiteral or node.symbol.type == STIXPatternParser.IntNegLiteral: | ||
return IntegerConstant(node.getText()) | ||
elif node.symbol.type == STIXPatternParser.FloatPosLiteral or node.symbol.type == STIXPatternParser.FloatNegLiteral: | ||
return FloatConstant(node.getText()) | ||
elif node.symbol.type == STIXPatternParser.HexLiteral: | ||
return HexConstant(node.getText(), from_parse_tree=True) | ||
elif node.symbol.type == STIXPatternParser.BinaryLiteral: | ||
return BinaryConstant(node.getText(), from_parse_tree=True) | ||
elif node.symbol.type == STIXPatternParser.StringLiteral: | ||
return StringConstant(node.getText().strip('\''), from_parse_tree=True) | ||
elif node.symbol.type == STIXPatternParser.BoolLiteral: | ||
return BooleanConstant(node.getText()) | ||
elif node.symbol.type == STIXPatternParser.TimestampLiteral: | ||
return TimestampConstant(node.getText()) | ||
else: | ||
return node | ||
|
||
def aggregateResult(self, aggregate, nextResult): | ||
if aggregate: | ||
aggregate.append(nextResult) | ||
elif nextResult: | ||
aggregate = [nextResult] | ||
return aggregate | ||
|
||
|
||
def create_pattern_object(pattern, module_suffix="", module_name=""): | ||
""" | ||
Validates a pattern against the STIX Pattern grammar. Error messages are | ||
returned in a list. The test passed if the returned list is empty. | ||
""" | ||
|
||
start = '' | ||
if isinstance(pattern, six.string_types): | ||
start = pattern[:2] | ||
pattern = InputStream(pattern) | ||
|
||
if not start: | ||
start = pattern.readline()[:2] | ||
pattern.seek(0) | ||
|
||
parseErrListener = STIXPatternErrorListener() | ||
|
||
lexer = STIXPatternLexer(pattern) | ||
# it always adds a console listener by default... remove it. | ||
lexer.removeErrorListeners() | ||
|
||
stream = CommonTokenStream(lexer) | ||
|
||
parser = STIXPatternParser(stream) | ||
parser.buildParseTrees = True | ||
# it always adds a console listener by default... remove it. | ||
parser.removeErrorListeners() | ||
parser.addErrorListener(parseErrListener) | ||
|
||
# To improve error messages, replace "<INVALID>" in the literal | ||
# names with symbolic names. This is a hack, but seemed like | ||
# the simplest workaround. | ||
for i, lit_name in enumerate(parser.literalNames): | ||
if lit_name == u"<INVALID>": | ||
parser.literalNames[i] = parser.symbolicNames[i] | ||
|
||
tree = parser.pattern() | ||
builder = STIXPatternVisitorForSTIX2(module_suffix, module_name) | ||
return builder.visit(tree) |
Oops, something went wrong.