From 3efaa64065c48996c0c75e60787b5fbb5cf11210 Mon Sep 17 00:00:00 2001 From: George Campbell Date: Tue, 29 Oct 2013 00:59:54 -0700 Subject: [PATCH 1/4] Adding utility functions for observables of strings useful for processing non blocking IO. --- .../java/rx/observables/StringObservable.java | 255 ++++++++++++++++++ .../main/java/rx/util/AssertObservable.java | 139 ++++++++++ .../rx/observables/StringObservableTest.java | 115 ++++++++ .../java/rx/util/AssertObservableTest.java | 28 ++ 4 files changed, 537 insertions(+) create mode 100644 rxjava-core/src/main/java/rx/observables/StringObservable.java create mode 100644 rxjava-core/src/main/java/rx/util/AssertObservable.java create mode 100644 rxjava-core/src/test/java/rx/observables/StringObservableTest.java create mode 100644 rxjava-core/src/test/java/rx/util/AssertObservableTest.java diff --git a/rxjava-core/src/main/java/rx/observables/StringObservable.java b/rxjava-core/src/main/java/rx/observables/StringObservable.java new file mode 100644 index 0000000000..dfe7ca3e68 --- /dev/null +++ b/rxjava-core/src/main/java/rx/observables/StringObservable.java @@ -0,0 +1,255 @@ +package rx.observables; + +import java.nio.ByteBuffer; +import java.nio.CharBuffer; +import java.nio.charset.CharacterCodingException; +import java.nio.charset.Charset; +import java.nio.charset.CharsetDecoder; +import java.nio.charset.CharsetEncoder; +import java.nio.charset.CoderResult; +import java.nio.charset.CodingErrorAction; +import java.util.Arrays; +import java.util.regex.Pattern; + +import rx.Observable; +import rx.Observer; +import rx.Subscription; +import rx.Observable.OnSubscribeFunc; +import rx.util.functions.Func1; +import rx.util.functions.Func2; + +public class StringObservable { + /** + * Decodes a stream the multibyte chunks into a stream of strings that works on infinite streams and where handles when a multibyte character spans two chunks. + * + * @param src + * @param charsetName + * @return + */ + public static Observable decode(Observable src, String charsetName) { + return decode(src, Charset.forName(charsetName)); + } + + /** + * Decodes a stream the multibyte chunks into a stream of strings that works on infinite streams and where handles when a multibyte character spans two chunks. + * + * @param src + * @param charset + * @return + */ + public static Observable decode(Observable src, Charset charset) { + return decode(src, charset.newDecoder().onMalformedInput(CodingErrorAction.REPLACE).onUnmappableCharacter(CodingErrorAction.REPLACE)); + } + + /** + * Decodes a stream the multibyte chunks into a stream of strings that works on infinite streams and where handles when a multibyte character spans two chunks. + * This method allows for more control over how malformed and unmappable characters are handled. + * + * @param src + * @param charsetDecoder + * @return + */ + public static Observable decode(final Observable src, final CharsetDecoder charsetDecoder) { + return Observable.create(new OnSubscribeFunc() { + @Override + public Subscription onSubscribe(final Observer observer) { + return src.subscribe(new Observer() { + private ByteBuffer leftOver = null; + + @Override + public void onCompleted() { + if (process(null, leftOver, true)) + observer.onCompleted(); + } + + @Override + public void onError(Throwable e) { + if (process(null, leftOver, true)) + observer.onError(e); + } + + @Override + public void onNext(byte[] bytes) { + process(bytes, leftOver, false); + } + + public boolean process(byte[] next, ByteBuffer last, boolean endOfInput) { + ByteBuffer bb; + if (last != null) { + if (next != null) { + // merge leftover in front of the next bytes + bb = ByteBuffer.allocate(last.remaining() + next.length); + bb.put(last); + bb.put(next); + bb.flip(); + } + else { // next == null + bb = last; + } + } + else { // last == null + if (next != null) { + bb = ByteBuffer.wrap(next); + } + else { // next == null + return true; + } + } + + CharBuffer cb = CharBuffer.allocate((int) (bb.limit() * charsetDecoder.averageCharsPerByte())); + CoderResult cr = charsetDecoder.decode(bb, cb, endOfInput); + cb.flip(); + + if (cr.isError()) { + try { + cr.throwException(); + } + catch (CharacterCodingException e) { + observer.onError(e); + return false; + } + } + + if (bb.remaining() > 0) { + leftOver = bb; + } + else { + leftOver = null; + } + + String string = cb.toString(); + if (!string.isEmpty()) + observer.onNext(string); + + return true; + } + }); + } + }); + } + + /** + * Encodes a possible infinite stream of strings into a Observable of byte arrays. + * + * @param src + * @param charsetName + * @return + */ + public static Observable encode(Observable src, String charsetName) { + return encode(src, Charset.forName(charsetName)); + } + + /** + * Encodes a possible infinite stream of strings into a Observable of byte arrays. + * + * @param src + * @param charset + * @return + */ + public static Observable encode(Observable src, Charset charset) { + return encode(src, charset.newEncoder().onMalformedInput(CodingErrorAction.REPLACE).onUnmappableCharacter(CodingErrorAction.REPLACE)); + } + + /** + * Encodes a possible infinite stream of strings into a Observable of byte arrays. + * This method allows for more control over how malformed and unmappable characters are handled. + * + * @param src + * @param charsetEncoder + * @return + */ + public static Observable encode(Observable src, final CharsetEncoder charsetEncoder) { + return src.map(new Func1() { + @Override + public byte[] call(String str) { + CharBuffer cb = CharBuffer.wrap(str); + ByteBuffer bb; + try { + bb = charsetEncoder.encode(cb); + } catch (CharacterCodingException e) { + throw new RuntimeException(e); + } + return Arrays.copyOfRange(bb.array(), bb.position(), bb.limit()); + } + }); + } + + /** + * Gather up all of the strings in to one string to be able to use it as one message. Don't use this on infinite streams. + * + * @param src + * @return + */ + public static Observable stringConcat(Observable src) { + return src.aggregate(new Func2() { + public String call(String a, String b) { + return a + b; + } + }); + } + + /** + * Rechunks the strings based on a regex pattern and works on infinite stream. + * + * resplit(["boo:an", "d:foo"], ":") --> ["boo", "and", "foo"] + * resplit(["boo:an", "d:foo"], "o") --> ["b", "", ":and:f", "", ""] + * + * See {@link Pattern} + * + * @param src + * @param regex + * @return + */ + public static Observable split(final Observable src, String regex) { + final Pattern pattern = Pattern.compile(regex); + return Observable.create(new OnSubscribeFunc() { + @Override + public Subscription onSubscribe(final Observer observer) { + return src.subscribe(new Observer() { + private String leftOver = null; + + @Override + public void onCompleted() { + output(leftOver); + observer.onCompleted(); + } + + @Override + public void onError(Throwable e) { + output(leftOver); + observer.onError(e); + } + + @Override + public void onNext(String segment) { + String[] parts = pattern.split(segment, -1); + + if (leftOver != null) + parts[0] = leftOver + parts[0]; + for (int i = 0; i < parts.length - 1; i++) { + String part = parts[i]; + output(part); + } + leftOver = parts[parts.length - 1]; + } + + private int emptyPartCount = 0; + /** + * when limit == 0 trailing empty parts are not emitted. + * @param part + */ + private void output(String part) { + if (part.isEmpty()) { + emptyPartCount++; + } + else { + for(; emptyPartCount>0; emptyPartCount--) + observer.onNext(""); + observer.onNext(part); + } + } + }); + } + }); + } +} diff --git a/rxjava-core/src/main/java/rx/util/AssertObservable.java b/rxjava-core/src/main/java/rx/util/AssertObservable.java new file mode 100644 index 0000000000..1bd34fcd23 --- /dev/null +++ b/rxjava-core/src/main/java/rx/util/AssertObservable.java @@ -0,0 +1,139 @@ +package rx.util; + +import rx.Notification; +import rx.Observable; +import rx.util.functions.Func1; +import rx.util.functions.Func2; + +public class AssertObservable { + /** + * Asserts that two Observables are equal. If they are not, an {@link AssertionError} is thrown + * with the given message. If expecteds and actuals are + * null, they are considered equal. + * + * @param expected + * Observable with expected values. + * @param actual + * Observable with actual values + */ + public static void assertObservableEqualsBlocking(Observable expected, Observable actual) { + assertObservableEqualsBlocking(null, expected, actual); + } + + /** + * Asserts that two Observables are equal. If they are not, an {@link AssertionError} is thrown + * with the given message. If expected and actual are + * null, they are considered equal. + * + * @param message + * the identifying message for the {@link AssertionError} (null okay) + * @param expected + * Observable with expected values. + * @param actual + * Observable with actual values + */ + public static void assertObservableEqualsBlocking(String message, Observable expected, Observable actual) { + assertObservableEquals(expected, actual).toBlockingObservable().lastOrDefault(null); + } + + /** + * Asserts that two {@link Observable}s are equal and returns an empty {@link Observable}. If + * they are not, an {@link Observable} is returned that calls onError with an + * {@link AssertionError} when subscribed to. If expected and actual + * are null, they are considered equal. + * + * @param message + * the identifying message for the {@link AssertionError} (null okay) + * @param expected + * Observable with expected values. + * @param actual + * Observable with actual values + */ + public static Observable assertObservableEquals(Observable expected, Observable actual) { + return assertObservableEquals(null, expected, actual); + } + + /** + * Asserts that two {@link Observable}s are equal and returns an empty {@link Observable}. If + * they are not, an {@link Observable} is returned that calls onError with an + * {@link AssertionError} when subscribed to with the given message. If expected + * and actual are null, they are considered equal. + * + * @param message + * the identifying message for the {@link AssertionError} (null okay) + * @param expected + * Observable with expected values. + * @param actual + * Observable with actual values + */ + public static Observable assertObservableEquals(final String message, Observable expected, Observable actual) { + if (actual == null && expected != null) { + return Observable.error(new AssertionError((message != null ? message + ": " : "") + "Actual was null and expected was not")); + } + if (actual != null && expected == null) { + return Observable.error(new AssertionError((message != null ? message + ": " : "") + "Expected was null and actual was not")); + } + if (actual == null && expected == null) { + return Observable.empty(); + } + + Func2, ? super Notification, Notification> zipFunction = new Func2, Notification, Notification>() { + @Override + public Notification call(Notification expectedNotfication, Notification actualNotification) { + if (expectedNotfication.equals(actualNotification)) { + StringBuilder message = new StringBuilder(); + message.append(expectedNotfication.getKind()); + if (expectedNotfication.hasValue()) + message.append(" ").append(expectedNotfication.getValue()); + if (expectedNotfication.hasThrowable()) + message.append(" ").append(expectedNotfication.getThrowable()); + return new Notification("equals " + message.toString()); + } + else { + StringBuilder error = new StringBuilder(); + error.append("expected:<").append(expectedNotfication.getKind()); + if (expectedNotfication.hasValue()) + error.append(" ").append(expectedNotfication.getValue()); + if (expectedNotfication.hasThrowable()) + error.append(" ").append(expectedNotfication.getThrowable()); + error.append("> but was:<").append(actualNotification.getKind()); + if (actualNotification.hasValue()) + error.append(" ").append(actualNotification.getValue()); + if (actualNotification.hasThrowable()) + error.append(" ").append(actualNotification.getThrowable()); + error.append(">"); + + return new Notification(new AssertionError(error.toString())); + } + } + }; + + Func2, Notification, Notification> accumulator = new Func2, Notification, Notification>() { + @Override + public Notification call(Notification a, Notification b) { + String message = a.isOnError() ? a.getThrowable().getMessage() : a.getValue(); + boolean fail = a.isOnError(); + + message += "\n\t" + (b.isOnError() ? b.getThrowable().getMessage() : b.getValue()); + fail |= b.isOnError(); + + if (fail) + return new Notification(new AssertionError(message)); + else + return new Notification(message); + } + }; + + Observable outcomeObservable = Observable.zip(expected.materialize(), actual.materialize(), zipFunction).aggregate(accumulator).map(new Func1, Notification>() { + @Override + public Notification call(Notification outcome) { + if (outcome.isOnError()) { + String fullMessage = (message != null ? message + ": " : "") + "Observables are different\n\t" + outcome.getThrowable().getMessage(); + return new Notification(new AssertionError(fullMessage)); + } + return new Notification(); + } + }).dematerialize(); + return outcomeObservable; + } +} diff --git a/rxjava-core/src/test/java/rx/observables/StringObservableTest.java b/rxjava-core/src/test/java/rx/observables/StringObservableTest.java new file mode 100644 index 0000000000..8ced455f63 --- /dev/null +++ b/rxjava-core/src/test/java/rx/observables/StringObservableTest.java @@ -0,0 +1,115 @@ +package rx.observables; + +import static org.junit.Assert.*; + +import java.io.IOException; +import java.nio.charset.Charset; +import java.nio.charset.CharsetDecoder; +import java.nio.charset.MalformedInputException; + +import org.junit.Test; + +import rx.Observable; +import rx.observables.BlockingObservable; +import rx.observables.StringObservable; +import rx.util.AssertObservable; + +public class StringObservableTest { + + @Test + public void testMultibyteSpanningTwoBuffers() { + Observable src = Observable.from(new byte[] { (byte) 0xc2 }, new byte[] { (byte) 0xa1 }); + String out = StringObservable.decode(src, "UTF-8").toBlockingObservable().single(); + + assertEquals("\u00A1", out); + } + + @Test + public void testMalformedAtTheEndReplace() { + Observable src = Observable.from(new byte[] { (byte) 0xc2 }); + String out = StringObservable.decode(src, "UTF-8").toBlockingObservable().single(); + + // REPLACEMENT CHARACTER + assertEquals("\uFFFD", out); + } + + @Test + public void testMalformedInTheMiddleReplace() { + Observable src = Observable.from(new byte[] { (byte) 0xc2, 65 }); + String out = StringObservable.decode(src, "UTF-8").toBlockingObservable().single(); + + // REPLACEMENT CHARACTER + assertEquals("\uFFFDA", out); + } + + @Test(expected = RuntimeException.class) + public void testMalformedAtTheEndReport() { + Observable src = Observable.from(new byte[] { (byte) 0xc2 }); + CharsetDecoder charsetDecoder = Charset.forName("UTF-8").newDecoder(); + StringObservable.decode(src, charsetDecoder).toBlockingObservable().single(); + } + + @Test(expected = RuntimeException.class) + public void testMalformedInTheMiddleReport() { + Observable src = Observable.from(new byte[] { (byte) 0xc2, 65 }); + CharsetDecoder charsetDecoder = Charset.forName("UTF-8").newDecoder(); + StringObservable.decode(src, charsetDecoder).toBlockingObservable().single(); + } + + @Test + public void testPropogateError() { + Observable src = Observable.from(new byte[] { 65 }); + Observable err = Observable.error(new IOException()); + CharsetDecoder charsetDecoder = Charset.forName("UTF-8").newDecoder(); + try { + StringObservable.decode(Observable.concat(src, err), charsetDecoder).toList().toBlockingObservable().single(); + fail(); + } catch (RuntimeException e) { + assertEquals(IOException.class, e.getCause().getClass()); + } + } + + @Test + public void testPropogateErrorInTheMiddleOfMultibyte() { + Observable src = Observable.from(new byte[] { (byte) 0xc2 }); + Observable err = Observable.error(new IOException()); + CharsetDecoder charsetDecoder = Charset.forName("UTF-8").newDecoder(); + try { + StringObservable.decode(Observable.concat(src, err), charsetDecoder).toList().toBlockingObservable().single(); + fail(); + } catch (RuntimeException e) { + assertEquals(MalformedInputException.class, e.getCause().getClass()); + } + } + + @Test + public void testEncode() { + assertArrayEquals( + new byte[] { (byte) 0xc2, (byte) 0xa1 }, + StringObservable.encode(Observable.just("\u00A1"), "UTF-8").toBlockingObservable().single()); + } + + @Test + public void testSplitOnCollon() { + testSplit("boo:and:foo", ":", 0, "boo", "and", "foo"); + } + @Test + public void testSplitOnOh() { + testSplit("boo:and:foo", "o", 0, "b", "", ":and:f"); + } + + public void testSplit(String str, String regex, int limit, String... parts) { + testSplit(str, regex, 0, Observable.from(str), parts); + for (int i = 0; i < str.length(); i++) { + String a = str.substring(0, i); + String b = str.substring(i, str.length()); + testSplit(a+"|"+b, regex, limit, Observable.from(a, b), parts); + } + } + + public void testSplit(String message, String regex, int limit, Observable src, String... parts) { + Observable act = StringObservable.split(src, regex); + Observable exp = Observable.from(parts); + AssertObservable.assertObservableEqualsBlocking("when input is "+message+" and limit = "+ limit, exp, act); + } +} diff --git a/rxjava-core/src/test/java/rx/util/AssertObservableTest.java b/rxjava-core/src/test/java/rx/util/AssertObservableTest.java new file mode 100644 index 0000000000..f2182bd8cf --- /dev/null +++ b/rxjava-core/src/test/java/rx/util/AssertObservableTest.java @@ -0,0 +1,28 @@ +package rx.util; + +import org.junit.Test; + +import rx.Observable; + +public class AssertObservableTest { + + @Test + public void testPassNotNull() { + AssertObservable.assertObservableEqualsBlocking("foo", Observable.from(1, 2), Observable.from(1, 2)); + } + + @Test + public void testPassNull() { + AssertObservable.assertObservableEqualsBlocking("foo", null, null); + } + + @Test(expected = AssertionError.class) + public void testFailNotNull() { + AssertObservable.assertObservableEqualsBlocking("foo", Observable.from(1, 2), Observable.from(1)); + } + + @Test(expected = AssertionError.class) + public void testFailNull() { + AssertObservable.assertObservableEqualsBlocking("foo", Observable.from(1, 2), null); + } +} From 7e43a6c40b2094d63271e43d8e80be47fc1add94 Mon Sep 17 00:00:00 2001 From: George Campbell Date: Mon, 25 Nov 2013 23:22:34 -0800 Subject: [PATCH 2/4] Moving StringObservable to contrib module --- rxjava-contrib/rxjava-string/build.gradle | 29 +++++++++++++++++++ .../java/rx/observables/StringObservable.java | 0 .../rx/observables/StringObservableTest.java | 0 settings.gradle | 3 +- 4 files changed, 31 insertions(+), 1 deletion(-) create mode 100644 rxjava-contrib/rxjava-string/build.gradle rename {rxjava-core => rxjava-contrib/rxjava-string}/src/main/java/rx/observables/StringObservable.java (100%) rename {rxjava-core => rxjava-contrib/rxjava-string}/src/test/java/rx/observables/StringObservableTest.java (100%) diff --git a/rxjava-contrib/rxjava-string/build.gradle b/rxjava-contrib/rxjava-string/build.gradle new file mode 100644 index 0000000000..04f1e93394 --- /dev/null +++ b/rxjava-contrib/rxjava-string/build.gradle @@ -0,0 +1,29 @@ +apply plugin: 'osgi' + +sourceCompatibility = JavaVersion.VERSION_1_6 +targetCompatibility = JavaVersion.VERSION_1_6 + +dependencies { + compile project(':rxjava-core') + provided 'junit:junit-dep:4.10' + provided 'org.mockito:mockito-core:1.8.5' +} + +javadoc { + options { + doclet = "org.benjchristensen.doclet.DocletExclude" + docletpath = [rootProject.file('./gradle/doclet-exclude.jar')] + stylesheetFile = rootProject.file('./gradle/javadocStyleSheet.css') + windowTitle = "RxJava Javadoc ${project.version}" + } + options.addStringOption('top').value = '

RxJava

' +} + +jar { + manifest { + name = 'rxjava-string' + instruction 'Bundle-Vendor', 'Netflix' + instruction 'Bundle-DocURL', 'https://github.com/Netflix/RxJava' + instruction 'Import-Package', '!org.junit,!junit.framework,!org.mockito.*,*' + } +} diff --git a/rxjava-core/src/main/java/rx/observables/StringObservable.java b/rxjava-contrib/rxjava-string/src/main/java/rx/observables/StringObservable.java similarity index 100% rename from rxjava-core/src/main/java/rx/observables/StringObservable.java rename to rxjava-contrib/rxjava-string/src/main/java/rx/observables/StringObservable.java diff --git a/rxjava-core/src/test/java/rx/observables/StringObservableTest.java b/rxjava-contrib/rxjava-string/src/test/java/rx/observables/StringObservableTest.java similarity index 100% rename from rxjava-core/src/test/java/rx/observables/StringObservableTest.java rename to rxjava-contrib/rxjava-string/src/test/java/rx/observables/StringObservableTest.java diff --git a/settings.gradle b/settings.gradle index 01cf356da0..176e9150c5 100644 --- a/settings.gradle +++ b/settings.gradle @@ -7,4 +7,5 @@ include 'rxjava-core', \ 'language-adaptors:rxjava-kotlin', \ 'rxjava-contrib:rxjava-swing', \ 'rxjava-contrib:rxjava-android', \ -'rxjava-contrib:rxjava-apache-http' +'rxjava-contrib:rxjava-apache-http', \ +'rxjava-contrib:rxjava-string' From b0e448dc12253e0b81693e240ec71001e3848408 Mon Sep 17 00:00:00 2001 From: George Campbell Date: Thu, 28 Nov 2013 06:52:11 -0800 Subject: [PATCH 3/4] moving the AssertObservable class to the test side of the build. --- rxjava-core/src/{main => test}/java/rx/util/AssertObservable.java | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename rxjava-core/src/{main => test}/java/rx/util/AssertObservable.java (100%) diff --git a/rxjava-core/src/main/java/rx/util/AssertObservable.java b/rxjava-core/src/test/java/rx/util/AssertObservable.java similarity index 100% rename from rxjava-core/src/main/java/rx/util/AssertObservable.java rename to rxjava-core/src/test/java/rx/util/AssertObservable.java From a07fb6a0448c641bd91566b945431aaaee5a5a5f Mon Sep 17 00:00:00 2001 From: George Campbell Date: Thu, 28 Nov 2013 07:22:32 -0800 Subject: [PATCH 4/4] setting the up that string test to use core test --- rxjava-contrib/rxjava-string/build.gradle | 1 + 1 file changed, 1 insertion(+) diff --git a/rxjava-contrib/rxjava-string/build.gradle b/rxjava-contrib/rxjava-string/build.gradle index 04f1e93394..5c578ae04d 100644 --- a/rxjava-contrib/rxjava-string/build.gradle +++ b/rxjava-contrib/rxjava-string/build.gradle @@ -5,6 +5,7 @@ targetCompatibility = JavaVersion.VERSION_1_6 dependencies { compile project(':rxjava-core') + testCompile project(":rxjava-core").sourceSets.test.output provided 'junit:junit-dep:4.10' provided 'org.mockito:mockito-core:1.8.5' }