Skip to content

Commit

Permalink
More HTTP tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Nitesh Kant committed May 8, 2015
1 parent 2b7f88a commit 9da1977
Show file tree
Hide file tree
Showing 10 changed files with 1,731 additions and 103 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,7 @@ public abstract <T extends TrailingHeaders> Observable<HttpClientResponse<O>> wr
* #newUpdater()} if you intend to do multiple mutations to this request, to avoid creating unused intermediary
* {@link HttpClientRequest} objects.
*/
public abstract HttpClientRequest<I, O> addHeader(CharSequence name, Iterable<Object> values);
public abstract HttpClientRequest<I, O> addHeaderValues(CharSequence name, Iterable<Object> values);

/**
* Overwrites the current value, if any, of the passed header to the passed date value for this request. The date is
Expand Down Expand Up @@ -429,7 +429,7 @@ public abstract <T extends TrailingHeaders> Observable<HttpClientResponse<O>> wr
* #newUpdater()} if you intend to do multiple mutations to this request, to avoid creating unused intermediary
* {@link HttpClientRequest} objects.
*/
public abstract HttpClientRequest<I, O> setHeader(CharSequence name, Iterable<Object> values);
public abstract HttpClientRequest<I, O> setHeaderValues(CharSequence name, Iterable<Object> values);

/**
* Removes the passed header from this request.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,17 @@
*/
package io.reactivex.netty.protocol.http.clientNew;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.http.ClientCookieEncoder;
import io.netty.handler.codec.http.Cookie;
import io.netty.handler.codec.http.DefaultHttpRequest;
import io.netty.handler.codec.http.DefaultLastHttpContent;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.handler.logging.LogLevel;
import io.netty.util.concurrent.EventExecutorGroup;
import io.reactivex.netty.channel.Connection;
Expand All @@ -44,9 +45,6 @@
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;

