
Spark on Kubernetes integration #1030

Closed
h4gen opened this issue Nov 20, 2018 · 24 comments

@h4gen
h4gen commented Nov 20, 2018

Hello everybody!

I am using pangeo as the configuration for my JupyterHub, but decided to post this issue here as I think it is not pangeo specific. As some of you may know, the current version of Spark (2.4) introduces PySpark support for Spark's new Kubernetes functionality. I tried to get it running on my cluster by adapting this tutorial. I know this is primarily a PySpark issue, but I thought it might be interesting to discuss here, as I can imagine it is relevant for other JupyterHub users too. Here is what I did:

  1. Adding this configuration to my cluster:
kind: Role
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  namespace: default
  name: spark-role
rules:
- apiGroups: [""]
  resources: ["pods"]
  verbs: ["get", "watch", "list", "edit", "create", "delete"]
---
kind: RoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: spark
  namespace: default
subjects:
- kind: User
  name: system:serviceaccount:pangeo:daskkubernetes
  apiGroup: ""
roleRef:
  kind: Role
  name: spark-role
  apiGroup: ""

This extends the rights of the daskkubernetes service account, which is necessary for pangeo to interact with dask properly.

  2. Creating a user pod from the current Jupyter PySpark docker image, which supports Spark 2.4

  3. Getting my master IP with kubectl cluster-info

  4. Creating a new SparkContext in the following way:

sc = SparkContext(master='k8s://https://<Master-IP>')

This is the output of the context:
[screenshot of the SparkContext output]
When I run kubectl get po --all-namespaces, I cannot see a new Spark pod running.

  5. Trying to create a dummy data frame with:
from pyspark.sql import SQLContext, Row

sqlc = SQLContext(sc)
l = [('Ankit',25),('Jalfaizy',22),('saurabh',20),('Bala',26)]
rdd = sc.parallelize(l)
people = rdd.map(lambda x: Row(name=x[0], age=int(x[1])))
schemaPeople = sqlc.createDataFrame(people)

The last line sadly gets stuck, and when I interrupt it this is the output:

---------------------------------------------------------------------------
KeyboardInterrupt                         Traceback (most recent call last)
<ipython-input-6-f2a7a0f2bef0> in <module>
      3 rdd = sc.parallelize(l)
      4 people = rdd.map(lambda x: Row(name=x[0], age=int(x[1])))
----> 5 schemaPeople = sqlc.createDataFrame(people)

/usr/local/spark/python/pyspark/sql/context.py in createDataFrame(self, data, schema, samplingRatio, verifySchema)
    305         Py4JJavaError: ...
    306         """
--> 307         return self.sparkSession.createDataFrame(data, schema, samplingRatio, verifySchema)
    308 
    309     @since(1.3)

/usr/local/spark/python/pyspark/sql/session.py in createDataFrame(self, data, schema, samplingRatio, verifySchema)
    744 
    745         if isinstance(data, RDD):
--> 746             rdd, schema = self._createFromRDD(data.map(prepare), schema, samplingRatio)
    747         else:
    748             rdd, schema = self._createFromLocal(map(prepare, data), schema)

/usr/local/spark/python/pyspark/sql/session.py in _createFromRDD(self, rdd, schema, samplingRatio)
    388         """
    389         if schema is None or isinstance(schema, (list, tuple)):
--> 390             struct = self._inferSchema(rdd, samplingRatio, names=schema)
    391             converter = _create_converter(struct)
    392             rdd = rdd.map(converter)

/usr/local/spark/python/pyspark/sql/session.py in _inferSchema(self, rdd, samplingRatio, names)
    359         :return: :class:`pyspark.sql.types.StructType`
    360         """
--> 361         first = rdd.first()
    362         if not first:
    363             raise ValueError("The first row in RDD is empty, "

/usr/local/spark/python/pyspark/rdd.py in first(self)
   1376         ValueError: RDD is empty
   1377         """
-> 1378         rs = self.take(1)
   1379         if rs:
   1380             return rs[0]

/usr/local/spark/python/pyspark/rdd.py in take(self, num)
   1358 
   1359             p = range(partsScanned, min(partsScanned + numPartsToTry, totalParts))
-> 1360             res = self.context.runJob(self, takeUpToNumLeft, p)
   1361 
   1362             items += res

/usr/local/spark/python/pyspark/context.py in runJob(self, rdd, partitionFunc, partitions, allowLocal)
   1049         # SparkContext#runJob.
   1050         mappedRDD = rdd.mapPartitions(partitionFunc)
-> 1051         sock_info = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)
   1052         return list(_load_from_socket(sock_info, mappedRDD._jrdd_deserializer))
   1053 

/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1253             proto.END_COMMAND_PART
   1254 
-> 1255         answer = self.gateway_client.send_command(command)
   1256         return_value = get_return_value(
   1257             answer, self.gateway_client, self.target_id, self.name)

/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in send_command(self, command, retry, binary)
    983         connection = self._get_connection()
    984         try:
--> 985             response = connection.send_command(command)
    986             if binary:
    987                 return response, self._create_connection_guard(connection)

/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in send_command(self, command)
   1150 
   1151         try:
-> 1152             answer = smart_decode(self.stream.readline()[:-1])
   1153             logger.debug("Answer received: {0}".format(answer))
   1154             if answer.startswith(proto.RETURN_MESSAGE):

/opt/conda/lib/python3.6/socket.py in readinto(self, b)
    584         while True:
    585             try:
--> 586                 return self._sock.recv_into(b)
    587             except timeout:
    588                 self._timeout_occurred = True

Referring to the tutorial, I think the SparkContext needs more information to run correctly. Sadly, it does not throw any errors when it is created. Is there anybody with more Spark/Kubernetes knowledge interested in trying this and sharing insights?
Edit:
The main problem seems to be that, according to the PySpark API, there is no way to provide the necessary information to the SparkContext.

Thank you very much!

@h4gen
Author

h4gen commented Nov 21, 2018

Okay, I just figured out that it is necessary to build the executor Docker images from the executing environment (in this case the Jupyter PySpark Docker image) and to provide them to the SparkContext, probably via the SparkConf object. I'll keep you posted.

@consideRatio
Member

Thanks for documenting what you learn @h4gen!

@h4gen
Author

h4gen commented Nov 21, 2018

So, some progress over here. I did the following to build the images and provide the necessary information to the SparkContext:

  1. Run the jupyter pyspark image:
docker run -i --rm -e GRANT_SUDO=yes \
  -v /var/run/docker.sock:/var/run/docker.sock \
  jupyter/pyspark-notebook:5b2160dfd919  # Tag with Spark 2.4; mounting docker.sock exposes the host's Docker daemon

  2. Enter the running container to build executor images from the environment:
docker exec -it -u root <CONTAINER-ID> bash
  3. Install Docker in the container following this example

  4. Build the executor images from the environment following the Spark on Kubernetes example

cd $SPARK_HOME
./bin/docker-image-tool.sh -r <repo> -t my-tag build 
./bin/docker-image-tool.sh -r <repo> -t my-tag push

Feel free to skip these steps and use my pre-built images from Docker Hub to test this out yourself (assuming I made no mistakes so far):

idalab/spark
idalab/spark-r
idalab/spark-py
  5. This information is then provided to a SparkConf on the user pod:
from pyspark import SparkConf

conf = SparkConf()
conf.setMaster('k8s://https://<MASTER-IP>')
conf.set('spark.kubernetes.container.image', 'idalab/spark-py:spark')
conf.set('spark.submit.deployMode', 'cluster')
conf.set('spark.executor.instances', 2)
conf.setAppName('spark-k8s-test')
  6. And the conf is provided to the SparkContext:
sc = SparkContext(conf=conf)

Now I get the following error:

---------------------------------------------------------------------------
Exception                                 Traceback (most recent call last)
<ipython-input-9-66f9c693822e> in <module>
----> 1 sc = SparkContext(conf=conf)

/usr/local/spark/python/pyspark/context.py in __init__(self, master, appName, sparkHome, pyFiles, environment, batchSize, serializer, conf, gateway, jsc, profiler_cls)
    113         """
    114         self._callsite = first_spark_call() or CallSite(None, None, None)
--> 115         SparkContext._ensure_initialized(self, gateway=gateway, conf=conf)
    116         try:
    117             self._do_init(master, appName, sparkHome, pyFiles, environment, batchSize, serializer,

/usr/local/spark/python/pyspark/context.py in _ensure_initialized(cls, instance, gateway, conf)
    296         with SparkContext._lock:
    297             if not SparkContext._gateway:
--> 298                 SparkContext._gateway = gateway or launch_gateway(conf)
    299                 SparkContext._jvm = SparkContext._gateway.jvm
    300 

/usr/local/spark/python/pyspark/java_gateway.py in launch_gateway(conf)
     92 
     93             if not os.path.isfile(conn_info_file):
---> 94                 raise Exception("Java gateway process exited before sending its port number")
     95 
     96             with open(conn_info_file, "rb") as info:

Exception: Java gateway process exited before sending its port number

A short Google query leads to the assumption that this is a sudo problem in the user pod. Will investigate further in the next few days.

Cheers!

@h4gen
Author

h4gen commented Nov 27, 2018

Hello everyone,

I further investigated the error and came up with a running configuration.
I am running the following code on my user pod to get spark running:

from pyspark import *
import os

# Config Spark
conf = SparkConf()
conf.setMaster('k8s://https://<MASTER_IP>:443') # The port is important. Otherwise it won't run.
conf.set('spark.kubernetes.container.image', 'idalab/spark-py:spark') # Provide the image for the executor pods
conf.set('spark.submit.deployMode', 'client') # Only client mode is possible
conf.set('spark.executor.instances', '2') # Set the number of executor pods
conf.setAppName('pyspark-shell')
conf.set('spark.driver.host', '<USER_POD_IP>') # This is the IP of the user pod in the K8s cluster
os.environ['PYSPARK_PYTHON'] = 'python3' # Needs to be explicitly provided as env. Otherwise workers run Python 2.7
os.environ['PYSPARK_DRIVER_PYTHON'] = 'python3' # Same

# Create context
sc = SparkContext(conf=conf) 
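
As a quick check that the executor pods actually come up, any small job that forces a computation should do (just a sketch):

# Tiny job to confirm the executors are reachable; while it runs, the
# executor pods should also show up in `kubectl get po --all-namespaces`.
print(sc.parallelize(range(1000)).map(lambda x: x * x).sum())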

So far everything I have tried has worked pretty nicely. I will update this if I encounter any further problems.

Cheers!

@consideRatio
Member

Wieeee, thank you so much @h4gen! This may help me out personally a lot in the future!

@h4gen changed the title from "PySpark Kubernetes integration" to "Spark on Kubernetes integration" on Nov 28, 2018
@h4gen
Author

h4gen commented Nov 28, 2018

A few further thoughts on this.

  1. Right now the cluster admin has to provide the user with the <MASTER_IP> of the cluster so that they can connect and create the executor pods. Is there a more convenient way for the user to obtain this IP? As far as I understand, the pods are assigned a DNS name like jupyter-username. Is there something similar for the K8s master? Or would it be possible for the hub to expose the <MASTER_IP> as an environment variable in the user pod?

  2. Currently the UI is not accessible. As far as I can see, pangeo uses nbserverproxy to expose the dask dashboard to the user. I think this would also be easy to do for the Jupyter Spark image. The question is whether the dashboard link in the SparkContext can be manipulated by configuration (see the sketch below) to make it more easily accessible for the user. Right now the user would have to type in the correct link themselves (which is annoying to explain to a lot of users).
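
If the SparkConf from my earlier comment is reused, a sketch of what I have in mind would be something like the following (JUPYTERHUB_SERVICE_PREFIX is set by the hub in the user pod; whether Spark then renders a usable link is exactly what I am unsure about):

import os

# Hypothetical: point the UI links at the notebook server's proxy path,
# e.g. /user/<name>/proxy/4040, instead of <POD_IP>:4040.
conf.set('spark.ui.proxyBase',
         os.environ.get('JUPYTERHUB_SERVICE_PREFIX', '/') + 'proxy/4040')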

@consideRatio
Member

Some loose thoughts, no clear answer (writing from mobile):

The hub.extraConfig can expand the dictionary found in c.KubeSpawner.environment; this will influence the env of spawned user pods if you need to do it programmatically.

But you can also use the chart's singleuser.extraEnv to set it directly from the chart config.

If you have a k8s service pointing to some pod, you can reach it with mysvc.mynamespace.svc.cluster.local as a URI, by the way.

Note that jupyter-username is a pod name, and you cannot access that as a network identifier like google.se. If you needed that, you would have to create a service for each user pod, pointing to a pod with certain labels.

I think the master may always be reached with a fixed IP on GKE and other managed k8s provided by some cloud provider, by the way.

@dsludwig

dsludwig commented Nov 28, 2018

Kubernetes creates the environment variable KUBERNETES_SERVICE_HOST, which contains the master IP (the IP for the kubernetes service, actually).

See: https://kubernetes.io/docs/concepts/services-networking/service/#discovering-services

To get the pod IP, it is probably most convenient to use Kubernetes' downward API: https://kubernetes.io/docs/tasks/inject-data-application/environment-variable-expose-pod-information/#use-pod-fields-as-values-for-environment-variables

env:
  - name: MY_POD_IP
    valueFrom:
      fieldRef:
        fieldPath: status.podIP

(You could also just call hostname -i, or write the equivalent Python code).
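
Putting that together in Python could look roughly like this (a sketch; it uses the env vars Kubernetes injects into every pod and resolves the pod IP locally instead of going through the downward API):

import os
import socket
from pyspark import SparkConf

conf = SparkConf()
# Master URL built from the service env vars Kubernetes injects into every pod.
conf.setMaster('k8s://https://{}:{}'.format(
    os.environ['KUBERNETES_SERVICE_HOST'],
    os.environ.get('KUBERNETES_SERVICE_PORT', '443')))
# Pod IP, i.e. the Python equivalent of `hostname -i`.
conf.set('spark.driver.host', socket.gethostbyname(socket.gethostname()))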

@consideRatio
Member

Slam dunk @dsludwig ! :D

@betatim
Member

betatim commented Nov 28, 2018

This is a cool write up with lots of useful bits of information. Could you be persuaded to write it up once more now that you know what "the answer" is and then post it on https://discourse.jupyter.org/c/jupyterhub/z2jh-k8s? Our thinking is that the discourse forum is a lot more discoverable (and better indexed by google?) than GitHub issues. Something to ponder.

@h4gen
Author

h4gen commented Nov 28, 2018

@dsludwig Thank you for the information. That was exactly what I was looking for. @betatim Sure thing! Will do it in the next few days.

@h4gen
Author

h4gen commented Nov 30, 2018

Before I write the documentation, I would like to ask again whether somebody can give me a hint regarding the Spark UI problem. I assumed it would work with nbserverproxy, but it does not :( I'll sum up all the information I have:

  1. I create the context which gives me the link to the UI.
  2. The link created by Spark is obviously not accessible on the hub as it points to <POD_IP>:4040
  3. I checked on a local version of the user image whether the UI is accessible. That works fine!
  4. As I want to access the pod's port via the browser, I installed nbserverproxy in the pod to make it accessible via .../user/proxy/4040, but this does not seem to work.
  5. Other ports are accessible via this method, so I assume nbserverproxy is working correctly.
  6. This is the output of netstat -pl:
Active Internet connections (only servers)
Proto Recv-Q Send-Q Local Address           Foreign Address         State       PID/Program name
tcp        0      0 localhost:51084         0.0.0.0:*               LISTEN      23/python
tcp        0      0 localhost:42415         0.0.0.0:*               LISTEN      26/python
tcp        0      0 localhost:58607         0.0.0.0:*               LISTEN      26/python
tcp        0      0 localhost:39601         0.0.0.0:*               LISTEN      23/python
tcp        0      0 localhost:34996         0.0.0.0:*               LISTEN      27/python
tcp        0      0 localhost:41208         0.0.0.0:*               LISTEN      27/python
tcp        0      0 0.0.0.0:8888            0.0.0.0:*               LISTEN      7/python
tcp        0      0 localhost:58553         0.0.0.0:*               LISTEN      23/python
tcp        0      0 jupyter-hagen:45243     0.0.0.0:*               LISTEN      74/java
tcp        0      0 localhost:38241         0.0.0.0:*               LISTEN      23/python
tcp        0      0 localhost:35746         0.0.0.0:*               LISTEN      27/python
tcp        0      0 localhost:38050         0.0.0.0:*               LISTEN      26/python
tcp        0      0 localhost:52964         0.0.0.0:*               LISTEN      26/python
tcp        0      0 localhost:60869         0.0.0.0:*               LISTEN      26/python
tcp        0      0 localhost:59910         0.0.0.0:*               LISTEN      27/python
tcp        0      0 jupyter-hagen:42343     0.0.0.0:*               LISTEN      74/java
tcp        0      0 localhost:47911         0.0.0.0:*               LISTEN      26/python
tcp        0      0 0.0.0.0:4040            0.0.0.0:*               LISTEN      74/java
tcp        0      0 localhost:35305         0.0.0.0:*               LISTEN      27/python
tcp        0      0 localhost:40810         0.0.0.0:*               LISTEN      23/python
tcp        0      0 localhost:36362         0.0.0.0:*               LISTEN      23/python
tcp        0      0 localhost:43627         0.0.0.0:*               LISTEN      74/java
tcp        0      0 localhost:45067         0.0.0.0:*               LISTEN      27/python
tcp        0      0 localhost:57547         0.0.0.0:*               LISTEN      26/python
Active UNIX domain sockets (only servers)
Proto RefCnt Flags       Type       State         I-Node   PID/Program name     Path

One can see that the exposed local address has a different format than the other ones, which are accessible.
  7. This output was produced with the environment variable '_JAVA_OPTIONS' set to "-Djava.net.preferIPv4Stack=true", as I thought it might be an IPv6 problem (IPv6 seems to be the default for Java), but that did not resolve the issue.

Any ideas on this? I'm a bit lost. Thank you!

@ryanlovett
Collaborator

@h4gen Can you submit this as an issue to nbserverproxy so we can discuss it there? A couple of simple things to try:

  • Make sure nbserverproxy is installed before you start your notebook server.
  • Append a slash to the proxy URL, e.g. .../{user}/proxy/4040/

Otherwise, please post the error you're seeing to the nbserverproxy issue.

@easel

easel commented Dec 2, 2018

Not entirely sure if pangeo is sufficiently similar for this to be useful or not, but I'll throw it up here for the record. I've got Spark 2.4 on Kubernetes running nicely with the standard Toree kernel and version 0.7 of the JupyterHub Helm chart:

Create a config.yaml:

singleuser:
  serviceAccountName: spark
  image:
    name: jupyter/all-spark-notebook
    tag: 14fdfbf9cfc1
  extraEnv:
    SPARK_OPTS: >-
      --deploy-mode=client
      --master k8s://https://kubernetes.default.svc
      --conf spark.kubernetes.namespace=`cat /var/run/secrets/kubernetes.io/serviceaccount/namespace`
      --conf spark.driver.pod.name=${HOSTNAME}
      --conf spark.driver.host=`hostname -i`
      --conf spark.driver.port=19998
      --conf spark.kubernetes.container.image=docker.io/7thsense/spark:spark24  
      --conf spark.kubernetes.authenticate.oauthTokenFile=/var/run/secrets/kubernetes.io/serviceaccount/token
      --conf spark.kubernetes.authenticate.caCertFile=/var/run/secrets/kubernetes.io/serviceaccount/ca.crt 

Then install:
helm upgrade jhub --install jupyterhub/jupyterhub --namespace jhub --version 0.7.0 --values config.yaml

I've elided the spark RBAC setup but it's the usual stuff from the spark-on-kubernetes docs.

At least for me, figuring out how to use the service account credentials (and that they were needed at all) and getting the pod context configuration (namespace, pod name, IP) into the environment variable was tricky, since most of the examples I've seen were using downward config maps, which don't seem to "fit" into extraVars.

@h4gen
Author

h4gen commented Dec 2, 2018

@easel This is super helpful! A few questions regarding your setup:

  1. Did you also create your executor images from your environment as described here, or do you have a more convenient way?
  2. Is your Spark UI working with this setup?
  3. How exactly do you evaluate your SPARK_OPTS at container start? I think this is pretty elegant.

@easel

easel commented Dec 2, 2018

Hi @h4gen, answers per above:

  1. I just built the images per the "Spark on Kubernetes" docs at https://spark.apache.org/docs/latest/running-on-kubernetes.html#docker-images on my Mac -- the URLs above point to a public repo, though, so you could just use them.

  2. The Spark UI works, but you have to forward the port into the single-user pod, with something like kubectl port-forward pod/jupyter-erik 4040:4040. It would be cool to get it wired back into the hub or an ingress somehow, but I haven't done anything like that.

  3. I may not fully understand your question. It just happens "magically": the extraVars pushes environment variables into the kernel and Toree picks them up. I guess maybe I got the idea from https://jupyter-docker-stacks.readthedocs.io/en/latest/using/specifics.html#in-an-apache-toree-scala-notebook ?

@h4gen
Author

h4gen commented Dec 3, 2018

Thanks for the answers @easel. Few more questions:

  1. Do you have to forward every user pod by hand, or is there a way to forward them all automatically? Maybe "Spark UI not accessible" jupyter-server-proxy#57 is also interesting for you.

  2. I see now how the variables are set. But weirdly it does not work for me, because the shell commands are not evaluated but set as plain strings. Example:
    I set:

      SPARK_OPTS: >-
        --deploy-mode=client
        --master=k8s://https://kubernetes.default.svc
        --conf spark.driver.host=$(hostname -i)
        --conf spark.kubernetes.container.image=idalab/spark-py:spark
        --conf spark.ui.proxyBase=${JUPYTERHUB_SERVICE_PREFIX}proxy/4040
...

but on the pod this results in an environment variable like:

'SPARK_OPTS': '--deploy-mode=client --master=k8s://https://kubernetes.default.svc --conf spark.driver.host=$(hostname -i) --conf spark.kubernetes.container.image=idalab/spark-py:spark --conf spark.ui.proxyBase=${JUPYTERHUB_SERVICE_PREFIX}proxy/4040'

Does anybody know why this is the case?

Thank you!

@h4gen
Author

h4gen commented Dec 5, 2018

@betatim Posted consolidated results here.

@metrofun

metrofun commented Mar 1, 2019

Having the same issue with spark.driver.host. How do you set this dynamic value before the Docker ENTRYPOINT executes? I am using KubeSpawner, if that is relevant.

@stevenstetzler
Contributor

@metrofun The best way I found to set this dynamically is to write to a spark-defaults.conf file when the container starts. In your JupyterHub configuration:

jupyterhub:
  singleuser:
    lifecycleHooks:
      postStart:
        exec:
          command:
            - "/bin/sh"
            - "-c"
            - |
              echo "spark.driver.host=$(hostname -i)" >> $SPARK_HOME/conf/spark-defaults.conf

@stevenstetzler
Contributor

stevenstetzler commented Apr 26, 2019

@h4gen were you able to find a way to automatically set SPARK_PUBLIC_DNS? I am running into the same issue where

jupyterhub:
  singleuser:
    extraEnv:
      SPARK_PUBLIC_DNS: my.url.com${JUPYTERHUB_SERVICE_PREFIX}proxy/4040/jobs/

does not evaluate correctly, and setting environment variables in the postStart lifecycleHooks for the container doesn't seem to work either (it will result in an empty environment variable):

jupyterhub:
  singleuser:
    lifecycleHooks:
      postStart:
        exec:
          command:
            - "/bin/sh"
            - "-c"
            - |
              export SPARK_PUBLIC_DNS="my.url.com${JUPYTERHUB_SERVICE_PREFIX}proxy/4040/jobs/"

I was thinking of opening a separate issue about setting environment variables that require other environment variables, but wanted to see if you had a solution first.

Edit: Of course the second solution won't work, as that will only set the environment variable in the postStart shell. My workaround is to set SPARK_PUBLIC_DNS in the single-user image's entrypoint script. I've opened an issue about this to see if this functionality is possible with extraEnv: #1255
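
Another workaround that might be enough for notebook use is to compose the value in Python before the SparkContext (and therefore the JVM) is created, since the JVM inherits the driver process environment at launch (a sketch; my.url.com is a placeholder):

import os

# Hypothetical: build SPARK_PUBLIC_DNS from the hub-provided prefix at runtime,
# before any SparkContext is created in this kernel.
os.environ['SPARK_PUBLIC_DNS'] = (
    'my.url.com' + os.environ['JUPYTERHUB_SERVICE_PREFIX'] + 'proxy/4040/jobs/')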

@jasonsmithio

@h4gen have you updated this at all given the changes over the past year or so?

@h4gen
Author

h4gen commented Feb 6, 2020

Hi @thejaysmith. Sorry, no updates from my side. We switched to Kubeflow.

@consideRatio
Member

@h4gen thank you so much for writing this up publicly; too much work is repeated when we work in isolation! ❤️

At this point, I'll go ahead and close this issue as it's becoming stale and doesn't involve an action point - it is still very findable with search engines, though.

Lots of love from Sweden!
