community[minor]: Add checksum when sending data to pebblo-cloud #23968

Merged: 13 commits (Jul 19, 2024)
40 changes: 22 additions & 18 deletions libs/community/langchain_community/chains/pebblo_retrieval/base.py
@@ -124,6 +124,11 @@ def _call(
                    ),
                    "doc": doc.page_content,
                    "vector_db": self.retriever.vectorstore.__class__.__name__,
+                    **(
+                        {"pb_checksum": doc.metadata.get("pb_checksum")}
+                        if doc.metadata.get("pb_checksum")
+                        else {}
+                    ),
                }
                for doc in docs
                if isinstance(doc, Document)
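The conditional dict-spread above adds pb_checksum to a retrieval-context entry only when the retrieved document actually carries one, so payloads for unclassified documents keep their previous shape. A minimal standalone sketch of the idiom, with hypothetical values:

doc_metadata = {"pb_checksum": "9f2c1e"}  # hypothetical classified-document metadata

context_entry = {
    "doc": "page text",
    # Spread in the extra key only when a truthy checksum is present.
    **(
        {"pb_checksum": doc_metadata.get("pb_checksum")}
        if doc_metadata.get("pb_checksum")
        else {}
    ),
}
print(context_entry)  # {'doc': 'page text', 'pb_checksum': '9f2c1e'}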
@@ -457,25 +462,24 @@ def _send_prompt(self, qa_payload: Qa) -> None:
        if self.api_key:
            if self.classifier_location == "local":
                if pebblo_resp:
-                    payload["response"] = (
-                        json.loads(pebblo_resp.text)
-                        .get("retrieval_data", {})
-                        .get("response", {})
-                    )
-                    payload["context"] = (
-                        json.loads(pebblo_resp.text)
-                        .get("retrieval_data", {})
-                        .get("context", [])
-                    )
-                    payload["prompt"] = (
-                        json.loads(pebblo_resp.text)
-                        .get("retrieval_data", {})
-                        .get("prompt", {})
-                    )
+                    resp = json.loads(pebblo_resp.text)
+                    if resp:
+                        payload["response"].update(
+                            resp.get("retrieval_data", {}).get("response", {})
+                        )
+                        payload["response"].pop("data")
+                        payload["prompt"].update(
+                            resp.get("retrieval_data", {}).get("prompt", {})
+                        )
+                        payload["prompt"].pop("data")
+                        context = payload["context"]
+                        for context_data in context:
+                            context_data.pop("doc")
+                        payload["context"] = context
                else:
-                    payload["response"] = None
-                    payload["context"] = None
-                    payload["prompt"] = None
+                    payload["response"] = {}
+                    payload["prompt"] = {}
+                    payload["context"] = []
            headers.update({"x-api-key": self.api_key})
            pebblo_cloud_url = f"{PEBBLO_CLOUD_URL}{PROMPT_URL}"
            try:
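The rewrite parses the classifier response once into resp, merges the returned classification into the payload that was already populated, and strips the raw text fields ("data" on response/prompt, "doc" on each context entry) before the payload goes to Pebblo cloud. A standalone sketch of that merge-then-strip step with fabricated payloads; note that dict.pop("data") raises KeyError if the key is absent, so pop("data", None) would be the defensive variant:

import json

# Fabricated payload and classifier response, for illustration only.
payload = {
    "response": {"data": "raw LLM answer"},
    "prompt": {"data": "raw user prompt"},
    "context": [{"doc": "chunk text", "pb_checksum": "9f2c1e"}],
}
pebblo_resp_text = json.dumps(
    {"retrieval_data": {"response": {"entityCount": 0}, "prompt": {"entityCount": 1}}}
)

resp = json.loads(pebblo_resp_text)
if resp:
    payload["response"].update(resp.get("retrieval_data", {}).get("response", {}))
    payload["response"].pop("data")  # drop raw answer text before upload
    payload["prompt"].update(resp.get("retrieval_data", {}).get("prompt", {}))
    payload["prompt"].pop("data")  # drop raw prompt text before upload
    for context_data in payload["context"]:
        context_data.pop("doc")  # keep only checksum/metadata per chunk
print(payload)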
@@ -129,6 +129,7 @@ class Context(BaseModel):
    retrieved_from: Optional[str]
    doc: Optional[str]
    vector_db: str
+    pb_checksum: Optional[str]


class Prompt(BaseModel):
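Declaring the new field as Optional[str] keeps older payloads valid: a Context built without a checksum simply carries None. A rough standalone sketch of the model (plain pydantic here; the real class lives in the pebblo_retrieval models module):

from typing import Optional

from pydantic import BaseModel

class Context(BaseModel):
    retrieved_from: Optional[str]
    doc: Optional[str]
    vector_db: str
    pb_checksum: Optional[str]

ctx = Context(retrieved_from=None, doc="chunk", vector_db="Chroma", pb_checksum=None)
print(ctx)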
106 changes: 55 additions & 51 deletions libs/community/langchain_community/document_loaders/pebblo.py
@@ -5,7 +5,7 @@
import os
import uuid
from http import HTTPStatus
-from typing import Any, Dict, Iterator, List, Optional, Union
+from typing import Any, Dict, Iterator, List, Optional

import requests  # type: ignore
from langchain_core.documents import Document
@@ -61,7 +61,7 @@ def __init__(
        self.source_path = get_loader_full_path(self.loader)
        self.source_owner = PebbloSafeLoader.get_file_owner_from_path(self.source_path)
        self.docs: List[Document] = []
-        self.docs_with_id: Union[List[IndexedDocument], List[Document], List] = []
+        self.docs_with_id: List[IndexedDocument] = []
        loader_name = str(type(self.loader)).split(".")[-1].split("'")[0]
        self.source_type = get_loader_type(loader_name)
        self.source_path_size = self.get_source_size(self.source_path)
@@ -89,17 +89,13 @@ def load(self) -> List[Document]:
            list: Documents fetched from load method of the wrapped `loader`.
        """
        self.docs = self.loader.load()
-        # Add pebblo-specific metadata to docs
-        self._add_pebblo_specific_metadata()
-        if not self.load_semantic:
-            self._classify_doc(self.docs, loading_end=True)
-            return self.docs
        self.docs_with_id = self._index_docs()
-        classified_docs = self._classify_doc(self.docs_with_id, loading_end=True)
-        self.docs_with_id = self._add_semantic_to_docs(
-            self.docs_with_id, classified_docs
-        )
-        self.docs = self._unindex_docs(self.docs_with_id)  # type: ignore
+        classified_docs = self._classify_doc(loading_end=True)
+        self._add_pebblo_specific_metadata(classified_docs)
+        if self.load_semantic:
+            self.docs = self._add_semantic_to_docs(classified_docs)
+        else:
+            self.docs = self._unindex_docs()  # type: ignore
        return self.docs
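load() now funnels both the semantic and non-semantic paths through one pipeline: index the docs with a pb_id, classify them, attach Pebblo metadata, and only then branch on load_semantic. The practical effect is that pb_id and pb_checksum land in every document's metadata. A hedged usage sketch (argument values are illustrative, and a Pebblo classifier is assumed to be reachable locally):

from langchain_community.document_loaders import CSVLoader
from langchain_community.document_loaders.pebblo import PebbloSafeLoader

loader = PebbloSafeLoader(
    CSVLoader("data/corp_sens_data.csv"),  # any wrapped loader works
    name="acme-corp-rag-app",              # app name reported to Pebblo
    owner="Joe Smith",                     # illustrative values
    description="Support productivity RAG application",
)
docs = loader.load()
print(docs[0].metadata.get("pb_id"), docs[0].metadata.get("pb_checksum"))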

    def lazy_load(self) -> Iterator[Document]:
@@ -125,19 +121,14 @@ def lazy_load(self) -> Iterator[Document]:
                self.docs = []
                break
            self.docs = list((doc,))
-            # Add pebblo-specific metadata to docs
-            self._add_pebblo_specific_metadata()
-            if not self.load_semantic:
-                self._classify_doc(self.docs, loading_end=True)
-                yield self.docs[0]
+            self.docs_with_id = self._index_docs()
+            classified_doc = self._classify_doc()
+            self._add_pebblo_specific_metadata(classified_doc)
+            if self.load_semantic:
+                self.docs = self._add_semantic_to_docs(classified_doc)
            else:
-                self.docs_with_id = self._index_docs()
-                classified_doc = self._classify_doc(self.docs)
-                self.docs_with_id = self._add_semantic_to_docs(
-                    self.docs_with_id, classified_doc
-                )
-                self.docs = self._unindex_docs(self.docs_with_id)  # type: ignore
-                yield self.docs[0]
+                self.docs = self._unindex_docs()
+            yield self.docs[0]
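lazy_load() mirrors the same pipeline per yielded document: each single-document batch is indexed, classified, and enriched before the yield. One consequence worth noting: because _index_docs() enumerates the current one-element batch, every lazily loaded document appears to get pb_id "0". Continuing the usage sketch above:

for doc in loader.lazy_load():
    # Metadata is already attached by the time the document is yielded.
    print(doc.metadata.get("pb_id"), doc.metadata.get("pb_checksum"))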

    @classmethod
def set_discover_sent(cls) -> None:
Expand All @@ -147,13 +138,12 @@ def set_discover_sent(cls) -> None:
    def set_loader_sent(cls) -> None:
        cls._loader_sent = True

-    def _classify_doc(self, loaded_docs: list, loading_end: bool = False) -> list:
+    def _classify_doc(self, loading_end: bool = False) -> dict:
        """Send documents fetched from loader to pebblo-server. Then send
        classified documents to Daxa cloud(If api_key is present). Internal method.

        Args:

-            loaded_docs (list): List of documents fetched from loader's load operation.
            loading_end (bool, optional): Flag indicating the halt of data
                loading by loader. Defaults to False.
        """
@@ -163,9 +153,8 @@ def _classify_doc(self, loaded_docs: list, loading_end: bool = False) -> list:
        }
        if loading_end is True:
            PebbloSafeLoader.set_loader_sent()
-        doc_content = [doc.dict() for doc in loaded_docs]
+        doc_content = [doc.dict() for doc in self.docs_with_id]
        docs = []
-        classified_docs = []
        for doc in doc_content:
            doc_metadata = doc.get("metadata", {})
            doc_authorized_identities = doc_metadata.get("authorized_identities", [])
@@ -183,12 +172,12 @@ def _classify_doc(self, loaded_docs: list, loading_end: bool = False) -> list:
            page_content = str(doc.get("page_content"))
            page_content_size = self.calculate_content_size(page_content)
            self.source_aggregate_size += page_content_size
-            doc_id = doc.get("id", None) or 0
+            doc_id = doc.get("pb_id", None) or 0
            docs.append(
                {
                    "doc": page_content,
                    "source_path": doc_source_path,
-                    "id": doc_id,
+                    "pb_id": doc_id,
                    "last_modified": doc.get("metadata", {}).get("last_modified"),
                    "file_owner": doc_source_owner,
                    **(
@@ -221,14 +210,18 @@ def _classify_doc(self, loaded_docs: list, loading_end: bool = False) -> list:
                self.source_aggregate_size
            )
        payload = Doc(**payload).dict(exclude_unset=True)
+        classified_docs = {}
        # Raw payload to be sent to classifier
        if self.classifier_location == "local":
            load_doc_url = f"{self.classifier_url}{LOADER_DOC_URL}"
            try:
                pebblo_resp = requests.post(
                    load_doc_url, headers=headers, json=payload, timeout=300
                )
-                classified_docs = json.loads(pebblo_resp.text).get("docs", None)
+
+                # Updating the structure of pebblo response docs for efficient searching
+                for classified_doc in json.loads(pebblo_resp.text).get("docs", []):
+                    classified_docs.update({classified_doc["pb_id"]: classified_doc})
                if pebblo_resp.status_code not in [
                    HTTPStatus.OK,
                    HTTPStatus.BAD_GATEWAY,
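Instead of keeping the classifier's docs list as-is, the loader now re-keys it by pb_id, which the comment above calls out: later per-document merges become dict lookups rather than list scans. A standalone sketch with a fabricated response (plain assignment here is equivalent to the .update({...}) call in the diff):

import json

pebblo_resp_text = json.dumps(  # fabricated classifier response
    {"docs": [{"pb_id": "0", "pb_checksum": "abc"}, {"pb_id": "1", "pb_checksum": "def"}]}
)

classified_docs = {}
for classified_doc in json.loads(pebblo_resp_text).get("docs", []):
    classified_docs[classified_doc["pb_id"]] = classified_doc

assert classified_docs["1"]["pb_checksum"] == "def"  # O(1) lookup by pb_id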
@@ -257,7 +250,21 @@ def _classify_doc(self, loaded_docs: list, loading_end: bool = False) -> list:

        if self.api_key:
            if self.classifier_location == "local":
-                payload["docs"] = classified_docs
+                docs = payload["docs"]
+                for doc_data in docs:
+                    classified_data = classified_docs.get(doc_data["pb_id"], {})
+                    doc_data.update(
+                        {
+                            "pb_checksum": classified_data.get("pb_checksum", None),
+                            "loader_source_path": classified_data.get(
+                                "loader_source_path", None
+                            ),
+                            "entities": classified_data.get("entities", {}),
+                            "topics": classified_data.get("topics", {}),
+                        }
+                    )
+                    doc_data.pop("doc")
+
            headers.update({"x-api-key": self.api_key})
            pebblo_cloud_url = f"{PEBBLO_CLOUD_URL}{LOADER_DOC_URL}"
            try:
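Before this change the cloud payload reused the classifier output wholesale; now each entry in payload["docs"] is enriched with the classification fields and the raw "doc" text is popped, so document content itself is not uploaded. Roughly, one entry ends up shaped like this (all values hypothetical):

doc_data = {
    "source_path": "/abs/path/data/corp_sens_data.csv",
    "last_modified": None,
    "file_owner": "joe",
    "pb_id": "0",
    "pb_checksum": "abc",
    "loader_source_path": "/abs/path/data/corp_sens_data.csv",
    "entities": {"us-ssn": 1},
    "topics": {},
    # "doc" (the raw page content) was removed via doc_data.pop("doc")
}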
@@ -453,53 +460,46 @@ def _index_docs(self) -> List[IndexedDocument]:
            List[IndexedDocument]: A list of IndexedDocument objects with unique IDs.
        """
        docs_with_id = [
-            IndexedDocument(id=hex(i)[2:], **doc.dict())
+            IndexedDocument(pb_id=str(i), **doc.dict())
            for i, doc in enumerate(self.docs)
        ]
        return docs_with_id
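The index id scheme also changes from 0x-stripped hex to plain decimal strings, which is what the classifier echoes back as pb_id and what the updated unit tests expect ("0", "1", ...). The difference only shows past index 9:

i = 26
print(hex(i)[2:])  # '1a'  <- old id scheme
print(str(i))      # '26'  <- new pb_id scheme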

-    def _add_semantic_to_docs(
-        self, docs_with_id: List[IndexedDocument], classified_docs: List[dict]
-    ) -> List[Document]:
+    def _add_semantic_to_docs(self, classified_docs: Dict) -> List[Document]:
        """
        Adds semantic metadata to the given list of documents.

        Args:
-            docs_with_id (List[IndexedDocument]): A list of IndexedDocument objects
-                containing the documents with their IDs.
-            classified_docs (List[dict]): A list of dictionaries containing the
-                classified documents.
+            classified_docs (Dict): A dictionary of dictionaries containing the
+                classified documents with pb_id as key.

        Returns:
            List[Document]: A list of Document objects with added semantic metadata.
        """
        indexed_docs = {
-            doc.id: Document(page_content=doc.page_content, metadata=doc.metadata)
-            for doc in docs_with_id
+            doc.pb_id: Document(page_content=doc.page_content, metadata=doc.metadata)
+            for doc in self.docs_with_id
        }

-        for classified_doc in classified_docs:
-            doc_id = classified_doc.get("id")
+        for classified_doc in classified_docs.values():
+            doc_id = classified_doc.get("pb_id")
            if doc_id in indexed_docs:
                self._add_semantic_to_doc(indexed_docs[doc_id], classified_doc)

        semantic_metadata_docs = [doc for doc in indexed_docs.values()]

        return semantic_metadata_docs

-    def _unindex_docs(self, docs_with_id: List[IndexedDocument]) -> List[Document]:
+    def _unindex_docs(self) -> List[Document]:
        """
        Converts a list of IndexedDocument objects to a list of Document objects.

-        Args:
-            docs_with_id (List[IndexedDocument]): A list of IndexedDocument objects.
-
        Returns:
            List[Document]: A list of Document objects.
        """
        docs = [
            Document(page_content=doc.page_content, metadata=doc.metadata)
-            for i, doc in enumerate(docs_with_id)
+            for i, doc in enumerate(self.docs_with_id)
        ]
        return docs

@@ -522,12 +522,16 @@ def _add_semantic_to_doc(self, doc: Document, classified_doc: dict) -> Document:
            )
        return doc

-    def _add_pebblo_specific_metadata(self) -> None:
+    def _add_pebblo_specific_metadata(self, classified_docs: dict) -> None:
        """Add Pebblo specific metadata to documents."""
-        for doc in self.docs:
+        for doc in self.docs_with_id:
            doc_metadata = doc.metadata
            doc_metadata["full_path"] = get_full_path(
                doc_metadata.get(
                    "full_path", doc_metadata.get("source", self.source_path)
                )
            )
+            doc_metadata["pb_id"] = doc.pb_id
+            doc_metadata["pb_checksum"] = classified_docs.get(doc.pb_id, {}).get(
+                "pb_checksum", None
+            )
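_add_pebblo_specific_metadata() now runs on the indexed documents and stamps both identifiers, so after a successful load every document's metadata looks roughly like this (paths and checksum hypothetical; pb_checksum falls back to None whenever the classifier returned nothing for that pb_id):

doc_metadata = {
    "source": "data/corp_sens_data.csv",
    "full_path": "/abs/path/data/corp_sens_data.csv",
    "pb_id": "0",
    "pb_checksum": "abc",  # None if classification was unavailable
}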
2 changes: 1 addition & 1 deletion libs/community/langchain_community/utilities/pebblo.py
@@ -65,7 +65,7 @@
class IndexedDocument(Document):
    """Pebblo Indexed Document."""

-    id: str
+    pb_id: str
    """Unique ID of the document."""


18 changes: 16 additions & 2 deletions libs/community/tests/unit_tests/document_loaders/test_pebblo.py
@@ -65,12 +65,26 @@ def test_csv_loader_load_valid_data(mocker: MockerFixture) -> None:
    full_file_path = os.path.abspath(file_path)
    expected_docs = [
        Document(
+            metadata={
+                "source": full_file_path,
+                "row": 0,
+                "full_path": full_file_path,
+                "pb_id": "0",
+                # For UT as here we are not calculating checksum
+                "pb_checksum": None,
+            },
            page_content="column1: value1\ncolumn2: value2\ncolumn3: value3",
-            metadata={"source": file_path, "row": 0, "full_path": full_file_path},
        ),
        Document(
+            metadata={
+                "source": full_file_path,
+                "row": 1,
+                "full_path": full_file_path,
+                "pb_id": "1",
+                # For UT as here we are not calculating checksum
+                "pb_checksum": None,
+            },
            page_content="column1: value4\ncolumn2: value5\ncolumn3: value6",
-            metadata={"source": file_path, "row": 1, "full_path": full_file_path},
        ),
    ]
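The expected documents now assert the full metadata contract: pb_id as a decimal string and pb_checksum pinned to None, since the unit test mocks the loader plumbing and no classifier computes a checksum. Distilled, each expectation reduces to this shape (paths illustrative):

import os

file_path = "tests/examples/test.csv"  # illustrative path
full_file_path = os.path.abspath(file_path)

expected_metadata = {
    "source": full_file_path,
    "row": 0,
    "full_path": full_file_path,
    "pb_id": "0",
    "pb_checksum": None,  # no checksum is calculated in the unit test
}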
