Skip to content

Commit

Permalink
Add config for implicit transactions to neo4j extractor (#2258)
Browse files Browse the repository at this point in the history
Signed-off-by: Kristen Armes <[email protected]>
  • Loading branch information
kristenarmes authored Aug 14, 2024
1 parent 8a6b639 commit 96f642f
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 3 deletions.
14 changes: 12 additions & 2 deletions databuilder/databuilder/extractor/neo4j_extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,14 @@ class Neo4jExtractor(Extractor):
"""NEO4J_ENCRYPTED is a boolean indicating whether to use SSL/TLS when connecting."""
NEO4J_VALIDATE_SSL = 'neo4j_validate_ssl'
"""NEO4J_VALIDATE_SSL is a boolean indicating whether to validate the server's SSL/TLS cert against system CAs."""
NEO4J_USE_IMPLICIT_TRANSACTIONS = 'neo4j_use_implicit_transactions'
"""NEO4J_USE_IMPLICIT_TRANSACTIONS is a boolean indicating whether to use implicit or explicit transactions. This
is only needed when implicit transactions are required, such as for CALL {} IN TRANSACTIONS queries."""

DEFAULT_CONFIG = ConfigFactory.from_dict({
NEO4J_MAX_CONN_LIFE_TIME_SEC: 50,
NEO4J_DATABASE_NAME: neo4j.DEFAULT_DATABASE,
NEO4J_USE_IMPLICIT_TRANSACTIONS: False,
})

def init(self, conf: ConfigTree) -> None:
Expand All @@ -50,6 +54,7 @@ def init(self, conf: ConfigTree) -> None:
self.graph_url = self.conf.get_string(Neo4jExtractor.GRAPH_URL_CONFIG_KEY)
self.cypher_query = self.conf.get_string(Neo4jExtractor.CYPHER_QUERY_CONFIG_KEY)
self.db_name = self.conf.get_string(Neo4jExtractor.NEO4J_DATABASE_NAME)
self.use_implicit_transactions = self.conf.get(Neo4jExtractor.NEO4J_USE_IMPLICIT_TRANSACTIONS)

uri = self.conf.get_string(Neo4jExtractor.GRAPH_URL_CONFIG_KEY)
driver_args = {
Expand Down Expand Up @@ -107,10 +112,15 @@ def _get_extract_iter(self) -> Iterator[Any]:
Execute {cypher_query} and yield result one at a time
"""
with self.driver.session(
database=self.db_name
database=self.db_name,
default_access_mode=neo4j.READ_ACCESS
) as session:
if not hasattr(self, 'results'):
self.results = session.read_transaction(self._execute_query)
if not self.use_implicit_transactions:
self.results = session.read_transaction(self._execute_query)
else:
LOGGER.info('Executing query in implicit transaction %s', self.cypher_query)
self.results = session.run(self.cypher_query).data()

for result in self.results:
if hasattr(self, 'model_class'):
Expand Down
2 changes: 1 addition & 1 deletion databuilder/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

from setuptools import find_packages, setup

__version__ = '7.5.0'
__version__ = '7.5.1'

requirements_path = os.path.join(os.path.dirname(os.path.realpath(__file__)),
'requirements.txt')
Expand Down

0 comments on commit 96f642f

Please sign in to comment.