Skip to content

Commit

Permalink
CSHARP-1378: Make BulkWrite enumerate requests argument only once (#1298
Browse files Browse the repository at this point in the history
)
  • Loading branch information
sanych-sun authored Apr 2, 2024
1 parent 5f7fc33 commit 0f738fd
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 26 deletions.
22 changes: 11 additions & 11 deletions src/MongoDB.Driver/BulkWriteResult.cs
Original file line number Diff line number Diff line change
Expand Up @@ -104,10 +104,10 @@ public abstract class BulkWriteResult<TDocument> : BulkWriteResult
/// <param name="processedRequests">The processed requests.</param>
protected BulkWriteResult(
int requestCount,
IEnumerable<WriteModel<TDocument>> processedRequests)
IReadOnlyList<WriteModel<TDocument>> processedRequests)
: base(requestCount)
{
_processedRequests = processedRequests.ToList();
_processedRequests = processedRequests;
}

// public properties
Expand All @@ -130,16 +130,16 @@ internal static BulkWriteResult<TDocument> FromCore(Core.Operations.BulkWriteOpe
result.DeletedCount,
result.InsertedCount,
result.IsModifiedCountAvailable ? (long?)result.ModifiedCount : null,
result.ProcessedRequests.Select(r => WriteModel<TDocument>.FromCore(r)),
result.Upserts.Select(u => BulkWriteUpsert.FromCore(u)));
result.ProcessedRequests.Select(WriteModel<TDocument>.FromCore).ToArray(),
result.Upserts.Select(BulkWriteUpsert.FromCore));
}

return new Unacknowledged(
result.RequestCount,
result.ProcessedRequests.Select(r => WriteModel<TDocument>.FromCore(r)));
result.ProcessedRequests.Select(WriteModel<TDocument>.FromCore).ToArray());
}

internal static BulkWriteResult<TDocument> FromCore(Core.Operations.BulkWriteOperationResult result, IEnumerable<WriteModel<TDocument>> requests)
internal static BulkWriteResult<TDocument> FromCore(Core.Operations.BulkWriteOperationResult result, IReadOnlyList<WriteModel<TDocument>> requests)
{
if (result.IsAcknowledged)
{
Expand All @@ -150,7 +150,7 @@ internal static BulkWriteResult<TDocument> FromCore(Core.Operations.BulkWriteOpe
result.InsertedCount,
result.IsModifiedCountAvailable ? (long?)result.ModifiedCount : null,
requests,
result.Upserts.Select(u => BulkWriteUpsert.FromCore(u)));
result.Upserts.Select(BulkWriteUpsert.FromCore));
}

