Skip to content

Commit

Permalink
resolving merge conflicts
Browse files Browse the repository at this point in the history
*fingers crossed*

I admit I’m not exactly sure how this works… Let’s see if I did the
right thing.
  • Loading branch information
nchammas committed Aug 3, 2014
1 parent a31ccc4 commit 9a66cb0
Show file tree
Hide file tree
Showing 132 changed files with 4,986 additions and 263 deletions.
1 change: 1 addition & 0 deletions LICENSE
Original file line number Diff line number Diff line change
Expand Up @@ -549,3 +549,4 @@ The following components are provided under the MIT License. See project link fo
(MIT License) pyrolite (org.spark-project:pyrolite:2.0.1 - http://pythonhosted.org/Pyro4/)
(MIT License) scopt (com.github.scopt:scopt_2.10:3.2.0 - https://github.com/scopt/scopt)
(The MIT License) Mockito (org.mockito:mockito-all:1.8.5 - http://www.mockito.org)
(MIT License) jquery (https://jquery.org/license/)
3 changes: 2 additions & 1 deletion bin/run-example
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ if [ -n "$1" ]; then
else
echo "Usage: ./bin/run-example <example-class> [example-args]" 1>&2
echo " - set MASTER=XX to use a specific master" 1>&2
echo " - can use abbreviated example class name (e.g. SparkPi, mllib.LinearRegression)" 1>&2
echo " - can use abbreviated example class name relative to com.apache.spark.examples" 1>&2
echo " (e.g. SparkPi, mllib.LinearRegression, streaming.KinesisWordCountASL)" 1>&2
exit 1
fi

Expand Down
3 changes: 2 additions & 1 deletion bin/run-example2.cmd
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ rem Test that an argument was given
if not "x%1"=="x" goto arg_given
echo Usage: run-example ^<example-class^> [example-args]
echo - set MASTER=XX to use a specific master
echo - can use abbreviated example class name (e.g. SparkPi, mllib.LinearRegression)
echo - can use abbreviated example class name relative to com.apache.spark.examples
echo (e.g. SparkPi, mllib.LinearRegression, streaming.KinesisWordCountASL)
goto exit
:arg_given

Expand Down
10 changes: 7 additions & 3 deletions core/src/main/scala/org/apache/spark/Logging.scala
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,17 @@ trait Logging {
// be serialized and used on another machine
@transient private var log_ : Logger = null

// Method to get the logger name for this object
protected def logName = {
// Ignore trailing $'s in the class names for Scala objects
this.getClass.getName.stripSuffix("$")
}

// Method to get or create the logger for this object
protected def log: Logger = {
if (log_ == null) {
initializeIfNecessary()
var className = this.getClass.getName
// Ignore trailing $'s in the class names for Scala objects
log_ = LoggerFactory.getLogger(className.stripSuffix("$"))
log_ = LoggerFactory.getLogger(logName)
}
log_
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ private[spark] class Worker(
// TTL for app folders/data; after TTL expires it will be cleaned up
val APP_DATA_RETENTION_SECS = conf.getLong("spark.worker.cleanup.appDataTtl", 7 * 24 * 3600)


val testing: Boolean = sys.props.contains("spark.testing")
val masterLock: Object = new Object()
var master: ActorSelection = null
var masterAddress: Address = null
Expand All @@ -82,7 +82,12 @@ private[spark] class Worker(
@volatile var connected = false
val workerId = generateWorkerId()
val sparkHome =
new File(sys.props.get("spark.test.home").orElse(sys.env.get("SPARK_HOME")).getOrElse("."))
if (testing) {
assert(sys.props.contains("spark.test.home"), "spark.test.home is not set!")
new File(sys.props("spark.test.home"))
} else {
new File(sys.env.get("SPARK_HOME").getOrElse("."))
}
var workDir: File = null
val executors = new HashMap[String, ExecutorRunner]
val finishedExecutors = new HashMap[String, ExecutorRunner]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,8 @@ private[spark] class BlockManagerSource(val blockManager: BlockManager, sc: Spar
metricRegistry.register(MetricRegistry.name("memory", "memUsed_MB"), new Gauge[Long] {
override def getValue: Long = {
val storageStatusList = blockManager.master.getStorageStatus
val maxMem = storageStatusList.map(_.maxMem).sum
val remainingMem = storageStatusList.map(_.memRemaining).sum
(maxMem - remainingMem) / 1024 / 1024
val memUsed = storageStatusList.map(_.memUsed).sum
memUsed / 1024 / 1024
}
})

Expand Down
11 changes: 4 additions & 7 deletions core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -172,16 +172,13 @@ class StorageStatus(val blockManagerId: BlockManagerId, val maxMem: Long) {
def memRemaining: Long = maxMem - memUsed

/** Return the memory used by this block manager. */
def memUsed: Long =
_nonRddStorageInfo._1 + _rddBlocks.keys.toSeq.map(memUsedByRdd).sum
def memUsed: Long = _nonRddStorageInfo._1 + _rddBlocks.keys.toSeq.map(memUsedByRdd).sum

/** Return the disk space used by this block manager. */
def diskUsed: Long =
_nonRddStorageInfo._2 + _rddBlocks.keys.toSeq.map(diskUsedByRdd).sum
def diskUsed: Long = _nonRddStorageInfo._2 + _rddBlocks.keys.toSeq.map(diskUsedByRdd).sum

/** Return the off-heap space used by this block manager. */
def offHeapUsed: Long =
_nonRddStorageInfo._3 + _rddBlocks.keys.toSeq.map(offHeapUsedByRdd).sum
def offHeapUsed: Long = _nonRddStorageInfo._3 + _rddBlocks.keys.toSeq.map(offHeapUsedByRdd).sum

/** Return the memory used by the given RDD in this block manager in O(1) time. */
def memUsedByRdd(rddId: Int): Long = _rddStorageInfo.get(rddId).map(_._1).getOrElse(0L)
Expand Down Expand Up @@ -246,7 +243,7 @@ private[spark] object StorageUtils {
val rddId = rddInfo.id
// Assume all blocks belonging to the same RDD have the same storage level
val storageLevel = statuses
.map(_.rddStorageLevel(rddId)).flatMap(s => s).headOption.getOrElse(StorageLevel.NONE)
.flatMap(_.rddStorageLevel(rddId)).headOption.getOrElse(StorageLevel.NONE)
val numCachedPartitions = statuses.map(_.numRddBlocksById(rddId)).sum
val memSize = statuses.map(_.memUsedByRdd(rddId)).sum
val diskSize = statuses.map(_.diskUsedByRdd(rddId)).sum
Expand Down
2 changes: 1 addition & 1 deletion core/src/test/scala/org/apache/spark/DriverSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import scala.language.postfixOps
class DriverSuite extends FunSuite with Timeouts {

test("driver should exit after finishing") {
val sparkHome = sys.props("spark.test.home")
val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!"))
// Regression test for SPARK-530: "Spark driver process doesn't exit after finishing"
val masters = Table(("master"), ("local"), ("local-cluster[2,1,512]"))
forAll(masters) { (master: String) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ class SparkSubmitSuite extends FunSuite with Matchers {

// NOTE: This is an expensive operation in terms of time (10 seconds+). Use sparingly.
def runSparkSubmit(args: Seq[String]): String = {
val sparkHome = sys.props("spark.test.home")
val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!"))
Utils.executeAndGetOutput(
Seq("./bin/spark-submit") ++ args,
new File(sparkHome),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import org.apache.spark.SparkConf
class ExecutorRunnerTest extends FunSuite {
test("command includes appId") {
def f(s:String) = new File(s)
val sparkHome = sys.props("spark.test.home")
val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!"))
val appDesc = new ApplicationDescription("app name", Some(8), 500,
Command("foo", Seq(), Map(), Seq(), Seq(), Seq()), "appUiUrl")
val appId = "12345-worker321-9876"
Expand Down
4 changes: 2 additions & 2 deletions dev/audit-release/audit_release.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ def get_url(url):
"spark-core", "spark-bagel", "spark-mllib", "spark-streaming", "spark-repl",
"spark-graphx", "spark-streaming-flume", "spark-streaming-kafka",
"spark-streaming-mqtt", "spark-streaming-twitter", "spark-streaming-zeromq",
"spark-catalyst", "spark-sql", "spark-hive"
"spark-catalyst", "spark-sql", "spark-hive", "spark-streaming-kinesis-asl"
]
modules = map(lambda m: "%s_%s" % (m, SCALA_BINARY_VERSION), modules)

Expand Down Expand Up @@ -136,7 +136,7 @@ def ensure_path_not_present(x):
os.chdir(original_dir)

# SBT application tests
for app in ["sbt_app_core", "sbt_app_graphx", "sbt_app_streaming", "sbt_app_sql", "sbt_app_hive"]:
for app in ["sbt_app_core", "sbt_app_graphx", "sbt_app_streaming", "sbt_app_sql", "sbt_app_hive", "sbt_app_kinesis"]:
os.chdir(app)
ret = run_cmd("sbt clean run", exit_on_failure=False)
test(ret == 0, "sbt application (%s)" % app)
Expand Down
7 changes: 7 additions & 0 deletions dev/audit-release/sbt_app_core/src/main/scala/SparkApp.scala
Original file line number Diff line number Diff line change
Expand Up @@ -50,5 +50,12 @@ object SimpleApp {
println("Ganglia sink was loaded via spark-core")
System.exit(-1)
}

// Remove kinesis from default build due to ASL license issue
val foundKinesis = Try(Class.forName("org.apache.spark.streaming.kinesis.KinesisUtils")).isSuccess
if (foundKinesis) {
println("Kinesis was loaded via spark-core")
System.exit(-1)
}
}
}
28 changes: 28 additions & 0 deletions dev/audit-release/sbt_app_kinesis/build.sbt
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
//
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

name := "Kinesis Test"

version := "1.0"

scalaVersion := System.getenv.get("SCALA_VERSION")

libraryDependencies += "org.apache.spark" %% "spark-streaming-kinesis-asl" % System.getenv.get("SPARK_VERSION")

resolvers ++= Seq(
"Spark Release Repository" at System.getenv.get("SPARK_RELEASE_REPOSITORY"),
"Spray Repository" at "http://repo.spray.cc/")
33 changes: 33 additions & 0 deletions dev/audit-release/sbt_app_kinesis/src/main/scala/SparkApp.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package main.scala

import scala.util.Try

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

object SimpleApp {
def main(args: Array[String]) {
val foundKinesis = Try(Class.forName("org.apache.spark.streaming.kinesis.KinesisUtils")).isSuccess
if (!foundKinesis) {
println("Kinesis not loaded via kinesis-asl")
System.exit(-1)
}
}
}
2 changes: 1 addition & 1 deletion dev/audit-release/sbt_app_sql/src/main/scala/SqlApp.scala
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ object SparkSqlExample {

import sqlContext._
val people = sc.makeRDD(1 to 100, 10).map(x => Person(s"Name$x", x))
people.registerAsTable("people")
people.registerTempTable("people")
val teenagers = sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
val teenagerNames = teenagers.map(t => "Name: " + t(0)).collect()
teenagerNames.foreach(println)
Expand Down
4 changes: 2 additions & 2 deletions dev/create-release/create-release.sh
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,15 @@ if [[ ! "$@" =~ --package-only ]]; then
-Dusername=$GIT_USERNAME -Dpassword=$GIT_PASSWORD \
-Dmaven.javadoc.skip=true \
-Dhadoop.version=2.2.0 -Dyarn.version=2.2.0 \
-Pyarn -Phive -Phive-thriftserver -Phadoop-2.2 -Pspark-ganglia-lgpl\
-Dtag=$GIT_TAG -DautoVersionSubmodules=true \
-Pyarn -Phive -Phive-thriftserver -Phadoop-2.2 -Pspark-ganglia-lgpl -Pkinesis-asl \
--batch-mode release:prepare

mvn -DskipTests \
-Darguments="-DskipTests=true -Dmaven.javadoc.skip=true -Dhadoop.version=2.2.0 -Dyarn.version=2.2.0 -Dgpg.passphrase=${GPG_PASSPHRASE}" \
-Dhadoop.version=2.2.0 -Dyarn.version=2.2.0 \
-Dmaven.javadoc.skip=true \
-Pyarn -Phive -Phive-thriftserver -Phadoop-2.2 -Pspark-ganglia-lgpl\
-Pyarn -Phive -Phive-thriftserver -Phadoop-2.2 -Pspark-ganglia-lgpl -Pkinesis-asl \
release:perform

cd ..
Expand Down
3 changes: 3 additions & 0 deletions dev/run-tests
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ fi
if [ -z "$SBT_MAVEN_PROFILES_ARGS" ]; then
export SBT_MAVEN_PROFILES_ARGS="-Pyarn -Phadoop-2.3 -Dhadoop.version=2.3.0"
fi

export SBT_MAVEN_PROFILES_ARGS="$SBT_MAVEN_PROFILES_ARGS -Pkinesis-asl"

echo "SBT_MAVEN_PROFILES_ARGS=\"$SBT_MAVEN_PROFILES_ARGS\""

# Remove work directory
Expand Down
18 changes: 9 additions & 9 deletions docs/sql-programming-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ case class Person(name: String, age: Int)

// Create an RDD of Person objects and register it as a table.
val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt))
people.registerAsTable("people")
people.registerTempTable("people")

// SQL statements can be run by using the sql methods provided by sqlContext.
val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
Expand Down Expand Up @@ -210,7 +210,7 @@ JavaRDD<Person> people = sc.textFile("examples/src/main/resources/people.txt").m

// Apply a schema to an RDD of JavaBeans and register it as a table.
JavaSchemaRDD schemaPeople = sqlContext.applySchema(people, Person.class);
schemaPeople.registerAsTable("people");
schemaPeople.registerTempTable("people");

// SQL can be run over RDDs that have been registered as tables.
JavaSchemaRDD teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
Expand Down Expand Up @@ -248,7 +248,7 @@ people = parts.map(lambda p: {"name": p[0], "age": int(p[1])})
# In future versions of PySpark we would like to add support for registering RDDs with other
# datatypes as tables
schemaPeople = sqlContext.inferSchema(people)
schemaPeople.registerAsTable("people")
schemaPeople.registerTempTable("people")

# SQL can be run over SchemaRDDs that have been registered as a table.
teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
Expand Down Expand Up @@ -292,7 +292,7 @@ people.saveAsParquetFile("people.parquet")
val parquetFile = sqlContext.parquetFile("people.parquet")

//Parquet files can also be registered as tables and then used in SQL statements.
parquetFile.registerAsTable("parquetFile")
parquetFile.registerTempTable("parquetFile")
val teenagers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
{% endhighlight %}
Expand All @@ -314,7 +314,7 @@ schemaPeople.saveAsParquetFile("people.parquet");
JavaSchemaRDD parquetFile = sqlContext.parquetFile("people.parquet");

//Parquet files can also be registered as tables and then used in SQL statements.
parquetFile.registerAsTable("parquetFile");
parquetFile.registerTempTable("parquetFile");
JavaSchemaRDD teenagers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19");
List<String> teenagerNames = teenagers.map(new Function<Row, String>() {
public String call(Row row) {
Expand All @@ -340,7 +340,7 @@ schemaPeople.saveAsParquetFile("people.parquet")
parquetFile = sqlContext.parquetFile("people.parquet")

# Parquet files can also be registered as tables and then used in SQL statements.
parquetFile.registerAsTable("parquetFile");
parquetFile.registerTempTable("parquetFile");
teenagers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
teenNames = teenagers.map(lambda p: "Name: " + p.name)
for teenName in teenNames.collect():
Expand Down Expand Up @@ -378,7 +378,7 @@ people.printSchema()
// |-- name: StringType

// Register this SchemaRDD as a table.
people.registerAsTable("people")
people.registerTempTable("people")

// SQL statements can be run by using the sql methods provided by sqlContext.
val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
Expand Down Expand Up @@ -416,7 +416,7 @@ people.printSchema();
// |-- name: StringType

// Register this JavaSchemaRDD as a table.
people.registerAsTable("people");
people.registerTempTable("people");

// SQL statements can be run by using the sql methods provided by sqlContext.
JavaSchemaRDD teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19");
Expand Down Expand Up @@ -455,7 +455,7 @@ people.printSchema()
# |-- name: StringType

# Register this SchemaRDD as a table.
people.registerAsTable("people")
people.registerTempTable("people")

# SQL statements can be run by using the sql methods provided by sqlContext.
teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
Expand Down
4 changes: 2 additions & 2 deletions docs/streaming-custom-receivers.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ title: Spark Streaming Custom Receivers
---

Spark Streaming can receive streaming data from any arbitrary data source beyond
the one's for which it has in-built support (that is, beyond Flume, Kafka, files, sockets, etc.).
the one's for which it has in-built support (that is, beyond Flume, Kafka, Kinesis, files, sockets, etc.).
This requires the developer to implement a *receiver* that is customized for receiving data from
the concerned data source. This guide walks through the process of implementing a custom receiver
and using it in a Spark Streaming application.
Expand Down Expand Up @@ -174,7 +174,7 @@ val words = lines.flatMap(_.split(" "))
...
{% endhighlight %}

The full source code is in the example [CustomReceiver.scala](https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/streaming/examples/CustomReceiver.scala).
The full source code is in the example [CustomReceiver.scala](https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/CustomReceiver.scala).

</div>
<div data-lang="java" markdown="1">
Expand Down
Loading

0 comments on commit 9a66cb0

Please sign in to comment.