Logic Walkthrough
Rules are associated with Mapped Tables and their Columns. You can think of them as extensions to the (logical) schema, or your SQLAlchemy models class definitions.
Rules are intended to govern derivations / constraints as transactions are committed ("transaction logic"), as well as other actions such as starting processes / sending mail or messages as a result of a transaction.
Transaction logic does not address:
- SQLAlchemy mass updates
- SQLAlchemy updates not using a Mapped Class (i.e., "raw SQL")
- updates made outside SQLAlchemy
Open questions:
- Inheritance
Rules are declared with function calls - see logic.py, which looks like this:
Rule.constraint(validate=Customer, as_condition=lambda row: row.Balance <= row.CreditLimit,
                error_msg="balance ({row.Balance}) exceeds credit ({row.CreditLimit})")
Rule.sum(derive=Customer.Balance, as_sum_of=Order.AmountTotal,
         where=lambda row: row.ShippedDate is None)
Rule.sum(derive=Order.AmountTotal, as_sum_of=OrderDetail.Amount)
Rule.formula(derive=OrderDetail.Amount, as_expression=lambda row: row.UnitPrice * row.Quantity)
Rule.copy(derive=OrderDetail.UnitPrice, from_parent=Product.UnitPrice)
Rule.formula(derive=OrderDetail.ShippedDate, as_expression=lambda row: row.OrderHeader.ShippedDate)
Rule.sum(derive=Product.UnitsShipped, as_sum_of=OrderDetail.Quantity,
         where="row.ShippedDate is not None")
Rule.formula(derive=Product.UnitsInStock, calling=units_shipped)
Note code completion enables you to discover and code rules in IDEs.
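To make the first declaration concrete, here is a minimal sketch of how a constraint's condition and message template could be evaluated against a row. `check_constraint` and `ConstraintError` are hypothetical names used only for illustration, not Logic Bank's API. The only things taken from the declaration above are that `as_condition` is a callable on the row and that `error_msg` uses `{row.Attr}` placeholders.

```python
from types import SimpleNamespace


class ConstraintError(Exception):
    """Hypothetical exception type, raised when a condition is not met."""


def check_constraint(row, as_condition, error_msg):
    """Evaluate the condition; on failure, fill the template from the row."""
    if not as_condition(row):
        # str.format supports attribute access, so "{row.Balance}" resolves
        raise ConstraintError(error_msg.format(row=row))


customer = SimpleNamespace(Balance=2100, CreditLimit=2000)
try:
    check_constraint(
        customer,
        as_condition=lambda row: row.Balance <= row.CreditLimit,
        error_msg="balance ({row.Balance}) exceeds credit ({row.CreditLimit})",
    )
    message = None
except ConstraintError as exc:
    message = str(exc)
print(message)
```

Raising an exception is one plausible way to stop the transaction; how the engine actually reports a failed constraint is not shown in this walkthrough.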
The logic_bank package provides for the definition and enforcement of rules.
logic_bank.py provides functions for rule definition. Invoking these functions "deposits" rules in the rule_bank, where they are stored in memory for the logic engine.
Rule classes are in the rule_type package. Instances of these are what is deposited in the RuleBank, and are executed when transactions are committed.
The rule_bank is a singleton, which maintains the rules by table.
rule_bank_withdraw.py provides what are essentially RuleBank instance methods; they had to be broken out to work around circular dependencies. A better solution would be desirable.
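The deposit / withdraw split described above can be pictured with a small sketch. `ToyRuleBank`, `deposit`, and the two toy rule classes are hypothetical; only the idea of a singleton keyed by table, with withdrawal helpers living outside the class, comes from the text.

```python
from collections import defaultdict


class ToyRuleBank:
    """Hypothetical rule bank: one shared instance, rules keyed by table."""

    _instance = None

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance.tables = defaultdict(list)
        return cls._instance

    def deposit(self, table_name, rule):
        self.tables[table_name].append(rule)


def rules_of_class(table_name, rule_class):
    """Module-level 'withdraw' helper, kept outside the class as in the text."""
    return [r for r in ToyRuleBank().tables[table_name] if isinstance(r, rule_class)]


class Formula:
    def __init__(self, column):
        self.column = column


class Constraint:
    def __init__(self, message):
        self.message = message


ToyRuleBank().deposit("OrderDetail", Formula("Amount"))
ToyRuleBank().deposit("Customer", Constraint("balance exceeds credit"))
formulas = rules_of_class("OrderDetail", Formula)
print([f.column for f in formulas])
```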
Let's consider placing an order, and checking credit. The focus here is on multi-level roll-ups, to compute the balance and check it against the credit.
Execution begins in examples/nw/tests/test_add_order.py.
logic_bank_utils.add_python_path is called. This ensures imports work, even if run outside PyCharm (e.g., command line).
We always want to run with a fresh (gold) copy of the database. So:
tests.copy_gold_over_db()
Next, we issue:
from nw.logic import session, engine # opens db, activates rules <--
This calls examples/nw/logic/__init__.py, which performs critical setup functions:
- sets up logging
- opens the database (create engine, session etc.)
  - Critically, the engine is closed in examples/nw/tests/__init__.py#tearDown(), otherwise multiple commands are logged for each verb
- calls LogicBank.activate(session=session, activator=declare_logic), which
  - registers SQLAlchemy before_flush listeners (hence the session argument)
  - creates the RuleBank, and loads rules into it by calling the supplied declare_logic()
    - logic is not executed now, but later, on commit
  - raises an exception if cycles are detected
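The key point of activation is that rules are only collected at that moment and run later, just before rows are written. The sketch below imitates that with the standard library only. `ToySession`, `RULES`, and this `activate` are hypothetical stand-ins, not SQLAlchemy's or Logic Bank's objects.

```python
RULES = []  # hypothetical in-memory store, filled by declare_logic()


class ToySession:
    """Hypothetical session: holds pending rows and before-flush listeners."""

    def __init__(self):
        self.pending = []
        self.listeners = []

    def add(self, row):
        self.pending.append(row)

    def commit(self):
        for listener in self.listeners:
            listener(self)  # rules run here, before the rows are written
        flushed, self.pending = self.pending, []
        return flushed


def declare_logic():
    # one formula-like rule: Amount = UnitPrice * Quantity
    RULES.append(lambda row: row.update(Amount=row["UnitPrice"] * row["Quantity"]))


def activate(session, activator):
    """Collect rules by calling the activator, then hook them to the session."""
    activator()  # declarations are only stored at this point

    def before_flush(sess):
        for row in sess.pending:
            for rule in RULES:
                rule(row)

    session.listeners.append(before_flush)


session = ToySession()
activate(session, declare_logic)
detail = {"UnitPrice": 18, "Quantity": 1, "Amount": 0}
session.add(detail)
amount_before_commit = detail["Amount"]
session.commit()
print(amount_before_commit, detail["Amount"])
```

The amount stays at its initial value until `commit()` is called, which mirrors the statement that logic is not executed at activation time.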
The RuleBank is a singleton, which maintains the rules for each mapped class.
The user application then functions normally, issuing SQLAlchemy reads and writes.
Logic Bank is not involved until the user issues (as usual) commit.
When test_add_order.py issues commit, SQLAlchemy prepares a list of inserted, updated and deleted rows, and invokes logic_bank/exec_trans_logic/listeners.py, which, for each row in the list:
- Creates a LogicRow instance: this wraps the row and old_row
- LogicRow implements the logic for insert, delete and update rows. This is the core of the rules engine.
The old_row is of paramount importance, enabling pruning, adjustment logic, and user logic (how much did the salary change?)
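A minimal sketch of why keeping `old_row` matters: with both states available, the engine (or user logic) can ask whether an attribute changed and by how much. `ToyLogicRow`, `is_changed`, and `delta` are hypothetical names, and the employee data is invented for illustration.

```python
from types import SimpleNamespace


class ToyLogicRow:
    """Hypothetical wrapper pairing a row with a snapshot of its prior state."""

    def __init__(self, row, old_row):
        self.row = row
        self.old_row = old_row

    def is_changed(self, attr):
        return getattr(self.row, attr) != getattr(self.old_row, attr)

    def delta(self, attr):
        return getattr(self.row, attr) - getattr(self.old_row, attr)


employee = SimpleNamespace(Name="Steven", Salary=1000)
snapshot = SimpleNamespace(**vars(employee))  # copy taken before the update
employee.Salary = 1100
logic_row = ToyLogicRow(employee, snapshot)
print(logic_row.is_changed("Name"), logic_row.delta("Salary"))
```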
update looks like this:
def update(self, reason: str = None, row: base = None):
    """
    make updates - with logic - in events, for example
        row = sqlalchemy read
        logic_row.update(row=row, reason="my log message")
    """
    if row is not None:
        user_logic_row = self.user_row_update(row=row, ins_upd_dlt="upd")
        user_logic_row.update(reason=reason)
    else:
        self.reason = reason
        self.log("Update - " + reason)
        self.early_row_events()
        self.copy_rules()
        self.formula_rules()
        self.adjust_parent_aggregates()  # parent chaining (sum / count adjustments)
        self.constraints()
        self.cascade_to_children()  # child chaining (cascade changed parent references)
        if self.row_sets is not None:  # eg, for debug as in upd_order_shipped test
            self.row_sets.remove_submitted(logic_row=self)
LogicRow finds the rules for that mapped table from the RuleBank, and executes the rule_type instance.
For example, here is formula_rules:
def formula_rules(self):
    """ execute un-pruned formulae, in dependency order """
    self.log_engine("formula_rules")
    formula_rules = rule_bank_withdraw.rules_of_class(self, Formula)
    formula_rules.sort(key=lambda formula: formula._exec_order)
    for each_formula in formula_rules:
        if not self._is_formula_pruned(each_formula):
            each_formula.execute(self)
Rules operation is "forward chaining". The system analyzes each rule for dependencies; when those are changed, the system recomputes (or adjusts) the referencing value. This can chain forward, even across tables. So, a change to an OrderDetail can adjust the Order, which adjusts the Customer.
This can be seen in the following log, resulting from client insertion of an Order and 2 OrderDetails in add_order.py.
- The indentation indicates the chaining of multi-table logic across tables.
- Row state is shown for each column, including old values [in brackets].
Logic Phase: BEFORE COMMIT - 2020-10-05 20:23:26,050 - logic_logger - DEBUG
Logic Phase: ROW LOGIC (sqlalchemy before_flush) - 2020-10-05 20:23:26,050 - logic_logger - DEBUG
..Order[None] {Insert - client} AmountTotal: 0, CustomerId: ALFKI, EmployeeId: 6, Freight: 1, Id: None, OrderDate: None, RequiredDate: None, ShipAddress: None, ShipCity: Richmond, ShipCountry: None, ShipName: None, ShipPostalCode: None, ShipRegion: None, ShipVia: None, ShippedDate: None row@: 0x102e67f40 - 2020-10-05 20:23:26,052 - logic_logger - DEBUG
....Customer[ALFKI] {Update - Adjusting Customer} Address: Obere Str. 57, Balance: 960.0000000000, City: Berlin, CompanyName: Alfreds Futterkiste, ContactName: Maria Anders, ContactTitle: Sales Representative, Country: Germany, CreditLimit: 2000.0000000000, Fax: 030-0076545, Id: ALFKI, OrderCount: [8-->] 9, Phone: 030-0074321, PostalCode: 12209, Region: Western Europe, UnpaidOrderCount: [3-->] 4 row@: 0x1030cc490 - 2020-10-05 20:23:26,072 - logic_logger - DEBUG
..OrderDetail[None] {Insert - client} Amount: 0, Discount: 0, Id: None, OrderId: None, ProductId: 1, Quantity: 1, ShippedDate: None, UnitPrice: 18 row@: 0x102e67fd0 - 2020-10-05 20:23:26,074 - logic_logger - DEBUG
..OrderDetail[None] {copy_rules for role: ProductOrdered} Amount: 0, Discount: 0, Id: None, OrderId: None, ProductId: 1, Quantity: 1, ShippedDate: None, UnitPrice: 18 row@: 0x102e67fd0 - 2020-10-05 20:23:26,079 - logic_logger - DEBUG
..OrderDetail[None] {Formula Amount} Amount: 18.0000000000, Discount: 0, Id: None, OrderId: None, ProductId: 1, Quantity: 1, ShippedDate: None, UnitPrice: 18.0000000000 row@: 0x102e67fd0 - 2020-10-05 20:23:26,082 - logic_logger - DEBUG
....Order[None] {Update - Adjusting OrderHeader} AmountTotal: [0-->] 18.0000000000, CustomerId: ALFKI, EmployeeId: 6, Freight: 1, Id: None, OrderDate: None, RequiredDate: None, ShipAddress: None, ShipCity: Richmond, ShipCountry: None, ShipName: None, ShipPostalCode: None, ShipRegion: None, ShipVia: None, ShippedDate: None row@: 0x102e67f40 - 2020-10-05 20:23:26,085 - logic_logger - DEBUG
......Customer[ALFKI] {Update - Adjusting Customer} Address: Obere Str. 57, Balance: [960.0000000000-->] 978.0000000000, City: Berlin, CompanyName: Alfreds Futterkiste, ContactName: Maria Anders, ContactTitle: Sales Representative, Country: Germany, CreditLimit: 2000.0000000000, Fax: 030-0076545, Id: ALFKI, OrderCount: 9, Phone: 030-0074321, PostalCode: 12209, Region: Western Europe, UnpaidOrderCount: 4 row@: 0x1030cc490 - 2020-10-05 20:23:26,088 - logic_logger - DEBUG
..OrderDetail[None] {Insert - client} Amount: 0, Discount: 0, Id: None, OrderId: None, ProductId: 2, Quantity: 2, ShippedDate: None, UnitPrice: 18 row@: 0x102ef2040 - 2020-10-05 20:23:26,092 - logic_logger - DEBUG
..OrderDetail[None] {copy_rules for role: ProductOrdered} Amount: 0, Discount: 0, Id: None, OrderId: None, ProductId: 2, Quantity: 2, ShippedDate: None, UnitPrice: 18 row@: 0x102ef2040 - 2020-10-05 20:23:26,097 - logic_logger - DEBUG
..OrderDetail[None] {Formula Amount} Amount: 38.0000000000, Discount: 0, Id: None, OrderId: None, ProductId: 2, Quantity: 2, ShippedDate: None, UnitPrice: 19.0000000000 row@: 0x102ef2040 - 2020-10-05 20:23:26,098 - logic_logger - DEBUG
....Order[None] {Update - Adjusting OrderHeader} AmountTotal: [18.0000000000-->] 56.0000000000, CustomerId: ALFKI, EmployeeId: 6, Freight: 1, Id: None, OrderDate: None, RequiredDate: None, ShipAddress: None, ShipCity: Richmond, ShipCountry: None, ShipName: None, ShipPostalCode: None, ShipRegion: None, ShipVia: None, ShippedDate: None row@: 0x102e67f40 - 2020-10-05 20:23:26,103 - logic_logger - DEBUG
......Customer[ALFKI] {Update - Adjusting Customer} Address: Obere Str. 57, Balance: [978.0000000000-->] 1016.0000000000, City: Berlin, CompanyName: Alfreds Futterkiste, ContactName: Maria Anders, ContactTitle: Sales Representative, Country: Germany, CreditLimit: 2000.0000000000, Fax: 030-0076545, Id: ALFKI, OrderCount: 9, Phone: 030-0074321, PostalCode: 12209, Region: Western Europe, UnpaidOrderCount: 4 row@: 0x1030cc490 - 2020-10-05 20:23:26,105 - logic_logger - DEBUG
Logic Phase: COMMIT - 2020-10-05 20:23:26,106 - logic_logger - DEBUG
..Order[None] {Commit Event} AmountTotal: 56.0000000000, CustomerId: ALFKI, EmployeeId: 6, Freight: 1, Id: None, OrderDate: None, RequiredDate: None, ShipAddress: None, ShipCity: Richmond, ShipCountry: None, ShipName: None, ShipPostalCode: None, ShipRegion: None, ShipVia: None, ShippedDate: None row@: 0x102e67f40 - 2020-10-05 20:23:26,107 - logic_logger - DEBUG
..Order[None] {Hi, Steven, congratulate Michael on their new order} AmountTotal: 56.0000000000, CustomerId: ALFKI, EmployeeId: 6, Freight: 1, Id: None, OrderDate: None, RequiredDate: None, ShipAddress: None, ShipCity: Richmond, ShipCountry: None, ShipName: None, ShipPostalCode: None, ShipRegion: None, ShipVia: None, ShippedDate: None row@: 0x102e67f40 - 2020-10-05 20:23:26,112 - logic_logger - DEBUG
Logic Phase: FLUSH (sqlalchemy flush processing - 2020-10-05 20:23:26,113 - logic_logger - DEBUG
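The arithmetic in this log can be reproduced with a small sketch of forward chaining by adjustment: each inserted OrderDetail computes its Amount, adds it to the Order, and the Order in turn adds the same delta to the Customer. The function names are hypothetical and the objects are plain namespaces, not mapped classes. The starting balance, prices, and quantities are the ones shown in the log.

```python
from types import SimpleNamespace

customer = SimpleNamespace(Balance=960, CreditLimit=2000)
order = SimpleNamespace(AmountTotal=0, ShippedDate=None, customer=customer)


def adjust_order(order, delta):
    """Sum adjustment: add the child's delta, then chain to the customer."""
    order.AmountTotal += delta
    if order.ShippedDate is None:  # where-clause of the Balance sum
        adjust_customer(order.customer, delta)


def adjust_customer(customer, delta):
    """Adjust the balance by the delta, then check the credit constraint."""
    customer.Balance += delta
    if not customer.Balance <= customer.CreditLimit:
        raise ValueError("balance exceeds credit")


def insert_detail(order, unit_price, quantity):
    amount = unit_price * quantity  # formula: Amount
    adjust_order(order, amount)     # chains up two levels
    return amount


insert_detail(order, unit_price=18, quantity=1)
insert_detail(order, unit_price=19, quantity=2)
print(order.AmountTotal, customer.Balance)
```

Note that the parents are adjusted by a delta rather than recomputed from all their children, which is what the "Adjusting" entries in the log suggest.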
The RuleBank is a singleton with (from the code):
Attributes
_tables Dict[mapped_class_name: str, List[TableRules]]
_metadata, _base, _engine from SQLAlchemy
Where TableRules is (again, from the code):
Rules and dependencies for a mapped class
attributes
rules : List['AbstractRule'] -- Sums, Constraints, Formulas etc for this mapped class
referring_children : Dict[parent_role_name - str, List[parent_attr_name: str]] - Information driving cascade
Each formula rule definition has an _exec_order, which determines execution order. It is set in rule_bank_setup.validate(), which also detects cycles.
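One common way to derive such an execution order, and to detect cycles along the way, is a topological sort over the formulas' dependencies. The sketch below uses the standard library's `graphlib` (Python 3.9+). It is an illustration of the general technique, not a description of what `rule_bank_setup.validate()` does internally, and `AmountWithTax` is an invented attribute.

```python
from graphlib import CycleError, TopologicalSorter


def exec_order(dependencies):
    """Return attribute names so that each comes after the ones it reads.

    dependencies maps a derived attribute to the attributes it refers to.
    """
    return list(TopologicalSorter(dependencies).static_order())


ordering = exec_order({
    "Amount": {"UnitPrice", "Quantity"},
    "AmountWithTax": {"Amount"},  # hypothetical formula depending on Amount
})
amount_first = ordering.index("Amount") < ordering.index("AmountWithTax")

try:
    exec_order({"A": {"B"}, "B": {"A"}})
    cycle_found = False
except CycleError:
    cycle_found = True
print(amount_first, cycle_found)
```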
Formula instance execution is pruned if its dependent attributes are unchanged. This is driven by formula._dependencies, which is the list of attributes this rule refers to, including parent.attribute. This is set in the formula constructor, which calls AbstractRule#parse_dependencies.
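Pruning can be sketched as a comparison of the current and old row over the formula's dependency list. This is a simplified, hypothetical version: it handles only attributes on the row itself and ignores the parent.attribute case mentioned above.

```python
from types import SimpleNamespace


def is_formula_pruned(dependencies, row, old_row):
    """True when no attribute the formula refers to has changed."""
    return all(getattr(row, name) == getattr(old_row, name) for name in dependencies)


old = SimpleNamespace(UnitPrice=18, Quantity=1, Discount=0)
new = SimpleNamespace(UnitPrice=18, Quantity=1, Discount=5)
amount_dependencies = ["UnitPrice", "Quantity"]  # Amount = UnitPrice * Quantity

# Discount changed, but Amount does not depend on it, so the formula is skipped
pruned_on_discount_change = is_formula_pruned(amount_dependencies, new, old)

new.Quantity = 2
pruned_on_quantity_change = is_formula_pruned(amount_dependencies, new, old)
print(pruned_on_discount_change, pruned_on_quantity_change)
```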
The objective here is adjustment processing. Also, consider that a parent might have multiple sums/counts from a single child. An additional objective is that these result in only 1 execution of parent logic.
Note the actual physical SQL is buffered, courtesy of SQLAlchemy (recall that logic execution occurs in before_flush).
Aggregate processing (from the code):
Objective: 1 (one) update per role, for N aggregates along that role.
For each child -> parent role,
    For each aggregate along that role
        execute sum (etc) logic which set parent_adjustor (as req'd)
    use parent_adjustor to save altered parent
ParentRoleAdjustor is a class (from the LogicRow code):
Contains current / previous parent_logic_row iff parent needs adjustment,
and method to save_altered_parents.
Instances are passed to <aggregate>.adjust_parent who will set parent row(s) values
iff adjustment is required (e.g., summed value changes, where changes, fk changes, etc)
This ensures only 1 update per set of aggregates along a given role
where save_altered_parent:
Save (chain) parent iff parent_logic_row has been set by sum/count executor.
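The "one update per role" objective can be illustrated with a toy adjustor that accumulates the changes from several aggregates and saves the parent at most once. `ToyParentAdjustor` and its `adjust` method are hypothetical. The counts match the first Customer adjustment in the log (OrderCount 8 to 9, UnpaidOrderCount 3 to 4, Balance unchanged because the new order's AmountTotal is still 0).

```python
from types import SimpleNamespace


class ToyParentAdjustor:
    """Hypothetical adjustor: collects changes for one parent, saves once."""

    def __init__(self, parent):
        self.parent = parent
        self.needs_save = False

    def adjust(self, attr, delta):
        if delta != 0:  # mark the parent only if an adjustment is required
            setattr(self.parent, attr, getattr(self.parent, attr) + delta)
            self.needs_save = True

    def save_altered_parent(self, save):
        if self.needs_save:
            save(self.parent)


customer = SimpleNamespace(Balance=960, OrderCount=8, UnpaidOrderCount=3)
saves = []
adjustor = ToyParentAdjustor(customer)
# three aggregates along the same Order -> Customer role, for one inserted order
adjustor.adjust("Balance", 0)
adjustor.adjust("OrderCount", 1)
adjustor.adjust("UnpaidOrderCount", 1)
adjustor.save_altered_parent(saves.append)
print(len(saves), customer.OrderCount, customer.UnpaidOrderCount)
```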
Child rows can reference parent attributes, as in OrderDetail.ShippedDate, exercised in the upd_order_shipped test. So,
- if the parent is altered, cascade to children
- advising children that cascade is along role xyz
So, LogicRow.update() calls cascade_to_children:
Child Formulas can reference (my) Parent Attributes, so...
If the *referenced* Parent Attributes are changed, cascade to child
Setting update_msg to denote parent_role
This will cause each child to recompute all formulas referencing that role
eg,
OrderDetail.ShippedDate = Order.ShippedDate, so....
Order cascades changed ShippedDate => OrderDetailList
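A minimal sketch of the cascade just described, under the assumption that the referenced parent attributes are known per role: children are revisited only when one of those attributes actually changed. All names here are hypothetical, and the rows are plain namespaces.

```python
from types import SimpleNamespace

# parent role name -> parent attributes that child formulas refer to
referring_children = {"OrderHeader": ["ShippedDate"]}


def cascade_to_children(parent, old_parent, children, recompute):
    """Recompute children only if a *referenced* parent attribute changed."""
    cascaded = 0
    for role, attrs in referring_children.items():
        if any(getattr(parent, a) != getattr(old_parent, a) for a in attrs):
            for child in children:
                recompute(child, role, parent)
                cascaded += 1
    return cascaded


def recompute(child, role, parent):
    child.ShippedDate = parent.ShippedDate  # formula referencing the parent


old_order = SimpleNamespace(ShippedDate=None, Freight=1)
order = SimpleNamespace(ShippedDate="2020-10-05", Freight=1)
details = [SimpleNamespace(ShippedDate=None), SimpleNamespace(ShippedDate=None)]
count = cascade_to_children(order, old_order, details, recompute)
print(count, [d.ShippedDate for d in details])
```

A change to an unreferenced attribute such as Freight would leave the children untouched, which is the point of tracking references by role.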
This of course requires we track the child references to a parent, by role. We obtain that via rule_bank_withdraw.get_referring_children:
def get_referring_children(parent_logic_row: LogicRow) -> dict:
    """
    return RulesBank[class_name].referring_children (create if None)
    referring_children is <parent_role_name>, parent_attribute_list()
    """
This information is updated into the RuleBank (if not set) by the method above. The design intent was to build it when "depositing" rules into the RuleBank, but get_mapper requires a mapped instance as input (unavailable when the RuleBank is being created).
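The "create if None" behaviour amounts to lazy initialization with caching. A minimal sketch, using a plain dictionary in place of the RuleBank and a hypothetical `build_referring_children` in place of the mapper inspection:

```python
rule_bank = {"Order": {"referring_children": None}}
build_calls = []


def build_referring_children(class_name):
    """Hypothetical builder; the real information would come from the mapper."""
    build_calls.append(class_name)
    return {"OrderHeader": ["ShippedDate"]}


def get_referring_children(class_name):
    """Return the cached mapping, creating it on first use."""
    table_rules = rule_bank[class_name]
    if table_rules["referring_children"] is None:
        table_rules["referring_children"] = build_referring_children(class_name)
    return table_rules["referring_children"]


first = get_referring_children("Order")
second = get_referring_children("Order")
print(first is second, len(build_calls))
```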