Commit

deprecate defaultMinSplits
CodingCat committed Apr 18, 2014
1 parent ba2c663 commit 4b60541
Showing 4 changed files with 29 additions and 16 deletions.
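For orientation, here is a minimal sketch (not part of the commit) of how calling code migrates once defaultMinSplits is deprecated; the SparkConf setup and the input path are illustrative assumptions.

    import org.apache.spark.{SparkConf, SparkContext}

    object MinPartitionsMigration {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("migration").setMaster("local[2]"))

        // Before: still compiles, but now triggers a deprecation warning.
        val before = sc.textFile("/tmp/input.txt", sc.defaultMinSplits)

        // After: identical value (math.min(defaultParallelism, 2)), non-deprecated name.
        val after = sc.textFile("/tmp/input.txt", sc.defaultMinPartitions)

        println(s"min partitions hint: ${sc.defaultMinPartitions}")
        sc.stop()
      }
    }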
core/src/main/scala/org/apache/spark/SparkContext.scala: 21 changes (13 additions & 8 deletions)
@@ -427,7 +427,7 @@ class SparkContext(config: SparkConf) extends Logging {
    * Read a text file from HDFS, a local file system (available on all nodes), or any
    * Hadoop-supported file system URI, and return it as an RDD of Strings.
    */
-  def textFile(path: String, minPartitions: Int = defaultMinSplits): RDD[String] = {
+  def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String] = {
     hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
       minPartitions).map(pair => pair._2.toString)
   }
@@ -459,7 +459,8 @@ class SparkContext(config: SparkConf) extends Logging {
    *
    * @param minPartitions A suggestion value of the minimal splitting number for input data.
    */
-  def wholeTextFiles(path: String, minPartitions: Int = defaultMinSplits): RDD[(String, String)] = {
+  def wholeTextFiles(path: String, minPartitions: Int = defaultMinPartitions):
+  RDD[(String, String)] = {
     val job = new NewHadoopJob(hadoopConfiguration)
     NewFileInputFormat.addInputPath(job, new Path(path))
     val updateConf = job.getConfiguration
@@ -493,7 +494,7 @@ class SparkContext(config: SparkConf) extends Logging {
       inputFormatClass: Class[_ <: InputFormat[K, V]],
       keyClass: Class[K],
       valueClass: Class[V],
-      minPartitions: Int = defaultMinSplits
+      minPartitions: Int = defaultMinPartitions
       ): RDD[(K, V)] = {
     // Add necessary security credentials to the JobConf before broadcasting it.
     SparkHadoopUtil.get.addCredentials(conf)
@@ -512,7 +513,7 @@ class SparkContext(config: SparkConf) extends Logging {
       inputFormatClass: Class[_ <: InputFormat[K, V]],
       keyClass: Class[K],
       valueClass: Class[V],
-      minPartitions: Int = defaultMinSplits
+      minPartitions: Int = defaultMinPartitions
       ): RDD[(K, V)] = {
     // A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
     val confBroadcast = broadcast(new SerializableWritable(hadoopConfiguration))
@@ -565,7 +566,7 @@ class SparkContext(config: SparkConf) extends Logging {
    */
   def hadoopFile[K, V, F <: InputFormat[K, V]](path: String)
       (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] =
-    hadoopFile[K, V, F](path, defaultMinSplits)
+    hadoopFile[K, V, F](path, defaultMinPartitions)
 
   /** Get an RDD for a Hadoop file with an arbitrary new API InputFormat. */
   def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]]
@@ -641,7 +642,7 @@ class SparkContext(config: SparkConf) extends Logging {
    * */
   def sequenceFile[K, V](path: String, keyClass: Class[K], valueClass: Class[V]
       ): RDD[(K, V)] =
-    sequenceFile(path, keyClass, valueClass, defaultMinSplits)
+    sequenceFile(path, keyClass, valueClass, defaultMinPartitions)
 
   /**
    * Version of sequenceFile() for types implicitly convertible to Writables through a
@@ -665,7 +666,7 @@ class SparkContext(config: SparkConf) extends Logging {
    * a `map` function.
    */
   def sequenceFile[K, V]
-      (path: String, minPartitions: Int = defaultMinSplits)
+      (path: String, minPartitions: Int = defaultMinPartitions)
       (implicit km: ClassTag[K], vm: ClassTag[V],
        kcf: () => WritableConverter[K], vcf: () => WritableConverter[V])
       : RDD[(K, V)] = {
@@ -688,7 +689,7 @@ class SparkContext(config: SparkConf) extends Logging {
    */
   def objectFile[T: ClassTag](
       path: String,
-      minPartitions: Int = defaultMinSplits
+      minPartitions: Int = defaultMinPartitions
       ): RDD[T] = {
     sequenceFile(path, classOf[NullWritable], classOf[BytesWritable], minPartitions)
       .flatMap(x => Utils.deserialize[Array[T]](x._2.getBytes))
@@ -1183,8 +1184,12 @@ class SparkContext(config: SparkConf) extends Logging {
   def defaultParallelism: Int = taskScheduler.defaultParallelism
 
   /** Default min number of partitions for Hadoop RDDs when not given by user */
+  @deprecated("use defaultMinPartitions", "1.0.0")
   def defaultMinSplits: Int = math.min(defaultParallelism, 2)
 
+  /** Default min number of partitions for Hadoop RDDs when not given by user */
+  def defaultMinPartitions: Int = math.min(defaultParallelism, 2)
+
   private val nextShuffleId = new AtomicInteger(0)
 
   private[spark] def newShuffleId(): Int = nextShuffleId.getAndIncrement()
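As a standalone illustration of the Scala @deprecated pattern used above (class, member names, and values invented, not Spark code), the old accessor keeps its behaviour while scalac warns its callers:

    object DeprecationSketch {
      class Context(parallelism: Int) {
        @deprecated("use defaultMinPartitions", "1.0.0")
        def defaultMinSplits: Int = math.min(parallelism, 2)

        // New name, same value; existing callers of the old name keep working.
        def defaultMinPartitions: Int = math.min(parallelism, 2)
      }

      def main(args: Array[String]): Unit = {
        val ctx = new Context(parallelism = 8)
        println(ctx.defaultMinSplits)     // 2, flagged as deprecated when compiled with -deprecation
        println(ctx.defaultMinPartitions) // 2, no warning
      }
    }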
@@ -109,9 +109,17 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
   /** Default level of parallelism to use when not given by user (e.g. parallelize and makeRDD). */
   def defaultParallelism: java.lang.Integer = sc.defaultParallelism
 
-  /** Default min number of partitions for Hadoop RDDs when not given by user */
+  /**
+   * Default min number of partitions for Hadoop RDDs when not given by user.
+   * @deprecated As of Spark 1.0.0, defaultMinSplits is deprecated, use
+   *             {@link #defaultMinPartitions()} instead
+   */
+  @Deprecated
   def defaultMinSplits: java.lang.Integer = sc.defaultMinSplits
 
+  /** Default min number of partitions for Hadoop RDDs when not given by user */
+  def defaultMinPartitions: java.lang.Integer = sc.defaultMinPartitions
+
   /** Distribute a local Scala collection to form an RDD. */
   def parallelize[T](list: java.util.List[T], numSlices: Int): JavaRDD[T] = {
     implicit val ctag: ClassTag[T] = fakeClassTag
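The Java-facing API pairs the java.lang.Deprecated annotation with a javadoc @deprecated tag, since those are what Java callers and generated docs see. A hedged sketch of the same pairing on an invented wrapper class (not Spark code):

    // Sketch: a Scala class exposing a Java-friendly API, deprecating the old accessor
    // for Java callers (@Deprecated annotation) and documenting the replacement in javadoc.
    class JavaFacingContext(numPartitions: Int) {
      /**
       * @deprecated use {@link #defaultMinPartitions()} instead
       */
      @Deprecated
      def defaultMinSplits: java.lang.Integer = defaultMinPartitions

      def defaultMinPartitions: java.lang.Integer = numPartitions
    }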
mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala: 12 changes (6 additions & 6 deletions)
@@ -57,16 +57,16 @@ object MLUtils {
    * @param labelParser parser for labels, default: 1.0 if label > 0.5 or 0.0 otherwise
    * @param numFeatures number of features, which will be determined from the input data if a
    *                    negative value is given. The default value is -1.
-   * @param minSplits min number of partitions, default: sc.defaultMinSplits
+   * @param minPartitions min number of partitions, default: sc.defaultMinPartitions
    * @return labeled data stored as an RDD[LabeledPoint]
    */
   def loadLibSVMData(
       sc: SparkContext,
       path: String,
       labelParser: LabelParser,
       numFeatures: Int,
-      minSplits: Int): RDD[LabeledPoint] = {
-    val parsed = sc.textFile(path, minSplits)
+      minPartitions: Int): RDD[LabeledPoint] = {
+    val parsed = sc.textFile(path, minPartitions)
       .map(_.trim)
       .filter(!_.isEmpty)
       .map(_.split(' '))
@@ -101,7 +101,7 @@ object MLUtils {
    * with number of features determined automatically and the default number of partitions.
    */
   def loadLibSVMData(sc: SparkContext, path: String): RDD[LabeledPoint] =
-    loadLibSVMData(sc, path, BinaryLabelParser, -1, sc.defaultMinSplits)
+    loadLibSVMData(sc, path, BinaryLabelParser, -1, sc.defaultMinPartitions)
 
   /**
    * Loads labeled data in the LIBSVM format into an RDD[LabeledPoint],
@@ -112,7 +112,7 @@ object MLUtils {
       sc: SparkContext,
       path: String,
       labelParser: LabelParser): RDD[LabeledPoint] =
-    loadLibSVMData(sc, path, labelParser, -1, sc.defaultMinSplits)
+    loadLibSVMData(sc, path, labelParser, -1, sc.defaultMinPartitions)
 
   /**
    * Loads labeled data in the LIBSVM format into an RDD[LabeledPoint],
@@ -124,7 +124,7 @@ object MLUtils {
       path: String,
       labelParser: LabelParser,
       numFeatures: Int): RDD[LabeledPoint] =
-    loadLibSVMData(sc, path, labelParser, numFeatures, sc.defaultMinSplits)
+    loadLibSVMData(sc, path, labelParser, numFeatures, sc.defaultMinPartitions)
 
   /**
    * :: Experimental ::
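A usage sketch of the MLUtils entry point touched above, from the caller's side; the app name, local master, and data path are placeholders, and the signature is taken from the diff:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.mllib.regression.LabeledPoint
    import org.apache.spark.mllib.util.MLUtils
    import org.apache.spark.rdd.RDD

    object LoadLibSVMSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("libsvm").setMaster("local[2]"))

        // Default overload: features inferred, sc.defaultMinPartitions used for partitioning.
        // The path below is a placeholder; point it at a real LIBSVM file.
        val data: RDD[LabeledPoint] = MLUtils.loadLibSVMData(sc, "data/sample_libsvm_data.txt")

        println(s"loaded ${data.count()} labeled points")
        sc.stop()
      }
    }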
@@ -52,7 +52,7 @@ class HadoopTableReader(@transient _tableDesc: TableDesc, @transient sc: HiveCon
   // Choose the minimum number of splits. If mapred.map.tasks is set, then use that unless
   // it is smaller than what Spark suggests.
   private val _minSplitsPerRDD = math.max(
-    sc.hiveconf.getInt("mapred.map.tasks", 1), sc.sparkContext.defaultMinSplits)
+    sc.hiveconf.getInt("mapred.map.tasks", 1), sc.sparkContext.defaultMinPartitions)
 
   // TODO: set aws s3 credentials.
 
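For the Hive table reader above, the partition arithmetic in plain form; the configuration numbers below are invented to show the math.max/math.min interaction:

    object MinSplitsPerRddSketch {
      def main(args: Array[String]): Unit = {
        val defaultParallelism = 8   // stand-in for sc.sparkContext.defaultParallelism
        val mapredMapTasks = 10      // stand-in for hiveconf "mapred.map.tasks" (defaults to 1 if unset)

        val defaultMinPartitions = math.min(defaultParallelism, 2) // 2, as defined in SparkContext
        val minSplitsPerRDD = math.max(mapredMapTasks, defaultMinPartitions)

        println(s"defaultMinPartitions=$defaultMinPartitions, minSplitsPerRDD=$minSplitsPerRDD") // 2 and 10
      }
    }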