google drive crawler #101

Merged · 17 commits · Jul 18, 2024
12 changes: 12 additions & 0 deletions config/vectara-gdrive.yaml
@@ -0,0 +1,12 @@
vectara:
  corpus_id: 277
  customer_id: 1526022105
  reindex: true

crawling:
  crawler_type: gdrive

gdrive_crawler:
  delegated_users:
    - <add email id>
    - <add email id>
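
Note: to run a crawl with this config, the service-account key is expected at credentials.json (see SERVICE_ACCOUNT_FILE in the crawler below), the two placeholder entries under delegated_users should be replaced with real Workspace user emails, and the crawl is started through ingest.py exactly like the other crawlers. A minimal sketch, assuming a secrets.toml profile named "default" that carries the Vectara API key (the profile name and key value are placeholders):

    # secrets.toml, profile name "default" is illustrative
    [default]
    api_key = "<vectara-api-key>"

    # start the Google Drive crawl
    python ingest.py config/vectara-gdrive.yaml default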
276 changes: 276 additions & 0 deletions crawlers/gdrive_crawler.py
@@ -0,0 +1,276 @@
import json
import os
from core.crawler import Crawler
from omegaconf import OmegaConf
import logging
import io
from datetime import datetime
from google.oauth2 import service_account
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from googleapiclient.http import MediaIoBaseDownload
from docx import Document
import pandas as pd
import pptx
from typing import List

SCOPES = ["https://www.googleapis.com/auth/drive.readonly"]
SERVICE_ACCOUNT_FILE = 'credentials.json'
PROCESSED_FILES_RECORD = 'processed_files.json'

def get_credentials(delegated_user):
    credentials = service_account.Credentials.from_service_account_file(
        SERVICE_ACCOUNT_FILE, scopes=SCOPES)
    delegated_credentials = credentials.with_subject(delegated_user)
    return delegated_credentials

def download_file(service, file_id):
    try:
        request = service.files().get_media(fileId=file_id)
        file = io.BytesIO()
        downloader = MediaIoBaseDownload(file, request)
        done = False
        while not done:
            status, done = downloader.next_chunk()
            print(f"Download {int(status.progress() * 100)}.")
        file.seek(0)  # Reset the file pointer to the beginning
        return file
    except HttpError as error:
        print(f"An error occurred: {error}")
        return None

def export_file(service, file_id, mime_type):
    try:
        request = service.files().export_media(fileId=file_id, mimeType=mime_type)
        file = io.BytesIO()
        downloader = MediaIoBaseDownload(file, request)
        done = False
        while not done:
            status, done = downloader.next_chunk()
            print(f"Download {int(status.progress() * 100)}.")
        file.seek(0)  # Reset the file pointer to the beginning
        return file
    except HttpError as error:
        print(f"An error occurred: {error}")
        return None

def extract_text_from_docx(docx_file):
    doc = Document(docx_file)
    full_text = []
    for para in doc.paragraphs:
        full_text.append(para.text)
    return '\n'.join(full_text)

def extract_text_from_csv(csv_file):
    try:
        df = pd.read_csv(csv_file)
        return df.to_string()
    except pd.errors.ParserError as e:
        print(f"Error parsing CSV: {e}")
        return ""

def extract_text_from_xlsx(xlsx_file):
    try:
        dfs = pd.read_excel(xlsx_file, sheet_name=None)
        full_text = []
        for sheet_name, df in dfs.items():
            full_text.append(f"Sheet: {sheet_name}")
            full_text.append(df.to_string())
        return '\n'.join(full_text)
    except Exception as e:
        print(f"Error reading XLSX file: {e}")
        return ""

def extract_text_from_txt(txt_file):
    try:
        return txt_file.read().decode('utf-8')
    except Exception as e:
        print(f"Error reading TXT file: {e}")
        return ""

def extract_text_from_pptx(pptx_file):
    prs = pptx.Presentation(pptx_file)
    full_text = []
    for slide in prs.slides:
        for shape in slide.shapes:
            if hasattr(shape, "text"):
                full_text.append(shape.text)
    return '\n'.join(full_text)

class GdriveCrawler(Crawler):

    def __init__(self, cfg: OmegaConf, endpoint: str, customer_id: str, corpus_id: int, api_key: str, delegated_users: List[str]) -> None:
        super().__init__(cfg, endpoint, customer_id, corpus_id, api_key)
        logging.info("Google Drive Crawler initialized")

        self.delegated_users = delegated_users
        self.creds = None
        self.service = None
        self.processed_files = self.load_processed_files()

    def load_processed_files(self):
        if os.path.exists(PROCESSED_FILES_RECORD):
            with open(PROCESSED_FILES_RECORD, 'r') as file:
                return set(json.load(file))
        return set()

    def save_processed_file(self, file_id):
        self.processed_files.add(file_id)
        with open(PROCESSED_FILES_RECORD, 'w') as file:
            json.dump(list(self.processed_files), file)

    def list_files(self, service, parent_id=None):
        results = []
        page_token = None
        query = f"'{parent_id}' in parents" if parent_id else "'root' in parents or sharedWithMe"

        while True:
            try:
                params = {
                    # request the metadata fields that crawl_file reads later (createdTime, modifiedTime, owners, size)
                    'fields': 'nextPageToken, files(id, name, mimeType, permissions, createdTime, modifiedTime, owners, size)',
                    'q': query,
                    'corpora': 'allDrives',
                    'includeItemsFromAllDrives': True,
                    'supportsAllDrives': True
                }
                if page_token:
                    params['pageToken'] = page_token
                response = service.files().list(**params).execute()
                files = response.get('files', [])
                for file in files:
                    permissions = file.get('permissions', [])
                    if any(p.get('displayName') == 'Vectara' or p.get('displayName') == 'all' for p in permissions):
                        results.append(file)
                page_token = response.get('nextPageToken', None)
                if not page_token:
                    break
            except HttpError as error:
                print(f"An error occurred: {error}")
                break
        return results

    def handle_file(self, file):
        file_id = file['id']
        mime_type = file['mimeType']
        name = file['name']
        permissions = file.get('permissions', [])

        print(f"Handling file: {name} with MIME type: {mime_type}")

        if not any(p.get('displayName') == 'Vectara' or p.get('displayName') == 'all' for p in permissions):
            print(f"Skipping restricted file: {name}")
            return None

        if file_id in self.processed_files:
            print(f"Skipping already processed file: {name} (ID: {file_id})")
            return None

        self.processed_files.add(file_id)

        if mime_type == 'application/vnd.google-apps.document':
            file = export_file(self.service, file_id, 'application/vnd.openxmlformats-officedocument.wordprocessingml.document')
            if file:
                text = extract_text_from_docx(file)
                self.save_processed_file(file_id)
                return text
        elif mime_type == 'application/vnd.google-apps.spreadsheet':
            file = export_file(self.service, file_id, 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet')
            if file:
                text = extract_text_from_xlsx(file)
                self.save_processed_file(file_id)
                return text
        elif mime_type == 'application/vnd.google-apps.presentation':
            file = export_file(self.service, file_id, 'application/vnd.openxmlformats-officedocument.presentationml.presentation')
            if file:
                text = extract_text_from_pptx(file)
                self.save_processed_file(file_id)
                return text
        elif mime_type == 'application/vnd.openxmlformats-officedocument.wordprocessingml.document':
            file = download_file(self.service, file_id)
            if file:
                text = extract_text_from_docx(file)
                self.save_processed_file(file_id)
                return text
        elif mime_type == 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet':
            file = download_file(self.service, file_id)
            if file:
                text = extract_text_from_xlsx(file)
                self.save_processed_file(file_id)
                return text
        elif mime_type == 'text/csv':
            file = download_file(self.service, file_id)
            if file:
                text = extract_text_from_csv(file)
                self.save_processed_file(file_id)
                return text
        elif mime_type == 'application/zip':
            return "ZIP files cannot be processed directly."
        elif mime_type == 'application/pdf':
            file = download_file(self.service, file_id)
            if file:
                self.save_processed_file(file_id)
            return "PDF files are not currently supported."
        elif mime_type == 'application/vnd.google-apps.shortcut':
            self.save_processed_file(file_id)
            return "Shortcuts are not supported."
        elif mime_type == 'application/vnd.google-apps.folder':
            files = self.list_files(self.service, file_id)
            for f in files:
                self.handle_file(f)
            self.save_processed_file(file_id)
            return f"Folder '{name}' with ID '{file_id}'"
        elif mime_type == 'application/vnd.google-apps.form':
            self.save_processed_file(file_id)
            return f"Google Form '{name}' with ID '{file_id}' (Forms content requires Google Forms API)"
        else:
            print(f"Unsupported file type: {mime_type}")
            return None

    def crawl_file(self, file):
        text = self.handle_file(file)
        if text:
            file_id = file['id']
            mime_type = file['mimeType']
            name = file['name']
            created_time = file.get('createdTime', 'N/A')
            modified_time = file.get('modifiedTime', 'N/A')
            owners = ', '.join([owner['displayName'] for owner in file.get('owners', [])])
            size = file.get('size', 'N/A')

            print(f'\n\nCrawling file {name}')

            file_metadata = {
                'id': file_id,
                'mimeType': mime_type,
                'name': name,
                'created_at': created_time,
                'modified_at': modified_time,
                'owners': owners,
                'size': size,
                'source': 'gdrive'
            }

            file_doc = {
                'documentId': f'google-drive-{file_id}',
                'title': name,
                'metadataJson': json.dumps(file_metadata),
                'section': [{
                    'title': name,
                    'text': text,
                }]
            }

            try:
                self.indexer.index_document(file_doc)
            except Exception as e:
                logging.info(f"Error {e} indexing document for file {name}, file_id {file_id}")

    def crawl(self) -> None:
        for user in self.delegated_users:
            print(f"Processing files for user: {user}")
            self.creds = get_credentials(user)
            self.service = build("drive", "v3", credentials=self.creds)

            list_files = self.list_files(self.service)
            for file in list_files:
                self.crawl_file(file)
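
Note: handle_file records every file ID it finishes in processed_files.json and skips those IDs on later runs, so deleting that file forces a full re-crawl. A quick way to inspect the bookkeeping, as a minimal sketch run from the same working directory (the file name comes from PROCESSED_FILES_RECORD above):

    import json

    with open("processed_files.json") as f:
        processed = json.load(f)
    print(f"{len(processed)} file IDs already processed")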
23 changes: 21 additions & 2 deletions ingest.py
@@ -14,6 +14,7 @@
from authlib.integrations.requests_client import OAuth2Session

def instantiate_crawler(base_class, folder_name: str, class_name: str, *args, **kwargs) -> Any: # type: ignore
    logging.info(f'inside instantiate crawler')
    sys.path.insert(0, os.path.abspath(folder_name))

    crawler_name = class_name.split('Crawler')[0]
@@ -27,6 +28,7 @@ def instantiate_crawler(base_class, folder_name: str, class_name: str, *args, **
        raise TypeError(f"{class_name} is not a subclass of {base_class.__name__}")

    # Instantiate the class and return the instance
    logging.info(f'end of instantiate crawler')
    return class_(*args, **kwargs)

def get_jwt_token(auth_url: str, auth_id: str, auth_secret: str, customer_id: str) -> Any:
@@ -76,6 +78,8 @@ def main() -> None:
    if len(sys.argv) != 3:
        logging.info("Usage: python ingest.py <config_file> <secrets-profile>")
        return

    logging.info("Starting the Crawler...")
    config_name = sys.argv[1]
    profile_name = sys.argv[2]

@@ -89,7 +93,7 @@ def main() -> None:
    if profile_name not in env_dict:
        logging.info(f'Profile "{profile_name}" not found in secrets.toml')
        return

    logging.info(f'Using profile "{profile_name}" from secrets.toml')
    # Add all keys from "general" section to the vectara config
    general_dict = env_dict.get('general', {})
    for k, v in general_dict.items():
@@ -129,15 +133,30 @@ def main() -> None:
        # default (otherwise) - add to vectara config
        OmegaConf.update(cfg['vectara'], k, v)

    logging.info(f"Configuration loaded...")
    endpoint = cfg.vectara.get("endpoint", "api.vectara.io")
    customer_id = cfg.vectara.customer_id
    corpus_id = cfg.vectara.corpus_id
    api_key = cfg.vectara.api_key
    crawler_type = cfg.crawling.crawler_type

    # instantiate the crawler
    crawler = instantiate_crawler(Crawler, 'crawlers', f'{crawler_type.capitalize()}Crawler', cfg, endpoint, customer_id, corpus_id, api_key)
    # crawler = instantiate_crawler(Crawler, 'crawlers', f'{crawler_type.capitalize()}Crawler', cfg, endpoint, customer_id, corpus_id, api_key)

    # Conditionally extract delegated_users if the crawler type is gdrive
    if crawler_type == "gdrive":
        delegated_users = cfg.gdrive_crawler.delegated_users
        crawler = instantiate_crawler(
            Crawler, 'crawlers', f'{crawler_type.capitalize()}Crawler',
            cfg, endpoint, customer_id, corpus_id, api_key, delegated_users
        )
    else:
        crawler = instantiate_crawler(
            Crawler, 'crawlers', f'{crawler_type.capitalize()}Crawler',
            cfg, endpoint, customer_id, corpus_id, api_key
        )

logging.info(f"Crawing instantiated...")
    # When debugging a crawler, it is sometimes useful to reset the corpus (remove all documents)
    # To do that you would have to set this to True and also include <auth_url> and <auth_id> in the secrets.toml file
    # NOTE: use with caution; this will delete all documents in the corpus and is irreversible
7 changes: 5 additions & 2 deletions requirements.txt
@@ -19,7 +19,7 @@ biopython==1.81
boto3==1.26.116
mwviews==0.2.1
toml==0.10.2
pandas==1.3.5
pandas==2.2.2
Review comment (Collaborator):
Why do we need to upgrade pandas and numpy?
This may break other things, so unless it's tested thoroughly I would avoid without a specific need.

python-dateutil==2.8.2
playwright==1.41.2
pdf2image>=1.16
@@ -52,4 +52,7 @@ pydub==0.25.1
pytube==15.0.0
openai-whisper==20231117
youtube-transcript-api==0.6.2
sec-downloader==0.11.1
sec-downloader==0.11.1
python-docx==1.1.2
openpyxl==3.1.4
python-pptx==0.6.23