Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add max_heap_size option and related error variant #229

Merged
merged 4 commits into from
Oct 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,8 @@ deno_core = "0.313.0"
deno_ast = { version = "0.42.2", features = ["transpiling"]}

# Runtime for async tasks
tokio = "1.36.0"
tokio = "=1.36.0"
tokio-util = "0.7.12"

# For URL imports
# Pinned for now due to upstream issues
Expand Down
26 changes: 26 additions & 0 deletions examples/max_heap_size.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
///
/// This example shows how to set the maximum heap size for the V8 isolate.
/// This is useful when you want to limit the amount of memory a script can consume.
/// A `HeapExhausted` error will be returned if the script exceeds the limit.
///
use rustyscript::{Error, Module, Runtime, RuntimeOptions};

fn main() -> Result<(), Error> {
// Will exceed the defined heap size
let module = Module::new(
"test.js",
"const largeArray = new Array(40 * 1024 * 1024).fill('a');",
);

let mut runtime = Runtime::new(RuntimeOptions {
max_heap_size: Some(5 * 1024 * 1024),
..Default::default()
})?;

// Will return a `HeapExhausted` error
let module_handle = runtime.load_module(&module);

assert!(module_handle.is_err());

Ok(())
}
4 changes: 4 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ pub enum Error {
/// Triggers when a module times out before finishing
#[error("Module timed out: {0}")]
Timeout(String),

/// Triggers when the heap (via `max_heap_size`) is exhausted during execution
#[error("Heap exhausted")]
HeapExhausted,
}

