diff --git a/bindings/python/src/file.rs b/bindings/python/src/file.rs
index b23c6d2903c..374db32ded0 100644
--- a/bindings/python/src/file.rs
+++ b/bindings/python/src/file.rs
@@ -350,6 +350,7 @@ impl AsyncFile {
 #[pymethods]
 impl AsyncFile {
     /// Read and return at most size bytes, or if size is not given, until EOF.
+    #[pyo3(signature = (size=None))]
     pub fn read<'p>(&'p self, py: Python<'p>, size: Option<usize>) -> PyResult<Bound<'p, PyAny>> {
         let state = self.0.clone();
diff --git a/bindings/python/src/operator.rs b/bindings/python/src/operator.rs
index 0e6c51ce107..c412cdc919c 100644
--- a/bindings/python/src/operator.rs
+++ b/bindings/python/src/operator.rs
@@ -22,6 +22,7 @@ use std::time::Duration;
 use pyo3::prelude::*;
 use pyo3::types::PyBytes;
 use pyo3::types::PyDict;
+use pyo3::types::PyTuple;
 use pyo3_async_runtimes::tokio::future_into_py;
 
 use crate::*;
@@ -45,7 +46,11 @@ fn build_operator(
 ///
 /// Create a new blocking `Operator` with the given `scheme` and options(`**kwargs`).
 #[pyclass(module = "opendal")]
-pub struct Operator(ocore::BlockingOperator);
+pub struct Operator {
+    core: ocore::BlockingOperator,
+    __scheme: ocore::Scheme,
+    __map: HashMap<String, String>,
+}
 
 #[pymethods]
 impl Operator {
@@ -65,18 +70,26 @@ impl Operator {
         })
         .unwrap_or_default();
 
-        Ok(Operator(build_operator(scheme, map)?.blocking()))
+        Ok(Operator {
+            core: build_operator(scheme.clone(), map.clone())?.blocking(),
+            __scheme: scheme,
+            __map: map,
+        })
     }
 
     /// Add new layers upon existing operator
     pub fn layer(&self, layer: &layers::Layer) -> PyResult<Self> {
-        let op = layer.0.layer(self.0.clone().into());
-        Ok(Self(op.blocking()))
+        let op = layer.0.layer(self.core.clone().into());
+        Ok(Self {
+            core: op.blocking(),
+            __scheme: self.__scheme.clone(),
+            __map: self.__map.clone(),
+        })
     }
 
     /// Open a file-like reader for the given path.
     pub fn open(&self, path: String, mode: String) -> PyResult<File> {
-        let this = self.0.clone();
+        let this = self.core.clone();
         if mode == "rb" {
             let r = this
                 .reader(&path)
@@ -96,7 +109,7 @@ impl Operator {
 
     /// Read the whole path into bytes.
     pub fn read<'p>(&'p self, py: Python<'p>, path: &str) -> PyResult<Bound<'p, PyAny>> {
-        let buffer = self.0.read(path).map_err(format_pyerr)?.to_vec();
+        let buffer = self.core.read(path).map_err(format_pyerr)?.to_vec();
         Buffer::new(buffer).into_bytes_ref(py)
     }
@@ -104,7 +117,7 @@ impl Operator {
     /// Write bytes into given path.
     #[pyo3(signature = (path, bs, **kwargs))]
     pub fn write(&self, path: &str, bs: Vec<u8>, kwargs: Option<WriteOptions>) -> PyResult<()> {
         let kwargs = kwargs.unwrap_or_default();
-        let mut write = self.0.write_with(path, bs).append(kwargs.append);
+        let mut write = self.core.write_with(path, bs).append(kwargs.append);
         if let Some(chunk) = kwargs.chunk {
             write = write.chunk(chunk);
         }
@@ -123,22 +136,25 @@ impl Operator {
 
     /// Get current path's metadata **without cache** directly.
     pub fn stat(&self, path: &str) -> PyResult<Metadata> {
-        self.0.stat(path).map_err(format_pyerr).map(Metadata::new)
+        self.core
+            .stat(path)
+            .map_err(format_pyerr)
+            .map(Metadata::new)
     }
 
     /// Copy source to target.
     pub fn copy(&self, source: &str, target: &str) -> PyResult<()> {
-        self.0.copy(source, target).map_err(format_pyerr)
+        self.core.copy(source, target).map_err(format_pyerr)
     }
 
     /// Rename filename.
     pub fn rename(&self, source: &str, target: &str) -> PyResult<()> {
-        self.0.rename(source, target).map_err(format_pyerr)
+        self.core.rename(source, target).map_err(format_pyerr)
     }
 
     /// Remove all file
     pub fn remove_all(&self, path: &str) -> PyResult<()> {
-        self.0.remove_all(path).map_err(format_pyerr)
+        self.core.remove_all(path).map_err(format_pyerr)
     }
 
     /// Create a dir at given path.
@@ -154,7 +170,7 @@ impl Operator {
     /// - Create on existing dir will succeed.
     /// - Create dir is always recursive, works like `mkdir -p`
     pub fn create_dir(&self, path: &str) -> PyResult<()> {
-        self.0.create_dir(path).map_err(format_pyerr)
+        self.core.create_dir(path).map_err(format_pyerr)
     }
 
     /// Delete given path.
@@ -163,19 +179,19 @@ impl Operator {
     ///
     /// - Delete not existing error won't return errors.
     pub fn delete(&self, path: &str) -> PyResult<()> {
-        self.0.delete(path).map_err(format_pyerr)
+        self.core.delete(path).map_err(format_pyerr)
     }
 
     /// List current dir path.
     pub fn list(&self, path: &str) -> PyResult<BlockingLister> {
-        let l = self.0.lister(path).map_err(format_pyerr)?;
+        let l = self.core.lister(path).map_err(format_pyerr)?;
         Ok(BlockingLister::new(l))
     }
 
     /// List dir in flat way.
     pub fn scan(&self, path: &str) -> PyResult<BlockingLister> {
         let l = self
-            .0
+            .core
             .lister_with(path)
             .recursive(true)
             .call()
             .map_err(format_pyerr)?;
@@ -184,15 +200,21 @@ impl Operator {
     }
 
     pub fn capability(&self) -> PyResult<capability::Capability> {
-        Ok(capability::Capability::new(self.0.info().full_capability()))
+        Ok(capability::Capability::new(
+            self.core.info().full_capability(),
+        ))
     }
 
     pub fn to_async_operator(&self) -> PyResult<AsyncOperator> {
-        Ok(AsyncOperator(self.0.clone().into()))
+        Ok(AsyncOperator {
+            core: self.core.clone().into(),
+            __scheme: self.__scheme.clone(),
+            __map: self.__map.clone(),
+        })
     }
 
     fn __repr__(&self) -> String {
-        let info = self.0.info();
+        let info = self.core.info();
         let name = info.name();
         if name.is_empty() {
             format!("Operator(\"{}\", root=\"{}\")", info.scheme(), info.root())
@@ -204,13 +226,24 @@ impl Operator {
             )
         }
     }
+
+    fn __getnewargs_ex__(&self, py: Python) -> PyResult<PyObject> {
+        let args = vec![self.__scheme.to_string().to_object(py)];
+        let args = PyTuple::new_bound(py, args);
+        let kwargs = self.__map.clone().into_py(py);
+        Ok(PyTuple::new_bound(py, [args.to_object(py), kwargs.to_object(py)]).to_object(py))
+    }
 }
 
 /// `AsyncOperator` is the entry for all public async APIs
 ///
 /// Create a new `AsyncOperator` with the given `scheme` and options(`**kwargs`).
 #[pyclass(module = "opendal")]
-pub struct AsyncOperator(ocore::Operator);
+pub struct AsyncOperator {
+    core: ocore::Operator,
+    __scheme: ocore::Scheme,
+    __map: HashMap<String, String>,
+}
 
 #[pymethods]
 impl AsyncOperator {
@@ -230,13 +263,21 @@ impl AsyncOperator {
         })
         .unwrap_or_default();
 
-        Ok(AsyncOperator(build_operator(scheme, map)?))
+        Ok(AsyncOperator {
+            core: build_operator(scheme.clone(), map.clone())?.into(),
+            __scheme: scheme,
+            __map: map,
+        })
     }
 
     /// Add new layers upon existing operator
     pub fn layer(&self, layer: &layers::Layer) -> PyResult<Self> {
-        let op = layer.0.layer(self.0.clone());
-        Ok(Self(op))
+        let op = layer.0.layer(self.core.clone());
+        Ok(Self {
+            core: op,
+            __scheme: self.__scheme.clone(),
+            __map: self.__map.clone(),
+        })
     }
 
     /// Open a file-like reader for the given path.
@@ -246,7 +287,7 @@ impl AsyncOperator {
         path: String,
         mode: String,
     ) -> PyResult<Bound<'p, PyAny>> {
-        let this = self.0.clone();
+        let this = self.core.clone();
 
         future_into_py(py, async move {
             if mode == "rb" {
@@ -271,7 +312,7 @@ impl AsyncOperator {
 
     /// Read the whole path into bytes.
     pub fn read<'p>(&'p self, py: Python<'p>, path: String) -> PyResult<Bound<'p, PyAny>> {
-        let this = self.0.clone();
+        let this = self.core.clone();
         future_into_py(py, async move {
             let res: Vec<u8> = this.read(&path).await.map_err(format_pyerr)?.to_vec();
             Python::with_gil(|py| Buffer::new(res).into_bytes(py))
@@ -288,7 +329,7 @@ impl AsyncOperator {
         kwargs: Option<WriteOptions>,
     ) -> PyResult<Bound<'p, PyAny>> {
         let kwargs = kwargs.unwrap_or_default();
-        let this = self.0.clone();
+        let this = self.core.clone();
         let bs = bs.as_bytes().to_vec();
         future_into_py(py, async move {
             let mut write = this.write_with(&path, bs).append(kwargs.append);
@@ -310,7 +351,7 @@ impl AsyncOperator {
 
     /// Get current path's metadata **without cache** directly.
     pub fn stat<'p>(&'p self, py: Python<'p>, path: String) -> PyResult<Bound<'p, PyAny>> {
-        let this = self.0.clone();
+        let this = self.core.clone();
         future_into_py(py, async move {
             let res: Metadata = this
                 .stat(&path)
@@ -329,7 +370,7 @@ impl AsyncOperator {
         source: String,
         target: String,
     ) -> PyResult<Bound<'p, PyAny>> {
-        let this = self.0.clone();
+        let this = self.core.clone();
         future_into_py(py, async move {
             this.copy(&source, &target).await.map_err(format_pyerr)
         })
@@ -342,7 +383,7 @@ impl AsyncOperator {
         source: String,
         target: String,
     ) -> PyResult<Bound<'p, PyAny>> {
-        let this = self.0.clone();
+        let this = self.core.clone();
         future_into_py(py, async move {
             this.rename(&source, &target).await.map_err(format_pyerr)
         })
@@ -350,7 +391,7 @@ impl AsyncOperator {
 
     /// Remove all file
     pub fn remove_all<'p>(&'p self, py: Python<'p>, path: String) -> PyResult<Bound<'p, PyAny>> {
-        let this = self.0.clone();
+        let this = self.core.clone();
         future_into_py(py, async move {
             this.remove_all(&path).await.map_err(format_pyerr)
         })
@@ -369,7 +410,7 @@ impl AsyncOperator {
     /// - Create on existing dir will succeed.
     /// - Create dir is always recursive, works like `mkdir -p`
     pub fn create_dir<'p>(&'p self, py: Python<'p>, path: String) -> PyResult<Bound<'p, PyAny>> {
-        let this = self.0.clone();
+        let this = self.core.clone();
         future_into_py(py, async move {
             this.create_dir(&path).await.map_err(format_pyerr)
         })
@@ -381,7 +422,7 @@ impl AsyncOperator {
     ///
     /// - Delete not existing error won't return errors.
     pub fn delete<'p>(&'p self, py: Python<'p>, path: String) -> PyResult<Bound<'p, PyAny>> {
-        let this = self.0.clone();
+        let this = self.core.clone();
         future_into_py(
             py,
             async move { this.delete(&path).await.map_err(format_pyerr) },
         )
@@ -390,7 +431,7 @@ impl AsyncOperator {
 
     /// List current dir path.
     pub fn list<'p>(&'p self, py: Python<'p>, path: String) -> PyResult<Bound<'p, PyAny>> {
-        let this = self.0.clone();
+        let this = self.core.clone();
         future_into_py(py, async move {
             let lister = this.lister(&path).await.map_err(format_pyerr)?;
             let pylister: PyObject = Python::with_gil(|py| AsyncLister::new(lister).into_py(py));
@@ -400,7 +441,7 @@ impl AsyncOperator {
 
     /// List dir in flat way.
     pub fn scan<'p>(&'p self, py: Python<'p>, path: String) -> PyResult<Bound<'p, PyAny>> {
-        let this = self.0.clone();
+        let this = self.core.clone();
         future_into_py(py, async move {
             let lister = this
                 .lister_with(&path)
@@ -419,7 +460,7 @@ impl AsyncOperator {
         path: String,
         expire_second: u64,
     ) -> PyResult<Bound<'p, PyAny>> {
-        let this = self.0.clone();
+        let this = self.core.clone();
         future_into_py(py, async move {
             let res = this
                 .presign_stat(&path, Duration::from_secs(expire_second))
@@ -438,7 +479,7 @@ impl AsyncOperator {
         path: String,
         expire_second: u64,
     ) -> PyResult<Bound<'p, PyAny>> {
-        let this = self.0.clone();
+        let this = self.core.clone();
         future_into_py(py, async move {
             let res = this
                 .presign_read(&path, Duration::from_secs(expire_second))
@@ -457,7 +498,7 @@ impl AsyncOperator {
         path: String,
         expire_second: u64,
     ) -> PyResult<Bound<'p, PyAny>> {
-        let this = self.0.clone();
+        let this = self.core.clone();
         future_into_py(py, async move {
             let res = this
                 .presign_write(&path, Duration::from_secs(expire_second))
@@ -470,15 +511,21 @@ impl AsyncOperator {
     }
 
     pub fn capability(&self) -> PyResult<capability::Capability> {
-        Ok(capability::Capability::new(self.0.info().full_capability()))
+        Ok(capability::Capability::new(
+            self.core.info().full_capability(),
+        ))
     }
 
     pub fn to_operator(&self) -> PyResult<Operator> {
-        Ok(Operator(self.0.clone().blocking()))
+        Ok(Operator {
+            core: self.core.clone().blocking(),
+            __scheme: self.__scheme.clone(),
+            __map: self.__map.clone(),
+        })
     }
 
     fn __repr__(&self) -> String {
-        let info = self.0.info();
+        let info = self.core.info();
         let name = info.name();
         if name.is_empty() {
             format!(
@@ -494,6 +541,13 @@ impl AsyncOperator {
             )
         }
     }
+
+    fn __getnewargs_ex__(&self, py: Python) -> PyResult<PyObject> {
+        let args = vec![self.__scheme.to_string().to_object(py)];
+        let args = PyTuple::new_bound(py, args);
+        let kwargs = self.__map.clone().into_py(py);
+        Ok(PyTuple::new_bound(py, [args.to_object(py), kwargs.to_object(py)]).to_object(py))
+    }
 }
 
 #[pyclass(module = "opendal")]
diff --git a/bindings/python/tests/test_async_pickle_types.py b/bindings/python/tests/test_async_pickle_types.py
new file mode 100644
index 00000000000..c163be3c28f
--- /dev/null
+++ b/bindings/python/tests/test_async_pickle_types.py
@@ -0,0 +1,42 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pytest
+import pickle
+from random import randint
+from uuid import uuid4
+import os
+
+
+@pytest.mark.asyncio
+@pytest.mark.need_capability("read", "write", "delete", "shared")
+async def test_operator_pickle(service_name, operator, async_operator):
+    """
+    Test AsyncOperator's pickle serialization and deserialization.
+ """ + + size = randint(1, 1024) + filename = f"random_file_{str(uuid4())}" + content = os.urandom(size) + await async_operator.write(filename, content) + + serialized = pickle.dumps(async_operator) + + deserialized = pickle.loads(serialized) + assert await deserialized.read(filename) == content + + await async_operator.delete(filename) diff --git a/bindings/python/tests/test_pickle.py b/bindings/python/tests/test_pickle_rw.py similarity index 100% rename from bindings/python/tests/test_pickle.py rename to bindings/python/tests/test_pickle_rw.py diff --git a/bindings/python/tests/test_sync_pickle_types.py b/bindings/python/tests/test_sync_pickle_types.py new file mode 100644 index 00000000000..337ebfa6bdd --- /dev/null +++ b/bindings/python/tests/test_sync_pickle_types.py @@ -0,0 +1,41 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import pytest +import pickle +from random import randint +from uuid import uuid4 +import os + + +@pytest.mark.need_capability("read", "write", "delete", "shared") +def test_operator_pickle(service_name, operator, async_operator): + """ + Test Operator's pickle serialization and deserialization. + """ + + size = randint(1, 1024) + filename = f"random_file_{str(uuid4())}" + content = os.urandom(size) + operator.write(filename, content) + + serialized = pickle.dumps(operator) + + deserialized = pickle.loads(serialized) + assert deserialized.read(filename) == content + + operator.delete(filename)