-
Notifications
You must be signed in to change notification settings - Fork 592
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Feature/add new snapshot trigger and fix collector startup #1014
Feature/add new snapshot trigger and fix collector startup #1014
Conversation
…based on time or the number of rows
src/evidently/collector/app.py
Outdated
async def on_startup(app: Litestar) -> None: | ||
# TODO: check task health | ||
await loop(seconds=service.check_interval, func=check_snapshots_factory(service, service.storage)) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The canonical way of handling long-running tasks in Litestar is to make use of the lifespan
context managers, which are purpose built for this:
https://docs.litestar.dev/latest/usage/applications.html#lifespan-context-managers
You could define one like so:
async def on_startup(app: Litestar) -> None: | |
# TODO: check task health | |
await loop(seconds=service.check_interval, func=check_snapshots_factory(service, service.storage)) | |
@contextlib.asynccontextmanager | |
def lifespan(app: Litestar) -> AsyncGenerator[None, None]: | |
func = check_snapshots_factory(service, service.storage) | |
event = asyncio.Event() | |
async def _loop(app: Litestar): | |
while not event.is_set(): | |
await func() | |
await asyncio.sleep(service.check_interval) | |
task = asyncio.create_task(_loop()) | |
try: | |
yield | |
finally: | |
event.set() | |
await task |
and add it to Litestar(..., lifespan=[lifespan])
.
Litestar guarantees that things you call there won't be cancelled and will gracefully shutdown, even in case of a server error. The on_startup
hook, while it can be used to achieve similar things, doesn't explicitly guarantee this and doesn't offer a way to handle graceful shutdowns of the tasks because it's a one-off function, which means you'll end up cancelling your tasks, which can lead to some warnings or potential errors because of unfinished coroutines. It's not a big deal in this case but I thought I'd mention it :)
Alternatively, if you don't want to deal with managing the task manually at all:
@contextlib.asynccontextmanager
def lifespan(app: Litestar) -> AsyncGenerator[None, None]:
func = check_snapshots_factory(service, service.storage)
async def _loop():
while True:
await func()
await asyncio.sleep(service.check_interval)
async with anyio.create_task_group() as tg:
try:
tg.start_soon(loop)
yield
finally:
tg.cancel_scope.cancel()
As for mypy, please ensure that you are using correct version from evidently/src/evidently/ui/api/projects.py Line 286 in e27e366
|
What is the issue exactly? If something is holding you guys up here don't hesitate to open an issue here or shoot us a message on our discord. It's always valuable for us to know what issues users are facing :) Also, just a suggestion, but if you need the raw body you can inject it via the @post("/{project_id:uuid}/snapshots")
async def add_snapshot(
project_id: Annotated[uuid.UUID, Parameter(title="id of project")],
body: bytes,
project_manager: Annotated[ProjectManager, Dependency()],
log_event: Annotated[Callable, Dependency()],
user_id: Annotated[UserID, Dependency()],
) -> None:
snapshot = Snapshot.parse_raw(body) |
@provinzkraut for now I know this. |
created an issue litestar-org/litestar#3150 |
Thanks, I'll look into it! |
This has been fixed here. A patch release will be published soon! |
@c0t0ber schema generation should be fixed with next litestar release, kudos to @provinzkraut |
src/evidently/collector/app.py
Outdated
|
||
|
||
async def create_snapshot(collector: CollectorConfig, storage: CollectorStorage) -> None: | ||
async with storage.lock(collector.id): | ||
current = storage.get_and_flush(collector.id) | ||
current = await run_in_thread_poll_executor(storage.get_and_flush, collector.id) # FIXME: sync function |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since this is running in a Litestar application, there's a slight benefit of having this function use the same executor as Litestar does (the executor will be re-used by Litestar), so you could just make use of litestar.concurrency.sync_to_thread
:
current = await run_in_thread_poll_executor(storage.get_and_flush, collector.id) # FIXME: sync function | |
current = await sync_to_thread(storage.get_and_flush, collector.id) # FIXME: sync function |
Goes for the other instances as well.
…e_snapshot to avoid blocking the loop
95d625b
to
045e48f
Compare
I tried running it on Windows and Linux on versions 3.10, 3.8 and I am getting this error. I perform the dependency installation using pip install -e ".[dev]" and pip install -r requirements.dev.txt according to the contributor guide |
@c0t0ber This seems to be a mypy bug. The version used here is a bit outdated. Upgrading to 1.8.0 solved this issue for me. |
We'll probably need to add tests to collector as well at some point 😅 |
I'll try to add smoke tests |
I think there are enough changes in this merge request. I suggest creating an issue for the collector's client refinement, writing tests for the collector, and updating the mypy version |
edeb578
to
92a0bfd
Compare
For some reason test for win with py3.10 fails. I restarted it multiple times in case its a timeout problem, but error persists. Do you have access to win machine to check it out? |
Co-authored-by: Janek Nouvertné <[email protected]>
I found an issue with passing nan, inf, -inf values; msgpack cannot handle these values and fails on JSON parsing. One of the solutions I came up with is to parse the request manually in all places where these values could potentially appear. @provinzkraut |
Sure! You can extend the JSON parsing/serialisation by passing a custom encoding function for a type to the def is_nan(v: Any) -> bool:
...
def decode_nan(value: Any) -> nan:
...
app = Litestar(
type_decoders=[
(is_nan, decode_nan)
]
) That being said, msgspec does support support these values. If you're encoding them to their string representation (e.g. |
Hey @c0t0ber! We decided to skip those tests for now because we got other complaints about errors in collector, so we need to your fixes merged before next release :) We can work on tests in another PR if you like, and this one we will merge once CI passes |
As for nan/inf issue, can you give us a reproducible example? |
Added: New trigger has been added to initiate a snapshot for the collector based on time or the number of rows
fixed: Collector litestar server cant start #1012
adde
Also I can't get mypy to run, I've tried Python versions 3.8 and 3.10
Automatic generation of the Linestar API specification also does not work.