From 63b619a09f27ba7895ce551903681569dcad1965 Mon Sep 17 00:00:00 2001 From: Galih Lasahido Date: Mon, 6 Jul 2020 11:09:58 +0700 Subject: [PATCH] add elastic search log integration --- libraries.gradle | 7 +- modules/elasticsearch/build.gradle | 11 + .../ElasticSearchLogListener.java | 82 +++++++ .../elasticsearch/ElasticSearchService.java | 213 ++++++++++++++++++ .../q2/installs/deploy/00_logger_elastic.xml | 17 ++ .../deploy/01_logger_elastic_daemon.xml | 13 ++ settings.gradle | 3 +- 7 files changed, 344 insertions(+), 2 deletions(-) create mode 100644 modules/elasticsearch/build.gradle create mode 100644 modules/elasticsearch/src/main/java/org/jpos/elasticsearch/ElasticSearchLogListener.java create mode 100644 modules/elasticsearch/src/main/java/org/jpos/elasticsearch/ElasticSearchService.java create mode 100644 modules/elasticsearch/src/main/resources/META-INF/q2/installs/deploy/00_logger_elastic.xml create mode 100644 modules/elasticsearch/src/main/resources/META-INF/q2/installs/deploy/01_logger_elastic_daemon.xml diff --git a/libraries.gradle b/libraries.gradle index 048d05ebb5..c20e1f6aca 100644 --- a/libraries.gradle +++ b/libraries.gradle @@ -23,6 +23,8 @@ ext { postgresJDBCVersion = '42.2.11' flywaydbVersion = '6.0.4' liquibaseVersion = '3.8.0' + elkVersion = '7.8.0' + orgjsonVersion = '20200518' libraries = [ //jUnit (Tests) @@ -120,7 +122,10 @@ ext { guava: "com.google.guava:guava:${guavaVersion}", httpAsyncClient: "org.apache.httpcomponents:httpasyncclient:${httpAsyncClientVersion}", flywaydb: "org.flywaydb:flyway-core:${flywaydbVersion}", - liquibase: "org.liquibase:liquibase-core:${liquibaseVersion}" + liquibase: "org.liquibase:liquibase-core:${liquibaseVersion}", + + elk: "org.elasticsearch.client:elasticsearch-rest-high-level-client:${elkVersion}", + orgJson: "org.json:json:${orgjsonVersion}" ] jsonSchemaValidatorLibs = [ diff --git a/modules/elasticsearch/build.gradle b/modules/elasticsearch/build.gradle new file mode 100644 index 0000000000..5447b058bc --- /dev/null +++ b/modules/elasticsearch/build.gradle @@ -0,0 +1,11 @@ +description = 'jPOS-EE :: Elastic search Support Module' +group = 'org.jpos.elk' + +dependencies { + implementation libraries.orgJson + implementation libraries.jpos + implementation libraries.elk +} + +apply from: "${rootProject.projectDir}/jpos-app.gradle" + diff --git a/modules/elasticsearch/src/main/java/org/jpos/elasticsearch/ElasticSearchLogListener.java b/modules/elasticsearch/src/main/java/org/jpos/elasticsearch/ElasticSearchLogListener.java new file mode 100644 index 0000000000..d2ea45d983 --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/jpos/elasticsearch/ElasticSearchLogListener.java @@ -0,0 +1,82 @@ +package org.jpos.elasticsearch; + +import org.jpos.core.Configurable; +import org.jpos.core.Configuration; +import org.jpos.core.ConfigurationException; +import org.jpos.space.Space; +import org.jpos.space.SpaceError; +import org.jpos.space.SpaceFactory; +import org.jpos.space.TSpace; +import org.jpos.util.FrozenLogEvent; +import org.jpos.util.LogEvent; +import org.jpos.util.LogListener; +import org.json.JSONObject; +import org.json.XML; + +import java.io.ByteArrayOutputStream; +import java.io.PrintStream; + +public class ElasticSearchLogListener implements LogListener, Configurable { + String queueName; + String indexName; + Space sp; + Space buffer; + long timeout = 3000L; + boolean frozen = true; + Configuration cfg; + + @SuppressWarnings("unused") + public ElasticSearchLogListener() { + super(); + } + + @SuppressWarnings("unchecked") + public synchronized LogEvent log(LogEvent ev) { + LogEvent e = frozen ? new FrozenLogEvent(ev) : ev; + ByteArrayOutputStream os = new ByteArrayOutputStream(); + PrintStream ps = new PrintStream(os); + e.dump(ps,""); + JSONObject jsonObject= XML.toJSONObject(os.toString(), true); + + try { + getSpace().out(queueName, jsonObject, timeout); + return null; + } catch (Throwable t) { + ev.addMessage("Unable to log to " + queueName); + return ev; + } + } + + /** + * @param cfg Configuration object + * @throws ConfigurationException + */ + @Override + public void setConfiguration(Configuration cfg) throws ConfigurationException { + this.cfg = cfg; + queueName = cfg.get("queue", null); + indexName = cfg.get("index-name", "jpos"); + if (queueName == null) + throw new ConfigurationException("'queue' property not configured"); + + timeout = cfg.getLong("timeout", timeout); + frozen = cfg.getBoolean("frozen", true); + } + + @SuppressWarnings("unchecked") + private Space getSpace() { + if (sp == null) { + try { + sp = SpaceFactory.getSpace(cfg.get("space")); + if (buffer != null) { + while (buffer.rdp(queueName) != null) + sp.out (queueName, buffer.inp(queueName)); + buffer = null; + } + } catch (SpaceError e) { + return (buffer = new TSpace()); + } + } + return sp; + } +} diff --git a/modules/elasticsearch/src/main/java/org/jpos/elasticsearch/ElasticSearchService.java b/modules/elasticsearch/src/main/java/org/jpos/elasticsearch/ElasticSearchService.java new file mode 100644 index 0000000000..314861a59f --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/jpos/elasticsearch/ElasticSearchService.java @@ -0,0 +1,213 @@ +/* + * jPOS Project [http://jpos.org] + * Copyright (C) 2000-2019 jPOS Software SRL + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package org.jpos.elasticsearch; + +import org.apache.http.HttpHost; +import org.apache.http.auth.AuthScope; +import org.apache.http.auth.UsernamePasswordCredentials; +import org.apache.http.client.CredentialsProvider; +import org.apache.http.impl.client.BasicCredentialsProvider; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.client.RequestOptions; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.client.RestClientBuilder; +import org.elasticsearch.client.RestHighLevelClient; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.common.xcontent.XContentType; +import org.jpos.core.Configuration; +import org.jpos.core.ConfigurationException; +import org.jpos.q2.QBeanSupport; +import org.jpos.space.Space; +import org.jpos.space.SpaceFactory; +import org.jpos.util.NameRegistrar; +import org.json.JSONArray; +import org.json.JSONObject; + +import java.io.IOException; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.Arrays; +import java.util.Date; +import java.util.TimeZone; +import java.util.UUID; + +public class ElasticSearchService extends QBeanSupport implements Runnable { + private RestHighLevelClient client; + private String queue; + private Space sp; + private String indexname; + + public RestHighLevelClient getClient() { + return client; + } + + @Override + @SuppressWarnings("unchecked") + public void run() { + boolean stopping = false; + SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd"); + + while (running() && !stopping) { + try { + BulkRequest br = new BulkRequest(); + Object obj = sp.rd (queue); + while ((obj = sp.inp (queue)) != null) { + SimpleDateFormat sdfat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS"); + SimpleDateFormat sdfatufc = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"); + sdfatufc.setTimeZone(TimeZone.getTimeZone("UTC")); + + if (obj instanceof JSONObject) { + JSONObject jsonObject = (JSONObject) obj; + JSONObject jsonLog = jsonObject.getJSONObject("log"); + String direction = jsonLog.keySet().stream().filter(item->item.matches("receive|send")).findAny().orElse(""); + if(!jsonLog.isNull("lifespan")) { + jsonLog.put("lifespan-int", Integer.valueOf(jsonLog.get("lifespan").toString().replaceAll("ms", ""))); + } + + if(!jsonLog.isNull("at")) { + jsonObject.put("time-utc", sdfatufc.format(sdfat.parse(jsonLog.get("at").toString()))); + } + + if(direction!="") { + JSONObject receiveLog = jsonLog.getJSONObject(direction); + if(!receiveLog.isNull("isomsg")) { + jsonLog.put("direction", direction); + JSONObject isomsgLog = receiveLog.getJSONObject("isomsg"); + JSONArray fieldLog = isomsgLog.getJSONArray("field"); + for (int i = 0; i < fieldLog.length(); i++) { + jsonLog.put("field" + fieldLog.getJSONObject(i).get("id"), fieldLog.getJSONObject(i).get("value")); + } + jsonLog.remove(direction); + jsonObject.put("action", direction + " message"); + } + } else { + jsonObject.put("action", "logging"); + } + + + IndexRequest req = new IndexRequest( indexname+"-"+sdf.format(new Date())) + .id(UUID.randomUUID().toString()) + .source(jsonObject.toString(), XContentType.JSON) + .timeout(TimeValue.timeValueSeconds(5)); + + br.add(req); + } else { + stopping = true; + break; + } + } + if (br.estimatedSizeInBytes() > 0) { + BulkResponse bulkResponse = client.bulk(br, RequestOptions.DEFAULT); + if(bulkResponse.hasFailures()) { + getLog().warn (bulkResponse.buildFailureMessage()); + } + } + } catch (IOException | ParseException e) { + getLog().warn (e); + } + } + } + + @Override + protected void initService() { +// String[] urls = cfg.getAll("url"); +// HttpHost[] hosts = new HttpHost[urls.length]; +// for (int i=0; i httpClientBuilder + .setDefaultCredentialsProvider(credentialsProvider)); + } + client = new RestHighLevelClient(builder); + NameRegistrar.register(getName(), this); + } + + @Override + protected void startService() { + if (queue != null) + new Thread(this, getName()).start(); + try { + index("start"); + } catch (Exception e) { + getLog().warn(e); + } + } + + @Override + protected void stopService() { + try { + if (queue != null) + sp.out (queue, Boolean.FALSE); + + index("stop"); + client.close(); + } catch (IOException e) { + getLog().warn(e); + } + } + + @Override + protected void destroyService() { + NameRegistrar.unregister(getName()); + } + + @Override + public void setConfiguration (Configuration cfg) throws ConfigurationException { + super.setConfiguration(cfg); + queue = cfg.get("queue", null); + this.indexname = cfg.get("index-name", "jpos"); + if (queue != null) { + sp = SpaceFactory.getSpace(cfg.get("space")); + } + } + + private void index(String action) throws IOException { + SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd"); + + XContentBuilder builder = XContentFactory.jsonBuilder(); + builder.startObject(); + { + builder.field("instance", getServer().getInstanceId().toString()); + builder.timeField("time", new Date()); + builder.field("action", action); + } + + builder.endObject(); + IndexRequest req = new IndexRequest(this.indexname +"-"+sdf.format(new Date())) + .id(UUID.randomUUID().toString()) + .source(builder) + .timeout(TimeValue.timeValueSeconds(5) + ); + client.index(req, RequestOptions.DEFAULT); + } +} diff --git a/modules/elasticsearch/src/main/resources/META-INF/q2/installs/deploy/00_logger_elastic.xml b/modules/elasticsearch/src/main/resources/META-INF/q2/installs/deploy/00_logger_elastic.xml new file mode 100644 index 0000000000..a47b65c7a0 --- /dev/null +++ b/modules/elasticsearch/src/main/resources/META-INF/q2/installs/deploy/00_logger_elastic.xml @@ -0,0 +1,17 @@ + + + + + + + + + + + diff --git a/modules/elasticsearch/src/main/resources/META-INF/q2/installs/deploy/01_logger_elastic_daemon.xml b/modules/elasticsearch/src/main/resources/META-INF/q2/installs/deploy/01_logger_elastic_daemon.xml new file mode 100644 index 0000000000..fe1094edef --- /dev/null +++ b/modules/elasticsearch/src/main/resources/META-INF/q2/installs/deploy/01_logger_elastic_daemon.xml @@ -0,0 +1,13 @@ + + + + + + + + + \ No newline at end of file diff --git a/settings.gradle b/settings.gradle index 9b1a6e3192..673ae322cc 100644 --- a/settings.gradle +++ b/settings.gradle @@ -43,7 +43,8 @@ include ':modules:core', ':modules:iso-http-servlet', ':modules:http-client', ':modules:seqno', - ':modules:db-flyway' + ':modules:db-flyway', + ':modules:elasticsearch' rootProject.name = 'jposee'