Skip to content

Commit

Permalink
Fix intermittent premature ClientRuntime finalization (#4508)
Browse files Browse the repository at this point in the history
* Fix intermittent premature ClientRuntime finalization

Signed-off-by: jansupol <[email protected]>
  • Loading branch information
jansupol authored Jun 18, 2020
1 parent d11a733 commit 52448e6
Show file tree
Hide file tree
Showing 6 changed files with 303 additions and 9 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2012, 2019 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2012, 2020 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2018 Payara Foundation and/or its affiliates.
*
* This program and the accompanying materials are made available under the
Expand Down Expand Up @@ -414,12 +414,17 @@ private ClientRuntime initRuntime() {

BootstrapBag bootstrapBag = new ClientBootstrapBag();
bootstrapBag.setManagedObjectsFinalizer(new ManagedObjectsFinalizer(injectionManager));
List<BootstrapConfigurator> bootstrapConfigurators = Arrays.asList(new RequestScope.RequestScopeConfigurator(),

final ClientMessageBodyFactory.MessageBodyWorkersConfigurator messageBodyWorkersConfigurator =
new ClientMessageBodyFactory.MessageBodyWorkersConfigurator();

List<BootstrapConfigurator> bootstrapConfigurators = Arrays.asList(
new RequestScope.RequestScopeConfigurator(),
new ParamConverterConfigurator(),
new ParameterUpdaterConfigurator(),
new RuntimeConfigConfigurator(runtimeCfgState),
new ContextResolverFactory.ContextResolversConfigurator(),
new MessageBodyFactory.MessageBodyWorkersConfigurator(),
messageBodyWorkersConfigurator,
new ExceptionMapperFactory.ExceptionMappersConfigurator(),
new JaxrsProviders.ProvidersConfigurator(),
new AutoDiscoverableConfigurator(RuntimeType.CLIENT));
Expand Down Expand Up @@ -455,6 +460,8 @@ private ClientRuntime initRuntime() {
final ClientRuntime crt = new ClientRuntime(configuration, connector, injectionManager, bootstrapBag);

client.registerShutdownHook(crt);
messageBodyWorkersConfigurator.setClientRuntime(crt);

return crt;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0, which is available at
* http://www.eclipse.org/legal/epl-2.0.
*
* This Source Code may also be made available under the following Secondary
* Licenses when the conditions for such availability set forth in the
* Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
* version 2 with the GNU Classpath Exception, which is available at
* https://www.gnu.org/software/classpath/license.html.
*
* SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
*/

package org.glassfish.jersey.client;

import org.glassfish.jersey.internal.BootstrapBag;
import org.glassfish.jersey.internal.BootstrapConfigurator;
import org.glassfish.jersey.internal.inject.Bindings;
import org.glassfish.jersey.internal.inject.InjectionManager;
import org.glassfish.jersey.internal.inject.InstanceBinding;
import org.glassfish.jersey.internal.util.collection.LazyValue;
import org.glassfish.jersey.internal.util.collection.Value;
import org.glassfish.jersey.internal.util.collection.Values;
import org.glassfish.jersey.message.MessageBodyWorkers;
import org.glassfish.jersey.message.internal.MessageBodyFactory;

import javax.ws.rs.core.Configuration;

class ClientMessageBodyFactory extends MessageBodyFactory {

/**
* Keep reference to {@link ClientRuntime} so that {@code finalize} on it is not called
* before the {@link MessageBodyFactory} is used.
* <p>
* Some entity types {@code @Inject} {@code MessageBodyFactory} for their {@code read} methods,
* but if the finalizer is invoked before that, the HK2 injection manager gets closed.
* </p>
*/
private final LazyValue<ClientRuntime> clientRuntime;

/**
* Create a new message body factory.
*
* @param configuration configuration. Optional - can be null.
* @param clientRuntimeValue - a reference to ClientRuntime.
*/
private ClientMessageBodyFactory(Configuration configuration, Value<ClientRuntime> clientRuntimeValue) {
super(configuration);
clientRuntime = Values.lazy(clientRuntimeValue);
}

/**
* Configurator which initializes and register {@link MessageBodyWorkers} instance into {@link InjectionManager} and
* {@link BootstrapBag}.
*/
static class MessageBodyWorkersConfigurator implements BootstrapConfigurator {

private ClientMessageBodyFactory messageBodyFactory;
private ClientRuntime clientRuntime;

@Override
public void init(InjectionManager injectionManager, BootstrapBag bootstrapBag) {
messageBodyFactory = new ClientMessageBodyFactory(bootstrapBag.getConfiguration(), () -> clientRuntime);
InstanceBinding<ClientMessageBodyFactory> binding =
Bindings.service(messageBodyFactory)
.to(MessageBodyWorkers.class);
injectionManager.register(binding);
}

@Override
public void postInit(InjectionManager injectionManager, BootstrapBag bootstrapBag) {
messageBodyFactory.initialize(injectionManager);
bootstrapBag.setMessageBodyWorkers(messageBodyFactory);
}

void setClientRuntime(ClientRuntime clientRuntime) {
this.clientRuntime = clientRuntime;
}
}

ClientRuntime getClientRuntime() {
return clientRuntime.get();
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2012, 2019 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2012, 2020 Oracle and/or its affiliates. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0, which is available at
Expand All @@ -22,6 +22,7 @@
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;

import javax.ws.rs.ProcessingException;
import javax.ws.rs.core.EntityTag;
Expand Down Expand Up @@ -51,6 +52,7 @@ class InboundJaxrsResponse extends Response {
private final ClientResponse context;
private final RequestScope scope;
private final RequestContext requestContext;
private final AtomicBoolean isClosed = new AtomicBoolean(false);

/**
* Create new scoped client response.
Expand Down Expand Up @@ -139,11 +141,13 @@ public boolean bufferEntity() throws ProcessingException {

@Override
public void close() throws ProcessingException {
try {
context.close();
} finally {
if (requestContext != null) {
requestContext.release();
if (isClosed.compareAndSet(false, true)) {
try {
context.close();
} finally {
if (requestContext != null) {
requestContext.release();
}
}
}
}
Expand Down
49 changes: 49 additions & 0 deletions tests/integration/jersey-4507/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved.
This program and the accompanying materials are made available under the
terms of the Eclipse Public License v. 2.0, which is available at
http://www.eclipse.org/legal/epl-2.0.
This Source Code may also be made available under the following Secondary
Licenses when the conditions for such availability set forth in the
Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
version 2 with the GNU Classpath Exception, which is available at
https://www.gnu.org/software/classpath/license.html.
SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>project</artifactId>
<groupId>org.glassfish.jersey.tests.integration</groupId>
<version>2.32.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>jersey-4507</artifactId>
<dependencies>
<dependency>
<groupId>org.glassfish.jersey.test-framework.providers</groupId>
<artifactId>jersey-test-framework-provider-bundle</artifactId>
<type>pom</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.glassfish.jersey.test-framework</groupId>
<artifactId>jersey-test-framework-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.glassfish.jersey.examples</groupId>
<artifactId>server-sent-events-jersey</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
/*
* Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0, which is available at
* http://www.eclipse.org/legal/epl-2.0.
*
* This Source Code may also be made available under the following Secondary
* Licenses when the conditions for such availability set forth in the
* Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
* version 2 with the GNU Classpath Exception, which is available at
* https://www.gnu.org/software/classpath/license.html.
*
* SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
*/

package org.glassfish.jersey.tests.integration.jersey4507;

import org.glassfish.jersey.client.ClientConfig;
import org.glassfish.jersey.client.ClientLifecycleListener;
import org.glassfish.jersey.client.ClientProperties;
import org.glassfish.jersey.examples.sse.jersey.App;
import org.glassfish.jersey.examples.sse.jersey.DomainResource;
import org.glassfish.jersey.examples.sse.jersey.ServerSentEventsResource;
import org.glassfish.jersey.media.sse.EventInput;
import org.glassfish.jersey.media.sse.InboundEvent;
import org.glassfish.jersey.media.sse.SseFeature;
import org.glassfish.jersey.server.ResourceConfig;
import org.glassfish.jersey.test.JerseyTest;
import org.junit.Assert;
import org.junit.Test;

import javax.ws.rs.client.Entity;
import javax.ws.rs.core.Application;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import static org.hamcrest.CoreMatchers.equalTo;

public class SSETest extends JerseyTest {
private static final int MAX_CLIENTS = 10;
private static final int COUNT = 30;
private static final AtomicInteger atomicInteger = new AtomicInteger(0);
private static final CountDownLatch closeLatch = new CountDownLatch(COUNT);

@Override
protected Application configure() {
// enable(TestProperties.LOG_TRAFFIC);
return new ResourceConfig(ServerSentEventsResource.class, DomainResource.class, SseFeature.class);
}

@Override
protected void configureClient(ClientConfig config) {
config.property(ClientProperties.ASYNC_THREADPOOL_SIZE, MAX_CLIENTS + 2);
config.register(new ClientRuntimeCloseVerifier());
}

/**
* Test consuming multiple SSE events sequentially using event input.
*
* @throws Exception in case of a failure during the test execution.
*/
public void testInboundEventReader() throws Exception {
final int MAX_MESSAGES = 5;
final CountDownLatch startLatch = new CountDownLatch(1);

final ExecutorService executor = Executors.newSingleThreadExecutor();
try {
final Future<List<String>> futureMessages =
executor.submit(new Callable<List<String>>() {

@Override
public List<String> call() throws Exception {
final EventInput eventInput = target(App.ROOT_PATH).register(SseFeature.class)
.request().get(EventInput.class);

startLatch.countDown();

final List<String> messages = new ArrayList<String>(MAX_MESSAGES);
try {
for (int i = 0; i < MAX_MESSAGES; i++) {
InboundEvent event = eventInput.read();
messages.add(event.readData());
}
} finally {
if (eventInput != null) {
eventInput.close();
}
}

return messages;
}
});

Assert.assertTrue("Waiting for receiver thread to start has timed out.",
startLatch.await(15000, TimeUnit.SECONDS));

for (int i = 0; i < MAX_MESSAGES; i++) {
target(App.ROOT_PATH).request().post(Entity.text("message " + i));
}

int i = 0;
for (String message : futureMessages.get(50, TimeUnit.SECONDS)) {
Assert.assertThat("Unexpected SSE event data value.", message, equalTo("message " + i++));
}
} finally {
executor.shutdownNow();
}
}

@Test
public void testInboundEventReaderMultiple() throws Exception {
for (int i = 0; i != COUNT; i++) {
testInboundEventReader();
}

System.gc();
closeLatch.await(15_000, TimeUnit.MILLISECONDS);
// One ClientConfig is on the Client
// + COUNT of them is created by .register(SseFeature.class)
Assert.assertEquals(COUNT + 1, atomicInteger.get());
Assert.assertEquals(0, closeLatch.getCount());
}



public static class ClientRuntimeCloseVerifier implements ClientLifecycleListener {

@Override
public void onInit() {
atomicInteger.incrementAndGet();
}

@Override
public void onClose() {
closeLatch.countDown();
}
}
}
1 change: 1 addition & 0 deletions tests/integration/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@
<module>jersey-4003</module>
<module>jersey-4099</module>
<module>jersey-4321</module>
<module>jersey-4507</module>
<module>jetty-response-close</module>
<module>microprofile</module>
<module>portability-jersey-1</module>
Expand Down

0 comments on commit 52448e6

Please sign in to comment.