Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(2781): support gRPC reflection custom headers #2790

Merged
merged 9 commits into from
Sep 25, 2024
Merged
2 changes: 1 addition & 1 deletion examples/grpc-reflection.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
schema
@server(port: 8000)
@upstream(baseURL: "http://localhost:50051", httpCache: 42, batch: {delay: 10})
@link(src: "http://localhost:50051", type: Grpc) {
@link(src: "http://localhost:50051", type: Grpc, headers: [{key: "authorization", value: "Bearer 123"}]) {
query: Query
}

Expand Down
4 changes: 4 additions & 0 deletions generated/.tailcallrc.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,10 @@ The @link directive allows you to import external resources, such as configurati
will be later used by `@grpc` directive –.
"""
directive @link(
"""
Custom headers for gRPC reflection server.
"""
headers: [KeyValue]
"""
The id of the link. It is used to reference the link in the schema.
"""
Expand Down
10 changes: 10 additions & 0 deletions generated/.tailcallrc.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -808,6 +808,16 @@
"description": "The @link directive allows you to import external resources, such as configuration – which will be merged into the config importing it –, or a .proto file – which will be later used by `@grpc` directive –.",
"type": "object",
"properties": {
"headers": {
"description": "Custom headers for gRPC reflection server.",
"type": [
"array",
"null"
],
"items": {
"$ref": "#/definitions/KeyValue"
}
},
"id": {
"description": "The id of the link. It is used to reference the link in the schema.",
"type": [
Expand Down
5 changes: 5 additions & 0 deletions src/core/config/link.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use serde::{Deserialize, Serialize};
use tailcall_macros::DirectiveDefinition;

use super::super::is_default;
use super::KeyValue;

#[derive(
Default,
Expand Down Expand Up @@ -57,6 +58,10 @@ pub struct Link {
/// The type of the link. It can be `Config`, or `Protobuf`.
#[serde(default, skip_serializing_if = "is_default", rename = "type")]
pub type_of: LinkType,
///
/// Custom headers for gRPC reflection server.
#[serde(default, skip_serializing_if = "is_default")]
pub headers: Option<Vec<KeyValue>>,
/// Additional metadata pertaining to the linked resource.
#[serde(default, skip_serializing_if = "is_default")]
pub meta: Option<serde_json::Value>,
Expand Down
5 changes: 4 additions & 1 deletion src/core/config/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,10 @@ impl ConfigReader {
})
}
LinkType::Grpc => {
let meta = self.proto_reader.fetch(link.src.as_str()).await?;
let meta = self
.proto_reader
.fetch(link.src.as_str(), link.headers.clone())
.await?;

for m in meta {
extensions.add_proto(m);
Expand Down
1 change: 1 addition & 0 deletions src/core/generator/generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ impl Generator {
id: None,
src: metadata.path.to_owned(),
type_of: LinkType::Protobuf,
headers: None,
meta: None,
});
Ok(config)
Expand Down
1 change: 1 addition & 0 deletions src/core/grpc/data_loader_request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ mod tests {
id: None,
src: test_file.to_string(),
type_of: LinkType::Protobuf,
headers: None,
meta: None,
}]);
let method = GrpcMethod {
Expand Down
1 change: 1 addition & 0 deletions src/core/grpc/protobuf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,7 @@ pub mod tests {
id: Some(id.clone()),
src: path.to_string(),
type_of: LinkType::Protobuf,
headers: None,
meta: None,
}]);

Expand Down
1 change: 1 addition & 0 deletions src/core/grpc/request_template.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ mod tests {
id: Some(id.clone()),
src: test_file.to_string(),
type_of: LinkType::Protobuf,
headers: None,
meta: None,
}]);
let method = GrpcMethod {
Expand Down
86 changes: 73 additions & 13 deletions src/core/proto_reader/fetch.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::str::FromStr;

use anyhow::{Context, Result};
use base64::prelude::BASE64_STANDARD;
use base64::Engine;
Expand All @@ -9,7 +11,7 @@ use serde::{Deserialize, Serialize};
use serde_json::json;

use crate::core::blueprint::GrpcMethod;
use crate::core::config::ConfigReaderContext;
use crate::core::config::{ConfigReaderContext, KeyValue};
use crate::core::grpc::protobuf::ProtobufSet;
use crate::core::grpc::request_template::RequestBody;
use crate::core::grpc::RequestTemplate;
Expand Down Expand Up @@ -72,11 +74,16 @@ struct ReflectionResponse {
pub struct GrpcReflection {
server_reflection_method: GrpcMethod,
url: String,
headers: Option<Vec<KeyValue>>,
target_runtime: TargetRuntime,
}

impl GrpcReflection {
pub fn new<T: AsRef<str>>(url: T, target_runtime: TargetRuntime) -> Self {
pub fn new<T: AsRef<str>>(
url: T,
headers: Option<Vec<KeyValue>>,
target_runtime: TargetRuntime,
) -> Self {
let server_reflection_method = GrpcMethod {
package: "grpc.reflection.v1alpha".to_string(),
service: "ServerReflection".to_string(),
Expand All @@ -85,6 +92,7 @@ impl GrpcReflection {
Self {
server_reflection_method,
url: url.as_ref().to_string(),
headers,
target_runtime,
}
}
Expand Down Expand Up @@ -135,16 +143,28 @@ impl GrpcReflection {
)
.as_str(),
);

let mut headers = vec![];
if let Some(custom_headers) = &self.headers {
for header in custom_headers {
headers.push((
HeaderName::from_str(&header.key)?,
Mustache::parse(header.value.as_str()),
));
}
}
headers.push((
HeaderName::from_static("content-type"),
Mustache::parse("application/grpc+proto"),
));
let body_ = Some(RequestBody {
mustache: Some(Mustache::parse(body.to_string().as_str())),
value: Default::default(),
});
let req_template = RequestTemplate {
url: Mustache::parse(url.as_str()),
headers: vec![(
HeaderName::from_static("content-type"),
Mustache::parse("application/grpc+proto"),
)],
body: Some(RequestBody {
mustache: Some(Mustache::parse(body.to_string().as_str())),
value: Default::default(),
}),
headers,
body: body_,
operation: operation.clone(),
operation_type: Default::default(),
};
Expand Down Expand Up @@ -230,6 +250,7 @@ mod grpc_fetch {

let grpc_reflection = GrpcReflection::new(
format!("http://localhost:{}", server.port()),
None,
crate::core::runtime::test::init(None),
);

Expand Down Expand Up @@ -258,6 +279,7 @@ mod grpc_fetch {

let grpc_reflection = GrpcReflection::new(
format!("http://localhost:{}", server.port()),
None,
crate::core::runtime::test::init(None),
);

Expand Down Expand Up @@ -290,7 +312,7 @@ mod grpc_fetch {
let runtime = crate::core::runtime::test::init(None);

let grpc_reflection =
GrpcReflection::new(format!("http://localhost:{}", server.port()), runtime);
GrpcReflection::new(format!("http://localhost:{}", server.port()), None, runtime);

let resp = grpc_reflection.list_all_files().await?;

Expand Down Expand Up @@ -322,7 +344,7 @@ mod grpc_fetch {
let runtime = crate::core::runtime::test::init(None);

let grpc_reflection =
GrpcReflection::new(format!("http://localhost:{}", server.port()), runtime);
GrpcReflection::new(format!("http://localhost:{}", server.port()), None, runtime);

let resp = grpc_reflection.list_all_files().await;

Expand All @@ -349,7 +371,7 @@ mod grpc_fetch {
let runtime = crate::core::runtime::test::init(None);

let grpc_reflection =
GrpcReflection::new(format!("http://localhost:{}", server.port()), runtime);
GrpcReflection::new(format!("http://localhost:{}", server.port()), None, runtime);

let result = grpc_reflection.get_by_service("nonexistent.Service").await;
assert!(result.is_err());
Expand All @@ -358,4 +380,42 @@ mod grpc_fetch {

Ok(())
}

#[tokio::test]
async fn test_custom_headers_resp_list_all() -> Result<()> {
let server = start_mock_server();

let http_reflection_service_not_found = server.mock(|when, then| {
when.method(httpmock::Method::POST)
.path("/grpc.reflection.v1alpha.ServerReflection/ServerReflectionInfo")
.header("authorization", "Bearer 123");
then.status(200).body(get_fake_resp());
});

let runtime = crate::core::runtime::test::init(None);

let grpc_reflection = GrpcReflection::new(
format!("http://localhost:{}", server.port()),
Some(vec![KeyValue {
key: "authorization".to_string(),
value: "Bearer 123".to_string(),
}]),
runtime,
);

let resp = grpc_reflection.list_all_files().await?;

assert_eq!(
[
"news.NewsService".to_string(),
"grpc.reflection.v1alpha.ServerReflection".to_string()
]
.to_vec(),
resp
);

http_reflection_service_not_found.assert();

Ok(())
}
}
13 changes: 11 additions & 2 deletions src/core/proto_reader/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use futures_util::FutureExt;
use prost_reflect::prost_types::{FileDescriptorProto, FileDescriptorSet};
use protox::file::{FileResolver, GoogleFileResolver};

use crate::core::config::KeyValue;
use crate::core::proto_reader::fetch::GrpcReflection;
use crate::core::resource_reader::{Cached, ResourceReader};
use crate::core::runtime::TargetRuntime;
Expand All @@ -31,8 +32,16 @@ impl ProtoReader {
}

/// Fetches proto files from a grpc server (grpc reflection)
pub async fn fetch<T: AsRef<str>>(&self, url: T) -> anyhow::Result<Vec<ProtoMetadata>> {
let grpc_reflection = Arc::new(GrpcReflection::new(url.as_ref(), self.runtime.clone()));
pub async fn fetch<T: AsRef<str>>(
&self,
url: T,
headers: Option<Vec<KeyValue>>,
) -> anyhow::Result<Vec<ProtoMetadata>> {
let grpc_reflection = Arc::new(GrpcReflection::new(
url.as_ref(),
headers,
self.runtime.clone(),
));

let mut proto_metadata = vec![];
let service_list = grpc_reflection.list_all_files().await?;
Expand Down
13 changes: 11 additions & 2 deletions tailcall-upstream-grpc/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@
use opentelemetry_sdk::propagation::TraceContextPropagator;
use opentelemetry_sdk::{runtime, Resource};
use tonic::metadata::MetadataMap;
use tonic::service::interceptor::InterceptedService;
use tonic::transport::Server as TonicServer;
use tonic::{Response, Status};
use tonic::{Request, Response, Status};
use tonic_tracing_opentelemetry::middleware::server;
use tower::make::Shared;
use tracing_subscriber::layer::SubscriberExt;
Expand Down Expand Up @@ -215,6 +216,14 @@
Ok(())
}

/// Intercepts the request and checks if the token is valid.
fn intercept(req: Request<()>) -> Result<Request<()>, Status> {
match req.metadata().get("authorization") {
Some(token) if token == "Bearer 123" => Ok(req),
_ => Err(Status::permission_denied("Unauthorized")),

Check warning on line 223 in tailcall-upstream-grpc/src/main.rs

View check run for this annotation

Codecov / codecov/patch

tailcall-upstream-grpc/src/main.rs#L220-L223

Added lines #L220 - L223 were not covered by tests
}
}

Check warning on line 225 in tailcall-upstream-grpc/src/main.rs

View check run for this annotation

Codecov / codecov/patch

tailcall-upstream-grpc/src/main.rs#L225

Added line #L225 was not covered by tests

#[tokio::main]
async fn main() -> Result<(), Error> {
if std::env::var("HONEYCOMB_API_KEY").is_ok() {
Expand All @@ -234,7 +243,7 @@
let tonic_service = TonicServer::builder()
.layer(server::OtelGrpcLayer::default())
.add_service(NewsServiceServer::new(news_service))
.add_service(service)
.add_service(InterceptedService::new(service, intercept))

Check warning on line 246 in tailcall-upstream-grpc/src/main.rs

View check run for this annotation

Codecov / codecov/patch

tailcall-upstream-grpc/src/main.rs#L246

Added line #L246 was not covered by tests
.into_service();
let make_svc = Shared::new(tonic_service);
println!("Server listening on grpc://{}", addr);
Expand Down
Loading