beanClassToBeanId = new HashMap<>();
for (MediatorBuildItem mediatorMethod : mediatorMethods) {
- String beanClass = mediatorMethod.getBean()
- .getBeanClass()
- .toString();
- if (!beanClassToBeanId.containsKey(beanClass)) {
- reflectiveClass.produce(new ReflectiveClassBuildItem(true, false, beanClass));
- beanClassToBeanId.put(beanClass, mediatorMethod.getBean()
- .getIdentifier());
+ MethodInfo methodInfo = mediatorMethod.getMethod();
+ BeanInfo bean = mediatorMethod.getBean();
+
+ String generatedInvokerName = generateInvoker(bean, methodInfo, classOutput);
+ /*
+ * We need to register the invoker's constructor for reflection since it will be called inside smallrye.
+ * We could potentially lift this restriction with some extra CDI bean generation but it's probably not worth it
+ */
+ reflectiveClass.produce(new ReflectiveClassBuildItem(false, false, generatedInvokerName));
+
+ try {
+ QuarkusMediatorConfiguration mediatorConfiguration = QuarkusMediatorConfigurationUtil.create(methodInfo, bean,
+ generatedInvokerName, recorderContext);
+ configurations.add(mediatorConfiguration);
+ } catch (IllegalArgumentException e) {
+ throw new DeploymentException(e); // needed to pass the TCK
}
}
+ recorder.registerMediators(configurations, beanContainer.getValue());
+
for (EmitterBuildItem it : emitterFields) {
int defaultBufferSize = ConfigProviderResolver.instance().getConfig()
.getOptionalValue("smallrye.messaging.emitter.default-buffer-size", Integer.class).orElse(127);
@@ -211,8 +249,83 @@ public void build(SmallRyeReactiveMessagingRecorder recorder, BeanContainerBuild
recorder.configureEmitter(beanContainer.getValue(), it.getName(), null, 0, defaultBufferSize);
}
}
+ }
+
+ /**
+ * Generates an invoker class that looks like the following:
+ *
+ *
+ * public class SomeName implements Invoker {
+ * private Object beanInstance;
+ *
+ * public SomeName(Object var1) {
+ * this.beanInstance = var1;
+ * }
+ *
+ * public Object invoke(Object[] args) {
+ * return ((BeanType) this.beanInstance).process(var1);
+ * }
+ * }
+ *
+ */
+ private String generateInvoker(BeanInfo bean, MethodInfo method, ClassOutput classOutput) {
+ String baseName;
+ if (bean.getImplClazz().enclosingClass() != null) {
+ baseName = DotNames.simpleName(bean.getImplClazz().enclosingClass()) + "_"
+ + DotNames.simpleName(bean.getImplClazz().name());
+ } else {
+ baseName = DotNames.simpleName(bean.getImplClazz().name());
+ }
+ StringBuilder sigBuilder = new StringBuilder();
+ sigBuilder.append(method.name()).append("_").append(method.returnType().name().toString());
+ for (Type i : method.parameters()) {
+ sigBuilder.append(i.name().toString());
+ }
+ String targetPackage = DotNames.packageName(bean.getImplClazz().name());
+ String generatedName = targetPackage.replace('.', '/') + "/" + baseName + INVOKER_SUFFIX + "_" + method.name() + "_"
+ + HashUtil.sha1(sigBuilder.toString());
+
+ try (ClassCreator invoker = ClassCreator.builder().classOutput(classOutput).className(generatedName)
+ .interfaces(Invoker.class)
+ .build()) {
+
+ FieldDescriptor beanInstanceField = invoker.getFieldCreator("beanInstance", Object.class).getFieldDescriptor();
+
+ // generate a constructor that bean instance an argument
+ try (MethodCreator ctor = invoker.getMethodCreator("", void.class, Object.class)) {
+ ctor.setModifiers(Modifier.PUBLIC);
+ ctor.invokeSpecialMethod(MethodDescriptor.ofConstructor(Object.class), ctor.getThis());
+ ResultHandle self = ctor.getThis();
+ ResultHandle config = ctor.getMethodParam(0);
+ ctor.writeInstanceField(beanInstanceField, self, config);
+ ctor.returnValue(null);
+ }
+
+ try (MethodCreator invoke = invoker.getMethodCreator(
+ MethodDescriptor.ofMethod(generatedName, "invoke", Object.class, Object[].class))) {
+
+ int parametersCount = method.parameters().size();
+ String[] argTypes = new String[parametersCount];
+ ResultHandle[] args = new ResultHandle[parametersCount];
+ for (int i = 0; i < parametersCount; i++) {
+ // the only method argument of io.smallrye.reactive.messaging.Invoker is an object array so we need to pull out
+ // each argument and put it in the target method arguments array
+ args[i] = invoke.readArrayValue(invoke.getMethodParam(0), i);
+ argTypes[i] = method.parameters().get(i).name().toString();
+ }
+ ResultHandle result = invoke.invokeVirtualMethod(
+ MethodDescriptor.ofMethod(method.declaringClass().name().toString(), method.name(),
+ method.returnType().name().toString(), argTypes),
+ invoke.readInstanceField(beanInstanceField, invoke.getThis()), args);
+ if (io.quarkus.smallrye.reactivemessaging.deployment.DotNames.VOID.equals(method.returnType().name())) {
+ invoke.returnValue(invoke.loadNull());
+ } else {
+ invoke.returnValue(result);
+ }
+ }
+ }
- recorder.registerMediators(beanClassToBeanId, beanContainer.getValue());
+ return generatedName.replace('/', '.');
}
}
diff --git a/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/QuarkusMediatorConfiguration.java b/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/QuarkusMediatorConfiguration.java
new file mode 100644
index 0000000000000..30eee3ae1d115
--- /dev/null
+++ b/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/QuarkusMediatorConfiguration.java
@@ -0,0 +1,211 @@
+package io.quarkus.smallrye.reactivemessaging.runtime;
+
+import java.lang.reflect.Method;
+
+import javax.enterprise.inject.spi.Bean;
+
+import org.eclipse.microprofile.reactive.messaging.Acknowledgment;
+
+import io.quarkus.arc.Arc;
+import io.smallrye.reactive.messaging.Invoker;
+import io.smallrye.reactive.messaging.MediatorConfiguration;
+import io.smallrye.reactive.messaging.Shape;
+import io.smallrye.reactive.messaging.annotations.Merge;
+
+public class QuarkusMediatorConfiguration implements MediatorConfiguration {
+
+ private String beanId;
+
+ private String methodName;
+
+ private Class> returnType;
+
+ private Class>[] parameterTypes;
+
+ private Shape shape;
+
+ private String incoming;
+
+ private String outgoing;
+
+ private Acknowledgment.Strategy acknowledgment;
+
+ private Integer broadcastValue;
+
+ private MediatorConfiguration.Production production = MediatorConfiguration.Production.NONE;
+
+ private MediatorConfiguration.Consumption consumption = MediatorConfiguration.Consumption.NONE;
+
+ private boolean useBuilderTypes = false;
+
+ private Merge.Mode merge;
+
+ private Class extends Invoker> invokerClass;
+
+ public String getBeanId() {
+ return beanId;
+ }
+
+ public void setBeanId(String beanId) {
+ this.beanId = beanId;
+ }
+
+ @Override
+ public Bean> getBean() {
+ return Arc.container().bean(beanId);
+ }
+
+ public String getMethodName() {
+ return methodName;
+ }
+
+ public void setMethodName(String methodName) {
+ this.methodName = methodName;
+ }
+
+ @Override
+ public Class> getReturnType() {
+ return returnType;
+ }
+
+ public void setReturnType(Class> returnType) {
+ this.returnType = returnType;
+ }
+
+ @Override
+ public Class>[] getParameterTypes() {
+ return parameterTypes;
+ }
+
+ public void setParameterTypes(Class>[] parameterTypes) {
+ this.parameterTypes = parameterTypes;
+ }
+
+ public Shape getShape() {
+ return shape;
+ }
+
+ public void setShape(Shape shape) {
+ this.shape = shape;
+ }
+
+ @Override
+ public Shape shape() {
+ return shape;
+ }
+
+ @Override
+ public String getIncoming() {
+ return incoming;
+ }
+
+ public void setIncoming(String incoming) {
+ this.incoming = incoming;
+ }
+
+ @Override
+ public String getOutgoing() {
+ return outgoing;
+ }
+
+ public void setOutgoing(String outgoing) {
+ this.outgoing = outgoing;
+ }
+
+ @Override
+ public Acknowledgment.Strategy getAcknowledgment() {
+ return acknowledgment;
+ }
+
+ public void setAcknowledgment(Acknowledgment.Strategy acknowledgment) {
+ this.acknowledgment = acknowledgment;
+ }
+
+ public Integer getBroadcastValue() {
+ return broadcastValue;
+ }
+
+ public void setBroadcastValue(Integer broadcastValue) {
+ this.broadcastValue = broadcastValue;
+ }
+
+ @Override
+ public boolean getBroadcast() {
+ return broadcastValue != null;
+ }
+
+ public MediatorConfiguration.Production getProduction() {
+ return production;
+ }
+
+ public void setProduction(MediatorConfiguration.Production production) {
+ this.production = production;
+ }
+
+ @Override
+ public MediatorConfiguration.Production production() {
+ return production;
+ }
+
+ public MediatorConfiguration.Consumption getConsumption() {
+ return consumption;
+ }
+
+ public void setConsumption(MediatorConfiguration.Consumption consumption) {
+ this.consumption = consumption;
+ }
+
+ @Override
+ public MediatorConfiguration.Consumption consumption() {
+ return consumption;
+ }
+
+ public boolean isUseBuilderTypes() {
+ return useBuilderTypes;
+ }
+
+ public void setUseBuilderTypes(boolean useBuilderTypes) {
+ this.useBuilderTypes = useBuilderTypes;
+ }
+
+ @Override
+ public boolean usesBuilderTypes() {
+ return useBuilderTypes;
+ }
+
+ public Merge.Mode getMerge() {
+ return merge;
+ }
+
+ public void setMerge(Merge.Mode merge) {
+ this.merge = merge;
+ }
+
+ @Override
+ public Class extends Invoker> getInvokerClass() {
+ return invokerClass;
+ }
+
+ public void setInvokerClass(Class extends Invoker> invokerClass) {
+ this.invokerClass = invokerClass;
+ }
+
+ @Override
+ public String methodAsString() {
+ return getBean().getBeanClass().getName() + "#" + getMethodName();
+ }
+
+ @Override
+ public Method getMethod() {
+ throw new UnsupportedOperationException("getMethod is not meant to be called on " + this.getClass().getName());
+ }
+
+ @Override
+ public int getNumberOfSubscriberBeforeConnecting() {
+ if (!getBroadcast()) {
+ return -1;
+ } else {
+ return broadcastValue;
+ }
+ }
+}
diff --git a/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/SmallRyeReactiveMessagingRecorder.java b/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/SmallRyeReactiveMessagingRecorder.java
index c44544d6337c7..f2d670c027edd 100644
--- a/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/SmallRyeReactiveMessagingRecorder.java
+++ b/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/SmallRyeReactiveMessagingRecorder.java
@@ -1,11 +1,7 @@
package io.quarkus.smallrye.reactivemessaging.runtime;
-import java.util.Map;
-import java.util.Map.Entry;
+import java.util.List;
-import javax.enterprise.inject.spi.DeploymentException;
-
-import io.quarkus.arc.Arc;
import io.quarkus.arc.runtime.BeanContainer;
import io.quarkus.runtime.annotations.Recorder;
import io.smallrye.reactive.messaging.extension.MediatorManager;
@@ -22,22 +18,9 @@ public void configureEmitter(BeanContainer container, String name, String strate
mediatorManager.initializeEmitter(name, strategy, bufferSize, defaultBufferSize);
}
- public void registerMediators(Map beanClassToBeanId, BeanContainer container) {
+ public void registerMediators(List configurations, BeanContainer container) {
MediatorManager mediatorManager = container.instance(MediatorManager.class);
- for (Entry entry : beanClassToBeanId.entrySet()) {
- try {
- Class> beanClass = Thread.currentThread()
- .getContextClassLoader()
- .loadClass(entry.getKey());
- mediatorManager.analyze(beanClass, Arc.container()
- .bean(entry.getValue()));
- } catch (ClassNotFoundException e) {
- throw new IllegalStateException(e);
- } catch (IllegalArgumentException e) {
- throw new DeploymentException(e);
- }
- }
-
+ mediatorManager.addAnalyzed(configurations);
}
}