Skip to content

Commit

Permalink
feat: Level-wise BOM cost updation
Browse files Browse the repository at this point in the history
- Process BOMs level wise and Pause after level is complete
- Cron job will resume Paused jobs, which will again process the new level and pause at the end
- This will go on until all BOMs are updated
- Added Progress section with fields to track updated BOMs in Log
- Cleanup: Add BOM Updation utils file to contain helper functions/sub-functions
- Cleanup: BOM Update Log file will only contain functions that are in direct context of the Log
  • Loading branch information
marination committed May 23, 2022
1 parent 90d4dc0 commit e46e9f9
Show file tree
Hide file tree
Showing 6 changed files with 335 additions and 199 deletions.
5 changes: 4 additions & 1 deletion erpnext/hooks.py
Original file line number Diff line number Diff line change
Expand Up @@ -392,9 +392,12 @@

scheduler_events = {
"cron": {
"0/5 * * * *": [
"erpnext.manufacturing.doctype.bom_update_log.bom_update_log.resume_bom_cost_update_jobs",
],
"0/30 * * * *": [
"erpnext.utilities.doctype.video.video.update_youtube_data",
]
],
},
"all": [
"erpnext.projects.doctype.project.project.project_status_update_reminder",
Expand Down
29 changes: 27 additions & 2 deletions erpnext/manufacturing/doctype/bom_update_log/bom_update_log.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@
"update_type",
"status",
"error_log",
"progress_section",
"current_boms",
"parent_boms",
"processed_boms",
"amended_from"
],
"fields": [
Expand Down Expand Up @@ -47,7 +51,7 @@
"fieldname": "status",
"fieldtype": "Select",
"label": "Status",
"options": "Queued\nIn Progress\nCompleted\nFailed"
"options": "Queued\nIn Progress\nPaused\nCompleted\nFailed"
},
{
"fieldname": "amended_from",
Expand All @@ -63,13 +67,34 @@
"fieldtype": "Link",
"label": "Error Log",
"options": "Error Log"
},
{
"fieldname": "progress_section",
"fieldtype": "Section Break",
"label": "Progress"
},
{
"fieldname": "current_boms",
"fieldtype": "Text",
"label": "Current BOMs"
},
{
"description": "Immediate parent BOMs",
"fieldname": "parent_boms",
"fieldtype": "Text",
"label": "Parent BOMs"
},
{
"fieldname": "processed_boms",
"fieldtype": "Text",
"label": "Processed BOMs"
}
],
"in_create": 1,
"index_web_pages_for_search": 1,
"is_submittable": 1,
"links": [],
"modified": "2022-03-31 12:51:44.885102",
"modified": "2022-05-23 14:42:14.725914",
"modified_by": "Administrator",
"module": "Manufacturing",
"name": "BOM Update Log",
Expand Down
169 changes: 76 additions & 93 deletions erpnext/manufacturing/doctype/bom_update_log/bom_update_log.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,19 @@
# Copyright (c) 2022, Frappe Technologies Pvt. Ltd. and contributors
# For license information, please see license.txt
from typing import Dict, List, Literal, Optional
import json
from typing import Dict, Optional

import frappe
from frappe import _
from frappe.model.document import Document
from frappe.utils import cstr, flt
from frappe.utils import cstr

from erpnext.manufacturing.doctype.bom_update_tool.bom_update_tool import update_cost
from erpnext.manufacturing.doctype.bom_update_log.bom_updation_utils import (
get_leaf_boms,
handle_exception,
replace_bom,
set_values_in_log,
)


class BOMMissingError(frappe.ValidationError):
Expand Down Expand Up @@ -49,116 +55,93 @@ def on_submit(self):
if self.update_type == "Replace BOM":
boms = {"current_bom": self.current_bom, "new_bom": self.new_bom}
frappe.enqueue(
method="erpnext.manufacturing.doctype.bom_update_log.bom_update_log.run_bom_job",
method="erpnext.manufacturing.doctype.bom_update_log.bom_update_log.run_replace_bom_job",
doc=self,
boms=boms,
timeout=40000,
)
else:
frappe.enqueue(
method="erpnext.manufacturing.doctype.bom_update_log.bom_update_log.run_bom_job",
doc=self,
update_type="Update Cost",
timeout=40000,
)


def replace_bom(boms: Dict) -> None:
"""Replace current BOM with new BOM in parent BOMs."""
current_bom = boms.get("current_bom")
new_bom = boms.get("new_bom")

unit_cost = get_new_bom_unit_cost(new_bom)
update_new_bom_in_bom_items(unit_cost, current_bom, new_bom)

frappe.cache().delete_key("bom_children")
parent_boms = get_parent_boms(new_bom)

