Skip to content

Commit

Permalink
Merge pull request #40754 from aloubyansky/incubating-model-resolutio…
Browse files Browse the repository at this point in the history
…n-task-runner

Incubating model resolver: wrap use of Phaser API in a simple task runner API
  • Loading branch information
gsmet authored May 22, 2024
2 parents 74a8ed2 + 994071c commit 2cb8599
Show file tree
Hide file tree
Showing 3 changed files with 138 additions and 115 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,8 @@
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.Phaser;
import java.util.function.BiConsumer;

import org.eclipse.aether.DefaultRepositorySystemSession;
Expand Down Expand Up @@ -105,8 +103,6 @@ public static IncubatingApplicationModelResolver newInstance() {
private final Map<ArtifactKey, ExtensionInfo> allExtensions = new ConcurrentHashMap<>();
private List<ConditionalDependency> conditionalDepsToProcess = new ArrayList<>();

private final Collection<Throwable> errors = new ConcurrentLinkedDeque<>();

private MavenArtifactResolver resolver;
private List<Dependency> managedDeps;
private ApplicationModelBuilder appBuilder;
Expand Down Expand Up @@ -204,10 +200,9 @@ private List<ConditionalDependency> activateConditionalDeps() {

private void processDeploymentDeps(DependencyNode root) {
var app = new AppDep(root);
var phaser = new Phaser(1);
app.scheduleChildVisits(phaser, AppDep::scheduleDeploymentVisit);
phaser.arriveAndAwaitAdvance();
assertNoErrors();
final ModelResolutionTaskRunner taskRunner = new ModelResolutionTaskRunner();
app.scheduleChildVisits(taskRunner, AppDep::scheduleDeploymentVisit);
taskRunner.waitForCompletion();
appBuilder.getApplicationArtifact().addDependencies(app.allDeps);
for (var d : app.children) {
d.addToModel();
Expand All @@ -218,88 +213,57 @@ private void processDeploymentDeps(DependencyNode root) {
}
}

private void assertNoErrors() {
if (!errors.isEmpty()) {
var sb = new StringBuilder(
"The following errors were encountered while processing Quarkus application dependencies:");
log.error(sb);
var i = 1;
for (var error : errors) {
var prefix = i++ + ")";
log.error(prefix, error);
sb.append(System.lineSeparator()).append(prefix).append(" ").append(error.getLocalizedMessage());
for (var e : error.getStackTrace()) {
sb.append(System.lineSeparator()).append(e);
if (e.getClassName().contains("io.quarkus")) {
break;
}
}
}
throw new RuntimeException(sb.toString());
}
}

private void injectDeployment(List<ConditionalDependency> activatedConditionalDeps) {
final ConcurrentLinkedDeque<Runnable> injectQueue = new ConcurrentLinkedDeque<>();
{
var phaser = new Phaser(1);
for (ExtensionDependency extDep : topExtensionDeps) {
phaser.register();
CompletableFuture.runAsync(() -> {
var resolvedDep = appBuilder.getDependency(getKey(extDep.info.deploymentArtifact));
try {
if (resolvedDep == null) {
extDep.collectDeploymentDeps();
injectQueue.add(() -> extDep.injectDeploymentNode(null));
} else {
// if resolvedDep isn't null, it means the deployment artifact is on the runtime classpath
// in which case we also clear the reloadable flag on it, in case it's coming from the workspace
resolvedDep.clearFlag(DependencyFlags.RELOADABLE);
}
} catch (BootstrapDependencyProcessingException e) {
errors.add(e);
} finally {
phaser.arriveAndDeregister();
}
});
}
// non-conditional deployment branches should be added before the activated conditional ones to have consistent
// dependency graph structures
phaser.arriveAndAwaitAdvance();
assertNoErrors();
}
collectDeploymentDeps(injectQueue);
if (!activatedConditionalDeps.isEmpty()) {
var phaser = new Phaser(1);
for (ConditionalDependency cd : activatedConditionalDeps) {
phaser.register();
CompletableFuture.runAsync(() -> {
var resolvedDep = appBuilder.getDependency(getKey(cd.conditionalDep.ext.info.deploymentArtifact));
try {
if (resolvedDep == null) {
var extDep = cd.getExtensionDependency();
extDep.collectDeploymentDeps();
injectQueue.add(() -> extDep.injectDeploymentNode(cd.conditionalDep.ext.getParentDeploymentNode()));
} else {
// if resolvedDep isn't null, it means the deployment artifact is on the runtime classpath
// in which case we also clear the reloadable flag on it, in case it's coming from the workspace
resolvedDep.clearFlag(DependencyFlags.RELOADABLE);
}
} catch (BootstrapDependencyProcessingException e) {
errors.add(e);
} finally {
phaser.arriveAndDeregister();
}
});
}
phaser.arriveAndAwaitAdvance();
assertNoErrors();
collectConditionalDeploymentDeps(activatedConditionalDeps, injectQueue);
}

for (var inject : injectQueue) {
inject.run();
}
}

private void collectConditionalDeploymentDeps(List<ConditionalDependency> activatedConditionalDeps,
ConcurrentLinkedDeque<Runnable> injectQueue) {
var taskRunner = new ModelResolutionTaskRunner();
for (ConditionalDependency cd : activatedConditionalDeps) {
taskRunner.run(() -> {
var resolvedDep = appBuilder.getDependency(getKey(cd.conditionalDep.ext.info.deploymentArtifact));
if (resolvedDep == null) {
var extDep = cd.getExtensionDependency();
extDep.collectDeploymentDeps();
injectQueue.add(() -> extDep.injectDeploymentNode(cd.conditionalDep.ext.getParentDeploymentNode()));
} else {
// if resolvedDep isn't null, it means the deployment artifact is on the runtime classpath
// in which case we also clear the reloadable flag on it, in case it's coming from the workspace
resolvedDep.clearFlag(DependencyFlags.RELOADABLE);
}
});
}
taskRunner.waitForCompletion();
}

private void collectDeploymentDeps(ConcurrentLinkedDeque<Runnable> injectQueue) {
var taskRunner = new ModelResolutionTaskRunner();
for (ExtensionDependency extDep : topExtensionDeps) {
taskRunner.run(() -> {
var resolvedDep = appBuilder.getDependency(getKey(extDep.info.deploymentArtifact));
if (resolvedDep == null) {
extDep.collectDeploymentDeps();
injectQueue.add(() -> extDep.injectDeploymentNode(null));
} else {
// if resolvedDep isn't null, it means the deployment artifact is on the runtime classpath
// in which case we also clear the reloadable flag on it, in case it's coming from the workspace
resolvedDep.clearFlag(DependencyFlags.RELOADABLE);
}
});
}
// non-conditional deployment branches should be added before the activated conditional ones to have consistent
// dependency graph structures
taskRunner.waitForCompletion();
}

/**
* Resolves and adds compile-only dependencies to the application model with the {@link DependencyFlags#COMPILE_ONLY} flag.
* Compile-only dependencies are resolved as direct dependencies of the root with all the previously resolved dependencies
Expand Down Expand Up @@ -458,10 +422,9 @@ private void processRuntimeDeps(DependencyNode root) {
appRoot.walkingFlags |= COLLECT_RELOADABLE_MODULES;
}

final Phaser phaser = new Phaser(1);
appRoot.scheduleChildVisits(phaser, AppDep::scheduleRuntimeVisit);
phaser.arriveAndAwaitAdvance();
assertNoErrors();
final ModelResolutionTaskRunner taskRunner = new ModelResolutionTaskRunner();
appRoot.scheduleChildVisits(taskRunner, AppDep::scheduleRuntimeVisit);
taskRunner.waitForCompletion();
appBuilder.getApplicationArtifact().addDependencies(appRoot.allDeps);
appRoot.setChildFlags();
}
Expand Down Expand Up @@ -500,18 +463,9 @@ void addToModel() {
}
}

void scheduleDeploymentVisit(Phaser phaser) {
phaser.register();
CompletableFuture.runAsync(() -> {
try {
visitDeploymentDependency();
} catch (Throwable e) {
errors.add(e);
} finally {
phaser.arriveAndDeregister();
}
});
scheduleChildVisits(phaser, AppDep::scheduleDeploymentVisit);
void scheduleDeploymentVisit(ModelResolutionTaskRunner taskRunner) {
taskRunner.run(this::visitDeploymentDependency);
scheduleChildVisits(taskRunner, AppDep::scheduleDeploymentVisit);
}

void visitDeploymentDependency() {
Expand All @@ -525,18 +479,9 @@ void visitDeploymentDependency() {
}
}

void scheduleRuntimeVisit(Phaser phaser) {
phaser.register();
CompletableFuture.runAsync(() -> {
try {
visitRuntimeDependency();
} catch (Throwable t) {
errors.add(t);
} finally {
phaser.arriveAndDeregister();
}
});
scheduleChildVisits(phaser, AppDep::scheduleRuntimeVisit);
void scheduleRuntimeVisit(ModelResolutionTaskRunner taskRunner) {
taskRunner.run(this::visitRuntimeDependency);
scheduleChildVisits(taskRunner, AppDep::scheduleRuntimeVisit);
}

void visitRuntimeDependency() {
Expand Down Expand Up @@ -578,7 +523,8 @@ void visitRuntimeDependency() {
}
}

void scheduleChildVisits(Phaser phaser, BiConsumer<AppDep, Phaser> childVisitor) {
void scheduleChildVisits(ModelResolutionTaskRunner taskRunner,
BiConsumer<AppDep, ModelResolutionTaskRunner> childVisitor) {
var childNodes = node.getChildren();
List<DependencyNode> filtered = null;
for (int i = 0; i < childNodes.size(); ++i) {
Expand All @@ -605,7 +551,7 @@ void scheduleChildVisits(Phaser phaser, BiConsumer<AppDep, Phaser> childVisitor)
node.setChildren(filtered);
}
for (var child : children) {
childVisitor.accept(child, phaser);
childVisitor.accept(child, taskRunner);
}
}

Expand Down Expand Up @@ -1081,10 +1027,9 @@ void activate() {
if (collectReloadableModules) {
conditionalDep.walkingFlags |= COLLECT_RELOADABLE_MODULES;
}
var phaser = new Phaser(1);
conditionalDep.scheduleRuntimeVisit(phaser);
phaser.arriveAndAwaitAdvance();
assertNoErrors();
var taskRunner = new ModelResolutionTaskRunner();
conditionalDep.scheduleRuntimeVisit(taskRunner);
taskRunner.waitForCompletion();
conditionalDep.setFlags(conditionalDep.walkingFlags);
if (conditionalDep.parent.resolvedDep == null) {
conditionalDep.parent.allDeps.add(conditionalDep.resolvedDep.getArtifactCoords());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package io.quarkus.bootstrap.resolver.maven;

/**
* Task related to resolution of an {@link io.quarkus.bootstrap.model.ApplicationModel}
*/
public interface ModelResolutionTask {
void run() throws Exception;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package io.quarkus.bootstrap.resolver.maven;

import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.Phaser;

import org.jboss.logging.Logger;

class ModelResolutionTaskRunner {

private static final Logger log = Logger.getLogger(ModelResolutionTaskRunner.class);

private final Phaser phaser = new Phaser(1);

/**
* Errors caught while running tasks
*/
private final Collection<Exception> errors = new ConcurrentLinkedDeque<>();

/**
* Runs a model resolution task asynchronously. This method may return before the task has completed.
*
* @param task task to run
*/
void run(ModelResolutionTask task) {
phaser.register();
CompletableFuture.runAsync(() -> {
try {
task.run();
} catch (Exception e) {
errors.add(e);
} finally {
phaser.arriveAndDeregister();
}
});
}

/**
* Blocks until all the tasks have completed.
* <p>
* In case some tasks failed with errors, this method will log each error and throw a {@link RuntimeException}
* with a corresponding message.
*/
void waitForCompletion() {
phaser.arriveAndAwaitAdvance();
assertNoErrors();
}

private void assertNoErrors() {
if (!errors.isEmpty()) {
var sb = new StringBuilder(
"The following errors were encountered while processing Quarkus application dependencies:");
log.error(sb);
var i = 1;
for (var error : errors) {
var prefix = i++ + ")";
log.error(prefix, error);
sb.append(System.lineSeparator()).append(prefix).append(" ").append(error.getLocalizedMessage());
for (var e : error.getStackTrace()) {
sb.append(System.lineSeparator()).append(e);
if (e.getClassName().contains("io.quarkus")) {
break;
}
}
}
throw new RuntimeException(sb.toString());
}
}
}

0 comments on commit 2cb8599

Please sign in to comment.