feat: Add DocumentLanguageClassifier 2.0 #6037

Merged · 18 commits · Oct 31, 2023
Changes from 9 commits
84 changes: 84 additions & 0 deletions e2e/preview/pipelines/test_preprocessing_pipeline.py
@@ -0,0 +1,84 @@
import json

from haystack.preview import Pipeline
from haystack.preview.components.embedders import SentenceTransformersDocumentEmbedder
from haystack.preview.components.file_converters import TextFileToDocument
from haystack.preview.components.preprocessors import TextDocumentSplitter, DocumentCleaner, DocumentLanguageClassifier
from haystack.preview.components.routers import FileTypeRouter
from haystack.preview.components.writers import DocumentWriter
from haystack.preview.document_stores import MemoryDocumentStore


def test_preprocessing_pipeline(tmp_path):
    # Create the pipeline and its components
    document_store = MemoryDocumentStore()
    preprocessing_pipeline = Pipeline()
    preprocessing_pipeline.add_component(instance=FileTypeRouter(mime_types=["text/plain"]), name="file_type_router")
    preprocessing_pipeline.add_component(instance=TextFileToDocument(), name="text_file_converter")
    preprocessing_pipeline.add_component(instance=DocumentLanguageClassifier(), name="language_classifier")
    preprocessing_pipeline.add_component(instance=DocumentCleaner(), name="cleaner")
    preprocessing_pipeline.add_component(
        instance=TextDocumentSplitter(split_by="sentence", split_length=1), name="splitter"
    )
    preprocessing_pipeline.add_component(
        instance=SentenceTransformersDocumentEmbedder(model_name_or_path="sentence-transformers/all-MiniLM-L6-v2"),
        name="embedder",
    )
    preprocessing_pipeline.add_component(instance=DocumentWriter(document_store=document_store), name="writer")
    preprocessing_pipeline.connect("file_type_router.text/plain", "text_file_converter.paths")
    preprocessing_pipeline.connect("text_file_converter.documents", "language_classifier.documents")
    preprocessing_pipeline.connect("language_classifier.en", "cleaner.documents")
    preprocessing_pipeline.connect("cleaner.documents", "splitter.documents")
    preprocessing_pipeline.connect("splitter.documents", "embedder.documents")
    preprocessing_pipeline.connect("embedder.documents", "writer.documents")

    # Draw the pipeline
    preprocessing_pipeline.draw(tmp_path / "test_preprocessing_pipeline.png")

    # Serialize the pipeline to JSON
    with open(tmp_path / "test_preprocessing_pipeline.json", "w") as f:
        print(json.dumps(preprocessing_pipeline.to_dict(), indent=4))
        json.dump(preprocessing_pipeline.to_dict(), f)

    # Load the pipeline back
    with open(tmp_path / "test_preprocessing_pipeline.json", "r") as f:
        preprocessing_pipeline = Pipeline.from_dict(json.load(f))

    # Write a txt file
    with open(tmp_path / "test_file_english.txt", "w") as f:
        f.write(
            "This is an english sentence. There is more to it. It's a long text.\n"
            "Spans multiple lines.\n"
            "\n"
            "Even contains empty lines. And extra whitespaces."
        )

    # Write a txt file
    with open(tmp_path / "test_file_german.txt", "w") as f:
        f.write("Ein deutscher Satz ohne Verb.")

    # Add two txt files and one non-txt file
    paths = [
        tmp_path / "test_file_english.txt",
        tmp_path / "test_file_german.txt",
        tmp_path / "test_preprocessing_pipeline.json",
    ]

    result = preprocessing_pipeline.run({"file_type_router": {"sources": paths}})

    assert result["writer"]["documents_written"] == 6
    filled_document_store = preprocessing_pipeline.get_component("writer").document_store
    assert filled_document_store.count_documents() == 6

    # Check preprocessed texts and mime_types
    stored_documents = filled_document_store.filter_documents()
    expected_texts = [
        "This is an english sentence.",
        " There is more to it.",
        " It's a long text.",
        "Spans multiple lines.",
        "Even contains empty lines.",
        " And extra whitespaces.",
    ]
    assert expected_texts == [document.text for document in stored_documents]
    assert all(document.mime_type == "text/plain" for document in stored_documents)
3 changes: 2 additions & 1 deletion haystack/preview/components/preprocessors/__init__.py
@@ -1,5 +1,6 @@
from haystack.preview.components.preprocessors.text_document_cleaner import DocumentCleaner
from haystack.preview.components.preprocessors.text_document_splitter import TextDocumentSplitter
from haystack.preview.components.preprocessors.document_language_classifier import DocumentLanguageClassifier
from haystack.preview.components.preprocessors.text_language_classifier import TextLanguageClassifier

-__all__ = ["TextDocumentSplitter", "DocumentCleaner", "TextLanguageClassifier"]
+__all__ = ["TextDocumentSplitter", "DocumentCleaner", "TextLanguageClassifier", "DocumentLanguageClassifier"]
92 changes: 92 additions & 0 deletions haystack/preview/components/preprocessors/document_language_classifier.py
@@ -0,0 +1,92 @@
import logging
from typing import List, Dict, Any, Optional

from haystack.preview import component, default_from_dict, default_to_dict, Document
from haystack.preview.lazy_imports import LazyImport

logger = logging.getLogger(__name__)

with LazyImport("Run 'pip install langdetect'") as langdetect_import:
    import langdetect


@component
class DocumentLanguageClassifier:
"""
Routes documents onto different output connections depending on their language.
This is useful for routing documents to different models in a pipeline depending on their language.
The set of supported languages can be specified.
For routing texts based on their language use the related TextLanguageClassifier component.
julian-risch marked this conversation as resolved.
Show resolved Hide resolved

Example usage in and indexing pipeline that writes only English language documents to a Store:
julian-risch marked this conversation as resolved.
Show resolved Hide resolved

    ```python
    document_store = MemoryDocumentStore()
    p = Pipeline()
    p.add_component(instance=TextFileToDocument(), name="text_file_converter")
Contributor:
Not strictly related to this PR, but why invert the order of the original signature?

    def add_component(self, name: str, instance: Component) -> None:

If passing the instance first is more intuitive, we're still in time to change it!

Member (Author):
I think it's more intuitive, yes. I created an issue here: https://github.com/deepset-ai/canals/issues/137

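For context, a minimal sketch of the two call orders under discussion (illustrative only, not part of the diff):

```python
from haystack.preview import Pipeline
from haystack.preview.components.preprocessors import DocumentLanguageClassifier

pipeline = Pipeline()

# Keyword arguments work with either signature; this is the style used in this PR:
pipeline.add_component(instance=DocumentLanguageClassifier(), name="language_classifier")

# The question is the positional order. With the current canals signature,
# add_component(name, instance), a positional call reads:
#     pipeline.add_component("language_classifier", DocumentLanguageClassifier())
# With the proposed inversion (instance first), it would read:
#     pipeline.add_component(DocumentLanguageClassifier(), "language_classifier")
```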
    p.add_component(instance=DocumentLanguageClassifier(), name="language_classifier")
    p.add_component(instance=TextDocumentSplitter(), name="splitter")
    p.add_component(instance=DocumentWriter(document_store=document_store), name="writer")
    p.connect("text_file_converter.documents", "language_classifier.documents")
    p.connect("language_classifier.en", "splitter.documents")
    p.connect("splitter.documents", "writer.documents")
    ```
    """

    def __init__(self, languages: Optional[List[str]] = None):
        """
        :param languages: A list of languages in ISO code, each corresponding to a different output connection
            (see [`langdetect` documentation](https://github.com/Mimino666/langdetect#languages)).
            By default, only ["en"] is supported and Documents of any other language are routed to "unmatched".
        """
        langdetect_import.check()
        if not languages:
            languages = ["en"]
        self.languages = languages
        component.set_output_types(
            self, unmatched=List[Document], **{language: List[Document] for language in languages}
        )

    def run(self, documents: List[Document]):
        """
        Run the DocumentLanguageClassifier. This method routes the documents to different edges based on their language.
        If a Document's text does not match any of the languages specified at initialization, it is routed to
        a connection named "unmatched".

        :param documents: A list of documents to route to different edges.
        """
        if not isinstance(documents, list) or documents and not isinstance(documents[0], Document):
            raise TypeError(
                "DocumentLanguageClassifier expects a list of Document as input. In case you want to classify a text, please use the TextLanguageClassifier."
            )

        output: Dict[str, List[Document]] = {language: [] for language in self.languages}
        output["unmatched"] = []

        for document in documents:
            detected_language = self.detect_language(document)
            if detected_language in self.languages:
                output[detected_language].append(document)
            else:
                output["unmatched"].append(document)

        return output

    def to_dict(self) -> Dict[str, Any]:
        """
        Serialize this component to a dictionary.
        """
        return default_to_dict(self, languages=self.languages)

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "DocumentLanguageClassifier":
        """
        Deserialize this component from a dictionary.
        """
        return default_from_dict(cls, data)

    def detect_language(self, document: Document) -> Optional[str]:
        """
        Detect the language of a single Document's text, returning None if detection fails.
        """
        try:
            language = langdetect.detect(document.text)
        except langdetect.LangDetectException:
            logger.warning("Langdetect cannot detect the language of Document with id: %s", document.id)
            language = None
        return language
@@ -0,0 +1,4 @@
---
preview:
  - |
    Added DocumentLanguageClassifier component so that Documents can be routed to different components based on the detected language, for example during preprocessing.
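
A minimal sketch of that routing behavior, using the component as defined in this PR (the sample sentences, and the assumption that langdetect labels them "en", "de", and "fr", are illustrative, not part of the diff):

```python
from haystack.preview import Document
from haystack.preview.components.preprocessors import DocumentLanguageClassifier

classifier = DocumentLanguageClassifier(languages=["en", "de"])
documents = [
    Document(text="This is an english sentence."),
    Document(text="Ein deutscher Satz ohne Verb."),
    Document(text="Ceci est une phrase française."),  # assumed detected as "fr"
]

# run() returns one list per configured language, plus "unmatched"
# for anything detected outside of `languages`.
result = classifier.run(documents=documents)

assert [d.text for d in result["en"]] == ["This is an english sentence."]
assert [d.text for d in result["de"]] == ["Ein deutscher Satz ohne Verb."]
assert [d.text for d in result["unmatched"]] == ["Ceci est une phrase française."]
```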
@@ -0,0 +1,69 @@
import logging
import pytest

from haystack.preview import Document
from haystack.preview.components.preprocessors import DocumentLanguageClassifier


class TestDocumentLanguageClassifier:
    @pytest.mark.unit
    def test_init(self):
        component = DocumentLanguageClassifier()
        assert component.languages == ["en"]

    @pytest.mark.unit
    def test_to_dict(self):
        component = DocumentLanguageClassifier()
        data = component.to_dict()
        assert data == {"type": "DocumentLanguageClassifier", "init_parameters": {"languages": ["en"]}}

    @pytest.mark.unit
    def test_to_dict_with_custom_init_parameters(self):
        component = DocumentLanguageClassifier(languages=["en", "de"])
        data = component.to_dict()
        assert data == {"type": "DocumentLanguageClassifier", "init_parameters": {"languages": ["en", "de"]}}

    @pytest.mark.unit
    def test_from_dict(self):
        data = {"type": "DocumentLanguageClassifier", "init_parameters": {"languages": ["en", "de"]}}
        component = DocumentLanguageClassifier.from_dict(data)
        assert component.languages == ["en", "de"]

    @pytest.mark.unit
    def test_non_document_input(self):
        with pytest.raises(TypeError, match="DocumentLanguageClassifier expects a list of Document as input."):
            classifier = DocumentLanguageClassifier()
            classifier.run(documents="This is an english sentence.")

    @pytest.mark.unit
    def test_single_document(self):
        with pytest.raises(TypeError, match="DocumentLanguageClassifier expects a list of Document as input."):
            classifier = DocumentLanguageClassifier()
            classifier.run(documents=Document(text="This is an english sentence."))

    @pytest.mark.unit
    def test_empty_list(self):
        classifier = DocumentLanguageClassifier()
        result = classifier.run(documents=[])
        assert result == {"en": [], "unmatched": []}

    @pytest.mark.unit
    def test_detect_language(self):
        classifier = DocumentLanguageClassifier()
        detected_language = classifier.detect_language(Document(text="This is an english sentence."))
        assert detected_language == "en"

    @pytest.mark.unit
    def test_route_to_en_and_unmatched(self):
        classifier = DocumentLanguageClassifier()
        english_document = Document(text="This is an english sentence.")
        german_document = Document(text="Ein deutscher Satz ohne Verb.")
        result = classifier.run(documents=[english_document, german_document])
        assert result == {"en": [english_document], "unmatched": [german_document]}

    @pytest.mark.unit
    def test_warning_if_no_language_detected(self, caplog):
        with caplog.at_level(logging.WARNING):
            classifier = DocumentLanguageClassifier()
            classifier.run(documents=[Document(text=".")])
            assert "Langdetect cannot detect the language of Document with id" in caplog.text