Merge branch 'master' into avro-hadoop2
medale committed Feb 4, 2015
2 parents 9d85e2a + 5aa0f21 commit 1ab4fa3
Showing 41 changed files with 1,163 additions and 515 deletions.
6 changes: 3 additions & 3 deletions bin/utils.sh
@@ -26,14 +26,14 @@ function gatherSparkSubmitOpts() {
exit 1
fi

- # NOTE: If you add or remove spark-sumbmit options,
+ # NOTE: If you add or remove spark-submit options,
# modify NOT ONLY this script but also SparkSubmitArgument.scala
SUBMISSION_OPTS=()
APPLICATION_OPTS=()
while (($#)); do
case "$1" in
- --master | --deploy-mode | --class | --name | --jars | --py-files | --files | \
- --conf | --properties-file | --driver-memory | --driver-java-options | \
+ --master | --deploy-mode | --class | --name | --jars | --packages | --py-files | --files | \
+ --conf | --repositories | --properties-file | --driver-memory | --driver-java-options | \
--driver-library-path | --driver-class-path | --executor-memory | --driver-cores | \
--total-executor-cores | --executor-cores | --queue | --num-executors | --archives)
if [[ $# -lt 2 ]]; then
2 changes: 1 addition & 1 deletion bin/windows-utils.cmd
@@ -32,7 +32,7 @@ SET opts="\<--master\> \<--deploy-mode\> \<--class\> \<--name\> \<--jars\> \<--p
SET opts="%opts:~1,-1% \<--conf\> \<--properties-file\> \<--driver-memory\> \<--driver-java-options\>"
SET opts="%opts:~1,-1% \<--driver-library-path\> \<--driver-class-path\> \<--executor-memory\>"
SET opts="%opts:~1,-1% \<--driver-cores\> \<--total-executor-cores\> \<--executor-cores\> \<--queue\>"
SET opts="%opts:~1,-1% \<--num-executors\> \<--archives\>"
SET opts="%opts:~1,-1% \<--num-executors\> \<--archives\> \<--packages\> \<--repositories\>"

echo %1 | findstr %opts% >nul
if %ERRORLEVEL% equ 0 (
11 changes: 11 additions & 0 deletions core/pom.xml
@@ -241,6 +241,17 @@
<artifactId>derby</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.ivy</groupId>
<artifactId>ivy</artifactId>
<version>${ivy.version}</version>
</dependency>
<dependency>
<groupId>oro</groupId>
<!-- oro is needed by ivy, but only listed as an optional dependency, so we include it. -->
<artifactId>oro</artifactId>
<version>${oro.version}</version>
</dependency>
<dependency>
<groupId>org.tachyonproject</groupId>
<artifactId>tachyon-client</artifactId>
93 changes: 88 additions & 5 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -657,6 +657,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
*
* Load data from a flat binary file, assuming the length of each record is constant.
*
* '''Note:''' We ensure that the byte array for each record in the resulting RDD
* has the provided record length.
*
* @param path Directory to the input data files
* @param recordLength The length at which to split the records
* @return An RDD of data with values, represented as byte arrays
@@ -671,7 +674,11 @@
classOf[LongWritable],
classOf[BytesWritable],
conf=conf)
- val data = br.map{ case (k, v) => v.getBytes}
+ val data = br.map { case (k, v) =>
+   val bytes = v.getBytes
+   assert(bytes.length == recordLength, "Byte array does not have correct length")
+   bytes
+ }
data
}
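A minimal usage sketch of the API this hunk tightens, assuming a local master and a hypothetical input file of back-to-back 16-byte records:

import org.apache.spark.{SparkConf, SparkContext}

object FixedLengthRecords {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("binary-records").setMaster("local[*]"))
    // Hypothetical input path; each record is assumed to be exactly 16 bytes.
    val records = sc.binaryRecords("/data/records.bin", 16)
    // With the assertion above, every element is guaranteed to be 16 bytes long.
    println(records.map(_.length).distinct().collect().mkString(","))
    sc.stop()
  }
}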

@@ -1224,7 +1231,19 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
null
}
} else {
- env.httpFileServer.addJar(new File(uri.getPath))
+ try {
+   env.httpFileServer.addJar(new File(uri.getPath))
+ } catch {
+   case exc: FileNotFoundException =>
+     logError(s"Jar not found at $path")
+     null
+   case e: Exception =>
+     // For now just log an error but allow to go through so spark examples work.
+     // The spark examples don't really need the jar distributed since its also
+     // the app jar.
+     logError("Error adding jar (" + e + "), was the --addJars option used?")
+     null
+ }
}
// A JAR file which exists locally on every worker node
case "local" =>
@@ -1749,8 +1768,14 @@ object SparkContext extends Logging {
@deprecated("Replaced by implicit functions in the RDD companion object. This is " +
"kept here only for backward compatibility.", "1.3.0")
def rddToSequenceFileRDDFunctions[K <% Writable: ClassTag, V <% Writable: ClassTag](
- rdd: RDD[(K, V)]) =
+ rdd: RDD[(K, V)]) = {
+   val kf = implicitly[K => Writable]
+   val vf = implicitly[V => Writable]
+   // Set the Writable class to null and `SequenceFileRDDFunctions` will use Reflection to get it
+   implicit val keyWritableFactory = new WritableFactory[K](_ => null, kf)
+   implicit val valueWritableFactory = new WritableFactory[V](_ => null, vf)
RDD.rddToSequenceFileRDDFunctions(rdd)
+ }
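A sketch of the pre-1.3 call site this shim keeps compiling, assuming a local master and a hypothetical output path:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._ // pre-1.3 style; deprecated but still supported

object OldStyleSequenceFile {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local[*]", "old-style-seqfile")
    // Resolves through the deprecated rddToSequenceFileRDDFunctions forwarder above.
    sc.parallelize(Seq((1, "a"), (2, "b"))).saveAsSequenceFile("/tmp/old-style-seq")
    sc.stop()
  }
}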

@deprecated("Replaced by implicit functions in the RDD companion object. This is " +
"kept here only for backward compatibility.", "1.3.0")
@@ -1767,20 +1792,35 @@ object SparkContext extends Logging {
def numericRDDToDoubleRDDFunctions[T](rdd: RDD[T])(implicit num: Numeric[T]) =
RDD.numericRDDToDoubleRDDFunctions(rdd)

- // Implicit conversions to common Writable types, for saveAsSequenceFile
+ // The following deprecated functions have already been moved to `object WritableFactory` to
+ // make the compiler find them automatically. They are still kept here for backward compatibility.

@deprecated("Replaced by implicit functions in the WritableFactory companion object. This is " +
"kept here only for backward compatibility.", "1.3.0")
implicit def intToIntWritable(i: Int): IntWritable = new IntWritable(i)

@deprecated("Replaced by implicit functions in the WritableFactory companion object. This is " +
"kept here only for backward compatibility.", "1.3.0")
implicit def longToLongWritable(l: Long): LongWritable = new LongWritable(l)

@deprecated("Replaced by implicit functions in the WritableFactory companion object. This is " +
"kept here only for backward compatibility.", "1.3.0")
implicit def floatToFloatWritable(f: Float): FloatWritable = new FloatWritable(f)

@deprecated("Replaced by implicit functions in the WritableFactory companion object. This is " +
"kept here only for backward compatibility.", "1.3.0")
implicit def doubleToDoubleWritable(d: Double): DoubleWritable = new DoubleWritable(d)

@deprecated("Replaced by implicit functions in the WritableFactory companion object. This is " +
"kept here only for backward compatibility.", "1.3.0")
implicit def boolToBoolWritable (b: Boolean): BooleanWritable = new BooleanWritable(b)

@deprecated("Replaced by implicit functions in the WritableFactory companion object. This is " +
"kept here only for backward compatibility.", "1.3.0")
implicit def bytesToBytesWritable (aob: Array[Byte]): BytesWritable = new BytesWritable(aob)

@deprecated("Replaced by implicit functions in the WritableFactory companion object. This is " +
"kept here only for backward compatibility.", "1.3.0")
implicit def stringToText(s: String): Text = new Text(s)

private implicit def arrayToArrayWritable[T <% Writable: ClassTag](arr: Traversable[T])
@@ -2070,7 +2110,7 @@ object WritableConverter {
new WritableConverter[T](_ => wClass, x => convert(x.asInstanceOf[W]))
}

- // The following implicit functions were in SparkContext before 1.2 and users had to
+ // The following implicit functions were in SparkContext before 1.3 and users had to
// `import SparkContext._` to enable them. Now we move them here to make the compiler find
// them automatically. However, we still keep the old functions in SparkContext for backward
// compatibility and forward to the following functions directly.
@@ -2103,3 +2143,46 @@
implicit def writableWritableConverter[T <: Writable](): WritableConverter[T] =
new WritableConverter[T](_.runtimeClass.asInstanceOf[Class[T]], _.asInstanceOf[T])
}
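On the read side, a minimal sketch under the same assumptions (local master, the hypothetical path written in the earlier example): the converters are found through this companion object, so no SparkContext._ import is needed in 1.3.

import org.apache.spark.{SparkConf, SparkContext}

object ReadSequenceFile {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("read-seq").setMaster("local[*]"))
    // The Int and String WritableConverters come from the implicit scope above.
    val pairs = sc.sequenceFile[Int, String]("/tmp/old-style-seq")
    pairs.collect().foreach(println)
    sc.stop()
  }
}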

/**
* A class encapsulating how to convert some type T to Writable. It stores both the Writable class
* corresponding to T (e.g. IntWritable for Int) and a function for doing the conversion.
* The Writable class will be used in `SequenceFileRDDFunctions`.
*/
private[spark] class WritableFactory[T](
val writableClass: ClassTag[T] => Class[_ <: Writable],
val convert: T => Writable) extends Serializable

object WritableFactory {

private[spark] def simpleWritableFactory[T: ClassTag, W <: Writable : ClassTag](convert: T => W)
: WritableFactory[T] = {
val writableClass = implicitly[ClassTag[W]].runtimeClass.asInstanceOf[Class[W]]
new WritableFactory[T](_ => writableClass, convert)
}

implicit def intWritableFactory: WritableFactory[Int] =
simpleWritableFactory(new IntWritable(_))

implicit def longWritableFactory: WritableFactory[Long] =
simpleWritableFactory(new LongWritable(_))

implicit def floatWritableFactory: WritableFactory[Float] =
simpleWritableFactory(new FloatWritable(_))

implicit def doubleWritableFactory: WritableFactory[Double] =
simpleWritableFactory(new DoubleWritable(_))

implicit def booleanWritableFactory: WritableFactory[Boolean] =
simpleWritableFactory(new BooleanWritable(_))

implicit def bytesWritableFactory: WritableFactory[Array[Byte]] =
simpleWritableFactory(new BytesWritable(_))

implicit def stringWritableFactory: WritableFactory[String] =
simpleWritableFactory(new Text(_))

implicit def writableWritableFactory[T <: Writable: ClassTag]: WritableFactory[T] =
simpleWritableFactory(w => w)

}
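And the new-style write path, sketched with a local master and a hypothetical output path: with these factories in implicit scope, saveAsSequenceFile compiles without importing SparkContext._.

import org.apache.spark.{SparkConf, SparkContext}

object NewStyleSequenceFile {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("new-style-seqfile").setMaster("local[*]"))
    // intWritableFactory and stringWritableFactory are picked up automatically here.
    sc.parallelize(Seq((1, "a"), (2, "b"))).saveAsSequenceFile("/tmp/new-style-seq")
    sc.stop()
  }
}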
core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala
@@ -107,7 +107,6 @@ private[python] class WritableToDoubleArrayConverter extends Converter[Any, Arra
* given directory (probably a temp directory)
*/
object WriteInputFormatTestDataGenerator {
- import SparkContext._

def main(args: Array[String]) {
val path = args(0)