Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

We must handle all exceptions while intercepting ES requests #99

Merged
merged 4 commits into from
May 15, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
.idea/
*.iml
licenses/performanceanalyzer-rca-1.3.jar.sha1s
2 changes: 1 addition & 1 deletion licenses/performanceanalyzer-rca-1.3.jar.sha1
Original file line number Diff line number Diff line change
@@ -1 +1 @@
67a91489b55ec6cd563c823e6790a8909bd334c3
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if this SHA1 needs to change.

81381772c2fea1f20c8d4050b70e11c36e881752
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.amazon.opendistro.elasticsearch.performanceanalyzer.config.setting;

import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.StatExceptionCode;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.StatsCollector;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -165,11 +167,16 @@ public void clusterChanged(final ClusterChangedEvent event) {
* @param settingValue The new value for the setting.
*/
private void callListeners(final Setting<Integer> setting, int settingValue) {
final List<ClusterSettingListener<Integer>> listeners = listenerMap.get(setting);
if (listeners != null) {
for (ClusterSettingListener<Integer> listener : listeners) {
listener.onSettingUpdate(settingValue);
try {
final List<ClusterSettingListener<Integer>> listeners = listenerMap.get(setting);
if (listeners != null) {
for (ClusterSettingListener<Integer> listener : listeners) {
listener.onSettingUpdate(settingValue);
}
}
} catch(Exception ex) {
LOG.error(ex);
StatsCollector.instance().logException(StatExceptionCode.ES_REQUEST_INTERCEPTOR_ERROR);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@

package com.amazon.opendistro.elasticsearch.performanceanalyzer.listener;

import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.StatExceptionCode;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.StatsCollector;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.index.shard.SearchOperationListener;
Expand Down Expand Up @@ -51,32 +53,62 @@ private SearchListener getSearchListener() {

@Override
public void onPreQueryPhase(SearchContext searchContext) {
getSearchListener().preQueryPhase(searchContext);
try {
getSearchListener().preQueryPhase(searchContext);
} catch (Exception ex) {
LOG.error(ex);
StatsCollector.instance().logException(StatExceptionCode.ES_REQUEST_INTERCEPTOR_ERROR);
}
}

@Override
public void onQueryPhase(SearchContext searchContext, long tookInNanos) {
getSearchListener().queryPhase(searchContext, tookInNanos);
try {
getSearchListener().queryPhase(searchContext, tookInNanos);
} catch (Exception ex) {
LOG.error(ex);
StatsCollector.instance().logException(StatExceptionCode.ES_REQUEST_INTERCEPTOR_ERROR);
}
}

@Override
public void onFailedQueryPhase(SearchContext searchContext) {
getSearchListener().failedQueryPhase(searchContext);
try {
getSearchListener().failedQueryPhase(searchContext);
} catch (Exception ex) {
LOG.error(ex);
StatsCollector.instance().logException(StatExceptionCode.ES_REQUEST_INTERCEPTOR_ERROR);
}
}

@Override
public void onPreFetchPhase(SearchContext searchContext) {
getSearchListener().preFetchPhase(searchContext);
try {
getSearchListener().preFetchPhase(searchContext);
} catch (Exception ex) {
LOG.error(ex);
StatsCollector.instance().logException(StatExceptionCode.ES_REQUEST_INTERCEPTOR_ERROR);
}
}

@Override
public void onFetchPhase(SearchContext searchContext, long tookInNanos) {
getSearchListener().fetchPhase(searchContext, tookInNanos);
try {
getSearchListener().fetchPhase(searchContext, tookInNanos);
} catch (Exception ex) {
LOG.error(ex);
StatsCollector.instance().logException(StatExceptionCode.ES_REQUEST_INTERCEPTOR_ERROR);
}
}

@Override
public void onFailedFetchPhase(SearchContext searchContext) {
getSearchListener().failedFetchPhase(searchContext);
try {
getSearchListener().failedFetchPhase(searchContext);
} catch (Exception ex) {
LOG.error(ex);
StatsCollector.instance().logException(StatExceptionCode.ES_REQUEST_INTERCEPTOR_ERROR);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@

package com.amazon.opendistro.elasticsearch.performanceanalyzer.transport;

import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.StatExceptionCode;
import com.amazon.opendistro.elasticsearch.performanceanalyzer.collectors.StatsCollector;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.bulk.BulkShardRequest;
Expand Down Expand Up @@ -43,7 +45,7 @@ PerformanceAnalyzerTransportRequestHandler<T> set(TransportRequestHandler<T> act

@Override
public void messageReceived(T request, TransportChannel channel, Task task) throws Exception {
actualHandler.messageReceived(request, getChannel(request, channel, task) , task);
actualHandler.messageReceived(request, getChannel(request, channel, task), task);
}

private TransportChannel getChannel(T request, TransportChannel channel, Task task) {
Expand Down Expand Up @@ -78,7 +80,19 @@ private TransportChannel getShardBulkChannel(T request, TransportChannel channel

BulkShardRequest bsr = (BulkShardRequest) transportRequest;
PerformanceAnalyzerTransportChannel performanceanalyzerChannel = new PerformanceAnalyzerTransportChannel();
performanceanalyzerChannel.set(channel, System.currentTimeMillis(), bsr.index(), bsr.shardId().id(), bsr.items().length, bPrimary);

try {
performanceanalyzerChannel.set(
channel,
System.currentTimeMillis(),
bsr.index(),
bsr.shardId().id(),
bsr.items().length,
bPrimary);
} catch (Exception ex) {
LOG.error(ex);
StatsCollector.instance().logException(StatExceptionCode.ES_REQUEST_INTERCEPTOR_ERROR);
}

return performanceanalyzerChannel;
}
Expand Down