Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#29 Allow async initialization of Node and Client #30

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 17 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ Import spring-elasticsearch in you project `pom.xml` file:
<dependency>
<groupId>fr.pilato.spring</groupId>
<artifactId>spring-elasticsearch</artifactId>
<version>0.2.0</version>
<version>0.2.0-SNAPSHOT</version>
</dependency>
```

Expand Down Expand Up @@ -352,6 +352,18 @@ Just set `forceTemplate` property to `true`.
<elasticsearch:client id="esClient" forceTemplate="true" />
```

### Asyncronous initialization

Node and client beans initialization are by default synchronously. They can be initialized asynchronously with the attributes `async` and `taskExecutor`.

```xml
<task:executor pool-size="4" id="taskExecutor"/>
<elasticsearch:client id="esClient" async="true" taskExecutor="taskExecutor"/>
```
Aynchronous initialization does not block Spring startup but it continues on background on another thread.
Any methods call to these beans before elasticsearch is initialized will be blocker. `taskExecutor` references a standard Spring's task executor.


Old fashion bean definition
---------------------------

Expand Down Expand Up @@ -422,8 +434,11 @@ Note that you can use the old fashion method to define your beans instead of usi
Thanks
------

Special thanks to [Nicolas Huray](https://github.com/nhuray) for his contribution about
Special thanks to
- [Nicolas Huray](https://github.com/nhuray) for his contribution about
[templates](https://github.com/dadoonet/spring-elasticsearch/pull/4)
- [Nicolas Labrot](https://github.com/nithril‎) for his contribution about
[async](https://github.com/dadoonet/spring-elasticsearch/pull/30)


License
Expand Down
6 changes: 6 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,12 @@
<url>https://github.com/nhuray/</url>
<timezone>+1</timezone>
</developer>
<developer>
<id>nlabrot</id>
<name>Nicolas Labrot</name>
<url>https://github.com/nithril‎/</url>
<timezone>Europe/Paris</timezone>
</developer>
</developers>

<scm>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,21 @@

package fr.pilato.spring.elasticsearch;

import fr.pilato.spring.elasticsearch.proxy.GenericInvocationHandler;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesResponse;
Expand Down Expand Up @@ -162,6 +177,7 @@ public abstract class ElasticsearchAbstractClientFactoryBean extends Elasticsear
protected final Log logger = LogFactory.getLog(getClass());

protected Client client;
protected Client proxyfiedClient;

protected boolean forceMapping;

Expand Down Expand Up @@ -324,13 +340,33 @@ public void setClasspathRoot(String classpathRoot) {
public void afterPropertiesSet() throws Exception {
logger.info("Starting ElasticSearch client");

if (async) {
Assert.notNull(taskExecutor);
Future<Client> future = taskExecutor.submit(new Callable<Client>() {
@Override
public Client call() throws Exception {
return initialize();
}
});
proxyfiedClient = (Client) Proxy.newProxyInstance(Client.class.getClassLoader(),
new Class[]{Client.class}, new GenericInvocationHandler(future));

} else {
client = initialize();
}
}


private Client initialize() throws Exception {
client = buildClient();
if (autoscan) {
computeMappings();
}
initTemplates();
initMappings();
initAliases();

return client;
}

@Override
Expand All @@ -347,7 +383,7 @@ public void destroy() throws Exception {

@Override
public Client getObject() throws Exception {
return client;
return async ? proxyfiedClient : client;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.Map;
import java.util.Properties;
Expand All @@ -40,6 +41,10 @@ public abstract class ElasticsearchAbstractFactoryBean {

protected Properties properties;

protected boolean async = false;

protected ThreadPoolTaskExecutor taskExecutor;

/**
* Elasticsearch Settings file classpath URL (default : es.properties)
* <p>Example :<br/>
Expand Down Expand Up @@ -94,5 +99,23 @@ public void setSettings(final Map<String, String> settings) {
*/
public void setProperties(Properties properties) {
this.properties = properties;
}

/**
* Enable async initialization
*
* @param async
*/
public void setAsync(boolean async) {
this.async = async;
}

/**
* Executor for async init mode
*
* @param taskExecutor
*/
public void setTaskExecutor(ThreadPoolTaskExecutor taskExecutor) {
this.taskExecutor = taskExecutor;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@

package fr.pilato.spring.elasticsearch;

import fr.pilato.spring.elasticsearch.proxy.GenericInvocationHandler;

import java.lang.reflect.Proxy;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.elasticsearch.common.settings.ImmutableSettings;
Expand All @@ -28,6 +34,8 @@
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.Assert;

/**
* A {@link FactoryBean} implementation used to create a {@link Node} element
Expand All @@ -46,34 +54,28 @@ public class ElasticsearchNodeFactoryBean extends ElasticsearchAbstractFactoryBe
protected final Log logger = LogFactory.getLog(getClass());

private Node node;
private Node proxyfiedNode;

@Override
public void afterPropertiesSet() throws Exception {
final NodeBuilder nodeBuilder = NodeBuilder.nodeBuilder();
if (async) {
Assert.notNull(taskExecutor);

if (null != settings && null == properties) {
logger.warn("settings has been deprecated in favor of properties. See issue #15: https://github.com/dadoonet/spring-elasticsearch/issues/15.");
nodeBuilder.getSettings().put(settings);
}
Future<Node> nodeFuture = taskExecutor.submit(new Callable<Node>() {
@Override
public Node call() throws Exception {
return initialize();
}
});
proxyfiedNode = (Node) Proxy.newProxyInstance(Node.class.getClassLoader(),
new Class[]{Node.class}, new GenericInvocationHandler(nodeFuture));

if (null != settingsFile && null == properties) {
Settings settings = ImmutableSettings.settingsBuilder()
.loadFromClasspath(this.settingsFile)
.build();
nodeBuilder.getSettings().put(settings);
} else {
node = initialize();
}

if (null != properties) {
nodeBuilder.getSettings().put(properties);
}

if (logger.isDebugEnabled()) logger.debug("Starting ElasticSearch node...");
node = nodeBuilder.node();
logger.info( "Node [" + node.settings().get("name") + "] for [" + node.settings().get("cluster.name") + "] cluster started..." );
if (logger.isDebugEnabled()) logger.debug( " - data : " + node.settings().get("path.data") );
if (logger.isDebugEnabled()) logger.debug( " - logs : " + node.settings().get("path.logs") );
}


@Override
public void destroy() throws Exception {
try {
Expand All @@ -86,7 +88,7 @@ public void destroy() throws Exception {

@Override
public Node getObject() throws Exception {
return node;
return async ? proxyfiedNode : node;
}

@Override
Expand All @@ -99,4 +101,32 @@ public boolean isSingleton() {
return true;
}

private Node initialize() {
final NodeBuilder nodeBuilder = NodeBuilder.nodeBuilder();

if (null != settings && null == properties) {
logger.warn("settings has been deprecated in favor of properties. See issue #15: https://github.com/dadoonet/spring-elasticsearch/issues/15.");
nodeBuilder.getSettings().put(settings);
}

if (null != settingsFile && null == properties) {
Settings settings = ImmutableSettings.settingsBuilder()
.loadFromClasspath(this.settingsFile)
.build();
nodeBuilder.getSettings().put(settings);
}

if (null != properties) {
nodeBuilder.getSettings().put(properties);
}

if (logger.isDebugEnabled()) logger.debug("Starting ElasticSearch node...");
node = nodeBuilder.node();
logger.info("Node [" + node.settings().get("name") + "] for [" + node.settings().get("cluster.name") + "] cluster started...");
if (logger.isDebugEnabled()) logger.debug(" - data : " + node.settings().get("path.data"));
if (logger.isDebugEnabled()) logger.debug(" - logs : " + node.settings().get("path.logs"));

return node;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package fr.pilato.spring.elasticsearch.proxy;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.util.concurrent.Future;

import org.springframework.util.ReflectionUtils;

/**
* @author labrot
* Date: 10/7/13
* Time: 4:19 PM
*/
public class GenericInvocationHandler<T> implements InvocationHandler {

private volatile T bean;
private Future<T> nodeFuture;

public GenericInvocationHandler(Future<T> nodeFuture) {
this.nodeFuture = nodeFuture;
}

@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
if (bean == null) {
bean = nodeFuture.get();
//release reference
nodeFuture = null;
}
return ReflectionUtils.invokeMethod(method, bean, args);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ public BeanDefinition parse(Element element, ParserContext parserContext) {
String aliases = XMLParserUtil.getElementStringValue(element, "aliases");
String templates = XMLParserUtil.getElementStringValue(element, "templates");

String taskExecutor = element.getAttribute("taskExecutor");
String async = element.getAttribute("async");

// Checking bean definition
boolean isClientNode = (node != null && node.length() > 0);
boolean isEsNodesEmpty = (esNodes == null || esNodes.length() == 0);
Expand All @@ -77,13 +80,13 @@ public BeanDefinition parse(Element element, ParserContext parserContext) {
bdef.setBeanClass(ElasticsearchClientFactoryBean.class);
BeanDefinitionBuilder clientBuilder = startClientBuilder(ElasticsearchClientFactoryBean.class,
settingsFile, properties, forceMapping, forceTemplate, mergeMapping, mergeSettings, autoscan,
classpathRoot, mappings, aliases, templates);
classpathRoot, mappings, aliases, templates, async, taskExecutor);
client = ClientBeanDefinitionParser.buildClientDef(clientBuilder, node);
} else {
bdef.setBeanClass(ElasticsearchTransportClientFactoryBean.class);
BeanDefinitionBuilder clientBuilder = startClientBuilder(ElasticsearchTransportClientFactoryBean.class,
settingsFile, properties, forceMapping, forceTemplate, mergeMapping, mergeSettings, autoscan,
classpathRoot, mappings, aliases, templates);
classpathRoot, mappings, aliases, templates, async, taskExecutor);
client = ClientBeanDefinitionParser.buildTransportClientDef(clientBuilder, esNodes);
}

Expand Down Expand Up @@ -119,7 +122,7 @@ public static BeanDefinitionBuilder startClientBuilder(Class beanClass, String s
boolean forceMapping, boolean forceTemplate,
boolean mergeMapping, boolean mergeSettings,
boolean autoscan, String classpathRoot, String mappings,
String aliases, String templates) {
String aliases, String templates, String async, String taskExecutor) {
BeanDefinitionBuilder nodeFactory = BeanDefinitionBuilder.rootBeanDefinition(beanClass);
if (settingsFile != null && settingsFile.length() > 0) {
logger.warn("settingsFile is deprecated. Use properties attribute instead. See issue #15: https://github.com/dadoonet/spring-elasticsearch/issues/15.");
Expand All @@ -145,6 +148,15 @@ public static BeanDefinitionBuilder startClientBuilder(Class beanClass, String s
if (templates != null && templates.length() > 0) {
nodeFactory.addPropertyValue("templates", templates);
}

if (async != null && async.length() > 0) {
nodeFactory.addPropertyValue("async", async);
}
if (taskExecutor != null && taskExecutor.length() > 0) {
nodeFactory.addPropertyReference("taskExecutor", taskExecutor);
}


return nodeFactory;
}

Expand Down
Loading