[SPARK-36464][CORE] Fix Underlying Size Variable Initialization in ChunkedByteBufferOutputStream for Writing Over 2GB Data

### What changes were proposed in this pull request?
The `size` method of `ChunkedByteBufferOutputStream` returns a `Long` value; however, the underlying `_size` variable is initialized as `Int`.
That causes an overflow, so `size` returns a negative value once more than 2GB of data has been written into `ChunkedByteBufferOutputStream`.

This PR changes the underlying `_size` variable from `Int` to `Long` at initialization.
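For illustration only (this snippet is not part of the patch, and the object and variable names are hypothetical), a minimal Scala sketch of the failure mode, assuming the byte count is accumulated the same way the stream tracks `_size`: an `Int` accumulator wraps to a negative number past 2GB, while a `Long` accumulator reports the true total.

```scala
// Standalone sketch (not Spark code): accumulating written-byte counts in an
// Int overflows past Int.MaxValue (~2GB); a Long accumulator does not.
object SizeOverflowSketch {
  def main(args: Array[String]): Unit = {
    val chunk = 1024 * 1024 * 1024 // 1GB per write, written twice as in the new test
    var intSize: Int = 0           // mirrors the old `_size = 0`
    var longSize: Long = 0L        // mirrors the fixed `_size = 0L`
    for (_ <- 0 until 2) {
      intSize += chunk
      longSize += chunk
    }
    println(s"Int accumulator:  $intSize")  // prints -2147483648 (overflowed)
    println(s"Long accumulator: $longSize") // prints 2147483648 (correct)
  }
}
```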

### Why are the changes needed?
Because the `size` method of `ChunkedByteBufferOutputStream` incorrectly returns a negative value when more than 2GB of data is written.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Passed existing tests:
```
build/sbt "core/testOnly *ChunkedByteBufferOutputStreamSuite"
```
Also added a new unit test:
```
build/sbt "core/testOnly *ChunkedByteBufferOutputStreamSuite – -z SPARK-36464"
```

Closes #33690 from kazuyukitanimura/SPARK-36464.

Authored-by: Kazuyuki Tanimura <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
a0x8o committed Aug 10, 2021
1 parent 97cf1e5 commit 9ba7168
Showing 29 changed files with 680 additions and 277 deletions.
@@ -378,13 +378,13 @@ public boolean useOldFetchProtocol() {
* Class name of the implementation of MergedShuffleFileManager that merges the blocks
* pushed to it when push-based shuffle is enabled. By default, push-based shuffle is disabled at
* a cluster level because this configuration is set to
* 'org.apache.spark.network.shuffle.ExternalBlockHandler$NoOpMergedShuffleFileManager'.
* 'org.apache.spark.network.shuffle.NoOpMergedShuffleFileManager'.
* To turn on push-based shuffle at a cluster level, set the configuration to
* 'org.apache.spark.network.shuffle.RemoteBlockPushResolver'.
*/
public String mergedShuffleFileManagerImpl() {
return conf.get("spark.shuffle.server.mergedShuffleFileManagerImpl",
"org.apache.spark.network.shuffle.ExternalBlockHandler$NoOpMergedShuffleFileManager");
"org.apache.spark.network.shuffle.NoOpMergedShuffleFileManager");
}

/**
@@ -594,64 +594,6 @@ public ManagedBuffer next() {
}
}

/**
* Dummy implementation of merged shuffle file manager. Suitable for when push-based shuffle
* is not enabled.
*
* @since 3.1.0
*/
public static class NoOpMergedShuffleFileManager implements MergedShuffleFileManager {

// This constructor is needed because we use this constructor to instantiate an implementation
// of MergedShuffleFileManager using reflection.
// See YarnShuffleService#newMergedShuffleFileManagerInstance.
public NoOpMergedShuffleFileManager(TransportConf transportConf) {}

@Override
public StreamCallbackWithID receiveBlockDataAsStream(PushBlockStream msg) {
throw new UnsupportedOperationException("Cannot handle shuffle block merge");
}

@Override
public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) throws IOException {
throw new UnsupportedOperationException("Cannot handle shuffle block merge");
}

@Override
public void registerExecutor(String appId, ExecutorShuffleInfo executorInfo) {
// No-Op. Do nothing.
}

@Override
public void applicationRemoved(String appId, boolean cleanupLocalDirs) {
// No-Op. Do nothing.
}

@Override
public ManagedBuffer getMergedBlockData(
String appId,
int shuffleId,
int shuffleMergeId,
int reduceId,
int chunkId) {
throw new UnsupportedOperationException("Cannot handle shuffle block merge");
}

@Override
public MergedBlockMeta getMergedBlockMeta(
String appId,
int shuffleId,
int shuffleMergeId,
int reduceId) {
throw new UnsupportedOperationException("Cannot handle shuffle block merge");
}

@Override
public String[] getMergedBlockDirs(String appId) {
throw new UnsupportedOperationException("Cannot handle shuffle block merge");
}
}

@Override
public void channelActive(TransportClient client) {
metrics.activeConnections.inc();
@@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.network.shuffle;

import java.io.IOException;

import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.client.StreamCallbackWithID;
import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
import org.apache.spark.network.shuffle.protocol.FinalizeShuffleMerge;
import org.apache.spark.network.shuffle.protocol.MergeStatuses;
import org.apache.spark.network.shuffle.protocol.PushBlockStream;
import org.apache.spark.network.util.TransportConf;

/**
* Dummy implementation of merged shuffle file manager. Suitable for when push-based shuffle
* is not enabled.
*
* @since 3.1.0
*/
public class NoOpMergedShuffleFileManager implements MergedShuffleFileManager {

// This constructor is needed because we use this constructor to instantiate an implementation
// of MergedShuffleFileManager using reflection.
// See YarnShuffleService#newMergedShuffleFileManagerInstance.
public NoOpMergedShuffleFileManager(TransportConf transportConf) {}

@Override
public StreamCallbackWithID receiveBlockDataAsStream(PushBlockStream msg) {
throw new UnsupportedOperationException("Cannot handle shuffle block merge");
}

@Override
public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) throws IOException {
throw new UnsupportedOperationException("Cannot handle shuffle block merge");
}

@Override
public void registerExecutor(String appId, ExecutorShuffleInfo executorInfo) {
// No-Op. Do nothing.
}

@Override
public void applicationRemoved(String appId, boolean cleanupLocalDirs) {
// No-Op. Do nothing.
}

@Override
public ManagedBuffer getMergedBlockData(
String appId,
int shuffleId,
int shuffleMergeId,
int reduceId,
int chunkId) {
throw new UnsupportedOperationException("Cannot handle shuffle block merge");
}

@Override
public MergedBlockMeta getMergedBlockMeta(
String appId,
int shuffleId,
int shuffleMergeId,
int reduceId) {
throw new UnsupportedOperationException("Cannot handle shuffle block merge");
}

@Override
public String[] getMergedBlockDirs(String appId) {
throw new UnsupportedOperationException("Cannot handle shuffle block merge");
}
}
@@ -678,9 +678,7 @@ public String getID() {
private void writeBuf(ByteBuffer buf) throws IOException {
while (buf.hasRemaining()) {
long updatedPos = partitionInfo.getDataFilePos() + length;
logger.debug("{} shuffleId {} shuffleMergeId {} reduceId {} current pos"
+ " {} updated pos {}", partitionInfo.appId, partitionInfo.shuffleId,
partitionInfo.shuffleMergeId, partitionInfo.reduceId,
logger.debug("{} current pos {} updated pos {}", partitionInfo,
partitionInfo.getDataFilePos(), updatedPos);
length += partitionInfo.dataChannel.write(buf, updatedPos);
}
@@ -795,9 +793,7 @@ public void onData(String streamId, ByteBuffer buf) throws IOException {
return;
}
abortIfNecessary();
logger.trace("{} shuffleId {} shuffleMergeId {} reduceId {} onData writable",
partitionInfo.appId, partitionInfo.shuffleId, partitionInfo.shuffleMergeId,
partitionInfo.reduceId);
logger.trace("{} onData writable", partitionInfo);
if (partitionInfo.getCurrentMapIndex() < 0) {
partitionInfo.setCurrentMapIndex(mapIndex);
}
@@ -817,9 +813,7 @@ public void onData(String streamId, ByteBuffer buf) throws IOException {
throw ioe;
}
} else {
logger.trace("{} shuffleId {} shuffleMergeId {} reduceId {} onData deferred",
partitionInfo.appId, partitionInfo.shuffleId, partitionInfo.shuffleMergeId,
partitionInfo.reduceId);
logger.trace("{} onData deferred", partitionInfo);
// If we cannot write to disk, we buffer the current block chunk in memory so it could
// potentially be written to disk later. We take our best effort without guarantee
// that the block will be written to disk. If the block data is divided into multiple
@@ -852,9 +846,7 @@ public void onData(String streamId, ByteBuffer buf) throws IOException {
@Override
public void onComplete(String streamId) throws IOException {
synchronized (partitionInfo) {
logger.trace("{} shuffleId {} shuffleMergeId {} reduceId {} onComplete invoked",
partitionInfo.appId, partitionInfo.shuffleId, partitionInfo.shuffleMergeId,
partitionInfo.reduceId);
logger.trace("{} onComplete invoked", partitionInfo);
// Initially when this request got to the server, the shuffle merge finalize request
// was not received yet or this was the latest stage attempt (or latest shuffleMergeId)
// generating shuffle output for the shuffle ID. By the time we finish reading this
@@ -936,9 +928,7 @@ public void onFailure(String streamId, Throwable throwable) throws IOException {
synchronized (partitionInfo) {
if (!isStaleOrTooLate(appShuffleInfo.shuffles.get(partitionInfo.shuffleId),
partitionInfo.shuffleMergeId, partitionInfo.reduceId)) {
logger.debug("{} shuffleId {} shuffleMergeId {} reduceId {}"
+ " encountered failure", partitionInfo.appId, partitionInfo.shuffleId,
partitionInfo.shuffleMergeId, partitionInfo.reduceId);
logger.debug("{} encountered failure", partitionInfo);
partitionInfo.setCurrentMapIndex(-1);
}
}
@@ -1032,9 +1022,7 @@ public long getDataFilePos() {
}

public void setDataFilePos(long dataFilePos) {
logger.trace("{} shuffleId {} shuffleMergeId {} reduceId {} current pos {}"
+ " update pos {}", appId, shuffleId, shuffleMergeId, reduceId, this.dataFilePos,
dataFilePos);
logger.trace("{} current pos {} update pos {}", this, this.dataFilePos, dataFilePos);
this.dataFilePos = dataFilePos;
}

@@ -1043,9 +1031,7 @@ int getCurrentMapIndex() {
}

void setCurrentMapIndex(int mapIndex) {
logger.trace("{} shuffleId {} shuffleMergeId {} reduceId {} updated mapIndex {}"
+ " current mapIndex {}", appId, shuffleId, shuffleMergeId, reduceId,
currentMapIndex, mapIndex);
logger.trace("{} mapIndex {} current mapIndex {}", this, currentMapIndex, mapIndex);
this.currentMapIndex = mapIndex;
}

@@ -1054,8 +1040,7 @@ long getLastChunkOffset() {
}

void blockMerged(int mapIndex) {
logger.debug("{} shuffleId {} shuffleMergeId {} reduceId {} updated merging mapIndex {}",
appId, shuffleId, shuffleMergeId, reduceId, mapIndex);
logger.debug("{} updated merging mapIndex {}", this, mapIndex);
mapTracker.add(mapIndex);
chunkTracker.add(mapIndex);
lastMergedMapIndex = mapIndex;
@@ -1073,9 +1058,8 @@ void resetChunkTracker() {
*/
void updateChunkInfo(long chunkOffset, int mapIndex) throws IOException {
try {
logger.trace("{} shuffleId {} shuffleMergeId {} reduceId {} index current {}"
+ " updated {}", appId, shuffleId, shuffleMergeId, reduceId,
this.lastChunkOffset, chunkOffset);
logger.trace("{} index current {} updated {}", this, this.lastChunkOffset,
chunkOffset);
if (indexMetaUpdateFailed) {
indexFile.getChannel().position(indexFile.getPos());
}
@@ -1103,8 +1087,7 @@ private void writeChunkTracker(int mapIndex) throws IOException {
return;
}
chunkTracker.add(mapIndex);
logger.trace("{} shuffleId {} shuffleMergeId {} reduceId {} mapIndex {}"
+ " write chunk to meta file", appId, shuffleId, shuffleMergeId, reduceId, mapIndex);
logger.trace("{} mapIndex {} write chunk to meta file", this, mapIndex);
if (indexMetaUpdateFailed) {
metaFile.getChannel().position(metaFile.getPos());
}
@@ -1169,6 +1152,12 @@ void closeAllFilesAndDeleteIfNeeded(boolean delete) {
}
}

@Override
public String toString() {
return String.format("Application %s shuffleId %s shuffleMergeId %s reduceId %s",
appId, shuffleId, shuffleMergeId, reduceId);
}

@Override
protected void finalize() throws Throwable {
closeAllFilesAndDeleteIfNeeded(false);
@@ -43,6 +43,7 @@
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.server.api.*;
import org.apache.spark.network.shuffle.MergedShuffleFileManager;
import org.apache.spark.network.shuffle.NoOpMergedShuffleFileManager;
import org.apache.spark.network.util.LevelDBProvider;
import org.iq80.leveldb.DB;
import org.iq80.leveldb.DBIterator;
@@ -284,7 +285,7 @@ static MergedShuffleFileManager newMergedShuffleFileManagerInstance(TransportCon
return mergeManagerSubClazz.getConstructor(TransportConf.class).newInstance(conf);
} catch (Exception e) {
logger.error("Unable to create an instance of {}", mergeManagerImplClassName);
return new ExternalBlockHandler.NoOpMergedShuffleFileManager(conf);
return new NoOpMergedShuffleFileManager(conf);
}
}

10 changes: 7 additions & 3 deletions conf/spark-env.sh.template
@@ -32,14 +32,18 @@
# - SPARK_LOCAL_DIRS, storage directories to use on this node for shuffle and RDD data
# - MESOS_NATIVE_JAVA_LIBRARY, to point to your libmesos.so if you use Mesos

# Options read in YARN client/cluster mode
# Options read in any mode
# - SPARK_CONF_DIR, Alternate conf dir. (Default: ${SPARK_HOME}/conf)
# - HADOOP_CONF_DIR, to point Spark towards Hadoop configuration files
# - YARN_CONF_DIR, to point Spark towards YARN configuration files when you use YARN
# - SPARK_EXECUTOR_CORES, Number of cores for the executors (Default: 1).
# - SPARK_EXECUTOR_MEMORY, Memory per Executor (e.g. 1000M, 2G) (Default: 1G)
# - SPARK_DRIVER_MEMORY, Memory for Driver (e.g. 1000M, 2G) (Default: 1G)

# Options read in any cluster manager using HDFS
# - HADOOP_CONF_DIR, to point Spark towards Hadoop configuration files

# Options read in YARN client/cluster mode
# - YARN_CONF_DIR, to point Spark towards YARN configuration files when you use YARN

# Options for the daemons used in the standalone deploy mode
# - SPARK_MASTER_HOST, to bind the master to a different IP address or hostname
# - SPARK_MASTER_PORT / SPARK_MASTER_WEBUI_PORT, to use non-default ports for the master
@@ -48,7 +48,7 @@ private[spark] class ChunkedByteBufferOutputStream(
* This can also never be 0.
*/
private[this] var position = chunkSize
private[this] var _size = 0
private[this] var _size = 0L
private[this] var closed: Boolean = false

def size: Long = _size
@@ -120,4 +120,5 @@ private[spark] class ChunkedByteBufferOutputStream(
new ChunkedByteBuffer(ret)
}
}

}
@@ -119,4 +119,14 @@ class ChunkedByteBufferOutputStreamSuite extends SparkFunSuite {
assert(arrays(1).toSeq === ref.slice(10, 20))
assert(arrays(2).toSeq === ref.slice(20, 30))
}

test("SPARK-36464: size returns correct positive number even with over 2GB data") {
val ref = new Array[Byte](1024 * 1024 * 1024)
val o = new ChunkedByteBufferOutputStream(1024 * 1024, ByteBuffer.allocate)
o.write(ref)
o.write(ref)
o.close()
assert(o.size > 0L) // make sure it is not overflowing
assert(o.size == ref.length.toLong * 2)
}
}