Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(llm): timely execute vid embedding & enhance some HTTP logic #141

Merged
merged 21 commits into from
Dec 24, 2024
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 33 additions & 2 deletions hugegraph-llm/src/hugegraph_llm/demo/rag_demo/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@


import argparse
import asyncio
from contextlib import asynccontextmanager

import gradio as gr
import uvicorn
Expand All @@ -40,10 +42,10 @@
from hugegraph_llm.demo.rag_demo.vector_graph_block import create_vector_graph_block
from hugegraph_llm.resources.demo.css import CSS
from hugegraph_llm.utils.log import log
from hugegraph_llm.utils.graph_index_utils import fit_vid_index

sec = HTTPBearer()


def authenticate(credentials: HTTPAuthorizationCredentials = Depends(sec)):
correct_token = admin_settings.user_token
if credentials.credentials != correct_token:
Expand All @@ -55,6 +57,35 @@ def authenticate(credentials: HTTPAuthorizationCredentials = Depends(sec)):
headers={"WWW-Authenticate": "Bearer"},
)

async def schedule_fit_vid_index():
try:
while True:
log.info("Executing fit_vid_index function...")
try:
await asyncio.to_thread(fit_vid_index)
log.info("fit_vid_index function executed successfully.")
except Exception as e:
log.error("Error executing fit_vid_index: %s", e, exc_info=True)
raise Exception("Error executing fit_vid_index") from e
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

duplicate try-except logic?

await asyncio.sleep(3600)
except asyncio.CancelledError as ce:
log.info("Periodic task has been cancelled due to: %s", ce)
except Exception as e:
log.error("Unexpected error in schedule_fit_vid_index: %s", e, exc_info=True)
raise Exception("Unexpected error in schedule_fit_vid_index") from e

@asynccontextmanager
async def lifespan(app: FastAPI): #pylint: disable=W0621
log.info("Starting periodic task...")
task = asyncio.create_task(schedule_fit_vid_index())
yield

log.info("Stopping periodic task...")
task.cancel()
try:
await task
except asyncio.CancelledError:
log.info("Periodic task has been cancelled.")

# pylint: disable=C0301
def init_rag_ui() -> gr.Interface:
Expand Down Expand Up @@ -158,7 +189,7 @@ def refresh_ui_config_prompt() -> tuple:
parser.add_argument("--host", type=str, default="0.0.0.0", help="host")
parser.add_argument("--port", type=int, default=8001, help="port")
args = parser.parse_args()
app = FastAPI()
app = FastAPI(lifespan=lifespan)

# we don't need to manually check the env now
# settings.check_env()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@
log.debug("Request URL: %s", url)
try:
if method.upper() == "GET":
resp = requests.get(url, headers=headers, params=params, timeout=5, auth=auth)
resp = requests.get(url, headers=headers, params=params, timeout=(1.0, 5.0), auth=auth)
Dismissed Show dismissed Hide dismissed
elif method.upper() == "POST":
resp = requests.post(url, headers=headers, params=params, json=body, timeout=5, auth=auth)
resp = requests.post(url, headers=headers, params=params, json=body, timeout=(1.0, 5.0), auth=auth)
Dismissed Show dismissed Hide dismissed
else:
raise ValueError("Unsupported HTTP method, please use GET/POST instead")
except requests.exceptions.RequestException as e:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,9 @@ def create_vector_graph_block():
graph_index_btn1 = gr.Button("Clear Graph Data & Index", size="sm")

vector_import_bt = gr.Button("Import into Vector", variant="primary")
graph_index_rebuild_bt = gr.Button("Rebuild vid Index")
graph_extract_bt = gr.Button("Extract Graph Data (1)", variant="primary")
graph_loading_bt = gr.Button("Load into GraphDB (2)", interactive=True)
graph_index_rebuild_bt = gr.Button("Rebuild vid Index")

vector_index_btn0.click(get_vector_index_info, outputs=out).then(
store_prompt,
Expand Down Expand Up @@ -119,7 +119,7 @@ def create_vector_graph_block():
extract_graph, inputs=[input_file, input_text, input_schema, info_extract_template], outputs=[out]
).then(store_prompt, inputs=[input_schema, info_extract_template], )

graph_loading_bt.click(import_graph_data, inputs=[out, input_schema], outputs=[out]).then(
graph_loading_bt.click(import_graph_data, inputs=[out, input_schema], outputs=[out]).then(fit_vid_index).then(
store_prompt,
inputs=[input_schema, info_extract_template],
)
Expand Down
2 changes: 1 addition & 1 deletion hugegraph-llm/src/hugegraph_llm/models/rerankers/cohere.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ def get_rerank_lists(self, query: str, documents: List[str], top_n: Optional[int
"top_n": top_n,
"documents": documents,
}
response = requests.post(url, headers=headers, json=payload, timeout=60)
response = requests.post(url, headers=headers, json=payload, timeout=(1.0, 10.0))
response.raise_for_status() # Raise an error for bad status codes
results = response.json()["results"]
sorted_docs = [documents[item["index"]] for item in results]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ def get_rerank_lists(self, query: str, documents: List[str], top_n: Optional[int
"content-type": Constants.HEADER_CONTENT_TYPE,
"authorization": f"Bearer {self.api_key}",
}
response = requests.post(url, json=payload, headers=headers, timeout=60)
response = requests.post(url, json=payload, headers=headers, timeout=(1.0, 10.0))
response.raise_for_status() # Raise an error for bad status codes
results = response.json()["results"]
sorted_docs = [documents[item["index"]] for item in results]
Expand Down
4 changes: 2 additions & 2 deletions hugegraph-python-client/src/pyhugegraph/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,9 @@ def __init__(
user: str,
pwd: str,
graphspace: Optional[str] = None,
timeout: int = 10,
timeout: Optional[tuple[float, float]] = None
):
self.cfg = HGraphConfig(ip, port, user, pwd, graph, graphspace, timeout)
self.cfg = HGraphConfig(ip, port, user, pwd, graph, graphspace, timeout or (0.5, 15.0))

@manager_builder
def schema(self) -> "SchemaManager":
Expand Down
4 changes: 2 additions & 2 deletions hugegraph-python-client/src/pyhugegraph/utils/huge_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class HGraphConfig:
password: str
graph_name: str
graphspace: Optional[str] = None
timeout: int = 10
timeout: tuple[float, float] = (0.5, 15.0)
gs_supported: bool = field(default=False, init=False)
version: List[int] = field(default_factory=list)

Expand All @@ -44,7 +44,7 @@ def __post_init__(self):
else:
try:
response = requests.get(
f"http://{self.ip}:{self.port}/versions", timeout=1
f"http://{self.ip}:{self.port}/versions", timeout=0.5
)
core = response.json()["versions"]["core"]
log.info( # pylint: disable=logging-fstring-interpolation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ class HGraphSession:
def __init__(
self,
cfg: HGraphConfig,
retries: int = 5,
backoff_factor: int = 1,
retries: int = 3,
backoff_factor: int = 0.1,
status_forcelist=(500, 502, 504),
session: Optional[requests.Session] = None,
):
Expand Down
Loading