diff --git a/e2e-test/common/cypress.py b/e2e-test/common/cypress.py
index f8024a58..4c3dc4da 100644
--- a/e2e-test/common/cypress.py
+++ b/e2e-test/common/cypress.py
@@ -1,6 +1,5 @@
 import logging
 import os
-import subprocess
 
 logger = logging.getLogger(__name__)
 
@@ -23,12 +22,6 @@ def set_spark_conf(options, yt_client):
 
 
 def set_python_path(python_path, yt_client):
-    py_version_full = (subprocess.run([python_path, '-V'], capture_output=True).stdout.decode()
-                       .replace("Python", "").strip())
-    py_version_short = py_version_full[:py_version_full.index('.', py_version_full.index('.') + 1)]
-    logger.info(f"Set python interpreter (version {py_version_short}): {python_path}")
-    python_cluster_paths = {py_version_short: python_path}
-    yt_client.set("//home/spark/conf/global/python_cluster_paths", python_cluster_paths)
     set_spark_conf({'spark.pyspark.python': python_path}, yt_client)
 
 
diff --git a/resource-manager/src/main/scala/org/apache/spark/scheduler/cluster/ytsaurus/YTsaurusOperationManager.scala b/resource-manager/src/main/scala/org/apache/spark/scheduler/cluster/ytsaurus/YTsaurusOperationManager.scala
index 47aafbf4..9cc94c03 100644
--- a/resource-manager/src/main/scala/org/apache/spark/scheduler/cluster/ytsaurus/YTsaurusOperationManager.scala
+++ b/resource-manager/src/main/scala/org/apache/spark/scheduler/cluster/ytsaurus/YTsaurusOperationManager.scala
@@ -32,7 +32,6 @@ private[spark] class YTsaurusOperationManager(val ytClient: YTsaurusClient,
     user: String,
     token: String,
     portoLayers: YTreeNode,
     filePaths: YTreeNode,
-    pythonPaths: YTreeMapNode,
     environment: YTreeMapNode,
     home: String,
     prepareEnvCommand: String,
@@ -229,15 +228,7 @@
     val execEnvironmentBuilder = environment.toMapBuilder
 
     if (isPythonApp && conf.get(YTSAURUS_PYTHON_VERSION).isDefined) {
-      val pythonVersion = conf.get(YTSAURUS_PYTHON_VERSION).get
-      val pythonPathOpt = pythonPaths.get(pythonVersion)
-      if (pythonPathOpt.isEmpty) {
-        throw new SparkException(
-          s"Python version $pythonVersion is not supported. Please check the path specified in " +
-            s"$GLOBAL_CONFIG_PATH for supported versions"
-        )
-      }
-      execEnvironmentBuilder.key("PYSPARK_EXECUTOR_PYTHON").value(pythonPathOpt.get())
+      execEnvironmentBuilder.key("PYSPARK_EXECUTOR_PYTHON").value(f"python${conf.get(YTSAURUS_PYTHON_VERSION).get}")
     }
 
     var executorCommand = (Seq(
@@ -337,8 +328,6 @@ private[spark] object YTsaurusOperationManager extends Logging {
      filePathsList
    }
 
-    val pythonPaths = globalConfig.getMapO("python_cluster_paths").orElse(YTree.mapBuilder().buildMap())
-
    enrichSparkConf(conf, releaseConfig)
    enrichSparkConf(conf, globalConfig)
 
@@ -372,7 +361,6 @@
       token,
       portoLayers,
       filePaths,
-      pythonPaths,
       environment,
       home,
       prepareEnvCommand,
diff --git a/spyt-package/src/main/python/spyt/client.py b/spyt-package/src/main/python/spyt/client.py
index ab0a010d..3ac9e643 100644
--- a/spyt-package/src/main/python/spyt/client.py
+++ b/spyt-package/src/main/python/spyt/client.py
@@ -189,11 +189,8 @@ class Environment(object):
     IS_CLUSTER_PYTHON_PATH = False
 
     @staticmethod
-    def configure_python_path(python_cluster_paths):
-        python_version = "%d.%d" % sys.version_info[:2]
-        if python_version not in python_cluster_paths:
-            raise RuntimeError("Python version {} is not supported".format(python_version))
-        os.environ["PYSPARK_PYTHON"] = python_cluster_paths[python_version]
+    def configure_python_path():
+        os.environ["PYSPARK_PYTHON"] = "python%d.%d" % sys.version_info[:2]
         Environment.IS_CLUSTER_PYTHON_PATH = True
 
     @staticmethod
@@ -274,7 +271,7 @@ def _build_spark_conf(num_executors=None,
         set_conf(spark_conf, remote_conf["spark_conf"])
 
     if is_client_mode:
-        Environment.configure_python_path(remote_conf["python_cluster_paths"])
+        Environment.configure_python_path()
 
     spark_cluster_conf = read_cluster_conf(spark_cluster_conf_path, client=client).get("spark_conf") or {}
     spark_conf.setAll(spark_cluster_conf.items())
diff --git a/spyt-package/src/main/python/spyt/conf.py b/spyt-package/src/main/python/spyt/conf.py
index 753c04b7..8ceec558 100644
--- a/spyt-package/src/main/python/spyt/conf.py
+++ b/spyt-package/src/main/python/spyt/conf.py
@@ -94,10 +94,6 @@ def latest_compatible_spyt_version(version, client=None):
     return max(compatible_spyt_versions, key=SpytVersion)
 
 
-def python_bin_path(global_conf, version):
-    return global_conf["python_cluster_paths"].get(version)
-
-
 def worker_num_limit(global_conf):
     return global_conf.get("worker_num_limit", 1000)
 
diff --git a/spyt-package/src/main/python/spyt/standalone.py b/spyt-package/src/main/python/spyt/standalone.py
index 5b2bb514..e382dcdb 100644
--- a/spyt-package/src/main/python/spyt/standalone.py
+++ b/spyt-package/src/main/python/spyt/standalone.py
@@ -25,7 +25,7 @@
 from .conf import read_remote_conf, validate_cluster_version, \
     latest_compatible_spyt_version, update_config_inplace, validate_custom_params, validate_mtn_config, \
-    latest_ytserver_proxy_path, read_global_conf, python_bin_path, \
+    latest_ytserver_proxy_path, read_global_conf, \
     worker_num_limit, validate_worker_num, read_cluster_conf, validate_ssd_config, cuda_toolkit_version  # noqa: E402
 from .utils import get_spark_master, base_spark_conf, SparkDiscovery, SparkCluster, call_get_proxy_address_url, \
     parse_bool, _add_conf  # noqa: E402
@@ -155,7 +155,7 @@ def raw_submit(discovery_path, spark_home, spark_args,
         _add_master(discovery, spark_base_args, rest=True, client=client)
         _add_shs_option(discovery, spark_base_args, client=client)
         _add_base_spark_conf(client, discovery, spark_base_args)
-        _add_python_version(python_version, spark_base_args, client)
+        _add_python_version(python_version, spark_base_args)
         _add_dedicated_driver_op_conf(spark_base_args, dedicated_driver_op)
         _add_ipv6_preference(ipv6_preference_enabled, spark_base_args)
         spark_env = _create_spark_env(client, spark_home)
@@ -203,17 +203,9 @@ def _add_dedicated_driver_op_conf(spark_args, dedicated_driver_op):
     }, spark_args)
 
 
-def _add_python_version(python_version, spark_args, client):
+def _add_python_version(python_version, spark_args):
     if python_version is not None:
-        global_conf = read_global_conf(client=client)
-        python_path = python_bin_path(global_conf, python_version)
-        if python_path:
-            _add_conf({
-                "spark.pyspark.python": python_path
-            }, spark_args)
-        else:
-            raise RuntimeError(
-                "Interpreter for python version `{}` is not found".format(python_version))
+        _add_conf({"spark.pyspark.python": f"python{python_version}"}, spark_args)
 
 
 def _add_ipv6_preference(ipv6_preference_enabled, spark_args):
diff --git a/tools/release/publisher/config_generator.py b/tools/release/publisher/config_generator.py
index 730aadb0..b602f643 100644
--- a/tools/release/publisher/config_generator.py
+++ b/tools/release/publisher/config_generator.py
@@ -144,6 +144,7 @@ def prepare_global_config(os_release: bool) -> Dict[str, Any]:
     global_config['latest_spyt_version'] = "1.76.1"
     global_config['latest_spark_cluster_version'] = "1.75.4"
     if not os_release:
+        # preserved for backward compatibility; subject to removal
         python_cluster_paths = {
             "3.11": "/opt/python3.11/bin/python3.11",
             "3.12": "/opt/python3.12/bin/python3.12",
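
Note on the new contract: with python_cluster_paths gone, the client sets PYSPARK_PYTHON
(and the cluster sets PYSPARK_EXECUTOR_PYTHON) to a bare interpreter name such as
python3.12, so the binary must be resolvable via PATH inside the worker environment.
A minimal sketch of that resolution, assuming a POSIX-style PATH lookup;
resolve_pyspark_python is an illustrative helper, not part of this patch:

    import shutil
    import sys

    def resolve_pyspark_python(requested_version=None):
        # Build the bare name the same way the patched code does:
        # "python%d.%d" from the version, no python_cluster_paths lookup.
        version = requested_version or "%d.%d" % sys.version_info[:2]
        name = "python" + version
        # The name only resolves if the binary is on PATH in the target
        # environment (e.g. shipped in the porto layer or docker image).
        if shutil.which(name) is None:
            raise RuntimeError("{} is not on PATH".format(name))
        return name

    print(resolve_pyspark_python())        # e.g. python3.12
    print(resolve_pyspark_python("3.11"))  # raises if python3.11 is absent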