From 84be4c8d6fd52774462762f1f5972f60d286c289 Mon Sep 17 00:00:00 2001
From: hyukjinkwon
Date: Mon, 17 Apr 2017 10:03:42 -0700
Subject: [PATCH] [SPARK-19019][PYTHON][BRANCH-2.0] Fix hijacked `collections.namedtuple` and port cloudpickle changes for PySpark to work with Python 3.6.0

## What changes were proposed in this pull request?

This PR proposes to backport https://github.com/apache/spark/pull/16429 to branch-2.0 so that Python 3.6.0 works with Spark 2.0.x.

## How was this patch tested?

Manually, via

```
./run-tests --python-executables=python3.6
```

```
Finished test(python3.6): pyspark.tests (124s)
Finished test(python3.6): pyspark.accumulators (4s)
Finished test(python3.6): pyspark.broadcast (4s)
Finished test(python3.6): pyspark.conf (3s)
Finished test(python3.6): pyspark.context (15s)
Finished test(python3.6): pyspark.ml.classification (24s)
Finished test(python3.6): pyspark.sql.tests (190s)
Finished test(python3.6): pyspark.mllib.tests (190s)
Finished test(python3.6): pyspark.ml.clustering (14s)
Finished test(python3.6): pyspark.ml.linalg.__init__ (0s)
Finished test(python3.6): pyspark.ml.recommendation (18s)
Finished test(python3.6): pyspark.ml.feature (28s)
Finished test(python3.6): pyspark.ml.evaluation (28s)
Finished test(python3.6): pyspark.ml.regression (21s)
Finished test(python3.6): pyspark.ml.tuning (17s)
Finished test(python3.6): pyspark.streaming.tests (239s)
Finished test(python3.6): pyspark.mllib.evaluation (15s)
Finished test(python3.6): pyspark.mllib.classification (24s)
Finished test(python3.6): pyspark.mllib.clustering (37s)
Finished test(python3.6): pyspark.mllib.linalg.__init__ (0s)
Finished test(python3.6): pyspark.mllib.fpm (19s)
Finished test(python3.6): pyspark.mllib.feature (19s)
Finished test(python3.6): pyspark.mllib.random (8s)
Finished test(python3.6): pyspark.ml.tests (76s)
Finished test(python3.6): pyspark.mllib.stat.KernelDensity (0s)
Finished test(python3.6): pyspark.mllib.recommendation (21s)
Finished test(python3.6): pyspark.mllib.linalg.distributed (27s)
Finished test(python3.6): pyspark.mllib.regression (22s)
Finished test(python3.6): pyspark.mllib.stat._statistics (11s)
Finished test(python3.6): pyspark.mllib.tree (16s)
Finished test(python3.6): pyspark.profiler (8s)
Finished test(python3.6): pyspark.shuffle (1s)
Finished test(python3.6): pyspark.mllib.util (17s)
Finished test(python3.6): pyspark.serializers (12s)
Finished test(python3.6): pyspark.rdd (18s)
Finished test(python3.6): pyspark.sql.conf (4s)
Finished test(python3.6): pyspark.sql.catalog (14s)
Finished test(python3.6): pyspark.sql.column (13s)
Finished test(python3.6): pyspark.sql.context (15s)
Finished test(python3.6): pyspark.sql.group (26s)
Finished test(python3.6): pyspark.sql.dataframe (31s)
Finished test(python3.6): pyspark.sql.functions (32s)
Finished test(python3.6): pyspark.sql.types (5s)
Finished test(python3.6): pyspark.sql.streaming (11s)
Finished test(python3.6): pyspark.sql.window (5s)
Finished test(python3.6): pyspark.streaming.util (0s)
Finished test(python3.6): pyspark.sql.session (15s)
Finished test(python3.6): pyspark.sql.readwriter (34s)
Tests passed in 376 seconds
```

Author: hyukjinkwon

Closes #17374 from HyukjinKwon/SPARK-19019-backport.
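For reviewers unfamiliar with the Python 3.6 change this works around: `collections.namedtuple` gained keyword-only arguments whose defaults live in `__kwdefaults__` (see https://bugs.python.org/issue25628), and the hijacked wrapper in `serializers.py` now forwards those defaults explicitly. The snippet below is only an illustrative sketch of that behaviour difference, not part of the patch; what it prints depends on the interpreter version.

```python
import collections

# Python 2: attribute missing; <= 3.5: None; 3.6+: a dict of keyword-only
# defaults (e.g. 'rename' and 'module'), which is what _kwdefaults() normalizes.
kwdefaults = getattr(collections.namedtuple, "__kwdefaults__", None)
print(kwdefaults)

# Mirror of the hijacked wrapper's logic: start from the defaults and let
# caller-supplied keyword arguments win.
kwargs = dict(kwdefaults or {})
kwargs["rename"] = True
Point = collections.namedtuple("Point", ["x", "class"], **kwargs)
print(Point._fields)  # ('x', '_1') because rename=True replaced the invalid name
```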
---
 python/pyspark/cloudpickle.py | 98 ++++++++++++++++++++++++-----------
 python/pyspark/serializers.py | 20 +++++++
 2 files changed, 87 insertions(+), 31 deletions(-)

diff --git a/python/pyspark/cloudpickle.py b/python/pyspark/cloudpickle.py
index 822ae46e45111..94168f041d91c 100644
--- a/python/pyspark/cloudpickle.py
+++ b/python/pyspark/cloudpickle.py
@@ -43,6 +43,7 @@
 from __future__ import print_function
 
 import operator
+import opcode
 import os
 import io
 import pickle
@@ -53,6 +54,8 @@
 import itertools
 import dis
 import traceback
+import weakref
+
 
 if sys.version < '3':
     from pickle import Pickler
@@ -68,10 +71,10 @@
     PY3 = True
 
 #relevant opcodes
-STORE_GLOBAL = dis.opname.index('STORE_GLOBAL')
-DELETE_GLOBAL = dis.opname.index('DELETE_GLOBAL')
-LOAD_GLOBAL = dis.opname.index('LOAD_GLOBAL')
-GLOBAL_OPS = [STORE_GLOBAL, DELETE_GLOBAL, LOAD_GLOBAL]
+STORE_GLOBAL = opcode.opmap['STORE_GLOBAL']
+DELETE_GLOBAL = opcode.opmap['DELETE_GLOBAL']
+LOAD_GLOBAL = opcode.opmap['LOAD_GLOBAL']
+GLOBAL_OPS = (STORE_GLOBAL, DELETE_GLOBAL, LOAD_GLOBAL)
 HAVE_ARGUMENT = dis.HAVE_ARGUMENT
 EXTENDED_ARG = dis.EXTENDED_ARG
 
@@ -90,6 +93,43 @@ def _builtin_type(name):
     return getattr(types, name)
 
 
+if sys.version_info < (3, 4):
+    def _walk_global_ops(code):
+        """
+        Yield (opcode, argument number) tuples for all
+        global-referencing instructions in *code*.
+        """
+        code = getattr(code, 'co_code', b'')
+        if not PY3:
+            code = map(ord, code)
+
+        n = len(code)
+        i = 0
+        extended_arg = 0
+        while i < n:
+            op = code[i]
+            i += 1
+            if op >= HAVE_ARGUMENT:
+                oparg = code[i] + code[i + 1] * 256 + extended_arg
+                extended_arg = 0
+                i += 2
+                if op == EXTENDED_ARG:
+                    extended_arg = oparg * 65536
+                if op in GLOBAL_OPS:
+                    yield op, oparg
+
+else:
+    def _walk_global_ops(code):
+        """
+        Yield (opcode, argument number) tuples for all
+        global-referencing instructions in *code*.
+        """
+        for instr in dis.get_instructions(code):
+            op = instr.opcode
+            if op in GLOBAL_OPS:
+                yield op, instr.arg
+
+
 class CloudPickler(Pickler):
 
     dispatch = Pickler.dispatch.copy()
@@ -250,38 +290,34 @@ def save_function_tuple(self, func):
         write(pickle.TUPLE)
         write(pickle.REDUCE)  # applies _fill_function on the tuple
 
-    @staticmethod
-    def extract_code_globals(co):
+    _extract_code_globals_cache = (
+        weakref.WeakKeyDictionary()
+        if sys.version_info >= (2, 7) and not hasattr(sys, "pypy_version_info")
+        else {})
+
+    @classmethod
+    def extract_code_globals(cls, co):
         """
         Find all globals names read or written to by codeblock co
         """
-        code = co.co_code
-        if not PY3:
-            code = [ord(c) for c in code]
-        names = co.co_names
-        out_names = set()
-
-        n = len(code)
-        i = 0
-        extended_arg = 0
-        while i < n:
-            op = code[i]
+        out_names = cls._extract_code_globals_cache.get(co)
+        if out_names is None:
+            try:
+                names = co.co_names
+            except AttributeError:
+                # PyPy "builtin-code" object
+                out_names = set()
+            else:
+                out_names = set(names[oparg]
+                                for op, oparg in _walk_global_ops(co))
 
-            i += 1
-            if op >= HAVE_ARGUMENT:
-                oparg = code[i] + code[i+1] * 256 + extended_arg
-                extended_arg = 0
-                i += 2
-                if op == EXTENDED_ARG:
-                    extended_arg = oparg*65536
-                if op in GLOBAL_OPS:
-                    out_names.add(names[oparg])
+            # see if nested function have any global refs
+            if co.co_consts:
+                for const in co.co_consts:
+                    if type(const) is types.CodeType:
+                        out_names |= cls.extract_code_globals(const)
 
-        # see if nested function have any global refs
-        if co.co_consts:
-            for const in co.co_consts:
-                if type(const) is types.CodeType:
-                    out_names |= CloudPickler.extract_code_globals(const)
+            cls._extract_code_globals_cache[co] = out_names
 
         return out_names
 
diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py
index 2a1326947f4f5..a9e14b8f7033b 100644
--- a/python/pyspark/serializers.py
+++ b/python/pyspark/serializers.py
@@ -370,18 +370,38 @@ def _hijack_namedtuple():
         return
 
     global _old_namedtuple  # or it will put in closure
+    global _old_namedtuple_kwdefaults  # or it will put in closure too
 
     def _copy_func(f):
         return types.FunctionType(f.__code__, f.__globals__, f.__name__,
                                   f.__defaults__, f.__closure__)
 
+    def _kwdefaults(f):
+        # __kwdefaults__ contains the default values of keyword-only arguments which are
+        # introduced from Python 3. The possible cases for __kwdefaults__ in namedtuple
+        # are as below:
+        #
+        # - Does not exist in Python 2.
+        # - Returns None in <= Python 3.5.x.
+        # - Returns a dictionary containing the default values to the keys from Python 3.6.x
+        #   (See https://bugs.python.org/issue25628).
+        kargs = getattr(f, "__kwdefaults__", None)
+        if kargs is None:
+            return {}
+        else:
+            return kargs
+
     _old_namedtuple = _copy_func(collections.namedtuple)
+    _old_namedtuple_kwdefaults = _kwdefaults(collections.namedtuple)
 
     def namedtuple(*args, **kwargs):
+        for k, v in _old_namedtuple_kwdefaults.items():
+            kwargs[k] = kwargs.get(k, v)
         cls = _old_namedtuple(*args, **kwargs)
         return _hack_namedtuple(cls)
 
     # replace namedtuple with new one
+    collections.namedtuple.__globals__["_old_namedtuple_kwdefaults"] = _old_namedtuple_kwdefaults
     collections.namedtuple.__globals__["_old_namedtuple"] = _old_namedtuple
     collections.namedtuple.__globals__["_hack_namedtuple"] = _hack_namedtuple
     collections.namedtuple.__code__ = namedtuple.__code__