Commit

Merge pull request #14 from apache/master
latest apache spark
rekhajoshm authored Jun 21, 2018
2 parents dca3a9e + 9de11d3 commit ae51f60
Showing 286 changed files with 11,852 additions and 2,709 deletions.
1 change: 1 addition & 0 deletions LICENSE
@@ -237,6 +237,7 @@ The text of each license is also included at licenses/LICENSE-[project].txt.

(BSD 3 Clause) netlib core (com.github.fommil.netlib:core:1.1.2 - https://github.com/fommil/netlib-java/core)
(BSD 3 Clause) JPMML-Model (org.jpmml:pmml-model:1.2.7 - https://github.com/jpmml/jpmml-model)
(BSD 3 Clause) jmock (org.jmock:jmock-junit4:2.8.4 - http://jmock.org/)
(BSD License) AntLR Parser Generator (antlr:antlr:2.7.7 - http://www.antlr.org/)
(BSD License) ANTLR 4.5.2-1 (org.antlr:antlr4:4.5.2-1 - http://wwww.antlr.org/)
(BSD licence) ANTLR ST4 4.0.4 (org.antlr:ST4:4.0.4 - http://www.stringtemplate.org)
1 change: 1 addition & 0 deletions R/pkg/NAMESPACE
@@ -201,6 +201,7 @@ exportMethods("%<=>%",
"approxCountDistinct",
"approxQuantile",
"array_contains",
"array_join",
"array_max",
"array_min",
"array_position",
29 changes: 26 additions & 3 deletions R/pkg/R/functions.R
@@ -221,7 +221,9 @@ NULL
#' head(select(tmp3, element_at(tmp3$v3, "Valiant")))
#' tmp4 <- mutate(df, v4 = create_array(df$mpg, df$cyl), v5 = create_array(df$cyl, df$hp))
#' head(select(tmp4, concat(tmp4$v4, tmp4$v5), arrays_overlap(tmp4$v4, tmp4$v5)))
#' head(select(tmp, concat(df$mpg, df$cyl, df$hp)))}
#' head(select(tmp, concat(df$mpg, df$cyl, df$hp)))
#' tmp5 <- mutate(df, v6 = create_array(df$model, df$model))
#' head(select(tmp5, array_join(tmp5$v6, "#"), array_join(tmp5$v6, "#", "NULL")))}
NULL

#' Window functions for Column operations
@@ -3006,6 +3008,27 @@ setMethod("array_contains",
column(jc)
})

#' @details
#' \code{array_join}: Concatenates the elements of column using the delimiter.
#' Null values are replaced with nullReplacement if set, otherwise they are ignored.
#'
#' @param delimiter a character string that is used to concatenate the elements of column.
#' @param nullReplacement an optional character string that is used to replace the Null values.
#' @rdname column_collection_functions
#' @aliases array_join array_join,Column-method
#' @note array_join since 2.4.0
setMethod("array_join",
signature(x = "Column", delimiter = "character"),
function(x, delimiter, nullReplacement = NULL) {
jc <- if (is.null(nullReplacement)) {
callJStatic("org.apache.spark.sql.functions", "array_join", x@jc, delimiter)
} else {
callJStatic("org.apache.spark.sql.functions", "array_join", x@jc, delimiter,
as.character(nullReplacement))
}
column(jc)
})
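
For reference, a minimal SparkR usage sketch of the new method, assuming an active SparkR session; the data values mirror the tests added to test_sparkSQL.R below.

# Minimal usage sketch (assumes sparkR.session() is already active).
df <- createDataFrame(list(list(list("Hello", NA, "World!"))))
# Null entries are ignored when no replacement is given:
collect(select(df, array_join(df[[1]], "#")))[[1]]               # "Hello#World!"
# ...and substituted when nullReplacement is supplied:
collect(select(df, array_join(df[[1]], "#", "Beautiful")))[[1]]  # "Hello#Beautiful#World!"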

#' @details
#' \code{array_max}: Returns the maximum value of the array.
#'
@@ -3197,8 +3220,8 @@ setMethod("size",
#' (or starting from the end if start is negative) with the specified length.
#'
#' @rdname column_collection_functions
#' @param start an index indicating the first element occuring in the result.
#' @param length a number of consecutive elements choosen to the result.
#' @param start an index indicating the first element occurring in the result.
#' @param length a number of consecutive elements chosen to the result.
#' @aliases slice slice,Column-method
#' @note slice since 2.4.0
setMethod("slice",
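
For reference, a minimal SparkR sketch of the slice() call described by the corrected documentation above, reusing the mtcars-style df from this file's other examples (the column choice is illustrative):

tmp <- mutate(df, v1 = create_array(df$mpg, df$cyl, df$hp))
# start = 2, length = 2 keeps the 2nd and 3rd array elements (cyl and hp).
head(select(tmp, slice(tmp$v1, 2L, 2L)))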
4 changes: 4 additions & 0 deletions R/pkg/R/generics.R
@@ -757,6 +757,10 @@ setGeneric("approxCountDistinct", function(x, ...) { standardGeneric("approxCountDistinct") })
#' @name NULL
setGeneric("array_contains", function(x, value) { standardGeneric("array_contains") })

#' @rdname column_collection_functions
#' @name NULL
setGeneric("array_join", function(x, delimiter, ...) { standardGeneric("array_join") })

#' @rdname column_collection_functions
#' @name NULL
setGeneric("array_max", function(x) { standardGeneric("array_max") })
15 changes: 15 additions & 0 deletions R/pkg/tests/fulltests/test_sparkSQL.R
@@ -1518,6 +1518,21 @@ test_that("column functions", {
result <- collect(select(df, arrays_overlap(df[[1]], df[[2]])))[[1]]
expect_equal(result, c(TRUE, FALSE, NA))

# Test array_join()
df <- createDataFrame(list(list(list("Hello", "World!"))))
result <- collect(select(df, array_join(df[[1]], "#")))[[1]]
expect_equal(result, "Hello#World!")
df2 <- createDataFrame(list(list(list("Hello", NA, "World!"))))
result <- collect(select(df2, array_join(df2[[1]], "#", "Beautiful")))[[1]]
expect_equal(result, "Hello#Beautiful#World!")
result <- collect(select(df2, array_join(df2[[1]], "#")))[[1]]
expect_equal(result, "Hello#World!")
df3 <- createDataFrame(list(list(list("Hello", NULL, "World!"))))
result <- collect(select(df3, array_join(df3[[1]], "#", "Beautiful")))[[1]]
expect_equal(result, "Hello#Beautiful#World!")
result <- collect(select(df3, array_join(df3[[1]], "#")))[[1]]
expect_equal(result, "Hello#World!")

# Test array_sort() and sort_array()
df <- createDataFrame(list(list(list(2L, 1L, 3L, NA)), list(list(NA, 6L, 5L, NA, 4L))))

2 changes: 2 additions & 0 deletions README.md
@@ -81,6 +81,8 @@ can be run using:
Please see the guidance on how to
[run tests for a module, or individual tests](http://spark.apache.org/developer-tools.html#individual-tests).

There is also a Kubernetes integration test; see resource-managers/kubernetes/integration-tests/README.md.

## A Note About Hadoop Versions

Spark uses the Hadoop core library to talk to HDFS and other Hadoop-supported
33 changes: 24 additions & 9 deletions bin/docker-image-tool.sh
@@ -63,16 +63,25 @@ function build {
if [ ! -d "$IMG_PATH" ]; then
error "Cannot find docker image. This script must be run from a runnable distribution of Apache Spark."
fi

local DOCKERFILE=${DOCKERFILE:-"$IMG_PATH/spark/Dockerfile"}

docker build "${BUILD_ARGS[@]}" \
local BINDING_BUILD_ARGS=(
--build-arg
base_img=$(image_ref spark)
)
local BASEDOCKERFILE=${BASEDOCKERFILE:-"$IMG_PATH/spark/Dockerfile"}
local PYDOCKERFILE=${PYDOCKERFILE:-"$IMG_PATH/spark/bindings/python/Dockerfile"}

docker build $NOCACHEARG "${BUILD_ARGS[@]}" \
-t $(image_ref spark) \
-f "$DOCKERFILE" .
-f "$BASEDOCKERFILE" .

docker build $NOCACHEARG "${BINDING_BUILD_ARGS[@]}" \
-t $(image_ref spark-py) \
-f "$PYDOCKERFILE" .
}

function push {
docker push "$(image_ref spark)"
docker push "$(image_ref spark-py)"
}

function usage {
@@ -86,10 +95,12 @@ Commands:
push Push a pre-built image to a registry. Requires a repository address to be provided.
Options:
-f file Dockerfile to build. By default builds the Dockerfile shipped with Spark.
-f file Dockerfile to build for JVM based Jobs. By default builds the Dockerfile shipped with Spark.
-p file Dockerfile with Python baked in. By default builds the Dockerfile shipped with Spark.
-r repo Repository address.
-t tag Tag to apply to the built image, or to identify the image to be pushed.
-m Use minikube's Docker daemon.
-n Build docker image with --no-cache
Using minikube when building images will do so directly into minikube's Docker daemon.
There is no need to push the images into minikube in that case, they'll be automatically
@@ -116,14 +127,18 @@ fi

REPO=
TAG=
DOCKERFILE=
while getopts f:mr:t: option
BASEDOCKERFILE=
PYDOCKERFILE=
NOCACHEARG=
while getopts f:p:mr:t:n option
do
case "${option}"
in
f) DOCKERFILE=${OPTARG};;
f) BASEDOCKERFILE=${OPTARG};;
p) PYDOCKERFILE=${OPTARG};;
r) REPO=${OPTARG};;
t) TAG=${OPTARG};;
n) NOCACHEARG="--no-cache";;
m)
if ! which minikube 1>/dev/null; then
error "Cannot find minikube."
2 changes: 1 addition & 1 deletion build/mvn
@@ -154,4 +154,4 @@ export MAVEN_OPTS=${MAVEN_OPTS:-"$_COMPILE_JVM_OPTS"}
echo "Using \`mvn\` from path: $MVN_BIN" 1>&2

# Last, call the `mvn` command as usual
${MVN_BIN} -DzincPort=${ZINC_PORT} "$@"
"${MVN_BIN}" -DzincPort=${ZINC_PORT} "$@"
@@ -137,30 +137,15 @@ protected void deallocate() {
}

private int copyByteBuf(ByteBuf buf, WritableByteChannel target) throws IOException {
ByteBuffer buffer = buf.nioBuffer();
int written = (buffer.remaining() <= NIO_BUFFER_LIMIT) ?
target.write(buffer) : writeNioBuffer(target, buffer);
// SPARK-24578: cap the sub-region's size of returned nio buffer to improve the performance
// for the case that the passed-in buffer has too many components.
int length = Math.min(buf.readableBytes(), NIO_BUFFER_LIMIT);
ByteBuffer buffer = buf.nioBuffer(buf.readerIndex(), length);
int written = target.write(buffer);
buf.skipBytes(written);
return written;
}

private int writeNioBuffer(
WritableByteChannel writeCh,
ByteBuffer buf) throws IOException {
int originalLimit = buf.limit();
int ret = 0;

try {
int ioSize = Math.min(buf.remaining(), NIO_BUFFER_LIMIT);
buf.limit(buf.position() + ioSize);
ret = writeCh.write(buf);
} finally {
buf.limit(originalLimit);
}

return ret;
}

@Override
public MessageWithHeader touch(Object o) {
super.touch(o);
@@ -17,10 +17,7 @@

package org.apache.spark.network.util;

import java.io.Closeable;
import java.io.EOFException;
import java.io.File;
import java.io.IOException;
import java.io.*;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.StandardCharsets;
@@ -91,11 +88,24 @@ public static String bytesToString(ByteBuffer b) {
* @throws IOException if deletion is unsuccessful
*/
public static void deleteRecursively(File file) throws IOException {
deleteRecursively(file, null);
}

/**
* Delete a file or directory and its contents recursively.
* Don't follow directories if they are symlinks.
*
* @param file Input file / dir to be deleted
* @param filter A filename filter that makes sure only files / dirs with matching filenames
* are deleted.
* @throws IOException if deletion is unsuccessful
*/
public static void deleteRecursively(File file, FilenameFilter filter) throws IOException {
if (file == null) { return; }

// On Unix systems, use operating system command to run faster
// If that does not work out, fallback to the Java IO way
if (SystemUtils.IS_OS_UNIX) {
if (SystemUtils.IS_OS_UNIX && filter == null) {
try {
deleteRecursivelyUsingUnixNative(file);
return;
@@ -105,15 +115,17 @@ public static void deleteRecursively(File file) throws IOException {
}
}

deleteRecursivelyUsingJavaIO(file);
deleteRecursivelyUsingJavaIO(file, filter);
}

private static void deleteRecursivelyUsingJavaIO(File file) throws IOException {
private static void deleteRecursivelyUsingJavaIO(
File file,
FilenameFilter filter) throws IOException {
if (file.isDirectory() && !isSymlink(file)) {
IOException savedIOException = null;
for (File child : listFilesSafely(file)) {
for (File child : listFilesSafely(file, filter)) {
try {
deleteRecursively(child);
deleteRecursively(child, filter);
} catch (IOException e) {
// In case of multiple exceptions, only last one will be thrown
savedIOException = e;
@@ -124,10 +136,13 @@ private static void deleteRecursivelyUsingJavaIO(File file) throws IOException {
}
}

boolean deleted = file.delete();
// Delete can also fail if the file simply did not exist.
if (!deleted && file.exists()) {
throw new IOException("Failed to delete: " + file.getAbsolutePath());
// Delete file only when it's a normal file or an empty directory.
if (file.isFile() || (file.isDirectory() && listFilesSafely(file, null).length == 0)) {
boolean deleted = file.delete();
// Delete can also fail if the file simply did not exist.
if (!deleted && file.exists()) {
throw new IOException("Failed to delete: " + file.getAbsolutePath());
}
}
}

@@ -157,9 +172,9 @@ private static void deleteRecursivelyUsingUnixNative(File file) throws IOException {
}
}

private static File[] listFilesSafely(File file) throws IOException {
private static File[] listFilesSafely(File file, FilenameFilter filter) throws IOException {
if (file.exists()) {
File[] files = file.listFiles();
File[] files = file.listFiles(filter);
if (files == null) {
throw new IOException("Failed to list files for dir: " + file);
}
@@ -138,6 +138,13 @@ public void applicationRemoved(String appId, boolean cleanupLocalDirs) {
blockManager.applicationRemoved(appId, cleanupLocalDirs);
}

/**
* Clean up any non-shuffle files in any local directories associated with a finished executor.
*/
public void executorRemoved(String executorId, String appId) {
blockManager.executorRemoved(executorId, appId);
}

/**
* Register an (application, executor) with the given shuffle info.
*
Expand Down