From b7d42ff3aef06b54b168bd8868b6012de3116aa2 Mon Sep 17 00:00:00 2001 From: prabs Date: Wed, 4 Feb 2015 01:17:38 +0530 Subject: [PATCH] Mqtt streaming support in Python --- external/mqtt-assembly/pom.xml | 105 +++++++++++++++++++++++++++++++ pom.xml | 1 + project/SparkBuild.scala | 6 +- python/pyspark/streaming/mqtt.py | 57 +++++++++++++++++ 4 files changed, 166 insertions(+), 3 deletions(-) create mode 100644 external/mqtt-assembly/pom.xml create mode 100644 python/pyspark/streaming/mqtt.py diff --git a/external/mqtt-assembly/pom.xml b/external/mqtt-assembly/pom.xml new file mode 100644 index 0000000000000..01d0c2faed93a --- /dev/null +++ b/external/mqtt-assembly/pom.xml @@ -0,0 +1,105 @@ + + + + + 4.0.0 + + org.apache.spark + spark-parent + 1.3.0-SNAPSHOT + ../../pom.xml + + + org.apache.spark + spark-streaming-mqtt-assembly_2.10 + jar + Spark Project External MQTT Assembly + http://spark.apache.org/ + + + streaming-mqtt-assembly + scala-${scala.binary.version} + spark-streaming-mqtt-assembly-${project.version}.jar + ${project.build.directory}/${spark.jar.dir}/${spark.jar.basename} + + + + + org.apache.spark + spark-streaming-mqtt_${scala.binary.version} + ${project.version} + + + org.apache.spark + spark-streaming_${scala.binary.version} + ${project.version} + provided + + + + + target/scala-${scala.binary.version}/classes + target/scala-${scala.binary.version}/test-classes + + + org.apache.maven.plugins + maven-shade-plugin + + false + ${spark.jar} + + + *:* + + + + + *:* + + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + + + + + + + package + + shade + + + + + + reference.conf + + + log4j.properties + + + + + + + + + + + diff --git a/pom.xml b/pom.xml index ffa96128a3d61..df06632997029 100644 --- a/pom.xml +++ b/pom.xml @@ -104,6 +104,7 @@ external/flume-sink external/flume-assembly external/mqtt + external/mqtt-assembly external/zeromq examples repl diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 3408c6d51ed4c..7555aa5e3861e 100644 
--- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -45,8 +45,8 @@ object BuildCommons { sparkKinesisAsl) = Seq("yarn", "yarn-stable", "java8-tests", "ganglia-lgpl", "kinesis-asl").map(ProjectRef(buildLocation, _)) - val assemblyProjects@Seq(assembly, examples, networkYarn, streamingFlumeAssembly, streamingKafkaAssembly) = - Seq("assembly", "examples", "network-yarn", "streaming-flume-assembly", "streaming-kafka-assembly") + val assemblyProjects@Seq(assembly, examples, networkYarn, streamingFlumeAssembly, streamingKafkaAssembly, streamingMqttAssembly) = + Seq("assembly", "examples", "network-yarn", "streaming-flume-assembly", "streaming-kafka-assembly", "streaming-mqtt-assembly") .map(ProjectRef(buildLocation, _)) val tools = ProjectRef(buildLocation, "tools") @@ -347,7 +347,7 @@ object Assembly { .getOrElse(SbtPomKeys.effectivePom.value.getProperties.get("hadoop.version").asInstanceOf[String]) }, jarName in assembly <<= (version, moduleName, hadoopVersion) map { (v, mName, hv) => - if (mName.contains("streaming-flume-assembly") || mName.contains("streaming-kafka-assembly")) { + if (mName.contains("streaming-flume-assembly") || mName.contains("streaming-kafka-assembly") || mName.contains("streaming-mqtt-assembly")) { // This must match the same name used in maven (see external/kafka-assembly/pom.xml) s"${mName}-${v}.jar" } else { diff --git a/python/pyspark/streaming/mqtt.py b/python/pyspark/streaming/mqtt.py new file mode 100644 index 0000000000000..423869bffdf74 --- /dev/null +++ b/python/pyspark/streaming/mqtt.py @@ -0,0 +1,57 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from py4j.java_collections import MapConverter
from py4j.java_gateway import java_import, Py4JError

from pyspark.storagelevel import StorageLevel
from pyspark.serializers import PairDeserializer, NoOpSerializer
from pyspark.streaming import DStream

__all__ = ['MQTTUtils']


class MQTTUtils(object):
    """
    Python entry point for creating MQTT input streams; delegates to the JVM
    class ``org.apache.spark.streaming.mqtt.MQTTUtils``.
    """

    @staticmethod
    def createStream(ssc, brokerUrl, topic,
                     storageLevel=StorageLevel.MEMORY_AND_DISK_SER_2):
        """
        Create an input stream that pulls messages from a Mqtt Broker.

        :param ssc:  StreamingContext object
        :param brokerUrl:  Url of the remote mqtt broker to connect to
        :param topic:  topic name to subscribe to
        :param storageLevel:  RDD storage level.
        :return: A DStream object
        :raises Py4JError: re-raised unchanged when the JVM call fails; if the
            failure looks like a missing MQTT assembly jar, a classpath hint
            is printed first.
        """
        java_import(ssc._jvm, "org.apache.spark.streaming.mqtt.MQTTUtils")

        jlevel = ssc._sc._getJavaStorageLevel(storageLevel)

        try:
            jstream = ssc._jvm.MQTTUtils.createStream(ssc._jssc, brokerUrl, topic, jlevel)
        except Py4JError as e:
            # TODO: use --jar once it also work on driver
            # ``e.message`` only exists on Python 2 exceptions; fall back to
            # ``str(e)`` so the check also works where the attribute is absent.
            message = getattr(e, "message", None)
            if message is None:
                message = str(e)
            if not message or 'call a package' in message:
                # Single parenthesised string: valid as both a Python 2 print
                # statement and a Python 3 print() call.
                print("No Mqtt package, please put the assembly jar into classpath:")
                print(" $ bin/spark-submit --driver-class-path external/mqtt-assembly/target/"
                      "scala-*/spark-streaming-mqtt-assembly-*.jar")
            # Bare raise keeps the original traceback intact.
            raise

        # NOTE(review): the JVM-side MQTT stream appears to emit plain String
        # messages rather than key/value pairs -- confirm PairDeserializer is
        # the right deserializer here.
        ser = PairDeserializer(NoOpSerializer(), NoOpSerializer())
        stream = DStream(jstream, ssc, ser)
        return stream