Skip to content

Commit

Permalink
fix #289 Split Signal into immutable and mutable version
Browse files Browse the repository at this point in the history
Signal is now an abstract class, and the default implementation is still
the immutable one. Introduce a mutable version for onNext signals that
can be used to limit the number of created instances in advanced cases,
here used for doOnEach.

A new operator, `FluxPeekStateful`, is used. It peeks into a sequence
while also passing around an arbitrary state object, per subscriber.

Note that FluxPeekStateful already doesn't call onError upon an
afterTerminate callback failure (#270).
  • Loading branch information
simonbasle authored Dec 13, 2016
1 parent c37f67b commit 4b469be
Show file tree
Hide file tree
Showing 8 changed files with 1,173 additions and 64 deletions.
35 changes: 30 additions & 5 deletions src/main/java/reactor/core/publisher/Flux.java
Original file line number Diff line number Diff line change
Expand Up @@ -3073,13 +3073,13 @@ public final Flux<T> doOnComplete(Runnable onComplete) {
* @see Signal
*/
public final Flux<T> doOnEach(Consumer<? super Signal<T>> signalConsumer) {
//TODO use a flyweight pattern for the onNext signals (re-use the same instance)?
Objects.requireNonNull(signalConsumer, "signalConsumer");
return doOnSignal(this,
return doOnSignalStateful(this,
MutableNextSignal::<T>undefined,
null,
t -> signalConsumer.accept(Signal.next(t)),
e -> signalConsumer.accept(Signal.<T>error(e)),
() -> signalConsumer.accept(Signal.<T>complete()),
(v, s) -> signalConsumer.accept(s.mutate(v)),
(e, s) -> signalConsumer.accept(Signal.<T>error(e)),
s -> signalConsumer.accept(Signal.<T>complete()),
null, null, null);
}

Expand Down Expand Up @@ -6790,6 +6790,31 @@ static <T> Flux<T> doOnSignal(Publisher<T> source,
onCancel));
}

/**
* Peek into a sequence signals while passing around a per-subscriber
* state object initialized by {@code stateSeeder} to the various callbacks
*/
static <T,S> Flux<T> doOnSignalStateful(Publisher<T> source,
Supplier<S> stateSeeder,
BiConsumer<? super Subscription, S> onSubscribe,
BiConsumer<? super T, S> onNext,
BiConsumer<? super Throwable, S> onError,
Consumer<S> onComplete,
Consumer<S> onAfterTerminate,
BiConsumer<Long, S> onRequest,
Consumer<S> onCancel) {
//TODO Fuseable version?
return onAssembly(new FluxPeekStateful<>(source,
stateSeeder,
onSubscribe,
onNext,
onError,
onComplete,
onAfterTerminate,
onRequest,
onCancel));
}


/**
* Returns the appropriate Mono instance for a known Supplier Flux.
Expand Down
286 changes: 286 additions & 0 deletions src/main/java/reactor/core/publisher/FluxPeekStateful.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,286 @@
/*
* Copyright (c) 2011-2016 Pivotal Software Inc, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package reactor.core.publisher;

import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Supplier;

import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.Exceptions;
import reactor.core.Producer;
import reactor.core.Receiver;

/**
* Peek into the lifecycle events and signals of a sequence, passing around
* a per-subscription state object initialized by a {@link Supplier} {@code stateSeeder}.
* <p>
* <p>
* The callbacks are all optional.
* <p>
* <p>
* Crashes by the lambdas are ignored.
*
* @param <T> the value type
* @param <S> the state type
* @see <a href="https://github.com/reactor/reactive-streams-commons">Reactive-Streams-Commons</a>
*/
final class FluxPeekStateful<T, S> extends FluxSource<T, T> implements SignalPeekStateful<T, S> {

final Supplier<S> stateSeeder;

final BiConsumer<? super Subscription, S> onSubscribeCall;

final BiConsumer<? super T, S> onNextCall;

final BiConsumer<? super Throwable, S> onErrorCall;

final Consumer<S> onCompleteCall;

final Consumer<S> onAfterTerminateCall;

final BiConsumer<Long, S> onRequestCall;

final Consumer<S> onCancelCall;

public FluxPeekStateful(Publisher<? extends T> source,
Supplier<S> stateSeeder,
BiConsumer<? super Subscription, S> onSubscribeCall,
BiConsumer<? super T, S> onNextCall,
BiConsumer<? super Throwable, S> onErrorCall,
Consumer<S> onCompleteCall,
Consumer<S> onAfterTerminateCall,
BiConsumer<Long, S> onRequestCall,
Consumer<S> onCancelCall) {
super(source);
this.stateSeeder = stateSeeder;
this.onSubscribeCall = onSubscribeCall;
this.onNextCall = onNextCall;
this.onErrorCall = onErrorCall;
this.onCompleteCall = onCompleteCall;
this.onAfterTerminateCall = onAfterTerminateCall;
this.onRequestCall = onRequestCall;
this.onCancelCall = onCancelCall;
}

@Override
public void subscribe(Subscriber<? super T> s) {
//TODO fuseable version?
//TODO conditional version?
source.subscribe(new PeekStatefulSubscriber<>(s, this, stateSeeder.get()));
}

static final class PeekStatefulSubscriber<T, S> implements Subscriber<T>, Subscription, Receiver, Producer {

final Subscriber<? super T> actual;

final SignalPeekStateful<T, S> parent;

final S state;

Subscription s;

boolean done;

public PeekStatefulSubscriber(Subscriber<? super T> actual,
SignalPeekStateful<T, S> parent, S state) {
this.actual = actual;
this.parent = parent;
this.state = state;
}

@Override
public void request(long n) {
if(parent.onRequestCall() != null) {
try {
parent.onRequestCall().accept(n, state);
}
catch (Throwable e) {
onError(Operators.onOperatorError(s, e));
return;
}
}
s.request(n);
}

@Override
public void cancel() {
if(parent.onCancelCall() != null) {
try {
parent.onCancelCall().accept(state);
}
catch (Throwable e) {
onError(Operators.onOperatorError(s, e));
return;
}
}
s.cancel();
}

@Override
public void onSubscribe(Subscription s) {
if(parent.onSubscribeCall() != null) {
try {
parent.onSubscribeCall().accept(s, state);
}
catch (Throwable e) {
Operators.error(actual, Operators.onOperatorError(s, e));
return;
}
}
this.s = s;
actual.onSubscribe(this);
}

@Override
public void onNext(T t) {
if (done) {
Operators.onNextDropped(t);
return;
}
if(parent.onNextCall() != null) {
try {
parent.onNextCall().accept(t, state);
}
catch (Throwable e) {
onError(Operators.onOperatorError(s, e, t));
return;
}
}

actual.onNext(t);
}

@Override
public void onError(Throwable t) {
if (done) {
Operators.onErrorDropped(t);
return;
}
done = true;
if(parent.onErrorCall() != null) {
try {
parent.onErrorCall().accept(t, state);
}
catch (Throwable e) {
//this performs a throwIfFatal or suppresses t in e
t = Operators.onOperatorError(null, e, t);
}
}

try {
actual.onError(t);
}
catch (UnsupportedOperationException use){
if(parent.onErrorCall() == null
|| !Exceptions.isErrorCallbackNotImplemented(use) && use.getCause() != t){
throw use;
}
//ignore if missing callback
}

if(parent.onAfterTerminateCall() != null) {
try {
parent.onAfterTerminateCall().accept(state);
}
catch (Throwable e) {
//don't invoke error callback, see https://github.com/reactor/reactor-core/issues/270
Exceptions.throwIfFatal(e);
Throwable _e = Operators.onOperatorError(null, e, t);
Operators.onErrorDropped(_e);
}
}
}

@Override
public void onComplete() {
if (done) {
return;
}
if(parent.onCompleteCall() != null) {
try {
parent.onCompleteCall().accept(state);
}
catch (Throwable e) {
onError(Operators.onOperatorError(s, e));
return;
}
}
done = true;

actual.onComplete();

if(parent.onAfterTerminateCall() != null) {
try {
parent.onAfterTerminateCall().accept(state);
}
catch (Throwable e) {
//don't invoke error callback, see https://github.com/reactor/reactor-core/issues/270
Exceptions.throwIfFatal(e);
Throwable _e = Operators.onOperatorError(e);
Operators.onErrorDropped(_e);
}
}
}

@Override
public Object downstream() {
return actual;
}

@Override
public Object upstream() {
return s;
}
}

@Override
public BiConsumer<? super Subscription, S> onSubscribeCall() {
return onSubscribeCall;
}

@Override
public BiConsumer<? super T, S> onNextCall() {
return onNextCall;
}

@Override
public BiConsumer<? super Throwable, S> onErrorCall() {
return onErrorCall;
}

@Override
public Consumer<S> onCompleteCall() {
return onCompleteCall;
}

@Override
public Consumer<S> onAfterTerminateCall() {
return onAfterTerminateCall;
}

@Override
public BiConsumer<Long, S> onRequestCall() {
return onRequestCall;
}

@Override
public Consumer<S> onCancelCall() {
return onCancelCall;
}
}
Loading

0 comments on commit 4b469be

Please sign in to comment.