/**
* @author Nitesh Kant
*/
public final class HttpClientRequestImpl<I, O> extends HttpClientRequest<I, O> {

private final Request request;
Expand Down Expand Up @@ -213,8 +211,8 @@ public HttpClientRequest<I, O> addDateHeader(CharSequence name, Iterable<Date> v
}

@Override
public HttpClientRequest<I, O> addHeader(CharSequence name, Iterable<Object> values) {
return new HttpClientRequestImpl<>(request.addHeader(name, values), client);
public HttpClientRequest<I, O> addHeaderValues(CharSequence name, Iterable<Object> values) {
return new HttpClientRequestImpl<>(request.addHeaderValues(name, values), client);
}

@Override
Expand All @@ -233,8 +231,8 @@ public HttpClientRequest<I, O> setDateHeader(CharSequence name, Iterable<Date> v
}

@Override
public HttpClientRequest<I, O> setHeader(CharSequence name, Iterable<Object> values) {
return new HttpClientRequestImpl<>(request.setHeader(name, values), client);
public HttpClientRequest<I, O> setHeaderValues(CharSequence name, Iterable<Object> values) {
return new HttpClientRequestImpl<>(request.setHeaderValues(name, values), client);
}

@Override
Expand Down Expand Up @@ -386,7 +384,15 @@ private Observable<HttpClientResponse<O>> _writeContentRaw(Observable rawContent
return new HttpClientRequestImpl<>(r, client);
}

private static class Request {
/*Visible for testing*/ Request getRawRequest() {
return request;
}

/*Visible for testing*/ TcpClient<?, HttpClientResponse<O>> getTcpClient() {
return client;
}

/*Visible for testing*/static class Request {

private final HttpRequest headers;
@SuppressWarnings("rawtypes")
Expand All @@ -408,7 +414,7 @@ public Request addHeader(CharSequence name, Object value) {
return create(headersCopy, content, hasTrailers);
}

public Request addHeader(CharSequence name, Iterable<Object> values) {
public Request addHeaderValues(CharSequence name, Iterable<Object> values) {
HttpRequest headersCopy = _copyHeaders();
headersCopy.headers().add(name, values);
return create(headersCopy, content, hasTrailers);
Expand Down Expand Up @@ -445,16 +451,22 @@ public Request setHeader(CharSequence name, Object value) {
return create(headersCopy, content, hasTrailers);
}

public Request setHeader(CharSequence name, Iterable<Date> values) {
public Request setHeaderValues(CharSequence name, Iterable<Object> values) {
HttpRequest headersCopy = _copyHeaders();
headersCopy.headers().set(name, values);
return create(headersCopy, content, hasTrailers);
}

public Request setDateHeader(CharSequence name, Iterable<Date> values) {
HttpRequest headersCopy = _copyHeaders();
boolean addNow = false;
for (Date value : values) {
HttpHeaders.setDateHeader(headersCopy, name, value);
if (addNow) {
HttpHeaders.addDateHeader(headersCopy, name, value);
} else {
HttpHeaders.setDateHeader(headersCopy, name, value);
addNow = true;
}
}
return create(headersCopy, content, hasTrailers);
}
Expand All @@ -478,20 +490,19 @@ public Request removeHeader(CharSequence name) {
}

@SuppressWarnings({"rawtypes", "unchecked"})
private Observable asObservable(Connection connection) {
/*visible for testing*/ Observable asObservable(Channel channel) {
Observable toReturn = Observable.just(headers);

if (null != content) {
if (null == flushSelector) {
toReturn = toReturn.concatWith(content);
} else {
toReturn = toReturn.concatWith(content.lift(new FlushSelectorOperator(flushSelector,
connection.getNettyChannel())));
toReturn = toReturn.concatWith(content.lift(new FlushSelectorOperator(flushSelector, channel)));
}
}

if (!hasTrailers) {
toReturn = toReturn.concatWith(Observable.just(new DefaultLastHttpContent()));
toReturn = toReturn.concatWith(Observable.just(LastHttpContent.EMPTY_LAST_CONTENT));
}

return toReturn;
Expand Down Expand Up @@ -522,6 +533,23 @@ private static Request create(HttpRequest headers, Observable content, Func1<?,
boolean hasTrailers) {
return new Request(headers, content, flushSelector, hasTrailers);
}

/*Visible for testing*/HttpRequest getHeaders() {
return headers;
}

@SuppressWarnings("rawtypes")
/*Visible for testing*/ Observable getContent() {
return content;
}

/*Visible for testing*/ Func1<?, Boolean> getFlushSelector() {
return flushSelector;
}

/*Visible for testing*/ boolean hasTrailers() {
return hasTrailers;
}
}

private static class OnSubscribeFuncImpl<O> implements OnSubscribe<HttpClientResponse<O>> {
Expand All @@ -542,7 +570,8 @@ public OnSubscribeFuncImpl(TcpClient<?, HttpClientResponse<O>> client, Request r
@SuppressWarnings("rawtypes")
@Override
public Observable<HttpClientResponse> call(Connection c) {
return c.write(OnSubscribeFuncImpl.this.request.asObservable(c))
return c.write(OnSubscribeFuncImpl.this.request
.asObservable(c.getNettyChannel()))
.ignoreElements()
.cast(HttpClientResponse.class)
.concatWith(c.getInput().take(1));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,6 @@

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpHeaders.Names;
import io.netty.handler.codec.http.HttpHeaders.Values;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.util.AttributeKey;
import io.reactivex.netty.client.ClientMetricsEvent;
Expand Down Expand Up @@ -49,21 +44,6 @@ public HttpClientToConnectionBridge(MetricEventsSubject<ClientMetricsEvent<?>> e
this.eventsSubject = eventsSubject;
}

@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
if (msg instanceof HttpRequest) {
HttpRequest request = (HttpRequest) msg;
if (!HttpHeaders.isContentLengthSet(request)) {
// If there is no content length we need to specify the transfer encoding as chunked as we always
// send data in multiple HttpContent.
// On the other hand, if someone wants to not have chunked encoding, adding content-length will work
// as expected.
request.headers().set(Names.TRANSFER_ENCODING, Values.CHUNKED);
}
}
super.write(ctx, msg, promise);
}

@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof ConnectionResueEvent) {
Expand All @@ -76,9 +56,14 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc
}

@Override
protected void closedBeforeReceiveComplete(ConnectionInputSubscriber connectionInputSubscriber) {
connectionInputSubscriber.getChannel().attr(DISCARD_CONNECTION).set(true);
super.closedBeforeReceiveComplete(connectionInputSubscriber);
protected void onClosedBeforeReceiveComplete(ConnectionInputSubscriber connectionInputSubscriber) {
if (connectionInputSubscriber.getChannel().isActive()) {
/*
* If the close is triggerred by the user, the channel will be active.
* If the response, isn't complete, then the connection can not be used.
*/
connectionInputSubscriber.getChannel().attr(DISCARD_CONNECTION).set(true);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,13 @@
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpHeaders.Names;
import io.netty.handler.codec.http.HttpHeaders.Values;
import io.netty.handler.codec.http.HttpMessage;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.internal.EmptyArrays;
import io.reactivex.netty.metrics.Clock;
import io.reactivex.netty.protocol.http.TrailingHeaders;
import io.reactivex.netty.protocol.tcp.ConnectionInputSubscriberEvent;
Expand All @@ -36,7 +41,7 @@
import rx.functions.Action0;
import rx.subscriptions.Subscriptions;

import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

public abstract class AbstractHttpConnectionBridge<C> extends ChannelDuplexHandler {
Expand All @@ -57,6 +62,18 @@ public abstract class AbstractHttpConnectionBridge<C> extends ChannelDuplexHandl
private static final IllegalStateException TRAILER_ARRIVED_WITH_NO_SUB =
new IllegalStateException("HTTP trailing headers received but no subscriber was registered.");

private static final ClosedChannelException CLOSED_CHANNEL_EXCEPTION = new ClosedChannelException();

static {
ONLY_ONE_CONTENT_INPUT_SUB_ALLOWED.setStackTrace(EmptyArrays.EMPTY_STACK_TRACE);
LAZY_CONTENT_INPUT_SUB.setStackTrace(EmptyArrays.EMPTY_STACK_TRACE);
CONTENT_ARRIVED_WITH_NO_SUB.setStackTrace(EmptyArrays.EMPTY_STACK_TRACE);
ONLY_ONE_TRAILER_INPUT_SUB_ALLOWED.setStackTrace(EmptyArrays.EMPTY_STACK_TRACE);
LAZY_TRAILER_SUB.setStackTrace(EmptyArrays.EMPTY_STACK_TRACE);
TRAILER_ARRIVED_WITH_NO_SUB.setStackTrace(EmptyArrays.EMPTY_STACK_TRACE);
CLOSED_CHANNEL_EXCEPTION.setStackTrace(EmptyArrays.EMPTY_STACK_TRACE);
}

protected ConnectionInputSubscriber connectionInputSubscriber;
private final UnsafeEmptySubscriber<C> emptyContentSubscriber;
private final UnsafeEmptySubscriber<TrailingHeaders> emptyTrailerSubscriber;
Expand All @@ -70,7 +87,16 @@ protected AbstractHttpConnectionBridge() {
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
Object msgToWrite = msg;

if (msg instanceof String) {
if (isHeaderMessage(msg)) {
HttpMessage httpMsg = (HttpMessage) msg;
if (!HttpHeaders.isContentLengthSet(httpMsg)) {
// If there is no content length we need to specify the transfer encoding as chunked as we always
// send data in multiple HttpContent.
// On the other hand, if someone wants to not have chunked encoding, adding content-length will work
// as expected.
httpMsg.headers().set(Names.TRANSFER_ENCODING, Values.CHUNKED);
}
} else if (msg instanceof String) {
msgToWrite = ctx.alloc().buffer().writeBytes(((String) msg).getBytes());
} else if (msg instanceof byte[]) {
msgToWrite = ctx.alloc().buffer().writeBytes((byte[]) msg);
Expand Down Expand Up @@ -110,21 +136,16 @@ public void operationComplete(ChannelFuture future) throws Exception {
super.userEventTriggered(ctx, eventToPropagateFurther);
}

protected void onChannelClose(ConnectionInputSubscriber connectionInputSubscriber) {
if (connectionInputSubscriber.state.startButNotCompleted()) {
closedBeforeReceiveComplete(connectionInputSubscriber);
}
protected final void onChannelClose(ConnectionInputSubscriber connectionInputSubscriber) {
/*
* If any of the subscribers(header, content, trailer) are still subscribed and the channel is closed, it is an
* error. If they are unsubscribed, this will be a no-op.
*/
connectionInputSubscriber.onError(CLOSED_CHANNEL_EXCEPTION);
}

protected void closedBeforeReceiveComplete(ConnectionInputSubscriber connectionInputSubscriber) {
if (isValidToEmit(connectionInputSubscriber.state.contentSub)) {
connectionInputSubscriber.state.contentSub
.onError(new IOException("Connection closed/released before receiving entire HTTP message."));
}
if (isValidToEmit(connectionInputSubscriber.state.trailerSub)) {
connectionInputSubscriber.state.trailerSub
.onError(new IOException("Connection closed/released before receiving entire HTTP message."));
}
protected void onClosedBeforeReceiveComplete(ConnectionInputSubscriber connectionInputSubscriber) {
// No Op. Override to add behavior
}

protected void resetSubscriptionState(final ConnectionInputSubscriber connectionInputSubscriber) {
Expand All @@ -147,6 +168,7 @@ private void processNextItemInEventloop(Object nextItem, ConnectionInputSubscrib
state.headerReceived();
Object newHttpObject = newHttpObject(nextItem, channel);
connectionInputSubscriber.nextHeader(newHttpObject);
/*Why not complete the header sub? It may be listening to multiple responses (pipelining)*/
checkEagerSubscriptionIfConfigured(channel, state);
}

Expand Down Expand Up @@ -352,18 +374,18 @@ public void onNext(T o) {
}
}

private static class TrailerProducer implements Producer {
/*Visible for testing*/static class TrailerProducer implements Producer {

private final Producer connInputProducer;
private final Producer delegate;
@SuppressWarnings("unused")
private volatile int requestedUp; /*Updated and used via the updater*/
/*Updater for requested*/
private static final AtomicIntegerFieldUpdater<TrailerProducer>
REQUESTED_UP_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(TrailerProducer.class, "requestedUp");

private TrailerProducer(Producer connInputProducer) {
this.connInputProducer = connInputProducer;
private TrailerProducer(Producer delegate) {
this.delegate = delegate;
}

@Override
Expand All @@ -373,9 +395,13 @@ public void request(long n) {
* Since, the trailer will always be 1 for a http message, this just makes sure the trailer
* subscriber never requests more than 1
*/
connInputProducer.request(1);
delegate.request(1);
}
}

/*Visible for testing*/ Producer getDelegateProducer() {
return delegate;
}
}

protected class ConnectionInputSubscriber extends Subscriber<Object> {
Expand All @@ -391,7 +417,7 @@ public ConnectionInputSubscriber(@SuppressWarnings("rawtypes") ConnectionInputSu
@Override
public void call() {
if (!state.receiveStarted()) {
unsubscribe(); // If the receive has not yet started, unsubscribe from input.
unsubscribe(); // If the receive has not yet started, unsubscribe from input, which closes connection
}
}
}));
Expand All @@ -402,9 +428,10 @@ public void call() {
public void onCompleted() {
// This means channel input has completed
if (state.startButNotCompleted()) {
closedBeforeReceiveComplete(this);
onError(CLOSED_CHANNEL_EXCEPTION);
} else {
completeAllSubs();
}
completeAllSubs();
}

@Override
Expand All @@ -413,7 +440,7 @@ public void onError(Throwable e) {
errorAllSubs(e);

if (state.startButNotCompleted()) {
closedBeforeReceiveComplete(this);
onClosedBeforeReceiveComplete(this);
}
}

Expand Down
Loading

0 comments on commit 9da1977

Please sign in to comment.