diff --git a/examples/src/main/python/streaming/mqtt_wordcount.py b/examples/src/main/python/streaming/mqtt_wordcount.py index 3c5a73d25c2e3..0fb585e58fbe4 100644 --- a/examples/src/main/python/streaming/mqtt_wordcount.py +++ b/examples/src/main/python/streaming/mqtt_wordcount.py @@ -25,7 +25,7 @@ `$ bin/run-example \ org.apache.spark.examples.streaming.MQTTPublisher tcp://localhost:1883 foo` and then run the example as - `$ bin/spark-submit --driver-class-path external/mqtt-assembly/target/scala-*/\ + `$ bin/spark-submit --jars external/mqtt-assembly/target/scala-*/\ spark-streaming-mqtt-assembly-*.jar examples/src/main/python/streaming/mqtt_wordcount.py \ tcp://localhost:1883 foo` """ diff --git a/external/mqtt-assembly/pom.xml b/external/mqtt-assembly/pom.xml index 33f7f08c9cbdc..7c5ba7051ac15 100644 --- a/external/mqtt-assembly/pom.xml +++ b/external/mqtt-assembly/pom.xml @@ -21,14 +21,14 @@ org.apache.spark spark-parent_2.10 - 1.4.0-SNAPSHOT + 1.5.0-SNAPSHOT ../../pom.xml org.apache.spark spark-streaming-mqtt-assembly_2.10 jar - Spark Project External Kafka Assembly + Spark Project External MQTT Assembly http://spark.apache.org/ diff --git a/python/pyspark/streaming/mqtt.py b/python/pyspark/streaming/mqtt.py index d87e9fcbfd67c..558965a70b5a5 100644 --- a/python/pyspark/streaming/mqtt.py +++ b/python/pyspark/streaming/mqtt.py @@ -44,12 +44,28 @@ def createStream(ssc, brokerUrl, topic, try: jstream = ssc._jvm.MQTTUtils.createStream(ssc._jssc, brokerUrl, topic, jlevel) - except Py4JError, e: - # TODO: use --jar once it also work on driver - if not e.message or 'call a package' in e.message: - print "No Mqtt package, please put the assembly jar into classpath:" - print " $ bin/spark-submit --driver-class-path external/mqtt-assembly/target/" + \ - "scala-*/spark-streaming-mqtt-assembly-*.jar" + if 'ClassNotFoundException' in str(e.java_exception): + MQTTUtils._printErrorMsg(ssc.sparkContext) raise e return DStream(jstream, ssc, UTF8Deserializer()) + + @staticmethod + def _printErrorMsg(sc): + print(""" +________________________________________________________________________________________________ + + Spark Streaming's MQTT libraries not found in class path. Try one of the following. + + 1. Include the MQTT library and its dependencies with in the + spark-submit command as + + $ bin/spark-submit --packages org.apache.spark:spark-streaming-mqtt:%s ... + + 2. Download the JAR of the artifact from Maven Central http://search.maven.org/, + Group Id = org.apache.spark, Artifact Id = spark-streaming-mqtt-assembly, Version = %s. + Then, include the jar in the spark-submit command as + + $ bin/spark-submit --jars ... +________________________________________________________________________________________________ +""" % (sc.version, sc.version))