Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ClosedChannelException while TcpClient reads from TcpServer #598

Open
JonasHallFnx opened this issue Sep 22, 2017 · 3 comments
Open

ClosedChannelException while TcpClient reads from TcpServer #598

JonasHallFnx opened this issue Sep 22, 2017 · 3 comments

Comments

@JonasHallFnx
Copy link
Contributor

I am getting java.nio.channels.ClosedChannelException in my TcpClient when the server closes the connection. I would expect the getInput() Observable to complete when the client notices a closed connection, rather than to return an error.

Consider the following test:

	@Test
	public void testTcpServerClient() {
		TcpServer<ByteBuf, ByteBuf> server = TcpServer.newServer(0)
				.start(c->c.writeStringAndFlushOnEach(just("one", "two")));

		TcpClient.newClient("127.0.0.1", server.getServerPort())
				.createConnectionRequest()
				.flatMap(Connection::getInput)
				.map(b->b.toString(Charset.defaultCharset()))
				.map(s->"Client recieved: "+s)
				.doOnNext(System.out::println)
				.toBlocking()
				.last();
	}

The test will output:

Client recieved: one
Client recieved: two

java.lang.RuntimeException: java.nio.channels.ClosedChannelException

Are my expectations wrong? How can I modify the test so that the client can read the input to the end? Examples in the repository uses take(x) which will unsubscribe after receiving the expected number of results, but how can I write an example with an arbitrary number of results?

@jamesgorman2
Copy link
Collaborator

Hi Jonas,

it looks like io.reactivex.netty.channel.AbstractConnectionToChannelBridge expects the client to terminate the connection. I've not dug super deep into it but the code where the exception is prepared is commented with:

/If the subscriber is still active, then it expects data but the channel is closed./

Here is an example that shows how to escape (and test) the exception. I've also added some fixes for bugs that are in your original code.

import io.netty.buffer.ByteBuf;
import io.reactivex.netty.channel.Connection;
import io.reactivex.netty.protocol.tcp.client.TcpClient;
import io.reactivex.netty.protocol.tcp.server.TcpServer;
import org.testng.annotations.Test;
import rx.Observable;
import rx.observables.StringObservable;
import rx.observers.TestSubscriber;

import java.nio.channels.ClosedChannelException;
import java.nio.charset.Charset;
import java.util.stream.Stream;

public class TcpTest {
  @Test
  public void testSinkClosedChannelException() throws Exception {
    int count = 100;
    TcpServer<ByteBuf, ByteBuf> server = TcpServer.newServer(0)
      .start(
        c ->
          c.writeString(
            Observable.range(0, count)
              .map(i -> i + ",")
          )
      );

    TestSubscriber<Integer> testSubscriber = new TestSubscriber<>();
    TcpClient.newClient("127.0.0.1", server.getServerPort())
      .createConnectionRequest()
      .flatMap(Connection::getInput)
      .map(
        b -> {
          // must free the ByteBuf
          String s = b.toString(Charset.defaultCharset());
          b.release();
          return s;
        }
      )
      // can't guarantee that the block boundaries will line up so we force split
      .compose(o -> StringObservable.split(o, ","))
      .map(Integer::parseInt)
      .onErrorResumeNext(
        e -> {
          if (e instanceof ClosedChannelException) {
            // sink ClosedChannelException to nothing
            return Observable.empty();
          }
          // 'rethrow' all other errors
          return Observable.error(e);
        }
      )
      .subscribe(testSubscriber);

    testSubscriber.awaitTerminalEvent();

    testSubscriber.assertValueCount(count);
    testSubscriber.assertValues(
      Stream.iterate(0, i -> i + 1).limit(count).toArray(Integer[]::new)
    );
    testSubscriber.assertNoErrors();
}

@JonasHallFnx
Copy link
Contributor Author

Thank you for a quick reply!

Catching the ClosedChannelException and returning empty will work, but I was more interested in the thoughts behind considering a connection closed by the server as an error. I tried to dig down in the commits for some clues, but there is just one large commit by @NiteshKant ( 9da1977#diff-cb86b4ce0a39c9fea0a3174ae12680feR99 ) where this was changed from onCompleted to onError.

I will try to understand why this does not happen when using HttpServer + HttpClient.

@jamesgorman2
Copy link
Collaborator

Yeah, beyond that comment I have no idea. Nothing struck me in the commit either. I had wondered if it is to signal client close vs server close (which would otherwise be lost) because Netflix had a socket client that needed to know this, but that's pure speculation.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants