From 2687173474d3627614140cabef516abd05171689 Mon Sep 17 00:00:00 2001 From: Boris Tsirkin Date: Sat, 3 Oct 2020 11:50:37 +0200 Subject: [PATCH] Fixes #1101. Using akka.japi.function.Function . --- .../main/java/akka/grpc/benchmarks/Utils.java | 3 +- .../grpc/benchmarks/driver/LoadWorker.java | 16 +++--- .../akka/grpc/benchmarks/qps/AsyncServer.java | 24 ++++----- .../templates/JavaServer/Handler.scala.txt | 2 +- .../akka/grpc/interop/AkkaGrpcServerJava.java | 2 +- .../java/example/myapp/CombinedServer.java | 23 ++++---- .../AuthenticatedGreeterServer.java | 13 ++--- .../myapp/helloworld/GreeterServer.java | 8 +-- .../myapp/helloworld/PowerGreeterServer.java | 8 +-- .../akka/grpc/javadsl/GrpcMarshalling.scala | 24 +++++---- .../scala/akka/grpc/javadsl/RouteUtils.scala | 53 ------------------- .../akka/grpc/javadsl/ServerReflection.scala | 2 +- .../akka/grpc/javadsl/ServiceHandler.scala | 11 ++-- .../scala/akka/grpc/javadsl/WebHandler.scala | 2 +- .../scala/akka/grpc/javadsl/package.scala | 8 +-- 15 files changed, 72 insertions(+), 127 deletions(-) delete mode 100644 runtime/src/main/scala/akka/grpc/javadsl/RouteUtils.scala diff --git a/benchmark-java/src/main/java/akka/grpc/benchmarks/Utils.java b/benchmark-java/src/main/java/akka/grpc/benchmarks/Utils.java index c45f93478..14187620c 100644 --- a/benchmark-java/src/main/java/akka/grpc/benchmarks/Utils.java +++ b/benchmark-java/src/main/java/akka/grpc/benchmarks/Utils.java @@ -20,7 +20,6 @@ import akka.actor.ActorSystem; import akka.grpc.GrpcClientSettings; import akka.grpc.SSLContextUtils; -import akka.grpc.benchmarks.proto.Control; import akka.grpc.benchmarks.proto.Messages; import akka.grpc.benchmarks.proto.Messages.Payload; import akka.grpc.benchmarks.proto.Messages.SimpleRequest; @@ -183,7 +182,7 @@ public static HttpsConnectionContext serverHttpContext() throws Exception { SSLContext context = SSLContext.getInstance("TLS"); context.init(keyManagerFactory.getKeyManagers(), null, new SecureRandom()); - return ConnectionContext.https(context); + return ConnectionContext.httpsServer(context); } public static GrpcClientSettings createGrpcClientSettings(InetSocketAddress socketAddress, boolean useTls, ActorSystem system) { diff --git a/benchmark-java/src/main/java/akka/grpc/benchmarks/driver/LoadWorker.java b/benchmark-java/src/main/java/akka/grpc/benchmarks/driver/LoadWorker.java index 48b9cec45..11fb0422c 100644 --- a/benchmark-java/src/main/java/akka/grpc/benchmarks/driver/LoadWorker.java +++ b/benchmark-java/src/main/java/akka/grpc/benchmarks/driver/LoadWorker.java @@ -26,7 +26,6 @@ import akka.grpc.benchmarks.proto.Control.ServerArgs.ArgtypeCase; import akka.grpc.benchmarks.proto.WorkerService; import akka.grpc.benchmarks.proto.WorkerServiceHandlerFactory; -import akka.http.javadsl.ConnectWithHttps; import akka.http.javadsl.Http; import akka.http.javadsl.ServerBinding; import akka.stream.ActorMaterializer; @@ -56,7 +55,7 @@ public class LoadWorker { private final int driverPort; private final int serverPort; - LoadWorker(ActorSystem system, int driverPort, int serverPort) throws Exception { + LoadWorker(ActorSystem system, int driverPort, int serverPort) { this.system = system; this.driverPort = driverPort; this.serverPort = serverPort; @@ -67,11 +66,10 @@ public CompletionStage start() throws Exception { WorkerServiceImpl impl = new WorkerServiceImpl(mat); - CompletionStage bound = Http.get(system).bindAndHandleAsync( - WorkerServiceHandlerFactory.create(impl, system), - ConnectWithHttps.toHostHttps("0.0.0.0", driverPort).withCustomHttpsContext(Utils.serverHttpContext()), - mat); - + CompletionStage bound = Http.get(system) + .newServerAt("127.0.0.1", driverPort) + .enableHttps(Utils.serverHttpContext())) + .bind(WorkerServiceHandlerFactory.create(impl, system)); bound.thenAccept(binding -> { System.out.println("gRPC server bound to: " + binding.localAddress()); @@ -108,9 +106,9 @@ public static void main(String[] args) throws Exception { } String value = parts[1]; if ("server_port".equals(key)) { - serverPort = Integer.valueOf(value); + serverPort = Integer.parseInt(value); } else if ("driver_port".equals(key)) { - driverPort = Integer.valueOf(value); + driverPort = Integer.parseInt(value); } else { System.err.println("Unknown argument: " + key); usage = true; diff --git a/benchmark-java/src/main/java/akka/grpc/benchmarks/qps/AsyncServer.java b/benchmark-java/src/main/java/akka/grpc/benchmarks/qps/AsyncServer.java index c7d107aed..a1a0a8deb 100644 --- a/benchmark-java/src/main/java/akka/grpc/benchmarks/qps/AsyncServer.java +++ b/benchmark-java/src/main/java/akka/grpc/benchmarks/qps/AsyncServer.java @@ -23,9 +23,8 @@ import akka.grpc.benchmarks.proto.BenchmarkService; import akka.grpc.benchmarks.proto.BenchmarkServiceHandlerFactory; import akka.grpc.benchmarks.proto.Messages; -import akka.http.javadsl.ConnectHttp; -import akka.http.javadsl.ConnectWithHttps; import akka.http.javadsl.Http; +import akka.http.javadsl.ServerBuilder; import akka.stream.ActorMaterializer; import akka.stream.KillSwitches; import akka.stream.Materializer; @@ -77,22 +76,21 @@ public void run(InetSocketAddress address, boolean useTls) throws Exception { benchmarkService = new BenchmarkServiceImpl(mat); + final ServerBuilder serverBuilder = Http + .get(system) + .newServerAt(address.getHostName(), address.getPort()); + if (useTls) { - Http.get(system).bindAndHandleAsync( - BenchmarkServiceHandlerFactory.create(benchmarkService, system), - ConnectWithHttps.toHostHttps(address.getHostName(), address.getPort()).withCustomHttpsContext(Utils.serverHttpContext()), - mat) + serverBuilder + .enableHttps(Utils.serverHttpContext()) + .bind(BenchmarkServiceHandlerFactory.create(benchmarkService, system)) .thenAccept(binding -> { System.out.println("gRPC server bound to: https://" + address); }); } else { - Http.get(system).bindAndHandleAsync( - BenchmarkServiceHandlerFactory.create(benchmarkService, system), - ConnectHttp.toHost(address.getHostName(), address.getPort()), - mat - ).thenAccept(binding -> { - System.out.println("gRPC server bound to: http://" + address); - }); + serverBuilder + .bind(BenchmarkServiceHandlerFactory.create(benchmarkService, system)) + .thenAccept(binding -> System.out.println("gRPC server bound to: http://" + address)); } } diff --git a/codegen/src/main/twirl/templates/JavaServer/Handler.scala.txt b/codegen/src/main/twirl/templates/JavaServer/Handler.scala.txt index ab7b1b198..e3ec94bd6 100644 --- a/codegen/src/main/twirl/templates/JavaServer/Handler.scala.txt +++ b/codegen/src/main/twirl/templates/JavaServer/Handler.scala.txt @@ -11,7 +11,7 @@ import java.util.Iterator; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; -import akka.japi.Function; +import akka.japi.function.Function; import akka.actor.ActorSystem; import akka.actor.ClassicActorSystemProvider; import akka.annotation.ApiMayChange; diff --git a/interop-tests/src/main/java/akka/grpc/interop/AkkaGrpcServerJava.java b/interop-tests/src/main/java/akka/grpc/interop/AkkaGrpcServerJava.java index 51d42c87d..b236ffc01 100644 --- a/interop-tests/src/main/java/akka/grpc/interop/AkkaGrpcServerJava.java +++ b/interop-tests/src/main/java/akka/grpc/interop/AkkaGrpcServerJava.java @@ -24,7 +24,7 @@ import akka.http.javadsl.model.*; import akka.http.javadsl.*; import akka.http.javadsl.settings.ServerSettings; -import akka.japi.Function; +import akka.japi.function.Function; import akka.stream.*; import com.typesafe.config.ConfigFactory; import io.grpc.internal.testing.TestUtils; diff --git a/plugin-tester-java/src/main/java/example/myapp/CombinedServer.java b/plugin-tester-java/src/main/java/example/myapp/CombinedServer.java index f84a9f48b..5b2c82437 100644 --- a/plugin-tester-java/src/main/java/example/myapp/CombinedServer.java +++ b/plugin-tester-java/src/main/java/example/myapp/CombinedServer.java @@ -5,7 +5,7 @@ package example.myapp; import akka.actor.ActorSystem; -import akka.http.javadsl.*; +import akka.http.javadsl.Http; import akka.stream.ActorMaterializer; import akka.stream.Materializer; import com.typesafe.config.Config; @@ -18,7 +18,7 @@ import akka.grpc.javadsl.ServiceHandler; import akka.http.javadsl.model.HttpRequest; import akka.http.javadsl.model.HttpResponse; -import akka.japi.Function; +import akka.japi.function.Function; //#import @@ -33,7 +33,7 @@ import example.myapp.echo.grpc.*; class CombinedServer { - public static void main(String[] args) throws Exception { + public static void main(String[] args) { // important to enable HTTP/2 in ActorSystem's config Config conf = ConfigFactory.parseString("akka.http.server.preview.enable-http2 = on") .withFallback(ConfigFactory.defaultApplication()); @@ -45,13 +45,13 @@ public static void main(String[] args) throws Exception { GreeterServiceHandlerFactory.create(new GreeterServiceImpl(mat), sys); Function> echoService = EchoServiceHandlerFactory.create(new EchoServiceImpl(), sys); + @SuppressWarnings("unchecked") Function> serviceHandlers = - ServiceHandler.concatOrNotFound(greeterService, echoService); + ServiceHandler.handler(greeterService, echoService); - Http.get(sys).bindAndHandleAsync( - serviceHandlers, - ConnectHttp.toHost("127.0.0.1", 8090), - mat) + Http.get(sys) + .newServerAt("127.0.0.1", 8090) + .bind(serviceHandlers) //#concatOrNotFound .thenAccept(binding -> { System.out.println("gRPC server bound to: " + binding.localAddress()); @@ -61,10 +61,9 @@ public static void main(String[] args) throws Exception { Function> grpcWebServiceHandlers = WebHandler.grpcWebHandler(Arrays.asList(greeterService, echoService), sys, mat); - Http.get(sys).bindAndHandleAsync( - grpcWebServiceHandlers, - ConnectHttp.toHost("127.0.0.1", 8091), - mat) + Http.get(sys) + .newServerAt("127.0.0.1", 8090) + .bind(grpcWebServiceHandlers) //#grpc-web .thenAccept(binding -> { System.out.println("gRPC-Web server bound to: " + binding.localAddress()); diff --git a/plugin-tester-java/src/main/java/example/myapp/helloworld/AuthenticatedGreeterServer.java b/plugin-tester-java/src/main/java/example/myapp/helloworld/AuthenticatedGreeterServer.java index f49538204..0810bf030 100644 --- a/plugin-tester-java/src/main/java/example/myapp/helloworld/AuthenticatedGreeterServer.java +++ b/plugin-tester-java/src/main/java/example/myapp/helloworld/AuthenticatedGreeterServer.java @@ -11,14 +11,12 @@ import com.typesafe.config.ConfigFactory; import akka.actor.ActorSystem; -import akka.grpc.javadsl.RouteUtils; -import akka.http.javadsl.ConnectHttp; import akka.http.javadsl.Http; import akka.http.javadsl.ServerBinding; import akka.http.javadsl.model.HttpRequest; import akka.http.javadsl.model.HttpResponse; import akka.http.javadsl.server.Route; -import akka.japi.Function; +import akka.japi.function.Function; import akka.stream.ActorMaterializer; import akka.stream.Materializer; @@ -61,7 +59,7 @@ public static CompletionStage run(ActorSystem sys) throws Excepti Function> handler = GreeterServiceHandlerFactory.create(impl, sys); // As a Route - Route handlerRoute = RouteUtils.fromFunction(handler, sys); + Route handlerRoute = handle(handler); //#grpc-route //#grpc-protected @@ -82,10 +80,9 @@ public static CompletionStage run(ActorSystem sys) throws Excepti protectedHandler ); - return Http.get(sys).bindAndHandleAsync( - RouteUtils.toFunction(finalRoute, sys), - ConnectHttp.toHost("127.0.0.1", 8090), - mat); + return Http.get(sys) + .newServerAt("127.0.0.1", 8090) + .bind(finalRoute); //#combined } } diff --git a/plugin-tester-java/src/main/java/example/myapp/helloworld/GreeterServer.java b/plugin-tester-java/src/main/java/example/myapp/helloworld/GreeterServer.java index f89bc5a00..ff8c981e1 100644 --- a/plugin-tester-java/src/main/java/example/myapp/helloworld/GreeterServer.java +++ b/plugin-tester-java/src/main/java/example/myapp/helloworld/GreeterServer.java @@ -38,10 +38,10 @@ public static CompletionStage run(ActorSystem sys) throws Excepti // Instantiate implementation GreeterService impl = new GreeterServiceImpl(mat); - return Http.get(sys).bindAndHandleAsync( - GreeterServiceHandlerFactory.create(impl, sys), - ConnectHttp.toHost("127.0.0.1", 8090), - mat); + return Http + .get(sys) + .newServerAt("127.0.0.1", 8090) + .bind(GreeterServiceHandlerFactory.create(impl, sys)); } } //#full-server diff --git a/plugin-tester-java/src/main/java/example/myapp/helloworld/PowerGreeterServer.java b/plugin-tester-java/src/main/java/example/myapp/helloworld/PowerGreeterServer.java index 3008dc4f8..53a52686f 100644 --- a/plugin-tester-java/src/main/java/example/myapp/helloworld/PowerGreeterServer.java +++ b/plugin-tester-java/src/main/java/example/myapp/helloworld/PowerGreeterServer.java @@ -40,10 +40,10 @@ public static CompletionStage run(ActorSystem sys) throws Excepti // Instantiate implementation GreeterServicePowerApi impl = new GreeterServicePowerApiImpl(mat); - return Http.get(sys).bindAndHandleAsync( - GreeterServicePowerApiHandlerFactory.create(impl, sys), - ConnectHttp.toHost("127.0.0.1", 8091), - mat); + return Http + .get(sys) + .newServerAt("127.0.0.1", 8091) + .bind(GreeterServicePowerApiHandlerFactory.create(impl,sys)); } } //#full-server diff --git a/runtime/src/main/scala/akka/grpc/javadsl/GrpcMarshalling.scala b/runtime/src/main/scala/akka/grpc/javadsl/GrpcMarshalling.scala index 32e0c96b0..7c717471b 100644 --- a/runtime/src/main/scala/akka/grpc/javadsl/GrpcMarshalling.scala +++ b/runtime/src/main/scala/akka/grpc/javadsl/GrpcMarshalling.scala @@ -4,7 +4,7 @@ package akka.grpc.javadsl -import java.util.concurrent.{ CompletableFuture, CompletionStage } +import java.util.concurrent.{CompletableFuture, CompletionStage} import java.util.Optional import akka.NotUsed @@ -12,13 +12,13 @@ import akka.actor.ActorSystem import akka.actor.ClassicActorSystemProvider import akka.grpc._ import akka.grpc.internal._ -import akka.grpc.GrpcProtocol.{ GrpcProtocolReader, GrpcProtocolWriter } -import akka.http.javadsl.model.{ HttpRequest, HttpResponse } -import akka.japi.Function +import akka.grpc.GrpcProtocol.{GrpcProtocolReader, GrpcProtocolWriter} +import akka.http.javadsl.model.{HttpRequest, HttpResponse} +import akka.japi.{Function => JFunction} +import akka.japi.function.Function import akka.stream.Materializer import akka.stream.javadsl.Source import akka.util.ByteString - import com.github.ghik.silencer.silent object GrpcMarshalling { @@ -62,18 +62,24 @@ object GrpcMarshalling { m: ProtobufSerializer[T], writer: GrpcProtocolWriter, system: ClassicActorSystemProvider, - eHandler: Function[ActorSystem, Function[Throwable, Trailers]] = GrpcExceptionHandler.defaultMapper) + eHandler: JFunction[ActorSystem, JFunction[Throwable, Trailers]] = GrpcExceptionHandler.defaultMapper) : HttpResponse = marshalStream(Source.single(e), m, writer, system, eHandler) + def marshalStream[T]( e: Source[T, NotUsed], m: ProtobufSerializer[T], writer: GrpcProtocolWriter, system: ClassicActorSystemProvider, - eHandler: Function[ActorSystem, Function[Throwable, Trailers]] = GrpcExceptionHandler.defaultMapper) - : HttpResponse = - GrpcResponseHelpers(e.asScala, scalaAnonymousPartialFunction(eHandler))(m, writer, system) + eHandler: JFunction[ActorSystem, JFunction[Throwable, Trailers]] = GrpcExceptionHandler.defaultMapper) + : HttpResponse = { + val errorHandler = new Function[ActorSystem, Function[Throwable, Trailers]] { + override def apply(param: ActorSystem): Function[Throwable, Trailers] = + (throwable: Throwable) => eHandler(param).apply(throwable) + } + GrpcResponseHelpers(e.asScala, scalaAnonymousPartialFunction(errorHandler))(m, writer, system) + } private def failure[R](error: Throwable): CompletableFuture[R] = { val future: CompletableFuture[R] = new CompletableFuture() diff --git a/runtime/src/main/scala/akka/grpc/javadsl/RouteUtils.scala b/runtime/src/main/scala/akka/grpc/javadsl/RouteUtils.scala deleted file mode 100644 index 824e3490c..000000000 --- a/runtime/src/main/scala/akka/grpc/javadsl/RouteUtils.scala +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Copyright (C) 2020 Lightbend Inc. - */ - -package akka.grpc.javadsl - -import scala.concurrent.ExecutionContext -import java.util.concurrent.CompletionStage - -import akka.actor.ClassicActorSystemProvider -import akka.annotation.ApiMayChange -import akka.http.scaladsl -import akka.http.javadsl.model.{ HttpRequest, HttpResponse } -import akka.http.javadsl.server.Route -import akka.http.javadsl.server.directives.RouteAdapter -import akka.http.scaladsl.server.RouteResult -import akka.japi.Function - -import scala.compat.java8.FutureConverters - -/** - * To be moved to akka-http - */ -@ApiMayChange -object RouteUtils { - - /** To be introduced as a directive in Akka HTTP 10.2.0 (but possibly for akka.japi.function.Function instead) */ - def fromFunction( - handler: Function[HttpRequest, CompletionStage[HttpResponse]], - system: ClassicActorSystemProvider): Route = { - import scala.compat.java8.FutureConverters - implicit val ec: ExecutionContext = system.classicSystem.dispatcher - RouteAdapter { ctx => - FutureConverters - .toScala(handler(ctx.request)) - .map(response => RouteResult.Complete(response.asInstanceOf[scaladsl.model.HttpResponse])) - } - } - - /** To be introduced as a static method on Route in Akka HTTP 10.2.0 */ - def toFunction( - route: Route, - system: ClassicActorSystemProvider): Function[HttpRequest, CompletionStage[HttpResponse]] = { - implicit val sys = system - implicit val ec: ExecutionContext = system.classicSystem.dispatcher - val handler = scaladsl.server.Route.toFunction(route.asScala) - - (request: HttpRequest) => { - import FutureConverters._ - handler(request.asInstanceOf[scaladsl.model.HttpRequest]).map(_.asInstanceOf[HttpResponse]).toJava - } - } -} diff --git a/runtime/src/main/scala/akka/grpc/javadsl/ServerReflection.scala b/runtime/src/main/scala/akka/grpc/javadsl/ServerReflection.scala index 4a5a0e6bd..b52d84d84 100644 --- a/runtime/src/main/scala/akka/grpc/javadsl/ServerReflection.scala +++ b/runtime/src/main/scala/akka/grpc/javadsl/ServerReflection.scala @@ -20,7 +20,7 @@ object ServerReflection { @ApiMayChange(issue = "https://github.com/akka/akka-grpc/issues/850") def create( objects: Collection[ServiceDescription], - sys: ClassicActorSystemProvider): akka.japi.Function[HttpRequest, CompletionStage[HttpResponse]] = { + sys: ClassicActorSystemProvider): akka.japi.function.Function[HttpRequest, CompletionStage[HttpResponse]] = { import scala.collection.JavaConverters._ val delegate = ServerReflectionHandler.apply( ServerReflectionImpl(objects.asScala.map(_.descriptor).toSeq, objects.asScala.map(_.name).toList))(sys) diff --git a/runtime/src/main/scala/akka/grpc/javadsl/ServiceHandler.scala b/runtime/src/main/scala/akka/grpc/javadsl/ServiceHandler.scala index 6a71549a5..1afdb5d10 100644 --- a/runtime/src/main/scala/akka/grpc/javadsl/ServiceHandler.scala +++ b/runtime/src/main/scala/akka/grpc/javadsl/ServiceHandler.scala @@ -4,14 +4,15 @@ package akka.grpc.javadsl -import java.util.concurrent.{ CompletableFuture, CompletionStage } +import java.util.concurrent.{CompletableFuture, CompletionStage} import akka.annotation.ApiMayChange import akka.annotation.InternalApi -import akka.grpc.scaladsl.{ ServiceHandler => sServiceHandler } -import akka.http.javadsl.model.{ HttpRequest, HttpResponse, StatusCodes } -// using japi because bindAndHandleAsync expects that -import akka.japi.{ Function => JFunction } +import akka.grpc.scaladsl.{ServiceHandler => sServiceHandler} +import akka.http.javadsl.model.{HttpRequest, HttpResponse, StatusCodes} + +// using japi because bind expects that. +import akka.japi.function.{ Function => JFunction } import scala.annotation.varargs diff --git a/runtime/src/main/scala/akka/grpc/javadsl/WebHandler.scala b/runtime/src/main/scala/akka/grpc/javadsl/WebHandler.scala index 5244011ce..5aa2e0e8d 100644 --- a/runtime/src/main/scala/akka/grpc/javadsl/WebHandler.scala +++ b/runtime/src/main/scala/akka/grpc/javadsl/WebHandler.scala @@ -18,7 +18,7 @@ import akka.http.javadsl.server.directives.RouteAdapter import akka.http.scaladsl.marshalling.{ ToResponseMarshaller, Marshaller => sMarshaller } import akka.grpc.scaladsl import akka.http.scaladsl.server.directives.MarshallingDirectives -import akka.japi.{ Function => JFunction } +import akka.japi.function.{ Function => JFunction } import akka.stream.Materializer import akka.stream.javadsl.{ Keep, Sink, Source } import akka.util.ConstantFun diff --git a/runtime/src/main/scala/akka/grpc/javadsl/package.scala b/runtime/src/main/scala/akka/grpc/javadsl/package.scala index 818afe3c9..13189e094 100644 --- a/runtime/src/main/scala/akka/grpc/javadsl/package.scala +++ b/runtime/src/main/scala/akka/grpc/javadsl/package.scala @@ -25,18 +25,18 @@ package object javadsl { } /** - * Helper for creating Scala partial functions from akka.japi.Function + * Helper for creating Scala partial functions from [[akka.japi.function.Function]] * instances as Scala 2.11 does not know about SAMs. */ - def scalaPartialFunction[A, B](f: akka.japi.Function[A, B]): PartialFunction[A, B] = { + def scalaPartialFunction[A, B](f: akka.japi.function.Function[A, B]): PartialFunction[A, B] = { case a => f(a) } /** - * Helper for creating Scala anonymous partial functions from akka.japi.Function + * Helper for creating Scala anonymous partial functions from [[akka.japi.function.Function]] * instances as Scala 2.11 does not know about SAMs. */ def scalaAnonymousPartialFunction[A, B, C]( - f: akka.japi.Function[A, akka.japi.Function[B, C]]): A => PartialFunction[B, C] = + f: akka.japi.function.Function[A, akka.japi.function.Function[B, C]]): A => PartialFunction[B, C] = a => scalaPartialFunction(f(a)) }