Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make python binding's FSM API async #70

Closed
jopemachine opened this issue Jan 7, 2024 · 3 comments · Fixed by #79
Closed

Make python binding's FSM API async #70

jopemachine opened this issue Jan 7, 2024 · 3 comments · Fixed by #79
Labels
on hold python Python binding

Comments

@jopemachine
Copy link
Member

I think it should be easily accomplished after PyO3/pyo3#3611 is resolved

Usage example:
https://github.com/PyO3/pyo3/pull/3611/files#diff-84422521ed1ffead90ff986cc098b4b6dca6f898087c2824650bf8865d0584dcR10-R13

@jopemachine jopemachine added python Python binding on hold labels Jan 7, 2024
@jopemachine
Copy link
Member Author

I thought we can make these methods async like below,

    async fn snapshot(&self) -> Result<Vec<u8>> {
        let fut = Python::with_gil(|py| {
            pyo3_asyncio::tokio::into_future(
                self.store
                    .as_ref(py)
                    .call_method("snapshot", (), None)
                    .unwrap(),
            )
        })
        .unwrap();

        let result = fut.await;

        Python::with_gil(|py| {
            result
                .and_then(|py_result| py_result.extract::<Vec<u8>>(py).map(|res| res))
                .map_err(|err| Error::Other(Box::new(SnapshotError::new_err(err.to_string()))))
        })
    }

But the above code raise below error.

sys:1: RuntimeWarning: coroutine 'HashStore.snapshot' was never awaited
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
thread 'tokio-runtime-worker' panicked at src/bindings/state_machine.rs:157:10:
called `Result::unwrap()` on an `Err` value: PyErr { type: <class 'RuntimeError'>, value: RuntimeError('no running event loop'), traceback: None }

Ref: I found the below documentation.

But it would be better to find a way that is not uncomfortable for the user if possible.

https://pyo3.rs/v0.20.1/ecosystem/async-await#a-note-about-asynciorun

@jopemachine
Copy link
Member Author

jopemachine commented Feb 14, 2024

I found that I can make it work using below code,

In rust side,

    async fn apply(&mut self, log_entry: Vec<u8>) -> Result<Vec<u8>> {
        Python::with_gil(|py| {
            let asyncio = py.import("asyncio").unwrap();

            let py_fut = self.store.as_ref(py).call_method(
                "apply",
                (PyBytes::new(py, log_entry.as_slice()),),
                None,
            ).unwrap();

            let result = asyncio.call_method1("run", (py_fut,));

            result
                .and_then(|py_result| py_result.extract::<Vec<u8>>().map(|res| res))
                .map_err(|err| Error::Other(Box::new(ApplyError::new_err(err.to_string()))))
        })
    }

In python side,

class HashStore:
    def __init__(self):
        self._store = dict()

    async def apply(self, msg: bytes) -> bytes:
        # Assuming we need some async operation here...
        await asyncio.sleep(1)
        message = SetCommand.decode(msg)
        self._store[message.key] = message.value
        return msg

@jopemachine
Copy link
Member Author

jopemachine commented Mar 26, 2024

I believe the correct way should be like

Ref: https://awestlake87.github.io/pyo3-asyncio/master/doc/pyo3_asyncio/#the-solution

    async fn apply(&mut self, log_entry: Vec<u8>) -> Result<Vec<u8>> {
        let fut = Python::with_gil(|py| {
            let event_loop = self
                .store
                .as_ref(py)
                .getattr("_loop")
                .expect("No event loop provided in the python!");

            let awaitable = event_loop.call_method1(
                "create_task",
                (self
                    .store
                    .as_ref(py)
                    .call_method("apply", (PyBytes::new(py, log_entry.as_slice()),), None)
                    .unwrap(),),
            )?;

            let task_local = TaskLocals::new(event_loop);
            pyo3_asyncio::into_future_with_locals(&task_local, awaitable)
        })
        .unwrap();

        let result = fut.await;

        Python::with_gil(|py| {
            result
                .and_then(|py_result| py_result.extract::<Vec<u8>>(py).map(|res| res))
                .map_err(|err| Error::Other(Box::new(SnapshotError::new_err(err.to_string()))))
        })
    }

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
on hold python Python binding
Projects
None yet
Development

Successfully merging a pull request may close this issue.

1 participant