-
Notifications
You must be signed in to change notification settings - Fork 153
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #164 from galihlasahido/elasticsearch-feature
add elastic search log integration
- Loading branch information
Showing
7 changed files
with
344 additions
and
2 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,11 @@ | ||
description = 'jPOS-EE :: Elastic search Support Module' | ||
group = 'org.jpos.elk' | ||
|
||
dependencies { | ||
implementation libraries.orgJson | ||
implementation libraries.jpos | ||
implementation libraries.elk | ||
} | ||
|
||
apply from: "${rootProject.projectDir}/jpos-app.gradle" | ||
|
82 changes: 82 additions & 0 deletions
82
modules/elasticsearch/src/main/java/org/jpos/elasticsearch/ElasticSearchLogListener.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,82 @@ | ||
package org.jpos.elasticsearch; | ||
|
||
import org.jpos.core.Configurable; | ||
import org.jpos.core.Configuration; | ||
import org.jpos.core.ConfigurationException; | ||
import org.jpos.space.Space; | ||
import org.jpos.space.SpaceError; | ||
import org.jpos.space.SpaceFactory; | ||
import org.jpos.space.TSpace; | ||
import org.jpos.util.FrozenLogEvent; | ||
import org.jpos.util.LogEvent; | ||
import org.jpos.util.LogListener; | ||
import org.json.JSONObject; | ||
import org.json.XML; | ||
|
||
import java.io.ByteArrayOutputStream; | ||
import java.io.PrintStream; | ||
|
||
public class ElasticSearchLogListener implements LogListener, Configurable { | ||
String queueName; | ||
String indexName; | ||
Space sp; | ||
Space buffer; | ||
long timeout = 3000L; | ||
boolean frozen = true; | ||
Configuration cfg; | ||
|
||
@SuppressWarnings("unused") | ||
public ElasticSearchLogListener() { | ||
super(); | ||
} | ||
|
||
@SuppressWarnings("unchecked") | ||
public synchronized LogEvent log(LogEvent ev) { | ||
LogEvent e = frozen ? new FrozenLogEvent(ev) : ev; | ||
ByteArrayOutputStream os = new ByteArrayOutputStream(); | ||
PrintStream ps = new PrintStream(os); | ||
e.dump(ps,""); | ||
JSONObject jsonObject= XML.toJSONObject(os.toString(), true); | ||
|
||
try { | ||
getSpace().out(queueName, jsonObject, timeout); | ||
return null; | ||
} catch (Throwable t) { | ||
ev.addMessage("Unable to log to " + queueName); | ||
return ev; | ||
} | ||
} | ||
|
||
/** | ||
* @param cfg Configuration object | ||
* @throws ConfigurationException | ||
*/ | ||
@Override | ||
public void setConfiguration(Configuration cfg) throws ConfigurationException { | ||
this.cfg = cfg; | ||
queueName = cfg.get("queue", null); | ||
indexName = cfg.get("index-name", "jpos"); | ||
if (queueName == null) | ||
throw new ConfigurationException("'queue' property not configured"); | ||
|
||
timeout = cfg.getLong("timeout", timeout); | ||
frozen = cfg.getBoolean("frozen", true); | ||
} | ||
|
||
@SuppressWarnings("unchecked") | ||
private Space getSpace() { | ||
if (sp == null) { | ||
try { | ||
sp = SpaceFactory.getSpace(cfg.get("space")); | ||
if (buffer != null) { | ||
while (buffer.rdp(queueName) != null) | ||
sp.out (queueName, buffer.inp(queueName)); | ||
buffer = null; | ||
} | ||
} catch (SpaceError e) { | ||
return (buffer = new TSpace()); | ||
} | ||
} | ||
return sp; | ||
} | ||
} |
213 changes: 213 additions & 0 deletions
213
modules/elasticsearch/src/main/java/org/jpos/elasticsearch/ElasticSearchService.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,213 @@ | ||
/* | ||
* jPOS Project [http://jpos.org] | ||
* Copyright (C) 2000-2019 jPOS Software SRL | ||
* | ||
* This program is free software: you can redistribute it and/or modify | ||
* it under the terms of the GNU Affero General Public License as | ||
* published by the Free Software Foundation, either version 3 of the | ||
* License, or (at your option) any later version. | ||
* | ||
* This program is distributed in the hope that it will be useful, | ||
* but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
* GNU Affero General Public License for more details. | ||
* | ||
* You should have received a copy of the GNU Affero General Public License | ||
* along with this program. If not, see <http://www.gnu.org/licenses/>. | ||
*/ | ||
|
||
package org.jpos.elasticsearch; | ||
|
||
import org.apache.http.HttpHost; | ||
import org.apache.http.auth.AuthScope; | ||
import org.apache.http.auth.UsernamePasswordCredentials; | ||
import org.apache.http.client.CredentialsProvider; | ||
import org.apache.http.impl.client.BasicCredentialsProvider; | ||
import org.elasticsearch.action.bulk.BulkRequest; | ||
import org.elasticsearch.action.bulk.BulkResponse; | ||
import org.elasticsearch.action.index.IndexRequest; | ||
import org.elasticsearch.client.RequestOptions; | ||
import org.elasticsearch.client.RestClient; | ||
import org.elasticsearch.client.RestClientBuilder; | ||
import org.elasticsearch.client.RestHighLevelClient; | ||
import org.elasticsearch.common.unit.TimeValue; | ||
import org.elasticsearch.common.xcontent.XContentBuilder; | ||
import org.elasticsearch.common.xcontent.XContentFactory; | ||
import org.elasticsearch.common.xcontent.XContentType; | ||
import org.jpos.core.Configuration; | ||
import org.jpos.core.ConfigurationException; | ||
import org.jpos.q2.QBeanSupport; | ||
import org.jpos.space.Space; | ||
import org.jpos.space.SpaceFactory; | ||
import org.jpos.util.NameRegistrar; | ||
import org.json.JSONArray; | ||
import org.json.JSONObject; | ||
|
||
import java.io.IOException; | ||
import java.text.ParseException; | ||
import java.text.SimpleDateFormat; | ||
import java.util.Arrays; | ||
import java.util.Date; | ||
import java.util.TimeZone; | ||
import java.util.UUID; | ||
|
||
public class ElasticSearchService extends QBeanSupport implements Runnable { | ||
private RestHighLevelClient client; | ||
private String queue; | ||
private Space sp; | ||
private String indexname; | ||
|
||
public RestHighLevelClient getClient() { | ||
return client; | ||
} | ||
|
||
@Override | ||
@SuppressWarnings("unchecked") | ||
public void run() { | ||
boolean stopping = false; | ||
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd"); | ||
|
||
while (running() && !stopping) { | ||
try { | ||
BulkRequest br = new BulkRequest(); | ||
Object obj = sp.rd (queue); | ||
while ((obj = sp.inp (queue)) != null) { | ||
SimpleDateFormat sdfat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS"); | ||
SimpleDateFormat sdfatufc = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"); | ||
sdfatufc.setTimeZone(TimeZone.getTimeZone("UTC")); | ||
|
||
if (obj instanceof JSONObject) { | ||
JSONObject jsonObject = (JSONObject) obj; | ||
JSONObject jsonLog = jsonObject.getJSONObject("log"); | ||
String direction = jsonLog.keySet().stream().filter(item->item.matches("receive|send")).findAny().orElse(""); | ||
if(!jsonLog.isNull("lifespan")) { | ||
jsonLog.put("lifespan-int", Integer.valueOf(jsonLog.get("lifespan").toString().replaceAll("ms", ""))); | ||
} | ||
|
||
if(!jsonLog.isNull("at")) { | ||
jsonObject.put("time-utc", sdfatufc.format(sdfat.parse(jsonLog.get("at").toString()))); | ||
} | ||
|
||
if(direction!="") { | ||
JSONObject receiveLog = jsonLog.getJSONObject(direction); | ||
if(!receiveLog.isNull("isomsg")) { | ||
jsonLog.put("direction", direction); | ||
JSONObject isomsgLog = receiveLog.getJSONObject("isomsg"); | ||
JSONArray fieldLog = isomsgLog.getJSONArray("field"); | ||
for (int i = 0; i < fieldLog.length(); i++) { | ||
jsonLog.put("field" + fieldLog.getJSONObject(i).get("id"), fieldLog.getJSONObject(i).get("value")); | ||
} | ||
jsonLog.remove(direction); | ||
jsonObject.put("action", direction + " message"); | ||
} | ||
} else { | ||
jsonObject.put("action", "logging"); | ||
} | ||
|
||
|
||
IndexRequest req = new IndexRequest( indexname+"-"+sdf.format(new Date())) | ||
.id(UUID.randomUUID().toString()) | ||
.source(jsonObject.toString(), XContentType.JSON) | ||
.timeout(TimeValue.timeValueSeconds(5)); | ||
|
||
br.add(req); | ||
} else { | ||
stopping = true; | ||
break; | ||
} | ||
} | ||
if (br.estimatedSizeInBytes() > 0) { | ||
BulkResponse bulkResponse = client.bulk(br, RequestOptions.DEFAULT); | ||
if(bulkResponse.hasFailures()) { | ||
getLog().warn (bulkResponse.buildFailureMessage()); | ||
} | ||
} | ||
} catch (IOException | ParseException e) { | ||
getLog().warn (e); | ||
} | ||
} | ||
} | ||
|
||
@Override | ||
protected void initService() { | ||
// String[] urls = cfg.getAll("url"); | ||
// HttpHost[] hosts = new HttpHost[urls.length]; | ||
// for (int i=0; i<hosts.length; i++) { | ||
// hosts[i] = HttpHost.create(urls[i]); | ||
// } | ||
|
||
HttpHost[] hosts = Arrays.stream(cfg.getAll("url")).map(HttpHost::create).toArray(HttpHost[]::new); | ||
RestClientBuilder builder = RestClient.builder(hosts); | ||
|
||
String user = cfg.get("user", null); | ||
String password = cfg.get("password", null); | ||
|
||
if (user != null && password !=null) { | ||
CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); | ||
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(user, password)); | ||
builder.setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder | ||
.setDefaultCredentialsProvider(credentialsProvider)); | ||
} | ||
client = new RestHighLevelClient(builder); | ||
NameRegistrar.register(getName(), this); | ||
} | ||
|
||
@Override | ||
protected void startService() { | ||
if (queue != null) | ||
new Thread(this, getName()).start(); | ||
try { | ||
index("start"); | ||
} catch (Exception e) { | ||
getLog().warn(e); | ||
} | ||
} | ||
|
||
@Override | ||
protected void stopService() { | ||
try { | ||
if (queue != null) | ||
sp.out (queue, Boolean.FALSE); | ||
|
||
index("stop"); | ||
client.close(); | ||
} catch (IOException e) { | ||
getLog().warn(e); | ||
} | ||
} | ||
|
||
@Override | ||
protected void destroyService() { | ||
NameRegistrar.unregister(getName()); | ||
} | ||
|
||
@Override | ||
public void setConfiguration (Configuration cfg) throws ConfigurationException { | ||
super.setConfiguration(cfg); | ||
queue = cfg.get("queue", null); | ||
this.indexname = cfg.get("index-name", "jpos"); | ||
if (queue != null) { | ||
sp = SpaceFactory.getSpace(cfg.get("space")); | ||
} | ||
} | ||
|
||
private void index(String action) throws IOException { | ||
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd"); | ||
|
||
XContentBuilder builder = XContentFactory.jsonBuilder(); | ||
builder.startObject(); | ||
{ | ||
builder.field("instance", getServer().getInstanceId().toString()); | ||
builder.timeField("time", new Date()); | ||
builder.field("action", action); | ||
} | ||
|
||
builder.endObject(); | ||
IndexRequest req = new IndexRequest(this.indexname +"-"+sdf.format(new Date())) | ||
.id(UUID.randomUUID().toString()) | ||
.source(builder) | ||
.timeout(TimeValue.timeValueSeconds(5) | ||
); | ||
client.index(req, RequestOptions.DEFAULT); | ||
} | ||
} |
17 changes: 17 additions & 0 deletions
17
modules/elasticsearch/src/main/resources/META-INF/q2/installs/deploy/00_logger_elastic.xml
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,17 @@ | ||
<?xml version="1.0" encoding="UTF-8"?> | ||
<logger name="Q2" class="org.jpos.q2.qbean.LoggerAdaptor"> | ||
<!-- | ||
<log-listener class="org.jpos.util.SimpleLogListener" /> | ||
<log-listener class="org.jpos.util.BufferedLogListener"> | ||
<property name="max-size" value="100" /> | ||
<property name="name" value="logger.Q2.buffered" /> | ||
</log-listener> | ||
--> | ||
<log-listener class="org.jpos.elasticsearch.ElasticSearchLogListener"> | ||
<property name="queue" value="logger.elastic" /> | ||
<property name="space" value="tspace:elastic" /> | ||
<property name="index-name" value="jpos" /> | ||
<property name="frozen" value="true" /> | ||
</log-listener> | ||
</logger> | ||
|
13 changes: 13 additions & 0 deletions
13
...elasticsearch/src/main/resources/META-INF/q2/installs/deploy/01_logger_elastic_daemon.xml
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,13 @@ | ||
<?xml version="1.0" encoding="UTF-8"?> | ||
<logger name="DAEMON" class="org.jpos.q2.qbean.LoggerAdaptor"> | ||
<elastic class='org.jpos.elasticsearch.ElasticSearchService' logger='Q2'> | ||
<property name="url" value="http://localhost:9200" /> | ||
<property name="queue" value="logger.elastic" /> | ||
<property name="space" value="tspace:elastic" /> | ||
<!-- | ||
<property name="user" value="user" /> | ||
<property name="password" value="password" /> | ||
<property name="index-name" value="jpos" /> | ||
--> | ||
</elastic> | ||
</logger> |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters