Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add UTs - Collection Sharded Reactive Mongo Template #49

Merged
merged 3 commits into from
Apr 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@
<apache.commons.lang3.version>3.12.0</apache.commons.lang3.version>
<apache.commons.collection.version>3.2.2</apache.commons.collection.version>
<spring.data.mongodb.version>3.3.10</spring.data.mongodb.version>
<mongodb.driver.executable.version>4.4.2</mongodb.driver.executable.version>
<mongodb.driver.version>4.8.2</mongodb.driver.version>
<reactor.core.version>3.5.4</reactor.core.version>
<mongodb.driver.reactive.version>4.9.0</mongodb.driver.reactive.version>
<mockito.inline.version>3.12.4</mockito.inline.version>
<log4j2.version>2.17.2</log4j2.version>

</properties>

Expand Down
4 changes: 2 additions & 2 deletions sharding-driver/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongodb-driver-sync</artifactId>
<version>${mongodb.driver.executable.version}</version>
<version>${mongodb.driver.version}</version>
</dependency>
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongodb-driver-reactivestreams</artifactId>
<version>${mongodb.driver.reactive.version}</version>
<version>${mongodb.driver.version}</version>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
Expand Down
43 changes: 37 additions & 6 deletions sharding-spring/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,6 @@
<artifactId>spring-data-mongodb</artifactId>
<version>${spring.data.mongodb.version}</version>
</dependency>
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongodb-driver-reactivestreams</artifactId>
<version>${mongodb.driver.reactive.version}</version>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
Expand All @@ -41,8 +36,44 @@
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongodb-driver-sync</artifactId>
<version>${mongodb.driver.executable.version}</version>
<version>${mongodb.driver.version}</version>
</dependency>
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongodb-driver-reactivestreams</artifactId>
<version>${mongodb.driver.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.mongodb/mongodb-driver-core -->
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongodb-driver-core</artifactId>
<version>${mongodb.driver.version}</version>
</dependency>
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>bson</artifactId>
<version>${mongodb.driver.version}</version>
</dependency>

<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>${log4j2.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>${log4j2.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>${log4j2.version}</version>
<scope>test</scope>
</dependency>

</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,19 @@
import com.mongodb.reactivestreams.client.MongoCollection;
import org.apache.commons.collections.CollectionUtils;
import org.bson.Document;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.mongodb.ReactiveMongoDatabaseFactory;
import org.springframework.data.mongodb.core.FindPublisherPreparer;
import org.springframework.data.mongodb.core.convert.MongoConverter;
import org.springframework.data.mongodb.core.convert.MongoWriter;
import org.springframework.data.mongodb.core.mapping.MongoPersistentEntity;
import org.springframework.data.mongodb.core.query.Collation;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.data.mongodb.core.query.UpdateDefinition;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

Expand All @@ -31,6 +37,10 @@
* @see CollectionShardedMongoTemplate
*/
public class CollectionShardedReactiveMongoTemplate extends ShardedReactiveMongoTemplate implements CollectionShardingAssistant {
private static final Logger LOGGER = LoggerFactory.getLogger(CollectionShardedReactiveMongoTemplate.class);

private static final String ID_KEY = "_id";

public CollectionShardedReactiveMongoTemplate(MongoClient mongoClient, String databaseName, CollectionShardingOptions collectionShardingOptions) {
super(mongoClient, databaseName, collectionShardingOptions);
}
Expand Down Expand Up @@ -75,13 +85,32 @@ protected <T> Mono<T> doFindOne(String collectionName, Document query, Document
}

@Override
protected <T> Flux<T> doFind(String collectionName, Document query, Document fields, Class<T> entityClass) {
return super.doFind(resolveCollectionNameForFindContext(collectionName, entityClass, query), query, fields, entityClass);
protected <T> Flux<T> doFind(String collectionName, Document query, Document fields, Class<T> entityClass, FindPublisherPreparer preparer) {
return super.doFind(resolveCollectionNameForFindContext(collectionName, entityClass, query), query, fields, entityClass, preparer);
}

@Override
protected <T> Flux<T> doFindAndDelete(String collectionName, Query query, Class<T> entityClass) {
return super.doFindAndDelete(resolveCollectionNameForDeleteContext(collectionName, entityClass, query), query, entityClass);
Flux<T> flux = find(query, entityClass, collectionName);

return Flux.from(flux).collectList().filter(it -> !it.isEmpty())
.flatMapMany(list -> {
MongoPersistentEntity<T> persistentEntity =
(MongoPersistentEntity<T>) this.getConverter().getMappingContext().getPersistentEntity(entityClass);

MultiValueMap<String, Object> byIds = new LinkedMultiValueMap<>();
list.forEach(resultEntry -> {
byIds.add(ID_KEY, persistentEntity.getPropertyAccessor(resultEntry).getProperty(
persistentEntity.getIdProperty()));
});

Criteria[] criterias = byIds.entrySet().stream() //
.map(it -> Criteria.where(it.getKey()).in(it.getValue())) //
.toArray(Criteria[]::new);
return Flux.from(remove(
new Query(criterias.length == 1 ? criterias[0] : new Criteria().orOperator(criterias)), entityClass, collectionName))
.flatMap(deleteResult -> Flux.fromIterable(list));
});
}

@Override
Expand Down Expand Up @@ -109,6 +138,7 @@ public Mono<MongoCollection<Document>> getCollection(String collectionName) {
return super.getCollection(resolveCollectionNameWithoutEntityContext(collectionName));
}


@Override
public Mono<Boolean> collectionExists(String collectionName) {
return super.collectionExists(resolveCollectionNameWithoutEntityContext(collectionName));
Expand All @@ -118,8 +148,17 @@ public Mono<Boolean> collectionExists(String collectionName, final String collec
return super.collectionExists(getShardingOptions().resolveCollectionName(collectionName, collectionHint));
}

private Mono<MongoCollection<Document>> getCollectionWithoutHintResolution(String collectionName) {
return super.getCollection(collectionName);
}

@Override
protected Mono<MongoCollection<Document>> doCreateCollection(String collectionName, CreateCollectionOptions collectionOptions) {
return super.doCreateCollection(resolveCollectionNameWithoutEntityContext(collectionName), collectionOptions);
String resolvedCollectionName = resolveCollectionNameWithoutEntityContext(collectionName);
return createMono(db -> db.createCollection(resolvedCollectionName, collectionOptions)).doOnSuccess(it -> {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Created collection [{}]", collectionName);
}
}).then(getCollectionWithoutHintResolution(resolvedCollectionName));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.mongodb.client.MongoCursor;
import com.mongodb.client.MongoIterable;
import com.mongodb.client.model.Collation;
import org.bson.BsonValue;
import org.bson.Document;
import org.bson.conversions.Bson;

Expand Down Expand Up @@ -125,6 +126,11 @@ public FindIterable<Document> comment(String s) {
return null;
}

@Override
public FindIterable<Document> comment(BsonValue bsonValue) {
return this;
}

@Override
public FindIterable<Document> hint(Bson bson) {
return null;
Expand All @@ -135,6 +141,11 @@ public FindIterable<Document> hintString(String s) {
return null;
}

@Override
public FindIterable<Document> let(Bson bson) {
return this;
}

@Override
public FindIterable<Document> max(Bson bson) {
return null;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
package com.alpha.mongodb.sharding.core.fixture.db;

import com.mongodb.CursorType;
import com.mongodb.ExplainVerbosity;
import com.mongodb.client.model.Collation;
import com.mongodb.reactivestreams.client.FindPublisher;
import org.bson.BsonValue;
import org.bson.Document;
import org.bson.conversions.Bson;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import reactor.core.publisher.Flux;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

public class FindFromDatabasePublisher implements FindPublisher<Document> {

private final List<Document> documents;
private final GenericMongoCursor<Document> documentGenericMongoCursor;

public FindFromDatabasePublisher(List<Document> documents) {
this.documents = documents;
this.documentGenericMongoCursor = new GenericMongoCursor<>(documents);
}

public FindFromDatabasePublisher(Document... documents) {
this(Arrays.stream(documents).collect(Collectors.toList()));
}


@Override
public Publisher<Document> first() {
return Flux.just(documents.get(0));
}

@Override
public FindPublisher<Document> filter(Bson bson) {
return this;
}

@Override
public FindPublisher<Document> limit(int i) {
return this;
}

@Override
public FindPublisher<Document> skip(int i) {
return this;
}

@Override
public FindPublisher<Document> maxTime(long l, TimeUnit timeUnit) {
return this;
}

@Override
public FindPublisher<Document> maxAwaitTime(long l, TimeUnit timeUnit) {
return this;
}

@Override
public FindPublisher<Document> projection(Bson bson) {
return this;
}

@Override
public FindPublisher<Document> sort(Bson bson) {
return this;
}

@Override
public FindPublisher<Document> noCursorTimeout(boolean b) {
return this;
}

@Override
public FindPublisher<Document> oplogReplay(boolean b) {
return this;
}

@Override
public FindPublisher<Document> partial(boolean b) {
return this;
}

@Override
public FindPublisher<Document> cursorType(CursorType cursorType) {
return this;
}

@Override
public FindPublisher<Document> collation(Collation collation) {
return this;
}

@Override
public FindPublisher<Document> comment(String s) {
return this;
}

@Override
public FindPublisher<Document> comment(BsonValue bsonValue) {
return this;
}

@Override
public FindPublisher<Document> hint(Bson bson) {
return this;
}

@Override
public FindPublisher<Document> hintString(String s) {
return this;
}

@Override
public FindPublisher<Document> let(Bson bson) {
return this;
}

@Override
public FindPublisher<Document> max(Bson bson) {
return this;
}

@Override
public FindPublisher<Document> min(Bson bson) {
return this;
}

@Override
public FindPublisher<Document> returnKey(boolean b) {
return this;
}

@Override
public FindPublisher<Document> showRecordId(boolean b) {
return this;
}

@Override
public FindPublisher<Document> batchSize(int i) {
return this;
}

@Override
public FindPublisher<Document> allowDiskUse(Boolean aBoolean) {
return this;
}

@Override
public Publisher<Document> explain() {
return Flux.fromIterable(documents);
}

@Override
public Publisher<Document> explain(ExplainVerbosity explainVerbosity) {
return Flux.fromIterable(documents);
}

@Override
public <E> Publisher<E> explain(Class<E> aClass) {
return null;
}

@Override
public <E> Publisher<E> explain(Class<E> aClass, ExplainVerbosity explainVerbosity) {
return null;
}

@Override
public void subscribe(Subscriber<? super Document> subscriber) {
Flux.fromIterable(documents).subscribe(subscriber);
}
}
Loading