Skip to content

Commit

Permalink
[Import-automation migration] Python code change for GKE executor (#835)
Browse files Browse the repository at this point in the history
* SCHEDULES=scripts/covid19_india/cases_count_states_data:covid19IndiaCasesCountStatesData

* fixes

* more fixes

* Fix cloud build for gke

* lint fixes

* Fix tests and lint

* more test fixes

* address Github PR comments

* lint

---------

Co-authored-by: Alex Chen <[email protected]>
  • Loading branch information
Fructokinase and Alex Chen authored Mar 28, 2023
1 parent 131e828 commit dd89b76
Show file tree
Hide file tree
Showing 25 changed files with 488 additions and 403 deletions.
2 changes: 1 addition & 1 deletion cloudbuild.py.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ steps:
- -c
- |
./run_tests.sh -p util/
./run_tests.sh -p import-automation/
./run_tests.sh -p import-automation/executor
./run_tests.sh -p scripts/
- id: python_format_check
Expand Down
40 changes: 40 additions & 0 deletions import-automation/cloudbuild/cloudbuild.gke.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
steps:
# Create a Cloud Tasks task that passes information
# about the commit to the GKE executor.
# Reference: https://cloud.google.com/sdk/gcloud/reference/tasks/create-http-task
- name: 'gcr.io/cloud-builders/gcloud'
args: [
'tasks',
'create-http-task',
'$COMMIT_SHA',
'--project=$_GCP_PROJECT_ID',
'--location=us-central1',
'--queue=$_TASK_QUEUE_NAME',
'--url=$_URL_TO_SCHEDULING_ENDPOINT',
'--header=Content-Type: application/json',
'--method=POST',
'--oidc-service-account-email=$_OIDC_SERVICE_ACCOUNT_EMAIL',
'--oidc-token-audience=$_OIDC_TOKEN_AUDIENCE',
'--body-content={
"COMMIT_SHA": "$COMMIT_SHA",
"REPO_NAME": "$REPO_NAME",
"BRANCH_NAME": "$BRANCH_NAME",
"HEAD_BRANCH": "$_HEAD_BRANCH",
"BASE_BRANCH": "$_BASE_BRANCH",
"PR_NUMBER": "$_PR_NUMBER",
"configs": {
"gcp_project_id": "$_GCP_PROJECT_ID",
"dashboard_oauth_client_id": "$_DASHBOARD_OAUTH_CLIENT_ID",
"importer_oauth_client_id": "$_IMPORTER_OAUTH_CLIENT_ID",
"github_auth_username": "$_GITHUB_AUTH_USERNAME",
"github_auth_access_token": "$_GITHUB_AUTH_ACCESS_TOKEN",
"github_repo_name": "$_GITHUB_REPO_NAME",
"github_repo_owner_username": "$_GITHUB_REPO_OWNER_USERNAME",
"email_account": "$_EMAIL_ACCOUNT",
"email_token": "$_EMAIL_TOKEN",
"gcs_project_id": "$_GCS_PROJECT",
"storage_prod_bucket_name": "$_STORAGE_PROD_BUKCET_NAME",
"executor_type": "GKE"
}
}'
]
7 changes: 5 additions & 2 deletions import-automation/cloudbuild/cloudbuild.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
steps:
# Create a Cloud Tasks task that passes information
# about the commit to the executor.
# about the commit to the GAE executor.
# Reference: https://cloud.google.com/sdk/gcloud/reference/tasks/create-app-engine-task
- name: 'gcr.io/cloud-builders/gcloud'
args: [
'tasks',
Expand All @@ -26,7 +27,9 @@ steps:
"github_repo_name": "$_GITHUB_REPO_NAME",
"github_repo_owner_username": "$_GITHUB_REPO_OWNER_USERNAME",
"email_account": "$_EMAIL_ACCOUNT",
"email_token": "$_EMAIL_TOKEN"
"email_token": "$_EMAIL_TOKEN",
"gcs_project_id": "$_GCS_PROJECT",
"storage_prod_bucket_name": "$_STORAGE_PROD_BUKCET_NAME"
}
}'
]
6 changes: 6 additions & 0 deletions import-automation/executor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,12 @@ Commons knowledge graph using the importer.
- `github_auth_username`
- `github_auth_access_token`

