Skip to content

Commit

Permalink
[SPARK-19307][PYSPARK] Make sure user conf is propagated to SparkCont…
Browse files Browse the repository at this point in the history
…ext.

The code was failing to propagate the user conf in the case where the
JVM was already initialized, which happens when a user submits a
python script via spark-submit.

Tested with new unit test and by running a python script in a real cluster.

Author: Marcelo Vanzin <[email protected]>

Closes apache#16682 from vanzin/SPARK-19307.
  • Loading branch information
Marcelo Vanzin authored and uzadude committed Jan 27, 2017
1 parent 9ac9e22 commit c2c25fe
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 0 deletions.
3 changes: 3 additions & 0 deletions python/pyspark/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,9 @@ def _do_init(self, master, appName, sparkHome, pyFiles, environment, batchSize,
self._conf = conf
else:
self._conf = SparkConf(_jvm=SparkContext._jvm)
if conf is not None:
for k, v in conf.getAll():
self._conf.set(k, v)

self._batchSize = batchSize # -1 represents an unlimited batch size
self._unbatched_serializer = serializer
Expand Down
20 changes: 20 additions & 0 deletions python/pyspark/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -2035,6 +2035,26 @@ def test_single_script_on_cluster(self):
self.assertEqual(0, proc.returncode)
self.assertIn("[2, 4, 6]", out.decode('utf-8'))

def test_user_configuration(self):
"""Make sure user configuration is respected (SPARK-19307)"""
script = self.createTempFile("test.py", """
|from pyspark import SparkConf, SparkContext
|
|conf = SparkConf().set("spark.test_config", "1")
|sc = SparkContext(conf = conf)
|try:
| if sc._conf.get("spark.test_config") != "1":
| raise Exception("Cannot find spark.test_config in SparkContext's conf.")
|finally:
| sc.stop()
""")
proc = subprocess.Popen(
[self.sparkSubmit, "--master", "local", script],
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
out, err = proc.communicate()
self.assertEqual(0, proc.returncode, msg="Process failed with error:\n {0}".format(out))


class ContextTests(unittest.TestCase):

Expand Down

0 comments on commit c2c25fe

Please sign in to comment.