Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Broader catch in post/patch #1307

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 23 additions & 3 deletions python/langsmith/_internal/_operations.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from __future__ import annotations

Check notice on line 1 in python/langsmith/_internal/_operations.py

View workflow job for this annotation

GitHub Actions / benchmark

Benchmark results

........... WARNING: the benchmark result may be unstable * the standard deviation (92.8 ms) is 13% of the mean (710 ms) Try to rerun the benchmark with more runs, values and/or loops. Run 'python -m pyperf system tune' command to reduce the system jitter. Use pyperf stats, pyperf dump and pyperf hist to analyze results. Use --quiet option to hide these warnings. create_5_000_run_trees: Mean +- std dev: 710 ms +- 93 ms ........... WARNING: the benchmark result may be unstable * the standard deviation (193 ms) is 14% of the mean (1.43 sec) Try to rerun the benchmark with more runs, values and/or loops. Run 'python -m pyperf system tune' command to reduce the system jitter. Use pyperf stats, pyperf dump and pyperf hist to analyze results. Use --quiet option to hide these warnings. create_10_000_run_trees: Mean +- std dev: 1.43 sec +- 0.19 sec ........... WARNING: the benchmark result may be unstable * the standard deviation (180 ms) is 13% of the mean (1.43 sec) Try to rerun the benchmark with more runs, values and/or loops. Run 'python -m pyperf system tune' command to reduce the system jitter. Use pyperf stats, pyperf dump and pyperf hist to analyze results. Use --quiet option to hide these warnings. create_20_000_run_trees: Mean +- std dev: 1.43 sec +- 0.18 sec ........... dumps_class_nested_py_branch_and_leaf_200x400: Mean +- std dev: 700 us +- 6 us ........... dumps_class_nested_py_leaf_50x100: Mean +- std dev: 25.1 ms +- 0.3 ms ........... dumps_class_nested_py_leaf_100x200: Mean +- std dev: 104 ms +- 2 ms ........... dumps_dataclass_nested_50x100: Mean +- std dev: 25.3 ms +- 0.2 ms ........... WARNING: the benchmark result may be unstable * the standard deviation (16.0 ms) is 23% of the mean (69.7 ms) Try to rerun the benchmark with more runs, values and/or loops. Run 'python -m pyperf system tune' command to reduce the system jitter. Use pyperf stats, pyperf dump and pyperf hist to analyze results. Use --quiet option to hide these warnings. 
dumps_pydantic_nested_50x100: Mean +- std dev: 69.7 ms +- 16.0 ms ........... dumps_pydanticv1_nested_50x100: Mean +- std dev: 199 ms +- 3 ms

Check notice on line 1 in python/langsmith/_internal/_operations.py

View workflow job for this annotation

GitHub Actions / benchmark

Comparison against main

+-----------------------------------------------+----------+------------------------+ | Benchmark | main | changes | +===============================================+==========+========================+ | dumps_pydanticv1_nested_50x100 | 221 ms | 199 ms: 1.11x faster | +-----------------------------------------------+----------+------------------------+ | create_5_000_run_trees | 724 ms | 710 ms: 1.02x faster | +-----------------------------------------------+----------+------------------------+ | dumps_dataclass_nested_50x100 | 25.6 ms | 25.3 ms: 1.01x faster | +-----------------------------------------------+----------+------------------------+ | dumps_class_nested_py_leaf_100x200 | 105 ms | 104 ms: 1.01x faster | +-----------------------------------------------+----------+------------------------+ | dumps_class_nested_py_branch_and_leaf_200x400 | 705 us | 700 us: 1.01x faster | +-----------------------------------------------+----------+------------------------+ | dumps_class_nested_py_leaf_50x100 | 25.1 ms | 25.1 ms: 1.00x faster | +-----------------------------------------------+----------+------------------------+ | create_10_000_run_trees | 1.40 sec | 1.43 sec: 1.02x slower | +-----------------------------------------------+----------+------------------------+ | create_20_000_run_trees | 1.39 sec | 1.43 sec: 1.03x slower | +-----------------------------------------------+----------+------------------------+ | dumps_pydantic_nested_50x100 | 66.2 ms | 69.7 ms: 1.05x slower | +-----------------------------------------------+----------+------------------------+ | Geometric mean | (ref) | 1.01x faster | +-----------------------------------------------+----------+------------------------+

import itertools
import logging
Expand Down Expand Up @@ -123,6 +123,26 @@
)


