Skip to content

Commit

Permalink
Merge pull request ReactiveX#604 from akarnokd/StringObservableOps1
Browse files Browse the repository at this point in the history
Added op:join to concat objects with separator between elements.
  • Loading branch information
benjchristensen committed Dec 12, 2013
2 parents 2725700 + c9dade8 commit acdb131
Show file tree
Hide file tree
Showing 2 changed files with 142 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ public byte[] call(String str) {
*/
public static Observable<String> stringConcat(Observable<String> src) {
return src.aggregate(new Func2<String, String, String>() {
@Override
public String call(String a, String b) {
return a + b;
}
Expand Down Expand Up @@ -267,4 +268,58 @@ private void output(String part) {
}
});
}
/**
* Concatenates the sequence of values by adding a separator
* between them and emitting the result once the source completes.
* <p>
* The conversion from the value type to String is performed via
* {@link java.lang.String#valueOf(java.lang.Object)} calls.
* <p>
* For example:
* <pre>
* Observable&lt;Object> source = Observable.from("a", 1, "c");
* Observable&lt;String> result = join(source, ", ");
* </pre>
*
* will yield a single element equal to "a, 1, c".
*
* @param source the source sequence of CharSequence values
* @param separator the separator to a
* @return an Observable which emits a single String value having the concatenated
* values of the source observable with the separator between elements
*/
public static <T> Observable<String> join(final Observable<T> source, final CharSequence separator) {
return Observable.create(new OnSubscribeFunc<String>() {

@Override
public Subscription onSubscribe(final Observer<? super String> t1) {
return source.subscribe(new Observer<T>() {
boolean mayAddSeparator;
StringBuilder b = new StringBuilder();
@Override
public void onNext(T args) {
if (mayAddSeparator) {
b.append(separator);
}
mayAddSeparator = true;
b.append(String.valueOf(args));
}

@Override
public void onError(Throwable e) {
b = null;
t1.onError(e);
}

@Override
public void onCompleted() {
String str = b.toString();
b = null;
t1.onNext(str);
t1.onCompleted();
}
});
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@
import java.nio.charset.MalformedInputException;

import org.junit.Test;
import static org.mockito.Mockito.*;

import rx.Observable;
import rx.observables.BlockingObservable;
import rx.observables.StringObservable;
import rx.Observer;
import rx.util.AssertObservable;

public class StringObservableTest {
Expand Down Expand Up @@ -127,4 +127,89 @@ public void testSplit(String message, String regex, int limit, Observable<String
Observable<String> exp = Observable.from(parts);
AssertObservable.assertObservableEqualsBlocking("when input is "+message+" and limit = "+ limit, exp, act);
}

@Test
public void testJoinMixed() {
Observable<Object> source = Observable.<Object>from("a", 1, "c");

Observable<String> result = StringObservable.join(source, ", ");

Observer<Object> observer = mock(Observer.class);

result.subscribe(observer);

verify(observer, times(1)).onNext("a, 1, c");
verify(observer, times(1)).onCompleted();
verify(observer, never()).onError(any(Throwable.class));
}
@Test
public void testJoinWithEmptyString() {
Observable<String> source = Observable.from("", "b", "c");

Observable<String> result = StringObservable.join(source, ", ");

Observer<Object> observer = mock(Observer.class);

result.subscribe(observer);

verify(observer, times(1)).onNext(", b, c");
verify(observer, times(1)).onCompleted();
verify(observer, never()).onError(any(Throwable.class));
}
@Test
public void testJoinWithNull() {
Observable<String> source = Observable.from("a", null, "c");

Observable<String> result = StringObservable.join(source, ", ");

Observer<Object> observer = mock(Observer.class);

result.subscribe(observer);

verify(observer, times(1)).onNext("a, null, c");
verify(observer, times(1)).onCompleted();
verify(observer, never()).onError(any(Throwable.class));
}
@Test
public void testJoinSingle() {
Observable<String> source = Observable.from("a");

Observable<String> result = StringObservable.join(source, ", ");

Observer<Object> observer = mock(Observer.class);

result.subscribe(observer);

verify(observer, times(1)).onNext("a");
verify(observer, times(1)).onCompleted();
verify(observer, never()).onError(any(Throwable.class));
}
@Test
public void testJoinEmpty() {
Observable<String> source = Observable.empty();

Observable<String> result = StringObservable.join(source, ", ");

Observer<Object> observer = mock(Observer.class);

result.subscribe(observer);

verify(observer, times(1)).onNext("");
verify(observer, times(1)).onCompleted();
verify(observer, never()).onError(any(Throwable.class));
}
@Test
public void testJoinThrows() {
Observable<String> source = Observable.concat(Observable.just("a"), Observable.<String>error(new RuntimeException("Forced failure")));

Observable<String> result = StringObservable.join(source, ", ");

Observer<Object> observer = mock(Observer.class);

result.subscribe(observer);

verify(observer, never()).onNext("a");
verify(observer, never()).onCompleted();
verify(observer, times(1)).onError(any(Throwable.class));
}
}

0 comments on commit acdb131

Please sign in to comment.