Skip to content

Commit

Permalink
Community[minor]: Added checksum in while send data to pebblo-cloud (l…
Browse files Browse the repository at this point in the history
…angchain-ai#23968)

- **Description:** 
            - Updated checksum in doc metadata
- Sending checksum and removing actual content, while sending data to
`pebblo-cloud` if `classifier-location `is `pebblo-cloud` in
`/loader/doc` API
            - Adding `pb_id` i.e. pebblo id to doc metadata
            - Refactoring as needed.
- Sending `content-checksum` and removing actual content, while sending
data to `pebblo-cloud` if `classifier-location `is `pebblo-cloud` in
`prmopt` API
- **Issue:** NA
- **Dependencies:** NA
- **Tests:** Updated
- **Docs** NA

---------

Co-authored-by: dristy.cd <[email protected]>
  • Loading branch information
2 people authored and olgamurraft committed Aug 16, 2024
1 parent 0f62848 commit b09572b
Show file tree
Hide file tree
Showing 5 changed files with 95 additions and 72 deletions.
40 changes: 22 additions & 18 deletions libs/community/langchain_community/chains/pebblo_retrieval/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,11 @@ def _call(
),
"doc": doc.page_content,
"vector_db": self.retriever.vectorstore.__class__.__name__,
**(
{"pb_checksum": doc.metadata.get("pb_checksum")}
if doc.metadata.get("pb_checksum")
else {}
),
}
for doc in docs
if isinstance(doc, Document)
Expand Down Expand Up @@ -457,25 +462,24 @@ def _send_prompt(self, qa_payload: Qa) -> None:
if self.api_key:
if self.classifier_location == "local":
if pebblo_resp:
payload["response"] = (
json.loads(pebblo_resp.text)
.get("retrieval_data", {})
.get("response", {})
)
payload["context"] = (
json.loads(pebblo_resp.text)
.get("retrieval_data", {})
.get("context", [])
)
payload["prompt"] = (
json.loads(pebblo_resp.text)
.get("retrieval_data", {})
.get("prompt", {})
)
resp = json.loads(pebblo_resp.text)
if resp:
payload["response"].update(
resp.get("retrieval_data", {}).get("response", {})
)
payload["response"].pop("data")
payload["prompt"].update(
resp.get("retrieval_data", {}).get("prompt", {})
)
payload["prompt"].pop("data")
context = payload["context"]
for context_data in context:
context_data.pop("doc")
payload["context"] = context
else:
payload["response"] = None
payload["context"] = None
payload["prompt"] = None
payload["response"] = {}
payload["prompt"] = {}
payload["context"] = []
headers.update({"x-api-key": self.api_key})
pebblo_cloud_url = f"{PEBBLO_CLOUD_URL}{PROMPT_URL}"
try:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ class Context(BaseModel):
retrieved_from: Optional[str]
doc: Optional[str]
vector_db: str
pb_checksum: Optional[str]


class Prompt(BaseModel):
Expand Down
106 changes: 55 additions & 51 deletions libs/community/langchain_community/document_loaders/pebblo.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import os
import uuid
from http import HTTPStatus
from typing import Any, Dict, Iterator, List, Optional, Union
from typing import Any, Dict, Iterator, List, Optional

