Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bubble exceptions up in ClusterApplierService #37729

Merged
merged 8 commits into from
Jan 24, 2019
Original file line number Diff line number Diff line change
Expand Up @@ -390,31 +390,24 @@ protected void runTask(UpdateTask task) {
newClusterState = task.apply(previousClusterState);
} catch (Exception e) {
TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(currentTimeInNanos() - startTimeNS)));
if (logger.isTraceEnabled()) {
logger.trace(() -> new ParameterizedMessage(
"failed to execute cluster state applier in [{}], state:\nversion [{}], source [{}]\n{}{}{}",
executionTime,
previousClusterState.version(),
task.source,
previousClusterState.nodes(),
previousClusterState.routingTable(),
previousClusterState.getRoutingNodes()),
e);
}
logger.trace(() -> new ParameterizedMessage(
"failed to execute cluster state applier in [{}], state:\nversion [{}], source [{}]\n{}",
executionTime, previousClusterState.version(), task.source, previousClusterState), e);
warnAboutSlowTaskIfNeeded(executionTime, task.source);
task.listener.onFailure(task.source, e);
return;
}

if (previousClusterState == newClusterState) {
task.listener.onSuccess(task.source);
TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(currentTimeInNanos() - startTimeNS)));
logger.debug("processing [{}]: took [{}] no change in cluster state", task.source, executionTime);
warnAboutSlowTaskIfNeeded(executionTime, task.source);
task.listener.onSuccess(task.source);
} else {
if (logger.isTraceEnabled()) {
logger.trace("cluster state updated, source [{}]\n{}", task.source, newClusterState);
} else if (logger.isDebugEnabled()) {
logger.debug("cluster state updated, version [{}], source [{}]\n{}", newClusterState.version(), task.source,
newClusterState);
} else {
logger.debug("cluster state updated, version [{}], source [{}]", newClusterState.version(), task.source);
}
try {
Expand All @@ -424,20 +417,19 @@ protected void runTask(UpdateTask task) {
executionTime, newClusterState.version(),
newClusterState.stateUUID());
warnAboutSlowTaskIfNeeded(executionTime, task.source);
task.listener.onSuccess(task.source);
} catch (Exception e) {
TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(currentTimeInNanos() - startTimeNS)));
final long version = newClusterState.version();
final String stateUUID = newClusterState.stateUUID();
final String fullState = newClusterState.toString();
logger.warn(() -> new ParameterizedMessage(
"failed to apply updated cluster state in [{}]:\nversion [{}], uuid [{}], source [{}]\n{}",
executionTime,
version,
stateUUID,
task.source,
fullState),
e);
// TODO: do we want to call updateTask.onFailure here?
if (logger.isTraceEnabled()) {
logger.warn(new ParameterizedMessage(
"failed to apply updated cluster state in [{}]:\nversion [{}], uuid [{}], source [{}]\n{}",
executionTime, newClusterState.version(), newClusterState.stateUUID(), task.source, newClusterState), e);
} else {
logger.warn(new ParameterizedMessage(
"failed to apply updated cluster state in [{}]:\nversion [{}], uuid [{}], source [{}]",
executionTime, newClusterState.version(), newClusterState.stateUUID(), task.source), e);
}
task.listener.onFailure(task.source, e);
}
}
}
Expand All @@ -454,17 +446,14 @@ private void applyChanges(UpdateTask task, ClusterState previousClusterState, Cl
}
}

logger.trace("connecting to nodes of cluster state with version {}", newClusterState.version());
nodeConnectionsService.connectToNodes(newClusterState.nodes());

logger.debug("applying cluster state version {}", newClusterState.version());
try {
// nothing to do until we actually recover from the gateway or any other block indicates we need to disable persistency
if (clusterChangedEvent.state().blocks().disableStatePersistence() == false && clusterChangedEvent.metaDataChanged()) {
final Settings incomingSettings = clusterChangedEvent.state().metaData().settings();
clusterSettings.applySettings(incomingSettings);
}
} catch (Exception ex) {
logger.warn("failed to apply cluster settings", ex);
// nothing to do until we actually recover from the gateway or any other block indicates we need to disable persistency
if (clusterChangedEvent.state().blocks().disableStatePersistence() == false && clusterChangedEvent.metaDataChanged()) {
logger.debug("applying settings from cluster state with version {}", newClusterState.version());
final Settings incomingSettings = clusterChangedEvent.state().metaData().settings();
clusterSettings.applySettings(incomingSettings);
}

logger.debug("apply cluster state with version {}", newClusterState.version());
Expand All @@ -476,18 +465,12 @@ private void applyChanges(UpdateTask task, ClusterState previousClusterState, Cl
state.set(newClusterState);

callClusterStateListeners(clusterChangedEvent);

task.listener.onSuccess(task.source);
}

private void callClusterStateAppliers(ClusterChangedEvent clusterChangedEvent) {
clusterStateAppliers.forEach(applier -> {
try {
logger.trace("calling [{}] with change to version [{}]", applier, clusterChangedEvent.state().version());
applier.applyClusterState(clusterChangedEvent);
} catch (Exception ex) {
logger.warn("failed to notify ClusterStateApplier", ex);
}
logger.trace("calling [{}] with change to version [{}]", applier, clusterChangedEvent.state().version());
applier.applyClusterState(clusterChangedEvent);
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

package org.elasticsearch.ingest;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.ResourceNotFoundException;
Expand Down Expand Up @@ -69,6 +71,8 @@ public class IngestService implements ClusterStateApplier {

public static final String NOOP_PIPELINE_NAME = "_none";

private static final Logger logger = LogManager.getLogger(IngestService.class);

private final ClusterService clusterService;
private final ScriptService scriptService;
private final Map<String, Processor.Factory> processorFactories;
Expand Down Expand Up @@ -256,7 +260,11 @@ Map<String, Pipeline> pipelines() {
public void applyClusterState(final ClusterChangedEvent event) {
ClusterState state = event.state();
Map<String, Pipeline> originalPipelines = pipelines;
innerUpdatePipelines(event.previousState(), state);
try {
innerUpdatePipelines(event.previousState(), state);
} catch (ElasticsearchParseException e) {
logger.warn("failed to update ingest pipelines", e);
}
//pipelines changed, so add the old metrics to the new metrics
if (originalPipelines != pipelines) {
pipelines.forEach((id, pipeline) -> {
Expand Down
Loading