
Merge branch 'master' into takeSample
dorx committed Jun 9, 2014
2 parents 9bdd36e + 6cf335d commit 065ebcd
Showing 175 changed files with 2,392 additions and 1,105 deletions.
2 changes: 1 addition & 1 deletion assembly/pom.xml
@@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent</artifactId>
<version>1.0.0-SNAPSHOT</version>
<version>1.1.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

2 changes: 1 addition & 1 deletion bagel/pom.xml
@@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent</artifactId>
<version>1.0.0-SNAPSHOT</version>
<version>1.1.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

2 changes: 0 additions & 2 deletions bagel/src/test/scala/org/apache/spark/bagel/BagelSuite.scala
@@ -38,8 +38,6 @@ class BagelSuite extends FunSuite with Assertions with BeforeAndAfter with Timeo
sc.stop()
sc = null
}
// To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
System.clearProperty("spark.driver.port")
}

test("halting by voting") {
2 changes: 1 addition & 1 deletion bin/run-example
@@ -51,7 +51,7 @@ if [[ ! $EXAMPLE_CLASS == org.apache.spark.examples* ]]; then
EXAMPLE_CLASS="org.apache.spark.examples.$EXAMPLE_CLASS"
fi

./bin/spark-submit \
"$FWDIR"/bin/spark-submit \
--master $EXAMPLE_MASTER \
--class $EXAMPLE_CLASS \
"$SPARK_EXAMPLES_JAR" \
4 changes: 2 additions & 2 deletions core/pom.xml
@@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent</artifactId>
<version>1.0.0-SNAPSHOT</version>
<version>1.1.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

@@ -240,7 +240,7 @@
</dependency>
<dependency>
<groupId>org.easymock</groupId>
<artifactId>easymock</artifactId>
<artifactId>easymockclassextension</artifactId>
<scope>test</scope>
</dependency>
<dependency>
20 changes: 19 additions & 1 deletion core/src/main/scala/org/apache/spark/Partitioner.scala
@@ -83,11 +83,17 @@ class HashPartitioner(partitions: Int) extends Partitioner {
case _ =>
false
}

override def hashCode: Int = numPartitions
}

/**
* A [[org.apache.spark.Partitioner]] that partitions sortable records by range into roughly
* equal ranges. The ranges are determined by sampling the content of the RDD passed in.
*
* Note that the actual number of partitions created by the RangePartitioner might not be the same
* as the `partitions` parameter, in the case where the number of sampled records is less than
* the value of `partitions`.
*/
class RangePartitioner[K : Ordering : ClassTag, V](
partitions: Int,
@@ -119,7 +125,7 @@ class RangePartitioner[K : Ordering : ClassTag, V](
}
}

def numPartitions = partitions
def numPartitions = rangeBounds.length + 1

private val binarySearch: ((Array[K], K) => Int) = CollectionsUtils.makeBinarySearch[K]

@@ -155,4 +161,16 @@ class RangePartitioner[K : Ordering : ClassTag, V](
case _ =>
false
}

override def hashCode(): Int = {
val prime = 31
var result = 1
var i = 0
while (i < rangeBounds.length) {
result = prime * result + rangeBounds(i).hashCode
i += 1
}
result = prime * result + ascending.hashCode
result
}
}
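
A quick sketch (not part of this commit) of what the new numPartitions definition means in practice, assuming a local SparkContext and illustrative data: when few records are available to sample, the partitioner may end up with fewer partitions than requested, exactly as the new doc comment warns.

import org.apache.spark.{SparkConf, SparkContext, RangePartitioner}

val sc = new SparkContext(new SparkConf().setAppName("range-partitioner-sketch").setMaster("local[2]"))
// Only three distinct keys are available to sample from.
val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 3)))
val partitioner = new RangePartitioner(10, pairs)
// numPartitions is now rangeBounds.length + 1, so it may be smaller than the requested 10.
println(partitioner.numPartitions)
sc.stop()
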
8 changes: 4 additions & 4 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -455,7 +455,7 @@ class SparkContext(config: SparkConf) extends Logging {
*/
def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String] = {
hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
minPartitions).map(pair => pair._2.toString)
minPartitions).map(pair => pair._2.toString).setName(path)
}

/**
@@ -496,7 +496,7 @@ class SparkContext(config: SparkConf) extends Logging {
classOf[String],
classOf[String],
updateConf,
minPartitions)
minPartitions).setName(path)
}

/**
@@ -551,7 +551,7 @@ class SparkContext(config: SparkConf) extends Logging {
inputFormatClass,
keyClass,
valueClass,
minPartitions)
minPartitions).setName(path)
}

/**
@@ -623,7 +623,7 @@ class SparkContext(config: SparkConf) extends Logging {
val job = new NewHadoopJob(conf)
NewFileInputFormat.addInputPath(job, new Path(path))
val updatedConf = job.getConfiguration
new NewHadoopRDD(this, fClass, kClass, vClass, updatedConf)
new NewHadoopRDD(this, fClass, kClass, vClass, updatedConf).setName(path)
}

/**
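
A one-line sketch (not in the diff) of what the added setName(path) calls provide: RDDs created from files now carry their input path as the RDD name, which shows up in the web UI. Assumes an existing SparkContext sc and an illustrative path.

val lines = sc.textFile("/tmp/data.txt")
println(lines.name)   // expected to print /tmp/data.txt rather than null
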
51 changes: 30 additions & 21 deletions core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
@@ -672,38 +672,47 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])

/**
* Return approximate number of distinct values for each key in this RDD.
* The accuracy of approximation can be controlled through the relative standard deviation
* (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
* more accurate counts but increase the memory footprint and vise versa. Uses the provided
* Partitioner to partition the output RDD.
*
* The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
* Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
* <a href="http://dx.doi.org/10.1145/2452376.2452456">here</a>.
*
* @param relativeSD Relative accuracy. Smaller values create counters that require more space.
* It must be greater than 0.000017.
* @param partitioner partitioner of the resulting RDD.
*/
def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): JavaRDD[(K, Long)] = {
rdd.countApproxDistinctByKey(relativeSD, partitioner)
def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): JavaPairRDD[K, Long] =
{
fromRDD(rdd.countApproxDistinctByKey(relativeSD, partitioner))
}

/**
* Return approximate number of distinct values for each key this RDD.
* The accuracy of approximation can be controlled through the relative standard deviation
* (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
* more accurate counts but increase the memory footprint and vise versa. The default value of
* relativeSD is 0.05. Hash-partitions the output RDD using the existing partitioner/parallelism
* level.
* Return approximate number of distinct values for each key in this RDD.
*
* The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
* Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
* <a href="http://dx.doi.org/10.1145/2452376.2452456">here</a>.
*
* @param relativeSD Relative accuracy. Smaller values create counters that require more space.
* It must be greater than 0.000017.
* @param numPartitions number of partitions of the resulting RDD.
*/
def countApproxDistinctByKey(relativeSD: Double = 0.05): JavaRDD[(K, Long)] = {
rdd.countApproxDistinctByKey(relativeSD)
def countApproxDistinctByKey(relativeSD: Double, numPartitions: Int): JavaPairRDD[K, Long] = {
fromRDD(rdd.countApproxDistinctByKey(relativeSD, numPartitions))
}


/**
* Return approximate number of distinct values for each key in this RDD.
* The accuracy of approximation can be controlled through the relative standard deviation
* (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
* more accurate counts but increase the memory footprint and vise versa. HashPartitions the
* output RDD into numPartitions.
*
* The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
* Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
* <a href="http://dx.doi.org/10.1145/2452376.2452456">here</a>.
*
* @param relativeSD Relative accuracy. Smaller values create counters that require more space.
* It must be greater than 0.000017.
*/
def countApproxDistinctByKey(relativeSD: Double, numPartitions: Int): JavaRDD[(K, Long)] = {
rdd.countApproxDistinctByKey(relativeSD, numPartitions)
def countApproxDistinctByKey(relativeSD: Double): JavaPairRDD[K, Long] = {
fromRDD(rdd.countApproxDistinctByKey(relativeSD))
}

/** Assign a name to this RDD */
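
A short usage sketch (not part of the diff) for the per-key HyperLogLog counters documented above, written against the Scala RDD API that these Java wrappers delegate to; assumes an existing SparkContext sc and illustrative data.

// Approximate number of distinct pages each user visited.
val visits = sc.parallelize(Seq(("alice", "/home"), ("alice", "/docs"), ("alice", "/home"), ("bob", "/home")))
// relativeSD trades accuracy for memory and must be greater than 0.000017.
val approx = visits.countApproxDistinctByKey(0.05)
approx.collect().foreach { case (user, n) => println(s"$user -> ~$n distinct pages") }
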
22 changes: 22 additions & 0 deletions core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
@@ -108,6 +108,28 @@ class JavaRDD[T](val rdd: RDD[T])(implicit val classTag: ClassTag[T])
def sample(withReplacement: Boolean, fraction: Double, seed: Long): JavaRDD[T] =
wrapRDD(rdd.sample(withReplacement, fraction, seed))


/**
* Randomly splits this RDD with the provided weights.
*
* @param weights weights for splits, will be normalized if they don't sum to 1
*
* @return split RDDs in an array
*/
def randomSplit(weights: Array[Double]): Array[JavaRDD[T]] =
randomSplit(weights, Utils.random.nextLong)

/**
* Randomly splits this RDD with the provided weights.
*
* @param weights weights for splits, will be normalized if they don't sum to 1
* @param seed random seed
*
* @return split RDDs in an array
*/
def randomSplit(weights: Array[Double], seed: Long): Array[JavaRDD[T]] =
rdd.randomSplit(weights, seed).map(wrapRDD)

/**
* Return the union of this RDD and another one. Any identical elements will appear multiple
* times (use `.distinct()` to eliminate them).
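
A quick sketch (not from the diff) of the newly exposed randomSplit, shown via the Scala RDD method it delegates to; assumes an existing SparkContext sc.

val data = sc.parallelize(1 to 1000)
// Weights are normalized if they do not sum to 1; fixing the seed makes the split reproducible.
val Array(train, test) = data.randomSplit(Array(0.8, 0.2), seed = 42L)
println(s"train: ${train.count()}, test: ${test.count()}")   // roughly 800 / 200
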
12 changes: 7 additions & 5 deletions core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -560,12 +560,14 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
/**
* Return approximate number of distinct elements in the RDD.
*
* The accuracy of approximation can be controlled through the relative standard deviation
* (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
* more accurate counts but increase the memory footprint and vise versa. The default value of
* relativeSD is 0.05.
* The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
* Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
* <a href="http://dx.doi.org/10.1145/2452376.2452456">here</a>.
*
* @param relativeSD Relative accuracy. Smaller values create counters that require more space.
* It must be greater than 0.000017.
*/
def countApproxDistinct(relativeSD: Double = 0.05): Long = rdd.countApproxDistinct(relativeSD)
def countApproxDistinct(relativeSD: Double): Long = rdd.countApproxDistinct(relativeSD)

def name(): String = rdd.name

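
And the whole-RDD counterpart of the per-key estimate above, again only a sketch rather than part of the diff; assumes an existing SparkContext sc.

val ids = sc.parallelize((1 to 100000).map(_ % 1000))
// Roughly 1000; a smaller relativeSD tightens the estimate at the cost of larger HyperLogLog registers.
println(ids.countApproxDistinct(0.05))
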
@@ -50,4 +50,6 @@ private[spark] class PythonPartitioner(
case _ =>
false
}

override def hashCode: Int = 31 * numPartitions + pyPartitionFunctionId.hashCode
}
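
The hashCode overrides added here and in Partitioner.scala keep the equals/hashCode contract intact, which matters because Spark compares partitioners to decide whether a shuffle can be avoided. A minimal illustration (not from the diff) using the public HashPartitioner:

import org.apache.spark.HashPartitioner

val p1 = new HashPartitioner(4)
val p2 = new HashPartitioner(4)
// Partitioners that compare equal must now also hash equally.
assert(p1 == p2 && p1.hashCode == p2.hashCode)
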
@@ -360,6 +360,9 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
|
| --executor-memory MEM Memory per executor (e.g. 1000M, 2G) (Default: 1G).
|
| --help, -h Show this help message and exit
| --verbose, -v Print additional debug output
|
| Spark standalone with cluster deploy mode only:
| --driver-cores NUM Cores for driver (Default: 1).
| --supervise If given, restarts the driver on failure.