Skip to content
This repository has been archived by the owner on Nov 23, 2024. It is now read-only.

feat: first iteration of the purity data model and purity analysis #76

Merged
merged 46 commits into from
Mar 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
4d37b1d
Added all changes regarding purity-analysis from old branch at "packa…
lukarade Mar 21, 2023
ba8bc62
PurityInformation now contains ImpurityReasons for impure functions
lukarade Mar 21, 2023
3089adc
Added tests and slightly improved Purity-Analysis
lukarade Mar 23, 2023
11c2e34
Added more tests for generate_purity_information - are failing becaus…
lukarade Mar 23, 2023
142dcb4
chore: WIP
lars-reimann Mar 23, 2023
7aa3521
WIP - changes to PurityInformation, added ImpurityCertainty
lukarade Mar 23, 2023
6285dcb
function ID is now implemented as class, split generate_purtiy_inform…
lukarade Mar 23, 2023
d6d4403
added Reference(Expression)
lukarade Mar 23, 2023
a5391ae
added tests for File interaction
lukarade Mar 24, 2023
6a1f2d6
WIP: added analysis for builtin function (open), generally improved a…
lukarade Mar 24, 2023
150ffea
WIP: added more test for file interaction
lukarade Mar 24, 2023
ea4e2ab
WIP: Added purity analysis for FileRead and FileWrite with string paths
lukarade Mar 26, 2023
5354c86
WIP: Improved purity analysis for FileRead and FileWrite with string …
lukarade Mar 27, 2023
46951a2
Merge branch 'main' into 5-data-model-for-purity-information
lukarade Mar 27, 2023
dd8234d
WIP: now with Types (and Warnings)
lukarade Mar 27, 2023
2cc8db2
WIP: more linter fixes
lukarade Mar 27, 2023
7840e5e
fix: turn `infer_purity` into a pure function
lars-reimann Mar 27, 2023
2550073
Merge remote-tracking branch 'origin/5-data-model-for-purity-informat…
lukarade Mar 27, 2023
55ad8fd
WIP: more linter fixes
lukarade Mar 27, 2023
108353b
Revert "WIP: more linter fixes"
lukarade Mar 27, 2023
8ac85a4
fix: fixed analysis to detect Calls again
lukarade Mar 27, 2023
21d74d9
test: adapted tests to fit the improved analysis
lukarade Mar 27, 2023
91aeb37
fix: Linter fixes
lukarade Mar 27, 2023
a7b20d7
fix: Linter fixes
lukarade Mar 27, 2023
29c38d3
fix: more Linter fixes
lukarade Mar 27, 2023
cc05880
fix: even more Linter fixes
lukarade Mar 27, 2023
01f2532
fix: 2 more Linter fixes
lukarade Mar 27, 2023
2551e48
feat: added analysis for variables in file interaction
lukarade Mar 28, 2023
c77c0aa
fix: linter fixes
lukarade Mar 28, 2023
6fa23b9
fix: more linter fixes
lukarade Mar 28, 2023
2d528f9
fix: some more linter fixes
lukarade Mar 28, 2023
eb115e6
fix: last linter fixes
lukarade Mar 28, 2023
0c3504b
style: apply automated linter fixes
lukarade Mar 28, 2023
4cf814d
feat: improved 'determine_open_mode'
lukarade Mar 28, 2023
dad7195
test: added test for 'determine_open_mode'
lukarade Mar 28, 2023
0def276
fix: 1 linter fix
lukarade Mar 28, 2023
91639d6
fix: 1 more linter fix
lukarade Mar 28, 2023
b9a7513
fix: linter
lukarade Mar 28, 2023
3a881cf
style: apply automated linter fixes
lukarade Mar 28, 2023
379dab1
test: added test_infer_purity_basics with testcases
lukarade Mar 28, 2023
13e9e00
Merge remote-tracking branch 'origin/5-data-model-for-purity-informat…
lukarade Mar 29, 2023
793ff91
test: added more testcases for code coverage, commented out test_infe…
lukarade Mar 29, 2023
4d4fad8
Merge branch 'main' into 5-data-model-for-purity-information
lukarade Mar 29, 2023
77f82fd
fix: linter fix, removed unused imports
lukarade Mar 29, 2023
92bc461
style: apply automated linter fixes
lukarade Mar 29, 2023
38a06c8
fix: removed unused functions to satisfy codecov
lukarade Mar 30, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions src/library_analyzer/processing/api/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,23 @@
from ._get_api import get_api
from ._get_instance_attributes import get_instance_attributes
from ._get_parameter_list import get_parameter_list
from ._infer_purity import (
DefinitelyImpure,
DefinitelyPure,
ImpurityIndicator,
MaybeImpure,
OpenMode,
PurityInformation,
PurityResult,
calc_function_id,
determine_open_mode,
determine_purity,
extract_impurity_reasons,
generate_purity_information,
get_function_defs,
get_purity_result_str,
infer_purity,
)
from ._package_metadata import (
distribution,
distribution_version,
Expand Down
360 changes: 360 additions & 0 deletions src/library_analyzer/processing/api/_infer_purity.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,360 @@
from __future__ import annotations

from abc import ABC
from dataclasses import dataclass
from enum import Enum, auto
from typing import Optional

import astroid
from library_analyzer.processing.api.model import (
BuiltInFunction,
Call,
ConcreteImpurityIndicator,
FileRead,
FileWrite,
ImpurityCertainty,
ImpurityIndicator,
Reference,
StringLiteral,
SystemInteraction,
VariableRead,
VariableWrite,
)
from library_analyzer.utils import ASTWalker

# Builtin functions and file-object methods the analysis knows about, keyed by the
# bare name under which they are called in the analysed code.
BUILTIN_FUNCTIONS = {
    name: BuiltInFunction(Reference(name), indicator, certainty)
    for name, indicator, certainty in (
        ("open", ConcreteImpurityIndicator(), ImpurityCertainty.DEFINITELY_IMPURE),
        # TODO: how to replace the ... with the correct type?
        ("print", SystemInteraction(), ImpurityCertainty.DEFINITELY_IMPURE),
        ("read", ConcreteImpurityIndicator(), ImpurityCertainty.DEFINITELY_IMPURE),
        ("write", ConcreteImpurityIndicator(), ImpurityCertainty.DEFINITELY_IMPURE),
        ("readline", ConcreteImpurityIndicator(), ImpurityCertainty.DEFINITELY_IMPURE),
        ("readlines", ConcreteImpurityIndicator(), ImpurityCertainty.DEFINITELY_IMPURE),
        ("writelines", ConcreteImpurityIndicator(), ImpurityCertainty.DEFINITELY_IMPURE),
        # "close" is the only entry marked as definitely pure.
        ("close", ConcreteImpurityIndicator(), ImpurityCertainty.DEFINITELY_PURE),
    )
}


@dataclass
class FunctionID:
    """Identify a function by its module, its name and its position in the source."""

    module: str
    name: str
    line: int
    col: int

    def __str__(self) -> str:
        """Return the ID in the dotted form ``module.name.line.col``."""
        parts = (self.module, self.name, self.line, self.col)
        return ".".join(map(str, parts))


class PurityResult(ABC):
    """Common base class of the purity verdicts defined in this module."""

    def __init__(self) -> None:
        # Impurity indicators backing the verdict; empty when created through this initializer.
        self.reasons: list[ImpurityIndicator] = []


@dataclass
class DefinitelyPure(PurityResult):
    """Purity verdict for a function without any impurity indicators.

    The class declares no dataclass fields, so it is still constructed as
    ``DefinitelyPure()`` and all instances compare equal.
    """

    def __post_init__(self) -> None:
        # The dataclass-generated ``__init__`` replaces ``PurityResult.__init__``, so the
        # base initializer is invoked here to give every instance its own empty
        # ``reasons`` list. A class-level ``reasons = []`` would instead be a single
        # mutable list shared by all instances.
        super().__init__()


@dataclass
class MaybeImpure(PurityResult):
    """Purity verdict that carries impurity indicators in ``reasons``.

    ``determine_purity`` returns it when indicators exist but none of them has the
    certainty ``DEFINITELY_IMPURE``.
    """

    reasons: list[ImpurityIndicator]

    # NOTE(review): a tuple-based hash over ``reasons`` was sketched but is disabled.
    # def __hash__(self) -> int:
    # return hash(tuple(self.reasons))


@dataclass
class DefinitelyImpure(PurityResult):
    """Purity verdict that carries impurity indicators in ``reasons``.

    ``determine_purity`` returns it when at least one indicator has the certainty
    ``DEFINITELY_IMPURE``.
    """

    reasons: list[ImpurityIndicator]

    # NOTE(review): a tuple-based hash over ``reasons`` was sketched but is disabled.
    # def __hash__(self) -> int:
    # return hash(tuple(self.reasons))


@dataclass
class PurityInformation:
    """Pair a function's ID with the impurity indicators recorded for it."""

    # ID of the analysed function.
    id: FunctionID
    # purity: PurityResult
    # Impurity indicators of the function; ``generate_purity_information`` fills this
    # with an empty list for a ``DefinitelyPure`` result.
    reasons: list[ImpurityIndicator]

    # NOTE(review): custom hashing and equality were sketched but are disabled; the
    # dataclass-generated ``__eq__`` compares ``id`` and ``reasons``.
    # def __hash__(self) -> int:
    # return hash((self.id, self.reasons))

    # def __eq__(self, other: object) -> bool:
    # if not isinstance(other, PurityInformation):
    # return NotImplemented
    # return self.id == other.id and self.reasons == other.reasons


class PurityHandler:
    """Collect impurity indicators while the AST of a function is walked.

    An instance is passed to ``ASTWalker``; the ``enter_*`` methods are named after
    astroid node types (presumably the walker calls them when it enters a node of
    the matching type - confirm against ``ASTWalker``). Every indicator found is
    appended to ``purity_reason`` in visiting order.
    """

    def __init__(self) -> None:
        self.purity_reason: list[ImpurityIndicator] = []

    def append_reason(self, reason: list[ImpurityIndicator]) -> None:
        """Append all indicators in ``reason`` to ``purity_reason``."""
        self.purity_reason.extend(reason)

    def enter_functiondef(self, node: astroid.FunctionDef) -> None:
        """Record nothing for the function definition itself."""
        # Are we analyzing function defs within function defs? Yes, we are.

    def enter_assign(self, node: astroid.Assign) -> None:
        """Record the assignment as a variable write."""
        # Every assignment is recorded the same way, whatever the kind of value on
        # its right-hand side.
        # TODO: Assign node needs further analysis to determine if it is pure or impure
        self.append_reason([VariableWrite(Reference(node.as_string()))])

    def enter_assignattr(self, node: astroid.AssignAttr) -> None:
        """Record the attribute assignment as a variable write."""
        # TODO: AssignAttr node needs further analysis to determine if it is pure or impure
        self.append_reason([VariableWrite(Reference(node.as_string()))])

    def enter_call(self, node: astroid.Call) -> None:
        """Record a call and, for known builtins, what its first argument implies.

        For a call by plain name that is listed in ``BUILTIN_FUNCTIONS`` and has at
        least one positional argument, the first argument is handed to
        ``check_builtin_function``:

        * a name is passed as a variable reference,
        * a constant is passed by its value,
        * any other expression is passed as a reference to its source text.

        A generic ``Call`` indicator is recorded for every call afterwards.

        Raises:
            TypeError: Propagated from ``check_builtin_function`` for builtins it
                does not handle.
            ValueError: Propagated from ``determine_open_mode`` for an ``open``
                call with an unrecognised mode.
        """
        func = node.func
        # Calls through an attribute (e.g. ``f.read()``) are not classified here;
        # ``enter_attribute`` looks at the attribute node instead.
        # ``node.args`` must be non-empty: ``print()`` has no first argument to inspect.
        if isinstance(func, astroid.Name) and func.name in BUILTIN_FUNCTIONS and node.args:
            first_arg = node.args[0]
            if isinstance(first_arg, astroid.Name):
                indicators = check_builtin_function(node, func.name, first_arg.name, True)
            elif isinstance(first_arg, astroid.Const):
                indicators = check_builtin_function(node, func.name, first_arg.value)
            else:
                # Arbitrary expressions (calls, f-strings, subscripts, ...) have no
                # literal value to read; reference them by their source text, the
                # same way a variable is referenced by its name.
                indicators = check_builtin_function(node, func.name, first_arg.as_string(), True)
            self.append_reason(indicators)

        # TODO: Call node needs further analysis to determine if it is pure or impure
        self.append_reason([Call(Reference(node.as_string()))])

    def enter_attribute(self, node: astroid.Attribute) -> None:
        """Record indicators for an attribute access.

        If the attribute is accessed on a plain name and its name is listed in
        ``BUILTIN_FUNCTIONS``, the result of ``check_builtin_function`` is recorded.
        If it is accessed on anything other than a plain name, a ``Call`` indicator
        referencing the attribute expression is recorded.

        Raises:
            TypeError: Propagated from ``check_builtin_function`` for attribute
                names other than the read/write family.
        """
        if isinstance(node.expr, astroid.Name):
            if node.attrname in BUILTIN_FUNCTIONS:
                self.append_reason(check_builtin_function(node, node.attrname))
        else:
            self.append_reason([Call(Reference(node.as_string()))])

    def enter_arguments(self, node: astroid.Arguments) -> None:
        """Record nothing for argument lists."""

    def enter_expr(self, node: astroid.Expr) -> None:
        """Record nothing for expression statements."""

    def enter_name(self, node: astroid.Name) -> None:
        """Record nothing for name nodes."""

    def enter_const(self, node: astroid.Const) -> None:
        """Record nothing for constants."""

    def enter_assignname(self, node: astroid.AssignName) -> None:
        """Record nothing for assignment targets."""

    def enter_with(self, node: astroid.With) -> None:
        """Record nothing for ``with`` statements."""


class OpenMode(Enum):
    """Kind of file access requested by a call to ``open``."""

    READ = auto()
    WRITE = auto()
    READ_WRITE = auto()


def determine_open_mode(args: list[str]) -> OpenMode:
    """Classify the file access of an ``open`` call from its positional arguments.

    Only the second positional argument is inspected. It may be a plain string or
    an ``astroid.Const`` node, whose ``value`` is used. The mode is classified by
    its flags, independent of their order:

    * any mode containing ``+`` is ``READ_WRITE``,
    * otherwise ``r`` is ``READ``,
    * otherwise ``w``, ``a`` or ``x`` is ``WRITE``.

    The check is lenient: the ``b`` and ``t`` flags are accepted but not validated
    against each other, and repeated flags are tolerated.

    Parameters
    ----------
    args:
        Positional arguments of the ``open`` call (path first, mode second).

    Returns
    -------
    OpenMode
        ``READ`` if no mode argument is given, otherwise the classified mode.

    Raises
    ------
    ValueError
        If the mode is not a string, contains unknown flags, or does not contain
        exactly one of ``r``, ``w``, ``a``, ``x``.
    """
    # Without an explicit mode ``open`` reads. An empty argument list is treated
    # the same way instead of failing on the index below.
    if len(args) < 2:
        return OpenMode.READ

    mode = args[1]
    # Testing for ``str`` first keeps plain strings from needing the astroid check.
    if not isinstance(mode, str) and isinstance(mode, astroid.Const):
        mode = mode.value

    if isinstance(mode, str):
        flags = set(mode)
        access = flags & set("rwax")
        if flags <= set("rwaxbt+") and len(access) == 1:
            if "+" in flags:
                return OpenMode.READ_WRITE
            if "r" in access:
                return OpenMode.READ
            return OpenMode.WRITE

    raise ValueError(f"{mode} is not a valid mode for the open function")


def check_builtin_function(
    node: astroid.NodeNG, key: str, value: Optional[str] = None, is_var: bool = False
) -> list[ImpurityIndicator]:
    """Translate a use of a known builtin into impurity indicators.

    Parameters
    ----------
    node:
        The AST node of the call or attribute access. For ``open`` its ``args``
        are used to determine the open mode.
    key:
        Name of the builtin function or method.
    value:
        First argument of the call: a variable name if ``is_var`` is true,
        otherwise the value of a constant. ``None`` if no argument was passed on.
    is_var:
        Whether ``value`` names a variable instead of being a literal.

    Returns
    -------
    list[ImpurityIndicator]
        For ``open``: ``FileRead`` and/or ``FileWrite`` on a ``Reference`` (variable)
        or ``StringLiteral`` (string value), depending on the open mode. For the
        read/write family: a ``VariableRead`` or ``VariableWrite`` referencing the
        source text of ``node``.

    Raises
    ------
    TypeError
        If ``key`` is not handled. This includes every key other than ``open``
        when a string ``value`` is given.
    ValueError
        Propagated from ``determine_open_mode`` for an unrecognised mode.
    """
    if is_var:
        if key == "open":
            open_mode = determine_open_mode(node.args)
            if open_mode == OpenMode.WRITE:
                return [FileWrite(Reference(value))]

            if open_mode == OpenMode.READ:
                return [FileRead(Reference(value))]

            if open_mode == OpenMode.READ_WRITE:
                return [FileRead(Reference(value)), FileWrite(Reference(value))]
        # Any other key with a variable argument falls through to the generic
        # read/write checks below.

    elif isinstance(value, str):
        if key == "open":
            open_mode = determine_open_mode(node.args)
            if open_mode == OpenMode.WRITE:  # write mode
                return [FileWrite(StringLiteral(value))]

            if open_mode == OpenMode.READ:  # read mode
                return [FileRead(StringLiteral(value))]

            if open_mode == OpenMode.READ_WRITE:  # read and write mode
                return [FileRead(StringLiteral(value)), FileWrite(StringLiteral(value))]

        # With a string literal argument only "open" is supported.
        raise TypeError(f"Unknown builtin function {key}")

    # Reached without a string value (e.g. for attribute accesses such as ``f.read``)
    # or for a variable argument of a builtin other than "open".
    if key in ("read", "readline", "readlines"):
        return [VariableRead(Reference(node.as_string()))]
    if key in ("write", "writelines"):
        return [VariableWrite(Reference(node.as_string()))]

    raise TypeError(f"Unknown builtin function {key}")


def infer_purity(code: str) -> list[PurityInformation]:
    """Infer purity information for every top-level function in ``code``.

    Each function returned by ``get_function_defs`` is walked with a shared
    ``PurityHandler``; the indicators collected for it are turned into a purity
    result and then into a ``PurityInformation`` entry.

    Parameters
    ----------
    code:
        Python source code to analyse.

    Returns
    -------
    list[PurityInformation]
        One entry per function, in the order the functions are returned by
        ``get_function_defs``.
    """
    handler = PurityHandler()
    walker = ASTWalker(handler)
    purity_information = []
    for function_def in get_function_defs(code):
        walker.walk(function_def)
        verdict = determine_purity(handler.purity_reason)
        purity_information.append(generate_purity_information(function_def, verdict))
        # Start the next function with a new list. Rebinding (rather than clearing)
        # leaves the list referenced by the verdict above untouched.
        handler.purity_reason = []
    return purity_information


def determine_purity(indicators: list[ImpurityIndicator]) -> PurityResult:
    """Derive a purity verdict from a list of impurity indicators.

    Returns ``DefinitelyPure`` for an empty list, ``DefinitelyImpure`` if any
    indicator has the certainty ``DEFINITELY_IMPURE``, and ``MaybeImpure``
    otherwise. The impure verdicts carry the full ``indicators`` list as reasons.
    """
    if len(indicators) == 0:
        return DefinitelyPure()

    # A single definitely-impure indicator settles the verdict.
    for indicator in indicators:
        if indicator.certainty == ImpurityCertainty.DEFINITELY_IMPURE:
            return DefinitelyImpure(reasons=indicators)

    return MaybeImpure(reasons=indicators)


def get_function_defs(code: str) -> list[astroid.FunctionDef]:
    """Parse ``code`` and return its top-level function definitions.

    Only direct children of the module are considered; functions nested in
    classes or other functions are not part of the result.

    Parameters
    ----------
    code:
        Python source code to parse.

    Returns
    -------
    list[astroid.FunctionDef]
        The function definitions in source order.

    Raises
    ------
    ValueError
        If ``code`` is not valid Python.
    """
    try:
        module = astroid.parse(code)
    # astroid wraps the parser's SyntaxError in its own AstroidSyntaxError, which is
    # not a SyntaxError subclass; both are caught so invalid code is reported
    # uniformly as a ValueError.
    except (SyntaxError, astroid.exceptions.AstroidSyntaxError) as error:
        raise ValueError("Invalid Python code") from error

    return [node for node in module.body if isinstance(node, astroid.FunctionDef)]
    # TODO: This function should read from a python file (module) and return a list of FunctionDefs


def extract_impurity_reasons(purity: PurityResult) -> list[ImpurityIndicator]:
    """Return the reasons of ``purity``, or a new empty list for ``DefinitelyPure``."""
    return [] if isinstance(purity, DefinitelyPure) else purity.reasons


def generate_purity_information(function: astroid.FunctionDef, purity_result: PurityResult) -> PurityInformation:
    """Combine the ID of ``function`` with the reasons of ``purity_result``."""
    return PurityInformation(
        id=calc_function_id(function),
        reasons=extract_impurity_reasons(purity_result),
    )


def calc_function_id(node: astroid.NodeNG) -> FunctionID:
    """Build the ``FunctionID`` of a function definition node.

    The ID consists of the name of the node's root module, the function name, and
    the line and column offset stored in the node's ``position``.

    Raises
    ------
    TypeError
        If ``node`` is not an ``astroid.FunctionDef``.
    """
    if isinstance(node, astroid.FunctionDef):
        return FunctionID(
            module=node.root().name,
            name=node.name,
            line=node.position.lineno,
            col=node.position.col_offset,
        )
    raise TypeError("Node is not a function")


# this function is only for visualization purposes
def get_purity_result_str(indicators: list[ImpurityIndicator]) -> str:
    """Return a human-readable purity verdict for ``indicators``.

    The result is "Definitely Pure" for an empty list, "Definitely Impure" if any
    indicator has the certainty ``DEFINITELY_IMPURE``, and "Maybe Impure" otherwise.
    """
    if len(indicators) == 0:
        return "Definitely Pure"

    for indicator in indicators:
        if indicator.certainty == ImpurityCertainty.DEFINITELY_IMPURE:
            return "Definitely Impure"

    return "Maybe Impure"
Loading