From 628528127d12e6e1bdfe8418516007a7c5f4b8be Mon Sep 17 00:00:00 2001 From: shashankrnr32 Date: Thu, 13 Apr 2023 10:07:28 +0530 Subject: [PATCH] Reactive Mongo Client Implementation --- .../CollectionShardedReactiveMongoClient.java | 123 ++++++++ .../client/DatabaseShardedMongoClient.java | 2 +- .../DatabaseShardedReactiveMongoClient.java | 150 +++++++++ .../client/ShardedReactiveMongoClient.java | 6 + ...ollectionShardedReactiveMongoDatabase.java | 286 ++++++++++++++++++ 5 files changed, 566 insertions(+), 1 deletion(-) create mode 100644 sharding-driver/src/main/java/com/alpha/mongodb/sharding/core/client/CollectionShardedReactiveMongoClient.java create mode 100644 sharding-driver/src/main/java/com/alpha/mongodb/sharding/core/client/DatabaseShardedReactiveMongoClient.java create mode 100644 sharding-driver/src/main/java/com/alpha/mongodb/sharding/core/client/ShardedReactiveMongoClient.java create mode 100644 sharding-driver/src/main/java/com/alpha/mongodb/sharding/core/database/CollectionShardedReactiveMongoDatabase.java diff --git a/sharding-driver/src/main/java/com/alpha/mongodb/sharding/core/client/CollectionShardedReactiveMongoClient.java b/sharding-driver/src/main/java/com/alpha/mongodb/sharding/core/client/CollectionShardedReactiveMongoClient.java new file mode 100644 index 00000000..e4ebb836 --- /dev/null +++ b/sharding-driver/src/main/java/com/alpha/mongodb/sharding/core/client/CollectionShardedReactiveMongoClient.java @@ -0,0 +1,123 @@ +package com.alpha.mongodb.sharding.core.client; + +import com.alpha.mongodb.sharding.core.configuration.CollectionShardingOptions; +import com.mongodb.ClientSessionOptions; +import com.mongodb.connection.ClusterDescription; +import com.mongodb.reactivestreams.client.ChangeStreamPublisher; +import com.mongodb.reactivestreams.client.ClientSession; +import com.mongodb.reactivestreams.client.ListDatabasesPublisher; +import com.mongodb.reactivestreams.client.MongoClient; +import com.mongodb.reactivestreams.client.MongoDatabase; +import org.bson.Document; +import org.bson.conversions.Bson; +import org.reactivestreams.Publisher; + +import java.util.List; + +public class CollectionShardedReactiveMongoClient implements ShardedReactiveMongoClient { + private final CollectionShardingOptions shardingOptions; + + private final MongoClient delegatedMongoClient; + + public CollectionShardedReactiveMongoClient(MongoClient mongoClient, + CollectionShardingOptions shardingOptions) { + super(); + this.shardingOptions = shardingOptions; + this.delegatedMongoClient = mongoClient; + } + + @Override + public MongoDatabase getDatabase(String s) { + return delegatedMongoClient.getDatabase(s); + } + + @Override + public void close() { + delegatedMongoClient.close(); + } + + @Override + public Publisher listDatabaseNames() { + return delegatedMongoClient.listDatabaseNames(); + } + + @Override + public Publisher listDatabaseNames(ClientSession clientSession) { + return delegatedMongoClient.listDatabaseNames(clientSession); + } + + @Override + public ListDatabasesPublisher listDatabases() { + return delegatedMongoClient.listDatabases(); + } + + @Override + public ListDatabasesPublisher listDatabases(Class aClass) { + return delegatedMongoClient.listDatabases(aClass); + } + + @Override + public ListDatabasesPublisher listDatabases(ClientSession clientSession) { + return delegatedMongoClient.listDatabases(clientSession); + } + + @Override + public ListDatabasesPublisher listDatabases(ClientSession clientSession, Class aClass) { + return delegatedMongoClient.listDatabases(clientSession, aClass); + } + + @Override + public ChangeStreamPublisher watch() { + return delegatedMongoClient.watch(); + } + + @Override + public ChangeStreamPublisher watch(Class aClass) { + return delegatedMongoClient.watch(aClass); + } + + @Override + public ChangeStreamPublisher watch(List list) { + return delegatedMongoClient.watch(list); + } + + @Override + public ChangeStreamPublisher watch(List list, Class aClass) { + return delegatedMongoClient.watch(list, aClass); + } + + @Override + public ChangeStreamPublisher watch(ClientSession clientSession) { + return delegatedMongoClient.watch(clientSession); + } + + @Override + public ChangeStreamPublisher watch(ClientSession clientSession, Class aClass) { + return delegatedMongoClient.watch(clientSession, aClass); + } + + @Override + public ChangeStreamPublisher watch(ClientSession clientSession, List list) { + return delegatedMongoClient.watch(clientSession, list); + } + + @Override + public ChangeStreamPublisher watch(ClientSession clientSession, List list, Class aClass) { + return delegatedMongoClient.watch(clientSession, list, aClass); + } + + @Override + public Publisher startSession() { + return delegatedMongoClient.startSession(); + } + + @Override + public Publisher startSession(ClientSessionOptions clientSessionOptions) { + return delegatedMongoClient.startSession(clientSessionOptions); + } + + @Override + public ClusterDescription getClusterDescription() { + return delegatedMongoClient.getClusterDescription(); + } +} diff --git a/sharding-driver/src/main/java/com/alpha/mongodb/sharding/core/client/DatabaseShardedMongoClient.java b/sharding-driver/src/main/java/com/alpha/mongodb/sharding/core/client/DatabaseShardedMongoClient.java index c27ff481..cca79b45 100644 --- a/sharding-driver/src/main/java/com/alpha/mongodb/sharding/core/client/DatabaseShardedMongoClient.java +++ b/sharding-driver/src/main/java/com/alpha/mongodb/sharding/core/client/DatabaseShardedMongoClient.java @@ -26,7 +26,7 @@ * * @author Shashank Sharma */ -public class DatabaseShardedMongoClient implements MongoClient { +public class DatabaseShardedMongoClient implements ShardedMongoClient { private final DatabaseShardingOptions shardingOptions; diff --git a/sharding-driver/src/main/java/com/alpha/mongodb/sharding/core/client/DatabaseShardedReactiveMongoClient.java b/sharding-driver/src/main/java/com/alpha/mongodb/sharding/core/client/DatabaseShardedReactiveMongoClient.java new file mode 100644 index 00000000..57ac80ac --- /dev/null +++ b/sharding-driver/src/main/java/com/alpha/mongodb/sharding/core/client/DatabaseShardedReactiveMongoClient.java @@ -0,0 +1,150 @@ +package com.alpha.mongodb.sharding.core.client; + +import com.alpha.mongodb.sharding.core.configuration.DatabaseShardingOptions; +import com.alpha.mongodb.sharding.core.exception.UnresolvableDatabaseShardException; +import com.alpha.mongodb.sharding.core.hint.ShardingHint; +import com.alpha.mongodb.sharding.core.hint.ShardingHintManager; +import com.mongodb.ClientSessionOptions; +import com.mongodb.connection.ClusterDescription; +import com.mongodb.reactivestreams.client.ChangeStreamPublisher; +import com.mongodb.reactivestreams.client.ClientSession; +import com.mongodb.reactivestreams.client.ListDatabasesPublisher; +import com.mongodb.reactivestreams.client.MongoClient; +import com.mongodb.reactivestreams.client.MongoDatabase; +import org.bson.Document; +import org.bson.conversions.Bson; +import org.reactivestreams.Publisher; + +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; + +/** + * Database Sharded Reactive Mongo Client to get the databases which are sharded + * by database. + * + * @author Shashank Sharma + */ +public class DatabaseShardedReactiveMongoClient implements ShardedReactiveMongoClient { + + private final DatabaseShardingOptions shardingOptions; + + private final Map delegatedMongoClientMap; + + public DatabaseShardedReactiveMongoClient(Map delegatedMongoClientMap, + DatabaseShardingOptions shardingOptions) { + this.shardingOptions = shardingOptions; + this.delegatedMongoClientMap = delegatedMongoClientMap; + } + + @Override + public MongoDatabase getDatabase(String s) { + return getDelegatedMongoClient().getDatabase(shardingOptions.resolveDatabaseName(s, resolveDatabaseHint())); + + } + + @Override + public void close() { + getDelegatedMongoClient().close(); + } + + @Override + public Publisher listDatabaseNames() { + return getDelegatedMongoClient().listDatabaseNames(); + } + + @Override + public Publisher listDatabaseNames(ClientSession clientSession) { + return getDelegatedMongoClient().listDatabaseNames(clientSession); + } + + @Override + public ListDatabasesPublisher listDatabases() { + return getDelegatedMongoClient().listDatabases(); + } + + @Override + public ListDatabasesPublisher listDatabases(Class aClass) { + return getDelegatedMongoClient().listDatabases(aClass); + } + + @Override + public ListDatabasesPublisher listDatabases(ClientSession clientSession) { + return getDelegatedMongoClient().listDatabases(clientSession); + } + + @Override + public ListDatabasesPublisher listDatabases(ClientSession clientSession, Class aClass) { + return getDelegatedMongoClient().listDatabases(clientSession, aClass); + } + + @Override + public ChangeStreamPublisher watch() { + return getDelegatedMongoClient().watch(); + } + + @Override + public ChangeStreamPublisher watch(Class aClass) { + return getDelegatedMongoClient().watch(aClass); + } + + @Override + public ChangeStreamPublisher watch(List list) { + return getDelegatedMongoClient().watch(list); + } + + @Override + public ChangeStreamPublisher watch(List list, Class aClass) { + return getDelegatedMongoClient().watch(list, aClass); + } + + @Override + public ChangeStreamPublisher watch(ClientSession clientSession) { + return getDelegatedMongoClient().watch(clientSession); + } + + @Override + public ChangeStreamPublisher watch(ClientSession clientSession, Class aClass) { + return getDelegatedMongoClient().watch(clientSession, aClass); + } + + @Override + public ChangeStreamPublisher watch(ClientSession clientSession, List list) { + return getDelegatedMongoClient().watch(clientSession, list); + } + + @Override + public ChangeStreamPublisher watch(ClientSession clientSession, List list, Class aClass) { + return getDelegatedMongoClient().watch(clientSession, list, aClass); + } + + @Override + public Publisher startSession() { + return getDelegatedMongoClient().startSession(); + } + + @Override + public Publisher startSession(ClientSessionOptions clientSessionOptions) { + return getDelegatedMongoClient().startSession(clientSessionOptions); + } + + @Override + public ClusterDescription getClusterDescription() { + return getDelegatedMongoClient().getClusterDescription(); + } + + public DatabaseShardedReactiveMongoClient(MongoClient delegatedMongoClient, DatabaseShardingOptions shardingOptions) { + this(shardingOptions.getDefaultDatabaseHintsSet().stream().collect( + Collectors.toMap(s -> s, s -> delegatedMongoClient)), shardingOptions); + } + + private String resolveDatabaseHint() { + return ShardingHintManager.getHint().map(ShardingHint::getDatabaseHint).orElseThrow(UnresolvableDatabaseShardException::new); + } + + private MongoClient getDelegatedMongoClient() { + String hint = resolveDatabaseHint(); + return Optional.ofNullable(delegatedMongoClientMap.get(hint)).orElseThrow(UnresolvableDatabaseShardException::new); + } +} diff --git a/sharding-driver/src/main/java/com/alpha/mongodb/sharding/core/client/ShardedReactiveMongoClient.java b/sharding-driver/src/main/java/com/alpha/mongodb/sharding/core/client/ShardedReactiveMongoClient.java new file mode 100644 index 00000000..94fa82f9 --- /dev/null +++ b/sharding-driver/src/main/java/com/alpha/mongodb/sharding/core/client/ShardedReactiveMongoClient.java @@ -0,0 +1,6 @@ +package com.alpha.mongodb.sharding.core.client; + +import com.mongodb.reactivestreams.client.MongoClient; + +public interface ShardedReactiveMongoClient extends MongoClient { +} diff --git a/sharding-driver/src/main/java/com/alpha/mongodb/sharding/core/database/CollectionShardedReactiveMongoDatabase.java b/sharding-driver/src/main/java/com/alpha/mongodb/sharding/core/database/CollectionShardedReactiveMongoDatabase.java new file mode 100644 index 00000000..cd2559c9 --- /dev/null +++ b/sharding-driver/src/main/java/com/alpha/mongodb/sharding/core/database/CollectionShardedReactiveMongoDatabase.java @@ -0,0 +1,286 @@ +package com.alpha.mongodb.sharding.core.database; + +import com.alpha.mongodb.sharding.core.configuration.CollectionShardingOptions; +import com.alpha.mongodb.sharding.core.exception.UnresolvableCollectionShardException; +import com.alpha.mongodb.sharding.core.hint.ShardingHintManager; +import com.mongodb.ReadConcern; +import com.mongodb.ReadPreference; +import com.mongodb.WriteConcern; +import com.mongodb.client.model.CreateCollectionOptions; +import com.mongodb.client.model.CreateViewOptions; +import com.mongodb.reactivestreams.client.AggregatePublisher; +import com.mongodb.reactivestreams.client.ChangeStreamPublisher; +import com.mongodb.reactivestreams.client.ClientSession; +import com.mongodb.reactivestreams.client.ListCollectionsPublisher; +import com.mongodb.reactivestreams.client.MongoCollection; +import com.mongodb.reactivestreams.client.MongoDatabase; +import org.bson.Document; +import org.bson.codecs.configuration.CodecRegistry; +import org.bson.conversions.Bson; +import org.reactivestreams.Publisher; + +import java.util.List; + +/** + * CollectionShardedReactiveMongoDatabase to query for collections that are sharded + * within a single database. + * + * @author Shashank Sharma + */ +public class CollectionShardedReactiveMongoDatabase implements MongoDatabase { + + private final MongoDatabase delegatedMongoDatabase; + private final CollectionShardingOptions shardingOptions; + + public CollectionShardedReactiveMongoDatabase(MongoDatabase delegatedMongoDatabase, + CollectionShardingOptions shardingOptions) { + this.delegatedMongoDatabase = delegatedMongoDatabase; + this.shardingOptions = shardingOptions; + } + + @Override + public String getName() { + return delegatedMongoDatabase.getName(); + } + + @Override + public CodecRegistry getCodecRegistry() { + return delegatedMongoDatabase.getCodecRegistry(); + } + + @Override + public ReadPreference getReadPreference() { + return delegatedMongoDatabase.getReadPreference(); + } + + @Override + public WriteConcern getWriteConcern() { + return delegatedMongoDatabase.getWriteConcern(); + } + + @Override + public ReadConcern getReadConcern() { + return delegatedMongoDatabase.getReadConcern(); + } + + @Override + public MongoDatabase withCodecRegistry(CodecRegistry codecRegistry) { + return new CollectionShardedReactiveMongoDatabase( + delegatedMongoDatabase.withCodecRegistry(codecRegistry), shardingOptions); + } + + @Override + public MongoDatabase withReadPreference(ReadPreference readPreference) { + return new CollectionShardedReactiveMongoDatabase( + delegatedMongoDatabase.withReadPreference(readPreference), shardingOptions); + } + + @Override + public MongoDatabase withWriteConcern(WriteConcern writeConcern) { + return new CollectionShardedReactiveMongoDatabase( + delegatedMongoDatabase.withWriteConcern(writeConcern), shardingOptions); + } + + @Override + public MongoDatabase withReadConcern(ReadConcern readConcern) { + return new CollectionShardedReactiveMongoDatabase( + delegatedMongoDatabase.withReadConcern(readConcern), shardingOptions); + } + + @Override + public MongoCollection getCollection(String s) { + return delegatedMongoDatabase.getCollection(resolveCollectionName(s)); + } + + @Override + public MongoCollection getCollection(String s, Class aClass) { + return delegatedMongoDatabase.getCollection(resolveCollectionName(s), aClass); + } + + @Override + public Publisher runCommand(Bson bson) { + return delegatedMongoDatabase.runCommand(bson); + } + + @Override + public Publisher runCommand(Bson bson, ReadPreference readPreference) { + return delegatedMongoDatabase.runCommand(bson, readPreference); + } + + @Override + public Publisher runCommand(Bson bson, Class aClass) { + return delegatedMongoDatabase.runCommand(bson, aClass); + } + + @Override + public Publisher runCommand(Bson bson, ReadPreference readPreference, Class aClass) { + return delegatedMongoDatabase.runCommand(bson, readPreference, aClass); + } + + @Override + public Publisher runCommand(ClientSession clientSession, Bson bson) { + return delegatedMongoDatabase.runCommand(clientSession, bson); + } + + @Override + public Publisher runCommand(ClientSession clientSession, Bson bson, ReadPreference readPreference) { + return delegatedMongoDatabase.runCommand(clientSession, bson, readPreference); + } + + @Override + public Publisher runCommand(ClientSession clientSession, Bson bson, Class aClass) { + return delegatedMongoDatabase.runCommand(clientSession, bson, aClass); + } + + @Override + public Publisher runCommand(ClientSession clientSession, Bson bson, ReadPreference readPreference, Class aClass) { + return delegatedMongoDatabase.runCommand(clientSession, bson, readPreference, aClass); + } + + @Override + public Publisher drop() { + return delegatedMongoDatabase.drop(); + } + + @Override + public Publisher drop(ClientSession clientSession) { + return delegatedMongoDatabase.drop(clientSession); + } + + @Override + public Publisher listCollectionNames() { + return delegatedMongoDatabase.listCollectionNames(); + } + + @Override + public Publisher listCollectionNames(ClientSession clientSession) { + return delegatedMongoDatabase.listCollectionNames(); + } + + @Override + public ListCollectionsPublisher listCollections() { + return delegatedMongoDatabase.listCollections(); + } + + @Override + public ListCollectionsPublisher listCollections(Class aClass) { + return delegatedMongoDatabase.listCollections(aClass); + } + + @Override + public ListCollectionsPublisher listCollections(ClientSession clientSession) { + return delegatedMongoDatabase.listCollections(clientSession); + } + + @Override + public ListCollectionsPublisher listCollections(ClientSession clientSession, Class aClass) { + return delegatedMongoDatabase.listCollections(clientSession, aClass); + } + + @Override + public Publisher createCollection(String s) { + return delegatedMongoDatabase.createCollection(resolveCollectionName(s)); + } + + @Override + public Publisher createCollection(String s, CreateCollectionOptions createCollectionOptions) { + return delegatedMongoDatabase.createCollection(resolveCollectionName(s), createCollectionOptions); + } + + @Override + public Publisher createCollection(ClientSession clientSession, String s) { + return delegatedMongoDatabase.createCollection(clientSession, resolveCollectionName(s)); + } + + @Override + public Publisher createCollection(ClientSession clientSession, String s, CreateCollectionOptions createCollectionOptions) { + return delegatedMongoDatabase.createCollection(clientSession, resolveCollectionName(s), createCollectionOptions); + } + + @Override + public Publisher createView(String s, String s1, List list) { + return delegatedMongoDatabase.createView(s, s1, list); + } + + @Override + public Publisher createView(String s, String s1, List list, CreateViewOptions createViewOptions) { + return delegatedMongoDatabase.createView(s, s1, list, createViewOptions); + } + + @Override + public Publisher createView(ClientSession clientSession, String s, String s1, List list) { + return delegatedMongoDatabase.createView(clientSession, s, s1, list); + } + + @Override + public Publisher createView(ClientSession clientSession, String s, String s1, List list, CreateViewOptions createViewOptions) { + return delegatedMongoDatabase.createView(clientSession, s, s1, list, createViewOptions); + } + + @Override + public ChangeStreamPublisher watch() { + return delegatedMongoDatabase.watch(); + } + + @Override + public ChangeStreamPublisher watch(Class aClass) { + return delegatedMongoDatabase.watch(aClass); + } + + @Override + public ChangeStreamPublisher watch(List list) { + return delegatedMongoDatabase.watch(list); + } + + @Override + public ChangeStreamPublisher watch(List list, Class aClass) { + return delegatedMongoDatabase.watch(list, aClass); + } + + @Override + public ChangeStreamPublisher watch(ClientSession clientSession) { + return delegatedMongoDatabase.watch(clientSession); + } + + @Override + public ChangeStreamPublisher watch(ClientSession clientSession, Class aClass) { + return delegatedMongoDatabase.watch(clientSession, aClass); + } + + @Override + public ChangeStreamPublisher watch(ClientSession clientSession, List list) { + return delegatedMongoDatabase.watch(clientSession, list); + } + + @Override + public ChangeStreamPublisher watch(ClientSession clientSession, List list, Class aClass) { + return delegatedMongoDatabase.watch(clientSession, list, aClass); + } + + @Override + public AggregatePublisher aggregate(List list) { + return delegatedMongoDatabase.aggregate(list); + } + + @Override + public AggregatePublisher aggregate(List list, Class aClass) { + return delegatedMongoDatabase.aggregate(list, aClass); + } + + @Override + public AggregatePublisher aggregate(ClientSession clientSession, List list) { + return delegatedMongoDatabase.aggregate(clientSession, list); + } + + @Override + public AggregatePublisher aggregate(ClientSession clientSession, List list, Class aClass) { + return delegatedMongoDatabase.aggregate(clientSession, list, aClass); + } + + private String resolveCollectionName(String s) throws UnresolvableCollectionShardException { + return shardingOptions.resolveCollectionName(s, ShardingHintManager.getHint().map(shardingHint -> { + shardingOptions.validateCollectionHint(s, shardingHint.getCollectionHint()); + return shardingHint.getCollectionHint(); + }).orElseThrow(UnresolvableCollectionShardException::new)); + } +}