Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(bindings/python): support pickle [de]serialization for Operator #5324

Merged
merged 13 commits into from
Nov 16, 2024
Merged
1 change: 1 addition & 0 deletions bindings/python/src/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,7 @@ impl AsyncFile {
#[pymethods]
impl AsyncFile {
/// Read and return at most size bytes, or if size is not given, until EOF.
#[pyo3(signature = (size=None))]
pub fn read<'p>(&'p self, py: Python<'p>, size: Option<usize>) -> PyResult<Bound<PyAny>> {
let state = self.0.clone();

Expand Down
132 changes: 93 additions & 39 deletions bindings/python/src/operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use std::time::Duration;
use pyo3::prelude::*;
use pyo3::types::PyBytes;
use pyo3::types::PyDict;
use pyo3::types::PyTuple;
use pyo3_async_runtimes::tokio::future_into_py;

use crate::*;
Expand All @@ -45,7 +46,11 @@ fn build_operator(
///
/// Create a new blocking `Operator` with the given `scheme` and options(`**kwargs`).
#[pyclass(module = "opendal")]
pub struct Operator(ocore::BlockingOperator);
pub struct Operator {
core: ocore::BlockingOperator,
__scheme: ocore::Scheme,
__map: HashMap<String, String>,
}

#[pymethods]
impl Operator {
Expand All @@ -65,18 +70,26 @@ impl Operator {
})
.unwrap_or_default();

Ok(Operator(build_operator(scheme, map)?.blocking()))
Ok(Operator {
core: build_operator(scheme.clone(), map.clone())?.blocking(),
__scheme: scheme,
__map: map,
})
}

/// Add new layers upon existing operator
pub fn layer(&self, layer: &layers::Layer) -> PyResult<Self> {
let op = layer.0.layer(self.0.clone().into());
Ok(Self(op.blocking()))
let op = layer.0.layer(self.core.clone().into());
Ok(Self {
core: op.blocking(),
__scheme: self.__scheme.clone(),
__map: self.__map.clone(),
})
}

/// Open a file-like reader for the given path.
pub fn open(&self, path: String, mode: String) -> PyResult<File> {
let this = self.0.clone();
let this = self.core.clone();
if mode == "rb" {
let r = this
.reader(&path)
Expand All @@ -96,15 +109,15 @@ impl Operator {

/// Read the whole path into bytes.
pub fn read<'p>(&'p self, py: Python<'p>, path: &str) -> PyResult<Bound<PyAny>> {
let buffer = self.0.read(path).map_err(format_pyerr)?.to_vec();
let buffer = self.core.read(path).map_err(format_pyerr)?.to_vec();
Buffer::new(buffer).into_bytes_ref(py)
}

/// Write bytes into given path.
#[pyo3(signature = (path, bs, **kwargs))]
pub fn write(&self, path: &str, bs: Vec<u8>, kwargs: Option<WriteOptions>) -> PyResult<()> {
let kwargs = kwargs.unwrap_or_default();
let mut write = self.0.write_with(path, bs).append(kwargs.append);
let mut write = self.core.write_with(path, bs).append(kwargs.append);
if let Some(chunk) = kwargs.chunk {
write = write.chunk(chunk);
}
Expand All @@ -123,22 +136,25 @@ impl Operator {

/// Get current path's metadata **without cache** directly.
pub fn stat(&self, path: &str) -> PyResult<Metadata> {
self.0.stat(path).map_err(format_pyerr).map(Metadata::new)
self.core
.stat(path)
.map_err(format_pyerr)
.map(Metadata::new)
}

/// Copy source to target.
pub fn copy(&self, source: &str, target: &str) -> PyResult<()> {
self.0.copy(source, target).map_err(format_pyerr)
self.core.copy(source, target).map_err(format_pyerr)
}

/// Rename filename.
pub fn rename(&self, source: &str, target: &str) -> PyResult<()> {
self.0.rename(source, target).map_err(format_pyerr)
self.core.rename(source, target).map_err(format_pyerr)
}

/// Remove all file
pub fn remove_all(&self, path: &str) -> PyResult<()> {
self.0.remove_all(path).map_err(format_pyerr)
self.core.remove_all(path).map_err(format_pyerr)
}

/// Create a dir at given path.
Expand All @@ -154,7 +170,7 @@ impl Operator {
/// - Create on existing dir will succeed.
/// - Create dir is always recursive, works like `mkdir -p`
pub fn create_dir(&self, path: &str) -> PyResult<()> {
self.0.create_dir(path).map_err(format_pyerr)
self.core.create_dir(path).map_err(format_pyerr)
}

/// Delete given path.
Expand All @@ -163,19 +179,19 @@ impl Operator {
///
/// - Delete not existing error won't return errors.
pub fn delete(&self, path: &str) -> PyResult<()> {
self.0.delete(path).map_err(format_pyerr)
self.core.delete(path).map_err(format_pyerr)
}

/// List current dir path.
pub fn list(&self, path: &str) -> PyResult<BlockingLister> {
let l = self.0.lister(path).map_err(format_pyerr)?;
let l = self.core.lister(path).map_err(format_pyerr)?;
Ok(BlockingLister::new(l))
}

/// List dir in flat way.
pub fn scan(&self, path: &str) -> PyResult<BlockingLister> {
let l = self
.0
.core
.lister_with(path)
.recursive(true)
.call()
Expand All @@ -184,15 +200,21 @@ impl Operator {
}

pub fn capability(&self) -> PyResult<capability::Capability> {
Ok(capability::Capability::new(self.0.info().full_capability()))
Ok(capability::Capability::new(
self.core.info().full_capability(),
))
}

pub fn to_async_operator(&self) -> PyResult<AsyncOperator> {
Ok(AsyncOperator(self.0.clone().into()))
Ok(AsyncOperator {
core: self.core.clone().into(),
__scheme: self.__scheme.clone(),
__map: self.__map.clone(),
})
}

fn __repr__(&self) -> String {
Copy link
Member

@Zheaoli Zheaoli Nov 15, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need add extra __hash__ method to represent two object are equal.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's hard to implement, we can't get internal information of Operator.

Copy link
Member

@Zheaoli Zheaoli Nov 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about hash the scheme and the map?

repr is used for show what the object is looking at. Maybe not a good method for identify

Copy link
Contributor Author

@TennyZhuang TennyZhuang Nov 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I prefer don’t implement hash and eq for Operator, and remove the equal assertion based on repr.

(schema, map) can’t identify an Operator, leave eq and empty empty is better. Only the ID of PyObject can identify the Operator.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I prefer don’t implement hash and eq for Operator, and remove the equal assertion based on repr.

Agreed. ocore::Operator doesn't implement Eq ahd Hash too.

let info = self.0.info();
let info = self.core.info();
let name = info.name();
if name.is_empty() {
format!("Operator(\"{}\", root=\"{}\")", info.scheme(), info.root())
Expand All @@ -204,13 +226,24 @@ impl Operator {
)
}
}

fn __getnewargs_ex__(&self, py: Python) -> PyResult<PyObject> {
let args = vec![self.__scheme.to_string().to_object(py)];
let args = PyTuple::new_bound(py, args);
let kwargs = self.__map.clone().into_py(py);
Ok(PyTuple::new_bound(py, [args.to_object(py), kwargs.to_object(py)]).to_object(py))
}
}

/// `AsyncOperator` is the entry for all public async APIs
///
/// Create a new `AsyncOperator` with the given `scheme` and options(`**kwargs`).
#[pyclass(module = "opendal")]
pub struct AsyncOperator(ocore::Operator);
pub struct AsyncOperator {
core: ocore::Operator,
__scheme: ocore::Scheme,
__map: HashMap<String, String>,
}

#[pymethods]
impl AsyncOperator {
Expand All @@ -230,13 +263,21 @@ impl AsyncOperator {
})
.unwrap_or_default();

Ok(AsyncOperator(build_operator(scheme, map)?))
Ok(AsyncOperator {
core: build_operator(scheme.clone(), map.clone())?.into(),
__scheme: scheme,
__map: map,
})
}

/// Add new layers upon existing operator
pub fn layer(&self, layer: &layers::Layer) -> PyResult<Self> {
let op = layer.0.layer(self.0.clone());
Ok(Self(op))
let op = layer.0.layer(self.core.clone());
Ok(Self {
core: op,
__scheme: self.__scheme.clone(),
__map: self.__map.clone(),
})
}

/// Open a file-like reader for the given path.
Expand All @@ -246,7 +287,7 @@ impl AsyncOperator {
path: String,
mode: String,
) -> PyResult<Bound<PyAny>> {
let this = self.0.clone();
let this = self.core.clone();

future_into_py(py, async move {
if mode == "rb" {
Expand All @@ -271,7 +312,7 @@ impl AsyncOperator {

/// Read the whole path into bytes.
pub fn read<'p>(&'p self, py: Python<'p>, path: String) -> PyResult<Bound<PyAny>> {
let this = self.0.clone();
let this = self.core.clone();
future_into_py(py, async move {
let res: Vec<u8> = this.read(&path).await.map_err(format_pyerr)?.to_vec();
Python::with_gil(|py| Buffer::new(res).into_bytes(py))
Expand All @@ -288,7 +329,7 @@ impl AsyncOperator {
kwargs: Option<WriteOptions>,
) -> PyResult<Bound<PyAny>> {
let kwargs = kwargs.unwrap_or_default();
let this = self.0.clone();
let this = self.core.clone();
let bs = bs.as_bytes().to_vec();
future_into_py(py, async move {
let mut write = this.write_with(&path, bs).append(kwargs.append);
Expand All @@ -310,7 +351,7 @@ impl AsyncOperator {

/// Get current path's metadata **without cache** directly.
pub fn stat<'p>(&'p self, py: Python<'p>, path: String) -> PyResult<Bound<PyAny>> {
let this = self.0.clone();
let this = self.core.clone();
future_into_py(py, async move {
let res: Metadata = this
.stat(&path)
Expand All @@ -329,7 +370,7 @@ impl AsyncOperator {
source: String,
target: String,
) -> PyResult<Bound<PyAny>> {
let this = self.0.clone();
let this = self.core.clone();
future_into_py(py, async move {
this.copy(&source, &target).await.map_err(format_pyerr)
})
Expand All @@ -342,15 +383,15 @@ impl AsyncOperator {
source: String,
target: String,
) -> PyResult<Bound<PyAny>> {
let this = self.0.clone();
let this = self.core.clone();
future_into_py(py, async move {
this.rename(&source, &target).await.map_err(format_pyerr)
})
}

/// Remove all file
pub fn remove_all<'p>(&'p self, py: Python<'p>, path: String) -> PyResult<Bound<PyAny>> {
let this = self.0.clone();
let this = self.core.clone();
future_into_py(py, async move {
this.remove_all(&path).await.map_err(format_pyerr)
})
Expand All @@ -369,7 +410,7 @@ impl AsyncOperator {
/// - Create on existing dir will succeed.
/// - Create dir is always recursive, works like `mkdir -p`
pub fn create_dir<'p>(&'p self, py: Python<'p>, path: String) -> PyResult<Bound<PyAny>> {
let this = self.0.clone();
let this = self.core.clone();
future_into_py(py, async move {
this.create_dir(&path).await.map_err(format_pyerr)
})
Expand All @@ -381,7 +422,7 @@ impl AsyncOperator {
///
/// - Delete not existing error won't return errors.
pub fn delete<'p>(&'p self, py: Python<'p>, path: String) -> PyResult<Bound<PyAny>> {
let this = self.0.clone();
let this = self.core.clone();
future_into_py(
py,
async move { this.delete(&path).await.map_err(format_pyerr) },
Expand All @@ -390,7 +431,7 @@ impl AsyncOperator {

/// List current dir path.
pub fn list<'p>(&'p self, py: Python<'p>, path: String) -> PyResult<Bound<PyAny>> {
let this = self.0.clone();
let this = self.core.clone();
future_into_py(py, async move {
let lister = this.lister(&path).await.map_err(format_pyerr)?;
let pylister: PyObject = Python::with_gil(|py| AsyncLister::new(lister).into_py(py));
Expand All @@ -400,7 +441,7 @@ impl AsyncOperator {

/// List dir in flat way.
pub fn scan<'p>(&'p self, py: Python<'p>, path: String) -> PyResult<Bound<PyAny>> {
let this = self.0.clone();
let this = self.core.clone();
future_into_py(py, async move {
let lister = this
.lister_with(&path)
Expand All @@ -419,7 +460,7 @@ impl AsyncOperator {
path: String,
expire_second: u64,
) -> PyResult<Bound<PyAny>> {
let this = self.0.clone();
let this = self.core.clone();
future_into_py(py, async move {
let res = this
.presign_stat(&path, Duration::from_secs(expire_second))
Expand All @@ -438,7 +479,7 @@ impl AsyncOperator {
path: String,
expire_second: u64,
) -> PyResult<Bound<PyAny>> {
let this = self.0.clone();
let this = self.core.clone();
future_into_py(py, async move {
let res = this
.presign_read(&path, Duration::from_secs(expire_second))
Expand All @@ -457,7 +498,7 @@ impl AsyncOperator {
path: String,
expire_second: u64,
) -> PyResult<Bound<PyAny>> {
let this = self.0.clone();
let this = self.core.clone();
future_into_py(py, async move {
let res = this
.presign_write(&path, Duration::from_secs(expire_second))
Expand All @@ -470,15 +511,21 @@ impl AsyncOperator {
}

pub fn capability(&self) -> PyResult<capability::Capability> {
Ok(capability::Capability::new(self.0.info().full_capability()))
Ok(capability::Capability::new(
self.core.info().full_capability(),
))
}

pub fn to_operator(&self) -> PyResult<Operator> {
Ok(Operator(self.0.clone().blocking()))
Ok(Operator {
core: self.core.clone().blocking(),
__scheme: self.__scheme.clone(),
__map: self.__map.clone(),
})
}

fn __repr__(&self) -> String {
let info = self.0.info();
let info = self.core.info();
let name = info.name();
if name.is_empty() {
format!(
Expand All @@ -494,6 +541,13 @@ impl AsyncOperator {
)
}
}

fn __getnewargs_ex__(&self, py: Python) -> PyResult<PyObject> {
let args = vec![self.__scheme.to_string().to_object(py)];
let args = PyTuple::new_bound(py, args);
let kwargs = self.__map.clone().into_py(py);
Ok(PyTuple::new_bound(py, [args.to_object(py), kwargs.to_object(py)]).to_object(py))
}
}

#[pyclass(module = "opendal")]
Expand Down
Loading
Loading