
Commit

Merge branch 'master' into SPARK-23939
codeatri authored Aug 14, 2018
2 parents 1cbaf0c + 80784a1 commit bb52630
Showing 231 changed files with 5,082 additions and 1,178 deletions.
24 changes: 24 additions & 0 deletions NOTICE
@@ -4,3 +4,27 @@ Copyright 2014 and onwards The Apache Software Foundation.
This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).


+Export Control Notice
+---------------------
+
+This distribution includes cryptographic software. The country in which you currently reside may have
+restrictions on the import, possession, use, and/or re-export to another country, of encryption software.
+BEFORE using any encryption software, please check your country's laws, regulations and policies concerning
+the import, possession, or use, and re-export of encryption software, to see if this is permitted. See
+<http://www.wassenaar.org/> for more information.
+
+The U.S. Government Department of Commerce, Bureau of Industry and Security (BIS), has classified this
+software as Export Commodity Control Number (ECCN) 5D002.C.1, which includes information security software
+using or performing cryptographic functions with asymmetric algorithms. The form and manner of this Apache
+Software Foundation distribution makes it eligible for export under the License Exception ENC Technology
+Software Unrestricted (TSU) exception (see the BIS Export Administration Regulations, Section 740.13) for
+both object code and source code.
+
+The following provides more details on the included cryptographic software:
+
+This software uses Apache Commons Crypto (https://commons.apache.org/proper/commons-crypto/) to
+support authentication, and encryption and decryption of data sent across the network between
+services.
+
+This software includes Bouncy Castle (http://bouncycastle.org/) to support the jets3t library.
29 changes: 27 additions & 2 deletions NOTICE-binary
@@ -5,6 +5,31 @@ This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).


+Export Control Notice
+---------------------
+
+This distribution includes cryptographic software. The country in which you currently reside may have
+restrictions on the import, possession, use, and/or re-export to another country, of encryption software.
+BEFORE using any encryption software, please check your country's laws, regulations and policies concerning
+the import, possession, or use, and re-export of encryption software, to see if this is permitted. See
+<http://www.wassenaar.org/> for more information.
+
+The U.S. Government Department of Commerce, Bureau of Industry and Security (BIS), has classified this
+software as Export Commodity Control Number (ECCN) 5D002.C.1, which includes information security software
+using or performing cryptographic functions with asymmetric algorithms. The form and manner of this Apache
+Software Foundation distribution makes it eligible for export under the License Exception ENC Technology
+Software Unrestricted (TSU) exception (see the BIS Export Administration Regulations, Section 740.13) for
+both object code and source code.
+
+The following provides more details on the included cryptographic software:
+
+This software uses Apache Commons Crypto (https://commons.apache.org/proper/commons-crypto/) to
+support authentication, and encryption and decryption of data sent across the network between
+services.
+
+This software includes Bouncy Castle (http://bouncycastle.org/) to support the jets3t library.


// ------------------------------------------------------------------
// NOTICE file corresponding to the section 4d of The Apache License,
// Version 2.0, in this case for
@@ -451,7 +476,7 @@ which has the following notices:
PureJavaCrc32C from apache-hadoop-common http://hadoop.apache.org/
(Apache 2.0 license)

-This library containd statically linked libstdc++. This inclusion is allowed by
+This library contains statically linked libstdc++. This inclusion is allowed by
"GCC RUntime Library Exception"
http://gcc.gnu.org/onlinedocs/libstdc++/manual/license.html

@@ -1167,4 +1192,4 @@ Apache Solr (http://lucene.apache.org/solr/)
Copyright 2014 The Apache Software Foundation

Apache Mahout (http://mahout.apache.org/)
-Copyright 2014 The Apache Software Foundation
+Copyright 2014 The Apache Software Foundation
2 changes: 1 addition & 1 deletion R/pkg/R/DataFrame.R
@@ -588,7 +588,7 @@ setMethod("cache",
#' \url{http://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-persistence}.
#'
#' @param x the SparkDataFrame to persist.
-#' @param newLevel storage level chosen for the persistance. See available options in
+#' @param newLevel storage level chosen for the persistence. See available options in
#' the description.
#'
#' @family SparkDataFrame functions
8 changes: 4 additions & 4 deletions R/pkg/R/SQLContext.R
@@ -351,7 +351,7 @@ setMethod("toDF", signature(x = "RDD"),
read.json.default <- function(path, ...) {
sparkSession <- getSparkSession()
options <- varargsToStrEnv(...)
-# Allow the user to have a more flexible definiton of the text file path
+# Allow the user to have a more flexible definition of the text file path
paths <- as.list(suppressWarnings(normalizePath(path)))
read <- callJMethod(sparkSession, "read")
read <- callJMethod(read, "options", options)
@@ -421,7 +421,7 @@ jsonRDD <- function(sqlContext, rdd, schema = NULL, samplingRatio = 1.0) {
read.orc <- function(path, ...) {
sparkSession <- getSparkSession()
options <- varargsToStrEnv(...)
-# Allow the user to have a more flexible definiton of the ORC file path
+# Allow the user to have a more flexible definition of the ORC file path
path <- suppressWarnings(normalizePath(path))
read <- callJMethod(sparkSession, "read")
read <- callJMethod(read, "options", options)
@@ -442,7 +442,7 @@ read.orc <- function(path, ...) {
read.parquet.default <- function(path, ...) {
sparkSession <- getSparkSession()
options <- varargsToStrEnv(...)
-# Allow the user to have a more flexible definiton of the Parquet file path
+# Allow the user to have a more flexible definition of the Parquet file path
paths <- as.list(suppressWarnings(normalizePath(path)))
read <- callJMethod(sparkSession, "read")
read <- callJMethod(read, "options", options)
@@ -492,7 +492,7 @@ parquetFile <- function(x, ...) {
read.text.default <- function(path, ...) {
sparkSession <- getSparkSession()
options <- varargsToStrEnv(...)
-# Allow the user to have a more flexible definiton of the text file path
+# Allow the user to have a more flexible definition of the text file path
paths <- as.list(suppressWarnings(normalizePath(path)))
read <- callJMethod(sparkSession, "read")
read <- callJMethod(read, "options", options)
4 changes: 2 additions & 2 deletions R/pkg/R/context.R
@@ -43,7 +43,7 @@ getMinPartitions <- function(sc, minPartitions) {
#' lines <- textFile(sc, "myfile.txt")
#'}
textFile <- function(sc, path, minPartitions = NULL) {
-# Allow the user to have a more flexible definiton of the text file path
+# Allow the user to have a more flexible definition of the text file path
path <- suppressWarnings(normalizePath(path))
# Convert a string vector of paths to a string containing comma separated paths
path <- paste(path, collapse = ",")
@@ -71,7 +71,7 @@ textFile <- function(sc, path, minPartitions = NULL) {
#' rdd <- objectFile(sc, "myfile")
#'}
objectFile <- function(sc, path, minPartitions = NULL) {
-# Allow the user to have a more flexible definiton of the text file path
+# Allow the user to have a more flexible definition of the text file path
path <- suppressWarnings(normalizePath(path))
# Convert a string vector of paths to a string containing comma separated paths
path <- paste(path, collapse = ",")
2 changes: 1 addition & 1 deletion R/pkg/R/streaming.R
@@ -163,7 +163,7 @@ setMethod("isActive",
#'
#' @param x a StreamingQuery.
#' @param timeout time to wait in milliseconds, if omitted, wait indefinitely until \code{stopQuery}
-#' is called or an error has occured.
+#' is called or an error has occurred.
#' @return TRUE if query has terminated within the timeout period; nothing if timeout is not
#' specified.
#' @rdname awaitTermination
2 changes: 1 addition & 1 deletion R/pkg/inst/worker/worker.R
@@ -62,7 +62,7 @@ compute <- function(mode, partition, serializer, deserializer, key,
# Transform the result data.frame back to a list of rows
output <- split(output, seq(nrow(output)))
} else {
-# Serialize the ouput to a byte array
+# Serialize the output to a byte array
stopifnot(serializer == "byte")
}
} else {
2 changes: 1 addition & 1 deletion R/pkg/tests/fulltests/test_utils.R
@@ -103,7 +103,7 @@ test_that("cleanClosure on R functions", {
expect_true("l" %in% ls(env))
expect_true("f" %in% ls(env))
expect_equal(get("l", envir = env, inherits = FALSE), l)
# "y" should be in the environemnt of g.
# "y" should be in the environment of g.
newG <- get("g", envir = env, inherits = FALSE)
env <- environment(newG)
expect_equal(length(ls(env)), 1)
@@ -35,7 +35,7 @@ public void testObjectWriteReadDelete() throws Exception {

try {
store.read(CustomType1.class, t.key);
fail("Expected exception for non-existant object.");
fail("Expected exception for non-existent object.");
} catch (NoSuchElementException nsee) {
// Expected.
}
@@ -80,7 +80,7 @@ public void testObjectWriteReadDelete() throws Exception {

try {
db.read(CustomType1.class, t.key);
fail("Expected exception for non-existant object.");
fail("Expected exception for non-existent object.");
} catch (NoSuchElementException nsee) {
// Expected.
}
@@ -240,7 +240,7 @@ public boolean release(int decrement) {

@Override
public long transferTo(WritableByteChannel target, long position) throws IOException {
-Preconditions.checkArgument(position == transfered(), "Invalid position.");
+Preconditions.checkArgument(position == transferred(), "Invalid position.");

do {
if (currentEncrypted == null) {
@@ -231,17 +231,17 @@ public boolean release(int decrement) {
* data into memory at once, and can avoid ballooning memory usage when transferring large
* messages such as shuffle blocks.
*
-* The {@link #transfered()} counter also behaves a little funny, in that it won't go forward
+* The {@link #transferred()} counter also behaves a little funny, in that it won't go forward
* until a whole chunk has been written. This is done because the code can't use the actual
* number of bytes written to the channel as the transferred count (see {@link #count()}).
* Instead, once an encrypted chunk is written to the output (including its header), the
-* size of the original block will be added to the {@link #transfered()} amount.
+* size of the original block will be added to the {@link #transferred()} amount.
*/
@Override
public long transferTo(final WritableByteChannel target, final long position)
throws IOException {

-Preconditions.checkArgument(position == transfered(), "Invalid position.");
+Preconditions.checkArgument(position == transferred(), "Invalid position.");

long reportedWritten = 0L;
long actuallyWritten = 0L;
@@ -273,7 +273,7 @@ public long transferTo(final WritableByteChannel target, final long position)
currentChunkSize = 0;
currentReportedBytes = 0;
}
-} while (currentChunk == null && transfered() + reportedWritten < count());
+} while (currentChunk == null && transferred() + reportedWritten < count());

// Returning 0 triggers a backoff mechanism in netty which may harm performance. Instead,
// we return 1 until we can (i.e. until the reported count would actually match the size
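The Javadoc above captures a subtle contract: encryption happens one chunk at a time, and transferred() only advances by a whole plaintext chunk once that chunk has been fully flushed to the target. A minimal self-contained sketch of that bookkeeping follows; the names (ChunkedTransfer, CHUNK_SIZE) are hypothetical, there is no real encryption, and the netty return-1 backoff workaround is omitted — this illustrates the pattern, it is not Spark's implementation.

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;

// Illustrative sketch: progress is reported in whole-chunk units, even though the
// bytes actually written to the channel need not match the plaintext count.
public class ChunkedTransfer {
  private static final int CHUNK_SIZE = 4;

  private final ByteBuffer source;   // plaintext to send
  private long reported = 0;         // what we claim via transferred()
  private ByteBuffer currentChunk = null;
  private int currentChunkPlaintext = 0;

  public ChunkedTransfer(byte[] data) { this.source = ByteBuffer.wrap(data); }

  public long transferred() { return reported; }

  public long count() { return source.limit(); }

  public long transferTo(WritableByteChannel target, long position) throws IOException {
    if (position != transferred()) throw new IllegalArgumentException("Invalid position.");
    long reportedWritten = 0;
    do {
      if (currentChunk == null && source.hasRemaining()) {
        // "Encrypt" the next chunk; a real implementation would also prepend a header.
        int n = Math.min(CHUNK_SIZE, source.remaining());
        byte[] chunk = new byte[n];
        source.get(chunk);
        currentChunk = ByteBuffer.wrap(chunk);
        currentChunkPlaintext = n;
      }
      if (currentChunk == null) break;
      target.write(currentChunk);
      if (!currentChunk.hasRemaining()) {
        // Only now does the reported count move forward, by the plaintext size.
        reportedWritten += currentChunkPlaintext;
        currentChunk = null;
      }
    } while (currentChunk == null && transferred() + reportedWritten < count());
    reported += reportedWritten;
    return reportedWritten;
  }

  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    ChunkedTransfer t = new ChunkedTransfer("hello, shuffle block".getBytes());
    long pos = 0;
    while (pos < t.count()) pos += t.transferTo(Channels.newChannel(out), pos);
    System.out.println(out.size() + " bytes written, " + t.transferred() + " reported");
  }
}

A caller polling transferred() never observes a half-written chunk, which is the invariant the position preconditions above rely on.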
@@ -192,8 +192,8 @@ class UTF8StringPropertyCheckSuite extends FunSuite with GeneratorDrivenProperty
val nullalbeSeq = Gen.listOf(Gen.oneOf[String](null: String, randomString))

test("concat") {
-def concat(orgin: Seq[String]): String =
-if (orgin.contains(null)) null else orgin.mkString
+def concat(origin: Seq[String]): String =
+if (origin.contains(null)) null else origin.mkString

forAll { (inputs: Seq[String]) =>
assert(UTF8String.concat(inputs.map(toUTF8): _*) === toUTF8(inputs.mkString))
@@ -125,7 +125,7 @@ public void write(Iterator<Product2<K, V>> records) throws IOException {
if (!records.hasNext()) {
partitionLengths = new long[numPartitions];
shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, null);
-mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
+mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths, 0);
return;
}
final SerializerInstance serInstance = serializer.newInstance();
@@ -167,7 +167,8 @@ public void write(Iterator<Product2<K, V>> records) throws IOException {
logger.error("Error while deleting temp file {}", tmp.getAbsolutePath());
}
}
-mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
+mapStatus = MapStatus$.MODULE$.apply(
+blockManager.shuffleServerId(), partitionLengths, writeMetrics.recordsWritten());
}

@VisibleForTesting
@@ -43,6 +43,7 @@
import org.apache.spark.storage.FileSegment;
import org.apache.spark.storage.TempShuffleBlockId;
import org.apache.spark.unsafe.Platform;
+import org.apache.spark.unsafe.UnsafeAlignedOffset;
import org.apache.spark.unsafe.array.LongArray;
import org.apache.spark.unsafe.memory.MemoryBlock;
import org.apache.spark.util.Utils;
@@ -184,6 +185,7 @@ private void writeSortedFile(boolean isLastFile) {
blockManager.getDiskWriter(blockId, file, ser, fileBufferSizeBytes, writeMetricsToUse);

int currentPartition = -1;
+final int uaoSize = UnsafeAlignedOffset.getUaoSize();
while (sortedRecords.hasNext()) {
sortedRecords.loadNext();
final int partition = sortedRecords.packedRecordPointer.getPartitionId();
@@ -200,8 +202,8 @@
final long recordPointer = sortedRecords.packedRecordPointer.getRecordPointer();
final Object recordPage = taskMemoryManager.getPage(recordPointer);
final long recordOffsetInPage = taskMemoryManager.getOffsetInPage(recordPointer);
-int dataRemaining = Platform.getInt(recordPage, recordOffsetInPage);
-long recordReadPosition = recordOffsetInPage + 4; // skip over record length
+int dataRemaining = UnsafeAlignedOffset.getSize(recordPage, recordOffsetInPage);
+long recordReadPosition = recordOffsetInPage + uaoSize; // skip over record length
while (dataRemaining > 0) {
final int toTransfer = Math.min(diskWriteBufferSize, dataRemaining);
Platform.copyMemory(
@@ -389,15 +391,16 @@ public void insertRecord(Object recordBase, long recordOffset, int length, int p
}

growPointerArrayIfNecessary();
-// Need 4 bytes to store the record length.
-final int required = length + 4;
+final int uaoSize = UnsafeAlignedOffset.getUaoSize();
+// Need 4 or 8 bytes to store the record length.
+final int required = length + uaoSize;
acquireNewPageIfNecessary(required);

assert(currentPage != null);
final Object base = currentPage.getBaseObject();
final long recordAddress = taskMemoryManager.encodePageNumberAndOffset(currentPage, pageCursor);
-Platform.putInt(base, pageCursor, length);
-pageCursor += 4;
+UnsafeAlignedOffset.putSize(base, pageCursor, length);
+pageCursor += uaoSize;
Platform.copyMemory(recordBase, recordOffset, base, pageCursor, length);
pageCursor += length;
inMemSorter.insertRecord(recordAddress, partitionId);
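The edits above replace a hard-coded 4-byte record-length prefix with UnsafeAlignedOffset, so each record sits in its page as [length prefix][payload] and the prefix is 4 or 8 bytes depending on the platform's alignment requirements. A rough ByteBuffer sketch of that layout follows; RecordPageSketch and its putSize/getSize helpers mirror the pattern under that assumption and are our own stand-ins, not Spark's API.

import java.nio.ByteBuffer;

// Illustrative sketch: records stored in a page as [length prefix][payload],
// where the prefix width (4 or 8 bytes) follows the platform's alignment needs.
public class RecordPageSketch {
  private final ByteBuffer page = ByteBuffer.allocate(1024);
  private final int uaoSize; // assumed 4 normally, 8 where aligned reads are required
  private int cursor = 0;

  public RecordPageSketch(int uaoSize) { this.uaoSize = uaoSize; }

  private void putSize(int offset, int length) {
    if (uaoSize == 4) page.putInt(offset, length);
    else page.putLong(offset, length); // the int widened to a full word
  }

  private int getSize(int offset) {
    return uaoSize == 4 ? page.getInt(offset) : (int) page.getLong(offset);
  }

  public void insertRecord(byte[] record) {
    final int required = record.length + uaoSize; // prefix + payload
    if (cursor + required > page.capacity()) throw new IllegalStateException("page full");
    putSize(cursor, record.length);
    cursor += uaoSize; // skip over record length
    page.position(cursor);
    page.put(record);
    cursor += record.length;
  }

  public byte[] readRecordAt(int offset) {
    byte[] out = new byte[getSize(offset)];
    page.position(offset + uaoSize);
    page.get(out);
    return out;
  }

  public static void main(String[] args) {
    RecordPageSketch p = new RecordPageSketch(8);
    p.insertRecord("hello".getBytes());
    System.out.println(new String(p.readRecordAt(0))); // prints: hello
  }
}

Routing every prefix read and write through one helper lets the layout widen to 8 bytes without touching any call-site arithmetic, which is exactly what the uaoSize changes above achieve.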
@@ -65,7 +65,7 @@ public int compare(PackedRecordPointer left, PackedRecordPointer right) {
*/
private int usableCapacity = 0;

-private int initialSize;
+private final int initialSize;

ShuffleInMemorySorter(MemoryConsumer consumer, int initialSize, boolean useRadixSort) {
this.consumer = consumer;
@@ -94,12 +94,20 @@ public int numRecords() {
}

public void reset() {
+// Reset `pos` here so that `spill` triggered by the below `allocateArray` will be no-op.
+pos = 0;
if (consumer != null) {
consumer.freeArray(array);
+// As `array` has been released, we should set it to `null` to avoid accessing it before
+// `allocateArray` returns. `usableCapacity` is also set to `0` to avoid any codes writing
+// data to `ShuffleInMemorySorter` when `array` is `null` (e.g., in
+// ShuffleExternalSorter.growPointerArrayIfNecessary, we may try to access
+// `ShuffleInMemorySorter` when `allocateArray` throws SparkOutOfMemoryError).
+array = null;
+usableCapacity = 0;
array = consumer.allocateArray(initialSize);
usableCapacity = getUsableCapacity();
}
-pos = 0;
}

public void expandPointerArray(LongArray newArray) {
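The reset() reordering above is subtle enough to merit a toy reproduction: allocateArray may itself trigger a spill on this very sorter, so pos and array must already look empty before the allocation call, not after it. A condensed, hypothetical sketch of the hazard and the fix (class and method names are ours, not Spark's):

// Illustrative sketch of why `pos` and `array` are cleared *before* reallocating:
// allocateArray() may need memory, which can trigger spill() on this same sorter.
public class SorterResetSketch {
  private long[] array = new long[16];
  private int pos = 8; // pretend 8 records are buffered

  void spill() {
    // Re-entrant call: must see a consistent (empty) sorter, or it would try to
    // sort `pos` entries out of a freed/absent array.
    System.out.println("spill sees pos=" + pos + ", array=" + (array == null ? "null" : "live"));
  }

  long[] allocateArray(int size) {
    spill(); // simulate allocation pressure forcing a spill
    return new long[size];
  }

  public void reset() {
    pos = 0;                   // 1) no records to spill from this point on
    array = null;              // 2) old array is gone; don't let spill touch it
    array = allocateArray(16); // 3) only now re-allocate (may re-enter spill)
  }

  public static void main(String[] args) {
    new SorterResetSketch().reset(); // prints: spill sees pos=0, array=null
  }
}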
@@ -248,7 +248,8 @@ void closeAndWriteOutput() throws IOException {
logger.error("Error while deleting temp file {}", tmp.getAbsolutePath());
}
}
-mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
+mapStatus = MapStatus$.MODULE$.apply(
+blockManager.shuffleServerId(), partitionLengths, writeMetrics.recordsWritten());
}

@VisibleForTesting
@@ -662,7 +662,7 @@ public int getValueLength() {
* It is only valid to call this method immediately after calling `lookup()` using the same key.
* </p>
* <p>
-* The key and value must be word-aligned (that is, their sizes must multiples of 8).
+* The key and value must be word-aligned (that is, their sizes must be a multiple of 8).
* </p>
* <p>
* After calling this method, calls to `get[Key|Value]Address()` and `get[Key|Value]Length`
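On the corrected sentence above: word alignment just means each size is rounded up to the next multiple of 8 before the bytes are written. As a hedged aside, the usual bit trick for that rounding (the helper name is ours):

// Illustrative helper: round a byte size up to the next multiple of 8.
public class AlignSketch {
  static int roundUpToWord(int size) {
    return (size + 7) & ~7; // 13 -> 16, 16 -> 16
  }
  public static void main(String[] args) {
    System.out.println(roundUpToWord(13) + " " + roundUpToWord(16)); // 16 16
  }
}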
@@ -402,7 +402,7 @@ public void insertRecord(

growPointerArrayIfNecessary();
int uaoSize = UnsafeAlignedOffset.getUaoSize();
-// Need 4 bytes to store the record length.
+// Need 4 or 8 bytes to store the record length.
final int required = length + uaoSize;
acquireNewPageIfNecessary(required);

@@ -51,7 +51,7 @@ public void addSpillIfNotEmpty(UnsafeSorterIterator spillReader) throws IOExcept
if (spillReader.hasNext()) {
// We only add the spillReader to the priorityQueue if it is not empty. We do this to
// make sure the hasNext method of UnsafeSorterIterator returned by getSortedIterator
-// does not return wrong result because hasNext will returns true
+// does not return wrong result because hasNext will return true
// at least priorityQueue.size() times. If we allow n spillReaders in the
// priorityQueue, we will have n extra empty records in the result of UnsafeSorterIterator.
spillReader.loadNext();
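The fixed comment describes the merge invariant: a spill reader enters the priority queue only if it is non-empty and has already been advanced to its first record; otherwise the merged iterator would emit one empty record per queued reader. A small self-contained sketch of that k-way merge discipline, using simplified stand-in types for UnsafeSorterIterator:

import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

// Illustrative k-way merge: only non-empty, pre-advanced readers enter the
// queue, so poll() can always read a current value.
public class MergeSketch {
  static final class Reader implements Comparable<Reader> {
    final Iterator<Integer> it;
    int current;
    Reader(Iterator<Integer> it) { this.it = it; }
    boolean advance() { if (!it.hasNext()) return false; current = it.next(); return true; }
    public int compareTo(Reader o) { return Integer.compare(current, o.current); }
  }

  public static void merge(List<List<Integer>> spills) {
    PriorityQueue<Reader> pq = new PriorityQueue<>();
    for (List<Integer> spill : spills) {
      Reader r = new Reader(spill.iterator());
      // Skip empty spills; advance once so `current` is valid inside the queue.
      if (r.advance()) pq.add(r);
    }
    while (!pq.isEmpty()) {
      Reader r = pq.poll();
      System.out.print(r.current + " ");
      if (r.advance()) pq.add(r); // re-insert only while it still has data
    }
    System.out.println();
  }

  public static void main(String[] args) {
    merge(List.of(List.of(1, 4, 7), List.of(), List.of(2, 3, 9))); // 1 2 3 4 7 9
  }
}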