From bfd17efb1bd3183c4a7b4e61280ee9769befd1e6 Mon Sep 17 00:00:00 2001 From: bryn Date: Wed, 11 Oct 2023 11:29:44 +0100 Subject: [PATCH] Fix and add test for co-processors handling of streaming responses. Fixes #4013 --- .../fix_bryn_fix_coprocessor_stream.md | 5 + .../src/plugins/coprocessor/supergraph.rs | 93 ++++++++++++++++++- apollo-router/src/services/external.rs | 11 +++ apollo-router/src/services/supergraph.rs | 28 ++++++ 4 files changed, 132 insertions(+), 5 deletions(-) create mode 100644 .changesets/fix_bryn_fix_coprocessor_stream.md diff --git a/.changesets/fix_bryn_fix_coprocessor_stream.md b/.changesets/fix_bryn_fix_coprocessor_stream.md new file mode 100644 index 0000000000..4b3d721153 --- /dev/null +++ b/.changesets/fix_bryn_fix_coprocessor_stream.md @@ -0,0 +1,5 @@ +### Fix panic when streaming responses to co-processor ([Issue #4013](https://github.com/apollographql/router/issues/4013)) + +Streamed responses will no longer cause a panic in the co-processor plugin. This affected defer and stream queries. + +By [@BrynCooke](https://github.com/BrynCooke) in https://github.com/apollographql/router/pull/4014 diff --git a/apollo-router/src/plugins/coprocessor/supergraph.rs b/apollo-router/src/plugins/coprocessor/supergraph.rs index 389207153b..e64310ce9c 100644 --- a/apollo-router/src/plugins/coprocessor/supergraph.rs +++ b/apollo-router/src/plugins/coprocessor/supergraph.rs @@ -428,7 +428,7 @@ where // Note: We deliberately DO NOT send headers or status_code even if the user has // requested them. That's because they are meaningless on a deferred response and // providing them will be a source of confusion. - let payload = Externalizable::router_builder() + let payload = Externalizable::supergraph_builder() .stage(PipelineStep::SupergraphResponse) .and_id(TraceId::maybe_new().map(|id| id.to_string())) .and_body(body_to_send) @@ -753,13 +753,13 @@ mod tests { let value = response.headers().get("aheader").unwrap(); - assert_eq!("a value", value); + assert_eq!(value, "a value"); assert_eq!( - "my error message", response.body_mut().next().await.unwrap().errors[0] .message - .as_str() + .as_str(), + "my error message" ); } @@ -852,7 +852,7 @@ mod tests { "this-is-a-test-context": 42 } }, - "sdl": "the sdl shouldnt change" + "sdl": "the sdl shouldn't change" }); Ok(hyper::Response::builder() .body(Body::from(serde_json::to_string(&input).unwrap())) @@ -889,8 +889,91 @@ mod tests { let body = res.response.body_mut().next().await.unwrap(); // the body should have changed: assert_eq!( + serde_json::to_value(&body).unwrap(), json!({ "data": { "test": 42_u32 } }), + ); + } + + #[tokio::test] + async fn defer() { + let supergraph_stage = SupergraphStage { + response: SupergraphResponseConf { + headers: true, + context: true, + body: true, + sdl: true, + status_code: false, + }, + request: Default::default(), + }; + + let mut mock_supergraph_service = MockSupergraphService::new(); + + mock_supergraph_service + .expect_call() + .returning(|req: supergraph::Request| { + Ok(supergraph::Response::fake_stream_builder() + .response( + graphql::Response::builder() + .data(json!({ "test": 1 })) + .has_next(true) + .build(), + ) + .response( + graphql::Response::builder() + .data(json!({ "test": 2 })) + .has_next(false) + .build(), + ) + .context(req.context) + .build() + .unwrap()) + }); + + let mock_http_client = mock_with_deferred_callback(move |res: hyper::Request| { + Box::pin(async { + let deserialized_response: Externalizable = + serde_json::from_slice(&hyper::body::to_bytes(res.into_body()).await.unwrap()) + .unwrap(); + assert_eq!(EXTERNALIZABLE_VERSION, deserialized_response.version); + assert_eq!( + PipelineStep::SupergraphResponse.to_string(), + deserialized_response.stage + ); + + Ok(hyper::Response::builder() + .body(Body::from( + serde_json::to_string(&deserialized_response).unwrap(), + )) + .unwrap()) + }) + }); + + let service = supergraph_stage.as_service( + mock_http_client, + mock_supergraph_service.boxed(), + "http://test".to_string(), + Arc::new("".to_string()), + ); + + let request = supergraph::Request::canned_builder() + .query("foo") + .build() + .unwrap(); + + let mut res = service.oneshot(request).await.unwrap(); + + let body = res.response.body_mut().next().await.unwrap(); + // the body should have changed: + assert_eq!( + serde_json::to_value(&body).unwrap(), + json!({ "data": { "test": 1 }, "hasNext": true }), + ); + let body = res.response.body_mut().next().await.unwrap(); + // the body should have changed: + assert_eq!( serde_json::to_value(&body).unwrap(), + json!({ "data": { "test": 2 }, "hasNext": false }), ); } } diff --git a/apollo-router/src/services/external.rs b/apollo-router/src/services/external.rs index 2161ac053d..0c30cec9da 100644 --- a/apollo-router/src/services/external.rs +++ b/apollo-router/src/services/external.rs @@ -261,6 +261,17 @@ mod test { .build(); } + #[test] + #[should_panic] + fn it_will_not_build_router_externalizable_incorrectl_supergraph() { + Externalizable::::router_builder() + .stage(PipelineStep::SupergraphRequest) + .build(); + Externalizable::::router_builder() + .stage(PipelineStep::SupergraphResponse) + .build(); + } + #[test] fn it_will_build_subgraph_externalizable_correctly() { Externalizable::::subgraph_builder() diff --git a/apollo-router/src/services/supergraph.rs b/apollo-router/src/services/supergraph.rs index 6f9707fff4..8647d0cdc3 100644 --- a/apollo-router/src/services/supergraph.rs +++ b/apollo-router/src/services/supergraph.rs @@ -251,6 +251,34 @@ impl Response { ) } + /// This is the constructor (or builder) to use when constructing a "fake" Response stream. + /// + /// This does not enforce the provision of the data that is required for a fully functional + /// Response. It's usually enough for testing, when a fully constructed Response is + /// difficult to construct and not required for the purposes of the test. + /// + /// In addition, fake responses are expected to be valid, and will panic if given invalid values. + #[builder(visibility = "pub")] + fn fake_stream_new( + responses: Vec, + status_code: Option, + headers: MultiMap, + context: Context, + ) -> Result { + let mut builder = http::Response::builder().status(status_code.unwrap_or(StatusCode::OK)); + for (key, values) in headers { + let header_name: HeaderName = key.try_into()?; + for value in values { + let header_value: HeaderValue = value.try_into()?; + builder = builder.header(header_name.clone(), header_value); + } + } + + let stream = futures::stream::iter(responses); + let response = builder.body(stream.boxed())?; + Ok(Self { response, context }) + } + /// This is the constructor (or builder) to use when constructing a Response that represents a global error. /// It has no path and no response data. /// This is useful for things such as authentication errors.