Skip to content

Commit

Permalink
feature: start worker process when starting WebApp in embedded mode
Browse files Browse the repository at this point in the history
  • Loading branch information
ClemDoum committed Nov 20, 2024
1 parent d2e6216 commit a5736d0
Show file tree
Hide file tree
Showing 6 changed files with 183 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,15 @@ public Process executeExtension(boolean inheritIO, String... args) throws IOExce
return builder.start();
}

/**
 * Derives the regex used to match the PID files of this extension.
 *
 * <p>The extension pattern is reused as a prefix: a single trailing {@code $} is dropped when
 * present, then {@code .*\.pid} is appended.
 *
 * @return the PID file name regex built from the extension pattern
 */
String getPidFilePattern() {
    boolean endsWithAnchor = extensionPattern.endsWith("$");
    String prefix = endsWithAnchor
        ? extensionPattern.substring(0, extensionPattern.length() - 1)
        : extensionPattern;
    return prefix + ".*\\.pid";
}

private Path locateExtension() {
Set<File> extensionBins = extensionService.listInstalled(extensionPattern);
return switch (extensionBins.size()) {
Expand Down
101 changes: 87 additions & 14 deletions datashare-app/src/main/java/org/icij/datashare/WebApp.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,24 @@
package org.icij.datashare;

import static java.lang.Boolean.parseBoolean;
import static java.lang.Integer.parseInt;
import static org.icij.datashare.cli.DatashareCliOptions.BROWSER_OPEN_LINK_OPT;
import static org.icij.datashare.cli.DatashareCliOptions.NLP_PARALLELISM_OPT;
import static org.icij.datashare.utils.ProcessHandler.dumpPid;
import static org.icij.datashare.utils.ProcessHandler.findPidPaths;
import static org.icij.datashare.utils.ProcessHandler.isProcessRunning;
import static org.icij.datashare.utils.ProcessHandler.killProcessById;

import java.awt.Desktop;
import java.io.File;
import java.io.IOException;
import java.net.Socket;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import net.codestory.http.WebServer;
import org.icij.datashare.asynctasks.TaskManager;
import org.icij.datashare.asynctasks.bus.amqp.QpidAmqpServer;
Expand All @@ -8,31 +27,31 @@
import org.icij.datashare.cli.DatashareCli;
import org.icij.datashare.cli.Mode;
import org.icij.datashare.cli.QueueType;
import org.icij.datashare.json.JsonObjectMapper;
import org.icij.datashare.mode.CommonMode;
import org.icij.datashare.mode.EmbeddedMode;
import org.icij.datashare.tasks.BatchSearchRunner;

import java.awt.*;
import java.io.IOException;
import java.net.Socket;
import java.net.URI;
import java.util.Properties;

import static java.lang.Boolean.parseBoolean;
import static java.lang.Integer.parseInt;
import static org.icij.datashare.cli.DatashareCliOptions.BROWSER_OPEN_LINK_OPT;

public class WebApp {
private static final int AMQP_PORT = 5672;

/**
 * Command line entry point: parses the arguments with {@link DatashareCli} and starts the web
 * application with the resulting properties.
 *
 * @param args the raw command line arguments
 * @throws Exception whatever argument parsing or {@code start} throws
 */
public static void main(String[] args) throws Exception {
    Properties cliProperties = new DatashareCli().parseArguments(args).properties;
    start(cliProperties);
}

static void start(Properties properties) throws Exception {
if (shouldStartQpid(properties)) {
if (isEmbeddedAMQP(properties)) {
// before creating mode because AmqpInterlocutor will try to connect the broker
new QpidAmqpServer(5672).start();
new QpidAmqpServer(AMQP_PORT).start();
}
CommonMode mode = CommonMode.create(properties);
Process nlpWorkerProcess = null;
Path nlpWorkersPidPath = null;
if (isEmbeddedAMQP(properties)) {
nlpWorkerProcess = startNlpWorkers((EmbeddedMode) mode, true);
nlpWorkersPidPath = Files.createTempFile("datashare-spacy-worker-", ".pid");
dumpPid(nlpWorkersPidPath.toFile(), nlpWorkerProcess.pid());
}

Thread webServerThread = new Thread(() ->
new WebServer()
Expand All @@ -49,10 +68,17 @@ static void start(Properties properties) throws Exception {
Desktop.getDesktop().browse(URI.create(new URI("http://localhost:")+mode.properties().getProperty(PropertiesProvider.TCP_LISTEN_PORT)));
}
requeueDatabaseBatchSearches(mode.get(BatchSearchRepository.class), mode.get(TaskManager.class));
webServerThread.join();
try {
webServerThread.join();
} finally {
if (nlpWorkerProcess != null && nlpWorkerProcess.isAlive()) {
killProcessById(nlpWorkerProcess.pid());
Files.deleteIfExists(nlpWorkersPidPath);
}
}
}

private static boolean shouldStartQpid(Properties properties) {
/**
 * Tells whether the application runs in embedded mode with an AMQP queue.
 *
 * <p>The AMQP check looks for the queue type name among the property values, whatever key
 * it is stored under.
 *
 * @param properties the application properties
 * @return {@code true} when the mode is {@code EMBEDDED} and a property value equals the AMQP
 *     queue type name
 */
private static boolean isEmbeddedAMQP(Properties properties) {
    if (CommonMode.getMode(properties) != Mode.EMBEDDED) {
        return false;
    }
    String amqpName = QueueType.AMQP.name();
    return properties.containsValue(amqpName);
}

Expand Down Expand Up @@ -80,4 +106,51 @@ private static boolean isOpen(int port) {
return false;
}
}

/**
 * Starts the NLP worker extension, after checking that no worker from a previous run is left.
 *
 * <p>PID files matching the helper's PID file pattern are looked up in the JVM temporary
 * directory ({@code java.io.tmpdir}). If one of them points to a process that is still running,
 * startup is aborted; otherwise the stale PID file is deleted. A worker configuration file is
 * then dumped and the extension is executed with {@code -c <config path> -n <nWorkers>}.
 *
 * @param extensionHelper helper used to build the PID file pattern and to execute the extension
 * @param nWorkers number of workers passed to the extension through {@code -n}
 * @param inheritIO forwarded as is to {@code executeExtension}
 * @return the started extension process
 * @throws IOException if the configuration cannot be dumped or the extension cannot be executed
 * @throws RuntimeException if a previously started worker is still running, or if a PID file
 *     cannot be read or deleted
 */
protected static Process startNlpWorkers(ExecutableExtensionHelper extensionHelper, int nWorkers, boolean inheritIO) throws IOException {
    Path tmpRoot = Path.of(System.getProperty("java.io.tmpdir"));
    for (Path p : findPidPaths("regex:" + extensionHelper.getPidFilePattern(), tmpRoot)) {
        if (isProcessRunning(p, 1, TimeUnit.SECONDS)) {
            String pid;
            try {
                pid = Files.readAllLines(p).get(0);
            } catch (IOException e) {
                // Keep the cause so that the underlying I/O failure stays visible in the stack trace
                throw new RuntimeException("failed to read pid from " + p, e);
            }
            String msg = "found phantom worker running in process " + pid
                + ", kill this process before restarting datashare !";
            throw new RuntimeException(msg);
        }
        // The process referenced by this PID file is not running any more: drop the stale file
        try {
            Files.deleteIfExists(p);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
    // No running worker was found, start new ones
    Path workerConfigPath = dumpNlpWorkerConfig();
    return extensionHelper.executeExtension(inheritIO, "-c", workerConfigPath.toString(), "-n", String.valueOf(nWorkers));
}

/**
 * Starts the NLP workers for the given embedded mode.
 *
 * <p>The extension helper is built from the mode's {@code PropertiesProvider} and
 * {@code ExtensionService}; the number of workers is read from the NLP parallelism option and
 * defaults to 1 when the option is absent.
 *
 * @param mode the embedded mode providing the services
 * @param inheritIO forwarded as is to the worker startup
 * @return the started worker process
 * @throws IOException if the worker startup fails with an I/O error
 */
private static Process startNlpWorkers(EmbeddedMode mode, boolean inheritIO) throws IOException {
    PropertiesProvider provider = mode.get(PropertiesProvider.class);
    ExecutableExtensionHelper workerHelper = new ExecutableExtensionHelper(
        provider,
        mode.get(ExtensionService.class),
        "^datashare-spacy-worker-[\\d\\.]+$"
    );
    int parallelism = mode.get(PropertiesProvider.class)
        .get(NLP_PARALLELISM_OPT)
        .map(Integer::parseInt)
        .orElse(1);
    return startNlpWorkers(workerHelper, parallelism, inheritIO);
}

/**
 * Writes the NLP worker configuration as JSON into a new temporary file.
 *
 * <p>The configuration declares an {@code amqp} type and a RabbitMQ host/port pointing to
 * {@code localhost} and {@code AMQP_PORT}.
 *
 * @return the path of the temporary configuration file
 * @throws IOException if the file cannot be created or written
 */
private static Path dumpNlpWorkerConfig() throws IOException {
    Map<String, String> settings = Map.of(
        "type", "amqp",
        "rabbitmq_host", "localhost",
        "rabbitmq_port", Integer.toString(AMQP_PORT)
    );
    Path configFile = Files.createTempFile("datashare-spacy-worker-config-", ".json");
    // Serialize the settings as a JSON object into the temporary file
    JsonObjectMapper.MAPPER.writeValue(configFile.toFile(), settings);
    return configFile;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,17 @@ public static void dumpPid(File pidFile, Long pid) throws IOException {
}
}

public static Long isProcessRunning(Path pidPath, int timeout, TimeUnit timeunit) {
public static boolean isProcessRunning(Path pidPath, int timeout, TimeUnit timeunit) {
try (Stream<String> lines = Files.lines(pidPath)) {
Long pid = Long.parseLong(
lines.findFirst()
.orElseThrow(() -> new RuntimeException("PID file is empty"))
.strip()
);
if (isProcessRunning(pid, timeout, timeunit)) {
return pid;
}
return isProcessRunning(pid, timeout, timeunit);
} catch (IOException e) {
throw new RuntimeException("Failed to read PID file", e);
}
return null;
}

public static boolean isProcessRunning(Long pid, int timeout, TimeUnit timeunit) {
Expand Down Expand Up @@ -92,23 +89,10 @@ public static void killProcessById(Long pid, boolean force) {
);
}

public static Path findPidPath(String pattern, Path dir) {
public static List<Path> findPidPaths(String pattern, Path dir) {
PathMatcher pathMatcher = FileSystems.getDefault().getPathMatcher(pattern);
try (Stream<Path> paths = Files.list(dir)) {
List<Path> pidFilePaths = paths
.filter(file -> !Files.isDirectory(file) && pathMatcher.matches(file.getFileName()))
.toList();
if (pidFilePaths.isEmpty()) {
return null;
}
if (pidFilePaths.size() != 1) {
String msg = "Found several matching PID files "
+ pidFilePaths
+ ", to avoid phantom Python process,"
+ " kill these processes and clean the PID files";
throw new RuntimeException(msg);
}
return pidFilePaths.get(0);
return paths.filter(file -> !Files.isDirectory(file) && pathMatcher.matches(file.getFileName())).toList();
} catch (IOException e) {
throw new RuntimeException("Failed to list files in " + pattern, e);
}
Expand Down
71 changes: 71 additions & 0 deletions datashare-app/src/test/java/org/icij/datashare/WebAppTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package org.icij.datashare;

import static org.fest.assertions.Assertions.assertThat;
import static org.icij.datashare.WebApp.startNlpWorkers;
import static org.icij.datashare.json.JsonObjectMapper.MAPPER;
import static org.icij.datashare.utils.ProcessHandler.dumpPid;
import static org.junit.Assert.assertThrows;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import org.junit.Test;

/**
 * Tests of the NLP worker startup done by {@link WebApp}, run against the extension found in the
 * {@code extensions} test resource directory.
 */
public class WebAppTest {
    private static final String EXTENSION_PATTERN = "^datashare-spacy-worker-[\\d\\.]+$";
    private static final String extDir = Objects.requireNonNull(
        WebAppTest.class.getClassLoader().getResource("extensions")).getPath();
    private static final Map<String, Object> properties = Map.of(
        "mode", "EMBEDDED", "nlpParallelism", "6", "extensionsDir", extDir
    );

    @Test
    public void test_nlp_workers_process() throws IOException, InterruptedException {
        // Given
        ExecutableExtensionHelper extensionHelper = newExtensionHelper();
        // When
        Process p = startNlpWorkers(extensionHelper, 6, false);
        // Then
        int timeout = 2;
        TimeUnit unit = TimeUnit.SECONDS;
        if (!p.waitFor(timeout, unit)) {
            // Do not leave the extension process behind when it does not terminate in time
            p.destroyForcibly();
            throw new AssertionError(
                "failed to get process output in less than " + timeout + " " + unit.name().toLowerCase());
        }
        HashMap<String, Object> output = (HashMap<String, Object>) MAPPER.readValue(
            p.getInputStream().readAllBytes(), Map.class);
        assertThat(output.get("n_workers")).isEqualTo(6);
        assertThat((String) output.get("config_file")).contains("datashare-spacy-worker-config-");
    }

    @Test(timeout = 20000)
    public void test_nlp_workers_process_should_throw_when_worker_pool_is_running() throws IOException {
        // Given
        ExecutableExtensionHelper extensionHelper = newExtensionHelper();
        Process p = null;
        Path pidPath = null;
        try {
            // A long-lived process whose PID file looks like one left by a previous worker
            p = new ProcessBuilder("sleep", "100000").start();
            pidPath = Files.createTempFile("datashare-spacy-worker-1.9.0", ".pid");
            dumpPid(pidPath.toFile(), p.pid());
            // When/Then
            assertThat(
                assertThrows(RuntimeException.class, () -> startNlpWorkers(extensionHelper, 1, false)).getMessage())
                .matches("found phantom worker running in process.*");
        } finally {
            if (p != null) {
                p.destroyForcibly();
            }
            // startNlpWorkers throws before cleaning the PID file: remove it so that it does not
            // pile up in the temporary directory
            if (pidPath != null) {
                Files.deleteIfExists(pidPath);
            }
        }
    }

    /** Builds the extension helper shared by the tests, backed by the test extension directory. */
    private static ExecutableExtensionHelper newExtensionHelper() {
        ExtensionService extensionService = new ExtensionService(Path.of(extDir));
        PropertiesProvider propertiesProvider = new PropertiesProvider(properties);
        return new ExecutableExtensionHelper(propertiesProvider, extensionService, EXTENSION_PATTERN);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import static java.lang.String.join;
import static org.fest.assertions.Assertions.assertThat;
import static org.icij.datashare.utils.ProcessHandler.dumpPid;
import static org.icij.datashare.utils.ProcessHandler.findPidPath;
import static org.icij.datashare.utils.ProcessHandler.findPidPaths;
import static org.icij.datashare.utils.ProcessHandler.isProcessRunning;
import static org.icij.datashare.utils.ProcessHandler.killProcessById;

Expand Down Expand Up @@ -54,14 +54,19 @@ public void test_process_handler_int() throws IOException {

// When
String pidFilePattern = "glob:test-process-handler-*.pid";
Path foundPath = findPidPath(pidFilePattern, Path.of(pidFileDir));
List<Path> foundPaths = findPidPaths(pidFilePattern, Path.of(pidFileDir));
// Then
assertThat(Objects.requireNonNull(foundPath).toString()).isEqualTo(pidPath.toString());
assertThat(foundPaths.size()).isEqualTo(1);
assertThat(foundPaths.get(0).toString()).isEqualTo(pidPath.toString());

// When
boolean isRunning = isProcessRunning(pid, 2, TimeUnit.SECONDS);
// Then
assertThat(isRunning).isTrue();
// When
boolean isRunningFromFile = isProcessRunning(pidPath, 2, TimeUnit.SECONDS);
// Then
assertThat(isRunningFromFile).isTrue();

// When
killProcessById(pid, true);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
#!/usr/bin/env bash
# Stub extension: prints the values it received as a one-line JSON object on stdout.
# Reads the 2nd positional argument as the configuration file path and the 4th as the number of
# workers (presumably invoked as `<script> -c <config_file> -n <n_workers>` — confirm with caller).
config_file=$2
n_workers=$4
# printf with quoted arguments keeps the path intact even when it contains spaces or glob
# characters, which an unquoted echo would split or expand.
printf '{ "n_workers": %s, "config_file": "%s" }\n' "$n_workers" "$config_file"

0 comments on commit a5736d0

Please sign in to comment.