feat: Improve parser performance #318

Merged Jan 17, 2024 · 25 commits

Commits
All 25 commits are by eric-nguyen-cs.

589b6d9  Dec 8, 2023   refactor: mark private function with _
ae5af55  Dec 8, 2023   refactor(parser): add type annotations and clean up code
cef659a  Dec 8, 2023   chore: use context manager to close session in tests
802091d  Dec 15, 2023  chore: update neo4j and Makefile
30aa918  Dec 15, 2023  refactor: create parser specific directory
0c9c856  Dec 20, 2023  refactor: start taxonomy_parser by copying parser file
f26f256  Dec 20, 2023  refactor: move logger to separate file
13cfce2  Dec 20, 2023  refactor: remove unnecessary code for taxonomy parser
3410ad0  Dec 20, 2023  feat: update TaxonomyParser to return taxonomy class
6674876  Dec 20, 2023  feat: update parser to use taxonomy parser
cd1e288  Dec 21, 2023  chore: update tests for new taxonomy parser
1540015  Dec 21, 2023  Merge branch 'main' into ericn/decouple-parser-and-db-writer
1098bc7  Dec 20, 2023  fix: remove multi_label for single project_label
3e88470  Dec 20, 2023  feat: improve node creation performance
6d1eda1  Dec 20, 2023  feat: add node id index to improve search query performance
faea1a1  Dec 20, 2023  feat: improve previous link creation performance
2b4f879  Dec 20, 2023  feat: improve child link creation performance
095b09a  Dec 20, 2023  feat: group queries into transaction
c7e03e0  Dec 20, 2023  chore: update logging info and add timing info
c0f6f50  Dec 20, 2023  fix: add db name to sessions
bdfb252  Dec 20, 2023  refactor: move ellipsis func to logger class
94bfa7d  Dec 21, 2023  fix: stop id index creation if index exists
f1e936d  Jan 12, 2024  fix: resolve comments
cf3a993  Jan 17, 2024  fix: resolve comments
572fc78  Jan 17, 2024  Merge branch 'main' into ericn/improve-parser-performance
118 changes: 64 additions & 54 deletions parser/openfoodfacts_taxonomy_parser/parser/parser.py
@@ -37,21 +37,33 @@ def _create_other_node(self, tx: Transaction, node_data: NodeData, project_label
             SET n.src_position = $src_position
         """
         if node_data.get_node_type() == NodeType.TEXT:
-            id_query = f" CREATE (n:{project_label}:TEXT) \n "
+            id_query = f"CREATE (n:{project_label}:TEXT) \n"
         elif node_data.get_node_type() == NodeType.SYNONYMS:
-            id_query = f" CREATE (n:{project_label}:SYNONYMS) \n "
+            id_query = f"CREATE (n:{project_label}:SYNONYMS) \n"
         elif node_data.get_node_type() == NodeType.STOPWORDS:
-            id_query = f" CREATE (n:{project_label}:STOPWORDS) \n "
+            id_query = f"CREATE (n:{project_label}:STOPWORDS) \n"
         else:
-            raise ValueError(f"ENTRY node type should be batched")
+            raise ValueError("ENTRY nodes should not be passed to this function")
 
-        entry_query = ""
-        for key in node_data.tags:
-            entry_query += " SET n." + key + " = $" + key + "\n"
+        entry_queries = [f"SET n.{key} = ${key}" for key in node_data.tags]
+        entry_query = "\n".join(entry_queries) + "\n"
 
         query = id_query + entry_query + position_query
         tx.run(query, node_data.to_dict())
 
+    def _create_other_nodes(self, other_nodes: list[NodeData], project_label: str):
+        """Create all TEXT, SYNONYMS and STOPWORDS nodes"""
+        self.parser_logger.info("Creating TEXT, SYNONYMS and STOPWORDS nodes")
+        start_time = timeit.default_timer()
+
+        with self.session.begin_transaction() as tx:
+            for node in other_nodes:
+                self._create_other_node(tx, node, project_label)
+
+        self.parser_logger.info(
+            f"Created {len(other_nodes)} TEXT, SYNONYMS and STOPWORDS nodes in {timeit.default_timer() - start_time} seconds"
+        )
+
     def _create_entry_nodes(self, entry_nodes: list[NodeData], project_label: str):
         """Create all ENTRY nodes in a single batch query"""
         self.parser_logger.info("Creating ENTRY nodes")
@@ -66,20 +78,21 @@ def _create_entry_nodes(self, entry_nodes: list[NodeData], project_label: str):
             SET n.src_position = entry_node.src_position
             SET n.main_language = entry_node.main_language
         """
-        additional_query = ""
-        seen_properties = set()
-        seen_tags = set()
+
+        # we don't know in advance which properties and tags
+        # we will encounter in the batch,
+        # so we accumulate them in this set
+        seen_properties_and_tags = set()
 
         for entry_node in entry_nodes:
             if entry_node.get_node_type() != NodeType.ENTRY:
-                raise ValueError(f"Only ENTRY nodes can be batched")
-            for key in entry_node.properties:
-                if not key in seen_properties:
-                    additional_query += " SET n." + key + " = entry_node." + key + "\n"
-                    seen_properties.add(key)
-            for key in entry_node.tags:
-                if not key in seen_tags:
-                    additional_query += " SET n." + key + " = entry_node." + key + "\n"
-                    seen_tags.add(key)
+                raise ValueError("Only ENTRY nodes should be passed to this function")
+            seen_properties_and_tags.update(entry_node.tags)
+            seen_properties_and_tags.update(entry_node.properties)
+
+        additional_query = "\n" + "\n".join(
+            [f"SET n.{key} = entry_node.{key}" for key in seen_properties_and_tags]
+        )
 
         query = base_query + additional_query
         self.session.run(query, entry_nodes=[entry_node.to_dict() for entry_node in entry_nodes])
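The batching works because every element of $entry_nodes is a map with the same possible keys: a key missing from a given map evaluates to null, and SET n.x = null is a no-op property removal in Cypher, so taking the union of keys across the whole batch is safe. A sketch of the assembled query under assumed inputs follows; base_query is mostly collapsed in this diff, so its UNWIND/CREATE lines are a guess.

    # Sketch only: shape of the single batched ENTRY query.
    project_label = "p_test_branch"                          # hypothetical
    seen_properties_and_tags = {"tags_en", "prop_vegan_en"}  # hypothetical keys

    base_query = f"""
        UNWIND $entry_nodes as entry_node
        CREATE (n:{project_label}:ENTRY)
        SET n.id = entry_node.id
        SET n.src_position = entry_node.src_position
        SET n.main_language = entry_node.main_language
    """
    additional_query = "\n" + "\n".join(
        f"SET n.{key} = entry_node.{key}" for key in sorted(seen_properties_and_tags)
    )
    query = base_query + additional_query  # one round trip creates every ENTRY node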
@@ -88,24 +101,13 @@ def _create_entry_nodes(self, entry_nodes: list[NodeData], project_label: str):
             f"Created {len(entry_nodes)} ENTRY nodes in {timeit.default_timer() - start_time} seconds"
         )
 
-    def _create_other_nodes(self, other_nodes: list[NodeData], project_label: str):
-        """Create all TEXT, SYNONYMS and STOPWORDS nodes"""
-        self.parser_logger.info("Creating TEXT, SYNONYMS and STOPWORDS nodes")
-        start_time = timeit.default_timer()
-
-        with self.session.begin_transaction() as tx:
-            for node in other_nodes:
-                self._create_other_node(tx, node, project_label)
-
-        self.parser_logger.info(
-            f"Created {len(other_nodes)} TEXT, SYNONYMS and STOPWORDS nodes in {timeit.default_timer() - start_time} seconds"
-        )
-
     def _create_previous_links(self, previous_links: list[PreviousLink], project_label: str):
         """Create the 'is_before' relations between nodes"""
         self.parser_logger.info("Creating 'is_before' links")
         start_time = timeit.default_timer()
 
+        # The previous links creation is batched in a single query
+        # We also use the ID index to speed up the MATCH queries
         query = f"""
             UNWIND $previous_links as previous_link
             MATCH(n:{project_label}) USING INDEX n:{project_label}(id)
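The rest of this query is collapsed by the diff viewer. To show the pattern the new comment describes, one UNWIND batch plus index-hinted MATCHes, here is a plausible completion; the before_id field name and the relation direction are assumptions, not taken from the diff.

    # Hypothetical completion of the collapsed 'is_before' query.
    project_label = "p_test_branch"  # hypothetical
    query = f"""
        UNWIND $previous_links as previous_link
        MATCH(n:{project_label}) USING INDEX n:{project_label}(id)
        WHERE n.id = previous_link.id
        MATCH(p:{project_label}) USING INDEX p:{project_label}(id)
        WHERE p.id = previous_link.before_id
        CREATE (p)-[relations:is_before]->(n)
        WITH relations
        UNWIND relations AS relation
        RETURN COUNT(relation)
    """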
@@ -137,6 +139,8 @@ def _create_child_links(
         start_time = timeit.default_timer()
 
         node_ids = set([node.id for node in entry_nodes])
+        # we collect nodes with a parent id which is the id of an entry (normalised)
+        # and nodes where a synonym was used to designate the parent (unnormalised)
         normalised_child_links = []
         unnormalised_child_links = []
         for child_link in child_links:
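For readers unfamiliar with the taxonomy format, a tiny worked example of the split performed in the next few lines, with made-up ids: a parent referenced by its canonical entry id is normalised; a parent referenced through a synonym is absent from node_ids and must be resolved against tags_ids_<lc> later.

    # Made-up ids, illustrating the normalised/unnormalised split.
    node_ids = {"en:apple-juice", "en:fruit-juice"}
    child_links = [
        {"id": "en:cloudy-apple-juice", "parent_id": "en:apple-juice"},     # canonical id
        {"id": "en:cloudy-apple-juice", "parent_id": "en:juice-of-apples"}, # synonym
    ]
    normalised = [link for link in child_links if link["parent_id"] in node_ids]
    unnormalised = [link for link in child_links if link["parent_id"] not in node_ids]
    assert len(normalised) == len(unnormalised) == 1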
@@ -145,6 +149,7 @@ def _create_child_links(
             else:
                 unnormalised_child_links.append(child_link)
 
+        # adding normalised links is easy as we can directly match parent entries
         normalised_query = f"""
             UNWIND $normalised_child_links as child_link
             MATCH (p:{project_label}) USING INDEX p:{project_label}(id)
@@ -157,27 +162,31 @@ def _create_child_links(
             RETURN COUNT(relation)
         """
 
-        language_codes = set()
+        # for unnormalised links, we need to group them by language code of the parent id
+        lc_child_links_map = {}
         for child_link in unnormalised_child_links:
             lc, parent_id = child_link["parent_id"].split(":")
-            language_codes.add(lc)
             child_link["parent_id"] = parent_id
+            if lc not in lc_child_links_map:
+                lc_child_links_map[lc] = []
+            lc_child_links_map[lc].append(child_link)
 
+        # we create a query for each language code
+        lc_queries = []
+        for lc, lc_child_links in lc_child_links_map.items():
+            lc_query = f"""
+                UNWIND $lc_child_links as child_link
+                MATCH (p:{project_label})
+                WHERE child_link.parent_id IN p.tags_ids_{lc}
+                MATCH (c:{project_label}) USING INDEX c:{project_label}(id)
+                WHERE c.id = child_link.id
+                CREATE (c)-[relations:is_child_of]->(p)
+                WITH relations
+                UNWIND relations AS relation
+                RETURN COUNT(relation)
+            """
+            lc_queries.append((lc_query, lc_child_links))
 
-        parent_id_query = " OR ".join(
-            [f"child_link.parent_id IN p.tags_ids_{lc}" for lc in language_codes]
-        )
-
-        unnormalised_query = f"""
-            UNWIND $unnormalised_child_links as child_link
-            MATCH (p:{project_label})
-            WHERE {parent_id_query}
-            MATCH (c:{project_label}) USING INDEX c:{project_label}(id)
-            WHERE c.id = child_link.id
-            CREATE (c)-[relations:is_child_of]->(p)
-            WITH relations
-            UNWIND relations AS relation
-            RETURN COUNT(relation)
-        """
         count = 0
 
         if normalised_child_links:
@@ -186,11 +195,10 @@ def _create_child_links(
             )
             count += normalised_result.value()[0]
 
-        if unnormalised_child_links:
-            unnormalised_result = self.session.run(
-                unnormalised_query, unnormalised_child_links=unnormalised_child_links
-            )
-            count += unnormalised_result.value()[0]
+        if lc_queries:
+            for lc_query, lc_child_links in lc_queries:
+                lc_result = self.session.run(lc_query, lc_child_links=lc_child_links)
+                count += lc_result.value()[0]
 
         self.parser_logger.info(
             f"Created {count} 'is_child_of' links in {timeit.default_timer() - start_time} seconds"
@@ -264,11 +272,13 @@ def _create_node_indexes(self, project_label: str):
 
     def _write_to_database(self, taxonomy: Taxonomy, taxonomy_name: str, branch_name: str):
         project_label = self._get_project_name(taxonomy_name, branch_name)
+        # First create nodes, then create node indexes to accelerate relationship creation, then create relationships
         self._create_other_nodes(taxonomy.other_nodes, project_label)
         self._create_entry_nodes(taxonomy.entry_nodes, project_label)
         self._create_node_indexes(project_label)
         self._create_child_links(taxonomy.child_links, taxonomy.entry_nodes, project_label)
         self._create_previous_links(taxonomy.previous_links, project_label)
+        # Lastly create the parsing errors node
        self._create_parsing_errors_node(taxonomy_name, branch_name, project_label)
 
     def __call__(self, filename: str, branch_name: str, taxonomy_name: str):
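Commit 94bfa7d makes index creation idempotent. _create_node_indexes itself is collapsed in this diff, so the following is only a guess at the pattern, using Neo4j's CREATE INDEX ... IF NOT EXISTS clause; the method and index names are made up.

    # Hypothetical sketch of idempotent id-index creation.
    def _create_node_id_index(self, project_label: str):
        self.session.run(
            f"CREATE INDEX {project_label}_id_index IF NOT EXISTS "
            f"FOR (n:{project_label}) ON (n.id)"
        )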