impl Error {
Expand Down
104 changes: 74 additions & 30 deletions src/inner_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use std::{
rc::Rc,
time::Duration,
};
use tokio_util::sync::CancellationToken;

/// Represents a function that can be registered with the runtime
pub trait RsFunction:
Expand Down Expand Up @@ -87,6 +88,9 @@ pub struct RuntimeOptions {
/// Amount of time to run for before killing the thread
pub timeout: Duration,

/// Optional maximum heap size for the runtime
pub max_heap_size: Option<usize>,

/// Optional cache provider for the module loader
#[allow(deprecated)]
pub module_cache: Option<Box<dyn crate::module_loader::ModuleCacheProvider>>,
Expand Down Expand Up @@ -121,6 +125,7 @@ impl Default for RuntimeOptions {
extensions: Vec::default(),
default_entrypoint: None,
timeout: Duration::MAX,
max_heap_size: None,
module_cache: None,
import_provider: None,
startup_snapshot: None,
Expand All @@ -146,7 +151,10 @@ pub struct InnerRuntime {
pub default_entrypoint: Option<String>,
}
impl InnerRuntime {
pub fn new(options: RuntimeOptions) -> Result<Self, Error> {
pub fn new(
options: RuntimeOptions,
heap_exhausted_token: CancellationToken,
) -> Result<Self, Error> {
let module_loader = Rc::new(RustyLoader::new(LoaderOptions {
cache_provider: options.module_cache,
import_provider: options.import_provider,
Expand All @@ -160,14 +168,33 @@ impl InnerRuntime {
let extensions =
ext::all_extensions(options.extensions, options.extension_options, is_snapshot);

let deno_runtime = JsRuntime::try_new(deno_core::RuntimeOptions {
// If a heap size is provided, set the isolate params (preserving any user-provided params otherwise)
let isolate_params = match options.isolate_params {
Some(params) => {
if let Some(max_heap_size) = options.max_heap_size {
Some(params.heap_limits(0, max_heap_size))
} else {
Some(params)
}
}
None => {
if let Some(max_heap_size) = options.max_heap_size {
let params = v8::Isolate::create_params().heap_limits(0, max_heap_size);
Some(params)
} else {
None
}
}
};

let mut deno_runtime = JsRuntime::try_new(deno_core::RuntimeOptions {
module_loader: Some(module_loader.clone()),

extension_transpiler: Some(Rc::new(|specifier, code| {
transpile_extension(&specifier, &code)
})),

create_params: options.isolate_params,
create_params: isolate_params,
shared_array_buffer_store: options.shared_array_buffer_store.clone(),

startup_snapshot: options.startup_snapshot,
Expand All @@ -176,6 +203,22 @@ impl InnerRuntime {
..Default::default()
})?;

// Add a callback to terminate the runtime if the max_heap_size limit is approached
if options.max_heap_size.is_some() {
let isolate_handle = deno_runtime.v8_isolate().thread_safe_handle();

deno_runtime.add_near_heap_limit_callback(move |current_value, _| {
isolate_handle.terminate_execution();

// Signal the outer runtime to cancel block_on future (avoid hanging) and return friendly error
heap_exhausted_token.cancel();

// Spike the heap limit while terminating to avoid segfaulting
// Callback may fire multiple times if memory usage increases quicker then termination finalizes
5 * current_value
});
}

Ok(Self {
deno_runtime,
module_loader,
Expand Down Expand Up @@ -637,8 +680,8 @@ mod test_inner_runtime {

#[test]
fn test_decode_args() {
let mut runtime =
InnerRuntime::new(RuntimeOptions::default()).expect("Could not load runtime");
let mut runtime = InnerRuntime::new(RuntimeOptions::default(), CancellationToken::new())
.expect("Could not load runtime");
let mut scope = runtime.deno_runtime.handle_scope();

// empty
Expand Down Expand Up @@ -683,8 +726,8 @@ mod test_inner_runtime {

#[test]
fn test_put_take() {
let mut runtime =
InnerRuntime::new(RuntimeOptions::default()).expect("Could not load runtime");
let mut runtime = InnerRuntime::new(RuntimeOptions::default(), CancellationToken::new())
.expect("Could not load runtime");

runtime.put(2usize).expect("Could not put value");
let v = runtime.take::<usize>().expect("Could not take value");
Expand All @@ -693,8 +736,8 @@ mod test_inner_runtime {

#[test]
fn test_register_async_function() {
let mut runtime =
InnerRuntime::new(RuntimeOptions::default()).expect("Could not load runtime");
let mut runtime = InnerRuntime::new(RuntimeOptions::default(), CancellationToken::new())
.expect("Could not load runtime");
runtime
.register_async_function(
"test",
Expand All @@ -720,8 +763,8 @@ mod test_inner_runtime {

#[test]
fn test_register_function() {
let mut runtime =
InnerRuntime::new(RuntimeOptions::default()).expect("Could not load runtime");
let mut runtime = InnerRuntime::new(RuntimeOptions::default(), CancellationToken::new())
.expect("Could not load runtime");
runtime
.register_function(
"test",
Expand All @@ -738,8 +781,8 @@ mod test_inner_runtime {
#[cfg(any(feature = "web", feature = "web_stub"))]
#[test]
fn test_eval() {
let mut runtime =
InnerRuntime::new(RuntimeOptions::default()).expect("Could not load runtime");
let mut runtime = InnerRuntime::new(RuntimeOptions::default(), CancellationToken::new())
.expect("Could not load runtime");

let result: usize = runtime.eval("2 + 2").expect("Could not eval");
assert_eq!(result, 4);
Expand Down Expand Up @@ -772,8 +815,8 @@ mod test_inner_runtime {
",
);

let mut runtime =
InnerRuntime::new(RuntimeOptions::default()).expect("Could not load runtime");
let mut runtime = InnerRuntime::new(RuntimeOptions::default(), CancellationToken::new())
.expect("Could not load runtime");

let rt = &mut runtime;
let module = run_async_task(|| async move { rt.load_modules(Some(&module), vec![]).await });
Expand Down Expand Up @@ -813,8 +856,8 @@ mod test_inner_runtime {
",
);

let mut runtime =
InnerRuntime::new(RuntimeOptions::default()).expect("Could not load runtime");
let mut runtime = InnerRuntime::new(RuntimeOptions::default(), CancellationToken::new())
.expect("Could not load runtime");

let rt = &mut runtime;
let module = run_async_task(|| async move { rt.load_modules(Some(&module), vec![]).await });
Expand Down Expand Up @@ -853,7 +896,8 @@ mod test_inner_runtime {

run_async_task(|| async move {
let mut runtime =
InnerRuntime::new(RuntimeOptions::default()).expect("Could not load runtime");
InnerRuntime::new(RuntimeOptions::default(), CancellationToken::new())
.expect("Could not load runtime");
let handle = runtime.load_modules(Some(&module), vec![]).await?;

let f = runtime.get_function_by_name(None, "fna").unwrap();
Expand Down Expand Up @@ -895,8 +939,8 @@ mod test_inner_runtime {
",
);

let mut runtime =
InnerRuntime::new(RuntimeOptions::default()).expect("Could not load runtime");
let mut runtime = InnerRuntime::new(RuntimeOptions::default(), CancellationToken::new())
.expect("Could not load runtime");

let rt = &mut runtime;
let module = run_async_task(|| async move { rt.load_modules(Some(&module), vec![]).await });
Expand All @@ -923,8 +967,8 @@ mod test_inner_runtime {
",
);

let mut runtime =
InnerRuntime::new(RuntimeOptions::default()).expect("Could not load runtime");
let mut runtime = InnerRuntime::new(RuntimeOptions::default(), CancellationToken::new())
.expect("Could not load runtime");

let rt = &mut runtime;
let module = run_async_task(|| async move { rt.load_modules(Some(&module), vec![]).await });
Expand Down Expand Up @@ -954,8 +998,8 @@ mod test_inner_runtime {
",
);

let mut runtime =
InnerRuntime::new(RuntimeOptions::default()).expect("Could not load runtime");
let mut runtime = InnerRuntime::new(RuntimeOptions::default(), CancellationToken::new())
.expect("Could not load runtime");

let rt = &mut runtime;
run_async_task(|| async move {
Expand Down Expand Up @@ -985,8 +1029,8 @@ mod test_inner_runtime {
",
);

let mut runtime =
InnerRuntime::new(RuntimeOptions::default()).expect("Could not load runtime");
let mut runtime = InnerRuntime::new(RuntimeOptions::default(), CancellationToken::new())
.expect("Could not load runtime");

let rt = &mut runtime;
run_async_task(|| async move {
Expand Down Expand Up @@ -1015,8 +1059,8 @@ mod test_inner_runtime {
",
);

let mut runtime =
InnerRuntime::new(RuntimeOptions::default()).expect("Could not load runtime");
let mut runtime = InnerRuntime::new(RuntimeOptions::default(), CancellationToken::new())
.expect("Could not load runtime");

let rt = &mut runtime;
let module = run_async_task(|| async move { rt.load_modules(Some(&module), vec![]).await });
Expand Down Expand Up @@ -1062,8 +1106,8 @@ mod test_inner_runtime {
",
);

let mut runtime =
InnerRuntime::new(RuntimeOptions::default()).expect("Could not load runtime");
let mut runtime = InnerRuntime::new(RuntimeOptions::default(), CancellationToken::new())
.expect("Could not load runtime");

let rt = &mut runtime;
let module = run_async_task(|| async move { rt.load_modules(Some(&module), vec![]).await });
Expand Down
36 changes: 34 additions & 2 deletions src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::{
};
use deno_core::serde_json;
use std::rc::Rc;
use tokio_util::sync::CancellationToken;

/// Represents the set of options accepted by the runtime constructor
pub use crate::inner_runtime::RuntimeOptions;
Expand All @@ -30,6 +31,7 @@ pub struct Runtime {
inner: InnerRuntime,
tokio: Rc<tokio::runtime::Runtime>,
timeout: std::time::Duration,
heap_exhausted_token: CancellationToken,
}

impl Runtime {
Expand Down Expand Up @@ -92,10 +94,12 @@ impl Runtime {
options: RuntimeOptions,
tokio: Rc<tokio::runtime::Runtime>,
) -> Result<Self, Error> {
let heap_exhausted_token = CancellationToken::new();
Ok(Self {
timeout: options.timeout,
inner: InnerRuntime::new(options)?,
inner: InnerRuntime::new(options, heap_exhausted_token.clone())?,
tokio,
heap_exhausted_token,
})
}

Expand All @@ -116,6 +120,12 @@ impl Runtime {
self.timeout
}

/// Returns the heap exhausted token for the runtime
#[must_use]
pub fn heap_exhausted_token(&self) -> CancellationToken {
self.heap_exhausted_token.clone()
}

/// Destroy the v8 runtime, releasing all resources
/// Then the internal tokio runtime will be returned
#[must_use]
Expand Down Expand Up @@ -1050,7 +1060,13 @@ impl Runtime {
{
let timeout = self.timeout();
let rt = self.tokio_runtime();
rt.block_on(async move { tokio::time::timeout(timeout, f(self)).await })?
let heap_exhausted_token = self.heap_exhausted_token();
rt.block_on(async move {
tokio::select! {
result = tokio::time::timeout(timeout, f(self)) => result?,
() = heap_exhausted_token.cancelled() => Err(Error::HeapExhausted),
}
})
}
}

Expand Down Expand Up @@ -1375,4 +1391,20 @@ mod test_runtime {
let unsafe_ops = get_unrecognized_ops().expect("Could not get unsafe ops");
assert_eq!(0, unsafe_ops.len(), "Found unsafe ops: {unsafe_ops:?}.\nOnce confirmed safe, add them to `src/ext/op_whitelist.js`");
}

#[test]
fn test_heap_exhaustion_handled() {
let mut runtime = Runtime::new(RuntimeOptions {
max_heap_size: Some(10 * 1024 * 1024),
..Default::default()
})
.expect("Could not create the runtime");
let module = Module::new(
"test.js",
"const largeArray = new Array(40 * 1024 * 1024).fill('a');",
);
runtime
.load_modules(&module, vec![])
.expect_err("Did not detect heap exhaustion");
}
}
Loading