Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue 164: Enable invocation of async PravegaReader method using python's asyncio library. #161

Merged
merged 12 commits into from
Oct 12, 2020
2 changes: 1 addition & 1 deletion bindings/PythonBinding.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
## Steps to generate and publish Pravega Python bindings.

*Pre-requisites*
- Python 3.5 and up.
- Python 3.7 and up.

1. Ensure `cargo build` works fine.
2. There are two ways of running generating bindings. This describes the steps where maturin is manually installed.
Expand Down
3 changes: 3 additions & 0 deletions bindings/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
extern crate cfg_if;

mod stream_manager;
mod stream_reader;
mod stream_writer;
mod stream_writer_transactional;
mod transaction;
Expand All @@ -23,6 +24,7 @@ cfg_if! {
#[macro_use]
extern crate derive_new;
use stream_writer::StreamWriter;
use stream_reader::StreamReader;
use crate::stream_writer_transactional::StreamTxnWriter;
use crate::transaction::StreamTransaction;
use pyo3::create_exception;
Expand All @@ -44,6 +46,7 @@ fn pravega_client(py: Python, m: &PyModule) -> PyResult<()> {
m.add_class::<StreamWriter>()?;
m.add_class::<StreamTxnWriter>()?;
m.add_class::<StreamTransaction>()?;
m.add_class::<StreamReader>()?;
m.add("TxnFailedException", py.get_type::<TxnFailedException>())?;
Ok(())
}
58 changes: 58 additions & 0 deletions bindings/src/pravega_reader_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
#
# Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#

import unittest
import secrets
import string
import pravega_client;
import asyncio
import random


# Helper method to invoke an coroutine inside a test.
def _run(coro):
return asyncio.get_event_loop().run_until_complete(coro)


class PravegaReaderTest(unittest.TestCase):
def test_writeEventAndRead(self):
scope = "testScope"+str(random.randint(0, 100))
print("Creating a Stream Manager, ensure Pravega is running")
stream_manager = pravega_client.StreamManager("127.0.0.1:9090")

print("Creating a scope")
scope_result = stream_manager.create_scope(scope)
print(scope_result)
print("Creating a stream")
stream_result = stream_manager.create_stream(scope, "testStream", 1)
print(stream_result)

print("Creating a writer for Stream")
w1 = stream_manager.create_writer(scope, "testStream")

print("Write events")
w1.write_event("test event")
w1.write_event("test event")

r1 = stream_manager.create_reader(scope, "testStream")
segment_slice = _run(self.get_segment_slice(r1))
print(segment_slice)
# consume the segment slice for events.
count=0
for event in segment_slice:
count+=1
print(event.data())
self.assertEqual(b'test event', event.data(), "Invalid event data")
self.assertEqual(count, 2, "Two events are expected")

# wrapper function to ensure we pass a co-routine to run method, since we cannot directly invoke
# await reader.get_segment_slice_async() inside the test.
async def get_segment_slice(self, reader):
return await reader.get_segment_slice_async()
44 changes: 41 additions & 3 deletions bindings/src/stream_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,13 @@
// http://www.apache.org/licenses/LICENSE-2.0
//

use std::sync::Arc;
use tokio::sync::Mutex;
cfg_if! {
if #[cfg(feature = "python_binding")] {
use crate::stream_writer_transactional::StreamTxnWriter;
use crate::stream_writer::StreamWriter;
use crate::stream_reader::StreamReader;
use pravega_client_rust::client_factory::ClientFactory;
use pravega_rust_client_shared::*;
use pravega_rust_client_config::{ClientConfig, ClientConfigBuilder};
Expand All @@ -22,9 +25,6 @@ cfg_if! {
}
}

#[cfg(feature = "python_binding")]
#[pyclass]
#[text_signature = "(controller_uri)"]
///
/// Create a StreamManager by providing a controller uri.
/// ```
Expand All @@ -34,12 +34,24 @@ cfg_if! {
/// manager.create_scope("scope")
/// ```
///
#[cfg(feature = "python_binding")]
#[pyclass]
#[text_signature = "(controller_uri)"]
pub(crate) struct StreamManager {
controller_ip: String,
cf: ClientFactory,
config: ClientConfig,
}

///
/// Create a StreamManager by providing a controller uri.
/// ```
/// import pravega_client;
/// manager=pravega_client.StreamManager("127.0.0.1:9090")
/// // this manager can be used to create scopes, streams, writers and readers against Pravega.
/// manager.create_scope("scope")
/// ```
///
#[cfg(feature = "python_binding")]
#[pymethods]
impl StreamManager {
Expand Down Expand Up @@ -235,6 +247,32 @@ impl StreamManager {
Ok(txn_stream_writer)
}

///
/// Create a Reader for a given Stream.
///
/// ```
/// import pravega_client;
/// manager=pravega_client.StreamManager("127.0.0.1:9090")
/// // Create a reader against an already created Pravega scope and Stream.
/// reader=manager.create_reader("scope", "stream")
/// ```
///
#[text_signature = "($self, scope_name, stream_name)"]
pub fn create_reader(&self, scope_name: &str, stream_name: &str) -> PyResult<StreamReader> {
let scoped_stream = ScopedStream {
scope: Scope::from(scope_name.to_string()),
stream: Stream::from(stream_name.to_string()),
};
let handle = self.cf.get_runtime_handle();
let reader = handle.block_on(self.cf.create_event_stream_reader(scoped_stream.clone()));
let stream_reader = StreamReader::new(
Arc::new(Mutex::new(reader)),
self.cf.get_runtime_handle(),
scoped_stream,
);
Ok(stream_reader)
}

/// Returns the facet string representation.
fn to_str(&self) -> String {
format!(
Expand Down
213 changes: 213 additions & 0 deletions bindings/src/stream_reader.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
//
// Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//

cfg_if! {
if #[cfg(feature = "python_binding")] {
use pravega_client_rust::event_reader::EventReader;
use pravega_rust_client_shared::ScopedStream;
use pyo3::prelude::*;
use pyo3::PyResult;
use pyo3::PyObjectProtocol;
use tokio::runtime::Handle;
use log::info;
use std::sync::Arc;
use pravega_client_rust::segment_slice::{Event, SegmentSlice};
use pyo3::PyIterProtocol;
use tokio::sync::Mutex;
}
}

///
/// This represents a Stream reader for a given Stream.
/// Note: A python object of StreamReader cannot be created directly without using the StreamManager.
///
#[cfg(feature = "python_binding")]
#[pyclass]
#[derive(new)]
pub(crate) struct StreamReader {
reader: Arc<Mutex<EventReader>>,
handle: Handle,
stream: ScopedStream,
}

#[cfg(feature = "python_binding")]
#[pymethods]
impl StreamReader {
///
/// This method returns a Python Future which completes when a segment slice is acquired for consumption.
/// A segment slice is data chunk received from a segment of a Pravega stream. It can contain one
/// or more events and the user can iterate over the segment slice to read the events.
/// If there are multiple segments in the stream then this API can return a segment slice of any
/// segments in the stream. The reader ensures that events returned by the stream are in order.
///
/// ```
/// import pravega_client;
/// manager=pravega_client.StreamManager("127.0.0.1:9090")
/// // lets assume the Pravega scope and stream are already created.
/// reader=manager.create_reader("scope", "stream");
/// slice=await reader.get_segment_slice_async()
/// for event in slice:
/// print(event.data())
///```
///
pub fn get_segment_slice_async(&self) -> PyResult<PyObject> {
// create a python future object.
let (py_future, py_future_clone, event_loop): (PyObject, PyObject, PyObject) = {
let gil = Python::acquire_gil();
let py = gil.python();
let loop_event = StreamReader::get_loop(py)?;
let fut: PyObject = loop_event.call_method0(py, "create_future")?;
(fut.clone_ref(py), fut, loop_event)
};
let read = self.reader.clone();
self.handle.spawn(async move {
let slice_result = read.lock().await.acquire_segment().await;
let slice_py: Slice = match slice_result {
Some(slice) => Slice {
seg_slice: slice,
is_empty: false,
},
None => Slice {
seg_slice: SegmentSlice::default(),
is_empty: true,
},
};
let gil = Python::acquire_gil();
let py = gil.python();
let py_container = PyCell::new(py, slice_py).unwrap();
if let Err(e) = StreamReader::set_fut_result(event_loop, py_future, PyObject::from(py_container))
{
let gil = Python::acquire_gil();
let py = gil.python();
e.print(py);
}
});

Ok(py_future_clone)
}

/// Returns the facet string representation.
fn to_str(&self) -> String {
format!("Stream: {:?} ", self.stream)
}
}

impl StreamReader {
//
// This is used to set the mark the Python future as complete and set its result.
// ref: https://docs.python.org/3/library/asyncio-future.html#asyncio.Future.set_result
//
fn set_fut_result(event_loop: PyObject, fut: PyObject, res: PyObject) -> PyResult<()> {
let gil = Python::acquire_gil();
let py = gil.python();
let sr = fut.getattr(py, "set_result")?;
// The future is set on the event loop.
// ref :https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.call_soon_threadsafe
// call_soon_threadsafe schedules the callback (setting the future to complete) to be called
// in the next iteration of the event loop.
event_loop.call_method1(py, "call_soon_threadsafe", (sr, res))?;

Ok(())
}

//
// Return the running event loop in the current OS thread.
// https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.get_running_loop
// This supported in Python 3.7 onwards.
//
fn get_loop(py: Python) -> PyResult<PyObject> {
let asyncio = PyModule::import(py, "asyncio")?;
let event_loop = asyncio.call0("get_running_loop")?;
Ok(event_loop.into())
}
}

///
/// This represents a Stream reader for a given Stream.
/// Note: A python object of StreamReader cannot be created directly without using the StreamManager.
///
#[cfg(feature = "python_binding")]
#[pyclass]
#[derive(new)]
pub(crate) struct EventData {
offset_in_segment: i64,
value: Vec<u8>,
}

#[cfg(feature = "python_binding")]
#[pymethods]
impl EventData {
///Return the data
fn data(&self) -> &[u8] {
self.value.as_slice()
}
/// Returns the facet string representation.
fn to_str(&self) -> String {
format!("offset {:?} data :{:?}", self.offset_in_segment, self.value)
}
}

///
/// This represents a Stream reader for a given Stream.
/// Note: A python object of StreamReader cannot be created directly without using the StreamManager.
///
#[cfg(feature = "python_binding")]
#[pyclass]
#[derive(new)]
pub(crate) struct Slice {
seg_slice: SegmentSlice,
is_empty: bool,
}

#[pyproto]
impl PyIterProtocol for Slice {
fn __iter__(slf: PyRef<Self>) -> PyResult<Py<Slice>> {
Ok(slf.into())
}

fn __next__(mut slf: PyRefMut<Self>) -> Option<EventData> {
if slf.is_empty {
info!("Empty Slice while reading");
None
} else {
let next_event: Option<Event> = slf.seg_slice.next();
next_event.map(|e| EventData {
offset_in_segment: e.offset_in_segment,
value: e.value,
})
}
}
}

///
/// Refer https://docs.python.org/3/reference/datamodel.html#basic-customization
/// This function will be called by the repr() built-in function to compute the “official” string
/// representation of an Python object.
///
#[cfg(feature = "python_binding")]
#[pyproto]
impl PyObjectProtocol for StreamReader {
fn __repr__(&self) -> PyResult<String> {
Ok(format!("StreamReader({})", self.to_str()))
}
}

///
/// Refer https://docs.python.org/3/reference/datamodel.html#basic-customization
/// This function will be called by the repr() built-in function to compute the “official” string
/// representation of an Python object.
///
#[cfg(feature = "python_binding")]
#[pyproto]
impl PyObjectProtocol for EventData {
fn __repr__(&self) -> PyResult<String> {
Ok(format!("EventData({})", self.to_str()))
}
}
Loading