diff --git a/core/src/main/java/org/apache/spark/network/buffer/LargeByteBufferInputStream.java b/core/src/main/java/org/apache/spark/network/buffer/LargeByteBufferInputStream.java index a4b1e2571af56..dc8da016db3ed 100644 --- a/core/src/main/java/org/apache/spark/network/buffer/LargeByteBufferInputStream.java +++ b/core/src/main/java/org/apache/spark/network/buffer/LargeByteBufferInputStream.java @@ -72,10 +72,6 @@ public long skip(long toSkip) { } } - // only for testing - @VisibleForTesting - boolean disposed = false; - /** * Clean up the buffer, and potentially dispose of it */ @@ -84,7 +80,6 @@ public void close() { if (buffer != null) { if (dispose) { buffer.dispose(); - disposed = true; } buffer = null; } diff --git a/core/src/main/java/org/apache/spark/network/buffer/LargeByteBufferOutputStream.java b/core/src/main/java/org/apache/spark/network/buffer/LargeByteBufferOutputStream.java index 975de7b10f65c..81de016781400 100644 --- a/core/src/main/java/org/apache/spark/network/buffer/LargeByteBufferOutputStream.java +++ b/core/src/main/java/org/apache/spark/network/buffer/LargeByteBufferOutputStream.java @@ -65,7 +65,7 @@ public LargeByteBuffer largeBuffer() { } /** - * exposed for testing. You don't really ever want to call this method -- the returned + * You don't really ever want to call this method -- the returned * buffer will not implement {{asByteBuffer}} correctly. */ @VisibleForTesting diff --git a/core/src/test/scala/org/apache/spark/network/buffer/LargeByteBufferInputStreamSuite.scala b/core/src/test/scala/org/apache/spark/network/buffer/LargeByteBufferInputStreamSuite.scala index d8e48db32f78c..44b458d6555cf 100644 --- a/core/src/test/scala/org/apache/spark/network/buffer/LargeByteBufferInputStreamSuite.scala +++ b/core/src/test/scala/org/apache/spark/network/buffer/LargeByteBufferInputStreamSuite.scala @@ -20,11 +20,13 @@ import java.io.{File, FileInputStream, FileOutputStream, OutputStream} import java.nio.channels.FileChannel.MapMode import org.junit.Assert._ +import org.mockito.Mockito._ import org.scalatest.Matchers +import org.scalatest.mock.MockitoSugar import org.apache.spark.SparkFunSuite -class LargeByteBufferInputStreamSuite extends SparkFunSuite with Matchers { +class LargeByteBufferInputStreamSuite extends SparkFunSuite with Matchers with MockitoSugar { test("read from large mapped file") { val testFile = File.createTempFile("large-buffer-input-stream-test", ".bin") @@ -47,17 +49,13 @@ class LargeByteBufferInputStreamSuite extends SparkFunSuite with Matchers { val read = new Array[Byte](buffer.length) (0 until (len / buffer.length).toInt).foreach { idx => - in.disposed should be(false) in.read(read) should be(read.length) (0 until buffer.length).foreach { arrIdx => assertEquals(buffer(arrIdx), read(arrIdx)) } } - in.disposed should be(false) in.read(read) should be(-1) - in.disposed should be(false) in.close() - in.disposed should be(true) } finally { testFile.delete() } @@ -65,11 +63,15 @@ class LargeByteBufferInputStreamSuite extends SparkFunSuite with Matchers { test("dispose on close") { // don't need to read to the end -- dispose anytime we close - val data = new Array[Byte](10) - val in = new LargeByteBufferInputStream(LargeByteBufferHelper.asLargeByteBuffer(data), true) - in.disposed should be (false) + val mockBuffer = mock[LargeByteBuffer] + when(mockBuffer.remaining()).thenReturn(0) + val in = new LargeByteBufferInputStream(mockBuffer, true) + verify(mockBuffer, times(0)).dispose() + // reading to the end shouldn't auto-dispose + in.read() should be (-1) + verify(mockBuffer, times(0)).dispose() in.close() - in.disposed should be (true) + verify(mockBuffer, times(1)).dispose() } test("io stream roundtrip") { diff --git a/core/src/test/scala/org/apache/spark/util/io/ByteArrayChunkOutputStreamSuite.scala b/core/src/test/scala/org/apache/spark/util/io/ByteArrayChunkOutputStreamSuite.scala index 38bc24528f3a7..2bb0df394a9c2 100644 --- a/core/src/test/scala/org/apache/spark/util/io/ByteArrayChunkOutputStreamSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/io/ByteArrayChunkOutputStreamSuite.scala @@ -126,9 +126,9 @@ class ByteArrayChunkOutputStreamSuite extends SparkFunSuite { } // errors on bad bounds - intercept[IllegalArgumentException]{o.slice(31, 31)} - intercept[IllegalArgumentException]{o.slice(-1, 10)} - intercept[IllegalArgumentException]{o.slice(10, 5)} - intercept[IllegalArgumentException]{o.slice(10, 35)} + intercept[IllegalArgumentException] { o.slice(31, 31) } + intercept[IllegalArgumentException] { o.slice(-1, 10) } + intercept[IllegalArgumentException] { o.slice(10, 5) } + intercept[IllegalArgumentException] { o.slice(10, 35) } } } diff --git a/network/common/src/main/java/org/apache/spark/network/buffer/LargeByteBuffer.java b/network/common/src/main/java/org/apache/spark/network/buffer/LargeByteBuffer.java index beeb007e2197e..21d954fcaeca8 100644 --- a/network/common/src/main/java/org/apache/spark/network/buffer/LargeByteBuffer.java +++ b/network/common/src/main/java/org/apache/spark/network/buffer/LargeByteBuffer.java @@ -47,7 +47,6 @@ public interface LargeByteBuffer { public byte get(); - /** * Bulk copy data from this buffer into the given array. First checks there is sufficient * data in this buffer; if not, throws a {@link java.nio.BufferUnderflowException}. Behaves @@ -69,7 +68,6 @@ public interface LargeByteBuffer { */ public LargeByteBuffer get(byte[] dst, int offset, int length); - public LargeByteBuffer rewind(); /** diff --git a/network/common/src/main/java/org/apache/spark/network/buffer/WrappedLargeByteBuffer.java b/network/common/src/main/java/org/apache/spark/network/buffer/WrappedLargeByteBuffer.java index 58a621249386f..859ad30100413 100644 --- a/network/common/src/main/java/org/apache/spark/network/buffer/WrappedLargeByteBuffer.java +++ b/network/common/src/main/java/org/apache/spark/network/buffer/WrappedLargeByteBuffer.java @@ -101,7 +101,6 @@ public WrappedLargeByteBuffer(ByteBuffer[] underlying) { size = sum; } - @Override public WrappedLargeByteBuffer get(byte[] dest) { return get(dest, 0, dest.length); diff --git a/network/common/src/test/java/org/apache/spark/network/buffer/LargeByteBufferHelperSuite.java b/network/common/src/test/java/org/apache/spark/network/buffer/LargeByteBufferHelperSuite.java index 9e636fc032928..d0eb61be933c9 100644 --- a/network/common/src/test/java/org/apache/spark/network/buffer/LargeByteBufferHelperSuite.java +++ b/network/common/src/test/java/org/apache/spark/network/buffer/LargeByteBufferHelperSuite.java @@ -77,7 +77,7 @@ public void testMapFile() throws IOException { @Test public void testAllocate() { - WrappedLargeByteBuffer buf = (WrappedLargeByteBuffer) LargeByteBufferHelper.allocate(95,10); + WrappedLargeByteBuffer buf = (WrappedLargeByteBuffer) LargeByteBufferHelper.allocate(95, 10); assertEquals(10, buf.underlying.length); for (int i = 0 ; i < 9; i++) { assertEquals(10, buf.underlying[i].capacity());