import requests # type: ignore
from langchain_core.documents import Document
Expand Down Expand Up @@ -61,7 +61,7 @@ def __init__(
self.source_path = get_loader_full_path(self.loader)
self.source_owner = PebbloSafeLoader.get_file_owner_from_path(self.source_path)
self.docs: List[Document] = []
self.docs_with_id: Union[List[IndexedDocument], List[Document], List] = []
self.docs_with_id: List[IndexedDocument] = []
loader_name = str(type(self.loader)).split(".")[-1].split("'")[0]
self.source_type = get_loader_type(loader_name)
self.source_path_size = self.get_source_size(self.source_path)
Expand Down Expand Up @@ -89,17 +89,13 @@ def load(self) -> List[Document]:
list: Documents fetched from load method of the wrapped `loader`.
"""
self.docs = self.loader.load()
# Add pebblo-specific metadata to docs
self._add_pebblo_specific_metadata()
if not self.load_semantic:
self._classify_doc(self.docs, loading_end=True)
return self.docs
self.docs_with_id = self._index_docs()
classified_docs = self._classify_doc(self.docs_with_id, loading_end=True)
self.docs_with_id = self._add_semantic_to_docs(
self.docs_with_id, classified_docs
)
self.docs = self._unindex_docs(self.docs_with_id) # type: ignore
classified_docs = self._classify_doc(loading_end=True)
self._add_pebblo_specific_metadata(classified_docs)
if self.load_semantic:
self.docs = self._add_semantic_to_docs(classified_docs)
else:
self.docs = self._unindex_docs() # type: ignore
return self.docs

def lazy_load(self) -> Iterator[Document]:
Expand All @@ -125,19 +121,14 @@ def lazy_load(self) -> Iterator[Document]:
self.docs = []
break
self.docs = list((doc,))
# Add pebblo-specific metadata to docs
self._add_pebblo_specific_metadata()
if not self.load_semantic:
self._classify_doc(self.docs, loading_end=True)
yield self.docs[0]
self.docs_with_id = self._index_docs()
classified_doc = self._classify_doc()
self._add_pebblo_specific_metadata(classified_doc)
if self.load_semantic:
self.docs = self._add_semantic_to_docs(classified_doc)
else:
self.docs_with_id = self._index_docs()
classified_doc = self._classify_doc(self.docs)
self.docs_with_id = self._add_semantic_to_docs(
self.docs_with_id, classified_doc
)
self.docs = self._unindex_docs(self.docs_with_id) # type: ignore
yield self.docs[0]
self.docs = self._unindex_docs()
yield self.docs[0]

@classmethod
def set_discover_sent(cls) -> None:
Expand All @@ -147,13 +138,12 @@ def set_discover_sent(cls) -> None:
def set_loader_sent(cls) -> None:
cls._loader_sent = True

def _classify_doc(self, loaded_docs: list, loading_end: bool = False) -> list:
def _classify_doc(self, loading_end: bool = False) -> dict:
"""Send documents fetched from loader to pebblo-server. Then send
classified documents to Daxa cloud(If api_key is present). Internal method.
Args:
loaded_docs (list): List of documents fetched from loader's load operation.
loading_end (bool, optional): Flag indicating the halt of data
loading by loader. Defaults to False.
"""
Expand All @@ -163,9 +153,8 @@ def _classify_doc(self, loaded_docs: list, loading_end: bool = False) -> list:
}
if loading_end is True:
PebbloSafeLoader.set_loader_sent()
doc_content = [doc.dict() for doc in loaded_docs]
doc_content = [doc.dict() for doc in self.docs_with_id]
docs = []
classified_docs = []
for doc in doc_content:
doc_metadata = doc.get("metadata", {})
doc_authorized_identities = doc_metadata.get("authorized_identities", [])
Expand All @@ -183,12 +172,12 @@ def _classify_doc(self, loaded_docs: list, loading_end: bool = False) -> list:
page_content = str(doc.get("page_content"))
page_content_size = self.calculate_content_size(page_content)
self.source_aggregate_size += page_content_size
doc_id = doc.get("id", None) or 0
doc_id = doc.get("pb_id", None) or 0
docs.append(
{
"doc": page_content,
"source_path": doc_source_path,
"id": doc_id,
"pb_id": doc_id,
"last_modified": doc.get("metadata", {}).get("last_modified"),
"file_owner": doc_source_owner,
**(
Expand Down Expand Up @@ -221,14 +210,18 @@ def _classify_doc(self, loaded_docs: list, loading_end: bool = False) -> list:
self.source_aggregate_size
)
payload = Doc(**payload).dict(exclude_unset=True)
classified_docs = {}
# Raw payload to be sent to classifier
if self.classifier_location == "local":
load_doc_url = f"{self.classifier_url}{LOADER_DOC_URL}"
try:
pebblo_resp = requests.post(
load_doc_url, headers=headers, json=payload, timeout=300
)
classified_docs = json.loads(pebblo_resp.text).get("docs", None)

# Updating the structure of pebblo response docs for efficient searching
for classified_doc in json.loads(pebblo_resp.text).get("docs", []):
classified_docs.update({classified_doc["pb_id"]: classified_doc})
if pebblo_resp.status_code not in [
HTTPStatus.OK,
HTTPStatus.BAD_GATEWAY,
Expand Down Expand Up @@ -257,7 +250,21 @@ def _classify_doc(self, loaded_docs: list, loading_end: bool = False) -> list:

if self.api_key:
if self.classifier_location == "local":
payload["docs"] = classified_docs
docs = payload["docs"]
for doc_data in docs:
classified_data = classified_docs.get(doc_data["pb_id"], {})
doc_data.update(
{
"pb_checksum": classified_data.get("pb_checksum", None),
"loader_source_path": classified_data.get(
"loader_source_path", None
),
"entities": classified_data.get("entities", {}),
"topics": classified_data.get("topics", {}),
}
)
doc_data.pop("doc")

headers.update({"x-api-key": self.api_key})
pebblo_cloud_url = f"{PEBBLO_CLOUD_URL}{LOADER_DOC_URL}"
try:
Expand Down Expand Up @@ -453,53 +460,46 @@ def _index_docs(self) -> List[IndexedDocument]:
List[IndexedDocument]: A list of IndexedDocument objects with unique IDs.
"""
docs_with_id = [
IndexedDocument(id=hex(i)[2:], **doc.dict())
IndexedDocument(pb_id=str(i), **doc.dict())
for i, doc in enumerate(self.docs)
]
return docs_with_id

def _add_semantic_to_docs(
self, docs_with_id: List[IndexedDocument], classified_docs: List[dict]
) -> List[Document]:
def _add_semantic_to_docs(self, classified_docs: Dict) -> List[Document]:
"""
Adds semantic metadata to the given list of documents.
Args:
docs_with_id (List[IndexedDocument]): A list of IndexedDocument objects
containing the documents with their IDs.
classified_docs (List[dict]): A list of dictionaries containing the
classified documents.
classified_docs (Dict): A dictionary of dictionaries containing the
classified documents with pb_id as key.
Returns:
List[Document]: A list of Document objects with added semantic metadata.
"""
indexed_docs = {
doc.id: Document(page_content=doc.page_content, metadata=doc.metadata)
for doc in docs_with_id
doc.pb_id: Document(page_content=doc.page_content, metadata=doc.metadata)
for doc in self.docs_with_id
}

for classified_doc in classified_docs:
doc_id = classified_doc.get("id")
for classified_doc in classified_docs.values():
doc_id = classified_doc.get("pb_id")
if doc_id in indexed_docs:
self._add_semantic_to_doc(indexed_docs[doc_id], classified_doc)

semantic_metadata_docs = [doc for doc in indexed_docs.values()]

return semantic_metadata_docs

def _unindex_docs(self, docs_with_id: List[IndexedDocument]) -> List[Document]:
def _unindex_docs(self) -> List[Document]:
"""
Converts a list of IndexedDocument objects to a list of Document objects.
Args:
docs_with_id (List[IndexedDocument]): A list of IndexedDocument objects.
Returns:
List[Document]: A list of Document objects.
"""
docs = [
Document(page_content=doc.page_content, metadata=doc.metadata)
for i, doc in enumerate(docs_with_id)
for i, doc in enumerate(self.docs_with_id)
]
return docs

Expand All @@ -522,12 +522,16 @@ def _add_semantic_to_doc(self, doc: Document, classified_doc: dict) -> Document:
)
return doc

