From ffc530be44c0d00a88b6d34643c0c8e05276f951 Mon Sep 17 00:00:00 2001 From: William Fu-Hinthorn <13333726+hinthornw@users.noreply.github.com> Date: Mon, 9 Dec 2024 07:41:54 -0800 Subject: [PATCH] Broader catch in post/patch --- python/langsmith/_internal/_operations.py | 26 +++++- python/langsmith/run_trees.py | 104 ++++++++++++---------- 2 files changed, 78 insertions(+), 52 deletions(-) diff --git a/python/langsmith/_internal/_operations.py b/python/langsmith/_internal/_operations.py index 66decff0f..a97dec161 100644 --- a/python/langsmith/_internal/_operations.py +++ b/python/langsmith/_internal/_operations.py @@ -123,6 +123,26 @@ def serialize_feedback_dict( ) +def _dumps_json_safe(value: object, which: str) -> bytes: + try: + return _dumps_json(value) + except Exception as e: + logger.warning(f"Error serializing {which}: {e}") + try: + return _dumps_json( + { + "langsmith_serialization_error": f"Error serializing {which}:" + f" {str(e)}\n\n{value}" + } + ) + except Exception: + return _dumps_json( + { + "langsmith_serialization_error": f"Error serializing {which}: {str(e)}" + } + ) + + def serialize_run_dict( operation: Literal["post", "patch"], payload: dict ) -> SerializedRunOperation: @@ -135,9 +155,9 @@ def serialize_run_dict( id=payload["id"], trace_id=payload["trace_id"], _none=_dumps_json(payload), - inputs=_dumps_json(inputs) if inputs is not None else None, - outputs=_dumps_json(outputs) if outputs is not None else None, - events=_dumps_json(events) if events is not None else None, + inputs=_dumps_json_safe(inputs, "inputs") if inputs is not None else None, + outputs=_dumps_json_safe(outputs, "outputs") if outputs is not None else None, + events=_dumps_json_safe(events, "events") if events is not None else None, attachments=attachments if attachments is not None else None, ) diff --git a/python/langsmith/run_trees.py b/python/langsmith/run_trees.py index 63f0cb4e5..fcef5ab7b 100644 --- a/python/langsmith/run_trees.py +++ b/python/langsmith/run_trees.py @@ -299,60 +299,66 @@ def _get_dicts_safe(self): def post(self, exclude_child_runs: bool = True) -> None: """Post the run tree to the API asynchronously.""" - kwargs = self._get_dicts_safe() - self.client.create_run(**kwargs) - if attachments := kwargs.get("attachments"): - keys = [str(name) for name in attachments] - self.events.append( - { - "name": "uploaded_attachment", - "time": datetime.now(timezone.utc).isoformat(), - "message": set(keys), - } - ) - if not exclude_child_runs: - for child_run in self.child_runs: - child_run.post(exclude_child_runs=False) + try: + kwargs = self._get_dicts_safe() + self.client.create_run(**kwargs) + if attachments := kwargs.get("attachments"): + keys = [str(name) for name in attachments] + self.events.append( + { + "name": "uploaded_attachment", + "time": datetime.now(timezone.utc).isoformat(), + "message": set(keys), + } + ) + if not exclude_child_runs: + for child_run in self.child_runs: + child_run.post(exclude_child_runs=False) + except Exception as e: + logger.warning(f"Error creating run: {e}") def patch(self) -> None: """Patch the run tree to the API in a background thread.""" - if not self.end_time: - self.end() - attachments = self.attachments try: - # Avoid loading the same attachment twice - if attachments: - uploaded = next( - ( - ev - for ev in self.events - if ev.get("name") == "uploaded_attachment" - ), - None, - ) - if uploaded: - attachments = { - a: v - for a, v in attachments.items() - if a not in uploaded["message"] - } + if not self.end_time: + self.end() + attachments = self.attachments + try: + # Avoid loading the same attachment twice + if attachments: + uploaded = next( + ( + ev + for ev in self.events + if ev.get("name") == "uploaded_attachment" + ), + None, + ) + if uploaded: + attachments = { + a: v + for a, v in attachments.items() + if a not in uploaded["message"] + } + except Exception as e: + logger.warning(f"Error filtering attachments to upload: {e}") + self.client.update_run( + name=self.name, + run_id=self.id, + outputs=self.outputs.copy() if self.outputs else None, + error=self.error, + parent_run_id=self.parent_run_id, + reference_example_id=self.reference_example_id, + end_time=self.end_time, + dotted_order=self.dotted_order, + trace_id=self.trace_id, + events=self.events, + tags=self.tags, + extra=self.extra, + attachments=attachments, + ) except Exception as e: - logger.warning(f"Error filtering attachments to upload: {e}") - self.client.update_run( - name=self.name, - run_id=self.id, - outputs=self.outputs.copy() if self.outputs else None, - error=self.error, - parent_run_id=self.parent_run_id, - reference_example_id=self.reference_example_id, - end_time=self.end_time, - dotted_order=self.dotted_order, - trace_id=self.trace_id, - events=self.events, - tags=self.tags, - extra=self.extra, - attachments=attachments, - ) + logger.warning(f"Error updating run: {e}") def wait(self) -> None: """Wait for all _futures to complete."""