Skip to content

Commit

Permalink
Support aggregation with $merge as a string (#768)
Browse files Browse the repository at this point in the history
  • Loading branch information
jyemin committed Jul 29, 2021
1 parent 240e712 commit 05fdd7e
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -214,13 +214,19 @@ private MongoNamespace getOutNamespace() {
+ "is not a string or namespace document");
}
} else if (lastPipelineStage.containsKey("$merge")) {
BsonDocument mergeDocument = lastPipelineStage.getDocument("$merge");
if (mergeDocument.isDocument("into")) {
BsonDocument intoDocument = mergeDocument.getDocument("into");
return new MongoNamespace(intoDocument.getString("db", new BsonString(databaseName)).getValue(),
intoDocument.getString("coll").getValue());
} else if (mergeDocument.isString("into")) {
return new MongoNamespace(databaseName, mergeDocument.getString("into").getValue());
if (lastPipelineStage.isString("$merge")) {
return new MongoNamespace(databaseName, lastPipelineStage.getString("$merge").getValue());
} else if (lastPipelineStage.isDocument("$merge")) {
BsonDocument mergeDocument = lastPipelineStage.getDocument("$merge");
if (mergeDocument.isDocument("into")) {
BsonDocument intoDocument = mergeDocument.getDocument("into");
return new MongoNamespace(intoDocument.getString("db", new BsonString(databaseName)).getValue(),
intoDocument.getString("coll").getValue());
} else if (mergeDocument.isString("into")) {
return new MongoNamespace(databaseName, mergeDocument.getString("into").getValue());
}
} else {
throw new IllegalStateException("Cannot return a cursor when the value for $merge stage is not a string or a document");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,9 +215,9 @@ void shouldBuildTheExpectedOperationsForDollarOutAsDocument() {
assertOperationIsTheSameAs(expectedOperation, executor.getWriteOperation());
}

@DisplayName("Should build the expected AggregateOperation for $merge")
@DisplayName("Should build the expected AggregateOperation for $merge document")
@Test
void shouldBuildTheExpectedOperationsForDollarMerge() {
void shouldBuildTheExpectedOperationsForDollarMergeDocument() {
String collectionName = "collectionName";
List<BsonDocument> pipeline = asList(BsonDocument.parse("{'$match': 1}"),
BsonDocument.parse(format("{'$merge': {into: '%s'}}", collectionName)));
Expand Down Expand Up @@ -293,6 +293,38 @@ void shouldBuildTheExpectedOperationsForDollarMerge() {
assertOperationIsTheSameAs(expectedOperation, executor.getWriteOperation());
}

@DisplayName("Should build the expected AggregateOperation for $merge string")
@Test
void shouldBuildTheExpectedOperationsForDollarMergeString() {
String collectionName = "collectionName";
MongoNamespace collectionNamespace = new MongoNamespace(NAMESPACE.getDatabaseName(), collectionName);
List<BsonDocument> pipeline = asList(BsonDocument.parse("{'$match': 1}"),
BsonDocument.parse(format("{'$merge': '%s'}", collectionName)));

TestOperationExecutor executor = createOperationExecutor(asList(getBatchCursor(), getBatchCursor(), getBatchCursor(), null));
AggregatePublisher<Document> publisher =
new AggregatePublisherImpl<>(null, createMongoOperationPublisher(executor), pipeline, AggregationLevel.COLLECTION);

AggregateToCollectionOperation expectedOperation = new AggregateToCollectionOperation(NAMESPACE, pipeline,
ReadConcern.DEFAULT,
WriteConcern.ACKNOWLEDGED);

// default input should be as expected
Flux.from(publisher).blockFirst();

WriteOperationThenCursorReadOperation operation = (WriteOperationThenCursorReadOperation) executor.getReadOperation();
assertEquals(ReadPreference.primary(), executor.getReadPreference());
assertOperationIsTheSameAs(expectedOperation, operation.getAggregateToCollectionOperation());

FindOperation<Document> expectedFindOperation =
new FindOperation<>(collectionNamespace, getDefaultCodecRegistry().get(Document.class))
.filter(new BsonDocument())
.batchSize(Integer.MAX_VALUE)
.retryReads(true);

assertOperationIsTheSameAs(expectedFindOperation, operation.getReadOperation());
}

@DisplayName("Should handle error scenarios")
@Test
void shouldHandleErrorScenarios() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,13 +237,19 @@ private MongoNamespace getOutNamespace() {
+ "is not a string or namespace document");
}
} else if (lastPipelineStage.containsKey("$merge")) {
BsonDocument mergeDocument = lastPipelineStage.getDocument("$merge");
if (mergeDocument.isDocument("into")) {
BsonDocument intoDocument = mergeDocument.getDocument("into");
return new MongoNamespace(intoDocument.getString("db", new BsonString(namespace.getDatabaseName())).getValue(),
intoDocument.getString("coll").getValue());
} else if (mergeDocument.isString("into")) {
return new MongoNamespace(namespace.getDatabaseName(), mergeDocument.getString("into").getValue());
if (lastPipelineStage.isString("$merge")) {
return new MongoNamespace(namespace.getDatabaseName(), lastPipelineStage.getString("$merge").getValue());
} else if (lastPipelineStage.isDocument("$merge")) {
BsonDocument mergeDocument = lastPipelineStage.getDocument("$merge");
if (mergeDocument.isDocument("into")) {
BsonDocument intoDocument = mergeDocument.getDocument("into");
return new MongoNamespace(intoDocument.getString("db", new BsonString(namespace.getDatabaseName())).getValue(),
intoDocument.getString("coll").getValue());
} else if (mergeDocument.isString("into")) {
return new MongoNamespace(namespace.getDatabaseName(), mergeDocument.getString("into").getValue());
}
} else {
throw new IllegalStateException("Cannot return a cursor when the value for $merge stage is not a string or a document");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ class AggregateIterableSpecification extends Specification {
.comment('this is a comment'))
}

def 'should build the expected AggregateToCollectionOperation for $merge'() {
def 'should build the expected AggregateToCollectionOperation for $merge document'() {
given:
def executor = new TestOperationExecutor([null, null, null, null, null, null, null])
def collectionName = 'collectionName'
Expand Down Expand Up @@ -331,6 +331,31 @@ class AggregateIterableSpecification extends Specification {
.comment('this is a comment'))
}

def 'should build the expected AggregateToCollectionOperation for $merge string'() {
given:
def executor = new TestOperationExecutor([null, null, null, null, null, null, null])
def collectionName = 'collectionName'
def collectionNamespace = new MongoNamespace(namespace.getDatabaseName(), collectionName)
def pipeline = [new BsonDocument('$match', new BsonDocument()), new BsonDocument('$merge', new BsonString(collectionName))]

when:
new AggregateIterableImpl(null, namespace, Document, Document, codecRegistry, readPreference, readConcern, writeConcern, executor,
pipeline, AggregationLevel.COLLECTION, false)
.iterator()

def operation = executor.getWriteOperation() as AggregateToCollectionOperation

then:
expect operation, isTheSameAs(new AggregateToCollectionOperation(namespace, pipeline, readConcern, writeConcern,
AggregationLevel.COLLECTION))

when:
operation = executor.getReadOperation() as FindOperation<Document>

then:
operation.getNamespace() == collectionNamespace
}

def 'should build the expected AggregateToCollectionOperation for $out as a document'() {
given:
def cannedResults = [new Document('_id', 1), new Document('_id', 2), new Document('_id', 3)]
Expand Down

0 comments on commit 05fdd7e

Please sign in to comment.