Skip to content

Commit

Permalink
#42 Now loading options/services/transforms in parallel
Browse files Browse the repository at this point in the history
  • Loading branch information
Rob Rudin committed Jan 17, 2017
1 parent b6bf4d8 commit 77e9175
Show file tree
Hide file tree
Showing 3 changed files with 138 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,20 @@

import java.io.File;
import java.io.IOException;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.*;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import com.marklogic.client.io.marker.QueryOptionsWriteHandle;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.core.io.FileSystemResource;
import org.springframework.core.io.Resource;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.SyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.concurrent.ExecutorConfigurationSupport;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.FileCopyUtils;

import com.fasterxml.jackson.databind.JsonNode;
Expand Down Expand Up @@ -48,6 +55,11 @@ public class DefaultModulesLoader extends LoggingObject implements ModulesLoader
private ModulesManager modulesManager;
private StaticChecker staticChecker;

// For parallelizing writes of modules
private TaskExecutor taskExecutor;
private int taskThreadCount = 16;
private List<Future> taskFutures;

/**
* When set to true, exceptions thrown while loading transforms and resources will be caught and logged, and the
* module will be updated as having been loaded. This is useful when running a program that watches modules for changes, as it
Expand Down Expand Up @@ -85,6 +97,17 @@ public DefaultModulesLoader(XccAssetLoader xccAssetLoader) {
this.xccAssetLoader = xccAssetLoader;
}

protected void initializeDefaultTaskExecutor() {
if (taskThreadCount > 1) {
ThreadPoolTaskExecutor tpte = new ThreadPoolTaskExecutor();
tpte.setCorePoolSize(taskThreadCount);
tpte.afterPropertiesSet();
this.taskExecutor = tpte;
} else {
this.taskExecutor = new SyncTaskExecutor();
}
}

/**
* Load modules from the given base directory, selecting modules via the given ModulesFinder, and loading them via
* the given DatabaseClient. Note that asset modules will not be loaded by the DatabaseClient that's passed in here,
Expand All @@ -103,21 +126,62 @@ public Set<File> loadModules(File baseDir, ModulesFinder modulesFinder, Database

Modules modules = modulesFinder.findModules(baseDir);

Set<File> loadedModules = new HashSet<>();
if (taskExecutor == null) {
initializeDefaultTaskExecutor();
}
taskFutures = new ArrayList<>();

Set<File> loadedModules = new HashSet<>();
loadProperties(modules, loadedModules);
loadNamespaces(modules, loadedModules);
loadAssets(modules, loadedModules);

loadQueryOptions(modules, loadedModules);
loadTransforms(modules, loadedModules);
loadResources(modules, loadedModules);

// Wait for thread pool to complete
waitForTaskFuturesToComplete();

if (logger.isDebugEnabled()) {
logger.debug("Finished loading modules from base directory: " + baseDir.getAbsolutePath());
}
return loadedModules;
}

/**
* If an AsyncTaskExecutor is used for loading options/services/transforms, we need to wait for the tasks to complete
* before we e.g. release the DatabaseClient.
*/
protected void waitForTaskFuturesToComplete() {
int size = taskFutures.size();
for (int i = 0; i < size; i++) {
Future<?> f = taskFutures.get(i);
if (f.isDone() || f.isCancelled()) {
continue;
}
try {
// Wait up to 1 hour for a write to ML to finish (should never happen)
f.get(1, TimeUnit.HOURS);
} catch (Exception ex) {
logger.warn("Unable to wait for last task future to finish: " + ex.getMessage(), ex);
}
}

if (taskExecutor instanceof ExecutorConfigurationSupport) {
((ExecutorConfigurationSupport)taskExecutor).shutdown();
} else if (taskExecutor instanceof DisposableBean) {
try {
((DisposableBean) taskExecutor).destroy();
} catch (Exception ex) {
logger.warn("Unexpected exception while calling destroy() on taskExecutor: " + ex.getMessage(), ex);
}
}

// Null this out so a new thread pool is created if loadModules is called again on this object
this.taskExecutor = null;
}

