Skip to content

Commit

Permalink
Adding HTTP tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Nitesh Kant committed May 7, 2015
1 parent e71e1b7 commit 2b7f88a
Show file tree
Hide file tree
Showing 47 changed files with 1,272 additions and 512 deletions.
16 changes: 14 additions & 2 deletions rxnetty/src/main/java/io/reactivex/netty/channel/Connection.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,13 @@
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelOption;
import io.netty.util.ReferenceCountUtil;
import io.reactivex.netty.metrics.MetricEventsSubject;
import io.reactivex.netty.protocol.tcp.ConnectionInputSubscriberEvent;
import rx.Observable;
import rx.Observable.OnSubscribe;
import rx.Subscriber;
import rx.functions.Func1;

/**
* An abstraction over netty's channel providing Rx APIs.
Expand Down Expand Up @@ -88,9 +91,18 @@ public void call(Subscriber<? super R> subscriber) {
* Unless, {@link ChannelOption#AUTO_READ} is set to {@code true}, the content will only be read from the
* underneath channel, if there is a subscriber to the input. So, upon recieving this connection, either one should
* call this method or eventually subscribe to the stream returned by {@link #getInput()}
*
* @return An {@link Observable}, subscription to which will discard the input. This {@code Observable} will
* error/complete when the input errors/completes and unsubscription from here will unsubscribe from the content.
*/
public void ignoreInput() {
nettyChannel.pipeline().fireUserEventTriggered(ConnectionInputSubscriberEvent.discardAllInput(this));
public Observable<Void> ignoreInput() {
return getInput().map(new Func1<R, Void>() {
@Override
public Void call(R r) {
ReferenceCountUtil.release(r);
return null;
}
}).ignoreElements();
}

public Channel getNettyChannel() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,12 @@ public DetachedChannelPipeline configure(Action1<ChannelPipeline> configurator)
return this;
}

public void copyTo(ChannelPipeline pipeline) {
synchronized (holdersInOrder) {
unguardedCopyToPipeline(pipeline);
}
}