for bom in parent_boms:
bom_obj = frappe.get_doc("BOM", bom)
# this is only used for versioning and we do not want
# to make separate db calls by using load_doc_before_save
# which proves to be expensive while doing bulk replace
bom_obj._doc_before_save = bom_obj
bom_obj.update_exploded_items()
bom_obj.calculate_cost()
bom_obj.update_parent_cost()
bom_obj.db_update()
if bom_obj.meta.get("track_changes") and not bom_obj.flags.ignore_version:
bom_obj.save_version()


def update_new_bom_in_bom_items(unit_cost: float, current_bom: str, new_bom: str) -> None:
bom_item = frappe.qb.DocType("BOM Item")
(
frappe.qb.update(bom_item)
.set(bom_item.bom_no, new_bom)
.set(bom_item.rate, unit_cost)
.set(bom_item.amount, (bom_item.stock_qty * unit_cost))
.where(
(bom_item.bom_no == current_bom) & (bom_item.docstatus < 2) & (bom_item.parenttype == "BOM")
)
).run()


def get_parent_boms(new_bom: str, bom_list: Optional[List] = None) -> List:
bom_list = bom_list or []
bom_item = frappe.qb.DocType("BOM Item")

parents = (
frappe.qb.from_(bom_item)
.select(bom_item.parent)
.where((bom_item.bom_no == new_bom) & (bom_item.docstatus < 2) & (bom_item.parenttype == "BOM"))
.run(as_dict=True)
)

for d in parents:
if new_bom == d.parent:
frappe.throw(_("BOM recursion: {0} cannot be child of {1}").format(new_bom, d.parent))

bom_list.append(d.parent)
get_parent_boms(d.parent, bom_list)

return list(set(bom_list))


def get_new_bom_unit_cost(new_bom: str) -> float:
bom = frappe.qb.DocType("BOM")
new_bom_unitcost = (
frappe.qb.from_(bom).select(bom.total_cost / bom.quantity).where(bom.name == new_bom).run()
)
process_boms_cost_level_wise(self)

return flt(new_bom_unitcost[0][0])


def run_bom_job(
def run_replace_bom_job(
doc: "BOMUpdateLog",
boms: Optional[Dict[str, str]] = None,
update_type: Literal["Replace BOM", "Update Cost"] = "Replace BOM",
) -> None:
try:
doc.db_set("status", "In Progress")

if not frappe.flags.in_test:
frappe.db.commit()

frappe.db.auto_commit_on_many_writes = 1

boms = frappe._dict(boms or {})

if update_type == "Replace BOM":
replace_bom(boms)
else:
update_cost()
replace_bom(boms)

doc.db_set("status", "Completed")

except Exception:
frappe.db.rollback()
error_log = doc.log_error("BOM Update Tool Error")

doc.db_set("status", "Failed")
doc.db_set("error_log", error_log.name)

handle_exception(doc)
finally:
frappe.db.auto_commit_on_many_writes = 0
frappe.db.commit() # nosemgrep


def process_boms_cost_level_wise(update_doc: "BOMUpdateLog") -> None:
"Queue jobs at the start of new BOM Level in 'Update Cost' Jobs."

current_boms, parent_boms = {}, []
values = {}

if update_doc.status == "Queued":
# First level yet to process. On Submit.
current_boms = {bom: False for bom in get_leaf_boms()}
values = {
"current_boms": json.dumps(current_boms),
"parent_boms": "[]",
"processed_boms": json.dumps({}),
"status": "In Progress",
}
else:
# status is Paused, resume. via Cron Job.
current_boms, parent_boms = json.loads(update_doc.current_boms), json.loads(
update_doc.parent_boms
)
if not current_boms:
# Process the next level BOMs. Stage parents as current BOMs.
current_boms = {bom: False for bom in parent_boms}
values = {
"current_boms": json.dumps(current_boms),
"parent_boms": "[]",
"status": "In Progress",
}

set_values_in_log(update_doc.name, values, commit=True)
queue_bom_cost_jobs(current_boms, update_doc)


def queue_bom_cost_jobs(current_boms: Dict, update_doc: "BOMUpdateLog") -> None:
"Queue batches of 20k BOMs of the same level to process parallelly"
current_boms_list = [bom for bom in current_boms]

while current_boms_list:
boms_to_process = current_boms_list[:20000] # slice out batch of 20k BOMs

# update list to exclude 20K (queued) BOMs
current_boms_list = current_boms_list[20000:] if len(current_boms_list) > 20000 else []
frappe.enqueue(
method="erpnext.manufacturing.doctype.bom_update_log.bom_updation_utils.update_cost_in_level",
doc=update_doc,
bom_list=boms_to_process,
timeout=40000,
)


def resume_bom_cost_update_jobs():
"Called every 10 minutes via Cron job."
paused_jobs = frappe.db.get_all("BOM Update Log", {"status": "Paused"})
if not paused_jobs:
return

for job in paused_jobs:
# resume from next level
process_boms_cost_level_wise(update_doc=frappe.get_doc("BOM Update Log", job.name))
Loading

0 comments on commit e46e9f9

Please sign in to comment.