-
Notifications
You must be signed in to change notification settings - Fork 7.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
SubjectSubscriptionManager should be public #1174
Comments
What kind of Subject behavior do you need? |
Yes, Most new |
I have a use for a one-and-done subject, where each subscriber gets a This single item is either the most recently received item (in the past), Can you think of a way to do this through composition? It would seem to
|
behaviourSubject.drop(1).first()? |
I still want the first item, but only if it came from the the source
|
Make the default a sentinel then use a predicate with first to check it isn't that? (On my phone so sorry for the formatting) final Foo sentinel = new Foo();
BehaviourSubject b = BehaviourSubject.create(sentinel);
b.first(f -> f != sentinel) |
I've considered adding BehaviorSubject.create() without a default for this case as I've had to use the sentinel solution before and it's awkward. |
Why not use asyncsubject? |
AsyncSubject is only the last value after completed, not last value and all subsequent. |
|
Oh. Missed that part. Yup, that's AsyncSubject :-) |
The difference is that I want the subscriber to complete after one item
|
Sounds like AsyncSubject.take(1).subscribe() |
Sorry, Publish or BehaviorSubject and take(1). Not AsyncSubject. |
How about this: public class TakeOne {
static final class Client extends Subscriber<Integer> {
final int id;
public Client(int id) {
this.id = id;
}
@Override
public void onNext(Integer t) {
System.out.printf("%d: %s%n", id, t);
}
@Override
public void onError(Throwable e) {
System.out.printf("%d: %s%n", id, e);
}
@Override
public void onCompleted() {
System.out.printf("%d: %s%n", id, "Done.");
}
}
public static void main(String[] args) {
PublishSubject<Integer> source = PublishSubject.create();
ConnectableObservable<Integer> intermediate = source.replay(1);
intermediate.connect();
Observable<Integer> result = intermediate.replay(o -> o.take(1));
Client c1 = new Client(1);
result.subscribe(c1);
source.onNext(1);
source.onNext(2);
Client c2 = new Client(2);
result.subscribe(c2);
source.onNext(3);
Client c3 = new Client(3);
result.subscribe(c3);
Client c4 = new Client(4);
result.subscribe(c4);
source.onNext(4);
Client c5 = new Client(5);
result.subscribe(c5);
}
} prints:
It may require PR #1175 to work correctly. Also, we seem to have found an use case for |
+1 this would be great to have |
@akarnokd That seems to work, thank you! Though my unit tests for this implementation also pass when just using this:
What is the reason for the replay(Func1) call? |
|
Anything else needed before closing this issue? |
Looks like that should work. On Tue, May 20, 2014 at 3:14 AM, Ben Christensen
|
Hiding SubjectSubscriptionManager (and making all Subject implementations final) makes it hard to create subjects with custom behavior. Please make this class public.
The text was updated successfully, but these errors were encountered: