Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KIL-2509 [Apollo Agent] [Phase 1] Add Looker git proxy client #10

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .circleci/trufflehog_config/allowlist.json
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
{
"Do not alert on git clone URL": "regex:https://.*:{self._token}@.*"
}
4 changes: 4 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ FROM base AS cloudrun
COPY requirements-cloudrun.txt ./
RUN . $VENV_DIR/bin/activate && pip install --no-cache-dir -r requirements-cloudrun.txt

RUN apt update
# install git as we need it for the git clone client
RUN apt install git -y

CMD . $VENV_DIR/bin/activate && gunicorn --bind :$PORT apollo.interfaces.cloudrun.main:app

FROM public.ecr.aws/lambda/python:3.11 AS lambda
Expand Down
9 changes: 9 additions & 0 deletions apollo/agent/proxy_client_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,14 @@ def _get_proxy_client_looker(
return LookerProxyClient(credentials=credentials, platform=platform)


def _get_proxy_client_git(
credentials: Optional[Dict], platform: str, **kwargs # type: ignore
) -> BaseProxyClient:
from apollo.integrations.git.git_proxy_client import GitProxyClient

return GitProxyClient(credentials=credentials, platform=platform)


@dataclass
class ProxyClientCacheEntry:
created_time: datetime
Expand All @@ -74,6 +82,7 @@ class ProxyClientCacheEntry:
"http": _get_proxy_client_http,
"storage": _get_proxy_client_storage,
"looker": _get_proxy_client_looker,
"git": _get_proxy_client_git,
}


Expand Down
9 changes: 9 additions & 0 deletions apollo/integrations/gcs/gcs_base_reader_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,15 @@ def download_file(self, key: str, download_path: str) -> None:
bucket: Bucket = self._client.get_bucket(self._bucket_name)
bucket.blob(key).download_to_filename(download_path)

def upload_file(self, key: str, local_file_path: str) -> None:
"""
Uploads the file at `local_file_path` to `key` in the associated bucket.
:param key: path to the file, for example /dir/name.ext
:param local_file_path: local path to the file to upload.
"""
bucket: Bucket = self._client.get_bucket(self._bucket_name)
bucket.blob(key).upload_from_filename(local_file_path)

@convert_gcs_errors
def managed_download(self, key: str, download_path: str):
"""
Expand Down
171 changes: 171 additions & 0 deletions apollo/integrations/git/git_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
import base64
import logging
import os
import re
import shutil
from dataclasses import dataclass
from itertools import chain
from pathlib import Path
from typing import Dict, List, Generator

import git

logger = logging.getLogger(__name__)


@dataclass
class GitFileData:
name: str
content: str


class GitCloneClient:
"""
Git client used to clone a repo, both ssh and https protocols are supported.
It exposes a method that returns an iterator for all files matching a list of extensions.
"""

# Only /tmp is writable in lambda.
_REPO_DIR = Path("/tmp/repo")
_KEY_FILE = Path("/tmp/mcd_rsa")

GIT_TYPE = {"ssh": "SSH", "https": "HTTPS"}

def __init__(self, credentials: Dict, **kwargs): # type: ignore
self._repo_url = credentials["repo_url"]
self._token = credentials.get("token")
self._username = credentials.get("username")
self._ssh_key = base64.b64decode(credentials.get("ssh_key", ""))

if self._token:
# remove https if it was included for https calls
self._repo_url = self._repo_url.lstrip("https://")

def get_files(
self, file_extensions: List[str]
) -> Generator[GitFileData, None, None]:
"""
Main method, reads the content of all files filtering by the specified extensions.
:param file_extensions: list of extensions to filter the returned files by, for example "lkml".
:return: a generator that returns GitFileData objects for each file with on of the specified extensions.
"""
if self._ssh_key:
self.write_key()
self.delete_repo_dir() # Prepare
self.git_clone()
yield from self.read_files(file_extensions)
self.delete_repo_dir() # Clean up

def delete_repo_dir(self):
"""Delete a directory if it exists."""
if self._REPO_DIR.exists():
logger.info(f"Delete repo dir: {self._REPO_DIR}")
shutil.rmtree(self._REPO_DIR)

@staticmethod
def git_version() -> Dict:
"""
Executes `git --version` and returns the output in a dictionary with two keys: `stdout` and `stderr`.
:return: the output for `git --version`.
"""
stdout, stderr = git.exec_command("--version")
return {
"stdout": stdout.decode("utf-8") if stdout else "",
"stderr": stderr.decode("utf-8") if stderr else "",
}

def git_clone(self):
"""
Clones a git repo.

