Merge remote-tracking branch 'upstream/master' into SPARK-23906
# Conflicts:
#	sql/core/src/test/resources/sql-tests/results/operators.sql.out
wangyum committed Sep 21, 2018
2 parents c715694 + 2c9d8f5 commit 87cea0b
Showing 205 changed files with 3,899 additions and 2,451 deletions.
2 changes: 1 addition & 1 deletion R/pkg/DESCRIPTION
@@ -1,6 +1,6 @@
Package: SparkR
Type: Package
Version: 2.4.0
Version: 2.5.0
Title: R Frontend for Apache Spark
Description: Provides an R Frontend for Apache Spark.
Authors@R: c(person("Shivaram", "Venkataraman", role = c("aut", "cre"),
13 changes: 11 additions & 2 deletions R/pkg/R/DataFrame.R
@@ -503,7 +503,6 @@ setMethod("createOrReplaceTempView",
#' @param x A SparkDataFrame
#' @param tableName A character vector containing the name of the table
#'
#' @family SparkDataFrame functions
#' @seealso \link{createOrReplaceTempView}
#' @rdname registerTempTable-deprecated
#' @name registerTempTable
@@ -3986,7 +3985,17 @@ setMethod("hint",
signature(x = "SparkDataFrame", name = "character"),
function(x, name, ...) {
parameters <- list(...)
stopifnot(all(sapply(parameters, is.character)))
if (!all(sapply(parameters, function(y) {
if (is.character(y) || is.numeric(y)) {
TRUE
} else if (is.list(y)) {
all(sapply(y, function(z) { is.character(z) || is.numeric(z) }))
} else {
FALSE
}
}))) {
stop("sql hint should be character, numeric, or list with character or numeric.")
}
jdf <- callJMethod(x@sdf, "hint", name, parameters)
dataFrame(jdf)
})
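
The parameters gathered here are handed straight to the JVM-side Dataset.hint, so the same calls can be made from Java. The following is a minimal, hypothetical Java sketch (the "hint1" name is a placeholder borrowed from the new R test below, so the analyzer drops it after logging a warning, but the parsed plan printed by explain(true) still records the parameters):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class HintExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .master("local[*]")
        .appName("HintExample")
        .getOrCreate();

    Dataset<Row> df = spark.sql("SELECT * FROM range(10)");

    // Numeric and string hint parameters are both accepted, matching what
    // the relaxed R-side check above now permits.
    df.hint("hint1", 1.23456, "aaaaaaaaaa").explain(true);

    spark.stop();
  }
}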
1 change: 0 additions & 1 deletion R/pkg/R/catalog.R
@@ -69,7 +69,6 @@ createExternalTable <- function(x, ...) {
#' @param ... additional named parameters as options for the data source.
#' @return A SparkDataFrame.
#' @rdname createTable
#' @seealso \link{createExternalTable}
#' @examples
#'\dontrun{
#' sparkR.session()
9 changes: 9 additions & 0 deletions R/pkg/tests/fulltests/test_sparkSQL.R
@@ -2419,6 +2419,15 @@ test_that("join(), crossJoin() and merge() on a DataFrame", {
expect_true(any(grepl("BroadcastHashJoin", execution_plan_broadcast)))
})

test_that("test hint", {
df <- sql("SELECT * FROM range(10e10)")
hintList <- list("hint2", "hint3", "hint4")
execution_plan_hint <- capture.output(
explain(hint(df, "hint1", 1.23456, "aaaaaaaaaa", hintList), TRUE)
)
expect_true(any(grepl("1.23456, aaaaaaaaaa", execution_plan_hint)))
})

test_that("toJSON() on DataFrame", {
df <- as.DataFrame(cars)
df_json <- toJSON(df)
2 changes: 1 addition & 1 deletion assembly/pom.xml
@@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.11</artifactId>
<version>2.4.0-SNAPSHOT</version>
<version>2.5.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

2 changes: 1 addition & 1 deletion common/kvstore/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.11</artifactId>
<version>2.4.0-SNAPSHOT</version>
<version>2.5.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>

2 changes: 1 addition & 1 deletion common/network-common/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.11</artifactId>
<version>2.4.0-SNAPSHOT</version>
<version>2.5.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>

common/network-common/src/main/java/org/apache/spark/network/buffer/ManagedBuffer.java
@@ -36,7 +36,10 @@
*/
public abstract class ManagedBuffer {

/** Number of bytes of the data. */
/**
* Number of bytes of the data. If this buffer will decrypt for all of the views into the data,
* this is the size of the decrypted data.
*/
public abstract long size();

/**
2 changes: 1 addition & 1 deletion common/network-shuffle/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.11</artifactId>
<version>2.4.0-SNAPSHOT</version>
<version>2.5.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>

common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/DownloadFile.java
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.network.shuffle;

import java.io.IOException;

/**
* A handle on the file used when fetching remote data to disk. Used to ensure the lifecycle of
* writing the data, reading it back, and then cleaning it up is followed. Specific implementations
* may also handle encryption. The data can be read only via DownloadFileWritableChannel,
* which ensures data is not read until after the writer is closed.
*/
public interface DownloadFile {
/**
* Delete the file.
*
* @return <code>true</code> if and only if the file or directory is
* successfully deleted; <code>false</code> otherwise
*/
boolean delete();

/**
* A channel for writing data to the file. This special channel allows access to the data for
* reading, after the channel is closed, via {@link DownloadFileWritableChannel#closeAndRead()}.
*/
DownloadFileWritableChannel openForWriting() throws IOException;

/**
* The path of the file, intended only for debug purposes.
*/
String path();
}
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/{TempFileManager.java → DownloadFileManager.java}
@@ -17,20 +17,20 @@

package org.apache.spark.network.shuffle;

import java.io.File;
import org.apache.spark.network.util.TransportConf;

/**
* A manager to create temp block files to reduce the memory usage and also clean temp
* files when they won't be used any more.
* A manager to create temp block files used when fetching remote data to reduce the memory usage.
* It will clean files when they won't be used any more.
*/
public interface TempFileManager {
public interface DownloadFileManager {

/** Create a temp block file. */
File createTempFile();
DownloadFile createTempFile(TransportConf transportConf);

/**
* Register a temp file to clean up when it won't be used any more. Return whether the
* file is registered successfully. If `false`, the caller should clean up the file by itself.
*/
boolean registerTempFileToClean(File file);
boolean registerTempFileToClean(DownloadFile file);
}
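
For orientation, the contract can be illustrated with a hedged sketch (not code from this commit): a manager that creates uniquely named files in a temp directory and deletes every registered file on shutdown. SimpleDownloadFileManager and stop() are invented names, and LocalDownloadFile is the sketch that follows DownloadFileWritableChannel below.

import java.io.File;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;

import org.apache.spark.network.util.TransportConf;

// Hypothetical sketch: download files live in one temp directory and are
// deleted on stop() if the fetcher registered them for cleanup.
public class SimpleDownloadFileManager implements DownloadFileManager {
  private final File tempDir;
  private final Set<DownloadFile> filesToClean =
      Collections.synchronizedSet(new HashSet<>());

  public SimpleDownloadFileManager(File tempDir) {
    this.tempDir = tempDir;
  }

  @Override
  public DownloadFile createTempFile(TransportConf transportConf) {
    File f = new File(tempDir, "fetch-" + UUID.randomUUID());
    return new LocalDownloadFile(f, transportConf); // sketched below
  }

  @Override
  public boolean registerTempFileToClean(DownloadFile file) {
    // Returning false tells the caller to delete the file itself.
    return filesToClean.add(file);
  }

  public void stop() {
    synchronized (filesToClean) {
      for (DownloadFile file : filesToClean) {
        file.delete();
      }
      filesToClean.clear();
    }
  }
}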
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/DownloadFileWritableChannel.java
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.network.shuffle;

import org.apache.spark.network.buffer.ManagedBuffer;

import java.nio.channels.WritableByteChannel;

/**
* A channel for writing data which is fetched to disk, which allows access to the written data only
* after the writer has been closed. Used with DownloadFile and DownloadFileManager.
*/
public interface DownloadFileWritableChannel extends WritableByteChannel {
ManagedBuffer closeAndRead();
}
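
Putting the three interfaces together, here is a hedged sketch of a plain, unencrypted implementation; LocalDownloadFile is an invented name, an encrypting variant would instead wrap the channel with a cipher and serve the decrypted view from closeAndRead(), and Spark's actual implementation may differ.

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;

import org.apache.spark.network.buffer.FileSegmentManagedBuffer;
import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.util.TransportConf;

// Hypothetical plain-file implementation: bytes are written as-is and read
// back through a FileSegmentManagedBuffer once the writer is closed.
public class LocalDownloadFile implements DownloadFile {
  private final File file;
  private final TransportConf transportConf;

  public LocalDownloadFile(File file, TransportConf transportConf) {
    this.file = file;
    this.transportConf = transportConf;
  }

  @Override
  public boolean delete() {
    return file.delete();
  }

  @Override
  public DownloadFileWritableChannel openForWriting() throws IOException {
    return new LocalDownloadWritableChannel();
  }

  @Override
  public String path() {
    return file.getAbsolutePath();
  }

  private class LocalDownloadWritableChannel implements DownloadFileWritableChannel {
    private final WritableByteChannel delegate;

    LocalDownloadWritableChannel() throws IOException {
      delegate = new FileOutputStream(file).getChannel();
    }

    @Override
    public ManagedBuffer closeAndRead() {
      try {
        delegate.close();
      } catch (IOException e) {
        throw new RuntimeException(e);
      }
      // Hand the completed file back as a readable buffer, which is how the
      // reworked DownloadCallback.onComplete below consumes it.
      return new FileSegmentManagedBuffer(transportConf, file, 0, file.length());
    }

    @Override
    public int write(ByteBuffer src) throws IOException {
      return delegate.write(src);
    }

    @Override
    public boolean isOpen() {
      return delegate.isOpen();
    }

    @Override
    public void close() throws IOException {
      delegate.close();
    }
  }
}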
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
@@ -91,15 +91,15 @@ public void fetchBlocks(
String execId,
String[] blockIds,
BlockFetchingListener listener,
TempFileManager tempFileManager) {
DownloadFileManager downloadFileManager) {
checkInit();
logger.debug("External shuffle fetch from {}:{} (executor id {})", host, port, execId);
try {
RetryingBlockFetcher.BlockFetchStarter blockFetchStarter =
(blockIds1, listener1) -> {
TransportClient client = clientFactory.createClient(host, port);
new OneForOneBlockFetcher(client, appId, execId,
blockIds1, listener1, conf, tempFileManager).start();
blockIds1, listener1, conf, downloadFileManager).start();
};

int maxRetries = conf.maxIORetries();
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java
@@ -17,18 +17,13 @@

package org.apache.spark.network.shuffle;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.util.Arrays;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.spark.network.buffer.FileSegmentManagedBuffer;
import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.client.ChunkReceivedCallback;
import org.apache.spark.network.client.RpcResponseCallback;
@@ -58,7 +53,7 @@ public class OneForOneBlockFetcher {
private final BlockFetchingListener listener;
private final ChunkReceivedCallback chunkCallback;
private final TransportConf transportConf;
private final TempFileManager tempFileManager;
private final DownloadFileManager downloadFileManager;

private StreamHandle streamHandle = null;

@@ -79,14 +74,14 @@ public OneForOneBlockFetcher(
String[] blockIds,
BlockFetchingListener listener,
TransportConf transportConf,
TempFileManager tempFileManager) {
DownloadFileManager downloadFileManager) {
this.client = client;
this.openMessage = new OpenBlocks(appId, execId, blockIds);
this.blockIds = blockIds;
this.listener = listener;
this.chunkCallback = new ChunkCallback();
this.transportConf = transportConf;
this.tempFileManager = tempFileManager;
this.downloadFileManager = downloadFileManager;
}

/** Callback invoked on receipt of each chunk. We equate a single chunk to a single block. */
@@ -125,7 +120,7 @@ public void onSuccess(ByteBuffer response) {
// Immediately request all chunks -- we expect that the total size of the request is
// reasonable due to higher level chunking in [[ShuffleBlockFetcherIterator]].
for (int i = 0; i < streamHandle.numChunks; i++) {
if (tempFileManager != null) {
if (downloadFileManager != null) {
client.stream(OneForOneStreamManager.genStreamChunkId(streamHandle.streamId, i),
new DownloadCallback(i));
} else {
@@ -159,13 +154,13 @@ private void failRemainingBlocks(String[] failedBlockIds, Throwable e) {

private class DownloadCallback implements StreamCallback {

private WritableByteChannel channel = null;
private File targetFile = null;
private DownloadFileWritableChannel channel = null;
private DownloadFile targetFile = null;
private int chunkIndex;

DownloadCallback(int chunkIndex) throws IOException {
this.targetFile = tempFileManager.createTempFile();
this.channel = Channels.newChannel(new FileOutputStream(targetFile));
this.targetFile = downloadFileManager.createTempFile(transportConf);
this.channel = targetFile.openForWriting();
this.chunkIndex = chunkIndex;
}

@@ -178,11 +173,8 @@ public void onData(String streamId, ByteBuffer buf) throws IOException {

@Override
public void onComplete(String streamId) throws IOException {
channel.close();
ManagedBuffer buffer = new FileSegmentManagedBuffer(transportConf, targetFile, 0,
targetFile.length());
listener.onBlockFetchSuccess(blockIds[chunkIndex], buffer);
if (!tempFileManager.registerTempFileToClean(targetFile)) {
listener.onBlockFetchSuccess(blockIds[chunkIndex], channel.closeAndRead());
if (!downloadFileManager.registerTempFileToClean(targetFile)) {
targetFile.delete();
}
}
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleClient.java
@@ -43,7 +43,7 @@ public void init(String appId) { }
* @param execId the executor id.
* @param blockIds block ids to fetch.
* @param listener the listener to receive block fetching status.
* @param tempFileManager TempFileManager to create and clean temp files.
* @param downloadFileManager DownloadFileManager to create and clean temp files.
* If it's not <code>null</code>, the remote blocks will be streamed
* into temp shuffle files to reduce the memory usage, otherwise,
* they will be kept in memory.
@@ -54,7 +54,7 @@ public abstract void fetchBlocks(
String execId,
String[] blockIds,
BlockFetchingListener listener,
TempFileManager tempFileManager);
DownloadFileManager downloadFileManager);

/**
* Get the shuffle MetricsSet from ShuffleClient, this will be used in MetricsSystem to
(remaining file diffs not loaded)
