Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Test Rust update code path with batch size 1. #1330

Draft
wants to merge 13 commits into
base: main
Choose a base branch
from
2 changes: 1 addition & 1 deletion python/bench/tracing_client_via_pyo3.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ def benchmark_run_creation(json_size, num_runs) -> None:

if client._pyo3_client:
# Wait for the queue to drain.
client._pyo3_client.drain()
del client
else:
client.tracing_queue.join()

Expand Down
2 changes: 1 addition & 1 deletion python/bench/tracing_rust_client_bench.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def benchmark_run_creation(num_runs: int, json_size: int, samples: int = 1) -> D
client.create_run(run)

# wait for client queues to be empty
client.drain()
del client
elapsed = time.perf_counter() - start

print(f"runs complete: {elapsed:.3f}s")
Expand Down
64 changes: 64 additions & 0 deletions python/bench/tracing_script.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import os

# Configure tracing through environment variables before `langsmith` is imported.
# NOTE(review): presumably these must be set ahead of the import / client
# construction below to take effect — confirm before reordering.
os.environ["LANGCHAIN_PROJECT"] = "llm_messages_test_py"
os.environ["LANGSMITH_USE_PYO3_CLIENT"] = "true"

from langsmith import Client, traceable

# Client passed to every @traceable function in this script.
# Raises KeyError at import time if LANGCHAIN_API_KEY is not set.
client = Client(
api_url="https://beta.api.smith.langchain.com",
api_key=os.environ["LANGCHAIN_API_KEY"],
)


@traceable(client=client)
def format_prompt(subject):
    """Build the two-message chat prompt asking for a store name.

    Args:
        subject: What the store sells; interpolated into the user message.

    Returns:
        A list with a system message followed by a user message, each a dict
        with ``role`` and ``content`` keys.
    """
    system_message = {
        "role": "system",
        "content": "You are a helpful assistant.",
    }
    user_message = {
        "role": "user",
        "content": f"What's a good name for a store that sells {subject}?",
    }
    return [system_message, user_message]


@traceable(run_type="llm", client=client)
def invoke_llm(messages):
    """Return a fixed response dict with a single assistant choice.

    Args:
        messages: The prompt messages; accepted but not inspected by this stub.

    Returns:
        A dict with a ``choices`` list containing one assistant ``message``.
    """
    assistant_reply = {
        "role": "assistant",
        "content": "Sure, how about 'Rainbow Socks'?",
    }
    return {"choices": [{"message": assistant_reply}]}


@traceable(client=client)
def parse_output(response):
    """Extract the text content of the first choice's message from ``response``."""
    first_choice = response["choices"][0]
    message = first_choice["message"]
    return message["content"]


@traceable(client=client)
def run_pipeline():
    """Run format_prompt -> invoke_llm -> parse_output, pause, and return the text.

    Returns:
        The content string extracted by ``parse_output``.
    """
    import time

    prompt = format_prompt("colorful socks")
    completion = invoke_llm(prompt)
    answer = parse_output(completion)

    # NOTE(review): the 2-second pause presumably keeps this parent run open
    # while the child runs are submitted — confirm the intent.
    time.sleep(2)

    return answer


if __name__ == "__main__":
    # Script entry point: announce the run, then execute the traced pipeline once.
    print("running pipeline")
    run_pipeline()
21 changes: 21 additions & 0 deletions python/langsmith/_internal/_operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,27 @@ def serialize_run_dict(
attachments=attachments if attachments is not None else None,
)

def serialize_run_dict_for_compressed_ingest(
    operation: Literal["post", "patch"], payload: dict
) -> SerializedRunOperation:
    """Serialize a run payload into a ``SerializedRunOperation``.

    The ``inputs``, ``outputs``, ``events`` and ``attachments`` entries are
    removed from ``payload`` and handled as separate parts; whatever remains in
    ``payload`` is serialized as the ``_none`` part.

    Note:
        ``payload`` is mutated in place: the four keys listed above are popped.

    Args:
        operation: Either ``"post"`` or ``"patch"``.
        payload: Run dictionary; must contain ``id`` and ``trace_id``.

    Returns:
        The serialized operation. ``inputs``, ``outputs`` and ``events`` are
        passed through ``_dumps_json`` when present and are ``None`` otherwise;
        ``attachments`` is passed through unserialized.

    Raises:
        KeyError: If ``payload`` has no ``id`` or ``trace_id`` key.
    """
    # Pop the separately-serialized parts first so that `_none` below only
    # contains the remaining run fields.
    inputs = payload.pop("inputs", None)
    outputs = payload.pop("outputs", None)
    events = payload.pop("events", None)
    attachments = payload.pop("attachments", None)
    return SerializedRunOperation(
        operation=operation,
        id=payload["id"],
        trace_id=payload["trace_id"],
        _none=_dumps_json(payload),
        inputs=_dumps_json(inputs) if inputs is not None else None,
        outputs=_dumps_json(outputs) if outputs is not None else None,
        events=_dumps_json(events) if events is not None else None,
        attachments=attachments if attachments is not None else None,
    )



