-
Notifications
You must be signed in to change notification settings - Fork 120
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Serialization issue with deploying compute_graph with fastapi #924
Comments
@sadath-12 At the moment we are not pickling the module where a function decorated with @indexify_function lives, so only the function gets pickled, not the file. In this example, Total doesn't get pickled, only the functions do. generate_numbers, squared, etc. work because they do not try to access any classes defined in the module. If you extended them to use such a class, they would fail in the same way. Here is the workaround - @app.post("/deploy")
async def root():
from indexify import RemoteGraph
import sys
RemoteGraph.deploy(g,server_url="http://localhost:8900", additional_modules=[sys.modules[__name__]])
return {"message": "created graph succesfully"} I have tested and this works. Going to keep this issue open so that we can figure out a way to pickle the module where the functions live. |
Thanks @diptanu, that works for the README example but not for the PDF example. Here is the API code I use to deploy the compute graph:
from fastapi import FastAPI
#from workflows import pdf # Import the users module from endpoints
#import sys
app = FastAPI()
# Include the users router with a prefix
#app.include_router(pdf.router, prefix="/pdf", tags=["pdfs"])
# uvicorn main:app --host 0.0.0.0 --port 8000 --reload
from indexify.functions_sdk.image import Image
import psycopg2
from psycopg2.extras import execute_values
from pydantic import BaseModel
from psycopg2.extras import Json
from typing import List, Optional, Dict
from inkwell import Page, Document as InkwellDocument
from indexify.functions_sdk.data_objects import File
from indexify.functions_sdk.graph import Graph
from indexify.functions_sdk.indexify_functions import IndexifyFunction, indexify_function
from pgvector.psycopg2 import register_vector
from sentence_transformers import SentenceTransformer
# Executor image used by the graph functions in this file: Ubuntu 22.04 plus
# the apt and pip packages installed by the steps below.
image1 = (
    Image()
    .name("tensorlake/indexify-executor-default")
    .base_image("ubuntu:22.04")
    .run("apt update")
    .run("apt install -y libgl1-mesa-glx git g++")
    .run("pip install torch")
    .run("pip install numpy")
    .run("pip install langchain")
    .run("pip install langchain_text_splitters")
    # NOTE(review): URL restored from a mangled form ("github.com",
    # "[email protected]"); confirm that v0.6 is the intended detectron2 pin.
    .run("pip install git+https://github.com/facebookresearch/detectron2.git@v0.6")
    .run("apt install -y tesseract-ocr")
    .run("apt install -y libtesseract-dev")
    .run("pip install py-inkwell")
)
# Request body accepted by the POST /deploy endpoint.
class DeployInput(BaseModel):
    namespace: str
    indexifyUrl: str
# Parsed PDF content, held as a list of inkwell pages.
class Document(BaseModel):
    pages: List[Page]
# One piece of extracted text; the page number and the embedding vector are
# optional and default to None.
class TextChunk(BaseModel):
    chunk: str
    page_number: Optional[int] = None
    embeddings: Optional[List[float]] = None
# Payload passed from node to node in the graph: the raw file, its metadata,
# the parsed document and the chunks extracted from it.
class PdfOutput(BaseModel):
    file: File
    source: str
    dbSchema: str
    labels: Dict[str, str]
    size: str
    mime_type: str
    textchunk: List[TextChunk]
    doc: Document
# Pass-through node: returns the payload it receives without modifying it.
@indexify_function(image=image1)
def upload_pdf(pdf: PdfOutput) -> PdfOutput:
    return pdf
class PDFParser(IndexifyFunction):
    """Graph function that runs an inkwell pipeline over the uploaded PDF."""

    name = "pdf-parse"
    image = image1
    description = "Parser class that captures a PDF file"

    def __init__(self):
        super().__init__()
        # Imported here rather than at module level, so inkwell's Pipeline is
        # only imported when this function is instantiated.
        from inkwell import Pipeline

        self._pipeline = Pipeline()

    def run(self, input: PdfOutput) -> PdfOutput:
        """Parse ``input.file`` and attach the resulting pages to ``input.doc``.

        Args:
            input: Payload whose ``file.data`` holds the PDF bytes.

        Returns:
            The same payload object, with ``doc`` replaced by the parsed pages.
        """
        import tempfile

        with tempfile.NamedTemporaryFile(mode="wb", suffix=".pdf") as f:
            f.write(input.file.data)
            # The pipeline is given the file *name*, so the buffered bytes
            # must reach the disk before it opens the file itself.
            f.flush()
            document: InkwellDocument = self._pipeline.process(f.name)
        input.doc = Document(pages=document.pages)
        return input