/**
* Specialized method for loading modules from the classpath. Currently does not support loading asset modules.
*
Expand Down Expand Up @@ -320,7 +384,6 @@ protected void loadTransforms(Modules modules, Set<File> loadedModules) {

for (Resource r : modules.getTransforms()) {
File f = getFileFromResource(r);

try {
ExtensionMetadataAndParams emap = extensionMetadataProvider.provideExtensionMetadataAndParams(r);
f = installTransform(f, emap.metadata);
Expand Down Expand Up @@ -386,61 +449,70 @@ public File installService(File file, ExtensionMetadata metadata, MethodParamete
if (modulesManager != null && !modulesManager.hasFileBeenModifiedSinceLastInstalled(file)) {
return null;
}

installService(new FileSystemResource(file), metadata, methodParams);

if (modulesManager != null) {
modulesManager.saveLastInstalledTimestamp(file, new Date());
}
return file;
}

public void installService(Resource r, ExtensionMetadata metadata, MethodParameters... methodParams) {
ResourceExtensionsManager extMgr = client.newServerConfigManager().newResourceExtensionsManager();
String resourceName = getExtensionNameFromFile(r);
public void installService(Resource r, final ExtensionMetadata metadata, final MethodParameters... methodParams) {
final ResourceExtensionsManager extMgr = client.newServerConfigManager().newResourceExtensionsManager();
final String resourceName = getExtensionNameFromFile(r);
if (metadata.getTitle() == null) {
metadata.setTitle(resourceName + " resource extension");
}

logger.info(String.format("Loading %s resource extension from file %s", resourceName, r.getFilename()));
InputStreamHandle h;
try {
extMgr.writeServices(resourceName, new InputStreamHandle(r.getInputStream()), metadata, methodParams);
h = new InputStreamHandle(r.getInputStream());
} catch (IOException ie) {
throw new RuntimeException("Unable to write service: " + ie.getMessage(), ie);
throw new RuntimeException("Unable to read service resource: " + ie.getMessage(), ie);
}
final InputStreamHandle finalHandle = h;
executeTask(new Runnable() {
@Override
public void run() {
extMgr.writeServices(resourceName, finalHandle, metadata, methodParams);
}
});
}

public File installTransform(File file, ExtensionMetadata metadata) {
if (modulesManager != null && !modulesManager.hasFileBeenModifiedSinceLastInstalled(file)) {
return null;
}

installTransform(new FileSystemResource(file), metadata);

if (modulesManager != null) {
modulesManager.saveLastInstalledTimestamp(file, new Date());
}
return file;
}

public void installTransform(Resource r, ExtensionMetadata metadata) {
String filename = r.getFilename();
TransformExtensionsManager mgr = client.newServerConfigManager().newTransformExtensionsManager();
String transformName = getExtensionNameFromFile(r);
public void installTransform(Resource r, final ExtensionMetadata metadata) {
final String filename = r.getFilename();
final TransformExtensionsManager mgr = client.newServerConfigManager().newTransformExtensionsManager();
final String transformName = getExtensionNameFromFile(r);
logger.info(String.format("Loading %s transform from resource %s", transformName, filename));
InputStreamHandle h = null;
try {
h = new InputStreamHandle(r.getInputStream());
} catch (IOException ie) {
throw new RuntimeException("Unable to read transform resource: " + ie.getMessage(), ie);
}
if (FilenameUtil.isXslFile(filename)) {
mgr.writeXSLTransform(transformName, h, metadata);
} else if (FilenameUtil.isJavascriptFile(filename)) {
mgr.writeJavascriptTransform(transformName, h, metadata);
} else {
mgr.writeXQueryTransform(transformName, h, metadata);
}
final InputStreamHandle finalHandle = h;
executeTask(new Runnable() {
@Override
public void run() {
if (FilenameUtil.isXslFile(filename)) {
mgr.writeXSLTransform(transformName, finalHandle, metadata);
} else if (FilenameUtil.isJavascriptFile(filename)) {
mgr.writeJavascriptTransform(transformName, finalHandle, metadata);
} else {
mgr.writeXQueryTransform(transformName, finalHandle, metadata);
}
}
});
}

public File installQueryOptions(File f) {
Expand All @@ -455,21 +527,41 @@ public File installQueryOptions(File f) {
}

public void installQueryOptions(Resource r) {
String filename = r.getFilename();
String name = getExtensionNameFromFile(r);
final String filename = r.getFilename();
final String name = getExtensionNameFromFile(r);
logger.info(String.format("Loading %s query options from file %s", name, filename));
QueryOptionsManager mgr = client.newServerConfigManager().newQueryOptionsManager();
InputStreamHandle h = null;
final QueryOptionsManager mgr = client.newServerConfigManager().newQueryOptionsManager();
InputStreamHandle h;
try {
h = new InputStreamHandle(r.getInputStream());
} catch (IOException ie) {
throw new RuntimeException("Unable to read transform resource: " + ie.getMessage(), ie);
}
if (filename.endsWith(".json")) {
mgr.writeOptions(name, h.withFormat(Format.JSON));
} else {
mgr.writeOptions(name, h);
}
final InputStreamHandle writeHandle = h;
executeTask(new Runnable() {
@Override
public void run() {
if (filename.endsWith(".json")) {
mgr.writeOptions(name, writeHandle.withFormat(Format.JSON));
} else {
mgr.writeOptions(name, writeHandle);
}
}
});
}

/**
* Ensures that if we're using an AsyncTaskExecutor, we capture the Future and add it to our list so we can ensure
* we wait for it to finish.
*
* @param r
*/
protected void executeTask(Runnable r) {
if (taskExecutor instanceof AsyncTaskExecutor) {
taskFutures.add(((AsyncTaskExecutor)taskExecutor).submit(r));
} else {
taskExecutor.execute(r);
}
}

public File installNamespace(File f) {
Expand Down Expand Up @@ -557,4 +649,12 @@ public void setStaticChecker(StaticChecker staticChecker) {
public StaticChecker getStaticChecker() {
return staticChecker;
}

public void setTaskExecutor(TaskExecutor taskExecutor) {
this.taskExecutor = taskExecutor;
}

public void setTaskThreadCount(int taskThreadCount) {
this.taskThreadCount = taskThreadCount;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration( classes = {MarkLogicApplicationContext.class} )
public class AbstractIntegrationTest extends Assert {
public abstract class AbstractIntegrationTest extends Assert {

@Autowired
protected DatabaseClientConfig clientConfig;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,6 @@
public class LoadModulesDebug {

public static void main(String[] args) throws Exception {
if (false) {
File dir = new File("c:/temp/modules");
dir.mkdirs();
for (int i = 0; i < 500; i++) {
FileCopyUtils.copy(new String("Hello " + System.currentTimeMillis()).getBytes(),
new File(dir, i + ".xqy"));
}
return;
}

DatabaseClient client = DatabaseClientFactory.newClient("localhost", 8000, "admin", "admin",
Authentication.DIGEST);

Expand All @@ -43,7 +33,7 @@ public static void main(String[] args) throws Exception {
//l.setRestApiAssetLoader(raal);
l.setXccAssetLoader(xal);

String path = "c:/temp/modules";
String path = "src/test/resources/sample-base-dir";
try {
long start = System.currentTimeMillis();
l.loadModules(new File(path), new DefaultModulesFinder(), client);
Expand Down

0 comments on commit 77e9175

Please sign in to comment.