Skip to content

Commit

Permalink
Replaced GrpcErrorResponse with Trailers.
Browse files Browse the repository at this point in the history
Added MetadataBuilder to javadsl and scaladsl.
Refactored MetadataImpl to reduce code duplication.
Added metadata unit tests.
  • Loading branch information
drmontgomery committed Mar 3, 2020
1 parent ec63629 commit aafd784
Show file tree
Hide file tree
Showing 22 changed files with 738 additions and 250 deletions.
14 changes: 7 additions & 7 deletions codegen/src/main/twirl/templates/JavaServer/Handler.scala.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@ import akka.stream.Materializer;

import akka.grpc.Codec;
import akka.grpc.Codecs;
import akka.grpc.GrpcErrorResponse;
import akka.grpc.Trailers;
import akka.grpc.javadsl.GrpcMarshalling;
import akka.grpc.javadsl.GrpcExceptionHandler;
import akka.grpc.javadsl.package$;

@{if (powerApis) "import akka.grpc.javadsl.Metadata;\nimport akka.grpc.javadsl.MetadataImpl;" else ""}
@{if (powerApis) "import akka.grpc.javadsl.Metadata;\nimport akka.grpc.javadsl.MetadataBuilder;" else ""}

import static @{service.packageName}.@{service.name}.Serializers.*;

