diff --git a/samples/snippets/create_cluster.py b/samples/snippets/create_cluster.py
index d893a142..a396ddc6 100644
--- a/samples/snippets/create_cluster.py
+++ b/samples/snippets/create_cluster.py
@@ -14,22 +14,29 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+# This sample walks a user through creating a Cloud Dataproc cluster using
+# the Python client library.
+
+# [START dataproc_create_cluster]
+from google.cloud import dataproc_v1 as dataproc
+
 
 def create_cluster(project_id, region, cluster_name):
-    # [START dataproc_create_cluster]
-    from google.cloud import dataproc_v1 as dataproc
+    """This sample walks a user through creating a Cloud Dataproc cluster
+    using the Python client library.
 
-    # TODO(developer): Uncomment and set the following variables
-    # project_id = 'YOUR_PROJECT_ID'
-    # region = 'YOUR_CLUSTER_REGION'
-    # cluster_name = 'YOUR_CLUSTER_NAME'
+    Args:
+        project_id (string): Project to use for creating resources.
+        region (string): Region where the resources should live.
+        cluster_name (string): Name to use for creating a cluster.
+    """
 
-    # Create a client with the endpoint set to the desired cluster region
-    client = dataproc.ClusterControllerClient(client_options={
+    # Create a client with the endpoint set to the desired cluster region.
+    cluster_client = dataproc.ClusterControllerClient(client_options={
         'api_endpoint': '{}-dataproc.googleapis.com:443'.format(region)
     })
 
-    # Create the cluster config
+    # Create the cluster config.
     cluster = {
         'project_id': project_id,
         'cluster_name': cluster_name,
@@ -45,10 +52,10 @@ def create_cluster(project_id, region, cluster_name):
         }
     }
 
-    # Create the cluster
-    operation = client.create_cluster(project_id, region, cluster)
+    # Create the cluster.
+    operation = cluster_client.create_cluster(project_id, region, cluster)
     result = operation.result()
 
-    # Output a success message
+    # Output a success message.
     print('Cluster created successfully: {}'.format(result.cluster_name))
     # [END dataproc_create_cluster]
diff --git a/samples/snippets/create_cluster_test.py b/samples/snippets/create_cluster_test.py
index d58a1d0b..04274579 100644
--- a/samples/snippets/create_cluster_test.py
+++ b/samples/snippets/create_cluster_test.py
@@ -20,20 +20,23 @@
 
 import create_cluster
 
+
 PROJECT_ID = os.environ['GCLOUD_PROJECT']
 REGION = 'us-central1'
-CLUSTER_NAME = 'test-cluster-{}'.format(str(uuid.uuid4()))
+CLUSTER_NAME = 'py-cc-test-{}'.format(str(uuid.uuid4()))
 
 
 @pytest.fixture(autouse=True)
 def teardown():
     yield
 
-    client = dataproc.ClusterControllerClient(client_options={
+    cluster_client = dataproc.ClusterControllerClient(client_options={
         'api_endpoint': '{}-dataproc.googleapis.com:443'.format(REGION)
     })
     # Client library function
-    client.delete_cluster(PROJECT_ID, REGION, CLUSTER_NAME)
+    operation = cluster_client.delete_cluster(PROJECT_ID, REGION, CLUSTER_NAME)
+    # Wait for the cluster to be deleted.
+    operation.result()
 
 
 def test_cluster_create(capsys):
diff --git a/samples/snippets/quickstart/quickstart.py b/samples/snippets/quickstart/quickstart.py
new file mode 100644
index 00000000..fcbda882
--- /dev/null
+++ b/samples/snippets/quickstart/quickstart.py
@@ -0,0 +1,129 @@
+#!/usr/bin/env python
+
+# Copyright 2019 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# [START dataproc_quickstart]
+import time
+
+from google.cloud import dataproc_v1 as dataproc
+from google.cloud import storage
+
+
+def quickstart(project_id, region, cluster_name, job_file_path):
+    """This quickstart sample walks a user through creating a Cloud Dataproc
+    cluster, submitting a PySpark job from Google Cloud Storage to the
+    cluster, reading the output of the job, and deleting the cluster, all
+    using the Python client library.
+
+    Args:
+        project_id (string): Project to use for creating resources.
+        region (string): Region where the resources should live.
+        cluster_name (string): Name to use for creating a cluster.
+        job_file_path (string): Job in GCS to execute against the cluster.
+    """
+
+    # Create the cluster client.
+    cluster_client = dataproc.ClusterControllerClient(client_options={
+        'api_endpoint': '{}-dataproc.googleapis.com:443'.format(region)
+    })
+
+    # Create the cluster config.
+    cluster = {
+        'project_id': project_id,
+        'cluster_name': cluster_name,
+        'config': {
+            'master_config': {
+                'num_instances': 1,
+                'machine_type_uri': 'n1-standard-1'
+            },
+            'worker_config': {
+                'num_instances': 2,
+                'machine_type_uri': 'n1-standard-1'
+            }
+        }
+    }
+
+    # Create the cluster.
+    operation = cluster_client.create_cluster(project_id, region, cluster)
+    result = operation.result()
+
+    print('Cluster created successfully: {}'.format(result.cluster_name))
+
+    # Create the job client.
+    job_client = dataproc.JobControllerClient(client_options={
+        'api_endpoint': '{}-dataproc.googleapis.com:443'.format(region)
+    })
+
+    # Create the job config.
+    job = {
+        'placement': {
+            'cluster_name': cluster_name
+        },
+        'pyspark_job': {
+            'main_python_file_uri': job_file_path
+        }
+    }
+
+    job_response = job_client.submit_job(project_id, region, job)
+    job_id = job_response.reference.job_id
+
+    print('Submitted job "{}".'.format(job_id))
+
+    # Terminal states for a job.
+    terminal_states = {
+        dataproc.types.JobStatus.ERROR,
+        dataproc.types.JobStatus.CANCELLED,
+        dataproc.types.JobStatus.DONE
+    }
+
+    # Create a timeout such that the job gets cancelled if not in a
+    # terminal state after a fixed period of time.
+    timeout_seconds = 600
+    time_start = time.time()
+
+    # Wait for the job to complete.
+    while job_response.status.state not in terminal_states:
+        if time.time() > time_start + timeout_seconds:
+            job_client.cancel_job(project_id, region, job_id)
+            print('Job {} timed out after {} seconds.'.format(
+                job_id, timeout_seconds))
+            break
+
+        # Poll for job termination once a second.
+        time.sleep(1)
+        job_response = job_client.get_job(project_id, region, job_id)
+
+    # Cloud Dataproc job output is saved to a GCS bucket allocated to the job.
+    cluster_info = cluster_client.get_cluster(
+        project_id, region, cluster_name)
+
+    storage_client = storage.Client()
+    bucket = storage_client.get_bucket(cluster_info.config.config_bucket)
+    output_blob = (
+        'google-cloud-dataproc-metainfo/{}/jobs/{}/driveroutput.000000000'
+        .format(cluster_info.cluster_uuid, job_id))
+    output = bucket.blob(output_blob).download_as_string()
+
+    print('Job {} finished with state {}:\n{}'.format(
+        job_id,
+        job_response.status.State.Name(job_response.status.state),
+        output))
+
+    # Delete the cluster once the job has terminated.
+    operation = cluster_client.delete_cluster(project_id, region, cluster_name)
+    operation.result()
+
+    print('Cluster {} successfully deleted.'.format(cluster_name))
+    # [END dataproc_quickstart]
diff --git a/samples/snippets/quickstart/quickstart_test.py b/samples/snippets/quickstart/quickstart_test.py
new file mode 100644
index 00000000..df488d0a
--- /dev/null
+++ b/samples/snippets/quickstart/quickstart_test.py
@@ -0,0 +1,72 @@
+# Copyright 2019 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import uuid
+
+import pytest
+
+from google.cloud import dataproc_v1 as dataproc
+from google.cloud import storage
+
+import quickstart
+
+
+PROJECT_ID = os.environ['GCLOUD_PROJECT']
+REGION = 'us-central1'
+CLUSTER_NAME = 'py-qs-test-{}'.format(str(uuid.uuid4()))
+STAGING_BUCKET = 'py-dataproc-qs-bucket-{}'.format(str(uuid.uuid4()))
+JOB_FILE_NAME = 'sum.py'
+JOB_FILE_PATH = 'gs://{}/{}'.format(STAGING_BUCKET, JOB_FILE_NAME)
+SUM_CODE = (
+    "import pyspark\n"
+    "sc = pyspark.SparkContext()\n"
+    "rdd = sc.parallelize((1,2,3,4,5))\n"
+    "sum = rdd.reduce(lambda x, y: x + y)\n"
+)
+
+
+@pytest.fixture(autouse=True)
+def setup_teardown():
+    storage_client = storage.Client()
+    bucket = storage_client.create_bucket(STAGING_BUCKET)
+    blob = bucket.blob(JOB_FILE_NAME)
+    blob.upload_from_string(SUM_CODE)
+
+    yield
+
+    cluster_client = dataproc.ClusterControllerClient(client_options={
+        'api_endpoint': '{}-dataproc.googleapis.com:443'.format(REGION)
+    })
+
+    # The quickstart sample deletes the cluster, but if the test fails
+    # before cluster deletion occurs, the cluster is cleaned up here.
+    clusters = cluster_client.list_clusters(PROJECT_ID, REGION)
+
+    for cluster in clusters:
+        if cluster.cluster_name == CLUSTER_NAME:
+            cluster_client.delete_cluster(PROJECT_ID, REGION, CLUSTER_NAME)
+
+    blob.delete()
+    bucket.delete()
+
+
+def test_quickstart(capsys):
+    quickstart.quickstart(PROJECT_ID, REGION, CLUSTER_NAME, JOB_FILE_PATH)
+
+    out, _ = capsys.readouterr()
+    assert 'Cluster created successfully' in out
+    assert 'Submitted job' in out
+    assert 'finished with state DONE:' in out
+    assert 'successfully deleted' in out
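
For anyone exercising the new quickstart by hand rather than through pytest, the sketch below shows one way to wire it to a command line. This harness is not part of the diff above; the module name run_quickstart.py and the flag names are illustrative assumptions, and it simply forwards its arguments to the quickstart() function defined in quickstart.py.

#!/usr/bin/env python
# run_quickstart.py -- hypothetical harness, not included in this change.
# Assumes it sits next to quickstart.py so the import below resolves.
import argparse

from quickstart import quickstart

if __name__ == '__main__':
    parser = argparse.ArgumentParser(
        description='Create a Dataproc cluster, run a PySpark job from '
                    'GCS, print its driver output, and delete the cluster.')
    parser.add_argument('--project_id', required=True,
                        help='Project to use for creating resources.')
    parser.add_argument('--region', required=True,
                        help='Region where the resources should live.')
    parser.add_argument('--cluster_name', required=True,
                        help='Name to use for creating a cluster.')
    parser.add_argument('--job_file_path', required=True,
                        help='Job in GCS to run, e.g. gs://my-bucket/sum.py.')
    args = parser.parse_args()
    quickstart(args.project_id, args.region,
               args.cluster_name, args.job_file_path)

A sample invocation, with placeholder values:

    python run_quickstart.py --project_id=my-project --region=us-central1 \
        --cluster_name=my-cluster --job_file_path=gs://my-bucket/sum.py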