def combine_serialized_queue_operations(
ops: list[Union[SerializedRunOperation, SerializedFeedbackOperation]],
Expand Down
8 changes: 8 additions & 0 deletions python/langsmith/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -550,6 +550,7 @@ def __init__(

if langsmith_pyo3:
# TODO: tweak these constants as needed
print("using pyo3 client")
queue_capacity = 1_000_000
batch_size = 100
batch_timeout_millis = 1000
Expand All @@ -571,6 +572,9 @@ def __init__(
repr(e),
)

if self._pyo3_client is None:
print("NOT using pyo3 client")

self._settings: Union[ls_schemas.LangSmithSettings, None] = None

self._manual_cleanup = False
Expand Down Expand Up @@ -1272,6 +1276,7 @@ def create_run(
"inputs": inputs,
"run_type": run_type,
}
print("RUN_CREATE called", run_create)
if not self._filter_for_sampling([run_create]):
return
if revision_id is not None:
Expand All @@ -1287,6 +1292,7 @@ def create_run(
run_create.get("trace_id") is not None
and run_create.get("dotted_order") is not None
):
print("RUN_CREATE batch", run_create)
if self._pyo3_client is not None:
self._pyo3_client.create_run(run_create)
elif self.tracing_queue is not None:
Expand Down Expand Up @@ -1753,6 +1759,8 @@ def update_run(
if data["extra"]:
self._insert_runtime_env([data])

print("UPDATE_RUN", data)

if self._pyo3_client is not None:
self._pyo3_client.update_run(data)
elif use_multipart and self.tracing_queue is not None:
Expand Down
74 changes: 34 additions & 40 deletions python/poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion python/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ httpx = ">=0.23.0,<1"
requests-toolbelt = "^1.0.0"

# Enabled via `langsmith_pyo3` extra: `pip install langsmith[langsmith_pyo3]`.
langsmith-pyo3 = { version = "^0.1.0rc2", optional = true }
langsmith-pyo3 = { version = "^0.1.0rc4", optional = true }

[tool.poetry.group.dev.dependencies]
pytest = "^7.3.1"
Expand Down
17 changes: 11 additions & 6 deletions rust/crates/langsmith-pyo3/src/blocking_tracing_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,14 +68,19 @@ impl BlockingTracingClient {
Python::allow_threads(slf.py(), || unpacked.client.submit_run_update(run.into_inner()))
.map_err(|e| into_py_err(slf.py(), e))
}

/// Calls `drain()` on the wrapped tracing client and reports the outcome to Python.
///
/// The call runs inside `allow_threads`, so the GIL is released while it executes.
///
/// # Errors
///
/// Any error returned by the client's `drain()` is converted with `into_py_err`.
pub fn drain(slf: &Bound<'_, Self>) -> PyResult<()> {
    let py = slf.py();
    let this = slf.get();
    py.allow_threads(|| this.client.drain())
        .map_err(|err| into_py_err(py, err))
}
}

/// Converts a Rust-side tracing client error into the Python exception
/// `crate::errors::TracingClientError`, using the error's `Display` text as the message.
fn into_py_err(py: Python<'_>, e: langsmith_tracing_client::client::TracingClientError) -> PyErr {
    let message = e.to_string();
    crate::errors::TracingClientError::new_err(message.into_py(py))
}

impl Drop for BlockingTracingClient {
    /// Drains the underlying client when the last reference to it is dropped.
    ///
    /// A drain failure is reported on stderr rather than by panicking: a panic
    /// inside `drop` cannot be handled by the caller, and if it happens while
    /// another panic is already unwinding it aborts the process.
    fn drop(&mut self) {
        if Arc::strong_count(&self.client) != 1 {
            // Other handles to the client still exist; whichever is dropped last drains.
            return;
        }

        // This is the only copy of the client in Python,
        // so let it drain its in-progress requests before proceeding.
        // This runs when Python runs GC on the client, such as when the application is exiting.
        if let Err(e) = self.client.drain() {
            eprintln!("langsmith-pyo3: draining the tracing client on drop failed: {e}");
        }
    }
}
Loading