Skip to content

Commit

Permalink
ReactiveX#35 Make sure the Reactor operators can be used together on …
Browse files Browse the repository at this point in the history
…a Flux (ReactiveX#211)
  • Loading branch information
madgnome authored and RobWin committed Feb 26, 2018
1 parent d26c832 commit 13d18a9
Show file tree
Hide file tree
Showing 9 changed files with 165 additions and 148 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Operators;
import reactor.core.publisher.BaseSubscriber;

import java.util.concurrent.atomic.AtomicReference;

Expand All @@ -32,71 +32,53 @@
*
* @param <T> the value type of the upstream and downstream
*/
class BulkheadSubscriber<T> extends Operators.MonoSubscriber<T, T> {
class BulkheadSubscriber<T> extends BaseSubscriber<T> {

private final CoreSubscriber<? super T> actual;
private final Bulkhead bulkhead;
private final AtomicReference<Permit> permitted = new AtomicReference<>(Permit.PENDING);

private Subscription subscription;

public BulkheadSubscriber(Bulkhead bulkhead,
CoreSubscriber<? super T> actual) {
super(actual);
this.actual = actual;
this.bulkhead = requireNonNull(bulkhead);
}

@Override
public void onSubscribe(Subscription subscription) {
if (Operators.validate(this.subscription, subscription)) {
this.subscription = subscription;
if (acquireCallPermit()) {
actual.onSubscribe(this);
} else {
cancel();
actual.onSubscribe(this);
actual.onError(new BulkheadFullException(
String.format("Bulkhead '%s' is full", bulkhead.getName())));
}
public void hookOnSubscribe(Subscription subscription) {
if (acquireCallPermit()) {
actual.onSubscribe(this);
} else {
cancel();
actual.onSubscribe(this);
actual.onError(new BulkheadFullException(
String.format("Bulkhead '%s' is full", bulkhead.getName())));
}
}

@Override
public void onNext(T t) {
requireNonNull(t);

if (isInvocationPermitted()) {
public void hookOnNext(T t) {
if (notCancelled() && wasCallPermitted()) {
actual.onNext(t);
}
}

@Override
public void onError(Throwable t) {
requireNonNull(t);

if (isInvocationPermitted()) {
public void hookOnError(Throwable t) {
if (wasCallPermitted()) {
bulkhead.onComplete();
actual.onError(t);
}
}

@Override
public void onComplete() {
if (isInvocationPermitted()) {
public void hookOnComplete() {
if (wasCallPermitted()) {
releaseBulkhead();
actual.onComplete();
}
}

@Override
public void request(long n) {
subscription.request(n);
}

@Override
public void cancel() {
super.cancel();
}

private boolean acquireCallPermit() {
boolean callPermitted = false;
if (permitted.compareAndSet(Permit.PENDING, Permit.ACQUIRED)) {
Expand All @@ -108,12 +90,8 @@ private boolean acquireCallPermit() {
return callPermitted;
}

private boolean isInvocationPermitted() {
return notCancelled() && wasCallPermitted();
}

private boolean notCancelled() {
return !this.isCancelled();
return !this.isDisposed();
}

private boolean wasCallPermitted() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Operators;
import reactor.core.publisher.BaseSubscriber;

import java.util.concurrent.atomic.AtomicReference;

Expand All @@ -33,65 +33,54 @@
*
* @param <T> the value type of the upstream and downstream
*/
class CircuitBreakerSubscriber<T> extends Operators.MonoSubscriber<T, T> {
class CircuitBreakerSubscriber<T> extends BaseSubscriber<T> {

private final CoreSubscriber<? super T> actual;
private final CircuitBreaker circuitBreaker;
private final AtomicReference<Permit> permitted = new AtomicReference<>(Permit.PENDING);
private StopWatch stopWatch;
private Subscription subscription;

public CircuitBreakerSubscriber(CircuitBreaker circuitBreaker,
CoreSubscriber<? super T> actual) {
super(actual);
this.actual = actual;
this.circuitBreaker = requireNonNull(circuitBreaker);
}

@Override
public void onSubscribe(Subscription subscription) {
if (Operators.validate(this.subscription, subscription)) {
this.subscription = subscription;

if (acquireCallPermit()) {
actual.onSubscribe(this);
} else {
cancel();
actual.onSubscribe(this);
actual.onError(new CircuitBreakerOpenException(
String.format("CircuitBreaker '%s' is open", circuitBreaker.getName())));
}
}
}

@Override
public void onNext(T t) {
requireNonNull(t);

if (isInvocationPermitted()) {
actual.onNext(t);
protected void hookOnSubscribe(Subscription subscription) {
if (acquireCallPermit()) {
actual.onSubscribe(this);
} else {
cancel();
actual.onSubscribe(this);
actual.onError(new CircuitBreakerOpenException(
String.format("CircuitBreaker '%s' is open", circuitBreaker.getName())));
}
}

@Override
public void onError(Throwable t) {
requireNonNull(t);

markFailure(t);
if (isInvocationPermitted()) {
actual.onError(t);
protected void hookOnNext(T value) {
if (notCancelled() && wasCallPermitted()) {
actual.onNext(value);
}
}

@Override
public void onComplete() {
protected void hookOnComplete() {
markSuccess();
if (isInvocationPermitted()) {
if (wasCallPermitted()) {
actual.onComplete();
}
}

@Override
public void request(long n) {
subscription.request(n);
protected void hookOnError(Throwable t) {
requireNonNull(t);

markFailure(t);
if (wasCallPermitted()) {
actual.onError(t);
}
}

private boolean acquireCallPermit() {
Expand All @@ -107,10 +96,6 @@ private boolean acquireCallPermit() {
return callPermitted;
}

private boolean isInvocationPermitted() {
return !this.isCancelled() && wasCallPermitted();
}

private void markFailure(Throwable e) {
if (wasCallPermitted()) {
circuitBreaker.onError(stopWatch.stop().getProcessingDuration().toNanos(), e);
Expand All @@ -123,6 +108,10 @@ private void markSuccess() {
}
}

private boolean notCancelled() {
return !this.isDisposed();
}

private boolean wasCallPermitted() {
return permitted.get() == Permit.ACQUIRED;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Operators;
import reactor.core.publisher.BaseSubscriber;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
Expand All @@ -33,39 +33,33 @@
*
* @param <T> the value type of the upstream and downstream
*/
class RateLimiterSubscriber<T> extends Operators.MonoSubscriber<T, T> {
class RateLimiterSubscriber<T> extends BaseSubscriber<T> {

private final CoreSubscriber<? super T> actual;
private final RateLimiter rateLimiter;
private final AtomicReference<Permit> permitted = new AtomicReference<>(Permit.PENDING);
private final AtomicBoolean firstEvent = new AtomicBoolean(true);

private Subscription subscription;

public RateLimiterSubscriber(RateLimiter rateLimiter,
CoreSubscriber<? super T> actual) {
super(actual);
this.actual = actual;
this.rateLimiter = requireNonNull(rateLimiter);
}

@Override
public void onSubscribe(Subscription subscription) {
if (Operators.validate(this.subscription, subscription)) {
this.subscription = subscription;
if (acquireCallPermit()) {
actual.onSubscribe(this);
} else {
cancel();
actual.onSubscribe(this);
actual.onError(rateLimitExceededException());
}
public void hookOnSubscribe(Subscription subscription) {
if (acquireCallPermit()) {
actual.onSubscribe(this);
} else {
cancel();
actual.onSubscribe(this);
actual.onError(rateLimitExceededException());
}
}

@Override
public void onNext(T t) {
requireNonNull(t);

if (isInvocationPermitted()) {
public void hookOnNext(T t) {
if (notCancelled() && wasCallPermitted()) {
if (firstEvent.getAndSet(false) || rateLimiter.getPermission(rateLimiter.getRateLimiterConfig().getTimeoutDuration())) {
actual.onNext(t);
} else {
Expand All @@ -76,31 +70,19 @@ public void onNext(T t) {
}

@Override
public void onError(Throwable t) {
requireNonNull(t);

if (isInvocationPermitted()) {
public void hookOnError(Throwable t) {
if (wasCallPermitted()) {
actual.onError(t);
}
}

@Override
public void onComplete() {
if (isInvocationPermitted()) {
public void hookOnComplete() {
if (wasCallPermitted()) {
actual.onComplete();
}
}

@Override
public void request(long n) {
subscription.request(n);
}

@Override
public void cancel() {
super.cancel();
}

private boolean acquireCallPermit() {
boolean callPermitted = false;
if (permitted.compareAndSet(Permit.PENDING, Permit.ACQUIRED)) {
Expand All @@ -112,12 +94,8 @@ private boolean acquireCallPermit() {
return callPermitted;
}

private boolean isInvocationPermitted() {
return notCancelled() && wasCallPermitted();
}

private boolean notCancelled() {
return !this.isCancelled();
return !this.isDisposed();
}

private boolean wasCallPermitted() {
Expand Down
Loading

0 comments on commit 13d18a9

Please sign in to comment.