diff --git a/.changesets/fix_bryn_fix_coprocessor_stream.md b/.changesets/fix_bryn_fix_coprocessor_stream.md
new file mode 100644
index 0000000000..4b3d721153
--- /dev/null
+++ b/.changesets/fix_bryn_fix_coprocessor_stream.md
@@ -0,0 +1,5 @@
+### Fix panic when streaming responses to co-processor ([Issue #4013](https://github.com/apollographql/router/issues/4013))
+
+Streamed responses will no longer cause a panic in the co-processor plugin. This affected defer and stream queries.
+
+By [@BrynCooke](https://github.com/BrynCooke) in https://github.com/apollographql/router/pull/4014
diff --git a/apollo-router/src/plugins/coprocessor/supergraph.rs b/apollo-router/src/plugins/coprocessor/supergraph.rs
index 389207153b..e64310ce9c 100644
--- a/apollo-router/src/plugins/coprocessor/supergraph.rs
+++ b/apollo-router/src/plugins/coprocessor/supergraph.rs
@@ -428,7 +428,7 @@ where
// Note: We deliberately DO NOT send headers or status_code even if the user has
// requested them. That's because they are meaningless on a deferred response and
// providing them will be a source of confusion.
- let payload = Externalizable::router_builder()
+ let payload = Externalizable::supergraph_builder()
.stage(PipelineStep::SupergraphResponse)
.and_id(TraceId::maybe_new().map(|id| id.to_string()))
.and_body(body_to_send)
@@ -753,13 +753,13 @@ mod tests {
let value = response.headers().get("aheader").unwrap();
- assert_eq!("a value", value);
+ assert_eq!(value, "a value");
assert_eq!(
- "my error message",
response.body_mut().next().await.unwrap().errors[0]
.message
- .as_str()
+ .as_str(),
+ "my error message"
);
}
@@ -852,7 +852,7 @@ mod tests {
"this-is-a-test-context": 42
}
},
- "sdl": "the sdl shouldnt change"
+ "sdl": "the sdl shouldn't change"
});
Ok(hyper::Response::builder()
.body(Body::from(serde_json::to_string(&input).unwrap()))
@@ -889,8 +889,91 @@ mod tests {
let body = res.response.body_mut().next().await.unwrap();
// the body should have changed:
assert_eq!(
+ serde_json::to_value(&body).unwrap(),
json!({ "data": { "test": 42_u32 } }),
+ );
+ }
+
+ #[tokio::test]
+ async fn defer() {
+ let supergraph_stage = SupergraphStage {
+ response: SupergraphResponseConf {
+ headers: true,
+ context: true,
+ body: true,
+ sdl: true,
+ status_code: false,
+ },
+ request: Default::default(),
+ };
+
+ let mut mock_supergraph_service = MockSupergraphService::new();
+
+ mock_supergraph_service
+ .expect_call()
+ .returning(|req: supergraph::Request| {
+ Ok(supergraph::Response::fake_stream_builder()
+ .response(
+ graphql::Response::builder()
+ .data(json!({ "test": 1 }))
+ .has_next(true)
+ .build(),
+ )
+ .response(
+ graphql::Response::builder()
+ .data(json!({ "test": 2 }))
+ .has_next(false)
+ .build(),
+ )
+ .context(req.context)
+ .build()
+ .unwrap())
+ });
+
+ let mock_http_client = mock_with_deferred_callback(move |res: hyper::Request
| {
+ Box::pin(async {
+ let deserialized_response: Externalizable =
+ serde_json::from_slice(&hyper::body::to_bytes(res.into_body()).await.unwrap())
+ .unwrap();
+ assert_eq!(EXTERNALIZABLE_VERSION, deserialized_response.version);
+ assert_eq!(
+ PipelineStep::SupergraphResponse.to_string(),
+ deserialized_response.stage
+ );
+
+ Ok(hyper::Response::builder()
+ .body(Body::from(
+ serde_json::to_string(&deserialized_response).unwrap(),
+ ))
+ .unwrap())
+ })
+ });
+
+ let service = supergraph_stage.as_service(
+ mock_http_client,
+ mock_supergraph_service.boxed(),
+ "http://test".to_string(),
+ Arc::new("".to_string()),
+ );
+
+ let request = supergraph::Request::canned_builder()
+ .query("foo")
+ .build()
+ .unwrap();
+
+ let mut res = service.oneshot(request).await.unwrap();
+
+ let body = res.response.body_mut().next().await.unwrap();
+ // the body should have changed:
+ assert_eq!(
+ serde_json::to_value(&body).unwrap(),
+ json!({ "data": { "test": 1 }, "hasNext": true }),
+ );
+ let body = res.response.body_mut().next().await.unwrap();
+ // the body should have changed:
+ assert_eq!(
serde_json::to_value(&body).unwrap(),
+ json!({ "data": { "test": 2 }, "hasNext": false }),
);
}
}
diff --git a/apollo-router/src/services/external.rs b/apollo-router/src/services/external.rs
index 2161ac053d..0c30cec9da 100644
--- a/apollo-router/src/services/external.rs
+++ b/apollo-router/src/services/external.rs
@@ -261,6 +261,17 @@ mod test {
.build();
}
+ #[test]
+ #[should_panic]
+ fn it_will_not_build_router_externalizable_incorrectl_supergraph() {
+ Externalizable::::router_builder()
+ .stage(PipelineStep::SupergraphRequest)
+ .build();
+ Externalizable::::router_builder()
+ .stage(PipelineStep::SupergraphResponse)
+ .build();
+ }
+
#[test]
fn it_will_build_subgraph_externalizable_correctly() {
Externalizable::::subgraph_builder()
diff --git a/apollo-router/src/services/supergraph.rs b/apollo-router/src/services/supergraph.rs
index 6f9707fff4..8647d0cdc3 100644
--- a/apollo-router/src/services/supergraph.rs
+++ b/apollo-router/src/services/supergraph.rs
@@ -251,6 +251,34 @@ impl Response {
)
}
+ /// This is the constructor (or builder) to use when constructing a "fake" Response stream.
+ ///
+ /// This does not enforce the provision of the data that is required for a fully functional
+ /// Response. It's usually enough for testing, when a fully constructed Response is
+ /// difficult to construct and not required for the purposes of the test.
+ ///
+ /// In addition, fake responses are expected to be valid, and will panic if given invalid values.
+ #[builder(visibility = "pub")]
+ fn fake_stream_new(
+ responses: Vec,
+ status_code: Option,
+ headers: MultiMap,
+ context: Context,
+ ) -> Result {
+ let mut builder = http::Response::builder().status(status_code.unwrap_or(StatusCode::OK));
+ for (key, values) in headers {
+ let header_name: HeaderName = key.try_into()?;
+ for value in values {
+ let header_value: HeaderValue = value.try_into()?;
+ builder = builder.header(header_name.clone(), header_value);
+ }
+ }
+
+ let stream = futures::stream::iter(responses);
+ let response = builder.body(stream.boxed())?;
+ Ok(Self { response, context })
+ }
+
/// This is the constructor (or builder) to use when constructing a Response that represents a global error.
/// It has no path and no response data.
/// This is useful for things such as authentication errors.