From 8b6e446ec44bb8b50f65112bb8cd799fa5bf6b80 Mon Sep 17 00:00:00 2001 From: Joydeep Sinha <49728262+yojs@users.noreply.github.com> Date: Fri, 15 May 2020 14:47:50 -0700 Subject: [PATCH] We must handle all exceptions while intercepting ES requests (#99) * Making sure that we don't throw exceptions while intercepting ES requests PerformanceAnalyzer intercepts various ES request paths toget detailed metrics. But today if we throw an exception, then it will bubble all the way upto ES and fail the request. * Addressing the PR comments * Updating the .gitignore * style changes --- .gitignore | 1 + licenses/performanceanalyzer-rca-1.3.jar.sha1 | 2 +- .../setting/ClusterSettingsManager.java | 15 +++++-- .../PerformanceAnalyzerSearchListener.java | 44 ++++++++++++++++--- ...rmanceAnalyzerTransportRequestHandler.java | 18 +++++++- 5 files changed, 67 insertions(+), 13 deletions(-) diff --git a/.gitignore b/.gitignore index 1062418c..fdb6f655 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ .idea/ *.iml +licenses/performanceanalyzer-rca-1.3.jar.sha1s \ No newline at end of file diff --git a/licenses/performanceanalyzer-rca-1.3.jar.sha1 b/licenses/performanceanalyzer-rca-1.3.jar.sha1 index 7e552d92..776209ed 100644 --- a/licenses/performanceanalyzer-rca-1.3.jar.sha1 +++ b/licenses/performanceanalyzer-rca-1.3.jar.sha1 @@ -1 +1 @@ -67a91489b55ec6cd563c823e6790a8909bd334c3 \ No newline at end of file +81381772c2fea1f20c8d4050b70e11c36e881752 \ No newline at end of file diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/config/setting/ClusterSettingsManager.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/config/setting/ClusterSettingsManager.java index 77dda931..3c9c825e 100644 --- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/config/setting/ClusterSettingsManager.java +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/config/setting/ClusterSettingsManager.java @@ -1,5 +1,7 @@ package com.amazon.opendistro.elasticsearch.performanceanalyzer.config.setting; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.StatExceptionCode; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.StatsCollector; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -165,11 +167,16 @@ public void clusterChanged(final ClusterChangedEvent event) { * @param settingValue The new value for the setting. */ private void callListeners(final Setting setting, int settingValue) { - final List> listeners = listenerMap.get(setting); - if (listeners != null) { - for (ClusterSettingListener listener : listeners) { - listener.onSettingUpdate(settingValue); + try { + final List> listeners = listenerMap.get(setting); + if (listeners != null) { + for (ClusterSettingListener listener : listeners) { + listener.onSettingUpdate(settingValue); + } } + } catch(Exception ex) { + LOG.error(ex); + StatsCollector.instance().logException(StatExceptionCode.ES_REQUEST_INTERCEPTOR_ERROR); } } diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/listener/PerformanceAnalyzerSearchListener.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/listener/PerformanceAnalyzerSearchListener.java index 0a869f9a..75f8df3b 100644 --- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/listener/PerformanceAnalyzerSearchListener.java +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/listener/PerformanceAnalyzerSearchListener.java @@ -15,6 +15,8 @@ package com.amazon.opendistro.elasticsearch.performanceanalyzer.listener; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.StatExceptionCode; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.StatsCollector; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.index.shard.SearchOperationListener; @@ -51,32 +53,62 @@ private SearchListener getSearchListener() { @Override public void onPreQueryPhase(SearchContext searchContext) { - getSearchListener().preQueryPhase(searchContext); + try { + getSearchListener().preQueryPhase(searchContext); + } catch (Exception ex) { + LOG.error(ex); + StatsCollector.instance().logException(StatExceptionCode.ES_REQUEST_INTERCEPTOR_ERROR); + } } @Override public void onQueryPhase(SearchContext searchContext, long tookInNanos) { - getSearchListener().queryPhase(searchContext, tookInNanos); + try { + getSearchListener().queryPhase(searchContext, tookInNanos); + } catch (Exception ex) { + LOG.error(ex); + StatsCollector.instance().logException(StatExceptionCode.ES_REQUEST_INTERCEPTOR_ERROR); + } } @Override public void onFailedQueryPhase(SearchContext searchContext) { - getSearchListener().failedQueryPhase(searchContext); + try { + getSearchListener().failedQueryPhase(searchContext); + } catch (Exception ex) { + LOG.error(ex); + StatsCollector.instance().logException(StatExceptionCode.ES_REQUEST_INTERCEPTOR_ERROR); + } } @Override public void onPreFetchPhase(SearchContext searchContext) { - getSearchListener().preFetchPhase(searchContext); + try { + getSearchListener().preFetchPhase(searchContext); + } catch (Exception ex) { + LOG.error(ex); + StatsCollector.instance().logException(StatExceptionCode.ES_REQUEST_INTERCEPTOR_ERROR); + } } @Override public void onFetchPhase(SearchContext searchContext, long tookInNanos) { - getSearchListener().fetchPhase(searchContext, tookInNanos); + try { + getSearchListener().fetchPhase(searchContext, tookInNanos); + } catch (Exception ex) { + LOG.error(ex); + StatsCollector.instance().logException(StatExceptionCode.ES_REQUEST_INTERCEPTOR_ERROR); + } } @Override public void onFailedFetchPhase(SearchContext searchContext) { - getSearchListener().failedFetchPhase(searchContext); + try { + getSearchListener().failedFetchPhase(searchContext); + } catch (Exception ex) { + LOG.error(ex); + StatsCollector.instance().logException(StatExceptionCode.ES_REQUEST_INTERCEPTOR_ERROR); + } } @Override diff --git a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/transport/PerformanceAnalyzerTransportRequestHandler.java b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/transport/PerformanceAnalyzerTransportRequestHandler.java index 31e7d884..35df9b70 100644 --- a/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/transport/PerformanceAnalyzerTransportRequestHandler.java +++ b/src/main/java/com/amazon/opendistro/elasticsearch/performanceanalyzer/transport/PerformanceAnalyzerTransportRequestHandler.java @@ -15,6 +15,8 @@ package com.amazon.opendistro.elasticsearch.performanceanalyzer.transport; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.StatExceptionCode; +import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.StatsCollector; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.action.bulk.BulkShardRequest; @@ -43,7 +45,7 @@ PerformanceAnalyzerTransportRequestHandler set(TransportRequestHandler act @Override public void messageReceived(T request, TransportChannel channel, Task task) throws Exception { - actualHandler.messageReceived(request, getChannel(request, channel, task) , task); + actualHandler.messageReceived(request, getChannel(request, channel, task), task); } private TransportChannel getChannel(T request, TransportChannel channel, Task task) { @@ -78,7 +80,19 @@ private TransportChannel getShardBulkChannel(T request, TransportChannel channel BulkShardRequest bsr = (BulkShardRequest) transportRequest; PerformanceAnalyzerTransportChannel performanceanalyzerChannel = new PerformanceAnalyzerTransportChannel(); - performanceanalyzerChannel.set(channel, System.currentTimeMillis(), bsr.index(), bsr.shardId().id(), bsr.items().length, bPrimary); + + try { + performanceanalyzerChannel.set( + channel, + System.currentTimeMillis(), + bsr.index(), + bsr.shardId().id(), + bsr.items().length, + bPrimary); + } catch (Exception ex) { + LOG.error(ex); + StatsCollector.instance().logException(StatExceptionCode.ES_REQUEST_INTERCEPTOR_ERROR); + } return performanceanalyzerChannel; }