Skip to content

Commit

Permalink
Use rayon global thread pool
Browse files Browse the repository at this point in the history
  • Loading branch information
deedy5 committed Dec 6, 2024
1 parent f615a61 commit 938ec0d
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 28 deletions.
46 changes: 46 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ html2text = "0.13"
bytes = "1"
pythonize = "0.23"
serde_json = "1"
rayon = "1"

[profile.release]
codegen-units = 1
Expand Down
67 changes: 39 additions & 28 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::fs;
use std::str::FromStr;
use std::sync::{Arc, LazyLock};
use std::sync::{mpsc, Arc, LazyLock};
use std::time::Duration;

use ahash::RandomState;
Expand All @@ -11,11 +11,12 @@ use pyo3::exceptions::PyValueError;
use pyo3::prelude::*;
use pyo3::types::PyBytes;
use pythonize::depythonize;
use rayon::{ThreadPool, ThreadPoolBuilder};
use rquest::boring::x509::{store::X509StoreBuilder, X509};
use rquest::header::{HeaderMap, HeaderName, HeaderValue, COOKIE};
use rquest::tls::Impersonate;
use rquest::multipart;
use rquest::redirect::Policy;
use rquest::tls::Impersonate;
use rquest::Method;
use serde_json::Value;
use tokio::runtime::{self, Runtime};
Expand All @@ -25,6 +26,9 @@ use response::Response;

mod utils;

// Rayon global thread pool
static CPU_POOL: LazyLock<ThreadPool> = LazyLock::new(|| ThreadPoolBuilder::new().build().unwrap());

// Tokio global one-thread runtime
static RUNTIME: LazyLock<Runtime> = LazyLock::new(|| {
runtime::Builder::new_current_thread()
Expand Down Expand Up @@ -95,8 +99,8 @@ impl Client {
/// )
/// ```
#[new]
#[pyo3(signature = (auth=None, auth_bearer=None, params=None, headers=None, cookies=None,
cookie_store=None, referer=None, proxy=None, timeout=None, impersonate=None, follow_redirects=None,
#[pyo3(signature = (auth=None, auth_bearer=None, params=None, headers=None, cookies=None,
cookie_store=None, referer=None, proxy=None, timeout=None, impersonate=None, follow_redirects=None,
max_redirects=None, verify=None, ca_cert_file=None, http1=None, http2=None))]
fn new(
auth: Option<(String, Option<String>)>,
Expand Down Expand Up @@ -238,7 +242,7 @@ impl Client {
/// # Errors
///
/// * `PyException` - If there is an error making the request.
#[pyo3(signature = (method, url, params=None, headers=None, cookies=None, content=None,
#[pyo3(signature = (method, url, params=None, headers=None, cookies=None, content=None,
data=None, json=None, files=None, auth=None, auth_bearer=None, timeout=None))]
fn request(
&self,
Expand Down Expand Up @@ -371,18 +375,25 @@ impl Client {
Ok((buf, cookies, headers, status_code, url))
};

// Execute an async future, releasing the Python GIL for concurrency.
// Use Tokio global runtime to block on the future.
let result: Result<
(
Bytes,
IndexMap<String, String, RandomState>,
IndexMap<String, String, RandomState>,
u16,
String,
),
Error,
> = py.allow_threads(|| RUNTIME.block_on(future));
// Execute an async future in Python, releasing the GIL for concurrency.
// Uses Rayon's global thread pool and Tokio global runtime to block on the future.
let (tx, rx) = mpsc::sync_channel(1);
py.allow_threads(|| {
CPU_POOL.install(|| {
let result: Result<
(
Bytes,
IndexMap<String, String, RandomState>,
IndexMap<String, String, RandomState>,
u16,
String,
),
Error,
> = RUNTIME.block_on(future);
_ = tx.send(result);
});
});
let result = rx.recv()?;
let (f_buf, f_cookies, f_headers, f_status_code, f_url) = result?;

Ok(Response {
Expand Down Expand Up @@ -511,7 +522,7 @@ impl Client {
)
}

#[pyo3(signature = (url, params=None, headers=None, cookies=None, content=None, data=None,
#[pyo3(signature = (url, params=None, headers=None, cookies=None, content=None, data=None,
json=None, files=None, auth=None, auth_bearer=None, timeout=None))]
fn post(
&self,
Expand Down Expand Up @@ -545,7 +556,7 @@ impl Client {
)
}

