Skip to content

Commit

Permalink
fix(api, tests): configmap reloading core dump (#1085)
Browse files Browse the repository at this point in the history
* Replaces watchfiles with watchdog to fix the core dump caused by the combination of awatch, asyncio, and test code that is sensitive to async activity.

---------

Co-authored-by: Justin Law <[email protected]>
  • Loading branch information
CollectiveUnicorn and justinthelaw authored Sep 20, 2024
1 parent 3ddb98c commit 4a19c8e
Show file tree
Hide file tree
Showing 4 changed files with 134 additions and 81 deletions.
24 changes: 16 additions & 8 deletions src/leapfrogai_api/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,25 @@
logger = logging.getLogger(__name__)


# Handle startup & shutdown tasks
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Handle startup and shutdown tasks for the FastAPI app.

    On startup a background task is created that loads the model configs and
    keeps watching for changes. On shutdown that task is cancelled and
    awaited, and the loaded model configs are cleared.

    Args:
        app: The FastAPI application this lifespan is attached to (unused).
    """
    # Startup
    logger.info("Starting to watch for configs.")
    config = get_model_config()
    config_task = asyncio.create_task(config.watch_and_load_configs())
    try:
        yield
    finally:
        # Shutdown
        logger.info("Stopping config watcher and clearing model configs.")
        config_task.cancel()
        try:
            await config_task
        except asyncio.CancelledError:
            pass  # Task was cancelled, which is expected during shutdown
        except Exception:
            # The watcher task had already failed before shutdown; awaiting it
            # re-raises that stored error. Log it instead of aborting cleanup.
            logger.exception("Config watcher task failed")
        finally:
            # Always clear the models, even if the watcher ended abnormally
            await config.clear_all_models()


app = FastAPI(lifespan=lifespan)
Expand Down
2 changes: 1 addition & 1 deletion src/leapfrogai_api/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ dependencies = [
"uvicorn == 0.23.2",
"docx2txt == 0.8",
"python-multipart == 0.0.7", #indirect dep of FastAPI to receive form data for file uploads
"watchfiles == 0.21.0",
"watchdog == 5.0.2",
"leapfrogai_sdk",
"supabase == 2.6.0",
"langchain == 0.2.12",
Expand Down
144 changes: 84 additions & 60 deletions src/leapfrogai_api/utils/config.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,52 @@
import asyncio
import fnmatch
import glob
import logging
import os
import toml
import yaml
from watchfiles import Change, awatch
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

from leapfrogai_api.typedef.models import Model


logger = logging.getLogger(__name__)


class ConfigHandler(FileSystemEventHandler):
    """Watchdog event handler that keeps a config object in sync with its files.

    Created, modified, deleted and moved file events are filtered against
    ``config.filename`` (a plain name or an ``fnmatch`` pattern). Matching
    files are either (re)loaded through ``config.load_config_file`` or
    removed through ``config.remove_model_by_config``.

    NOTE: watchdog invokes these callbacks from the observer's own thread,
    not from the asyncio event loop.
    """

    def __init__(self, config):
        """Store the config object that file events are applied to."""
        self.config = config
        super().__init__()

    def on_created(self, event):
        self.process(event)

    def on_modified(self, event):
        self.process(event)

    def on_deleted(self, event):
        self.process(event)

    def on_moved(self, event):
        # A rename is reported as a single "moved" event rather than a
        # delete/create pair, so it has to be routed explicitly.
        self.process(event)

    def process(self, event):
        """Apply a single file system event to the config.

        Args:
            event: A watchdog file system event. ``src_path`` is the affected
                file; moved events additionally carry ``dest_path``.
        """
        # Ignore directory events
        if event.is_directory:
            return

        # watchdog paths may be bytes or str; normalise before matching
        filename = os.path.basename(os.fsdecode(event.src_path))
        logger.debug(f"Processing event '{event.event_type}' for file '{filename}'")

        if event.event_type == "moved":
            # The old name no longer exists; the new name may be a config file
            self._handle_removed(filename)
            dest_path = getattr(event, "dest_path", "")
            if dest_path:
                self._handle_updated(os.path.basename(os.fsdecode(dest_path)))
        elif event.event_type == "deleted":
            self._handle_removed(filename)
        else:
            self._handle_updated(filename)

    def _matches(self, filename):
        """Return True if the file matches the config filename or pattern."""
        return fnmatch.fnmatch(filename, self.config.filename)

    def _handle_removed(self, filename):
        """Drop the models that came from a config file that no longer exists."""
        if self._matches(filename):
            logger.info(f"Detected deletion of config file '{filename}'")
            self.config.remove_model_by_config(filename)

    def _handle_updated(self, filename):
        """(Re)load a config file that was created or modified."""
        if self._matches(filename):
            logger.info(
                f"Detected modification/creation of config file '{filename}'"
            )
            self.config.load_config_file(self.config.directory, filename)


class Config:
models: dict[str, Model] = {}
config_sources: dict[str, list] = {}
Expand All @@ -21,89 +56,83 @@ def __init__(
):
self.models = models
self.config_sources = config_sources
self.directory = "."
self.filename = "config.yaml"

def __str__(self):
return f"Models: {self.models}"

async def watch_and_load_configs(self, directory=".", filename="config.yaml"):
    """Load the existing config files, then watch the directory for changes.

    The directory and filename can be overridden with the ``LFAI_CONFIG_PATH``
    and ``LFAI_CONFIG_FILENAME`` environment variables; empty values are
    ignored. The resolved values are stored on ``self.directory`` and
    ``self.filename`` so the event handler can use them.

    This coroutine runs until it is cancelled (or interrupted). Cancellation
    is absorbed here: the observer is stopped and the coroutine returns
    normally.

    Args:
        directory: Directory to load configs from and to watch.
        filename: Config filename or ``fnmatch``/glob pattern.
    """
    # Get the config directory and filename from the environment variables if provided
    env_directory = os.environ.get("LFAI_CONFIG_PATH", directory)
    if env_directory:
        directory = env_directory
    env_filename = os.environ.get("LFAI_CONFIG_FILENAME", filename)
    if env_filename:
        filename = env_filename

    self.directory = directory
    self.filename = filename

    # Process all the configs that were already in the directory
    self.load_all_configs(directory, filename)

    # Set up the event handler and observer
    event_handler = ConfigHandler(self)
    observer = Observer()
    observer.schedule(event_handler, path=directory, recursive=False)

    # Start the observer (kept outside the try block: if starting fails there
    # is no running observer thread to stop or join)
    observer.start()
    logger.info(f"Started watching directory: {directory}")

    try:
        # The observer does its work in the background; just stay alive
        while True:
            await asyncio.sleep(1)
    except (KeyboardInterrupt, asyncio.CancelledError):
        # Expected way out: the task was cancelled or the script interrupted
        pass
    finally:
        # Stop the observer on every exit path so it is never left running,
        # then wait for it to finish
        observer.stop()
        logger.info(f"Stopped watching directory: {directory}")
        observer.join()

async def clear_all_models(self):
    """Forget every loaded model and every config-source mapping.

    Used to reset the model config on shutdown so old model configs
    don't get cached.
    """
    self.models, self.config_sources = {}, {}
    logger.info("All models have been removed")

def load_config_file(self, directory: str, config_file: str):
    """Load one config file and register the models it declares.

    ``.toml`` files are read with ``toml`` and ``.yaml``/``.yml`` files with
    ``yaml.safe_load``. Any other extension is logged as unsupported. All
    failures are logged rather than raised, so a bad or half-written file
    cannot take down the caller.

    Args:
        directory: Directory that contains the config file.
        config_file: File name of the config, relative to ``directory``.
    """
    logger.info(f"Loading config file: {directory}/{config_file}")

    # Load the config file into the config object
    config_path = os.path.join(directory, config_file)
    try:
        with open(config_path, encoding="utf-8") as c:
            # Load the file into a python object
            if config_path.endswith(".toml"):
                loaded_artifact = toml.load(c)
            elif config_path.endswith((".yaml", ".yml")):
                loaded_artifact = yaml.safe_load(c)
            else:
                logger.error(f"Unsupported file type: {config_path}")
                return

        # An empty or partially written file parses to None (or to data
        # without a "models" key); report that clearly instead of failing
        # with a confusing TypeError/KeyError further down.
        if not isinstance(loaded_artifact, dict) or "models" not in loaded_artifact:
            logger.error(
                f"Failed to load config file {config_path}: no 'models' section found"
            )
            return

        # Parse the object into our config
        self.parse_models(loaded_artifact, config_file)

        logger.info(f"Loaded artifact at {config_path}")
    except Exception as e:
        # Deliberately broad: best-effort loading, never crash the caller
        logger.error(f"Failed to load config file {config_path}: {e}")

def load_all_configs(self, directory="", filename="config.yaml"):
logger.info(
"Loading all configs in {} that match the name '{}'".format(
directory, filename
)
f"Loading all configs in {directory} that match the name '{filename}'"
)

if not os.path.exists(directory):
logger.error("The config directory ({}) does not exist".format(directory))
logger.error(f"The config directory ({directory}) does not exist")
return "THE CONFIG DIRECTORY DOES NOT EXIST"

# Get all config files and load them into the config object
Expand All @@ -112,13 +141,8 @@ def load_all_configs(self, directory="", filename="config.yaml"):
dir_path, file_path = os.path.split(config_path)
self.load_config_file(directory=dir_path, config_file=file_path)

return

def get_model_backend(self, model: str) -> Model | None:
    """Return the model registered under ``model``, or ``None`` if unknown.

    Args:
        model: Name of the model to look up in ``self.models``.
    """
    return self.models.get(model)

def parse_models(self, loaded_artifact, config_file):
for m in loaded_artifact["models"]:
Expand Down
45 changes: 33 additions & 12 deletions tests/pytest/leapfrogai_api/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@
)
LFAI_CONFIG_FILEPATH = os.path.join(LFAI_CONFIG_PATH, LFAI_CONFIG_FILENAME)

MODEL = "repeater"
TEXT_INPUT = (
"This is the input content for completions, embeddings, and chat completions"
)
TEXT_INPUT_LEN = len(TEXT_INPUT)


#########################
#########################
Expand Down Expand Up @@ -74,39 +80,54 @@ def test_config_load():
response = client.get("/leapfrogai/v1/models")

assert response.status_code == 200
assert response.json() == {
"config_sources": {"repeater-test-config.yaml": ["repeater"]},
"models": {"repeater": {"backend": "localhost:50051", "name": "repeater"}},
expected_response = {
"config_sources": {"repeater-test-config.yaml": [MODEL]},
"models": {MODEL: {"backend": "localhost:50051", "name": MODEL}},
"directory": LFAI_CONFIG_PATH,
"filename": LFAI_CONFIG_FILENAME,
}
assert response.json() == expected_response


def test_config_delete(tmp_path):
    """Test that the config is deleted correctly."""
    # Copy repeater-test-config.yaml to temp dir so that we can remove it at a later step
    tmp_config_filepath = shutil.copyfile(
        LFAI_CONFIG_FILEPATH, os.path.join(tmp_path, LFAI_CONFIG_FILENAME)
    )
    os.environ["LFAI_CONFIG_PATH"] = str(tmp_path)

    try:
        with TestClient(app) as client:
            # Ensure the API loads the temp config
            response = client.get("/leapfrogai/v1/models")
            assert response.status_code == 200

            expected_response = {
                "config_sources": {"repeater-test-config.yaml": [MODEL]},
                "models": {MODEL: {"backend": "localhost:50051", "name": MODEL}},
                "directory": os.environ["LFAI_CONFIG_PATH"],
                "filename": LFAI_CONFIG_FILENAME,
            }
            assert response.json() == expected_response

            # Delete source config from temp dir
            os.remove(tmp_config_filepath)

            expected_empty_response = {
                "config_sources": {},
                "models": {},
                "directory": os.environ["LFAI_CONFIG_PATH"],
                "filename": LFAI_CONFIG_FILENAME,
            }

            # Wait for the API to detect the change. Poll with a deadline
            # instead of a single fixed sleep so slow machines do not make
            # the test flaky.
            deadline = time.monotonic() + 5.0
            while True:
                response = client.get("/leapfrogai/v1/models")
                if (
                    response.status_code == 200
                    and response.json() == expected_empty_response
                ):
                    break
                if time.monotonic() >= deadline:
                    break
                time.sleep(0.1)

            # Assert response is now empty
            assert response.status_code == 200
            assert response.json() == expected_empty_response
    finally:
        # Reset the environment variable even when an assertion fails, so
        # later tests do not keep pointing at the temp directory
        os.environ["LFAI_CONFIG_PATH"] = os.path.join(
            os.path.dirname(__file__), "fixtures"
        )


Expand Down

0 comments on commit 4a19c8e

Please sign in to comment.