[SPARK-6190][core] create LargeByteBuffer for eliminating 2GB block limit #5400

Closed
wants to merge 33 commits

Changes from all commits

33 commits
9834942
ByteArrayChunkOutputStream.slice, for grabbing arbitrary portion of …
squito Apr 7, 2015
4e1a842
LargeByteBuffer
squito Apr 7, 2015
8bd5606
update from PR feedback
squito Apr 9, 2015
ff9f968
LargeBBInputStream should dispose when we close the stream, not when …
squito Apr 9, 2015
a3dc811
add bounds checks to ByteArrayChunkOutputStream.slice()
squito Apr 9, 2015
da29c3d
cleanup rewind() a little
squito Apr 9, 2015
366f921
WrappedLargeByteBuffer.asByteBuffer shouldn't copy -- it should just e…
squito Apr 13, 2015
34c2131
sigh, need to make the WrappedLargeBB constructor w/ chunkSize public…
squito Apr 14, 2015
bf4ec0a
style
squito Apr 14, 2015
9d232d1
move package of LargeByteBufferIOStreams so that I can keep unsafe me…
squito Apr 14, 2015
6b2f751
fix typo
squito Apr 14, 2015
4da4626
error handling for get(); comments, style
squito Jun 1, 2015
2afb351
constructed WrappedLargeByteBuffer always has position==0, simplifies…
squito Jun 1, 2015
4042c1a
move LargeBBIn/Out Streams to java
squito Jun 1, 2015
3c599b2
add comments for MAX_CHUNK_SIZE
squito Jun 1, 2015
95588c2
updateCurrentBuffer --> updateCurrentBufferIfNeeded + comment
squito Jun 1, 2015
b77bbe2
style
squito Jun 1, 2015
6c2a115
add tests that buffer's position is independent from underlying bytebu…
squito Jun 2, 2015
a9616e4
better variable name; more chunks in tests
squito Jun 2, 2015
0250ac5
private, @VisibleForTesting
squito Jun 2, 2015
b3b6363
fix newlines
squito Jun 2, 2015
112c49e
style
squito Jun 2, 2015
8ec2c5c
comment explaining check on subBufferSize
squito Jun 2, 2015
d0605a1
get() return this; another version of get() which just takes dest array
squito Jun 3, 2015
b6620d0
docs on LargeBBOutputStream
squito Jun 3, 2015
54d09af
@Override
squito Jun 3, 2015
31244c8
fix comment
squito Jun 3, 2015
ca23763
Merge branch 'master' into SPARK-6190_largeBB
squito Jun 10, 2015
36d1801
Merge branch 'master' into SPARK-6190_largeBB
squito Aug 19, 2015
040a461
use random numbers for test
squito Aug 19, 2015
80c4032
Merge branch 'master' into SPARK-6190_largeBB
squito Aug 19, 2015
3f701dc
Merge branch 'master' into SPARK-6190_largeBB
squito Nov 4, 2015
3447bb9
review feedback
squito Nov 4, 2015
@@ -0,0 +1,87 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.network.buffer;

import java.io.InputStream;

import com.google.common.annotations.VisibleForTesting;

/**
* Reads data from a LargeByteBuffer, and optionally cleans it up using buffer.dispose()
* when the stream is closed (e.g. to close a memory-mapped file).
*/
public class LargeByteBufferInputStream extends InputStream {

private LargeByteBuffer buffer;
private final boolean dispose;

public LargeByteBufferInputStream(LargeByteBuffer buffer, boolean dispose) {
this.buffer = buffer;
this.dispose = dispose;
}

public LargeByteBufferInputStream(LargeByteBuffer buffer) {
this(buffer, false);
}

@Override
public int read() {
if (buffer == null || buffer.remaining() == 0) {
Contributor:

remove the null checking for buffer?

Contributor Author:

The null check is for after the stream has been closed. InputStream doesn't seem to specify what read() should do after the stream has been closed, but this is consistent with what we do elsewhere, e.g. ByteBufferInputStream.

return -1;
} else {
return buffer.get() & 0xFF;
}
}

@Override
public int read(byte[] dest) {
Contributor:

I think we should be adding @Override annotations to all of these methods to make sure that we're not accidentally overloading instead. Probably not an issue here, but a best practice that we should stick to for all new code.

Contributor Author:

Good idea, I've added @Override.
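For illustration only (not part of this patch), here is a minimal sketch of the mistake @Override guards against, using hypothetical Base/Child classes: a signature mismatch compiles as a new overload instead of an override.

class Base {
    public int read(byte[] dest) {
        return dest.length;
    }
}

class Child extends Base {
    // Compiles, but declares a new overload rather than overriding read(byte[]):
    public int read(Object dest) {
        return -1;
    }

    // With the annotation, the compiler rejects the mismatch:
    // @Override
    // public int read(Object dest) { ... }   // error: method does not override or implement a method from a supertype
}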

return read(dest, 0, dest.length);
}

@Override
public int read(byte[] dest, int offset, int length) {
if (buffer == null || buffer.remaining() == 0) {
return -1;
} else {
int amountToGet = (int) Math.min(buffer.remaining(), length);
buffer.get(dest, offset, amountToGet);
return amountToGet;
}
}

@Override
public long skip(long toSkip) {
if (buffer != null) {
return buffer.skip(toSkip);
} else {
return 0L;
}
}

/**
* Clean up the buffer, and potentially dispose of it
*/
@Override
public void close() {
if (buffer != null) {
if (dispose) {
buffer.dispose();
}
buffer = null;
}
}
}
@@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.network.buffer;

import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;

import com.google.common.annotations.VisibleForTesting;
import org.apache.spark.util.io.ByteArrayChunkOutputStream;

/**
* An OutputStream that will write all data to memory. It supports writing over 2GB
* and the resulting data can be retrieved as a
* {@link org.apache.spark.network.buffer.LargeByteBuffer}
*/
public class LargeByteBufferOutputStream extends OutputStream {

private final ByteArrayChunkOutputStream output;

/**
* Create a new LargeByteBufferOutputStream which writes to byte arrays of the given size. Note
* that <code>chunkSize</code> has <b>no effect</b> on the LargeByteBuffer returned by
* {@link #largeBuffer()}.
*
* @param chunkSize size of the byte arrays used by this output stream, in bytes
*/
public LargeByteBufferOutputStream(int chunkSize) {
Contributor:

long?

Contributor Author:

chunkSize does need to be an int: it's used to determine the size of each of the Array[Byte] chunks this stream builds, and Java array lengths are ints.

I'll add some docs to this class, it isn't very clear right now.

output = new ByteArrayChunkOutputStream(chunkSize);
}

@Override
public void write(int b) {
output.write(b);
}

@Override
public void write(byte[] bytes, int off, int len) {
output.write(bytes, off, len);
}

/**
* Get all of the data written to the stream so far as a LargeByteBuffer. This method can be
* called multiple times, and each returned buffer will be completely independent (the data
* is copied for each returned buffer). It does not close the stream.
*
* @return the data written to the stream as a LargeByteBuffer
*/
public LargeByteBuffer largeBuffer() {
return largeBuffer(LargeByteBufferHelper.MAX_CHUNK_SIZE);
}

/**
* You don't really ever want to call this method -- the returned
* buffer will not implement {@code asByteBuffer} correctly.
*/
@VisibleForTesting
LargeByteBuffer largeBuffer(int maxChunk) {
long totalSize = output.size();
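// ceiling division: number of maxChunk-sized chunks needed to hold totalSize bytes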
int chunksNeeded = (int) ((totalSize + maxChunk - 1) / maxChunk);
ByteBuffer[] chunks = new ByteBuffer[chunksNeeded];
long remaining = totalSize;
long pos = 0;
for (int idx = 0; idx < chunksNeeded; idx++) {
int nextSize = (int) Math.min(maxChunk, remaining);
chunks[idx] = ByteBuffer.wrap(output.slice(pos, pos + nextSize));
pos += nextSize;
remaining -= nextSize;
}
return new WrappedLargeByteBuffer(chunks, maxChunk);
}

@Override
public void close() throws IOException {
output.close();
}
}
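Below is a minimal, editorial usage sketch of the two streams together (not part of the diff); the class name LargeBufferRoundTrip, the 64KB chunk size, and the 1MB payload are arbitrary illustration choices.

import org.apache.spark.network.buffer.LargeByteBuffer;
import org.apache.spark.network.buffer.LargeByteBufferInputStream;
import org.apache.spark.network.buffer.LargeByteBufferOutputStream;

public class LargeBufferRoundTrip {
  public static void main(String[] args) throws Exception {
    LargeByteBufferOutputStream out = new LargeByteBufferOutputStream(1 << 16);
    byte[] data = new byte[1 << 20];
    out.write(data, 0, data.length);            // buffered internally as 64KB byte[] chunks
    LargeByteBuffer buf = out.largeBuffer();     // independent copy of everything written so far
    out.close();

    LargeByteBufferInputStream in = new LargeByteBufferInputStream(buf);
    byte[] read = new byte[data.length];
    int n = in.read(read, 0, read.length);       // number of bytes read, or -1 at end of stream
    System.out.println("read " + n + " bytes");
    in.close();
  }
}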
@@ -21,7 +21,6 @@ import java.io.OutputStream

import scala.collection.mutable.ArrayBuffer


/**
* An OutputStream that writes to fixed-size chunks of byte arrays.
*
@@ -43,10 +42,13 @@ class ByteArrayChunkOutputStream(chunkSize: Int) extends OutputStream {
*/
private var position = chunkSize

private[spark] var size: Long = 0L

override def write(b: Int): Unit = {
allocateNewChunkIfNeeded()
chunks(lastChunkIndex)(position) = b.toByte
position += 1
size += 1
}

override def write(bytes: Array[Byte], off: Int, len: Int): Unit = {
@@ -58,6 +60,7 @@ class ByteArrayChunkOutputStream(chunkSize: Int) extends OutputStream {
written += thisBatch
position += thisBatch
}
size += len
}

@inline
@@ -91,4 +94,44 @@ class ByteArrayChunkOutputStream(chunkSize: Int) extends OutputStream {
ret
}
}

/**
* Get a copy of the data between the two endpoints, start <= idx < until. Always returns
* an array of size (until - start). Throws an IllegalArgumentException unless
* 0 <= start <= until <= size
*/
def slice(start: Long, until: Long): Array[Byte] = {
require((until - start) < Integer.MAX_VALUE, "max slice length = Integer.MAX_VALUE")
require(start >= 0 && start <= until, s"start ($start) must be >= 0 and <= until ($until)")
require(until >= start && until <= size,
s"until ($until) must be >= start ($start) and <= size ($size)")
var chunkStart = 0L
var chunkIdx = 0
val length = (until - start).toInt
var foundStart = false
val result = new Array[Byte](length)
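// scan forward to find the chunk containing the byte at position start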
while (!foundStart) {
val nextChunkStart = chunkStart + chunks(chunkIdx).size
if (nextChunkStart > start) {
foundStart = true
} else {
chunkStart = nextChunkStart
chunkIdx += 1
}
}
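// copy from consecutive chunks into result, starting at start's offset within its first chunk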

var remaining = length
var pos = 0
var offsetInChunk = (start - chunkStart).toInt
while (remaining > 0) {
val lenToCopy = math.min(remaining, chunks(chunkIdx).size - offsetInChunk)
System.arraycopy(chunks(chunkIdx), offsetInChunk, result, pos, lenToCopy)
chunkIdx += 1
offsetInChunk = 0
pos += lenToCopy
remaining -= lenToCopy
}
result
}

}
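For the new slice() method above, an editorial sketch (not in the diff) of a slice that crosses a chunk boundary; the 4-byte chunk size is arbitrary, and this assumes the class is visible to the calling code.

import org.apache.spark.util.io.ByteArrayChunkOutputStream;

public class SliceExample {
  public static void main(String[] args) {
    ByteArrayChunkOutputStream out = new ByteArrayChunkOutputStream(4);   // 4-byte chunks
    for (int i = 0; i < 10; i++) {
      out.write(i);   // fills chunks [0,1,2,3], [4,5,6,7], [8,9]
    }
    byte[] middle = out.slice(3, 8);     // copies bytes 3..7, spanning the first two chunks
    System.out.println(middle.length);   // 5
  }
}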
@@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.network.buffer

import java.io.{File, FileInputStream, FileOutputStream, OutputStream}
import java.nio.channels.FileChannel.MapMode

import org.junit.Assert._
import org.mockito.Mockito._
import org.scalatest.Matchers
import org.scalatest.mock.MockitoSugar

import org.apache.spark.SparkFunSuite

class LargeByteBufferInputStreamSuite extends SparkFunSuite with Matchers with MockitoSugar {

test("read from large mapped file") {
val testFile = File.createTempFile("large-buffer-input-stream-test", ".bin")

try {
val out: OutputStream = new FileOutputStream(testFile)
val buffer: Array[Byte] = new Array[Byte](1 << 16)
val len: Long = buffer.length.toLong + Integer.MAX_VALUE + 1
(0 until buffer.length).foreach { idx =>
buffer(idx) = idx.toByte
}
(0 until (len / buffer.length).toInt).foreach { idx =>
out.write(buffer)
}
out.close

val channel = new FileInputStream(testFile).getChannel
val buf = LargeByteBufferHelper.mapFile(channel, MapMode.READ_ONLY, 0, len)
val in = new LargeByteBufferInputStream(buf, true)

val read = new Array[Byte](buffer.length)
(0 until (len / buffer.length).toInt).foreach { idx =>
in.read(read) should be(read.length)
(0 until buffer.length).foreach { arrIdx =>
assertEquals(buffer(arrIdx), read(arrIdx))
}
}
in.read(read) should be(-1)
in.close()
} finally {
testFile.delete()
}
}

test("dispose on close") {
// don't need to read to the end -- dispose anytime we close
val mockBuffer = mock[LargeByteBuffer]
when(mockBuffer.remaining()).thenReturn(0)
val in = new LargeByteBufferInputStream(mockBuffer, true)
verify(mockBuffer, times(0)).dispose()
// reading to the end shouldn't auto-dispose
in.read() should be (-1)
verify(mockBuffer, times(0)).dispose()
in.close()
verify(mockBuffer, times(1)).dispose()
}

test("io stream roundtrip") {
val out = new LargeByteBufferOutputStream(128)
(0 until 200).foreach { idx => out.write(idx) }
out.close()

val lb = out.largeBuffer(128)
// just make sure that we test reading from multiple chunks
lb.asInstanceOf[WrappedLargeByteBuffer].underlying.size should be > 1

val rawIn = new LargeByteBufferInputStream(lb)
val arr = new Array[Byte](500)
val nRead = rawIn.read(arr, 0, 500)
nRead should be (200)
(0 until 200).foreach { idx =>
arr(idx) should be (idx.toByte)
}
}

}