Skip to content

Commit

Permalink
Fixes akka#1101. Using akka.japi.function.Function .
Browse files Browse the repository at this point in the history
  • Loading branch information
dotbg committed Oct 5, 2020
1 parent 8b94eb7 commit 2687173
Show file tree
Hide file tree
Showing 15 changed files with 72 additions and 127 deletions.
3 changes: 1 addition & 2 deletions benchmark-java/src/main/java/akka/grpc/benchmarks/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import akka.actor.ActorSystem;
import akka.grpc.GrpcClientSettings;
import akka.grpc.SSLContextUtils;
import akka.grpc.benchmarks.proto.Control;
import akka.grpc.benchmarks.proto.Messages;
import akka.grpc.benchmarks.proto.Messages.Payload;
import akka.grpc.benchmarks.proto.Messages.SimpleRequest;
Expand Down Expand Up @@ -183,7 +182,7 @@ public static HttpsConnectionContext serverHttpContext() throws Exception {
SSLContext context = SSLContext.getInstance("TLS");
context.init(keyManagerFactory.getKeyManagers(), null, new SecureRandom());

return ConnectionContext.https(context);
return ConnectionContext.httpsServer(context);
}

public static GrpcClientSettings createGrpcClientSettings(InetSocketAddress socketAddress, boolean useTls, ActorSystem system) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import akka.grpc.benchmarks.proto.Control.ServerArgs.ArgtypeCase;
import akka.grpc.benchmarks.proto.WorkerService;
import akka.grpc.benchmarks.proto.WorkerServiceHandlerFactory;
import akka.http.javadsl.ConnectWithHttps;
import akka.http.javadsl.Http;
import akka.http.javadsl.ServerBinding;
import akka.stream.ActorMaterializer;
Expand Down Expand Up @@ -56,7 +55,7 @@ public class LoadWorker {
private final int driverPort;
private final int serverPort;

LoadWorker(ActorSystem system, int driverPort, int serverPort) throws Exception {
LoadWorker(ActorSystem system, int driverPort, int serverPort) {
this.system = system;
this.driverPort = driverPort;
this.serverPort = serverPort;
Expand All @@ -67,11 +66,10 @@ public CompletionStage<ServerBinding> start() throws Exception {

WorkerServiceImpl impl = new WorkerServiceImpl(mat);

CompletionStage<ServerBinding> bound = Http.get(system).bindAndHandleAsync(
WorkerServiceHandlerFactory.create(impl, system),
ConnectWithHttps.toHostHttps("0.0.0.0", driverPort).withCustomHttpsContext(Utils.serverHttpContext()),
mat);

CompletionStage<ServerBinding> bound = Http.get(system)
.newServerAt("127.0.0.1", driverPort)
.enableHttps(Utils.serverHttpContext()))
.bind(WorkerServiceHandlerFactory.create(impl, system));

bound.thenAccept(binding -> {
System.out.println("gRPC server bound to: " + binding.localAddress());
Expand Down Expand Up @@ -108,9 +106,9 @@ public static void main(String[] args) throws Exception {
}
String value = parts[1];
if ("server_port".equals(key)) {
serverPort = Integer.valueOf(value);
serverPort = Integer.parseInt(value);
} else if ("driver_port".equals(key)) {
driverPort = Integer.valueOf(value);
driverPort = Integer.parseInt(value);
} else {
System.err.println("Unknown argument: " + key);
usage = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,8 @@
import akka.grpc.benchmarks.proto.BenchmarkService;
import akka.grpc.benchmarks.proto.BenchmarkServiceHandlerFactory;
import akka.grpc.benchmarks.proto.Messages;
import akka.http.javadsl.ConnectHttp;
import akka.http.javadsl.ConnectWithHttps;
import akka.http.javadsl.Http;
import akka.http.javadsl.ServerBuilder;
import akka.stream.ActorMaterializer;
import akka.stream.KillSwitches;
import akka.stream.Materializer;
Expand Down Expand Up @@ -77,22 +76,21 @@ public void run(InetSocketAddress address, boolean useTls) throws Exception {

benchmarkService = new BenchmarkServiceImpl(mat);

final ServerBuilder serverBuilder = Http
.get(system)
.newServerAt(address.getHostName(), address.getPort());

if (useTls) {
Http.get(system).bindAndHandleAsync(
BenchmarkServiceHandlerFactory.create(benchmarkService, system),
ConnectWithHttps.toHostHttps(address.getHostName(), address.getPort()).withCustomHttpsContext(Utils.serverHttpContext()),
mat)
serverBuilder
.enableHttps(Utils.serverHttpContext())
.bind(BenchmarkServiceHandlerFactory.create(benchmarkService, system))
.thenAccept(binding -> {
System.out.println("gRPC server bound to: https://" + address);
});
} else {
Http.get(system).bindAndHandleAsync(
BenchmarkServiceHandlerFactory.create(benchmarkService, system),
ConnectHttp.toHost(address.getHostName(), address.getPort()),
mat
).thenAccept(binding -> {
System.out.println("gRPC server bound to: http://" + address);
});
serverBuilder
.bind(BenchmarkServiceHandlerFactory.create(benchmarkService, system))
.thenAccept(binding -> System.out.println("gRPC server bound to: http://" + address));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

import akka.japi.Function;
import akka.japi.function.Function;
import akka.actor.ActorSystem;
import akka.actor.ClassicActorSystemProvider;
import akka.annotation.ApiMayChange;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import akka.http.javadsl.model.*;
import akka.http.javadsl.*;
import akka.http.javadsl.settings.ServerSettings;
import akka.japi.Function;
import akka.japi.function.Function;
import akka.stream.*;
import com.typesafe.config.ConfigFactory;
import io.grpc.internal.testing.TestUtils;
Expand Down
23 changes: 11 additions & 12 deletions plugin-tester-java/src/main/java/example/myapp/CombinedServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
package example.myapp;

import akka.actor.ActorSystem;
import akka.http.javadsl.*;
import akka.http.javadsl.Http;
import akka.stream.ActorMaterializer;
import akka.stream.Materializer;
import com.typesafe.config.Config;
Expand All @@ -18,7 +18,7 @@
import akka.grpc.javadsl.ServiceHandler;
import akka.http.javadsl.model.HttpRequest;
import akka.http.javadsl.model.HttpResponse;
import akka.japi.Function;
import akka.japi.function.Function;

//#import

Expand All @@ -33,7 +33,7 @@
import example.myapp.echo.grpc.*;

class CombinedServer {
public static void main(String[] args) throws Exception {
public static void main(String[] args) {
// important to enable HTTP/2 in ActorSystem's config
Config conf = ConfigFactory.parseString("akka.http.server.preview.enable-http2 = on")
.withFallback(ConfigFactory.defaultApplication());
Expand All @@ -45,13 +45,13 @@ public static void main(String[] args) throws Exception {
GreeterServiceHandlerFactory.create(new GreeterServiceImpl(mat), sys);
Function<HttpRequest, CompletionStage<HttpResponse>> echoService =
EchoServiceHandlerFactory.create(new EchoServiceImpl(), sys);
@SuppressWarnings("unchecked")
Function<HttpRequest, CompletionStage<HttpResponse>> serviceHandlers =
ServiceHandler.concatOrNotFound(greeterService, echoService);
ServiceHandler.handler(greeterService, echoService);

Http.get(sys).bindAndHandleAsync(
serviceHandlers,
ConnectHttp.toHost("127.0.0.1", 8090),
mat)
Http.get(sys)
.newServerAt("127.0.0.1", 8090)
.bind(serviceHandlers)
//#concatOrNotFound
.thenAccept(binding -> {
System.out.println("gRPC server bound to: " + binding.localAddress());
Expand All @@ -61,10 +61,9 @@ public static void main(String[] args) throws Exception {
Function<HttpRequest, CompletionStage<HttpResponse>> grpcWebServiceHandlers =
WebHandler.grpcWebHandler(Arrays.asList(greeterService, echoService), sys, mat);

Http.get(sys).bindAndHandleAsync(
grpcWebServiceHandlers,
ConnectHttp.toHost("127.0.0.1", 8091),
mat)
Http.get(sys)
.newServerAt("127.0.0.1", 8090)
.bind(grpcWebServiceHandlers)
//#grpc-web
.thenAccept(binding -> {
System.out.println("gRPC-Web server bound to: " + binding.localAddress());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,12 @@
import com.typesafe.config.ConfigFactory;

import akka.actor.ActorSystem;
import akka.grpc.javadsl.RouteUtils;
import akka.http.javadsl.ConnectHttp;
import akka.http.javadsl.Http;
import akka.http.javadsl.ServerBinding;
import akka.http.javadsl.model.HttpRequest;
import akka.http.javadsl.model.HttpResponse;
import akka.http.javadsl.server.Route;
import akka.japi.Function;
import akka.japi.function.Function;
import akka.stream.ActorMaterializer;
import akka.stream.Materializer;

Expand Down Expand Up @@ -61,7 +59,7 @@ public static CompletionStage<ServerBinding> run(ActorSystem sys) throws Excepti
Function<HttpRequest, CompletionStage<HttpResponse>> handler = GreeterServiceHandlerFactory.create(impl, sys);

// As a Route
Route handlerRoute = RouteUtils.fromFunction(handler, sys);
Route handlerRoute = handle(handler);
//#grpc-route

//#grpc-protected
Expand All @@ -82,10 +80,9 @@ public static CompletionStage<ServerBinding> run(ActorSystem sys) throws Excepti
protectedHandler
);

return Http.get(sys).bindAndHandleAsync(
RouteUtils.toFunction(finalRoute, sys),
ConnectHttp.toHost("127.0.0.1", 8090),
mat);
return Http.get(sys)
.newServerAt("127.0.0.1", 8090)
.bind(finalRoute);
//#combined
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,10 @@ public static CompletionStage<ServerBinding> run(ActorSystem sys) throws Excepti
// Instantiate implementation
GreeterService impl = new GreeterServiceImpl(mat);

return Http.get(sys).bindAndHandleAsync(
GreeterServiceHandlerFactory.create(impl, sys),
ConnectHttp.toHost("127.0.0.1", 8090),
mat);
return Http
.get(sys)
.newServerAt("127.0.0.1", 8090)
.bind(GreeterServiceHandlerFactory.create(impl, sys));
}
}
//#full-server
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,10 @@ public static CompletionStage<ServerBinding> run(ActorSystem sys) throws Excepti
// Instantiate implementation
GreeterServicePowerApi impl = new GreeterServicePowerApiImpl(mat);

return Http.get(sys).bindAndHandleAsync(
GreeterServicePowerApiHandlerFactory.create(impl, sys),
ConnectHttp.toHost("127.0.0.1", 8091),
mat);
return Http
.get(sys)
.newServerAt("127.0.0.1", 8091)
.bind(GreeterServicePowerApiHandlerFactory.create(impl,sys));
}
}
//#full-server
24 changes: 15 additions & 9 deletions runtime/src/main/scala/akka/grpc/javadsl/GrpcMarshalling.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,21 @@

package akka.grpc.javadsl

import java.util.concurrent.{ CompletableFuture, CompletionStage }
import java.util.concurrent.{CompletableFuture, CompletionStage}
import java.util.Optional

import akka.NotUsed
import akka.actor.ActorSystem
import akka.actor.ClassicActorSystemProvider
import akka.grpc._
import akka.grpc.internal._
import akka.grpc.GrpcProtocol.{ GrpcProtocolReader, GrpcProtocolWriter }
import akka.http.javadsl.model.{ HttpRequest, HttpResponse }
import akka.japi.Function
import akka.grpc.GrpcProtocol.{GrpcProtocolReader, GrpcProtocolWriter}
import akka.http.javadsl.model.{HttpRequest, HttpResponse}
import akka.japi.{Function => JFunction}
import akka.japi.function.Function
import akka.stream.Materializer
import akka.stream.javadsl.Source
import akka.util.ByteString

import com.github.ghik.silencer.silent

object GrpcMarshalling {
Expand Down Expand Up @@ -62,18 +62,24 @@ object GrpcMarshalling {
m: ProtobufSerializer[T],
writer: GrpcProtocolWriter,
system: ClassicActorSystemProvider,
eHandler: Function[ActorSystem, Function[Throwable, Trailers]] = GrpcExceptionHandler.defaultMapper)
eHandler: JFunction[ActorSystem, JFunction[Throwable, Trailers]] = GrpcExceptionHandler.defaultMapper)
: HttpResponse =
marshalStream(Source.single(e), m, writer, system, eHandler)


def marshalStream[T](
e: Source[T, NotUsed],
m: ProtobufSerializer[T],
writer: GrpcProtocolWriter,
system: ClassicActorSystemProvider,
eHandler: Function[ActorSystem, Function[Throwable, Trailers]] = GrpcExceptionHandler.defaultMapper)
: HttpResponse =
GrpcResponseHelpers(e.asScala, scalaAnonymousPartialFunction(eHandler))(m, writer, system)
eHandler: JFunction[ActorSystem, JFunction[Throwable, Trailers]] = GrpcExceptionHandler.defaultMapper)
: HttpResponse = {
val errorHandler = new Function[ActorSystem, Function[Throwable, Trailers]] {
override def apply(param: ActorSystem): Function[Throwable, Trailers] =
(throwable: Throwable) => eHandler(param).apply(throwable)
}
GrpcResponseHelpers(e.asScala, scalaAnonymousPartialFunction(errorHandler))(m, writer, system)
}

private def failure[R](error: Throwable): CompletableFuture[R] = {
val future: CompletableFuture[R] = new CompletableFuture()
Expand Down
53 changes: 0 additions & 53 deletions runtime/src/main/scala/akka/grpc/javadsl/RouteUtils.scala

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ object ServerReflection {
@ApiMayChange(issue = "https://github.com/akka/akka-grpc/issues/850")
def create(
objects: Collection[ServiceDescription],
sys: ClassicActorSystemProvider): akka.japi.Function[HttpRequest, CompletionStage[HttpResponse]] = {
sys: ClassicActorSystemProvider): akka.japi.function.Function[HttpRequest, CompletionStage[HttpResponse]] = {
import scala.collection.JavaConverters._
val delegate = ServerReflectionHandler.apply(
ServerReflectionImpl(objects.asScala.map(_.descriptor).toSeq, objects.asScala.map(_.name).toList))(sys)
Expand Down
11 changes: 6 additions & 5 deletions runtime/src/main/scala/akka/grpc/javadsl/ServiceHandler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,15 @@

package akka.grpc.javadsl

import java.util.concurrent.{ CompletableFuture, CompletionStage }
import java.util.concurrent.{CompletableFuture, CompletionStage}

import akka.annotation.ApiMayChange
import akka.annotation.InternalApi
import akka.grpc.scaladsl.{ ServiceHandler => sServiceHandler }
import akka.http.javadsl.model.{ HttpRequest, HttpResponse, StatusCodes }
// using japi because bindAndHandleAsync expects that
import akka.japi.{ Function => JFunction }
import akka.grpc.scaladsl.{ServiceHandler => sServiceHandler}
import akka.http.javadsl.model.{HttpRequest, HttpResponse, StatusCodes}

// using japi because bind expects that.
import akka.japi.function.{ Function => JFunction }

import scala.annotation.varargs

Expand Down
Loading

0 comments on commit 2687173

Please sign in to comment.