Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding support for custom headers #577 #838

Merged
merged 13 commits into from
Mar 4, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 8 additions & 7 deletions codegen/src/main/twirl/templates/JavaServer/Handler.scala.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@ import akka.stream.Materializer;

import akka.grpc.Codec;
import akka.grpc.Codecs;
import akka.grpc.Trailers;
import akka.grpc.javadsl.GrpcMarshalling;
import akka.grpc.javadsl.GrpcExceptionHandler;
import akka.grpc.javadsl.package$;

@{if (powerApis) "import akka.grpc.javadsl.Metadata;\nimport akka.grpc.javadsl.MetadataImpl;" else ""}
@{if (powerApis) "import akka.grpc.javadsl.Metadata;\nimport akka.grpc.javadsl.MetadataBuilder;" else ""}

import static @{service.packageName}.@{service.name}.Serializers.*;

Expand Down Expand Up @@ -50,7 +51,7 @@ import static @{service.packageName}.@{service.name}.Serializers.*;
* Use `akka.grpc.scaladsl.ServiceHandler.concatOrNotFound` with `@{service.name}Handler.partial` when combining
* several services.
*/
public static Function<HttpRequest, CompletionStage<HttpResponse>> create(@serviceName implementation, Materializer mat, Function<ActorSystem, Function<Throwable, io.grpc.Status>> eHandler, ActorSystem system) {
public static Function<HttpRequest, CompletionStage<HttpResponse>> create(@serviceName implementation, Materializer mat, Function<ActorSystem, Function<Throwable, Trailers>> eHandler, ActorSystem system) {
return create(implementation, @{service.name}.name, mat, eHandler, system);
}

Expand All @@ -76,7 +77,7 @@ import static @{service.packageName}.@{service.name}.Serializers.*;
*
* Registering a gRPC service under a custom prefix is not widely supported and strongly discouraged by the specification.
*/
public static Function<HttpRequest, CompletionStage<HttpResponse>> create(@serviceName implementation, String prefix, Materializer mat, Function<ActorSystem, Function<Throwable, io.grpc.Status>> eHandler, ActorSystem system) {
public static Function<HttpRequest, CompletionStage<HttpResponse>> create(@serviceName implementation, String prefix, Materializer mat, Function<ActorSystem, Function<Throwable, Trailers>> eHandler, ActorSystem system) {
return partial(implementation, prefix, mat, eHandler, system);
}

Expand All @@ -96,7 +97,7 @@ import static @{service.packageName}.@{service.name}.Serializers.*;
*
* Use `akka.grpc.javadsl.ServiceHandler.concatOrNotFound` when combining several services.
*/
public static Function<HttpRequest, CompletionStage<HttpResponse>> partial(@serviceName implementation, String prefix, Materializer mat, Function<ActorSystem, Function<Throwable, io.grpc.Status>> eHandler, ActorSystem system) {
public static Function<HttpRequest, CompletionStage<HttpResponse>> partial(@serviceName implementation, String prefix, Materializer mat, Function<ActorSystem, Function<Throwable, Trailers>> eHandler, ActorSystem system) {
return (req -> {
Iterator<String> segments = req.getUri().pathSegments().iterator();
if (segments.hasNext() && segments.next().equals(prefix) && segments.hasNext()) {
Expand All @@ -113,15 +114,15 @@ import static @{service.packageName}.@{service.name}.Serializers.*;
return @{service.name}.name;
}

private static CompletionStage<HttpResponse> handle(HttpRequest request, String method, @serviceName implementation, Materializer mat, Function<ActorSystem, Function<Throwable, io.grpc.Status>> eHandler, ActorSystem system) {
private static CompletionStage<HttpResponse> handle(HttpRequest request, String method, @serviceName implementation, Materializer mat, Function<ActorSystem, Function<Throwable, Trailers>> eHandler, ActorSystem system) {
Codec responseCodec = Codecs.negotiate(request);
@{if(powerApis) { "Metadata metadata = new MetadataImpl(request.getHeaders());" } else { "" }}
@{if(powerApis) { "Metadata metadata = MetadataBuilder.fromHeaders(request.getHeaders());" } else { "" }}
switch(method) {
@for(method <- service.methods) {
case "@method.grpcName":
return @{method.unmarshal}(request, @method.deserializer.name, mat)
.@{if(method.outputStreaming) { "thenApply" } else { "thenCompose" }}(e -> implementation.@{method.name}(e@{if(powerApis) { ", metadata" } else { "" }}))
.thenApply(e -> @{method.marshal}(e, @method.serializer.name, mat, responseCodec, system, package$.MODULE$.scalaAnonymousPartialFunction(eHandler)));
.thenApply(e -> @{method.marshal}(e, @method.serializer.name, mat, responseCodec, system, eHandler));
}
default:
CompletableFuture<HttpResponse> result = new CompletableFuture<>();
Expand Down
11 changes: 6 additions & 5 deletions codegen/src/main/twirl/templates/ScalaServer/Handler.scala.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,15 @@ import scala.concurrent.ExecutionContext

import akka.grpc.scaladsl.{ GrpcExceptionHandler, GrpcMarshalling }
import akka.grpc.Codecs
import akka.grpc.Trailers

import akka.http.scaladsl.model.{ HttpRequest, HttpResponse, StatusCodes }
import akka.http.scaladsl.model.Uri.Path
import akka.http.scaladsl.model.Uri.Path.Segment
import akka.actor.ActorSystem
import akka.stream.Materializer

@{if (powerApis) "import akka.grpc.scaladsl.MetadataImpl" else ""}
@{if (powerApis) "import akka.grpc.scaladsl.MetadataBuilder" else ""}

@defining(if (powerApis) service.name + "PowerApi" else service.name) { serviceName =>
object @{serviceName}Handler {
Expand All @@ -41,7 +42,7 @@ import akka.stream.Materializer
* Use `akka.grpc.scaladsl.ServiceHandler.concatOrNotFound` with `@{service.name}Handler.partial` when combining
* several services.
*/
def apply(implementation: @serviceName, eHandler: ActorSystem => PartialFunction[Throwable, io.grpc.Status])(implicit mat: Materializer, system: ActorSystem): HttpRequest => scala.concurrent.Future[HttpResponse] =
def apply(implementation: @serviceName, eHandler: ActorSystem => PartialFunction[Throwable, Trailers])(implicit mat: Materializer, system: ActorSystem): HttpRequest => scala.concurrent.Future[HttpResponse] =
partial(implementation, @{service.name}.name, eHandler).orElse { case _ => notFound }

/**
Expand All @@ -65,7 +66,7 @@ import akka.stream.Materializer
*
* Registering a gRPC service under a custom prefix is not widely supported and strongly discouraged by the specification.
*/
def apply(implementation: @serviceName, prefix: String, eHandler: ActorSystem => PartialFunction[Throwable, io.grpc.Status])(implicit mat: Materializer, system: ActorSystem): HttpRequest => scala.concurrent.Future[HttpResponse] =
def apply(implementation: @serviceName, prefix: String, eHandler: ActorSystem => PartialFunction[Throwable, Trailers])(implicit mat: Materializer, system: ActorSystem): HttpRequest => scala.concurrent.Future[HttpResponse] =
partial(implementation, prefix, eHandler).orElse { case _ => notFound }

/**
Expand All @@ -77,15 +78,15 @@ import akka.stream.Materializer
*
* Registering a gRPC service under a custom prefix is not widely supported and strongly discouraged by the specification.
*/
def partial(implementation: @serviceName, prefix: String = @{service.name}.name, eHandler: ActorSystem => PartialFunction[Throwable, io.grpc.Status] = GrpcExceptionHandler.defaultMapper)(implicit mat: Materializer, system: ActorSystem): PartialFunction[HttpRequest, scala.concurrent.Future[HttpResponse]] = {
def partial(implementation: @serviceName, prefix: String = @{service.name}.name, eHandler: ActorSystem => PartialFunction[Throwable, Trailers] = GrpcExceptionHandler.defaultMapper)(implicit mat: Materializer, system: ActorSystem): PartialFunction[HttpRequest, scala.concurrent.Future[HttpResponse]] = {
implicit val ec: ExecutionContext = mat.executionContext
import @{service.name}.Serializers._

def handle(request: HttpRequest, method: String): scala.concurrent.Future[HttpResponse] = method match {
@for(method <- service.methods) {
case "@method.grpcName" =>
val responseCodec = Codecs.negotiate(request)
@{if(powerApis) { "val metadata = new MetadataImpl(request.headers)" } else { "" }}
@{if(powerApis) { "val metadata = MetadataBuilder.fromHeaders(request.headers)" } else { "" }}
@{method.unmarshal}(request)(@method.deserializer.name, mat)
.@{if(method.outputStreaming) { "map" } else { "flatMap" }}(implementation.@{method.name}(_@{if(powerApis) { ", metadata" } else { "" }}))
.map(e => @{method.marshal}(e, eHandler)(@method.serializer.name, mat, responseCodec, system))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,21 @@
package akka.grpc.scaladsl

import scala.concurrent.Future
import scala.util.Try

import akka.actor.ActorSystem
import akka.grpc.GrpcServiceException
import akka.grpc.scaladsl.headers.`Status`
import akka.grpc.internal.GrpcEntityHelpers
import akka.http.scaladsl.model.HttpEntity.{ Chunked, LastChunk }
import akka.http.scaladsl.model.{ HttpRequest, HttpResponse }
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{ Sink, Source }

import org.scalatest.matchers.should.Matchers
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.wordspec.AnyWordSpecLike

import akka.testkit.TestKit

import akka.util.ByteString
import io.grpc.testing.integration.test.TestService

class GrpcExceptionHandlerSpec
Expand Down Expand Up @@ -50,6 +50,11 @@ class GrpcExceptionHandlerSpec
}
}

val metadata = (new MetadataBuilder)
.addText("test-text", "test-text-data")
.addBinary("test-binary-bin", ByteString("test-binary-data"))
.build()

import example.myapp.helloworld.grpc.helloworld._
object ExampleImpl extends GreeterService {

Expand All @@ -61,7 +66,6 @@ class GrpcExceptionHandlerSpec

//#unary
//#streaming
import akka.grpc.GrpcServiceException
import io.grpc.Status

//#unary
Expand All @@ -72,7 +76,7 @@ class GrpcExceptionHandlerSpec

def sayHello(in: HelloRequest): Future[HelloReply] = {
if (in.name.isEmpty)
Future.failed(new GrpcServiceException(Status.INVALID_ARGUMENT.withDescription("No name found")))
Future.failed(new GrpcServiceException(Status.INVALID_ARGUMENT.withDescription("No name found"), metadata))
else
Future.successful(HelloReply(s"Hi ${in.name}!"))
}
Expand All @@ -83,7 +87,7 @@ class GrpcExceptionHandlerSpec
//#streaming
def itKeepsReplying(in: HelloRequest): Source[HelloReply, NotUsed] = {
if (in.name.isEmpty)
Source.failed(new GrpcServiceException(Status.INVALID_ARGUMENT.withDescription("No name found")))
Source.failed(new GrpcServiceException(Status.INVALID_ARGUMENT.withDescription("No name found"), metadata))
else
myResponseSource
}
Expand Down Expand Up @@ -114,6 +118,32 @@ class GrpcExceptionHandlerSpec
statusHeader.map(_.value()) should be(Some("3"))
val statusMessageHeader = lastChunk.trailer.find { _.name == "grpc-message" }
statusMessageHeader.map(_.value()) should be(Some("No name found"))

val metadata = MetadataBuilder.fromHeaders(lastChunk.trailer)
metadata.getText("test-text") should be(Some("test-text-data"))
metadata.getBinary("test-binary-bin") should be(Some(ByteString("test-binary-data")))
}

"return the correct user-supplied status for a streaming call" in {
import akka.http.scaladsl.client.RequestBuilding._
implicit val serializer =
example.myapp.helloworld.grpc.helloworld.GreeterService.Serializers.HelloRequestSerializer
implicit val codec = akka.grpc.Identity

val request = Get(s"/${GreeterService.name}/ItKeepsReplying", GrpcEntityHelpers(HelloRequest("")))

val reply = GreeterServiceHandler(ExampleImpl).apply(request).futureValue

val lastChunk = reply.entity.asInstanceOf[Chunked].chunks.runWith(Sink.last).futureValue.asInstanceOf[LastChunk]
// Invalid argument is '3' https://github.com/grpc/grpc/blob/master/doc/statuscodes.md
val statusHeader = lastChunk.trailer.find { _.name == "grpc-status" }
statusHeader.map(_.value()) should be(Some("3"))
val statusMessageHeader = lastChunk.trailer.find { _.name == "grpc-message" }
statusMessageHeader.map(_.value()) should be(Some("No name found"))

val metadata = MetadataBuilder.fromHeaders(lastChunk.trailer)
metadata.getText("test-text") should be(Some("test-text-data"))
metadata.getBinary("test-binary-bin") should be(Some(ByteString("test-binary-data")))
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# Replaced javadsl.MetadataImpl with internal.JavaMetadataImpl adapter.
ProblemFilters.exclude[MissingClassProblem]("akka.grpc.javadsl.MetadataImpl")

# Replaced scaladsl.MetadataImpl with internal.HeaderMetadataImpl.
ProblemFilters.exclude[MissingClassProblem]("akka.grpc.scaladsl.MetadataImpl")
ProblemFilters.exclude[MissingClassProblem]("akka.grpc.scaladsl.MetadataImpl$")

# Modified GrpcExceptionHandler to return Trailers instead of io.grpc.Status.
ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.grpc.javadsl.GrpcExceptionHandler.standard")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can even use wildcards here but this is 👍

ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.grpc.javadsl.GrpcExceptionHandler.default")
ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.grpc.javadsl.GrpcExceptionHandler.defaultMapper")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.grpc.javadsl.GrpcMarshalling.status")
ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.grpc.scaladsl.GrpcExceptionHandler.default")
ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.grpc.scaladsl.GrpcExceptionHandler.defaultMapper")
ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.grpc.scaladsl.GrpcMarshalling.marshal")
ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.grpc.scaladsl.GrpcMarshalling.marshal$default$2")
ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.grpc.scaladsl.GrpcMarshalling.marshalStream")
ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.grpc.scaladsl.GrpcMarshalling.marshalStream$default$2")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.grpc.internal.GrpcResponseHelpers.status")
ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.grpc.internal.GrpcResponseHelpers.apply")
ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.grpc.internal.GrpcResponseHelpers.apply$default$3")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.grpc.internal.GrpcResponseHelpers.status")

# Added methods to iterate metadata entries to javadsl.Metadata interface.
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.grpc.javadsl.Metadata.asMap")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.grpc.javadsl.Metadata.asList")

# Added javadsl.Metadata.asScala method.
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.grpc.javadsl.Metadata.asScala")

# Added scaladsl.Metadata.asList method.
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.grpc.scaladsl.Metadata.asList")

# Moved exception handler conversion inside javadsl.GrpcMarshalling.
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.grpc.javadsl.GrpcMarshalling.marshal")
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.grpc.javadsl.GrpcMarshalling.marshal$default$6")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.grpc.javadsl.GrpcMarshalling.marshalStream")
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.grpc.javadsl.GrpcMarshalling.marshalStream$default$6")
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.grpc.javadsl.GrpcMarshalling.marshalStream$default$6")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.grpc.javadsl.GrpcMarshalling.marshalStream")
30 changes: 29 additions & 1 deletion runtime/src/main/scala/akka/grpc/GrpcServiceException.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,34 @@ package akka.grpc

import io.grpc.Status

class GrpcServiceException(val status: Status) extends RuntimeException(status.getDescription) {
import akka.grpc.scaladsl.{ Metadata, MetadataBuilder }
import akka.grpc.internal.JavaMetadataImpl

class GrpcServiceException(val status: Status, val metadata: Metadata) extends RuntimeException(status.getDescription) {

require(!status.isOk, "Use GrpcServiceException in case of failure, not as a flow control mechanism.")

def this(status: Status) {
this(status, MetadataBuilder.empty)
}

/**
* Java API: Constructs a service exception which includes response metadata.
*/
def this(status: Status, metadata: javadsl.Metadata) {
this(status, metadata.asScala)
}

/**
* Java API: The response status.
*/
def getStatus: Status =
status

/**
* Java API: The response metadata.
*/
def getMetadata: javadsl.Metadata =
new JavaMetadataImpl(metadata)

}
36 changes: 36 additions & 0 deletions runtime/src/main/scala/akka/grpc/Trailers.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright (C) 2018-2020 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.grpc

import io.grpc.Status
import akka.grpc.internal.JavaMetadataImpl
import akka.grpc.scaladsl.{ Metadata, MetadataBuilder }
import akka.grpc.javadsl.{ Metadata => jMetadata }

case class Trailers(status: Status, metadata: Metadata) {
def this(status: Status) {
this(status, MetadataBuilder.empty)
}

def this(status: Status, metadata: jMetadata) {
this(status, metadata.asScala)
}

/**
* Java API: Returns the status.
*/
def getStatus: Status =
status

/**
* Java API: Returns the trailing metadata.
*/
def getMetadata: jMetadata =
new JavaMetadataImpl(metadata)
}

object Trailers {
def apply(status: Status): Trailers = new Trailers(status)
}
27 changes: 18 additions & 9 deletions runtime/src/main/scala/akka/grpc/internal/GrpcEntityHelpers.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,15 @@
package akka.grpc.internal

import io.grpc.Status

import akka.NotUsed
import akka.{ grpc, NotUsed }
import akka.actor.ActorSystem
import akka.annotation.InternalApi
import akka.grpc.{ Codec, Grpc, GrpcServiceException, ProtobufSerializer }
import akka.grpc.scaladsl.{ headers, GrpcExceptionHandler }
import akka.grpc.{ Codec, Grpc, GrpcServiceException, ProtobufSerializer, Trailers }
import akka.grpc.scaladsl.{ headers, BytesEntry, Metadata, StringEntry }
import akka.http.scaladsl.model.HttpEntity
import akka.http.scaladsl.model.HttpEntity.LastChunk
import akka.http.scaladsl.model.HttpHeader
import akka.http.scaladsl.model.headers.RawHeader
import akka.stream.Materializer
import akka.stream.scaladsl.Source

Expand All @@ -23,18 +23,18 @@ object GrpcEntityHelpers {
def apply[T](
e: Source[T, NotUsed],
trail: Source[HttpEntity.LastChunk, NotUsed],
eHandler: ActorSystem => PartialFunction[Throwable, Status])(
eHandler: ActorSystem => PartialFunction[Throwable, Trailers])(
implicit m: ProtobufSerializer[T],
mat: Materializer,
codec: Codec,
system: ActorSystem): HttpEntity.Chunked = {
HttpEntity.Chunked(Grpc.contentType, chunks(e, trail).recover {
case t =>
val status = eHandler(system).orElse[Throwable, Status] {
case e: GrpcServiceException => e.status
case e: Exception => Status.UNKNOWN.withCause(e).withDescription("Stream failed")
val e = eHandler(system).orElse[Throwable, Trailers] {
case e: GrpcServiceException => grpc.Trailers(e.status, e.metadata)
case e: Exception => grpc.Trailers(Status.UNKNOWN.withCause(e).withDescription("Stream failed"))
}(t)
trailer(status)
trailer(e.status, e.metadata)
})
}

Expand All @@ -55,7 +55,16 @@ object GrpcEntityHelpers {
def trailer(status: Status): LastChunk =
LastChunk(trailer = statusHeaders(status))

def trailer(status: Status, metadata: Metadata): LastChunk =
LastChunk(trailer = statusHeaders(status) ++ metadataHeaders(metadata))

def statusHeaders(status: Status): List[HttpHeader] =
List(headers.`Status`(status.getCode.value.toString)) ++ Option(status.getDescription).map(d =>
headers.`Status-Message`(d))

def metadataHeaders(metadata: Metadata): List[HttpHeader] =
metadata.asList.map {
case (key, StringEntry(value)) => RawHeader(key, value)
case (key, BytesEntry(value)) => RawHeader(key, MetadataImpl.encodeBinaryHeader(value))
}
}
Loading