Expand Down Expand Up @@ -51,7 +51,7 @@ import static @{service.packageName}.@{service.name}.Serializers.*;
* Use `akka.grpc.scaladsl.ServiceHandler.concatOrNotFound` with `@{service.name}Handler.partial` when combining
* several services.
*/
public static Function<HttpRequest, CompletionStage<HttpResponse>> create(@serviceName implementation, Materializer mat, Function<ActorSystem, Function<Throwable, GrpcErrorResponse>> eHandler, ActorSystem system) {
public static Function<HttpRequest, CompletionStage<HttpResponse>> create(@serviceName implementation, Materializer mat, Function<ActorSystem, Function<Throwable, Trailers>> eHandler, ActorSystem system) {
return create(implementation, @{service.name}.name, mat, eHandler, system);
}

Expand All @@ -77,7 +77,7 @@ import static @{service.packageName}.@{service.name}.Serializers.*;
*
* Registering a gRPC service under a custom prefix is not widely supported and strongly discouraged by the specification.
*/
public static Function<HttpRequest, CompletionStage<HttpResponse>> create(@serviceName implementation, String prefix, Materializer mat, Function<ActorSystem, Function<Throwable, GrpcErrorResponse>> eHandler, ActorSystem system) {
public static Function<HttpRequest, CompletionStage<HttpResponse>> create(@serviceName implementation, String prefix, Materializer mat, Function<ActorSystem, Function<Throwable, Trailers>> eHandler, ActorSystem system) {
return partial(implementation, prefix, mat, eHandler, system);
}

Expand All @@ -97,7 +97,7 @@ import static @{service.packageName}.@{service.name}.Serializers.*;
*
* Use `akka.grpc.javadsl.ServiceHandler.concatOrNotFound` when combining several services.
*/
public static Function<HttpRequest, CompletionStage<HttpResponse>> partial(@serviceName implementation, String prefix, Materializer mat, Function<ActorSystem, Function<Throwable, GrpcErrorResponse>> eHandler, ActorSystem system) {
public static Function<HttpRequest, CompletionStage<HttpResponse>> partial(@serviceName implementation, String prefix, Materializer mat, Function<ActorSystem, Function<Throwable, Trailers>> eHandler, ActorSystem system) {
return (req -> {
Iterator<String> segments = req.getUri().pathSegments().iterator();
if (segments.hasNext() && segments.next().equals(prefix) && segments.hasNext()) {
Expand All @@ -114,9 +114,9 @@ import static @{service.packageName}.@{service.name}.Serializers.*;
return @{service.name}.name;
}

private static CompletionStage<HttpResponse> handle(HttpRequest request, String method, @serviceName implementation, Materializer mat, Function<ActorSystem, Function<Throwable, GrpcErrorResponse>> eHandler, ActorSystem system) {
private static CompletionStage<HttpResponse> handle(HttpRequest request, String method, @serviceName implementation, Materializer mat, Function<ActorSystem, Function<Throwable, Trailers>> eHandler, ActorSystem system) {
Codec responseCodec = Codecs.negotiate(request);
@{if(powerApis) { "Metadata metadata = new MetadataImpl(request.getHeaders());" } else { "" }}
@{if(powerApis) { "Metadata metadata = MetadataBuilder.fromHeaders(request.getHeaders());" } else { "" }}
switch(method) {
@for(method <- service.methods) {
case "@method.grpcName":
Expand Down
12 changes: 6 additions & 6 deletions codegen/src/main/twirl/templates/ScalaServer/Handler.scala.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,15 @@ import scala.concurrent.ExecutionContext

import akka.grpc.scaladsl.{ GrpcExceptionHandler, GrpcMarshalling }
import akka.grpc.Codecs
import akka.grpc.GrpcErrorResponse
import akka.grpc.Trailers

import akka.http.scaladsl.model.{ HttpRequest, HttpResponse, StatusCodes }
import akka.http.scaladsl.model.Uri.Path
import akka.http.scaladsl.model.Uri.Path.Segment
import akka.actor.ActorSystem
import akka.stream.Materializer

@{if (powerApis) "import akka.grpc.scaladsl.MetadataImpl" else ""}
@{if (powerApis) "import akka.grpc.scaladsl.MetadataBuilder" else ""}

@defining(if (powerApis) service.name + "PowerApi" else service.name) { serviceName =>
object @{serviceName}Handler {
Expand All @@ -42,7 +42,7 @@ import akka.stream.Materializer
* Use `akka.grpc.scaladsl.ServiceHandler.concatOrNotFound` with `@{service.name}Handler.partial` when combining
* several services.
*/
def apply(implementation: @serviceName, eHandler: ActorSystem => PartialFunction[Throwable, GrpcErrorResponse])(implicit mat: Materializer, system: ActorSystem): HttpRequest => scala.concurrent.Future[HttpResponse] =
def apply(implementation: @serviceName, eHandler: ActorSystem => PartialFunction[Throwable, Trailers])(implicit mat: Materializer, system: ActorSystem): HttpRequest => scala.concurrent.Future[HttpResponse] =
partial(implementation, @{service.name}.name, eHandler).orElse { case _ => notFound }

/**
Expand All @@ -66,7 +66,7 @@ import akka.stream.Materializer
*
* Registering a gRPC service under a custom prefix is not widely supported and strongly discouraged by the specification.
*/
def apply(implementation: @serviceName, prefix: String, eHandler: ActorSystem => PartialFunction[Throwable, GrpcErrorResponse])(implicit mat: Materializer, system: ActorSystem): HttpRequest => scala.concurrent.Future[HttpResponse] =
def apply(implementation: @serviceName, prefix: String, eHandler: ActorSystem => PartialFunction[Throwable, Trailers])(implicit mat: Materializer, system: ActorSystem): HttpRequest => scala.concurrent.Future[HttpResponse] =
partial(implementation, prefix, eHandler).orElse { case _ => notFound }

/**
Expand All @@ -78,15 +78,15 @@ import akka.stream.Materializer
*
* Registering a gRPC service under a custom prefix is not widely supported and strongly discouraged by the specification.
*/
def partial(implementation: @serviceName, prefix: String = @{service.name}.name, eHandler: ActorSystem => PartialFunction[Throwable, GrpcErrorResponse] = GrpcExceptionHandler.defaultMapper)(implicit mat: Materializer, system: ActorSystem): PartialFunction[HttpRequest, scala.concurrent.Future[HttpResponse]] = {
def partial(implementation: @serviceName, prefix: String = @{service.name}.name, eHandler: ActorSystem => PartialFunction[Throwable, Trailers] = GrpcExceptionHandler.defaultMapper)(implicit mat: Materializer, system: ActorSystem): PartialFunction[HttpRequest, scala.concurrent.Future[HttpResponse]] = {
implicit val ec: ExecutionContext = mat.executionContext
import @{service.name}.Serializers._

def handle(request: HttpRequest, method: String): scala.concurrent.Future[HttpResponse] = method match {
@for(method <- service.methods) {
case "@method.grpcName" =>
val responseCodec = Codecs.negotiate(request)
@{if(powerApis) { "val metadata = new MetadataImpl(request.headers)" } else { "" }}
@{if(powerApis) { "val metadata = MetadataBuilder.fromHeaders(request.headers)" } else { "" }}
@{method.unmarshal}(request)(@method.deserializer.name, mat)
.@{if(method.outputStreaming) { "map" } else { "flatMap" }}(implementation.@{method.name}(_@{if(powerApis) { ", metadata" } else { "" }}))
.map(e => @{method.marshal}(e, eHandler)(@method.serializer.name, mat, responseCodec, system))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import scala.concurrent.Future
import scala.util.Try

import akka.actor.ActorSystem
import akka.grpc.{ BytesEntry, GrpcServiceException, StringEntry }
import akka.grpc.GrpcServiceException
import akka.grpc.scaladsl.headers.`Status`
import akka.grpc.internal.GrpcEntityHelpers
import akka.http.scaladsl.model.HttpEntity.{ Chunked, LastChunk }
Expand Down Expand Up @@ -50,8 +50,10 @@ class GrpcExceptionHandlerSpec
}
}

val metadata =
Map("test-string" -> StringEntry("test-string-data"), "test-bytes" -> BytesEntry(ByteString("test-bytes-data")))
val metadata = (new MetadataBuilder)
.addText("test-text", "test-text-data")
.addBinary("test-binary-bin", ByteString("test-binary-data"))
.build()

import example.myapp.helloworld.grpc.helloworld._
object ExampleImpl extends GreeterService {
Expand Down Expand Up @@ -117,9 +119,9 @@ class GrpcExceptionHandlerSpec
val statusMessageHeader = lastChunk.trailer.find { _.name == "grpc-message" }
statusMessageHeader.map(_.value()) should be(Some("No name found"))

val metadata = new MetadataImpl(lastChunk.trailer)
metadata.getText("test-string") should be(Some("test-string-data"))
metadata.getBinary("test-bytes") should be(Some(ByteString("test-bytes-data")))
val metadata = MetadataBuilder.fromHeaders(lastChunk.trailer)
metadata.getText("test-text") should be(Some("test-text-data"))
metadata.getBinary("test-binary-bin") should be(Some(ByteString("test-binary-data")))
}

"return the correct user-supplied status for a streaming call" in {
Expand All @@ -139,9 +141,9 @@ class GrpcExceptionHandlerSpec
val statusMessageHeader = lastChunk.trailer.find { _.name == "grpc-message" }
statusMessageHeader.map(_.value()) should be(Some("No name found"))

val metadata = new MetadataImpl(lastChunk.trailer)
metadata.getText("test-string") should be(Some("test-string-data"))
metadata.getBinary("test-bytes") should be(Some(ByteString("test-bytes-data")))
val metadata = MetadataBuilder.fromHeaders(lastChunk.trailer)
metadata.getText("test-text") should be(Some("test-text-data"))
metadata.getBinary("test-binary-bin") should be(Some(ByteString("test-binary-data")))
}
}
}
23 changes: 0 additions & 23 deletions runtime/src/main/scala/akka/grpc/GrpcErrorResponse.scala

This file was deleted.

21 changes: 10 additions & 11 deletions runtime/src/main/scala/akka/grpc/GrpcServiceException.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,25 +4,24 @@

package akka.grpc

import java.util.{ Map => jMap }

import scala.collection.JavaConverters._
import io.grpc.Status

class GrpcServiceException(val status: Status, val metadata: Map[String, MetadataEntry])
extends RuntimeException(status.getDescription) {
import akka.grpc.scaladsl.{ Metadata, MetadataBuilder }
import akka.grpc.internal.JavaMetadataImpl

class GrpcServiceException(val status: Status, val metadata: Metadata) extends RuntimeException(status.getDescription) {

require(!status.isOk, "Use GrpcServiceException in case of failure, not as a flow control mechanism.")

def this(status: Status) {
this(status, Map[String, MetadataEntry]())
this(status, MetadataBuilder.empty)
}

/**
* Java API: Constructs a service exception which includes response metadata.
*/
def this(status: Status, metadata: jMap[String, MetadataEntry]) {
this(status, metadata.asScala.toMap)
def this(status: Status, metadata: javadsl.Metadata) {
this(status, metadata.asScala)
}

/**
Expand All @@ -32,9 +31,9 @@ class GrpcServiceException(val status: Status, val metadata: Map[String, Metadat
status

/**
* Java API: The response headers.
* Java API: The response metadata.
*/
def getMetadata: jMap[String, MetadataEntry] =
metadata.asJava
def getMetadata: javadsl.Metadata =
new JavaMetadataImpl(metadata)

}
11 changes: 0 additions & 11 deletions runtime/src/main/scala/akka/grpc/MetadataEntry.scala

This file was deleted.

36 changes: 36 additions & 0 deletions runtime/src/main/scala/akka/grpc/Trailers.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright (C) 2018-2020 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.grpc

import io.grpc.Status
import akka.grpc.internal.JavaMetadataImpl
import akka.grpc.scaladsl.{ Metadata, MetadataBuilder }
import akka.grpc.javadsl.{ Metadata => jMetadata }

case class Trailers(status: Status, metadata: Metadata) {
def this(status: Status) {
this(status, MetadataBuilder.empty)
}

def this(status: Status, metadata: jMetadata) {
this(status, metadata.asScala)
}

/**
* Java API: Returns the status.
*/
def getStatus: Status =
status

/**
* Java API: Returns the trailing metadata.
*/
def getMetadata: jMetadata =
new JavaMetadataImpl(metadata)
}

object Trailers {
def apply(status: Status): Trailers = new Trailers(status)
}
37 changes: 13 additions & 24 deletions runtime/src/main/scala/akka/grpc/internal/GrpcEntityHelpers.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,12 @@

package akka.grpc.internal

import java.util.Base64

import io.grpc.Status
import akka.{ grpc, NotUsed }
import akka.actor.ActorSystem
import akka.annotation.InternalApi
import akka.grpc.{
BytesEntry,
Codec,
Grpc,
GrpcErrorResponse,
GrpcServiceException,
MetadataEntry,
ProtobufSerializer,
StringEntry
}
import akka.grpc.scaladsl.headers
import akka.grpc.{ Codec, Grpc, GrpcServiceException, ProtobufSerializer, Trailers }
import akka.grpc.scaladsl.{ headers, BytesEntry, Metadata, StringEntry }
import akka.http.scaladsl.model.HttpEntity
import akka.http.scaladsl.model.HttpEntity.LastChunk
import akka.http.scaladsl.model.HttpHeader
Expand All @@ -34,16 +23,16 @@ object GrpcEntityHelpers {
def apply[T](
e: Source[T, NotUsed],
trail: Source[HttpEntity.LastChunk, NotUsed],
eHandler: ActorSystem => PartialFunction[Throwable, GrpcErrorResponse])(
eHandler: ActorSystem => PartialFunction[Throwable, Trailers])(
implicit m: ProtobufSerializer[T],
mat: Materializer,
codec: Codec,
system: ActorSystem): HttpEntity.Chunked = {
HttpEntity.Chunked(Grpc.contentType, chunks(e, trail).recover {
case t =>
val e = eHandler(system).orElse[Throwable, GrpcErrorResponse] {
case e: GrpcServiceException => grpc.GrpcErrorResponse(e.status, e.metadata)
case e: Exception => grpc.GrpcErrorResponse(Status.UNKNOWN.withCause(e).withDescription("Stream failed"))
val e = eHandler(system).orElse[Throwable, Trailers] {
case e: GrpcServiceException => grpc.Trailers(e.status, e.metadata)
case e: Exception => grpc.Trailers(Status.UNKNOWN.withCause(e).withDescription("Stream failed"))
}(t)
trailer(e.status, e.metadata)
})
Expand All @@ -63,19 +52,19 @@ object GrpcEntityHelpers {
system: ActorSystem) =
e.map(m.serialize).via(Grpc.grpcFramingEncoder(codec)).map(bytes => HttpEntity.Chunk(bytes)).concat(trail)

def trailer(status: Status, headers: List[HttpHeader] = Nil): LastChunk =
LastChunk(trailer = statusHeaders(status) ++ headers)
def trailer(status: Status): LastChunk =
LastChunk(trailer = statusHeaders(status))

def trailer(status: Status, metadata: Map[String, MetadataEntry]): LastChunk =
def trailer(status: Status, metadata: Metadata): LastChunk =
LastChunk(trailer = statusHeaders(status) ++ metadataHeaders(metadata))

def statusHeaders(status: Status): List[HttpHeader] =
List(headers.`Status`(status.getCode.value.toString)) ++ Option(status.getDescription).map(d =>
headers.`Status-Message`(d))

def metadataHeaders(metadata: Map[String, MetadataEntry]): List[HttpHeader] =
metadata.map {
def metadataHeaders(metadata: Metadata): List[HttpHeader] =
metadata.asList.map {
case (key, StringEntry(value)) => RawHeader(key, value)
case (key, BytesEntry(value)) => RawHeader(key, new String(Base64.getEncoder.encode(value.toByteBuffer).array))
}.toList
case (key, BytesEntry(value)) => RawHeader(key, MetadataImpl.encodeBinaryHeader(value))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import akka.NotUsed
import akka.actor.ActorSystem
import akka.annotation.InternalApi
import akka.grpc.scaladsl.{ headers, GrpcExceptionHandler }
import akka.grpc.{ Codec, Grpc, GrpcErrorResponse, ProtobufSerializer }
import akka.grpc.{ Codec, Grpc, ProtobufSerializer, Trailers }
import akka.http.scaladsl.model.HttpEntity.LastChunk
import akka.http.scaladsl.model.{ HttpEntity, HttpHeader, HttpResponse }
import akka.stream.Materializer
Expand All @@ -31,7 +31,7 @@ object GrpcResponseHelpers {
system: ActorSystem): HttpResponse =
GrpcResponseHelpers(e, Source.single(GrpcEntityHelpers.trailer(Status.OK)))

def apply[T](e: Source[T, NotUsed], eHandler: ActorSystem => PartialFunction[Throwable, GrpcErrorResponse])(
def apply[T](e: Source[T, NotUsed], eHandler: ActorSystem => PartialFunction[Throwable, Trailers])(
implicit m: ProtobufSerializer[T],
mat: Materializer,
codec: Codec,
Expand All @@ -48,7 +48,7 @@ object GrpcResponseHelpers {
def apply[T](
e: Source[T, NotUsed],
status: Future[Status],
eHandler: ActorSystem => PartialFunction[Throwable, GrpcErrorResponse])(
eHandler: ActorSystem => PartialFunction[Throwable, Trailers])(
implicit m: ProtobufSerializer[T],
mat: Materializer,
codec: Codec,
Expand All @@ -63,7 +63,7 @@ object GrpcResponseHelpers {
def apply[T](
e: Source[T, NotUsed],
trail: Source[HttpEntity.LastChunk, NotUsed],
eHandler: ActorSystem => PartialFunction[Throwable, GrpcErrorResponse] = GrpcExceptionHandler.defaultMapper)(
eHandler: ActorSystem => PartialFunction[Throwable, Trailers] = GrpcExceptionHandler.defaultMapper)(
implicit m: ProtobufSerializer[T],
mat: Materializer,
codec: Codec,
Expand All @@ -74,7 +74,7 @@ object GrpcResponseHelpers {
entity = GrpcEntityHelpers(e, trail, eHandler))
}

def status(e: GrpcErrorResponse): HttpResponse =
def status(e: Trailers): HttpResponse =
HttpResponse(entity =
HttpEntity.Chunked(Grpc.contentType, Source.single(GrpcEntityHelpers.trailer(e.status, e.metadata))))
}
Loading

0 comments on commit aafd784

Please sign in to comment.