Upload with APIv2 (#138)
* initial

* initial
ofermend authored Dec 18, 2024
1 parent fe03363 commit aa258b6
Showing 15 changed files with 150 additions and 34 deletions.
3 changes: 3 additions & 0 deletions README.md
@@ -171,6 +171,9 @@ Each configuration YAML file includes a set of standard variables, for example:
vectara:
# the corpus ID for indexing
corpus_id: 4
# the corpus key for indexing with APIv2
corpus_key: my-corpus-key
# the Vectara customer ID
customer_id: 1234567
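For readers wiring up a config, a minimal sketch of how these keys might be read with OmegaConf (the library used throughout this repo). The file path is illustrative, and the v1-vs-v2 choice mirrors the corpus_key check added to core/indexer.py later in this diff:

```python
from omegaconf import OmegaConf

# Illustrative path; point this at your crawler's YAML config.
cfg = OmegaConf.load("config/my-crawler.yaml")

corpus_id = cfg.vectara.get("corpus_id", None)    # numeric ID used by the APIv1 upload path
corpus_key = cfg.vectara.get("corpus_key", None)  # string key that enables the APIv2 upload path

if corpus_key is not None:
    print(f"Will upload with APIv2 to corpus '{corpus_key}'")
else:
    print(f"Will upload with APIv1 to corpus id {corpus_id}")
```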
3 changes: 2 additions & 1 deletion core/crawler.py
@@ -86,8 +86,9 @@ def __init__(
endpoint: str,
customer_id: str,
corpus_id: int,
corpus_key: str,
api_key: str,
) -> None:
self.cfg: DictConfig = DictConfig(cfg)
self.indexer = Indexer(cfg, endpoint, customer_id, corpus_id, api_key)
self.indexer = Indexer(cfg, endpoint, customer_id, corpus_id, corpus_key, api_key)
self.verbose = cfg.vectara.get("verbose", False)
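Every crawler subclass in the diffs further down repeats this constructor change; a minimal sketch of the pattern, with the import path and class name assumed for illustration:

```python
from omegaconf import OmegaConf

from core.crawler import Crawler  # import path assumed from this repo's layout


class MyCrawler(Crawler):
    """Hypothetical crawler showing the new corpus_key pass-through."""

    def __init__(self, cfg: OmegaConf, endpoint: str, customer_id: str,
                 corpus_id: int, corpus_key: str, api_key: str) -> None:
        # corpus_key is simply forwarded; the base class hands it to the Indexer.
        super().__init__(cfg, endpoint, customer_id, corpus_id, corpus_key, api_key)
        self.my_setting = cfg.my_crawler.get("my_setting", None)  # illustrative config read
```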
128 changes: 119 additions & 9 deletions core/indexer.py
@@ -1,5 +1,7 @@
import logging
import json
import html
import re
import os
from typing import Dict, Any, List, Optional
import uuid
@@ -52,12 +54,13 @@ class Indexer(object):
api_key (str): API key for the Vectara API.
"""
def __init__(self, cfg: OmegaConf, endpoint: str,
customer_id: str, corpus_id: int, api_key: str) -> None:
customer_id: str, corpus_id: int, corpus_key: str, api_key: str) -> None:
self.cfg = cfg
self.browser_use_limit = 100
self.endpoint = endpoint
self.customer_id = customer_id
self.corpus_id = corpus_id
self.corpus_key = corpus_key
self.api_key = api_key
self.reindex = cfg.vectara.get("reindex", False)
self.verbose = cfg.vectara.get("verbose", False)
@@ -112,7 +115,6 @@ def setup(self, use_playwright: bool = True) -> None:
self.p = sync_playwright().start()
self.browser = self.p.firefox.launch(headless=True)
self.browser_use_count = 0
self.tmp_file = 'tmp_' + str(uuid.uuid4())
if self.store_docs:
self.store_docs_folder = '/home/vectara/env/indexed_docs_' + str(uuid.uuid4())
if os.path.exists(self.store_docs_folder):
@@ -365,9 +367,9 @@ def _list_docs(self) -> List[Dict[str, str]]:

return docs

def _index_file(self, filename: str, uri: str, metadata: Dict[str, Any]) -> bool:
def _index_file_v1(self, filename: str, uri: str, metadata: Dict[str, Any]) -> bool:
"""
Index a file on local file system by uploading it to the Vectara corpus.
Index a file on the local file system by uploading it to the Vectara corpus, using APIv1.
Args:
filename (str): Name of the file to create.
uri (str): URI for where the document originated. In some cases the local file name is not the same, and we want to include this in the index.
Expand Down Expand Up @@ -409,6 +411,8 @@ def get_files(filename: str, metadata: dict):
else:
self.logger.info(f"REST upload for {uri} ({filename}) (reindex) failed with code = {response.status_code}, text = {response.text}")
return True
else:
self.logger.info(f"REST upload for {uri} failed with code {response.status_code}")
return False
elif response.status_code != 200:
self.logger.error(f"REST upload for {uri} failed with code {response.status_code}, text = {response.text}")
@@ -418,6 +422,69 @@ def get_files(filename: str, metadata: dict):
self.store_file(filename, url_to_filename(uri))
return True
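For contrast with the new v2 method defined next, here is a rough standalone sketch of the kind of v1 upload call that _index_file_v1 wraps. The endpoint shape, query parameters, and the doc_metadata form field reflect my understanding of the v1 file-upload API rather than code shown in this diff:

```python
import json
import requests

def upload_file_v1(endpoint: str, customer_id: str, corpus_id: int,
                   api_key: str, filename: str, metadata: dict) -> bool:
    # APIv1 identifies the corpus by numeric ID via query parameters.
    url = f"https://{endpoint}/v1/upload?c={customer_id}&o={corpus_id}"
    post_headers = {"x-api-key": api_key, "customer-id": customer_id}
    with open(filename, "rb") as f:
        files = {
            "file": (filename.split("/")[-1], f),
            "doc_metadata": (None, json.dumps(metadata), "application/json"),
        }
        response = requests.post(url, headers=post_headers, files=files)
    return response.status_code == 200
```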

def _index_file_v2(self, filename: str, uri: str, metadata: Dict[str, Any]) -> bool:
"""
Index a file on the local file system by uploading it to the Vectara corpus, using APIv2.
Args:
filename (str): Name of the file to create.
uri (str): URI for where the document originated. In some cases the local file name is not the same, and we want to include this in the index.
metadata (dict): Metadata for the document.
Returns:
bool: True if the upload was successful, False otherwise.
"""
if not os.path.exists(filename):
self.logger.error(f"File {filename} does not exist")
return False

post_headers = {
'Accept': 'application/json',
'x-api-key': self.api_key,
}
url = f"https://api.vectara.io/v2/corpora/{self.corpus_key}/upload_file"
files = {
'file': (filename.split('/')[-1], open(filename, 'rb')),
'metadata': (None, json.dumps(metadata), 'application/json'),
}
if self.summarize_tables:
files['table_extraction_config'] = (None, json.dumps({'extract_tables': True}), 'application/json')
response = self.session.request("POST", url, headers=post_headers, files=files)

if response.status_code == 400:
error_msg = json.loads(html.unescape(response.text))
if error_msg['httpCode'] == 409:
if self.reindex:
match = re.search(r"document id '([^']+)'", error_msg['details'])
if match:
doc_id = match.group(1)
else:
self.logger.error(f"Failed to extract document id from error message: {error_msg}")
return False
self.delete_doc(doc_id)
self.logger.info(f"DEBUG 1, Reindexing, document {doc_id}, url={url}, post_headers={post_headers}")
response = self.session.request("POST", url, headers=post_headers, files=files)
self.logger.info(f"DEBUG 2, response code={response.status_code}")
if response.status_code == 201:
self.logger.info(f"REST upload for {uri} successful (reindex)")
self.store_file(filename, url_to_filename(uri))
return True
else:
self.logger.info(f"REST upload for {uri} ({doc_id}) (reindex) failed with code = {response.status_code}, text = {response.text}")
return True
else:
self.logger.info(f"document {uri} already indexed, skipping")
return False
else:
self.logger.info(f"REST upload for {uri} failed with code {response.status_code}")
return False
return False
elif response.status_code != 201:
self.logger.error(f"REST upload for {uri} failed with code {response.status_code}, text = {response.text}")
return False

self.logger.info(f"REST upload for {uri} successful")
self.store_file(filename, url_to_filename(uri))
return True
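A condensed standalone sketch of what _index_file_v2 does end to end: upload through the APIv2 corpora endpoint, and on a conflict optionally delete the existing document and retry. Names are illustrative; the conflict check follows the wrapped-409 response handled above, and the delete endpoint is my assumption of what self.delete_doc calls:

```python
import json
import re
import requests

def upload_file_v2(corpus_key: str, api_key: str, filename: str,
                   metadata: dict, reindex: bool = False,
                   extract_tables: bool = False) -> bool:
    url = f"https://api.vectara.io/v2/corpora/{corpus_key}/upload_file"
    headers = {"Accept": "application/json", "x-api-key": api_key}

    with open(filename, "rb") as f:
        file_bytes = f.read()

    def make_files() -> dict:
        # Rebuild the multipart payload for each attempt so a retry is not
        # sent with an exhausted file handle.
        files = {
            "file": (filename.split("/")[-1], file_bytes),
            "metadata": (None, json.dumps(metadata), "application/json"),
        }
        if extract_tables:
            files["table_extraction_config"] = (None, json.dumps({"extract_tables": True}), "application/json")
        return files

    response = requests.post(url, headers=headers, files=make_files())
    if response.status_code == 201:
        return True

    # Conflicts are observed above as a 400 whose JSON body carries httpCode 409.
    if response.status_code == 400 and reindex:
        error_msg = json.loads(response.text)
        match = re.search(r"document id '([^']+)'", error_msg.get("details", ""))
        if error_msg.get("httpCode") == 409 and match:
            doc_id = match.group(1)
            requests.delete(
                f"https://api.vectara.io/v2/corpora/{corpus_key}/documents/{doc_id}",
                headers=headers,
            )
            retry = requests.post(url, headers=headers, files=make_files())
            return retry.status_code == 201
    return False
```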

def _index_document(self, document: Dict[str, Any], use_core_indexing: bool = False) -> bool:
"""
Index a document (by uploading it to the Vectara corpus) from the document dictionary
@@ -501,14 +568,14 @@ def index_url(self, url: str, metadata: Dict[str, Any], html_processing: dict =

# if file is going to download, then handle it as local file
if self.url_triggers_download(url):
file_path = self.tmp_file
file_path = "/tmp/" + slugify(url)
response = self.session.get(url, headers=get_headers, stream=True)
if response.status_code == 200:
with open(file_path, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
f.write(chunk)
self.logger.info(f"File downloaded successfully and saved as {file_path}")
res = self.index_file(file_path, url, metadata)
res = self.index_file(file_path, url, metadata)
safe_remove_file(file_path)
return res
else:
@@ -714,11 +781,11 @@ def index_file(self, filename: str, uri: str, metadata: Dict[str, Any]) -> bool:

# If we have a PDF file with size>50MB, or we want to use the summarize_tables option, then we parse locally and index
# Otherwise - send to Vectara's default upload file mechanism
openai_api_key = self.cfg.vectara.get("openai_api_key", None)
size_limit = 50
large_file_extensions = ['.pdf', '.html', '.htm', '.pptx', '.docx']
if (any(uri.endswith(extension) for extension in large_file_extensions) and
(get_file_size_in_MB(filename) >= size_limit or self.summarize_tables)):
openai_api_key = self.cfg.vectara.get("openai_api_key", None)
(get_file_size_in_MB(filename) >= size_limit or (self.summarize_tables and self.corpus_key is None))):
self.logger.info(f"Parsing file {filename}")
if self.doc_parser == "docling":
dp = DoclingDocumentParser(
Expand Down Expand Up @@ -772,8 +839,51 @@ def index_file(self, filename: str, uri: str, metadata: Dict[str, Any]) -> bool:
self.logger.info(f"For file {filename}, extracted text locally since file size is larger than {size_limit}MB")
return succeeded
else:
# Parse file content to extract images and get text content
self.logger.info(f"Reading contents of {filename} (url={uri})")
dp = UnstructuredDocumentParser(
verbose=self.verbose,
openai_api_key=openai_api_key,
summarize_tables=False,
summarize_images=self.summarize_images,
)
title, texts, metadatas, image_summaries = dp.parse(filename, uri)

# Get metadata from text content
if len(self.extract_metadata)>0:
all_text = "\n".join(texts)[:32768]
ex_metadata = get_attributes_from_text(
all_text,
metadata_questions=self.extract_metadata,
openai_api_key=openai_api_key
)
metadata.update(ex_metadata)
else:
ex_metadata = {}
metadata.update(ex_metadata)

# index the file within Vectara (use FILE UPLOAD API)
return self._index_file(filename, uri, metadata)
if self.corpus_key is None:
succeeded = self._index_file_v1(filename, uri, metadata)
self.logger.info(f"For {uri} - uploaded via Vectara file upload API v1")
else:
succeeded = self._index_file_v2(filename, uri, metadata)
self.logger.info(f"For {uri} - uploaded via Vectara file upload API v2")

# If needs to summarize images - then do it locally
if self.summarize_images and openai_api_key:
self.logger.info(f"Parsing images from {uri}")
self.logger.info(f"Extracted {len(image_summaries)} images from {uri}")
for inx,image_summary in enumerate(image_summaries):
if image_summary:
metadata = {'element_type': 'image'}
if ex_metadata:
metadata.update(ex_metadata)
doc_id = slugify(uri) + "_image_" + str(inx)
succeeded &= self.index_segments(doc_id=doc_id, texts=[image_summary], metadatas=metadatas,
doc_metadata=metadata, doc_title=title)

return succeeded
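The condition near the top of index_file is the part this commit changes: with a corpus_key configured, table summarization no longer forces local parsing, because the v2 upload can extract tables server-side. A small helper restating that decision (the helper name is mine; the threshold and extensions are copied from the code above):

```python
from typing import Optional

def should_parse_locally(uri: str, file_size_mb: float,
                         summarize_tables: bool, corpus_key: Optional[str]) -> bool:
    """Parse locally only for large rich documents, or when tables must be
    summarized and no APIv2 corpus_key is available to do it server-side."""
    size_limit = 50  # MB
    large_file_extensions = ['.pdf', '.html', '.htm', '.pptx', '.docx']
    return (any(uri.endswith(ext) for ext in large_file_extensions) and
            (file_size_mb >= size_limit or (summarize_tables and corpus_key is None)))
```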

def index_media_file(self, file_path, metadata=None):
"""
3 changes: 2 additions & 1 deletion core/summary.py
@@ -110,6 +110,7 @@ def summarize_table_text(self, text: str):
prompt = f"""
Adopt the perspective of a professional data analyst, with expertise in generating insight from structured data.
Provide a detailed description of the results reported in this table, ensuring clarity, depth and relevance. Don't omit any data points.
Start with a description of each row in the table. Then follow with a broader analysis of trends and insights, and conclude with an interpretation of the data.
Contextual Details:
- Examine the table headings, footnotes, or accompanying text to identify key contextual details such as the time period, location, subject area, and units of measurement.
- Always include the table title, time frame, and geographical or thematic scope in your description.
@@ -129,7 +130,7 @@ def summarize_table_text(self, text: str):
Clarity and Accuracy:
- Use clear and professional language, ensuring all descriptions are tied explicitly to the data.
- If uncertainties exist in the data or context, state them and clarify how they might impact the analysis.
Your response should be without headings.
Your response should be without headings, and in plain text (not markdown).
Table chunk: {text}
"""
try:
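For context, a rough sketch of how a prompt like the one above might be sent for table summarization, assuming the openai Python client; the model name and call details are illustrative and not taken from this diff:

```python
from openai import OpenAI

def summarize_table_text(table_text: str, model: str = "gpt-4o") -> str:
    # The full analyst prompt shown in the diff above would go here.
    prompt = f"Describe the results reported in this table... Table chunk: {table_text}"
    client = OpenAI()  # reads OPENAI_API_KEY from the environment
    response = client.chat.completions.create(
        model=model,
        messages=[{"role": "user", "content": prompt}],
        temperature=0,
    )
    # Plain-text summary, per the prompt's "no headings, not markdown" instruction.
    return response.choices[0].message.content
```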
4 changes: 2 additions & 2 deletions crawlers/discourse_crawler.py
@@ -14,8 +14,8 @@ def datetime_to_date(datetime_str: str) -> str:

class DiscourseCrawler(Crawler):

def __init__(self, cfg: OmegaConf, endpoint: str, customer_id: str, corpus_id: int, api_key: str) -> None:
super().__init__(cfg, endpoint, customer_id, corpus_id, api_key)
def __init__(self, cfg: OmegaConf, endpoint: str, customer_id: str, corpus_id: int, corpus_key: str, api_key: str) -> None:
super().__init__(cfg, endpoint, customer_id, corpus_id, corpus_key, api_key)
self.discourse_base_url = self.cfg.discourse_crawler.base_url
self.discourse_api_key = self.cfg.discourse_crawler.discourse_api_key
self.session = create_session_with_retries()
4 changes: 2 additions & 2 deletions crawlers/edgar_crawler.py
@@ -56,8 +56,8 @@ def get_filings(ticker: str, start_date_str: str, end_date_str: str, filing_type

class EdgarCrawler(Crawler):

def __init__(self, cfg: OmegaConf, endpoint: str, customer_id: str, corpus_id: int, api_key: str) -> None:
super().__init__(cfg, endpoint, customer_id, corpus_id, api_key)
def __init__(self, cfg: OmegaConf, endpoint: str, customer_id: str, corpus_id: int, corpus_key: str, api_key: str) -> None:
super().__init__(cfg, endpoint, customer_id, corpus_id, corpus_key, api_key)
self.tickers = self.cfg.edgar_crawler.tickers
self.start_date = self.cfg.edgar_crawler.start_date
self.end_date = self.cfg.edgar_crawler.end_date
4 changes: 2 additions & 2 deletions crawlers/fmp_crawler.py
@@ -11,11 +11,11 @@
# To use this crawler you have to have an fmp API_key in your secrets.toml profile
class FmpCrawler(Crawler):

def __init__(self, cfg: OmegaConf, endpoint: str, customer_id: str, corpus_id: int, api_key: str) -> None:
def __init__(self, cfg: OmegaConf, endpoint: str, customer_id: str, corpus_id: int, corpus_key: str, api_key: str) -> None:
'''
Initialize the FmpCrawler
'''
super().__init__(cfg, endpoint, customer_id, corpus_id, api_key)
super().__init__(cfg, endpoint, customer_id, corpus_id, corpus_key, api_key)
cfg_dict: DictConfig = DictConfig(cfg)
self.tickers = cfg_dict.fmp_crawler.tickers
self.start_year = int(cfg_dict.fmp_crawler.start_year)
4 changes: 2 additions & 2 deletions crawlers/gdrive_crawler.py
@@ -258,8 +258,8 @@ def process(self, user: str) -> None:

class GdriveCrawler(Crawler):

def __init__(self, cfg: OmegaConf, endpoint: str, customer_id: str, corpus_id: int, api_key: str) -> None:
super().__init__(cfg, endpoint, customer_id, corpus_id, api_key)
def __init__(self, cfg: OmegaConf, endpoint: str, customer_id: str, corpus_id: int, corpus_key: str, api_key: str) -> None:
super().__init__(cfg, endpoint, customer_id, corpus_id, corpus_key, api_key)
logging.info("Google Drive Crawler initialized")

self.delegated_users = cfg.gdrive_crawler.delegated_users
4 changes: 2 additions & 2 deletions crawlers/github_crawler.py
@@ -72,8 +72,8 @@ def get_pr_comments(self, pull_number: int) -> List[Any]:

class GithubCrawler(Crawler):

def __init__(self, cfg: OmegaConf, endpoint: str, customer_id: str, corpus_id: int, api_key: str) -> None:
super().__init__(cfg, endpoint, customer_id, corpus_id, api_key)
def __init__(self, cfg: OmegaConf, endpoint: str, customer_id: str, corpus_id: int, corpus_key: str, api_key: str) -> None:
super().__init__(cfg, endpoint, customer_id, corpus_id, corpus_key, api_key)
self.github_token = self.cfg.github_crawler.get("github_token", None)
self.owner = self.cfg.github_crawler.owner
self.repos = self.cfg.github_crawler.repos
4 changes: 2 additions & 2 deletions crawlers/hackernews_crawler.py
@@ -9,8 +9,8 @@

class HackernewsCrawler(Crawler):

def __init__(self, cfg: OmegaConf, endpoint: str, customer_id: str, corpus_id: int, api_key: str) -> None:
super().__init__(cfg, endpoint, customer_id, corpus_id, api_key)
def __init__(self, cfg: OmegaConf, endpoint: str, customer_id: str, corpus_id: int, corpus_key: str, api_key: str) -> None:
super().__init__(cfg, endpoint, customer_id, corpus_id, corpus_key, api_key)
self.N_ARTICLES = self.cfg.hackernews_crawler.max_articles
self.days_back = self.cfg.hackernews_crawler.get("days_back", 3)
self.days_back_comprehensive = self.cfg.hackernews_crawler.get("days_back_comprehensive", False)
4 changes: 2 additions & 2 deletions crawlers/hubspot_crawler.py
@@ -9,8 +9,8 @@

class HubspotCrawler(Crawler):

def __init__(self, cfg: OmegaConf, endpoint: str, customer_id: str, corpus_id: int, api_key: str) -> None:
super().__init__(cfg, endpoint, customer_id, corpus_id, api_key)
def __init__(self, cfg: OmegaConf, endpoint: str, customer_id: str, corpus_id: int, corpus_key: str, api_key: str) -> None:
super().__init__(cfg, endpoint, customer_id, corpus_id, corpus_key, api_key)
self.hubspot_api_key = self.cfg.hubspot_crawler.hubspot_api_key

def crawl(self) -> None:
4 changes: 2 additions & 2 deletions crawlers/notion_crawler.py
@@ -84,8 +84,8 @@ def extract_title(page):

class NotionCrawler(Crawler):

def __init__(self, cfg: OmegaConf, endpoint: str, customer_id: str, corpus_id: int, api_key: str) -> None:
super().__init__(cfg, endpoint, customer_id, corpus_id, api_key)
def __init__(self, cfg: OmegaConf, endpoint: str, customer_id: str, corpus_id: int, corpus_key: str, api_key: str) -> None:
super().__init__(cfg, endpoint, customer_id, corpus_id, corpus_key, api_key)
self.notion_api_key = self.cfg.notion_crawler.notion_api_key

def crawl(self) -> None:
4 changes: 2 additions & 2 deletions crawlers/pmc_crawler.py
@@ -31,8 +31,8 @@ def get_top_n_papers(topic: str, n: int, email: str) -> Any:

class PmcCrawler(Crawler):

def __init__(self, cfg: OmegaConf, endpoint: str, customer_id: str, corpus_id: int, api_key: str) -> None:
super().__init__(cfg, endpoint, customer_id, corpus_id, api_key)
def __init__(self, cfg: OmegaConf, endpoint: str, customer_id: str, corpus_id: int, corpus_key: str, api_key: str) -> None:
super().__init__(cfg, endpoint, customer_id, corpus_id, corpus_key, api_key)
self.site_urls: Set[str] = set()
self.crawled_pmc_ids: Set[str] = set()
self.rate_limiter = RateLimiter(self.cfg.pmc_crawler.get("num_per_second", 3))
4 changes: 2 additions & 2 deletions crawlers/slack_crawler.py
@@ -232,8 +232,8 @@ def contains_url(message):


class SlackCrawler(Crawler):
def __init__(self, cfg: OmegaConf, endpoint: str, customer_id: str, corpus_id: int, api_key: str):
super().__init__(cfg, endpoint, customer_id, corpus_id, api_key)
def __init__(self, cfg: OmegaConf, endpoint: str, customer_id: str, corpus_id: int, corpus_key: str, api_key: str):
super().__init__(cfg, endpoint, customer_id, corpus_id, corpus_key, api_key)
self.user_token = self.cfg.slack_crawler.slack_user_token
self.client = WebClient(token=self.user_token)
self.days_past = self.cfg.slack_crawler.get("days_past", None)