feat: Add DocumentCleaner 2.0 #5976

Merged · 11 commits · Oct 13, 2023
3 changes: 2 additions & 1 deletion haystack/preview/components/preprocessors/__init__.py
@@ -1,3 +1,4 @@
+from haystack.preview.components.preprocessors.text_document_cleaner import TextDocumentCleaner
 from haystack.preview.components.preprocessors.text_document_splitter import TextDocumentSplitter
 
-__all__ = ["TextDocumentSplitter"]
+__all__ = ["TextDocumentSplitter", "TextDocumentCleaner"]
205 changes: 205 additions & 0 deletions haystack/preview/components/preprocessors/text_document_cleaner.py
@@ -0,0 +1,205 @@
import re
from copy import deepcopy
from functools import partial, reduce
from itertools import chain
from typing import Any, Dict, Generator, List, Optional, Set

from haystack.preview import Document, component, default_from_dict, default_to_dict


@component
class TextDocumentCleaner:
"""
Makes text documents more readable by cleaning empty lines, extra whitespaces, headers and footers, etc.
This is useful for preparing the documents for further processing by LLMs.
"""

def __init__(
self,
remove_empty_lines: bool = True,
remove_extra_whitespaces: bool = True,
remove_repeated_substrings: bool = False,
remove_substrings: Optional[List[str]] = None,
remove_regex: Optional[str] = None,
):
"""
:param remove_empty_lines: Whether to remove empty lines.
:param remove_extra_whitespaces: Whether to remove extra whitespaces.
:param remove_repeated_substrings: Whether to remove repeated substrings, such as headers and footers.
:param remove_substrings: List of substrings to remove from the text.
:param remove_regex: Regex to match and replace substrings by "".
"""

self.remove_empty_lines = remove_empty_lines
self.remove_extra_whitespaces = remove_extra_whitespaces
self.remove_repeated_substrings = remove_repeated_substrings
self.remove_substrings = remove_substrings
self.remove_regex = remove_regex

@component.output_types(documents=List[Document])
def run(self, documents: List[Document]):
if not isinstance(documents, list) or documents and not isinstance(documents[0], Document):
raise TypeError("TextDocumentCleaner expects a List of Documents as input.")

cleaned_docs = []
for doc in documents:
if doc.text is None:
raise ValueError(
f"TextDocumentCleaner only works with text documents but document.text for document ID {doc.id} is None."
)
text = doc.text

if self.remove_empty_lines:
text = self._remove_empty_lines(text)
if self.remove_extra_whitespaces:
text = self._remove_extra_whitespaces(text)
if self.remove_repeated_substrings:
text = self._remove_repeated_substrings(text)
if self.remove_substrings:
text = self._remove_substrings(text, self.remove_substrings)
if self.remove_regex:
text = self._remove_regex(text, self.remove_regex)

cleaned_docs.append(Document(text=text, metadata=deepcopy(doc.metadata)))

return {"documents": cleaned_docs}

def to_dict(self) -> Dict[str, Any]:
"""
Serialize this component to a dictionary.
"""
return default_to_dict(
self,
remove_empty_lines=self.remove_empty_lines,
remove_extra_whitespaces=self.remove_extra_whitespaces,
remove_repeated_substrings=self.remove_repeated_substrings,
remove_substrings=self.remove_substrings,
remove_regex=self.remove_regex,
)

@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "TextDocumentCleaner":
"""
Deserialize this component from a dictionary.
"""
return default_from_dict(cls, data)

def _remove_empty_lines(self, text: str) -> str:
"""
Remove empty lines and lines that contain nothing but whitespace from the text.
:param text: Text to clean.
"""
lines = text.split("\n")
non_empty_lines = filter(lambda line: line.strip() != "", lines)
return "\n".join(non_empty_lines)

def _remove_extra_whitespaces(self, text: str) -> str:
"""
Remove extra whitespaces from text.
:param text: Text to clean.
"""
return re.sub(r"\s\s+", " ", text).strip()

def _remove_regex(self, text: str, regex: str) -> str:
"""
Remove substrings that match the specified regex from the text.
:param text: Text to clean.
:param regex: Regex to match and replace substrings by "".
"""
return re.sub(regex, "", text).strip()

def _remove_substrings(self, text: str, substrings: List[str]) -> str:
"""
Remove all specified substrings from the text.
:param text: Text to clean.
:param substrings: Substrings to remove.
"""
for substring in substrings:
text = text.replace(substring, "")
return text

def _remove_repeated_substrings(self, text: str) -> str:
return self._find_and_remove_header_footer(
text, n_chars=300, n_first_pages_to_ignore=1, n_last_pages_to_ignore=1
)

def _find_and_remove_header_footer(
self, text: str, n_chars: int, n_first_pages_to_ignore: int, n_last_pages_to_ignore: int
) -> str:
"""
Heuristic to find footers and headers across different pages by searching for the longest common string.
For headers, we only search in the first n_chars characters (for footer: last n_chars).
Note: This heuristic uses exact matches and therefore works well for footers like "Copyright 2019 by XXX",
but won't detect "Page 3 of 4" or similar.

:param text: Text to clean.
:param n_chars: Number of first/last characters in which to search for the header/footer.
:param n_first_pages_to_ignore: Number of first pages to ignore (e.g. TOCs often don't contain footer/header).
:param n_last_pages_to_ignore: Number of last pages to ignore.
:return: The text without the detected header and footer.
"""

pages = text.split("\f")

# header
start_of_pages = [p[:n_chars] for p in pages[n_first_pages_to_ignore:-n_last_pages_to_ignore]]
found_header = self._find_longest_common_ngram(start_of_pages)
if found_header:
pages = [page.replace(found_header, "") for page in pages]

# footer
end_of_pages = [p[-n_chars:] for p in pages[n_first_pages_to_ignore:-n_last_pages_to_ignore]]
found_footer = self._find_longest_common_ngram(end_of_pages)
if found_footer:
pages = [page.replace(found_footer, "") for page in pages]
# logger.debug("Removed header '%s' and footer '%s' in document", found_header, found_footer)
text = "\f".join(pages)
return text

def _ngram(self, seq: str, n: int) -> Generator[str, None, None]:
"""
Return ngram (of tokens - currently split by whitespace)
:param seq: str, string from which the ngram shall be created
:param n: int, n of ngram
:return: str, ngram as string
"""

# In order to maintain the original whitespace, but still consider \n and \t for n-gram tokenization,
# we add a space here and remove it after creation of the ngrams again (see below)
seq = seq.replace("\n", " \n")
seq = seq.replace("\t", " \t")

words = seq.split(" ")
ngrams = (
" ".join(words[i : i + n]).replace(" \n", "\n").replace(" \t", "\t") for i in range(0, len(words) - n + 1)
)

return ngrams

def _allngram(self, seq: str, min_ngram: int, max_ngram: int) -> Set[str]:
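"""
Generate all ngrams of seq with lengths from min_ngram up to max_ngram (exclusive).
If max_ngram is not set, the lengths run up to the length of the sequence instead.
:param seq: The sequence to generate ngrams from.
:param min_ngram: The minimum ngram length.
:param max_ngram: The maximum ngram length (exclusive).
:return: A set of all ngrams as strings.
"""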
lengths = range(min_ngram, max_ngram) if max_ngram else range(min_ngram, len(seq))
ngrams = map(partial(self._ngram, seq), lengths)
res = set(chain.from_iterable(ngrams))
return res

def _find_longest_common_ngram(
self, sequences: List[str], max_ngram: int = 30, min_ngram: int = 3
) -> Optional[str]:
"""
Find the longest common ngram across different text sequences (e.g. start of pages).
Considering all ngrams between the specified range. Helpful for finding footers, headers etc.

:param sequences: list[str], list of strings that shall be searched for common n_grams
:param max_ngram: int, maximum length of ngram to consider
:param min_ngram: minimum length of ngram to consider
:return: str, common string of all sections
"""
sequences = [s for s in sequences if s] # filter empty sequences
if not sequences:
return None
seqs_ngrams = map(partial(self._allngram, min_ngram=min_ngram, max_ngram=max_ngram), sequences)
intersection = reduce(set.intersection, seqs_ngrams)

try:
longest = max(intersection, key=len)
except ValueError:
# no common sequence found
longest = ""
return longest if longest.strip() else None
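
For reference, a minimal usage sketch of the component implemented above (not part of the diff; the input strings are illustrative, and the serialization round trip assumes the to_dict/from_dict pair shown above):

from haystack.preview import Document
from haystack.preview.components.preprocessors import TextDocumentCleaner

# The defaults drop empty lines and collapse runs of whitespace.
cleaner = TextDocumentCleaner()
result = cleaner.run(documents=[Document(text="Page  one   text. \n\n Page two.")])
assert result["documents"][0].text == "Page one text. Page two."

# Serialization round-trips because to_dict() uses the same keyword names as __init__.
restored = TextDocumentCleaner.from_dict(cleaner.to_dict())
assert restored.remove_empty_lines is True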
@@ -0,0 +1,5 @@
---
preview:
- |
Add TextDocumentCleaner, which removes extra whitespace, empty lines, headers, etc. from Documents containing text.
Useful as a preprocessing step before splitting into shorter text documents.
115 changes: 115 additions & 0 deletions test/preview/components/preprocessors/test_text_document_cleaner.py
@@ -0,0 +1,115 @@
import pytest

from haystack.preview import Document
from haystack.preview.components.preprocessors import TextDocumentCleaner


class TestTextDocumentCleaner:
@pytest.mark.unit
def test_non_text_document(self):
with pytest.raises(
ValueError, match="TextDocumentCleaner only works with text documents but document.text for document ID"
):
cleaner = TextDocumentCleaner()
cleaner.run(documents=[Document()])

@pytest.mark.unit
def test_single_document(self):
with pytest.raises(TypeError, match="TextDocumentCleaner expects a List of Documents as input."):
cleaner = TextDocumentCleaner()
cleaner.run(documents=Document())

@pytest.mark.unit
def test_empty_list(self):
cleaner = TextDocumentCleaner()
result = cleaner.run(documents=[])
assert result == {"documents": []}

@pytest.mark.unit
def test_remove_empty_lines(self):
cleaner = TextDocumentCleaner(remove_extra_whitespaces=False)
result = cleaner.run(
documents=[
Document(
text="This is a text with some words. "
""
"There is a second sentence. "
""
"And there is a third sentence."
)
]
)
assert len(result["documents"]) == 1
assert (
result["documents"][0].text
== "This is a text with some words. There is a second sentence. And there is a third sentence."
)

@pytest.mark.unit
def test_remove_whitespaces(self):
cleaner = TextDocumentCleaner(remove_empty_lines=False)
result = cleaner.run(
documents=[
Document(
text=" This is a text with some words. "
""
"There is a second sentence. "
""
"And there is a third sentence. "
)
]
)
assert len(result["documents"]) == 1
assert result["documents"][0].text == (
"This is a text with some words. " "" "There is a second sentence. " "" "And there is a third sentence."
)

@pytest.mark.unit
def test_remove_substrings(self):
cleaner = TextDocumentCleaner(remove_substrings=["This", "A", "words"])
result = cleaner.run(documents=[Document(text="This is a text with some words.")])
assert len(result["documents"]) == 1
assert result["documents"][0].text == " is a text with some ."

@pytest.mark.unit
def test_remove_regex(self):
cleaner = TextDocumentCleaner(remove_regex=r"\s\s+")
result = cleaner.run(documents=[Document(text="This is a text with some words.")])
assert len(result["documents"]) == 1
assert result["documents"][0].text == "This is a text with some words."

@pytest.mark.unit
def test_remove_repeated_substrings(self):
cleaner = TextDocumentCleaner(
remove_empty_lines=False, remove_extra_whitespaces=False, remove_repeated_substrings=True
)

text = """First Page This is a header.
Page of
2
4
Lorem ipsum dolor sit amet
This is a footer number 1
This is footer number 2 This is a header.
Page of
3
4
Sid ut perspiciatis unde
This is a footer number 1
This is footer number 2 This is a header.
Page of
4
4
Sed do eiusmod tempor.
This is a footer number 1
This is footer number 2"""

expected_text = """First Page 2
4
Lorem ipsum dolor sit amet 3
4
Sid ut perspiciatis unde 4
4
Sed do eiusmod tempor."""
result = cleaner.run(documents=[Document(text=text)])
assert result["documents"][0].text == expected_text
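
For intuition about the heuristic exercised by this last test, a small sketch that pokes at the private ngram helper directly (illustrative values; not part of the test suite):

from haystack.preview.components.preprocessors import TextDocumentCleaner

cleaner = TextDocumentCleaner()
# Contiguous token runs common to both page starts are the header candidates;
# max(..., key=len) picks the longest one, here "Annual Report 2023".
header = cleaner._find_longest_common_ngram(
    ["Annual Report 2023 Introduction text", "Annual Report 2023 Conclusion text"]
)
assert header == "Annual Report 2023"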