diff --git a/edi_exchange_type_auto/README.rst b/edi_exchange_type_auto/README.rst new file mode 100644 index 000000000..a74415d00 --- /dev/null +++ b/edi_exchange_type_auto/README.rst @@ -0,0 +1 @@ +wait for the bot diff --git a/edi_exchange_type_auto/__init__.py b/edi_exchange_type_auto/__init__.py new file mode 100644 index 000000000..0650744f6 --- /dev/null +++ b/edi_exchange_type_auto/__init__.py @@ -0,0 +1 @@ +from . import models diff --git a/edi_exchange_type_auto/__manifest__.py b/edi_exchange_type_auto/__manifest__.py new file mode 100644 index 000000000..edafe3a85 --- /dev/null +++ b/edi_exchange_type_auto/__manifest__.py @@ -0,0 +1,20 @@ +# Copyright 2022 Camptocamp SA +# @author: Simone Orsi +# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl). + +{ + "name": "EDI Exchange type automation", + "summary": """ + Ease automation of EDI exchange types within the EDI framework. + """, + "version": "14.0.1.0.0", + "development_status": "Alpha", + "license": "AGPL-3", + "website": "https://github.com/OCA/edi-framework", + "author": "Camptocamp,Odoo Community Association (OCA)", + "maintainers": ["simahawk"], + "depends": ["edi_oca"], + "data": [ + "views/edi_exchange_type_rule.xml", + ], +} diff --git a/edi_exchange_type_auto/models/__init__.py b/edi_exchange_type_auto/models/__init__.py new file mode 100644 index 000000000..6450b2448 --- /dev/null +++ b/edi_exchange_type_auto/models/__init__.py @@ -0,0 +1,2 @@ +from . import edi_auto_exchange_consumer_mixin +from . import edi_exchange_type_rule diff --git a/edi_exchange_type_auto/models/edi_auto_exchange_consumer_mixin.py b/edi_exchange_type_auto/models/edi_auto_exchange_consumer_mixin.py new file mode 100644 index 000000000..efc3022e6 --- /dev/null +++ b/edi_exchange_type_auto/models/edi_auto_exchange_consumer_mixin.py @@ -0,0 +1,475 @@ +# Copyright 2022 Camptocamp SA +# @author Simone Orsi +# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl). + +import logging + +from odoo import api, models +from odoo.tools import frozendict, safe_eval + +_logger = logging.getLogger("edi_exchange_auto.mixin") + + +class EDIAutoExchangeConsumerMixin(models.AbstractModel): + """Enhance edi.exchange.consumer.mixin behavior to automatize actions.""" + + _name = "edi.auto.exchange.consumer.mixin" + _inherit = "edi.exchange.consumer.mixin" + _description = __doc__ + + @api.model + def _edi_get_exchange_type_rule_conf(self, rule): + conf = super()._edi_get_exchange_type_rule_conf(rule) + if rule.kind == "auto": + conf.update({"auto": rule.auto_conf or {}}) + return conf + + """Disable automatic EDI programmatically on models. + """ # pylint: disable=pointless-string-statement + _edi_no_auto_for_operation = ( + # "create", + # "write", + # "unlink", + ) + + @api.model_create_multi + def create(self, vals_list): + records = super().create(vals_list) + todo = None + operation = "create" + candidates = self.browse() + for rec in records: + if not rec._edi_auto_skip(operation): + candidates |= rec + if candidates: + todo = candidates._edi_auto_collect_todo(operation, vals_list) + if todo: + # TODO: schedule call on post commit + candidates._edi_auto_handle(todo) + return rec + + def write(self, vals): + todo = None + operation = "write" + candidates = self.browse() + for rec in self: + if not rec._edi_auto_skip(operation): + candidates |= rec + if candidates: + todo_vals = [vals.copy() for x in candidates] + todo = candidates._edi_auto_collect_todo(operation, todo_vals) + res = super().write(vals) + if todo: + candidates._edi_auto_handle(todo) + return res + + # TODO + # def unlink(self): + + def _edi_auto_skip(self, operation): + skip_reason = None + if self.env.context.get("edi__skip_auto_handle"): + skip_reason = "edi__skip_auto_handle ctx key found" + elif operation in self._edi_no_auto_for_operation: + skip_reason = f"{operation} disabled attr _edi_no_auto_for_operation" + elif self.disable_edi_auto: + skip_reason = f"EDI auto disabled for rec={self.id}" + if skip_reason: + self._edi_auto_log_skip(operation, skip_reason) + return True + return False + + def _edi_auto_collect_todo(self, operation, new_vals_list): + """Generate list of automatic actions to do. + + :param operation: valid edi action (see ``edi.backend._is_valid_edi_action``) + :param new_vals: list of new values for current record(s) + """ + res = [] + # Example of configuration on type rule `auto_conf_edit`: + # actions: + # generate: + # when: + # - create + # - write + # - unlink + # if: + # - domain: $domain TODO: support domain + # - callable: $callable_on_model + # - snippet: $code + # event_only: false + # force: true + # target_record: order_id + # trigger_fields: + # - state + # - order_line + # tracked_fields: + # - state + # - expected_date + # + rec_by_type = self._edi_auto_collect_records_by_type(operation, new_vals_list) + for exc_type, data in rec_by_type.items(): + conf = data["conf"] + if not exc_type.backend_id: + # TODO: add validation on adv settings? + skip_reason = f"Backend required, not set on type={exc_type.code}" + self._edi_auto_log_skip(operation, skip_reason, exc_type=exc_type) + continue + for action_name, action_conf in conf.get("actions", {}).items(): + try: + todo = self._edi_auto_collect_todo_for_action( + operation, exc_type, action_name, action_conf, data + ) + res.extend(todo) + except EDIAutoSkipException as exc: + self._edi_auto_log_skip( + exc.operation, exc.reason, exc_type=exc.exc_type + ) + return res + + def _edi_auto_collect_todo_for_action( + self, operation, exc_type, action_name, action_conf, data + ): + backend = exc_type.backend_id + skip_reason = None + if not backend._is_valid_edi_action(action_name): + skip_reason = f"EDI action not allowed ={action_name}" + if operation not in action_conf.get("when", []): + skip_reason = f"Operation not allowed for action={action_name}" + triggers = action_conf.get("trigger_fields", []) + if not triggers: + skip_reason = f"No trigger set for action={action_name}" + if skip_reason: + raise EDIAutoSkipException(operation, skip_reason, exc_type=exc_type) + tracked = action_conf.get("tracked_fields", []) + trigger = None + for k in triggers: + if k in data["new_vals"]: + trigger = k + break + if not trigger: + return [] + if trigger not in tracked: + tracked.append(trigger) + checker = None + if action_conf.get("if", {}).get("callable"): + callable_name = action_conf["if"]["callable"] + try: + checker = getattr(self, callable_name) + except AttributeError: + skip_reason = f"Invalid callable={callable_name}" + raise EDIAutoSkipException(operation, skip_reason, exc_type=exc_type) + snippet = action_conf.get("if", {}).get("snippet") + new_vals = frozendict( + {k: v for k, v in data["new_vals"].items() if k in tracked} + ) + # TODO: group by record in case `target_record` is different + # or let it trigger N exchanges for sub models? + res = [] + for rec in data["records"]: + old_vals = frozendict({k: rec[k] for k in tracked}) + try: + todo = self._edi_auto_collect_todo_for_action_for_record( + operation, + exc_type, + action_name, + action_conf, + trigger, + new_vals, + old_vals, + rec, + checker=checker, + snippet=snippet, + ) + res.append(todo) + except EDIAutoSkipException as exc: + self._edi_auto_log_skip( + exc.operation, exc.reason, exc_type=exc.exc_type + ) + return res + + def _edi_auto_collect_todo_for_action_for_record( + self, + operation, + exc_type, + action_name, + action_conf, + trigger, + new_vals, + old_vals, + rec, + checker=None, + snippet=None, + ): + target_record = self._edi_auto_get_target_record(rec, action_conf) + todo = self._edi_auto_prepare_info( + edi_type=exc_type, + edi_action=action_name, + conf=action_conf, + triggered_by=trigger, + record=rec, + target_record=target_record, + vals=new_vals, + old_vals=old_vals, + force=action_conf.get("force", False), + event_only=action_conf.get("event_only", False), + ) + if checker and not checker(todo): + skip_reason = f"Checker {checker.__func__.__name__} skip action" + raise EDIAutoSkipException(operation, skip_reason, exc_type=exc_type) + if snippet: + try: + evaluated = self._edi_auto_evaluate_snippet( + snippet, rec, target_record, todo + ) + if not evaluated: + skip_reason = "Snippet skip action" + raise EDIAutoSkipException( + operation, skip_reason, exc_type=exc_type + ) + except ValueError as err: + skip_reason = f"Invalid snippet={snippet}" + raise EDIAutoSkipException( + operation, skip_reason, exc_type=exc_type + ) from err + return todo + + def _edi_auto_collect_records_by_type(self, operation, new_vals_list): + skip_type_rule_ids = set() + skip_rec_ids = set() + # TODO: here we group by type + # but we could have potentially several auto rule for the same type + # into `edi_config` -> control or limit this + rec_by_type = {} + # Make sure config is freshly computed + self.invalidate_cache(["edi_config"]) + for rec, new_vals in zip(self, new_vals_list): + if rec.id in skip_rec_ids: + continue + for rule_id, conf in rec.edi_config.items(): + if rule_id in skip_type_rule_ids: + continue + exc_type_rule = self.env["edi.exchange.type.rule"].browse(int(rule_id)) + exc_type = exc_type_rule.type_id + if "partner_id" in rec._fields and not exc_type.is_partner_enabled( + rec.partner_id + ): + skip_rec_ids.add(rec.id) + skip_reason = f"Exchange not enabled for partner on rec={rec.id}" + self._edi_auto_log_skip(operation, skip_reason, exc_type=exc_type) + continue + # Get auto conf for current model + auto_conf = conf.get("auto", {}) + actions = auto_conf.get("actions", {}) + skip_reason = None + if not auto_conf or auto_conf.get("disable"): + skip_reason = "Auto-conf not found or disabled" + elif not actions: + skip_reason = "Auto-conf has no action configured" + if skip_reason: + skip_type_rule_ids.add(rule_id) + self._edi_auto_log_skip(operation, skip_reason, exc_type=exc_type) + continue + if exc_type.id not in rec_by_type: + rec_by_type[exc_type] = { + "conf": auto_conf, + "records": [], + "new_vals": new_vals, + } + rec_by_type[exc_type]["records"].append(rec) + return rec_by_type + + def _edi_auto_snippet_eval_context(self, record, target_record, todo): + """Prepare the context used when evaluating python code + + :returns: dict -- evaluation context given to safe_eval + """ + ctx = { + "uid": self.env.uid, + "user": self.env.user, + "record": record, + "target_record": target_record, + "todo": todo, + } + return ctx + + def _edi_auto_evaluate_snippet(self, code, rec, target_record, todo): + eval_ctx = self._edi_auto_snippet_eval_context(rec, target_record, todo) + safe_eval.safe_eval(code, eval_ctx, mode="exec", nocopy=True) + result = eval_ctx.get("result", False) + if not isinstance(result, bool): + _logger.error("code snippet should return a boolean into `result`") + return {} + return result + + def _edi_auto_log_skip(self, operation, reason, exc_type=None): + log_msg = "Skip model=%(model)s op=%(op)s" + log_args = { + "model": self._name, + "op": operation, + "reason": reason, + } + if exc_type: + log_msg += " type=%(type_code)s" + log_args["type_code"] = exc_type.code + log_msg += ": %(reason)s" + _logger.debug(log_msg, log_args) + + def _edi_auto_get_target_record(self, rec, action_conf): + target_record = rec + mapped = action_conf.get("target_record") + if mapped: + target_record = rec.mapped(mapped) + return target_record + + def _edi_auto_prepare_info(self, **kw): + kw["edi_type_id"] = kw.pop("edi_type").id + record = kw.pop("record") + target_record = kw.pop("target_record") + kw["_records"] = { + "source": { + "model": record._name, + "id": record.id, + }, + "target": { + "model": target_record._name, + "id": target_record.id, + }, + } + # TODO: serialize old_vals in case of relations + return EDIAutoInfo(**kw) + + # TODO: add tests + def _edi_auto_handle(self, todo): + """Handle automatic EDI actions to do. + + :param todo: list of `EDIAutoInfo` objects + """ + for info in todo: + edi_action = info.edi_action + target_record = info.get_target_record(self.env) + if info.event_only: + target_record._edi_auto_trigger_event(target_record, info) + continue + job_options = target_record._edi_auto_handle_job_options(info) + handler = getattr( + target_record.with_delay(**job_options), + "_edi_auto_handle_" + edi_action, + None, + ) + if not handler: + raise NotImplementedError(f"{edi_action} handler not implemented yet") + handler(info.as_dict()) + + def _edi_auto_handle_job_options(self, info): + return {} + + def _edi_auto_handle_generate(self, info_dict): + msg = None + info = EDIAutoInfo.from_dict(info_dict) + target_record = info.get_target_record(self.env) + exc_type = info.get_type(self.env) + created, exchange_record = self._edi_auto_get_or_create_record( + target_record, exc_type + ) + if not created: + # TODO: what if the file has to be updated? + # Nothing to do. Return a nice msg for the job result. + msg = f"Exchange record already exists for type: {exc_type.code}" + _logger.debug(msg) + return msg + msg = ( + f"Exchange record {exchange_record.identifier} created. " + f"Triggered by: {info.triggered_by}" + ) + _logger.debug(msg) + # Trigger event on exchange record + exchange_record._trigger_edi_event("auto_handle_generate", info=info) + # Trigger event on current record + self._edi_auto_trigger_event(target_record, info) + return msg + + def _edi_auto_get_or_create_record(self, target_record, exchange_type): + # TODO: here we must filter acks that are not valued yet. + # We should take control via conf on + # whether the ack has to be generated immediately or not + # by the cron of the backend. + parent = target_record._edi_get_origin() + exchange_record = target_record._get_exchange_record(exchange_type).filtered( + lambda x: not x.exchange_file + ) + created = False + # If the record has not been sent out yet for whatever reason + # (job delayed, job failed, send failed, etc) + # we still want to generate a new up to date record to be sent. + still_pending = exchange_record.edi_exchange_state in ( + "output_pending", + "output_error_on_send", + ) + if not exchange_record or still_pending: + vals = exchange_record._exchange_child_record_values() + vals["parent_id"] = parent.id + # NOTE: to fully automatize this, + # is recommended to enable `quick_exec` on the type + # otherwise records will have to wait for the cron to pass by. + exchange_record = target_record._edi_create_exchange_record( + exchange_type, vals=vals + ) + created = True + return created, exchange_record + + def _edi_auto_trigger_event(self, target_record, info): + self._event(self._edi_auto_make_event_name(info)).notify( + target_record, info=info + ) + + def _edi_auto_make_event_name(self, info): + return f"on_edi_auto_{info.edi_action}" + + +class EDIAutoInfo: + """Serialize-able object holding info on automatic actions and events.""" + + __slots__ = ( + "edi_type_id", + "edi_action", + "conf", + "triggered_by", + "_records", + "vals", + "old_vals", + "force", + "event_only", + ) + + def __init__(self, **kw): + for k, v in kw.items(): + if k in self.__slots__: + setattr(self, k, v) + + def as_dict(self): + return {k: getattr(self, k) for k in self.__slots__} + + @classmethod + def from_dict(cls, vals): + return cls(**vals) + + def get_type(self, env): + return env["edi.exchange.type"].browse(self.edi_type_id) + + def get_record(self, env): + source = self._records["source"] + return env[source["model"]].browse(source["id"]) + + def get_target_record(self, env): + target = self._records["target"] + return env[target["model"]].browse(target["id"]) + + +class EDIAutoSkipException(Exception): + __slots__ = ("operation", "reason", "exc_type") + + def __init__(self, operation, reason, exc_type=None): + self.operation = operation + self.reason = reason + self.exc_type = exc_type diff --git a/edi_exchange_type_auto/models/edi_exchange_type_rule.py b/edi_exchange_type_auto/models/edi_exchange_type_rule.py new file mode 100644 index 000000000..3d6206551 --- /dev/null +++ b/edi_exchange_type_auto/models/edi_exchange_type_rule.py @@ -0,0 +1,30 @@ +# Copyright 2023 Camptocamp SA +# @author Simone Orsi +# License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl). + +import yaml + +from odoo import api, fields, models + +from odoo.addons.base_sparse_field.models.fields import Serialized + + +class EDIExchangeTypeRule(models.Model): + _inherit = "edi.exchange.type.rule" + + kind = fields.Selection( + selection_add=([("auto", "Auto")]), ondelete={"auto": "set default"} + ) + auto_conf = Serialized(default={}, compute="_compute_auto_conf") + auto_conf_edit = fields.Text() + + @api.depends("auto_conf_edit") + def _compute_auto_conf(self): + for rec in self: + rec.auto_conf = rec._load_auto_conf() + + def _load_auto_conf(self): + # TODO: validate settings w/ a schema. + # Could be done w/ Cerberus or JSON-schema. + # This would help documenting core and custom keys. + return yaml.safe_load(self.auto_conf_edit or "") or {} diff --git a/edi_exchange_type_auto/readme/CONFIGURE.rst b/edi_exchange_type_auto/readme/CONFIGURE.rst new file mode 100644 index 000000000..1333ed77b --- /dev/null +++ b/edi_exchange_type_auto/readme/CONFIGURE.rst @@ -0,0 +1 @@ +TODO diff --git a/edi_exchange_type_auto/readme/CONTRIBUTORS.rst b/edi_exchange_type_auto/readme/CONTRIBUTORS.rst new file mode 100644 index 000000000..f1c71bce1 --- /dev/null +++ b/edi_exchange_type_auto/readme/CONTRIBUTORS.rst @@ -0,0 +1 @@ +* Simone Orsi diff --git a/edi_exchange_type_auto/readme/DESCRIPTION.rst b/edi_exchange_type_auto/readme/DESCRIPTION.rst new file mode 100644 index 000000000..1333ed77b --- /dev/null +++ b/edi_exchange_type_auto/readme/DESCRIPTION.rst @@ -0,0 +1 @@ +TODO diff --git a/edi_exchange_type_auto/readme/ROADMAP.rst b/edi_exchange_type_auto/readme/ROADMAP.rst new file mode 100644 index 000000000..1333ed77b --- /dev/null +++ b/edi_exchange_type_auto/readme/ROADMAP.rst @@ -0,0 +1 @@ +TODO diff --git a/edi_exchange_type_auto/tests/__init__.py b/edi_exchange_type_auto/tests/__init__.py new file mode 100644 index 000000000..6267bdd19 --- /dev/null +++ b/edi_exchange_type_auto/tests/__init__.py @@ -0,0 +1 @@ +from . import test_consumer_mixin diff --git a/edi_exchange_type_auto/tests/fake_models.py b/edi_exchange_type_auto/tests/fake_models.py new file mode 100644 index 000000000..d9b84f564 --- /dev/null +++ b/edi_exchange_type_auto/tests/fake_models.py @@ -0,0 +1,24 @@ +# Copyright 2022 Camptocamp SA +# @author Simone Orsi +# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl). + +from odoo import fields, models + + +class EdiAutoExchangeConsumerTest(models.Model): + _name = "edi.auto.exchange.consumer.test" + _inherit = [ + "edi.auto.exchange.consumer.mixin", + ] + _description = _name + + name = fields.Char() + state = fields.Char() + number = fields.Integer() + partner_id = fields.Many2one("res.partner") + + _edi_test_check_generate_called_with = [] + + def _edi_test_check_generate(self, todo): + self._edi_test_check_generate_called_with.append(todo) + return self.env.context.get("_edi_test_check_generate_pass") diff --git a/edi_exchange_type_auto/tests/test_consumer_mixin.py b/edi_exchange_type_auto/tests/test_consumer_mixin.py new file mode 100644 index 000000000..9090d78e7 --- /dev/null +++ b/edi_exchange_type_auto/tests/test_consumer_mixin.py @@ -0,0 +1,337 @@ +# Copyright 2022 Camptocamp SA +# @author Simone Orsi +# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl). +import textwrap + +import mock +from odoo_test_helper import FakeModelLoader + +from odoo.addons.edi_oca.tests.common import EDIBackendCommonTestCase + + +class TestConsumerAutoMixinCase(EDIBackendCommonTestCase): + @classmethod + def _setup_records(cls): + super()._setup_records() + # Load fake models ->/ + cls.loader = FakeModelLoader(cls.env, cls.__module__) + cls.loader.backup_registry() + from .fake_models import EdiAutoExchangeConsumerTest + + cls.loader.update_registry((EdiAutoExchangeConsumerTest,)) + # ->/ + cls.model = cls.env[EdiAutoExchangeConsumerTest._name] + cls.record = cls.model.with_context(edi__skip_auto_handle=True).create( + {"name": "Test auto"} + ) + cls.auto_exchange_type = cls._create_exchange_type( + name="Test auto output", + code="test_auto_output", + direction="output", + exchange_file_ext="xml", + exchange_filename_pattern="{record.id}.test", + ) + cls.type_rule = cls.env["edi.exchange.type.rule"].create( + { + "name": "Auto test", + "type_id": cls.auto_exchange_type.id, + "model_id": cls.env["ir.model"]._get_id(cls.model._name), + "kind": "auto", + "auto_conf_edit": "", + } + ) + cls.partner1 = cls.env["res.partner"].create({"name": "Avg Customer 1"}) + cls.partner2 = cls.env["res.partner"].create({"name": "Avg Customer 2"}) + + @classmethod + def tearDownClass(cls): + cls.loader.restore_registry() + super().tearDownClass() + + def test_skip(self): + with mock.patch.object( + type(self.model), "_edi_auto_collect_todo" + ) as mocked_collect: + # Skip via ctx key + record = self.model.with_context(edi__skip_auto_handle=True).create( + {"name": "Test auto 2"} + ) + mocked_collect.assert_not_called() + # Skip via class attr + with mock.patch.object( + type(self.model), + "_edi_no_auto_for_operation", + new_callable=mock.PropertyMock, + ) as mocked: + mocked.return_value = ("create", "write") + record = self.model.create({"name": "Test auto 2"}) + mocked_collect.assert_not_called() + vals = {"name": "New name"} + # Write is not allowed + record.write(vals) + mocked_collect.assert_not_called() + # Allow it + mocked.return_value = ("create",) + record.write(vals) + mocked_collect.assert_called_with("write", [vals]) + + def test_no_conf_no_trigger(self): + with mock.patch.object( + type(self.model), "_edi_auto_trigger_event" + ) as mocked_trigger: + record = self.model.create({"name": "Test auto 2"}) + mocked_trigger.assert_not_called() + vals = {"name": "New name"} + record.write(vals) + mocked_trigger.assert_not_called() + + def test_no_conf_no_trigger2(self): + with self.assertLogs("edi_exchange_auto", level="DEBUG") as watcher: + with mock.patch.object( + type(self.model), "_edi_auto_trigger_event" + ) as mocked_trigger: + record = self.model.create({"name": "Test auto 2"}) + expected_msg = ( + f"DEBUG:edi_exchange_auto.mixin:" + f"Skip model={self.model._name} " + f"op=%s " + f"type={self.auto_exchange_type.code}: " + f"Auto-conf not found or disabled" + ) + self.assertEqual(watcher.output[0], expected_msg % "create") + mocked_trigger.assert_not_called() + vals = {"name": "New name"} + record.write(vals) + self.assertEqual(watcher.output[1], expected_msg % "write") + mocked_trigger.assert_not_called() + + def test_conf_disable_no_trigger(self): + self.type_rule.auto_conf_edit = textwrap.dedent("disable: true") + with self.assertLogs("edi_exchange_auto", level="DEBUG") as watcher: + with mock.patch.object( + type(self.model), "_edi_auto_trigger_event" + ) as mocked_trigger: + record = self.model.create({"name": "Test auto 2"}) + expected_msg = ( + f"DEBUG:edi_exchange_auto.mixin:" + f"Skip model={self.model._name} " + f"op=%s " + f"type={self.auto_exchange_type.code}: " + f"Auto-conf not found or disabled" + ) + self.assertEqual(watcher.output[0], expected_msg % "create") + mocked_trigger.assert_not_called() + vals = {"name": "New name"} + record.write(vals) + self.assertEqual(watcher.output[1], expected_msg % "write") + mocked_trigger.assert_not_called() + + def test_edi_disable_flag_no_trigger(self): + self.type_rule.auto_conf_edit = textwrap.dedent( + """ + when: + - write + """ + ) + with self.assertLogs("edi_exchange_auto", level="DEBUG") as watcher: + with mock.patch.object( + type(self.model), "_edi_auto_trigger_event" + ) as mocked_trigger: + record = self.model.create( + {"name": "Test auto 2", "disable_edi_auto": True} + ) + expected_msg = ( + f"DEBUG:edi_exchange_auto.mixin:" + f"Skip model={self.model._name} " + f"op=%s: EDI auto disabled for rec={record.id}" + ) + self.assertEqual(watcher.output[0], expected_msg % "create") + mocked_trigger.assert_not_called() + vals = {"name": "New name"} + record.write(vals) + self.assertEqual(watcher.output[1], expected_msg % "write") + mocked_trigger.assert_not_called() + + def test_conf_no_action_no_trigger(self): + self.type_rule.auto_conf_edit = textwrap.dedent( + """ + actions: + """ + ) + with self.assertLogs("edi_exchange_auto", level="DEBUG") as watcher: + with mock.patch.object( + type(self.model), "_edi_auto_trigger_event" + ) as mocked_trigger: + record = self.model.create({"name": "Test auto 2"}) + expected_msg = ( + f"DEBUG:edi_exchange_auto.mixin:" + f"Skip model={self.model._name} " + f"op=%s " + f"type={self.auto_exchange_type.code}: " + f"Auto-conf has no action configured" + ) + self.assertEqual(watcher.output[0], expected_msg % "create") + mocked_trigger.assert_not_called() + vals = {"name": "New name"} + record.write(vals) + self.assertEqual(watcher.output[1], expected_msg % "write") + mocked_trigger.assert_not_called() + + def test_conf_when_trigger(self): + self.type_rule.auto_conf_edit = textwrap.dedent( + """ + actions: + generate: + when: + - create + trigger_fields: + - name + """ + ) + with self.assertLogs("edi_exchange_auto", level="DEBUG") as watcher: + with mock.patch.object( + type(self.model), "_edi_auto_handle" + ) as mocked_handler: + record = self.model.create({"name": "Test auto 2"}) + mocked_handler.assert_called() + mocked_handler.reset_mock() + vals = {"name": "New name"} + record.write(vals) + expected_msg = ( + f"DEBUG:edi_exchange_auto.mixin:" + f"Skip model={self.model._name} " + f"op=%s " + f"type={self.auto_exchange_type.code}: " + f"Operation not allowed for action=generate" + ) + self.assertEqual(watcher.output[0], expected_msg % "write") + mocked_handler.assert_not_called() + + def test_conf_if_trigger_callable(self): + self.type_rule.auto_conf_edit = textwrap.dedent( + """ + actions: + generate: + when: + - create + trigger_fields: + - name + if: + callable: _edi_test_check_generate + """ + ) + with self.assertLogs("edi_exchange_auto", level="DEBUG") as watcher: + with mock.patch.object( + type(self.model), "_edi_auto_handle" + ) as mocked_handler: + record = self.model.with_context( + _edi_test_check_generate_pass=True + ).create({"name": "Test auto 2"}) + info = record._edi_test_check_generate_called_with.pop() + self.assertEqual( + info.as_dict(), + { + "edi_type_id": self.auto_exchange_type.id, + "edi_action": "generate", + "conf": { + "when": ["create"], + "trigger_fields": ["name"], + "if": {"callable": "_edi_test_check_generate"}, + }, + "triggered_by": "name", + "_records": { + "source": { + "model": "edi.auto.exchange.consumer.test", + "id": record.id, + }, + "target": { + "model": "edi.auto.exchange.consumer.test", + "id": record.id, + }, + }, + "vals": {"name": "Test auto 2"}, + "old_vals": {"name": "Test auto 2"}, + "force": False, + "event_only": False, + }, + ) + mocked_handler.assert_called() + mocked_handler.reset_mock() + record = self.model.with_context( + _edi_test_check_generate_pass=False + ).create({"name": "Test auto 3"}) + mocked_handler.assert_not_called() + info = record._edi_test_check_generate_called_with.pop() + expected_msg = ( + f"DEBUG:edi_exchange_auto.mixin:" + f"Skip model={self.model._name} " + f"op=%s " + f"type={self.auto_exchange_type.code}: " + f"Checker _edi_test_check_generate skip action" + ) + self.assertEqual(watcher.output[0], expected_msg % "create") + + def test_conf_if_trigger_snippet(self): + self.type_rule.auto_conf_edit = textwrap.dedent( + """ + actions: + generate: + when: + - create + trigger_fields: + - name + if: + snippet: result = todo.vals.get('name') == 'Test auto 2' + """ + ) + with self.assertLogs("edi_exchange_auto", level="DEBUG") as watcher: + with mock.patch.object( + type(self.model), "_edi_auto_handle" + ) as mocked_handler: + self.model.create({"name": "Test auto 2"}) + mocked_handler.assert_called() + mocked_handler.reset_mock() + self.model.create({"name": "Test auto 3"}) + mocked_handler.assert_not_called() + expected_msg = ( + f"DEBUG:edi_exchange_auto.mixin:" + f"Skip model={self.model._name} " + f"op=%s " + f"type={self.auto_exchange_type.code}: " + f"Snippet skip action" + ) + self.assertEqual(watcher.output[0], expected_msg % "create") + + def test_conf_skip_partner(self): + self.type_rule.auto_conf_edit = textwrap.dedent( + """ + actions: + generate: + when: + - create + trigger_fields: + - name + """ + ) + self.auto_exchange_type.partner_ids += self.partner2 + with self.assertLogs("edi_exchange_auto", level="DEBUG") as watcher: + with mock.patch.object( + type(self.model), "_edi_auto_handle" + ) as mocked_handler: + record = self.model.create( + {"name": "Test auto 2", "partner_id": self.partner1.id} + ) + expected_msg = ( + f"DEBUG:edi_exchange_auto.mixin:" + f"Skip model={self.model._name} " + f"op=%s " + f"type={self.auto_exchange_type.code}: " + f"Exchange not enabled for partner on rec={record.id}" + ) + self.assertEqual(watcher.output[0], expected_msg % "create") + mocked_handler.assert_not_called() + mocked_handler.reset_mock() + self.model.create( + {"name": "Test auto 3", "partner_id": self.partner2.id} + ) + mocked_handler.assert_called() diff --git a/edi_exchange_type_auto/views/edi_exchange_type_rule.xml b/edi_exchange_type_auto/views/edi_exchange_type_rule.xml new file mode 100644 index 000000000..7e83f295d --- /dev/null +++ b/edi_exchange_type_auto/views/edi_exchange_type_rule.xml @@ -0,0 +1,19 @@ + + + + edi.exchange.type.rule + + + + + + + + + + diff --git a/setup/edi_exchange_type_auto/odoo/addons/edi_exchange_type_auto b/setup/edi_exchange_type_auto/odoo/addons/edi_exchange_type_auto new file mode 120000 index 000000000..ba87417a2 --- /dev/null +++ b/setup/edi_exchange_type_auto/odoo/addons/edi_exchange_type_auto @@ -0,0 +1 @@ +../../../../edi_exchange_type_auto \ No newline at end of file diff --git a/setup/edi_exchange_type_auto/setup.py b/setup/edi_exchange_type_auto/setup.py new file mode 100644 index 000000000..28c57bb64 --- /dev/null +++ b/setup/edi_exchange_type_auto/setup.py @@ -0,0 +1,6 @@ +import setuptools + +setuptools.setup( + setup_requires=['setuptools-odoo'], + odoo_addon=True, +)