From 947d58115dad45c7b5d503e398113f7dc627823d Mon Sep 17 00:00:00 2001 From: Abhilasha Lodha <77407100+AbhilashaLodha@users.noreply.github.com> Date: Thu, 18 Jul 2024 00:50:44 -0700 Subject: [PATCH] google drive crawler (#101) * google drive crawler * updated gdrive crawler config * Refactored GdriveCrawler to use date filtering and removed local storage for processed files, and handling PDF files * Refactor: Switch to using index_file() for direct uploads, sanitize filenames * added numpy * Refactor gdrive_crawler.py: use slugify, logging.info, adjust date comparison, and rename byte_stream * standardize date handling * changed the file to earlier versions * changed crawing to crawling * removed redundant checks on dates and clubbed download() and export() into one * resolving commit issues * minor fixes * small mypy fix * updated Docker load of credentials.json * added typing annotations --------- Co-authored-by: Abhilasha Lodha Co-authored-by: Ofer Mendelevitch --- Dockerfile | 2 +- config/vectara-gdrive.yaml | 12 +++ crawlers/gdrive_crawler.py | 175 +++++++++++++++++++++++++++++++++++++ ingest.py | 13 ++- run.sh | 1 + 5 files changed, 200 insertions(+), 3 deletions(-) create mode 100644 config/vectara-gdrive.yaml create mode 100644 crawlers/gdrive_crawler.py diff --git a/Dockerfile b/Dockerfile index 65d0bee..24eb621 100644 --- a/Dockerfile +++ b/Dockerfile @@ -31,7 +31,7 @@ RUN apt-get update && apt-get install -y --no-install-recommends \ && apt-get clean \ && rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* -# install python packages +# Install python packages WORKDIR ${HOME} COPY requirements.txt requirements-extra.txt $HOME/ RUN pip install --no-cache-dir torch==2.1.2 --index-url https://download.pytorch.org/whl/cpu diff --git a/config/vectara-gdrive.yaml b/config/vectara-gdrive.yaml new file mode 100644 index 0000000..524650c --- /dev/null +++ b/config/vectara-gdrive.yaml @@ -0,0 +1,12 @@ +vectara: + corpus_id: 277 + customer_id: 1526022105 + reindex: true + 
+crawling:
+  crawler_type: gdrive
+
+gdrive_crawler:
+  delegated_users:
+    -
+    -
diff --git a/crawlers/gdrive_crawler.py b/crawlers/gdrive_crawler.py
new file mode 100644
index 0000000..6113fbe
--- /dev/null
+++ b/crawlers/gdrive_crawler.py
@@ -0,0 +1,264 @@
+import os
+from core.crawler import Crawler
+from omegaconf import OmegaConf
+import logging
+import io
+from datetime import datetime, timedelta, timezone
+from google.oauth2 import service_account
+from googleapiclient.discovery import build, Resource
+from googleapiclient.errors import HttpError
+from googleapiclient.http import MediaIoBaseDownload
+import pandas as pd
+from slugify import slugify
+from typing import Dict, List, Tuple, Optional
+
+SCOPES = ["https://www.googleapis.com/auth/drive.readonly"]
+SERVICE_ACCOUNT_FILE = '/home/vectara/env/credentials.json'
+
+# Look-back window (in days) used when `gdrive_crawler.days_back` is not configured.
+DEFAULT_DAYS_BACK = 7
+
+# Google Workspace files must be exported to another format; per the Drive API
+# documentation they cannot be fetched with files().get_media().
+GOOGLE_APPS_PREFIX = 'application/vnd.google-apps.'
+
+# Google Workspace MIME type -> (file extension, export MIME type, URL template).
+EXPORT_FORMATS: Dict[str, Tuple[str, str, str]] = {
+    'application/vnd.google-apps.document': (
+        '.docx',
+        'application/vnd.openxmlformats-officedocument.wordprocessingml.document',
+        'https://docs.google.com/document/d/{file_id}/edit',
+    ),
+    'application/vnd.google-apps.spreadsheet': (
+        '.csv',
+        'text/csv',
+        'https://docs.google.com/spreadsheets/d/{file_id}/edit',
+    ),
+    'application/vnd.google-apps.presentation': (
+        '.pptx',
+        'application/vnd.openxmlformats-officedocument.presentationml.presentation',
+        'https://docs.google.com/presentation/d/{file_id}/edit',
+    ),
+}
+
+
+def get_credentials(delegated_user: str) -> service_account.Credentials:
+    """Load the service-account credentials and delegate them to `delegated_user`."""
+    credentials = service_account.Credentials.from_service_account_file(
+        SERVICE_ACCOUNT_FILE, scopes=SCOPES)
+    return credentials.with_subject(delegated_user)
+
+
+def is_shared_with_vectara(permissions: List[dict]) -> bool:
+    """Return True if any permission entry has displayName 'Vectara' or 'all'."""
+    return any(p.get('displayName') in ('Vectara', 'all') for p in permissions)
+
+
+def remove_local_file(file_path: str) -> None:
+    """Delete a temporary file; a file that is already gone is not an error."""
+    try:
+        os.remove(file_path)
+    except FileNotFoundError:
+        pass
+    except OSError as e:
+        logging.warning(f"Could not remove temporary file {file_path}: {e}")
+
+
+def download_or_export_file(service: Resource, file_id: str, mime_type: Optional[str] = None) -> Optional[io.BytesIO]:
+    """Fetch a Drive file into an in-memory byte stream.
+
+    If `mime_type` is given the file is exported to that format; otherwise its
+    content is downloaded unchanged. The whole file is buffered in memory, so
+    files large enough to exceed memory limits would need dedicated handling.
+
+    Returns the stream rewound to offset 0, or None if an HttpError was raised.
+    """
+    try:
+        if mime_type:
+            request = service.files().export_media(fileId=file_id, mimeType=mime_type)
+        else:
+            request = service.files().get_media(fileId=file_id)
+
+        byte_stream = io.BytesIO()
+        downloader = MediaIoBaseDownload(byte_stream, request)
+        done = False
+        while not done:
+            status, done = downloader.next_chunk()
+            logging.info(f"Download {int(status.progress() * 100)}.")
+        byte_stream.seek(0)  # rewind so that callers read from the start
+        return byte_stream
+    except HttpError as error:
+        logging.error(f"An error occurred: {error}")
+        return None
+
+
+def save_local_file(service: Resource, file_id: str, name: str, mime_type: Optional[str] = None) -> Optional[str]:
+    """Download (or export) a Drive file and write it under /tmp.
+
+    Returns the local path, or None if the download or the write failed.
+    """
+    # slugify() also replaces the dot in front of the extension, so sanitize
+    # the stem and the extension separately and re-join them with a real dot.
+    stem, extension = os.path.splitext(name)
+    sanitized_name = slugify(stem) or file_id  # the stem may slugify to ''
+    sanitized_extension = slugify(extension)
+    if sanitized_extension:
+        sanitized_name = f"{sanitized_name}.{sanitized_extension}"
+    file_path = os.path.join("/tmp", sanitized_name)
+    try:
+        byte_stream = download_or_export_file(service, file_id, mime_type)
+        if byte_stream is not None:
+            with open(file_path, 'wb') as f:
+                f.write(byte_stream.read())
+            return file_path
+    except Exception as e:
+        logging.error(f"Error saving local file: {e}")
+    return None
+
+
+def convert_xlsx_to_csv(xlsx_path: str) -> Optional[str]:
+    """Convert an .xlsx file to CSV next to it and delete the .xlsx.
+
+    Returns the path of the CSV file, or None if the conversion failed.
+    """
+    csv_path = os.path.splitext(xlsx_path)[0] + '.csv'
+    try:
+        pd.read_excel(xlsx_path).to_csv(csv_path, index=False)
+    except Exception as e:
+        logging.error(f"Error converting {xlsx_path} to CSV: {e}")
+        return None
+    finally:
+        remove_local_file(xlsx_path)
+    return csv_path
+
+
+class GdriveCrawler(Crawler):
+    """Index recently modified Google Drive files of the configured delegated users."""
+
+    def __init__(self, cfg: OmegaConf, endpoint: str, customer_id: str, corpus_id: int, api_key: str) -> None:
+        super().__init__(cfg, endpoint, customer_id, corpus_id, api_key)
+        logging.info("Google Drive Crawler initialized")
+
+        self.delegated_users = cfg.gdrive_crawler.delegated_users
+        # Optional `gdrive_crawler.days_back` setting; defaults to 7 days.
+        self.days_back = cfg.gdrive_crawler.get("days_back", DEFAULT_DAYS_BACK)
+        self.creds = None
+        self.service = None
+
+    def list_files(self, service: Resource, parent_id: Optional[str] = None, date_threshold: Optional[str] = None) -> List[dict]:
+        """List the files visible to `service` that are shared with Vectara.
+
+        The query covers non-trashed files that are under `parent_id` (default:
+        'root') or shared with the user and, when `date_threshold` is given,
+        modified after it. If an HttpError occurs, the files collected so far
+        are returned.
+        """
+        parent = parent_id or 'root'
+        clauses = [f"('{parent}' in parents or sharedWithMe)", "trashed=false"]
+        if date_threshold:
+            # Without this guard a missing threshold was sent as the literal 'None'.
+            clauses.append(f"modifiedTime > '{date_threshold}'")
+        params: Dict[str, object] = {
+            'fields': 'nextPageToken, files(id, name, mimeType, permissions, modifiedTime, createdTime, owners, size)',
+            'q': " and ".join(clauses),
+            'corpora': 'allDrives',
+            'includeItemsFromAllDrives': True,
+            'supportsAllDrives': True,
+        }
+
+        results: List[dict] = []
+        while True:
+            try:
+                response = service.files().list(**params).execute()
+            except HttpError as error:
+                logging.error(f"An error occurred: {error}")
+                break
+            results.extend(
+                file for file in response.get('files', [])
+                if is_shared_with_vectara(file.get('permissions', []))
+            )
+            page_token = response.get('nextPageToken')
+            if not page_token:
+                break
+            params['pageToken'] = page_token
+        return results
+
+    def handle_file(self, file: dict) -> Tuple[Optional[str], Optional[str]]:
+        """Download `file` to a local path suitable for indexing.
+
+        Returns `(local_file_path, url)`, or `(None, None)` if the file is
+        restricted, has an unsupported type, or could not be saved.
+        """
+        file_id = file['id']
+        mime_type = file['mimeType']
+        name = file['name']
+
+        logging.info(f"\nHandling file: {name} with MIME type: {mime_type}")
+
+        if not is_shared_with_vectara(file.get('permissions', [])):
+            logging.info(f"Skipping restricted file: {name}")
+            # crawl_file() unpacks the result, so this must be a 2-tuple.
+            return None, None
+
+        if mime_type in EXPORT_FORMATS:
+            extension, export_mime_type, url_template = EXPORT_FORMATS[mime_type]
+            local_file_path = save_local_file(self.service, file_id, name + extension, export_mime_type)
+            url = url_template.format(file_id=file_id)
+        elif mime_type.startswith('application/') and not mime_type.startswith(GOOGLE_APPS_PREFIX):
+            local_file_path = save_local_file(self.service, file_id, name)
+            if local_file_path and local_file_path.endswith('.xlsx'):
+                local_file_path = convert_xlsx_to_csv(local_file_path)
+            url = f'https://drive.google.com/file/d/{file_id}/view'
+        else:
+            logging.info(f"Unsupported file type: {mime_type}")
+            return None, None
+
+        logging.info(f"local_file_path :: {local_file_path}")
+        if local_file_path:
+            return local_file_path, url
+        return None, None
+
+    def crawl_file(self, file: dict) -> None:
+        """Download one Drive file, index it with its metadata, then clean up."""
+        local_file_path, url = self.handle_file(file)
+        if not local_file_path:
+            return
+
+        file_id = file['id']
+        name = file['name']
+        logging.info(f'\nCrawling file {name}')
+
+        file_metadata = {
+            'id': file_id,
+            'name': name,
+            'created_at': file.get('createdTime', 'N/A'),
+            'modified_at': file.get('modifiedTime', 'N/A'),
+            'owners': ', '.join(owner.get('displayName', '') for owner in file.get('owners', [])),
+            'size': file.get('size', 'N/A'),
+            'source': 'gdrive',
+        }
+
+        try:
+            self.indexer.index_file(filename=local_file_path, uri=url, metadata=file_metadata)
+        except Exception as e:
+            logging.error(f"Error {e} indexing document for file {name}, file_id {file_id}")
+        finally:
+            # The download is only a staging copy; do not let /tmp fill up.
+            remove_local_file(local_file_path)
+
+    def crawl(self) -> None:
+        """Crawl the files modified in the last `days_back` days for every delegated user."""
+        date_threshold = datetime.now(timezone.utc) - timedelta(days=self.days_back)
+        # RFC 3339 timestamp in UTC, the format used by Drive `modifiedTime` queries.
+        threshold = date_threshold.strftime('%Y-%m-%dT%H:%M:%SZ')
+
+        for user in self.delegated_users:
+            if not user:
+                logging.warning("Skipping empty entry in gdrive_crawler.delegated_users")
+                continue
+            logging.info(f"Processing files for user: {user}")
+            self.creds = get_credentials(user)
+            self.service = build("drive", "v3", credentials=self.creds)
+
+            for file in self.list_files(self.service, date_threshold=threshold):
+                self.crawl_file(file)
diff --git a/ingest.py b/ingest.py index 903804b..b00ee55 100644 --- a/ingest.py +++ b/ingest.py @@ -14,6 +14,7 @@ from authlib.integrations.requests_client import OAuth2Session def instantiate_crawler(base_class, folder_name: str, class_name: str, *args, **kwargs) -> Any: # type: ignore + logging.info(f'inside instantiate crawler') sys.path.insert(0, os.path.abspath(folder_name)) crawler_name = class_name.split('Crawler')[0] @@ -27,6 +28,7 @@ def instantiate_crawler(base_class, folder_name: str, class_name: str, *args, ** raise TypeError(f"{class_name} is not a subclass of {base_class.__name__}") # Instantiate the class and return the instance + logging.info(f'end of instantiate crawler') return class_(*args, **kwargs) def get_jwt_token(auth_url: str, auth_id: str, auth_secret: str, customer_id: str) -> Any: @@ -76,6 +78,8 @@ def main() -> None: if 
len(sys.argv) != 3: logging.info("Usage: python ingest.py ") return + + logging.info("Starting the Crawler...") config_name = sys.argv[1] profile_name = sys.argv[2] @@ -89,7 +93,7 @@ def main() -> None: if profile_name not in env_dict: logging.info(f'Profile "{profile_name}" not found in secrets.toml') return - + logging.info(f'Using profile "{profile_name}" from secrets.toml') # Add all keys from "general" section to the vectara config general_dict = env_dict.get('general', {}) for k,v in general_dict.items(): @@ -129,6 +133,7 @@ def main() -> None: # default (otherwise) - add to vectara config OmegaConf.update(cfg['vectara'], k, v) + logging.info("Configuration loaded...") endpoint = cfg.vectara.get("endpoint", "api.vectara.io") customer_id = cfg.vectara.customer_id corpus_id = cfg.vectara.corpus_id @@ -136,8 +141,12 @@ def main() -> None: crawler_type = cfg.crawling.crawler_type # instantiate the crawler - crawler = instantiate_crawler(Crawler, 'crawlers', f'{crawler_type.capitalize()}Crawler', cfg, endpoint, customer_id, corpus_id, api_key) + crawler = instantiate_crawler( + Crawler, 'crawlers', f'{crawler_type.capitalize()}Crawler', + cfg, endpoint, customer_id, corpus_id, api_key + ) + logging.info("Crawling instantiated...") # When debugging a crawler, it is sometimes useful to reset the corpus (remove all documents) # To do that you would have to set this to True and also include and in the secrets.toml file # NOTE: use with caution; this will delete all documents in the corpus and is irreversible diff --git a/run.sh b/run.sh index 9034b0a..69c3161 100644 --- a/run.sh +++ b/run.sh @@ -23,6 +23,7 @@ fi # Mount secrets file into docker container mkdir -p ~/tmp/mount cp secrets.toml ~/tmp/mount +cp credentials.json ~/tmp/mount cp $1 ~/tmp/mount/ # Build docker container