## Running locally

```
PYTHONPATH=$(pwd) python app/main.py
```
## Local Executor
Expand Down
Empty file.
Empty file.
21 changes: 21 additions & 0 deletions import-automation/executor/app/configs.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,27 @@ class ExecutorConfig:
# Maximum time a blocking call to the importer to
# delete an import can take in seconds.
importer_delete_timeout: float = 10 * 60
# Executor type depends on where the executor runs
    # Supports one of: "GKE", "GAE"
executor_type: str = 'GAE'

def get_data_refresh_config(self):
"""Returns the config used for Cloud Scheduler data refresh jobs."""
fields = set([
'github_repo_name',
'github_repo_owner_username',
'github_auth_username',
'github_auth_access_token',
'dashboard_oauth_client_id',
'importer_oauth_client_id',
'email_account',
'email_token',
'gcs_project_id',
'storage_prod_bucket_name',
])
return {
k: v for k, v in dataclasses.asdict(self).items() if k in fields
}


def _setup_logging():
Expand Down
Empty file.
134 changes: 134 additions & 0 deletions import-automation/executor/app/executor/cloud_scheduler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Interface for Cloud Scheduler API.
This module contains:
- How to format a request (GKE, GAE http requests are supported)
- How to call cloud Scheduler API (create_or_update is supported)
"""
import json
import os
from typing import Dict

from google.cloud import scheduler_v1
from google.protobuf import json_format
from google.api_core.exceptions import AlreadyExists

GKE_SERVICE_DOMAIN = os.getenv('GKE_SERVICE_DOMAIN', 'import.datacommons.dev')
GKE_CALLER_SERVICE_ACCOUNT = os.getenv('GKE_CALLER_SERVICE_ACCOUNT')
GKE_OAUTH_AUDIENCE = os.getenv('GKE_OAUTH_AUDIENCE')


def _base_job_request(absolute_import_name, schedule: str):
    """Returns the fields shared by GKE (http) and GAE (appengine) job
    requests.

    Callers add the target-specific key ('http_target' or
    'app_engine_http_target') before submitting the job.
    """
    one_hour_secs = 60 * 60
    half_hour_secs = 30 * 60
    job = {
        'name': _fix_absolute_import_name(absolute_import_name),
        'description': absolute_import_name,
        'schedule': schedule,
        'time_zone': 'Etc/UTC',
    }
    job['retry_config'] = {
        'retry_count': 2,
        # Wait at least 1h before each retry.
        'min_backoff_duration': {
            'seconds': one_hour_secs
        }
    }
    # 30m is the maximum attempt deadline Cloud Scheduler allows.
    job['attempt_deadline'] = {'seconds': half_hour_secs}
    return job


def http_job_request(absolute_import_name, schedule,
                     json_encoded_job_body: str) -> Dict:
    """Cloud Scheduler request that targets executors launched in GKE."""
    job = _base_job_request(absolute_import_name, schedule)
    # Suffix distinguishes the GKE job from the GAE job for the same import.
    job['name'] = job['name'] + '_GKE'
    target = {
        'uri': f'https://{GKE_SERVICE_DOMAIN}/update',
        'http_method': 'POST',
        'headers': {
            'Content-Type': 'application/json',
        },
        'body': json_encoded_job_body,
    }
    # The OIDC token authenticates the scheduler's call to the GKE service.
    target['oidc_token'] = {
        'service_account_email': GKE_CALLER_SERVICE_ACCOUNT,
        'audience': GKE_OAUTH_AUDIENCE,
    }
    job['http_target'] = target
    return job


def appengine_job_request(absolute_import_name, schedule,
                          json_encoded_job_body: str) -> Dict:
    """Cloud Scheduler request that targets executors launched in GAE."""
    job = _base_job_request(absolute_import_name, schedule)
    # Suffix distinguishes the GAE job from the GKE job for the same import.
    job['name'] = job['name'] + '_GAE'
    target = {
        'http_method': 'POST',
        'app_engine_routing': {
            'service': 'default',
        },
        'relative_uri': '/update',
    }
    target['headers'] = {'Content-Type': 'application/json'}
    target['body'] = json_encoded_job_body
    job['app_engine_http_target'] = target
    return job


def create_or_update_job(project_id: str, location: str, job: Dict) -> Dict:
    """Creates/updates a Cloud Scheduler job.
    Args:
        project_id: GCP project id of the scheduler
        location: GCP location of the scheduler
        job: appengine_job_request(...) or http_job_request(...)
    Returns:
        json transcoded cloud scheduler job created.
    """
    client = scheduler_v1.CloudSchedulerClient()
    # Name requires the full GCP resource path.
    # NOTE: this mutates the caller's dict in place.
    parent = f'projects/{project_id}/locations/{location}'
    job['name'] = f'{parent}/jobs/{job["name"]}'

    # EAFP: attempt creation first; an existing job of the same name
    # raises AlreadyExists, in which case it is updated instead.
    try:
        job = client.create_job(
            scheduler_v1.CreateJobRequest(parent=parent, job=job))
    except AlreadyExists:
        job = client.update_job(scheduler_v1.UpdateJobRequest(job=job))

    # `job` is now a proto-plus Job message; transcode to a plain dict,
    # keeping proto (snake_case) field names.
    scheduled = json_format.MessageToDict(job._pb,
                                          preserving_proto_field_name=True)

    # The body was sent json-encoded; decode it back so the returned dict
    # is fully readable. `'field' in job` checks proto field presence —
    # presumably only the populated target field passes; verify against
    # proto-plus semantics if this is changed.
    if 'app_engine_http_target' in job:
        scheduled['app_engine_http_target']['body'] = json.loads(
            job.app_engine_http_target.body)

    if 'http_target' in job:
        scheduled['http_target']['body'] = json.loads(job.http_target.body)

    return scheduled


def _fix_absolute_import_name(absolute_import_name: str) -> str:
"""Replaces all the forward slashes and colons in an absolute import name
with underscores. This is for conforming to restrictions of Cloud Scheduler
job names."""
return absolute_import_name.replace('/', '_').replace(':', '_')
8 changes: 4 additions & 4 deletions import-automation/executor/app/executor/import_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -693,13 +693,13 @@ def _mark_system_run_failed(run_id: str, message: str,
dashboard: dashboard_api.DashboardAPI) -> Dict:
"""Communicates with the import progress dashboard that a system run
has failed.
Args:
run_id: ID of the system run.
message: An additional message to log to the dashboard
with level critical.
        dashboard: DashboardAPI object for the communication.
