Skip to content

Commit

Permalink
Add GoogleDrive-Pinecone Sample (#591)
Browse files Browse the repository at this point in the history
* Added GoogleDrive-Pinecone Sample

* Update Pinecone package
  • Loading branch information
Raj725 authored Nov 15, 2024
1 parent efcc3fb commit e67b011
Show file tree
Hide file tree
Showing 8 changed files with 432 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# OpenAI credentials
OPENAI_API_KEY="<OPENAI_API_KEY>"

# Pebblo configuration
PEBBLO_CLASSIFIER_URL="<PEBBLO-SERVER-HOST:PEBBLO-SERVER-PORT>" # e.g. "http://localhost:8000/"
PEBBLO_API_KEY=<PEBBLO API KEY>
PEBBLO_CLOUD_URL=<PEBBLO CLOUD URL>

# Google Drive Config
GOOGLE_APPLICATION_CREDENTIALS="<PATH_TO_GOOGLE_APPLICATION_CREDENTIALS>"

# Vector DB Config
PINECONE_API_KEY="<PINECONE_API_KEY>"
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
credentials
google_token.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
## Identity and Semantic Enforcement using Pebblo

This solution uses the following daxa-ai/langchain and daxa-ai/langchain-google branches:

- daxa-ai/langchain: https://github.com/daxa-ai/langchain/tree/pebblo-0.1.21
- daxa-ai/langchain-google: https://github.com/daxa-ai/langchain-google/tree/pebblo-0.1.21

### Prerequisites
1. Google Cloud project. Follow [LangChain GoogleDrive loader](https://python.langchain.com/v0.2/docs/integrations/document_loaders/google_drive/#prerequisites) docs for details on specific steps required to be completed in Google Cloud.
2. Sign up and set up your account on Pinecone (https://www.pinecone.io/).


### Instructions

1. Create Python virtual-env
```console
$ python3 -m venv .venv
$ source .venv/bin/activate
```

2. Install dependencies
```console
$ pip3 install -r requirements.txt
```

3. Install langchain-community from the branch `pebblo-0.1.21`
```console
$ git clone https://github.com/daxa-ai/langchain.git
$ cd langchain
$ git fetch && git checkout pebblo-0.1.21
$ cd libs/community
$ pip3 install langchain-community .
```

4. Install langchain-google from the branch `pebblo-0.1.21`
```console
$ git clone https://github.com/daxa-ai/langchain-google.git
$ cd langchain-google
$ git fetch && git checkout pebblo-0.1.21
$ cd libs/community
$ pip3 install langchain-google-community .
```

5. Copy the `.env.sample` file to `.env` and populate the necessary environment variables.

6. Update the `pebblo_saferag.py` file with the following details:
- _service_acc_def_: Google service account credentials file path
- _folder_id_: Google Drive folder ID where the documents are stored
- _ing_user_email_def_: Google Drive Admin/Ingestion user email ID

7. Run langchain sample app PebbloSafeLoader and PebbloRetrievalQA
```console
$ python3 pebblo_saferag.py
```

8. Retrieve the Pebblo PDF report in `$HOME/.pebblo/pebblo-identity-n-semantic-loader-pinecone/pebblo_report.pdf` file path on the system
where `Pebblo Server` is running.

9. To access the Pebblo UI, point the browser to `http://localhost:8000/pebblo` or `host:port/pebblo` if you are running the server on a different
host.
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
from typing import List

from google.oauth2 import service_account
from googleapiclient.discovery import build


def get_authorized_identities(
    admin_user_email_address: str, service_account_file_path: str, user_email: str
) -> List[str]:
    """
    Return the identities authorized for a user: the user's own email plus the
    email addresses of every Google group the user belongs to.

    Group membership is fetched from the Google Admin Directory API using
    domain-wide delegation (the service account impersonates the admin user).

    Args:
        admin_user_email_address: Workspace admin email used as the delegation
            subject for the service account.
        service_account_file_path: Path to the service account JSON key file.
        user_email: Email address of the user to look up.

    Returns:
        List of identity strings; always contains at least ``user_email``.
        If the Directory API call fails, the error is printed and only
        ``user_email`` (plus any groups fetched so far) is returned.
    """
    _authorized_identities = [user_email]
    credentials = service_account.Credentials.from_service_account_file(
        service_account_file_path,
        scopes=[
            "https://www.googleapis.com/auth/admin.directory.group.readonly",
            "https://www.googleapis.com/auth/admin.directory.group",
        ],
        # Impersonate the admin; plain service accounts cannot read the directory.
        subject=admin_user_email_address,
    )
    directory_service = build("admin", "directory_v1", credentials=credentials)

    try:
        # groups().list is paginated; follow nextPageToken so users who belong
        # to many groups are not silently truncated to the first page.
        page_token = None
        while True:
            groups = (
                directory_service.groups()
                .list(userKey=user_email, pageToken=page_token)
                .execute()
            )
            _authorized_identities.extend(
                group["email"] for group in groups.get("groups", [])
            )
            page_token = groups.get("nextPageToken")
            if not page_token:
                break
    except Exception as e:
        # Best-effort: report the failure but still return what we have.
        print(f"Error fetching groups for {user_email}: {e}")
    print(f"User: {user_email}, \nAuthorized Identities: {_authorized_identities}\n")
    return _authorized_identities
Original file line number Diff line number Diff line change
@@ -0,0 +1,224 @@
"""
Sample app to demonstrate the usage of PebbloSafeLoader, and PebbloRetrievalQA
for semantic enforcement using Pinecone VectorDB in RAG.
"""
import os

import time
from pathlib import Path
from typing import List, Optional

from dotenv import load_dotenv
from google_auth import get_authorized_identities
from langchain_community.chains import PebbloRetrievalQA
from langchain_community.chains.pebblo_retrieval.models import (
AuthContext,
ChainInput,
SemanticContext,
)
from langchain_community.document_loaders import UnstructuredFileIOLoader
from langchain_community.document_loaders.pebblo import PebbloSafeLoader
from langchain_pinecone import PineconeVectorStore
from langchain_google_community import GoogleDriveLoader
from langchain_openai.embeddings import OpenAIEmbeddings
from langchain_openai.llms import OpenAI
from pinecone_index import create_pinecone_index
from utils import describe_pebblo_semantic_stats, format_text, get_input_as_list

load_dotenv()  # Load .env (OpenAI/Pebblo/Pinecone settings) before anything reads os.environ

class SafeRetrieverSemanticRAG:
    """
    Sample app to demonstrate the usage of PebbloSafeLoader, and PebbloRetrievalQA
    for semantic enforcement using Pinecone VectorDB in RAG.

    Construction performs real work in order: documents are loaded from Google
    Drive via PebbloSafeLoader, a Pinecone index is (re)created and populated,
    and a PebbloRetrievalQA chain is initialized on top of it.

    Args:
        folder_id (str): Google Drive folder id
        index_name (str): Index name for Pinecone
    """

    def __init__(self, folder_id: str, index_name: str):
        # App names reported to the Pebblo server for the loader and retriever.
        self.loader_app_name = "pebblo-identity-n-semantic-loader-pinecone"
        self.retrieval_app_name = "pebblo-identity-n-semantic-retriever-pinecone"
        self.folder_id = folder_id
        self.pinecone_index_name = index_name
        # Prepare LLM
        self.llm = OpenAI()
        self.embeddings = OpenAIEmbeddings()
        # Load documents from Google Drive
        self.documents = self.load_documents()
        # Initialize VectorDB
        self.vectordb = self.init_vector_db()
        # Initialize PebbloRetrievalQA
        self.retrieval_chain = self.init_retrieval_chain()

    def load_documents(self):
        """
        Load documents from Google Drive through PebbloSafeLoader.

        ``load_auth=True`` attaches authorized-identity metadata and
        ``load_semantic=True`` attaches semantic (topics/entities) metadata
        to each document. Returns the list of loaded documents.
        """
        print("\nLoading RAG documents ...")
        loader = PebbloSafeLoader(
            GoogleDriveLoader(
                folder_id=self.folder_id,
                credentials_path=Path("credentials/credentials.json"),
                token_path=Path("./google_token.json"),
                recursive=True,
                # NOTE(review): file_loader_cls is commented out but its kwargs
                # below are still passed — presumably intentional; confirm the
                # default loader accepts {"mode": "elements"}.
                # file_loader_cls=UnstructuredFileIOLoader,
                file_loader_kwargs={"mode": "elements"},
                load_auth=True,
            ),
            name=self.loader_app_name,  # App name (Mandatory)
            owner="Joe Smith",  # Owner (Optional)
            description="Identity enabled SafeLoader app using Pebblo and Pinecone VectorDB",  # Description (Optional)
            load_semantic=True,
        )
        documents = loader.load()
        print(f"Loaded {len(documents)} documents ...\n")
        describe_pebblo_semantic_stats(documents)
        return documents

    def init_vector_db(self) -> PineconeVectorStore:
        """
        Create a Pinecone index and load documents into it.

        Recreates the index from scratch (``recreate=True``) and upserts the
        texts with their Pebblo metadata. Returns the PineconeVectorStore.
        """
        # Create index
        create_pinecone_index(self.pinecone_index_name, recreate=True)

        print("Loading docs into index...")
        texts = [t.page_content for t in self.documents]
        metadatas = [t.metadata for t in self.documents]

        # pop "coordinates" from metadata(Nested JSONs are not supported in Pinecone)
        for metadata in metadatas:
            metadata.pop("coordinates", None)

        vector_store = PineconeVectorStore.from_texts(
            texts,
            self.embeddings,
            metadatas=metadatas,
            index_name=self.pinecone_index_name,
        )

        # wait for index to be initialized
        print("Waiting for index to be ready...")
        time.sleep(5)

        print("Done!")
        return vector_store

    def init_retrieval_chain(self):
        """
        Initialize PebbloRetrievalQA chain over the Pinecone-backed retriever.
        """
        return PebbloRetrievalQA.from_chain_type(
            llm=self.llm,
            app_name=self.retrieval_app_name,
            owner="Joe Smith",
            description="Identity enabled SafeLoader and SafeRetrival app using "
            "Pebblo and Pinecone VectorDB",
            chain_type="stuff",
            retriever=self.vectordb.as_retriever(),
            verbose=True,
        )

    def ask(
        self,
        question: str,
        user_email: str,
        auth_identifiers: List[str],
        topics_to_deny: Optional[List[str]] = None,
        entities_to_deny: Optional[List[str]] = None,
    ):
        """
        Ask a question with identity and (optional) semantic enforcement.

        Args:
            question: The user's query.
            user_email: Identity of the end user issuing the query.
            auth_identifiers: Identities (user + groups) authorized for the user.
            topics_to_deny: Optional topic labels to filter out of retrieval.
            entities_to_deny: Optional entity labels to filter out of retrieval.

        Returns:
            The chain's response dict (includes the answer under "result").
        """
        auth_context = {
            "user_id": user_email,
            "user_auth": auth_identifiers,
        }
        auth_context = AuthContext(**auth_context)
        # Build a SemanticContext only when at least one deny-list is given.
        semantic_context = dict()
        if topics_to_deny:
            semantic_context["pebblo_semantic_topics"] = {"deny": topics_to_deny}
        if entities_to_deny:
            semantic_context["pebblo_semantic_entities"] = {"deny": entities_to_deny}

        semantic_context = (
            SemanticContext(**semantic_context) if semantic_context else None
        )

        chain_input = ChainInput(
            query=question, auth_context=auth_context, semantic_context=semantic_context
        )

        return self.retrieval_chain.invoke(chain_input.dict())


if __name__ == "__main__":
    # Defaults offered at the interactive prompts below.
    index_name = "identity-semantic-enforcement-rag"
    default_folder_id = "<google-drive-folder-id>"
    default_service_acc = "credentials/service-account.json"
    default_admin_email = "<ingestion-user-email-id>"

    print("Please enter ingestion user details for loading data...")
    print("Please enter admin user details...")
    admin_email = (
        input(f"Email address ({default_admin_email}): ") or default_admin_email
    )
    svc_account_path = (
        input(f"Path to the service_account.json file ({default_service_acc}): ")
        or default_service_acc
    )
    drive_folder_id = (
        input(f"Google Drive folder id ({default_folder_id}): ") or default_folder_id
    )
    # Building the app loads the documents and populates the Pinecone index.
    rag_app = SafeRetrieverSemanticRAG(drive_folder_id, index_name)

    # Interactive query loop: one end user + optional semantic filters per turn.
    while True:
        print("Please enter end user details below")
        end_user_email = input("User email address : ")

        # Resolve the end user's identities (self + Google groups).
        identities = get_authorized_identities(
            admin_user_email_address=admin_email,
            service_account_file_path=svc_account_path,
            user_email=end_user_email,
        )

        print(
            "Please enter semantic filters below...\n"
            "(Leave these fields empty if you do not wish to enforce any semantic filters)"
        )
        deny_topics = get_input_as_list("Topics to deny, comma separated (Optional): ")
        deny_entities = get_input_as_list(
            "Entities to deny, comma separated (Optional): "
        )

        question = input("Please provide the prompt: ")
        print(
            f"User: {end_user_email}.\n"
            f"\nTopics to deny: {deny_topics}\n"
            f"Entities to deny: {deny_entities}\n"
            f"Query: {format_text(question)}"
        )
        answer = rag_app.ask(
            question,
            end_user_email,
            identities,
            deny_topics,
            deny_entities,
        )

        print(f"Response:\n" f"{format_text(answer['result'])}")

        # Empty input defaults to 1 (continue); non-numeric input re-prompts.
        try:
            keep_going = int(input("\n\nType 1 to continue and 0 to exit (1): ") or 1)
        except ValueError:
            print("Please provide valid input")
            continue

        if not keep_going:
            exit(0)

        print("\n\n")
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import os
import time

from dotenv import load_dotenv
from pinecone import Pinecone, PodSpec

load_dotenv()  # Read .env so PINECONE_API_KEY is available via os.environ

# Pinecone API key; None if unset — the client will fail on first use.
PINECONE_API_KEY = os.getenv("PINECONE_API_KEY")


def create_pinecone_index(
    pinecone_index_name: str,
    recreate: bool = False,
    environment: str = "gcp-starter",
    dimension: int = 1536,
):
    """
    Create (or reuse) a Pinecone pod-based index and return a handle to it.

    Args:
        pinecone_index_name: Name of the index to create or reuse.
        recreate: If True and the index already exists, delete and rebuild it;
            if False, reuse the existing index as-is.
        environment: Pod environment for the index spec. Update this to match
            the environment you have access to (default: "gcp-starter").
        dimension: Vector dimensionality; 1536 matches text-embedding-ada-002.

    Returns:
        A ``pinecone.Index`` handle for the (now ready) index.
    """
    # configure client
    pc = Pinecone(api_key=PINECONE_API_KEY)
    spec = PodSpec(environment=environment)

    # check for and delete index if already exists
    if pinecone_index_name in pc.list_indexes().names():
        if not recreate:
            print(f"Index {pinecone_index_name} already exists. skipping...")
            return pc.Index(pinecone_index_name)
        # Delete and create a new index
        print(f"Deleting and recreating index: {pinecone_index_name} ...")
        pc.delete_index(pinecone_index_name)
        print(f"Deleted index: {pinecone_index_name}.")

    print(f"Creating index: {pinecone_index_name}...")
    # create a new index
    pc.create_index(
        pinecone_index_name,
        dimension=dimension,
        metric="dotproduct",  # required for hybrid/sparse-dense usage
        spec=spec,
    )

    # Block until the index reports ready so callers can upsert immediately.
    while not pc.describe_index(pinecone_index_name).status["ready"]:
        time.sleep(1)

    index = pc.Index(pinecone_index_name)
    index.describe_index_stats()
    return index
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
python-dotenv==1.0.0
requests==2.31.0
unstructured
google-api-python-client # For Google Auth
langchain-openai
pinecone-client # for Pinecone VectorStore
langchain-pinecone
Loading

0 comments on commit e67b011

Please sign in to comment.