Skip to content

Commit

Permalink
Improve limit handling in StringDecoder
Browse files Browse the repository at this point in the history
The case of one data buffer containing multiple lines can could cause
a buffer leak due to a suspected issue in concatMapIterable. This
commit adds workarounds for that until the underlying issue is
addressed.

Closes gh-24339
  • Loading branch information
rstoyanchev committed Jan 13, 2020
1 parent 850cbf0 commit a741ae4
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 54 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -94,20 +94,44 @@ public Flux<String> decode(Publisher<DataBuffer> input, ResolvableType elementTy

byte[][] delimiterBytes = getDelimiterBytes(mimeType);

// TODO: Drop Consumer and use bufferUntil with Supplier<LimistedDataBufferList> (reactor-core#1925)
// TODO: Drop doOnDiscard(LimitedDataBufferList.class, ...) (reactor-core#1924)
LimitedDataBufferConsumer limiter = new LimitedDataBufferConsumer(getMaxInMemorySize());

Flux<DataBuffer> inputFlux = Flux.defer(() -> {
DataBufferUtils.Matcher matcher = DataBufferUtils.matcher(delimiterBytes);
return Flux.from(input)
.concatMapIterable(buffer -> endFrameAfterDelimiter(buffer, matcher))
.doOnNext(limiter)
.bufferUntil(buffer -> buffer instanceof EndFrameBuffer)
.map(buffers -> joinAndStrip(buffers, this.stripDelimiter))
.doOnDiscard(LimitedDataBufferList.class, LimitedDataBufferList::releaseAndClear)
.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
if (getMaxInMemorySize() != -1) {

// Passing limiter into endFrameAfterDelimiter helps to ensure that in case of one DataBuffer
// containing multiple lines, the limit is checked and raised immediately without accumulating
// subsequent lines. This is necessary because concatMapIterable doesn't respect doOnDiscard.
// When reactor-core#1925 is resolved, we could replace bufferUntil with:

// .windowUntil(buffer -> buffer instanceof EndFrameBuffer)
// .concatMap(fluxes -> fluxes.collect(() -> new LimitedDataBufferList(getMaxInMemorySize()), LimitedDataBufferList::add))

LimitedDataBufferList limiter = new LimitedDataBufferList(getMaxInMemorySize());

return Flux.from(input)
.concatMapIterable(buffer -> endFrameAfterDelimiter(buffer, matcher, limiter))
.bufferUntil(buffer -> buffer instanceof EndFrameBuffer)
.map(buffers -> joinAndStrip(buffers, this.stripDelimiter))
.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
}
else {

// When the decoder is unlimited (-1), concatMapIterable will cache buffers that may not
// be released if cancel is signalled before they are turned into String lines
// (see test maxInMemoryLimitReleasesUnprocessedLinesWhenUnlimited).
// When reactor-core#1925 is resolved, the workaround can be removed and the entire
// else clause possibly dropped.

ConcatMapIterableDiscardWorkaroundCache cache = new ConcatMapIterableDiscardWorkaroundCache();

return Flux.from(input)
.concatMapIterable(buffer -> cache.addAll(endFrameAfterDelimiter(buffer, matcher, null)))
.doOnNext(cache)
.doOnCancel(cache)
.bufferUntil(buffer -> buffer instanceof EndFrameBuffer)
.map(buffers -> joinAndStrip(buffers, this.stripDelimiter))
.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
}
});

return super.decode(inputFlux, elementType, mimeType, hints);
Expand Down Expand Up @@ -152,29 +176,49 @@ private static Charset getCharset(@Nullable MimeType mimeType) {
*
* @param dataBuffer the buffer to find delimiters in
* @param matcher used to find the first delimiters
* @param limiter to enforce maxInMemorySize with
* @return a flux of buffers, containing {@link EndFrameBuffer} after each delimiter that was
* found in {@code dataBuffer}. Returns Flux, because returning List (w/ flatMapIterable)
* results in memory leaks due to pre-fetching.
*/
private static List<DataBuffer> endFrameAfterDelimiter(DataBuffer dataBuffer, DataBufferUtils.Matcher matcher) {
private static List<DataBuffer> endFrameAfterDelimiter(
DataBuffer dataBuffer, DataBufferUtils.Matcher matcher, @Nullable LimitedDataBufferList limiter) {

List<DataBuffer> result = new ArrayList<>();
do {
int endIdx = matcher.match(dataBuffer);
if (endIdx != -1) {
int readPosition = dataBuffer.readPosition();
int length = endIdx - readPosition + 1;
result.add(dataBuffer.retainedSlice(readPosition, length));
result.add(new EndFrameBuffer(matcher.delimiter()));
dataBuffer.readPosition(endIdx + 1);
try {
do {
int endIdx = matcher.match(dataBuffer);
if (endIdx != -1) {
int readPosition = dataBuffer.readPosition();
int length = (endIdx - readPosition + 1);
DataBuffer slice = dataBuffer.retainedSlice(readPosition, length);
result.add(slice);
result.add(new EndFrameBuffer(matcher.delimiter()));
dataBuffer.readPosition(endIdx + 1);
if (limiter != null) {
limiter.add(slice); // enforce the limit
limiter.clear();
}
}
else {
result.add(DataBufferUtils.retain(dataBuffer));
if (limiter != null) {
limiter.add(dataBuffer);
}
break;
}
}
else {
result.add(DataBufferUtils.retain(dataBuffer));
break;
while (dataBuffer.readableByteCount() > 0);
}
catch (DataBufferLimitException ex) {
if (limiter != null) {
limiter.releaseAndClear();
}
throw ex;
}
finally {
DataBufferUtils.release(dataBuffer);
}
while (dataBuffer.readableByteCount() > 0);

DataBufferUtils.release(dataBuffer);
return result;
}

Expand Down Expand Up @@ -288,34 +332,32 @@ public byte[] delimiter() {
}


/**
* Temporary measure for reactor-core#1925.
* Consumer that adds to a {@link LimitedDataBufferList} to enforce limits.
*/
private static class LimitedDataBufferConsumer implements Consumer<DataBuffer> {
private class ConcatMapIterableDiscardWorkaroundCache implements Consumer<DataBuffer>, Runnable {

private final LimitedDataBufferList bufferList;
private final List<DataBuffer> buffers = new ArrayList<>();


public LimitedDataBufferConsumer(int maxInMemorySize) {
this.bufferList = new LimitedDataBufferList(maxInMemorySize);
public List<DataBuffer> addAll(List<DataBuffer> buffersToAdd) {
this.buffers.addAll(buffersToAdd);
return buffersToAdd;
}

@Override
public void accept(DataBuffer dataBuffer) {
this.buffers.remove(dataBuffer);
}

@Override
public void accept(DataBuffer buffer) {
if (buffer instanceof EndFrameBuffer) {
this.bufferList.clear();
}
else {
public void run() {
this.buffers.forEach(buffer -> {
try {
this.bufferList.add(buffer);
}
catch (DataBufferLimitException ex) {
DataBufferUtils.release(buffer);
throw ex;
}
}
catch (Throwable ex) {
// Keep going..
}
});
}
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -130,17 +130,30 @@ void decodeNewLine() {
}

@Test
void decodeNewLineWithLimit() {
void maxInMemoryLimit() {
Flux<DataBuffer> input = Flux.just(
stringBuffer("abc\n"),
stringBuffer("defg\n"),
stringBuffer("hijkl\n")
);
this.decoder.setMaxInMemorySize(5);
stringBuffer("abc\n"), stringBuffer("defg\n"), stringBuffer("hijkl\n"));

this.decoder.setMaxInMemorySize(5);
testDecode(input, String.class, step ->
step.expectNext("abc", "defg")
.verifyError(DataBufferLimitException.class));
step.expectNext("abc", "defg").verifyError(DataBufferLimitException.class));
}

@Test // gh-24312
void maxInMemoryLimitReleaseUnprocessedLinesFromCurrentBuffer() {
Flux<DataBuffer> input = Flux.just(
stringBuffer("TOO MUCH DATA\nanother line\n\nand another\n"));

this.decoder.setMaxInMemorySize(5);
testDecode(input, String.class, step -> step.verifyError(DataBufferLimitException.class));
}

@Test // gh-24339
void maxInMemoryLimitReleaseUnprocessedLinesWhenUnlimited() {
Flux<DataBuffer> input = Flux.just(stringBuffer("Line 1\nLine 2\nLine 3\n"));

this.decoder.setMaxInMemorySize(-1);
testDecodeCancel(input, ResolvableType.forClass(String.class), null, Collections.emptyMap());
}

@Test
Expand Down

0 comments on commit a741ae4

Please sign in to comment.