diff --git a/python/bench/tracing_client_via_pyo3.py b/python/bench/tracing_client_via_pyo3.py index cb1cd6b97..411b97f2c 100644 --- a/python/bench/tracing_client_via_pyo3.py +++ b/python/bench/tracing_client_via_pyo3.py @@ -75,7 +75,7 @@ def benchmark_run_creation(json_size, num_runs) -> None: if client._pyo3_client: # Wait for the queue to drain. - client._pyo3_client.drain() + del client else: client.tracing_queue.join() diff --git a/python/bench/tracing_rust_client_bench.py b/python/bench/tracing_rust_client_bench.py index 6c5203d98..3f846e5d9 100644 --- a/python/bench/tracing_rust_client_bench.py +++ b/python/bench/tracing_rust_client_bench.py @@ -49,7 +49,7 @@ def benchmark_run_creation(num_runs: int, json_size: int, samples: int = 1) -> D client.create_run(run) # wait for client queues to be empty - client.drain() + del client elapsed = time.perf_counter() - start print(f"runs complete: {elapsed:.3f}s") diff --git a/python/bench/tracing_script.py b/python/bench/tracing_script.py new file mode 100644 index 000000000..6749a47b9 --- /dev/null +++ b/python/bench/tracing_script.py @@ -0,0 +1,64 @@ +import os + +os.environ["LANGCHAIN_PROJECT"] = "llm_messages_test_py" +os.environ["LANGSMITH_USE_PYO3_CLIENT"] = "true" + +from langsmith import Client, traceable + +client = Client( + api_url="https://beta.api.smith.langchain.com", + api_key=os.environ["LANGCHAIN_API_KEY"], +) + + +@traceable(client=client) +def format_prompt(subject): + return [ + { + "role": "system", + "content": "You are a helpful assistant.", + }, + { + "role": "user", + "content": f"What's a good name for a store that sells {subject}?", + }, + ] + + +@traceable(run_type="llm", client=client) +def invoke_llm(messages): + return { + "choices": [ + { + "message": { + "role": "assistant", + "content": "Sure, how about 'Rainbow Socks'?", + } + } + ] + } + + +@traceable(client=client) +def parse_output(response): + return response["choices"][0]["message"]["content"] + + +@traceable(client=client) +def run_pipeline(): + messages = format_prompt("colorful socks") + response = invoke_llm(messages) + result = parse_output(response) + + import time + + time.sleep(2) + + return result + + +if __name__ == "__main__": + print("running pipeline") + run_pipeline() + # import time + # time.sleep(30) diff --git a/python/langsmith/_internal/_operations.py b/python/langsmith/_internal/_operations.py index 66decff0f..89d5f12fc 100644 --- a/python/langsmith/_internal/_operations.py +++ b/python/langsmith/_internal/_operations.py @@ -141,6 +141,27 @@ def serialize_run_dict( attachments=attachments if attachments is not None else None, ) +def serialize_run_dict_for_compressed_ingest( + operation: Literal["post", "patch"], payload: dict +): + inputs = payload.pop("inputs", None) + outputs = payload.pop("outputs", None) + events = payload.pop("events", None) + attachments = payload.pop("attachments", None) + serialized = ... + extra = ... + return SerializedRunOperation( + operation=operation, + id=payload["id"], + trace_id=payload["trace_id"], + _none=_dumps_json(payload), + inputs=_dumps_json(inputs) if inputs is not None else None, + outputs=_dumps_json(outputs) if outputs is not None else None, + events=_dumps_json(events) if events is not None else None, + attachments=attachments if attachments is not None else None, + ) + + def combine_serialized_queue_operations( ops: list[Union[SerializedRunOperation, SerializedFeedbackOperation]], diff --git a/python/langsmith/client.py b/python/langsmith/client.py index e422dbec1..ecf620b76 100644 --- a/python/langsmith/client.py +++ b/python/langsmith/client.py @@ -550,6 +550,7 @@ def __init__( if langsmith_pyo3: # TODO: tweak these constants as needed + print("using pyo3 client") queue_capacity = 1_000_000 batch_size = 100 batch_timeout_millis = 1000 @@ -571,6 +572,9 @@ def __init__( repr(e), ) + if self._pyo3_client is None: + print("NOT using pyo3 client") + self._settings: Union[ls_schemas.LangSmithSettings, None] = None self._manual_cleanup = False @@ -1272,6 +1276,7 @@ def create_run( "inputs": inputs, "run_type": run_type, } + print("RUN_CREATE called", run_create) if not self._filter_for_sampling([run_create]): return if revision_id is not None: @@ -1287,6 +1292,7 @@ def create_run( run_create.get("trace_id") is not None and run_create.get("dotted_order") is not None ): + print("RUN_CREATE batch", run_create) if self._pyo3_client is not None: self._pyo3_client.create_run(run_create) elif self.tracing_queue is not None: @@ -1753,6 +1759,8 @@ def update_run( if data["extra"]: self._insert_runtime_env([data]) + print("UPDATE_RUN", data) + if self._pyo3_client is not None: self._pyo3_client.update_run(data) elif use_multipart and self.tracing_queue is not None: diff --git a/python/poetry.lock b/python/poetry.lock index 77d4b5c08..92a48a976 100644 --- a/python/poetry.lock +++ b/python/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 1.8.2 and should not be changed by hand. +# This file is automatically @generated by Poetry 1.8.5 and should not be changed by hand. [[package]] name = "annotated-types" @@ -589,47 +589,41 @@ files = [ [[package]] name = "langsmith-pyo3" -version = "0.1.0rc2" +version = "0.1.0rc4" description = "" optional = true -python-versions = ">=3.8" +python-versions = ">=3.9" files = [ - {file = "langsmith_pyo3-0.1.0rc2-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:6a77834cf7225b863615456b4110fcc7df3ebd41a2d6ea0b8359f7ff8a785f21"}, - {file = "langsmith_pyo3-0.1.0rc2-cp310-cp310-manylinux_2_28_aarch64.whl", hash = "sha256:57a3f4a777f601305bdd8a40d103cc4e24f06fdfddb26d9e2713991e636ed26d"}, - {file = "langsmith_pyo3-0.1.0rc2-cp310-cp310-manylinux_2_28_x86_64.whl", hash = "sha256:bbc771e40e78dfd02f55b81c9ad6dda94922f7feebb9193963bbb83bd8af3eae"}, - {file = "langsmith_pyo3-0.1.0rc2-cp310-cp310-musllinux_1_2_aarch64.whl", hash = "sha256:108739044d80909ac60069b2dc4f0b6c4ba46ce4bf6a2cfbded2b25b67524f7c"}, - {file = "langsmith_pyo3-0.1.0rc2-cp310-cp310-musllinux_1_2_x86_64.whl", hash = "sha256:9df02ad699243b2e54dea4309918dbe6923bc366a89e9b5f7ad857f9ae910f0d"}, - {file = "langsmith_pyo3-0.1.0rc2-cp310-none-win_amd64.whl", hash = "sha256:f4f79a3b6e8d58c2123c022a3e314064e5b170b94bde966fd352253631fa4857"}, - {file = "langsmith_pyo3-0.1.0rc2-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:e93a45b4d08fa3f188c6d76a98ab9b3fba7d0d604b0aa5e6370ce65334c0af6a"}, - {file = "langsmith_pyo3-0.1.0rc2-cp311-cp311-manylinux_2_28_aarch64.whl", hash = "sha256:c98ea4804d6a5d9213c7833b6d36fa967f8201bfbc57ac9e743f9b15f455d389"}, - {file = "langsmith_pyo3-0.1.0rc2-cp311-cp311-manylinux_2_28_x86_64.whl", hash = "sha256:15f32257d5182324541aa2f370acf81b797afcb14238187b50244255676570e3"}, - {file = "langsmith_pyo3-0.1.0rc2-cp311-cp311-musllinux_1_2_aarch64.whl", hash = "sha256:7676baa04abce61298118b8790d0743246f8617e47b97036bd734a4623160c9a"}, - {file = "langsmith_pyo3-0.1.0rc2-cp311-cp311-musllinux_1_2_x86_64.whl", hash = "sha256:725c687b036b333334c394dee110e40c73db2d86551c11821f1b089e61487407"}, - {file = "langsmith_pyo3-0.1.0rc2-cp311-none-win_amd64.whl", hash = "sha256:3eb3ad8804d215b9670ef6c135714ced1e6db6d5f53c335fa3c1da9cbc24fef8"}, - {file = "langsmith_pyo3-0.1.0rc2-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:b4dd2f456686b400bb47a400ceea571fb6c6cc6757cf6a9a4d5174ffa7c188a4"}, - {file = "langsmith_pyo3-0.1.0rc2-cp312-cp312-manylinux_2_28_aarch64.whl", hash = "sha256:248b1aaab324f8f535b888d6ea1fff0f5e639b21686fe772010ae2cf360b2327"}, - {file = "langsmith_pyo3-0.1.0rc2-cp312-cp312-manylinux_2_28_x86_64.whl", hash = "sha256:c3dcd1f8bb6951f0ef4181d74f713fcf864b86f49132228acdf8f8c877605daa"}, - {file = "langsmith_pyo3-0.1.0rc2-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:62ee1c0ac5079809d8fb746d4522f573e37457197aebb71965eb2672a75bff38"}, - {file = "langsmith_pyo3-0.1.0rc2-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:17ffbbe8a62f8d87b4d9096bddfa2c4421cb29d45043b0b09d78bed8a9b7741f"}, - {file = "langsmith_pyo3-0.1.0rc2-cp312-none-win_amd64.whl", hash = "sha256:3a06377c9ac390ed76a65f62e29b88480be32739b96ed9f51b6e6f6210551202"}, - {file = "langsmith_pyo3-0.1.0rc2-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:d66e40f56495a84d11b9d47d69421750214e2de3ba683cdbc9eb04ded11a3f66"}, - {file = "langsmith_pyo3-0.1.0rc2-cp313-cp313-manylinux_2_28_aarch64.whl", hash = "sha256:12b27b6441666a3123a6016fcf78288d9194f54e48f021b5172fe8fc58994eba"}, - {file = "langsmith_pyo3-0.1.0rc2-cp313-cp313-manylinux_2_28_x86_64.whl", hash = "sha256:6e124f0aa1142087b7a6b0d2b9f6dd82415fa64899f12e9650174957918300f4"}, - {file = "langsmith_pyo3-0.1.0rc2-cp313-cp313-musllinux_1_2_aarch64.whl", hash = "sha256:46bd1c411ebbda514954020c46eb65b3a8a9378cfc153fc35a09e375fc5feead"}, - {file = "langsmith_pyo3-0.1.0rc2-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:4d08430ba4e93ec9ac704c1b0116130d9af7cee86b7d4b3a74829b239d5d557a"}, - {file = "langsmith_pyo3-0.1.0rc2-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:7ac4c3a138cea449d5ed5621425daf148b0ed000df4a490cfb304099e0770004"}, - {file = "langsmith_pyo3-0.1.0rc2-cp38-cp38-manylinux_2_28_aarch64.whl", hash = "sha256:0d9d2bef3a0098e2ff28d7135320660abdf342b857c00f5ca17c0b03870193c8"}, - {file = "langsmith_pyo3-0.1.0rc2-cp38-cp38-manylinux_2_28_x86_64.whl", hash = "sha256:4b82c9b6ba9bb6fd464aaca50b2f8094aba92f2948df0e6301b8b0fc2bb46baf"}, - {file = "langsmith_pyo3-0.1.0rc2-cp38-cp38-musllinux_1_2_aarch64.whl", hash = "sha256:19e70971e432661314b6196357eb92633049e2dd0bc6fba61b86aa113a09aedf"}, - {file = "langsmith_pyo3-0.1.0rc2-cp38-cp38-musllinux_1_2_x86_64.whl", hash = "sha256:372d80979cd1b7d59e290ab80f9cad3d7059f5aa66c9757d522898f0d399bbed"}, - {file = "langsmith_pyo3-0.1.0rc2-cp38-none-win_amd64.whl", hash = "sha256:7fbb73b1c448ac4964358c9ca1be3107bb2c0c38715343c5da7f92ed0e3ee490"}, - {file = "langsmith_pyo3-0.1.0rc2-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:ddeca8b3ae912d090ec9fd5589e0b80cd5475c80dbc439976d3f92bcbe93da81"}, - {file = "langsmith_pyo3-0.1.0rc2-cp39-cp39-manylinux_2_28_aarch64.whl", hash = "sha256:6daa12978c18f4560858eac2c84d60090cd5ba7e55e657e052ba7b558f23c1d8"}, - {file = "langsmith_pyo3-0.1.0rc2-cp39-cp39-manylinux_2_28_x86_64.whl", hash = "sha256:60ec1e51f674141ab96f8c2d814d42410f8163f9323f1e98bde8d26cf4676513"}, - {file = "langsmith_pyo3-0.1.0rc2-cp39-cp39-musllinux_1_2_aarch64.whl", hash = "sha256:2c4813d11d515386b34a827c958edabecd9ef32306800a5e7d2f12ea2d1d0943"}, - {file = "langsmith_pyo3-0.1.0rc2-cp39-cp39-musllinux_1_2_x86_64.whl", hash = "sha256:11c8d34d8583d5bb009a081fbfebab8b73c1730069626940ee05644c40f77625"}, - {file = "langsmith_pyo3-0.1.0rc2-cp39-none-win_amd64.whl", hash = "sha256:f341dff48be2c289c23733489e60adf7e1f005eea95ebb6275b20314fd7fb5a6"}, - {file = "langsmith_pyo3-0.1.0rc2.tar.gz", hash = "sha256:30eb26aa33deca44eb9210b77d478ec2157a0cb51f96da30f87072dd5912e3ed"}, + {file = "langsmith_pyo3-0.1.0rc4-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:1db5cb43432eef70e839a2d33e2a64c5380b70feceb500d95c090ab62013df62"}, + {file = "langsmith_pyo3-0.1.0rc4-cp310-cp310-manylinux_2_28_aarch64.whl", hash = "sha256:5aecf8f73ce2930c7900748ed9cd4bf35c8cd06856abbfadfd705079e382fa8b"}, + {file = "langsmith_pyo3-0.1.0rc4-cp310-cp310-manylinux_2_28_x86_64.whl", hash = "sha256:8fa532101c0ad5c761e3465ed1b53b8c1837cbbbdd12b139058b673ad2b8d099"}, + {file = "langsmith_pyo3-0.1.0rc4-cp310-cp310-musllinux_1_2_aarch64.whl", hash = "sha256:84c42d5119eb56993132725a7f1240deea79bea25ac8cf2befd061d9d44fe408"}, + {file = "langsmith_pyo3-0.1.0rc4-cp310-cp310-musllinux_1_2_x86_64.whl", hash = "sha256:c2fea7965b381558f30a0e0a4d645acf7b0c6cdfd6f3495e46a3ea97e4ed27d6"}, + {file = "langsmith_pyo3-0.1.0rc4-cp310-cp310-win_amd64.whl", hash = "sha256:fcf4a36e4af7d6c7980df8f9bdba0ac2a73ad4761b09eeab4996170ff3a19901"}, + {file = "langsmith_pyo3-0.1.0rc4-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:3fe99ecd0ca02fea3ba5968d2491c22b63af26e5553cd27aaaf63aa9edb66dff"}, + {file = "langsmith_pyo3-0.1.0rc4-cp311-cp311-manylinux_2_28_aarch64.whl", hash = "sha256:007161667ffc3e288eb33cd4fb39e57071d9cffb02aae3c1ea44f9ca290ae6e5"}, + {file = "langsmith_pyo3-0.1.0rc4-cp311-cp311-manylinux_2_28_x86_64.whl", hash = "sha256:15ff54e95eca2f7ba2d794f0d22e78e40ca606761ee7f8c6e30111f58e11dc6e"}, + {file = "langsmith_pyo3-0.1.0rc4-cp311-cp311-musllinux_1_2_aarch64.whl", hash = "sha256:3abc8b883d44e419874b8763b9d0c0ed6d8fd87701d5e3f28afbd2d1d7eeae7e"}, + {file = "langsmith_pyo3-0.1.0rc4-cp311-cp311-musllinux_1_2_x86_64.whl", hash = "sha256:43317dd5a409d9d55e0bb02329ff26c12641fb0edb3dc6dc9d3c4136ef57effa"}, + {file = "langsmith_pyo3-0.1.0rc4-cp311-cp311-win_amd64.whl", hash = "sha256:bf03ba879ebf587804129e4e989f153b6020d3fb561781661d270242464f2a7a"}, + {file = "langsmith_pyo3-0.1.0rc4-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:93fe3c90e55a0ef56958cd2ae0b5d29f274e2e63a8ae62e126476471d379d2d2"}, + {file = "langsmith_pyo3-0.1.0rc4-cp312-cp312-manylinux_2_28_aarch64.whl", hash = "sha256:f8dd9eb6e9f9041e83b0e56424645cfc9849616b43f84de57bef3efdf47d187b"}, + {file = "langsmith_pyo3-0.1.0rc4-cp312-cp312-manylinux_2_28_x86_64.whl", hash = "sha256:669809717d1a1c5a6de95bc48689f956da05a9b2380352b583e2e416254bff54"}, + {file = "langsmith_pyo3-0.1.0rc4-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:e152433e3960652e909e5ad50bd7c99c777103fd9b4f553c2a28ebcb5b71f287"}, + {file = "langsmith_pyo3-0.1.0rc4-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:8c1fc1f1f61b764d991539849f098565fd96676173b613040580e38827ed6105"}, + {file = "langsmith_pyo3-0.1.0rc4-cp312-cp312-win_amd64.whl", hash = "sha256:3caa5156811f29df75d520b5b075d2ce1aee9320082e51015606a264a01950df"}, + {file = "langsmith_pyo3-0.1.0rc4-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:dfa3b1291ac1ebe928668210f88c7cb540850be867aff00dc8b9415f29646051"}, + {file = "langsmith_pyo3-0.1.0rc4-cp313-cp313-manylinux_2_28_aarch64.whl", hash = "sha256:f0ba946b5383fc50ff38afe68d4bdee37f883965032886d1d9718551711b3379"}, + {file = "langsmith_pyo3-0.1.0rc4-cp313-cp313-manylinux_2_28_x86_64.whl", hash = "sha256:122e44f1844a6874ef8e1384dbddcba2848d975c2effd9dc7a913696dc6bcdda"}, + {file = "langsmith_pyo3-0.1.0rc4-cp313-cp313-musllinux_1_2_aarch64.whl", hash = "sha256:3f2b637fb9ce583e27b2e87418a414c5b5ec39245e000977f1f6b8c642cb5480"}, + {file = "langsmith_pyo3-0.1.0rc4-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:1909eb28339b1c9652d7bf0c24397451b63d1eea872ca5f5f6e9e7e913405092"}, + {file = "langsmith_pyo3-0.1.0rc4-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:c1e6e214d9885c33f350a07cab1ee69795884f3d7234eb051310aa110c003f51"}, + {file = "langsmith_pyo3-0.1.0rc4-cp39-cp39-manylinux_2_28_aarch64.whl", hash = "sha256:96b96057b77c1fa687109bfe9e16124e145f5d5bef7202a9cd57550137129ff4"}, + {file = "langsmith_pyo3-0.1.0rc4-cp39-cp39-manylinux_2_28_x86_64.whl", hash = "sha256:341fac93e382a2794cc76097a45b2ee396214c6e967833d0ad03e496a87f8c90"}, + {file = "langsmith_pyo3-0.1.0rc4-cp39-cp39-musllinux_1_2_aarch64.whl", hash = "sha256:a00d9f4dc6320c62cf6875c180d6c1198d4dfa73120f5e9c50456421f7c9727e"}, + {file = "langsmith_pyo3-0.1.0rc4-cp39-cp39-musllinux_1_2_x86_64.whl", hash = "sha256:0ef1e7e8d33ff60d2b356624833c3a887c5d6c0b21840e567a27c0fa33749353"}, + {file = "langsmith_pyo3-0.1.0rc4-cp39-cp39-win_amd64.whl", hash = "sha256:c88598eea17d72150c37fa7e4cb9e258b57fb7cfea3d8d51809037ba75f39936"}, + {file = "langsmith_pyo3-0.1.0rc4.tar.gz", hash = "sha256:a122ef9b9357b6e94c35de428bc0ccf2f1f2fc9ed1b928fd0a25ec4c2d7d8873"}, ] [[package]] @@ -2113,4 +2107,4 @@ vcr = [] [metadata] lock-version = "2.0" python-versions = ">=3.9,<4.0" -content-hash = "c7acc8c8f123bf7968b265a0f0cdd0b679d88559bfbff33488bff25bb4f54f0f" +content-hash = "2a72dff2f1cc3dfea3f5f7406b3be7bd45c9978fbbb0b3e7e9e927c2e46223ca" diff --git a/python/pyproject.toml b/python/pyproject.toml index a831ff0df..d8a9ce1c5 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -36,7 +36,7 @@ httpx = ">=0.23.0,<1" requests-toolbelt = "^1.0.0" # Enabled via `langsmith_pyo3` extra: `pip install langsmith[langsmith_pyo3]`. -langsmith-pyo3 = { version = "^0.1.0rc2", optional = true } +langsmith-pyo3 = { version = "^0.1.0rc4", optional = true } [tool.poetry.group.dev.dependencies] pytest = "^7.3.1" diff --git a/rust/crates/langsmith-pyo3/src/blocking_tracing_client.rs b/rust/crates/langsmith-pyo3/src/blocking_tracing_client.rs index 77020bd74..6b3816f15 100644 --- a/rust/crates/langsmith-pyo3/src/blocking_tracing_client.rs +++ b/rust/crates/langsmith-pyo3/src/blocking_tracing_client.rs @@ -68,14 +68,19 @@ impl BlockingTracingClient { Python::allow_threads(slf.py(), || unpacked.client.submit_run_update(run.into_inner())) .map_err(|e| into_py_err(slf.py(), e)) } - - pub fn drain(slf: &Bound<'_, Self>) -> PyResult<()> { - let unpacked = slf.get(); - Python::allow_threads(slf.py(), || unpacked.client.drain()) - .map_err(|e| into_py_err(slf.py(), e)) - } } fn into_py_err(py: Python<'_>, e: langsmith_tracing_client::client::TracingClientError) -> PyErr { crate::errors::TracingClientError::new_err(format!("{e}").into_py(py)) } + +impl Drop for BlockingTracingClient { + fn drop(&mut self) { + if Arc::strong_count(&self.client) == 1 { + // This is the only copy of the client in Python, + // so let it drain its in-progress requests before proceeding. + // This runs when Python runs GC on the client, such as when the application is exiting. + self.client.drain().expect("draining failed"); + } + } +} diff --git a/rust/crates/langsmith-tracing-client/src/client/blocking/processor.rs b/rust/crates/langsmith-tracing-client/src/client/blocking/processor.rs index 8005e0e55..1d8e8b50c 100644 --- a/rust/crates/langsmith-tracing-client/src/client/blocking/processor.rs +++ b/rust/crates/langsmith-tracing-client/src/client/blocking/processor.rs @@ -1,3 +1,5 @@ +use std::collections::HashMap; +use std::io::Read as _; use std::sync::mpsc::{Receiver, Sender}; use std::sync::{mpsc, Arc, Mutex}; use std::time::{Duration, Instant}; @@ -55,17 +57,15 @@ impl RunProcessor { } self.drain_sender.send(()).expect("drain_sender should never fail"); - - // Put this thread to sleep, so we know the remaining `Drain` messages - // are almost certainly answered by other worker threads. - // - // HACK: This is very hacky! - // Drain should only be used for benchmarking. - std::thread::sleep(Duration::from_secs(120)); + break; } _ => { buffer.push(queued_run); if buffer.len() >= batch_size { + eprintln!( + "reached buffer size cap ({} >= {batch_size}), sending", + buffer.len() + ); self.send_and_clear_buffer(&mut buffer)?; last_send_time = Instant::now(); } @@ -73,12 +73,14 @@ impl RunProcessor { }, Err(mpsc::RecvTimeoutError::Timeout) => { if !buffer.is_empty() && last_send_time.elapsed() >= batch_timeout { + eprintln!("reached timeout with {} buffered items, sending", buffer.len()); self.send_and_clear_buffer(&mut buffer)?; last_send_time = Instant::now(); } } Err(mpsc::RecvTimeoutError::Disconnected) => { if !buffer.is_empty() { + eprintln!("disconnected with {} buffered items, sending", buffer.len()); self.send_and_clear_buffer(&mut buffer)?; } break; @@ -97,6 +99,62 @@ impl RunProcessor { Ok(()) } + // If we have a `QueuedRun::Create` and `QueuedRun::Update` for the same run ID in the batch, + // combine the update data into the create so we can send just one operation instead of two. + fn combine_batch_operations(batch: Vec) -> Vec { + let mut output = Vec::with_capacity(batch.len()); + let mut id_to_index = HashMap::with_capacity(batch.len()); + + for queued_run in batch { + match queued_run { + QueuedRun::Create(ref run_create_extended) => { + // Record the `Create` operation's ID and index, + // in case we need to modify it later. + let RunCreateExtended { run_create, .. } = run_create_extended; + let run_id = run_create.common.id.clone(); + let index = output.len(); + id_to_index.insert(run_id, index); + output.push(queued_run); + } + QueuedRun::Update(run_update_extended) => { + let run_id = run_update_extended.run_update.common.id.as_str(); + if let Some(create_index) = id_to_index.get(run_id) { + // This `run_id` matches a `Create` in this batch. + // Merge the `Update` data into the `Create` and + // drop the separate `Update` operation from the batch. + let RunUpdateExtended { run_update, io, attachments } = run_update_extended; + let QueuedRun::Create(matching_create) = &mut output[*create_index] else { + panic!("index {create_index} did not point to a Create operation in {output:?}"); + }; + debug_assert_eq!( + run_update.common.id, matching_create.run_create.common.id, + "Create operation at index {create_index} did not have expected ID {}: {matching_create:?}", + run_update.common.id, + ); + + matching_create.run_create.common.merge(run_update.common); + matching_create.run_create.end_time = Some(run_update.end_time); + matching_create.io.merge(io); + if let Some(mut _existing_attachments) = + matching_create.attachments.as_mut() + { + unimplemented!("figure out how to merge attachments -- in Python they are a dict but here they are a Vec"); + } else { + matching_create.attachments = attachments; + } + } else { + // No matching `Create` operations for this `Update`, add it as-is. + output.push(QueuedRun::Update(run_update_extended)); + } + } + // Allow other operations to pass through unchanged. + _ => output.push(queued_run), + } + } + + output + } + #[expect(unused_variables)] fn send_batch(&self, batch: Vec) -> Result<(), TracingClientError> { //println!("Handling a batch of {} runs", batch.len()); @@ -104,6 +162,8 @@ impl RunProcessor { let mut json_data = Vec::new(); let mut attachment_parts = Vec::new(); + let batch = Self::combine_batch_operations(batch); + let start_iter = Instant::now(); for queued_run in batch { match queued_run { @@ -117,11 +177,13 @@ impl RunProcessor { to_vec(&run_create).unwrap(), // TODO: get rid of unwrap )); - if let Some(inputs) = io.inputs { + if let Some(mut inputs) = io.inputs { + assert_eq!(inputs.pop(), Some(0)); json_data.push((format!("post.{}.inputs", run_id), inputs)); } - if let Some(outputs) = io.outputs { + if let Some(mut outputs) = io.outputs { + assert_eq!(outputs.pop(), Some(0)); json_data.push((format!("post.{}.outputs", run_id), outputs)); } @@ -150,7 +212,8 @@ impl RunProcessor { to_vec(&run_update).unwrap(), // TODO: get rid of unwrap )); - if let Some(outputs) = io.outputs { + if let Some(mut outputs) = io.outputs { + assert_eq!(outputs.pop(), Some(0)); json_data.push((format!("patch.{}.outputs", run_id), outputs)); } @@ -188,31 +251,45 @@ impl RunProcessor { .into_par_iter() .map(|(part_name, data_bytes)| { let part_size = data_bytes.len() as u64; + let part2 = Part::bytes(data_bytes.clone()) + .mime_str(&format!("application/json; length={}", part_size))?; let part = Part::bytes(data_bytes) .mime_str(&format!("application/json; length={}", part_size))?; - Ok::<(String, Part), TracingClientError>((part_name, part)) + Ok::<(String, Part, Part), TracingClientError>((part_name, part, part2)) }) .collect::, TracingClientError>>()?; // println!("JSON processing took {:?}", start.elapsed()); + let mut form2 = Form::new(); let mut form = Form::new(); - for (part_name, part) in json_parts.into_iter().chain(attachment_parts) { + for (part_name, part, part2) in json_parts.into_iter() { + form2 = form2.part(part_name.clone(), part2); form = form.part(part_name, part); } + let mut reader = form2.reader(); + let mut buf = String::new(); + reader.read_to_string(&mut buf).expect("error reading multipart form"); + dbg!(buf); + // send the multipart POST request + eprintln!("preparing to send!"); let start_send_batch = Instant::now(); - let response = self + let request = dbg!(self .http_client .post(format!("{}/runs/multipart", self.config.endpoint)) .multipart(form) - .headers(self.config.headers.as_ref().cloned().unwrap_or_default()) - .send()?; + .headers(self.config.headers.as_ref().cloned().unwrap_or_default())); + eprintln!("request ready -- sending!"); + let result = dbg!(request.send()); + let response = result?; + eprintln!("request sent!"); // println!("Sending batch took {:?}", start_send_batch.elapsed()); - if response.status().is_success() { + if dbg!(response.status()).is_success() { Ok(()) } else { + eprintln!("tracing error: {response:?}"); Err(TracingClientError::HttpError(response.status())) } } diff --git a/rust/crates/langsmith-tracing-client/src/client/blocking/tracing_client.rs b/rust/crates/langsmith-tracing-client/src/client/blocking/tracing_client.rs index f9f11e5a2..4861ffe3d 100644 --- a/rust/crates/langsmith-tracing-client/src/client/blocking/tracing_client.rs +++ b/rust/crates/langsmith-tracing-client/src/client/blocking/tracing_client.rs @@ -82,6 +82,15 @@ impl TracingClient { self.sender.send(queued_run).map_err(|_| TracingClientError::QueueFull) } + /// Complete all in-progress requests, then allow the worker threads to exit. + /// + /// Convenience function for the PyO3 bindings, which cannot use [`Self::shutdown`] + /// due to its by-value `self`. This means we cannot `.join()` the threads, + /// but the client is nevertheless unusable after this call. + /// + /// Sending further data after a [`Self::drain()`] call has unspecified behavior. + /// It will not cause *undefined behavior* in the programming language sense, + /// but it may e.g. cause errors, panics, or even silently fail, with no guarantees. pub fn drain(&self) -> Result<(), TracingClientError> { for _ in &self.handles { self.sender.send(QueuedRun::Drain).map_err(|_| TracingClientError::QueueFull)?; diff --git a/rust/crates/langsmith-tracing-client/src/client/run.rs b/rust/crates/langsmith-tracing-client/src/client/run.rs index b5308bc71..a83718c78 100644 --- a/rust/crates/langsmith-tracing-client/src/client/run.rs +++ b/rust/crates/langsmith-tracing-client/src/client/run.rs @@ -24,6 +24,18 @@ pub struct RunIO { pub outputs: Option>, } +impl RunIO { + #[inline] + pub(crate) fn merge(&mut self, other: RunIO) { + if other.inputs.is_some() { + self.inputs = other.inputs; + } + if other.outputs.is_some() { + self.outputs = other.outputs; + } + } +} + #[derive(Serialize, Deserialize, PartialEq, Debug)] pub struct RunCommon { pub id: String, @@ -39,6 +51,36 @@ pub struct RunCommon { pub session_name: Option, } +impl RunCommon { + #[inline] + pub(crate) fn merge(&mut self, other: RunCommon) { + if other.parent_run_id.is_some() { + self.parent_run_id = other.parent_run_id; + } + if other.extra.is_some() { + self.extra = other.extra; + } + if other.error.is_some() { + self.error = other.error; + } + if other.serialized.is_some() { + self.serialized = other.serialized; + } + if other.events.is_some() { + self.events = other.events; + } + if other.tags.is_some() { + self.tags = other.tags; + } + if other.session_id.is_some() { + self.session_id = other.session_id; + } + if other.session_name.is_some() { + self.session_name = other.session_name; + } + } +} + #[derive(Serialize, Deserialize, PartialEq, Debug)] pub struct RunCreate { #[serde(flatten)] @@ -93,6 +135,6 @@ pub(crate) enum QueuedRun { Update(RunUpdateExtended), #[expect(dead_code)] RunBytes(RunEventBytes), - Drain, + Drain, // Like `Shutdown`, but explicitly sends a message confirming draining is complete. Shutdown, }