diff --git a/core-client/src/main/java/org/glassfish/jersey/client/ClientConfig.java b/core-client/src/main/java/org/glassfish/jersey/client/ClientConfig.java index da8052b67e..d444788aa4 100644 --- a/core-client/src/main/java/org/glassfish/jersey/client/ClientConfig.java +++ b/core-client/src/main/java/org/glassfish/jersey/client/ClientConfig.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2012, 2019 Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2012, 2020 Oracle and/or its affiliates. All rights reserved. * Copyright (c) 2018 Payara Foundation and/or its affiliates. * * This program and the accompanying materials are made available under the @@ -414,12 +414,17 @@ private ClientRuntime initRuntime() { BootstrapBag bootstrapBag = new ClientBootstrapBag(); bootstrapBag.setManagedObjectsFinalizer(new ManagedObjectsFinalizer(injectionManager)); - List bootstrapConfigurators = Arrays.asList(new RequestScope.RequestScopeConfigurator(), + + final ClientMessageBodyFactory.MessageBodyWorkersConfigurator messageBodyWorkersConfigurator = + new ClientMessageBodyFactory.MessageBodyWorkersConfigurator(); + + List bootstrapConfigurators = Arrays.asList( + new RequestScope.RequestScopeConfigurator(), new ParamConverterConfigurator(), new ParameterUpdaterConfigurator(), new RuntimeConfigConfigurator(runtimeCfgState), new ContextResolverFactory.ContextResolversConfigurator(), - new MessageBodyFactory.MessageBodyWorkersConfigurator(), + messageBodyWorkersConfigurator, new ExceptionMapperFactory.ExceptionMappersConfigurator(), new JaxrsProviders.ProvidersConfigurator(), new AutoDiscoverableConfigurator(RuntimeType.CLIENT)); @@ -455,6 +460,8 @@ private ClientRuntime initRuntime() { final ClientRuntime crt = new ClientRuntime(configuration, connector, injectionManager, bootstrapBag); client.registerShutdownHook(crt); + messageBodyWorkersConfigurator.setClientRuntime(crt); + return crt; } diff --git a/core-client/src/main/java/org/glassfish/jersey/client/ClientMessageBodyFactory.java b/core-client/src/main/java/org/glassfish/jersey/client/ClientMessageBodyFactory.java new file mode 100644 index 0000000000..e05cb5fdca --- /dev/null +++ b/core-client/src/main/java/org/glassfish/jersey/client/ClientMessageBodyFactory.java @@ -0,0 +1,87 @@ +/* + * Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0, which is available at + * http://www.eclipse.org/legal/epl-2.0. + * + * This Source Code may also be made available under the following Secondary + * Licenses when the conditions for such availability set forth in the + * Eclipse Public License v. 2.0 are satisfied: GNU General Public License, + * version 2 with the GNU Classpath Exception, which is available at + * https://www.gnu.org/software/classpath/license.html. + * + * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 + */ + +package org.glassfish.jersey.client; + +import org.glassfish.jersey.internal.BootstrapBag; +import org.glassfish.jersey.internal.BootstrapConfigurator; +import org.glassfish.jersey.internal.inject.Bindings; +import org.glassfish.jersey.internal.inject.InjectionManager; +import org.glassfish.jersey.internal.inject.InstanceBinding; +import org.glassfish.jersey.internal.util.collection.LazyValue; +import org.glassfish.jersey.internal.util.collection.Value; +import org.glassfish.jersey.internal.util.collection.Values; +import org.glassfish.jersey.message.MessageBodyWorkers; +import org.glassfish.jersey.message.internal.MessageBodyFactory; + +import javax.ws.rs.core.Configuration; + +class ClientMessageBodyFactory extends MessageBodyFactory { + + /** + * Keep reference to {@link ClientRuntime} so that {@code finalize} on it is not called + * before the {@link MessageBodyFactory} is used. + *

+ * Some entity types {@code @Inject} {@code MessageBodyFactory} for their {@code read} methods, + * but if the finalizer is invoked before that, the HK2 injection manager gets closed. + *

+ */ + private final LazyValue clientRuntime; + + /** + * Create a new message body factory. + * + * @param configuration configuration. Optional - can be null. + * @param clientRuntimeValue - a reference to ClientRuntime. + */ + private ClientMessageBodyFactory(Configuration configuration, Value clientRuntimeValue) { + super(configuration); + clientRuntime = Values.lazy(clientRuntimeValue); + } + + /** + * Configurator which initializes and register {@link MessageBodyWorkers} instance into {@link InjectionManager} and + * {@link BootstrapBag}. + */ + static class MessageBodyWorkersConfigurator implements BootstrapConfigurator { + + private ClientMessageBodyFactory messageBodyFactory; + private ClientRuntime clientRuntime; + + @Override + public void init(InjectionManager injectionManager, BootstrapBag bootstrapBag) { + messageBodyFactory = new ClientMessageBodyFactory(bootstrapBag.getConfiguration(), () -> clientRuntime); + InstanceBinding binding = + Bindings.service(messageBodyFactory) + .to(MessageBodyWorkers.class); + injectionManager.register(binding); + } + + @Override + public void postInit(InjectionManager injectionManager, BootstrapBag bootstrapBag) { + messageBodyFactory.initialize(injectionManager); + bootstrapBag.setMessageBodyWorkers(messageBodyFactory); + } + + void setClientRuntime(ClientRuntime clientRuntime) { + this.clientRuntime = clientRuntime; + } + } + + ClientRuntime getClientRuntime() { + return clientRuntime.get(); + } +} diff --git a/core-client/src/main/java/org/glassfish/jersey/client/InboundJaxrsResponse.java b/core-client/src/main/java/org/glassfish/jersey/client/InboundJaxrsResponse.java index 4d6fe52511..fd5ebfa8f5 100644 --- a/core-client/src/main/java/org/glassfish/jersey/client/InboundJaxrsResponse.java +++ b/core-client/src/main/java/org/glassfish/jersey/client/InboundJaxrsResponse.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2012, 2019 Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2012, 2020 Oracle and/or its affiliates. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v. 2.0, which is available at @@ -22,6 +22,7 @@ import java.util.Locale; import java.util.Map; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; import javax.ws.rs.ProcessingException; import javax.ws.rs.core.EntityTag; @@ -51,6 +52,7 @@ class InboundJaxrsResponse extends Response { private final ClientResponse context; private final RequestScope scope; private final RequestContext requestContext; + private final AtomicBoolean isClosed = new AtomicBoolean(false); /** * Create new scoped client response. @@ -139,11 +141,13 @@ public boolean bufferEntity() throws ProcessingException { @Override public void close() throws ProcessingException { - try { - context.close(); - } finally { - if (requestContext != null) { - requestContext.release(); + if (isClosed.compareAndSet(false, true)) { + try { + context.close(); + } finally { + if (requestContext != null) { + requestContext.release(); + } } } } diff --git a/tests/integration/jersey-4507/pom.xml b/tests/integration/jersey-4507/pom.xml new file mode 100644 index 0000000000..4d5352c3f7 --- /dev/null +++ b/tests/integration/jersey-4507/pom.xml @@ -0,0 +1,49 @@ + + + + + project + org.glassfish.jersey.tests.integration + 2.32.0-SNAPSHOT + + 4.0.0 + + jersey-4507 + + + org.glassfish.jersey.test-framework.providers + jersey-test-framework-provider-bundle + pom + test + + + org.glassfish.jersey.test-framework + jersey-test-framework-core + test + + + org.glassfish.jersey.examples + server-sent-events-jersey + ${project.version} + test + + + \ No newline at end of file diff --git a/tests/integration/jersey-4507/src/test/java/org/glassfish/jersey/tests/integration/jersey4507/SSETest.java b/tests/integration/jersey-4507/src/test/java/org/glassfish/jersey/tests/integration/jersey4507/SSETest.java new file mode 100644 index 0000000000..b02eda5392 --- /dev/null +++ b/tests/integration/jersey-4507/src/test/java/org/glassfish/jersey/tests/integration/jersey4507/SSETest.java @@ -0,0 +1,146 @@ +/* + * Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0, which is available at + * http://www.eclipse.org/legal/epl-2.0. + * + * This Source Code may also be made available under the following Secondary + * Licenses when the conditions for such availability set forth in the + * Eclipse Public License v. 2.0 are satisfied: GNU General Public License, + * version 2 with the GNU Classpath Exception, which is available at + * https://www.gnu.org/software/classpath/license.html. + * + * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 + */ + +package org.glassfish.jersey.tests.integration.jersey4507; + +import org.glassfish.jersey.client.ClientConfig; +import org.glassfish.jersey.client.ClientLifecycleListener; +import org.glassfish.jersey.client.ClientProperties; +import org.glassfish.jersey.examples.sse.jersey.App; +import org.glassfish.jersey.examples.sse.jersey.DomainResource; +import org.glassfish.jersey.examples.sse.jersey.ServerSentEventsResource; +import org.glassfish.jersey.media.sse.EventInput; +import org.glassfish.jersey.media.sse.InboundEvent; +import org.glassfish.jersey.media.sse.SseFeature; +import org.glassfish.jersey.server.ResourceConfig; +import org.glassfish.jersey.test.JerseyTest; +import org.junit.Assert; +import org.junit.Test; + +import javax.ws.rs.client.Entity; +import javax.ws.rs.core.Application; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.hamcrest.CoreMatchers.equalTo; + +public class SSETest extends JerseyTest { + private static final int MAX_CLIENTS = 10; + private static final int COUNT = 30; + private static final AtomicInteger atomicInteger = new AtomicInteger(0); + private static final CountDownLatch closeLatch = new CountDownLatch(COUNT); + + @Override + protected Application configure() { + // enable(TestProperties.LOG_TRAFFIC); + return new ResourceConfig(ServerSentEventsResource.class, DomainResource.class, SseFeature.class); + } + + @Override + protected void configureClient(ClientConfig config) { + config.property(ClientProperties.ASYNC_THREADPOOL_SIZE, MAX_CLIENTS + 2); + config.register(new ClientRuntimeCloseVerifier()); + } + + /** + * Test consuming multiple SSE events sequentially using event input. + * + * @throws Exception in case of a failure during the test execution. + */ + public void testInboundEventReader() throws Exception { + final int MAX_MESSAGES = 5; + final CountDownLatch startLatch = new CountDownLatch(1); + + final ExecutorService executor = Executors.newSingleThreadExecutor(); + try { + final Future> futureMessages = + executor.submit(new Callable>() { + + @Override + public List call() throws Exception { + final EventInput eventInput = target(App.ROOT_PATH).register(SseFeature.class) + .request().get(EventInput.class); + + startLatch.countDown(); + + final List messages = new ArrayList(MAX_MESSAGES); + try { + for (int i = 0; i < MAX_MESSAGES; i++) { + InboundEvent event = eventInput.read(); + messages.add(event.readData()); + } + } finally { + if (eventInput != null) { + eventInput.close(); + } + } + + return messages; + } + }); + + Assert.assertTrue("Waiting for receiver thread to start has timed out.", + startLatch.await(15000, TimeUnit.SECONDS)); + + for (int i = 0; i < MAX_MESSAGES; i++) { + target(App.ROOT_PATH).request().post(Entity.text("message " + i)); + } + + int i = 0; + for (String message : futureMessages.get(50, TimeUnit.SECONDS)) { + Assert.assertThat("Unexpected SSE event data value.", message, equalTo("message " + i++)); + } + } finally { + executor.shutdownNow(); + } + } + + @Test + public void testInboundEventReaderMultiple() throws Exception { + for (int i = 0; i != COUNT; i++) { + testInboundEventReader(); + } + + System.gc(); + closeLatch.await(15_000, TimeUnit.MILLISECONDS); + // One ClientConfig is on the Client + // + COUNT of them is created by .register(SseFeature.class) + Assert.assertEquals(COUNT + 1, atomicInteger.get()); + Assert.assertEquals(0, closeLatch.getCount()); + } + + + + public static class ClientRuntimeCloseVerifier implements ClientLifecycleListener { + + @Override + public void onInit() { + atomicInteger.incrementAndGet(); + } + + @Override + public void onClose() { + closeLatch.countDown(); + } + } +} diff --git a/tests/integration/pom.xml b/tests/integration/pom.xml index 85cb602494..5ac0218456 100644 --- a/tests/integration/pom.xml +++ b/tests/integration/pom.xml @@ -85,6 +85,7 @@ jersey-4003 jersey-4099 jersey-4321 + jersey-4507 jetty-response-close microprofile portability-jersey-1