Returns:
Updated system run returned from the dashboard.
"""
Expand All @@ -715,13 +715,13 @@ def _mark_import_attempt_failed(attempt_id: str, message: str,
dashboard: dashboard_api.DashboardAPI) -> Dict:
"""Communicates with the import progress dashboard that an import attempt
has failed.
Args:
attempt_id: ID of the import attempt.
message: An additional message to log to the dashboard
with level critical.
        dashboard: DashboardAPI object for the communication.
Returns:
Updated import attempt returned from the dashboard.
"""
Expand Down
113 changes: 113 additions & 0 deletions import-automation/executor/app/executor/scheduler_job_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Module scheduling cron jobs.
Currently scheduling can only occur through commit messages
in the format of "SCHEDULES=<target1>,<target2>,...
where a <target> is in the format of <path/to/manifest.json>:<import name>
Example commit message:
"
This change schedules the cron of india covid stats
SCHEDULES=scripts/covid19_india/cases_count_states_data:covid19IndiaCasesCountStatesData
"
"""

from dataclasses import dataclass
import os
import logging
import json
import traceback
import tempfile
from typing import Dict

from app import configs
from app.service import github_api
from app.executor import import_target
from app.executor import import_executor
from app.executor import cloud_scheduler


def schedule_on_commit(github: github_api.GitHubRepoAPI,
                       config: configs.ExecutorConfig, commit_sha: str):
    """Creates or updates all schedules associated with a commit.

    Parses the commit message for 'SCHEDULES=<target1>,<target2>,...'
    targets, downloads the repo at that commit to read each import's
    manifest, and creates/updates one Cloud Scheduler job per import.

    Args:
        github: Client for the GitHub repo the commit belongs to.
        config: Executor configuration (manifest filename, timeouts,
            scheduler project/location, executor type).
        commit_sha: SHA of the commit whose message is parsed for targets.

    Returns:
        import_executor.ExecutionResult describing the scheduled jobs,
        or a 'pass' result when the commit message names no targets.

    Raises:
        KeyError: A targeted manifest has no 'cron_schedule' field.
        import_executor.ExecutionError: Scheduling an import failed; carries
            the jobs already scheduled before the failure.
    """
    targets = import_target.find_targets_in_commit(commit_sha, 'SCHEDULES',
                                                   github)
    if not targets:
        return import_executor.ExecutionResult(
            'pass', [], 'No import target specified in commit message')
    logging.info('Found targets in commit message: %s', ",".join(targets))
    # Directories that contain a manifest, used to resolve relative targets.
    manifest_dirs = github.find_dirs_in_commit_containing_file(
        commit_sha, config.manifest_filename)

    # The repo checkout only needs to live long enough to read manifests.
    with tempfile.TemporaryDirectory() as tmpdir:
        repo_dir = github.download_repo(tmpdir, commit_sha,
                                        config.repo_download_timeout)
        logging.info('Downloaded repo with commit: %s', commit_sha)

        imports_to_execute = import_target.find_imports_to_execute(
            targets=targets,
            manifest_dirs=manifest_dirs,
            manifest_filename=config.manifest_filename,
            repo_dir=repo_dir)

        scheduled = []
        for relative_dir, spec in imports_to_execute:
            schedule = spec.get('cron_schedule')
            if not schedule:
                # Fail fast (outside the try below) so a missing schedule
                # surfaces as a KeyError rather than an ExecutionError.
                manifest_path = os.path.join(relative_dir,
                                             config.manifest_filename)
                raise KeyError(f'cron_schedule not found in {manifest_path}')
            try:
                absolute_import_name = import_target.get_absolute_import_name(
                    relative_dir, spec['import_name'])
                logging.info('Scheduling a data update job for %s',
                             absolute_import_name)
                job = _create_or_update_import_schedule(absolute_import_name,
                                                        schedule, config)
                scheduled.append(job)
            except Exception:
                # Wrap any failure with the partial progress so the caller
                # can report which jobs were already scheduled.
                raise import_executor.ExecutionError(
                    import_executor.ExecutionResult('failed', scheduled,
                                                    traceback.format_exc()))
    return import_executor.ExecutionResult('succeeded', scheduled,
                                           'No issues')


