From b917b8c117b46a2d508428c0856f4927dfcfc341 Mon Sep 17 00:00:00 2001 From: Bryn Cooke Date: Wed, 11 Oct 2023 11:58:29 +0100 Subject: [PATCH] Fix and add test for co-processors handling of streaming responses. (#4014) The logic for supporting deferred responses can't work. The issues is here: [router/apollo-router/src/plugins/coprocessor/supergraph.rs](https://github.com/apollographql/router/blob/cf4a79a126132eb30e368bdf2277a89278ae2a9b/apollo-router/src/plugins/coprocessor/supergraph.rs#L431-L432) This will always panic as the wrong builder is used for the stage. Fixes #4013 --- **Checklist** Complete the checklist (and note appropriate exceptions) before the PR is marked ready-for-review. - [ ] Changes are compatible[^1] - [ ] Documentation[^2] completed - [ ] Performance impact assessed and acceptable - Tests added and passing[^3] - [ ] Unit Tests - [ ] Integration Tests - [ ] Manual Tests **Exceptions** *Note any exceptions here* **Notes** [^1]: It may be appropriate to bring upcoming changes to the attention of other (impacted) groups. Please endeavour to do this before seeking PR approval. The mechanism for doing this will vary considerably, so use your judgement as to how and when to do this. [^2]: Configuration is an important part of many changes. Where applicable please try to document configuration examples. [^3]: Tick whichever testing boxes are applicable. If you are adding Manual Tests, please document the manual testing (extensively) in the Exceptions. Co-authored-by: bryn --- .../fix_bryn_fix_coprocessor_stream.md | 5 + .../src/plugins/coprocessor/supergraph.rs | 93 ++++++++++++++++++- apollo-router/src/services/external.rs | 11 +++ apollo-router/src/services/supergraph.rs | 28 ++++++ 4 files changed, 132 insertions(+), 5 deletions(-) create mode 100644 .changesets/fix_bryn_fix_coprocessor_stream.md diff --git a/.changesets/fix_bryn_fix_coprocessor_stream.md b/.changesets/fix_bryn_fix_coprocessor_stream.md new file mode 100644 index 0000000000..4b3d721153 --- /dev/null +++ b/.changesets/fix_bryn_fix_coprocessor_stream.md @@ -0,0 +1,5 @@ +### Fix panic when streaming responses to co-processor ([Issue #4013](https://github.com/apollographql/router/issues/4013)) + +Streamed responses will no longer cause a panic in the co-processor plugin. This affected defer and stream queries. + +By [@BrynCooke](https://github.com/BrynCooke) in https://github.com/apollographql/router/pull/4014 diff --git a/apollo-router/src/plugins/coprocessor/supergraph.rs b/apollo-router/src/plugins/coprocessor/supergraph.rs index 389207153b..e64310ce9c 100644 --- a/apollo-router/src/plugins/coprocessor/supergraph.rs +++ b/apollo-router/src/plugins/coprocessor/supergraph.rs @@ -428,7 +428,7 @@ where // Note: We deliberately DO NOT send headers or status_code even if the user has // requested them. That's because they are meaningless on a deferred response and // providing them will be a source of confusion. - let payload = Externalizable::router_builder() + let payload = Externalizable::supergraph_builder() .stage(PipelineStep::SupergraphResponse) .and_id(TraceId::maybe_new().map(|id| id.to_string())) .and_body(body_to_send) @@ -753,13 +753,13 @@ mod tests { let value = response.headers().get("aheader").unwrap(); - assert_eq!("a value", value); + assert_eq!(value, "a value"); assert_eq!( - "my error message", response.body_mut().next().await.unwrap().errors[0] .message - .as_str() + .as_str(), + "my error message" ); } @@ -852,7 +852,7 @@ mod tests { "this-is-a-test-context": 42 } }, - "sdl": "the sdl shouldnt change" + "sdl": "the sdl shouldn't change" }); Ok(hyper::Response::builder() .body(Body::from(serde_json::to_string(&input).unwrap())) @@ -889,8 +889,91 @@ mod tests { let body = res.response.body_mut().next().await.unwrap(); // the body should have changed: assert_eq!( + serde_json::to_value(&body).unwrap(), json!({ "data": { "test": 42_u32 } }), + ); + } + + #[tokio::test] + async fn defer() { + let supergraph_stage = SupergraphStage { + response: SupergraphResponseConf { + headers: true, + context: true, + body: true, + sdl: true, + status_code: false, + }, + request: Default::default(), + }; + + let mut mock_supergraph_service = MockSupergraphService::new(); + + mock_supergraph_service + .expect_call() + .returning(|req: supergraph::Request| { + Ok(supergraph::Response::fake_stream_builder() + .response( + graphql::Response::builder() + .data(json!({ "test": 1 })) + .has_next(true) + .build(), + ) + .response( + graphql::Response::builder() + .data(json!({ "test": 2 })) + .has_next(false) + .build(), + ) + .context(req.context) + .build() + .unwrap()) + }); + + let mock_http_client = mock_with_deferred_callback(move |res: hyper::Request| { + Box::pin(async { + let deserialized_response: Externalizable = + serde_json::from_slice(&hyper::body::to_bytes(res.into_body()).await.unwrap()) + .unwrap(); + assert_eq!(EXTERNALIZABLE_VERSION, deserialized_response.version); + assert_eq!( + PipelineStep::SupergraphResponse.to_string(), + deserialized_response.stage + ); + + Ok(hyper::Response::builder() + .body(Body::from( + serde_json::to_string(&deserialized_response).unwrap(), + )) + .unwrap()) + }) + }); + + let service = supergraph_stage.as_service( + mock_http_client, + mock_supergraph_service.boxed(), + "http://test".to_string(), + Arc::new("".to_string()), + ); + + let request = supergraph::Request::canned_builder() + .query("foo") + .build() + .unwrap(); + + let mut res = service.oneshot(request).await.unwrap(); + + let body = res.response.body_mut().next().await.unwrap(); + // the body should have changed: + assert_eq!( + serde_json::to_value(&body).unwrap(), + json!({ "data": { "test": 1 }, "hasNext": true }), + ); + let body = res.response.body_mut().next().await.unwrap(); + // the body should have changed: + assert_eq!( serde_json::to_value(&body).unwrap(), + json!({ "data": { "test": 2 }, "hasNext": false }), ); } } diff --git a/apollo-router/src/services/external.rs b/apollo-router/src/services/external.rs index 2161ac053d..0c30cec9da 100644 --- a/apollo-router/src/services/external.rs +++ b/apollo-router/src/services/external.rs @@ -261,6 +261,17 @@ mod test { .build(); } + #[test] + #[should_panic] + fn it_will_not_build_router_externalizable_incorrectl_supergraph() { + Externalizable::::router_builder() + .stage(PipelineStep::SupergraphRequest) + .build(); + Externalizable::::router_builder() + .stage(PipelineStep::SupergraphResponse) + .build(); + } + #[test] fn it_will_build_subgraph_externalizable_correctly() { Externalizable::::subgraph_builder() diff --git a/apollo-router/src/services/supergraph.rs b/apollo-router/src/services/supergraph.rs index 6f9707fff4..8647d0cdc3 100644 --- a/apollo-router/src/services/supergraph.rs +++ b/apollo-router/src/services/supergraph.rs @@ -251,6 +251,34 @@ impl Response { ) } + /// This is the constructor (or builder) to use when constructing a "fake" Response stream. + /// + /// This does not enforce the provision of the data that is required for a fully functional + /// Response. It's usually enough for testing, when a fully constructed Response is + /// difficult to construct and not required for the purposes of the test. + /// + /// In addition, fake responses are expected to be valid, and will panic if given invalid values. + #[builder(visibility = "pub")] + fn fake_stream_new( + responses: Vec, + status_code: Option, + headers: MultiMap, + context: Context, + ) -> Result { + let mut builder = http::Response::builder().status(status_code.unwrap_or(StatusCode::OK)); + for (key, values) in headers { + let header_name: HeaderName = key.try_into()?; + for value in values { + let header_value: HeaderValue = value.try_into()?; + builder = builder.header(header_name.clone(), header_value); + } + } + + let stream = futures::stream::iter(responses); + let response = builder.body(stream.boxed())?; + Ok(Self { response, context }) + } + /// This is the constructor (or builder) to use when constructing a Response that represents a global error. /// It has no path and no response data. /// This is useful for things such as authentication errors.