Skip to content

Commit

Permalink
SeaseLtd#89: Implement persistence connector for ES6.
Browse files Browse the repository at this point in the history
  • Loading branch information
Matt Pearce committed Jul 30, 2019
1 parent 88bb18e commit 3bc70b7
Show file tree
Hide file tree
Showing 16 changed files with 881 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ Add the following to the `pom.xml`:
<dependency>
<groupId>io.sease</groupId>
<artifactId>rre-persistence-plugin-elasticsearch</artifactId>
<version>${elasticsearch.version}</version>
<version>1.0</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@
<modelVersion>4.0.0</modelVersion>

<artifactId>rre-persistence-plugin-elasticsearch</artifactId>
<version>6.7.1</version>

<properties>
<elasticsearch.version>7.2.0</elasticsearch.version>
</properties>

<name>RRE - Persistence Plugin - Elasticsearch</name>
<description>A persistence plugin for storing query results to Elasticsearch</description>
Expand All @@ -37,13 +40,13 @@
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>${project.version}</version>
<version>${elasticsearch.version}</version>
</dependency>
<!-- Include ES direct dependency to exclude log4j -->
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>${project.version}</version>
<version>${elasticsearch.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public boolean isAvailable() {
boolean ret = false;

try {
Response response = client.getLowLevelClient().performRequest(GET_METHOD, CLUSTER_HEALTH_ENDPOINT);
Response response = null;//client.getLowLevelClient().performRequest(GET_METHOD, CLUSTER_HEALTH_ENDPOINT);
String body = EntityUtils.toString(response.getEntity());
Map jsonMap = new ObjectMapper().readValue(body, Map.class);
if (jsonMap.containsKey("status")) {
Expand Down Expand Up @@ -106,7 +106,7 @@ public boolean isAvailable() {
* @throws IOException if a problem occurs calling the server.
*/
public boolean indexExists(String index) throws IOException {
return client.indices().exists(new GetIndexRequest().indices(index));
return false; // client.indices().exists(new GetIndexRequest().indices(index));
}

/**
Expand All @@ -122,7 +122,7 @@ public boolean createIndex(String index) throws IOException {
CreateIndexRequest request = new CreateIndexRequest(index)
.source(readConfig(), XContentType.JSON);

CreateIndexResponse response = client.indices().create(request);
CreateIndexResponse response = null;//client.indices().create(request);

return response.isAcknowledged();
}
Expand Down Expand Up @@ -170,7 +170,7 @@ public void afterBulk(long l, BulkRequest bulkRequest, Throwable throwable) {
}
};

BulkProcessor processor = BulkProcessor.builder(client::bulkAsync, listener).build();
BulkProcessor processor = null;//BulkProcessor.builder(client::bulkAsync, listener).build();

reports.forEach(r -> processor.add(
new IndexRequest(index, DOC_MAPPING_TYPE, r.getId())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import io.sease.rre.core.domain.Query;
import io.sease.rre.persistence.PersistenceException;
import io.sease.rre.persistence.PersistenceHandler;
import io.sease.rre.persistence.impl.connector.ElasticsearchConnector;
import io.sease.rre.persistence.impl.connector.ElasticsearchConnectorFactory;
import org.apache.http.HttpHost;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand Down Expand Up @@ -124,7 +126,9 @@ private void initialiseElasticsearchConnector() throws PersistenceException {
.toArray(HttpHost[]::new);

// Initialise the client
elasticsearch = new ElasticsearchConnector(new RestHighLevelClient(RestClient.builder(httpHosts)));
elasticsearch = new ElasticsearchConnectorFactory(new RestHighLevelClient(RestClient.builder(httpHosts))).buildConnector();
} catch (final IOException e) {
LOGGER.warn("Could not initialise ElasticsearchConnector :: {}", e.getMessage());
} catch (final IllegalArgumentException e) {
throw new PersistenceException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import com.fasterxml.jackson.annotation.JsonAnyGetter;
import com.fasterxml.jackson.annotation.JsonAnySetter;
import com.fasterxml.jackson.annotation.JsonInclude;
import io.sease.rre.core.domain.*;
import io.sease.rre.core.domain.metrics.MetricUtils;
import org.apache.commons.codec.digest.DigestUtils;
Expand All @@ -37,6 +38,7 @@
*
* @author Matt Pearce ([email protected])
*/
@JsonInclude(JsonInclude.Include.NON_EMPTY)
public class QueryVersionReport {

private final String id;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package io.sease.rre.persistence.impl.connector;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.sease.rre.persistence.impl.QueryVersionReport;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Optional;

/**
* Utility methods for {@link ElasticsearchConnector} implementations.
*
* @author Matt Pearce ([email protected])
*/
abstract class ConnectorUtils {

private static final Logger LOGGER = LogManager.getLogger(ConnectorUtils.class);

static final String MAPPINGS_FILE = "/es_config.json";

/**
* Get the input stream for a file on the local classpath (eg. one held in
* the resources directory.
*
* @param filePath the path for the file.
* @return an optional input stream for the file, empty if the file does
* not exist.
*/
static Optional<InputStream> getStreamForMappingsFile(String filePath) {
return Optional.ofNullable(ConnectorUtils.class.getResourceAsStream(filePath));
}

/**
* Read the index configuration file stream and return it as a String.
*
* @param inputStream the input stream for the configuration file.
* @return the index configuration file.
* @throws IOException if the file cannot be read.
*/
static String readConfig(InputStream inputStream) throws IOException {
final StringWriter sw = new StringWriter();
final PrintWriter pw = new PrintWriter(sw);

try (BufferedReader br = new BufferedReader(new InputStreamReader(inputStream))) {
String line;
while ((line = br.readLine()) != null) {
pw.println(line);
}
pw.close();
} catch (IOException e) {
LOGGER.error("IOException reading mappings :: {}", e.getMessage());
throw (e);
}

return sw.toString();
}

/**
* Convert a {@link QueryVersionReport} to JSON using an object mapper.
*
* @param mapper the ObjectMapper to use to carry out the conversion.
* @param report the report to be converted.
* @return a String representing the JSON object.
*/
static String convertReportToJson(ObjectMapper mapper, QueryVersionReport report) {
String json = null;

try {
json = mapper.writeValueAsString(report);
} catch (JsonProcessingException e) {
LOGGER.error("Could not convert versioned query report to JSON for Elasticsearch: {}", e.getMessage());
}

return json;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package io.sease.rre.persistence.impl.connector;

import io.sease.rre.persistence.impl.QueryVersionReport;

import java.io.IOException;
import java.util.Collection;

/**
* An Elasticsearch connector provides the functions required to write query
* reports to an Elasticsearch index.
*
* @author Matt Pearce ([email protected])
*/
public interface ElasticsearchConnector {

String CLUSTER_HEALTH_ENDPOINT = "/_cluster/health";

/**
* Is the Elasticsearch cluster available (and healthy)?
* <p>
* If this is not true, we won't be able to write anything to
* Elasticsearch, so the persistence handler should fail.
*
* @return {@code true} if the Elasticsearch cluster is available and
* healthy (ie. the cluster health status is green or yellow).
*/
boolean isAvailable();

/**
* Check whether or not an index exists.
*
* @param index the name of the index to check.
* @return {@code true} if the index exists.
* @throws IOException if a problem occurs calling the server.
*/
boolean indexExists(String index) throws IOException;

/**
* Create an index, using the mappings read from the mappings file.
*
* @param index the name of the index to create.
* @return {@code true} if the index was successfully created.
* @throws IOException if there are problems reading the mappings, or
* making the index creation request.
*/
boolean createIndex(String index) throws IOException;

/**
* Store a collection of items to an Elasticsearch index.
*
* @param index the index the items should be written to.
* @param reports the items to store.
*/
void storeItems(String index, Collection<QueryVersionReport> reports);

/**
* Close the Elasticsearch connector.
*
* @throws IOException if problems occur closing the connection.
*/
void close() throws IOException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package io.sease.rre.persistence.impl.connector;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.core.MainResponse;

import java.io.IOException;

/**
* Factory class for building an ElasticsearchConnector implementation,
* based on the Elasticsearch version in use.
*
* @author Matt Pearce ([email protected])
*/
public class ElasticsearchConnectorFactory {

private static final Logger LOGGER = LogManager.getLogger(ElasticsearchConnectorFactory.class);

private final RestHighLevelClient client;

public ElasticsearchConnectorFactory(RestHighLevelClient client) {
this.client = client;
}

public ElasticsearchConnector buildConnector() throws IOException {
final ElasticsearchConnector connector;

if (versionAllowsTypes(getVersionDetails())) {
connector = new MappingTypeElasticsearchConnector(client);
} else {
connector = new IndexOnlyElasticsearchConnector(client);
}

return connector;
}

private MainResponse.Version getVersionDetails() throws IOException {
try {
return client.info(RequestOptions.DEFAULT).getVersion();
} catch (ElasticsearchException e) {
LOGGER.error("Caught ES exception :: {}", e.getMessage());
throw new IOException(e);
}
}

private boolean versionAllowsTypes(MainResponse.Version version) {
String versionNumber = version.getNumber();
int majorVersion = Integer.valueOf(versionNumber.split("\\.")[0]);
return majorVersion < 7;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package io.sease.rre.persistence.impl.connector;

import io.sease.rre.persistence.impl.QueryVersionReport;
import org.elasticsearch.client.RestHighLevelClient;

import java.io.IOException;
import java.util.Collection;

/**
* Implementation of {@link ElasticsearchConnector} that uses the index-only
* calls required by Elasticsearch 7 and later.
* <p>
* Makes use of the higher-level client methods where possible.
*
* @author Matt Pearce ([email protected])
*/
public class IndexOnlyElasticsearchConnector implements ElasticsearchConnector {

private final RestHighLevelClient client;

public IndexOnlyElasticsearchConnector(RestHighLevelClient client) {
this.client = client;
}

@Override
public boolean isAvailable() {
return false;
}

@Override
public boolean indexExists(String index) throws IOException {
return false;
}

@Override
public boolean createIndex(String index) throws IOException {
return false;
}

@Override
public void storeItems(String index, Collection<QueryVersionReport> reports) {

}

@Override
public void close() throws IOException {

}
}
Loading

0 comments on commit 3bc70b7

Please sign in to comment.