class TextEmbeddingExtractor(IndexifyFunction):
    """Graph function that fills in an embedding for every text chunk."""

    name = "text-embedding-extractor"
    description = "Extractor class that captures an embedding model"
    system_dependencies = []
    input_mime_types = ["text"]
    image = image1

    def __init__(self):
        super().__init__()
        # The model is created on the first call to run().
        self.model = None

    def run(self, pdf: PdfOutput) -> PdfOutput:
        """Encode each chunk's text and store the vector on the chunk as a list."""
        encoder = self.model
        if encoder is None:
            encoder = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")
            self.model = encoder
        for piece in pdf.textchunk:
            piece.embeddings = encoder.encode(piece.chunk).tolist()
        return pdf
# Turns the parsed document into text chunks: each table/figure fragment
# becomes its own chunk, and the text fragments of a page are concatenated and
# split with a recursive character splitter (size 1000, overlap 200).
@indexify_function(image=image1)
def extract_chunks(pdf: PdfOutput) -> PdfOutput:
    from inkwell import PageFragmentType
    from langchain_text_splitters import RecursiveCharacterTextSplitter

    splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
    standalone_kinds = [PageFragmentType.TABLE, PageFragmentType.FIGURE]
    collected: List[TextChunk] = []

    pages = pdf.doc.pages
    print("length of pages", len(pages))
    for page in pages:
        text_parts = []
        for fragment in page.page_fragments:
            kind = fragment.fragment_type
            if kind in standalone_kinds:
                collected.append(
                    TextChunk(chunk=fragment.content.text, page_number=page.page_number)
                )
            elif kind == PageFragmentType.TEXT:
                text_parts.append(fragment.content.text)
        # The page's running text is split only after all fragments are seen.
        collected.extend(
            TextChunk(chunk=piece, page_number=page.page_number)
            for piece in splitter.split_text("".join(text_parts))
        )

    print("list of chunks", len(collected))
    pdf.textchunk = collected
    return pdf
class PGVectorWriter(IndexifyFunction):
    """Graph function that writes chunk embeddings and metadata to Postgres."""

    name = "pgvector_writer"

    def __init__(self):
        super().__init__()
        # NOTE(review): connection settings are hard-coded to a local test
        # database; consider moving them to configuration.
        self.conn = psycopg2.connect(
            dbname="test",
            user="test",
            password="test",
            host="localhost",
            port="5432",
        )
        register_vector(self.conn)
        self._create_tables()

    def _create_tables(self) -> None:
        """Create the ``text_embeddings`` table if it does not exist yet."""
        # `with self.conn` commits on success and rolls back on error; it does
        # not close the connection.
        with self.conn, self.conn.cursor() as cur:
            cur.execute("""
                CREATE TABLE IF NOT EXISTS text_embeddings (
                    id SERIAL PRIMARY KEY,
                    vector vector(384),
                    text TEXT,
                    page_number INT,
                    size TEXT,
                    source TEXT,
                    labels JSONB,
                    dbSchema TEXT,
                    mime_type TEXT
                );
            """)

    def run(self, pdf: PdfOutput) -> bool:
        """Insert one row per chunk of ``pdf`` into ``text_embeddings``.

        All rows are sent through a single ``execute_values`` call inside one
        transaction, so a failing insert rolls back the whole document and
        leaves the connection usable for the next invocation.

        Args:
            pdf: Payload whose ``textchunk`` entries are written together with
                the document-level metadata.

        Returns:
            ``True`` once the rows are committed (or when there is nothing to
            write). Database errors propagate to the caller.
        """
        print("Length of chunks is", len(pdf.textchunk))
        if not pdf.textchunk:
            # Nothing to insert; avoid opening a transaction for no rows.
            return True

        query = """
            INSERT INTO text_embeddings (vector, text, page_number, size, source, labels, dbSchema, mime_type)
            VALUES %s
        """
        rows = [
            (
                chunk.embeddings,
                chunk.chunk,
                chunk.page_number,
                pdf.size,
                pdf.source,
                Json(pdf.labels),
                pdf.dbSchema,
                pdf.mime_type,
            )
            for chunk in pdf.textchunk
        ]
        with self.conn, self.conn.cursor() as cur:
            execute_values(cur, query, rows)
        return True
