Guideline 6.4: Protect calls to user code from within an operator #216
@johngmyers This is an interesting observation ... I hadn't noticed that note at the end of guideline 6.4 before.

We added this wrapping code close to 1 year ago as a result of dealing with errors being thrown in random places because of bad

I don't yet fully understand the principle this note is trying to say nor the issue it claims it will cause.

The reason why the

For example:

public void onNext(String v) {
    int num = Integer.parseInt(v);
    doSomething(num);
}

That code is very easy to have result in a

We had

Operator implementations as part of the RxJava library all should comply with the Rx Guidelines but users of the library don't do that very well and thus attempts have been made to be resilient to incorrect behavior. The two most obvious examples are:

1) exceptions being thrown from code passed in by users

Guideline 6.4 covers functions, comparators, and other such obvious things (Func* and Action* implementations predominantly), but the

2) Observables calling onNext concurrently or from multiple threads even when sequential (not concurrent)

This is why we still implement the operators as if they are going to be called concurrently and use volatile, atomic and java.util.concurrent.* data structures. We understand Guideline 6.8 and thus do not synchronize anything (except where the operator requires it such as

I'm going to ignore the second one for this discussion as that's not related to Guideline 6.4 which this issue is about.

For errors occurring in

1) Throw

This is bad and not the solution we can accept. The only time I know of when that's the only acceptable option is documented here: #198

2) Try/Catch Everywhere

Every Rx operator implementation needs to try/catch on every call to

3) Single Wrapper

Have a single wrapping class such as the current

Option 2 means a lot of verbose error handling must be reproduced in every operator and it's error prone to make sure every single one does it in all the right places. What we found in experience was we would have some operators doing okay but then one would not and result in bad error handling for the user.

Option 3 evolved out of wanting a single place to ensure proper error handling occurred regardless of whether every single Rx operator was implemented without bugs in having a try/catch around every single call to the

All this said I am not beholden to any particular implementation approach. My goals are:
On to an example ... The recently updated (#212)

@Override
public void onError(Exception e) {
    if (counter.getAndSet(num) < num) {
        observer.onError(e);
    }
}

@Override
public void onNext(T args) {
    System.out.println("Take: " + args);
    final int count = counter.incrementAndGet();
    if (count <= num) {
        System.out.println("Take sending to onNext: " + args);
        observer.onNext(args);
        System.out.println("Take AFTER sending to onNext: " + args);
        if (count == num) {
            observer.onCompleted();
        }
    }
    if (count >= num) {
        // this will work if the sequence is asynchronous, it will have no effect on a synchronous observable
        subscription.unsubscribe();
    }
}

Here is the unit test that breaks it:

@Test
public void testTakeWithError() {
    final AtomicInteger count = new AtomicInteger();
    final AtomicReference<Exception> error = new AtomicReference<Exception>();
    Observable.from("1", "2", "three", "4").take(3).subscribe(new Observer<String>() {
        @Override
        public void onCompleted() {
            System.out.println("completed");
        }

        @Override
        public void onError(Exception e) {
            error.set(e);
            System.out.println("error");
            e.printStackTrace();
        }

        @Override
        public void onNext(String v) {
            int num = Integer.parseInt(v);
            System.out.println(num);
            // doSomething(num);
            count.incrementAndGet();
        }
    });
    assertEquals(2, count.get());
    assertNotNull(error.get());
    if (!(error.get() instanceof NumberFormatException)) {
        fail("It should be a NumberFormatException");
    }
}

That will print out 1 and 2 and then silently fail.

Note how the

Funny enough this test case even shows that the way

If I force the

This particular

I've had this

As for how having

We obviously need some way of improving the library's resilience against error handling bugs - even the current attempt with

It is very easy to have holes in the error handling in all of the Rx operator implementations so I'd like to know how we can achieve Guideline 6.4 while admitting it's near impossible to have bug-free Rx operators and accomplish the goal of never having an exception be swallowed or lost on a random thread.

I'd very much appreciate your or anyone else's assistance in figuring out the approach to take and then getting there. |
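The "Single Wrapper" option from the comment above can be illustrated with a small, self-contained sketch. It does not use RxJava itself: the `Observer` interface is a minimal stand-in, and `GuardedObserver` is a hypothetical name, not the library's `AtomicObserver`. It feeds the same "1", "2", "three", "4" input as the failing test through one wrapper that catches exceptions thrown by the user's `onNext` and routes them to `onError`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

public class SafeObserverSketch {

    /** Minimal stand-in for the Observer interface discussed in this thread (assumption). */
    interface Observer<T> {
        void onNext(T value);
        void onError(Exception e);
        void onCompleted();
    }

    /** Hypothetical wrapper: a single place that guards every call into user code. */
    static final class GuardedObserver<T> implements Observer<T> {
        private final Observer<T> actual;
        private final AtomicBoolean terminated = new AtomicBoolean(false);

        GuardedObserver(Observer<T> actual) {
            this.actual = actual;
        }

        @Override
        public void onNext(T value) {
            if (terminated.get()) {
                return; // ignore events that arrive after a terminal state
            }
            try {
                actual.onNext(value);
            } catch (Exception e) {
                onError(e); // route the exception from user code to onError
            }
        }

        @Override
        public void onError(Exception e) {
            // only the first terminal event is delivered
            if (terminated.compareAndSet(false, true)) {
                actual.onError(e);
            }
        }

        @Override
        public void onCompleted() {
            if (terminated.compareAndSet(false, true)) {
                actual.onCompleted();
            }
        }
    }

    /** Pushes "1", "2", "three", "4" through the wrapper and records what the user code saw. */
    public static List<String> run() {
        final List<String> log = new ArrayList<String>();
        Observer<String> user = new Observer<String>() {
            @Override
            public void onNext(String v) {
                log.add("next:" + Integer.parseInt(v));
            }

            @Override
            public void onError(Exception e) {
                log.add("error:" + e.getClass().getSimpleName());
            }

            @Override
            public void onCompleted() {
                log.add("completed");
            }
        };
        Observer<String> safe = new GuardedObserver<String>(user);
        for (String v : new String[] {"1", "2", "three", "4"}) {
            safe.onNext(v);
        }
        safe.onCompleted();
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [next:1, next:2, error:NumberFormatException]
    }
}
```

In this sketch the wrapper only helps when it sits directly around the user's Observer. If an operator in between calls the user's `onNext` without a wrapper, the exception still escapes, which is the composition problem described in this thread.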
Here are unit tests demonstrating various use cases.

2 of these 5 can be helped by the AtomicObserver but it's only helping 1 of the 2 right now. These 2 use cases are the most common of the 5 so it's valid to pursue a solution I think.

The use case that doesn't work could be fixed by using a better approach for deciding when to wrap (or of course finding a completely different approach which is the point of this issue).

/**
* The error from the user provided Observable is not handled because this is
* asynchronous and the Exception throws on the thread.
*
* Result: Fails
*/
@Test
public void testCustomObservableWithErrorInObservableAsynchronous() throws InterruptedException {
final CountDownLatch latch = new CountDownLatch(1);
final AtomicInteger count = new AtomicInteger();
final AtomicReference<Exception> error = new AtomicReference<Exception>();
Observable.create(new Func1<Observer<String>, Subscription>() {
@Override
public Subscription call(final Observer<String> observer) {
final BooleanSubscription s = new BooleanSubscription();
new Thread(new Runnable() {
@Override
public void run() {
try {
if (!s.isUnsubscribed()) {
observer.onNext("1");
observer.onNext("2");
throw new NumberFormatException();
}
} finally {
latch.countDown();
}
}
}).start();
return s;
}
}).subscribe(new Observer<String>() {
@Override
public void onCompleted() {
System.out.println("completed");
}
@Override
public void onError(Exception e) {
error.set(e);
System.out.println("error");
e.printStackTrace();
}
@Override
public void onNext(String v) {
System.out.println(v);
count.incrementAndGet();
}
});
// wait for async sequence to complete
latch.await();
assertEquals(2, count.get());
assertNotNull(error.get());
if (!(error.get() instanceof NumberFormatException)) {
fail("It should be a NumberFormatException");
}
}
/**
* The error from the user provided Observer is not handled by the subscribe method try/catch.
*
* It is handled by the AtomicObserver that wraps the provided Observer.
*
* It doesn't compose well though ... if `take(1)` is put between the Observable and
* subscribe then it still fails.
*
* Result: Passes (if AtomicObserver functionality exists)
*/
@Test
public void testCustomObservableWithErrorInObserverAsynchronous() throws InterruptedException {
final CountDownLatch latch = new CountDownLatch(1);
final AtomicInteger count = new AtomicInteger();
final AtomicReference<Exception> error = new AtomicReference<Exception>();
Observable.create(new Func1<Observer<String>, Subscription>() {
@Override
public Subscription call(final Observer<String> observer) {
final BooleanSubscription s = new BooleanSubscription();
new Thread(new Runnable() {
@Override
public void run() {
try {
if (!s.isUnsubscribed()) {
observer.onNext("1");
observer.onNext("2");
observer.onNext("three");
observer.onNext("4");
observer.onCompleted();
}
} finally {
latch.countDown();
}
}
}).start();
return s;
}
}).subscribe(new Observer<String>() {
@Override
public void onCompleted() {
System.out.println("completed");
}
@Override
public void onError(Exception e) {
error.set(e);
System.out.println("error");
e.printStackTrace();
}
@Override
public void onNext(String v) {
int num = Integer.parseInt(v);
System.out.println(num);
// doSomething(num);
count.incrementAndGet();
}
});
// wait for async sequence to complete
latch.await();
assertEquals(2, count.get());
assertNotNull(error.get());
if (!(error.get() instanceof NumberFormatException)) {
fail("It should be a NumberFormatException");
}
}
/**
* The error from the user provided Observer is not handled by the subscribe method try/catch.
*
* It is also not handled by the AtomicObserver because composition defeats it.
*
* Result: Fails
*/
@Test
public void testCustomObservableWithErrorInObserverAsynchronousWithComposition() throws InterruptedException {
final CountDownLatch latch = new CountDownLatch(1);
final AtomicInteger count = new AtomicInteger();
final AtomicReference<Exception> error = new AtomicReference<Exception>();
Observable.create(new Func1<Observer<String>, Subscription>() {
@Override
public Subscription call(final Observer<String> observer) {
final BooleanSubscription s = new BooleanSubscription();
new Thread(new Runnable() {
@Override
public void run() {
try {
if (!s.isUnsubscribed()) {
observer.onNext("1");
observer.onNext("2");
observer.onNext("three");
observer.onNext("4");
observer.onCompleted();
}
} finally {
latch.countDown();
}
}
}).start();
return s;
}
}).take(1).subscribe(new Observer<String>() {
@Override
public void onCompleted() {
System.out.println("completed");
}
@Override
public void onError(Exception e) {
error.set(e);
System.out.println("error");
e.printStackTrace();
}
@Override
public void onNext(String v) {
int num = Integer.parseInt(v);
System.out.println(num);
// doSomething(num);
count.incrementAndGet();
}
});
// wait for async sequence to complete
latch.await();
assertEquals(2, count.get());
assertNotNull(error.get());
if (!(error.get() instanceof NumberFormatException)) {
fail("It should be a NumberFormatException");
}
}
/**
* The error from the user provided Observer is handled by the subscribe
* try/catch because this is synchronous
*
* Result: Passes
*/
@Test
public void testCustomObservableWithErrorInObserverSynchronous() {
final AtomicInteger count = new AtomicInteger();
final AtomicReference<Exception> error = new AtomicReference<Exception>();
Observable.create(new Func1<Observer<String>, Subscription>() {
@Override
public Subscription call(Observer<String> observer) {
observer.onNext("1");
observer.onNext("2");
observer.onNext("three");
observer.onNext("4");
observer.onCompleted();
return Subscriptions.empty();
}
}).subscribe(new Observer<String>() {
@Override
public void onCompleted() {
System.out.println("completed");
}
@Override
public void onError(Exception e) {
error.set(e);
System.out.println("error");
e.printStackTrace();
}
@Override
public void onNext(String v) {
int num = Integer.parseInt(v);
System.out.println(num);
// doSomething(num);
count.incrementAndGet();
}
});
assertEquals(2, count.get());
assertNotNull(error.get());
if (!(error.get() instanceof NumberFormatException)) {
fail("It should be a NumberFormatException");
}
}
/**
* The error from the user provided Observable is handled by the
* subscribe try/catch because this is synchronous
*
*
* Result: Passes
*/
@Test
public void testCustomObservableWithErrorInObservableSynchronous() {
final AtomicInteger count = new AtomicInteger();
final AtomicReference<Exception> error = new AtomicReference<Exception>();
Observable.create(new Func1<Observer<String>, Subscription>() {
@Override
public Subscription call(Observer<String> observer) {
observer.onNext("1");
observer.onNext("2");
throw new NumberFormatException();
}
}).subscribe(new Observer<String>() {
@Override
public void onCompleted() {
System.out.println("completed");
}
@Override
public void onError(Exception e) {
error.set(e);
System.out.println("error");
e.printStackTrace();
}
@Override
public void onNext(String v) {
System.out.println(v);
count.incrementAndGet();
}
});
assertEquals(2, count.get());
assertNotNull(error.get());
if (!(error.get() instanceof NumberFormatException)) {
fail("It should be a NumberFormatException");
}
} |
If this semantic is to be kept, it should be implemented by having

I think what the guidelines are getting at is that protecting onNext() is going to produce onError() notifications that the Observer isn't going to expect to be receiving (especially since the Observer is by definition buggy). For example, consider an Observer that subscribes to

* In my opinion, they should instead throw

Put another way, I think the guidelines are trying to say that attempting to protect notification callbacks leads you down a rabbit hole, which I think you're starting to discover.

It is, however, hard to argue against operational experience. It doesn't work to monitor the exceptions coming out of "random" threads, using them to identify the Observer bugs so they can be fixed? Perhaps a ProtectObserver that |
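The question above about monitoring exceptions that escape on "random" threads can be made concrete with a plain-JDK sketch. It assumes nothing about RxJava: the worker thread stands in for an asynchronous Observable calling a buggy `onNext`, and the class and method names are hypothetical. `Thread.setUncaughtExceptionHandler` and `Thread.UncaughtExceptionHandler` are standard JDK API.

```java
import java.util.concurrent.atomic.AtomicReference;

public class UncaughtMonitorSketch {

    /** Runs a buggy callback on its own thread and returns what the handler captured. */
    public static Throwable runAndCapture() {
        final AtomicReference<Throwable> captured = new AtomicReference<Throwable>();
        Thread worker = new Thread(new Runnable() {
            @Override
            public void run() {
                // stands in for observer.onNext("three") inside user code
                Integer.parseInt("three");
            }
        });
        // Per-thread handler; Thread.setDefaultUncaughtExceptionHandler would cover every thread.
        worker.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
            @Override
            public void uncaughtException(Thread t, Throwable e) {
                captured.set(e); // a real deployment would log or report this
            }
        });
        worker.start();
        try {
            worker.join();
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for worker", ie);
        }
        return captured.get();
    }

    public static void main(String[] args) {
        System.out.println(runAndCapture());
    }
}
```

This makes the failure visible but does not deliver it to the Observer's `onError`, so it addresses detection only, not the recovery behavior discussed elsewhere in this issue.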
I agree which is what was seeking to be accomplished with AtomicObserver. Its concept is similar to how every operator shouldn't have to synchronize everything - error handling should if possible be abstracted to a single place.

I was looking back through the history of our refactoring from internal code to open source RxJava and found where it broke down. We wrapped every

This code got mangled while doing performance optimizations (#104) by reducing extraneous wrapping and synchronization and unit tests didn't exist for the error handling use case we lost and I rediscovered while investigating this issue.

This is the reason for the RxJavaErrorHandler plugin. We use this in production to capture any errors that occur so we see them even if an

I'll work on this further to get improved unit testing around error handling and explore a clean way to provide the protection without hurting the performance benefits we got in #104. |
That's not the same thing. |
...an exception passed through |
So you're suggesting a separate plugin (or method/type on the existing plugin) that specifically handles only errors thrown by the Observer. That's an interesting idea. I can see the value in knowing the difference. |
I hope you're not counting my set of eyes here. I noticed this hole in most trusted operators when writing

If you want users of an interface to comply with a contract requirement, it helps to either enforce the requirement or at least to provide tools to allow implementers to unit test conformance to the requirement. |
Enforcing the requirements is the intent of things like

I have a question about implementation details for wrapping an incoming Observer ... I've asked around several folks internally as well. All approaches have drawbacks I think.

The simplest and cleanest option is to wrap every

Since it is ensuring conformance of the contract and not just try/catch behavior this also means it is doing volatile checks which in a tight-loop and deep composition chain causes performance impacts.

The numbers are in milliseconds but only mean anything relative to each other. It makes a difference though: 4392 vs 2248.

So, to avoid wrapping every layer it would be nice to only wrap the final Observer passed in by user code. Whether an Observer is internal or external can be determined by:

1) This is the cleanest of the 4 options but I question the performance impact (need to test this) of checking every single time.

2) I dislike marker interfaces but it would perform well.

3) Annotations are basically just a different flavor of a marker interface. It adds more complexity to scanning them.

4) I don't like this because it would have to be a public method as Java can't allow me to make a method only accessible to certain packages.

Right now I'm pursuing either option 1 or 2 and will report back on my findings on option 1 which I'd prefer if it performs well.

Any other approaches that you feel are better or what is your opinion on these 4? |
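For comparison with the package-name check, the marker interface idea mentioned above could look roughly like the following sketch. `TrustedObserver`, `InternalTakeObserver` and `needsWrapping` are hypothetical names introduced only for illustration, and the `Observer` interface is a minimal stand-in rather than the RxJava type.

```java
public class MarkerInterfaceSketch {

    /** Minimal stand-in for the Observer interface (assumption). */
    interface Observer<T> {
        void onNext(T value);
    }

    /** Hypothetical marker: implemented only by the library's own operator observers. */
    interface TrustedObserver {
    }

    /** Stands in for an observer created inside an operator such as take. */
    static final class InternalTakeObserver implements Observer<String>, TrustedObserver {
        @Override
        public void onNext(String value) {
            // operator logic would go here
        }
    }

    /** True when the observer does not carry the marker and should therefore be wrapped. */
    static boolean needsWrapping(Observer<?> observer) {
        return !(observer instanceof TrustedObserver);
    }

    /** Returns {internal needs wrapping, user observer needs wrapping}. */
    public static boolean[] demo() {
        Observer<String> internal = new InternalTakeObserver();
        Observer<String> user = new Observer<String>() {
            @Override
            public void onNext(String value) {
                Integer.parseInt(value);
            }
        };
        return new boolean[] {needsWrapping(internal), needsWrapping(user)};
    }

    public static void main(String[] args) {
        boolean[] r = demo();
        System.out.println(r[0] + " " + r[1]); // false true
    }
}
```

The `instanceof` test is cheap, but nothing stops user code from implementing the marker too, so it is a convention rather than an enforcement mechanism.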
This code for option 1 appears to perform fine:

/**
 * See https://github.com/Netflix/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls to user code from within an operator"
 */
if (observer.getClass().getPackage().getName().startsWith("rx")) {
    Subscription s = onSubscribe.call(observer);
    if (s == null) {
        // this generally shouldn't be the case on a 'trusted' onSubscribe but in case it happens
        // we want to gracefully handle it the same as AtomicObservableSubscription does
        return Subscriptions.empty();
    } else {
        return s;
    }
} else {
    AtomicObservableSubscription subscription = new AtomicObservableSubscription();
    return subscription.wrap(onSubscribe.call(new AtomicObserver<T>(subscription, observer)));
}

I was expecting
Since it's in the right order of magnitude (and faster than wrapping everything) I'm proceeding with option 1 for now unless someone comes up with a better solution. |
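One caveat with the package-name check as posted: `Class.getPackage()` may return null in some cases (the JDK documentation allows this), and `startsWith("rx")` also matches unrelated packages whose names merely begin with those letters, such as a hypothetical `rxfoo`. The following is a sketch of a stricter variant, not the code that was merged; the class and method names are assumptions.

```java
public class PackageCheckSketch {

    /** True when the package name is exactly "rx" or a subpackage of it. */
    public static boolean isInternalPackageName(String name) {
        if (name == null) {
            return false; // no package information: treat as user code
        }
        return name.equals("rx") || name.startsWith("rx.");
    }

    /** Null-safe version of the check applied to a class. */
    public static boolean isInternal(Class<?> type) {
        Package p = type.getPackage();
        return isInternalPackageName(p == null ? null : p.getName());
    }

    public static void main(String[] args) {
        System.out.println(isInternalPackageName("rx.operators")); // true
        System.out.println(isInternalPackageName("rxfoo"));        // false
        System.out.println(isInternal(String.class));              // false
    }
}
```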
|
They don't do synchronization. They use java.util.concurrent.atomic.* classes which use compareAndSet or volatile - not synchronization. |
java.util.concurrent.* and volatile are synchronization. |
Volatile and Atomic* classes in Java affect visibility with memory effects across threads and CPUs, but they don't

As per the JDK http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/atomic/package-summary.html which represents atomics as being "lock-free":

The volatile and atomic functionality is non-blocking (especially since the use in

I'll be submitting a pull request shortly for this issue ... |
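To illustrate the lock-free point made above, here is a minimal sketch of a terminal-state guard built on `AtomicBoolean.compareAndSet`. It is not RxJava code and the class name is hypothetical. Several threads race to deliver a terminal event, exactly one succeeds, and no thread ever waits on a lock. It does not settle the disagreement in this thread about whether such memory effects count as "synchronization" under Guideline 6.8.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class LockFreeTerminalSketch {
    private final AtomicBoolean terminated = new AtomicBoolean(false);
    private final AtomicInteger delivered = new AtomicInteger();

    /** Only the first caller wins the compareAndSet; the others return immediately. */
    public void onCompleted() {
        if (terminated.compareAndSet(false, true)) {
            delivered.incrementAndGet(); // stands in for forwarding to the real observer
        }
    }

    public int delivered() {
        return delivered.get();
    }

    /** Calls onCompleted from several threads and returns how many calls got through. */
    public static int race(int threads) {
        final LockFreeTerminalSketch guard = new LockFreeTerminalSketch();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(new Runnable() {
                @Override
                public void run() {
                    guard.onCompleted();
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while joining", e);
            }
        }
        return guard.delivered();
    }

    public static void main(String[] args) {
        System.out.println(race(8)); // 1
    }
}
```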
The visibility and memory effects have a performance cost and are within the meaning of the word "synchronization" as used in RX Design Guideline 6.8. My opinion on the proposals:
|
Yep I know about the visibility and memory effects and their performance cost. Thanks for your input on the options. |
Not all forms of protection need to fix up buggy implementations so that they work. An alternative form of protection is to make failures happen earlier, with higher probability, and/or higher visibility. That way, bugs get noticed and fixed earlier in the deployment process. |
I've submitted a pull request to resolve the most glaring issues: #221

I have not explored separating |
I merged #221 which solves these:
It does not solve this one and I don't know if there's anything we can do:
I also added fixes to wrap around the

Is there anything more that we should do on this subject? |
…ion failures Related to ReactiveX#216 The new forEach unit test went into a deadlock prior to this fix.
- Stop catching the error and passing to onError and instead let the SafeObserver handle it which will then prevent subsequent onNext calls and/or unsubscribe when a failure occurs. - This also solves the OnErrorResumeNext issue fixed in ReactiveX#312 but those changes still seem valid so I'll leave them. Related to ReactiveX#216 and ReactiveX#312
While discussing pull request #212 a possible issue came up with how incoming (not operators) Observers are handled in relation to wrapping them with AtomicObserver.

Instead of discussing it on that unrelated pull request I am moving it to here. The original comment on the possible problem is from @johngmyers here: