Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Supergraph coprocessor implementation #3408

Merged
merged 29 commits into from
Aug 11, 2023
Merged
Show file tree
Hide file tree
Changes from 26 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
b7292d3
move the coprocessor to a subfolder
Geal Jul 10, 2023
72f42cc
request side supergraph coprocessor
Geal Jul 10, 2023
c53e9b0
snapshot
Geal Jul 11, 2023
6a5a3b1
add one test
Geal Jul 11, 2023
26fd65c
another test
Geal Jul 11, 2023
82b1886
lint
Geal Jul 11, 2023
8106f25
lint
Geal Jul 11, 2023
8f29ed1
fix tests
Geal Jul 11, 2023
cfc6e15
Merge branch 'dev' into geal/supergraph-coprocessor
Geal Jul 11, 2023
b9ca416
supergraph response side
Geal Jul 11, 2023
97efd5d
fixes to other coprocessor implementations
Geal Jul 11, 2023
fc62b53
lint
Geal Jul 11, 2023
90b96b3
add a test for the response side
Geal Jul 11, 2023
1ffe47d
changeset placeholder
Geal Jul 11, 2023
4a43689
Merge branch 'dev' into geal/supergraph-coprocessor
Geal Jul 11, 2023
494b939
Merge branch 'dev' into geal/supergraph-coprocessor
Geal Jul 13, 2023
2680e59
cleanup
Geal Jul 13, 2023
4b19e59
changeset
Geal Jul 13, 2023
2c39481
Merge branch 'dev' into geal/supergraph-coprocessor
Geal Jul 13, 2023
bdfea1d
Merge branch 'dev' into geal/supergraph-coprocessor
Geal Jul 27, 2023
35a875e
Merge branch 'dev' into geal/supergraph-coprocessor
Geal Jul 28, 2023
285d817
Merge branch 'dev' into geal/supergraph-coprocessor
Geal Aug 1, 2023
2df96f5
Merge branch 'dev' into geal/supergraph-coprocessor
Geal Aug 9, 2023
e5405ca
Merge branch 'dev' into geal/supergraph-coprocessor
Geal Aug 9, 2023
a38452c
metrics
Geal Aug 9, 2023
7f711a9
Update apollo-router/src/plugins/coprocessor/supergraph.rs
Geal Aug 10, 2023
63b43bb
lint
Geal Aug 11, 2023
6cf1c4d
add docs
Geal Aug 11, 2023
7cf6470
Merge branch 'dev' into geal/supergraph-coprocessor
Geal Aug 11, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changesets/feat_geal_supergraph_coprocessor.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
### Supergraph coprocessor implementation ([PR #3408](https://github.com/apollographql/router/pull/3408))

This adds support for coprocessors at the supergraph service level. Supergraph plugins work on the request side with a parsed GraphQL request object, so the query and operation name, variables and extensions are directly accessible. On the response side, they handle GraphQL response objects, with label, data, path, errors, extensions. The supergraph response contains a stream of GraphQL responses, which can contain multiple elements if the query uses `@defer` or subscriptions. When configured to observe the responses, the coprocessor will be called for each of the deferred responses.

By [@Geal](https://github.com/Geal) in https://github.com/apollographql/router/pull/3408
Original file line number Diff line number Diff line change
Expand Up @@ -770,6 +770,113 @@ expression: "&schema"
},
"additionalProperties": false
},
"supergraph": {
"description": "The supergraph stage request/response configuration",
"default": {
"request": {
"headers": false,
"context": false,
"body": false,
"sdl": false,
"path": false,
"method": false
Comment on lines +779 to +782
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think method and path are relevant for the supergraph coprocessor, at this point we're speaking graphql right

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, I think there should be other fields. I didn't know if we wanted to have an entire response field or be able to split the response parts when sending to the coprocessor?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's hard to know/decide what's required at the different stages... I tend to err on the side of "if you think someone might want it, and it's easy to provide, then include it, since it can be configured out".

},
"response": {
"headers": false,
"context": false,
"body": false,
"sdl": false,
"status_code": false
}
},
"type": "object",
"properties": {
"request": {
"description": "The request configuration",
"default": {
"headers": false,
"context": false,
"body": false,
"sdl": false,
"path": false,
"method": false
},
"type": "object",
"properties": {
"body": {
"description": "Send the body",
"default": false,
"type": "boolean"
},
"context": {
"description": "Send the context",
"default": false,
"type": "boolean"
},
"headers": {
"description": "Send the headers",
"default": false,
"type": "boolean"
},
"method": {
"description": "Send the method",
"default": false,
"type": "boolean"
},
"path": {
"description": "Send the path",
"default": false,
"type": "boolean"
},
"sdl": {
"description": "Send the SDL",
"default": false,
"type": "boolean"
}
},
"additionalProperties": false
},
"response": {
"description": "What information is passed to a router request/response stage",
"default": {
"headers": false,
"context": false,
"body": false,
"sdl": false,
"status_code": false
},
"type": "object",
"properties": {
"body": {
"description": "Send the body",
"default": false,
"type": "boolean"
},
"context": {
"description": "Send the context",
"default": false,
"type": "boolean"
},
"headers": {
"description": "Send the headers",
"default": false,
"type": "boolean"
},
"sdl": {
"description": "Send the SDL",
"default": false,
"type": "boolean"
},
"status_code": {
"description": "Send the HTTP status",
"default": false,
"type": "boolean"
}
},
"additionalProperties": false
}
}
},
"timeout": {
"description": "The timeout for external requests",
"default": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use crate::layers::ServiceBuilderExt;
use crate::plugin::Plugin;
use crate::plugin::PluginInit;
use crate::register_plugin;
use crate::services;
use crate::services::external::Control;
use crate::services::external::Externalizable;
use crate::services::external::PipelineStep;
Expand All @@ -44,6 +45,11 @@ use crate::services::router;
use crate::services::subgraph;
use crate::tracer::TraceId;

#[cfg(test)]
mod test;

mod supergraph;

pub(crate) const EXTERNAL_SPAN_NAME: &str = "external_plugin";
const POOL_IDLE_TIMEOUT_DURATION: Option<Duration> = Some(Duration::from_secs(5));

Expand Down Expand Up @@ -86,6 +92,13 @@ impl Plugin for CoprocessorPlugin<HTTPClientService> {
self.router_service(service)
}

fn supergraph_service(
&self,
service: services::supergraph::BoxService,
) -> services::supergraph::BoxService {
self.supergraph_service(service)
}

fn subgraph_service(&self, name: &str, service: subgraph::BoxService) -> subgraph::BoxService {
self.subgraph_service(name, service)
}
Expand Down Expand Up @@ -149,6 +162,18 @@ where
)
}

