From d5de72a41c07b9e8fb0869f7485da69968de25bf Mon Sep 17 00:00:00 2001 From: Daniel Fiala Date: Wed, 10 Apr 2024 23:01:52 +0200 Subject: [PATCH] feat(grpc): added grpc transcoding --- extensions/grpc-common/runtime/pom.xml | 4 + extensions/grpc/api/pom.xml | 7 +- .../java/io/quarkus/grpc/GrpcTranscoding.java | 11 + .../grpc/GrpcTranscodingDescriptor.java | 21 ++ .../grpc/GrpcTranscodingMarshaller.java | 67 ++++ .../quarkus/grpc/GrpcTranscodingMethod.java | 18 ++ .../quarkus/grpc/deployment/GrpcCodeGen.java | 35 +- .../grpc/deployment/GrpcClientBuildItem.java | 2 - .../quarkus/grpc/deployment/GrpcDotNames.java | 5 + .../deployment/GrpcTranscodingBuildItem.java | 27 ++ .../deployment/GrpcTranscodingProcessor.java | 127 ++++++++ .../GrpcTranscodingServerBuildItem.java | 17 + .../resources/dev-ui/qwc-grpc-services.js | 73 +++-- extensions/grpc/protoc/pom.xml | 4 + .../protoc/plugin/MutinyGrpcGenerator.java | 79 ++++- .../main/resources/MutinyMarshalling.mustache | 44 +++ extensions/grpc/runtime/pom.xml | 6 +- .../grpc/runtime/GrpcServerRecorder.java | 7 +- .../runtime/config/GrpcTranscodingConfig.java | 20 ++ .../runtime/supports/SSLConfigHelper.java | 2 +- .../transcoding/GrpcTranscodingBridge.java | 213 +++++++++++++ .../transcoding/GrpcTranscodingContainer.java | 18 ++ .../transcoding/GrpcTranscodingHttpUtils.java | 84 +++++ .../GrpcTranscodingMessageDecoder.java | 27 ++ .../GrpcTranscodingMessageEncoder.java | 51 +++ .../GrpcTranscodingMessageWriter.java | 79 +++++ .../transcoding/GrpcTranscodingMetadata.java | 51 +++ .../transcoding/GrpcTranscodingMethod.java | 29 ++ .../GrpcTranscodingReadStreamAdapter.java | 57 ++++ .../transcoding/GrpcTranscodingRecorder.java | 183 +++++++++++ .../transcoding/GrpcTranscodingRequest.java | 301 ++++++++++++++++++ .../transcoding/GrpcTranscodingResponse.java | 248 +++++++++++++++ .../transcoding/GrpcTranscodingServer.java | 142 +++++++++ .../GrpcTranscodingWriteStreamAdapter.java | 55 ++++ integration-tests/grpc-transcoding/pom.xml | 114 +++++++ .../examples/hello/HelloWorldNewEndpoint.java | 32 ++ .../examples/hello/HelloWorldNewService.java | 48 +++ .../src/main/proto/helloworld.proto | 111 +++++++ .../src/main/resources/application.properties | 20 ++ .../hello/HelloWorldNewEndpointTest.java | 105 ++++++ .../hello/HelloWorldNewEndpointTestBase.java | 5 + integration-tests/pom.xml | 1 + 42 files changed, 2491 insertions(+), 59 deletions(-) create mode 100644 extensions/grpc/api/src/main/java/io/quarkus/grpc/GrpcTranscoding.java create mode 100644 extensions/grpc/api/src/main/java/io/quarkus/grpc/GrpcTranscodingDescriptor.java create mode 100644 extensions/grpc/api/src/main/java/io/quarkus/grpc/GrpcTranscodingMarshaller.java create mode 100644 extensions/grpc/api/src/main/java/io/quarkus/grpc/GrpcTranscodingMethod.java create mode 100644 extensions/grpc/deployment/src/main/java/io/quarkus/grpc/deployment/GrpcTranscodingBuildItem.java create mode 100644 extensions/grpc/deployment/src/main/java/io/quarkus/grpc/deployment/GrpcTranscodingProcessor.java create mode 100644 extensions/grpc/deployment/src/main/java/io/quarkus/grpc/deployment/GrpcTranscodingServerBuildItem.java create mode 100644 extensions/grpc/protoc/src/main/resources/MutinyMarshalling.mustache create mode 100644 extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/config/GrpcTranscodingConfig.java create mode 100644 extensions/grpc/runtime/src/main/java/io/quarkus/grpc/transcoding/GrpcTranscodingBridge.java create mode 100644 extensions/grpc/runtime/src/main/java/io/quarkus/grpc/transcoding/GrpcTranscodingContainer.java create mode 100644 extensions/grpc/runtime/src/main/java/io/quarkus/grpc/transcoding/GrpcTranscodingHttpUtils.java create mode 100644 extensions/grpc/runtime/src/main/java/io/quarkus/grpc/transcoding/GrpcTranscodingMessageDecoder.java create mode 100644 extensions/grpc/runtime/src/main/java/io/quarkus/grpc/transcoding/GrpcTranscodingMessageEncoder.java create mode 100644 extensions/grpc/runtime/src/main/java/io/quarkus/grpc/transcoding/GrpcTranscodingMessageWriter.java create mode 100644 extensions/grpc/runtime/src/main/java/io/quarkus/grpc/transcoding/GrpcTranscodingMetadata.java create mode 100644 extensions/grpc/runtime/src/main/java/io/quarkus/grpc/transcoding/GrpcTranscodingMethod.java create mode 100644 extensions/grpc/runtime/src/main/java/io/quarkus/grpc/transcoding/GrpcTranscodingReadStreamAdapter.java create mode 100644 extensions/grpc/runtime/src/main/java/io/quarkus/grpc/transcoding/GrpcTranscodingRecorder.java create mode 100644 extensions/grpc/runtime/src/main/java/io/quarkus/grpc/transcoding/GrpcTranscodingRequest.java create mode 100644 extensions/grpc/runtime/src/main/java/io/quarkus/grpc/transcoding/GrpcTranscodingResponse.java create mode 100644 extensions/grpc/runtime/src/main/java/io/quarkus/grpc/transcoding/GrpcTranscodingServer.java create mode 100644 extensions/grpc/runtime/src/main/java/io/quarkus/grpc/transcoding/GrpcTranscodingWriteStreamAdapter.java create mode 100644 integration-tests/grpc-transcoding/pom.xml create mode 100644 integration-tests/grpc-transcoding/src/main/java/io/quarkus/grpc/examples/hello/HelloWorldNewEndpoint.java create mode 100644 integration-tests/grpc-transcoding/src/main/java/io/quarkus/grpc/examples/hello/HelloWorldNewService.java create mode 100644 integration-tests/grpc-transcoding/src/main/proto/helloworld.proto create mode 100644 integration-tests/grpc-transcoding/src/main/resources/application.properties create mode 100644 integration-tests/grpc-transcoding/src/test/java/io/quarkus/grpc/examples/hello/HelloWorldNewEndpointTest.java create mode 100644 integration-tests/grpc-transcoding/src/test/java/io/quarkus/grpc/examples/hello/HelloWorldNewEndpointTestBase.java diff --git a/extensions/grpc-common/runtime/pom.xml b/extensions/grpc-common/runtime/pom.xml index 3cb6eed8d0cad..9631fbd8ddaf2 100644 --- a/extensions/grpc-common/runtime/pom.xml +++ b/extensions/grpc-common/runtime/pom.xml @@ -17,6 +17,10 @@ com.google.code.findbugs jsr305 + + com.google.api.grpc + proto-google-common-protos + io.vertx vertx-grpc diff --git a/extensions/grpc/api/pom.xml b/extensions/grpc/api/pom.xml index f4d8240c8694a..1ab12b251f7d8 100644 --- a/extensions/grpc/api/pom.xml +++ b/extensions/grpc/api/pom.xml @@ -33,10 +33,15 @@ + + com.google.protobuf + protobuf-java-util + 3.24.3 + jakarta.enterprise jakarta.enterprise.cdi-api - \ No newline at end of file + diff --git a/extensions/grpc/api/src/main/java/io/quarkus/grpc/GrpcTranscoding.java b/extensions/grpc/api/src/main/java/io/quarkus/grpc/GrpcTranscoding.java new file mode 100644 index 0000000000000..7142ad2163c29 --- /dev/null +++ b/extensions/grpc/api/src/main/java/io/quarkus/grpc/GrpcTranscoding.java @@ -0,0 +1,11 @@ +package io.quarkus.grpc; + +import com.google.protobuf.Message; + +public interface GrpcTranscoding { + + String getGrpcServiceName(); + + GrpcTranscodingDescriptor findTranscodingDescriptor( + String methodName); +} diff --git a/extensions/grpc/api/src/main/java/io/quarkus/grpc/GrpcTranscodingDescriptor.java b/extensions/grpc/api/src/main/java/io/quarkus/grpc/GrpcTranscodingDescriptor.java new file mode 100644 index 0000000000000..68b608a65e120 --- /dev/null +++ b/extensions/grpc/api/src/main/java/io/quarkus/grpc/GrpcTranscodingDescriptor.java @@ -0,0 +1,21 @@ +package io.quarkus.grpc; + +public class GrpcTranscodingDescriptor { + + private final GrpcTranscodingMarshaller requestMarshaller; + private final GrpcTranscodingMarshaller responseMarshaller; + + public GrpcTranscodingDescriptor(GrpcTranscodingMarshaller requestMarshaller, + GrpcTranscodingMarshaller responseMarshaller) { + this.requestMarshaller = requestMarshaller; + this.responseMarshaller = responseMarshaller; + } + + public GrpcTranscodingMarshaller getRequestMarshaller() { + return requestMarshaller; + } + + public GrpcTranscodingMarshaller getResponseMarshaller() { + return responseMarshaller; + } +} diff --git a/extensions/grpc/api/src/main/java/io/quarkus/grpc/GrpcTranscodingMarshaller.java b/extensions/grpc/api/src/main/java/io/quarkus/grpc/GrpcTranscodingMarshaller.java new file mode 100644 index 0000000000000..4b917bd6e2b43 --- /dev/null +++ b/extensions/grpc/api/src/main/java/io/quarkus/grpc/GrpcTranscodingMarshaller.java @@ -0,0 +1,67 @@ +package io.quarkus.grpc; + +import static com.google.common.base.Preconditions.checkNotNull; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; + +import org.jboss.logging.Logger; + +import com.google.protobuf.InvalidProtocolBufferException; +import com.google.protobuf.Message; +import com.google.protobuf.util.JsonFormat; + +import io.grpc.MethodDescriptor; +import io.grpc.Status; + +public class GrpcTranscodingMarshaller implements MethodDescriptor.PrototypeMarshaller { + + private final static Logger log = Logger.getLogger(GrpcTranscodingMarshaller.class); + + private final T defaultInstance; + + public GrpcTranscodingMarshaller(T defaultInstance) { + this.defaultInstance = checkNotNull(defaultInstance, "defaultInstance cannot be null"); + } + + @SuppressWarnings("unchecked") + @Override + public Class getMessageClass() { + return (Class) defaultInstance.getClass(); + } + + @Override + public T getMessagePrototype() { + return defaultInstance; + } + + @Override + public InputStream stream(T value) { + try { + String response = JsonFormat.printer().omittingInsignificantWhitespace().print(value); + return new ByteArrayInputStream(response.getBytes(StandardCharsets.UTF_8)); + } catch (InvalidProtocolBufferException e) { + throw Status.INTERNAL.withDescription("Unable to convert message to JSON").withCause(e).asRuntimeException(); + } + } + + @SuppressWarnings("unchecked") + @Override + public T parse(InputStream stream) { + try (InputStreamReader reader = new InputStreamReader(stream, StandardCharsets.UTF_8)) { + Message.Builder builder = defaultInstance.newBuilderForType(); + JsonFormat.parser().ignoringUnknownFields().merge(reader, builder); + return (T) builder.build(); + } catch (InvalidProtocolBufferException e) { + log.error("Unable to parse JSON to message", e); + throw Status.INTERNAL.withDescription("Unable to parse JSON to message").withCause(e).asRuntimeException(); + } catch (IOException e) { + log.error("An I/O error occurred while parsing the stream", e); + throw Status.INTERNAL.withDescription("An I/O error occurred while parsing the stream").withCause(e) + .asRuntimeException(); + } + } +} diff --git a/extensions/grpc/api/src/main/java/io/quarkus/grpc/GrpcTranscodingMethod.java b/extensions/grpc/api/src/main/java/io/quarkus/grpc/GrpcTranscodingMethod.java new file mode 100644 index 0000000000000..147427c87cae5 --- /dev/null +++ b/extensions/grpc/api/src/main/java/io/quarkus/grpc/GrpcTranscodingMethod.java @@ -0,0 +1,18 @@ +package io.quarkus.grpc; + +import static java.lang.annotation.ElementType.METHOD; + +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +@Target(METHOD) +@Retention(RetentionPolicy.RUNTIME) +public @interface GrpcTranscodingMethod { + + String grpcMethodName(); + + String httpMethod(); + + String httpPath(); +} diff --git a/extensions/grpc/codegen/src/main/java/io/quarkus/grpc/deployment/GrpcCodeGen.java b/extensions/grpc/codegen/src/main/java/io/quarkus/grpc/deployment/GrpcCodeGen.java index 0ebf558fa366d..79c71314156ce 100644 --- a/extensions/grpc/codegen/src/main/java/io/quarkus/grpc/deployment/GrpcCodeGen.java +++ b/extensions/grpc/codegen/src/main/java/io/quarkus/grpc/deployment/GrpcCodeGen.java @@ -64,6 +64,8 @@ public class GrpcCodeGen implements CodeGenProvider { private static final String DESCRIPTOR_SET_OUTPUT_DIR = "quarkus.generate-code.grpc.descriptor-set.output-dir"; private static final String DESCRIPTOR_SET_FILENAME = "quarkus.generate-code.grpc.descriptor-set.name"; + private static final String TRANSCODING_ENABLED = "quarkus.grpc.transcoding.enabled"; + private static final String USE_ARG_FILE = "quarkus.generate-code.grpc.use-arg-file"; private Executables executables; @@ -141,7 +143,7 @@ public boolean trigger(CodeGenContext context) throws CodeGenException { } if (!protoFiles.isEmpty()) { - initExecutables(workDir, context.applicationModel()); + initExecutables(context, workDir, context.applicationModel()); Collection protosToImport = gatherDirectoriesWithImports(workDir.resolve("protoc-dependencies"), context); @@ -241,7 +243,6 @@ private void postprocessing(CodeGenContext context, Path outDir) { } new GrpcPostProcessing(context, outDir).postprocess(); - } private Collection gatherProtosFromDependencies(Path workDir, Set protoDirectories, @@ -315,6 +316,7 @@ private Path getDescriptorSetOutputFile(CodeGenContext context) throws IOExcepti } private Collection gatherDirectoriesWithImports(Path workDir, CodeGenContext context) throws CodeGenException { + log.info("Scanning dependencies for proto files to import"); Config properties = context.config(); String scanForImports = properties.getOptionalValue(SCAN_FOR_IMPORTS, String.class) @@ -328,12 +330,16 @@ private Collection gatherDirectoriesWithImports(Path workDir, CodeGenCon List dependenciesToScan = Arrays.stream(scanForImports.split(",")).map(String::trim) .collect(Collectors.toList()); + dependenciesToScan.add("com.google.api:api-common"); + dependenciesToScan.add("com.google.api.grpc:proto-google-common-protos"); + Set importDirectories = new HashSet<>(); ApplicationModel appModel = context.applicationModel(); for (ResolvedDependency artifact : appModel.getRuntimeDependencies()) { if (scanAll || dependenciesToScan.contains( String.format("%s:%s", artifact.getGroupId(), artifact.getArtifactId()))) { + log.infof("Scanning %s:%s for proto files to import", artifact.getGroupId(), artifact.getArtifactId()); extractProtosFromArtifact(workDir, new ArrayList<>(), importDirectories, artifact, List.of(), List.of(), false); } @@ -406,11 +412,12 @@ private String escapeWhitespace(String path) { } } - private void initExecutables(Path workDir, ApplicationModel model) throws CodeGenException { + private void initExecutables(CodeGenContext context, Path workDir, ApplicationModel model) throws CodeGenException { if (executables == null) { Path protocPath; String protocPathProperty = System.getProperty("quarkus.grpc.protoc-path"); String classifier = System.getProperty("quarkus.grpc.protoc-os-classifier", osClassifier()); + if (protocPathProperty == null) { protocPath = findArtifactPath(model, PROTOC_GROUPID, PROTOC, classifier, EXE); } else { @@ -421,7 +428,7 @@ private void initExecutables(Path workDir, ApplicationModel model) throws CodeGe Path protocGrpcPluginExe = prepareExecutable(workDir, model, "io.grpc", "protoc-gen-grpc-java", classifier, "exe"); - Path quarkusGrpcPluginExe = prepareQuarkusGrpcExecutable(model, workDir); + Path quarkusGrpcPluginExe = prepareQuarkusGrpcExecutable(context, model, workDir); executables = new Executables(protocExe, protocGrpcPluginExe, quarkusGrpcPluginExe); } @@ -488,26 +495,28 @@ private String osClassifier() throws CodeGenException { } } - private static Path prepareQuarkusGrpcExecutable(ApplicationModel appModel, Path buildDir) throws CodeGenException { + private static Path prepareQuarkusGrpcExecutable(CodeGenContext context, ApplicationModel appModel, Path buildDir) + throws CodeGenException { Path pluginPath = findArtifactPath(appModel, "io.quarkus", "quarkus-grpc-protoc-plugin", "shaded", "jar"); if (pluginPath == null) { throw new CodeGenException("Failed to find Quarkus gRPC protoc plugin among dependencies"); } if (OS.determineOS() != OS.WINDOWS) { - return writeScript(buildDir, pluginPath, "#!/bin/sh\n", ".sh"); + return writeScript(context, buildDir, pluginPath, "#!/bin/sh\n", ".sh"); } else { - return writeScript(buildDir, pluginPath, "@echo off\r\n", ".cmd"); + return writeScript(context, buildDir, pluginPath, "@echo off\r\n", ".cmd"); } } - private static Path writeScript(Path buildDir, Path pluginPath, String shebang, String suffix) throws CodeGenException { + private static Path writeScript(CodeGenContext context, Path buildDir, Path pluginPath, String shebang, String suffix) + throws CodeGenException { Path script; try { script = Files.createTempFile(buildDir, "quarkus-grpc", suffix); try (BufferedWriter writer = Files.newBufferedWriter(script)) { writer.write(shebang); - writePluginExeCmd(pluginPath, writer); + writePluginExeCmd(context, pluginPath, writer); } } catch (IOException e) { throw new CodeGenException("Failed to create a wrapper script for quarkus-grpc plugin", e); @@ -518,9 +527,13 @@ private static Path writeScript(Path buildDir, Path pluginPath, String shebang, return script; } - private static void writePluginExeCmd(Path pluginPath, BufferedWriter writer) throws IOException { + private static void writePluginExeCmd(CodeGenContext context, Path pluginPath, BufferedWriter writer) throws IOException { + Config properties = context.config(); + boolean enableTranscoding = properties.getOptionalValue(TRANSCODING_ENABLED, Boolean.class).orElse(false); + writer.write("\"" + JavaBinFinder.findBin() + "\" -cp \"" + - pluginPath.toAbsolutePath() + "\" " + quarkusProtocPluginMain); + pluginPath.toAbsolutePath() + "\" " + quarkusProtocPluginMain + + (enableTranscoding ? " --enableTranscoding" : "")); writer.newLine(); } diff --git a/extensions/grpc/deployment/src/main/java/io/quarkus/grpc/deployment/GrpcClientBuildItem.java b/extensions/grpc/deployment/src/main/java/io/quarkus/grpc/deployment/GrpcClientBuildItem.java index 34facd278f88d..8d4f329330cd7 100644 --- a/extensions/grpc/deployment/src/main/java/io/quarkus/grpc/deployment/GrpcClientBuildItem.java +++ b/extensions/grpc/deployment/src/main/java/io/quarkus/grpc/deployment/GrpcClientBuildItem.java @@ -7,8 +7,6 @@ import org.jboss.jandex.DotName; import io.quarkus.builder.item.MultiBuildItem; -import io.quarkus.grpc.deployment.GrpcClientBuildItem.ClientInfo; -import io.quarkus.grpc.deployment.GrpcClientBuildItem.ClientType; public final class GrpcClientBuildItem extends MultiBuildItem { diff --git a/extensions/grpc/deployment/src/main/java/io/quarkus/grpc/deployment/GrpcDotNames.java b/extensions/grpc/deployment/src/main/java/io/quarkus/grpc/deployment/GrpcDotNames.java index 08cf2daab02b9..b13653c17b224 100644 --- a/extensions/grpc/deployment/src/main/java/io/quarkus/grpc/deployment/GrpcDotNames.java +++ b/extensions/grpc/deployment/src/main/java/io/quarkus/grpc/deployment/GrpcDotNames.java @@ -16,6 +16,8 @@ import io.quarkus.grpc.GlobalInterceptor; import io.quarkus.grpc.GrpcClient; import io.quarkus.grpc.GrpcService; +import io.quarkus.grpc.GrpcTranscoding; +import io.quarkus.grpc.GrpcTranscodingMethod; import io.quarkus.grpc.MutinyBean; import io.quarkus.grpc.MutinyClient; import io.quarkus.grpc.MutinyGrpc; @@ -49,6 +51,9 @@ public class GrpcDotNames { public static final DotName MUTINY_BEAN = DotName.createSimple(MutinyBean.class.getName()); public static final DotName MUTINY_SERVICE = DotName.createSimple(MutinyService.class.getName()); + public static final DotName GRPC_TRANSCODING = DotName.createSimple(GrpcTranscoding.class.getName()); + public static final DotName GRPC_TRANSCODING_METHOD = DotName.createSimple(GrpcTranscodingMethod.class.getName()); + public static final DotName GLOBAL_INTERCEPTOR = DotName.createSimple(GlobalInterceptor.class.getName()); public static final DotName REGISTER_INTERCEPTOR = DotName.createSimple(RegisterInterceptor.class.getName()); public static final DotName REGISTER_INTERCEPTORS = DotName.createSimple(RegisterInterceptors.class.getName()); diff --git a/extensions/grpc/deployment/src/main/java/io/quarkus/grpc/deployment/GrpcTranscodingBuildItem.java b/extensions/grpc/deployment/src/main/java/io/quarkus/grpc/deployment/GrpcTranscodingBuildItem.java new file mode 100644 index 0000000000000..c1013dd4e1fd9 --- /dev/null +++ b/extensions/grpc/deployment/src/main/java/io/quarkus/grpc/deployment/GrpcTranscodingBuildItem.java @@ -0,0 +1,27 @@ +package io.quarkus.grpc.deployment; + +import java.util.List; + +import org.jboss.jandex.DotName; + +import io.quarkus.builder.item.MultiBuildItem; +import io.quarkus.grpc.transcoding.GrpcTranscodingMethod; + +public final class GrpcTranscodingBuildItem extends MultiBuildItem { + + final DotName marshallingClass; + final List transcodingMethods; + + public GrpcTranscodingBuildItem(DotName marshallingClass, List transcodingMethods) { + this.marshallingClass = marshallingClass; + this.transcodingMethods = transcodingMethods; + } + + public DotName getMarshallingClass() { + return marshallingClass; + } + + public List getTranscodingMethods() { + return transcodingMethods; + } +} diff --git a/extensions/grpc/deployment/src/main/java/io/quarkus/grpc/deployment/GrpcTranscodingProcessor.java b/extensions/grpc/deployment/src/main/java/io/quarkus/grpc/deployment/GrpcTranscodingProcessor.java new file mode 100644 index 0000000000000..2779244c0d9de --- /dev/null +++ b/extensions/grpc/deployment/src/main/java/io/quarkus/grpc/deployment/GrpcTranscodingProcessor.java @@ -0,0 +1,127 @@ +package io.quarkus.grpc.deployment; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.jboss.jandex.AnnotationInstance; +import org.jboss.jandex.AnnotationTarget; +import org.jboss.jandex.ClassInfo; +import org.jboss.jandex.DotName; +import org.jboss.jandex.MethodInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.quarkus.arc.deployment.AdditionalBeanBuildItem; +import io.quarkus.arc.deployment.AnnotationsTransformerBuildItem; +import io.quarkus.arc.deployment.SyntheticBeansRuntimeInitBuildItem; +import io.quarkus.arc.processor.AnnotationsTransformer; +import io.quarkus.arc.processor.BuiltinScope; +import io.quarkus.deployment.Capabilities; +import io.quarkus.deployment.Capability; +import io.quarkus.deployment.annotations.BuildProducer; +import io.quarkus.deployment.annotations.BuildStep; +import io.quarkus.deployment.annotations.Consume; +import io.quarkus.deployment.annotations.ExecutionTime; +import io.quarkus.deployment.annotations.Record; +import io.quarkus.deployment.builditem.CombinedIndexBuildItem; +import io.quarkus.deployment.builditem.FeatureBuildItem; +import io.quarkus.deployment.builditem.ShutdownContextBuildItem; +import io.quarkus.grpc.transcoding.GrpcTranscodingContainer; +import io.quarkus.grpc.transcoding.GrpcTranscodingMethod; +import io.quarkus.grpc.transcoding.GrpcTranscodingRecorder; +import io.quarkus.grpc.transcoding.GrpcTranscodingServer; +import io.quarkus.runtime.RuntimeValue; +import io.quarkus.vertx.deployment.VertxBuildItem; +import io.quarkus.vertx.http.deployment.VertxWebRouterBuildItem; + +class GrpcTranscodingProcessor { + + private static final String FEATURE = "quarkus-grpc-transcoding"; + private static final Logger log = LoggerFactory.getLogger(GrpcTranscodingProcessor.class); + + private static final String[] PRODUCES = new String[] { "application/json" }; + private static final String[] CONSUMES = new String[] { "application/json" }; + + @BuildStep + FeatureBuildItem feature() { + return new FeatureBuildItem(FEATURE); + } + + @BuildStep + void processGeneratedBeans(CombinedIndexBuildItem index, + BuildProducer transformers, + BuildProducer marshallings, + BuildProducer delegatingBeans) { + Set generatedBeans = new HashSet<>(); + + for (ClassInfo generatedBean : index.getIndex().getAllKnownImplementors(GrpcDotNames.GRPC_TRANSCODING)) { + DotName generatedBeanName = generatedBean.name(); + generatedBeans.add(generatedBeanName); + + // Extract gRPC transcoding configuration from methods and store the results + List transcodingMethods = collectTranscodingMethods(generatedBean); + marshallings.produce(new GrpcTranscodingBuildItem(generatedBeanName, transcodingMethods)); + } + + delegatingBeans.produce(AdditionalBeanBuildItem.unremovableOf(GrpcTranscodingContainer.class)); + + transformers.produce(new AnnotationsTransformerBuildItem(new AnnotationsTransformer() { + @Override + public boolean appliesTo(AnnotationTarget.Kind kind) { + return kind == AnnotationTarget.Kind.CLASS; + } + + @Override + public void transform(TransformationContext context) { + // Check if the class is a generated gRPC transcoding bean + if (generatedBeans.contains(context.getTarget().asClass().name())) { + context.transform() + .add(BuiltinScope.SINGLETON.getName()) + .done(); + } + } + })); + } + + @BuildStep + @Record(value = ExecutionTime.RUNTIME_INIT) + @Consume(SyntheticBeansRuntimeInitBuildItem.class) + GrpcTranscodingServerBuildItem buildTranscoding(GrpcTranscodingRecorder recorder, + VertxBuildItem vertx, + VertxWebRouterBuildItem routerBuildItem, + List marshallings, + Capabilities capabilities, + ShutdownContextBuildItem shutdown) { + // Build a map to organize the collected gRPC transcoding methods by service name + Map> methods = new HashMap<>(); + for (GrpcTranscodingBuildItem item : marshallings) { + String name = item.getMarshallingClass().toString().replace("Marshalling", ""); + methods.put(name, item.getTranscodingMethods()); + } + + // Create and initialize the gRPC transcoding server + RuntimeValue server = recorder.initializeMarshallingServer(vertx.getVertx(), + routerBuildItem.getHttpRouter(), shutdown, methods, capabilities.isPresent(Capability.SECURITY)); + return new GrpcTranscodingServerBuildItem(server); + } + + private static List collectTranscodingMethods(ClassInfo service) { + List transcodingMethods = new ArrayList<>(); + for (MethodInfo method : service.methods()) { + if (method.hasAnnotation(GrpcDotNames.GRPC_TRANSCODING_METHOD)) { + AnnotationInstance annotation = method.annotation(GrpcDotNames.GRPC_TRANSCODING_METHOD); + + GrpcTranscodingMethod transcodingMethod = new GrpcTranscodingMethod( + annotation.value("grpcMethodName").asString(), annotation.value("httpMethod").asString(), + annotation.value("httpPath").asString()); + transcodingMethods.add(transcodingMethod); + } + } + + return transcodingMethods; + } +} diff --git a/extensions/grpc/deployment/src/main/java/io/quarkus/grpc/deployment/GrpcTranscodingServerBuildItem.java b/extensions/grpc/deployment/src/main/java/io/quarkus/grpc/deployment/GrpcTranscodingServerBuildItem.java new file mode 100644 index 0000000000000..83ef70aa598b2 --- /dev/null +++ b/extensions/grpc/deployment/src/main/java/io/quarkus/grpc/deployment/GrpcTranscodingServerBuildItem.java @@ -0,0 +1,17 @@ +package io.quarkus.grpc.deployment; + +import io.quarkus.builder.item.SimpleBuildItem; +import io.quarkus.grpc.transcoding.GrpcTranscodingServer; +import io.quarkus.runtime.RuntimeValue; + +public final class GrpcTranscodingServerBuildItem extends SimpleBuildItem { + private final RuntimeValue transcodingServer; + + public GrpcTranscodingServerBuildItem(RuntimeValue transcodingServer) { + this.transcodingServer = transcodingServer; + } + + public RuntimeValue getTranscodingServer() { + return transcodingServer; + } +} diff --git a/extensions/grpc/deployment/src/main/resources/dev-ui/qwc-grpc-services.js b/extensions/grpc/deployment/src/main/resources/dev-ui/qwc-grpc-services.js index 7a505a4861051..93d2781d3ecbb 100644 --- a/extensions/grpc/deployment/src/main/resources/dev-ui/qwc-grpc-services.js +++ b/extensions/grpc/deployment/src/main/resources/dev-ui/qwc-grpc-services.js @@ -1,9 +1,8 @@ -import { QwcHotReloadElement, html, css} from 'qwc-hot-reload-element'; -import { JsonRpc } from 'jsonrpc'; -import { columnBodyRenderer } from '@vaadin/grid/lit.js'; -import { gridRowDetailsRenderer } from '@vaadin/grid/lit.js'; -import { observeState } from 'lit-element-state'; -import { themeState } from 'theme-state'; +import {css, html, QwcHotReloadElement} from 'qwc-hot-reload-element'; +import {JsonRpc} from 'jsonrpc'; +import {columnBodyRenderer, gridRowDetailsRenderer} from '@vaadin/grid/lit.js'; +import {observeState} from 'lit-element-state'; +import {themeState} from 'theme-state'; import '@quarkus-webcomponents/codeblock'; import '@vaadin/progress-bar'; import '@vaadin/grid'; @@ -18,7 +17,7 @@ import '@vaadin/button'; /** * This component shows the Grpc Services */ -export class QwcGrpcServices extends observeState(QwcHotReloadElement) { +export class QwcGrpcServices extends observeState(QwcHotReloadElement) { jsonRpc = new JsonRpc(this); streamsMap = new Map(); @@ -64,7 +63,7 @@ export class QwcGrpcServices extends observeState(QwcHotReloadElement) { _testerButtons: {state: true} }; - constructor() { + constructor() { super(); this._detailsOpenedItem = []; this._streamsMap = new Map(); @@ -85,13 +84,13 @@ export class QwcGrpcServices extends observeState(QwcHotReloadElement) { } hotReload(){ - this.jsonRpc.getServices().then(jsonRpcResponse => { + this.jsonRpc.getServices().then(jsonRpcResponse => { this._services = jsonRpcResponse.result; this._forceUpdate(); }); } - render() { + render() { if(this._services){ return html` - `; + `; }else{ return html``; } } - + _statusRenderer(service){ if(service.status === "SERVING"){ return html``; @@ -147,36 +146,36 @@ export class QwcGrpcServices extends observeState(QwcHotReloadElement) { return html``; } } - + _nameRenderer(service){ return html`${service.name}`; } - + _serviceClassRenderer(service){ return html`${service.serviceClass}`; } - + _methodsRenderer(service){ return html`
${service.methods.map(method => html`${this._methodRenderer(method)}` )}
`; } - + _methodRenderer(method){ return html`${method.type} ${method.bareMethodName}`; } - + _testRenderer(service){ if(service.hasTestableMethod){ return html``; } } - + _testerRenderer(service){ - + if(service.methods.length > 1 ){ - + return html` ${service.methods.map(method => html`${this._methodTesterTabHeadingRenderer(service.name, method)}` @@ -193,21 +192,21 @@ export class QwcGrpcServices extends observeState(QwcHotReloadElement) { `; } } - + _tabSelectedChanged(service, n){ let method = service.methods[n]; this._testerContent = this._methodTesterRenderer(service, method); this._testerButtons = this._renderCommandButtons(service, method); this._forceUpdate(); } - + _methodTesterTabHeadingRenderer(serviceName,method) { return html` ${method.bareMethodName} ${method.type} `; } - + _methodTesterRenderer(service, method){ return html` @@ -230,7 +229,7 @@ export class QwcGrpcServices extends observeState(QwcHotReloadElement) { `; } - + _renderCommandButtons(service, method){ if(this._streamsMap.size >=0){ if(method.type == 'UNARY'){ @@ -244,7 +243,7 @@ export class QwcGrpcServices extends observeState(QwcHotReloadElement) { } } } - + _keypress(e, service, method){ if(method.type == 'UNARY' || !this._isRunning(service.name, method)){ if ((e.keyCode == 10 || e.keyCode == 13) && e.ctrlKey){ // ctlr-enter @@ -252,26 +251,26 @@ export class QwcGrpcServices extends observeState(QwcHotReloadElement) { } } } - + _isRunning(serviceName, method){ let id = this._id(serviceName, method); return this._streamsMap.has(id); } - + _id(serviceName, method){ - return serviceName + "_" + method.bareMethodName + "_" + method.type; + return serviceName + "_" + method.bareMethodName + "_" + method.type; } - + _clear(serviceName, method){ this._requestTextArea(serviceName, method).clear(); this._responseTextArea(serviceName, method).clear(); } - + _default(serviceName, method){ let pv = JSON.parse(method.prototype); this._requestTextArea(serviceName, method).populatePrettyJson(JSON.stringify(pv)); } - + _test(service, method){ let textArea = this._requestTextArea(service.name, method); let content = textArea.getAttribute('value'); @@ -306,7 +305,7 @@ export class QwcGrpcServices extends observeState(QwcHotReloadElement) { this._forceUpdate(); } } - + _forceUpdate(){ if(this._detailsOpenedItem.length > 0){ let itemZero = this._detailsOpenedItem[0]; @@ -314,21 +313,21 @@ export class QwcGrpcServices extends observeState(QwcHotReloadElement) { this._detailsOpenedItem.push(itemZero); } } - + _requestTextArea(serviceName, method){ return this.shadowRoot.getElementById(this._requestId(serviceName, method)); } - + _responseTextArea(serviceName, method){ return this.shadowRoot.getElementById(this._responseId(serviceName, method)); } - + _requestId(serviceName, method){ return serviceName + '/' + method.bareMethodName + '_request'; } - + _responseId(serviceName, method){ return serviceName + '/' + method.bareMethodName + '_response'; } } -customElements.define('qwc-grpc-services', QwcGrpcServices); \ No newline at end of file +customElements.define('qwc-grpc-services', QwcGrpcServices); diff --git a/extensions/grpc/protoc/pom.xml b/extensions/grpc/protoc/pom.xml index 15b8037182f6b..259b0751bcd7e 100644 --- a/extensions/grpc/protoc/pom.xml +++ b/extensions/grpc/protoc/pom.xml @@ -17,6 +17,10 @@ com.google.protobuf protobuf-java + + com.google.api.grpc + proto-google-common-protos + com.salesforce.servicelibs jprotoc diff --git a/extensions/grpc/protoc/src/main/java/io/quarkus/grpc/protoc/plugin/MutinyGrpcGenerator.java b/extensions/grpc/protoc/src/main/java/io/quarkus/grpc/protoc/plugin/MutinyGrpcGenerator.java index c987ff6e13a90..888c4e2f16ece 100644 --- a/extensions/grpc/protoc/src/main/java/io/quarkus/grpc/protoc/plugin/MutinyGrpcGenerator.java +++ b/extensions/grpc/protoc/src/main/java/io/quarkus/grpc/protoc/plugin/MutinyGrpcGenerator.java @@ -3,11 +3,15 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashSet; import java.util.List; +import java.util.Set; import java.util.logging.Level; import java.util.logging.Logger; import java.util.stream.Collectors; +import com.google.api.AnnotationsProto; +import com.google.api.HttpRule; import com.google.common.base.Strings; import com.google.common.html.HtmlEscapers; import com.google.protobuf.DescriptorProtos; @@ -28,6 +32,17 @@ public class MutinyGrpcGenerator extends Generator { private static final int METHOD_NUMBER_OF_PATHS = 4; public static final String CLASS_PREFIX = "Mutiny"; + private static final Set EXCLUDED_PACKAGES = Set.of("grpc.health.v1", "io.grpc.reflection"); + + private boolean skipTranscoding = false; + + public MutinyGrpcGenerator() { + } + + public MutinyGrpcGenerator(boolean skipTranscoding) { + this.skipTranscoding = skipTranscoding; + } + private String getServiceJavaDocPrefix() { return " "; } @@ -144,6 +159,37 @@ private MethodContext buildMethodContext(DescriptorProtos.MethodDescriptorProto methodContext.isManyInput = methodProto.getClientStreaming(); methodContext.isManyOutput = methodProto.getServerStreaming(); methodContext.methodNumber = methodNumber; + methodContext.httpPath = methodContext.methodName; + methodContext.httpMethod = "GET"; + + if (!skipTranscoding && methodProto.getOptions().hasExtension(AnnotationsProto.http)) { + // Extract the HTTP rule from the method options (if present) + HttpRule httpRule = getHttpRule(methodProto); + if (httpRule == null) { + throw new IllegalArgumentException("HTTP rule is not defined for method " + methodProto.getName()); + } + + // Determine HTTP method and path based on the HTTP rule + if (httpRule.hasGet()) { + methodContext.httpMethod = "GET"; + methodContext.httpPath = httpRule.getGet(); + } else if (httpRule.hasPost()) { + methodContext.httpMethod = "POST"; + methodContext.httpPath = httpRule.getPost(); + } else if (httpRule.hasPut()) { + methodContext.httpMethod = "PUT"; + methodContext.httpPath = httpRule.getPut(); + } else if (httpRule.hasDelete()) { + methodContext.httpMethod = "DELETE"; + methodContext.httpPath = httpRule.getDelete(); + } else if (httpRule.hasPatch()) { + methodContext.httpMethod = "PATCH"; + methodContext.httpPath = httpRule.getPatch(); + } else if (httpRule.hasCustom()) { + methodContext.httpMethod = "CUSTOM"; + methodContext.httpPath = httpRule.getCustom().getPath(); + } + } DescriptorProtos.SourceCodeInfo.Location methodLocation = locations.stream() .filter(location -> location.getPathCount() == METHOD_NUMBER_OF_PATHS && @@ -171,6 +217,13 @@ private MethodContext buildMethodContext(DescriptorProtos.MethodDescriptorProto return methodContext; } + private HttpRule getHttpRule(DescriptorProtos.MethodDescriptorProto methodProto) { + if (methodProto.getOptions().hasExtension(AnnotationsProto.http)) { + return methodProto.getOptions().getExtension(AnnotationsProto.http); + } + return null; + } + static String adaptMethodName(String name) { if (name == null || name.isEmpty()) { return name; @@ -273,7 +326,13 @@ private List generateFiles(List argSet = new HashSet<>(Arrays.asList(args)); + if (argSet.contains("help") || argSet.contains("--help") || argSet.contains("-h")) { + System.out.println("Usage: MutinyGrpcGenerator [enableTranscoding]"); + System.out.println("--enableTranscoding: Enable transcoding support"); + return; + } + + boolean skipTranscoding = !argSet.contains("--enableTranscoding"); + log.info("Transcoding will be " + (skipTranscoding ? "skipped" : "enabled")); + + String[] finalArgs = skipTranscoding ? args : Arrays.copyOfRange(args, 1, args.length); + if (finalArgs.length == 0) { + ProtocPlugin.generate(List.of(new MutinyGrpcGenerator(skipTranscoding)), List.of(AnnotationsProto.http)); } else { - ProtocPlugin.debug(new MutinyGrpcGenerator(), args[0]); + ProtocPlugin.debug(List.of(new MutinyGrpcGenerator()), List.of(AnnotationsProto.http), finalArgs[0]); } } } diff --git a/extensions/grpc/protoc/src/main/resources/MutinyMarshalling.mustache b/extensions/grpc/protoc/src/main/resources/MutinyMarshalling.mustache new file mode 100644 index 0000000000000..df3c2efeddfd2 --- /dev/null +++ b/extensions/grpc/protoc/src/main/resources/MutinyMarshalling.mustache @@ -0,0 +1,44 @@ +{{#packageName}} +package {{packageName}}; +{{/packageName}} + +import jakarta.ws.rs.core.Response; +import io.quarkus.grpc.GrpcService; +import io.quarkus.grpc.GrpcTranscoding; +import io.quarkus.grpc.GrpcTranscodingDescriptor; +import com.google.protobuf.Message; +import com.google.protobuf.util.JsonFormat; +import com.google.protobuf.InvalidProtocolBufferException; + +@jakarta.annotation.Generated( +value = "by {{classPrefix}} Grpc generator", +comments = "Source: {{protoName}}") +public class {{serviceName}}Marshalling implements GrpcTranscoding { + + @Override + public String getGrpcServiceName() { + return "{{packageName}}.{{serviceName}}"; + } + + @SuppressWarnings("unchecked") + @Override + public GrpcTranscodingDescriptor findTranscodingDescriptor(String method) { + switch (method) { + {{#unaryUnaryMethods}} + case "{{methodName}}": + return (GrpcTranscodingDescriptor) {{methodName}}TranscodingDescriptor(); + {{/unaryUnaryMethods}} + default: + throw new IllegalArgumentException("Unknown request method: " + method); + } + } + + {{#unaryUnaryMethods}} + @io.quarkus.grpc.GrpcTranscodingMethod(grpcMethodName = "{{methodName}}", httpMethod = "{{httpMethod}}", httpPath = "{{httpPath}}") + public io.quarkus.grpc.GrpcTranscodingDescriptor<{{inputType}}, {{outputType}}> {{methodName}}TranscodingDescriptor() { + return new io.quarkus.grpc.GrpcTranscodingDescriptor<{{inputType}}, {{outputType}}>( + new io.quarkus.grpc.GrpcTranscodingMarshaller<{{inputType}}>({{inputType}}.getDefaultInstance()), + new io.quarkus.grpc.GrpcTranscodingMarshaller<{{outputType}}>({{outputType}}.getDefaultInstance())); + } + {{/unaryUnaryMethods}} +} diff --git a/extensions/grpc/runtime/pom.xml b/extensions/grpc/runtime/pom.xml index 73393d614ff21..f200dddc867d1 100644 --- a/extensions/grpc/runtime/pom.xml +++ b/extensions/grpc/runtime/pom.xml @@ -13,6 +13,10 @@ Quarkus - gRPC - Runtime Serve and consume gRPC services + + com.google.api.grpc + proto-google-common-protos + jakarta.annotation jakarta.annotation-api @@ -96,7 +100,7 @@ io.smallrye.common smallrye-common-vertx-context - + io.quarkus diff --git a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/GrpcServerRecorder.java b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/GrpcServerRecorder.java index 54e83ead17432..a9c93eef87ad7 100644 --- a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/GrpcServerRecorder.java +++ b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/GrpcServerRecorder.java @@ -133,7 +133,8 @@ public void initializeGrpcServer(RuntimeValue vertxSupplier, launchMode); } } else { - buildGrpcServer(vertx, configuration, routerSupplier, shutdown, blockingMethodsPerService, virtualMethodsPerService, + buildGrpcServer(vertx, configuration, routerSupplier, shutdown, + blockingMethodsPerService, virtualMethodsPerService, grpcContainer, launchMode, securityPresent); } } @@ -145,6 +146,7 @@ private void buildGrpcServer(Vertx vertx, GrpcServerConfiguration configuration, GrpcContainer grpcContainer, LaunchMode launchMode, boolean securityPresent) { GrpcServer server = GrpcServer.server(vertx); + List globalInterceptors = grpcContainer.getSortedGlobalInterceptors(); if (launchMode == LaunchMode.DEVELOPMENT) { @@ -163,7 +165,6 @@ private void buildGrpcServer(Vertx vertx, GrpcServerConfiguration configuration, vertx, grpcContainer, blockingMethodsPerService, virtualMethodsPerService, compressionInterceptor, globalInterceptors, service, launchMode == LaunchMode.DEVELOPMENT); - LOGGER.debugf("Registered gRPC service '%s'", service.definition.getServiceDescriptor().getName()); GrpcServiceBridge bridge = GrpcServiceBridge.bridge(serviceDefinition); bridge.bind(server); definitions.add(service.definition); @@ -398,7 +399,7 @@ public static final class GrpcServiceDefinition { public final BindableService service; public final ServerServiceDefinition definition; - GrpcServiceDefinition(BindableService service, ServerServiceDefinition definition) { + public GrpcServiceDefinition(BindableService service, ServerServiceDefinition definition) { this.service = service; this.definition = definition; } diff --git a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/config/GrpcTranscodingConfig.java b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/config/GrpcTranscodingConfig.java new file mode 100644 index 0000000000000..fa7390b9b7a2f --- /dev/null +++ b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/config/GrpcTranscodingConfig.java @@ -0,0 +1,20 @@ +package io.quarkus.grpc.runtime.config; + +import io.quarkus.runtime.annotations.ConfigItem; +import io.quarkus.runtime.annotations.ConfigPhase; +import io.quarkus.runtime.annotations.ConfigRoot; + +/** + * Configuration root for gRPC Transcoding feature in Quarkus. gRPC Transcoding allows you to create + * RESTful JSON APIs that are backed by existing gRPC services. + */ +@ConfigRoot(name = "grpc.transcoding", phase = ConfigPhase.BUILD_TIME) +public class GrpcTranscodingConfig { + + /** + * Flag to enable or disable the gRPC Transcoding feature. + * The default value is `false` (disabled). + */ + @ConfigItem(defaultValue = "false") + public boolean enabled; +} diff --git a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/SSLConfigHelper.java b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/SSLConfigHelper.java index c8e1f7e6cdbd7..c1fd55c77b16a 100644 --- a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/SSLConfigHelper.java +++ b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/SSLConfigHelper.java @@ -117,4 +117,4 @@ private static void ensureKeyCertOptionsNotSet(TCPSSLOptions options) { private SSLConfigHelper() { // Utility } -} \ No newline at end of file +} diff --git a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/transcoding/GrpcTranscodingBridge.java b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/transcoding/GrpcTranscodingBridge.java new file mode 100644 index 0000000000000..0a17cd4950c93 --- /dev/null +++ b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/transcoding/GrpcTranscodingBridge.java @@ -0,0 +1,213 @@ +package io.quarkus.grpc.transcoding; + +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.UnknownHostException; +import java.util.List; + +import io.grpc.Attributes; +import io.grpc.Grpc; +import io.grpc.Metadata; +import io.grpc.MethodDescriptor; +import io.grpc.ServerCall; +import io.grpc.ServerCallHandler; +import io.grpc.ServerMethodDefinition; +import io.grpc.Status; +import io.vertx.core.net.SocketAddress; +import io.vertx.grpc.common.GrpcError; +import io.vertx.grpc.common.GrpcStatus; +import io.vertx.grpc.common.impl.Utils; +import io.vertx.grpc.server.GrpcServer; +import io.vertx.grpc.server.GrpcServerRequest; +import io.vertx.grpc.server.GrpcServerResponse; +import io.vertx.grpc.server.GrpcServiceBridge; + +/** + * A bridge that binds and unbinds transcoding methods to a {@link GrpcServer}. + * + * @see io.vertx.grpc.server.impl.GrpcServiceBridgeImpl for the default implementation. + */ +public class GrpcTranscodingBridge implements GrpcServiceBridge { + + private final List> methods; + + public GrpcTranscodingBridge(List> methods) { + this.methods = methods; + } + + @Override + public void unbind(GrpcServer server) { + methods.forEach(m -> unbind((GrpcTranscodingServer) server, m)); + } + + private void unbind(GrpcTranscodingServer server, ServerMethodDefinition methodDef) { + server.callHandler(methodDef.getMethodDescriptor(), null); + } + + @Override + public void bind(GrpcServer server) { + methods.forEach(m -> bind((GrpcTranscodingServer) server, m)); + } + + private void bind(GrpcTranscodingServer server, ServerMethodDefinition methodDef) { + server.callHandler(methodDef.getMethodDescriptor(), req -> { + ServerCallHandler callHandler = methodDef.getServerCallHandler(); + MethodDescriptor.Marshaller reqMarshaller = server + .findRequestMarshaller(methodDef.getMethodDescriptor().getFullMethodName()); + MethodDescriptor.Marshaller respMarshaller = server + .findResponseMarshaller(methodDef.getMethodDescriptor().getFullMethodName()); + + GrpcTranscodingBridge.ServerCallImpl call = new GrpcTranscodingBridge.ServerCallImpl<>(req, methodDef, + reqMarshaller, respMarshaller); + + ServerCall.Listener listener = callHandler.startCall(call, Utils.readMetadata(req.headers())); + call.init(listener); + }); + } + + private static class ServerCallImpl extends ServerCall { + + private final GrpcTranscodingRequest req; + private final ServerMethodDefinition methodDef; + + private final MethodDescriptor.Marshaller reqMarshaller; + private final MethodDescriptor.Marshaller respMarshaller; + + private final GrpcTranscodingReadStreamAdapter readAdapter; + private final GrpcTranscodingWriteStreamAdapter writeAdapter; + private ServerCall.Listener listener; + private boolean halfClosed; + private boolean closed; + private int messagesSent; + private final Attributes attributes; + + public ServerCallImpl(GrpcServerRequest req, ServerMethodDefinition methodDef, + MethodDescriptor.Marshaller reqMarshaller, MethodDescriptor.Marshaller respMarshaller) { + this.req = (GrpcTranscodingRequest) req; + this.methodDef = methodDef; + this.reqMarshaller = reqMarshaller; + this.respMarshaller = respMarshaller; + this.readAdapter = new GrpcTranscodingReadStreamAdapter() { + @Override + protected void handleClose() { + halfClosed = true; + listener.onHalfClose(); + } + + @Override + protected void handleMessage(Req msg) { + listener.onMessage(msg); + } + }; + this.writeAdapter = new GrpcTranscodingWriteStreamAdapter() { + @Override + protected void handleReady() { + listener.onReady(); + } + }; + this.attributes = createAttributes(); + } + + void init(ServerCall.Listener listener) { + this.listener = listener; + req.errorHandler(error -> { + if (error == GrpcError.CANCELLED && !closed) { + listener.onCancel(); + } + }); + + readAdapter.init(req, new GrpcTranscodingMessageDecoder<>(reqMarshaller)); + writeAdapter.init(req.response(), new GrpcTranscodingMessageEncoder<>(respMarshaller)); + } + + private Attributes createAttributes() { + Attributes.Builder builder = Attributes.newBuilder(); + SocketAddress remoteAddr = req.connection().remoteAddress(); + if (remoteAddr != null && remoteAddr.isInetSocket()) { + try { + InetAddress address = InetAddress.getByName(remoteAddr.hostAddress()); + builder.set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, new InetSocketAddress(address, remoteAddr.port())); + } catch (UnknownHostException ignored) { + } + } + SocketAddress localAddr = req.connection().localAddress(); + if (localAddr != null && localAddr.isInetSocket()) { + try { + InetAddress address = InetAddress.getByName(localAddr.hostAddress()); + builder.set(Grpc.TRANSPORT_ATTR_LOCAL_ADDR, new InetSocketAddress(address, localAddr.port())); + } catch (UnknownHostException ignored) { + } + } + if (req.connection().isSsl()) { + builder.set(Grpc.TRANSPORT_ATTR_SSL_SESSION, req.connection().sslSession()); + } + return builder.build(); + } + + @Override + public boolean isReady() { + return writeAdapter.isReady(); + } + + @Override + public void request(int numMessages) { + readAdapter.request(numMessages); + } + + @Override + public void sendHeaders(Metadata headers) { + Utils.writeMetadata(headers, req.response().headers()); + } + + @Override + public void sendMessage(Resp message) { + messagesSent++; + writeAdapter.write(message); + } + + @Override + public void close(Status status, Metadata trailers) { + if (closed) { + throw new IllegalStateException("Already closed"); + } + closed = true; + GrpcServerResponse response = req.response(); + if (status == Status.OK && methodDef.getMethodDescriptor().getType().serverSendsOneMessage() && messagesSent == 0) { + response.status(GrpcStatus.UNAVAILABLE).end(); + } else { + Utils.writeMetadata(trailers, response.trailers()); + response.status(GrpcStatus.valueOf(status.getCode().value())); + response.statusMessage(status.getDescription()); + response.end(); + } + listener.onComplete(); + } + + @Override + public boolean isCancelled() { + return false; + } + + @Override + public MethodDescriptor getMethodDescriptor() { + return methodDef.getMethodDescriptor(); + } + + @Override + public void setCompression(String encoding) { + // ???? + super.setCompression(encoding); + } + + @Override + public void setMessageCompression(boolean enabled) { + // ???? + super.setMessageCompression(enabled); + } + + @Override + public Attributes getAttributes() { + return this.attributes; + } + } +} diff --git a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/transcoding/GrpcTranscodingContainer.java b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/transcoding/GrpcTranscodingContainer.java new file mode 100644 index 0000000000000..7f8cfec3119ab --- /dev/null +++ b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/transcoding/GrpcTranscodingContainer.java @@ -0,0 +1,18 @@ +package io.quarkus.grpc.transcoding; + +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.enterprise.inject.Instance; +import jakarta.inject.Inject; + +import io.quarkus.grpc.GrpcTranscoding; + +@ApplicationScoped +public class GrpcTranscodingContainer { + + @Inject + Instance services; + + public Instance getServices() { + return services; + } +} diff --git a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/transcoding/GrpcTranscodingHttpUtils.java b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/transcoding/GrpcTranscodingHttpUtils.java new file mode 100644 index 0000000000000..6499fe808ad22 --- /dev/null +++ b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/transcoding/GrpcTranscodingHttpUtils.java @@ -0,0 +1,84 @@ +package io.quarkus.grpc.transcoding; + +import java.util.HashMap; +import java.util.Map; + +/** + * The `GrpcTranscodingHttpUtils` class provides utility functions for path handling + * and parameter extraction during the gRPC message transcoding process. Its key + * functions include: + *

+ * Checking if a request path matches a given gRPC path template. + * Extracting path parameters from both gRPC path templates and concrete HTTP paths. + */ +public class GrpcTranscodingHttpUtils { + + /** + * Determines if a given HTTP request path conforms to a specified gRPC path template. + * + * @param requestPath The actual HTTP request path to be checked. + * @param pathTemplate The gRPC path template defining the expected structure. + * @return `true` if the paths match, `false` otherwise. + */ + public static boolean isPathMatch(String requestPath, String pathTemplate) { + int pathIndex = 0; + int templateIndex = 0; + + while (pathIndex < requestPath.length() && templateIndex < pathTemplate.length()) { + int pathEnd = requestPath.indexOf('/', pathIndex); + int templateEnd = pathTemplate.indexOf('/', templateIndex); + + // Extract the current segment from both paths + String requestPart = pathEnd == -1 ? requestPath.substring(pathIndex) : requestPath.substring(pathIndex, pathEnd); + String templatePart = templateEnd == -1 ? pathTemplate.substring(templateIndex) + : pathTemplate.substring(templateIndex, templateEnd); + + // Check if the template part is a variable segment + if (templatePart.startsWith("{") && templatePart.endsWith("}")) { + if (requestPart.isEmpty()) { + return false; + } + // Skip to the end of the next segment + pathIndex = pathEnd != -1 ? pathEnd + 1 : requestPath.length(); + templateIndex = templateEnd != -1 ? templateEnd + 1 : pathTemplate.length(); + } else { + if (!requestPart.equals(templatePart)) { + return false; + } + + // Skip to the end of the next segment + pathIndex = pathEnd != -1 ? pathEnd + 1 : requestPath.length(); + templateIndex = templateEnd != -1 ? templateEnd + 1 : pathTemplate.length(); + } + } + + // Ensure both paths have been fully consumed + return pathIndex == requestPath.length() && templateIndex == pathTemplate.length(); + } + + /** + * Extracts path parameters from a gRPC path template and an associated HTTP path. + * + * @param pathTemplate The gRPC path template defining the parameter structure. + * @param httpPath The actual HTTP path from which to extract the parameter values. + * @return A `Map` containing the extracted parameter names and their corresponding values. + */ + public static Map extractPathParams(String pathTemplate, String httpPath) { + Map extractedParams = new HashMap<>(); + + String[] pathParts = httpPath.split("/"); + String[] templateParts = pathTemplate.split("/"); + + for (int i = 0; i < pathParts.length; i++) { + String pathPart = pathParts[i]; + String templatePart = templateParts[i]; + + if (templatePart.startsWith("{") && templatePart.endsWith("}")) { + String paramName = templatePart.substring(1, templatePart.length() - 1); + extractedParams.put(paramName, pathPart); + } + } + + return extractedParams; + } +} diff --git a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/transcoding/GrpcTranscodingMessageDecoder.java b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/transcoding/GrpcTranscodingMessageDecoder.java new file mode 100644 index 0000000000000..c61dffb50ff59 --- /dev/null +++ b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/transcoding/GrpcTranscodingMessageDecoder.java @@ -0,0 +1,27 @@ +package io.quarkus.grpc.transcoding; + +import java.io.ByteArrayInputStream; + +import io.grpc.MethodDescriptor; +import io.vertx.grpc.common.GrpcMessage; +import io.vertx.grpc.common.GrpcMessageDecoder; + +/* + * A message decoder that uses a {@link MethodDescriptor.Marshaller} to decode the message payload. + * + * @param The type of the message payload. + * @see io.vertx.grpc.common.impl.GrpcMessageDecoderImpl for the original implementation + */ +public class GrpcTranscodingMessageDecoder implements GrpcMessageDecoder { + + private final MethodDescriptor.Marshaller marshaller; + + public GrpcTranscodingMessageDecoder(MethodDescriptor.Marshaller marshaller) { + this.marshaller = marshaller; + } + + @Override + public T decode(GrpcMessage msg) { + return marshaller.parse(new ByteArrayInputStream(msg.payload().getBytes())); + } +} diff --git a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/transcoding/GrpcTranscodingMessageEncoder.java b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/transcoding/GrpcTranscodingMessageEncoder.java new file mode 100644 index 0000000000000..4a2e39a03aa41 --- /dev/null +++ b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/transcoding/GrpcTranscodingMessageEncoder.java @@ -0,0 +1,51 @@ +package io.quarkus.grpc.transcoding; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; + +import io.grpc.MethodDescriptor; +import io.vertx.core.buffer.Buffer; +import io.vertx.grpc.common.GrpcMessage; +import io.vertx.grpc.common.GrpcMessageEncoder; + +/* + * A message encoder that uses a {@link MethodDescriptor.Marshaller} to encode the message payload. + * + * @param The type of the message payload. + * @see io.vertx.grpc.common.impl.GrpcMessageEncoderImpl for the original implementation + */ +public class GrpcTranscodingMessageEncoder implements GrpcMessageEncoder { + + private final MethodDescriptor.Marshaller marshaller; + + public GrpcTranscodingMessageEncoder(MethodDescriptor.Marshaller marshaller) { + this.marshaller = marshaller; + } + + @Override + public GrpcMessage encode(T msg) { + return new GrpcMessage() { + private Buffer encoded; + + @Override + public String encoding() { + return "identity"; + } + + @Override + public Buffer payload() { + if (encoded == null) { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + try { + marshaller.stream(msg).transferTo(baos); + } catch (IOException e) { + throw new RuntimeException(e); + } + byte[] bytes = baos.toByteArray(); + encoded = Buffer.buffer(bytes); + } + return encoded; + } + }; + } +} diff --git a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/transcoding/GrpcTranscodingMessageWriter.java b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/transcoding/GrpcTranscodingMessageWriter.java new file mode 100644 index 0000000000000..192bd6ab28d53 --- /dev/null +++ b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/transcoding/GrpcTranscodingMessageWriter.java @@ -0,0 +1,79 @@ +package io.quarkus.grpc.transcoding; + +import java.util.HashMap; +import java.util.Map; + +import io.vertx.core.buffer.Buffer; +import io.vertx.core.json.Json; + +/** + * The `GrpcTranscodingMessageWriter` class assists with the manipulation of gRPC + * message payloads during the transcoding process. Its responsibilities include: + *

+ * Merging existing JSON payloads, path parameters, and query parameters into a + * unified map representation. + * Providing the logic for inserting nested parameters within the generated map. + */ +public class GrpcTranscodingMessageWriter { + + private final static String SEPARATOR = "\\."; + + /** + * Merges path parameters, query parameters, and an optional existing JSON payload + * into a single `Map` object. This method provides a centralized way to combine + * parameters during gRPC message transcoding. + * + * @param pathParams A map containing path parameters extracted from the request. + * @param queryParams A map containing query parameters extracted from the request. + * @param existingPayload An optional Vert.x `Buffer` containing an existing JSON payload. + * @return A `Map` representing the merged parameters. + * @throws IllegalArgumentException If the provided `existingPayload` cannot be parsed as valid JSON. + */ + public static Map mergeParameters(Map pathParams, Map queryParams, + Buffer existingPayload) { + Map allParams = new HashMap<>(); + + if (existingPayload != null && existingPayload.getBytes().length > 0) { + try { + String existingPayloadJson = new String(existingPayload.getBytes()); + allParams = new HashMap(Json.decodeValue(existingPayloadJson, Map.class)); + } catch (Exception e) { + throw new IllegalArgumentException("Invalid payload", e); + } + } + + for (Map.Entry entry : pathParams.entrySet()) { + insertNestedParam(allParams, entry.getKey(), entry.getValue()); + } + + for (Map.Entry entry : queryParams.entrySet()) { + insertNestedParam(allParams, entry.getKey(), entry.getValue()); + } + + return allParams; + } + + /** + * Inserts a key-value pair into a nested structure within a `Map`. This method supports + * the creation of hierarchical parameter structures during the transcoding process. + * Key components are separated by periods ('.'). + * + * @param paramsMap The `Map` object where the nested parameter will be inserted. + * @param key The parameter key, potentially containing periods for nested structures. + * @param value The parameter value to be inserted. + */ + public static void insertNestedParam(Map paramsMap, String key, String value) { + String[] pathComponents = key.split(SEPARATOR); + + Map currentLevel = paramsMap; + for (int i = 0; i < pathComponents.length - 1; i++) { + String component = pathComponents[i]; + if (!currentLevel.containsKey(component)) { + currentLevel.put(component, new HashMap<>()); + } + currentLevel = (Map) currentLevel.get(component); + } + + currentLevel.put(pathComponents[pathComponents.length - 1], value); + } +} diff --git a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/transcoding/GrpcTranscodingMetadata.java b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/transcoding/GrpcTranscodingMetadata.java new file mode 100644 index 0000000000000..212f1b4a7fa87 --- /dev/null +++ b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/transcoding/GrpcTranscodingMetadata.java @@ -0,0 +1,51 @@ +package io.quarkus.grpc.transcoding; + +import com.google.protobuf.Message; + +import io.grpc.MethodDescriptor; +import io.quarkus.grpc.GrpcTranscodingMarshaller; + +/** + * A metadata class that holds the transcoding information for a gRPC method. + * + * @param The type of the request message. + * @param The type of the response message. + */ +public class GrpcTranscodingMetadata { + + private final String httpMethodName; + private final String grpcMethodName; + private final GrpcTranscodingMarshaller requestMarshaller; + private final GrpcTranscodingMarshaller responseMarshaller; + private final MethodDescriptor methodDescriptor; + + public GrpcTranscodingMetadata(String httpMethodName, String grpcMethodName, + GrpcTranscodingMarshaller requestMarshaller, + GrpcTranscodingMarshaller responseMarshaller, MethodDescriptor methodDescriptor) { + this.httpMethodName = httpMethodName; + this.grpcMethodName = grpcMethodName; + this.requestMarshaller = requestMarshaller; + this.responseMarshaller = responseMarshaller; + this.methodDescriptor = methodDescriptor; + } + + public String getHttpMethodName() { + return httpMethodName; + } + + public String getGrpcMethodName() { + return grpcMethodName; + } + + public GrpcTranscodingMarshaller getRequestMarshaller() { + return requestMarshaller; + } + + public GrpcTranscodingMarshaller getResponseMarshaller() { + return responseMarshaller; + } + + public MethodDescriptor getMethodDescriptor() { + return methodDescriptor; + } +} diff --git a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/transcoding/GrpcTranscodingMethod.java b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/transcoding/GrpcTranscodingMethod.java new file mode 100644 index 0000000000000..3fbaf90a0d11e --- /dev/null +++ b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/transcoding/GrpcTranscodingMethod.java @@ -0,0 +1,29 @@ +package io.quarkus.grpc.transcoding; + +/** + * A metadata class that holds the transcoding information for a gRPC method. + */ +public final class GrpcTranscodingMethod { + + private final String grpcMethodName; + private final String httpMethodName; + private final String uriTemplate; + + public GrpcTranscodingMethod(String grpcMethodName, String httpMethodName, String uriTemplate) { + this.grpcMethodName = grpcMethodName; + this.httpMethodName = httpMethodName; + this.uriTemplate = uriTemplate; + } + + public String getGrpcMethodName() { + return grpcMethodName; + } + + public String getHttpMethodName() { + return httpMethodName; + } + + public String getUriTemplate() { + return uriTemplate; + } +} diff --git a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/transcoding/GrpcTranscodingReadStreamAdapter.java b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/transcoding/GrpcTranscodingReadStreamAdapter.java new file mode 100644 index 0000000000000..74efd3db76905 --- /dev/null +++ b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/transcoding/GrpcTranscodingReadStreamAdapter.java @@ -0,0 +1,57 @@ +package io.quarkus.grpc.transcoding; + +import io.vertx.grpc.common.GrpcReadStream; + +/** + * Adapter for {@link GrpcReadStream} to handle message and close events. + * + * @param The type of the message payload. + * @see io.vertx.grpc.common.impl.ReadStreamAdapter for the original implementation + */ +public class GrpcTranscodingReadStreamAdapter { + + private GrpcReadStream stream; + private int request = 0; + + /** + * Init the adapter with the stream. + */ + public final void init(GrpcReadStream stream, GrpcTranscodingMessageDecoder decoder) { + stream.messageHandler(msg -> { + handleMessage(decoder.decode(msg)); + }); + stream.endHandler(v -> { + handleClose(); + }); + this.stream = stream; + stream.pause(); + if (request > 0) { + stream.fetch(request); + } + } + + /** + * Override this to handle close event + */ + protected void handleClose() { + + } + + /** + * Override this to handle message event + */ + protected void handleMessage(T msg) { + + } + + /** + * Request {@code num} messages + */ + public final void request(int num) { + if (stream != null) { + stream.fetch(num); + } else { + request += num; + } + } +} diff --git a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/transcoding/GrpcTranscodingRecorder.java b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/transcoding/GrpcTranscodingRecorder.java new file mode 100644 index 0000000000000..0d4265eca8f8b --- /dev/null +++ b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/transcoding/GrpcTranscodingRecorder.java @@ -0,0 +1,183 @@ +package io.quarkus.grpc.transcoding; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import jakarta.enterprise.inject.Instance; + +import org.jboss.logging.Logger; + +import com.google.protobuf.Message; + +import io.grpc.BindableService; +import io.grpc.MethodDescriptor; +import io.grpc.ServerMethodDefinition; +import io.grpc.ServerServiceDefinition; +import io.quarkus.arc.Arc; +import io.quarkus.grpc.GrpcTranscoding; +import io.quarkus.grpc.GrpcTranscodingDescriptor; +import io.quarkus.grpc.auth.GrpcSecurityInterceptor; +import io.quarkus.grpc.runtime.GrpcContainer; +import io.quarkus.grpc.runtime.GrpcServerRecorder; +import io.quarkus.runtime.RuntimeValue; +import io.quarkus.runtime.ShutdownContext; +import io.quarkus.runtime.annotations.Recorder; +import io.vertx.core.Context; +import io.vertx.core.Handler; +import io.vertx.core.Vertx; +import io.vertx.ext.web.Route; +import io.vertx.ext.web.Router; + +@Recorder +public class GrpcTranscodingRecorder { + + private static final Logger LOGGER = Logger.getLogger(GrpcTranscodingRecorder.class.getName()); + + public RuntimeValue initializeMarshallingServer(RuntimeValue vertxSupplier, + RuntimeValue routerSupplier, + ShutdownContext shutdown, Map> httpMethods, + boolean securityPresent) { + GrpcTranscodingServer transcodingServer = new GrpcTranscodingServer(vertxSupplier.getValue()); + + GrpcContainer grpcContainer = Arc.container().instance(GrpcContainer.class).get(); + GrpcTranscodingContainer container = Arc.container().instance(GrpcTranscodingContainer.class).get(); + + if (grpcContainer == null) { + throw new IllegalStateException("GrpcContainer not found"); + } + + if (container == null) { + throw new IllegalStateException("GrpcTranscodingContainer not found"); + } + + List grpcServices = collectServiceDefinitions(grpcContainer.getServices()); + List transcodingServices = collectTranscodingServices(container.getServices()); + + List> mappedMethods = new ArrayList<>(); + + LOGGER.info("Initializing gRPC transcoding services"); + for (GrpcTranscoding transcodingService : transcodingServices) { + GrpcServerRecorder.GrpcServiceDefinition grpcService = findGrpcService(grpcServices, transcodingService); + List transcodingMethods = findTranscodingMethods(httpMethods, + transcodingService.getGrpcServiceName()); + + for (ServerMethodDefinition serviceDefinition : grpcService.definition.getMethods()) { + MethodDescriptor methodDescriptor = (MethodDescriptor) serviceDefinition + .getMethodDescriptor(); + GrpcTranscodingMethod transcodingMethod = findTranscodingMethod(transcodingMethods, methodDescriptor); + + String path = transcodingMethod.getUriTemplate(); + GrpcTranscodingMetadata metadata = createMetadata(transcodingMethod, methodDescriptor, + transcodingService); + + transcodingServer.addMethodMapping(path, methodDescriptor.getFullMethodName()); + transcodingServer.addMetadataHandler(methodDescriptor.getFullMethodName(), metadata); + + mappedMethods.add(serviceDefinition); + + Route route = routerSupplier.getValue().route().handler(ctx -> { + if (securityPresent) { + GrpcSecurityInterceptor.propagateSecurityIdentityWithDuplicatedCtx(ctx); + } + if (!Context.isOnEventLoopThread()) { + Context capturedVertxContext = Vertx.currentContext(); + if (capturedVertxContext != null) { + capturedVertxContext.runOnContext(new Handler() { + @Override + public void handle(Void unused) { + transcodingServer.handle(ctx.request()); + } + }); + return; + } + } + + transcodingServer.handle(ctx.request()); + }); + + shutdown.addShutdownTask(route::remove); + } + } + + GrpcTranscodingBridge bridge = new GrpcTranscodingBridge(mappedMethods); + bridge.bind(transcodingServer); + + return new RuntimeValue<>(transcodingServer); + } + + private GrpcTranscodingMetadata createMetadata( + GrpcTranscodingMethod transcodingMethod, MethodDescriptor methodDescriptor, + GrpcTranscoding transcodingService) { + String fullMethodName = methodDescriptor.getFullMethodName() + .substring(methodDescriptor.getFullMethodName().lastIndexOf("/") + 1); + fullMethodName = Character.toLowerCase(fullMethodName.charAt(0)) + fullMethodName.substring(1); + + GrpcTranscodingDescriptor descriptor = transcodingService.findTranscodingDescriptor(fullMethodName); + + return new GrpcTranscodingMetadata<>( + transcodingMethod.getHttpMethodName(), + fullMethodName, + descriptor.getRequestMarshaller(), + descriptor.getResponseMarshaller(), + methodDescriptor); + } + + private List findTranscodingMethods(Map> transcodingMethods, + String grpcServiceName) { + List methods = new ArrayList<>(); + for (Map.Entry> entry : transcodingMethods.entrySet()) { + if (entry.getKey().startsWith(grpcServiceName)) { + methods.addAll(entry.getValue()); + } + } + + return methods; + } + + private GrpcTranscodingMethod findTranscodingMethod(List transcodingMethods, + MethodDescriptor methodDescriptor) { + String fullMethodName = methodDescriptor.getFullMethodName(); + fullMethodName = fullMethodName.substring(fullMethodName.lastIndexOf("/") + 1); + fullMethodName = Character.toLowerCase(fullMethodName.charAt(0)) + fullMethodName.substring(1); + + for (GrpcTranscodingMethod transcodingMethod : transcodingMethods) { + if (transcodingMethod.getGrpcMethodName().startsWith(fullMethodName)) { + return transcodingMethod; + } + } + + throw new IllegalStateException("Transcoding method not found for " + fullMethodName); + } + + private static List collectServiceDefinitions( + Instance services) { + List definitions = new ArrayList<>(); + for (BindableService service : services) { + ServerServiceDefinition definition = service.bindService(); + definitions.add(new GrpcServerRecorder.GrpcServiceDefinition(service, definition)); + } + + return definitions; + } + + private static List collectTranscodingServices(Instance services) { + List transcodingServices = new ArrayList<>(); + for (GrpcTranscoding service : services) { + transcodingServices.add(service); + } + + return transcodingServices; + } + + private static GrpcServerRecorder.GrpcServiceDefinition findGrpcService( + List grpcServices, GrpcTranscoding transcodingService) { + for (GrpcServerRecorder.GrpcServiceDefinition grpcService : grpcServices) { + if (grpcService.getImplementationClassName().startsWith(transcodingService.getGrpcServiceName())) { + return grpcService; + } + } + + throw new IllegalStateException("gRPC service not found for " + transcodingService.getGrpcServiceName()); + } +} diff --git a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/transcoding/GrpcTranscodingRequest.java b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/transcoding/GrpcTranscodingRequest.java new file mode 100644 index 0000000000000..d9188dc43afd7 --- /dev/null +++ b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/transcoding/GrpcTranscodingRequest.java @@ -0,0 +1,301 @@ +package io.quarkus.grpc.transcoding; + +import static io.vertx.grpc.common.GrpcError.mapHttp2ErrorCode; + +import java.util.Map; +import java.util.function.BiConsumer; +import java.util.stream.Collector; + +import io.vertx.codegen.annotations.Nullable; +import io.vertx.core.Future; +import io.vertx.core.Handler; +import io.vertx.core.MultiMap; +import io.vertx.core.Promise; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.http.HttpConnection; +import io.vertx.core.http.HttpServerRequest; +import io.vertx.core.http.StreamResetException; +import io.vertx.core.http.impl.HttpServerRequestInternal; +import io.vertx.core.impl.ContextInternal; +import io.vertx.core.impl.future.PromiseInternal; +import io.vertx.core.json.Json; +import io.vertx.core.streams.ReadStream; +import io.vertx.core.streams.impl.InboundBuffer; +import io.vertx.grpc.common.CodecException; +import io.vertx.grpc.common.GrpcError; +import io.vertx.grpc.common.GrpcMessage; +import io.vertx.grpc.common.GrpcMessageDecoder; +import io.vertx.grpc.common.GrpcMessageEncoder; +import io.vertx.grpc.common.GrpcReadStream; +import io.vertx.grpc.common.ServiceName; +import io.vertx.grpc.common.impl.GrpcMethodCall; +import io.vertx.grpc.server.GrpcServerRequest; +import io.vertx.grpc.server.GrpcServerResponse; + +/** + * A gRPC transcoding request that maps HTTP requests to gRPC methods. + * + * @param The type of the request message. + * @param The type of the response message. + * @see io.vertx.grpc.server.impl.GrpcServerRequestImpl for the original implementation + */ +public class GrpcTranscodingRequest implements GrpcReadStream, Handler, GrpcServerRequest { + + static final GrpcMessage END_SENTINEL = new GrpcMessage() { + @Override + public String encoding() { + return null; + } + + @Override + public Buffer payload() { + return null; + } + }; + + private final HttpServerRequest httpRequest; + private final GrpcServerResponse response; + private GrpcMethodCall methodCall; + protected final ContextInternal context; + private final ReadStream stream; + private final InboundBuffer queue; + private Buffer buffer; + private Handler errorHandler; + private Handler exceptionHandler; + private Handler messageHandler; + private Handler endHandler; + private GrpcMessage last; + private final GrpcMessageDecoder messageDecoder; + private final Promise end; + private final Map pathParams; + private final Map queryParams; + + public GrpcTranscodingRequest(HttpServerRequest httpRequest, + GrpcMessageDecoder messageDecoder, + GrpcMessageEncoder messageEncoder, + GrpcMethodCall methodCall, + Map pathParams, + Map queryParams) { + this.httpRequest = httpRequest; + this.response = new GrpcTranscodingResponse<>(this, httpRequest.response(), messageEncoder); + this.methodCall = methodCall; + this.pathParams = pathParams; + this.queryParams = queryParams; + + this.context = (ContextInternal) ((HttpServerRequestInternal) httpRequest).context(); + this.stream = httpRequest; + this.queue = new InboundBuffer<>(context); + this.messageDecoder = messageDecoder; + this.end = context.promise(); + } + + public void init() { + stream.handler(this); + stream.endHandler(v -> queue.write(END_SENTINEL)); + stream.exceptionHandler(err -> { + if (err instanceof StreamResetException) { + handleReset(((StreamResetException) err).getCode()); + } else { + handleException(err); + } + }); + queue.drainHandler(v -> stream.resume()); + queue.handler(msg -> { + if (msg == END_SENTINEL) { + if (httpRequest.bytesRead() == 0) { + handleMessage(mergeParametersIntoMessage(msg)); + } + + handleEnd(); + } else { + handleMessage(mergeParametersIntoMessage(msg)); + } + }); + } + + private GrpcMessage mergeParametersIntoMessage(GrpcMessage msg) { + Map allParams = GrpcTranscodingMessageWriter.mergeParameters( + pathParams, + queryParams, + msg.payload()); + + byte[] jsonPayload = Json.encode(allParams).getBytes(); + return GrpcMessage.message(msg.encoding(), Buffer.buffer(jsonPayload)); + } + + protected Req decodeMessage(GrpcMessage msg) throws CodecException { + return messageDecoder.decode(msg); + } + + @Override + public void handle(Buffer chunk) { + if (buffer == null) { + buffer = chunk; + } else { + buffer.appendBuffer(chunk); + } + + Buffer payload = buffer.slice(); + GrpcMessage message = GrpcMessage.message("identity", payload); + boolean pause = !queue.write(message); + + if (pause) { + stream.pause(); + } + + buffer = null; + } + + protected void handleReset(long code) { + Handler handler = errorHandler; + if (handler != null) { + GrpcError error = mapHttp2ErrorCode(code); + if (error != null) { + handler.handle(error); + } + } + } + + protected void handleException(Throwable err) { + end.tryFail(err); + Handler handler = exceptionHandler; + if (handler != null) { + handler.handle(err); + } + } + + protected void handleEnd() { + end.tryComplete(); + Handler handler = endHandler; + if (handler != null) { + handler.handle(null); + } + } + + protected void handleMessage(GrpcMessage msg) { + last = msg; + Handler handler = messageHandler; + if (handler != null) { + handler.handle(msg); + } + } + + @Override + public ServiceName serviceName() { + return methodCall.serviceName(); + } + + @Override + public String methodName() { + return methodCall.methodName(); + } + + @Override + public String fullMethodName() { + return methodCall.fullMethodName(); + } + + @Override + public GrpcServerResponse response() { + return response; + } + + @Override + public MultiMap headers() { + return httpRequest.headers(); + } + + @Override + public String encoding() { + return httpRequest.getHeader("grpc-encoding"); + } + + @Override + public GrpcServerRequest messageHandler(@Nullable Handler handler) { + messageHandler = handler; + return this; + } + + @Override + public GrpcServerRequest errorHandler(@Nullable Handler handler) { + errorHandler = handler; + return this; + } + + @Override + public GrpcServerRequest exceptionHandler(@Nullable Handler handler) { + exceptionHandler = handler; + return this; + } + + @Override + public GrpcServerRequest handler(@Nullable Handler handler) { + if (handler != null) { + return messageHandler(msg -> { + Req decoded; + try { + decoded = decodeMessage(msg); + } catch (CodecException e) { + response.cancel(); + return; + } + handler.handle(decoded); + }); + } else { + return messageHandler(null); + } + } + + @Override + public GrpcServerRequest pause() { + queue.pause(); + return this; + } + + @Override + public GrpcServerRequest resume() { + queue.resume(); + return this; + } + + @Override + public GrpcServerRequest fetch(long amount) { + queue.fetch(amount); + return this; + } + + @Override + public GrpcServerRequest endHandler(@Nullable Handler handler) { + this.endHandler = handler; + return this; + } + + @Override + public Future last() { + return end().map(v -> decodeMessage(last)); + } + + @Override + public Future end() { + return end.future(); + } + + @Override + public Future collecting(Collector collector) { + PromiseInternal promise = context.promise(); + C cumulation = collector.supplier().get(); + BiConsumer accumulator = collector.accumulator(); + handler(elt -> accumulator.accept(cumulation, elt)); + endHandler(v -> { + R result = collector.finisher().apply(cumulation); + promise.tryComplete(result); + }); + exceptionHandler(promise::tryFail); + return promise.future(); + } + + @Override + public HttpConnection connection() { + return httpRequest.connection(); + } +} diff --git a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/transcoding/GrpcTranscodingResponse.java b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/transcoding/GrpcTranscodingResponse.java new file mode 100644 index 0000000000000..abe58cee39cbc --- /dev/null +++ b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/transcoding/GrpcTranscodingResponse.java @@ -0,0 +1,248 @@ +package io.quarkus.grpc.transcoding; + +import java.util.Map; +import java.util.Objects; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.CompositeByteBuf; +import io.netty.buffer.Unpooled; +import io.vertx.core.AsyncResult; +import io.vertx.core.Future; +import io.vertx.core.Handler; +import io.vertx.core.MultiMap; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.http.HttpServerResponse; +import io.vertx.grpc.common.GrpcError; +import io.vertx.grpc.common.GrpcMessage; +import io.vertx.grpc.common.GrpcMessageEncoder; +import io.vertx.grpc.common.GrpcStatus; +import io.vertx.grpc.common.impl.Utils; +import io.vertx.grpc.server.GrpcServerResponse; + +/** + * A gRPC transcoding response that maps gRPC responses to HTTP responses. + * + * @param The type of the request message. + * @param The type of the response message. + * @see io.vertx.grpc.server.impl.GrpcServerResponseImpl for the original implementation + */ +public class GrpcTranscodingResponse implements GrpcServerResponse { + + private final GrpcTranscodingRequest request; + private final HttpServerResponse httpResponse; + private final GrpcMessageEncoder encoder; + private GrpcStatus status = GrpcStatus.OK; + private String statusMessage; + private boolean headersSent; + private boolean trailersSent; + private boolean cancelled; + private MultiMap headers, trailers; + + public GrpcTranscodingResponse(GrpcTranscodingRequest request, HttpServerResponse httpResponse, + GrpcMessageEncoder encoder) { + this.request = request; + this.httpResponse = httpResponse; + this.encoder = encoder; + } + + public GrpcServerResponse status(GrpcStatus status) { + Objects.requireNonNull(status); + this.status = status; + return this; + } + + @Override + public GrpcServerResponse statusMessage(String msg) { + this.statusMessage = msg; + return this; + } + + // We don't need to implement this method + public GrpcServerResponse encoding(String encoding) { + // ???? + return this; + } + + @Override + public MultiMap headers() { + if (headersSent) { + throw new IllegalStateException("Headers already sent"); + } + if (headers == null) { + headers = MultiMap.caseInsensitiveMultiMap(); + } + return headers; + } + + @Override + public MultiMap trailers() { + if (trailersSent) { + throw new IllegalStateException("Trailers already sent"); + } + if (trailers == null) { + trailers = MultiMap.caseInsensitiveMultiMap(); + } + return trailers; + } + + @Override + public GrpcTranscodingResponse exceptionHandler(Handler handler) { + httpResponse.exceptionHandler(handler); + return this; + } + + @Override + public Future write(Resp message) { + return writeMessage(encoder.encode(message)); + } + + @Override + public Future end(Resp message) { + return endMessage(encoder.encode(message)); + } + + @Override + public Future writeMessage(GrpcMessage data) { + return writeMessage(data, false); + } + + @Override + public Future endMessage(GrpcMessage message) { + return writeMessage(message, true); + } + + public Future end() { + return writeMessage(null, true); + } + + @Override + public GrpcServerResponse setWriteQueueMaxSize(int maxSize) { + httpResponse.setWriteQueueMaxSize(maxSize); + return this; + } + + @Override + public boolean writeQueueFull() { + return httpResponse.writeQueueFull(); + } + + @Override + public GrpcServerResponse drainHandler(Handler handler) { + httpResponse.drainHandler(handler); + return this; + } + + @Override + public void cancel() { + if (cancelled) { + return; + } + cancelled = true; + Future fut = request.end(); + boolean requestEnded; + if (fut.failed()) { + return; + } else { + requestEnded = fut.succeeded(); + } + if (!requestEnded || !trailersSent) { + httpResponse.reset(GrpcError.CANCELLED.http2ResetCode); + } + } + + private Future writeMessage(GrpcMessage message, boolean end) { + if (cancelled) { + throw new IllegalStateException("The stream has been cancelled"); + } + if (trailersSent) { + throw new IllegalStateException("The stream has been closed"); + } + + if (message == null && !end) { + throw new IllegalStateException(); + } + + boolean trailersOnly = status != GrpcStatus.OK && !headersSent && end; + + MultiMap responseHeaders = httpResponse.headers(); + if (!headersSent) { + headersSent = true; + if (headers != null && headers.size() > 0) { + for (Map.Entry header : headers) { + responseHeaders.add(header.getKey(), header.getValue()); + } + } + + responseHeaders.set("content-type", "application/json"); + } + + if (end) { + if (!trailersSent) { + trailersSent = true; + } + MultiMap responseTrailers; + if (trailersOnly) { + responseTrailers = httpResponse.headers(); + } else { + responseTrailers = httpResponse.trailers(); + } + + if (trailers != null && trailers.size() > 0) { + for (Map.Entry trailer : trailers) { + responseTrailers.add(trailer.getKey(), trailer.getValue()); + } + } + if (!responseHeaders.contains("grpc-status")) { + responseTrailers.set("grpc-status", status.toString()); + } + if (status != GrpcStatus.OK) { + String msg = statusMessage; + if (msg != null && !responseHeaders.contains("grpc-status-message")) { + responseTrailers.set("grpc-message", Utils.utf8PercentEncode(msg)); + } + } else { + responseTrailers.remove("grpc-message"); + } + if (message != null) { + Buffer encoded = encode(message); + if (encoded == null) { + throw new IllegalStateException("The message is null"); + } + + responseHeaders.set("content-length", String.valueOf(encoded.length())); + return httpResponse.end(encoded); + } else { + return httpResponse.end(); + } + } else { + Buffer encoded = encode(message); + if (encoded == null) { + throw new IllegalStateException("The message is null"); + } + + responseHeaders.set("content-length", String.valueOf(encoded.length())); + return httpResponse.write(encoded); + } + } + + private Buffer encode(GrpcMessage message) { + if (message == null) { + return null; + } + + ByteBuf bbuf = message.payload().getByteBuf(); + CompositeByteBuf composite = Unpooled.compositeBuffer(); + composite.addComponent(true, bbuf); + return Buffer.buffer(composite); + } + + @Override + public void write(Resp data, Handler> handler) { + write(data).onComplete(handler); + } + + @Override + public void end(Handler> handler) { + end().onComplete(handler); + } +} diff --git a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/transcoding/GrpcTranscodingServer.java b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/transcoding/GrpcTranscodingServer.java new file mode 100644 index 0000000000000..2807e3e7bc0fa --- /dev/null +++ b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/transcoding/GrpcTranscodingServer.java @@ -0,0 +1,142 @@ +package io.quarkus.grpc.transcoding; + +import java.util.HashMap; +import java.util.Map; + +import io.grpc.MethodDescriptor; +import io.vertx.core.Handler; +import io.vertx.core.Vertx; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.http.HttpServerRequest; +import io.vertx.grpc.common.GrpcMessageDecoder; +import io.vertx.grpc.common.GrpcMessageEncoder; +import io.vertx.grpc.common.impl.GrpcMethodCall; +import io.vertx.grpc.server.GrpcServer; +import io.vertx.grpc.server.GrpcServerRequest; + +/** + * A gRPC transcoding server that maps HTTP requests to gRPC methods. + * + * @see io.vertx.grpc.server.impl.GrpcServerImpl for the original implementation + * @see for the HTTP mapping rules + */ +public class GrpcTranscodingServer implements GrpcServer { + + private final Vertx vertx; + private Handler> requestHandler; + private final Map methodMapping = new HashMap<>(); + private final Map> methodCallHandlers = new HashMap<>(); + private final Map> metadataHandlers = new HashMap<>(); + + public GrpcTranscodingServer(Vertx vertx) { + this.vertx = vertx; + } + + @Override + public void handle(HttpServerRequest httpRequest) { + String requestPath = httpRequest.path(); + + for (Map.Entry entry : methodMapping.entrySet()) { + String pathTemplate = entry.getKey(); + String mappedMethod = entry.getValue(); + if (GrpcTranscodingHttpUtils.isPathMatch(requestPath, pathTemplate)) { + GrpcTranscodingMetadata metadata = metadataHandlers.get(mappedMethod); + if (metadata != null) { + if (metadata.getHttpMethodName().equals(httpRequest.method().name())) { + handleWithMappedMethod(httpRequest, pathTemplate, mappedMethod); + return; + } + } + } + } + + httpRequest.response().setStatusCode(404).end(); + } + + private void handleWithMappedMethod(HttpServerRequest httpRequest, String pathTemplate, String mappedMethod) { + GrpcMethodCall methodCall = new GrpcMethodCall("/" + mappedMethod); + String fmn = methodCall.fullMethodName(); + MethodCallHandler method = methodCallHandlers.get(fmn); + + if (method != null) { + handle(pathTemplate, method, httpRequest, methodCall); + } else { + httpRequest.response().setStatusCode(500).end(); + } + } + + private void handle(String pathTemplate, MethodCallHandler method, HttpServerRequest httpRequest, + GrpcMethodCall methodCall) { + Map pathParams = GrpcTranscodingHttpUtils.extractPathParams(pathTemplate, httpRequest.path()); + Map queryParameters = new HashMap<>(httpRequest.params().entries().stream() + .collect(HashMap::new, (m, e) -> m.put(e.getKey(), e.getValue()), HashMap::putAll)); + + GrpcTranscodingRequest grpcRequest = new GrpcTranscodingRequest<>(httpRequest, method.messageDecoder, + method.messageEncoder, methodCall, pathParams, queryParameters); + grpcRequest.init(); + method.handle(grpcRequest); + } + + public GrpcServer callHandler(Handler> handler) { + this.requestHandler = handler; + return this; + } + + @Override + public GrpcServer callHandler(MethodDescriptor methodDesc, + Handler> handler) { + if (handler != null) { + MethodDescriptor.Marshaller reqMarshaller = findRequestMarshaller(methodDesc.getFullMethodName()); + MethodDescriptor.Marshaller respMarshaller = findResponseMarshaller(methodDesc.getFullMethodName()); + + methodCallHandlers.put(methodDesc.getFullMethodName(), + new GrpcTranscodingServer.MethodCallHandler<>(methodDesc, + GrpcMessageDecoder.unmarshaller(reqMarshaller), + GrpcMessageEncoder.marshaller(respMarshaller), handler)); + } else { + methodCallHandlers.remove(methodDesc.getFullMethodName()); + } + return this; + } + + public void addMethodMapping(String path, String fullMethodName) { + methodMapping.put(path, fullMethodName); + } + + public void addMetadataHandler(String fullMethodName, GrpcTranscodingMetadata metadata) { + metadataHandlers.put(fullMethodName, metadata); + } + + @SuppressWarnings("unchecked") + public MethodDescriptor.Marshaller findRequestMarshaller(String fullMethodName) { + GrpcTranscodingMetadata metadata = metadataHandlers.get(fullMethodName); + return (MethodDescriptor.Marshaller) metadata.getRequestMarshaller(); + } + + @SuppressWarnings("unchecked") + public MethodDescriptor.Marshaller findResponseMarshaller(String fullMethodName) { + GrpcTranscodingMetadata metadata = metadataHandlers.get(fullMethodName); + return (MethodDescriptor.Marshaller) metadata.getResponseMarshaller(); + } + + private static class MethodCallHandler implements Handler> { + + final MethodDescriptor def; + final GrpcMessageDecoder messageDecoder; + final GrpcMessageEncoder messageEncoder; + final Handler> handler; + + MethodCallHandler(MethodDescriptor def, GrpcMessageDecoder messageDecoder, + GrpcMessageEncoder messageEncoder, Handler> handler) { + this.def = def; + this.messageDecoder = messageDecoder; + this.messageEncoder = messageEncoder; + this.handler = handler; + } + + @Override + public void handle(GrpcServerRequest grpcRequest) { + handler.handle(grpcRequest); + } + } +} diff --git a/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/transcoding/GrpcTranscodingWriteStreamAdapter.java b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/transcoding/GrpcTranscodingWriteStreamAdapter.java new file mode 100644 index 0000000000000..cf1414ad96712 --- /dev/null +++ b/extensions/grpc/runtime/src/main/java/io/quarkus/grpc/transcoding/GrpcTranscodingWriteStreamAdapter.java @@ -0,0 +1,55 @@ +package io.quarkus.grpc.transcoding; + +import io.vertx.grpc.common.GrpcMessageEncoder; +import io.vertx.grpc.common.GrpcWriteStream; + +/** + * A write stream adapter that uses a {@link GrpcMessageEncoder} to encode the message payload. + * + * @param The type of the message payload. + * @see io.vertx.grpc.common.impl.WriteStreamAdapter for the original implementation + */ +public class GrpcTranscodingWriteStreamAdapter { + + private GrpcWriteStream stream; + private boolean ready; + private GrpcMessageEncoder encoder; + + /** + * Override this method to call gRPC {@code onReady} + */ + protected void handleReady() { + } + + public final void init(GrpcWriteStream stream, GrpcTranscodingMessageEncoder encoder) { + synchronized (this) { + this.stream = stream; + this.encoder = encoder; + } + stream.drainHandler(v -> { + checkReady(); + }); + checkReady(); + } + + public final synchronized boolean isReady() { + return ready; + } + + public final void write(T msg) { + stream.writeMessage(encoder.encode(msg)); + synchronized (this) { + ready = !stream.writeQueueFull(); + } + } + + private void checkReady() { + synchronized (this) { + if (ready || stream.writeQueueFull()) { + return; + } + ready = true; + } + handleReady(); + } +} diff --git a/integration-tests/grpc-transcoding/pom.xml b/integration-tests/grpc-transcoding/pom.xml new file mode 100644 index 0000000000000..8cc30979dcc8e --- /dev/null +++ b/integration-tests/grpc-transcoding/pom.xml @@ -0,0 +1,114 @@ + + + 4.0.0 + + + quarkus-integration-tests-parent + io.quarkus + 999-SNAPSHOT + + + quarkus-integration-test-grpc-transcoding + Quarkus - Integration Tests - gRPC - Transcoding + + + + io.quarkus + quarkus-resteasy + + + io.quarkus + quarkus-resteasy-mutiny + + + io.quarkus + quarkus-grpc + + + io.quarkus + quarkus-junit5 + test + + + io.rest-assured + rest-assured + test + + + org.awaitility + awaitility + test + + + org.assertj + assertj-core + test + + + io.quarkus + quarkus-test-grpc + ${project.version} + test + + + + + io.quarkus + quarkus-grpc-deployment + ${project.version} + pom + test + + + * + * + + + + + io.quarkus + quarkus-resteasy-deployment + ${project.version} + pom + test + + + * + * + + + + + io.quarkus + quarkus-resteasy-mutiny-deployment + ${project.version} + pom + test + + + * + * + + + + + + + + + io.quarkus + quarkus-maven-plugin + + + + generate-code + build + + + + + + + diff --git a/integration-tests/grpc-transcoding/src/main/java/io/quarkus/grpc/examples/hello/HelloWorldNewEndpoint.java b/integration-tests/grpc-transcoding/src/main/java/io/quarkus/grpc/examples/hello/HelloWorldNewEndpoint.java new file mode 100644 index 0000000000000..b7b56d0865750 --- /dev/null +++ b/integration-tests/grpc-transcoding/src/main/java/io/quarkus/grpc/examples/hello/HelloWorldNewEndpoint.java @@ -0,0 +1,32 @@ +package io.quarkus.grpc.examples.hello; + +import examples.GreeterGrpc; +import examples.MutinyGreeterGrpc; +import io.quarkus.grpc.GrpcClient; + +//@Path("/hello") +public class HelloWorldNewEndpoint { + + @GrpcClient("hello") + GreeterGrpc.GreeterBlockingStub blockingHelloService; + + @GrpcClient("hello") + MutinyGreeterGrpc.MutinyGreeterStub mutinyHelloService; + + /* + * @GET + * + * @Path("/blocking/{name}") + * public String helloBlocking(@PathParam("name") String name) { + * return blockingHelloService.sayHello(HelloRequest.newBuilder().setName(name).build()).getMessage(); + * } + * + * @GET + * + * @Path("/mutiny/{name}") + * public Uni helloMutiny(@PathParam("name") String name) { + * return mutinyHelloService.sayHello(HelloRequest.newBuilder().setName(name).build()) + * .onItem().transform(HelloReply::getMessage); + * } + */ +} diff --git a/integration-tests/grpc-transcoding/src/main/java/io/quarkus/grpc/examples/hello/HelloWorldNewService.java b/integration-tests/grpc-transcoding/src/main/java/io/quarkus/grpc/examples/hello/HelloWorldNewService.java new file mode 100644 index 0000000000000..f048af741e5b6 --- /dev/null +++ b/integration-tests/grpc-transcoding/src/main/java/io/quarkus/grpc/examples/hello/HelloWorldNewService.java @@ -0,0 +1,48 @@ +package io.quarkus.grpc.examples.hello; + +import examples.*; +import io.quarkus.grpc.GrpcService; +import io.smallrye.mutiny.Uni; + +@GrpcService +public class HelloWorldNewService implements Greeter { + + @Override + public Uni simplePath(HelloRequest request) { + String greeting = "Hello from Simple Path, " + request.getName() + "!"; + return Uni.createFrom().item(HelloReply.newBuilder().setMessage(greeting).build()); + } + + @Override + public Uni complexPath(HelloRequest request) { + String greeting = "Hello from Complex Path, " + request.getName() + "!"; + return Uni.createFrom().item(HelloReply.newBuilder().setMessage(greeting).build()); + } + + @Override + public Uni resourceLookup(ResourceRequest request) { + String greeting = "Resource details: type='" + request.getResourceType() + + "', id='" + request.getResourceId() + "'"; + return Uni.createFrom().item(HelloReply.newBuilder().setMessage(greeting).build()); + } + + @Override + public Uni nestedResourceLookup(UpdateRequest request) { + String greeting = "Greeting with id '" + request.getGreetingId() + "' " + + "updated with nested resource details: name='" + request.getUpdatedContent().getName() + "'"; + + return Uni.createFrom().item(HelloReply.newBuilder().setMessage(greeting).build()); + } + + @Override + public Uni searchGreetings(SearchRequest request) { + String greeting = "Matching greetings for your query: '" + request.getQuery() + "'"; + return Uni.createFrom().item(HelloReply.newBuilder().setMessage(greeting).build()); + } + + @Override + public Uni updateGreeting(UpdateRequest request) { + String greeting = "Greeting with id '" + request.getGreetingId() + "' updated!"; + return Uni.createFrom().item(HelloReply.newBuilder().setMessage(greeting).build()); + } +} diff --git a/integration-tests/grpc-transcoding/src/main/proto/helloworld.proto b/integration-tests/grpc-transcoding/src/main/proto/helloworld.proto new file mode 100644 index 0000000000000..85bf549c177d3 --- /dev/null +++ b/integration-tests/grpc-transcoding/src/main/proto/helloworld.proto @@ -0,0 +1,111 @@ +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +syntax = "proto3"; + +import "google/api/annotations.proto"; + +option java_multiple_files = true; +option java_package = "examples"; +option java_outer_classname = "HelloWorldProto"; +option objc_class_prefix = "HLW"; + +package helloworld; + +// The greeting service definition. +service Greeter { + // RPC with simple path + rpc SimplePath (HelloRequest) returns (HelloReply) { + option (google.api.http) = { + post: "/v1/simple" + body: "*" + }; + } + + // RPC with complex path + rpc ComplexPath (HelloRequest) returns (HelloReply) { + option (google.api.http) = { + post: "/v1/complex/{name}/path" + body: "*" + }; + } + + // RPC with multiple path variables + rpc ResourceLookup(ResourceRequest) returns (HelloReply) { + option (google.api.http) = { + get: "/v1/resources/{resource_type}/resource/{resource_id}" + }; + } + + // RPC with nested path variables + rpc NestedResourceLookup(UpdateRequest) returns (HelloReply) { + option (google.api.http) = { + get: "/v1/resources/{updated_content.name}/{greeting_id}" + }; + } + + // RPC with query parameters + rpc SearchGreetings(SearchRequest) returns (HelloReply) { + option (google.api.http) = { + get: "/v1/greetings" + }; + } + + // Custom binding example + rpc UpdateGreeting(UpdateRequest) returns (HelloReply) { + option (google.api.http) = { + put: "/v1/greetings/update" + body: "*" + }; + } +} + +message HelloRequest { + string name = 1; +} + +message HelloReply { + string message = 1; +} + +message ResourceRequest { + string resource_type = 1; + string resource_id = 2; +} + +message SearchRequest { + string query = 1; + int32 page_size = 2; + string page_token = 3; +} + +message UpdateRequest { + string greeting_id = 1; + HelloRequest updated_content = 2; +} diff --git a/integration-tests/grpc-transcoding/src/main/resources/application.properties b/integration-tests/grpc-transcoding/src/main/resources/application.properties new file mode 100644 index 0000000000000..298b78be5325c --- /dev/null +++ b/integration-tests/grpc-transcoding/src/main/resources/application.properties @@ -0,0 +1,20 @@ +quarkus.grpc.server.port=9001 +quarkus.grpc.server.instances=2 +quarkus.grpc.server.use-separate-server=false +quarkus.grpc.transcoding.enabled=true + +%vertx.quarkus.grpc.server.use-separate-server=false +%n2o.quarkus.grpc.server.use-separate-server=true +%o2n.quarkus.grpc.server.use-separate-server=false + +quarkus.grpc.clients.hello.host=localhost +quarkus.grpc.clients.hello.port=9001 + +%vertx.quarkus.grpc.clients.hello.port=8081 +%vertx.quarkus.grpc.clients.hello.use-quarkus-grpc-client=true + +%n2o.quarkus.grpc.clients.hello.port=9001 +%n2o.quarkus.grpc.clients.hello.use-quarkus-grpc-client=true + +%o2n.quarkus.grpc.clients.hello.port=8081 +%o2n.quarkus.grpc.clients.hello.use-quarkus-grpc-client=false diff --git a/integration-tests/grpc-transcoding/src/test/java/io/quarkus/grpc/examples/hello/HelloWorldNewEndpointTest.java b/integration-tests/grpc-transcoding/src/test/java/io/quarkus/grpc/examples/hello/HelloWorldNewEndpointTest.java new file mode 100644 index 0000000000000..7a41d9f437c0a --- /dev/null +++ b/integration-tests/grpc-transcoding/src/test/java/io/quarkus/grpc/examples/hello/HelloWorldNewEndpointTest.java @@ -0,0 +1,105 @@ +package io.quarkus.grpc.examples.hello; + +import static io.restassured.RestAssured.given; +import static org.hamcrest.Matchers.is; + +import org.junit.jupiter.api.Test; + +import com.google.protobuf.InvalidProtocolBufferException; +import com.google.protobuf.util.JsonFormat; + +import examples.HelloRequest; +import examples.UpdateRequest; +import io.quarkus.test.junit.QuarkusTest; +import io.restassured.http.ContentType; + +@QuarkusTest +class HelloWorldNewEndpointTest extends HelloWorldNewEndpointTestBase { + + @Test + public void testSimplePath() { + given() + .body(getJsonRequest("simple-test")) + .contentType(ContentType.JSON) + .when().post("/v1/simple") + .then() + .statusCode(200) + .body("message", is("Hello from Simple Path, simple-test!")); + } + + @Test + public void testComplexPath() { + given() + .body(getJsonRequest("complex-test")) + .contentType(ContentType.JSON) + .when().post("/v1/complex/complex-test/path") + .then() + .statusCode(200) + .body("message", is("Hello from Complex Path, complex-test!")); + } + + @Test + public void testResourceLookup() { + given() + .contentType(ContentType.JSON) + .when().get("/v1/resources/resource-type-1/resource/1234") + .then() + .statusCode(200) + .body("message", is("Resource details: type='resource-type-1', id='1234'")); + } + + @Test + public void testNestedResourceLookup() { + given() + .contentType(ContentType.JSON) + .when().get("/v1/resources/update/1234") + .then() + .statusCode(200) + .body("message", is("Greeting with id '1234' updated with nested resource details: name='update'")); + } + + @Test + public void testSearchGreetings() { + given() + .param("query", "test-query") + .contentType(ContentType.JSON) + .when().get("/v1/greetings") + .then() + .statusCode(200) + .body("message", is("Matching greetings for your query: 'test-query'")); + } + + @Test + public void testUpdateGreeting() { + given() + .body(getUpdateRequest("5678")) + .contentType(ContentType.JSON) + .when().put("/v1/greetings/update") + .then() + .statusCode(200) + .body("message", is("Greeting with id '5678' updated!")); + } + + // Create a helper method for getJsonRequest + private String getJsonRequest(String name) { + try { + return JsonFormat.printer().omittingInsignificantWhitespace() + .print(HelloRequest.newBuilder().setName(name).build()); + } catch (InvalidProtocolBufferException e) { + throw new RuntimeException(e); + } + } + + // Create a helper method for getUpdateRequest + private String getUpdateRequest(String greetingId) { + try { + UpdateRequest updateRequest = UpdateRequest.newBuilder() + .setGreetingId(greetingId) + .setUpdatedContent(HelloRequest.newBuilder().setName("Updated Name").build()) + .build(); + return JsonFormat.printer().omittingInsignificantWhitespace().print(updateRequest); + } catch (InvalidProtocolBufferException e) { + throw new RuntimeException(e); + } + } +} diff --git a/integration-tests/grpc-transcoding/src/test/java/io/quarkus/grpc/examples/hello/HelloWorldNewEndpointTestBase.java b/integration-tests/grpc-transcoding/src/test/java/io/quarkus/grpc/examples/hello/HelloWorldNewEndpointTestBase.java new file mode 100644 index 0000000000000..43b317c47b3a0 --- /dev/null +++ b/integration-tests/grpc-transcoding/src/test/java/io/quarkus/grpc/examples/hello/HelloWorldNewEndpointTestBase.java @@ -0,0 +1,5 @@ +package io.quarkus.grpc.examples.hello; + +class HelloWorldNewEndpointTestBase { + +} diff --git a/integration-tests/pom.xml b/integration-tests/pom.xml index 355b11472b6ae..abf96be0d08ea 100644 --- a/integration-tests/pom.xml +++ b/integration-tests/pom.xml @@ -391,6 +391,7 @@ grpc-stork-simple grpc-exceptions grpc-test-random-port + grpc-transcoding google-cloud-functions-http google-cloud-functions istio