#[pyo3(signature = (url, params=None, headers=None, cookies=None, content=None, data=None,
#[pyo3(signature = (url, params=None, headers=None, cookies=None, content=None, data=None,
json=None, files=None, auth=None, auth_bearer=None, timeout=None))]
fn put(
&self,
Expand Down Expand Up @@ -579,7 +590,7 @@ impl Client {
)
}

#[pyo3(signature = (url, params=None, headers=None, cookies=None, content=None, data=None,
#[pyo3(signature = (url, params=None, headers=None, cookies=None, content=None, data=None,
json=None, files=None, auth=None, auth_bearer=None, timeout=None))]
fn patch(
&self,
Expand Down Expand Up @@ -616,7 +627,7 @@ impl Client {

/// Convenience functions that use a default Client instance under the hood
#[pyfunction]
#[pyo3(signature = (method, url, params=None, headers=None, cookies=None, content=None, data=None,
#[pyo3(signature = (method, url, params=None, headers=None, cookies=None, content=None, data=None,
json=None, files=None, auth=None, auth_bearer=None, timeout=None, impersonate=None, verify=None,
ca_cert_file=None))]
fn request(
Expand Down Expand Up @@ -673,7 +684,7 @@ fn request(
}

#[pyfunction]
#[pyo3(signature = (url, params=None, headers=None, cookies=None, auth=None, auth_bearer=None,
#[pyo3(signature = (url, params=None, headers=None, cookies=None, auth=None, auth_bearer=None,
timeout=None, impersonate=None, verify=None, ca_cert_file=None))]
fn get(
py: Python,
Expand Down Expand Up @@ -719,7 +730,7 @@ fn get(
}

#[pyfunction]
#[pyo3(signature = (url, params=None, headers=None, cookies=None, auth=None, auth_bearer=None,
#[pyo3(signature = (url, params=None, headers=None, cookies=None, auth=None, auth_bearer=None,
timeout=None, impersonate=None, verify=None, ca_cert_file=None))]
fn head(
py: Python,
Expand Down Expand Up @@ -765,7 +776,7 @@ fn head(
}

#[pyfunction]
#[pyo3(signature = (url, params=None, headers=None, cookies=None, auth=None, auth_bearer=None,
#[pyo3(signature = (url, params=None, headers=None, cookies=None, auth=None, auth_bearer=None,
timeout=None, impersonate=None, verify=None, ca_cert_file=None))]
fn options(
py: Python,
Expand Down Expand Up @@ -811,7 +822,7 @@ fn options(
}

#[pyfunction]
#[pyo3(signature = (url, params=None, headers=None, cookies=None, auth=None, auth_bearer=None,
#[pyo3(signature = (url, params=None, headers=None, cookies=None, auth=None, auth_bearer=None,
timeout=None, impersonate=None, verify=None, ca_cert_file=None))]
fn delete(
py: Python,
Expand Down Expand Up @@ -857,7 +868,7 @@ fn delete(
}

#[pyfunction]
#[pyo3(signature = (url, params=None, headers=None, cookies=None, content=None, data=None,
#[pyo3(signature = (url, params=None, headers=None, cookies=None, content=None, data=None,
json=None, files=None, auth=None, auth_bearer=None, timeout=None, impersonate=None, verify=None,
ca_cert_file=None))]
fn post(
Expand Down Expand Up @@ -912,7 +923,7 @@ fn post(
}

#[pyfunction]
#[pyo3(signature = (url, params=None, headers=None, cookies=None, content=None, data=None,
#[pyo3(signature = (url, params=None, headers=None, cookies=None, content=None, data=None,
json=None, files=None, auth=None, auth_bearer=None, timeout=None, impersonate=None, verify=None,
ca_cert_file=None))]
fn put(
Expand Down Expand Up @@ -967,7 +978,7 @@ fn put(
}

#[pyfunction]
#[pyo3(signature = (url, params=None, headers=None, cookies=None, content=None, data=None,
#[pyo3(signature = (url, params=None, headers=None, cookies=None, content=None, data=None,
json=None, files=None, auth=None, auth_bearer=None, timeout=None, impersonate=None, verify=None,
ca_cert_file=None))]
fn patch(
Expand Down

0 comments on commit 938ec0d

Please sign in to comment.