Merge pull request #6 from apache/master
merge upstream changes
nchammas committed Sep 6, 2014
2 parents dc1ba9e + ba5bcad commit d4c5f43
Showing 23 changed files with 236 additions and 74 deletions.
4 changes: 2 additions & 2 deletions README.md
@@ -4,8 +4,8 @@ Spark is a fast and general cluster computing system for Big Data. It provides
high-level APIs in Scala, Java, and Python, and an optimized engine that
supports general computation graphs for data analysis. It also supports a
rich set of higher-level tools including Spark SQL for SQL and structured
data processing, MLLib for machine learning, GraphX for graph processing,
and Spark Streaming.
data processing, MLlib for machine learning, GraphX for graph processing,
and Spark Streaming for stream processing.

<http://spark.apache.org/>

2 changes: 2 additions & 0 deletions bin/pyspark
@@ -85,6 +85,8 @@ export PYSPARK_SUBMIT_ARGS

# For pyspark tests
if [[ -n "$SPARK_TESTING" ]]; then
unset YARN_CONF_DIR
unset HADOOP_CONF_DIR
if [[ -n "$PYSPARK_DOC_TEST" ]]; then
exec "$PYSPARK_PYTHON" -m doctest $1
else
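
For context, the two new unset lines keep a developer's local Hadoop/YARN client configuration from leaking into the PySpark test environment. A minimal Python sketch of the same idea (a hypothetical helper, not part of this change):

    import os

    def scrub_hadoop_env():
        # Drop Hadoop/YARN client config variables so tests run against local mode.
        for var in ("YARN_CONF_DIR", "HADOOP_CONF_DIR"):
            os.environ.pop(var, None)  # no error if the variable is absent

    scrub_hadoop_env()
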
7 changes: 3 additions & 4 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -1064,11 +1064,10 @@ abstract class RDD[T: ClassTag](
// greater than totalParts because we actually cap it at totalParts in runJob.
var numPartsToTry = 1
if (partsScanned > 0) {
// If we didn't find any rows after the first iteration, just try all partitions next.
// Otherwise, interpolate the number of partitions we need to try, but overestimate it
// by 50%.
// If we didn't find any rows after the previous iteration, quadruple and retry. Otherwise,
// interpolate the number of partitions we need to try, but overestimate it by 50%.
if (buf.size == 0) {
numPartsToTry = totalParts - 1
numPartsToTry = partsScanned * 4
} else {
numPartsToTry = (1.5 * num * partsScanned / buf.size).toInt
}
33 changes: 25 additions & 8 deletions ec2/spark_ec2.py
@@ -102,9 +102,17 @@ def parse_args():
"(for debugging)")
parser.add_option(
"--ebs-vol-size", metavar="SIZE", type="int", default=0,
help="Attach a new EBS volume of size SIZE (in GB) to each node as " +
"/vol. The volumes will be deleted when the instances terminate. " +
"Only possible on EBS-backed AMIs.")
help="Size (in GB) of each EBS volume.")
parser.add_option(
"--ebs-vol-type", default="standard",
help="EBS volume type (e.g. 'gp2', 'standard').")
parser.add_option(
"--ebs-vol-num", type="int", default=1,
help="Number of EBS volumes to attach to each node as /vol[x]. " +
"The volumes will be deleted when the instances terminate. " +
"Only possible on EBS-backed AMIs. " +
"EBS volumes are only attached if --ebs-vol-size > 0." +
"Only support up to 8 EBS volumes.")
parser.add_option(
"--swap", metavar="SWAP", type="int", default=1024,
help="Swap space to set up per node, in MB (default: 1024)")
@@ -348,13 +356,16 @@ def launch_cluster(conn, opts, cluster_name):
print >> stderr, "Could not find AMI " + opts.ami
sys.exit(1)

# Create block device mapping so that we can add an EBS volume if asked to
# Create block device mapping so that we can add EBS volumes if asked to.
# The first drive is attached as /dev/sds, 2nd as /dev/sdt, ... /dev/sdz
block_map = BlockDeviceMapping()
if opts.ebs_vol_size > 0:
device = EBSBlockDeviceType()
device.size = opts.ebs_vol_size
device.delete_on_termination = True
block_map["/dev/sdv"] = device
for i in range(opts.ebs_vol_num):
device = EBSBlockDeviceType()
device.size = opts.ebs_vol_size
device.volume_type=opts.ebs_vol_type
device.delete_on_termination = True
block_map["/dev/sd" + chr(ord('s') + i)] = device

# AWS ignores the AMI-specified block device mapping for M3 (see SPARK-3342).
if opts.instance_type.startswith('m3.'):
@@ -828,6 +839,12 @@ def get_partition(total, num_partitions, current_partitions):

def real_main():
(opts, action, cluster_name) = parse_args()

# Input parameter validation
if opts.ebs_vol_num > 8:
print >> stderr, "ebs-vol-num cannot be greater than 8"
sys.exit(1)

try:
conn = ec2.connect_to_region(opts.region)
except Exception as e:
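
The new options attach the first EBS volume as /dev/sds, the second as /dev/sdt, and so on, which is why --ebs-vol-num is validated against a maximum of 8: that maps exactly onto the /dev/sds through /dev/sdz range. A standalone sketch of the naming and validation logic, using plain arguments instead of the script's optparse options:

    def ebs_device_names(vol_num, vol_size):
        # Mirror the commit's naming: first volume -> /dev/sds, ..., eighth -> /dev/sdz.
        if vol_num > 8:
            raise ValueError("ebs-vol-num cannot be greater than 8")
        if vol_size <= 0:
            return []  # volumes are only attached when --ebs-vol-size > 0
        return ["/dev/sd" + chr(ord('s') + i) for i in range(vol_num)]

    print(ebs_device_names(3, 100))  # ['/dev/sds', '/dev/sdt', '/dev/sdu']
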
8 changes: 4 additions & 4 deletions python/pyspark/rdd.py
@@ -1089,11 +1089,11 @@ def take(self, num):
# we actually cap it at totalParts in runJob.
numPartsToTry = 1
if partsScanned > 0:
# If we didn't find any rows after the first iteration, just
# try all partitions next. Otherwise, interpolate the number
# of partitions we need to try, but overestimate it by 50%.
# If we didn't find any rows after the previous iteration,
# quadruple and retry. Otherwise, interpolate the number of
# partitions we need to try, but overestimate it by 50%.
if len(items) == 0:
numPartsToTry = totalParts - 1
numPartsToTry = partsScanned * 4
else:
numPartsToTry = int(1.5 * num * partsScanned / len(items))

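
Both the Scala and Python versions of take() above now grow the scan the same way: after a pass that found nothing, quadruple the number of partitions scanned instead of jumping straight to all of them; otherwise interpolate from the hit rate and pad by 50%. A small Python sketch of the heuristic in isolation (names are illustrative, not Spark API):

    def next_parts_to_try(parts_scanned, total_parts, num_wanted, items_found):
        # Estimate how many partitions to scan on the next take() iteration.
        if parts_scanned == 0:
            return 1  # first pass: look at a single partition
        if items_found == 0:
            # Nothing found yet: quadruple rather than trying all partitions at once.
            num_parts = parts_scanned * 4
        else:
            # Interpolate from the observed hit rate, overestimating by 50%.
            num_parts = int(1.5 * num_wanted * parts_scanned / items_found)
        # The real code still caps the scan at the partitions that remain.
        return min(num_parts, total_parts - parts_scanned)

    print(next_parts_to_try(1, 100, 10, 0))   # 4
    print(next_parts_to_try(4, 100, 10, 2))   # 30
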
@@ -272,7 +272,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
val currentTable = table(tableName).queryExecution.analyzed
val asInMemoryRelation = currentTable match {
case _: InMemoryRelation =>
currentTable.logicalPlan
currentTable

case _ =>
InMemoryRelation(useCompression, columnBatchSize, executePlan(currentTable).executedPlan)
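
This fix makes caching idempotent: if the analyzed plan is already an InMemoryRelation it is reused as-is rather than rewrapped. A toy Python sketch of the check (the classes here are placeholders, not Catalyst types):

    class InMemoryRelation:
        def __init__(self, plan):
            self.plan = plan

    def cache_plan(current):
        # Reuse an existing in-memory relation; otherwise wrap the plan once.
        if isinstance(current, InMemoryRelation):
            return current                    # already cached: keep the same relation
        return InMemoryRelation(current)      # first cache call: build the cached copy

    plan = object()
    cached = cache_plan(plan)
    assert cache_plan(cached) is cached       # a second call does not re-wrap
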
@@ -60,10 +60,10 @@ case class SetCommand(
logWarning(s"Property ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} is deprecated, " +
s"automatically converted to ${SQLConf.SHUFFLE_PARTITIONS} instead.")
context.setConf(SQLConf.SHUFFLE_PARTITIONS, v)
Array(Row(s"${SQLConf.SHUFFLE_PARTITIONS}=$v"))
Seq(Row(s"${SQLConf.SHUFFLE_PARTITIONS}=$v"))
} else {
context.setConf(k, v)
Array(Row(s"$k=$v"))
Seq(Row(s"$k=$v"))
}

// Query the value bound to key k.
@@ -78,11 +78,19 @@
"hive-hwi-0.12.0.jar",
"hive-0.12.0.jar").mkString(":")

Array(
context.getAllConfs.map { case (k, v) =>
Row(s"$k=$v")
}.toSeq ++ Seq(
Row("system:java.class.path=" + hiveJars),
Row("system:sun.java.command=shark.SharkServer2"))
} else {
Array(Row(s"$k=${context.getConf(k, "<undefined>")}"))
if (k == SQLConf.Deprecated.MAPRED_REDUCE_TASKS) {
logWarning(s"Property ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} is deprecated, " +
s"showing ${SQLConf.SHUFFLE_PARTITIONS} instead.")
Seq(Row(s"${SQLConf.SHUFFLE_PARTITIONS}=${context.numShufflePartitions}"))
} else {
Seq(Row(s"$k=${context.getConf(k, "<undefined>")}"))
}
}

// Query all key-value pairs that are set in the SQLConf of the context.
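
The new else branch mirrors what SET already did for writes: reads of the deprecated mapred.reduce.tasks key are logged and answered with spark.sql.shuffle.partitions instead. A generic Python sketch of that redirect pattern (the key names come from the diff; the ConfStore class is illustrative):

    import logging

    SHUFFLE_PARTITIONS = "spark.sql.shuffle.partitions"
    MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"   # deprecated alias

    class ConfStore:
        def __init__(self):
            self._conf = {SHUFFLE_PARTITIONS: "200"}

        def get(self, key):
            if key == MAPRED_REDUCE_TASKS:
                logging.warning("Property %s is deprecated, showing %s instead.",
                                MAPRED_REDUCE_TASKS, SHUFFLE_PARTITIONS)
                key = SHUFFLE_PARTITIONS
            return "%s=%s" % (key, self._conf.get(key, "<undefined>"))

    print(ConfStore().get(MAPRED_REDUCE_TASKS))   # spark.sql.shuffle.partitions=200
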
@@ -44,6 +44,8 @@ private[hive] case class SourceCommand(filePath: String) extends Command

private[hive] case class AddFile(filePath: String) extends Command

private[hive] case class AddJar(path: String) extends Command

private[hive] case class DropTable(tableName: String, ifExists: Boolean) extends Command

private[hive] case class AnalyzeTable(tableName: String) extends Command
@@ -231,7 +233,7 @@ private[hive] object HiveQl {
} else if (sql.trim.toLowerCase.startsWith("uncache table")) {
CacheCommand(sql.trim.drop(14).trim, false)
} else if (sql.trim.toLowerCase.startsWith("add jar")) {
NativeCommand(sql)
AddJar(sql.trim.drop(8).trim)
} else if (sql.trim.toLowerCase.startsWith("add file")) {
AddFile(sql.trim.drop(9))
} else if (sql.trim.toLowerCase.startsWith("dfs")) {
@@ -1018,9 +1020,9 @@ private[hive] object HiveQl {

/* Other functions */
case Token("TOK_FUNCTION", Token(RAND(), Nil) :: Nil) => Rand
case Token("TOK_FUNCTION", Token(SUBSTR(), Nil) :: string :: pos :: Nil) =>
case Token("TOK_FUNCTION", Token(SUBSTR(), Nil) :: string :: pos :: Nil) =>
Substring(nodeToExpr(string), nodeToExpr(pos), Literal(Integer.MAX_VALUE, IntegerType))
case Token("TOK_FUNCTION", Token(SUBSTR(), Nil) :: string :: pos :: length :: Nil) =>
case Token("TOK_FUNCTION", Token(SUBSTR(), Nil) :: string :: pos :: length :: Nil) =>
Substring(nodeToExpr(string), nodeToExpr(pos), nodeToExpr(length))

/* UDFs - Must be last otherwise will preempt built in functions */
@@ -195,11 +195,12 @@ private[hive] trait HiveStrategies {

case class HiveCommandStrategy(context: HiveContext) extends Strategy {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case logical.NativeCommand(sql) =>
NativeCommand(sql, plan.output)(context) :: Nil
case logical.NativeCommand(sql) => NativeCommand(sql, plan.output)(context) :: Nil

case hive.DropTable(tableName, ifExists) => execution.DropTable(tableName, ifExists) :: Nil

case hive.AddJar(path) => execution.AddJar(path) :: Nil

case hive.AnalyzeTable(tableName) => execution.AnalyzeTable(tableName) :: Nil

case describe: logical.DescribeCommand =>
@@ -60,3 +60,19 @@ case class DropTable(tableName: String, ifExists: Boolean) extends LeafNode with
Seq.empty[Row]
}
}

/**
* :: DeveloperApi ::
*/
@DeveloperApi
case class AddJar(path: String) extends LeafNode with Command {
def hiveContext = sqlContext.asInstanceOf[HiveContext]

override def output = Seq.empty

override protected[sql] lazy val sideEffectResult: Seq[Row] = {
hiveContext.runSqlHive(s"ADD JAR $path")
hiveContext.sparkContext.addJar(path)
Seq.empty[Row]
}
}
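
The new AddJar command registers the jar in two places: it runs the native ADD JAR so Hive's class loader on the driver can see the SerDe, and it calls sparkContext.addJar so the jar is also shipped to executors. A hedged Python sketch of that two-step registration, with injected callables standing in for HiveContext.runSqlHive and SparkContext.addJar:

    def add_jar(path, run_sql_hive, add_jar_to_executors):
        # Register a jar with both the Hive session and the Spark executors.
        run_sql_hive("ADD JAR %s" % path)   # make the classes visible to Hive on the driver
        add_jar_to_executors(path)          # distribute the jar to executor JVMs
        return []                           # the command itself produces no rows

    # Example wiring with print() as a no-op stand-in for both callables:
    add_jar("/tmp/TestSerDe.jar", run_sql_hive=print, add_jar_to_executors=print)
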
@@ -17,9 +17,11 @@

package org.apache.spark.sql.hive.execution

import java.io.File

import scala.util.Try

import org.apache.spark.sql.{SchemaRDD, Row}
import org.apache.spark.SparkException
import org.apache.spark.sql.hive._
import org.apache.spark.sql.hive.test.TestHive
import org.apache.spark.sql.hive.test.TestHive._
@@ -313,7 +315,7 @@ class HiveQuerySuite extends HiveComparisonTest {
"SELECT srcalias.KEY, SRCALIAS.value FROM sRc SrCAlias WHERE SrCAlias.kEy < 15")

test("case sensitivity: registered table") {
val testData: SchemaRDD =
val testData =
TestHive.sparkContext.parallelize(
TestData(1, "str1") ::
TestData(2, "str2") :: Nil)
@@ -467,7 +469,7 @@ class HiveQuerySuite extends HiveComparisonTest {
}

// Describe a registered temporary table.
val testData: SchemaRDD =
val testData =
TestHive.sparkContext.parallelize(
TestData(1, "str1") ::
TestData(1, "str2") :: Nil)
@@ -495,6 +497,23 @@
}
}

test("ADD JAR command") {
val testJar = TestHive.getHiveFile("data/files/TestSerDe.jar").getCanonicalPath
sql("CREATE TABLE alter1(a INT, b INT)")
intercept[Exception] {
sql(
"""ALTER TABLE alter1 SET SERDE 'org.apache.hadoop.hive.serde2.TestSerDe'
|WITH serdeproperties('s1'='9')
""".stripMargin)
}
sql(s"ADD JAR $testJar")
sql(
"""ALTER TABLE alter1 SET SERDE 'org.apache.hadoop.hive.serde2.TestSerDe'
|WITH serdeproperties('s1'='9')
""".stripMargin)
sql("DROP TABLE alter1")
}

test("parse HQL set commands") {
// Adapted from its SQL counterpart.
val testKey = "spark.sql.key.usedfortestonly"
@@ -35,7 +35,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.ipc.YarnRPC
import org.apache.hadoop.yarn.util.{Apps, ConverterUtils, Records, ProtoUtils}

import org.apache.spark.{SparkConf, Logging}
import org.apache.spark.{SecurityManager, SparkConf, Logging}


class ExecutorRunnable(
@@ -46,7 +46,8 @@ class ExecutorRunnable(
slaveId: String,
hostname: String,
executorMemory: Int,
executorCores: Int)
executorCores: Int,
securityMgr: SecurityManager)
extends Runnable with ExecutorRunnableUtil with Logging {

var rpc: YarnRPC = YarnRPC.create(conf)
@@ -86,6 +87,8 @@
logInfo("Setting up executor with commands: " + commands)
ctx.setCommands(commands)

ctx.setApplicationACLs(YarnSparkHadoopUtil.getApplicationAclsForYarn(securityMgr))

// Send the start request to the ContainerManager
val startReq = Records.newRecord(classOf[StartContainerRequest])
.asInstanceOf[StartContainerRequest]
@@ -23,7 +23,7 @@ import java.util.concurrent.atomic.AtomicInteger
import scala.collection.JavaConversions._
import scala.collection.mutable.{ArrayBuffer, HashMap}

import org.apache.spark.SparkConf
import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.scheduler.SplitInfo

import org.apache.hadoop.conf.Configuration
@@ -41,21 +41,23 @@ private[yarn] class YarnAllocationHandler(
resourceManager: AMRMProtocol,
appAttemptId: ApplicationAttemptId,
args: ApplicationMasterArguments,
preferredNodes: collection.Map[String, collection.Set[SplitInfo]])
extends YarnAllocator(conf, sparkConf, args, preferredNodes) {
preferredNodes: collection.Map[String, collection.Set[SplitInfo]],
securityMgr: SecurityManager)
extends YarnAllocator(conf, sparkConf, args, preferredNodes, securityMgr) {

private val lastResponseId = new AtomicInteger()
private val releaseList: CopyOnWriteArrayList[ContainerId] = new CopyOnWriteArrayList()

override protected def allocateContainers(count: Int): YarnAllocateResponse = {
var resourceRequests: List[ResourceRequest] = null

// default.
if (count <= 0 || preferredHostToCount.isEmpty) {
logDebug("numExecutors: " + count + ", host preferences: " +
preferredHostToCount.isEmpty)
resourceRequests = List(createResourceRequest(
AllocationType.ANY, null, count, YarnSparkHadoopUtil.RM_REQUEST_PRIORITY))
logDebug("numExecutors: " + count)
if (count <= 0) {
resourceRequests = List()
} else if (preferredHostToCount.isEmpty) {
logDebug("host preferences is empty")
resourceRequests = List(createResourceRequest(
AllocationType.ANY, null, count, YarnSparkHadoopUtil.RM_REQUEST_PRIORITY))
} else {
// request for all hosts in preferred nodes and for numExecutors -
// candidates.size, request by default allocation policy.
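
The restructured allocateContainers branch separates three cases the old code folded together: nothing to request, no locality preferences, and per-host preferences. A rough Python sketch of that decision, with plain dicts standing in for YARN ResourceRequest records:

    def build_resource_requests(count, preferred_host_counts, priority=1):
        # Decide what to ask the ResourceManager for, mirroring the new branching.
        if count <= 0:
            return []   # nothing outstanding: send an empty request list
        if not preferred_host_counts:
            # No locality preferences: a single host-agnostic ("ANY") request.
            return [{"location": "*", "count": count, "priority": priority}]
        # Otherwise request each preferred host, plus an ANY request for the remainder.
        requests = [{"location": host, "count": n, "priority": priority}
                    for host, n in preferred_host_counts.items()]
        remaining = count - sum(preferred_host_counts.values())
        if remaining > 0:
            requests.append({"location": "*", "count": remaining, "priority": priority})
        return requests

    print(build_resource_requests(0, {}))                        # []
    print(build_resource_requests(3, {}))                        # one ANY request for 3
    print(build_resource_requests(5, {"host1": 2, "host2": 1}))  # per-host plus ANY for 2
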
@@ -27,7 +27,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.ipc.YarnRPC
import org.apache.hadoop.yarn.util.{ConverterUtils, Records}

import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.{Logging, SecurityManager, SparkConf}
import org.apache.spark.scheduler.SplitInfo
import org.apache.spark.util.Utils

@@ -45,15 +45,16 @@ private class YarnRMClientImpl(args: ApplicationMasterArguments) extends YarnRMC
sparkConf: SparkConf,
preferredNodeLocations: Map[String, Set[SplitInfo]],
uiAddress: String,
uiHistoryAddress: String) = {
uiHistoryAddress: String,
securityMgr: SecurityManager) = {
this.rpc = YarnRPC.create(conf)
this.uiHistoryAddress = uiHistoryAddress

resourceManager = registerWithResourceManager(conf)
registerApplicationMaster(uiAddress)

new YarnAllocationHandler(conf, sparkConf, resourceManager, getAttemptId(), args,
preferredNodeLocations)
preferredNodeLocations, securityMgr)
}

override def getAttemptId() = {