Commit

Merge remote-tracking branch 'upstream/master' into docSaveLoad
hhbyyh committed May 31, 2015
2 parents 1dd77cc + f7fe9e4 commit c604cad
Showing 447 changed files with 3,017 additions and 1,828 deletions.
10 changes: 4 additions & 6 deletions R/pkg/R/DataFrame.R
@@ -1314,9 +1314,8 @@ setMethod("except",
#' write.df(df, "myfile", "parquet", "overwrite")
#' }
setMethod("write.df",
signature(df = "DataFrame", path = 'character', source = 'character',
mode = 'character'),
function(df, path = NULL, source = NULL, mode = "append", ...){
signature(df = "DataFrame", path = 'character'),
function(df, path, source = NULL, mode = "append", ...){
if (is.null(source)) {
sqlContext <- get(".sparkRSQLsc", envir = .sparkREnv)
source <- callJMethod(sqlContext, "getConf", "spark.sql.sources.default",
@@ -1338,9 +1337,8 @@ setMethod("write.df",
#' @aliases saveDF
#' @export
setMethod("saveDF",
signature(df = "DataFrame", path = 'character', source = 'character',
mode = 'character'),
function(df, path = NULL, source = NULL, mode = "append", ...){
signature(df = "DataFrame", path = 'character'),
function(df, path, source = NULL, mode = "append", ...){
write.df(df, path, source, mode, ...)
})

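With the narrowed signature above, only the DataFrame and the path are required: source falls back to spark.sql.sources.default (parquet) and mode defaults to "append". A minimal usage sketch of the new call forms; the faithful data frame and /tmp output paths are illustrative, not part of the commit:

df <- createDataFrame(sqlContext, faithful)
write.df(df, "/tmp/faithful")                             # default source (parquet) and mode ("append")
write.df(df, "/tmp/faithful-json", "json", "overwrite")   # source and mode can still be supplied
saveDF(df, "/tmp/faithful-copy")                          # saveDF delegates to write.df
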
5 changes: 5 additions & 0 deletions R/pkg/R/SQLContext.R
@@ -457,6 +457,11 @@ read.df <- function(sqlContext, path = NULL, source = NULL, ...) {
if (!is.null(path)) {
options[['path']] <- path
}
if (is.null(source)) {
sqlContext <- get(".sparkRSQLsc", envir = .sparkREnv)
source <- callJMethod(sqlContext, "getConf", "spark.sql.sources.default",
"org.apache.spark.sql.parquet")
}
sdf <- callJMethod(sqlContext, "load", source, options)
dataFrame(sdf)
}
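
Mirroring the write path, read.df now resolves a missing source from spark.sql.sources.default (parquet unless configured otherwise). A brief sketch with hypothetical input paths:

df <- read.df(sqlContext, "/tmp/faithful")                          # source omitted: defaults to parquet
people <- read.df(sqlContext, "/tmp/people.json", source = "json")  # explicit source still works
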
4 changes: 2 additions & 2 deletions R/pkg/R/generics.R
@@ -482,11 +482,11 @@ setGeneric("saveAsTable", function(df, tableName, source, mode, ...) {

#' @rdname write.df
#' @export
setGeneric("write.df", function(df, path, source, mode, ...) { standardGeneric("write.df") })
setGeneric("write.df", function(df, path, ...) { standardGeneric("write.df") })

#' @rdname write.df
#' @export
setGeneric("saveDF", function(df, path, source, mode, ...) { standardGeneric("saveDF") })
setGeneric("saveDF", function(df, path, ...) { standardGeneric("saveDF") })

#' @rdname schema
#' @export
24 changes: 19 additions & 5 deletions R/pkg/R/sparkR.R
@@ -225,14 +225,21 @@ sparkR.init <- function(
#' sqlContext <- sparkRSQL.init(sc)
#'}

- sparkRSQL.init <- function(jsc) {
+ sparkRSQL.init <- function(jsc = NULL) {
if (exists(".sparkRSQLsc", envir = .sparkREnv)) {
return(get(".sparkRSQLsc", envir = .sparkREnv))
}

+ # If jsc is NULL, create a Spark Context
+ sc <- if (is.null(jsc)) {
+ sparkR.init()
+ } else {
+ jsc
+ }
+
sqlContext <- callJStatic("org.apache.spark.sql.api.r.SQLUtils",
"createSQLContext",
jsc)
"createSQLContext",
sc)
assign(".sparkRSQLsc", sqlContext, envir = .sparkREnv)
sqlContext
}
@@ -249,12 +256,19 @@ sparkRSQL.init <- function(jsc) {
#' sqlContext <- sparkRHive.init(sc)
#'}

- sparkRHive.init <- function(jsc) {
+ sparkRHive.init <- function(jsc = NULL) {
if (exists(".sparkRHivesc", envir = .sparkREnv)) {
return(get(".sparkRHivesc", envir = .sparkREnv))
}

ssc <- callJMethod(jsc, "sc")
# If jsc is NULL, create a Spark Context
sc <- if (is.null(jsc)) {
sparkR.init()
} else {
jsc
}

ssc <- callJMethod(sc, "sc")
hiveCtx <- tryCatch({
newJObject("org.apache.spark.sql.hive.HiveContext", ssc)
}, error = function(err) {
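
Since jsc is now optional in both initializers, a SQL or Hive context can be created without first calling sparkR.init() explicitly. A short sketch of the two equivalent call styles, based on the new signatures above:

sqlContext <- sparkRSQL.init()       # no SparkContext passed: one is created via sparkR.init()
sc <- sparkR.init()
hiveContext <- sparkRHive.init(sc)   # passing an existing context still works
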
7 changes: 7 additions & 0 deletions bagel/pom.xml
@@ -40,6 +40,13 @@
<artifactId>spark-core_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.scalacheck</groupId>
<artifactId>scalacheck_${scala.binary.version}</artifactId>
4 changes: 2 additions & 2 deletions bagel/src/test/scala/org/apache/spark/bagel/BagelSuite.scala
@@ -17,7 +17,7 @@

package org.apache.spark.bagel

- import org.scalatest.{BeforeAndAfter, FunSuite, Assertions}
+ import org.scalatest.{BeforeAndAfter, Assertions}
import org.scalatest.concurrent.Timeouts
import org.scalatest.time.SpanSugar._

@@ -27,7 +27,7 @@ import org.apache.spark.storage.StorageLevel
class TestVertex(val active: Boolean, val age: Int) extends Vertex with Serializable
class TestMessage(val targetId: String) extends Message[String] with Serializable

- class BagelSuite extends FunSuite with Assertions with BeforeAndAfter with Timeouts {
+ class BagelSuite extends SparkFunSuite with Assertions with BeforeAndAfter with Timeouts {

var sc: SparkContext = _

6 changes: 1 addition & 5 deletions bin/pyspark
@@ -90,11 +90,7 @@ if [[ -n "$SPARK_TESTING" ]]; then
unset YARN_CONF_DIR
unset HADOOP_CONF_DIR
export PYTHONHASHSEED=0
if [[ -n "$PYSPARK_DOC_TEST" ]]; then
exec "$PYSPARK_DRIVER_PYTHON" -m doctest $1
else
exec "$PYSPARK_DRIVER_PYTHON" $1
fi
exec "$PYSPARK_DRIVER_PYTHON" -m $1
exit
fi

6 changes: 6 additions & 0 deletions core/pom.xml
@@ -338,6 +338,12 @@
<dependency>
<groupId>org.seleniumhq.selenium</groupId>
<artifactId>selenium-java</artifactId>
<exclusions>
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</exclusion>
</exclusions>
<scope>test</scope>
</dependency>
<!-- Added for selenium: -->
184 changes: 184 additions & 0 deletions core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
@@ -0,0 +1,184 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.shuffle.sort;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;

import scala.Product2;
import scala.Tuple2;
import scala.collection.Iterator;

import com.google.common.io.Closeables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.spark.Partitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.TaskContext;
import org.apache.spark.executor.ShuffleWriteMetrics;
import org.apache.spark.serializer.Serializer;
import org.apache.spark.serializer.SerializerInstance;
import org.apache.spark.storage.*;
import org.apache.spark.util.Utils;

/**
* This class implements sort-based shuffle's hash-style shuffle fallback path. This write path
* writes incoming records to separate files, one file per reduce partition, then concatenates these
* per-partition files to form a single output file, regions of which are served to reducers.
* Records are not buffered in memory. This is essentially identical to
* {@link org.apache.spark.shuffle.hash.HashShuffleWriter}, except that it writes output in a format
* that can be served / consumed via {@link org.apache.spark.shuffle.IndexShuffleBlockResolver}.
* <p>
* This write path is inefficient for shuffles with large numbers of reduce partitions because it
* simultaneously opens separate serializers and file streams for all partitions. As a result,
* {@link SortShuffleManager} only selects this write path when
* <ul>
* <li>no Ordering is specified,</li>
* <li>no Aggregator is specified, and</li>
* <li>the number of partitions is less than
* <code>spark.shuffle.sort.bypassMergeThreshold</code>.</li>
* </ul>
*
* This code used to be part of {@link org.apache.spark.util.collection.ExternalSorter} but was
* refactored into its own class in order to reduce code complexity; see SPARK-7855 for details.
* <p>
* There have been proposals to completely remove this code path; see SPARK-6026 for details.
*/
final class BypassMergeSortShuffleWriter<K, V> implements SortShuffleFileWriter<K, V> {

private final Logger logger = LoggerFactory.getLogger(BypassMergeSortShuffleWriter.class);

private final int fileBufferSize;
private final boolean transferToEnabled;
private final int numPartitions;
private final BlockManager blockManager;
private final Partitioner partitioner;
private final ShuffleWriteMetrics writeMetrics;
private final Serializer serializer;

/** Array of file writers, one for each partition */
private BlockObjectWriter[] partitionWriters;

public BypassMergeSortShuffleWriter(
SparkConf conf,
BlockManager blockManager,
Partitioner partitioner,
ShuffleWriteMetrics writeMetrics,
Serializer serializer) {
// Use getSizeAsKb (not bytes) to maintain backwards compatibility if no units are provided
this.fileBufferSize = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024;
this.transferToEnabled = conf.getBoolean("spark.file.transferTo", true);
this.numPartitions = partitioner.numPartitions();
this.blockManager = blockManager;
this.partitioner = partitioner;
this.writeMetrics = writeMetrics;
this.serializer = serializer;
}

@Override
public void insertAll(Iterator<Product2<K, V>> records) throws IOException {
assert (partitionWriters == null);
if (!records.hasNext()) {
return;
}
final SerializerInstance serInstance = serializer.newInstance();
final long openStartTime = System.nanoTime();
partitionWriters = new BlockObjectWriter[numPartitions];
for (int i = 0; i < numPartitions; i++) {
final Tuple2<TempShuffleBlockId, File> tempShuffleBlockIdPlusFile =
blockManager.diskBlockManager().createTempShuffleBlock();
final File file = tempShuffleBlockIdPlusFile._2();
final BlockId blockId = tempShuffleBlockIdPlusFile._1();
partitionWriters[i] =
blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, writeMetrics).open();
}
// Creating the file to write to and creating a disk writer both involve interacting with
// the disk, and can take a long time in aggregate when we open many files, so should be
// included in the shuffle write time.
writeMetrics.incShuffleWriteTime(System.nanoTime() - openStartTime);

while (records.hasNext()) {
final Product2<K, V> record = records.next();
final K key = record._1();
partitionWriters[partitioner.getPartition(key)].write(key, record._2());
}

for (BlockObjectWriter writer : partitionWriters) {
writer.commitAndClose();
}
}

@Override
public long[] writePartitionedFile(
BlockId blockId,
TaskContext context,
File outputFile) throws IOException {
// Track location of the partition starts in the output file
final long[] lengths = new long[numPartitions];
if (partitionWriters == null) {
// We were passed an empty iterator
return lengths;
}

final FileOutputStream out = new FileOutputStream(outputFile, true);
final long writeStartTime = System.nanoTime();
boolean threwException = true;
try {
for (int i = 0; i < numPartitions; i++) {
final FileInputStream in = new FileInputStream(partitionWriters[i].fileSegment().file());
boolean copyThrewException = true;
try {
lengths[i] = Utils.copyStream(in, out, false, transferToEnabled);
copyThrewException = false;
} finally {
Closeables.close(in, copyThrewException);
}
if (!blockManager.diskBlockManager().getFile(partitionWriters[i].blockId()).delete()) {
logger.error("Unable to delete file for partition {}", i);
}
}
threwException = false;
} finally {
Closeables.close(out, threwException);
writeMetrics.incShuffleWriteTime(System.nanoTime() - writeStartTime);
}
partitionWriters = null;
return lengths;
}

@Override
public void stop() throws IOException {
if (partitionWriters != null) {
try {
final DiskBlockManager diskBlockManager = blockManager.diskBlockManager();
for (BlockObjectWriter writer : partitionWriters) {
// This method explicitly does _not_ throw exceptions:
writer.revertPartialWritesAndClose();
if (!diskBlockManager.getFile(writer.blockId()).delete()) {
logger.error("Error while deleting file for block {}", writer.blockId());
}
}
} finally {
partitionWriters = null;
}
}
}
}
53 changes: 53 additions & 0 deletions core/src/main/java/org/apache/spark/shuffle/sort/SortShuffleFileWriter.java
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.shuffle.sort;

import java.io.File;
import java.io.IOException;

import scala.Product2;
import scala.collection.Iterator;

import org.apache.spark.annotation.Private;
import org.apache.spark.TaskContext;
import org.apache.spark.storage.BlockId;

/**
* Interface for objects that {@link SortShuffleWriter} uses to write its output files.
*/
@Private
public interface SortShuffleFileWriter<K, V> {

void insertAll(Iterator<Product2<K, V>> records) throws IOException;

/**
* Write all the data added into this shuffle sorter into a file in the disk store. This is
* called by the SortShuffleWriter and can go through an efficient path of just concatenating
* binary files if we decided to avoid merge-sorting.
*
* @param blockId block ID to write to. The index file will be blockId.name + ".index".
* @param context a TaskContext for a running Spark task, for us to update shuffle metrics.
* @return array of lengths, in bytes, of each partition of the file (used by map output tracker)
*/
long[] writePartitionedFile(
BlockId blockId,
TaskContext context,
File outputFile) throws IOException;

void stop() throws IOException;
}