Skip to content

Commit

Permalink
okhttp: Remove RPCs-before-ready tests
Browse files Browse the repository at this point in the history
In the olden days, before LB policies, transports had to accept RPCs as
soon as they were created. This hasn't been true for a very long time,
so remove the tests.

Since a978c9e we're using real, legit code flows in the tests. This
allowed TSAN to discover that `attributes` is racy when read when
creating a new stream before the transport is ready. We could use a lock
or volatile, but the value of the attributes would still be incorrect
for any RPCs that are created before the transport is ready.

Since there's now only one test that delays the connection, I inline the
support code.
  • Loading branch information
ejona86 authored Apr 7, 2022
1 parent 5351fb9 commit 054cb49
Showing 1 changed file with 8 additions and 89 deletions.
97 changes: 8 additions & 89 deletions okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.verifyNoMoreInteractions;

import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
Expand Down Expand Up @@ -171,7 +170,6 @@ public class OkHttpClientTransportTest {
private ExecutorService executor = Executors.newCachedThreadPool();
private long nanoTime; // backs a ticker, for testing ping round-trip time measurement
private SettableFuture<Void> connectedFuture;
private DelayConnectedCallback delayConnectedCallback;
private Runnable tooManyPingsRunnable = new Runnable() {
@Override public void run() {
throw new AssertionError();
Expand Down Expand Up @@ -204,15 +202,6 @@ private void initTransport(int startId) throws Exception {
startTransport(startId, null, true, null);
}

private void initTransportAndDelayConnected() throws Exception {
delayConnectedCallback = new DelayConnectedCallback();
startTransport(
DEFAULT_START_STREAM_ID,
delayConnectedCallback,
false,
null);
}

private void startTransport(int startId, @Nullable Runnable connectingCallback,
boolean waitingForConnected, String userAgent)
throws Exception {
Expand Down Expand Up @@ -1681,70 +1670,17 @@ public void ping_failsIfTransportFails() throws Exception {
shutdownAndVerify();
}

@Test
public void writeBeforeConnected() throws Exception {
initTransportAndDelayConnected();
reset(frameWriter);
final String message = "Hello Server";
MockStreamListener listener = new MockStreamListener();
OkHttpClientStream stream =
clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers);
stream.start(listener);
InputStream input = new ByteArrayInputStream(message.getBytes(UTF_8));
stream.writeMessage(input);
stream.flush();
// The message should be queued.
verifyNoMoreInteractions(frameWriter);

allowTransportConnected();

// The queued message should be sent out.
verify(frameWriter, timeout(TIME_OUT_MS))
.data(eq(false), eq(3), any(Buffer.class), eq(12 + HEADER_LENGTH));
Buffer sentFrame = capturedBuffer.poll();
assertEquals(createMessageFrame(message), sentFrame);
stream.cancel(Status.CANCELLED);
shutdownAndVerify();
}

@Test
public void cancelBeforeConnected() throws Exception {
initTransportAndDelayConnected();
reset(frameWriter);
final String message = "Hello Server";
MockStreamListener listener = new MockStreamListener();
OkHttpClientStream stream =
clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers);
stream.start(listener);
InputStream input = new ByteArrayInputStream(message.getBytes(UTF_8));
stream.writeMessage(input);
stream.flush();
stream.cancel(Status.CANCELLED);
verifyNoMoreInteractions(frameWriter);

allowTransportConnected();
verifyNoMoreInteractions(frameWriter);
shutdownAndVerify();
}

@Test
public void shutdownDuringConnecting() throws Exception {
initTransportAndDelayConnected();
MockStreamListener listener = new MockStreamListener();
OkHttpClientStream stream =
clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT, tracers);
stream.start(listener);
SettableFuture<Void> delayed = SettableFuture.create();
Runnable connectingCallback = () -> Futures.getUnchecked(delayed);
startTransport(
DEFAULT_START_STREAM_ID,
connectingCallback,
false,
null);
clientTransport.shutdown(SHUTDOWN_REASON);
allowTransportConnected();

// The new stream should be failed, but not the pending stream.
assertNewStreamFail();
verify(frameWriter, timeout(TIME_OUT_MS))
.synStream(anyBoolean(), anyBoolean(), eq(3), anyInt(), anyListHeader());
assertEquals(1, activeStreamCount());
stream.cancel(Status.CANCELLED);
listener.waitUntilStreamClosed();
assertEquals(Status.CANCELLED.getCode(), listener.status.getCode());
delayed.set(null);
shutdownAndVerify();
}

Expand Down Expand Up @@ -2375,10 +2311,6 @@ public void onFailure(Throwable cause) {
}
}

private void allowTransportConnected() {
delayConnectedCallback.allowConnected();
}

private void shutdownAndVerify() {
clientTransport.shutdown(SHUTDOWN_REASON);
assertEquals(0, activeStreamCount());
Expand All @@ -2390,19 +2322,6 @@ private void shutdownAndVerify() {
frameReader.assertClosed();
}

private static class DelayConnectedCallback implements Runnable {
SettableFuture<Void> delayed = SettableFuture.create();

@Override
public void run() {
Futures.getUnchecked(delayed);
}

void allowConnected() {
delayed.set(null);
}
}

private static TransportStats getTransportStats(InternalInstrumented<SocketStats> obj)
throws ExecutionException, InterruptedException {
return obj.getStats().get().data;
Expand Down

0 comments on commit 054cb49

Please sign in to comment.