diff --git a/pom.xml b/pom.xml index 3bfff12..e4f4749 100644 --- a/pom.xml +++ b/pom.xml @@ -52,14 +52,67 @@ + 2.1.6 + 3.4.2.Final + 2.2.1.Final + 5.0.4.Final + 5.8.2 + 8 8 + rxjava3 + + + + jakarta.ws.rs + jakarta.ws.rs-api + ${version.jakarta.ws.rs-api} + + + org.jboss.resteasy + resteasy-bom + ${version.org.jboss.resteasy} + pom + import + + + org.junit + junit-bom + ${version.org.junit} + pom + import + + + + org.jboss.logging + jboss-logging + ${version.org.jboss.logging} + + + org.jboss.logging + jboss-logging-annotations + ${version.org.jboss.logging.tools} + + true + provided + + + org.jboss.logging + jboss-logging-processor + ${version.org.jboss.logging.tools} + + true + provided + + + + @@ -72,4 +125,4 @@ - \ No newline at end of file + diff --git a/rxjava3/pom.xml b/rxjava3/pom.xml new file mode 100644 index 0000000..c115e98 --- /dev/null +++ b/rxjava3/pom.xml @@ -0,0 +1,90 @@ + + + + + + resteasy-rxjava-parent + dev.resteasy.rxjava + 1.0.0.Alpha1-SNAPSHOT + + 4.0.0 + + resteasy-rxjava3 + + + 3.1.5 + + + + + jakarta.ws.rs + jakarta.ws.rs-api + + + + io.reactivex.rxjava3 + rxjava + ${version.io.reactivex.rxjava3-rxjava} + + + + org.jboss.logging + jboss-logging-annotations + + true + provided + + + org.jboss.logging + jboss-logging-processor + + true + provided + + + + org.jboss.resteasy + resteasy-client + + + org.jboss.resteasy + resteasy-core + + + org.jboss.resteasy + resteasy-core-spi + + + + + org.jboss.resteasy + resteasy-undertow + test + + + org.junit.jupiter + junit-jupiter + test + + + + \ No newline at end of file diff --git a/rxjava3/src/main/java/dev/resteasy/rxjava3/FlowableProvider.java b/rxjava3/src/main/java/dev/resteasy/rxjava3/FlowableProvider.java new file mode 100644 index 0000000..dfa92d4 --- /dev/null +++ b/rxjava3/src/main/java/dev/resteasy/rxjava3/FlowableProvider.java @@ -0,0 +1,36 @@ +/* + * JBoss, Home of Professional Open Source. + * + * Copyright 2022 Red Hat, Inc., and individual contributors + * as indicated by the @author tags. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.resteasy.rxjava3; + +import javax.ws.rs.ext.Provider; + +import org.jboss.resteasy.spi.AsyncStreamProvider; +import org.reactivestreams.Publisher; + +import io.reactivex.rxjava3.core.Flowable; + +@Provider +public class FlowableProvider implements AsyncStreamProvider> { + @Override + public Publisher toAsyncStream(Flowable asyncResponse) { + return asyncResponse; + } + +} diff --git a/rxjava3/src/main/java/dev/resteasy/rxjava3/FlowableRxInvoker.java b/rxjava3/src/main/java/dev/resteasy/rxjava3/FlowableRxInvoker.java new file mode 100644 index 0000000..6f5c587 --- /dev/null +++ b/rxjava3/src/main/java/dev/resteasy/rxjava3/FlowableRxInvoker.java @@ -0,0 +1,31 @@ +/* + * JBoss, Home of Professional Open Source. + * + * Copyright 2022 Red Hat, Inc., and individual contributors + * as indicated by the @author tags. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.resteasy.rxjava3; + +import javax.ws.rs.client.RxInvoker; + +import io.reactivex.rxjava3.core.BackpressureStrategy; +import io.reactivex.rxjava3.core.Flowable; + +public interface FlowableRxInvoker extends RxInvoker> { + BackpressureStrategy getBackpressureStrategy(); + + void setBackpressureStrategy(BackpressureStrategy backpressureStrategy); +} diff --git a/rxjava3/src/main/java/dev/resteasy/rxjava3/FlowableRxInvokerImpl.java b/rxjava3/src/main/java/dev/resteasy/rxjava3/FlowableRxInvokerImpl.java new file mode 100644 index 0000000..00996e2 --- /dev/null +++ b/rxjava3/src/main/java/dev/resteasy/rxjava3/FlowableRxInvokerImpl.java @@ -0,0 +1,253 @@ +/* + * JBoss, Home of Professional Open Source. + * + * Copyright 2022 Red Hat, Inc., and individual contributors + * as indicated by the @author tags. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.resteasy.rxjava3; + +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; + +import javax.ws.rs.client.Entity; +import javax.ws.rs.client.SyncInvoker; +import javax.ws.rs.core.GenericType; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.sse.InboundSseEvent; +import javax.ws.rs.sse.SseEventSource; + +import org.jboss.resteasy.client.jaxrs.internal.ClientInvocationBuilder; +import org.jboss.resteasy.plugins.providers.sse.InboundSseEventImpl; +import org.jboss.resteasy.plugins.providers.sse.client.SseEventSourceImpl; +import org.jboss.resteasy.plugins.providers.sse.client.SseEventSourceImpl.SourceBuilder; + +import dev.resteasy.rxjava3.i18n.Messages; +import io.reactivex.rxjava3.core.BackpressureStrategy; +import io.reactivex.rxjava3.core.Flowable; + +public class FlowableRxInvokerImpl implements FlowableRxInvoker { + private static final Object monitor = new Object(); + private final ClientInvocationBuilder syncInvoker; + private final ScheduledExecutorService executorService; + private BackpressureStrategy backpressureStrategy = BackpressureStrategy.BUFFER; + + public FlowableRxInvokerImpl(final SyncInvoker syncInvoker, final ExecutorService executorService) { + if (!(syncInvoker instanceof ClientInvocationBuilder)) { + throw Messages.MESSAGES.expectedClientInvocationBuilder(syncInvoker.getClass().getName()); + } + this.syncInvoker = (ClientInvocationBuilder) syncInvoker; + if (executorService instanceof ScheduledExecutorService) { + this.executorService = (ScheduledExecutorService) executorService; + } else { + this.executorService = null; + } + } + + @Override + public Flowable get() { + return eventSourceToObservable(getEventSource(), String.class, "GET", null, getAccept()); + } + + @Override + public Flowable get(Class responseType) { + return eventSourceToObservable(getEventSource(), responseType, "GET", null, getAccept()); + } + + @Override + public Flowable get(GenericType responseType) { + return eventSourceToObservable(getEventSource(), responseType, "GET", null, getAccept()); + } + + @Override + public Flowable put(Entity entity) { + return eventSourceToObservable(getEventSource(), String.class, "PUT", entity, getAccept()); + } + + @Override + public Flowable put(Entity entity, Class responseType) { + return eventSourceToObservable(getEventSource(), responseType, "PUT", entity, getAccept()); + } + + @Override + public Flowable put(Entity entity, GenericType responseType) { + return eventSourceToObservable(getEventSource(), responseType, "PUT", entity, getAccept()); + } + + @Override + public Flowable post(Entity entity) { + return eventSourceToObservable(getEventSource(), String.class, "POST", entity, getAccept()); + } + + @Override + public Flowable post(Entity entity, Class responseType) { + return eventSourceToObservable(getEventSource(), responseType, "POST", entity, getAccept()); + } + + @Override + public Flowable post(Entity entity, GenericType responseType) { + return eventSourceToObservable(getEventSource(), responseType, "POST", entity, getAccept()); + } + + @Override + public Flowable delete() { + return eventSourceToObservable(getEventSource(), String.class, "DELETE", null, getAccept()); + } + + @Override + public Flowable delete(Class responseType) { + return eventSourceToObservable(getEventSource(), responseType, "DELETE", null, getAccept()); + } + + @Override + public Flowable delete(GenericType responseType) { + return eventSourceToObservable(getEventSource(), responseType, "DELETE", null, getAccept()); + } + + @Override + public Flowable head() { + return eventSourceToObservable(getEventSource(), String.class, "HEAD", null, getAccept()); + } + + @Override + public Flowable options() { + return eventSourceToObservable(getEventSource(), String.class, "OPTIONS", null, getAccept()); + } + + @Override + public Flowable options(Class responseType) { + return eventSourceToObservable(getEventSource(), responseType, "OPTIONS", null, getAccept()); + } + + @Override + public Flowable options(GenericType responseType) { + return eventSourceToObservable(getEventSource(), responseType, "OPTIONS", null, getAccept()); + } + + @Override + public Flowable trace() { + return eventSourceToObservable(getEventSource(), String.class, "TRACE", null, getAccept()); + } + + @Override + public Flowable trace(Class responseType) { + return eventSourceToObservable(getEventSource(), responseType, "TRACE", null, getAccept()); + } + + @Override + public Flowable trace(GenericType responseType) { + return eventSourceToObservable(getEventSource(), responseType, "TRACE", null, getAccept()); + } + + @Override + public Flowable method(String name) { + return eventSourceToObservable(getEventSource(), String.class, name, null, getAccept()); + } + + @Override + public Flowable method(String name, Class responseType) { + return eventSourceToObservable(getEventSource(), responseType, name, null, getAccept()); + } + + @Override + public Flowable method(String name, GenericType responseType) { + return eventSourceToObservable(getEventSource(), responseType, name, null, getAccept()); + } + + @Override + public Flowable method(String name, Entity entity) { + return eventSourceToObservable(getEventSource(), String.class, name, entity, getAccept()); + } + + @Override + public Flowable method(String name, Entity entity, Class responseType) { + return eventSourceToObservable(getEventSource(), responseType, name, entity, getAccept()); + } + + @Override + public Flowable method(String name, Entity entity, GenericType responseType) { + return eventSourceToObservable(getEventSource(), responseType, name, entity, getAccept()); + } + + /////////////////////////////////////////////////////////////////////////////////////////////////////////////// + + @Override + public BackpressureStrategy getBackpressureStrategy() { + return backpressureStrategy; + } + + @Override + public void setBackpressureStrategy(BackpressureStrategy backpressureStrategy) { + this.backpressureStrategy = backpressureStrategy; + } + + private Flowable eventSourceToObservable(SseEventSourceImpl sseEventSource, Class clazz, String verb, + Entity entity, MediaType[] mediaTypes) { + return Flowable.create( + emitter -> { + sseEventSource.register( + (InboundSseEvent e) -> { + T t = e.readData(clazz, ((InboundSseEventImpl) e).getMediaType()); + emitter.onNext(t); + }, + emitter::onError, + emitter::onComplete); + synchronized (monitor) { + if (!sseEventSource.isOpen()) { + sseEventSource.open(null, verb, entity, mediaTypes); + } + } + }, + backpressureStrategy); + } + + private Flowable eventSourceToObservable(SseEventSourceImpl sseEventSource, GenericType type, String verb, + Entity entity, MediaType[] mediaTypes) { + return Flowable.create( + emitter -> { + sseEventSource.register( + (InboundSseEvent e) -> { + T t = e.readData(type, ((InboundSseEventImpl) e).getMediaType()); + emitter.onNext(t); + }, + emitter::onError, + emitter::onComplete); + synchronized (monitor) { + if (!sseEventSource.isOpen()) { + sseEventSource.open(null, verb, entity, mediaTypes); + } + } + }, + backpressureStrategy); + } + + private SseEventSourceImpl getEventSource() { + SourceBuilder builder = (SourceBuilder) SseEventSource.target(syncInvoker.getTarget()); + if (executorService != null) { + builder.executor(executorService); + } + return (SseEventSourceImpl) builder.alwaysReconnect(false).build(); + } + + private MediaType[] getAccept() { + if (syncInvoker != null) { + List accept = syncInvoker.getHeaders().getAcceptableMediaTypes(); + return accept.toArray(new MediaType[0]); + } else { + return null; + } + } +} diff --git a/rxjava3/src/main/java/dev/resteasy/rxjava3/FlowableRxInvokerProvider.java b/rxjava3/src/main/java/dev/resteasy/rxjava3/FlowableRxInvokerProvider.java new file mode 100644 index 0000000..be3317d --- /dev/null +++ b/rxjava3/src/main/java/dev/resteasy/rxjava3/FlowableRxInvokerProvider.java @@ -0,0 +1,50 @@ +/* + * JBoss, Home of Professional Open Source. + * + * Copyright 2022 Red Hat, Inc., and individual contributors + * as indicated by the @author tags. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.resteasy.rxjava3; + +import java.util.concurrent.ExecutorService; + +import javax.ws.rs.client.RxInvokerProvider; +import javax.ws.rs.client.SyncInvoker; +import javax.ws.rs.client.WebTarget; +import javax.ws.rs.ext.Provider; + +@Provider +public class FlowableRxInvokerProvider implements RxInvokerProvider { + WebTarget target; + + @Override + public boolean isProviderFor(Class clazz) { + return FlowableRxInvoker.class.equals(clazz); + } + + @Override + public FlowableRxInvoker getRxInvoker(SyncInvoker syncInvoker, ExecutorService executorService) { + return new FlowableRxInvokerImpl(syncInvoker, executorService); + } + + public WebTarget getTarget() { + return target; + } + + public void setTarget(WebTarget target) { + this.target = target; + } +} diff --git a/rxjava3/src/main/java/dev/resteasy/rxjava3/ObservableProvider.java b/rxjava3/src/main/java/dev/resteasy/rxjava3/ObservableProvider.java new file mode 100644 index 0000000..911f041 --- /dev/null +++ b/rxjava3/src/main/java/dev/resteasy/rxjava3/ObservableProvider.java @@ -0,0 +1,37 @@ +/* + * JBoss, Home of Professional Open Source. + * + * Copyright 2022 Red Hat, Inc., and individual contributors + * as indicated by the @author tags. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.resteasy.rxjava3; + +import javax.ws.rs.ext.Provider; + +import org.jboss.resteasy.spi.AsyncStreamProvider; +import org.reactivestreams.Publisher; + +import io.reactivex.rxjava3.core.BackpressureStrategy; +import io.reactivex.rxjava3.core.Observable; + +@Provider +public class ObservableProvider implements AsyncStreamProvider> { + @Override + public Publisher toAsyncStream(Observable asyncResponse) { + return asyncResponse.toFlowable(BackpressureStrategy.BUFFER); + } + +} diff --git a/rxjava3/src/main/java/dev/resteasy/rxjava3/ObservableRxInvoker.java b/rxjava3/src/main/java/dev/resteasy/rxjava3/ObservableRxInvoker.java new file mode 100644 index 0000000..12c2232 --- /dev/null +++ b/rxjava3/src/main/java/dev/resteasy/rxjava3/ObservableRxInvoker.java @@ -0,0 +1,28 @@ +/* + * JBoss, Home of Professional Open Source. + * + * Copyright 2022 Red Hat, Inc., and individual contributors + * as indicated by the @author tags. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.resteasy.rxjava3; + +import javax.ws.rs.client.RxInvoker; + +import io.reactivex.rxjava3.core.Observable; + +public interface ObservableRxInvoker extends RxInvoker> { + +} diff --git a/rxjava3/src/main/java/dev/resteasy/rxjava3/ObservableRxInvokerImpl.java b/rxjava3/src/main/java/dev/resteasy/rxjava3/ObservableRxInvokerImpl.java new file mode 100644 index 0000000..57e3e8b --- /dev/null +++ b/rxjava3/src/main/java/dev/resteasy/rxjava3/ObservableRxInvokerImpl.java @@ -0,0 +1,239 @@ +/* + * JBoss, Home of Professional Open Source. + * + * Copyright 2022 Red Hat, Inc., and individual contributors + * as indicated by the @author tags. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.resteasy.rxjava3; + +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; + +import javax.ws.rs.client.Entity; +import javax.ws.rs.client.SyncInvoker; +import javax.ws.rs.core.GenericType; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.sse.InboundSseEvent; +import javax.ws.rs.sse.SseEventSource; + +import org.jboss.resteasy.client.jaxrs.internal.ClientInvocationBuilder; +import org.jboss.resteasy.plugins.providers.sse.InboundSseEventImpl; +import org.jboss.resteasy.plugins.providers.sse.client.SseEventSourceImpl; +import org.jboss.resteasy.plugins.providers.sse.client.SseEventSourceImpl.SourceBuilder; + +import dev.resteasy.rxjava3.i18n.Messages; +import io.reactivex.rxjava3.core.Observable; + +public class ObservableRxInvokerImpl implements ObservableRxInvoker { + private static final Object monitor = new Object(); + private final ClientInvocationBuilder syncInvoker; + private final ScheduledExecutorService executorService; + + public ObservableRxInvokerImpl(final SyncInvoker syncInvoker, final ExecutorService executorService) { + if (!(syncInvoker instanceof ClientInvocationBuilder)) { + throw Messages.MESSAGES.expectedClientInvocationBuilder(syncInvoker.getClass().getName()); + } + this.syncInvoker = (ClientInvocationBuilder) syncInvoker; + if (executorService instanceof ScheduledExecutorService) { + this.executorService = (ScheduledExecutorService) executorService; + } else { + this.executorService = null; + } + } + + @Override + public Observable get() { + return eventSourceToObservable(getEventSource(), String.class, "GET", null, getAccept()); + } + + @Override + public Observable get(Class responseType) { + return eventSourceToObservable(getEventSource(), responseType, "GET", null, getAccept()); + } + + @Override + public Observable get(GenericType responseType) { + return eventSourceToObservable(getEventSource(), responseType, "GET", null, getAccept()); + } + + @Override + public Observable put(Entity entity) { + return eventSourceToObservable(getEventSource(), String.class, "PUT", entity, getAccept()); + } + + @Override + public Observable put(Entity entity, Class responseType) { + return eventSourceToObservable(getEventSource(), responseType, "PUT", entity, getAccept()); + } + + @Override + public Observable put(Entity entity, GenericType responseType) { + return eventSourceToObservable(getEventSource(), responseType, "PUT", entity, getAccept()); + } + + @Override + public Observable post(Entity entity) { + return eventSourceToObservable(getEventSource(), String.class, "POST", entity, getAccept()); + } + + @Override + public Observable post(Entity entity, Class responseType) { + return eventSourceToObservable(getEventSource(), responseType, "POST", entity, getAccept()); + } + + @Override + public Observable post(Entity entity, GenericType responseType) { + return eventSourceToObservable(getEventSource(), responseType, "POST", entity, getAccept()); + } + + @Override + public Observable delete() { + return eventSourceToObservable(getEventSource(), String.class, "DELETE", null, getAccept()); + } + + @Override + public Observable delete(Class responseType) { + return eventSourceToObservable(getEventSource(), responseType, "DELETE", null, getAccept()); + } + + @Override + public Observable delete(GenericType responseType) { + return eventSourceToObservable(getEventSource(), responseType, "DELETE", null, getAccept()); + } + + @Override + public Observable head() { + return eventSourceToObservable(getEventSource(), String.class, "HEAD", null, getAccept()); + } + + @Override + public Observable options() { + return eventSourceToObservable(getEventSource(), String.class, "OPTIONS", null, getAccept()); + } + + @Override + public Observable options(Class responseType) { + return eventSourceToObservable(getEventSource(), responseType, "OPTIONS", null, getAccept()); + } + + @Override + public Observable options(GenericType responseType) { + return eventSourceToObservable(getEventSource(), responseType, "OPTIONS", null, getAccept()); + } + + @Override + public Observable trace() { + return eventSourceToObservable(getEventSource(), String.class, "TRACE", null, getAccept()); + } + + @Override + public Observable trace(Class responseType) { + return eventSourceToObservable(getEventSource(), responseType, "TRACE", null, getAccept()); + } + + @Override + public Observable trace(GenericType responseType) { + return eventSourceToObservable(getEventSource(), responseType, "TRACE", null, getAccept()); + } + + @Override + public Observable method(String name) { + return eventSourceToObservable(getEventSource(), String.class, name, null, getAccept()); + } + + @Override + public Observable method(String name, Class responseType) { + return eventSourceToObservable(getEventSource(), responseType, name, null, getAccept()); + } + + @Override + public Observable method(String name, GenericType responseType) { + return eventSourceToObservable(getEventSource(), responseType, name, null, getAccept()); + } + + @Override + public Observable method(String name, Entity entity) { + return eventSourceToObservable(getEventSource(), String.class, name, entity, getAccept()); + } + + @Override + public Observable method(String name, Entity entity, Class responseType) { + return eventSourceToObservable(getEventSource(), responseType, name, entity, getAccept()); + } + + @Override + public Observable method(String name, Entity entity, GenericType responseType) { + return eventSourceToObservable(getEventSource(), responseType, name, entity, getAccept()); + } + + /////////////////////////////////////////////////////////////////////////////////////////////////////////////// + + private Observable eventSourceToObservable(SseEventSourceImpl sseEventSource, Class clazz, String verb, + Entity entity, MediaType[] mediaTypes) { + return Observable.create( + emitter -> { + sseEventSource.register( + (InboundSseEvent e) -> { + T t = e.readData(clazz, ((InboundSseEventImpl) e).getMediaType()); + emitter.onNext(t); + }, + emitter::onError, + emitter::onComplete); + synchronized (monitor) { + if (!sseEventSource.isOpen()) { + sseEventSource.open(null, verb, entity, mediaTypes); + } + } + }); + } + + private Observable eventSourceToObservable(SseEventSourceImpl sseEventSource, GenericType type, String verb, + Entity entity, MediaType[] mediaTypes) { + return Observable.create( + emitter -> { + sseEventSource.register( + (InboundSseEvent e) -> { + T t = e.readData(type, ((InboundSseEventImpl) e).getMediaType()); + emitter.onNext(t); + }, + emitter::onError, + emitter::onComplete); + synchronized (monitor) { + if (!sseEventSource.isOpen()) { + sseEventSource.open(null, verb, entity, mediaTypes); + } + } + }); + } + + private SseEventSourceImpl getEventSource() { + SourceBuilder builder = (SourceBuilder) SseEventSource.target(syncInvoker.getTarget()); + if (executorService != null) { + builder.executor(executorService); + } + return (SseEventSourceImpl) builder.alwaysReconnect(false).build(); + } + + private MediaType[] getAccept() { + if (syncInvoker != null) { + List accept = syncInvoker.getHeaders().getAcceptableMediaTypes(); + return accept.toArray(new MediaType[0]); + } else { + return null; + } + } +} diff --git a/rxjava3/src/main/java/dev/resteasy/rxjava3/ObservableRxInvokerProvider.java b/rxjava3/src/main/java/dev/resteasy/rxjava3/ObservableRxInvokerProvider.java new file mode 100644 index 0000000..8667822 --- /dev/null +++ b/rxjava3/src/main/java/dev/resteasy/rxjava3/ObservableRxInvokerProvider.java @@ -0,0 +1,50 @@ +/* + * JBoss, Home of Professional Open Source. + * + * Copyright 2022 Red Hat, Inc., and individual contributors + * as indicated by the @author tags. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.resteasy.rxjava3; + +import java.util.concurrent.ExecutorService; + +import javax.ws.rs.client.RxInvokerProvider; +import javax.ws.rs.client.SyncInvoker; +import javax.ws.rs.client.WebTarget; +import javax.ws.rs.ext.Provider; + +@Provider +public class ObservableRxInvokerProvider implements RxInvokerProvider { + WebTarget target; + + @Override + public boolean isProviderFor(Class clazz) { + return ObservableRxInvoker.class.equals(clazz); + } + + @Override + public ObservableRxInvoker getRxInvoker(SyncInvoker syncInvoker, ExecutorService executorService) { + return new ObservableRxInvokerImpl(syncInvoker, executorService); + } + + public WebTarget getTarget() { + return target; + } + + public void setTarget(WebTarget target) { + this.target = target; + } +} diff --git a/rxjava3/src/main/java/dev/resteasy/rxjava3/SingleProvider.java b/rxjava3/src/main/java/dev/resteasy/rxjava3/SingleProvider.java new file mode 100644 index 0000000..e0cb81e --- /dev/null +++ b/rxjava3/src/main/java/dev/resteasy/rxjava3/SingleProvider.java @@ -0,0 +1,64 @@ +/* + * JBoss, Home of Professional Open Source. + * + * Copyright 2022 Red Hat, Inc., and individual contributors + * as indicated by the @author tags. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.resteasy.rxjava3; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.function.Supplier; + +import javax.ws.rs.ext.Provider; + +import org.jboss.resteasy.spi.AsyncClientResponseProvider; +import org.jboss.resteasy.spi.AsyncResponseProvider; + +import io.reactivex.rxjava3.core.Single; +import io.reactivex.rxjava3.disposables.Disposable; + +@Provider +public class SingleProvider implements AsyncResponseProvider>, AsyncClientResponseProvider> { + private static class SingleAdaptor extends CompletableFuture { + private final Disposable subscription; + + SingleAdaptor(final Single single) { + this.subscription = single.subscribe(this::complete, this::completeExceptionally); + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + subscription.dispose(); + return super.cancel(mayInterruptIfRunning); + } + } + + @Override + public CompletionStage toCompletionStage(Single asyncResponse) { + return new SingleAdaptor<>(asyncResponse); + } + + @Override + public Single fromCompletionStage(CompletionStage completionStage) { + return Single.fromFuture(completionStage.toCompletableFuture()); + } + + @Override + public Single fromCompletionStage(final Supplier> completionStageSupplier) { + return Single.defer(() -> fromCompletionStage(completionStageSupplier.get())); + } +} diff --git a/rxjava3/src/main/java/dev/resteasy/rxjava3/SingleRxInvoker.java b/rxjava3/src/main/java/dev/resteasy/rxjava3/SingleRxInvoker.java new file mode 100644 index 0000000..2d8dc57 --- /dev/null +++ b/rxjava3/src/main/java/dev/resteasy/rxjava3/SingleRxInvoker.java @@ -0,0 +1,105 @@ +/* + * JBoss, Home of Professional Open Source. + * + * Copyright 2022 Red Hat, Inc., and individual contributors + * as indicated by the @author tags. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.resteasy.rxjava3; + +import javax.ws.rs.client.Entity; +import javax.ws.rs.client.RxInvoker; +import javax.ws.rs.core.GenericType; +import javax.ws.rs.core.Response; + +import io.reactivex.rxjava3.core.Single; + +public interface SingleRxInvoker extends RxInvoker> { + @Override + Single get(); + + @Override + Single get(Class responseType); + + @Override + Single get(GenericType responseType); + + @Override + Single put(Entity entity); + + @Override + Single put(Entity entity, Class clazz); + + @Override + Single put(Entity entity, GenericType type); + + @Override + Single post(Entity entity); + + @Override + Single post(Entity entity, Class clazz); + + @Override + Single post(Entity entity, GenericType type); + + @Override + Single delete(); + + @Override + Single delete(Class responseType); + + @Override + Single delete(GenericType responseType); + + @Override + Single head(); + + @Override + Single options(); + + @Override + Single options(Class responseType); + + @Override + Single options(GenericType responseType); + + @Override + Single trace(); + + @Override + Single trace(Class responseType); + + @Override + Single trace(GenericType responseType); + + @Override + Single method(String name); + + @Override + Single method(String name, Class responseType); + + @Override + Single method(String name, GenericType responseType); + + @Override + Single method(String name, Entity entity); + + @Override + Single method(String name, Entity entity, Class responseType); + + @Override + Single method(String name, Entity entity, GenericType responseType); + +} diff --git a/rxjava3/src/main/java/dev/resteasy/rxjava3/SingleRxInvokerImpl.java b/rxjava3/src/main/java/dev/resteasy/rxjava3/SingleRxInvokerImpl.java new file mode 100644 index 0000000..516cf33 --- /dev/null +++ b/rxjava3/src/main/java/dev/resteasy/rxjava3/SingleRxInvokerImpl.java @@ -0,0 +1,179 @@ +/* + * JBoss, Home of Professional Open Source. + * + * Copyright 2022 Red Hat, Inc., and individual contributors + * as indicated by the @author tags. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.resteasy.rxjava3; + +import java.util.Objects; +import java.util.concurrent.CompletionStage; + +import javax.ws.rs.client.Entity; +import javax.ws.rs.core.GenericType; +import javax.ws.rs.core.Response; + +import org.jboss.resteasy.client.jaxrs.internal.ClientInvocationBuilder; +import org.jboss.resteasy.client.jaxrs.internal.PublisherRxInvokerImpl; +import org.reactivestreams.Publisher; + +import io.reactivex.rxjava3.core.Flowable; +import io.reactivex.rxjava3.core.Single; + +public class SingleRxInvokerImpl implements SingleRxInvoker { + + private final SinglePublisherInvoker publisherInvoker; + + public SingleRxInvokerImpl(final ClientInvocationBuilder builder) { + publisherInvoker = new SinglePublisherInvoker(Objects.requireNonNull(builder)); + } + + static class SinglePublisherInvoker extends PublisherRxInvokerImpl { + SinglePublisherInvoker(final ClientInvocationBuilder builder) { + super(builder); + } + + @Override + protected Publisher toPublisher(CompletionStage completable) { + return Flowable.fromFuture(completable.toCompletableFuture()); + } + } + + @Override + public Single get() { + return Single.fromPublisher(publisherInvoker.get()); + } + + @Override + public Single get(Class responseType) { + return Single.fromPublisher(publisherInvoker.get(responseType)); + } + + @Override + public Single get(GenericType responseType) { + return Single.fromPublisher(publisherInvoker.get(responseType)); + } + + @Override + public Single put(Entity entity) { + return Single.fromPublisher(publisherInvoker.put(entity)); + } + + @Override + public Single put(Entity entity, Class clazz) { + return Single.fromPublisher(publisherInvoker.put(entity, clazz)); + } + + @Override + public Single put(Entity entity, GenericType type) { + return Single.fromPublisher(publisherInvoker.put(entity, type)); + } + + @Override + public Single post(Entity entity) { + return Single.fromPublisher(publisherInvoker.post(entity)); + } + + @Override + public Single post(Entity entity, Class clazz) { + return Single.fromPublisher(publisherInvoker.post(entity, clazz)); + } + + @Override + public Single post(Entity entity, GenericType type) { + return Single.fromPublisher(publisherInvoker.post(entity, type)); + } + + @Override + public Single delete() { + return Single.fromPublisher(publisherInvoker.delete()); + } + + @Override + public Single delete(Class responseType) { + return Single.fromPublisher(publisherInvoker.delete(responseType)); + } + + @Override + public Single delete(GenericType responseType) { + return Single.fromPublisher(publisherInvoker.delete(responseType)); + } + + @Override + public Single head() { + return Single.fromPublisher(publisherInvoker.head()); + } + + @Override + public Single options() { + return Single.fromPublisher(publisherInvoker.options()); + } + + @Override + public Single options(Class responseType) { + return Single.fromPublisher(publisherInvoker.options(responseType)); + } + + @Override + public Single options(GenericType responseType) { + return Single.fromPublisher(publisherInvoker.options(responseType)); + } + + @Override + public Single trace() { + return Single.fromPublisher(publisherInvoker.trace()); + } + + @Override + public Single trace(Class responseType) { + return Single.fromPublisher(publisherInvoker.trace(responseType)); + } + + @Override + public Single trace(GenericType responseType) { + return Single.fromPublisher(publisherInvoker.trace(responseType)); + } + + @Override + public Single method(String name) { + return Single.fromPublisher(publisherInvoker.method(name)); + } + + @Override + public Single method(String name, Class responseType) { + return Single.fromPublisher(publisherInvoker.method(name, responseType)); + } + + @Override + public Single method(String name, GenericType responseType) { + return Single.fromPublisher(publisherInvoker.method(name, responseType)); + } + + @Override + public Single method(String name, Entity entity) { + return Single.fromPublisher(publisherInvoker.method(name, entity)); + } + + @Override + public Single method(String name, Entity entity, Class responseType) { + return Single.fromPublisher(publisherInvoker.method(name, entity, responseType)); + } + + @Override + public Single method(String name, Entity entity, GenericType responseType) { + return Single.fromPublisher(publisherInvoker.method(name, entity, responseType)); + } +} diff --git a/rxjava3/src/main/java/dev/resteasy/rxjava3/SingleRxInvokerProvider.java b/rxjava3/src/main/java/dev/resteasy/rxjava3/SingleRxInvokerProvider.java new file mode 100644 index 0000000..a44f7a3 --- /dev/null +++ b/rxjava3/src/main/java/dev/resteasy/rxjava3/SingleRxInvokerProvider.java @@ -0,0 +1,44 @@ +/* + * JBoss, Home of Professional Open Source. + * + * Copyright 2022 Red Hat, Inc., and individual contributors + * as indicated by the @author tags. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.resteasy.rxjava3; + +import java.util.concurrent.ExecutorService; + +import javax.ws.rs.client.RxInvokerProvider; +import javax.ws.rs.client.SyncInvoker; + +import org.jboss.resteasy.client.jaxrs.internal.ClientInvocationBuilder; + +import dev.resteasy.rxjava3.i18n.Messages; + +public class SingleRxInvokerProvider implements RxInvokerProvider { + @Override + public boolean isProviderFor(Class clazz) { + return SingleRxInvoker.class.equals(clazz); + } + + @Override + public SingleRxInvoker getRxInvoker(SyncInvoker syncInvoker, ExecutorService executorService) { + if (syncInvoker instanceof ClientInvocationBuilder) { + return new SingleRxInvokerImpl((ClientInvocationBuilder) syncInvoker); + } + throw Messages.MESSAGES.expectedClientInvocationBuilder(syncInvoker.getClass().getName()); + } +} diff --git a/rxjava3/src/main/java/dev/resteasy/rxjava3/i18n/Messages.java b/rxjava3/src/main/java/dev/resteasy/rxjava3/i18n/Messages.java new file mode 100644 index 0000000..1ebadb5 --- /dev/null +++ b/rxjava3/src/main/java/dev/resteasy/rxjava3/i18n/Messages.java @@ -0,0 +1,38 @@ +/* + * JBoss, Home of Professional Open Source. + * + * Copyright 2022 Red Hat, Inc., and individual contributors + * as indicated by the @author tags. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.resteasy.rxjava3.i18n; + +import javax.ws.rs.ProcessingException; + +import org.jboss.logging.annotations.Message; +import org.jboss.logging.annotations.MessageBundle; + +/** + * + * @author Ron Sigal + * @version $Revision: 1.1 $ + */ +@MessageBundle(projectCode = "RESTEASY") +public interface Messages { + Messages MESSAGES = org.jboss.logging.Messages.getBundle(Messages.class); + + @Message(id = 21500, value = "Expected ClientInvocationBuilder, not: %s") + ProcessingException expectedClientInvocationBuilder(String className); +} diff --git a/rxjava3/src/main/java/dev/resteasy/rxjava3/propagation/ContextPropagatorOnCompletableAssemblyAction.java b/rxjava3/src/main/java/dev/resteasy/rxjava3/propagation/ContextPropagatorOnCompletableAssemblyAction.java new file mode 100644 index 0000000..f09002e --- /dev/null +++ b/rxjava3/src/main/java/dev/resteasy/rxjava3/propagation/ContextPropagatorOnCompletableAssemblyAction.java @@ -0,0 +1,59 @@ +/* + * JBoss, Home of Professional Open Source. + * + * Copyright 2022 Red Hat, Inc., and individual contributors + * as indicated by the @author tags. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.resteasy.rxjava3.propagation; + +import java.util.concurrent.Executor; + +import org.jboss.resteasy.concurrent.ContextualExecutors; + +import io.reactivex.rxjava3.core.Completable; +import io.reactivex.rxjava3.core.CompletableObserver; +import io.reactivex.rxjava3.functions.Function; + +/** + * @author James R. Perkins + */ +class ContextPropagatorOnCompletableAssemblyAction implements Function { + + ContextPropagatorOnCompletableAssemblyAction() { + } + + @Override + public Completable apply(final Completable t) throws Exception { + return new ContextPropagatorCompletable(t, ContextualExecutors.executor()); + } + + private static class ContextPropagatorCompletable extends Completable { + + private final Completable source; + private final Executor contextExecutor; + + private ContextPropagatorCompletable(final Completable t, final Executor contextExecutor) { + this.source = t; + this.contextExecutor = contextExecutor; + } + + @Override + protected void subscribeActual(final CompletableObserver observer) { + contextExecutor.execute(() -> source.subscribe(observer)); + } + + } +} diff --git a/rxjava3/src/main/java/dev/resteasy/rxjava3/propagation/ContextPropagatorOnCompletableCreateAction.java b/rxjava3/src/main/java/dev/resteasy/rxjava3/propagation/ContextPropagatorOnCompletableCreateAction.java new file mode 100644 index 0000000..824a340 --- /dev/null +++ b/rxjava3/src/main/java/dev/resteasy/rxjava3/propagation/ContextPropagatorOnCompletableCreateAction.java @@ -0,0 +1,70 @@ +/* + * JBoss, Home of Professional Open Source. + * + * Copyright 2022 Red Hat, Inc., and individual contributors + * as indicated by the @author tags. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.resteasy.rxjava3.propagation; + +import java.util.concurrent.Executor; + +import org.jboss.resteasy.concurrent.ContextualExecutors; + +import io.reactivex.rxjava3.core.Completable; +import io.reactivex.rxjava3.core.CompletableObserver; +import io.reactivex.rxjava3.disposables.Disposable; +import io.reactivex.rxjava3.functions.BiFunction; + +class ContextPropagatorOnCompletableCreateAction + implements BiFunction { + + ContextPropagatorOnCompletableCreateAction() { + } + + @Override + public CompletableObserver apply(final Completable completable, final CompletableObserver observer) + throws Exception { + return new ContextCapturerCompletable(completable, observer, ContextualExecutors.executor()); + } + + private static class ContextCapturerCompletable implements CompletableObserver { + + private final CompletableObserver source; + private final Executor contextExecutor; + + private ContextCapturerCompletable(final Completable s, final CompletableObserver o, + final Executor contextExecutor) { + this.source = o; + this.contextExecutor = contextExecutor; + } + + @Override + public void onError(final Throwable t) { + contextExecutor.execute(() -> source.onError(t)); + } + + @Override + public void onSubscribe(final Disposable d) { + contextExecutor.execute(() -> source.onSubscribe(d)); + } + + @Override + public void onComplete() { + contextExecutor.execute(source::onComplete); + } + } + +} diff --git a/rxjava3/src/main/java/dev/resteasy/rxjava3/propagation/ContextPropagatorOnFlowableAssemblyAction.java b/rxjava3/src/main/java/dev/resteasy/rxjava3/propagation/ContextPropagatorOnFlowableAssemblyAction.java new file mode 100644 index 0000000..8cd8d82 --- /dev/null +++ b/rxjava3/src/main/java/dev/resteasy/rxjava3/propagation/ContextPropagatorOnFlowableAssemblyAction.java @@ -0,0 +1,60 @@ +/* + * JBoss, Home of Professional Open Source. + * + * Copyright 2022 Red Hat, Inc., and individual contributors + * as indicated by the @author tags. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.resteasy.rxjava3.propagation; + +import java.util.concurrent.Executor; + +import org.jboss.resteasy.concurrent.ContextualExecutors; +import org.reactivestreams.Subscriber; + +import io.reactivex.rxjava3.core.Flowable; +import io.reactivex.rxjava3.functions.Function; + +@SuppressWarnings("rawtypes") +class ContextPropagatorOnFlowableAssemblyAction implements Function { + + ContextPropagatorOnFlowableAssemblyAction() { + } + + @SuppressWarnings("unchecked") + @Override + public Flowable apply(final Flowable t) throws Exception { + return new ContextPropagatorFlowable(t, ContextualExecutors.executor()); + } + + private static class ContextPropagatorFlowable extends Flowable { + + private final Flowable source; + + private final Executor contextExecutor; + + private ContextPropagatorFlowable(final Flowable t, final Executor contextExecutor) { + this.source = t; + this.contextExecutor = contextExecutor; + } + + @Override + protected void subscribeActual(final Subscriber observer) { + contextExecutor.execute(() -> source.subscribe(observer)); + } + + } + +} diff --git a/rxjava3/src/main/java/dev/resteasy/rxjava3/propagation/ContextPropagatorOnFlowableCreateAction.java b/rxjava3/src/main/java/dev/resteasy/rxjava3/propagation/ContextPropagatorOnFlowableCreateAction.java new file mode 100644 index 0000000..de1bd30 --- /dev/null +++ b/rxjava3/src/main/java/dev/resteasy/rxjava3/propagation/ContextPropagatorOnFlowableCreateAction.java @@ -0,0 +1,76 @@ +/* + * JBoss, Home of Professional Open Source. + * + * Copyright 2022 Red Hat, Inc., and individual contributors + * as indicated by the @author tags. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.resteasy.rxjava3.propagation; + +import java.util.concurrent.Executor; + +import org.jboss.resteasy.concurrent.ContextualExecutors; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; + +import io.reactivex.rxjava3.core.Flowable; +import io.reactivex.rxjava3.functions.BiFunction; + +@SuppressWarnings("rawtypes") +class ContextPropagatorOnFlowableCreateAction implements BiFunction { + + ContextPropagatorOnFlowableCreateAction() { + } + + @SuppressWarnings("unchecked") + @Override + public Subscriber apply(final Flowable flowable, final Subscriber observer) throws Exception { + return new ContextCapturerFlowable<>(flowable, observer, ContextualExecutors.executor()); + } + + @SuppressWarnings("ReactiveStreamsSubscriberImplementation") + private static class ContextCapturerFlowable implements Subscriber { + + private final Subscriber source; + private final Executor contextExecutor; + + private ContextCapturerFlowable(final Flowable observable, final Subscriber observer, + final Executor contextExecutor) { + this.source = observer; + this.contextExecutor = contextExecutor; + } + + @Override + public void onComplete() { + contextExecutor.execute(source::onComplete); + } + + @Override + public void onError(final Throwable t) { + contextExecutor.execute(() -> source.onError(t)); + } + + @Override + public void onNext(final T v) { + contextExecutor.execute(() -> source.onNext(v)); + } + + @Override + public void onSubscribe(final Subscription s) { + contextExecutor.execute(() -> source.onSubscribe(s)); + } + } + +} diff --git a/rxjava3/src/main/java/dev/resteasy/rxjava3/propagation/ContextPropagatorOnMaybeAssemblyAction.java b/rxjava3/src/main/java/dev/resteasy/rxjava3/propagation/ContextPropagatorOnMaybeAssemblyAction.java new file mode 100644 index 0000000..1968f94 --- /dev/null +++ b/rxjava3/src/main/java/dev/resteasy/rxjava3/propagation/ContextPropagatorOnMaybeAssemblyAction.java @@ -0,0 +1,60 @@ +/* + * JBoss, Home of Professional Open Source. + * + * Copyright 2022 Red Hat, Inc., and individual contributors + * as indicated by the @author tags. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.resteasy.rxjava3.propagation; + +import java.util.concurrent.Executor; + +import org.jboss.resteasy.concurrent.ContextualExecutors; + +import io.reactivex.rxjava3.core.Maybe; +import io.reactivex.rxjava3.core.MaybeObserver; +import io.reactivex.rxjava3.functions.Function; + +@SuppressWarnings("rawtypes") +class ContextPropagatorOnMaybeAssemblyAction implements Function { + + ContextPropagatorOnMaybeAssemblyAction() { + } + + @SuppressWarnings("unchecked") + @Override + public Maybe apply(Maybe t) throws Exception { + return new ContextPropagatorMaybe(t, ContextualExecutors.executor()); + } + + private static class ContextPropagatorMaybe extends Maybe { + + private final Maybe source; + + private final Executor contextExecutor; + + private ContextPropagatorMaybe(final Maybe t, final Executor contextExecutor) { + this.source = t; + this.contextExecutor = contextExecutor; + } + + @Override + protected void subscribeActual(final MaybeObserver observer) { + contextExecutor.execute(() -> source.subscribe(observer)); + } + + } + +} diff --git a/rxjava3/src/main/java/dev/resteasy/rxjava3/propagation/ContextPropagatorOnMaybeCreateAction.java b/rxjava3/src/main/java/dev/resteasy/rxjava3/propagation/ContextPropagatorOnMaybeCreateAction.java new file mode 100644 index 0000000..48024e5 --- /dev/null +++ b/rxjava3/src/main/java/dev/resteasy/rxjava3/propagation/ContextPropagatorOnMaybeCreateAction.java @@ -0,0 +1,75 @@ +/* + * JBoss, Home of Professional Open Source. + * + * Copyright 2022 Red Hat, Inc., and individual contributors + * as indicated by the @author tags. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.resteasy.rxjava3.propagation; + +import java.util.concurrent.Executor; + +import org.jboss.resteasy.concurrent.ContextualExecutors; + +import io.reactivex.rxjava3.core.Maybe; +import io.reactivex.rxjava3.core.MaybeObserver; +import io.reactivex.rxjava3.disposables.Disposable; +import io.reactivex.rxjava3.functions.BiFunction; + +@SuppressWarnings("rawtypes") +class ContextPropagatorOnMaybeCreateAction implements BiFunction { + + ContextPropagatorOnMaybeCreateAction() { + } + + @SuppressWarnings("unchecked") + @Override + public MaybeObserver apply(final Maybe maybe, final MaybeObserver observer) throws Exception { + return new ContextCapturerMaybe<>(maybe, observer, ContextualExecutors.executor()); + } + + private static class ContextCapturerMaybe implements MaybeObserver { + + private final MaybeObserver source; + private final Executor contextExecutor; + + private ContextCapturerMaybe(final Maybe observable, final MaybeObserver observer, + final Executor contextExecutor) { + this.source = observer; + this.contextExecutor = contextExecutor; + } + + @Override + public void onComplete() { + contextExecutor.execute(source::onComplete); + } + + @Override + public void onError(final Throwable t) { + contextExecutor.execute(() -> source.onError(t)); + } + + @Override + public void onSubscribe(final Disposable d) { + contextExecutor.execute(() -> source.onSubscribe(d)); + } + + @Override + public void onSuccess(T v) { + contextExecutor.execute(() -> source.onSuccess(v)); + } + } + +} diff --git a/rxjava3/src/main/java/dev/resteasy/rxjava3/propagation/ContextPropagatorOnObservableAssemblyAction.java b/rxjava3/src/main/java/dev/resteasy/rxjava3/propagation/ContextPropagatorOnObservableAssemblyAction.java new file mode 100644 index 0000000..dbd1d20 --- /dev/null +++ b/rxjava3/src/main/java/dev/resteasy/rxjava3/propagation/ContextPropagatorOnObservableAssemblyAction.java @@ -0,0 +1,60 @@ +/* + * JBoss, Home of Professional Open Source. + * + * Copyright 2022 Red Hat, Inc., and individual contributors + * as indicated by the @author tags. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.resteasy.rxjava3.propagation; + +import java.util.concurrent.Executor; + +import org.jboss.resteasy.concurrent.ContextualExecutors; + +import io.reactivex.rxjava3.core.Observable; +import io.reactivex.rxjava3.core.Observer; +import io.reactivex.rxjava3.functions.Function; + +@SuppressWarnings("rawtypes") +class ContextPropagatorOnObservableAssemblyAction implements Function { + + ContextPropagatorOnObservableAssemblyAction() { + } + + @SuppressWarnings("unchecked") + @Override + public Observable apply(final Observable t) throws Exception { + return new ContextPropagatorObservable(t, ContextualExecutors.executor()); + } + + private static class ContextPropagatorObservable extends Observable { + + private final Observable source; + + private final Executor contextExecutor; + + private ContextPropagatorObservable(final Observable t, final Executor contextExecutor) { + this.source = t; + this.contextExecutor = contextExecutor; + } + + @Override + protected void subscribeActual(final Observer observer) { + contextExecutor.execute(() -> source.subscribe(observer)); + } + + } + +} diff --git a/rxjava3/src/main/java/dev/resteasy/rxjava3/propagation/ContextPropagatorOnObservableCreateAction.java b/rxjava3/src/main/java/dev/resteasy/rxjava3/propagation/ContextPropagatorOnObservableCreateAction.java new file mode 100644 index 0000000..59af98a --- /dev/null +++ b/rxjava3/src/main/java/dev/resteasy/rxjava3/propagation/ContextPropagatorOnObservableCreateAction.java @@ -0,0 +1,74 @@ +/* + * JBoss, Home of Professional Open Source. + * + * Copyright 2022 Red Hat, Inc., and individual contributors + * as indicated by the @author tags. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.resteasy.rxjava3.propagation; + +import java.util.concurrent.Executor; + +import org.jboss.resteasy.concurrent.ContextualExecutors; + +import io.reactivex.rxjava3.core.Observable; +import io.reactivex.rxjava3.core.Observer; +import io.reactivex.rxjava3.disposables.Disposable; +import io.reactivex.rxjava3.functions.BiFunction; + +@SuppressWarnings("rawtypes") +class ContextPropagatorOnObservableCreateAction implements BiFunction { + + ContextPropagatorOnObservableCreateAction() { + } + + @SuppressWarnings("unchecked") + @Override + public Observer apply(final Observable observable, final Observer observer) throws Exception { + return new ContextCapturerObservable(observable, observer, ContextualExecutors.executor()); + } + + private static class ContextCapturerObservable implements Observer { + + private final Observer source; + private final Executor contextExecutor; + + private ContextCapturerObservable(final Observable observable, final Observer observer, + final Executor contextExecutor) { + this.source = observer; + this.contextExecutor = contextExecutor; + } + + @Override + public void onComplete() { + contextExecutor.execute(source::onComplete); + } + + @Override + public void onError(final Throwable t) { + contextExecutor.execute(() -> source.onError(t)); + } + + @Override + public void onNext(final T v) { + contextExecutor.execute(() -> source.onNext(v)); + } + + @Override + public void onSubscribe(final Disposable d) { + contextExecutor.execute(() -> source.onSubscribe(d)); + } + } +} diff --git a/rxjava3/src/main/java/dev/resteasy/rxjava3/propagation/ContextPropagatorOnSingleAssemblyAction.java b/rxjava3/src/main/java/dev/resteasy/rxjava3/propagation/ContextPropagatorOnSingleAssemblyAction.java new file mode 100644 index 0000000..e00d77a --- /dev/null +++ b/rxjava3/src/main/java/dev/resteasy/rxjava3/propagation/ContextPropagatorOnSingleAssemblyAction.java @@ -0,0 +1,59 @@ +/* + * JBoss, Home of Professional Open Source. + * + * Copyright 2022 Red Hat, Inc., and individual contributors + * as indicated by the @author tags. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.resteasy.rxjava3.propagation; + +import java.util.concurrent.Executor; + +import org.jboss.resteasy.concurrent.ContextualExecutors; + +import io.reactivex.rxjava3.core.Single; +import io.reactivex.rxjava3.core.SingleObserver; +import io.reactivex.rxjava3.functions.Function; + +@SuppressWarnings("rawtypes") +class ContextPropagatorOnSingleAssemblyAction implements Function { + + ContextPropagatorOnSingleAssemblyAction() { + } + + @SuppressWarnings("unchecked") + @Override + public Single apply(final Single t) throws Exception { + return new ContextPropagatorSingle(t, ContextualExecutors.executor()); + } + + private static class ContextPropagatorSingle extends Single { + + private final Single source; + private final Executor contextExecutor; + + private ContextPropagatorSingle(final Single t, final Executor contextExecutor) { + this.source = t; + this.contextExecutor = contextExecutor; + } + + @Override + protected void subscribeActual(final SingleObserver observer) { + contextExecutor.execute(() -> source.subscribe(observer)); + } + + } + +} diff --git a/rxjava3/src/main/java/dev/resteasy/rxjava3/propagation/ContextPropagatorOnSingleCreateAction.java b/rxjava3/src/main/java/dev/resteasy/rxjava3/propagation/ContextPropagatorOnSingleCreateAction.java new file mode 100644 index 0000000..45dbb29 --- /dev/null +++ b/rxjava3/src/main/java/dev/resteasy/rxjava3/propagation/ContextPropagatorOnSingleCreateAction.java @@ -0,0 +1,68 @@ +/* + * JBoss, Home of Professional Open Source. + * + * Copyright 2022 Red Hat, Inc., and individual contributors + * as indicated by the @author tags. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.resteasy.rxjava3.propagation; + +import java.util.concurrent.Executor; + +import org.jboss.resteasy.concurrent.ContextualExecutors; + +import io.reactivex.rxjava3.core.Single; +import io.reactivex.rxjava3.core.SingleObserver; +import io.reactivex.rxjava3.disposables.Disposable; +import io.reactivex.rxjava3.functions.BiFunction; + +@SuppressWarnings("rawtypes") +class ContextPropagatorOnSingleCreateAction implements BiFunction { + + ContextPropagatorOnSingleCreateAction() { + } + + @SuppressWarnings("unchecked") + @Override + public SingleObserver apply(final Single s, final SingleObserver o) throws Exception { + return new ContextCapturerSingle(s, o, ContextualExecutors.executor()); + } + + private static class ContextCapturerSingle implements SingleObserver { + + private final SingleObserver source; + private final Executor contextExecutor; + + private ContextCapturerSingle(final Single s, final SingleObserver o, final Executor contextExecutor) { + this.source = o; + this.contextExecutor = contextExecutor; + } + + @Override + public void onError(final Throwable t) { + contextExecutor.execute(() -> source.onError(t)); + } + + @Override + public void onSubscribe(final Disposable d) { + contextExecutor.execute(() -> source.onSubscribe(d)); + } + + @Override + public void onSuccess(final T v) { + contextExecutor.execute(() -> source.onSuccess(v)); + } + } +} diff --git a/rxjava3/src/main/java/dev/resteasy/rxjava3/propagation/RxJava3ContextPropagator.java b/rxjava3/src/main/java/dev/resteasy/rxjava3/propagation/RxJava3ContextPropagator.java new file mode 100644 index 0000000..f3e0c45 --- /dev/null +++ b/rxjava3/src/main/java/dev/resteasy/rxjava3/propagation/RxJava3ContextPropagator.java @@ -0,0 +1,74 @@ +/* + * JBoss, Home of Professional Open Source. + * + * Copyright 2022 Red Hat, Inc., and individual contributors + * as indicated by the @author tags. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.resteasy.rxjava3.propagation; + +import javax.ws.rs.RuntimeType; +import javax.ws.rs.core.Feature; +import javax.ws.rs.core.FeatureContext; +import javax.ws.rs.ext.Provider; + +import org.jboss.resteasy.core.ResteasyContext; +import org.jboss.resteasy.spi.Dispatcher; + +import io.reactivex.rxjava3.core.Completable; +import io.reactivex.rxjava3.core.Flowable; +import io.reactivex.rxjava3.core.Maybe; +import io.reactivex.rxjava3.core.Observable; +import io.reactivex.rxjava3.core.Single; +import io.reactivex.rxjava3.plugins.RxJavaPlugins; + +/** + * Reactive Context propagator for RxJava 2. Supports propagating context to all + * {@link Single}, {@link Observable}, {@link Completable}, {@link Flowable} and + * {@link Maybe} types. + * + * @author Stéphane Épardaud + * @author James R. Perkins + */ +@Provider +public class RxJava3ContextPropagator implements Feature { + + @Override + public boolean configure(final FeatureContext context) { + // this is tied to the deployment, which is what we want for the reactive context + if (context.getConfiguration().getRuntimeType() == RuntimeType.CLIENT) + return false; + if (ResteasyContext.getContextData(Dispatcher.class) == null) { + // this can happen, but it means we're not able to find a deployment + return false; + } + configure(); + return true; + } + + private synchronized void configure() { + RxJavaPlugins.setOnSingleSubscribe(new ContextPropagatorOnSingleCreateAction()); + RxJavaPlugins.setOnCompletableSubscribe(new ContextPropagatorOnCompletableCreateAction()); + RxJavaPlugins.setOnFlowableSubscribe(new ContextPropagatorOnFlowableCreateAction()); + RxJavaPlugins.setOnMaybeSubscribe(new ContextPropagatorOnMaybeCreateAction()); + RxJavaPlugins.setOnObservableSubscribe(new ContextPropagatorOnObservableCreateAction()); + + RxJavaPlugins.setOnSingleAssembly(new ContextPropagatorOnSingleAssemblyAction()); + RxJavaPlugins.setOnCompletableAssembly(new ContextPropagatorOnCompletableAssemblyAction()); + RxJavaPlugins.setOnFlowableAssembly(new ContextPropagatorOnFlowableAssemblyAction()); + RxJavaPlugins.setOnMaybeAssembly(new ContextPropagatorOnMaybeAssemblyAction()); + RxJavaPlugins.setOnObservableAssembly(new ContextPropagatorOnObservableAssemblyAction()); + } +} diff --git a/rxjava3/src/main/resources/META-INF/services/javax.ws.rs.ext.Providers b/rxjava3/src/main/resources/META-INF/services/javax.ws.rs.ext.Providers new file mode 100644 index 0000000..d63ca0f --- /dev/null +++ b/rxjava3/src/main/resources/META-INF/services/javax.ws.rs.ext.Providers @@ -0,0 +1,29 @@ +# +# JBoss, Home of Professional Open Source. +# +# Copyright 2022 Red Hat, Inc., and individual contributors +# as indicated by the @author tags. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +dev.resteasy.rxjava3.SingleProvider +dev.resteasy.rxjava3.SingleRxInvokerImpl +dev.resteasy.rxjava3.SingleRxInvokerProvider +dev.resteasy.rxjava3.ObservableProvider +dev.resteasy.rxjava3.ObservableRxInvokerImpl +dev.resteasy.rxjava3.ObservableRxInvokerProvider +dev.resteasy.rxjava3.FlowableProvider +dev.resteasy.rxjava3.FlowableRxInvokerImpl +dev.resteasy.rxjava3.FlowableRxInvokerProvider +dev.resteasy.rxjava3.propagation.RxJava3ContextPropagator \ No newline at end of file diff --git a/rxjava3/src/test/java/dev/resteasy/rxjava3/Async.java b/rxjava3/src/test/java/dev/resteasy/rxjava3/Async.java new file mode 100644 index 0000000..e29ada9 --- /dev/null +++ b/rxjava3/src/test/java/dev/resteasy/rxjava3/Async.java @@ -0,0 +1,33 @@ +/* + * JBoss, Home of Professional Open Source. + * + * Copyright 2022 Red Hat, Inc., and individual contributors + * as indicated by the @author tags. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.resteasy.rxjava3; + +import java.lang.annotation.Documented; +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +@Target({ ElementType.PARAMETER, ElementType.METHOD, ElementType.FIELD }) +@Retention(RetentionPolicy.RUNTIME) +@Documented +public @interface Async { + +} diff --git a/rxjava3/src/test/java/dev/resteasy/rxjava3/RxInjector.java b/rxjava3/src/test/java/dev/resteasy/rxjava3/RxInjector.java new file mode 100644 index 0000000..ba4a27b --- /dev/null +++ b/rxjava3/src/test/java/dev/resteasy/rxjava3/RxInjector.java @@ -0,0 +1,57 @@ +/* + * JBoss, Home of Professional Open Source. + * + * Copyright 2022 Red Hat, Inc., and individual contributors + * as indicated by the @author tags. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.resteasy.rxjava3; + +import java.lang.annotation.Annotation; +import java.lang.reflect.Type; + +import javax.ws.rs.ext.Provider; + +import org.jboss.resteasy.spi.ContextInjector; + +import io.reactivex.rxjava3.core.Single; + +@Provider +public class RxInjector implements ContextInjector, Integer> { + + @Override + public Single resolve(Class> rawType, Type genericType, + Annotation[] annotations) { + boolean async = false; + for (Annotation annotation : annotations) { + if (annotation.annotationType() == Async.class) + async = true; + } + if (!async) + return Single.just(42); + return Single.create(emitter -> { + new Thread(() -> { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + emitter.onError(e); + return; + } + emitter.onSuccess(42); + }).start(); + }); + } + +} diff --git a/rxjava3/src/test/java/dev/resteasy/rxjava3/RxResource.java b/rxjava3/src/test/java/dev/resteasy/rxjava3/RxResource.java new file mode 100644 index 0000000..264d58e --- /dev/null +++ b/rxjava3/src/test/java/dev/resteasy/rxjava3/RxResource.java @@ -0,0 +1,130 @@ +/* + * JBoss, Home of Professional Open Source. + * + * Copyright 2022 Red Hat, Inc., and individual contributors + * as indicated by the @author tags. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.resteasy.rxjava3; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.UriInfo; + +import org.jboss.resteasy.annotations.Stream; + +import io.reactivex.rxjava3.core.BackpressureStrategy; +import io.reactivex.rxjava3.core.Flowable; +import io.reactivex.rxjava3.core.Observable; +import io.reactivex.rxjava3.core.Single; + +@Path("/") +public class RxResource { + @Path("single") + @GET + public Single single() { + return Single.just("got it"); + } + + @Produces(MediaType.APPLICATION_JSON) + @Path("observable") + @GET + @Stream + public Observable observable() { + return Observable.fromArray("one", "two"); + } + + @Produces(MediaType.APPLICATION_JSON) + @Path("flowable") + @GET + @Stream + public Flowable flowable() { + return Flowable.fromArray("one", "two"); + } + + @Path("context/single") + @GET + public Single contextSingle(@Context UriInfo uriInfo) { + return Single. create(foo -> { + ExecutorService executor = Executors.newSingleThreadExecutor(); + executor.submit(new Runnable() { + public void run() { + foo.onSuccess("got it"); + } + }); + }).map(str -> { + uriInfo.getAbsolutePath(); + return str; + }); + } + + @Produces(MediaType.APPLICATION_JSON) + @Path("context/observable") + @GET + @Stream + public Observable contextObservable(@Context UriInfo uriInfo) { + return Observable. create(foo -> { + ExecutorService executor = Executors.newSingleThreadExecutor(); + executor.submit(new Runnable() { + public void run() { + foo.onNext("one"); + foo.onNext("two"); + foo.onComplete(); + } + }); + }).map(str -> { + uriInfo.getAbsolutePath(); + return str; + }); + } + + @Produces(MediaType.APPLICATION_JSON) + @Path("context/flowable") + @GET + @Stream + public Flowable contextFlowable(@Context UriInfo uriInfo) { + return Flowable. create(foo -> { + ExecutorService executor = Executors.newSingleThreadExecutor(); + executor.submit(new Runnable() { + public void run() { + foo.onNext("one"); + foo.onNext("two"); + foo.onComplete(); + } + }); + }, BackpressureStrategy.BUFFER).map(str -> { + uriInfo.getAbsolutePath(); + return str; + }); + } + + @Path("injection") + @GET + public Single injection(@Context Integer value) { + return Single.just(value); + } + + @Path("injection-async") + @GET + public Single injectionAsync(@Async @Context Integer value) { + return Single.just(value); + } +} diff --git a/rxjava3/src/test/java/dev/resteasy/rxjava3/RxTest.java b/rxjava3/src/test/java/dev/resteasy/rxjava3/RxTest.java new file mode 100644 index 0000000..1e2d1ff --- /dev/null +++ b/rxjava3/src/test/java/dev/resteasy/rxjava3/RxTest.java @@ -0,0 +1,194 @@ +/* + * JBoss, Home of Professional Open Source. + * + * Copyright 2022 Red Hat, Inc., and individual contributors + * as indicated by the @author tags. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.resteasy.rxjava3; + +import static org.jboss.resteasy.test.TestPortProvider.generateURL; + +import java.util.Set; +import java.util.TreeSet; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import javax.ws.rs.client.Client; +import javax.ws.rs.client.ClientBuilder; +import javax.ws.rs.client.Invocation; +import javax.ws.rs.core.Response; + +import org.jboss.logging.Logger; +import org.jboss.resteasy.plugins.server.undertow.UndertowJaxrsServer; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import io.reactivex.rxjava3.core.Flowable; +import io.reactivex.rxjava3.core.Observable; +import io.reactivex.rxjava3.core.Single; +import io.reactivex.rxjava3.disposables.Disposable; + +public class RxTest { + private static UndertowJaxrsServer server; + + private static CountDownLatch latch; + private static final AtomicReference value = new AtomicReference<>(); + private static final Logger LOG = Logger.getLogger(RxTest.class); + + @BeforeAll + public static void beforeClass() { + server = new UndertowJaxrsServer(); + server.getDeployment().getActualResourceClasses().add(RxResource.class); + server.getDeployment().getActualProviderClasses().add(RxInjector.class); + server.getDeployment().start(); + server.getDeployment().registration(); + // Required to explicitly deploy + server.deploy(); + server.start(); + } + + @AfterAll + public static void afterClass() { + server.stop(); + server = null; + } + + private Client client; + private Disposable disposable; + + @BeforeEach + public void before() { + client = ClientBuilder.newBuilder() + .readTimeout(5, TimeUnit.SECONDS) + .connectTimeout(5, TimeUnit.SECONDS) + .build(); + value.set(null); + latch = new CountDownLatch(1); + } + + @AfterEach + public void after() { + client.close(); + if (disposable != null) { + disposable.dispose(); + } + } + + @Test + public void testSingle() throws Exception { + Single single = client.target(generateURL("/single")).request().rx(SingleRxInvoker.class).get(); + disposable = single.subscribe((Response r) -> { + value.set(r.readEntity(String.class)); + latch.countDown(); + }); + Assertions.assertTrue(latch.await(5, TimeUnit.SECONDS), "Did not complete request within 5 seconds"); + Assertions.assertEquals("got it", value.get()); + } + + @Test + public void testSingleContext() throws Exception { + Single single = client.target(generateURL("/context/single")).request().rx(SingleRxInvoker.class).get(); + disposable = single.subscribe((Response r) -> { + value.set(r.readEntity(String.class)); + latch.countDown(); + }); + Assertions.assertTrue(latch.await(5, TimeUnit.SECONDS), "Did not complete request within 5 seconds"); + Assertions.assertEquals("got it", value.get()); + } + + @Test + public void testObservable() throws Exception { + ObservableRxInvoker invoker = client.target(generateURL("/observable")).request().rx(ObservableRxInvoker.class); + @SuppressWarnings("unchecked") + Observable observable = (Observable) invoker.get(); + Set data = new TreeSet<>(); //FIXME [RESTEASY-2778] Intermittent flow / flux test failure + disposable = observable.subscribe( + data::add, + (Throwable t) -> LOG.error(t.getMessage(), t), + () -> latch.countDown()); + Assertions.assertTrue(latch.await(5, TimeUnit.SECONDS), "Did not complete request within 5 seconds"); + Assertions.assertArrayEquals(new String[] { "one", "two" }, data.toArray()); + } + + @Test + public void testObservableContext() throws Exception { + ObservableRxInvoker invoker = ClientBuilder.newClient().target(generateURL("/context/observable")).request() + .rx(ObservableRxInvoker.class); + @SuppressWarnings("unchecked") + Observable observable = (Observable) invoker.get(); + Set data = new TreeSet<>(); //FIXME [RESTEASY-2778] Intermittent flow / flux test failure + disposable = observable.subscribe( + data::add, + (Throwable t) -> LOG.error(t.getMessage(), t), + () -> latch.countDown()); + Assertions.assertTrue(latch.await(5, TimeUnit.SECONDS), "Did not complete request within 5 seconds"); + Assertions.assertArrayEquals(new String[] { "one", "two" }, data.toArray()); + } + + @Test + public void testFlowable() throws Exception { + FlowableRxInvoker invoker = client.target(generateURL("/flowable")).request().rx(FlowableRxInvoker.class); + @SuppressWarnings("unchecked") + Flowable flowable = (Flowable) invoker.get(); + Set data = new TreeSet<>(); //FIXME [RESTEASY-2778] Intermittent flow / flux test failure + disposable = flowable.subscribe( + data::add, + (Throwable t) -> LOG.error(t.getMessage(), t), + () -> latch.countDown()); + Assertions.assertTrue(latch.await(5, TimeUnit.SECONDS), "Did not complete request within 5 seconds"); + Assertions.assertArrayEquals(new String[] { "one", "two" }, data.toArray()); + } + + @Test + public void testFlowablecontext() throws Exception { + FlowableRxInvoker invoker = client.target(generateURL("/context/flowable")).request().rx(FlowableRxInvoker.class); + @SuppressWarnings("unchecked") + Flowable flowable = (Flowable) invoker.get(); + Set data = new TreeSet<>(); //FIXME [RESTEASY-2778] Intermittent flow / flux test failure + disposable = flowable.subscribe( + data::add, + (Throwable t) -> LOG.error(t.getMessage(), t), + () -> { + latch.countDown(); + LOG.info("onComplete()"); + }); + Assertions.assertTrue(latch.await(5, TimeUnit.SECONDS), "Did not complete request within 5 seconds"); + Assertions.assertArrayEquals(new String[] { "one", "two" }, data.toArray()); + } + + // @Test + public void testChunked() throws Exception { + Invocation.Builder request = client.target(generateURL("/chunked")).request(); + Response response = request.get(); + String entity = response.readEntity(String.class); + Assertions.assertEquals(200, response.getStatus()); + Assertions.assertEquals("onetwo", entity); + } + + @Test + public void testInjection() { + Integer data = client.target(generateURL("/injection")).request().get(Integer.class); + Assertions.assertEquals((Integer) 42, data); + + data = client.target(generateURL("/injection-async")).request().get(Integer.class); + Assertions.assertEquals((Integer) 42, data); + } +} diff --git a/rxjava3/src/test/java/dev/resteasy/rxjava3/SingleProviderTest.java b/rxjava3/src/test/java/dev/resteasy/rxjava3/SingleProviderTest.java new file mode 100644 index 0000000..3440633 --- /dev/null +++ b/rxjava3/src/test/java/dev/resteasy/rxjava3/SingleProviderTest.java @@ -0,0 +1,72 @@ +/* + * JBoss, Home of Professional Open Source. + * + * Copyright 2022 Red Hat, Inc., and individual contributors + * as indicated by the @author tags. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.resteasy.rxjava3; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import io.reactivex.rxjava3.core.Single; + +public class SingleProviderTest { + private final SingleProvider provider = new SingleProvider(); + + @Test + public void testFromCompletionStage() { + final CompletableFuture cs = new CompletableFuture<>(); + cs.complete(1); + final Single single = provider.fromCompletionStage(cs); + Assertions.assertEquals(1, single.blockingGet()); + } + + @Test + public void testFromCompletionStageNotDeferred() throws InterruptedException { + final CountDownLatch latch = new CountDownLatch(1); + final Single single = provider.fromCompletionStage(someAsyncMethod(latch)); + + Assertions.assertTrue(latch.await(1, TimeUnit.SECONDS)); + Assertions.assertEquals("Hello!", single.blockingGet()); + Assertions.assertEquals(0, latch.getCount()); + } + + @Test + public void testFromCompletionStageDeferred() throws InterruptedException { + final CountDownLatch latch = new CountDownLatch(1); + final Single single = provider.fromCompletionStage(() -> someAsyncMethod(latch)); + + Assertions.assertFalse(latch.await(1, TimeUnit.SECONDS)); + Assertions.assertEquals("Hello!", single.blockingGet()); + Assertions.assertEquals(0, latch.getCount()); + } + + private CompletableFuture someAsyncMethod(final CountDownLatch latch) { + latch.countDown(); + return CompletableFuture.completedFuture("Hello!"); + } + + @Test + public void testToCompletionStageCase() throws Exception { + final Object actual = provider.toCompletionStage(Single.just(1)).toCompletableFuture().get(); + Assertions.assertEquals(1, actual); + } +}