def _create_or_update_import_schedule(absolute_import_name, schedule: str,
                                      config: configs.ExecutorConfig):
    """Creates/updates the Cloud Scheduler job for a single import.

    Args:
        absolute_import_name: '<path/to/dir>:<import_name>' identifier of
            the import; becomes the scheduler job name and description.
        schedule: Cron expression from the manifest's 'cron_schedule'.
        config: Executor configuration; executor_type selects whether the
            job targets the GKE or the GAE executor.

    Returns:
        The created/updated Cloud Scheduler job, json-transcoded to a dict.

    Raises:
        ValueError: config.executor_type is neither 'GKE' nor 'GAE'.
    """
    # Note: this is the content of what is passed to /update API
    # inside each cronjob http calls.
    json_encoded_job_body = json.dumps({
        'absolute_import_name': absolute_import_name,
        'config': config.get_data_refresh_config()
    }).encode()

    if config.executor_type == "GKE":
        req = cloud_scheduler.http_job_request(absolute_import_name, schedule,
                                               json_encoded_job_body)
    elif config.executor_type == "GAE":
        req = cloud_scheduler.appengine_job_request(absolute_import_name,
                                                    schedule,
                                                    json_encoded_job_body)
    else:
        # Bug fix: the original passed a '%s' template and the value as two
        # separate Exception args, so the message was never formatted and
        # the offending type never appeared. ValueError (an Exception
        # subclass, so existing catchers still work) with an f-string
        # produces a readable message.
        raise ValueError(
            f"Invalid executor_type {config.executor_type!r}, "
            "expects one of ('GKE', 'GAE')")

    return cloud_scheduler.create_or_update_job(config.gcp_project_id,
                                                config.scheduler_location, req)
Loading

0 comments on commit dd89b76

Please sign in to comment.