-
Notifications
You must be signed in to change notification settings - Fork 2.7k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Prevent possible memory leak from Reactive REST Client multipart uplo…
- Loading branch information
Showing
2 changed files
with
183 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
171 changes: 171 additions & 0 deletions
171
.../src/main/java/org/jboss/resteasy/reactive/client/impl/multipart/RequestTrackingPipe.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,171 @@ | ||
package org.jboss.resteasy.reactive.client.impl.multipart; | ||
|
||
import io.vertx.core.AsyncResult; | ||
import io.vertx.core.Future; | ||
import io.vertx.core.Handler; | ||
import io.vertx.core.Promise; | ||
import io.vertx.core.VertxException; | ||
import io.vertx.core.streams.Pipe; | ||
import io.vertx.core.streams.ReadStream; | ||
import io.vertx.core.streams.WriteStream; | ||
import java.util.concurrent.atomic.AtomicLong; | ||
|
||
/** | ||
* Based on {@link io.vertx.core.streams.impl.PipeImpl}. | ||
* We can't use the aforementioned class as it is not open enough for us to add the read tracking we want. | ||
*/ | ||
class RequestTrackingPipe<T> implements Pipe<T> { | ||
|
||
private final long maxInFlightReads; | ||
|
||
private final Promise<Void> result; | ||
private final ReadStream<T> src; | ||
private boolean endOnSuccess = true; | ||
private boolean endOnFailure = true; | ||
private WriteStream<T> dst; | ||
|
||
private boolean manuallyPaused = false; | ||
|
||
private final AtomicLong inFlightReads = new AtomicLong(0); | ||
|
||
public RequestTrackingPipe(ReadStream<T> src, long maxInFlightReads) { | ||
this.src = src; | ||
this.maxInFlightReads = maxInFlightReads; | ||
this.result = Promise.promise(); | ||
|
||
// Set handlers now | ||
src.endHandler(result::tryComplete); | ||
src.exceptionHandler(result::tryFail); | ||
} | ||
|
||
@Override | ||
public synchronized Pipe<T> endOnFailure(boolean end) { | ||
endOnFailure = end; | ||
return this; | ||
} | ||
|
||
@Override | ||
public synchronized Pipe<T> endOnSuccess(boolean end) { | ||
endOnSuccess = end; | ||
return this; | ||
} | ||
|
||
@Override | ||
public synchronized Pipe<T> endOnComplete(boolean end) { | ||
endOnSuccess = end; | ||
endOnFailure = end; | ||
return this; | ||
} | ||
|
||
private void handleWriteResult(AsyncResult<Void> ack) { | ||
long currentInFlightReads = inFlightReads.decrementAndGet(); | ||
if (currentInFlightReads <= (maxInFlightReads / 2)) { | ||
synchronized (RequestTrackingPipe.this) { | ||
if (manuallyPaused) { | ||
manuallyPaused = false; | ||
src.resume(); | ||
} | ||
} | ||
} | ||
if (ack.failed()) { | ||
result.tryFail(new WriteException(ack.cause())); | ||
} | ||
} | ||
|
||
@Override | ||
public void to(WriteStream<T> ws, Handler<AsyncResult<Void>> completionHandler) { | ||
if (ws == null) { | ||
throw new NullPointerException(); | ||
} | ||
boolean endOnSuccess; | ||
boolean endOnFailure; | ||
synchronized (RequestTrackingPipe.this) { | ||
if (dst != null) { | ||
throw new IllegalStateException(); | ||
} | ||
dst = ws; | ||
endOnSuccess = this.endOnSuccess; | ||
endOnFailure = this.endOnFailure; | ||
} | ||
Handler<Void> drainHandler = v -> src.resume(); | ||
src.handler(item -> { | ||
ws.write(item, this::handleWriteResult); | ||
long currentInFlightReads = inFlightReads.incrementAndGet(); | ||
if (ws.writeQueueFull()) { | ||
src.pause(); | ||
ws.drainHandler(drainHandler); | ||
} else if (currentInFlightReads > maxInFlightReads) { | ||
synchronized (RequestTrackingPipe.this) { | ||
src.pause(); | ||
manuallyPaused = true; | ||
} | ||
} | ||
}); | ||
src.resume(); | ||
result.future().onComplete(ar -> { | ||
try { | ||
src.handler(null); | ||
} catch (Exception ignore) { | ||
} | ||
try { | ||
src.exceptionHandler(null); | ||
} catch (Exception ignore) { | ||
} | ||
try { | ||
src.endHandler(null); | ||
} catch (Exception ignore) { | ||
} | ||
if (ar.succeeded()) { | ||
handleSuccess(completionHandler); | ||
} else { | ||
Throwable err = ar.cause(); | ||
if (err instanceof WriteException) { | ||
src.resume(); | ||
err = err.getCause(); | ||
} | ||
handleFailure(err, completionHandler); | ||
} | ||
}); | ||
} | ||
|
||
private void handleSuccess(Handler<AsyncResult<Void>> completionHandler) { | ||
if (endOnSuccess) { | ||
dst.end(completionHandler); | ||
} else { | ||
completionHandler.handle(Future.succeededFuture()); | ||
} | ||
} | ||
|
||
private void handleFailure(Throwable cause, Handler<AsyncResult<Void>> completionHandler) { | ||
Future<Void> res = Future.failedFuture(cause); | ||
if (endOnFailure) { | ||
dst.end(ignore -> { | ||
completionHandler.handle(res); | ||
}); | ||
} else { | ||
completionHandler.handle(res); | ||
} | ||
} | ||
|
||
public void close() { | ||
synchronized (this) { | ||
src.exceptionHandler(null); | ||
src.handler(null); | ||
if (dst != null) { | ||
dst.drainHandler(null); | ||
dst.exceptionHandler(null); | ||
} | ||
} | ||
VertxException err = new VertxException("Pipe closed", true); | ||
if (result.tryFail(err)) { | ||
src.resume(); | ||
} | ||
} | ||
|
||
private static class WriteException extends VertxException { | ||
private WriteException(Throwable cause) { | ||
super(cause, true); | ||
} | ||
} | ||
|
||
} |