StructType |
- org.apache.spark.sql.api.java.Row |
+ org.apache.spark.sql.Row |
DataTypes.createStructType(fields)
Note: fields is a List or an array of StructFields.
@@ -1478,10 +1642,10 @@ please use factory methods provided in
-All data types of Spark SQL are located in the package of `pyspark.sql`.
+All data types of Spark SQL are located in the package of `pyspark.sql.types`.
You can access them by doing
{% highlight python %}
-from pyspark.sql import *
+from pyspark.sql.types import *
{% endhighlight %}
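To make the relocation concrete, here is a minimal sketch (not part of the patch) of building a schema with the classes that now live in `pyspark.sql.types`; the field names are arbitrary:

```python
# Minimal sketch: constructing a schema from pyspark.sql.types (Spark 1.3+).
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

schema = StructType([
    StructField("name", StringType(), nullable=True),
    StructField("score", DoubleType(), nullable=False),
])
```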
diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py
index c59ab565c6862..b50b3816ff890 100755
--- a/ec2/spark_ec2.py
+++ b/ec2/spark_ec2.py
@@ -22,6 +22,7 @@
from __future__ import with_statement
import hashlib
+import itertools
import logging
import os
import os.path
@@ -159,6 +160,15 @@ def parse_args():
"--spark-ec2-git-branch",
default=DEFAULT_SPARK_EC2_BRANCH,
help="Github repo branch of spark-ec2 to use (default: %default)")
+ parser.add_option(
+ "--deploy-root-dir",
+ default=None,
+ help="A directory to copy into / on the first master. " +
+ "Must be absolute. Note that a trailing slash is handled as per rsync: " +
+ "If you omit it, the last directory of the --deploy-root-dir path will be created " +
+ "in / before copying its contents. If you append the trailing slash, " +
+ "the directory is not created and its contents are copied directly into /. " +
+ "(default: %default).")
parser.add_option(
"--hadoop-major-version", default="1",
help="Major version of Hadoop (default: %default)")
@@ -290,13 +300,6 @@ def get_validate_spark_version(version, repo):
return version
-# Check whether a given EC2 instance object is in a state we consider active,
-# i.e. not terminating or terminated. We count both stopping and stopped as
-# active since we can restart stopped clusters.
-def is_active(instance):
- return (instance.state in ['pending', 'running', 'stopping', 'stopped'])
-
-
# Source: http://aws.amazon.com/amazon-linux-ami/instance-type-matrix/
# Last Updated: 2014-06-20
# For easy maintainability, please keep this manually-inputted dictionary sorted by key.
@@ -564,8 +567,11 @@ def launch_cluster(conn, opts, cluster_name):
placement_group=opts.placement_group,
user_data=user_data_content)
slave_nodes += slave_res.instances
- print "Launched %d slaves in %s, regid = %s" % (num_slaves_this_zone,
- zone, slave_res.id)
+ print "Launched {s} slave{plural_s} in {z}, regid = {r}".format(
+ s=num_slaves_this_zone,
+ plural_s=('' if num_slaves_this_zone == 1 else 's'),
+ z=zone,
+ r=slave_res.id)
i += 1
# Launch or resume masters
@@ -612,40 +618,47 @@ def launch_cluster(conn, opts, cluster_name):
return (master_nodes, slave_nodes)
-# Get the EC2 instances in an existing cluster if available.
-# Returns a tuple of lists of EC2 instance objects for the masters and slaves
def get_existing_cluster(conn, opts, cluster_name, die_on_error=True):
- print "Searching for existing cluster " + cluster_name + " in region " \
- + opts.region + "..."
- reservations = conn.get_all_reservations()
- master_nodes = []
- slave_nodes = []
- for res in reservations:
- active = [i for i in res.instances if is_active(i)]
- for inst in active:
- group_names = [g.name for g in inst.groups]
- if (cluster_name + "-master") in group_names:
- master_nodes.append(inst)
- elif (cluster_name + "-slaves") in group_names:
- slave_nodes.append(inst)
- if any((master_nodes, slave_nodes)):
- print "Found %d master(s), %d slaves" % (len(master_nodes), len(slave_nodes))
- if master_nodes != [] or not die_on_error:
- return (master_nodes, slave_nodes)
- else:
- if master_nodes == [] and slave_nodes != []:
- print >> sys.stderr, "ERROR: Could not find master in group " + cluster_name \
- + "-master" + " in region " + opts.region
- else:
- print >> sys.stderr, "ERROR: Could not find any existing cluster" \
- + " in region " + opts.region
+ """
+ Get the EC2 instances in an existing cluster if available.
+ Returns a tuple of lists of EC2 instance objects for the masters and slaves.
+ """
+ print "Searching for existing cluster {c} in region {r}...".format(
+ c=cluster_name, r=opts.region)
+
+ def get_instances(group_names):
+ """
+ Get all non-terminated instances that belong to any of the provided security groups.
+
+ EC2 reservation filters and instance states are documented here:
+ http://docs.aws.amazon.com/cli/latest/reference/ec2/describe-instances.html#options
+ """
+ reservations = conn.get_all_reservations(
+ filters={"instance.group-name": group_names})
+ instances = itertools.chain.from_iterable(r.instances for r in reservations)
+ return [i for i in instances if i.state not in ["shutting-down", "terminated"]]
+
+ master_instances = get_instances([cluster_name + "-master"])
+ slave_instances = get_instances([cluster_name + "-slaves"])
+
+ if any((master_instances, slave_instances)):
+ print "Found {m} master{plural_m}, {s} slave{plural_s}.".format(
+ m=len(master_instances),
+ plural_m=('' if len(master_instances) == 1 else 's'),
+ s=len(slave_instances),
+ plural_s=('' if len(slave_instances) == 1 else 's'))
+
+ if not master_instances and die_on_error:
+ print >> sys.stderr, \
+ "ERROR: Could not find a master for cluster {c} in region {r}.".format(
+ c=cluster_name, r=opts.region)
sys.exit(1)
+ return (master_instances, slave_instances)
+
# Deploy configuration files and run setup scripts on a newly launched
# or started EC2 cluster.
-
-
def setup_cluster(conn, master_nodes, slave_nodes, opts, deploy_ssh_key):
master = master_nodes[0].public_dns_name
if deploy_ssh_key:
@@ -694,6 +707,14 @@ def setup_cluster(conn, master_nodes, slave_nodes, opts, deploy_ssh_key):
modules=modules
)
+ if opts.deploy_root_dir is not None:
+ print "Deploying {s} to master...".format(s=opts.deploy_root_dir)
+ deploy_user_files(
+ root_dir=opts.deploy_root_dir,
+ opts=opts,
+ master_nodes=master_nodes
+ )
+
print "Running setup on master..."
setup_spark_cluster(master, opts)
print "Done!"
@@ -931,6 +952,23 @@ def deploy_files(conn, root_dir, opts, master_nodes, slave_nodes, modules):
shutil.rmtree(tmp_dir)
+# Deploy a given local directory to a cluster, WITHOUT parameter substitution.
+# Note that unlike deploy_files, this works for binary files.
+# Also, it is up to the user to add (or not) the trailing slash in root_dir.
+# Files are only deployed to the first master instance in the cluster.
+#
+# root_dir should be an absolute path.
+def deploy_user_files(root_dir, opts, master_nodes):
+ active_master = master_nodes[0].public_dns_name
+ command = [
+ 'rsync', '-rv',
+ '-e', stringify_command(ssh_command(opts)),
+ "%s" % root_dir,
+ "%s@%s:/" % (opts.user, active_master)
+ ]
+ subprocess.check_call(command)
+
+
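For readers unfamiliar with the rsync trailing-slash rule that --deploy-root-dir relies on, here is a hedged sketch (not part of the patch) of the two command forms deploy_user_files ends up building; the directory /opt/extras and the master host are hypothetical, and the real code substitutes stringify_command(ssh_command(opts)) for the plain "ssh" shown here:

```python
# Hypothetical illustration of the rsync invocations built by deploy_user_files.
no_slash   = ["rsync", "-rv", "-e", "ssh", "/opt/extras",  "root@master:/"]
with_slash = ["rsync", "-rv", "-e", "ssh", "/opt/extras/", "root@master:/"]
# no_slash:   creates /extras on the master and copies the directory's contents into it.
# with_slash: copies the contents of /opt/extras directly into / on the master.
```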
def stringify_command(parts):
if isinstance(parts, str):
return parts
@@ -1099,6 +1137,14 @@ def real_main():
"Furthermore, we currently only support forks named spark-ec2."
sys.exit(1)
+ if not (opts.deploy_root_dir is None or
+ (os.path.isabs(opts.deploy_root_dir) and
+ os.path.isdir(opts.deploy_root_dir) and
+ os.path.exists(opts.deploy_root_dir))):
+ print >> stderr, "--deploy-root-dir must be an absolute path to a directory that exists " \
+ "on the local file system"
+ sys.exit(1)
+
try:
conn = ec2.connect_to_region(opts.region)
except Exception as e:
@@ -1126,14 +1172,16 @@ def real_main():
setup_cluster(conn, master_nodes, slave_nodes, opts, True)
elif action == "destroy":
- print "Are you sure you want to destroy the cluster %s?" % cluster_name
- print "The following instances will be terminated:"
(master_nodes, slave_nodes) = get_existing_cluster(
conn, opts, cluster_name, die_on_error=False)
- for inst in master_nodes + slave_nodes:
- print "> %s" % inst.public_dns_name
- msg = "ALL DATA ON ALL NODES WILL BE LOST!!\nDestroy cluster %s (y/N): " % cluster_name
+ if any(master_nodes + slave_nodes):
+ print "The following instances will be terminated:"
+ for inst in master_nodes + slave_nodes:
+ print "> %s" % inst.public_dns_name
+ print "ALL DATA ON ALL NODES WILL BE LOST!!"
+
+ msg = "Are you sure you want to destroy the cluster {c}? (y/N) ".format(c=cluster_name)
response = raw_input(msg)
if response == "y":
print "Terminating master..."
@@ -1145,7 +1193,6 @@ def real_main():
# Delete security groups as well
if opts.delete_groups:
- print "Deleting security groups (this will take some time)..."
group_names = [cluster_name + "-master", cluster_name + "-slaves"]
wait_for_cluster_state(
conn=conn,
@@ -1153,6 +1200,7 @@ def real_main():
cluster_instances=(master_nodes + slave_nodes),
cluster_state='terminated'
)
+ print "Deleting security groups (this will take some time)..."
attempt = 1
while attempt <= 3:
print "Attempt %d" % attempt
@@ -1259,6 +1307,17 @@ def real_main():
cluster_instances=(master_nodes + slave_nodes),
cluster_state='ssh-ready'
)
+
+ # Determine types of running instances
+ existing_master_type = master_nodes[0].instance_type
+ existing_slave_type = slave_nodes[0].instance_type
+ # Setting opts.master_instance_type to the empty string indicates we
+ # have the same instance type for the master and the slaves
+ if existing_master_type == existing_slave_type:
+ existing_master_type = ""
+ opts.master_instance_type = existing_master_type
+ opts.instance_type = existing_slave_type
+
setup_cluster(conn, master_nodes, slave_nodes, opts, False)
else:
diff --git a/examples/pom.xml b/examples/pom.xml
index 8caad2bc2e27a..994071d94d0ad 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -20,7 +20,7 @@
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>org.apache.spark</groupId>
-    <artifactId>spark-parent</artifactId>
+    <artifactId>spark-parent_2.10</artifactId>
    <version>1.3.0-SNAPSHOT</version>
    <relativePath>../pom.xml</relativePath>
  </parent>
diff --git a/examples/src/main/scala/org/apache/spark/examples/graphx/LiveJournalPageRank.scala b/examples/src/main/scala/org/apache/spark/examples/graphx/LiveJournalPageRank.scala
index e809a65b79975..f6f8d9f90c275 100644
--- a/examples/src/main/scala/org/apache/spark/examples/graphx/LiveJournalPageRank.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/graphx/LiveJournalPageRank.scala
@@ -17,11 +17,6 @@
package org.apache.spark.examples.graphx
-import org.apache.spark.SparkContext._
-import org.apache.spark._
-import org.apache.spark.graphx._
-
-
/**
* Uses GraphX to run PageRank on a LiveJournal social network graph. Download the dataset from
* http://snap.stanford.edu/data/soc-LiveJournal1.html.
@@ -31,13 +26,13 @@ object LiveJournalPageRank {
if (args.length < 1) {
System.err.println(
"Usage: LiveJournalPageRank \n" +
+ " --numEPart=\n" +
+ " The number of partitions for the graph's edge RDD.\n" +
" [--tol=]\n" +
" The tolerance allowed at convergence (smaller => more accurate). Default is " +
"0.001.\n" +
" [--output=]\n" +
" If specified, the file to write the ranks to.\n" +
- " [--numEPart=]\n" +
- " The number of partitions for the graph's edge RDD. Default is 4.\n" +
" [--partStrategy=RandomVertexCut | EdgePartition1D | EdgePartition2D | " +
"CanonicalRandomVertexCut]\n" +
" The way edges are assigned to edge partitions. Default is RandomVertexCut.")
diff --git a/external/flume-sink/pom.xml b/external/flume-sink/pom.xml
index 0706f1ebf66e2..96c2787e35cd0 100644
--- a/external/flume-sink/pom.xml
+++ b/external/flume-sink/pom.xml
@@ -20,7 +20,7 @@
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>org.apache.spark</groupId>
-    <artifactId>spark-parent</artifactId>
+    <artifactId>spark-parent_2.10</artifactId>
    <version>1.3.0-SNAPSHOT</version>
    <relativePath>../../pom.xml</relativePath>
  </parent>
diff --git a/external/flume/pom.xml b/external/flume/pom.xml
index 1f2681394c583..172d447b77cda 100644
--- a/external/flume/pom.xml
+++ b/external/flume/pom.xml
@@ -20,7 +20,7 @@
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>org.apache.spark</groupId>
-    <artifactId>spark-parent</artifactId>
+    <artifactId>spark-parent_2.10</artifactId>
    <version>1.3.0-SNAPSHOT</version>
    <relativePath>../../pom.xml</relativePath>
  </parent>
diff --git a/external/kafka-assembly/pom.xml b/external/kafka-assembly/pom.xml
index 8daa7ed608f6a..5109b8ed87524 100644
--- a/external/kafka-assembly/pom.xml
+++ b/external/kafka-assembly/pom.xml
@@ -20,7 +20,7 @@
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>org.apache.spark</groupId>
-    <artifactId>spark-parent</artifactId>
+    <artifactId>spark-parent_2.10</artifactId>
    <version>1.3.0-SNAPSHOT</version>
    <relativePath>../../pom.xml</relativePath>
  </parent>
diff --git a/external/kafka/pom.xml b/external/kafka/pom.xml
index af96138d79405..369856187a244 100644
--- a/external/kafka/pom.xml
+++ b/external/kafka/pom.xml
@@ -20,7 +20,7 @@
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>org.apache.spark</groupId>
-    <artifactId>spark-parent</artifactId>
+    <artifactId>spark-parent_2.10</artifactId>
    <version>1.3.0-SNAPSHOT</version>
    <relativePath>../../pom.xml</relativePath>
  </parent>
diff --git a/external/mqtt/pom.xml b/external/mqtt/pom.xml
index 560c8b9d18276..a344f000c5002 100644
--- a/external/mqtt/pom.xml
+++ b/external/mqtt/pom.xml
@@ -20,7 +20,7 @@
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>org.apache.spark</groupId>
-    <artifactId>spark-parent</artifactId>
+    <artifactId>spark-parent_2.10</artifactId>
    <version>1.3.0-SNAPSHOT</version>
    <relativePath>../../pom.xml</relativePath>
  </parent>
diff --git a/external/twitter/pom.xml b/external/twitter/pom.xml
index da6ffe7662f63..e95853f005ce2 100644
--- a/external/twitter/pom.xml
+++ b/external/twitter/pom.xml
@@ -20,7 +20,7 @@
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>org.apache.spark</groupId>
-    <artifactId>spark-parent</artifactId>
+    <artifactId>spark-parent_2.10</artifactId>
    <version>1.3.0-SNAPSHOT</version>
    <relativePath>../../pom.xml</relativePath>
  </parent>
diff --git a/external/zeromq/pom.xml b/external/zeromq/pom.xml
index e919c2c9b19ea..9b3475d7c3dc2 100644
--- a/external/zeromq/pom.xml
+++ b/external/zeromq/pom.xml
@@ -20,7 +20,7 @@
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>org.apache.spark</groupId>
-    <artifactId>spark-parent</artifactId>
+    <artifactId>spark-parent_2.10</artifactId>
    <version>1.3.0-SNAPSHOT</version>
    <relativePath>../../pom.xml</relativePath>
  </parent>
diff --git a/extras/java8-tests/pom.xml b/extras/java8-tests/pom.xml
index 0fb431808bacd..bc2f8be10c9ce 100644
--- a/extras/java8-tests/pom.xml
+++ b/extras/java8-tests/pom.xml
@@ -19,7 +19,7 @@
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>org.apache.spark</groupId>
-    <artifactId>spark-parent</artifactId>
+    <artifactId>spark-parent_2.10</artifactId>
    <version>1.3.0-SNAPSHOT</version>
    <relativePath>../../pom.xml</relativePath>
  </parent>
diff --git a/extras/kinesis-asl/pom.xml b/extras/kinesis-asl/pom.xml
index 216661b8bc73a..7e49a71907336 100644
--- a/extras/kinesis-asl/pom.xml
+++ b/extras/kinesis-asl/pom.xml
@@ -19,7 +19,7 @@
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>org.apache.spark</groupId>
-    <artifactId>spark-parent</artifactId>
+    <artifactId>spark-parent_2.10</artifactId>
    <version>1.3.0-SNAPSHOT</version>
    <relativePath>../../pom.xml</relativePath>
  </parent>
diff --git a/extras/spark-ganglia-lgpl/pom.xml b/extras/spark-ganglia-lgpl/pom.xml
index f2f0aa78b0a4b..6eb29af03f833 100644
--- a/extras/spark-ganglia-lgpl/pom.xml
+++ b/extras/spark-ganglia-lgpl/pom.xml
@@ -19,7 +19,7 @@
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>org.apache.spark</groupId>
-    <artifactId>spark-parent</artifactId>
+    <artifactId>spark-parent_2.10</artifactId>
    <version>1.3.0-SNAPSHOT</version>
    <relativePath>../../pom.xml</relativePath>
  </parent>
diff --git a/graphx/pom.xml b/graphx/pom.xml
index 8fac24b6ed86d..57e338c03ecf9 100644
--- a/graphx/pom.xml
+++ b/graphx/pom.xml
@@ -20,7 +20,7 @@
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>org.apache.spark</groupId>
-    <artifactId>spark-parent</artifactId>
+    <artifactId>spark-parent_2.10</artifactId>
    <version>1.3.0-SNAPSHOT</version>
    <relativePath>../pom.xml</relativePath>
  </parent>
diff --git a/mllib/pom.xml b/mllib/pom.xml
index 4c8f34417ca65..b5c949e155cfd 100644
--- a/mllib/pom.xml
+++ b/mllib/pom.xml
@@ -20,7 +20,7 @@
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>org.apache.spark</groupId>
-    <artifactId>spark-parent</artifactId>
+    <artifactId>spark-parent_2.10</artifactId>
    <version>1.3.0-SNAPSHOT</version>
    <relativePath>../pom.xml</relativePath>
  </parent>
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala
index ced042e2f96ca..c1d1a224817e8 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala
@@ -22,6 +22,7 @@ import org.apache.spark.Logging
import org.apache.spark.SparkContext._
import org.apache.spark.mllib.evaluation.binary._
import org.apache.spark.rdd.{RDD, UnionRDD}
+import org.apache.spark.sql.DataFrame
/**
* :: Experimental ::
@@ -53,6 +54,13 @@ class BinaryClassificationMetrics(
*/
def this(scoreAndLabels: RDD[(Double, Double)]) = this(scoreAndLabels, 0)
+ /**
+ * An auxiliary constructor taking a DataFrame.
+ * @param scoreAndLabels a DataFrame with two double columns: score and label
+ */
+ private[mllib] def this(scoreAndLabels: DataFrame) =
+ this(scoreAndLabels.map(r => (r.getDouble(0), r.getDouble(1))))
+
/** Unpersist intermediate RDDs used in the computation. */
def unpersist() {
cumulativeCounts.unpersist()
diff --git a/network/common/pom.xml b/network/common/pom.xml
index 8f7c924d6b3a3..74437f37c47e4 100644
--- a/network/common/pom.xml
+++ b/network/common/pom.xml
@@ -21,7 +21,7 @@
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>org.apache.spark</groupId>
-    <artifactId>spark-parent</artifactId>
+    <artifactId>spark-parent_2.10</artifactId>
    <version>1.3.0-SNAPSHOT</version>
    <relativePath>../../pom.xml</relativePath>
  </parent>
diff --git a/network/common/src/main/java/org/apache/spark/network/TransportContext.java b/network/common/src/main/java/org/apache/spark/network/TransportContext.java
index 5bc6e5a2418a9..f0a89c9d9116c 100644
--- a/network/common/src/main/java/org/apache/spark/network/TransportContext.java
+++ b/network/common/src/main/java/org/apache/spark/network/TransportContext.java
@@ -35,7 +35,6 @@
import org.apache.spark.network.server.TransportChannelHandler;
import org.apache.spark.network.server.TransportRequestHandler;
import org.apache.spark.network.server.TransportServer;
-import org.apache.spark.network.server.StreamManager;
import org.apache.spark.network.util.NettyUtils;
import org.apache.spark.network.util.TransportConf;
diff --git a/network/common/src/main/java/org/apache/spark/network/protocol/ChunkFetchFailure.java b/network/common/src/main/java/org/apache/spark/network/protocol/ChunkFetchFailure.java
index 986957c1509fd..f76bb49e874fc 100644
--- a/network/common/src/main/java/org/apache/spark/network/protocol/ChunkFetchFailure.java
+++ b/network/common/src/main/java/org/apache/spark/network/protocol/ChunkFetchFailure.java
@@ -17,7 +17,6 @@
package org.apache.spark.network.protocol;
-import com.google.common.base.Charsets;
import com.google.common.base.Objects;
import io.netty.buffer.ByteBuf;
diff --git a/network/common/src/main/java/org/apache/spark/network/protocol/Encoders.java b/network/common/src/main/java/org/apache/spark/network/protocol/Encoders.java
index 873c694250942..9162d0b977f83 100644
--- a/network/common/src/main/java/org/apache/spark/network/protocol/Encoders.java
+++ b/network/common/src/main/java/org/apache/spark/network/protocol/Encoders.java
@@ -20,7 +20,6 @@
import com.google.common.base.Charsets;
import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
/** Provides a canonical set of Encoders for simple types. */
public class Encoders {
diff --git a/network/common/src/main/java/org/apache/spark/network/protocol/RpcFailure.java b/network/common/src/main/java/org/apache/spark/network/protocol/RpcFailure.java
index ebd764eb5eb5f..6b991375fc486 100644
--- a/network/common/src/main/java/org/apache/spark/network/protocol/RpcFailure.java
+++ b/network/common/src/main/java/org/apache/spark/network/protocol/RpcFailure.java
@@ -17,7 +17,6 @@
package org.apache.spark.network.protocol;
-import com.google.common.base.Charsets;
import com.google.common.base.Objects;
import io.netty.buffer.ByteBuf;
diff --git a/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java b/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java
index ef209991804b4..b7ce8541e565e 100644
--- a/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java
+++ b/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java
@@ -28,7 +28,6 @@
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.SocketChannel;
-import io.netty.util.internal.PlatformDependent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/network/common/src/main/java/org/apache/spark/network/util/JavaUtils.java b/network/common/src/main/java/org/apache/spark/network/util/JavaUtils.java
index bf8a1fc42fc6d..73da9b7346f4d 100644
--- a/network/common/src/main/java/org/apache/spark/network/util/JavaUtils.java
+++ b/network/common/src/main/java/org/apache/spark/network/util/JavaUtils.java
@@ -17,19 +17,13 @@
package org.apache.spark.network.util;
-import java.nio.ByteBuffer;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
+import java.nio.ByteBuffer;
-import com.google.common.base.Preconditions;
-import com.google.common.io.Closeables;
import com.google.common.base.Charsets;
+import com.google.common.base.Preconditions;
import io.netty.buffer.Unpooled;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/network/common/src/main/java/org/apache/spark/network/util/NettyUtils.java b/network/common/src/main/java/org/apache/spark/network/util/NettyUtils.java
index 2a4b88b64cdc9..dabd6261d2aa0 100644
--- a/network/common/src/main/java/org/apache/spark/network/util/NettyUtils.java
+++ b/network/common/src/main/java/org/apache/spark/network/util/NettyUtils.java
@@ -25,7 +25,6 @@
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.ServerChannel;
-import io.netty.channel.epoll.Epoll;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.epoll.EpollSocketChannel;
diff --git a/network/shuffle/pom.xml b/network/shuffle/pom.xml
index c2d0300ecd904..a2bcca26d8344 100644
--- a/network/shuffle/pom.xml
+++ b/network/shuffle/pom.xml
@@ -21,7 +21,7 @@
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>org.apache.spark</groupId>
-    <artifactId>spark-parent</artifactId>
+    <artifactId>spark-parent_2.10</artifactId>
    <version>1.3.0-SNAPSHOT</version>
    <relativePath>../../pom.xml</relativePath>
  </parent>
diff --git a/network/shuffle/src/main/java/org/apache/spark/network/sasl/SaslRpcHandler.java b/network/shuffle/src/main/java/org/apache/spark/network/sasl/SaslRpcHandler.java
index 3777a18e33f78..026cbd260d16c 100644
--- a/network/shuffle/src/main/java/org/apache/spark/network/sasl/SaslRpcHandler.java
+++ b/network/shuffle/src/main/java/org/apache/spark/network/sasl/SaslRpcHandler.java
@@ -19,16 +19,13 @@
import java.util.concurrent.ConcurrentMap;
-import com.google.common.base.Charsets;
import com.google.common.collect.Maps;
-import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.spark.network.client.RpcResponseCallback;
import org.apache.spark.network.client.TransportClient;
-import org.apache.spark.network.protocol.Encodable;
import org.apache.spark.network.server.RpcHandler;
import org.apache.spark.network.server.StreamManager;
diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java
index 8ed2e0b39ad23..e653f5cb147ee 100644
--- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java
+++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java
@@ -29,7 +29,6 @@
import org.apache.spark.network.shuffle.protocol.BlockTransferMessage;
import org.apache.spark.network.shuffle.protocol.OpenBlocks;
import org.apache.spark.network.shuffle.protocol.StreamHandle;
-import org.apache.spark.network.util.JavaUtils;
/**
* Simple wrapper on top of a TransportClient which interprets each chunk as a whole block, and
diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/OpenBlocks.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/OpenBlocks.java
index 62fce9b0d16cd..60485bace643c 100644
--- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/OpenBlocks.java
+++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/OpenBlocks.java
@@ -23,7 +23,6 @@
import io.netty.buffer.ByteBuf;
import org.apache.spark.network.protocol.Encoders;
-import org.apache.spark.network.shuffle.protocol.BlockTransferMessage.Type;
/** Request to read a set of blocks. Returns {@link StreamHandle}. */
public class OpenBlocks extends BlockTransferMessage {
diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/RegisterExecutor.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/RegisterExecutor.java
index 7eb4385044077..38acae3b31d64 100644
--- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/RegisterExecutor.java
+++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/RegisterExecutor.java
@@ -21,7 +21,6 @@
import io.netty.buffer.ByteBuf;
import org.apache.spark.network.protocol.Encoders;
-import org.apache.spark.network.shuffle.protocol.BlockTransferMessage.Type;
/**
* Initial registration message between an executor and its local shuffle server.
diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/StreamHandle.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/StreamHandle.java
index bc9daa6158ba3..9a9220211a50c 100644
--- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/StreamHandle.java
+++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/StreamHandle.java
@@ -20,8 +20,6 @@
import com.google.common.base.Objects;
import io.netty.buffer.ByteBuf;
-import org.apache.spark.network.shuffle.protocol.BlockTransferMessage.Type;
-
/**
* Identifier for a fixed number of chunks to read from a stream created by an "open blocks"
* message. This is used by {@link org.apache.spark.network.shuffle.OneForOneBlockFetcher}.
diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/UploadBlock.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/UploadBlock.java
index 0b23e112bd512..2ff9aaa650f92 100644
--- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/UploadBlock.java
+++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/UploadBlock.java
@@ -23,7 +23,6 @@
import io.netty.buffer.ByteBuf;
import org.apache.spark.network.protocol.Encoders;
-import org.apache.spark.network.shuffle.protocol.BlockTransferMessage.Type;
/** Request to upload a block with a certain StorageLevel. Returns nothing (empty byte array). */
diff --git a/network/shuffle/src/test/java/org/apache/spark/network/sasl/SparkSaslSuite.java b/network/shuffle/src/test/java/org/apache/spark/network/sasl/SparkSaslSuite.java
index 67a07f38eb5a0..23b4e06f064e1 100644
--- a/network/shuffle/src/test/java/org/apache/spark/network/sasl/SparkSaslSuite.java
+++ b/network/shuffle/src/test/java/org/apache/spark/network/sasl/SparkSaslSuite.java
@@ -17,12 +17,12 @@
package org.apache.spark.network.sasl;
-import java.util.Map;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
-import com.google.common.collect.ImmutableMap;
import org.junit.Test;
-import static org.junit.Assert.*;
/**
* Jointly tests SparkSaslClient and SparkSaslServer, as both are black boxes.
diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/OneForOneBlockFetcherSuite.java b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/OneForOneBlockFetcherSuite.java
index 842741e3d354f..b35a6d685dd02 100644
--- a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/OneForOneBlockFetcherSuite.java
+++ b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/OneForOneBlockFetcherSuite.java
@@ -28,11 +28,16 @@
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
-import static org.junit.Assert.*;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.buffer.NettyManagedBuffer;
diff --git a/network/yarn/pom.xml b/network/yarn/pom.xml
index 39b99f54f6dbc..cea7a20c223e2 100644
--- a/network/yarn/pom.xml
+++ b/network/yarn/pom.xml
@@ -21,7 +21,7 @@
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>org.apache.spark</groupId>
-    <artifactId>spark-parent</artifactId>
+    <artifactId>spark-parent_2.10</artifactId>
    <version>1.3.0-SNAPSHOT</version>
    <relativePath>../../pom.xml</relativePath>
  </parent>
diff --git a/pom.xml b/pom.xml
index 54fe784fe566f..51bef30f9ca8f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -25,7 +25,7 @@
    <version>14</version>
  </parent>
  <groupId>org.apache.spark</groupId>
-  <artifactId>spark-parent</artifactId>
+  <artifactId>spark-parent_2.10</artifactId>
  <version>1.3.0-SNAPSHOT</version>
  <packaging>pom</packaging>
  <name>Spark Project Parent POM</name>
@@ -422,6 +422,13 @@
      <version>2.42.2</version>
      <scope>test</scope>
    </dependency>
+
+    <dependency>
+      <groupId>xml-apis</groupId>
+      <artifactId>xml-apis</artifactId>
+      <version>1.4.01</version>
+      <scope>test</scope>
+    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
diff --git a/python/docs/pyspark.mllib.rst b/python/docs/pyspark.mllib.rst
index b706c5e376ef4..15101470afc07 100644
--- a/python/docs/pyspark.mllib.rst
+++ b/python/docs/pyspark.mllib.rst
@@ -16,6 +16,13 @@ pyspark.mllib.clustering module
:members:
:undoc-members:
+pyspark.mllib.evaluation module
+-------------------------------
+
+.. automodule:: pyspark.mllib.evaluation
+ :members:
+ :undoc-members:
+
pyspark.mllib.feature module
-------------------------------
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 6011caf9f1c5a..78dccc40470e3 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -21,6 +21,8 @@
from threading import Lock
from tempfile import NamedTemporaryFile
+from py4j.java_collections import ListConverter
+
from pyspark import accumulators
from pyspark.accumulators import Accumulator
from pyspark.broadcast import Broadcast
@@ -30,13 +32,11 @@
from pyspark.serializers import PickleSerializer, BatchedSerializer, UTF8Deserializer, \
PairDeserializer, AutoBatchedSerializer, NoOpSerializer
from pyspark.storagelevel import StorageLevel
-from pyspark.rdd import RDD
+from pyspark.rdd import RDD, _load_from_socket
from pyspark.traceback_utils import CallSite, first_spark_call
from pyspark.status import StatusTracker
from pyspark.profiler import ProfilerCollector, BasicProfiler
-from py4j.java_collections import ListConverter
-
__all__ = ['SparkContext']
@@ -59,7 +59,6 @@ class SparkContext(object):
_gateway = None
_jvm = None
- _writeToFile = None
_next_accum_id = 0
_active_spark_context = None
_lock = Lock()
@@ -221,7 +220,6 @@ def _ensure_initialized(cls, instance=None, gateway=None):
if not SparkContext._gateway:
SparkContext._gateway = gateway or launch_gateway()
SparkContext._jvm = SparkContext._gateway.jvm
- SparkContext._writeToFile = SparkContext._jvm.PythonRDD.writeToFile
if instance:
if (SparkContext._active_spark_context and
@@ -840,8 +838,9 @@ def runJob(self, rdd, partitionFunc, partitions=None, allowLocal=False):
# by runJob() in order to avoid having to pass a Python lambda into
# SparkContext#runJob.
mappedRDD = rdd.mapPartitions(partitionFunc)
- it = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, javaPartitions, allowLocal)
- return list(mappedRDD._collect_iterator_through_file(it))
+ port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, javaPartitions,
+ allowLocal)
+ return list(_load_from_socket(port, mappedRDD._jrdd_deserializer))
def show_profiles(self):
""" Print the profile stats to stdout """
diff --git a/python/pyspark/ml/pipeline.py b/python/pyspark/ml/pipeline.py
index 5233c5801e2e6..83880a5afcd1d 100644
--- a/python/pyspark/ml/pipeline.py
+++ b/python/pyspark/ml/pipeline.py
@@ -39,7 +39,7 @@ def fit(self, dataset, params={}):
Fits a model to the input dataset with optional parameters.
:param dataset: input dataset, which is an instance of
- :py:class:`pyspark.sql.SchemaRDD`
+ :py:class:`pyspark.sql.DataFrame`
:param params: an optional param map that overwrites embedded
params
:returns: fitted model
@@ -62,7 +62,7 @@ def transform(self, dataset, params={}):
Transforms the input dataset with optional parameters.
:param dataset: input dataset, which is an instance of
- :py:class:`pyspark.sql.SchemaRDD`
+ :py:class:`pyspark.sql.DataFrame`
:param params: an optional param map that overwrites embedded
params
:returns: transformed dataset
diff --git a/python/pyspark/ml/wrapper.py b/python/pyspark/ml/wrapper.py
index 4bae96f678388..31a66b3d2f730 100644
--- a/python/pyspark/ml/wrapper.py
+++ b/python/pyspark/ml/wrapper.py
@@ -102,7 +102,7 @@ def _fit_java(self, dataset, params={}):
"""
Fits a Java model to the input dataset.
:param dataset: input dataset, which is an instance of
- :py:class:`pyspark.sql.SchemaRDD`
+ :py:class:`pyspark.sql.DataFrame`
:param params: additional params (overwriting embedded values)
:return: fitted Java model
"""
diff --git a/python/pyspark/mllib/evaluation.py b/python/pyspark/mllib/evaluation.py
new file mode 100644
index 0000000000000..16cb49cc0cfff
--- /dev/null
+++ b/python/pyspark/mllib/evaluation.py
@@ -0,0 +1,83 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from pyspark.mllib.common import JavaModelWrapper
+from pyspark.sql import SQLContext
+from pyspark.sql.types import StructField, StructType, DoubleType
+
+
+class BinaryClassificationMetrics(JavaModelWrapper):
+ """
+ Evaluator for binary classification.
+
+ >>> scoreAndLabels = sc.parallelize([
+ ... (0.1, 0.0), (0.1, 1.0), (0.4, 0.0), (0.6, 0.0), (0.6, 1.0), (0.6, 1.0), (0.8, 1.0)], 2)
+ >>> metrics = BinaryClassificationMetrics(scoreAndLabels)
+ >>> metrics.areaUnderROC()
+ 0.70...
+ >>> metrics.areaUnderPR()
+ 0.83...
+ >>> metrics.unpersist()
+ """
+
+ def __init__(self, scoreAndLabels):
+ """
+ :param scoreAndLabels: an RDD of (score, label) pairs
+ """
+ sc = scoreAndLabels.ctx
+ sql_ctx = SQLContext(sc)
+ df = sql_ctx.createDataFrame(scoreAndLabels, schema=StructType([
+ StructField("score", DoubleType(), nullable=False),
+ StructField("label", DoubleType(), nullable=False)]))
+ java_class = sc._jvm.org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
+ java_model = java_class(df._jdf)
+ super(BinaryClassificationMetrics, self).__init__(java_model)
+
+ def areaUnderROC(self):
+ """
+ Computes the area under the receiver operating characteristic
+ (ROC) curve.
+ """
+ return self.call("areaUnderROC")
+
+ def areaUnderPR(self):
+ """
+ Computes the area under the precision-recall curve.
+ """
+ return self.call("areaUnderPR")
+
+ def unpersist(self):
+ """
+ Unpersists intermediate RDDs used in the computation.
+ """
+ self.call("unpersist")
+
+
+def _test():
+ import doctest
+ from pyspark import SparkContext
+ import pyspark.mllib.evaluation
+ globs = pyspark.mllib.evaluation.__dict__.copy()
+ globs['sc'] = SparkContext('local[4]', 'PythonTest')
+ (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
+ globs['sc'].stop()
+ if failure_count:
+ exit(-1)
+
+
+if __name__ == "__main__":
+ _test()
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index cb12fed98c53d..bf17f513c0bc3 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -19,7 +19,6 @@
from collections import defaultdict
from itertools import chain, ifilter, imap
import operator
-import os
import sys
import shlex
from subprocess import Popen, PIPE
@@ -29,6 +28,7 @@
import heapq
import bisect
import random
+import socket
from math import sqrt, log, isinf, isnan, pow, ceil
from pyspark.serializers import NoOpSerializer, CartesianDeserializer, \
@@ -111,6 +111,17 @@ def _parse_memory(s):
return int(float(s[:-1]) * units[s[-1].lower()])
+def _load_from_socket(port, serializer):
+ sock = socket.socket()
+ try:
+ sock.connect(("localhost", port))
+ rf = sock.makefile("rb", 65536)
+ for item in serializer.load_stream(rf):
+ yield item
+ finally:
+ sock.close()
+
+
class Partitioner(object):
def __init__(self, numPartitions, partitionFunc):
self.numPartitions = numPartitions
@@ -698,21 +709,8 @@ def collect(self):
Return a list that contains all of the elements in this RDD.
"""
with SCCallSiteSync(self.context) as css:
- bytesInJava = self._jrdd.collect().iterator()
- return list(self._collect_iterator_through_file(bytesInJava))
-
- def _collect_iterator_through_file(self, iterator):
- # Transferring lots of data through Py4J can be slow because
- # socket.readline() is inefficient. Instead, we'll dump the data to a
- # file and read it back.
- tempFile = NamedTemporaryFile(delete=False, dir=self.ctx._temp_dir)
- tempFile.close()
- self.ctx._writeToFile(iterator, tempFile.name)
- # Read the data into Python and deserialize it:
- with open(tempFile.name, 'rb') as tempFile:
- for item in self._jrdd_deserializer.load_stream(tempFile):
- yield item
- os.unlink(tempFile.name)
+ port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
+ return list(_load_from_socket(port, self._jrdd_deserializer))
def reduce(self, f):
"""
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index 5c3b7377c33b5..e8ce4547455a5 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -19,13 +19,11 @@
import itertools
import warnings
import random
-import os
-from tempfile import NamedTemporaryFile
from py4j.java_collections import ListConverter, MapConverter
from pyspark.context import SparkContext
-from pyspark.rdd import RDD
+from pyspark.rdd import RDD, _load_from_socket
from pyspark.serializers import BatchedSerializer, PickleSerializer, UTF8Deserializer
from pyspark.storagelevel import StorageLevel
from pyspark.traceback_utils import SCCallSiteSync
@@ -310,14 +308,8 @@ def collect(self):
[Row(age=2, name=u'Alice'), Row(age=5, name=u'Bob')]
"""
with SCCallSiteSync(self._sc) as css:
- bytesInJava = self._jdf.javaToPython().collect().iterator()
- tempFile = NamedTemporaryFile(delete=False, dir=self._sc._temp_dir)
- tempFile.close()
- self._sc._writeToFile(bytesInJava, tempFile.name)
- # Read the data into Python and deserialize it:
- with open(tempFile.name, 'rb') as tempFile:
- rs = list(BatchedSerializer(PickleSerializer()).load_stream(tempFile))
- os.unlink(tempFile.name)
+ port = self._sc._jvm.PythonRDD.collectAndServe(self._jdf.javaToPython().rdd())
+ rs = list(_load_from_socket(port, BatchedSerializer(PickleSerializer())))
cls = _create_cls(self.schema)
return [cls(r) for r in rs]
diff --git a/python/run-tests b/python/run-tests
index a2c2f37a54eda..b7630c356cfae 100755
--- a/python/run-tests
+++ b/python/run-tests
@@ -75,6 +75,7 @@ function run_mllib_tests() {
echo "Run mllib tests ..."
run_test "pyspark/mllib/classification.py"
run_test "pyspark/mllib/clustering.py"
+ run_test "pyspark/mllib/evaluation.py"
run_test "pyspark/mllib/feature.py"
run_test "pyspark/mllib/linalg.py"
run_test "pyspark/mllib/rand.py"
diff --git a/repl/pom.xml b/repl/pom.xml
index b883344bf0ceb..295f88ea3ecf9 100644
--- a/repl/pom.xml
+++ b/repl/pom.xml
@@ -20,7 +20,7 @@
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>org.apache.spark</groupId>
-    <artifactId>spark-parent</artifactId>
+    <artifactId>spark-parent_2.10</artifactId>
    <version>1.3.0-SNAPSHOT</version>
    <relativePath>../pom.xml</relativePath>
  </parent>
diff --git a/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala b/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
index f966f25c5a14c..ed9b207a86a0b 100644
--- a/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
+++ b/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
@@ -263,14 +263,14 @@ class ReplSuite extends FunSuite {
assertDoesNotContain("Exception", output)
}
- test("SPARK-2576 importing SQLContext.createSchemaRDD.") {
+ test("SPARK-2576 importing SQLContext.createDataFrame.") {
// We need to use local-cluster to test this case.
val output = runInterpreter("local-cluster[1,1,512]",
"""
|val sqlContext = new org.apache.spark.sql.SQLContext(sc)
- |import sqlContext.createSchemaRDD
+ |import sqlContext.implicits._
|case class TestCaseClass(value: Int)
- |sc.parallelize(1 to 10).map(x => TestCaseClass(x)).toSchemaRDD.collect
+ |sc.parallelize(1 to 10).map(x => TestCaseClass(x)).toDF.collect
""".stripMargin)
assertDoesNotContain("error:", output)
assertDoesNotContain("Exception", output)
diff --git a/sbin/start-slaves.sh b/sbin/start-slaves.sh
index ba1a84abc1fef..76316a3067c93 100755
--- a/sbin/start-slaves.sh
+++ b/sbin/start-slaves.sh
@@ -64,6 +64,6 @@ else
SPARK_WORKER_WEBUI_PORT=8081
fi
for ((i=0; i<$SPARK_WORKER_INSTANCES; i++)); do
- "$sbin/slaves.sh" cd "$SPARK_HOME" \; "$sbin/start-slave.sh" $(( $i + 1 )) "spark://$SPARK_MASTER_IP:$SPARK_MASTER_PORT" --webui-port $(( $SPARK_WORKER_WEBUI_PORT + $i ))
+ "$sbin/slaves.sh" cd "$SPARK_HOME" \; "$sbin/start-slave.sh" $(( $i + 1 )) --webui-port $(( $SPARK_WORKER_WEBUI_PORT + $i )) "spark://$SPARK_MASTER_IP:$SPARK_MASTER_PORT"
done
fi
diff --git a/sbin/stop-all.sh b/sbin/stop-all.sh
index 971d5d49da664..1a9abe07db844 100755
--- a/sbin/stop-all.sh
+++ b/sbin/stop-all.sh
@@ -17,8 +17,8 @@
# limitations under the License.
#
-# Start all spark daemons.
-# Run this on the master nde
+# Stop all spark daemons.
+# Run this on the master node.
sbin="`dirname "$0"`"
diff --git a/sbin/stop-master.sh b/sbin/stop-master.sh
index b6bdaa4db373c..729702d92191e 100755
--- a/sbin/stop-master.sh
+++ b/sbin/stop-master.sh
@@ -17,7 +17,7 @@
# limitations under the License.
#
-# Starts the master on the machine this script is executed on.
+# Stops the master on the machine this script is executed on.
sbin=`dirname "$0"`
sbin=`cd "$sbin"; pwd`
diff --git a/sql/catalyst/pom.xml b/sql/catalyst/pom.xml
index a1947fb022e54..8ad026dbdf8ff 100644
--- a/sql/catalyst/pom.xml
+++ b/sql/catalyst/pom.xml
@@ -21,7 +21,7 @@
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>org.apache.spark</groupId>
-    <artifactId>spark-parent</artifactId>
+    <artifactId>spark-parent_2.10</artifactId>
    <version>1.3.0-SNAPSHOT</version>
    <relativePath>../../pom.xml</relativePath>
  </parent>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
index c363a5efacde8..54ab13ca352d2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
@@ -385,7 +385,7 @@ class SqlParser extends AbstractSparkSQLParser {
protected lazy val dotExpressionHeader: Parser[Expression] =
(ident <~ ".") ~ ident ~ rep("." ~> ident) ^^ {
- case i1 ~ i2 ~ rest => UnresolvedAttribute(i1 + "." + i2 + rest.mkString(".", ".", ""))
+ case i1 ~ i2 ~ rest => UnresolvedAttribute((Seq(i1, i2) ++ rest).mkString("."))
}
protected lazy val dataType: Parser[DataType] =
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index e4e542562f22d..7753331748d7b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -310,7 +310,7 @@ class Analyzer(catalog: Catalog,
}
/**
- * In many dialects of SQL is it valid to sort by attributes that are not present in the SELECT
+ * In many dialects of SQL it is valid to sort by attributes that are not present in the SELECT
* clause. This rule detects such queries and adds the required attributes to the original
* projection, so that they will be available during sorting. Another projection is added to
* remove these attributes after sorting.
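As a concrete illustration of the queries this rule handles, a hedged PySpark sketch (hypothetical table and columns, assuming an existing SparkContext `sc` and SQLContext `sqlCtx`; not part of the patch):

```python
# Sorting by a column that does not appear in the SELECT list.
people = sqlCtx.jsonRDD(sc.parallelize(
    ['{"name": "Alice", "age": 2}', '{"name": "Bob", "age": 5}']))
people.registerTempTable("people")

# Legal in many SQL dialects even though "age" is not projected; the analyzer
# temporarily adds "age" to the projection for the sort and removes it afterwards.
sqlCtx.sql("SELECT name FROM people ORDER BY age").collect()
```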
@@ -321,7 +321,8 @@ class Analyzer(catalog: Catalog,
if !s.resolved && p.resolved =>
val unresolved = ordering.flatMap(_.collect { case UnresolvedAttribute(name) => name })
val resolved = unresolved.flatMap(child.resolve(_, resolver))
- val requiredAttributes = AttributeSet(resolved.collect { case a: Attribute => a })
+ val requiredAttributes =
+ AttributeSet(resolved.flatMap(_.collect { case a: Attribute => a }))
val missingInProject = requiredAttributes -- p.output
if (missingInProject.nonEmpty) {
diff --git a/sql/core/pom.xml b/sql/core/pom.xml
index d4c8c687b67bd..3640104e497d4 100644
--- a/sql/core/pom.xml
+++ b/sql/core/pom.xml
@@ -21,7 +21,7 @@
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>org.apache.spark</groupId>
-    <artifactId>spark-parent</artifactId>
+    <artifactId>spark-parent_2.10</artifactId>
    <version>1.3.0-SNAPSHOT</version>
    <relativePath>../../pom.xml</relativePath>
  </parent>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index ce800e0754559..9c49e84bf9680 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -542,20 +542,16 @@ class SQLContext(@transient val sparkContext: SparkContext)
* @group specificdata
*/
@Experimental
- def jsonFile(path: String, schema: StructType): DataFrame = {
- val json = sparkContext.textFile(path)
- jsonRDD(json, schema)
- }
+ def jsonFile(path: String, schema: StructType): DataFrame =
+ load("json", schema, Map("path" -> path))
/**
* :: Experimental ::
* @group specificdata
*/
@Experimental
- def jsonFile(path: String, samplingRatio: Double): DataFrame = {
- val json = sparkContext.textFile(path)
- jsonRDD(json, samplingRatio)
- }
+ def jsonFile(path: String, samplingRatio: Double): DataFrame =
+ load("json", Map("path" -> path, "samplingRatio" -> samplingRatio.toString))
/**
* Loads an RDD[String] storing JSON objects (one object per record), returning the result as a
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/package.scala
index 02e5b015e8ec2..3f97a11ceb97d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/package.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/package.scala
@@ -34,10 +34,13 @@ import org.apache.spark.sql.execution.SparkPlan
package object sql {
/**
- * Converts a logical plan into zero or more SparkPlans.
+ * Converts a logical plan into zero or more SparkPlans. This API is exposed for experimenting
+ * with the query planner and is not designed to be stable across spark releases. Developers
+ * writing libraries should instead consider using the stable APIs provided in
+ * [[org.apache.spark.sql.sources]]
*/
@DeveloperApi
- protected[sql] type Strategy = org.apache.spark.sql.catalyst.planning.GenericStrategy[SparkPlan]
+ type Strategy = org.apache.spark.sql.catalyst.planning.GenericStrategy[SparkPlan]
/**
* Type alias for [[DataFrame]]. Kept here for backward source compatibility for Scala.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 097bf0dd23c89..4dedcd365f6cc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -1049,4 +1049,14 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll {
rdd.toDF().registerTempTable("distinctData")
checkAnswer(sql("SELECT COUNT(DISTINCT key,value) FROM distinctData"), Row(2))
}
+
+ test("SPARK-6145: ORDER BY test for nested fields") {
+ jsonRDD(sparkContext.makeRDD(
+ """{"a": {"b": 1, "a": {"a": 1}}, "c": [{"d": 1}]}""" :: Nil)).registerTempTable("nestedOrder")
+ // These should be successfully analyzed
+ sql("SELECT 1 FROM nestedOrder ORDER BY a.b").queryExecution.analyzed
+ sql("SELECT a.b FROM nestedOrder ORDER BY a.b").queryExecution.analyzed
+ sql("SELECT 1 FROM nestedOrder ORDER BY a.a.a").queryExecution.analyzed
+ sql("SELECT 1 FROM nestedOrder ORDER BY c[0].d").queryExecution.analyzed
+ }
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
index 9d94d3406acfb..0c21f725f0b49 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql.json
import java.sql.{Date, Timestamp}
+import org.scalactic.Tolerance._
+
import org.apache.spark.sql.TestData._
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.functions._
@@ -551,6 +553,32 @@ class JsonSuite extends QueryTest {
jsonDF.registerTempTable("jsonTable")
}
+ test("jsonFile should be based on JSONRelation") {
+ val file = getTempFilePath("json")
+ val path = file.toString
+ sparkContext.parallelize(1 to 100).map(i => s"""{"a": 1, "b": "str$i"}""").saveAsTextFile(path)
+ val jsonDF = jsonFile(path, 0.49)
+
+ val analyzed = jsonDF.queryExecution.analyzed
+ assert(
+ analyzed.isInstanceOf[LogicalRelation],
+ "The DataFrame returned by jsonFile should be based on JSONRelation.")
+ val relation = analyzed.asInstanceOf[LogicalRelation].relation
+ assert(
+ relation.isInstanceOf[JSONRelation],
+ "The DataFrame returned by jsonFile should be based on JSONRelation.")
+ assert(relation.asInstanceOf[JSONRelation].path === path)
+ assert(relation.asInstanceOf[JSONRelation].samplingRatio === (0.49 +- 0.001))
+
+ val schema = StructType(StructField("a", LongType, true) :: Nil)
+ val logicalRelation =
+ jsonFile(path, schema).queryExecution.analyzed.asInstanceOf[LogicalRelation]
+ val relationWithSchema = logicalRelation.relation.asInstanceOf[JSONRelation]
+ assert(relationWithSchema.path === path)
+ assert(relationWithSchema.schema === schema)
+ assert(relationWithSchema.samplingRatio > 0.99)
+ }
+
test("Loading a JSON dataset from a text file") {
val file = getTempFilePath("json")
val path = file.toString
diff --git a/sql/hive-thriftserver/pom.xml b/sql/hive-thriftserver/pom.xml
index 123a1f629ab1c..f466a3c0b5dc2 100644
--- a/sql/hive-thriftserver/pom.xml
+++ b/sql/hive-thriftserver/pom.xml
@@ -21,7 +21,7 @@
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>org.apache.spark</groupId>
-    <artifactId>spark-parent</artifactId>
+    <artifactId>spark-parent_2.10</artifactId>
    <version>1.3.0-SNAPSHOT</version>
    <relativePath>../../pom.xml</relativePath>
  </parent>
@@ -41,6 +41,11 @@
      <artifactId>spark-hive_${scala.binary.version}</artifactId>
      <version>${project.version}</version>
    </dependency>
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+      <scope>runtime</scope>
+    </dependency>
    <dependency>
      <groupId>${hive.group}</groupId>
      <artifactId>hive-cli</artifactId>
diff --git a/sql/hive/pom.xml b/sql/hive/pom.xml
index 72c474d66055c..0e3f4eb98cbf7 100644
--- a/sql/hive/pom.xml
+++ b/sql/hive/pom.xml
@@ -21,7 +21,7 @@
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>org.apache.spark</groupId>
-    <artifactId>spark-parent</artifactId>
+    <artifactId>spark-parent_2.10</artifactId>
    <version>1.3.0-SNAPSHOT</version>
    <relativePath>../../pom.xml</relativePath>
  </parent>
diff --git a/streaming/pom.xml b/streaming/pom.xml
index 1e92ba686a57d..0370b0e9e1aa3 100644
--- a/streaming/pom.xml
+++ b/streaming/pom.xml
@@ -20,7 +20,7 @@
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>org.apache.spark</groupId>
-    <artifactId>spark-parent</artifactId>
+    <artifactId>spark-parent_2.10</artifactId>
    <version>1.3.0-SNAPSHOT</version>
    <relativePath>../pom.xml</relativePath>
  </parent>
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala
index 985ded9111f74..6bdfe45dc7f83 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala
@@ -20,6 +20,7 @@ import java.nio.ByteBuffer
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.{Await, ExecutionContext, Future}
+import scala.language.postfixOps
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
diff --git a/tools/pom.xml b/tools/pom.xml
index e7419ed2c607a..181236d1bcbf6 100644
--- a/tools/pom.xml
+++ b/tools/pom.xml
@@ -19,7 +19,7 @@
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>org.apache.spark</groupId>
-    <artifactId>spark-parent</artifactId>
+    <artifactId>spark-parent_2.10</artifactId>
    <version>1.3.0-SNAPSHOT</version>
    <relativePath>../pom.xml</relativePath>
  </parent>
diff --git a/yarn/pom.xml b/yarn/pom.xml
index 65344aa8738e0..c13534f0410a1 100644
--- a/yarn/pom.xml
+++ b/yarn/pom.xml
@@ -19,7 +19,7 @@
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>org.apache.spark</groupId>
-    <artifactId>spark-parent</artifactId>
+    <artifactId>spark-parent_2.10</artifactId>
    <version>1.3.0-SNAPSHOT</version>
    <relativePath>../pom.xml</relativePath>
  </parent>
|