Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix client holding the JVM hostage with netty/background threads #27

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,5 @@
import java.util.concurrent.CompletionStage;

public interface Example {
CompletionStage<Void> run(EdgeDBClient client);
CompletionStage<Void> run(EdgeDBClient client) throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,16 @@ public final class GlobalsAndConfig implements Example {
private static final Logger logger = LoggerFactory.getLogger(GlobalsAndConfig.class);

@Override
public CompletionStage<Void> run(EdgeDBClient client) {
var configuredClient = client
public CompletionStage<Void> run(EdgeDBClient client) throws Exception {
try(var configuredClient = client
.withConfig(config -> config
.withIdleTransactionTimeout(Duration.ZERO)
.applyAccessPolicies(true))
.withGlobals(new HashMap<>(){{
put("current_user_id", UUID.randomUUID());
}});

return configuredClient.queryRequiredSingle(UUID.class, "select global current_user_id")
.thenAccept(result -> logger.info("current_user_id global: {}", result));
}})) {
return configuredClient.queryRequiredSingle(UUID.class, "select global current_user_id")
.thenAccept(result -> logger.info("current_user_id global: {}", result));
}
}
}
20 changes: 10 additions & 10 deletions examples/java-examples/src/main/java/com/edgedb/examples/Main.java
Original file line number Diff line number Diff line change
@@ -1,30 +1,30 @@
package com.edgedb.examples;

import com.edgedb.driver.*;
import com.edgedb.driver.exceptions.EdgeDBException;
import com.edgedb.driver.EdgeDBClient;
import com.edgedb.driver.EdgeDBClientConfig;
import com.edgedb.driver.namingstrategies.NamingStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.function.Supplier;