/*Visible for testing*/ LinkedList<HandlerHolder> getHoldersInOrder() {
return holdersInOrder;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2014 Netflix, Inc.
* Copyright 2015 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -28,8 +28,6 @@

/**
* A holder of cookies parsed from the Http headers.
*
* @author Nitesh Kant
*/
public class CookiesHolder {

Expand Down Expand Up @@ -65,10 +63,10 @@ private synchronized Map<String, Set<Cookie>> _parseIfNeededAndGet() {
for (String aCookieHeader : allCookieHeaders) {
Set<Cookie> decode = CookieDecoder.decode(aCookieHeader);
for (Cookie cookie : decode) {
Set<Cookie> existingCookiesOfName = cookies.get(cookie.getName());
Set<Cookie> existingCookiesOfName = cookies.get(cookie.name());
if (null == existingCookiesOfName) {
existingCookiesOfName = new HashSet<Cookie>();
cookies.put(cookie.getName(), existingCookiesOfName);
cookies.put(cookie.name(), existingCookiesOfName);
}
existingCookiesOfName.add(cookie);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,6 @@

import java.util.concurrent.TimeUnit;

/**
* @author Nitesh Kant
*/
public class HttpClientImpl<I, O> extends HttpClient<I, O> {

private final TcpClient<?, HttpClientResponse<O>> client;
Expand Down Expand Up @@ -225,6 +222,17 @@ public void call(ChannelPipeline pipeline) {
}));
}

public static HttpClient<ByteBuf, ByteBuf> unsafeCreate(TcpClient<ByteBuf, ByteBuf> tcpClient) {
return new HttpClientImpl<>(
tcpClient.<Object, HttpClientResponse<ByteBuf>>pipelineConfigurator(new Action1<ChannelPipeline>() {
@Override
public void call(ChannelPipeline pipeline) {
// TODO: Fix events subject
pipeline.addLast(new HttpClientToConnectionBridge<>(new MetricEventsSubject<ClientMetricsEvent<?>>()));
}
}));
}

@SuppressWarnings("unchecked")
private static <OO> TcpClient<?, HttpClientResponse<OO>> castClient(TcpClient<?, ?> rawTypes) {
return (TcpClient<?, HttpClientResponse<OO>>) rawTypes;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -638,6 +638,7 @@ public abstract <II, OO> HttpClientRequest<II, OO> addChannelHandlerAfter(EventE
*
* @return A new instance of {@link HttpClientRequestUpdater}
*/
@Experimental
public abstract HttpClientRequestUpdater<I, O> newUpdater();

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -556,7 +556,7 @@ public Observable<HttpClientResponse> call(Connection c) {
public void call(Subscriber<? super HttpClientResponse<O>> subscriber) {
@SuppressWarnings("rawtypes")
final Subscriber rawSub = subscriber;
source.subscribe(rawSub);
source.unsafeSubscribe(rawSub);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2014 Netflix, Inc.
* Copyright 2015 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -22,6 +22,7 @@
import io.netty.handler.codec.http.HttpVersion;
import io.netty.util.concurrent.EventExecutorGroup;
import rx.Observable;
import rx.annotations.Experimental;
import rx.functions.Action1;

import java.util.Date;
Expand All @@ -37,6 +38,7 @@
*
* @author Nitesh Kant
*/
@Experimental
public abstract class HttpClientRequestUpdater<I, O> implements HttpClientRequestOperations<I, HttpClientRequestUpdater<I, O>> {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,14 @@ public abstract class HttpClientResponse<T> {
*/
public abstract Observable<T> getContent();

/**
* Marks the content to be discarded. This means that the content can not be read from this response from now.
*
* @return An {@link Observable}, subscription to which will discard the content. This {@code Observable} will
* error/complete when the content errors/completes and unsubscription from here will unsubscribe from the content.
*/
public abstract Observable<Void> discardContent();

public String toString() {
StringBuilder builder = new StringBuilder();
builder.append(getHttpVersion().text())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,15 @@
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.util.ReferenceCountUtil;
import io.reactivex.netty.protocol.http.CookiesHolder;
import io.reactivex.netty.protocol.http.internal.HttpContentSubscriberEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Observable.OnSubscribe;
import rx.Subscriber;
import rx.functions.Func1;

import java.text.ParseException;
import java.util.Date;
Expand Down Expand Up @@ -238,6 +240,17 @@ public void call(Subscriber<? super T> subscriber) {
});
}

@Override
public Observable<Void> discardContent() {
return getContent().map(new Func1<T, Void>() {
@Override
public Void call(T t) {
ReferenceCountUtil.release(t);
return null;
}
}).ignoreElements();
}

/**
* Parses the timeout value from the HTTP keep alive header (with name {@link #KEEP_ALIVE_HEADER_NAME}) as described in
* <a href="http://tools.ietf.org/id/draft-thomson-hybi-http-timeout-01.html">this spec</a>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.util.AttributeKey;
import io.reactivex.netty.channel.ClientConnectionToChannelBridge.ConnectionResueEvent;
import io.reactivex.netty.channel.ClientConnectionToChannelBridge.PooledConnectionReleaseEvent;
import io.reactivex.netty.client.ClientMetricsEvent;
import io.reactivex.netty.metrics.Clock;
import io.reactivex.netty.metrics.MetricEventsSubject;
import io.reactivex.netty.protocol.http.client.HttpClientMetricsEvent;
import io.reactivex.netty.protocol.http.internal.AbstractHttpConnectionBridge;
import io.reactivex.netty.protocol.tcp.client.ClientConnectionToChannelBridge.ConnectionResueEvent;
import io.reactivex.netty.protocol.tcp.client.ClientConnectionToChannelBridge.PooledConnectionReleaseEvent;

public class HttpClientToConnectionBridge<C> extends AbstractHttpConnectionBridge<C> {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.util.ReferenceCountUtil;
import io.reactivex.netty.channel.ConnectionInputSubscriberEvent;
import io.reactivex.netty.metrics.Clock;
import io.reactivex.netty.protocol.http.TrailingHeaders;
import io.reactivex.netty.protocol.tcp.ConnectionInputSubscriberEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Producer;
Expand Down Expand Up @@ -256,7 +256,7 @@ private static boolean isValidToEmit(Subscriber<?> subscriber) {
*/
protected static final class State {

private enum Stage {
/*Visible for testing*/enum Stage {
/*Strictly in the order in which the transitions would happen*/
Created,
HeaderReceived,
Expand Down Expand Up @@ -296,7 +296,35 @@ public boolean startButNotCompleted() {
public boolean receiveStarted() {
return stage.ordinal() > Stage.Created.ordinal();
}
}

/*Visible for testing*/Subscriber<?> getHeaderSub() {
return headerSub;
}

/*Visible for testing*/Subscriber<?> getContentSub() {
return contentSub;
}

/*Visible for testing*/Subscriber<? super TrailingHeaders> getTrailerSub() {
return trailerSub;
}

/*Visible for testing*/long getHeaderReceivedTimeMillis() {
return headerReceivedTimeMillis;
}

/*Visible for testing*/Stage getStage() {
return stage;
}

/*Visible for testing*/IllegalStateException getRaiseErrorOnInputSubscription() {
return raiseErrorOnInputSubscription;
}

/*Visible for testing*/IllegalStateException getRaiseErrorOnTrailerSubscription() {
return raiseErrorOnTrailerSubscription;
}
}

/**
* A subscriber that can be reused if and only if not wrapped in a {@link rx.observers.SafeSubscriber}.
Expand Down Expand Up @@ -370,11 +398,6 @@ public void call() {
channel = evt.getConnection().getNettyChannel();
}

@Override
public void onStart() {
request(1); // Looking for a single message. The content request comes from the actual subscriber.
}

@Override
public void onCompleted() {
// This means channel input has completed
Expand Down Expand Up @@ -519,5 +542,9 @@ private void contentArrivedWhenSubscriberNotValid() {
state.raiseErrorOnInputSubscription = CONTENT_ARRIVED_WITH_NO_SUB;
}
}

/*Visible for testing*/State getState() {
return state;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,7 @@
*/
package io.reactivex.netty.protocol.http.internal;

import io.netty.util.ReferenceCountUtil;
import rx.Subscriber;
import rx.functions.Action1;
import rx.observers.Subscribers;

public class HttpContentSubscriberEvent<T> {

Expand All @@ -31,13 +28,4 @@ public HttpContentSubscriberEvent(Subscriber<? super T> subscriber) {
public Subscriber<? super T> getSubscriber() {
return subscriber;
}

public static <T> HttpContentSubscriberEvent<T> discardAllInput() {
return new HttpContentSubscriberEvent<>(Subscribers.create(new Action1<T>() {
@Override
public void call(T msg) {
ReferenceCountUtil.release(msg);
}
}));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -401,9 +401,10 @@ public abstract class HttpServerRequest<T> {
*
* Unless the channel is set to auto-read, aA {@link RequestHandler} processing a request <em>must</em> make sure
* that it either discards or subscribes to the content. Failure to do so, will stall the connection and no other
* requests will arrive on that connection as nothing is read from the connection.
* requests will arrive on that connection as nothing is read from it.
*
* @return An {@link Observable}, subscription to which will discard the content.
* @return An {@link Observable}, subscription to which will discard the content. This {@code Observable} will
* error/complete when the content errors/completes and unsubscription from here will unsubscribe from the content.
*/
public abstract Observable<Void> discardContent();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,14 @@
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.util.ReferenceCountUtil;
import io.reactivex.netty.protocol.http.CookiesHolder;
import io.reactivex.netty.protocol.http.internal.HttpContentSubscriberEvent;
import io.reactivex.netty.protocol.http.server.UriInfoHolder;
import rx.Observable;
import rx.Observable.OnSubscribe;
import rx.Subscriber;
import rx.functions.Func1;

import java.text.ParseException;
import java.util.Date;
Expand Down Expand Up @@ -257,13 +259,12 @@ public void call(Subscriber<? super T> subscriber) {

@Override
public Observable<Void> discardContent() {
return Observable.create(new OnSubscribe<Void>() {
return getContent().map(new Func1<T, Void>() {
@Override
public void call(Subscriber<? super Void> subscriber) {
nettyChannel.pipeline()
.fireUserEventTriggered(HttpContentSubscriberEvent.discardAllInput());
subscriber.onCompleted();
public Void call(T t) {
ReferenceCountUtil.release(t);
return null;
}
});
}).ignoreElements();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.reactivex.netty.channel;
package io.reactivex.netty.protocol.tcp;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOption;
import io.netty.util.ReferenceCountUtil;
import io.reactivex.netty.channel.ChannelMetricEventProvider;
import io.reactivex.netty.channel.Connection;
import io.reactivex.netty.channel.ConnectionImpl;
import io.reactivex.netty.metrics.MetricEventsSubject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -29,15 +32,15 @@
import java.util.concurrent.atomic.AtomicLongFieldUpdater;

/**
* A bridge between a {@link Connection} instance and the associated {@link Channel}.
* A bridge between a {@link io.reactivex.netty.channel.Connection} instance and the associated {@link Channel}.
*
* All operations on {@link Connection} will pass through this bridge to an appropriate action on the {@link Channel}
* All operations on {@link io.reactivex.netty.channel.Connection} will pass through this bridge to an appropriate action on the {@link Channel}
*
* <h2>Lazy {@link Connection#getInput()} subscription</h2>
* <h2>Lazy {@link io.reactivex.netty.channel.Connection#getInput()} subscription</h2>
*
* Lazy subscriptions are allowed on {@link Connection#getInput()} if and only if the channel is configured to
* Lazy subscriptions are allowed on {@link io.reactivex.netty.channel.Connection#getInput()} if and only if the channel is configured to
* not read data automatically (i.e. {@link ChannelOption#AUTO_READ} is set to {@code false}). Otherwise,
* if {@link Connection#getInput()} is subscribed lazily, the subscriber always recieves an error. The content
* if {@link io.reactivex.netty.channel.Connection#getInput()} is subscribed lazily, the subscriber always recieves an error. The content
* in this case is disposed upon reading.
*
* @param <R> Type read from the connection held by this handler.
Expand Down
Loading

0 comments on commit 2b7f88a

Please sign in to comment.