From b92b37a9a67250b7626be8c0760ec1c278c2a018 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Mathieu?= Date: Mon, 21 Dec 2020 11:08:58 +0100 Subject: [PATCH] Transaction support for MongoDB with Panache --- docs/src/main/asciidoc/mongodb-panache.adoc | 11 ++- .../mongodb-panache-common/deployment/pom.xml | 4 + .../mongodb-panache-common/runtime/pom.xml | 4 + .../panache/runtime/MongoOperations.java | 75 ++++++++++++++++++- .../MongoTransactionException.java | 7 ++ 5 files changed, 98 insertions(+), 3 deletions(-) create mode 100644 extensions/panache/mongodb-panache-common/runtime/src/main/java/io/quarkus/mongodb/panache/transaction/MongoTransactionException.java diff --git a/docs/src/main/asciidoc/mongodb-panache.adoc b/docs/src/main/asciidoc/mongodb-panache.adoc index e23b2cc07cf49d..be3131029558dc 100644 --- a/docs/src/main/asciidoc/mongodb-panache.adoc +++ b/docs/src/main/asciidoc/mongodb-panache.adoc @@ -652,7 +652,14 @@ quarkus.log.category."io.quarkus.mongodb.panache.runtime".level=DEBUG == Transactions -WARNING: MongoDB offers ACID transactions since version 4.0. MongoDB with Panache doesn't provide support for them. +MongoDB offers ACID transactions since version 4.0. + +To use them with MongoDB with Panache you need to annotate the method that starts the transaction with the `@Transactional` annotation. + +Transactions can be configured via the `@MongoTransactionConfiguration` that allows to specify the read preference, read concern, + and write concern used during the lifespan of the transaction. + +WARNING: Transaction support inside MongoDB with Panache is still experimental. == Custom IDs @@ -886,6 +893,8 @@ public Multi streamPersons() { TIP: `@SseElementType(MediaType.APPLICATION_JSON)` tells RESTEasy to serialize the object in JSON. +WARNING: Transactions are not supported for Reactive Entities and Repositories. + == Mocking === Using the active-record pattern diff --git a/extensions/panache/mongodb-panache-common/deployment/pom.xml b/extensions/panache/mongodb-panache-common/deployment/pom.xml index e00b7bc2c12af1..84697c94276df8 100644 --- a/extensions/panache/mongodb-panache-common/deployment/pom.xml +++ b/extensions/panache/mongodb-panache-common/deployment/pom.xml @@ -37,6 +37,10 @@ io.quarkus quarkus-mongodb-panache-common + + io.quarkus + quarkus-narayana-jta-deployment + io.quarkus diff --git a/extensions/panache/mongodb-panache-common/runtime/pom.xml b/extensions/panache/mongodb-panache-common/runtime/pom.xml index f3e803cac47c5b..03fa31e1295aff 100644 --- a/extensions/panache/mongodb-panache-common/runtime/pom.xml +++ b/extensions/panache/mongodb-panache-common/runtime/pom.xml @@ -43,6 +43,10 @@ quarkus-jackson true + + io.quarkus + quarkus-narayana-jta + org.junit.jupiter junit-jupiter diff --git a/extensions/panache/mongodb-panache-common/runtime/src/main/java/io/quarkus/mongodb/panache/runtime/MongoOperations.java b/extensions/panache/mongodb-panache-common/runtime/src/main/java/io/quarkus/mongodb/panache/runtime/MongoOperations.java index e40b104f6b7090..ea2acba9913618 100644 --- a/extensions/panache/mongodb-panache-common/runtime/src/main/java/io/quarkus/mongodb/panache/runtime/MongoOperations.java +++ b/extensions/panache/mongodb-panache-common/runtime/src/main/java/io/quarkus/mongodb/panache/runtime/MongoOperations.java @@ -14,6 +14,10 @@ import java.util.stream.Collectors; import java.util.stream.Stream; +import com.mongodb.client.ClientSession; +import io.quarkus.arc.Arc; +import io.quarkus.arc.InstanceHandle; +import io.quarkus.mongodb.panache.transaction.MongoTransactionException; import org.bson.BsonDocument; import org.bson.BsonDocumentWriter; import org.bson.BsonValue; @@ -37,9 +41,17 @@ import io.quarkus.panache.common.Parameters; import io.quarkus.panache.common.Sort; +import javax.transaction.Status; +import javax.transaction.Synchronization; +import javax.transaction.SystemException; +import javax.transaction.TransactionManager; +import javax.transaction.TransactionSynchronizationRegistry; + @SuppressWarnings({ "rawtypes", "unchecked" }) public abstract class MongoOperations { - public final String ID = "_id"; + public static final String ID = "_id"; + public static final Object SESSION_KEY = new Object(); + private static final Logger LOGGER = Logger.getLogger(MongoOperations.class); private final Map defaultDatabaseName = new ConcurrentHashMap<>(); @@ -204,7 +216,12 @@ public MongoDatabase mongoDatabase(Class entityClass) { // Private stuff private void persist(MongoCollection collection, Object entity) { - collection.insertOne(entity); + ClientSession session = getSession(entity); + if (session == null) { + collection.insertOne(entity); + } else { + collection.insertOne(session, entity); + } } private void persist(MongoCollection collection, List entities) { @@ -273,6 +290,60 @@ private BsonDocument getBsonDocument(MongoCollection collection, Object entity) return document; } + private ClientSession getSession(Object entity) { + MongoEntity mongoEntity = entity.getClass().getAnnotation(MongoEntity.class); + InstanceHandle instance = Arc.container() + .instance(TransactionSynchronizationRegistry.class); + if (instance.isAvailable()) { + TransactionSynchronizationRegistry registry = instance.get(); + if (registry.getTransactionStatus() == Status.STATUS_ACTIVE) { + ClientSession clientSession = (ClientSession) registry.getResource(SESSION_KEY); + if(clientSession == null){ + return registerClientSession(mongoEntity, registry); + } + } + } + return null; + } + + private ClientSession registerClientSession(MongoEntity mongoEntity, TransactionSynchronizationRegistry registry) { + System.out.println("Openning MongoDB session"); + TransactionManager transactionManager = Arc.container().instance(TransactionManager.class).get(); + MongoClient client = clientFromArc(mongoEntity, MongoClient.class, false); + ClientSession clientSession = client.startSession(); + clientSession.startTransaction();//TODO add txoptions from annotation + registry.putResource(SESSION_KEY, clientSession); + registry.registerInterposedSynchronization(new Synchronization() { + @Override + public void beforeCompletion() { + } + + @Override + public void afterCompletion(int i) { + try { + if (transactionManager.getStatus() == Status.STATUS_ROLLEDBACK) { + System.out.println("!!! MongoDB session rollback"); + try { + clientSession.abortTransaction(); + } finally { + clientSession.close(); + } + } else { + System.out.println("!!! MongoDB session commit"); + try { + clientSession.commitTransaction(); + } finally { + clientSession.close(); + } + } + } catch (SystemException e) { + throw new MongoTransactionException(e); + } + } + }); + return clientSession; + } + private MongoCollection mongoCollection(Object entity) { Class entityClass = entity.getClass(); return mongoCollection(entityClass); diff --git a/extensions/panache/mongodb-panache-common/runtime/src/main/java/io/quarkus/mongodb/panache/transaction/MongoTransactionException.java b/extensions/panache/mongodb-panache-common/runtime/src/main/java/io/quarkus/mongodb/panache/transaction/MongoTransactionException.java new file mode 100644 index 00000000000000..5fdb6e1729bff8 --- /dev/null +++ b/extensions/panache/mongodb-panache-common/runtime/src/main/java/io/quarkus/mongodb/panache/transaction/MongoTransactionException.java @@ -0,0 +1,7 @@ +package io.quarkus.mongodb.panache.transaction; + +public class MongoTransactionException extends RuntimeException { + public MongoTransactionException(Exception cause) { + super(cause); + } +}