public class Main {
private static final Logger logger = LoggerFactory.getLogger(Main.class);

public static void main(String[] args) throws IOException, EdgeDBException {
var client = new EdgeDBClient(EdgeDBClientConfig.builder()
public static void main(String[] args) throws Exception {
try (var client = new EdgeDBClient(EdgeDBClientConfig.builder()
.withNamingStrategy(NamingStrategy.snakeCase())
.useFieldSetters(true)
.build()
).withModule("examples");
).withModule("examples")) {
runJavaExamples(client);

runJavaExamples(client);

logger.info("Examples complete");
logger.info("Examples complete");
}

System.exit(0);
// run a GC cycle to ensure that any remaining dormant client instances get collected and closed.
System.gc();
}

private static void runJavaExamples(EdgeDBClient client) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@ class GlobalsAndConfig : Example {
"current_user_id" to UUID.randomUUID()
))

val currentUserId = configuredClient.queryRequiredSingle(
UUID::class.java,
"SELECT GLOBAL current_user_id"
).await()
configuredClient.use {
val currentUserId = configuredClient.queryRequiredSingle(
UUID::class.java,
"SELECT GLOBAL current_user_id"
).await()

logger.info("Current user ID: {}", currentUserId)
logger.info("Current user ID: {}", currentUserId)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,21 @@ object Main {
Transactions()
)

runBlocking {
for (example in examples) {
logger.info("Running Kotlin example {}...", example)
try {
example.runAsync(client)
logger.info("Kotlin example {} complete!", example)
} catch (x: Exception) {
logger.error("Failed to run Kotlin example {}", example, x)
client.use {
runBlocking {
for (example in examples) {
logger.info("Running Kotlin example {}...", example)
try {
example.runAsync(client)
logger.info("Kotlin example {} complete!", example)
} catch (x: Exception) {
logger.error("Failed to run Kotlin example {}", example, x)
}
}
}
}

exitProcess(0)
// run a GC cycle to ensure that any remaining dormant client instances get collected and closed.
System.gc();
}
}
2 changes: 1 addition & 1 deletion examples/scala-examples/build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ ThisBuild / version := "0.1.0-SNAPSHOT"
ThisBuild / scalaVersion := "3.1.3"

libraryDependencies ++= Seq(
"com.edgedb" % "driver" % "0.2.3" from "file:///" + System.getProperty("user.dir") + "/lib/com.edgedb.driver-0.2.3.jar",
"com.edgedb" % "driver" % "0.2.3" from "file:///" + System.getProperty("user.dir") + "/lib/com.edgedb.driver-0.4.3.jar",
"ch.qos.logback" % "logback-classic" % "1.4.7",
"ch.qos.logback" % "logback-core" % "1.4.7",
"com.fasterxml.jackson.core" % "jackson-databind" % "2.15.1",
Expand Down
23 changes: 13 additions & 10 deletions examples/scala-examples/src/main/scala/Main.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,12 @@ import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext, Future, blocking}
import scala.util.control.NonFatal
import ExecutionContext.Implicits.global
import scala.util.Using

@main
def main(): Unit = {
val logger = LoggerFactory.getLogger("Main")

val client = new EdgeDBClient(EdgeDBClientConfig.builder
.withNamingStrategy(NamingStrategy.snakeCase)
.useFieldSetters(true)
.build
).withModule("examples")

val examples = List(
AbstractTypes(),
BasicQueryFunctions(),
Expand All @@ -28,12 +23,20 @@ def main(): Unit = {
Transactions()
)

for (example <- examples)
Await.ready(runExample(logger, client, example), Duration.Inf)
Using(
new EdgeDBClient(EdgeDBClientConfig.builder
.withNamingStrategy(NamingStrategy.snakeCase)
.useFieldSetters(true)
.build
).withModule("examples")) { client =>
for (example <- examples)
Await.ready(runExample(logger, client, example), Duration.Inf)

logger.info("Examples complete!")
logger.info("Examples complete!")
}

System.exit(0)
// run a GC cycle to ensure that any remaining dormant client instances get collected and closed.
System.gc();
}

private def runExample(logger: Logger, client: EdgeDBClient, example: Example)(implicit context: ExecutionContext): Future[Unit] = {
Expand Down
120 changes: 103 additions & 17 deletions src/driver/src/main/java/com/edgedb/driver/EdgeDBClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@
import com.edgedb.driver.datatypes.Json;
import com.edgedb.driver.exceptions.ConfigurationException;
import com.edgedb.driver.exceptions.EdgeDBException;
import com.edgedb.driver.pooling.ClientPool;
import com.edgedb.driver.pooling.PoolContract;
import com.edgedb.driver.state.Config;
import com.edgedb.driver.state.Session;
import com.edgedb.driver.util.ClientPoolHolder;
import com.edgedb.driver.util.CleanerProvider;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
Expand All @@ -24,6 +26,7 @@
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;
Expand Down Expand Up @@ -58,11 +61,13 @@ public Duration age() {
private final @NotNull ConcurrentLinkedQueue<PooledClient> clients;
private final EdgeDBConnection connection;
private final EdgeDBClientConfig config;
private final ClientPoolHolder poolHolder;
private final ClientPool clientPool;
private final ClientFactory clientFactory;
private final Session session;
private final int clientAvailability;

private final AtomicBoolean isClosed;

/**
* Constructs a new {@linkplain EdgeDBClient}.
* @param connection The connection parameters used to connect this client to EdgeDB.
Expand All @@ -73,10 +78,20 @@ public EdgeDBClient(EdgeDBConnection connection, @NotNull EdgeDBClientConfig con
this.clients = new ConcurrentLinkedQueue<>();
this.config = config;
this.connection = connection;
this.poolHolder = new ClientPoolHolder(config.getPoolSize());
this.clientFactory = createClientFactory();
this.session = Session.DEFAULT;
this.clientAvailability = config.getClientAvailability();
this.isClosed = new AtomicBoolean(false);

this.clientPool = new ClientPool(config.getPoolSize());
this.clientPool.addShareholder();

CleanerProvider.getCleaner().register(this, new CleanerState(
this.clientPool,
this.clients,
this.clientCount,
this.isClosed
));
}

/**
Expand Down Expand Up @@ -111,10 +126,20 @@ private EdgeDBClient(@NotNull EdgeDBClient other, Session session) {
this.clients = new ConcurrentLinkedQueue<>();
this.config = other.config;
this.connection = other.connection;
this.poolHolder = other.poolHolder;
this.clientFactory = other.clientFactory;
this.session = session;
this.clientAvailability = other.clientAvailability;
this.isClosed = new AtomicBoolean(false);

this.clientPool = other.clientPool;
this.clientPool.addShareholder();

CleanerProvider.getCleaner().register(this, new CleanerState(
this.clientPool,
this.clients,
this.clientCount,
this.isClosed
));
}

public int getClientCount() {
Expand Down Expand Up @@ -327,15 +352,6 @@ public CompletionStage<List<Json>> queryJsonElements(@NotNull String query, @Nul
);
}

@Override
public void close() throws Exception {
int count = clientCount.get();
while(!clients.isEmpty() && count > 0) {
clients.poll().client.disconnect().toCompletableFuture().get();
count = clientCount.decrementAndGet();
}
}

private synchronized CompletionStage<BaseEdgeDBClient> getClient() {
logger.trace("polling cached clients...");
var cachedClient = clients.poll();
Expand Down Expand Up @@ -394,15 +410,15 @@ private synchronized void acceptClient(BaseEdgeDBClient client) {
private synchronized @NotNull CompletionStage<Void> onClientReady(@NotNull BaseEdgeDBClient client) {
var suggestedConcurrency = client.getSuggestedPoolConcurrency();

suggestedConcurrency.ifPresent(this.poolHolder::resize);
suggestedConcurrency.ifPresent(this.clientPool::resize);

return CompletableFuture.completedFuture(null);
}

private CompletionStage<BaseEdgeDBClient> createClient() {
return this.poolHolder.acquireContract()
return this.clientPool.acquireContract()
.thenApply(contract -> {
logger.trace("Contract acquired, remaining handles: {}", this.poolHolder.remaining());
logger.trace("Contract acquired, remaining handles: {}", this.clientPool.remaining());
BaseEdgeDBClient client;
try {
client = clientFactory.create(this.connection, this.config, contract);
Expand All @@ -417,9 +433,79 @@ private CompletionStage<BaseEdgeDBClient> createClient() {
.thenApply(client -> client.withSession(this.session));
}

@Override
public void close() throws Exception {
logger.debug("Cleaning from explicit close");
if(!isClosed.compareAndSet(false, true)) {
logger.debug("Cleaning already preformed");
return;
}

int count = clientCount.get();
while(!clients.isEmpty() && count > 0) {
clients.poll().client.disconnect().toCompletableFuture().get();
count = clientCount.decrementAndGet();
}

if(this.clientPool.removeShareholder()) {
this.clientPool.close();
}

logger.debug("Cleaning complete");
}

private static class CleanerState implements Runnable {
private final ClientPool pool;
private final ConcurrentLinkedQueue<PooledClient> clients;
private final AtomicInteger clientCount;
private final AtomicBoolean isClosed;

public CleanerState(
ClientPool pool,
ConcurrentLinkedQueue<PooledClient> clients,
AtomicInteger clientCount,
AtomicBoolean isClosed
) {
this.pool = pool;
this.clients = clients;
this.clientCount = clientCount;
this.isClosed = isClosed;
}

@Override
public void run() {
logger.debug("Running cleaning from garbage collection");
if(!isClosed.compareAndSet(false, true)) {
logger.debug("Cleaning already preformed");
return;
}

int count = clientCount.get();
while(!clients.isEmpty() && count > 0) {
try {
clients.poll().client.disconnect().toCompletableFuture().get();
} catch (Exception x) {
logger.warn("Failed to disconnect client", x);
}
count = clientCount.decrementAndGet();
}

if(this.pool.removeShareholder()) {
try {
this.pool.close();
} catch (Exception x) {
logger.warn("Failed to close client pool", x);
}
}

logger.debug("Cleaning complete");
}
}


@FunctionalInterface
private interface ClientFactory {
BaseEdgeDBClient create(EdgeDBConnection connection, EdgeDBClientConfig config, AutoCloseable poolHandle)
BaseEdgeDBClient create(EdgeDBConnection connection, EdgeDBClientConfig config, PoolContract poolContract)
throws EdgeDBException;
}
}
Loading