SpytDistributions #35

Closed · wants to merge 3 commits
15 changes: 11 additions & 4 deletions spyt-package/src/main/python/spyt/client.py
@@ -15,8 +15,9 @@
from .arcadia import checked_extract_spark # noqa: E402
from .utils import default_token, default_discovery_dir, get_spark_master, set_conf, \
SparkDiscovery, parse_memory, format_memory, base_spark_conf, parse_bool, get_spyt_home # noqa: E402
from .conf import read_remote_conf, read_global_conf, validate_versions_compatibility, \
read_cluster_conf, SELF_VERSION # noqa: E402
from .conf import validate_versions_compatibility, \
read_cluster_conf, SELF_VERSION, SpytDistributions # noqa: E402
from .standalone import get_spyt_distributions


logger = logging.getLogger(__name__)
@@ -83,6 +84,7 @@ def spark_session(num_executors=None,
dynamic_allocation=False,
spark_conf_args=None,
local_conf_path=Defaults.LOCAL_CONF_PATH,
spyt_distributions: SpytDistributions = None,
client=None,
spyt_version=None):
def do_create_inner_cluster_session():
@@ -97,6 +99,7 @@ def do_create_inner_cluster_session():
dynamic_allocation=dynamic_allocation,
spark_conf_args=spark_conf_args,
local_conf_path=local_conf_path,
spyt_distributions=spyt_distributions,
client=client,
spyt_version=spyt_version,
)
@@ -246,6 +249,7 @@ def _build_spark_conf(num_executors=None,
dynamic_allocation=None,
spark_conf_args=None,
local_conf_path=None,
spyt_distributions: SpytDistributions = None,
client=None,
spyt_version=None):
is_client_mode = os.getenv("IS_SPARK_CLUSTER") is None
@@ -269,8 +273,9 @@
num_executors, cores_per_executor, executor_memory_per_core,
driver_memory, dynamic_allocation)

global_conf = read_global_conf(client=client)
remote_conf = read_remote_conf(global_conf, spark_cluster_version, client=client)
if spyt_distributions is None:
spyt_distributions = get_spyt_distributions(client)
remote_conf = spyt_distributions.read_remote_conf(spark_cluster_version)
set_conf(spark_conf, remote_conf["spark_conf"])

if is_client_mode:
@@ -304,6 +309,7 @@ def connect(num_executors=5,
dynamic_allocation=True,
spark_conf_args=None,
local_conf_path=Defaults.LOCAL_CONF_PATH,
spyt_distributions: SpytDistributions = None,
client=None,
spyt_version=None):
conf = _build_spark_conf(
@@ -317,6 +323,7 @@
dynamic_allocation=dynamic_allocation,
spark_conf_args=spark_conf_args,
local_conf_path=local_conf_path,
spyt_distributions=spyt_distributions,
client=client,
spyt_version=spyt_version,
)
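For context, a minimal sketch of how the new spyt_distributions argument could be used from client code. It assumes spark_session is used as a context manager, as in the existing spyt client API; the proxy address, token, and custom root path below are placeholders, not values from this pull request.

from yt.wrapper import YtClient

from spyt.client import spark_session
from spyt.conf import SpytDistributions

# Placeholder connection settings and an illustrative non-default root.
yt_client = YtClient(proxy="my-cluster.example.com", token="...")
custom_distributions = SpytDistributions(yt_client, "//home/spark_custom")

# When spyt_distributions is None, _build_spark_conf falls back to
# get_spyt_distributions(client), so existing callers are unaffected.
with spark_session(num_executors=2,
                   spyt_distributions=custom_distributions,
                   client=yt_client) as spark:
    spark.range(10).count()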
146 changes: 64 additions & 82 deletions spyt-package/src/main/python/spyt/conf.py
@@ -3,19 +3,11 @@
from spyt.dependency_utils import require_yt_client
require_yt_client()

from yt.wrapper import get, YPath, list as yt_list, exists # noqa: E402
from yt.wrapper import get, YPath, list as yt_list, exists, YtClient # noqa: E402
Collaborator: I think we can get rid of importing and using the get method, because now you use YtClient instead.

Collaborator (Author): Inside SpytDistributions – yes, but I am not sure about the external methods: client can still be None in those.
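To illustrate the distinction being discussed (a sketch with made-up names, not part of the diff): yt.wrapper's module-level get accepts client=None and falls back to the globally configured client, which is why the remaining external helpers still need it, while the methods bound to SpytDistributions always hold an explicit YtClient and can call client.get directly.

from yt.wrapper import get, YtClient

# External-helper style: client may be None; yt.wrapper falls back to its global client.
def read_some_conf(path, client=None):
    return get(path, client=client)

# SpytDistributions style: the bound client is always set, so the free-standing
# get import is unnecessary inside the class.
class BoundReader:
    def __init__(self, client: YtClient):
        self.client = client

    def read_some_conf(self, path):
        return self.client.get(path)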

from yt.wrapper.common import update_inplace # noqa: E402
from .version import __scala_version__ # noqa: E402
from pyspark import __version__ as spark_version # noqa: E402

SPARK_BASE_PATH = YPath("//home/spark")

CONF_BASE_PATH = SPARK_BASE_PATH.join("conf")
GLOBAL_CONF_PATH = CONF_BASE_PATH.join("global")

SPYT_BASE_PATH = SPARK_BASE_PATH.join("spyt")
DISTRIB_BASE_PATH = SPARK_BASE_PATH.join("distrib")

RELEASES_SUBDIR = "releases"
SNAPSHOTS_SUBDIR = "snapshots"

@@ -26,6 +18,69 @@
logger = logging.getLogger(__name__)


class SpytDistributions:
Collaborator: I'd rename this class and the get_spyt_distributions method to SpytDistribution, because the plural form is confusing. It actually points to a different root, but it's not multiple distributions.

Collaborator (Author): But it really is plural, not singular, because this yt_root contains multiple SPYT and Spark distributions, and to choose one you have to provide a specific version.

Collaborator: Why do you need to support multiple SPYT and Spark distributions for a single cluster or job? I think there must be specific versions for each YT operation.

Collaborator (Author): The catalog //home/spark is organized in such a way that there are multiple versions in it. If I am giving a way to change this path to something else, I am giving a way to customize multiple SPYT distributions, not a single one, even though only one will be used.

def __init__(self, client: YtClient, yt_root: str):
self.client = client
self.yt_root = YPath(yt_root)
Collaborator: I'd rename yt_root to base_path, so it would be SpytDistribution.base_path; I think that makes more sense.

Collaborator (Author): It is called root in the publisher scripts: https://github.com/ytsaurus/ytsaurus-spyt/tree/382ae1656208601175b6f004fd31a39affb17b83/tools/release/publisher Also, it is not just any root or path, but a YT one.

Collaborator: But I think base_path is more suitable in the context of this class.

Collaborator (Author): In the context of this class – maybe, but in the context of the public interface I believe this should be coherent with the --root parameter of the publisher scripts.

self.conf_base_path = self.yt_root.join("conf")
self.global_conf = client.get(self.conf_base_path.join("global"))
self.distrib_base_path = self.yt_root.join("distrib")
self.spyt_base_path = self.yt_root.join("spyt")

def read_remote_conf(self, cluster_version):
version_conf = self.client.get(self._get_version_conf_path(cluster_version))
version_conf["cluster_version"] = cluster_version
return update_inplace(self.global_conf, version_conf) # TODO(alex-shishkin): Might cause undefined behaviour

def latest_ytserver_proxy_path(self, cluster_version):
if cluster_version:
return None
symlink_path = self.global_conf.get("ytserver_proxy_path")
if symlink_path is None:
return None
return self.client.get("{}&/@target_path".format(symlink_path))

def validate_cluster_version(self, spark_cluster_version):
if not exists(self._get_version_conf_path(spark_cluster_version), client=self.client):
raise RuntimeError("Unknown SPYT cluster version: {}. Available release versions are: {}".format(
spark_cluster_version, self.get_available_cluster_versions()
))
spyt_minor_version = SpytVersion(SELF_VERSION).get_minor()
cluster_minor_version = SpytVersion(spark_cluster_version).get_minor()
if spyt_minor_version < cluster_minor_version:
logger.warning("You required SPYT version {} which is older than your local ytsaurus-spyt version {}."
"Please update your local ytsaurus-spyt".format(spark_cluster_version, SELF_VERSION))

def get_available_cluster_versions(self):
subdirs = yt_list(self.conf_base_path.join(RELEASES_SUBDIR), client=self.client)
Collaborator: subdirs can be obtained by calling self.get_available_spyt_versions(), which would remove the code duplication.

Collaborator (Author): There are two different paths used there, so that method cannot be used.

Collaborator: Oh, I see, conf_base_path and spyt_base_path, you're right.

return [x for x in subdirs if x != "spark-launch-conf"]

def latest_compatible_spyt_version(self, version):
minor_version = SpytVersion(version).get_minor()
spyt_versions = self.get_available_spyt_versions()
compatible_spyt_versions = [x for x in spyt_versions if SpytVersion(x).get_minor() == minor_version]
if not compatible_spyt_versions:
raise RuntimeError(f"No compatible SPYT versions found for specified version {version}")
return max(compatible_spyt_versions, key=SpytVersion)

def get_available_spyt_versions(self):
return yt_list(self.spyt_base_path.join(RELEASES_SUBDIR), client=self.client)

def get_spark_distributive(self):
distrib_root = self.distrib_base_path.join(spark_version.replace('.', '/'))
distrib_root_contents = yt_list(distrib_root, client=self.client)
spark_tgz = [x for x in distrib_root_contents if x.endswith('.tgz')]
if len(spark_tgz) == 0:
raise RuntimeError(f"Spark {spark_version} tgz distributive doesn't exist "
f"at path {distrib_root} on cluster {self.client.config['proxy']['url']}")
return (spark_tgz[0], distrib_root.join(spark_tgz[0]))

def _get_version_conf_path(self, version):
return self.conf_base_path.join(
SNAPSHOTS_SUBDIR if "SNAPSHOT" in version or "beta" in version or "dev" in version else RELEASES_SUBDIR
).join(version).join("spark-launch-conf")


class SpytVersion:
def __init__(self, version=None, major=0, minor=0, patch=0):
if version is not None:
@@ -58,18 +113,6 @@ def __str__(self):
return f"{self.major}.{self.minor}.{self.patch}"


def validate_cluster_version(spark_cluster_version, client=None):
if not check_cluster_version_exists(spark_cluster_version, client=client):
raise RuntimeError("Unknown SPYT cluster version: {}. Available release versions are: {}".format(
spark_cluster_version, get_available_cluster_versions(client=client)
))
spyt_minor_version = SpytVersion(SELF_VERSION).get_minor()
cluster_minor_version = SpytVersion(spark_cluster_version).get_minor()
if spyt_minor_version < cluster_minor_version:
logger.warning("You required SPYT version {} which is older than your local ytsaurus-spyt version {}."
"Please update your local ytsaurus-spyt".format(spark_cluster_version, SELF_VERSION))


def validate_versions_compatibility(spyt_version, spark_cluster_version):
spyt_minor_version = SpytVersion(spyt_version).get_minor()
spark_cluster_minor_version = SpytVersion(spark_cluster_version).get_minor()
@@ -84,15 +127,6 @@ def validate_mtn_config(enablers, network_project, tvm_id, tvm_secret):
raise RuntimeError("When using MTN, network_project arg must be set.")


def latest_compatible_spyt_version(version, client=None):
minor_version = SpytVersion(version).get_minor()
spyt_versions = get_available_spyt_versions(client)
compatible_spyt_versions = [x for x in spyt_versions if SpytVersion(x).get_minor() == minor_version]
if not compatible_spyt_versions:
raise RuntimeError(f"No compatible SPYT versions found for specified version {version}")
return max(compatible_spyt_versions, key=SpytVersion)


def python_bin_path(global_conf, version):
return global_conf["python_cluster_paths"].get(version)

@@ -115,26 +149,6 @@ def validate_ssd_config(disk_limit, disk_account):
raise RuntimeError("Disk account must be provided to use disk limit, please add --worker-disk-account option")


def get_available_cluster_versions(client=None):
subdirs = yt_list(CONF_BASE_PATH.join(RELEASES_SUBDIR), client=client)
return [x for x in subdirs if x != "spark-launch-conf"]


def check_cluster_version_exists(cluster_version, client=None):
return exists(_get_version_conf_path(cluster_version), client=client)


def read_global_conf(client=None):
return client.get(GLOBAL_CONF_PATH)


def read_remote_conf(global_conf, cluster_version, client=None):
version_conf_path = _get_version_conf_path(cluster_version)
version_conf = get(version_conf_path, client=client)
version_conf["cluster_version"] = cluster_version
return update_inplace(global_conf, version_conf) # TODO(alex-shishkin): Might cause undefined behaviour


def read_cluster_conf(path=None, client=None):
if path is None:
return {}
@@ -156,41 +170,9 @@ def validate_custom_params(params):
"Use argument 'enablers' instead")


def get_available_spyt_versions(client=None):
return yt_list(SPYT_BASE_PATH.join(RELEASES_SUBDIR), client=client)


def latest_ytserver_proxy_path(cluster_version, client=None):
if cluster_version:
return None
global_conf = read_global_conf(client=client)
symlink_path = global_conf.get("ytserver_proxy_path")
if symlink_path is None:
return None
return get("{}&/@target_path".format(symlink_path), client=client)


def ytserver_proxy_attributes(path, client=None):
return get("{}/@user_attributes".format(path), client=client)


def get_spark_distributive(client):
distrib_root = DISTRIB_BASE_PATH.join(spark_version.replace('.', '/'))
distrib_root_contents = yt_list(distrib_root, client=client)
spark_tgz = [x for x in distrib_root_contents if x.endswith('.tgz')]
if len(spark_tgz) == 0:
raise RuntimeError(f"Spark {spark_version} tgz distributive doesn't exist "
f"at path {distrib_root} on cluster {client.config['proxy']['url']}")
return (spark_tgz[0], distrib_root.join(spark_tgz[0]))


def _get_or_else(d, key, default):
return d.get(key) or default


def _version_subdir(version):
return SNAPSHOTS_SUBDIR if "SNAPSHOT" in version or "beta" in version or "dev" in version else RELEASES_SUBDIR


def _get_version_conf_path(cluster_version):
return CONF_BASE_PATH.join(_version_subdir(cluster_version)).join(cluster_version).join("spark-launch-conf")
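Taken together, the helpers that used to be free functions in conf.py are now bound methods of SpytDistributions; the root used by default elsewhere in this pull request is //home/spark, matching the removed SPARK_BASE_PATH constant. A rough usage sketch follows, with a placeholder proxy address and version string.

from yt.wrapper import YtClient
from spyt.conf import SpytDistributions

yt_client = YtClient(proxy="my-cluster.example.com")
distributions = SpytDistributions(yt_client, "//home/spark")

# One root holds many published versions, hence the plural class name.
print(distributions.get_available_cluster_versions())
print(distributions.get_available_spyt_versions())

# Former module-level helpers, now bound methods (the version is illustrative):
remote_conf = distributions.read_remote_conf("2.4.0")
spyt_version = distributions.latest_compatible_spyt_version("2.4.0")
tgz_name, tgz_path = distributions.get_spark_distributive()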
8 changes: 5 additions & 3 deletions spyt-package/src/main/python/spyt/spec.py
@@ -12,7 +12,7 @@
from yt.wrapper.http_helpers import get_token, get_user_name # noqa: E402
from yt.wrapper.spec_builders import VanillaSpecBuilder # noqa: E402

from .conf import ytserver_proxy_attributes, get_spark_distributive # noqa: E402
from .conf import ytserver_proxy_attributes, SpytDistributions # noqa: E402
from .utils import SparkDiscovery, call_get_proxy_address_url, parse_memory # noqa: E402
from .enabler import SpytEnablers # noqa: E402
from .version import __version__ # noqa: E402
@@ -43,6 +43,7 @@ class SparkDefaultArguments(object):
@staticmethod
def get_params():
return {
"spyt_distributions": { "yt_root": "//home/spark" },
Collaborator: I'd rename this parameter to spyt_distribution or spyt_distribution_root and set it to //home/spark by default, because that is the only sensible value in this structure for now.

Collaborator (Author): I made this parameter nested so it would be possible to support other ways of distributing SPYT, for example "spyt_distributions": {"local_path": "/opt/spyt"}. I don't think this should be a flat string parameter.

Collaborator: But you don't support that in this pull request. I think it should be kept as simple as possible, and if we decide to support other ways of specifying the distribution (by the way, is there a need for this?), we just refactor it.

Collaborator (Author): I meant that if we want to support it later, we will have to add a new, conflicting parameter instead of extending an existing one. That's why I didn't want to make it a flat string.

Collaborator: But what's the probability of such a situation? I think it's almost none.

Collaborator (Author): I believe that in the future we will most likely want to support ways of distributing SPYT other than a YT catalog, for example in Docker images, like Spark does.

"operation_spec": {
"annotations": {
"is_spark": True,
@@ -362,15 +363,16 @@ def _script_absolute_path(script):
.end_task()


def build_spark_operation_spec(config: dict, client: YtClient,
def build_spark_operation_spec(config: dict, spyt_distributions: SpytDistributions,
job_types: List[str], common_config: CommonComponentConfig,
master_config: MasterConfig = None, worker_config: WorkerConfig = None,
hs_config: HistoryServerConfig = None, livy_config: LivyConfig = None):
client = spyt_distributions.client
if job_types == [] or job_types is None:
job_types = ['master', 'history', 'worker']

spark_home = "./tmpfs" if common_config.enable_tmpfs else "."
spark_distributive_tgz, spark_distributive_path = get_spark_distributive(client)
spark_distributive_tgz, spark_distributive_path = spyt_distributions.get_spark_distributive()

extra_java_opts = ["-Dlog4j.loglevel={}".format(common_config.cluster_log_level)]
if common_config.enablers.enable_preference_ipv6:
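A sketch of how the nested parameter from SparkDefaultArguments.get_params() is meant to be overridden when starting a cluster. The testing path is illustrative, and the local_path form is the hypothetical future extension argued for in the review thread above, not something this pull request implements.

# Default, as returned by SparkDefaultArguments.get_params() in this diff:
params = {
    "spyt_distributions": {"yt_root": "//home/spark"},
    "operation_spec": {"annotations": {"is_spark": True}},  # trimmed
}

# Pointing a cluster at a custom publish root (illustrative path):
params["spyt_distributions"] = {"yt_root": "//home/spark_testing"}

# Hypothetical future extension, NOT implemented in this pull request:
# params["spyt_distributions"] = {"local_path": "/opt/spyt"}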