Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

provide a rhai interface to the router service #3234

Merged
merged 29 commits into from
Sep 5, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
1b37293
First compiling version with working requests
garypen Jun 6, 2023
cbaa181
finish implementing router_service rhai support
garypen Jun 7, 2023
650869d
rename the json encoding/decoding functions
garypen Jun 7, 2023
7d0a29b
Merge branch 'dev' into garypen/2278-rhai-router-service
garypen Jun 7, 2023
a0711ae
cargo fmt
garypen Jun 7, 2023
e2b6809
Merge branch 'dev' into garypen/2278-rhai-router-service
garypen Jun 14, 2023
48715f1
add minimal documentation and expect() less
garypen Jun 14, 2023
6dc7ab0
add a changeset
garypen Jun 15, 2023
416e5f3
add documentation for json encoding/decoding to changeset
garypen Jun 15, 2023
615019b
add documentation for json coding functions
garypen Jun 15, 2023
5fa868c
expect less in our bridge to the async world
garypen Jun 15, 2023
2a2a26c
add router_service into all rhai callback test
garypen Jun 15, 2023
b0d46ab
Merge branch 'dev' into garypen/2278-rhai-router-service
garypen Jun 15, 2023
985fa5c
Merge branch 'dev' into garypen/2278-rhai-router-service
garypen Jun 15, 2023
b3b3475
update documentation to include router_service details
garypen Jun 15, 2023
13d3658
code review comments
garypen Jun 21, 2023
d480a3c
Merge branch 'dev' into garypen/2278-rhai-router-service
garypen Jun 21, 2023
9ef2fb3
Merge branch 'dev' into garypen/2278-rhai-router-service
Geal Aug 11, 2023
7cc3889
work on streams of Bytes for router requests
Geal Aug 14, 2023
f52f7a5
lint and fix API
Geal Aug 14, 2023
412e77e
Simplify rhai macros (#3569)
Geal Aug 22, 2023
9cd29ec
Merge branch 'dev' into garypen/2278-rhai-router-service
Geal Aug 22, 2023
84a3e02
deactivate body access
Geal Aug 22, 2023
89ac69a
fix docs
Geal Aug 22, 2023
d9839a6
Merge branch 'dev' into garypen/2278-rhai-router-service
Geal Aug 23, 2023
259f0da
Merge branch 'dev' into garypen/2278-rhai-router-service
Geal Aug 29, 2023
4a0a7f9
Merge branch 'dev' into garypen/2278-rhai-router-service
Geal Aug 30, 2023
4577bfa
Apply suggestions from code review
Geal Sep 5, 2023
46d4370
Merge branch 'dev' into garypen/2278-rhai-router-service
Geal Sep 5, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 19 additions & 44 deletions apollo-router/src/plugins/rhai/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ use http::uri::PathAndQuery;
use http::HeaderMap;
use http::Method;
use http::Uri;
use hyper::Body;
use rhai::module_resolvers::FileModuleResolver;
use rhai::plugin::*;
use rhai::serde::from_dynamic;
Expand Down Expand Up @@ -580,18 +579,8 @@ mod router_plugin {
#[rhai_fn(get = "body", pure, return_raw)]
pub(crate) fn get_originating_body_router_response(
obj: &mut SharedMut<router::Response>,
) -> Result<String, Box<EvalAltResult>> {
// Get the body
let bytes = obj.with_mut(|response| {
let http_response = std::mem::take(&mut response.response);
let (parts, body) = http_response.into_parts();
let bytes = http_body_as_bytes(body)?;
// Copy back the response so it can continue to be used
response.response = http::Response::from_parts(parts, bytes.clone().into());
Ok::<Bytes, Box<EvalAltResult>>(bytes)
})?;

String::from_utf8(bytes.to_vec()).map_err(|err| err.to_string().into())
) -> Result<Vec<u8>, Box<EvalAltResult>> {
garypen marked this conversation as resolved.
Show resolved Hide resolved
Ok(obj.with_mut(|response| response.response.body().to_vec()))
}

#[rhai_fn(get = "body", pure, return_raw)]
Expand Down Expand Up @@ -1060,14 +1049,25 @@ mod router_plugin {
}
}

#[derive(Default)]
pub(crate) struct RhaiRouterFirstRequest {
pub(crate) context: Context,
pub(crate) request: http::Request<()>,
}
#[derive(Default)]
pub(crate) struct RhaiRouterChunkedRequest {
pub(crate) context: Context,
pub(crate) request: Bytes,
}

#[derive(Default)]
pub(crate) struct RhaiRouterResponse {
pub(crate) context: Context,
pub(crate) response: http::Response<Body>,
pub(crate) response: http::Response<Bytes>,
}

#[derive(Default)]
pub(crate) struct RhaiRouterDeferredResponse {
pub(crate) struct RhaiRouterChunkedResponse {
pub(crate) context: Context,
pub(crate) response: Bytes,
}
Expand Down Expand Up @@ -1105,21 +1105,6 @@ macro_rules! if_subgraph {
};
}

fn http_body_as_bytes(body: Body) -> Result<Bytes, Box<EvalAltResult>> {
futures::executor::block_on(async move {
let hdl = tokio::runtime::Handle::current();
std::thread::spawn(move || {
let _guard = hdl.enter();
hdl.spawn(async move { hyper::body::to_bytes(body).await })
})
.join()
.map_err(|_e| "join failed".to_string())?
.await
.map_err(|e| e.to_string())?
.map_err(|e| e.to_string().into())
})
}

macro_rules! register_rhai_router_interface {
($engine: ident, $($base: ident), *) => {
$(
Expand Down Expand Up @@ -1177,31 +1162,21 @@ macro_rules! register_rhai_router_interface {

$engine.register_get(
"body",
|obj: &mut SharedMut<$base::Request>| -> Result<String, Box<EvalAltResult>> {
// Get the body
let bytes = obj.with_mut(|request| {
let http_request = std::mem::take(&mut request.router_request);
let (parts, body) = http_request.into_parts();
let bytes = http_body_as_bytes(body)?;
// Copy back the request so it can continue to be used
request.router_request = http::Request::from_parts(parts, bytes.clone().into());
Ok::<Bytes, Box<EvalAltResult>>(bytes)
})?;

String::from_utf8(bytes.to_vec()).map_err(|err| err.to_string().into())
|obj: &mut SharedMut<$base::ChunkedRequest>| -> Result<Vec<u8>, Box<EvalAltResult>> {
Ok( obj.with_mut(|request| { request.request.to_vec()}))
}
);

$engine.register_set(
"body",
|obj: &mut SharedMut<$base::Request>, body: String| {
|obj: &mut SharedMut<$base::ChunkedRequest>, body: Vec<u8>| {
if_subgraph! {
$base => {
let _unused = (obj, body);
Err("cannot mutate originating request on a subgraph".into())
} else {
let bytes = Bytes::from(body);
obj.with_mut(|request| *request.router_request.body_mut() = bytes.into());
obj.with_mut(|request| request.request = bytes);
Ok(())
}
}
Expand Down
164 changes: 144 additions & 20 deletions apollo-router/src/plugins/rhai/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,9 @@ use crate::plugin::PluginInit;
use crate::plugins::rhai::engine::OptionDance;
use crate::plugins::rhai::engine::RhaiExecutionDeferredResponse;
use crate::plugins::rhai::engine::RhaiExecutionResponse;
use crate::plugins::rhai::engine::RhaiRouterDeferredResponse;
use crate::plugins::rhai::engine::RhaiRouterChunkedRequest;
use crate::plugins::rhai::engine::RhaiRouterChunkedResponse;
use crate::plugins::rhai::engine::RhaiRouterFirstRequest;
use crate::plugins::rhai::engine::RhaiRouterResponse;
use crate::plugins::rhai::engine::RhaiSupergraphDeferredResponse;
use crate::plugins::rhai::engine::RhaiSupergraphResponse;
Expand Down Expand Up @@ -374,21 +376,8 @@ macro_rules! gen_map_request {
Ok(ControlFlow::Break(res))
}
let shared_request = Shared::new(Mutex::new(Some(request)));
let result: Result<Dynamic, Box<EvalAltResult>> = if $callback.is_curried() {
$callback.call(
&$rhai_service.engine,
&$rhai_service.ast,
(shared_request.clone(),),
)
} else {
let mut guard = $rhai_service.scope.lock().unwrap();
$rhai_service.engine.call_fn(
&mut guard,
&$rhai_service.ast,
$callback.fn_name(),
(shared_request.clone(),),
)
};
let result: Result<Dynamic, Box<EvalAltResult>> =
execute(&$rhai_service, &$callback, (shared_request.clone(),));
if let Err(error) = result {
let error_details = process_error(error);
tracing::error!("map_request callback failed: {error_details:#?}");
Expand Down Expand Up @@ -469,6 +458,140 @@ macro_rules! gen_map_deferred_request {
})
};
}

// Actually use the checkpoint function so that we can shortcut requests which fail
macro_rules! gen_map_router_deferred_request {
($request: ident, $response: ident, $rhai_first_request: ident, $rhai_deferred_request: ident, $borrow: ident, $rhai_service: ident, $callback: ident) => {
$borrow.replace(|service| {
fn rhai_service_span() -> impl Fn(&$request) -> tracing::Span + Clone {
move |_request: &$request| {
tracing::info_span!(
RHAI_SPAN_NAME,
"rhai service" = stringify!($request),
"otel.kind" = "INTERNAL"
)
}
}
ServiceBuilder::new()
.instrument(rhai_service_span())
.checkpoint( move |chunked_request: $request| {
// Let's define a local function to build an error response
fn failure_message(
context: Context,
error_details: ErrorDetails,
) -> Result<ControlFlow<$response, $request>, BoxError> {
let res = if let Some(body) = error_details.body {
$response::builder()
.extensions(body.extensions)
.errors(body.errors)
.status_code(error_details.status)
.context(context)
.and_data(body.data)
.and_label(body.label)
.and_path(body.path)
.build()?
} else {
$response::error_builder()
.errors(vec![Error {
message: error_details.message.unwrap_or_default(),
..Default::default()
}])
.context(context)
.status_code(error_details.status)
.build()?
};

Ok(ControlFlow::Break(res))
}

// we split the request stream into headers+first body chunk, then a stream of chunks
// for which we will implement mapping later
let $request { router_request, context } = chunked_request;
let (parts, stream) = router_request.into_parts();

let request = $rhai_first_request {
context,
request: http::Request::from_parts(
parts,
(),
),
};
let shared_request = Shared::new(Mutex::new(Some(request)));
let result = execute(&$rhai_service, &$callback, (shared_request.clone(),));

if let Err(error) = result {
tracing::error!("map_request callback failed: {error}");
let error_details = process_error(error);
let mut guard = shared_request.lock().unwrap();
let request_opt = guard.take();
return failure_message(request_opt.unwrap().context, error_details);
}

let request_opt = shared_request.lock().unwrap().take();

let $rhai_first_request { context, request } =
request_opt.unwrap();
let (parts, _body) = http::Request::from(request).into_parts();


//let mut first = true;
let ctx = context.clone();
let rhai_service = $rhai_service.clone();
let callback = $callback.clone();

let mapped_stream = stream
.map_err(BoxError::from)
.and_then(move |chunk| {
//let rhai_service = $rhai_service.clone();
Geal marked this conversation as resolved.
Show resolved Hide resolved
let context = ctx.clone();
let rhai_service = rhai_service.clone();
let callback = callback.clone();
async move {
let request = $rhai_deferred_request {
context,
request: chunk.into(),
};
let shared_request = Shared::new(Mutex::new(Some(request)));

let result = execute(
&rhai_service,
&callback,
(shared_request.clone(),),
);

if let Err(error) = result {
tracing::error!("map_request callback failed: {error}");
let error_details = process_error(error);
let error = Error {
message: error_details.message.unwrap_or_default(),
..Default::default()
};
// We don't have a structured response to work with here. Let's
// throw away our response and custom build an error response
let error_response = graphql::Response::builder()
.errors(vec![error]).build();
return Ok(serde_json::to_vec(&error_response)?.into());
}

let request_opt = shared_request.lock().unwrap().take();
let $rhai_deferred_request { request, .. } =
request_opt.unwrap();
Ok(request)
}
});

// Finally, return a response which has a Body that wraps our stream of response chunks.
Ok(ControlFlow::Continue($request {
context,
router_request: http::Request::from_parts(parts, hyper::Body::wrap_stream(mapped_stream)),
}))
})
.service(service)
.boxed()
})
};
}

macro_rules! gen_map_response {
($base: ident, $borrow: ident, $rhai_service: ident, $callback: ident) => {
$borrow.replace(|service| {
Expand Down Expand Up @@ -690,8 +813,7 @@ macro_rules! gen_map_router_deferred_response {

// Create our response stream which consists of the bytes from our first body chained with the
// rest of the responses in our mapped stream.
let bytes = hyper::body::to_bytes(body).await.map_err(BoxError::from);
let final_stream = once(ready(bytes)).chain(mapped_stream).boxed();
let final_stream = once(ready(Ok(body))).chain(mapped_stream).boxed();

// Finally, return a response which has a Body that wraps our stream of response chunks.
Ok($response {
Expand Down Expand Up @@ -849,9 +971,11 @@ impl ServiceStep {
fn map_request(&mut self, rhai_service: RhaiService, callback: FnPtr) {
match self {
ServiceStep::Router(service) => {
gen_map_deferred_request!(
gen_map_router_deferred_request!(
RouterRequest,
RouterResponse,
RhaiRouterFirstRequest,
RhaiRouterChunkedRequest,
service,
rhai_service,
callback
Expand Down Expand Up @@ -887,7 +1011,7 @@ impl ServiceStep {
gen_map_router_deferred_response!(
RouterResponse,
RhaiRouterResponse,
RhaiRouterDeferredResponse,
RhaiRouterChunkedResponse,
service,
rhai_service,
callback
Expand Down
4 changes: 3 additions & 1 deletion apollo-router/src/plugins/rhai/router.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
//! router module

pub(crate) use crate::services::router::*;
pub(crate) type FirstRequest = super::engine::RhaiRouterFirstRequest;
pub(crate) type ChunkedRequest = super::engine::RhaiRouterChunkedRequest;
pub(crate) type Response = super::engine::RhaiRouterResponse;
pub(crate) type DeferredResponse = super::engine::RhaiRouterDeferredResponse;
pub(crate) type DeferredResponse = super::engine::RhaiRouterChunkedResponse;