Fixes to CSV crawler (#123)
* speed/memory improvements

* bugfix
ofermend authored Nov 1, 2024
1 parent 6c5301b commit ef54b1c
Showing 3 changed files with 30 additions and 24 deletions.
9 changes: 7 additions & 2 deletions core/utils.py
@@ -172,11 +172,16 @@ def normalize_url(url: str, keep_query_params: bool = False) -> str:
     if '://' not in url:
         url = 'http://' + url
     p = urlparse(url)
+    path = p.path if p.path and p.path != '/' else '/'
     query = p.query if keep_query_params else ''
-    return ParseResult(p.scheme, p.netloc, p.path, '', query, '').geturl()
+    return ParseResult(p.scheme, p.netloc, path, '', query, '').geturl()
 
 def clean_urls(urls: Set[str], keep_query_params: bool = False) -> List[str]:
-    return list(set(normalize_url(url, keep_query_params) for url in urls))
+    normalized_set = set()
+    for url in urls:
+        normalized_url = normalize_url(url, keep_query_params)
+        normalized_set.add(normalized_url)
+    return list(normalized_set)
 
 def clean_email_text(text: str) -> str:
     """
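For reference, a quick check of the new normalize_url behavior; a minimal sketch assuming core.utils is importable from the repository root (adjust the import path to your layout):

# Sanity check of the trailing-slash fix.
from core.utils import clean_urls, normalize_url

# With the new `path` fallback, a bare domain and its trailing-slash form
# normalize to the same URL instead of two distinct variants.
assert normalize_url("example.com") == "http://example.com/"
assert normalize_url("example.com/") == "http://example.com/"

# clean_urls therefore collapses such duplicates (query params are dropped by default).
print(clean_urls({"example.com", "http://example.com/", "http://example.com/?page=2"}))
# expected: ['http://example.com/']

The unrolled clean_urls loop is behaviorally equivalent to the old one-liner; the explicit form is simply easier to step through.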
43 changes: 22 additions & 21 deletions crawlers/csv_crawler.py
@@ -37,9 +37,9 @@ def process(self, doc_id: str, df: pd.DataFrame) -> None:
         for _, row in df.iterrows():
             if self.title_column:
                 titles.append(str(row[self.title_column]))
-            text = ' - '.join(str(x) for x in row[self.text_columns].tolist() if x) + '\n'
+            text = ' - '.join(str(row[col]) for col in self.text_columns if pd.notnull(row[col])) + '\n'
             texts.append(unicodedata.normalize('NFD', text))
-            md = {column: row[column] for column in self.metadata_columns if not pd.isnull(row[column])}
+            md = {column: row[column] for column in self.metadata_columns if pd.notnull(row[column])}
             metadatas.append(md)
         if len(df)>1:
             logging.info(f"Indexing df for '{doc_id}' with {len(df)} rows")
@@ -52,40 +52,39 @@ def process(self, doc_id: str, df: pd.DataFrame) -> None:
         title = titles[0] if titles else doc_id
         self.indexer.index_segments(doc_id, texts=texts, titles=titles, metadatas=metadatas,
                                     doc_title=title, doc_metadata = doc_metadata)
-        gc.collect()
         self.count += 1
         if self.count % 100==0:
             logging.info(f"Indexed {self.count} documents in actor {ray.get_runtime_context().get_actor_id()}")
 
+        gc.collect()
 
 class CsvCrawler(Crawler):
 
-    def index_dataframe(self, df: pd.DataFrame,
-                        text_columns, title_column, metadata_columns, doc_id_columns,
-                        rows_per_chunk: int = 500,
-                        source: str = 'csv',
-                        ray_workers: int = 0
-                        ) -> None:
-        all_columns = text_columns + metadata_columns
-        if title_column:
-            all_columns.append(title_column)
 
-        dfs_to_index = []
+    def generate_dfs_to_index(self, df: pd.DataFrame, doc_id_columns, rows_per_chunk: int):
         if doc_id_columns:
             grouped = df.groupby(doc_id_columns)
             for name, group in grouped:
                 if isinstance(name, str):
                     doc_id = name
                 else:
                     doc_id = ' - '.join([str(x) for x in name if x])
-                dfs_to_index.append((doc_id, group))
+                yield (doc_id, group)
         else:
             if rows_per_chunk < len(df):
                 rows_per_chunk = len(df)
             for inx in range(0, df.shape[0], rows_per_chunk):
                 sub_df = df[inx: inx+rows_per_chunk]
                 name = f'rows {inx}-{inx+rows_per_chunk-1}'
-                dfs_to_index.append((doc_id, sub_df))
+                yield (name, sub_df)
 
+    def index_dataframe(self, df: pd.DataFrame,
+                        text_columns, title_column, metadata_columns, doc_id_columns,
+                        rows_per_chunk: int = 500,
+                        source: str = 'csv',
+                        ray_workers: int = 0
+                        ) -> None:
+        all_columns = text_columns + metadata_columns
+        if title_column:
+            all_columns.append(title_column)
 
         if ray_workers == -1:
             ray_workers = psutil.cpu_count(logical=True)
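The old list-building version materialized every (doc_id, sub-DataFrame) pair before any indexing started; the new generator hands chunks out one at a time, which is where most of the memory saving comes from. A standalone sketch of the same pattern (not the repository's API; the column names are made up):

from typing import Iterator, Tuple
import pandas as pd

def chunk_by_doc_id(df: pd.DataFrame, doc_id_columns: list) -> Iterator[Tuple[str, pd.DataFrame]]:
    # Yield one (doc_id, group) pair at a time instead of collecting them in a list.
    for name, group in df.groupby(doc_id_columns):
        doc_id = name if isinstance(name, str) else ' - '.join(str(x) for x in name if x)
        yield doc_id, group

df = pd.DataFrame({"id": ["a", "a", "b"], "text": ["x", "y", "z"]})
for doc_id, sub_df in chunk_by_doc_id(df, ["id"]):
    print(doc_id, len(sub_df))  # a 2, then b 1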
@@ -98,10 +97,11 @@ def index_dataframe(self, df: pd.DataFrame,
             for a in actors:
                 a.setup.remote()
             pool = ray.util.ActorPool(actors)
-            _ = list(pool.map(lambda a, args_inx: a.process.remote(args_inx[0], args_inx[1]), dfs_to_index))
+            _ = list(pool.map(lambda a, args_inx: a.process.remote(args_inx[0], args_inx[1]),
+                              self.generate_dfs_to_index(df, doc_id_columns, rows_per_chunk)))
         else:
             crawl_worker = DFIndexer(self.indexer, self, title_column, text_columns, metadata_columns, source)
-            for df_tuple in dfs_to_index:
+            for df_tuple in self.generate_dfs_to_index(df, doc_id_columns, rows_per_chunk):
                 crawl_worker.process(df_tuple[0], df_tuple[1])
 
     def crawl(self) -> None:
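ray.util.ActorPool.map accepts any iterable, so the pool can consume the generator directly and the chunks never need to be collected into a dfs_to_index list first. A self-contained sketch of that pattern (the actor and payloads here are illustrative, not the repository's):

import ray
from ray.util import ActorPool

@ray.remote
class Worker:
    def process(self, doc_id, n_rows):
        return f"{doc_id}: {n_rows} rows"

def gen_chunks():
    # Stand-in for generate_dfs_to_index(): yields (doc_id, payload) pairs.
    for i in range(5):
        yield (f"doc-{i}", 100)

ray.init(ignore_reinit_error=True)
pool = ActorPool([Worker.remote() for _ in range(2)])
results = list(pool.map(lambda actor, args: actor.process.remote(args[0], args[1]), gen_chunks()))
print(results)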
@@ -119,7 +119,7 @@ def crawl(self) -> None:
         try:
             if orig_file_path.endswith('.csv'):
                 dtypes = {column: 'Int64' if column_types.get(column)=='int' else column_types.get(column, 'str')
-                          for column in all_columns}  # str if unspecified
+                          for column in all_columns}
                 sep = self.cfg.csv_crawler.get("separator", ",")
                 df = pd.read_csv(file_path, usecols=all_columns, sep=sep, dtype=dtypes)
                 df = df.astype(object)  # convert to native types
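The dtype mapping is worth a note: 'Int64' is pandas' nullable integer dtype, so integer columns that contain blanks load as <NA> instead of being silently promoted to float. A runnable illustration with hypothetical column names and config values:

import io
import pandas as pd

csv_text = "title,year,text\nDoc A,2021,hello\nDoc B,,world\n"  # hypothetical data
column_types = {"year": "int"}                                   # hypothetical config
all_columns = ["title", "year", "text"]

dtypes = {column: 'Int64' if column_types.get(column)=='int' else column_types.get(column, 'str')
          for column in all_columns}
df = pd.read_csv(io.StringIO(csv_text), usecols=all_columns, dtype=dtypes)
print(df.dtypes)        # year is Int64; the missing value is <NA>, not NaN-as-float
df = df.astype(object)  # back to native Python objects, as the crawler does before indexing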
@@ -136,15 +136,16 @@ def crawl(self) -> None:
 
         # make sure all ID columns are a string type
         df[doc_id_columns] = df[doc_id_columns].astype(str)
+        orig_size = len(df)
 
         select_condition = self.cfg.csv_crawler.get("select_condition", None)
         if select_condition:
             df = df.query(select_condition)
+            logging.info(f"Selected {len(df)} rows out of {orig_size} rows using the select condition")
 
         # index the dataframe
         rows_per_chunk = int(self.cfg.csv_crawler.get("rows_per_chunk", 500) if 'csv_crawler' in self.cfg else 500)
         ray_workers = self.cfg.csv_crawler.get("ray_workers", 0)
 
         logging.info(f"indexing {len(df)} rows from the file {file_path}")
 
         self.index_dataframe(df, text_columns, title_column, metadata_columns, doc_id_columns, rows_per_chunk, source='csv', ray_workers=ray_workers)
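select_condition is passed straight to DataFrame.query, and the new orig_size capture is only there so the log can report how many rows survived the filter. A tiny standalone example with a made-up condition:

import pandas as pd

df = pd.DataFrame({"lang": ["en", "fr", "en"], "text": ["a", "b", "c"]})
orig_size = len(df)

select_condition = "lang == 'en'"   # hypothetical value for csv_crawler.select_condition
df = df.query(select_condition)

print(f"Selected {len(df)} rows out of {orig_size} rows using the select condition")
# Selected 2 rows out of 3 rows using the select condition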
2 changes: 1 addition & 1 deletion crawlers/notion_crawler.py
@@ -131,7 +131,7 @@ def crawl(self) -> None:
                 logging.info(f"Indexing failed for notion page {page_id}")
 
 
-        # report pages crawled if specified
+        # report pages crawled if specified
         if self.cfg.notion_crawler.get("crawl_report", False):
             logging.info(f"Indexed {len(pages)} Pages. See pages_indexed.txt for a full report.")
             with open('/home/vectara/env/pages_indexed.txt', 'w') as f:
