From d92b15197958d61be70f265dbe272280dae761f8 Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Sat, 21 Sep 2013 21:26:49 -0700 Subject: [PATCH] Fixes to rxjava-apache-http - made Content-Type inspection more reliable - other small improvments --- .../java/rx/apache/http/ObservableHttp.java | 2 +- .../consumers/ResponseConsumerDelegate.java | 4 +- .../http/examples/ExampleObservableHttp.java | 69 +++++++++---------- 3 files changed, 38 insertions(+), 37 deletions(-) diff --git a/rxjava-contrib/rxjava-apache-http/src/main/java/rx/apache/http/ObservableHttp.java b/rxjava-contrib/rxjava-apache-http/src/main/java/rx/apache/http/ObservableHttp.java index c8d87c8334..7f34365eb4 100644 --- a/rxjava-contrib/rxjava-apache-http/src/main/java/rx/apache/http/ObservableHttp.java +++ b/rxjava-contrib/rxjava-apache-http/src/main/java/rx/apache/http/ObservableHttp.java @@ -143,7 +143,7 @@ public Subscription onSubscribe(final Observer o final CompositeSubscription parentSubscription = new CompositeSubscription(); // return a Subscription that wraps the Future so it can be cancelled - parentSubscription.add(Subscriptions.create(client.execute(requestProducer, new ResponseConsumerDelegate(observer, parentSubscription), + parentSubscription.add(Subscriptions.from(client.execute(requestProducer, new ResponseConsumerDelegate(observer, parentSubscription), new FutureCallback() { @Override diff --git a/rxjava-contrib/rxjava-apache-http/src/main/java/rx/apache/http/consumers/ResponseConsumerDelegate.java b/rxjava-contrib/rxjava-apache-http/src/main/java/rx/apache/http/consumers/ResponseConsumerDelegate.java index 3060d67bca..7eab30b441 100644 --- a/rxjava-contrib/rxjava-apache-http/src/main/java/rx/apache/http/consumers/ResponseConsumerDelegate.java +++ b/rxjava-contrib/rxjava-apache-http/src/main/java/rx/apache/http/consumers/ResponseConsumerDelegate.java @@ -52,7 +52,9 @@ public ResponseConsumerDelegate(final Observer o @Override protected void onResponseReceived(HttpResponse response) throws HttpException, IOException { // when we receive the response with headers we evaluate what type of consumer we want - if (response.getFirstHeader("Content-Type").getValue().equals("text/event-stream")) { + if (response.getFirstHeader("Content-Type").getValue().contains("text/event-stream")) { + // use 'contains' instead of equals since Content-Type can contain additional information + // such as charset ... see here: http://www.w3.org/International/O-HTTP-charset consumer = new ResponseConsumerEventStream(observer, subscription); } else { consumer = new ResponseConsumerBasic(observer, subscription); diff --git a/rxjava-contrib/rxjava-apache-http/src/test/java/rx/apache/http/examples/ExampleObservableHttp.java b/rxjava-contrib/rxjava-apache-http/src/test/java/rx/apache/http/examples/ExampleObservableHttp.java index 763916f861..3f396a3894 100644 --- a/rxjava-contrib/rxjava-apache-http/src/test/java/rx/apache/http/examples/ExampleObservableHttp.java +++ b/rxjava-contrib/rxjava-apache-http/src/test/java/rx/apache/http/examples/ExampleObservableHttp.java @@ -77,41 +77,40 @@ public void call(String resp) { protected static void executeStreamingViaObservableHttpWithForEach(final HttpAsyncClient client) throws URISyntaxException, IOException, InterruptedException { System.out.println("---- executeStreamingViaObservableHttpWithForEach"); - for (int i = 0; i < 5; i++) { - final int c = i + 1; - ObservableHttp.createRequest(HttpAsyncMethods.createGet("http://ec2-54-211-91-164.compute-1.amazonaws.com:8077/eventbus.stream?topic=hystrix-metrics"), client) - .toObservable() - .flatMap(new Func1>() { - - @Override - public Observable call(ObservableHttpResponse response) { - return response.getContent().map(new Func1() { - - @Override - public String call(byte[] bb) { - return new String(bb); - } - - }); - } - }) - .filter(new Func1() { - - @Override - public Boolean call(String t1) { - return !t1.startsWith(": ping"); - } - }) - .take(3) - .toBlockingObservable() - .forEach(new Action1() { - - @Override - public void call(String resp) { - System.out.println("Response [" + c + "]: " + resp + " (" + resp.length() + ")"); - } - }); - } + // URL against https://github.com/Netflix/Hystrix/tree/master/hystrix-examples-webapp + // More information at https://github.com/Netflix/Hystrix/tree/master/hystrix-contrib/hystrix-metrics-event-stream + ObservableHttp.createRequest(HttpAsyncMethods.createGet("http://localhost:8989/hystrix-examples-webapp/hystrix.stream"), client) + .toObservable() + .flatMap(new Func1>() { + + @Override + public Observable call(ObservableHttpResponse response) { + return response.getContent().map(new Func1() { + + @Override + public String call(byte[] bb) { + return new String(bb); + } + + }); + } + }) + .filter(new Func1() { + + @Override + public Boolean call(String t1) { + return !t1.startsWith(": ping"); + } + }) + .take(3) + .toBlockingObservable() + .forEach(new Action1() { + + @Override + public void call(String resp) { + System.out.println(resp); + } + }); } }