Skip to content

Commit

Permalink
Support calling QueryBatchCursor.close concurrently with other `Que…
Browse files Browse the repository at this point in the history
…ryBatchCursor` methods (#765)

JAVA-4183
  • Loading branch information
stIncMale authored and rozza committed Aug 3, 2021
1 parent 357ce8b commit bfd25dd
Show file tree
Hide file tree
Showing 22 changed files with 732 additions and 201 deletions.
11 changes: 11 additions & 0 deletions .evergreen/run-load-balancer-tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,22 @@ echo $first
--tests UnifiedTransactionsTest \
--tests InitialDnsSeedlistDiscoveryTest
second=$?
echo $second

./gradlew -PjdkHome=/opt/java/${JDK} \
-Dorg.mongodb.test.uri=${SINGLE_MONGOS_LB_URI} \
-Dorg.mongodb.test.transaction.uri=${MULTI_MONGOS_LB_URI} \
${GRADLE_EXTRA_VARS} --stacktrace --info --continue driver-core:test \
--tests QueryBatchCursorFunctionalSpecification
third=$?
echo $third

if [ $first -ne 0 ]; then
exit $first
elif [ $second -ne 0 ]; then
exit $second
elif [ $third -ne 0 ]; then
exit $third
else
exit 0
fi
14 changes: 13 additions & 1 deletion driver-core/src/main/com/mongodb/assertions/Assertions.java
Original file line number Diff line number Diff line change
Expand Up @@ -179,11 +179,23 @@ public static boolean assertFalse(final boolean value) throws AssertionError {

/**
* @throws AssertionError Always
* @return Never completes normally. The return type is {@link AssertionError} to allow writing {@code throw fail()}.
* This may be helpful in non-{@code void} methods.
*/
public static void fail() throws AssertionError {
public static AssertionError fail() throws AssertionError {
throw new AssertionError();
}

/**
* @param msg The failure message.
* @throws AssertionError Always
* @return Never completes normally. The return type is {@link AssertionError} to allow writing {@code throw fail("failure message")}.
* This may be helpful in non-{@code void} methods.
*/
public static AssertionError fail(final String msg) throws AssertionError {
throw new AssertionError(assertNotNull(msg));
}

private Assertions() {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,8 @@ public interface AsyncBatchCursor<T> extends Closeable {
* To help making such code simpler, this method is required to be idempotent.
* <p>
* Another quirk is that this method is allowed to release resources "eventually",
* i.e., not before (in the happens before order) returning.
* i.e., not before (in the happens-before order) returning.
* Nevertheless, {@link #isClosed()} called after (in the happens-before order) {@link #close()} must return {@code true}.
*
* @see #close()
*/
@Override
void close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.mongodb.connection.ServerDescription;
import com.mongodb.internal.connection.Connection;
import com.mongodb.internal.session.SessionContext;
import com.mongodb.lang.Nullable;

/**
* A source of connections to a single MongoDB server.
Expand All @@ -42,8 +43,10 @@ public interface ConnectionSource extends ReferenceCounted {
*
* @since 3.6
*/
@Nullable
SessionContext getSessionContext();

@Nullable
ServerApi getServerApi();

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,9 @@ private MongoTimeoutException createTimeoutException(final Timeout timeout) {
}
}


/**
* Is package-access for the purpose of testing and must not be used for any other purpose outside of this class.
*/
ConcurrentPool<UsageTrackingInternalConnection> getPool() {
return pool;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,9 @@ public void connect() {
serverMonitor.connect();
}

/**
* Is package-access for the purpose of testing and must not be used for any other purpose outside of this class.
*/
ConnectionPool getConnectionPool() {
return connectionPool;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,13 @@ public void getConnectionAsync(final SingleResultCallback<AsyncConnection> callb
});
}

/**
* Is package-access for the purpose of testing and must not be used for any other purpose outside of this class.
*/
ConnectionPool getConnectionPool() {
return connectionPool;
}

private class LoadBalancedServerProtocolExecutor implements ProtocolExecutor {
@SuppressWarnings("unchecked")
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.mongodb.ServerAddress;
import com.mongodb.ServerCursor;
import com.mongodb.annotations.NotThreadSafe;
import com.mongodb.lang.Nullable;

import java.io.Closeable;
import java.util.Iterator;
Expand All @@ -37,6 +38,15 @@
*/
@NotThreadSafe
public interface BatchCursor<T> extends Iterator<List<T>>, Closeable {
/**
* Despite this interface being {@linkplain NotThreadSafe non-thread-safe},
* {@link #close()} is allowed to be called concurrently with any method of the cursor, including itself.
* This is useful to cancel blocked {@link #hasNext()}, {@link #next()}.
* This method is idempotent.
* <p>
* Another quirk is that this method is allowed to release resources "eventually",
* i.e., not before (in the happens-before order) returning.
*/
@Override
void close();

Expand Down Expand Up @@ -85,6 +95,7 @@ public interface BatchCursor<T> extends Iterator<List<T>>, Closeable {
*
* @return ServerCursor
*/
@Nullable
ServerCursor getServerCursor();

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.mongodb.internal.operation;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import com.mongodb.MongoChangeStreamException;
import com.mongodb.MongoException;
Expand Down Expand Up @@ -44,7 +45,7 @@ final class ChangeStreamBatchCursor<T> implements AggregateResponseBatchCursor<T

private AggregateResponseBatchCursor<RawBsonDocument> wrapped;
private BsonDocument resumeToken;
private volatile boolean closed;
private final AtomicBoolean closed;

ChangeStreamBatchCursor(final ChangeStreamOperation<T> changeStreamOperation,
final AggregateResponseBatchCursor<RawBsonDocument> wrapped,
Expand All @@ -56,6 +57,7 @@ final class ChangeStreamBatchCursor<T> implements AggregateResponseBatchCursor<T
this.wrapped = wrapped;
this.resumeToken = resumeToken;
this.maxWireVersion = maxWireVersion;
closed = new AtomicBoolean();
}

AggregateResponseBatchCursor<RawBsonDocument> getWrapped() {
Expand Down Expand Up @@ -108,8 +110,7 @@ public List<T> apply(final AggregateResponseBatchCursor<RawBsonDocument> queryBa

@Override
public void close() {
if (!closed) {
closed = true;
if (!closed.getAndSet(true)) {
wrapped.close();
binding.release();
}
Expand Down
Loading

0 comments on commit bfd25dd

Please sign in to comment.