Will use ssh if an ssh key is provided and will use https if it is not present.
"""
if self._ssh_key:
self._ssh_git_clone()
else:
self._https_git_clone()

def read_files(
self, file_extensions: List[str]
) -> Generator[GitFileData, None, None]:
"""
Traverse a directory, selecting only files with the given extensions. It is important for this to return
a generator to avoid loading the content of all files into memory.
"""
logger.info(f'Read files with extensions: {",".join(file_extensions)}')
globs = [self._REPO_DIR.rglob(f"*.{ext}") for ext in file_extensions]
for file in chain(*globs): # globs are generators, need to be chained.
if file.is_file():
original_name = str(
file.relative_to(self._REPO_DIR)
) # Drop local dir from file name.
yield GitFileData(
original_name, file.read_text(errors="replace")
) # Replace encoding errors with "?".

def _https_git_clone(self):
"""
Clone a git repo. It uses Https and a git authorization token.

"repo_url" can be a full git https URL (https://server/project.git) or the shorter version
(server/project.git).
"""
logger.info(f"Clone repo: {self._repo_url}")
url = f"https://oauth2:{self._token}@{self._repo_url}"
if self._username:
# This allows for support of bitbucket as they handle access tokens slightly differently
url = f"https://{self._username}:{self._token}@{self._repo_url}"
# Use depth 1 to bring only the latest revision.
git_params = ["clone", "--depth", "1", url, str(self._REPO_DIR)]
try:
git.exec_command(*git_params)
except git.exceptions.GitExecutionError as e:
password_removed_message = self._replace_passwords_in_urls(str(e))

raise git.exceptions.GitExecutionError(password_removed_message)

@staticmethod
def _replace_passwords_in_urls(text: str, placeholder: str = "********"):
pattern = r"(?<=://)(.*?:)(.*?)(?=@)"
replaced_text = re.sub(pattern, r"\1" + placeholder, text)
return replaced_text

def write_key(self):
"""Write SSH key to a file, overwriting it if file exists."""
if self._ssh_key:
if self._KEY_FILE.exists():
logger.info(f"Key already exists: {self._KEY_FILE}, overwrite it")
self._KEY_FILE.chmod(0o600)
logger.info(f"Write key to file: {self._KEY_FILE}")
self._KEY_FILE.write_bytes(self._ssh_key)
self._KEY_FILE.chmod(0o400)

def _ssh_git_clone(self):
"""
Clone a git repo. It uses GIT_SSH_COMMAND to pass the SSH key and to set SSH options necessary to run git in
the Lambda environment.

"repo_url" can be a full ssh URL (ssh://[user@]server/project.git) or the shorter version
([user@]server:project.git).
"""
logger.info(f"Clone repo: {self._repo_url}")
ssh_options = " ".join(
[
"-v", # Verbose helps in case of problems
"-o StrictHostKeyChecking=no", # We do not know all the possible hosts, so do not check
"-o UserKnownHostsFile=/dev/null", # Do not write known_hosts
"-o GlobalKnownHostsFile=/dev/null", # Do not write known_hosts
f"-i {self._KEY_FILE}", # Use this key
]
)
env = {**os.environ, **{"GIT_SSH_COMMAND": f"ssh {ssh_options}"}}
# Use depth 1 to bring only the latest revision.
git_params = ["clone", "--depth", "1", self._repo_url, str(self._REPO_DIR)]
git.exec_command(*git_params, env=env)

@property
def repo_url(self):
"""
Returns the url used to configure this client in `credentials["repo_url"]`.
"""
return self._repo_url
61 changes: 61 additions & 0 deletions apollo/integrations/git/git_proxy_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import os
import uuid
import zipfile
from typing import Dict, Optional, List, Generator

from apollo.agent.utils import AgentUtils
from apollo.integrations.base_proxy_client import BaseProxyClient
from apollo.integrations.git.git_client import GitCloneClient, GitFileData
from apollo.integrations.storage.storage_proxy_client import StorageProxyClient

ZIP_FILE_EXPIRATION = 15 * 60 # 15 minutes


class GitProxyClient(BaseProxyClient):
"""
Git Clone Proxy Client, clones the requested repo, uploads a zip file with its contents to the associated bucket
and returns a pre-signed url to download it.
"""

def __init__(self, credentials: Optional[Dict], platform: str, **kwargs): # type: ignore
"""
Credentials are expected to include:
- repo_url
- ssh_key
- username (if ssh_key not specified)
- token (if ssh_key not specified)
"""
if not credentials:
raise ValueError("Credentials are required for Git")
self._platform = platform
self._client = GitCloneClient(credentials=credentials)

@property
def wrapped_client(self):
return self._client

def get_files(self, file_extensions: List[str]) -> Dict:
"""
Clones the repo, filters the files with the given extensions, uploads a zip file to the associated bucket and
returns a pre-signed url to download it.
:param file_extensions: a list of file extensions to filter the repository contents.
:return: a dictionary with two keys: `key` with the path to the file in the bucket and `url` with the
pre-signed url to download it.
"""
files = self._client.get_files(file_extensions)
zip_file_path = self._zip_file(files)
storage_client = StorageProxyClient(self._platform)

key = f"/tmp/{uuid.uuid4()}.zip"
storage_client.upload_file(key, zip_file_path)
url = storage_client.generate_presigned_url(key, ZIP_FILE_EXPIRATION)
os.remove(zip_file_path)
return {"key": key, "url": url}

@staticmethod
def _zip_file(files: Generator[GitFileData, None, None]) -> str:
tmp_path = AgentUtils.temp_file_path()
with zipfile.ZipFile(tmp_path, mode="w") as zf:
for file in files:
zf.writestr(file.name, file.content)
return tmp_path
8 changes: 8 additions & 0 deletions apollo/integrations/s3/s3_base_reader_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,14 @@ def download_file(self, key: str, download_path: str) -> None:
self._bucket_name, key, download_path
)

def upload_file(self, key: str, local_file_path: str) -> None:
"""
Uploads the file at `local_file_path` to `key` in the associated bucket.
:param key: path to the file, for example /dir/name.ext
:param local_file_path: local path to the file to upload.
"""
self.s3_client.upload_file(local_file_path, self._bucket_name, key)

def managed_download(self, key: str, download_path: str):
"""
Performs a managed transfer that might be multipart, downloads the file at `key` to the local file at
Expand Down
9 changes: 9 additions & 0 deletions apollo/integrations/storage/base_storage_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,15 @@ def download_file(self, key: str, download_path: str) -> None:
"""
raise NotImplementedError()

@abstractmethod
def upload_file(self, key: str, local_file_path: str) -> None:
"""
Uploads the file at `local_file_path` to `key` in the associated bucket.
:param key: path to the file, for example /dir/name.ext
:param local_file_path: local path to the file to upload.
"""
raise NotImplementedError()

def read_json(self, key: str) -> Dict:
"""
Returns the contents as a dictionary of the JSON file at `key`.
Expand Down
22 changes: 20 additions & 2 deletions apollo/integrations/storage/storage_proxy_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ def get_error_type(self, error: Exception) -> Optional[str]:
"""
Returns an error type string for the given exception, this is used client side to create again the required
exception type.
:param error: the exception occurred.
:return: an error type if the exception is mapped to an error type for this client, `None` otherwise.
"""
if isinstance(error, BaseStorageClient.PermissionsError):
return _ERROR_TYPE_PERMISSIONS
Expand All @@ -77,6 +79,9 @@ def get_error_type(self, error: Exception) -> Optional[str]:
return super().get_error_type(error)

def log_payload(self, operation: AgentOperation) -> Dict:
"""
Implements `log_payload` from `BaseProxyClient` to include the bucket name in log messages for this client.
"""
payload: Dict[str, Any] = {
**super().log_payload(operation),
"bucket_name": self._client.bucket_name,
Expand All @@ -85,15 +90,27 @@ def log_payload(self, operation: AgentOperation) -> Dict:

def download_file(self, key: str) -> BinaryIO:
"""
Downloads the file to a temporary file and returns a BinaryIO object with the contents
Downloads the file to a temporary file and returns a BinaryIO object with the contents.
:param key: path to the file in the bucket
:return: BinaryIO object with the contents of the file.
"""
path = AgentUtils.temp_file_path()
self._client.download_file(key, path)
return AgentUtils.open_file(path)

def upload_file(self, key: str, local_file_path: str):
"""
Uploads the local file at `local_file_path` to `key` in the associated bucket
:param key: target path in the bucket for the uploaded file
:param local_file_path: local path of the file to upload.
"""
self._client.upload_file(key, local_file_path)

def managed_download(self, key: str) -> BinaryIO:
"""
Downloads the file to a temporary file and returns a BinaryIO object with the contents
Downloads the file to a temporary file and returns a BinaryIO object with the contents.
:param key: path to the file in the bucket.
:return: BinaryIO object with the contents of the file.
"""
path = AgentUtils.temp_file_path()
self._client.managed_download(key, path)
Expand All @@ -115,6 +132,7 @@ def generate_presigned_url(self, key: str, expiration: int) -> str:
"""
Generates a pre-signed URL, converts the received expiration seconds to timedelta as that's the
parameter type required by the storage client.
:param key: path to the file in the bucket
"""
return self._client.generate_presigned_url(
key=key, expiration=timedelta(seconds=expiration)
Expand Down
1 change: 1 addition & 0 deletions requirements.in
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,6 @@ flask==2.3.3
google-api-python-client==2.98.0
google-cloud-storage==2.10.0
gunicorn==21.2.0
lambda-git==0.1.1
looker-sdk==23.16.0
retry2==0.9.5
2 changes: 2 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ jmespath==1.0.1
# via
# boto3
# botocore
lambda-git==0.1.1
# via -r requirements.in
looker-sdk==23.16.0
# via -r requirements.in
lz4==4.3.2
Expand Down
Loading