Skip to content

Commit

Permalink
Fixes akka#1101. Using akka.japi.function.Function .
Browse files Browse the repository at this point in the history
  • Loading branch information
dotbg committed Oct 5, 2020
1 parent 8b94eb7 commit 7209b35
Show file tree
Hide file tree
Showing 15 changed files with 79 additions and 124 deletions.
3 changes: 1 addition & 2 deletions benchmark-java/src/main/java/akka/grpc/benchmarks/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import akka.actor.ActorSystem;
import akka.grpc.GrpcClientSettings;
import akka.grpc.SSLContextUtils;
import akka.grpc.benchmarks.proto.Control;
import akka.grpc.benchmarks.proto.Messages;
import akka.grpc.benchmarks.proto.Messages.Payload;
import akka.grpc.benchmarks.proto.Messages.SimpleRequest;
Expand Down Expand Up @@ -183,7 +182,7 @@ public static HttpsConnectionContext serverHttpContext() throws Exception {
SSLContext context = SSLContext.getInstance("TLS");
context.init(keyManagerFactory.getKeyManagers(), null, new SecureRandom());

return ConnectionContext.https(context);
return ConnectionContext.httpsServer(context);
}

public static GrpcClientSettings createGrpcClientSettings(InetSocketAddress socketAddress, boolean useTls, ActorSystem system) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import akka.grpc.benchmarks.proto.Control.ServerArgs.ArgtypeCase;
import akka.grpc.benchmarks.proto.WorkerService;
import akka.grpc.benchmarks.proto.WorkerServiceHandlerFactory;
import akka.http.javadsl.ConnectWithHttps;
import akka.http.javadsl.Http;
import akka.http.javadsl.ServerBinding;
import akka.stream.ActorMaterializer;
Expand Down Expand Up @@ -56,7 +55,7 @@ public class LoadWorker {
private final int driverPort;
private final int serverPort;

LoadWorker(ActorSystem system, int driverPort, int serverPort) throws Exception {
LoadWorker(ActorSystem system, int driverPort, int serverPort) {
this.system = system;
this.driverPort = driverPort;
this.serverPort = serverPort;
Expand All @@ -67,11 +66,10 @@ public CompletionStage<ServerBinding> start() throws Exception {

WorkerServiceImpl impl = new WorkerServiceImpl(mat);

CompletionStage<ServerBinding> bound = Http.get(system).bindAndHandleAsync(
WorkerServiceHandlerFactory.create(impl, system),
ConnectWithHttps.toHostHttps("0.0.0.0", driverPort).withCustomHttpsContext(Utils.serverHttpContext()),
mat);

CompletionStage<ServerBinding> bound = Http.get(system)
.newServerAt("127.0.0.1", driverPort)
.enableHttps(Utils.serverHttpContext()))
.bind(WorkerServiceHandlerFactory.create(impl, system));

bound.thenAccept(binding -> {
System.out.println("gRPC server bound to: " + binding.localAddress());
Expand Down Expand Up @@ -108,9 +106,9 @@ public static void main(String[] args) throws Exception {
}
String value = parts[1];
if ("server_port".equals(key)) {
serverPort = Integer.valueOf(value);
serverPort = Integer.parseInt(value);
} else if ("driver_port".equals(key)) {
driverPort = Integer.valueOf(value);
driverPort = Integer.parseInt(value);
} else {
System.err.println("Unknown argument: " + key);
usage = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,8 @@
import akka.grpc.benchmarks.proto.BenchmarkService;
import akka.grpc.benchmarks.proto.BenchmarkServiceHandlerFactory;
import akka.grpc.benchmarks.proto.Messages;
import akka.http.javadsl.ConnectHttp;
import akka.http.javadsl.ConnectWithHttps;
import akka.http.javadsl.Http;
import akka.http.javadsl.ServerBuilder;
import akka.stream.ActorMaterializer;
import akka.stream.KillSwitches;
import akka.stream.Materializer;
Expand Down Expand Up @@ -77,22 +76,21 @@ public void run(InetSocketAddress address, boolean useTls) throws Exception {

benchmarkService = new BenchmarkServiceImpl(mat);

final ServerBuilder serverBuilder = Http
.get(system)
.newServerAt(address.getHostName(), address.getPort());

if (useTls) {
Http.get(system).bindAndHandleAsync(
BenchmarkServiceHandlerFactory.create(benchmarkService, system),
ConnectWithHttps.toHostHttps(address.getHostName(), address.getPort()).withCustomHttpsContext(Utils.serverHttpContext()),
mat)
serverBuilder
.enableHttps(Utils.serverHttpContext())
.bind(BenchmarkServiceHandlerFactory.create(benchmarkService, system))
.thenAccept(binding -> {
System.out.println("gRPC server bound to: https://" + address);
});
} else {
Http.get(system).bindAndHandleAsync(
BenchmarkServiceHandlerFactory.create(benchmarkService, system),
ConnectHttp.toHost(address.getHostName(), address.getPort()),
mat
).thenAccept(binding -> {
System.out.println("gRPC server bound to: http://" + address);
});
serverBuilder
.bind(BenchmarkServiceHandlerFactory.create(benchmarkService, system))
.thenAccept(binding -> System.out.println("gRPC server bound to: http://" + address));
}
}

Expand Down
10 changes: 5 additions & 5 deletions codegen/src/main/twirl/templates/JavaServer/Handler.scala.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

import akka.japi.Function;
import akka.japi.function.Function;
import akka.actor.ActorSystem;
import akka.actor.ClassicActorSystemProvider;
import akka.annotation.ApiMayChange;
Expand Down Expand Up @@ -60,7 +60,7 @@ public class @{serviceName}HandlerFactory {
* Use {@@link akka.grpc.javadsl.ServiceHandler#concatOrNotFound} with {@@link @{service.name}HandlerFactory#partial} when combining
* several services.
*/
public static Function<akka.http.javadsl.model.HttpRequest, CompletionStage<akka.http.javadsl.model.HttpResponse>> create(@serviceName implementation, Function<ActorSystem, Function<Throwable, Trailers>> eHandler, ClassicActorSystemProvider system) {
public static Function<akka.http.javadsl.model.HttpRequest, CompletionStage<akka.http.javadsl.model.HttpResponse>> create(@serviceName implementation, akka.japi.Function<ActorSystem, akka.japi.Function<Throwable, Trailers>> eHandler, ClassicActorSystemProvider system) {
return create(implementation, @{service.name}.name, eHandler, system);
}

Expand All @@ -86,7 +86,7 @@ public class @{serviceName}HandlerFactory {
*
* Registering a gRPC service under a custom prefix is not widely supported and strongly discouraged by the specification.
*/
public static Function<akka.http.javadsl.model.HttpRequest, CompletionStage<akka.http.javadsl.model.HttpResponse>> create(@serviceName implementation, String prefix, Function<ActorSystem, Function<Throwable, Trailers>> eHandler, ClassicActorSystemProvider system) {
public static Function<akka.http.javadsl.model.HttpRequest, CompletionStage<akka.http.javadsl.model.HttpResponse>> create(@serviceName implementation, String prefix, akka.japi.Function<ActorSystem, akka.japi.Function<Throwable, Trailers>> eHandler, ClassicActorSystemProvider system) {
return partial(implementation, prefix, SystemMaterializer.get(system).materializer(), eHandler, system);
}

Expand All @@ -106,7 +106,7 @@ public class @{serviceName}HandlerFactory {
*
* Use {@@link akka.grpc.javadsl.ServiceHandler#concatOrNotFound} when combining several services.
*/
public static Function<akka.http.javadsl.model.HttpRequest, CompletionStage<akka.http.javadsl.model.HttpResponse>> partial(@serviceName implementation, String prefix, Materializer mat, Function<ActorSystem, Function<Throwable, Trailers>> eHandler, ClassicActorSystemProvider system) {
public static Function<akka.http.javadsl.model.HttpRequest, CompletionStage<akka.http.javadsl.model.HttpResponse>> partial(@serviceName implementation, String prefix, Materializer mat, akka.japi.Function<ActorSystem, akka.japi.Function<Throwable, Trailers>> eHandler, ClassicActorSystemProvider system) {
return (req -> {
Iterator<String> segments = req.getUri().pathSegments().iterator();
if (segments.hasNext() && segments.next().equals(prefix) && segments.hasNext()) {
Expand All @@ -123,7 +123,7 @@ public class @{serviceName}HandlerFactory {
return @{service.name}.name;
}

private static CompletionStage<akka.http.javadsl.model.HttpResponse> handle(akka.http.javadsl.model.HttpRequest request, String method, @serviceName implementation, Materializer mat, Function<ActorSystem, Function<Throwable, Trailers>> eHandler, ClassicActorSystemProvider system) {
private static CompletionStage<akka.http.javadsl.model.HttpResponse> handle(akka.http.javadsl.model.HttpRequest request, String method, @serviceName implementation, Materializer mat, akka.japi.Function<ActorSystem, akka.japi.Function<Throwable, Trailers>> eHandler, ClassicActorSystemProvider system) {
return GrpcMarshalling.negotiated(request, (reader, writer) -> {
final CompletionStage<akka.http.javadsl.model.HttpResponse> response;
@{if(powerApis) { "Metadata metadata = MetadataBuilder.fromHeaders(request.getHeaders());" } else { "" }}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import akka.http.javadsl.model.*;
import akka.http.javadsl.*;
import akka.http.javadsl.settings.ServerSettings;
import akka.japi.Function;
import akka.japi.function.Function;
import akka.stream.*;
import com.typesafe.config.ConfigFactory;
import io.grpc.internal.testing.TestUtils;
Expand Down
23 changes: 11 additions & 12 deletions plugin-tester-java/src/main/java/example/myapp/CombinedServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
package example.myapp;

import akka.actor.ActorSystem;
import akka.http.javadsl.*;
import akka.http.javadsl.Http;
import akka.stream.ActorMaterializer;
import akka.stream.Materializer;
import com.typesafe.config.Config;
Expand All @@ -18,7 +18,7 @@
import akka.grpc.javadsl.ServiceHandler;
import akka.http.javadsl.model.HttpRequest;
import akka.http.javadsl.model.HttpResponse;
import akka.japi.Function;
import akka.japi.function.Function;

//#import

Expand All @@ -33,7 +33,7 @@
import example.myapp.echo.grpc.*;

class CombinedServer {
public static void main(String[] args) throws Exception {
public static void main(String[] args) {
// important to enable HTTP/2 in ActorSystem's config
Config conf = ConfigFactory.parseString("akka.http.server.preview.enable-http2 = on")
.withFallback(ConfigFactory.defaultApplication());
Expand All @@ -45,13 +45,13 @@ public static void main(String[] args) throws Exception {
GreeterServiceHandlerFactory.create(new GreeterServiceImpl(mat), sys);
Function<HttpRequest, CompletionStage<HttpResponse>> echoService =
EchoServiceHandlerFactory.create(new EchoServiceImpl(), sys);
@SuppressWarnings("unchecked")
Function<HttpRequest, CompletionStage<HttpResponse>> serviceHandlers =
ServiceHandler.concatOrNotFound(greeterService, echoService);
ServiceHandler.concat(greeterService, echoService);

Http.get(sys).bindAndHandleAsync(
serviceHandlers,
ConnectHttp.toHost("127.0.0.1", 8090),
mat)
Http.get(sys)
.newServerAt("127.0.0.1", 8090)
.bind(serviceHandlers)
//#concatOrNotFound
.thenAccept(binding -> {
System.out.println("gRPC server bound to: " + binding.localAddress());
Expand All @@ -61,10 +61,9 @@ public static void main(String[] args) throws Exception {
Function<HttpRequest, CompletionStage<HttpResponse>> grpcWebServiceHandlers =
WebHandler.grpcWebHandler(Arrays.asList(greeterService, echoService), sys, mat);

Http.get(sys).bindAndHandleAsync(
grpcWebServiceHandlers,
ConnectHttp.toHost("127.0.0.1", 8091),
mat)
Http.get(sys)
.newServerAt("127.0.0.1", 8090)
.bind(grpcWebServiceHandlers)
//#grpc-web
.thenAccept(binding -> {
System.out.println("gRPC-Web server bound to: " + binding.localAddress());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,12 @@
import com.typesafe.config.ConfigFactory;

import akka.actor.ActorSystem;
import akka.grpc.javadsl.RouteUtils;
import akka.http.javadsl.ConnectHttp;
import akka.http.javadsl.Http;
import akka.http.javadsl.ServerBinding;
import akka.http.javadsl.model.HttpRequest;
import akka.http.javadsl.model.HttpResponse;
import akka.http.javadsl.server.Route;
import akka.japi.Function;
import akka.japi.function.Function;
import akka.stream.ActorMaterializer;
import akka.stream.Materializer;

Expand Down Expand Up @@ -61,7 +59,7 @@ public static CompletionStage<ServerBinding> run(ActorSystem sys) throws Excepti
Function<HttpRequest, CompletionStage<HttpResponse>> handler = GreeterServiceHandlerFactory.create(impl, sys);

// As a Route
Route handlerRoute = RouteUtils.fromFunction(handler, sys);
Route handlerRoute = handle(handler);
//#grpc-route

//#grpc-protected
Expand All @@ -82,10 +80,9 @@ public static CompletionStage<ServerBinding> run(ActorSystem sys) throws Excepti
protectedHandler
);

return Http.get(sys).bindAndHandleAsync(
RouteUtils.toFunction(finalRoute, sys),
ConnectHttp.toHost("127.0.0.1", 8090),
mat);
return Http.get(sys)
.newServerAt("127.0.0.1", 8090)
.bind(finalRoute);
//#combined
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,10 @@ public static CompletionStage<ServerBinding> run(ActorSystem sys) throws Excepti
// Instantiate implementation
GreeterService impl = new GreeterServiceImpl(mat);

return Http.get(sys).bindAndHandleAsync(
GreeterServiceHandlerFactory.create(impl, sys),
ConnectHttp.toHost("127.0.0.1", 8090),
mat);
return Http
.get(sys)
.newServerAt("127.0.0.1", 8090)
.bind(GreeterServiceHandlerFactory.create(impl, sys));
}
}
//#full-server
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,10 @@ public static CompletionStage<ServerBinding> run(ActorSystem sys) throws Excepti
// Instantiate implementation
GreeterServicePowerApi impl = new GreeterServicePowerApiImpl(mat);

return Http.get(sys).bindAndHandleAsync(
GreeterServicePowerApiHandlerFactory.create(impl, sys),
ConnectHttp.toHost("127.0.0.1", 8091),
mat);
return Http
.get(sys)
.newServerAt("127.0.0.1", 8091)
.bind(GreeterServicePowerApiHandlerFactory.create(impl,sys));
}
}
//#full-server
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,10 @@ import akka.grpc._
import akka.grpc.internal._
import akka.grpc.GrpcProtocol.{ GrpcProtocolReader, GrpcProtocolWriter }
import akka.http.javadsl.model.{ HttpRequest, HttpResponse }
import akka.japi.Function
import akka.japi.{ Function => JFunction }
import akka.stream.Materializer
import akka.stream.javadsl.Source
import akka.util.ByteString

import com.github.ghik.silencer.silent

object GrpcMarshalling {
Expand Down Expand Up @@ -62,7 +61,7 @@ object GrpcMarshalling {
m: ProtobufSerializer[T],
writer: GrpcProtocolWriter,
system: ClassicActorSystemProvider,
eHandler: Function[ActorSystem, Function[Throwable, Trailers]] = GrpcExceptionHandler.defaultMapper)
eHandler: JFunction[ActorSystem, JFunction[Throwable, Trailers]] = GrpcExceptionHandler.defaultMapper)
: HttpResponse =
marshalStream(Source.single(e), m, writer, system, eHandler)

Expand All @@ -71,7 +70,7 @@ object GrpcMarshalling {
m: ProtobufSerializer[T],
writer: GrpcProtocolWriter,
system: ClassicActorSystemProvider,
eHandler: Function[ActorSystem, Function[Throwable, Trailers]] = GrpcExceptionHandler.defaultMapper)
eHandler: JFunction[ActorSystem, JFunction[Throwable, Trailers]] = GrpcExceptionHandler.defaultMapper)
: HttpResponse =
GrpcResponseHelpers(e.asScala, scalaAnonymousPartialFunction(eHandler))(m, writer, system)

Expand Down
53 changes: 0 additions & 53 deletions runtime/src/main/scala/akka/grpc/javadsl/RouteUtils.scala

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ object ServerReflection {
@ApiMayChange(issue = "https://github.com/akka/akka-grpc/issues/850")
def create(
objects: Collection[ServiceDescription],
sys: ClassicActorSystemProvider): akka.japi.Function[HttpRequest, CompletionStage[HttpResponse]] = {
sys: ClassicActorSystemProvider): akka.japi.function.Function[HttpRequest, CompletionStage[HttpResponse]] = {
import scala.collection.JavaConverters._
val delegate = ServerReflectionHandler.apply(
ServerReflectionImpl(objects.asScala.map(_.descriptor).toSeq, objects.asScala.map(_.name).toList))(sys)
Expand Down
Loading

0 comments on commit 7209b35

Please sign in to comment.