From 4bb436e45b8a046421a185f8e49770dc13ee34e2 Mon Sep 17 00:00:00 2001 From: Sandeep Date: Thu, 8 Oct 2020 20:04:16 +0530 Subject: [PATCH 01/12] POC: PythonFuture inside Rust. Signed-off-by: Sandeep --- bindings/src/lib.rs | 3 + bindings/src/pravega_reader_test.py | 43 ++++++ bindings/src/stream_manager.rs | 24 ++++ src/event_reader.rs | 208 +++++++++++++++++++++++++++- src/segment_slice.rs | 9 +- 5 files changed, 281 insertions(+), 6 deletions(-) create mode 100644 bindings/src/pravega_reader_test.py diff --git a/bindings/src/lib.rs b/bindings/src/lib.rs index 2e5542844..afda49d7b 100644 --- a/bindings/src/lib.rs +++ b/bindings/src/lib.rs @@ -12,6 +12,7 @@ extern crate cfg_if; mod stream_manager; +mod stream_reader; mod stream_writer; mod stream_writer_transactional; mod transaction; @@ -23,6 +24,7 @@ cfg_if! { #[macro_use] extern crate derive_new; use stream_writer::StreamWriter; + use stream_reader::StreamReader; use crate::stream_writer_transactional::StreamTxnWriter; use crate::transaction::StreamTransaction; use pyo3::create_exception; @@ -44,6 +46,7 @@ fn pravega_client(py: Python, m: &PyModule) -> PyResult<()> { m.add_class::()?; m.add_class::()?; m.add_class::()?; + m.add_class::()?; m.add("TxnFailedException", py.get_type::())?; Ok(()) } diff --git a/bindings/src/pravega_reader_test.py b/bindings/src/pravega_reader_test.py new file mode 100644 index 000000000..202157956 --- /dev/null +++ b/bindings/src/pravega_reader_test.py @@ -0,0 +1,43 @@ +# +# Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# + +import unittest +import secrets +import string +import pravega_client; +import asyncio + + +async def test_writeEventAndRead(): + scope = ''.join(secrets.choice(string.ascii_lowercase + string.digits) + for i in range(10)) + print("Creating a Stream Manager, ensure Pravega is running") + stream_manager = pravega_client.StreamManager("127.0.0.1:9090") + + print("Creating a scope") + scope_result = stream_manager.create_scope(scope) + print(scope_result) + print("Creating a stream") + stream_result = stream_manager.create_stream(scope, "testStream", 1) + print(stream_result) + # + # print("Creating a writer for Stream") + # w1 = stream_manager.create_writer(scope, "testStream") + # + # print("Write events") + # w1.write_event("test event1") + # w1.write_event("test event2") + + r1 = stream_manager.create_reader(scope, "testStream") + r2 = await r1.get_segment_slice() + print("completed invoked") + print(r2) + +asyncio.run(test_writeEventAndRead()) \ No newline at end of file diff --git a/bindings/src/stream_manager.rs b/bindings/src/stream_manager.rs index c275c09a0..f011ff3b6 100644 --- a/bindings/src/stream_manager.rs +++ b/bindings/src/stream_manager.rs @@ -8,10 +8,12 @@ // http://www.apache.org/licenses/LICENSE-2.0 // +use std::sync::Arc; cfg_if! { if #[cfg(feature = "python_binding")] { use crate::stream_writer_transactional::StreamTxnWriter; use crate::stream_writer::StreamWriter; + use crate::stream_reader::StreamReader; use pravega_client_rust::client_factory::ClientFactory; use pravega_rust_client_shared::*; use pravega_rust_client_config::{ClientConfig, ClientConfigBuilder}; @@ -235,6 +237,28 @@ impl StreamManager { Ok(txn_stream_writer) } + /// + /// Create a Writer for a given Stream. + /// + /// ``` + /// import pravega_client; + /// manager=pravega_client.StreamManager("127.0.0.1:9090") + /// // Create a writer against an already created Pravega scope and Stream. + /// writer=manager.create_writer("scope", "stream") + /// ``` + /// + #[text_signature = "($self, scope_name, stream_name)"] + pub fn create_reader(&self, scope_name: &str, stream_name: &str) -> PyResult { + let scoped_stream = ScopedStream { + scope: Scope::from(scope_name.to_string()), + stream: Stream::from(stream_name.to_string()), + }; + let handle = self.cf.get_runtime_handle(); + let reader = handle.block_on(self.cf.create_event_stream_reader(scoped_stream.clone())); + let stream_reader = StreamReader::new(Arc::new(reader), self.cf.get_runtime_handle(), scoped_stream); + Ok(stream_reader) + } + /// Returns the facet string representation. fn to_str(&self) -> String { format!( diff --git a/src/event_reader.rs b/src/event_reader.rs index e45d3d192..7800a47bb 100644 --- a/src/event_reader.rs +++ b/src/event_reader.rs @@ -64,7 +64,7 @@ pub type SegmentReadResult = Result; /// if let Some(event) = segment_slice.next() { /// println!("Event read is {:?}", event); /// // release the segment slice back to the reader. -/// reader.release_segment_at(segment_slice); +/// reader.release_segment(segment_slice); /// } /// } /// } @@ -312,7 +312,7 @@ impl EventReader { /// /// Release a partially read segment slice back to event reader. /// - pub fn release_segment_at(&mut self, slice: SegmentSlice) { + pub fn release_segment(&mut self, slice: SegmentSlice) { //stop reading data if let Some(tx) = slice.slice_return_tx { if let Err(_e) = tx.send(slice.meta.clone()) { @@ -328,6 +328,61 @@ impl EventReader { self.meta.add_slices(slice.meta); } + /// + /// Release a segment back to the reader and also indicate the offset upto which the segment slice is consumed. + /// + pub fn release_segment_at(&mut self, slice: SegmentSlice, offset: i64) { + assert!( + offset >= 0, + "the offset where the segment slice is released should be a positive number" + ); + assert!( + slice.meta.start_offset <= offset, + "The offset where the segment slice is released should be greater than the start offset" + ); + assert!( + slice.meta.end_offset >= offset, + "The offset where the segment slice is released should be less than the end offset" + ); + + let segment = ScopedSegment::from(slice.meta.scoped_segment.clone().as_str()); + if slice.meta.read_offset != offset { + self.meta.stop_reading(&slice.meta.scoped_segment); + + let slice_meta = SliceMetadata { + start_offset: slice.meta.read_offset, + scoped_segment: slice.meta.scoped_segment, + last_event_offset: slice.meta.last_event_offset, + read_offset: offset, + end_offset: slice.meta.end_offset, + segment_data: SegmentDataBuffer::empty(), + partial_data_present: false, + }; + + // reinitialize the segment data reactor. + let (tx_drop_fetch, rx_drop_fetch) = oneshot::channel(); + self.factory + .get_runtime_handle() + .spawn(SegmentSlice::get_segment_data( + segment.clone(), + slice_meta.read_offset, // start reading from the offset provided. + self.tx.clone(), + rx_drop_fetch, + self.factory.clone(), + )); + self.meta.add_stop_reading_tx(segment.to_string(), tx_drop_fetch); + self.meta.add_slices(slice_meta); + } else { + self.release_segment(slice); + } + } + + pub async fn acquire_segment_test(&self) -> Option { + use std::time::Duration; + std::thread::sleep(Duration::from_secs(3)); + Some(self.stream.to_string()) + } + /// /// This function returns a SegmentSlice from the data received from the SegmentStore(s). /// Individual events can be read from the data received using `SegmentSlice.next()`. @@ -484,6 +539,8 @@ mod tests { use std::iter; use tokio::sync::mpsc; use tokio::sync::mpsc::Sender; + use tokio::sync::oneshot; + use tokio::sync::oneshot::error::TryRecvError; use tokio::time::{delay_for, Duration}; use tracing::Level; @@ -686,7 +743,7 @@ mod tests { assert_eq!(event.offset_in_segment, 0); // first event. // release the segment slice. - reader.release_segment_at(slice); + reader.release_segment(slice); // acquire the next segment let slice = cf @@ -694,7 +751,7 @@ mod tests { .block_on(reader.acquire_segment()) .unwrap(); //Do not read, simply return it back. - reader.release_segment_at(slice); + reader.release_segment(slice); // Try acquiring the segment again. let mut slice = cf @@ -708,6 +765,94 @@ mod tests { assert_eq!(event.offset_in_segment, 8 + 1); // first event. } + #[test] + fn test_return_slice_at_offset() { + const NUM_EVENTS: usize = 2; + let (tx, rx) = mpsc::channel(1); + let (stop_tx, stop_rx) = oneshot::channel(); + tracing_subscriber::fmt().with_max_level(Level::TRACE).finish(); + let cf = ClientFactory::new( + ClientConfigBuilder::default() + .controller_uri(MOCK_CONTROLLER_URI) + .build() + .unwrap(), + ); + let stream = get_scoped_stream("scope", "test"); + + // simulate data being received from Segment store. + cf.get_runtime_handle().enter(|| { + tokio::spawn(generate_constant_size_events( + tx.clone(), + 20, + NUM_EVENTS, + 0, + false, + stop_rx, + )); + }); + let mut stop_reading_map: HashMap> = HashMap::new(); + stop_reading_map.insert("scope/test/0.#epoch.0".to_string(), stop_tx); + + // simulate initialization of a Reader + let init_segments = vec![create_segment_slice(0), create_segment_slice(1)]; + + // create a new Event Reader with the segment slice data. + let mut reader = EventReader::init_event_reader( + stream, + cf.clone(), + tx.clone(), + rx, + create_slice_map(init_segments), + stop_reading_map, + ); + + // acquire a segment + let mut slice = cf + .get_runtime_handle() + .block_on(reader.acquire_segment()) + .unwrap(); + + // read an event. + let event = slice.next().unwrap(); + assert_eq!(event.value.len(), 1); + assert!(is_all_same(event.value.as_slice()), "Event has been corrupted"); + assert_eq!(event.offset_in_segment, 0); // first event. + + let result = slice.next(); + assert!(result.is_some()); + let event = result.unwrap(); + assert_eq!(event.value.len(), 1); + assert!(is_all_same(event.value.as_slice()), "Event has been corrupted"); + assert_eq!(event.offset_in_segment, 9); // second event. + + // release the segment slice. + reader.release_segment_at(slice, 0); + + // simulate a segment read at offset 0. + let (_stop_tx, stop_rx) = oneshot::channel(); + cf.get_runtime_handle().enter(|| { + tokio::spawn(generate_constant_size_events( + tx.clone(), + 20, + NUM_EVENTS, + 0, + false, + stop_rx, + )); + }); + + // acquire the next segment + let mut slice = cf + .get_runtime_handle() + .block_on(reader.acquire_segment()) + .unwrap(); + // Verify a partial event being present. This implies + let event = slice.next().unwrap(); + assert_eq!(event.value.len(), 1); + assert!(is_all_same(event.value.as_slice()), "Event has been corrupted"); + assert_eq!(event.offset_in_segment, 0); // first event. + } + fn read_n_events(slice: &mut SegmentSlice, events_to_read: usize) { let mut event_count = 0; loop { @@ -749,6 +894,60 @@ mod tests { stream } + // Generate events to simulate Pravega SegmentReadCommand. + async fn generate_constant_size_events( + mut tx: Sender, + buf_size: usize, + num_events: usize, + segment_id: usize, + should_delay: bool, + mut stop_generation: oneshot::Receiver<()>, + ) { + let mut segment_name = "scope/test/".to_owned(); + segment_name.push_str(segment_id.to_string().as_ref()); + let mut buf = BytesMut::with_capacity(buf_size); + let mut offset: i64 = 0; + for _i in 1..num_events + 1 { + if let Ok(_) | Err(TryRecvError::Closed) = stop_generation.try_recv() { + break; + } + let mut data = event_data(1); // constant event data. + if data.len() < buf.capacity() - buf.len() { + buf.put(data); + } else { + while data.len() > 0 { + let free_space = buf.capacity() - buf.len(); + if free_space == 0 { + if should_delay { + delay_for(Duration::from_millis(100)).await; + } + tx.send(Ok(SegmentDataBuffer { + segment: ScopedSegment::from(segment_name.as_str()).to_string(), + offset_in_segment: offset, + value: buf, + })) + .await + .unwrap(); + offset += buf_size as i64; + buf = BytesMut::with_capacity(buf_size); + } else if free_space >= data.len() { + buf.put(data.split()); + } else { + buf.put(data.split_to(free_space)); + } + } + } + } + // send the last event. + tx.send(Ok(SegmentDataBuffer { + segment: ScopedSegment::from(segment_name.as_str()).to_string(), + offset_in_segment: offset, + value: buf, + })) + .await + .unwrap(); + } + // Generate events to simulate Pravega SegmentReadCommand. async fn generate_variable_size_events( mut tx: Sender, @@ -820,6 +1019,7 @@ mod tests { meta: SliceMetadata { start_offset: 0, scoped_segment: segment.to_string(), + last_event_offset: 0, read_offset: 0, end_offset: i64::MAX, segment_data: SegmentDataBuffer::empty(), diff --git a/src/segment_slice.rs b/src/segment_slice.rs index 5e35aab17..a0e4774dc 100644 --- a/src/segment_slice.rs +++ b/src/segment_slice.rs @@ -47,6 +47,7 @@ pub struct SegmentSlice { pub struct SliceMetadata { pub start_offset: i64, pub scoped_segment: String, + pub last_event_offset: i64, pub read_offset: i64, pub end_offset: i64, pub(crate) segment_data: SegmentDataBuffer, @@ -164,6 +165,7 @@ impl Default for SliceMetadata { SliceMetadata { start_offset: Default::default(), scoped_segment: Default::default(), + last_event_offset: Default::default(), read_offset: Default::default(), end_offset: i64::MAX, segment_data: SegmentDataBuffer::empty(), @@ -187,7 +189,8 @@ impl SegmentSlice { meta: SliceMetadata { start_offset, scoped_segment: segment.to_string(), - read_offset: 0, + last_event_offset: 0, + read_offset: start_offset, end_offset: i64::MAX, segment_data: SegmentDataBuffer::empty(), partial_data_present: false, @@ -366,7 +369,8 @@ impl Iterator for SegmentSlice { match res { Some(event) => { - self.meta.read_offset = event.offset_in_segment; + self.meta.last_event_offset = event.offset_in_segment; + self.meta.read_offset = self.meta.segment_data.offset_in_segment; Some(event) } None => { @@ -476,6 +480,7 @@ mod tests { meta: SliceMetadata { start_offset: 0, scoped_segment: segment.to_string(), + last_event_offset: 0, read_offset: 0, end_offset: i64::MAX, segment_data: SegmentDataBuffer::empty(), From c7051a128d66ffceffd9ba1f71e03eddb11ce0b2 Mon Sep 17 00:00:00 2001 From: Sandeep Date: Thu, 8 Oct 2020 20:04:49 +0530 Subject: [PATCH 02/12] python binding for reader api. Signed-off-by: Sandeep --- bindings/src/stream_reader.rs | 161 ++++++++++++++++++++++++++++++++++ 1 file changed, 161 insertions(+) create mode 100644 bindings/src/stream_reader.rs diff --git a/bindings/src/stream_reader.rs b/bindings/src/stream_reader.rs new file mode 100644 index 000000000..0865feae8 --- /dev/null +++ b/bindings/src/stream_reader.rs @@ -0,0 +1,161 @@ +// +// Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// + +use pravega_client_rust::segment_slice::{Event, SegmentSlice}; +use pyo3::PyIterProtocol; +cfg_if! { + if #[cfg(feature = "python_binding")] { + use pravega_client_rust::event_reader::EventReader; + use pravega_rust_client_shared::ScopedStream; + use pyo3::prelude::*; + use pyo3::PyResult; + use pyo3::PyObjectProtocol; + use tokio::runtime::Handle; + use log::info; + use pyo3::types::PyString; + use std::sync::Arc; + } +} + +#[cfg(feature = "python_binding")] +#[pyclass] +#[derive(new)] +/// +/// This represents a Stream reader for a given Stream. +/// Note: A python object of StreamReader cannot be created directly without using the StreamManager. +/// +pub(crate) struct StreamReader { + reader: Arc, + handle: Handle, + stream: ScopedStream, +} + +#[cfg(feature = "python_binding")] +#[pymethods] +impl StreamReader { + /// + /// Return a Python Future which completes when a segment slice is acquired. + /// + pub fn get_segment_slice_async(&self) -> PyResult { + // create a python future object. + let (py_future, py_future_clone, event_loop): (PyObject, PyObject, PyObject) = { + let gil = Python::acquire_gil(); + let py = gil.python(); + let loop_ = StreamReader::get_loop(py)?; + let fut: PyObject = loop_.call_method0(py, "create_future")?.into(); + (fut.clone_ref(py), fut, loop_.into()) + }; + let read = self.reader.clone(); + self.handle.spawn(async move { + let slice = read.acquire_segment_test().await; + if let Some(slice) = slice { + info!("Segment slice acquired {:?}", slice); + let gil = Python::acquire_gil(); + let py = gil.python(); + let r = PyString::new(py, slice.as_str()); + if let Err(e) = StreamReader::set_fut_result(event_loop, py_future, PyObject::from(r)) { + let gil = Python::acquire_gil(); + let py = gil.python(); + e.print(py); + } + } else { + info!("No new slices to be acquired"); + } + }); + + Ok(py_future_clone) + } + + /// Returns the facet string representation. + fn to_str(&self) -> String { + format!("Stream: {:?} ", self.stream) + } +} + +impl StreamReader { + // + // This is used to set the mark the Python future as complete and set its result. + // ref: https://docs.python.org/3/library/asyncio-future.html#asyncio.Future.set_result + // + fn set_fut_result(loop_: PyObject, fut: PyObject, res: PyObject) -> PyResult<()> { + let gil = Python::acquire_gil(); + let py = gil.python(); + let sr = fut.getattr(py, "set_result")?; + // The future is set on the event loop. + // ref :https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.call_soon_threadsafe + // call_soon_threadsafe schedules the callback (setting the future to complete) to be called + // in the next iteration of the event loop. + loop_.call_method1(py, "call_soon_threadsafe", (sr, res))?; + + Ok(()) + } + + // + // Return the running event loop in the current OS thread. + // https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.get_running_loop + // This supported in Python 3.7 onwards. + // + fn get_loop(py: Python) -> PyResult { + let asyncio = PyModule::import(py, "asyncio")?; + let loop_ = asyncio.call0("get_running_loop")?; + Ok(loop_.into()) + } +} + +#[cfg(feature = "python_binding")] +#[pyclass] +#[derive(new)] +/// +/// This represents a Stream reader for a given Stream. +/// Note: A python object of StreamReader cannot be created directly without using the StreamManager. +/// +pub(crate) struct EventData { + offset_in_segment: i64, + value: Vec, +} + +#[cfg(feature = "python_binding")] +#[pyclass] +#[derive(new)] +/// +/// This represents a Stream reader for a given Stream. +/// Note: A python object of StreamReader cannot be created directly without using the StreamManager. +/// +pub(crate) struct Slice { + seg_slice: SegmentSlice, +} + +#[pyproto] +impl PyIterProtocol for Slice { + fn __iter__(slf: PyRef) -> PyResult> { + Ok(slf.into()) + } + + fn __next__(mut slf: PyRefMut) -> Option { + let next_event: Option = slf.seg_slice.next(); + next_event.map(|e| EventData { + offset_in_segment: e.offset_in_segment, + value: e.value, + }) + } +} + +/// +/// Refer https://docs.python.org/3/reference/datamodel.html#basic-customization +/// This function will be called by the repr() built-in function to compute the “official” string +/// representation of an Python object. +/// +#[cfg(feature = "python_binding")] +#[pyproto] +impl PyObjectProtocol for StreamReader { + fn __repr__(&self) -> PyResult { + Ok(format!("StreamReader({})", self.to_str())) + } +} From 2865fb6e8934ee15d4742df79e5ce609eca60d78 Mon Sep 17 00:00:00 2001 From: Sandeep Date: Thu, 8 Oct 2020 20:14:28 +0530 Subject: [PATCH 03/12] Fix typo in python test. Signed-off-by: Sandeep --- bindings/src/pravega_reader_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bindings/src/pravega_reader_test.py b/bindings/src/pravega_reader_test.py index 202157956..a90d23aeb 100644 --- a/bindings/src/pravega_reader_test.py +++ b/bindings/src/pravega_reader_test.py @@ -36,7 +36,7 @@ async def test_writeEventAndRead(): # w1.write_event("test event2") r1 = stream_manager.create_reader(scope, "testStream") - r2 = await r1.get_segment_slice() + r2 = await r1.get_segment_slice_async() print("completed invoked") print(r2) From b2e1744fde7811bac834bd295f98b01dbcb16f37 Mon Sep 17 00:00:00 2001 From: Sandeep Date: Fri, 9 Oct 2020 13:08:25 +0530 Subject: [PATCH 04/12] Change pyo3 version. Signed-off-by: Sandeep --- bindings/Cargo.toml | 3 ++- bindings/src/pravega_reader_test.py | 14 +++++++------- 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/bindings/Cargo.toml b/bindings/Cargo.toml index 217a4a063..5aa3de393 100644 --- a/bindings/Cargo.toml +++ b/bindings/Cargo.toml @@ -38,7 +38,8 @@ uuid = {version = "0.8", features = ["v4"]} futures = "0.3.5" derive-new = "0.5" #Python bindings -pyo3 = { features = ["extension-module"], optional = true, version = "0.11.0" } +pyo3 = { features = ["extension-module"], optional = true, version = "0.11.1" } +pyo3-log = "0.1.1" #WASM bindings wasm-bindgen = { version = "0.2.63", optional = true } cfg-if = "0.1.10" diff --git a/bindings/src/pravega_reader_test.py b/bindings/src/pravega_reader_test.py index a90d23aeb..34a85ac62 100644 --- a/bindings/src/pravega_reader_test.py +++ b/bindings/src/pravega_reader_test.py @@ -27,13 +27,13 @@ async def test_writeEventAndRead(): print("Creating a stream") stream_result = stream_manager.create_stream(scope, "testStream", 1) print(stream_result) - # - # print("Creating a writer for Stream") - # w1 = stream_manager.create_writer(scope, "testStream") - # - # print("Write events") - # w1.write_event("test event1") - # w1.write_event("test event2") + + print("Creating a writer for Stream") + w1 = stream_manager.create_writer(scope, "testStream") + + print("Write events") + w1.write_event("test event1") + w1.write_event("test event2") r1 = stream_manager.create_reader(scope, "testStream") r2 = await r1.get_segment_slice_async() From 983e1e6623e0c148a4b72d57aebb353d366ac5c5 Mon Sep 17 00:00:00 2001 From: Sandeep Date: Fri, 9 Oct 2020 14:49:04 +0530 Subject: [PATCH 05/12] return segment slice and verify the data. Signed-off-by: Sandeep --- bindings/Cargo.toml | 3 +- bindings/src/pravega_reader_test.py | 61 +++++++++++++++++------------ bindings/src/stream_manager.rs | 7 +++- bindings/src/stream_reader.rs | 43 ++++++++++++++++---- bindings/tox.ini | 2 +- 5 files changed, 79 insertions(+), 37 deletions(-) diff --git a/bindings/Cargo.toml b/bindings/Cargo.toml index 5aa3de393..217a4a063 100644 --- a/bindings/Cargo.toml +++ b/bindings/Cargo.toml @@ -38,8 +38,7 @@ uuid = {version = "0.8", features = ["v4"]} futures = "0.3.5" derive-new = "0.5" #Python bindings -pyo3 = { features = ["extension-module"], optional = true, version = "0.11.1" } -pyo3-log = "0.1.1" +pyo3 = { features = ["extension-module"], optional = true, version = "0.11.0" } #WASM bindings wasm-bindgen = { version = "0.2.63", optional = true } cfg-if = "0.1.10" diff --git a/bindings/src/pravega_reader_test.py b/bindings/src/pravega_reader_test.py index 34a85ac62..ea235b002 100644 --- a/bindings/src/pravega_reader_test.py +++ b/bindings/src/pravega_reader_test.py @@ -15,29 +15,38 @@ import asyncio -async def test_writeEventAndRead(): - scope = ''.join(secrets.choice(string.ascii_lowercase + string.digits) - for i in range(10)) - print("Creating a Stream Manager, ensure Pravega is running") - stream_manager = pravega_client.StreamManager("127.0.0.1:9090") - - print("Creating a scope") - scope_result = stream_manager.create_scope(scope) - print(scope_result) - print("Creating a stream") - stream_result = stream_manager.create_stream(scope, "testStream", 1) - print(stream_result) - - print("Creating a writer for Stream") - w1 = stream_manager.create_writer(scope, "testStream") - - print("Write events") - w1.write_event("test event1") - w1.write_event("test event2") - - r1 = stream_manager.create_reader(scope, "testStream") - r2 = await r1.get_segment_slice_async() - print("completed invoked") - print(r2) - -asyncio.run(test_writeEventAndRead()) \ No newline at end of file +def _run(coro): + return asyncio.get_event_loop().run_until_complete(coro) + + +class PravegaReaderTest(unittest.TestCase): + def test_writeEventAndRead(self): + scope = ''.join(secrets.choice(string.ascii_lowercase + string.digits) + for i in range(10)) + print("Creating a Stream Manager, ensure Pravega is running") + stream_manager = pravega_client.StreamManager("127.0.0.1:9090") + + print("Creating a scope") + scope_result = stream_manager.create_scope(scope) + print(scope_result) + print("Creating a stream") + stream_result = stream_manager.create_stream(scope, "testStream", 1) + print(stream_result) + + print("Creating a writer for Stream") + w1 = stream_manager.create_writer(scope, "testStream") + + print("Write events") + w1.write_event("test event") + w1.write_event("test event") + + r1 = stream_manager.create_reader(scope, "testStream") + slice = _run( self.get_segment_slice(r1)) + print("completed invoked") + print(slice) + for event in slice: + print(event.data()) + self.assertEqual(b'test event', event.data()) + + async def get_segment_slice(self, reader): + return await reader.get_segment_slice_async() diff --git a/bindings/src/stream_manager.rs b/bindings/src/stream_manager.rs index f011ff3b6..d6d73f223 100644 --- a/bindings/src/stream_manager.rs +++ b/bindings/src/stream_manager.rs @@ -9,6 +9,7 @@ // use std::sync::Arc; +use tokio::sync::Mutex; cfg_if! { if #[cfg(feature = "python_binding")] { use crate::stream_writer_transactional::StreamTxnWriter; @@ -255,7 +256,11 @@ impl StreamManager { }; let handle = self.cf.get_runtime_handle(); let reader = handle.block_on(self.cf.create_event_stream_reader(scoped_stream.clone())); - let stream_reader = StreamReader::new(Arc::new(reader), self.cf.get_runtime_handle(), scoped_stream); + let stream_reader = StreamReader::new( + Arc::new(Mutex::new(reader)), + self.cf.get_runtime_handle(), + scoped_stream, + ); Ok(stream_reader) } diff --git a/bindings/src/stream_reader.rs b/bindings/src/stream_reader.rs index 0865feae8..4fdb059f6 100644 --- a/bindings/src/stream_reader.rs +++ b/bindings/src/stream_reader.rs @@ -10,6 +10,7 @@ use pravega_client_rust::segment_slice::{Event, SegmentSlice}; use pyo3::PyIterProtocol; +use tokio::sync::Mutex; cfg_if! { if #[cfg(feature = "python_binding")] { use pravega_client_rust::event_reader::EventReader; @@ -32,7 +33,7 @@ cfg_if! { /// Note: A python object of StreamReader cannot be created directly without using the StreamManager. /// pub(crate) struct StreamReader { - reader: Arc, + reader: Arc>, handle: Handle, stream: ScopedStream, } @@ -52,15 +53,17 @@ impl StreamReader { let fut: PyObject = loop_.call_method0(py, "create_future")?.into(); (fut.clone_ref(py), fut, loop_.into()) }; - let read = self.reader.clone(); + let mut read = self.reader.clone(); self.handle.spawn(async move { - let slice = read.acquire_segment_test().await; - if let Some(slice) = slice { - info!("Segment slice acquired {:?}", slice); + let slice_result = read.lock().await.acquire_segment().await; + if let Some(slice) = slice_result { + info!("Segment slice acquired "); let gil = Python::acquire_gil(); let py = gil.python(); - let r = PyString::new(py, slice.as_str()); - if let Err(e) = StreamReader::set_fut_result(event_loop, py_future, PyObject::from(r)) { + // let r = Slice { seg_slice: slice }; + let r1 = PyCell::new(py, Slice { seg_slice: slice }).unwrap(); + // let r = PyString::new(py, slice.as_str()); + if let Err(e) = StreamReader::set_fut_result(event_loop, py_future, PyObject::from(r1)) { let gil = Python::acquire_gil(); let py = gil.python(); e.print(py); @@ -121,6 +124,19 @@ pub(crate) struct EventData { value: Vec, } +#[cfg(feature = "python_binding")] +#[pymethods] +impl EventData { + ///Return the data + fn data(&self) -> &[u8] { + self.value.as_slice() + } + /// Returns the facet string representation. + fn to_str(&self) -> String { + format!("offset {:?} data :{:?}", self.offset_in_segment, self.value) + } +} + #[cfg(feature = "python_binding")] #[pyclass] #[derive(new)] @@ -159,3 +175,16 @@ impl PyObjectProtocol for StreamReader { Ok(format!("StreamReader({})", self.to_str())) } } + +/// +/// Refer https://docs.python.org/3/reference/datamodel.html#basic-customization +/// This function will be called by the repr() built-in function to compute the “official” string +/// representation of an Python object. +/// +#[cfg(feature = "python_binding")] +#[pyproto] +impl PyObjectProtocol for EventData { + fn __repr__(&self) -> PyResult { + Ok(format!("EventData({})", self.to_str())) + } +} diff --git a/bindings/tox.ini b/bindings/tox.ini index 7506f1b87..6e7b0860a 100644 --- a/bindings/tox.ini +++ b/bindings/tox.ini @@ -1,5 +1,5 @@ [tox] -envlist = py35, py36, py37, py38 +envlist = py37, py38 minversion = 3.4.0 skip_missing_interpreters = true isolated_build = true From 4069143864e1060f3502f9d877990374c2ddae2e Mon Sep 17 00:00:00 2001 From: Sandeep Date: Fri, 9 Oct 2020 15:59:55 +0530 Subject: [PATCH 06/12] Improve the readers. Signed-off-by: Sandeep --- bindings/PythonBinding.md | 2 +- bindings/src/stream_reader.rs | 20 +++++++++++++++++--- bindings/tox.ini | 2 +- 3 files changed, 19 insertions(+), 5 deletions(-) diff --git a/bindings/PythonBinding.md b/bindings/PythonBinding.md index 7221999c8..51132485a 100644 --- a/bindings/PythonBinding.md +++ b/bindings/PythonBinding.md @@ -3,7 +3,7 @@ ## Steps to generate and publish Pravega Python bindings. *Pre-requisites* -- Python 3.5 and up. +- Python 3.7 and up. 1. Ensure `cargo build` works fine. 2. There are two ways of running generating bindings. This describes the steps where maturin is manually installed. diff --git a/bindings/src/stream_reader.rs b/bindings/src/stream_reader.rs index 4fdb059f6..c1933d9f9 100644 --- a/bindings/src/stream_reader.rs +++ b/bindings/src/stream_reader.rs @@ -20,7 +20,6 @@ cfg_if! { use pyo3::PyObjectProtocol; use tokio::runtime::Handle; use log::info; - use pyo3::types::PyString; use std::sync::Arc; } } @@ -42,7 +41,22 @@ pub(crate) struct StreamReader { #[pymethods] impl StreamReader { /// - /// Return a Python Future which completes when a segment slice is acquired. + /// Return a Python Future which completes when a segment slice is acquired for consumption. + /// A segment slice is data chunk received from a segment of a Pravega stream. It can contain one + /// or more events and the user can iterate over the segment slice to read the events. + /// If there are more than one segment in the stream then this API can return a segment slice of any + /// segments in the stream. The reader ensures that events returned by the stream are in order. + /// This method returns a Python Future object. + /// + /// ``` + /// import pravega_client; + /// manager=pravega_client.StreamManager("127.0.0.1:9090") + /// // lets assume the Pravega scope and stream are already created. + /// reader=manager.create_reader("scope", "stream"); + /// slice=await reader.get_segment_slice_async() + /// for event in slice: + /// print(event.data()) + ///``` /// pub fn get_segment_slice_async(&self) -> PyResult { // create a python future object. @@ -53,7 +67,7 @@ impl StreamReader { let fut: PyObject = loop_.call_method0(py, "create_future")?.into(); (fut.clone_ref(py), fut, loop_.into()) }; - let mut read = self.reader.clone(); + let read = self.reader.clone(); self.handle.spawn(async move { let slice_result = read.lock().await.acquire_segment().await; if let Some(slice) = slice_result { diff --git a/bindings/tox.ini b/bindings/tox.ini index 6e7b0860a..ad79c112f 100644 --- a/bindings/tox.ini +++ b/bindings/tox.ini @@ -1,5 +1,5 @@ [tox] -envlist = py37, py38 +envlist = py37 minversion = 3.4.0 skip_missing_interpreters = true isolated_build = true From 61fbcc40689bf4b3877a2b9d9c1c05fd4a39cade Mon Sep 17 00:00:00 2001 From: Sandeep Date: Fri, 9 Oct 2020 16:15:03 +0530 Subject: [PATCH 07/12] Remove out of scope changes. Signed-off-by: Sandeep --- bindings/src/pravega_reader_test.py | 5 +- src/event_reader.rs | 208 +--------------------------- src/segment_slice.rs | 5 +- 3 files changed, 8 insertions(+), 210 deletions(-) diff --git a/bindings/src/pravega_reader_test.py b/bindings/src/pravega_reader_test.py index ea235b002..221148472 100644 --- a/bindings/src/pravega_reader_test.py +++ b/bindings/src/pravega_reader_test.py @@ -14,11 +14,10 @@ import pravega_client; import asyncio - +# Helper method to invoke an coroutine inside a test. def _run(coro): return asyncio.get_event_loop().run_until_complete(coro) - class PravegaReaderTest(unittest.TestCase): def test_writeEventAndRead(self): scope = ''.join(secrets.choice(string.ascii_lowercase + string.digits) @@ -48,5 +47,7 @@ def test_writeEventAndRead(self): print(event.data()) self.assertEqual(b'test event', event.data()) + # wrapper function to ensure we pass a co-routine to run method, since we cannot directly invoke + # await reader.get_segment_slice_async() inside the test. async def get_segment_slice(self, reader): return await reader.get_segment_slice_async() diff --git a/src/event_reader.rs b/src/event_reader.rs index 7800a47bb..e45d3d192 100644 --- a/src/event_reader.rs +++ b/src/event_reader.rs @@ -64,7 +64,7 @@ pub type SegmentReadResult = Result; /// if let Some(event) = segment_slice.next() { /// println!("Event read is {:?}", event); /// // release the segment slice back to the reader. -/// reader.release_segment(segment_slice); +/// reader.release_segment_at(segment_slice); /// } /// } /// } @@ -312,7 +312,7 @@ impl EventReader { /// /// Release a partially read segment slice back to event reader. /// - pub fn release_segment(&mut self, slice: SegmentSlice) { + pub fn release_segment_at(&mut self, slice: SegmentSlice) { //stop reading data if let Some(tx) = slice.slice_return_tx { if let Err(_e) = tx.send(slice.meta.clone()) { @@ -328,61 +328,6 @@ impl EventReader { self.meta.add_slices(slice.meta); } - /// - /// Release a segment back to the reader and also indicate the offset upto which the segment slice is consumed. - /// - pub fn release_segment_at(&mut self, slice: SegmentSlice, offset: i64) { - assert!( - offset >= 0, - "the offset where the segment slice is released should be a positive number" - ); - assert!( - slice.meta.start_offset <= offset, - "The offset where the segment slice is released should be greater than the start offset" - ); - assert!( - slice.meta.end_offset >= offset, - "The offset where the segment slice is released should be less than the end offset" - ); - - let segment = ScopedSegment::from(slice.meta.scoped_segment.clone().as_str()); - if slice.meta.read_offset != offset { - self.meta.stop_reading(&slice.meta.scoped_segment); - - let slice_meta = SliceMetadata { - start_offset: slice.meta.read_offset, - scoped_segment: slice.meta.scoped_segment, - last_event_offset: slice.meta.last_event_offset, - read_offset: offset, - end_offset: slice.meta.end_offset, - segment_data: SegmentDataBuffer::empty(), - partial_data_present: false, - }; - - // reinitialize the segment data reactor. - let (tx_drop_fetch, rx_drop_fetch) = oneshot::channel(); - self.factory - .get_runtime_handle() - .spawn(SegmentSlice::get_segment_data( - segment.clone(), - slice_meta.read_offset, // start reading from the offset provided. - self.tx.clone(), - rx_drop_fetch, - self.factory.clone(), - )); - self.meta.add_stop_reading_tx(segment.to_string(), tx_drop_fetch); - self.meta.add_slices(slice_meta); - } else { - self.release_segment(slice); - } - } - - pub async fn acquire_segment_test(&self) -> Option { - use std::time::Duration; - std::thread::sleep(Duration::from_secs(3)); - Some(self.stream.to_string()) - } - /// /// This function returns a SegmentSlice from the data received from the SegmentStore(s). /// Individual events can be read from the data received using `SegmentSlice.next()`. @@ -539,8 +484,6 @@ mod tests { use std::iter; use tokio::sync::mpsc; use tokio::sync::mpsc::Sender; - use tokio::sync::oneshot; - use tokio::sync::oneshot::error::TryRecvError; use tokio::time::{delay_for, Duration}; use tracing::Level; @@ -743,7 +686,7 @@ mod tests { assert_eq!(event.offset_in_segment, 0); // first event. // release the segment slice. - reader.release_segment(slice); + reader.release_segment_at(slice); // acquire the next segment let slice = cf @@ -751,7 +694,7 @@ mod tests { .block_on(reader.acquire_segment()) .unwrap(); //Do not read, simply return it back. - reader.release_segment(slice); + reader.release_segment_at(slice); // Try acquiring the segment again. let mut slice = cf @@ -765,94 +708,6 @@ mod tests { assert_eq!(event.offset_in_segment, 8 + 1); // first event. } - #[test] - fn test_return_slice_at_offset() { - const NUM_EVENTS: usize = 2; - let (tx, rx) = mpsc::channel(1); - let (stop_tx, stop_rx) = oneshot::channel(); - tracing_subscriber::fmt().with_max_level(Level::TRACE).finish(); - let cf = ClientFactory::new( - ClientConfigBuilder::default() - .controller_uri(MOCK_CONTROLLER_URI) - .build() - .unwrap(), - ); - let stream = get_scoped_stream("scope", "test"); - - // simulate data being received from Segment store. - cf.get_runtime_handle().enter(|| { - tokio::spawn(generate_constant_size_events( - tx.clone(), - 20, - NUM_EVENTS, - 0, - false, - stop_rx, - )); - }); - let mut stop_reading_map: HashMap> = HashMap::new(); - stop_reading_map.insert("scope/test/0.#epoch.0".to_string(), stop_tx); - - // simulate initialization of a Reader - let init_segments = vec![create_segment_slice(0), create_segment_slice(1)]; - - // create a new Event Reader with the segment slice data. - let mut reader = EventReader::init_event_reader( - stream, - cf.clone(), - tx.clone(), - rx, - create_slice_map(init_segments), - stop_reading_map, - ); - - // acquire a segment - let mut slice = cf - .get_runtime_handle() - .block_on(reader.acquire_segment()) - .unwrap(); - - // read an event. - let event = slice.next().unwrap(); - assert_eq!(event.value.len(), 1); - assert!(is_all_same(event.value.as_slice()), "Event has been corrupted"); - assert_eq!(event.offset_in_segment, 0); // first event. - - let result = slice.next(); - assert!(result.is_some()); - let event = result.unwrap(); - assert_eq!(event.value.len(), 1); - assert!(is_all_same(event.value.as_slice()), "Event has been corrupted"); - assert_eq!(event.offset_in_segment, 9); // second event. - - // release the segment slice. - reader.release_segment_at(slice, 0); - - // simulate a segment read at offset 0. - let (_stop_tx, stop_rx) = oneshot::channel(); - cf.get_runtime_handle().enter(|| { - tokio::spawn(generate_constant_size_events( - tx.clone(), - 20, - NUM_EVENTS, - 0, - false, - stop_rx, - )); - }); - - // acquire the next segment - let mut slice = cf - .get_runtime_handle() - .block_on(reader.acquire_segment()) - .unwrap(); - // Verify a partial event being present. This implies - let event = slice.next().unwrap(); - assert_eq!(event.value.len(), 1); - assert!(is_all_same(event.value.as_slice()), "Event has been corrupted"); - assert_eq!(event.offset_in_segment, 0); // first event. - } - fn read_n_events(slice: &mut SegmentSlice, events_to_read: usize) { let mut event_count = 0; loop { @@ -894,60 +749,6 @@ mod tests { stream } - // Generate events to simulate Pravega SegmentReadCommand. - async fn generate_constant_size_events( - mut tx: Sender, - buf_size: usize, - num_events: usize, - segment_id: usize, - should_delay: bool, - mut stop_generation: oneshot::Receiver<()>, - ) { - let mut segment_name = "scope/test/".to_owned(); - segment_name.push_str(segment_id.to_string().as_ref()); - let mut buf = BytesMut::with_capacity(buf_size); - let mut offset: i64 = 0; - for _i in 1..num_events + 1 { - if let Ok(_) | Err(TryRecvError::Closed) = stop_generation.try_recv() { - break; - } - let mut data = event_data(1); // constant event data. - if data.len() < buf.capacity() - buf.len() { - buf.put(data); - } else { - while data.len() > 0 { - let free_space = buf.capacity() - buf.len(); - if free_space == 0 { - if should_delay { - delay_for(Duration::from_millis(100)).await; - } - tx.send(Ok(SegmentDataBuffer { - segment: ScopedSegment::from(segment_name.as_str()).to_string(), - offset_in_segment: offset, - value: buf, - })) - .await - .unwrap(); - offset += buf_size as i64; - buf = BytesMut::with_capacity(buf_size); - } else if free_space >= data.len() { - buf.put(data.split()); - } else { - buf.put(data.split_to(free_space)); - } - } - } - } - // send the last event. - tx.send(Ok(SegmentDataBuffer { - segment: ScopedSegment::from(segment_name.as_str()).to_string(), - offset_in_segment: offset, - value: buf, - })) - .await - .unwrap(); - } - // Generate events to simulate Pravega SegmentReadCommand. async fn generate_variable_size_events( mut tx: Sender, @@ -1019,7 +820,6 @@ mod tests { meta: SliceMetadata { start_offset: 0, scoped_segment: segment.to_string(), - last_event_offset: 0, read_offset: 0, end_offset: i64::MAX, segment_data: SegmentDataBuffer::empty(), diff --git a/src/segment_slice.rs b/src/segment_slice.rs index a0e4774dc..632d415b0 100644 --- a/src/segment_slice.rs +++ b/src/segment_slice.rs @@ -47,7 +47,6 @@ pub struct SegmentSlice { pub struct SliceMetadata { pub start_offset: i64, pub scoped_segment: String, - pub last_event_offset: i64, pub read_offset: i64, pub end_offset: i64, pub(crate) segment_data: SegmentDataBuffer, @@ -165,7 +164,6 @@ impl Default for SliceMetadata { SliceMetadata { start_offset: Default::default(), scoped_segment: Default::default(), - last_event_offset: Default::default(), read_offset: Default::default(), end_offset: i64::MAX, segment_data: SegmentDataBuffer::empty(), @@ -189,8 +187,7 @@ impl SegmentSlice { meta: SliceMetadata { start_offset, scoped_segment: segment.to_string(), - last_event_offset: 0, - read_offset: start_offset, + read_offset: 0, end_offset: i64::MAX, segment_data: SegmentDataBuffer::empty(), partial_data_present: false, From 53c9651311e42cda8b639b975f9778c5aa2c7f49 Mon Sep 17 00:00:00 2001 From: Sandeep Date: Fri, 9 Oct 2020 16:20:15 +0530 Subject: [PATCH 08/12] remove redundant changes. Signed-off-by: Sandeep --- src/segment_slice.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/segment_slice.rs b/src/segment_slice.rs index 632d415b0..5e35aab17 100644 --- a/src/segment_slice.rs +++ b/src/segment_slice.rs @@ -366,8 +366,7 @@ impl Iterator for SegmentSlice { match res { Some(event) => { - self.meta.last_event_offset = event.offset_in_segment; - self.meta.read_offset = self.meta.segment_data.offset_in_segment; + self.meta.read_offset = event.offset_in_segment; Some(event) } None => { @@ -477,7 +476,6 @@ mod tests { meta: SliceMetadata { start_offset: 0, scoped_segment: segment.to_string(), - last_event_offset: 0, read_offset: 0, end_offset: i64::MAX, segment_data: SegmentDataBuffer::empty(), From 11df1aa7e24ad4a5c65ff92679b03f0618c87a43 Mon Sep 17 00:00:00 2001 From: Sandeep Date: Fri, 9 Oct 2020 17:48:23 +0530 Subject: [PATCH 09/12] Fix clippy and improve test. Signed-off-by: Sandeep --- bindings/src/pravega_reader_test.py | 15 ++++++++++----- bindings/src/stream_reader.rs | 4 ++-- src/error.rs | 5 +---- src/segment_reader.rs | 5 +---- 4 files changed, 14 insertions(+), 15 deletions(-) diff --git a/bindings/src/pravega_reader_test.py b/bindings/src/pravega_reader_test.py index 221148472..a960dcce2 100644 --- a/bindings/src/pravega_reader_test.py +++ b/bindings/src/pravega_reader_test.py @@ -14,10 +14,12 @@ import pravega_client; import asyncio + # Helper method to invoke an coroutine inside a test. def _run(coro): return asyncio.get_event_loop().run_until_complete(coro) + class PravegaReaderTest(unittest.TestCase): def test_writeEventAndRead(self): scope = ''.join(secrets.choice(string.ascii_lowercase + string.digits) @@ -40,12 +42,15 @@ def test_writeEventAndRead(self): w1.write_event("test event") r1 = stream_manager.create_reader(scope, "testStream") - slice = _run( self.get_segment_slice(r1)) - print("completed invoked") - print(slice) - for event in slice: + segment_slice = _run(self.get_segment_slice(r1)) + print(segment_slice) + # consume the segment slice for events. + count=0 + for event in segment_slice: + count+=1 print(event.data()) - self.assertEqual(b'test event', event.data()) + self.assertEqual(b'test event', event.data(), "Invalid event data") + self.assertEqual(count, 2, "Two events is expected") # wrapper function to ensure we pass a co-routine to run method, since we cannot directly invoke # await reader.get_segment_slice_async() inside the test. diff --git a/bindings/src/stream_reader.rs b/bindings/src/stream_reader.rs index c1933d9f9..e6575439b 100644 --- a/bindings/src/stream_reader.rs +++ b/bindings/src/stream_reader.rs @@ -64,8 +64,8 @@ impl StreamReader { let gil = Python::acquire_gil(); let py = gil.python(); let loop_ = StreamReader::get_loop(py)?; - let fut: PyObject = loop_.call_method0(py, "create_future")?.into(); - (fut.clone_ref(py), fut, loop_.into()) + let fut: PyObject = loop_.call_method0(py, "create_future")?; + (fut.clone_ref(py), fut, loop_) }; let read = self.reader.clone(); self.handle.spawn(async move { diff --git a/src/error.rs b/src/error.rs index 769245137..046a787bf 100644 --- a/src/error.rs +++ b/src/error.rs @@ -42,10 +42,7 @@ pub enum RawClientError { impl RawClientError { pub fn refresh_token(&self) -> bool { - match self { - RawClientError::AuthTokenExpired { .. } => true, - _ => false, - } + matches!(self, RawClientError::AuthTokenExpired {..}) } } diff --git a/src/segment_reader.rs b/src/segment_reader.rs index 012df310a..92f0e81db 100644 --- a/src/segment_reader.rs +++ b/src/segment_reader.rs @@ -131,10 +131,7 @@ impl ReaderError { } fn refresh_token(&self) -> bool { - match self { - ReaderError::AuthTokenExpired { .. } => true, - _ => false, - } + matches!(self, ReaderError::AuthTokenExpired { .. }) } } // Implementation of Retryable trait for the error thrown by the Controller. From c0b453d0345f510db3ce1ad05d0626796f44b898 Mon Sep 17 00:00:00 2001 From: Sandeep Date: Fri, 9 Oct 2020 18:48:00 +0530 Subject: [PATCH 10/12] Improve error handling. Signed-off-by: Sandeep --- bindings/src/stream_manager.rs | 6 ++-- bindings/src/stream_reader.rs | 55 ++++++++++++++++++++-------------- src/segment_slice.rs | 9 ++++++ 3 files changed, 44 insertions(+), 26 deletions(-) diff --git a/bindings/src/stream_manager.rs b/bindings/src/stream_manager.rs index d6d73f223..f02a76f97 100644 --- a/bindings/src/stream_manager.rs +++ b/bindings/src/stream_manager.rs @@ -239,13 +239,13 @@ impl StreamManager { } /// - /// Create a Writer for a given Stream. + /// Create a Reader for a given Stream. /// /// ``` /// import pravega_client; /// manager=pravega_client.StreamManager("127.0.0.1:9090") - /// // Create a writer against an already created Pravega scope and Stream. - /// writer=manager.create_writer("scope", "stream") + /// // Create a reader against an already created Pravega scope and Stream. + /// reader=manager.create_reader("scope", "stream") /// ``` /// #[text_signature = "($self, scope_name, stream_name)"] diff --git a/bindings/src/stream_reader.rs b/bindings/src/stream_reader.rs index e6575439b..a2ed9ae9b 100644 --- a/bindings/src/stream_reader.rs +++ b/bindings/src/stream_reader.rs @@ -8,9 +8,6 @@ // http://www.apache.org/licenses/LICENSE-2.0 // -use pravega_client_rust::segment_slice::{Event, SegmentSlice}; -use pyo3::PyIterProtocol; -use tokio::sync::Mutex; cfg_if! { if #[cfg(feature = "python_binding")] { use pravega_client_rust::event_reader::EventReader; @@ -21,6 +18,9 @@ cfg_if! { use tokio::runtime::Handle; use log::info; use std::sync::Arc; + use pravega_client_rust::segment_slice::{Event, SegmentSlice}; + use pyo3::PyIterProtocol; + use tokio::sync::Mutex; } } @@ -41,12 +41,11 @@ pub(crate) struct StreamReader { #[pymethods] impl StreamReader { /// - /// Return a Python Future which completes when a segment slice is acquired for consumption. + /// This method returns a Python Future which completes when a segment slice is acquired for consumption. /// A segment slice is data chunk received from a segment of a Pravega stream. It can contain one /// or more events and the user can iterate over the segment slice to read the events. - /// If there are more than one segment in the stream then this API can return a segment slice of any + /// If there are multiple segments in the stream then this API can return a segment slice of any /// segments in the stream. The reader ensures that events returned by the stream are in order. - /// This method returns a Python Future object. /// /// ``` /// import pravega_client; @@ -70,20 +69,24 @@ impl StreamReader { let read = self.reader.clone(); self.handle.spawn(async move { let slice_result = read.lock().await.acquire_segment().await; - if let Some(slice) = slice_result { - info!("Segment slice acquired "); + let slice_py: Slice = match slice_result { + Some(slice) => Slice { + seg_slice: slice, + is_empty: false, + }, + None => Slice { + seg_slice: SegmentSlice::default(), + is_empty: true, + }, + }; + let gil = Python::acquire_gil(); + let py = gil.python(); + let py_container = PyCell::new(py, slice_py).unwrap(); + if let Err(e) = StreamReader::set_fut_result(event_loop, py_future, PyObject::from(py_container)) + { let gil = Python::acquire_gil(); let py = gil.python(); - // let r = Slice { seg_slice: slice }; - let r1 = PyCell::new(py, Slice { seg_slice: slice }).unwrap(); - // let r = PyString::new(py, slice.as_str()); - if let Err(e) = StreamReader::set_fut_result(event_loop, py_future, PyObject::from(r1)) { - let gil = Python::acquire_gil(); - let py = gil.python(); - e.print(py); - } - } else { - info!("No new slices to be acquired"); + e.print(py); } }); @@ -160,6 +163,7 @@ impl EventData { /// pub(crate) struct Slice { seg_slice: SegmentSlice, + is_empty: bool, } #[pyproto] @@ -169,11 +173,16 @@ impl PyIterProtocol for Slice { } fn __next__(mut slf: PyRefMut) -> Option { - let next_event: Option = slf.seg_slice.next(); - next_event.map(|e| EventData { - offset_in_segment: e.offset_in_segment, - value: e.value, - }) + if slf.is_empty { + info!("Empty Slice while reading"); + None + } else { + let next_event: Option = slf.seg_slice.next(); + next_event.map(|e| EventData { + offset_in_segment: e.offset_in_segment, + value: e.value, + }) + } } } diff --git a/src/segment_slice.rs b/src/segment_slice.rs index 5e35aab17..496e265fe 100644 --- a/src/segment_slice.rs +++ b/src/segment_slice.rs @@ -172,6 +172,15 @@ impl Default for SliceMetadata { } } +impl Default for SegmentSlice { + fn default() -> Self { + SegmentSlice { + meta: Default::default(), + slice_return_tx: None, + } + } +} + impl SegmentSlice { /// /// Create a new SegmentSlice for a given start_offset, segment. From 1c47be6b5ef288cf9b6749b7d82c5a2d6f9a6faa Mon Sep 17 00:00:00 2001 From: Sandeep Date: Fri, 9 Oct 2020 18:49:41 +0530 Subject: [PATCH 11/12] fix typo. Signed-off-by: Sandeep --- bindings/src/pravega_reader_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bindings/src/pravega_reader_test.py b/bindings/src/pravega_reader_test.py index a960dcce2..184c0bb93 100644 --- a/bindings/src/pravega_reader_test.py +++ b/bindings/src/pravega_reader_test.py @@ -50,7 +50,7 @@ def test_writeEventAndRead(self): count+=1 print(event.data()) self.assertEqual(b'test event', event.data(), "Invalid event data") - self.assertEqual(count, 2, "Two events is expected") + self.assertEqual(count, 2, "Two events are expected") # wrapper function to ensure we pass a co-routine to run method, since we cannot directly invoke # await reader.get_segment_slice_async() inside the test. From 87bd31b2a084fb23352575c709a67f8c2b78e420 Mon Sep 17 00:00:00 2001 From: Sandeep Date: Sat, 10 Oct 2020 19:44:53 +0530 Subject: [PATCH 12/12] CR Changes: Updated based on review feedback. Signed-off-by: Sandeep --- bindings/src/pravega_reader_test.py | 4 +-- bindings/src/stream_manager.rs | 15 ++++++++-- bindings/src/stream_reader.rs | 32 ++++++++++----------- bindings/src/stream_writer.rs | 6 ++-- bindings/src/stream_writer_transactional.rs | 6 ++-- bindings/src/transaction.rs | 6 ++-- bindings/tox.ini | 2 +- 7 files changed, 40 insertions(+), 31 deletions(-) diff --git a/bindings/src/pravega_reader_test.py b/bindings/src/pravega_reader_test.py index 184c0bb93..f6158a0ce 100644 --- a/bindings/src/pravega_reader_test.py +++ b/bindings/src/pravega_reader_test.py @@ -13,6 +13,7 @@ import string import pravega_client; import asyncio +import random # Helper method to invoke an coroutine inside a test. @@ -22,8 +23,7 @@ def _run(coro): class PravegaReaderTest(unittest.TestCase): def test_writeEventAndRead(self): - scope = ''.join(secrets.choice(string.ascii_lowercase + string.digits) - for i in range(10)) + scope = "testScope"+str(random.randint(0, 100)) print("Creating a Stream Manager, ensure Pravega is running") stream_manager = pravega_client.StreamManager("127.0.0.1:9090") diff --git a/bindings/src/stream_manager.rs b/bindings/src/stream_manager.rs index f02a76f97..63d37ea7f 100644 --- a/bindings/src/stream_manager.rs +++ b/bindings/src/stream_manager.rs @@ -25,9 +25,6 @@ cfg_if! { } } -#[cfg(feature = "python_binding")] -#[pyclass] -#[text_signature = "(controller_uri)"] /// /// Create a StreamManager by providing a controller uri. /// ``` @@ -37,12 +34,24 @@ cfg_if! { /// manager.create_scope("scope") /// ``` /// +#[cfg(feature = "python_binding")] +#[pyclass] +#[text_signature = "(controller_uri)"] pub(crate) struct StreamManager { controller_ip: String, cf: ClientFactory, config: ClientConfig, } +/// +/// Create a StreamManager by providing a controller uri. +/// ``` +/// import pravega_client; +/// manager=pravega_client.StreamManager("127.0.0.1:9090") +/// // this manager can be used to create scopes, streams, writers and readers against Pravega. +/// manager.create_scope("scope") +/// ``` +/// #[cfg(feature = "python_binding")] #[pymethods] impl StreamManager { diff --git a/bindings/src/stream_reader.rs b/bindings/src/stream_reader.rs index a2ed9ae9b..ddd56553a 100644 --- a/bindings/src/stream_reader.rs +++ b/bindings/src/stream_reader.rs @@ -24,13 +24,13 @@ cfg_if! { } } -#[cfg(feature = "python_binding")] -#[pyclass] -#[derive(new)] /// /// This represents a Stream reader for a given Stream. /// Note: A python object of StreamReader cannot be created directly without using the StreamManager. /// +#[cfg(feature = "python_binding")] +#[pyclass] +#[derive(new)] pub(crate) struct StreamReader { reader: Arc>, handle: Handle, @@ -62,9 +62,9 @@ impl StreamReader { let (py_future, py_future_clone, event_loop): (PyObject, PyObject, PyObject) = { let gil = Python::acquire_gil(); let py = gil.python(); - let loop_ = StreamReader::get_loop(py)?; - let fut: PyObject = loop_.call_method0(py, "create_future")?; - (fut.clone_ref(py), fut, loop_) + let loop_event = StreamReader::get_loop(py)?; + let fut: PyObject = loop_event.call_method0(py, "create_future")?; + (fut.clone_ref(py), fut, loop_event) }; let read = self.reader.clone(); self.handle.spawn(async move { @@ -104,7 +104,7 @@ impl StreamReader { // This is used to set the mark the Python future as complete and set its result. // ref: https://docs.python.org/3/library/asyncio-future.html#asyncio.Future.set_result // - fn set_fut_result(loop_: PyObject, fut: PyObject, res: PyObject) -> PyResult<()> { + fn set_fut_result(event_loop: PyObject, fut: PyObject, res: PyObject) -> PyResult<()> { let gil = Python::acquire_gil(); let py = gil.python(); let sr = fut.getattr(py, "set_result")?; @@ -112,7 +112,7 @@ impl StreamReader { // ref :https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.call_soon_threadsafe // call_soon_threadsafe schedules the callback (setting the future to complete) to be called // in the next iteration of the event loop. - loop_.call_method1(py, "call_soon_threadsafe", (sr, res))?; + event_loop.call_method1(py, "call_soon_threadsafe", (sr, res))?; Ok(()) } @@ -124,18 +124,18 @@ impl StreamReader { // fn get_loop(py: Python) -> PyResult { let asyncio = PyModule::import(py, "asyncio")?; - let loop_ = asyncio.call0("get_running_loop")?; - Ok(loop_.into()) + let event_loop = asyncio.call0("get_running_loop")?; + Ok(event_loop.into()) } } -#[cfg(feature = "python_binding")] -#[pyclass] -#[derive(new)] /// /// This represents a Stream reader for a given Stream. /// Note: A python object of StreamReader cannot be created directly without using the StreamManager. /// +#[cfg(feature = "python_binding")] +#[pyclass] +#[derive(new)] pub(crate) struct EventData { offset_in_segment: i64, value: Vec, @@ -154,13 +154,13 @@ impl EventData { } } -#[cfg(feature = "python_binding")] -#[pyclass] -#[derive(new)] /// /// This represents a Stream reader for a given Stream. /// Note: A python object of StreamReader cannot be created directly without using the StreamManager. /// +#[cfg(feature = "python_binding")] +#[pyclass] +#[derive(new)] pub(crate) struct Slice { seg_slice: SegmentSlice, is_empty: bool, diff --git a/bindings/src/stream_writer.rs b/bindings/src/stream_writer.rs index 179ecc73a..828cae643 100644 --- a/bindings/src/stream_writer.rs +++ b/bindings/src/stream_writer.rs @@ -25,13 +25,13 @@ cfg_if! { } } -#[cfg(feature = "python_binding")] -#[pyclass] -#[derive(new)] /// /// This represents a Stream writer for a given Stream. /// Note: A python object of StreamWriter cannot be created directly without using the StreamManager. /// +#[cfg(feature = "python_binding")] +#[pyclass] +#[derive(new)] pub(crate) struct StreamWriter { writer: EventStreamWriter, handle: Handle, diff --git a/bindings/src/stream_writer_transactional.rs b/bindings/src/stream_writer_transactional.rs index b0b55e876..32c1d93b5 100644 --- a/bindings/src/stream_writer_transactional.rs +++ b/bindings/src/stream_writer_transactional.rs @@ -23,13 +23,13 @@ cfg_if! { } } -#[cfg(feature = "python_binding")] -#[pyclass] -#[derive(new)] /// /// This represents a Transaction writer for a given Stream. /// Note: A python object of StreamTxnWriter cannot be created directly without using the StreamManager. /// +#[cfg(feature = "python_binding")] +#[pyclass] +#[derive(new)] pub(crate) struct StreamTxnWriter { writer: TransactionalEventStreamWriter, handle: Handle, diff --git a/bindings/src/transaction.rs b/bindings/src/transaction.rs index 734f68749..0007cb7f7 100644 --- a/bindings/src/transaction.rs +++ b/bindings/src/transaction.rs @@ -28,13 +28,13 @@ cfg_if! { // The amount of time the python api will wait for the underlying write to be completed. const TIMEOUT_IN_SECONDS: u64 = 120; -#[cfg(feature = "python_binding")] -#[pyclass] -#[derive(new)] /// /// This represents a transaction on a given Stream. /// Note: A python object of StreamTransaction cannot be created directly without using the StreamTxnWriter. /// +#[cfg(feature = "python_binding")] +#[pyclass] +#[derive(new)] pub(crate) struct StreamTransaction { txn: Transaction, handle: Handle, diff --git a/bindings/tox.ini b/bindings/tox.ini index ad79c112f..6e7b0860a 100644 --- a/bindings/tox.ini +++ b/bindings/tox.ini @@ -1,5 +1,5 @@ [tox] -envlist = py37 +envlist = py37, py38 minversion = 3.4.0 skip_missing_interpreters = true isolated_build = true