Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

provide a rhai interface to the router service #3234

Merged
merged 29 commits into from
Sep 5, 2023
Merged
Changes from 1 commit
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
1b37293
First compiling version with working requests
garypen Jun 6, 2023
cbaa181
finish implementing router_service rhai support
garypen Jun 7, 2023
650869d
rename the json encoding/decoding functions
garypen Jun 7, 2023
7d0a29b
Merge branch 'dev' into garypen/2278-rhai-router-service
garypen Jun 7, 2023
a0711ae
cargo fmt
garypen Jun 7, 2023
e2b6809
Merge branch 'dev' into garypen/2278-rhai-router-service
garypen Jun 14, 2023
48715f1
add minimal documentation and expect() less
garypen Jun 14, 2023
6dc7ab0
add a changeset
garypen Jun 15, 2023
416e5f3
add documentation for json encoding/decoding to changeset
garypen Jun 15, 2023
615019b
add documentation for json coding functions
garypen Jun 15, 2023
5fa868c
expect less in our bridge to the async world
garypen Jun 15, 2023
2a2a26c
add router_service into all rhai callback test
garypen Jun 15, 2023
b0d46ab
Merge branch 'dev' into garypen/2278-rhai-router-service
garypen Jun 15, 2023
985fa5c
Merge branch 'dev' into garypen/2278-rhai-router-service
garypen Jun 15, 2023
b3b3475
update documentation to include router_service details
garypen Jun 15, 2023
13d3658
code review comments
garypen Jun 21, 2023
d480a3c
Merge branch 'dev' into garypen/2278-rhai-router-service
garypen Jun 21, 2023
9ef2fb3
Merge branch 'dev' into garypen/2278-rhai-router-service
Geal Aug 11, 2023
7cc3889
work on streams of Bytes for router requests
Geal Aug 14, 2023
f52f7a5
lint and fix API
Geal Aug 14, 2023
412e77e
Simplify rhai macros (#3569)
Geal Aug 22, 2023
9cd29ec
Merge branch 'dev' into garypen/2278-rhai-router-service
Geal Aug 22, 2023
84a3e02
deactivate body access
Geal Aug 22, 2023
89ac69a
fix docs
Geal Aug 22, 2023
d9839a6
Merge branch 'dev' into garypen/2278-rhai-router-service
Geal Aug 23, 2023
259f0da
Merge branch 'dev' into garypen/2278-rhai-router-service
Geal Aug 29, 2023
4a0a7f9
Merge branch 'dev' into garypen/2278-rhai-router-service
Geal Aug 30, 2023
4577bfa
Apply suggestions from code review
Geal Sep 5, 2023
46d4370
Merge branch 'dev' into garypen/2278-rhai-router-service
Geal Sep 5, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
work on streams of Bytes for router requests
The Router level requests and responses have a stream of bytes as body.
Instead of accumulating the entire body, which might cause issues (DoS
on the request side, breaking `@defer` and subscriptions on the response
side), we will call the rhai code on every chunk.

On the request side:
- the first call contains the headers, but no body. This is done to
  avoid awaiting on the first chunk, which would need an async checkpoint,
  and having a clonable service as inner service for rhai
- subsequent calls will each contain a chunk, returned as a Vec<u8>, and
  cannot access the headers

On the response side:
- the first call contains the headers and the first chunk: so far we
  guarantee that there's at least one GraphQL response when creating the
  router response
- subsequent calls will each contain a chunk, returned as a Vec<u8>, and
  cannot access the headers
Geal committed Aug 14, 2023
commit 7cc3889329dce51d9ccabf130135ff68e7c7ab16
63 changes: 19 additions & 44 deletions apollo-router/src/plugins/rhai/engine.rs
Original file line number Diff line number Diff line change
@@ -12,7 +12,6 @@ use http::uri::PathAndQuery;
use http::HeaderMap;
use http::Method;
use http::Uri;
use hyper::Body;
use rhai::module_resolvers::FileModuleResolver;
use rhai::plugin::*;
use rhai::serde::from_dynamic;
@@ -580,18 +579,8 @@ mod router_plugin {
#[rhai_fn(get = "body", pure, return_raw)]
pub(crate) fn get_originating_body_router_response(
obj: &mut SharedMut<router::Response>,
) -> Result<String, Box<EvalAltResult>> {
// Get the body
let bytes = obj.with_mut(|response| {
let http_response = std::mem::take(&mut response.response);
let (parts, body) = http_response.into_parts();
let bytes = http_body_as_bytes(body)?;
// Copy back the response so it can continue to be used
response.response = http::Response::from_parts(parts, bytes.clone().into());
Ok::<Bytes, Box<EvalAltResult>>(bytes)
})?;

String::from_utf8(bytes.to_vec()).map_err(|err| err.to_string().into())
) -> Result<Vec<u8>, Box<EvalAltResult>> {
garypen marked this conversation as resolved.
Show resolved Hide resolved
Ok(obj.with_mut(|response| response.response.body().to_vec()))
}

#[rhai_fn(get = "body", pure, return_raw)]
@@ -1060,14 +1049,25 @@ mod router_plugin {
}
}

#[derive(Default)]
pub(crate) struct RhaiRouterFirstRequest {
pub(crate) context: Context,
pub(crate) request: http::Request<()>,
}
#[derive(Default)]
pub(crate) struct RhaiRouterChunkedRequest {
pub(crate) context: Context,
pub(crate) request: Bytes,
}

#[derive(Default)]
pub(crate) struct RhaiRouterResponse {
pub(crate) context: Context,
pub(crate) response: http::Response<Body>,
pub(crate) response: http::Response<Bytes>,
}

#[derive(Default)]
pub(crate) struct RhaiRouterDeferredResponse {
pub(crate) struct RhaiRouterChunkedResponse {
pub(crate) context: Context,
pub(crate) response: Bytes,
}
@@ -1105,21 +1105,6 @@ macro_rules! if_subgraph {
};
}

fn http_body_as_bytes(body: Body) -> Result<Bytes, Box<EvalAltResult>> {
futures::executor::block_on(async move {
let hdl = tokio::runtime::Handle::current();
std::thread::spawn(move || {
let _guard = hdl.enter();
hdl.spawn(async move { hyper::body::to_bytes(body).await })
})
.join()
.map_err(|_e| "join failed".to_string())?
.await
.map_err(|e| e.to_string())?
.map_err(|e| e.to_string().into())
})
}

macro_rules! register_rhai_router_interface {
($engine: ident, $($base: ident), *) => {
$(
@@ -1177,31 +1162,21 @@ macro_rules! register_rhai_router_interface {

$engine.register_get(
"body",
|obj: &mut SharedMut<$base::Request>| -> Result<String, Box<EvalAltResult>> {
// Get the body
let bytes = obj.with_mut(|request| {
let http_request = std::mem::take(&mut request.router_request);
let (parts, body) = http_request.into_parts();
let bytes = http_body_as_bytes(body)?;
// Copy back the request so it can continue to be used
request.router_request = http::Request::from_parts(parts, bytes.clone().into());
Ok::<Bytes, Box<EvalAltResult>>(bytes)
})?;

String::from_utf8(bytes.to_vec()).map_err(|err| err.to_string().into())
|obj: &mut SharedMut<$base::ChunkedRequest>| -> Result<Vec<u8>, Box<EvalAltResult>> {
Ok( obj.with_mut(|request| { request.request.to_vec()}))
}
);

$engine.register_set(
"body",
|obj: &mut SharedMut<$base::Request>, body: String| {
|obj: &mut SharedMut<$base::ChunkedRequest>, body: Vec<u8>| {
if_subgraph! {
$base => {
let _unused = (obj, body);
Err("cannot mutate originating request on a subgraph".into())
} else {
let bytes = Bytes::from(body);
obj.with_mut(|request| *request.router_request.body_mut() = bytes.into());
obj.with_mut(|request| request.request = bytes);
Ok(())
}
}
164 changes: 144 additions & 20 deletions apollo-router/src/plugins/rhai/mod.rs
Original file line number Diff line number Diff line change
@@ -49,7 +49,9 @@ use crate::plugin::PluginInit;
use crate::plugins::rhai::engine::OptionDance;
use crate::plugins::rhai::engine::RhaiExecutionDeferredResponse;
use crate::plugins::rhai::engine::RhaiExecutionResponse;
use crate::plugins::rhai::engine::RhaiRouterDeferredResponse;
use crate::plugins::rhai::engine::RhaiRouterChunkedRequest;
use crate::plugins::rhai::engine::RhaiRouterChunkedResponse;
use crate::plugins::rhai::engine::RhaiRouterFirstRequest;
use crate::plugins::rhai::engine::RhaiRouterResponse;
use crate::plugins::rhai::engine::RhaiSupergraphDeferredResponse;
use crate::plugins::rhai::engine::RhaiSupergraphResponse;
@@ -374,21 +376,8 @@ macro_rules! gen_map_request {
Ok(ControlFlow::Break(res))
}
let shared_request = Shared::new(Mutex::new(Some(request)));
let result: Result<Dynamic, Box<EvalAltResult>> = if $callback.is_curried() {
$callback.call(
&$rhai_service.engine,
&$rhai_service.ast,
(shared_request.clone(),),
)
} else {
let mut guard = $rhai_service.scope.lock().unwrap();
$rhai_service.engine.call_fn(
&mut guard,
&$rhai_service.ast,
$callback.fn_name(),
(shared_request.clone(),),
)
};
let result: Result<Dynamic, Box<EvalAltResult>> =
execute(&$rhai_service, &$callback, (shared_request.clone(),));
if let Err(error) = result {
let error_details = process_error(error);
tracing::error!("map_request callback failed: {error_details:#?}");
@@ -469,6 +458,140 @@ macro_rules! gen_map_deferred_request {
})
};
}

// Actually use the checkpoint function so that we can shortcut requests which fail
macro_rules! gen_map_router_deferred_request {
($request: ident, $response: ident, $rhai_first_request: ident, $rhai_deferred_request: ident, $borrow: ident, $rhai_service: ident, $callback: ident) => {
$borrow.replace(|service| {
fn rhai_service_span() -> impl Fn(&$request) -> tracing::Span + Clone {
move |_request: &$request| {
tracing::info_span!(
RHAI_SPAN_NAME,
"rhai service" = stringify!($request),
"otel.kind" = "INTERNAL"
)
}
}
ServiceBuilder::new()
.instrument(rhai_service_span())
.checkpoint( move |chunked_request: $request| {
// Let's define a local function to build an error response
fn failure_message(
context: Context,
error_details: ErrorDetails,
) -> Result<ControlFlow<$response, $request>, BoxError> {
let res = if let Some(body) = error_details.body {
$response::builder()
.extensions(body.extensions)
.errors(body.errors)
.status_code(error_details.status)
.context(context)
.and_data(body.data)
.and_label(body.label)
.and_path(body.path)
.build()?
} else {
$response::error_builder()
.errors(vec![Error {
message: error_details.message.unwrap_or_default(),
..Default::default()
}])
.context(context)
.status_code(error_details.status)
.build()?
};

Ok(ControlFlow::Break(res))
}

// we split the request stream into headers+first body chunk, then a stream of chunks
// for which we will implement mapping later
let $request { router_request, context } = chunked_request;
let (parts, stream) = router_request.into_parts();

let request = $rhai_first_request {
context,
request: http::Request::from_parts(
parts,
(),
),
};
let shared_request = Shared::new(Mutex::new(Some(request)));
let result = execute(&$rhai_service, &$callback, (shared_request.clone(),));

if let Err(error) = result {
tracing::error!("map_request callback failed: {error}");
let error_details = process_error(error);
let mut guard = shared_request.lock().unwrap();
let request_opt = guard.take();
return failure_message(request_opt.unwrap().context, error_details);
}

let request_opt = shared_request.lock().unwrap().take();

let $rhai_first_request { context, request } =
request_opt.unwrap();
let (parts, _body) = http::Request::from(request).into_parts();


//let mut first = true;
let ctx = context.clone();
let rhai_service = $rhai_service.clone();
let callback = $callback.clone();

let mapped_stream = stream
.map_err(BoxError::from)
.and_then(move |chunk| {
//let rhai_service = $rhai_service.clone();
Geal marked this conversation as resolved.
Show resolved Hide resolved
let context = ctx.clone();
let rhai_service = rhai_service.clone();
let callback = callback.clone();
async move {
let request = $rhai_deferred_request {
context,
request: chunk.into(),
};
let shared_request = Shared::new(Mutex::new(Some(request)));

let result = execute(
&rhai_service,
&callback,
(shared_request.clone(),),
);

if let Err(error) = result {
tracing::error!("map_request callback failed: {error}");
let error_details = process_error(error);
let error = Error {
message: error_details.message.unwrap_or_default(),
..Default::default()
};
// We don't have a structured response to work with here. Let's
// throw away our response and custom build an error response
let error_response = graphql::Response::builder()
.errors(vec![error]).build();
return Ok(serde_json::to_vec(&error_response)?.into());
}

let request_opt = shared_request.lock().unwrap().take();
let $rhai_deferred_request { request, .. } =
request_opt.unwrap();
Ok(request)
}
});

// Finally, return a response which has a Body that wraps our stream of response chunks.
Ok(ControlFlow::Continue($request {
context,
router_request: http::Request::from_parts(parts, hyper::Body::wrap_stream(mapped_stream)),
}))
})
.service(service)
.boxed()
})
};
}

macro_rules! gen_map_response {
($base: ident, $borrow: ident, $rhai_service: ident, $callback: ident) => {
$borrow.replace(|service| {
@@ -690,8 +813,7 @@ macro_rules! gen_map_router_deferred_response {

// Create our response stream which consists of the bytes from our first body chained with the
// rest of the responses in our mapped stream.
let bytes = hyper::body::to_bytes(body).await.map_err(BoxError::from);
let final_stream = once(ready(bytes)).chain(mapped_stream).boxed();
let final_stream = once(ready(Ok(body))).chain(mapped_stream).boxed();

// Finally, return a response which has a Body that wraps our stream of response chunks.
Ok($response {
@@ -849,9 +971,11 @@ impl ServiceStep {
fn map_request(&mut self, rhai_service: RhaiService, callback: FnPtr) {
match self {
ServiceStep::Router(service) => {
gen_map_deferred_request!(
gen_map_router_deferred_request!(
RouterRequest,
RouterResponse,
RhaiRouterFirstRequest,
RhaiRouterChunkedRequest,
service,
rhai_service,
callback
@@ -887,7 +1011,7 @@ impl ServiceStep {
gen_map_router_deferred_response!(
RouterResponse,
RhaiRouterResponse,
RhaiRouterDeferredResponse,
RhaiRouterChunkedResponse,
service,
rhai_service,
callback
4 changes: 3 additions & 1 deletion apollo-router/src/plugins/rhai/router.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
//! router module
pub(crate) use crate::services::router::*;
pub(crate) type FirstRequest = super::engine::RhaiRouterFirstRequest;
pub(crate) type ChunkedRequest = super::engine::RhaiRouterChunkedRequest;
pub(crate) type Response = super::engine::RhaiRouterResponse;
pub(crate) type DeferredResponse = super::engine::RhaiRouterDeferredResponse;
pub(crate) type DeferredResponse = super::engine::RhaiRouterChunkedResponse;