diff --git a/server/src/main/java/org/elasticsearch/transport/CompressibleBytesOutputStream.java b/server/src/main/java/org/elasticsearch/transport/CompressibleBytesOutputStream.java deleted file mode 100644 index b08b72252d999..0000000000000 --- a/server/src/main/java/org/elasticsearch/transport/CompressibleBytesOutputStream.java +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0 and the Server Side Public License, v 1; you may not use this file except - * in compliance with, at your election, the Elastic License 2.0 or the Server - * Side Public License, v 1. - */ - -package org.elasticsearch.transport; - -import org.elasticsearch.core.internal.io.IOUtils; -import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.common.compress.CompressorFactory; -import org.elasticsearch.common.io.Streams; -import org.elasticsearch.common.io.stream.BytesStream; -import org.elasticsearch.common.io.stream.StreamOutput; - -import java.io.IOException; -import java.io.OutputStream; -import java.util.zip.DeflaterOutputStream; - -/** - * This class exists to provide a stream with optional compression. This is useful as using compression - * requires that the underlying {@link DeflaterOutputStream} be closed to write EOS bytes. However, the - * {@link BytesStream} should not be closed yet, as we have not used the bytes. This class handles these - * intricacies. - * - * {@link CompressibleBytesOutputStream#materializeBytes()} should be called when all the bytes have been - * written to this stream. If compression is enabled, the proper EOS bytes will be written at that point. - * The underlying {@link BytesReference} will be returned. - * - * {@link CompressibleBytesOutputStream#close()} will NOT close the underlying stream. The byte stream passed - * in the constructor must be closed individually. - */ -final class CompressibleBytesOutputStream extends StreamOutput { - - private final OutputStream stream; - private final BytesStream bytesStreamOutput; - private final boolean shouldCompress; - - CompressibleBytesOutputStream(BytesStream bytesStreamOutput, boolean shouldCompress) throws IOException { - this.bytesStreamOutput = bytesStreamOutput; - this.shouldCompress = shouldCompress; - if (shouldCompress) { - this.stream = CompressorFactory.COMPRESSOR.threadLocalOutputStream(Streams.flushOnCloseStream(bytesStreamOutput)); - } else { - this.stream = bytesStreamOutput; - } - } - - /** - * This method ensures that compression is complete and returns the underlying bytes. - * - * @return bytes underlying the stream - * @throws IOException if an exception occurs when writing or flushing - */ - BytesReference materializeBytes() throws IOException { - // If we are using compression the stream needs to be closed to ensure that EOS marker bytes are written. - // The actual ReleasableBytesStreamOutput will not be closed yet as it is wrapped in flushOnCloseStream when - // passed to the deflater stream. - if (shouldCompress) { - stream.close(); - } - - return bytesStreamOutput.bytes(); - } - - @Override - public void writeByte(byte b) throws IOException { - stream.write(b); - } - - @Override - public void writeBytes(byte[] b, int offset, int length) throws IOException { - stream.write(b, offset, length); - } - - @Override - public void flush() throws IOException { - stream.flush(); - } - - @Override - public void close() throws IOException { - if (stream != bytesStreamOutput) { - assert shouldCompress : "If the streams are different we should be compressing"; - IOUtils.close(stream); - } - } - - @Override - public void reset() throws IOException { - throw new UnsupportedOperationException(); - } -} diff --git a/server/src/main/java/org/elasticsearch/transport/OutboundMessage.java b/server/src/main/java/org/elasticsearch/transport/OutboundMessage.java index fac27d7456239..7c7af3e5b796e 100644 --- a/server/src/main/java/org/elasticsearch/transport/OutboundMessage.java +++ b/server/src/main/java/org/elasticsearch/transport/OutboundMessage.java @@ -11,7 +11,10 @@ import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.CompositeBytesReference; +import org.elasticsearch.common.compress.CompressorFactory; +import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.io.stream.OutputStreamStreamOutput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.util.concurrent.ThreadContext; @@ -42,15 +45,41 @@ BytesReference serialize(BytesStreamOutput bytesStream) throws IOException { variableHeaderLength = Math.toIntExact(bytesStream.position() - preHeaderPosition); } - try (CompressibleBytesOutputStream stream = - new CompressibleBytesOutputStream(bytesStream, TransportStatus.isCompress(status))) { + final boolean compress = TransportStatus.isCompress(status); + final StreamOutput stream = compress ? wrapCompressed(bytesStream) : bytesStream; + final BytesReference zeroCopyBuffer; + try { stream.setVersion(version); - stream.setFeatures(bytesStream.getFeatures()); + if (bytesStream != stream) { + stream.setFeatures(bytesStream.getFeatures()); + } if (variableHeaderLength == -1) { writeVariableHeader(stream); } - reference = writeMessage(stream); + if (message instanceof BytesTransportRequest) { + BytesTransportRequest bRequest = (BytesTransportRequest) message; + bRequest.writeThin(stream); + zeroCopyBuffer = bRequest.bytes; + } else if (message instanceof RemoteTransportException) { + stream.writeException((RemoteTransportException) message); + zeroCopyBuffer = BytesArray.EMPTY; + } else { + message.writeTo(stream); + zeroCopyBuffer = BytesArray.EMPTY; + } + } finally { + // We have to close here before accessing the bytes when using compression to ensure that some marker bytes (EOS marker) + // are written. + if (compress) { + stream.close(); + } + } + final BytesReference message = bytesStream.bytes(); + if (zeroCopyBuffer.length() == 0) { + reference = message; + } else { + reference = CompositeBytesReference.of(message, zeroCopyBuffer); } bytesStream.seek(0); @@ -59,34 +88,14 @@ BytesReference serialize(BytesStreamOutput bytesStream) throws IOException { return reference; } - protected void writeVariableHeader(StreamOutput stream) throws IOException { - threadContext.writeTo(stream); + // compressed stream wrapped bytes must be no-close wrapped since we need to close the compressed wrapper below to release + // resources and write EOS marker bytes but must not yet release the bytes themselves + private OutputStreamStreamOutput wrapCompressed(BytesStreamOutput bytesStream) throws IOException { + return new OutputStreamStreamOutput(CompressorFactory.COMPRESSOR.threadLocalOutputStream(Streams.noCloseStream(bytesStream))); } - protected BytesReference writeMessage(CompressibleBytesOutputStream stream) throws IOException { - final BytesReference zeroCopyBuffer; - if (message instanceof BytesTransportRequest) { - BytesTransportRequest bRequest = (BytesTransportRequest) message; - bRequest.writeThin(stream); - zeroCopyBuffer = bRequest.bytes; - } else if (message instanceof RemoteTransportException) { - stream.writeException((RemoteTransportException) message); - zeroCopyBuffer = BytesArray.EMPTY; - } else { - message.writeTo(stream); - zeroCopyBuffer = BytesArray.EMPTY; - } - // we have to call materializeBytes() here before accessing the bytes. A CompressibleBytesOutputStream - // might be implementing compression. And materializeBytes() ensures that some marker bytes (EOS marker) - // are written. Otherwise we barf on the decompressing end when we read past EOF on purpose in the - // #validateRequest method. this might be a problem in deflate after all but it's important to write - // the marker bytes. - final BytesReference message = stream.materializeBytes(); - if (zeroCopyBuffer.length() == 0) { - return message; - } else { - return CompositeBytesReference.of(message, zeroCopyBuffer); - } + protected void writeVariableHeader(StreamOutput stream) throws IOException { + threadContext.writeTo(stream); } static class Request extends OutboundMessage { diff --git a/server/src/test/java/org/elasticsearch/transport/CompressibleBytesOutputStreamTests.java b/server/src/test/java/org/elasticsearch/transport/CompressibleBytesOutputStreamTests.java deleted file mode 100644 index e8cb7eeda8e62..0000000000000 --- a/server/src/test/java/org/elasticsearch/transport/CompressibleBytesOutputStreamTests.java +++ /dev/null @@ -1,114 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0 and the Server Side Public License, v 1; you may not use this file except - * in compliance with, at your election, the Elastic License 2.0 or the Server - * Side Public License, v 1. - */ - -package org.elasticsearch.transport; - -import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.common.compress.CompressorFactory; -import org.elasticsearch.common.io.stream.BytesStream; -import org.elasticsearch.common.io.stream.BytesStreamOutput; -import org.elasticsearch.common.io.stream.InputStreamStreamInput; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.test.ESTestCase; - -import java.io.EOFException; -import java.io.IOException; - -public class CompressibleBytesOutputStreamTests extends ESTestCase { - - public void testStreamWithoutCompression() throws IOException { - BytesStream bStream = new ZeroOutOnCloseStream(); - CompressibleBytesOutputStream stream = new CompressibleBytesOutputStream(bStream, false); - - byte[] expectedBytes = randomBytes(randomInt(30)); - stream.write(expectedBytes); - - BytesReference bytesRef = stream.materializeBytes(); - // Closing compression stream does not close underlying stream - stream.close(); - - assertFalse(CompressorFactory.COMPRESSOR.isCompressed(bytesRef)); - - StreamInput streamInput = bytesRef.streamInput(); - byte[] actualBytes = new byte[expectedBytes.length]; - streamInput.readBytes(actualBytes, 0, expectedBytes.length); - - assertEquals(-1, streamInput.read()); - assertArrayEquals(expectedBytes, actualBytes); - - bStream.close(); - - // The bytes should be zeroed out on close - for (byte b : bytesRef.toBytesRef().bytes) { - assertEquals((byte) 0, b); - } - } - - public void testStreamWithCompression() throws IOException { - BytesStream bStream = new ZeroOutOnCloseStream(); - CompressibleBytesOutputStream stream = new CompressibleBytesOutputStream(bStream, true); - - byte[] expectedBytes = randomBytes(randomInt(30)); - stream.write(expectedBytes); - - BytesReference bytesRef = stream.materializeBytes(); - stream.close(); - - assertTrue(CompressorFactory.COMPRESSOR.isCompressed(bytesRef)); - - StreamInput streamInput = new InputStreamStreamInput(CompressorFactory.COMPRESSOR.threadLocalInputStream(bytesRef.streamInput())); - byte[] actualBytes = new byte[expectedBytes.length]; - streamInput.readBytes(actualBytes, 0, expectedBytes.length); - - assertEquals(-1, streamInput.read()); - assertArrayEquals(expectedBytes, actualBytes); - - bStream.close(); - - // The bytes should be zeroed out on close - for (byte b : bytesRef.toBytesRef().bytes) { - assertEquals((byte) 0, b); - } - } - - public void testCompressionWithCallingMaterializeFails() throws IOException { - BytesStream bStream = new ZeroOutOnCloseStream(); - CompressibleBytesOutputStream stream = new CompressibleBytesOutputStream(bStream, true); - - byte[] expectedBytes = randomBytes(between(1, 30)); - stream.write(expectedBytes); - - - StreamInput streamInput = - new InputStreamStreamInput(CompressorFactory.COMPRESSOR.threadLocalInputStream(bStream.bytes().streamInput())); - byte[] actualBytes = new byte[expectedBytes.length]; - EOFException e = expectThrows(EOFException.class, () -> streamInput.readBytes(actualBytes, 0, expectedBytes.length)); - assertEquals("Unexpected end of ZLIB input stream", e.getMessage()); - - stream.close(); - } - - private static byte[] randomBytes(int length) { - byte[] bytes = new byte[length]; - for (int i = 0; i < bytes.length; ++i) { - bytes[i] = randomByte(); - } - return bytes; - } - - private static class ZeroOutOnCloseStream extends BytesStreamOutput { - - @Override - public void close() { - if (bytes != null) { - int size = (int) bytes.size(); - bytes.set(0, new byte[size], 0, size); - } - } - } -}