
Commit

Merge branch 'master' of https://github.com/datacommonsorg/data into floods
ajaits committed Feb 12, 2024
2 parents f815f42 + 4e6c4a6 commit 16250d5
Showing 53 changed files with 13,017 additions and 10,202 deletions.
4 changes: 3 additions & 1 deletion .gitignore
@@ -13,4 +13,6 @@ dc_generated/
*/*/*/data/*
lib/
bin/
pyvenv.cfg
pyvenv.cfg
# Ignore updates to the local configs json file.
import-automation/executor/config_override.json
6 changes: 3 additions & 3 deletions README.md
@@ -113,7 +113,7 @@ Install requirements and setup a virtual environment to isolate python developme
python3 -m venv .env
source .env/bin/activate
pip3 install -r requirements.txt
pip3 install -r requirements_all.txt
```

##### Testing
@@ -153,8 +153,8 @@ you import modules and run tests, as below.

##### Guidelines

* Any additional package required must be specified in the requirements.txt
in the top-level folder. No other requirements.txt files are allowed.
* Any additional package required must be specified in the `requirements_all.txt`
file in the top-level folder. No other `requirements.txt` files are allowed.
* Code must be formatted according to the
[Google Python Style Guide](https://google.github.io/styleguide/pyguide.html)
according to the [yapf formatter](https://github.com/google/yapf).
16 changes: 9 additions & 7 deletions import-automation/executor/Dockerfile
@@ -13,16 +13,18 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
FROM gcr.io/google-appengine/python

RUN virtualenv /env -p python3.7
FROM python:3.11.4

ENV VIRTUAL_ENV /env
ENV PATH /env/bin:$PATH
RUN apt-get update \
&& apt-get -y upgrade \
&& apt-get -y autoremove \
&& rm -rf /var/lib/apt/lists/*
WORKDIR /workspace

ADD requirements.txt /app/requirements.txt
RUN pip install -r /app/requirements.txt
ADD requirements.txt /workspace/requirements.txt
RUN pip install -r /workspace/requirements.txt

ADD . /app
COPY app/. /workspace/app/

CMD gunicorn --timeout 1800 --workers 5 -b :$PORT app.main:FLASK_APP
76 changes: 74 additions & 2 deletions import-automation/executor/README.md
@@ -47,13 +47,85 @@ Commons knowledge graph using the importer.

## Running locally

Authenticate with GCP first: `gcloud auth application-default login`

### Scheduling or Updating An Import Locally

You can schedule (on the GCP Cloud Scheduler) or execute an import job from your local machine.

Ensure this script is executed from the directory that contains `schedule_update_import.sh`, i.e. from `/data/import-automation/executor`. Configs (`<repo_root>/import-automation/executor/app/configs.py`) are loaded from GCS. To override any configs locally, set them in the file `<repo_root>/import-automation/executor/config_override.json`. Note that every field set there must correspond to a field defined in `<repo_root>/import-automation/executor/app/configs.py`; otherwise the update will raise an exception. The `user_script_args` config field can also be set in this file.
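
For example, a minimal `config_override.json` might look like the sketch below; the field names come from `configs.py`, while the values (and the example script argument and environment variable) are purely illustrative:

```
{
  "user_script_timeout": 3600,
  "user_script_args": ["--start_date=2023-01-01"],
  "user_script_env": {"NUM_WORKERS": "4"}
}
```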

Note: any local changes to the `<repo_root>/import-automation/executor/config_override.json` file are ignored by git. This was done using:

```
git update-index --skip-worktree <repo_root>/import-automation/executor/config_override.json
```

To start tracking changes to this file, execute the following:
```
git update-index --no-skip-worktree <repo_root>/import-automation/executor/config_override.json
```

To get a list of files that are skipped when checking for changes, execute:

```
git ls-files -v . | grep ^S
```

### Usage

Run `./schedule_update_import.sh --help` for usage.


#### Schedule an Import:
To schedule an import to run as a cron job on the GCP Cloud Scheduler, do the following:

```
./schedule_update_import.sh -s <gke_project_id> <path_to_import>
```

`<gke_project_id>` is the GCP project ID where the import executor runs, e.g. `datcom-import-automation-prod`.
`<path_to_import>` is the path to the import (relative to the root directory of the `data` repo), with the name of the import appended after a colon, e.g. `scripts/us_usda/quickstats:UsdaAgSurvey`.

Example invocation:

```
./schedule_update_import.sh -s datcom-import-automation-prod scripts/us_usda/quickstats:UsdaAgSurvey
```

The script will log the name of the Cloud Scheduler job and a URL listing all the jobs on the scheduler. Please verify that all the job metadata was updated as expected.


#### Update an Import:
You can execute an import process locally. Note that this is not recommended for import scripts that take longer than a few minutes to execute, because all the processing is done locally. For all prod imports, the recommended path is to Schedule an Import (see the section above).

Instead of downloading a fresh version of this repo from GitHub, this script uses the locally cloned current state of the repo by inferring the path to the `data` root directory. A side effect is that upon completion the local repo may contain new artifacts, e.g. generated output CSV/TMCF files. You may want to revert those files if they are not intended to be committed, as sketched below.
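
One way to inspect and revert such artifacts, assuming they live under the import's script directory (the path below is illustrative):

```
# See what the run modified or created under the import's directory.
git status --short scripts/us_usda/quickstats/
# Discard changes to tracked files.
git restore scripts/us_usda/quickstats/
# Preview removal of untracked generated files (drop -n to actually delete them).
git clean -n scripts/us_usda/quickstats/
```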

Once the script runs to completion, the data directory's latest update is printed (along with its location on GCS), which can confirm whether the import actually produced new data. Note: it is a good idea to check the printed directory path to verify that the expected import files are all there.
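
For example, assuming the printed location is a GCS path, you can list its contents with `gsutil` (the bucket and path below are placeholders):

```
gsutil ls -l gs://<prod_bucket>/<printed_import_path>/
```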

To execute an update locally, do the following:

```
./schedule_update_import.sh -u <gke_project_id> <path_to_import>
```

`<gke_project_id>` is the GCP project ID where the import executor runs, e.g. `datcom-import-automation-prod`.
`<path_to_import>` is the path to the import (relative to the root directory of the `data` repo), with the name of the import appended after a colon, e.g. `scripts/us_usda/quickstats:UsdaAgSurvey`.

Example invocation:

```
./schedule_update_import.sh -u datcom-import-automation-prod scripts/us_usda/quickstats:UsdaAgSurvey
```


## Local Executor [should be deprecated soon]

```
PYTHONPATH=$(pwd) python app/main.py
```
## Local Executor
Run `. run_local_executor.sh --help` for usage.
32 changes: 16 additions & 16 deletions import-automation/executor/app/configs.py
@@ -11,16 +11,15 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Configurations for the executor.
"""Configurations for the executor.
The app endpoints accept a configs field that allows customization of all the
configurations. See main.py.
"""

import dataclasses
import os
from typing import List
import dataclasses

from google.cloud import logging

@@ -32,9 +31,10 @@ def _production():
@dataclasses.dataclass
class ExecutorConfig:
"""Configurations for the executor."""

# ID of the Google Cloud project that hosts the executor. The project
# needs to enable App Engine and Cloud Scheduler.
gcp_project_id: str = 'google.com:datcom-data'
gcp_project_id: str = 'datcom-import-automation'
# ID of the Google Cloud project that stores generated CSVs and MCFs. The
# project needs to enable Cloud Storage and gives the service account the
# executor uses sufficient permissions to read and write the bucket below.
@@ -102,9 +102,13 @@ class ExecutorConfig:
# ID of the location where Cloud Scheduler is hosted.
scheduler_location: str = 'us-central1'
# Maximum time a user script can run for in seconds.
user_script_timeout: float = 600
user_script_timeout: float = 3600
# Arguments for the user script
user_script_args: List[str] = ()
# Environment variables for the user script
user_script_env: dict = None
# Maximum time venv creation can take in seconds.
venv_create_timeout: float = 600
venv_create_timeout: float = 3600
# Maximum time downloading a file can take in seconds.
file_download_timeout: float = 600
# Maximum time downloading the repo can take in seconds.
@@ -126,16 +130,12 @@ class ExecutorConfig:
def get_data_refresh_config(self):
"""Returns the config used for Cloud Scheduler data refresh jobs."""
fields = set([
'github_repo_name',
'github_repo_owner_username',
'github_auth_username',
'github_auth_access_token',
'dashboard_oauth_client_id',
'importer_oauth_client_id',
'email_account',
'email_token',
'gcs_project_id',
'storage_prod_bucket_name',
'github_repo_name', 'github_repo_owner_username',
'github_auth_username', 'github_auth_access_token',
'dashboard_oauth_client_id', 'importer_oauth_client_id',
'email_account', 'email_token', 'gcs_project_id',
'storage_prod_bucket_name', 'user_script_args', 'user_script_env',
'user_script_timeout'
])
return {
k: v for k, v in dataclasses.asdict(self).items() if k in fields
27 changes: 22 additions & 5 deletions import-automation/executor/app/executor/cloud_scheduler.py
@@ -26,7 +26,8 @@
from google.protobuf import json_format
from google.api_core.exceptions import AlreadyExists, NotFound

GKE_SERVICE_DOMAIN = os.getenv('GKE_SERVICE_DOMAIN', 'import.datacommons.dev')
GKE_SERVICE_DOMAIN = os.getenv('GKE_SERVICE_DOMAIN',
'importautomation.datacommons.org')
GKE_CALLER_SERVICE_ACCOUNT = os.getenv('CLOUD_SCHEDULER_CALLER_SA')
GKE_OAUTH_AUDIENCE = os.getenv('CLOUD_SCHEDULER_CALLER_OAUTH_AUDIENCE')

@@ -53,9 +54,25 @@ def _base_job_request(absolute_import_name, schedule: str):
}


def http_job_request(absolute_import_name, schedule,
json_encoded_job_body: str) -> Dict:
def http_job_request(absolute_import_name,
schedule,
json_encoded_job_body: str,
gke_caller_service_account: str = "",
gke_oauth_audience: str = "") -> Dict:
"""Cloud Scheduler request that targets executors launched in GKE."""

# If the service account and oauth audience are provided as
# function args, use them. If not, look for them in the
# environment (GKE_CALLER_SERVICE_ACCOUNT and GKE_OAUTH_AUDIENCE
# are set to read from environment variables).
service_account = gke_caller_service_account
oauth_audience = gke_oauth_audience

if not service_account:
service_account = GKE_CALLER_SERVICE_ACCOUNT
if not oauth_audience:
oauth_audience = GKE_OAUTH_AUDIENCE

job = _base_job_request(absolute_import_name, schedule)
job['name'] = f'{job["name"]}_GKE'
job['http_target'] = {
@@ -66,8 +83,8 @@ def http_job_request(absolute_import_name, schedule,
},
'body': json_encoded_job_body,
'oidc_token': {
'service_account_email': GKE_CALLER_SERVICE_ACCOUNT,
'audience': GKE_OAUTH_AUDIENCE,
'service_account_email': service_account,
'audience': oauth_audience,
}
}
return job