Skip to content

Commit

Permalink
Test Rust update code path with batch size 1 (i.e. without batching).
Browse files Browse the repository at this point in the history
  • Loading branch information
obi1kenobi committed Dec 12, 2024
1 parent c8d606b commit c26b5a5
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 49 deletions.
16 changes: 8 additions & 8 deletions python/langsmith/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -551,7 +551,7 @@ def __init__(
if langsmith_pyo3:
# TODO: tweak these constants as needed
queue_capacity = 1_000_000
batch_size = 100
batch_size = 1
batch_timeout_millis = 1000
worker_threads = 1

Expand Down Expand Up @@ -1287,10 +1287,10 @@ def create_run(
run_create.get("trace_id") is not None
and run_create.get("dotted_order") is not None
):
# if self._pyo3_client is not None:
# print("RUN_CREATE", run_create)
# self._pyo3_client.create_run(run_create)
if self.tracing_queue is not None:
if self._pyo3_client is not None:
print("RUN_CREATE", run_create)
self._pyo3_client.create_run(run_create)
elif self.tracing_queue is not None:
serialized_op = serialize_run_dict("post", run_create)
self.tracing_queue.put(
TracingQueueItem(run_create["dotted_order"], serialized_op)
Expand Down Expand Up @@ -1756,9 +1756,9 @@ def update_run(

print("UPDATE_RUN", data)

# if self._pyo3_client is not None:
# self._pyo3_client.update_run(data)
if use_multipart and self.tracing_queue is not None:
if self._pyo3_client is not None:
self._pyo3_client.update_run(data)
elif use_multipart and self.tracing_queue is not None:
# not collecting attachments currently, use empty dict
serialized_op = serialize_run_dict(operation="patch", payload=data)
self.tracing_queue.put(
Expand Down
74 changes: 34 additions & 40 deletions python/poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion python/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ httpx = ">=0.23.0,<1"
requests-toolbelt = "^1.0.0"

# Enabled via `langsmith_pyo3` extra: `pip install langsmith[langsmith_pyo3]`.
langsmith-pyo3 = { version = "^0.1.0rc2", optional = true }
langsmith-pyo3 = { version = "^0.1.0rc4", optional = true }

[tool.poetry.group.dev.dependencies]
pytest = "^7.3.1"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,19 +66,22 @@ impl RunProcessor {
_ => {

Check warning on line 66 in rust/crates/langsmith-tracing-client/src/client/blocking/processor.rs

View workflow job for this annotation

GitHub Actions / Check lint and rustfmt

Diff in /home/runner/work/langsmith-sdk/langsmith-sdk/rust/crates/langsmith-tracing-client/src/client/blocking/processor.rs
buffer.push(queued_run);
if buffer.len() >= batch_size {
eprintln!("reached buffer size cap ({} >= {batch_size}), sending", buffer.len());
self.send_and_clear_buffer(&mut buffer)?;
last_send_time = Instant::now();
}
}
},
Err(mpsc::RecvTimeoutError::Timeout) => {
if !buffer.is_empty() && last_send_time.elapsed() >= batch_timeout {
eprintln!("reached timeout with {} buffered items, sending", buffer.len());
self.send_and_clear_buffer(&mut buffer)?;
last_send_time = Instant::now();
}
}
Err(mpsc::RecvTimeoutError::Disconnected) => {
if !buffer.is_empty() {
eprintln!("disconnected with {} buffered items, sending", buffer.len());
self.send_and_clear_buffer(&mut buffer)?;
}
break;
Expand Down

0 comments on commit c26b5a5

Please sign in to comment.