Skip to content

Commit

Permalink
Transaction support for MongoDB with Panache
Browse files Browse the repository at this point in the history
  • Loading branch information
loicmathieu committed Dec 21, 2020
1 parent 6bdb7b4 commit b92b37a
Show file tree
Hide file tree
Showing 5 changed files with 98 additions and 3 deletions.
11 changes: 10 additions & 1 deletion docs/src/main/asciidoc/mongodb-panache.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -652,7 +652,14 @@ quarkus.log.category."io.quarkus.mongodb.panache.runtime".level=DEBUG

== Transactions

WARNING: MongoDB offers ACID transactions since version 4.0. MongoDB with Panache doesn't provide support for them.
MongoDB offers ACID transactions since version 4.0.

To use them with MongoDB with Panache you need to annotate the method that starts the transaction with the `@Transactional` annotation.

Transactions can be configured via the `@MongoTransactionConfiguration` that allows to specify the read preference, read concern,
and write concern used during the lifespan of the transaction.

WARNING: Transaction support inside MongoDB with Panache is still experimental.

== Custom IDs

Expand Down Expand Up @@ -886,6 +893,8 @@ public Multi<ReactivePerson> streamPersons() {

TIP: `@SseElementType(MediaType.APPLICATION_JSON)` tells RESTEasy to serialize the object in JSON.

WARNING: Transactions are not supported for Reactive Entities and Repositories.

== Mocking

=== Using the active-record pattern
Expand Down
4 changes: 4 additions & 0 deletions extensions/panache/mongodb-panache-common/deployment/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@
<groupId>io.quarkus</groupId>
<artifactId>quarkus-mongodb-panache-common</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-narayana-jta-deployment</artifactId>
</dependency>

<dependency>
<groupId>io.quarkus</groupId>
Expand Down
4 changes: 4 additions & 0 deletions extensions/panache/mongodb-panache-common/runtime/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@
<artifactId>quarkus-jackson</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-narayana-jta</artifactId>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import com.mongodb.client.ClientSession;
import io.quarkus.arc.Arc;
import io.quarkus.arc.InstanceHandle;
import io.quarkus.mongodb.panache.transaction.MongoTransactionException;
import org.bson.BsonDocument;
import org.bson.BsonDocumentWriter;
import org.bson.BsonValue;
Expand All @@ -37,9 +41,17 @@
import io.quarkus.panache.common.Parameters;
import io.quarkus.panache.common.Sort;

import javax.transaction.Status;
import javax.transaction.Synchronization;
import javax.transaction.SystemException;
import javax.transaction.TransactionManager;
import javax.transaction.TransactionSynchronizationRegistry;

@SuppressWarnings({ "rawtypes", "unchecked" })
public abstract class MongoOperations<QueryType, UpdateType> {
public final String ID = "_id";
public static final String ID = "_id";
public static final Object SESSION_KEY = new Object();

private static final Logger LOGGER = Logger.getLogger(MongoOperations.class);

private final Map<String, String> defaultDatabaseName = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -204,7 +216,12 @@ public MongoDatabase mongoDatabase(Class<?> entityClass) {
// Private stuff

private void persist(MongoCollection collection, Object entity) {
collection.insertOne(entity);
ClientSession session = getSession(entity);
if (session == null) {
collection.insertOne(entity);
} else {
collection.insertOne(session, entity);
}
}

private void persist(MongoCollection collection, List<Object> entities) {
Expand Down Expand Up @@ -273,6 +290,60 @@ private BsonDocument getBsonDocument(MongoCollection collection, Object entity)
return document;
}

private ClientSession getSession(Object entity) {
MongoEntity mongoEntity = entity.getClass().getAnnotation(MongoEntity.class);
InstanceHandle<TransactionSynchronizationRegistry> instance = Arc.container()
.instance(TransactionSynchronizationRegistry.class);
if (instance.isAvailable()) {
TransactionSynchronizationRegistry registry = instance.get();
if (registry.getTransactionStatus() == Status.STATUS_ACTIVE) {
ClientSession clientSession = (ClientSession) registry.getResource(SESSION_KEY);
if(clientSession == null){
return registerClientSession(mongoEntity, registry);
}
}
}
return null;
}

private ClientSession registerClientSession(MongoEntity mongoEntity, TransactionSynchronizationRegistry registry) {
System.out.println("Openning MongoDB session");
TransactionManager transactionManager = Arc.container().instance(TransactionManager.class).get();
MongoClient client = clientFromArc(mongoEntity, MongoClient.class, false);
ClientSession clientSession = client.startSession();
clientSession.startTransaction();//TODO add txoptions from annotation
registry.putResource(SESSION_KEY, clientSession);
registry.registerInterposedSynchronization(new Synchronization() {
@Override
public void beforeCompletion() {
}

@Override
public void afterCompletion(int i) {
try {
if (transactionManager.getStatus() == Status.STATUS_ROLLEDBACK) {
System.out.println("!!! MongoDB session rollback");
try {
clientSession.abortTransaction();
} finally {
clientSession.close();
}
} else {
System.out.println("!!! MongoDB session commit");
try {
clientSession.commitTransaction();
} finally {
clientSession.close();
}
}
} catch (SystemException e) {
throw new MongoTransactionException(e);
}
}
});
return clientSession;
}

private MongoCollection mongoCollection(Object entity) {
Class<?> entityClass = entity.getClass();
return mongoCollection(entityClass);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package io.quarkus.mongodb.panache.transaction;

public class MongoTransactionException extends RuntimeException {
public MongoTransactionException(Exception cause) {
super(cause);
}
}

0 comments on commit b92b37a

Please sign in to comment.