feat: dataproc quickstart sample added and create_cluster updated #2629

Merged · 19 commits · Jan 9, 2020
31 changes: 19 additions & 12 deletions dataproc/create_cluster.py
@@ -14,22 +14,29 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+# This sample walks a user through creating a Cloud Dataproc cluster using
+# the Python client library.
+
+# [START dataproc_create_cluster]
+from google.cloud import dataproc_v1 as dataproc
 
 
 def create_cluster(project_id, region, cluster_name):
-    # [START dataproc_create_cluster]
-    from google.cloud import dataproc_v1 as dataproc
+    """This sample walks a user through creating a Cloud Dataproc cluster
+    using the Python client library.
 
-    # TODO(developer): Uncomment and set the following variables
-    # project_id = 'YOUR_PROJECT_ID'
-    # region = 'YOUR_CLUSTER_REGION'
-    # cluster_name = 'YOUR_CLUSTER_NAME'
+    Args:
+        project_id (string): Project to use for creating resources.
+        region (string): Region where the resources should live.
+        cluster_name (string): Name to use for creating a cluster.
+    """
 
-    # Create a client with the endpoint set to the desired cluster region
-    client = dataproc.ClusterControllerClient(client_options={
+    # Create a client with the endpoint set to the desired cluster region.
+    cluster_client = dataproc.ClusterControllerClient(client_options={
         'api_endpoint': '{}-dataproc.googleapis.com:443'.format(region)
     })
 
-    # Create the cluster config
+    # Create the cluster config.
    cluster = {
         'project_id': project_id,
         'cluster_name': cluster_name,
@@ -45,10 +52,10 @@ def create_cluster(project_id, region, cluster_name):
         }
     }
 
-    # Create the cluster
-    operation = client.create_cluster(project_id, region, cluster)
+    # Create the cluster.
+    operation = cluster_client.create_cluster(project_id, region, cluster)
     result = operation.result()
 
-    # Output a success message
+    # Output a success message.
     print('Cluster created successfully: {}'.format(result.cluster_name))
     # [END dataproc_create_cluster]
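A note for readers trying the sample outside this repo: the snippet below is a minimal, hypothetical driver showing one way to call the updated function. The argparse flags and the import path are assumptions for illustration, not part of this PR.

# Hypothetical driver for the updated create_cluster sample (illustrative only).
import argparse

from create_cluster import create_cluster

if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Create a Cloud Dataproc cluster.')
    parser.add_argument('--project_id', required=True, help='Project to create the cluster in.')
    parser.add_argument('--region', required=True, help='Region for the cluster, e.g. us-central1.')
    parser.add_argument('--cluster_name', required=True, help='Name of the new cluster.')
    args = parser.parse_args()
    create_cluster(args.project_id, args.region, args.cluster_name)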
9 changes: 6 additions & 3 deletions dataproc/create_cluster_test.py
@@ -20,20 +20,23 @@
 
 import create_cluster
 
+
 PROJECT_ID = os.environ['GCLOUD_PROJECT']
 REGION = 'us-central1'
-CLUSTER_NAME = 'test-cluster-{}'.format(str(uuid.uuid4()))
+CLUSTER_NAME = 'py-cc-test-{}'.format(str(uuid.uuid4()))
 
 
 @pytest.fixture(autouse=True)
 def teardown():
     yield
 
-    client = dataproc.ClusterControllerClient(client_options={
+    cluster_client = dataproc.ClusterControllerClient(client_options={
         'api_endpoint': '{}-dataproc.googleapis.com:443'.format(REGION)
     })
     # Client library function
-    client.delete_cluster(PROJECT_ID, REGION, CLUSTER_NAME)
+    operation = cluster_client.delete_cluster(PROJECT_ID, REGION, CLUSTER_NAME)
+    # Wait for cluster to delete
+    operation.result()
 
 
 def test_cluster_create(capsys):
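For anyone running this test locally: it reads the target project from the GCLOUD_PROJECT environment variable and generates a unique cluster name per run. Below is a minimal sketch of a local invocation, assuming pytest is installed and application-default credentials are configured; the project ID is a placeholder.

# Hypothetical local runner for create_cluster_test.py (illustrative only).
import os
import sys

import pytest

os.environ.setdefault('GCLOUD_PROJECT', 'your-project-id')  # placeholder project
sys.exit(pytest.main(['-v', 'create_cluster_test.py']))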
128 changes: 128 additions & 0 deletions dataproc/quickstart/quickstart.py
@@ -0,0 +1,128 @@
#!/usr/bin/env python

# Copyright 2019 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# [START dataproc_quickstart]
import time

from google.cloud import dataproc_v1 as dataproc
from google.cloud import storage


def quickstart(project_id, region, cluster_name, job_file_path):
"""This quickstart sample walks a user through creating a Cloud Dataproc
cluster, submitting a PySpark job from Google Cloud Storage to the
cluster, reading the output of the job and deleting the cluster, all
using the Python client library.

Args:
project_id (string): Project to use for creating resources.
region (string): Region where the resources should live.
cluster_name (string): Name to use for creating a cluster.
job_file_path (string): Job in GCS to execute against the cluster.
"""

# Create the cluster client.
cluster_client = dataproc.ClusterControllerClient(client_options={
'api_endpoint': '{}-dataproc.googleapis.com:443'.format(region)
})

# Create the cluster config.
cluster = {
'project_id': project_id,
'cluster_name': cluster_name,
'config': {
'master_config': {
'num_instances': 1,
'machine_type_uri': 'n1-standard-1'
},
'worker_config': {
'num_instances': 2,
'machine_type_uri': 'n1-standard-1'
}
}
}

# Create the cluster.
operation = cluster_client.create_cluster(project_id, region, cluster)
result = operation.result()

print('Cluster created successfully: {}'.format(result.cluster_name))

# Create the job client.
job_client = dataproc.JobControllerClient(client_options={
'api_endpoint': '{}-dataproc.googleapis.com:443'.format(region)
})

# Create the job config.
job = {
'placement': {
'cluster_name': cluster_name
},
'pyspark_job': {
'main_python_file_uri': job_file_path
}
}

job_response = job_client.submit_job(project_id, region, job)
job_id = job_response.reference.job_id

print('Submitted job \"{}\".'.format(job_id))

# Termimal states for a job.
terminal_states = {
dataproc.types.JobStatus.ERROR,
dataproc.types.JobStatus.CANCELLED,
dataproc.types.JobStatus.DONE
}

# Create a timeout such that the job gets cancelled if not in a
# terminal state after a fixed period of time.
timeout_seconds = 600
time_start = time.time()

# Wait for the job to complete.
while job_response.status.state not in terminal_states:
if time.time() > time_start + timeout_seconds:
job_client.cancel_job(project_id, region, job_id)
print('Job {} timed out after threshold of {} seconds.'.format(
job_id, timeout_seconds))

# Poll for job termination once a second.
time.sleep(1)
job_response = job_client.get_job(project_id, region, job_id)

# Cloud Dataproc job output gets saved to a GCS bucket allocated to it.
cluster_info = cluster_client.get_cluster(
project_id, region, cluster_name)

storage_client = storage.Client()
bucket = storage_client.get_bucket(cluster_info.config.config_bucket)
output_blob = (
'google-cloud-dataproc-metainfo/{}/jobs/{}/driveroutput.000000000'
.format(cluster_info.cluster_uuid, job_id))
output = bucket.blob(output_blob).download_as_string()

print('Job {} finished with state {}:\n{}'.format(
job_id,
job_response.status.State.Name(job_response.status.state),
output))

# Delete the cluster once the job has terminated.
operation = cluster_client.delete_cluster(project_id, region, cluster_name)
operation.result()

print('Cluster {} successfully deleted.'.format(cluster_name))
bradmiro marked this conversation as resolved.
Show resolved Hide resolved
# [END dataproc_quickstart]
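One behavior of the sample worth noting: it downloads only the first driver-output shard (driveroutput.000000000), and jobs with long output roll over into further numbered shards. Below is a hedged sketch of how all shards could be read with the same storage client; the helper name is an assumption and this is not part of the PR.

# Hypothetical helper: read every driver-output shard, not just the first.
from google.cloud import storage

def read_full_driver_output(bucket_name, cluster_uuid, job_id):
    storage_client = storage.Client()
    prefix = 'google-cloud-dataproc-metainfo/{}/jobs/{}/driveroutput'.format(
        cluster_uuid, job_id)
    blobs = storage_client.list_blobs(bucket_name, prefix=prefix)
    # Shards are numbered sequentially, so concatenate them in name order.
    return b''.join(blob.download_as_string()
                    for blob in sorted(blobs, key=lambda b: b.name))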
70 changes: 70 additions & 0 deletions dataproc/quickstart/quickstart_test.py
@@ -0,0 +1,70 @@
# Copyright 2019 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import uuid
import pytest

from google.cloud import dataproc_v1 as dataproc
from google.cloud import storage

import quickstart


PROJECT_ID = os.environ['GCLOUD_PROJECT']
REGION = 'us-central1'
CLUSTER_NAME = 'py-qs-test-{}'.format(str(uuid.uuid4()))
STAGING_BUCKET = 'py-dataproc-qs-bucket-{}'.format(str(uuid.uuid4()))
JOB_FILE_NAME = 'sum.py'
JOB_FILE_PATH = 'gs://{}/{}'.format(STAGING_BUCKET, JOB_FILE_NAME)
SORT_CODE = (
    "import pyspark\n"
    "sc = pyspark.SparkContext()\n"
    "rdd = sc.parallelize((1,2,3,4,5))\n"
    "sum = rdd.reduce(lambda x, y: x + y)\n"
)


@pytest.fixture(autouse=True)
def setup_teardown():
    storage_client = storage.Client()
    bucket = storage_client.create_bucket(STAGING_BUCKET)
    blob = bucket.blob(JOB_FILE_NAME)
    blob.upload_from_string(SORT_CODE)

    yield

    cluster_client = dataproc.ClusterControllerClient(client_options={
        'api_endpoint': '{}-dataproc.googleapis.com:443'.format(REGION)
    })

    # The quickstart sample deletes the cluster, but if the test fails
    # before cluster deletion occurs, it is deleted here as a safety net.
    clusters = cluster_client.list_clusters(PROJECT_ID, REGION)

    for cluster in clusters:
        if cluster.cluster_name == CLUSTER_NAME:
            cluster_client.delete_cluster(PROJECT_ID, REGION, CLUSTER_NAME)

    blob.delete()


def test_quickstart(capsys):
    quickstart.quickstart(PROJECT_ID, REGION, CLUSTER_NAME, JOB_FILE_PATH)

    out, _ = capsys.readouterr()
    assert 'Cluster created successfully' in out
    assert 'Submitted job' in out
    assert 'finished with state DONE:' in out
    assert 'successfully deleted' in out
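As a quick sanity check on the job being uploaded: SORT_CODE parallelizes (1, 2, 3, 4, 5) and reduces with addition, so the driver computes 15. Below is a local, Spark-free equivalent of that logic, for illustration only (functools.reduce stands in for rdd.reduce).

# Illustrative local equivalent of the logic in SORT_CODE.
from functools import reduce

data = (1, 2, 3, 4, 5)
total = reduce(lambda x, y: x + y, data)
assert total == 15  # the same value the PySpark job computes on the cluster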