from fastapi import APIRouter
#router = APIRouter()
from fastapi import FastAPI, UploadFile, File, APIRouter
from typing import Dict, List, Optional
from pydantic import BaseModel
import psycopg2
from psycopg2.extras import execute_values
from psycopg2.extras import Json
from inkwell import Page, Document as InkwellDocument
from indexify.functions_sdk.data_objects import File as IndexifyFile
from indexify.functions_sdk.graph import Graph
from indexify.functions_sdk.indexify_functions import IndexifyFunction, indexify_function
from pgvector.psycopg2 import register_vector
from sentence_transformers import SentenceTransformer
# Your existing model definitions...
# Describes a PDF ingestion request. NOTE(review): not referenced by the
# endpoint code visible in this file; confirm whether it is still needed.
class PdfInput(BaseModel):
    source: str
    dbSchema: str
    labels: Dict[str, str]
    pgHost: str
    indexifyUrl: str
#router = APIRouter()
# POST /deploy: chains the five pipeline stages into a graph and deploys it.
# NOTE(review): the request body (`input`) is never read; the server URL is
# the fixed value below.
@app.post("/deploy")
async def deploy_workflow(input: DeployInput):
    import sys

    from indexify import RemoteGraph

    pipeline = Graph(
        "pdf_graph-4",
        start_node=upload_pdf,
    )
    stages = (
        upload_pdf,
        PDFParser,
        extract_chunks,
        TextEmbeddingExtractor,
        PGVectorWriter,
    )
    # Connect each stage to the one that follows it.
    for upstream, downstream in zip(stages, stages[1:]):
        pipeline.add_edge(upstream, downstream)

    # This module is passed via additional_modules so it is shipped with the graph.
    RemoteGraph.deploy(
        pipeline,
        server_url="http://localhost:8900",
        additional_modules=[sys.modules[__name__]],
    )
    return "success"


# Then I invoke it via:
from indexify import RemoteGraph
import httpx
from indexify.functions_sdk.data_objects import File
from typing import Dict, List, Optional
from inkwell import Page, Document as InkwellDocument
from pydantic import BaseModel
# Describes a PDF ingestion request. NOTE(review): not used by the script
# below; confirm whether it is still needed.
class PdfInput(BaseModel):
    source: str
    dbSchema: str
    labels: Dict[str, str]
    pgHost: str
    indexifyUrl: str
# Parsed PDF content, held as a list of inkwell pages.
class Document(BaseModel):
    pages: List[Page]
# One piece of extracted text; the page number and the embedding vector are
# optional and default to None.
class TextChunk(BaseModel):
    chunk: str
    page_number: Optional[int] = None
    embeddings: Optional[List[float]] = None
# Payload sent to the graph: the raw file, its metadata, and (initially
# empty) chunk list and parsed document.
class PdfOutput(BaseModel):
    file: File
    source: str
    dbSchema: str
    labels: Dict[str, str]
    size: str
    mime_type: str
    textchunk: List[TextChunk]
    doc: Document
if __name__ == "__main__":
    # Look up the graph by name instead of deploying it from here
    # (previously: RemoteGraph.deploy(g, server_url="http://localhost:8900")).
    remote = RemoteGraph.by_name(name="pdf_graph-4", server_url="http://localhost:8900")

    # Download the sample PDF; a non-success HTTP status raises.
    response = httpx.get(url="https://arxiv.org/pdf/2106.00043.pdf", follow_redirects=True)
    response.raise_for_status()

    payload = PdfOutput(
        file=File(data=response.content, mime_type="application/pdf"),
        source="pdf_input.source",
        dbSchema="public",
        labels={"test": "test"},
        size="100",  # fixed placeholder; not computed from the download
        mime_type="pdf",
        textchunk=[],
        doc=Document(pages=[]),
    )

    invocation_id = remote.run(block_until_done=True, pdf=payload)
    print("invocation is", invocation_id)
    print(remote.output(invocation_id, "extract_chunks"))

# The error originates from: (the traceback is missing from this post)
# My indexify version: 0.2.10
@sadath-12 Please share some steps to reproduce. Don't have enough information about how to reproduce the error. |
I am trying to deploy a compute graph via a Python API server, in this case FastAPI.
Here is my fastapi code in main.py
The deployment happens successfully and I can see the graph in the UI
but when I invoke the graph via code
I get error
How do I tell indexify that main is a module of my Python API and not a pip package, and that since it is not used it should be ignored?
The text was updated successfully, but these errors were encountered: