Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Commit

Permalink
We must handle all exceptions while intercepting ES requests (#99)
Browse files Browse the repository at this point in the history
* Making sure that we don't throw exceptions while intercepting ES requests

PerformanceAnalyzer intercepts various ES request paths toget detailed metrics. But today if we throw an exception, then it will bubble all the way upto ES and fail the request.

* Addressing the PR comments

* Updating the .gitignore

* style changes
  • Loading branch information
yojs authored May 15, 2020
1 parent 5db7fc8 commit 8b6e446
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 13 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
.idea/
*.iml
licenses/performanceanalyzer-rca-1.3.jar.sha1s
2 changes: 1 addition & 1 deletion licenses/performanceanalyzer-rca-1.3.jar.sha1
Original file line number Diff line number Diff line change
@@ -1 +1 @@
67a91489b55ec6cd563c823e6790a8909bd334c3
81381772c2fea1f20c8d4050b70e11c36e881752
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.amazon.opendistro.elasticsearch.performanceanalyzer.config.setting;

import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.StatExceptionCode;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.StatsCollector;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -165,11 +167,16 @@ public void clusterChanged(final ClusterChangedEvent event) {
* @param settingValue The new value for the setting.
*/
private void callListeners(final Setting<Integer> setting, int settingValue) {
final List<ClusterSettingListener<Integer>> listeners = listenerMap.get(setting);
if (listeners != null) {
for (ClusterSettingListener<Integer> listener : listeners) {
listener.onSettingUpdate(settingValue);
try {
final List<ClusterSettingListener<Integer>> listeners = listenerMap.get(setting);
if (listeners != null) {
for (ClusterSettingListener<Integer> listener : listeners) {
listener.onSettingUpdate(settingValue);
}
}
} catch(Exception ex) {
LOG.error(ex);
StatsCollector.instance().logException(StatExceptionCode.ES_REQUEST_INTERCEPTOR_ERROR);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@

package com.amazon.opendistro.elasticsearch.performanceanalyzer.listener;

import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.StatExceptionCode;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.StatsCollector;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.index.shard.SearchOperationListener;
Expand Down Expand Up @@ -51,32 +53,62 @@ private SearchListener getSearchListener() {

@Override
public void onPreQueryPhase(SearchContext searchContext) {
getSearchListener().preQueryPhase(searchContext);
try {
getSearchListener().preQueryPhase(searchContext);
} catch (Exception ex) {
LOG.error(ex);
StatsCollector.instance().logException(StatExceptionCode.ES_REQUEST_INTERCEPTOR_ERROR);
}
}

@Override
public void onQueryPhase(SearchContext searchContext, long tookInNanos) {
getSearchListener().queryPhase(searchContext, tookInNanos);
try {
getSearchListener().queryPhase(searchContext, tookInNanos);
} catch (Exception ex) {
LOG.error(ex);
StatsCollector.instance().logException(StatExceptionCode.ES_REQUEST_INTERCEPTOR_ERROR);
}
}

@Override
public void onFailedQueryPhase(SearchContext searchContext) {
getSearchListener().failedQueryPhase(searchContext);
try {
getSearchListener().failedQueryPhase(searchContext);
} catch (Exception ex) {
LOG.error(ex);
StatsCollector.instance().logException(StatExceptionCode.ES_REQUEST_INTERCEPTOR_ERROR);
}
}

@Override
public void onPreFetchPhase(SearchContext searchContext) {
getSearchListener().preFetchPhase(searchContext);
try {
getSearchListener().preFetchPhase(searchContext);
} catch (Exception ex) {
LOG.error(ex);
StatsCollector.instance().logException(StatExceptionCode.ES_REQUEST_INTERCEPTOR_ERROR);
}
}

@Override
public void onFetchPhase(SearchContext searchContext, long tookInNanos) {
getSearchListener().fetchPhase(searchContext, tookInNanos);
try {
getSearchListener().fetchPhase(searchContext, tookInNanos);
} catch (Exception ex) {
LOG.error(ex);
StatsCollector.instance().logException(StatExceptionCode.ES_REQUEST_INTERCEPTOR_ERROR);
}
}

@Override
public void onFailedFetchPhase(SearchContext searchContext) {
getSearchListener().failedFetchPhase(searchContext);
try {
getSearchListener().failedFetchPhase(searchContext);
} catch (Exception ex) {
LOG.error(ex);
StatsCollector.instance().logException(StatExceptionCode.ES_REQUEST_INTERCEPTOR_ERROR);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@

package com.amazon.opendistro.elasticsearch.performanceanalyzer.transport;

import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.StatExceptionCode;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.StatsCollector;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.bulk.BulkShardRequest;
Expand Down Expand Up @@ -43,7 +45,7 @@ PerformanceAnalyzerTransportRequestHandler<T> set(TransportRequestHandler<T> act

@Override
public void messageReceived(T request, TransportChannel channel, Task task) throws Exception {
actualHandler.messageReceived(request, getChannel(request, channel, task) , task);
actualHandler.messageReceived(request, getChannel(request, channel, task), task);
}

private TransportChannel getChannel(T request, TransportChannel channel, Task task) {
Expand Down Expand Up @@ -78,7 +80,19 @@ private TransportChannel getShardBulkChannel(T request, TransportChannel channel

BulkShardRequest bsr = (BulkShardRequest) transportRequest;
PerformanceAnalyzerTransportChannel performanceanalyzerChannel = new PerformanceAnalyzerTransportChannel();
performanceanalyzerChannel.set(channel, System.currentTimeMillis(), bsr.index(), bsr.shardId().id(), bsr.items().length, bPrimary);

try {
performanceanalyzerChannel.set(
channel,
System.currentTimeMillis(),
bsr.index(),
bsr.shardId().id(),
bsr.items().length,
bPrimary);
} catch (Exception ex) {
LOG.error(ex);
StatsCollector.instance().logException(StatExceptionCode.ES_REQUEST_INTERCEPTOR_ERROR);
}

return performanceanalyzerChannel;
}
Expand Down

0 comments on commit 8b6e446

Please sign in to comment.