fn supergraph_service(
&self,
service: services::supergraph::BoxService,
) -> services::supergraph::BoxService {
self.configuration.supergraph.as_service(
self.http_client.clone(),
service,
self.configuration.url.clone(),
self.sdl.clone(),
)
}

fn subgraph_service(&self, name: &str, service: subgraph::BoxService) -> subgraph::BoxService {
self.configuration.subgraph.all.as_service(
self.http_client.clone(),
Expand Down Expand Up @@ -239,6 +264,9 @@ struct Conf {
/// The router stage request/response configuration
#[serde(default)]
router: RouterStage,
/// The supergraph stage request/response configuration
#[serde(default)]
supergraph: supergraph::SupergraphStage,
/// The subgraph stage request/response configuration
#[serde(default)]
subgraph: SubgraphStages,
Expand Down Expand Up @@ -676,7 +704,7 @@ where
// If first is None, or contains an error we return an error
let opt_first: Option<Bytes> = first.and_then(|f| f.ok());
let bytes = match opt_first {
Some(b) => b.to_vec(),
Some(b) => b,
None => {
tracing::error!(
"Coprocessor cannot convert body into future due to problem with first part"
Expand All @@ -695,7 +723,7 @@ where
.transpose()?;
let body_to_send = response_config
.body
.then(|| String::from_utf8(bytes.clone()))
.then(|| std::str::from_utf8(&bytes).map(|s| s.to_string()))
.transpose()?;
let status_to_send = response_config.status_code.then(|| parts.status.as_u16());
let context_to_send = response_config.context.then(|| response.context.clone());
Expand Down Expand Up @@ -857,7 +885,6 @@ where
// First, extract the data we need from our request and prepare our
// external call. Use our configuration to figure out which data to send.
let (parts, body) = request.subgraph_request.into_parts();
let bytes = Bytes::from(serde_json::to_vec(&body)?);

let headers_to_send = request_config
.headers
Expand All @@ -866,7 +893,7 @@ where

let body_to_send = request_config
.body
.then(|| serde_json::from_slice::<serde_json::Value>(&bytes))
.then(|| serde_json::to_value(&body))
.transpose()?;
let context_to_send = request_config.context.then(|| request.context.clone());
let uri = request_config.uri.then(|| parts.uri.to_string());
Expand Down Expand Up @@ -997,7 +1024,6 @@ where
// external call. Use our configuration to figure out which data to send.

let (parts, body) = response.response.into_parts();
let bytes = Bytes::from(serde_json::to_vec(&body)?);

let headers_to_send = response_config
.headers
Expand All @@ -1008,7 +1034,7 @@ where

let body_to_send = response_config
.body
.then(|| serde_json::from_slice::<serde_json::Value>(&bytes))
.then(|| serde_json::to_value(&body))
.transpose()?;
let context_to_send = response_config.context.then(|| response.context.clone());
let service_name = response_config.service_name.then_some(service_name);
Expand Down Expand Up @@ -1098,7 +1124,7 @@ fn validate_coprocessor_output<T>(
}

/// Convert a HeaderMap into a HashMap
pub(super) fn externalize_header_map(
pub(crate) fn externalize_header_map(
input: &HeaderMap<HeaderValue>,
) -> Result<HashMap<String, Vec<String>>, BoxError> {
let mut output = HashMap::new();
Expand Down
Loading