def _add_pebblo_specific_metadata(self) -> None:
def _add_pebblo_specific_metadata(self, classified_docs: dict) -> None:
"""Add Pebblo specific metadata to documents."""
for doc in self.docs:
for doc in self.docs_with_id:
doc_metadata = doc.metadata
doc_metadata["full_path"] = get_full_path(
doc_metadata.get(
"full_path", doc_metadata.get("source", self.source_path)
)
)
doc_metadata["pb_id"] = doc.pb_id
doc_metadata["pb_checksum"] = classified_docs.get(doc.pb_id, {}).get(
"pb_checksum", None
)
2 changes: 1 addition & 1 deletion libs/community/langchain_community/utilities/pebblo.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@
class IndexedDocument(Document):
"""Pebblo Indexed Document."""

id: str
pb_id: str
"""Unique ID of the document."""


Expand Down
18 changes: 16 additions & 2 deletions libs/community/tests/unit_tests/document_loaders/test_pebblo.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,26 @@ def test_csv_loader_load_valid_data(mocker: MockerFixture) -> None:
full_file_path = os.path.abspath(file_path)
expected_docs = [
Document(
metadata={
"source": full_file_path,
"row": 0,
"full_path": full_file_path,
"pb_id": "0",
# For UT as here we are not calculating checksum
"pb_checksum": None,
},
page_content="column1: value1\ncolumn2: value2\ncolumn3: value3",
metadata={"source": file_path, "row": 0, "full_path": full_file_path},
),
Document(
metadata={
"source": full_file_path,
"row": 1,
"full_path": full_file_path,
"pb_id": "1",
# For UT as here we are not calculating checksum
"pb_checksum": None,
},
page_content="column1: value4\ncolumn2: value5\ncolumn3: value6",
metadata={"source": file_path, "row": 1, "full_path": full_file_path},
),
]

Expand Down

0 comments on commit b09572b

Please sign in to comment.