Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace read/write lock in JarResource to avoid virtual threads pinning #42139

Merged
merged 1 commit into from
Jul 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
package io.quarkus.bootstrap.runner;

import static io.quarkus.bootstrap.runner.VirtualThreadSupport.isVirtualThread;

import java.io.IOException;
import java.nio.file.Path;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.jar.JarFile;

import io.smallrye.common.io.jar.JarFiles;

public class JarFileReference {
// Guarded by an atomic reader counter that emulate the behaviour of a read/write lock.
// To enable virtual threads compatibility and avoid pinning it is not possible to use an explicit read/write lock
// because the jarFile access may happen inside a native call (for example triggered by the RunnerClassLoader)
// and then it is necessary to avoid blocking on it.
private final JarFile jarFile;

// The referenceCounter - 1 represents the number of effective readers (#aqcuire - #release), while the first
// reference is used to determine if a close has been required.
// The JarFileReference is created as already acquired and that's why the referenceCounter starts from 2
private final AtomicInteger referenceCounter = new AtomicInteger(2);

private JarFileReference(JarFile jarFile) {
this.jarFile = jarFile;
}

/**
* Increase the readers counter of the jarFile.
*
* @return true if the acquiring succeeded: it's now safe to access and use the inner jarFile.
* false if the jar reference is going to be closed and then no longer usable.
*/
private boolean acquire() {
while (true) {
int count = referenceCounter.get();
if (count == 0) {
return false;
}
if (referenceCounter.compareAndSet(count, count + 1)) {
return true;
}
}
}

/**
* Decrease the readers counter of the jarFile.
* If the counter drops to 0 and a release has been requested also closes the jarFile.
*
* @return true if the release also closes the underlying jarFile.
*/
private boolean release(JarResource jarResource) {
while (true) {
int count = referenceCounter.get();
if (count <= 0) {
throw new IllegalStateException(
"The reference counter cannot be negative, found: " + (referenceCounter.get() - 1));
}
if (referenceCounter.compareAndSet(count, count - 1)) {
if (count == 1) {
jarResource.jarFileReference.set(null);
try {
jarFile.close();
} catch (IOException e) {
// ignore
}
return true;
}
return false;
}
}
}

/**
* Ask to close this reference.
* If there are no readers currently accessing the jarFile also close it, otherwise defer the closing when the last reader
* will leave.
*/
void close(JarResource jarResource) {
release(jarResource);
}

@FunctionalInterface
interface JarFileConsumer<T> {
T apply(JarFile jarFile, Path jarPath, String resource);
}

static <T> T withJarFile(JarResource jarResource, String resource, JarFileConsumer<T> fileConsumer) {

// Happy path: the jar reference already exists and it's ready to be used
final var localJarFileRefFuture = jarResource.jarFileReference.get();
if (localJarFileRefFuture != null && localJarFileRefFuture.isDone()) {
JarFileReference jarFileReference = localJarFileRefFuture.join();
if (jarFileReference.acquire()) {
return consumeSharedJarFile(jarFileReference, jarResource, resource, fileConsumer);
}
}

// There's no valid jar reference, so load a new one

// Platform threads can load the jarfile asynchronously and eventually blocking till not ready
// to avoid loading the same jarfile multiple times in parallel
if (!isVirtualThread()) {
// It's ok to eventually block on a join() here since we're sure this is used only by platform thread
return consumeSharedJarFile(asyncLoadAcquiredJarFile(jarResource), jarResource, resource, fileConsumer);
}

// Virtual threads needs to load the jarfile synchronously to avoid blocking. This means that eventually
// multiple threads could load the same jarfile in parallel and this duplication has to be reconciled
final var newJarFileRef = syncLoadAcquiredJarFile(jarResource);
if (jarResource.jarFileReference.compareAndSet(localJarFileRefFuture, newJarFileRef) ||
jarResource.jarFileReference.compareAndSet(null, newJarFileRef)) {
// The new file reference has been successfully published and can be used by the current and other threads
// The join() cannot be blocking here since the CompletableFuture has been created already completed
return consumeSharedJarFile(newJarFileRef.join(), jarResource, resource, fileConsumer);
}

// The newly created file reference hasn't been published, so it can be used exclusively by the current virtual thread
return consumeUnsharedJarFile(newJarFileRef, jarResource, resource, fileConsumer);
}

private static <T> T consumeSharedJarFile(JarFileReference jarFileReference,
JarResource jarResource, String resource, JarFileConsumer<T> fileConsumer) {
try {
return fileConsumer.apply(jarFileReference.jarFile, jarResource.jarPath, resource);
} finally {
jarFileReference.release(jarResource);
}
}

private static <T> T consumeUnsharedJarFile(CompletableFuture<JarFileReference> jarFileReferenceFuture,
JarResource jarResource, String resource, JarFileConsumer<T> fileConsumer) {
JarFileReference jarFileReference = jarFileReferenceFuture.join();
try {
return fileConsumer.apply(jarFileReference.jarFile, jarResource.jarPath, resource);
} finally {
boolean closed = jarFileReference.release(jarResource);
assert !closed;
// Check one last time if the file reference can be published and reused by other threads, otherwise close it
if (!jarResource.jarFileReference.compareAndSet(null, jarFileReferenceFuture)) {
closed = jarFileReference.release(jarResource);
assert closed;
}
}
}

private static CompletableFuture<JarFileReference> syncLoadAcquiredJarFile(JarResource jarResource) {
try {
return CompletableFuture.completedFuture(new JarFileReference(JarFiles.create(jarResource.jarPath.toFile())));
} catch (IOException e) {
throw new RuntimeException("Failed to open " + jarResource.jarPath, e);
}
}

private static JarFileReference asyncLoadAcquiredJarFile(JarResource jarResource) {
CompletableFuture<JarFileReference> newJarRefFuture = new CompletableFuture<>();
CompletableFuture<JarFileReference> existingJarRefFuture = null;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The introduction of this variable is the actual fix. The old algorithm wrongly reused the newJarFileRef one. This means that in the unfortunate case when a thread was about to close the old jar, but before setting the AtomicReference to null, another thread could fail the compareAndSet(null) check here, take at the end of the loop from the AtomicReference the old and no longer valid CompletableFuture and in subsequent loop (after that the close finally managed to set the AtomicReference to null) mistakenly set it back into the same AtomicReference.

JarFileReference existingJarRef = null;

do {
if (jarResource.jarFileReference.compareAndSet(null, newJarRefFuture)) {
try {
JarFileReference newJarRef = new JarFileReference(JarFiles.create(jarResource.jarPath.toFile()));
newJarRefFuture.complete(newJarRef);
return newJarRef;
} catch (IOException e) {
throw new RuntimeException(e);
}
}
existingJarRefFuture = jarResource.jarFileReference.get();
existingJarRef = existingJarRefFuture == null ? null : existingJarRefFuture.join();
} while (existingJarRef == null || !existingJarRef.acquire());
return existingJarRef;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,44 +11,28 @@
import java.security.ProtectionDomain;
import java.security.cert.Certificate;
import java.util.Objects;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;
import java.util.zip.ZipEntry;
import java.util.zip.ZipFile;

import io.smallrye.common.io.jar.JarEntries;
import io.smallrye.common.io.jar.JarFiles;

/**
* A jar resource
*/
public class JarResource implements ClassLoadingResource {

private final ManifestInfo manifestInfo;
private final Path jarPath;

private final Lock readLock;
private final Lock writeLock;

private volatile ProtectionDomain protectionDomain;
private final ManifestInfo manifestInfo;

//Guarded by the read/write lock; open/close operations on the JarFile require the exclusive lock,
//while using an existing open reference can use the shared lock.
//If a lock is acquired, and as long as it's owned, we ensure that the zipFile reference
//points to an open JarFile instance, and read operations are valid.
//To close the jar, the exclusive lock must be owned, and reference will be set to null before releasing it.
//Likewise, opening a JarFile requires the exclusive lock.
private volatile JarFile zipFile;
final Path jarPath;
final AtomicReference<CompletableFuture<JarFileReference>> jarFileReference = new AtomicReference<>();

public JarResource(ManifestInfo manifestInfo, Path jarPath) {
this.manifestInfo = manifestInfo;
this.jarPath = jarPath;
final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
this.readLock = readWriteLock.readLock();
this.writeLock = readWriteLock.writeLock();
}

@Override
Expand All @@ -69,38 +53,48 @@ public void init() {

@Override
public byte[] getResourceData(String resource) {
final ZipFile zipFile = readLockAcquireAndGetJarReference();
try {
ZipEntry entry = zipFile.getEntry(resource);
return JarFileReference.withJarFile(this, resource, JarResourceDataProvider.INSTANCE);
}

private static class JarResourceDataProvider implements JarFileReference.JarFileConsumer<byte[]> {
private static final JarResourceDataProvider INSTANCE = new JarResourceDataProvider();

@Override
public byte[] apply(JarFile jarFile, Path path, String res) {
ZipEntry entry = jarFile.getEntry(res);
if (entry == null) {
return null;
}
try (InputStream is = zipFile.getInputStream(entry)) {
try (InputStream is = jarFile.getInputStream(entry)) {
byte[] data = new byte[(int) entry.getSize()];
int pos = 0;
int rem = data.length;
while (rem > 0) {
int read = is.read(data, pos, rem);
if (read == -1) {
throw new RuntimeException("Failed to read all data for " + resource);
throw new RuntimeException("Failed to read all data for " + res);
}
pos += read;
rem -= read;
}
return data;
} catch (IOException e) {
throw new RuntimeException("Failed to read zip entry " + resource, e);
throw new RuntimeException("Failed to read zip entry " + res, e);
}
} finally {
readLock.unlock();
}
}

@Override
public URL getResourceURL(String resource) {
final JarFile jarFile = readLockAcquireAndGetJarReference();
try {
JarEntry entry = jarFile.getJarEntry(resource);
return JarFileReference.withJarFile(this, resource, JarResourceURLProvider.INSTANCE);
}

private static class JarResourceURLProvider implements JarFileReference.JarFileConsumer<URL> {
private static final JarResourceURLProvider INSTANCE = new JarResourceURLProvider();

@Override
public URL apply(JarFile jarFile, Path path, String res) {
JarEntry entry = jarFile.getJarEntry(res);
if (entry == null) {
return null;
}
Expand All @@ -110,15 +104,7 @@ public URL getResourceURL(String resource) {
if (realName.endsWith("/")) {
realName = realName.substring(0, realName.length() - 1);
}
final URI jarUri = jarPath.toUri();
// first create a URI which includes both the jar file path and the relative resource name
// and then invoke a toURL on it. The URI reconstruction allows for any encoding to be done
// for the "path" which includes the "realName"
var ssp = new StringBuilder(jarUri.getPath().length() + realName.length() + 2);
ssp.append(jarUri.getPath());
ssp.append("!/");
ssp.append(realName);
final URL resUrl = new URI(jarUri.getScheme(), ssp.toString(), null).toURL();
final URL resUrl = getUrl(path, realName);
// wrap it up into a "jar" protocol URL
//horrible hack to deal with '?' characters in the URL
//seems to be the only way, the URI constructor just does not let you handle them in a sane way
Expand All @@ -136,8 +122,18 @@ public URL getResourceURL(String resource) {
} catch (MalformedURLException | URISyntaxException e) {
throw new RuntimeException(e);
}
} finally {
readLock.unlock();
}

private static URL getUrl(Path jarPath, String realName) throws MalformedURLException, URISyntaxException {
final URI jarUri = jarPath.toUri();
// first create a URI which includes both the jar file path and the relative resource name
// and then invoke a toURL on it. The URI reconstruction allows for any encoding to be done
// for the "path" which includes the "realName"
var ssp = new StringBuilder(jarUri.getPath().length() + realName.length() + 2);
ssp.append(jarUri.getPath());
ssp.append("!/");
ssp.append(realName);
return new URI(jarUri.getScheme(), ssp.toString(), null).toURL();
}
}

Expand All @@ -151,60 +147,16 @@ public ProtectionDomain getProtectionDomain() {
return protectionDomain;
}

private JarFile readLockAcquireAndGetJarReference() {
while (true) {
readLock.lock();
final JarFile zipFileLocal = this.zipFile;
if (zipFileLocal != null) {
//Expected fast path: returns a reference to the open JarFile while owning the readLock
return zipFileLocal;
} else {
//This Lock implementation doesn't allow upgrading a readLock to a writeLock, so release it
//as we're going to need the WriteLock.
readLock.unlock();
//trigger the JarFile being (re)opened.
ensureJarFileIsOpen();
//Now since we no longer own any lock, we need to try again to obtain the readLock
//and check for the reference still being valid.
//This exposes us to a race with closing the just-opened JarFile;
//however this should be extremely rare, so we can trust we won't loop much;
//A counter doesn't seem necessary, as in fact we know that methods close()
//and resetInternalCaches() are invoked each at most once, which limits the amount
//of loops here in practice.
}
}
}

private void ensureJarFileIsOpen() {
writeLock.lock();
try {
if (this.zipFile == null) {
try {
this.zipFile = JarFiles.create(jarPath.toFile());
} catch (IOException e) {
throw new RuntimeException("Failed to open " + jarPath, e);
}
}
} finally {
writeLock.unlock();
}
}

@Override
public void close() {
writeLock.lock();
try {
final JarFile zipFileLocal = this.zipFile;
if (zipFileLocal != null) {
try {
this.zipFile = null;
zipFileLocal.close();
} catch (Throwable e) {
//ignore
}
var futureRef = jarFileReference.get();
if (futureRef != null) {
// The jarfile has been already used and it's going to be removed from the cache,
// so the future must be already completed
var ref = futureRef.getNow(null);
if (ref != null) {
ref.close(this);
}
} finally {
writeLock.unlock();
}
}

Expand Down
Loading
Loading