def _dumps_json_safe(value: object, which: str) -> bytes:
    """Serialize ``value`` to JSON bytes without ever raising.

    Args:
        value: The object to serialize.
        which: Human-readable label for the payload field being serialized
            (e.g. ``"inputs"``), used in the warning log and error payloads.

    Returns:
        The JSON-encoded bytes on success; otherwise a JSON object with a
        single ``langsmith_serialization_error`` key describing the failure,
        so the run is still recorded instead of the upload crashing.
    """
    try:
        return _dumps_json(value)
    except Exception as e:
        logger.warning(f"Error serializing {which}: {e}")
        try:
            # Best effort: embed the stringified value alongside the error so
            # the trace still carries some diagnostic context. Note str(value)
            # itself may raise, which is why this is inside its own try.
            return _dumps_json(
                {
                    "langsmith_serialization_error": f"Error serializing {which}:"
                    f" {str(e)}\n\n{value}"
                }
            )
        except Exception:
            # Even embedding the value failed (e.g. its __str__/__repr__
            # raises); fall back to the error message alone.
            return _dumps_json(
                {
                    "langsmith_serialization_error": f"Error serializing {which}: {str(e)}"
                }
            )


def serialize_run_dict(
operation: Literal["post", "patch"], payload: dict
) -> SerializedRunOperation:
Expand All @@ -135,9 +155,9 @@
id=payload["id"],
trace_id=payload["trace_id"],
_none=_dumps_json(payload),
inputs=_dumps_json(inputs) if inputs is not None else None,
outputs=_dumps_json(outputs) if outputs is not None else None,
events=_dumps_json(events) if events is not None else None,
inputs=_dumps_json_safe(inputs, "inputs") if inputs is not None else None,
outputs=_dumps_json_safe(outputs, "outputs") if outputs is not None else None,
events=_dumps_json_safe(events, "events") if events is not None else None,
attachments=attachments if attachments is not None else None,
)

Expand Down
104 changes: 55 additions & 49 deletions python/langsmith/run_trees.py
Original file line number Diff line number Diff line change
Expand Up @@ -299,60 +299,66 @@ def _get_dicts_safe(self):

def post(self, exclude_child_runs: bool = True) -> None:
    """Post the run tree to the API asynchronously.

    Args:
        exclude_child_runs: When True (default), only this run is posted;
            when False, every child run is posted recursively as well.

    Note:
        All failures are caught and logged as warnings rather than raised,
        so tracing errors never break the caller's program.
    """
    try:
        kwargs = self._get_dicts_safe()
        self.client.create_run(**kwargs)
        if attachments := kwargs.get("attachments"):
            # Record which attachments were uploaded so a later patch()
            # can avoid loading/uploading the same attachment twice.
            keys = [str(name) for name in attachments]
            self.events.append(
                {
                    "name": "uploaded_attachment",
                    "time": datetime.now(timezone.utc).isoformat(),
                    "message": set(keys),
                }
            )
        if not exclude_child_runs:
            for child_run in self.child_runs:
                child_run.post(exclude_child_runs=False)
    except Exception as e:
        logger.warning(f"Error creating run: {e}")

def patch(self) -> None:
    """Patch (update) the run tree via the API in a background thread.

    Ends the run first if it has no ``end_time``, filters out attachments
    that a prior ``post()`` already uploaded, then sends the update.

    Note:
        All failures are caught and logged as warnings rather than raised,
        so tracing errors never break the caller's program.
    """
    try:
        if not self.end_time:
            self.end()
        attachments = self.attachments
        try:
            # Avoid loading the same attachment twice: post() records an
            # "uploaded_attachment" event whose "message" holds the set of
            # attachment names it already sent.
            if attachments:
                uploaded = next(
                    (
                        ev
                        for ev in self.events
                        if ev.get("name") == "uploaded_attachment"
                    ),
                    None,
                )
                if uploaded:
                    attachments = {
                        a: v
                        for a, v in attachments.items()
                        if a not in uploaded["message"]
                    }
        except Exception as e:
            # Filtering is best effort — on failure, fall back to sending
            # the full attachment set rather than dropping the update.
            logger.warning(f"Error filtering attachments to upload: {e}")
        self.client.update_run(
            name=self.name,
            run_id=self.id,
            outputs=self.outputs.copy() if self.outputs else None,
            error=self.error,
            parent_run_id=self.parent_run_id,
            reference_example_id=self.reference_example_id,
            end_time=self.end_time,
            dotted_order=self.dotted_order,
            trace_id=self.trace_id,
            events=self.events,
            tags=self.tags,
            extra=self.extra,
            attachments=attachments,
        )
    except Exception as e:
        logger.warning(f"Error updating run: {e}")

def wait(self) -> None:
"""Wait for all _futures to complete."""
Expand Down
Loading