From 8ed89a672f5d701ac918ada3f76cad1d3bb5e757 Mon Sep 17 00:00:00 2001
From: Scott Taylor
Date: Fri, 19 Jun 2015 09:41:33 +0100
Subject: [PATCH] Chain generators to prevent potential deadlock

---
 python/pyspark/rdd.py | 15 +++++++++------
 1 file changed, 9 insertions(+), 6 deletions(-)

diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index da9d11f689aef..45575f14fa798 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -704,12 +704,15 @@ def pipe_objs(out):
                     out.write(s.encode('utf-8'))
                 out.close()
             Thread(target=pipe_objs, args=[pipe.stdin]).start()
-            result = (x.rstrip(b'\n').decode('utf-8') for x in iter(pipe.stdout.readline, b''))
-            pipe.wait()
-            if pipe.returncode:
-                raise Exception("Pipe function `%s' exited "
-                                "with error code %d" % (command, pipe.returncode))
-            return result
+            def check_return_code():
+                pipe.wait()
+                if pipe.returncode:
+                    raise Exception("Pipe function `%s' exited "
+                                    "with error code %d" % (command, pipe.returncode))
+                else:
+                    return None
+            return (x.rstrip(b'\n').decode('utf-8') for x in
+                    chain(iter(pipe.stdout.readline, b''), iter(check_return_code, None)))
         return self.mapPartitions(func)
 
     def foreach(self, f):