From a46b90073772752c2c99ead05185d325214af53f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Mathieu?= Date: Mon, 21 Dec 2020 11:08:58 +0100 Subject: [PATCH] Transaction support for MongoDB with Panache --- docs/src/main/asciidoc/mongodb-panache.adoc | 11 +- .../mongodb-panache-common/deployment/pom.xml | 4 + .../mongodb-panache-common/runtime/pom.xml | 4 + .../runtime/CommonPanacheQueryImpl.java | 23 +- .../panache/runtime/MongoOperations.java | 229 +++++++++++++----- .../panache/runtime/PanacheUpdateImpl.java | 15 +- .../MongoTransactionException.java | 7 + .../kotlin/runtime/PanacheQueryImpl.java | 5 +- .../kotlin/runtime/KotlinMongoOperations.kt | 5 +- .../panache/runtime/JavaMongoOperations.java | 5 +- .../panache/runtime/PanacheQueryImpl.java | 5 +- 11 files changed, 240 insertions(+), 73 deletions(-) create mode 100644 extensions/panache/mongodb-panache-common/runtime/src/main/java/io/quarkus/mongodb/panache/transaction/MongoTransactionException.java diff --git a/docs/src/main/asciidoc/mongodb-panache.adoc b/docs/src/main/asciidoc/mongodb-panache.adoc index bb1d3e2d90ca5c..a2482429265031 100644 --- a/docs/src/main/asciidoc/mongodb-panache.adoc +++ b/docs/src/main/asciidoc/mongodb-panache.adoc @@ -656,7 +656,14 @@ quarkus.log.category."io.quarkus.mongodb.panache.runtime".level=DEBUG == Transactions -WARNING: MongoDB offers ACID transactions since version 4.0. MongoDB with Panache doesn't provide support for them. +MongoDB offers ACID transactions since version 4.0. + +To use them with MongoDB with Panache you need to annotate the method that starts the transaction with the `@Transactional` annotation. + +Transactions can be configured via the `@MongoTransactionConfiguration` that allows to specify the read preference, read concern, + and write concern used during the lifespan of the transaction. + +WARNING: Transaction support inside MongoDB with Panache is still experimental. == Custom IDs @@ -890,6 +897,8 @@ public Multi streamPersons() { TIP: `@SseElementType(MediaType.APPLICATION_JSON)` tells RESTEasy to serialize the object in JSON. +WARNING: Transactions are not supported for Reactive Entities and Repositories. + == Mocking === Using the active-record pattern diff --git a/extensions/panache/mongodb-panache-common/deployment/pom.xml b/extensions/panache/mongodb-panache-common/deployment/pom.xml index e00b7bc2c12af1..84697c94276df8 100644 --- a/extensions/panache/mongodb-panache-common/deployment/pom.xml +++ b/extensions/panache/mongodb-panache-common/deployment/pom.xml @@ -37,6 +37,10 @@ io.quarkus quarkus-mongodb-panache-common + + io.quarkus + quarkus-narayana-jta-deployment + io.quarkus diff --git a/extensions/panache/mongodb-panache-common/runtime/pom.xml b/extensions/panache/mongodb-panache-common/runtime/pom.xml index f3e803cac47c5b..03fa31e1295aff 100644 --- a/extensions/panache/mongodb-panache-common/runtime/pom.xml +++ b/extensions/panache/mongodb-panache-common/runtime/pom.xml @@ -43,6 +43,10 @@ quarkus-jackson true + + io.quarkus + quarkus-narayana-jta + org.junit.jupiter junit-jupiter diff --git a/extensions/panache/mongodb-panache-common/runtime/src/main/java/io/quarkus/mongodb/panache/runtime/CommonPanacheQueryImpl.java b/extensions/panache/mongodb-panache-common/runtime/src/main/java/io/quarkus/mongodb/panache/runtime/CommonPanacheQueryImpl.java index 2cdaa88be6e2fd..16039f58064a6b 100644 --- a/extensions/panache/mongodb-panache-common/runtime/src/main/java/io/quarkus/mongodb/panache/runtime/CommonPanacheQueryImpl.java +++ b/extensions/panache/mongodb-panache-common/runtime/src/main/java/io/quarkus/mongodb/panache/runtime/CommonPanacheQueryImpl.java @@ -6,10 +6,12 @@ import java.util.Set; import java.util.stream.Stream; +import org.bson.BsonDocument; import org.bson.Document; import org.bson.conversions.Bson; import com.mongodb.ReadPreference; +import com.mongodb.client.ClientSession; import com.mongodb.client.FindIterable; import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoCursor; @@ -21,6 +23,7 @@ public class CommonPanacheQueryImpl { private MongoCollection collection; + private ClientSession clientSession; private Bson mongoQuery; private Bson sort; private Bson projections; @@ -32,14 +35,17 @@ public class CommonPanacheQueryImpl { private Collation collation; - public CommonPanacheQueryImpl(MongoCollection collection, Bson mongoQuery, Bson sort) { + public CommonPanacheQueryImpl(MongoCollection collection, ClientSession session, Bson mongoQuery, + Bson sort) { this.collection = collection; + this.clientSession = session; this.mongoQuery = mongoQuery; this.sort = sort; } private CommonPanacheQueryImpl(CommonPanacheQueryImpl previousQuery, Bson projections, Class documentClass) { this.collection = previousQuery.collection.withDocumentClass(documentClass); + this.clientSession = previousQuery.clientSession; this.mongoQuery = previousQuery.mongoQuery; this.sort = previousQuery.sort; this.projections = projections; @@ -150,7 +156,8 @@ public CommonPanacheQueryImpl withReadPreference(ReadPrefe @SuppressWarnings("unchecked") public long count() { if (count == null) { - count = collection.countDocuments(mongoQuery); + Bson query = getQuery(); + count = clientSession == null ? collection.countDocuments(query) : collection.countDocuments(clientSession, query); } return count; } @@ -162,7 +169,8 @@ public List list() { @SuppressWarnings("unchecked") private List list(Integer limit) { List list = new ArrayList<>(); - FindIterable find = mongoQuery == null ? collection.find() : collection.find(mongoQuery); + Bson query = getQuery(); + FindIterable find = clientSession == null ? collection.find(query) : collection.find(clientSession, query); if (this.projections != null) { find.projection(projections); } @@ -170,15 +178,12 @@ private List list(Integer limit) { find.collation(collation); } manageOffsets(find, limit); - MongoCursor cursor = find.sort(sort).iterator(); - try { + try (MongoCursor cursor = find.sort(sort).iterator()) { while (cursor.hasNext()) { T entity = cursor.next(); list.add(entity); } - } finally { - cursor.close(); } return list; } @@ -232,4 +237,8 @@ private void manageOffsets(FindIterable find, Integer limit) { find.limit(limit); } } + + private Bson getQuery() { + return mongoQuery == null ? new BsonDocument() : mongoQuery; + } } diff --git a/extensions/panache/mongodb-panache-common/runtime/src/main/java/io/quarkus/mongodb/panache/runtime/MongoOperations.java b/extensions/panache/mongodb-panache-common/runtime/src/main/java/io/quarkus/mongodb/panache/runtime/MongoOperations.java index 9ed4225e275178..15f0f38dece945 100644 --- a/extensions/panache/mongodb-panache-common/runtime/src/main/java/io/quarkus/mongodb/panache/runtime/MongoOperations.java +++ b/extensions/panache/mongodb-panache-common/runtime/src/main/java/io/quarkus/mongodb/panache/runtime/MongoOperations.java @@ -14,6 +14,12 @@ import java.util.stream.Collectors; import java.util.stream.Stream; +import javax.transaction.Status; +import javax.transaction.Synchronization; +import javax.transaction.SystemException; +import javax.transaction.TransactionManager; +import javax.transaction.TransactionSynchronizationRegistry; + import org.bson.BsonDocument; import org.bson.BsonDocumentWriter; import org.bson.BsonValue; @@ -22,6 +28,7 @@ import org.bson.codecs.EncoderContext; import org.jboss.logging.Logger; +import com.mongodb.client.ClientSession; import com.mongodb.client.MongoClient; import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoDatabase; @@ -31,15 +38,20 @@ import com.mongodb.client.model.WriteModel; import com.mongodb.client.result.DeleteResult; +import io.quarkus.arc.Arc; +import io.quarkus.arc.InstanceHandle; import io.quarkus.mongodb.panache.MongoEntity; import io.quarkus.mongodb.panache.binder.NativeQueryBinder; import io.quarkus.mongodb.panache.binder.PanacheQlQueryBinder; +import io.quarkus.mongodb.panache.transaction.MongoTransactionException; import io.quarkus.panache.common.Parameters; import io.quarkus.panache.common.Sort; @SuppressWarnings({ "rawtypes", "unchecked" }) public abstract class MongoOperations { - public final String ID = "_id"; + public static final String ID = "_id"; + public static final Object SESSION_KEY = new Object(); + private static final Logger LOGGER = Logger.getLogger(MongoOperations.class); // update operators: https://docs.mongodb.com/manual/reference/operator/update/ @@ -51,7 +63,8 @@ public abstract class MongoOperations { private final Map defaultDatabaseName = new ConcurrentHashMap<>(); - protected abstract QueryType createQuery(MongoCollection collection, Document query, Document sortDoc); + protected abstract QueryType createQuery(MongoCollection collection, ClientSession session, Document query, + Document sortDoc); protected abstract UpdateType createUpdate(MongoCollection collection, Class entityClass, Document docUpdate); @@ -71,34 +84,24 @@ public void persist(Iterable entities) { objects.add(entity); } - if (objects.size() > 0) { - // get the first entity to be able to retrieve the collection with it - Object firstEntity = objects.get(0); - MongoCollection collection = mongoCollection(firstEntity); - persist(collection, objects); - } + persist(objects); } public void persist(Object firstEntity, Object... entities) { - MongoCollection collection = mongoCollection(firstEntity); if (entities == null || entities.length == 0) { + MongoCollection collection = mongoCollection(firstEntity); persist(collection, firstEntity); } else { List entityList = new ArrayList<>(); entityList.add(firstEntity); entityList.addAll(Arrays.asList(entities)); - persist(collection, entityList); + persist(entityList); } } public void persist(Stream entities) { List objects = entities.collect(Collectors.toList()); - if (objects.size() > 0) { - // get the first entity to be able to retrieve the collection with it - Object firstEntity = objects.get(0); - MongoCollection collection = mongoCollection(firstEntity); - update(collection, objects); - } + persist(objects); } public void update(Object entity) { @@ -155,12 +158,7 @@ public void persistOrUpdate(Iterable entities) { objects.add(entity); } - if (objects.size() > 0) { - // get the first entity to be able to retrieve the collection with it - Object firstEntity = objects.get(0); - MongoCollection collection = mongoCollection(firstEntity); - persistOrUpdate(collection, objects); - } + persistOrUpdate(objects); } public void persistOrUpdate(Object firstEntity, Object... entities) { @@ -171,18 +169,13 @@ public void persistOrUpdate(Object firstEntity, Object... entities) { List entityList = new ArrayList<>(); entityList.add(firstEntity); entityList.addAll(Arrays.asList(entities)); - persistOrUpdate(collection, entityList); + persistOrUpdate(entityList); } } public void persistOrUpdate(Stream entities) { List objects = entities.collect(Collectors.toList()); - if (objects.size() > 0) { - // get the first entity to be able to retrieve the collection with it - Object firstEntity = objects.get(0); - MongoCollection collection = mongoCollection(firstEntity); - persistOrUpdate(collection, objects); - } + persistOrUpdate(objects); } public void delete(Object entity) { @@ -190,7 +183,12 @@ public void delete(Object entity) { BsonDocument document = getBsonDocument(collection, entity); BsonValue id = document.get(ID); BsonDocument query = new BsonDocument().append(ID, id); - collection.deleteOne(query); + ClientSession session = getSession(entity); + if (session == null) { + collection.deleteOne(query); + } else { + collection.deleteOne(session, query); + } } public MongoCollection mongoCollection(Class entityClass) { @@ -211,11 +209,26 @@ public MongoDatabase mongoDatabase(Class entityClass) { // Private stuff private void persist(MongoCollection collection, Object entity) { - collection.insertOne(entity); + ClientSession session = getSession(entity); + if (session == null) { + collection.insertOne(entity); + } else { + collection.insertOne(session, entity); + } } - private void persist(MongoCollection collection, List entities) { - collection.insertMany(entities); + private void persist(List entities) { + if (entities.size() > 0) { + // get the first entity to be able to retrieve the collection with it + Object firstEntity = entities.get(0); + MongoCollection collection = mongoCollection(firstEntity); + ClientSession session = getSession(firstEntity); + if (session == null) { + collection.insertMany(entities); + } else { + collection.insertMany(session, entities); + } + } } private void update(MongoCollection collection, Object entity) { @@ -225,7 +238,12 @@ private void update(MongoCollection collection, Object entity) { //then we get its id field and create a new Document with only this one that will be our replace query BsonValue id = document.get(ID); BsonDocument query = new BsonDocument().append(ID, id); - collection.replaceOne(query, entity); + ClientSession session = getSession(entity); + if (session == null) { + collection.replaceOne(query, entity); + } else { + collection.replaceOne(session, query, entity); + } } private void update(MongoCollection collection, List entities) { @@ -238,19 +256,37 @@ private void persistOrUpdate(MongoCollection collection, Object entity) { //we transform the entity as a document first BsonDocument document = getBsonDocument(collection, entity); + ClientSession session = getSession(entity); //then we get its id field and create a new Document with only this one that will be our replace query BsonValue id = document.get(ID); if (id == null) { //insert with autogenerated ID - collection.insertOne(entity); + if (session == null) { + collection.insertOne(entity); + } else { + collection.insertOne(session, entity); + } } else { //insert with user provided ID or update BsonDocument query = new BsonDocument().append(ID, id); - collection.replaceOne(query, entity, new ReplaceOptions().upsert(true)); + if (session == null) { + collection.replaceOne(query, entity, new ReplaceOptions().upsert(true)); + } else { + collection.replaceOne(session, query, entity, new ReplaceOptions().upsert(true)); + } } } - private void persistOrUpdate(MongoCollection collection, List entities) { + private void persistOrUpdate(List entities) { + if (entities.isEmpty()) { + return; + } + + // get the first entity to be able to retrieve the collection with it + Object firstEntity = entities.get(0); + MongoCollection collection = mongoCollection(firstEntity); + ClientSession session = getSession(firstEntity); + //this will be an ordered bulk: it's less performant than a unordered one but will fail at the first failed write List bulk = new ArrayList<>(); for (Object entity : entities) { @@ -270,7 +306,11 @@ private void persistOrUpdate(MongoCollection collection, List entities) } } - collection.bulkWrite(bulk); + if (session == null) { + collection.bulkWrite(bulk); + } else { + collection.bulkWrite(session, bulk); + } } private BsonDocument getBsonDocument(MongoCollection collection, Object entity) { @@ -280,6 +320,64 @@ private BsonDocument getBsonDocument(MongoCollection collection, Object entity) return document; } + ClientSession getSession(Object entity) { + return getSession(entity.getClass()); + } + + ClientSession getSession(Class entityClass) { + MongoEntity mongoEntity = entityClass.getAnnotation(MongoEntity.class); + InstanceHandle instance = Arc.container() + .instance(TransactionSynchronizationRegistry.class); + if (instance.isAvailable()) { + TransactionSynchronizationRegistry registry = instance.get(); + if (registry.getTransactionStatus() == Status.STATUS_ACTIVE) { + ClientSession clientSession = (ClientSession) registry.getResource(SESSION_KEY); + if (clientSession == null) { + return registerClientSession(mongoEntity, registry); + } + } + } + return null; + } + + private ClientSession registerClientSession(MongoEntity mongoEntity, TransactionSynchronizationRegistry registry) { + System.out.println("Openning MongoDB session"); + TransactionManager transactionManager = Arc.container().instance(TransactionManager.class).get(); + MongoClient client = clientFromArc(mongoEntity, MongoClient.class, false); + ClientSession clientSession = client.startSession(); + clientSession.startTransaction();//TODO add txoptions from annotation + registry.putResource(SESSION_KEY, clientSession); + registry.registerInterposedSynchronization(new Synchronization() { + @Override + public void beforeCompletion() { + } + + @Override + public void afterCompletion(int i) { + try { + if (transactionManager.getStatus() == Status.STATUS_ROLLEDBACK) { + System.out.println("!!! MongoDB session rollback"); + try { + clientSession.abortTransaction(); + } finally { + clientSession.close(); + } + } else { + System.out.println("!!! MongoDB session commit"); + try { + clientSession.commitTransaction(); + } finally { + clientSession.close(); + } + } + } catch (SystemException e) { + throw new MongoTransactionException(e); + } + } + }); + return clientSession; + } + private MongoCollection mongoCollection(Object entity) { Class entityClass = entity.getClass(); return mongoCollection(entityClass); @@ -307,7 +405,9 @@ public String apply(String beanName) { public Object findById(Class entityClass, Object id) { MongoCollection collection = mongoCollection(entityClass); - return collection.find(new Document(ID, id)).first(); + ClientSession session = getSession(entityClass); + return session == null ? collection.find(new Document(ID, id)).first() + : collection.find(session, new Document(ID, id)).first(); } public Optional findByIdOptional(Class entityClass, Object id) { @@ -324,7 +424,8 @@ public QueryType find(Class entityClass, String query, Sort sort, Object... p Document docQuery = Document.parse(bindQuery); Document docSort = sortToDocument(sort); MongoCollection collection = mongoCollection(entityClass); - return createQuery(collection, docQuery, docSort); + ClientSession session = getSession(entityClass); + return createQuery(collection, session, docQuery, docSort); } /** @@ -420,7 +521,8 @@ public QueryType find(Class entityClass, String query, Sort sort, Map entityClass, String query, Parameters params) { @@ -435,12 +537,14 @@ public QueryType find(Class entityClass, String query, Sort sort, Parameters public QueryType find(Class entityClass, Document query, Sort sort) { MongoCollection collection = mongoCollection(entityClass); Document sortDoc = sortToDocument(sort); - return createQuery(collection, query, sortDoc); + ClientSession session = getSession(entityClass); + return createQuery(collection, session, query, sortDoc); } public QueryType find(Class entityClass, Document query, Document sort) { MongoCollection collection = mongoCollection(entityClass); - return createQuery(collection, query, sort); + ClientSession session = getSession(entityClass); + return createQuery(collection, session, query, sort); } public QueryType find(Class entityClass, Document query) { @@ -518,14 +622,16 @@ public Stream stream(Class entityClass, Document query, Document sort) { @SuppressWarnings("rawtypes") public QueryType findAll(Class entityClass) { MongoCollection collection = mongoCollection(entityClass); - return createQuery(collection, null, null); + ClientSession session = getSession(entityClass); + return createQuery(collection, session, null, null); } @SuppressWarnings("rawtypes") public QueryType findAll(Class entityClass, Sort sort) { MongoCollection collection = mongoCollection(entityClass); Document sortDoc = sortToDocument(sort); - return createQuery(collection, null, sortDoc); + ClientSession session = getSession(entityClass); + return createQuery(collection, session, null, sortDoc); } private Document sortToDocument(Sort sort) { @@ -558,21 +664,26 @@ public Stream streamAll(Class entityClass, Sort sort) { public long count(Class entityClass) { MongoCollection collection = mongoCollection(entityClass); - return collection.countDocuments(); + ClientSession session = getSession(entityClass); + return session == null ? collection.countDocuments() : collection.countDocuments(session); } public long count(Class entityClass, String query, Object... params) { String bindQuery = bindFilter(entityClass, query, params); BsonDocument docQuery = BsonDocument.parse(bindQuery); MongoCollection collection = mongoCollection(entityClass); - return collection.countDocuments(docQuery); + + ClientSession session = getSession(entityClass); + return session == null ? collection.countDocuments(docQuery) : collection.countDocuments(session, docQuery); } public long count(Class entityClass, String query, Map params) { String bindQuery = bindFilter(entityClass, query, params); BsonDocument docQuery = BsonDocument.parse(bindQuery); MongoCollection collection = mongoCollection(entityClass); - return collection.countDocuments(docQuery); + + ClientSession session = getSession(entityClass); + return session == null ? collection.countDocuments(docQuery) : collection.countDocuments(session, docQuery); } public long count(Class entityClass, String query, Parameters params) { @@ -582,18 +693,22 @@ public long count(Class entityClass, String query, Parameters params) { //specific Mongo query public long count(Class entityClass, Document query) { MongoCollection collection = mongoCollection(entityClass); - return collection.countDocuments(query); + ClientSession session = getSession(entityClass); + return session == null ? collection.countDocuments(query) : collection.countDocuments(session, query); } public long deleteAll(Class entityClass) { MongoCollection collection = mongoCollection(entityClass); - return collection.deleteMany(new Document()).getDeletedCount(); + ClientSession session = getSession(entityClass); + return session == null ? collection.deleteMany(new Document()).getDeletedCount() + : collection.deleteMany(session, new Document()).getDeletedCount(); } public boolean deleteById(Class entityClass, Object id) { MongoCollection collection = mongoCollection(entityClass); Document query = new Document().append(ID, id); - DeleteResult results = collection.deleteOne(query); + ClientSession session = getSession(entityClass); + DeleteResult results = session == null ? collection.deleteOne(query) : collection.deleteOne(session, query); return results.getDeletedCount() == 1; } @@ -601,14 +716,18 @@ public long delete(Class entityClass, String query, Object... params) { String bindQuery = bindFilter(entityClass, query, params); BsonDocument docQuery = BsonDocument.parse(bindQuery); MongoCollection collection = mongoCollection(entityClass); - return collection.deleteMany(docQuery).getDeletedCount(); + ClientSession session = getSession(entityClass); + return session == null ? collection.deleteMany(docQuery).getDeletedCount() + : collection.deleteMany(session, docQuery).getDeletedCount(); } public long delete(Class entityClass, String query, Map params) { String bindQuery = bindFilter(entityClass, query, params); BsonDocument docQuery = BsonDocument.parse(bindQuery); MongoCollection collection = mongoCollection(entityClass); - return collection.deleteMany(docQuery).getDeletedCount(); + ClientSession session = getSession(entityClass); + return session == null ? collection.deleteMany(docQuery).getDeletedCount() + : collection.deleteMany(session, docQuery).getDeletedCount(); } public long delete(Class entityClass, String query, Parameters params) { @@ -618,7 +737,9 @@ public long delete(Class entityClass, String query, Parameters params) { //specific Mongo query public long delete(Class entityClass, Document query) { MongoCollection collection = mongoCollection(entityClass); - return collection.deleteMany(query).getDeletedCount(); + ClientSession session = getSession(entityClass); + return session == null ? collection.deleteMany(query).getDeletedCount() + : collection.deleteMany(session, query).getDeletedCount(); } public UpdateType update(Class entityClass, String update, Map params) { diff --git a/extensions/panache/mongodb-panache-common/runtime/src/main/java/io/quarkus/mongodb/panache/runtime/PanacheUpdateImpl.java b/extensions/panache/mongodb-panache-common/runtime/src/main/java/io/quarkus/mongodb/panache/runtime/PanacheUpdateImpl.java index 1cf9e9d7bfac6d..bc6265079e2244 100644 --- a/extensions/panache/mongodb-panache-common/runtime/src/main/java/io/quarkus/mongodb/panache/runtime/PanacheUpdateImpl.java +++ b/extensions/panache/mongodb-panache-common/runtime/src/main/java/io/quarkus/mongodb/panache/runtime/PanacheUpdateImpl.java @@ -5,6 +5,7 @@ import org.bson.BsonDocument; import org.bson.conversions.Bson; +import com.mongodb.client.ClientSession; import com.mongodb.client.MongoCollection; import io.quarkus.mongodb.panache.PanacheUpdate; @@ -15,26 +16,28 @@ public class PanacheUpdateImpl implements PanacheUpdate { private Class entityClass; private Bson update; private MongoCollection collection; + private ClientSession session; public PanacheUpdateImpl(MongoOperations operations, Class entityClass, Bson update, MongoCollection collection) { this.operations = operations; this.entityClass = entityClass; this.update = update; this.collection = collection; + this.session = operations.getSession(entityClass); } @Override public long where(String query, Object... params) { String bindQuery = operations.bindFilter(entityClass, query, params); BsonDocument docQuery = BsonDocument.parse(bindQuery); - return collection.updateMany(docQuery, update).getModifiedCount(); + return executeUpdate(docQuery); } @Override public long where(String query, Map params) { String bindQuery = operations.bindFilter(entityClass, query, params); BsonDocument docQuery = BsonDocument.parse(bindQuery); - return collection.updateMany(docQuery, update).getModifiedCount(); + return executeUpdate(docQuery); } @Override @@ -44,6 +47,12 @@ public long where(String query, Parameters params) { @Override public long all() { - return collection.updateMany(new BsonDocument(), update).getModifiedCount(); + BsonDocument all = new BsonDocument(); + return executeUpdate(all); + } + + private long executeUpdate(BsonDocument query) { + return session == null ? collection.updateMany(query, update).getModifiedCount() + : collection.updateMany(session, query, update).getModifiedCount(); } } diff --git a/extensions/panache/mongodb-panache-common/runtime/src/main/java/io/quarkus/mongodb/panache/transaction/MongoTransactionException.java b/extensions/panache/mongodb-panache-common/runtime/src/main/java/io/quarkus/mongodb/panache/transaction/MongoTransactionException.java new file mode 100644 index 00000000000000..5fdb6e1729bff8 --- /dev/null +++ b/extensions/panache/mongodb-panache-common/runtime/src/main/java/io/quarkus/mongodb/panache/transaction/MongoTransactionException.java @@ -0,0 +1,7 @@ +package io.quarkus.mongodb.panache.transaction; + +public class MongoTransactionException extends RuntimeException { + public MongoTransactionException(Exception cause) { + super(cause); + } +} diff --git a/extensions/panache/mongodb-panache-kotlin/runtime/src/main/java/io/quarkus/mongodb/panache/kotlin/runtime/PanacheQueryImpl.java b/extensions/panache/mongodb-panache-kotlin/runtime/src/main/java/io/quarkus/mongodb/panache/kotlin/runtime/PanacheQueryImpl.java index 29ef0719bece66..3b5d7d779a7428 100644 --- a/extensions/panache/mongodb-panache-kotlin/runtime/src/main/java/io/quarkus/mongodb/panache/kotlin/runtime/PanacheQueryImpl.java +++ b/extensions/panache/mongodb-panache-kotlin/runtime/src/main/java/io/quarkus/mongodb/panache/kotlin/runtime/PanacheQueryImpl.java @@ -6,6 +6,7 @@ import org.bson.conversions.Bson; import com.mongodb.ReadPreference; +import com.mongodb.client.ClientSession; import com.mongodb.client.MongoCollection; import com.mongodb.client.model.Collation; @@ -16,8 +17,8 @@ public class PanacheQueryImpl implements PanacheQuery { private final CommonPanacheQueryImpl delegate; - PanacheQueryImpl(MongoCollection collection, Bson mongoQuery, Bson sort) { - this.delegate = new CommonPanacheQueryImpl<>(collection, mongoQuery, sort); + PanacheQueryImpl(MongoCollection collection, ClientSession session, Bson mongoQuery, Bson sort) { + this.delegate = new CommonPanacheQueryImpl<>(collection, session, mongoQuery, sort); } private PanacheQueryImpl(CommonPanacheQueryImpl delegate) { diff --git a/extensions/panache/mongodb-panache-kotlin/runtime/src/main/kotlin/io/quarkus/mongodb/panache/kotlin/runtime/KotlinMongoOperations.kt b/extensions/panache/mongodb-panache-kotlin/runtime/src/main/kotlin/io/quarkus/mongodb/panache/kotlin/runtime/KotlinMongoOperations.kt index 98e8eaec091b45..9332e6a2420d31 100644 --- a/extensions/panache/mongodb-panache-kotlin/runtime/src/main/kotlin/io/quarkus/mongodb/panache/kotlin/runtime/KotlinMongoOperations.kt +++ b/extensions/panache/mongodb-panache-kotlin/runtime/src/main/kotlin/io/quarkus/mongodb/panache/kotlin/runtime/KotlinMongoOperations.kt @@ -1,5 +1,6 @@ package io.quarkus.mongodb.panache.kotlin.runtime +import com.mongodb.client.ClientSession import com.mongodb.client.MongoCollection import io.quarkus.mongodb.panache.PanacheUpdate import io.quarkus.mongodb.panache.kotlin.PanacheQuery @@ -30,8 +31,8 @@ class KotlinMongoOperations : MongoOperations, PanacheUpdate>() * * @return the new query implementation */ - override fun createQuery(collection: MongoCollection<*>, query: Document?, sortDoc: Document?) = - PanacheQueryImpl(collection, query, sortDoc) + override fun createQuery(collection: MongoCollection<*>, session: ClientSession?, query: Document?, sortDoc: Document?) = + PanacheQueryImpl(collection, session, query, sortDoc) /** * Creates the update implementation diff --git a/extensions/panache/mongodb-panache/runtime/src/main/java/io/quarkus/mongodb/panache/runtime/JavaMongoOperations.java b/extensions/panache/mongodb-panache/runtime/src/main/java/io/quarkus/mongodb/panache/runtime/JavaMongoOperations.java index 5b40d09cabe182..82ca2bfa3bafa9 100644 --- a/extensions/panache/mongodb-panache/runtime/src/main/java/io/quarkus/mongodb/panache/runtime/JavaMongoOperations.java +++ b/extensions/panache/mongodb-panache/runtime/src/main/java/io/quarkus/mongodb/panache/runtime/JavaMongoOperations.java @@ -5,6 +5,7 @@ import org.bson.Document; +import com.mongodb.client.ClientSession; import com.mongodb.client.MongoCollection; import io.quarkus.mongodb.panache.PanacheQuery; @@ -17,8 +18,8 @@ public class JavaMongoOperations extends MongoOperations, Panach public static final JavaMongoOperations INSTANCE = new JavaMongoOperations(); @Override - protected PanacheQuery createQuery(MongoCollection collection, Document query, Document sortDoc) { - return new PanacheQueryImpl(collection, query, sortDoc); + protected PanacheQuery createQuery(MongoCollection collection, ClientSession session, Document query, Document sortDoc) { + return new PanacheQueryImpl(collection, session, query, sortDoc); } @Override diff --git a/extensions/panache/mongodb-panache/runtime/src/main/java/io/quarkus/mongodb/panache/runtime/PanacheQueryImpl.java b/extensions/panache/mongodb-panache/runtime/src/main/java/io/quarkus/mongodb/panache/runtime/PanacheQueryImpl.java index 89d2f81669da39..2b87df395610b1 100644 --- a/extensions/panache/mongodb-panache/runtime/src/main/java/io/quarkus/mongodb/panache/runtime/PanacheQueryImpl.java +++ b/extensions/panache/mongodb-panache/runtime/src/main/java/io/quarkus/mongodb/panache/runtime/PanacheQueryImpl.java @@ -7,6 +7,7 @@ import org.bson.conversions.Bson; import com.mongodb.ReadPreference; +import com.mongodb.client.ClientSession; import com.mongodb.client.MongoCollection; import com.mongodb.client.model.Collation; @@ -16,8 +17,8 @@ public class PanacheQueryImpl implements PanacheQuery { private final CommonPanacheQueryImpl delegate; - PanacheQueryImpl(MongoCollection collection, Bson mongoQuery, Bson sort) { - this.delegate = new CommonPanacheQueryImpl<>(collection, mongoQuery, sort); + PanacheQueryImpl(MongoCollection collection, ClientSession session, Bson mongoQuery, Bson sort) { + this.delegate = new CommonPanacheQueryImpl<>(collection, session, mongoQuery, sort); } private PanacheQueryImpl(CommonPanacheQueryImpl delegate) {