return new Unacknowledged(
Expand All @@ -174,7 +174,7 @@ public class Acknowledged : BulkWriteResult<TDocument>

// constructors
/// <summary>
/// Initializes a new instance of the <see cref="Acknowledged" /> class.
/// Initializes a new instance of the <see cref="BulkWriteResult{TDocument}.Acknowledged" /> class.
/// </summary>
/// <param name="requestCount">The request count.</param>
/// <param name="matchedCount">The matched count.</param>
Expand All @@ -189,7 +189,7 @@ public Acknowledged(
long deletedCount,
long insertedCount,
long? modifiedCount,
IEnumerable<WriteModel<TDocument>> processedRequests,
IReadOnlyList<WriteModel<TDocument>> processedRequests,
IEnumerable<BulkWriteUpsert> upserts)
: base(requestCount, processedRequests)
{
Expand Down Expand Up @@ -259,13 +259,13 @@ public class Unacknowledged : BulkWriteResult<TDocument>
{
// constructors
/// <summary>
/// Initializes a new instance of the <see cref="Unacknowledged"/> class.
/// Initializes a new instance of the <see cref="BulkWriteResult{TDocument}.Unacknowledged"/> class.
/// </summary>
/// <param name="requestCount">The request count.</param>
/// <param name="processedRequests">The processed requests.</param>
public Unacknowledged(
int requestCount,
IEnumerable<WriteModel<TDocument>> processedRequests)
IReadOnlyList<WriteModel<TDocument>> processedRequests)
: base(requestCount, processedRequests)
{
}
Expand Down
2 changes: 1 addition & 1 deletion src/MongoDB.Driver/MongoBulkWriteException.cs
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ internal static MongoBulkWriteException<TDocument> FromCore(MongoBulkWriteOperat

return new MongoBulkWriteException<TDocument>(
ex.ConnectionId,
BulkWriteResult<TDocument>.FromCore(ex.Result, processedRequests),
BulkWriteResult<TDocument>.FromCore(ex.Result, processedRequests.ToArray()),
ex.WriteErrors.Select(e => BulkWriteError.FromCore(e)),
WriteConcernError.FromCore(ex.WriteConcernError),
unprocessedRequests);
Expand Down
34 changes: 20 additions & 14 deletions src/MongoDB.Driver/MongoCollectionImpl.cs
Original file line number Diff line number Diff line change
Expand Up @@ -218,27 +218,30 @@ public override MongoCollectionSettings Settings
public override BulkWriteResult<TDocument> BulkWrite(IClientSessionHandle session, IEnumerable<WriteModel<TDocument>> requests, BulkWriteOptions options, CancellationToken cancellationToken = default(CancellationToken))
{
Ensure.IsNotNull(session, nameof(session));
Ensure.IsNotNull(requests, nameof(requests));
if (!requests.Any())
Ensure.IsNotNull((object)requests, nameof(requests));

var requestsArray = requests.ToArray();
if (requestsArray.Length == 0)
{
throw new ArgumentException("Must contain at least 1 request.", "requests");
throw new ArgumentException("Must contain at least 1 request.", nameof(requests));
}
foreach (var request in requests)

foreach (var request in requestsArray)
{
request.ThrowIfNotValid();
}

options = options ?? new BulkWriteOptions();

var operation = CreateBulkWriteOperation(session, requests, options);
var operation = CreateBulkWriteOperation(session, requestsArray, options);
try
{
var result = ExecuteWriteOperation(session, operation, cancellationToken);
return BulkWriteResult<TDocument>.FromCore(result, requests);
return BulkWriteResult<TDocument>.FromCore(result, requestsArray);
}
catch (MongoBulkWriteOperationException ex)
{
throw MongoBulkWriteException<TDocument>.FromCore(ex, requests.ToList());
throw MongoBulkWriteException<TDocument>.FromCore(ex, requestsArray);
}
}

Expand All @@ -250,27 +253,30 @@ public override MongoCollectionSettings Settings
public override async Task<BulkWriteResult<TDocument>> BulkWriteAsync(IClientSessionHandle session, IEnumerable<WriteModel<TDocument>> requests, BulkWriteOptions options, CancellationToken cancellationToken = default(CancellationToken))
{
Ensure.IsNotNull(session, nameof(session));
Ensure.IsNotNull(requests, nameof(requests));
if (!requests.Any())
Ensure.IsNotNull((object)requests, nameof(requests));

var requestsArray = requests.ToArray();
if (requestsArray.Length == 0)
{
throw new ArgumentException("Must contain at least 1 request.", "requests");
throw new ArgumentException("Must contain at least 1 request.", nameof(requests));
}
foreach (var request in requests)

foreach (var request in requestsArray)
{
request.ThrowIfNotValid();
}

options = options ?? new BulkWriteOptions();

var operation = CreateBulkWriteOperation(session, requests, options);
var operation = CreateBulkWriteOperation(session, requestsArray, options);
try
{
var result = await ExecuteWriteOperationAsync(session, operation, cancellationToken).ConfigureAwait(false);
return BulkWriteResult<TDocument>.FromCore(result, requests);
return BulkWriteResult<TDocument>.FromCore(result, requestsArray);
}
catch (MongoBulkWriteOperationException ex)
{
throw MongoBulkWriteException<TDocument>.FromCore(ex, requests.ToList());
throw MongoBulkWriteException<TDocument>.FromCore(ex, requestsArray);
}
}

Expand Down
33 changes: 33 additions & 0 deletions tests/MongoDB.Driver.Tests/MongoCollectionImplTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
using System.Linq.Expressions;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using FluentAssertions;
using MongoDB.Bson;
using MongoDB.Bson.Serialization;
Expand All @@ -31,6 +32,7 @@
using MongoDB.Driver.Core.Operations;
using MongoDB.Driver.Core.Servers;
using MongoDB.Driver.Core.TestHelpers.XunitExtensions;
using MongoDB.Driver.TestHelpers;
using MongoDB.Driver.Tests;
using Moq;
using Xunit;
Expand Down Expand Up @@ -442,6 +444,37 @@ public void AggregateToCollection_should_throw_when_last_stage_is_not_an_output_
exception.Should().BeOfType<InvalidOperationException>();
}

[Theory]
[ParameterAttributeData]
public async Task BulkWrite_should_enumerate_requests_once([Values(false, true)] bool async)
{
var subject = CreateSubject<BsonDocument>();
var document = new BsonDocument("_id", 1).Add("a", 1);
var requests = new WriteModel<BsonDocument>[]
{
new InsertOneModel<BsonDocument>(document)
};
var processedRequest = new InsertRequest(document) { CorrelationId = 0 };
var operationResult = new BulkWriteOperationResult.Acknowledged(
requestCount: 1,
matchedCount: 0,
deletedCount: 0,
insertedCount: 1,
modifiedCount: 0,
processedRequests: new[] { processedRequest },
upserts: new List<BulkWriteOperationUpsert>());
_operationExecutor.EnqueueResult<BulkWriteOperationResult>(operationResult);
var wrappedRequests = new Mock<IEnumerable<WriteModel<BsonDocument>>>();
wrappedRequests.Setup(e => e.GetEnumerator()).Returns(((IEnumerable<WriteModel<BsonDocument>>)requests).GetEnumerator());

var result = async ? await subject.BulkWriteAsync(wrappedRequests.Object) : subject.BulkWrite(wrappedRequests.Object);

wrappedRequests.Verify(e => e.GetEnumerator(), Times.Once);
result.Should().NotBeNull();
result.RequestCount.Should().Be(1);
result.ProcessedRequests.ShouldBeEquivalentTo(requests);
}

[Theory]
[ParameterAttributeData]
public void BulkWrite_should_execute_a_BulkMixedWriteOperation(
Expand Down

0 comments on commit 0f738fd

Please sign in to comment.