Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Add bulk indexing to result index handler #268

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,33 +49,44 @@

public class AnomalyIndexHandler<T extends ToXContentObject> {
private static final Logger LOG = LogManager.getLogger(AnomalyIndexHandler.class);

static final String CANNOT_SAVE_ERR_MSG = "Cannot save %s due to write block.";
static final String FAIL_TO_SAVE_ERR_MSG = "Fail to save %s: ";
static final String RETRY_SAVING_ERR_MSG = "Retry in saving %s: ";
static final String SUCCESS_SAVING_MSG = "Succeed in saving %s";
static final String CANNOT_SAVE_ERR_MSG = "Cannot save %s due to write block.";
static final String RETRY_SAVING_ERR_MSG = "Retry in saving %s: ";

protected final Client client;

private final ThreadPool threadPool;
private final BackoffPolicy savingBackoffPolicy;
protected final ThreadPool threadPool;
protected final BackoffPolicy savingBackoffPolicy;
protected final String indexName;
private final Consumer<ActionListener<CreateIndexResponse>> createIndex;
private final BooleanSupplier indexExists;
// whether save to a specific doc id or not
private final boolean fixedDoc;
protected final Consumer<ActionListener<CreateIndexResponse>> createIndex;
protected final BooleanSupplier indexExists;
// whether save to a specific doc id or not. False by default.
protected boolean fixedDoc;
protected final ClientUtil clientUtil;
private final IndexUtils indexUtils;
private final ClusterService clusterService;

protected final IndexUtils indexUtils;
protected final ClusterService clusterService;

/**
* Abstract class for index operation.
*
* @param client client to Elasticsearch query
* @param settings accessor for node settings.
* @param threadPool used to invoke specific threadpool to execute
* @param indexName name of index to save to
* @param createIndex functional interface to create the index to save to
* @param indexExists funcitonal interface to find out if the index exists
* @param clientUtil client wrapper
* @param indexUtils Index util classes
* @param clusterService accessor to ES cluster service
*/
public AnomalyIndexHandler(
Client client,
Settings settings,
ThreadPool threadPool,
String indexName,
Consumer<ActionListener<CreateIndexResponse>> createIndex,
BooleanSupplier indexExists,
boolean fixedDoc,
ClientUtil clientUtil,
IndexUtils indexUtils,
ClusterService clusterService
Expand All @@ -90,12 +101,22 @@ public AnomalyIndexHandler(
this.indexName = indexName;
this.createIndex = createIndex;
this.indexExists = indexExists;
this.fixedDoc = fixedDoc;
this.fixedDoc = false;
this.clientUtil = clientUtil;
this.indexUtils = indexUtils;
this.clusterService = clusterService;
}

/**
* Since the constructor needs to provide injected value and Guice does not allow Boolean to be there
* (claiming it does not know how to instantiate it), caller needs to manually set it to true if
* it want to save to a specific doc.
* @param fixedDoc whether to save to a specific doc Id
*/
public void setFixedDoc(boolean fixedDoc) {
weicongs-amazon marked this conversation as resolved.
Show resolved Hide resolved
this.fixedDoc = fixedDoc;
}

public void index(T toSave, String detectorId) {
if (indexUtils.checkIndicesBlocked(clusterService.state(), ClusterBlockLevel.WRITE, this.indexName)) {
LOG.warn(String.format(Locale.ROOT, CANNOT_SAVE_ERR_MSG, detectorId));
Expand Down Expand Up @@ -133,7 +154,7 @@ private void onCreateIndexResponse(CreateIndexResponse response, T toSave, Strin
if (response.isAcknowledged()) {
save(toSave, detectorId);
} else {
throw new AnomalyDetectionException(detectorId, "Creating %s with mappings call not acknowledged.");
throw new AnomalyDetectionException(detectorId, String.format("Creating %s with mappings call not acknowledged.", indexName));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.threadpool.ThreadPool;

import com.amazon.opendistroforelasticsearch.ad.NodeStateManager;
import com.amazon.opendistroforelasticsearch.ad.model.DetectorInternalState;
import com.amazon.opendistroforelasticsearch.ad.transport.TransportStateManager;
import com.amazon.opendistroforelasticsearch.ad.util.ClientUtil;
import com.amazon.opendistroforelasticsearch.ad.util.IndexUtils;
import com.google.common.base.Objects;
Expand Down Expand Up @@ -79,7 +79,7 @@ public DetectorInternalState createNewState(DetectorInternalState state) {

private static final Logger LOG = LogManager.getLogger(DetectionStateHandler.class);
private NamedXContentRegistry xContentRegistry;
private TransportStateManager adStateManager;
private NodeStateManager adStateManager;

public DetectionStateHandler(
Client client,
Expand All @@ -91,7 +91,7 @@ public DetectionStateHandler(
IndexUtils indexUtils,
ClusterService clusterService,
NamedXContentRegistry xContentRegistry,
TransportStateManager adStateManager
NodeStateManager adStateManager
) {
super(
client,
Expand All @@ -100,11 +100,11 @@ public DetectionStateHandler(
DetectorInternalState.DETECTOR_STATE_INDEX,
createIndex,
indexExists,
true,
clientUtil,
indexUtils,
clusterService
);
this.fixedDoc = true;
ohltyler marked this conversation as resolved.
Show resolved Hide resolved
this.xContentRegistry = xContentRegistry;
this.adStateManager = adStateManager;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.ad.transport.handler;

import java.time.Clock;
import java.util.Locale;
import java.util.concurrent.RejectedExecutionException;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.core.util.Throwables;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;

import com.amazon.opendistroforelasticsearch.ad.NodeStateManager;
import com.amazon.opendistroforelasticsearch.ad.common.exception.AnomalyDetectionException;
import com.amazon.opendistroforelasticsearch.ad.constant.CommonName;
import com.amazon.opendistroforelasticsearch.ad.indices.AnomalyDetectionIndices;
import com.amazon.opendistroforelasticsearch.ad.model.AnomalyResult;
import com.amazon.opendistroforelasticsearch.ad.transport.ADResultBulkAction;
import com.amazon.opendistroforelasticsearch.ad.transport.ADResultBulkRequest;
import com.amazon.opendistroforelasticsearch.ad.util.ClientUtil;
import com.amazon.opendistroforelasticsearch.ad.util.IndexUtils;
import com.amazon.opendistroforelasticsearch.ad.util.ThrowingConsumerWrapper;

/**
* EntityResultTransportAction depends on this class. I cannot use
* AnomalyIndexHandler &lt; AnomalyResult &gt; . All transport actions
* needs dependency injection. Guice has a hard time initializing generics class
* AnomalyIndexHandler &lt; AnomalyResult &gt; due to type erasure.
* To avoid that, I create a class with a built-in details so
* that Guice would be able to work out the details.
*
*/
public class MultiEntityResultHandler extends AnomalyIndexHandler<AnomalyResult> {
private static final Logger LOG = LogManager.getLogger(MultiEntityResultHandler.class);
private final NodeStateManager nodeStateManager;
private final Clock clock;

@Inject
public MultiEntityResultHandler(
Client client,
Settings settings,
ThreadPool threadPool,
AnomalyDetectionIndices anomalyDetectionIndices,
ClientUtil clientUtil,
IndexUtils indexUtils,
ClusterService clusterService,
NodeStateManager nodeStateManager,
Clock clock
) {
super(
client,
settings,
threadPool,
CommonName.ANOMALY_RESULT_INDEX_ALIAS,
ThrowingConsumerWrapper.throwingConsumerWrapper(anomalyDetectionIndices::initAnomalyResultIndexDirectly),
anomalyDetectionIndices::doesAnomalyResultIndexExist,
clientUtil,
indexUtils,
clusterService
);
this.nodeStateManager = nodeStateManager;
this.clock = clock;
}

/**
* Execute the bulk request
* @param currentBulkRequest The bulk request
* @param detectorId Detector Id
*/
public void flush(ADResultBulkRequest currentBulkRequest, String detectorId) {
if (indexUtils.checkIndicesBlocked(clusterService.state(), ClusterBlockLevel.WRITE, this.indexName)) {
LOG.warn(String.format(Locale.ROOT, CANNOT_SAVE_ERR_MSG, detectorId));
return;
}

try {
if (!indexExists.getAsBoolean()) {
createIndex
.accept(
ActionListener
.wrap(initResponse -> onCreateIndexResponse(initResponse, currentBulkRequest, detectorId), exception -> {
if (ExceptionsHelper.unwrapCause(exception) instanceof ResourceAlreadyExistsException) {
// It is possible the index has been created while we sending the create request
bulk(currentBulkRequest, detectorId);
} else {
throw new AnomalyDetectionException(
detectorId,
String.format("Unexpected error creating index %s", indexName),
exception
);
}
})
);
} else {
bulk(currentBulkRequest, detectorId);
}
} catch (Exception e) {
throw new AnomalyDetectionException(
detectorId,
String.format(Locale.ROOT, "Error in bulking %s for detector %s", indexName, detectorId),
e
);
}
}

private void onCreateIndexResponse(CreateIndexResponse response, ADResultBulkRequest bulkRequest, String detectorId) {
if (response.isAcknowledged()) {
bulk(bulkRequest, detectorId);
} else {
throw new AnomalyDetectionException(detectorId, "Creating %s with mappings call not acknowledged.");
}
}

private void bulk(ADResultBulkRequest currentBulkRequest, String detectorId) {
if (currentBulkRequest.numberOfActions() <= 0) {
return;
}
client
.execute(
ADResultBulkAction.INSTANCE,
currentBulkRequest,
ActionListener.<BulkResponse>wrap(response -> LOG.debug(String.format(SUCCESS_SAVING_MSG, detectorId)), exception -> {
LOG.error(String.format(FAIL_TO_SAVE_ERR_MSG, detectorId), exception);
Throwable cause = Throwables.getRootCause(exception);
// too much indexing pressure
// TODO: pause indexing a bit before trying again, ideally with randomized exponential backoff.
if (cause instanceof RejectedExecutionException) {
nodeStateManager.setLastIndexThrottledTime(clock.instant());
}
})
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.time.Clock;
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.action.ActionListener;
Expand All @@ -53,6 +55,7 @@
import org.mockito.MockitoAnnotations;

import com.amazon.opendistroforelasticsearch.ad.AbstractADTest;
import com.amazon.opendistroforelasticsearch.ad.NodeStateManager;
import com.amazon.opendistroforelasticsearch.ad.TestHelpers;
import com.amazon.opendistroforelasticsearch.ad.common.exception.AnomalyDetectionException;
import com.amazon.opendistroforelasticsearch.ad.constant.CommonName;
Expand Down Expand Up @@ -89,6 +92,12 @@ public class AnomalyResultHandlerTests extends AbstractADTest {

private IndexUtils indexUtil;

@Mock
private NodeStateManager nodeStateManager;

@Mock
private Clock clock;

@BeforeClass
public static void setUpBeforeClass() {
setUpThreadPool(AnomalyResultTests.class.getSimpleName());
Expand Down Expand Up @@ -141,7 +150,6 @@ public void testSavingAdResult() throws IOException {
CommonName.ANOMALY_RESULT_INDEX_ALIAS,
ThrowingConsumerWrapper.throwingConsumerWrapper(anomalyDetectionIndices::initAnomalyResultIndexDirectly),
anomalyDetectionIndices::doesAnomalyResultIndexExist,
false,
clientUtil,
indexUtil,
clusterService
Expand Down Expand Up @@ -179,7 +187,6 @@ public void testIndexWriteBlock() {
CommonName.ANOMALY_RESULT_INDEX_ALIAS,
ThrowingConsumerWrapper.throwingConsumerWrapper(anomalyDetectionIndices::initAnomalyResultIndexDirectly),
anomalyDetectionIndices::doesAnomalyResultIndexExist,
false,
clientUtil,
indexUtil,
clusterService
Expand All @@ -199,7 +206,6 @@ public void testAdResultIndexExist() throws IOException {
CommonName.ANOMALY_RESULT_INDEX_ALIAS,
ThrowingConsumerWrapper.throwingConsumerWrapper(anomalyDetectionIndices::initAnomalyResultIndexDirectly),
anomalyDetectionIndices::doesAnomalyResultIndexExist,
false,
clientUtil,
indexUtil,
clusterService
Expand All @@ -221,7 +227,6 @@ public void testAdResultIndexOtherException() throws IOException {
CommonName.ANOMALY_RESULT_INDEX_ALIAS,
ThrowingConsumerWrapper.throwingConsumerWrapper(anomalyDetectionIndices::initAnomalyResultIndexDirectly),
anomalyDetectionIndices::doesAnomalyResultIndexExist,
false,
clientUtil,
indexUtil,
clusterService
Expand Down Expand Up @@ -300,15 +305,14 @@ private void savingFailureTemplate(boolean throwEsRejectedExecutionException, in
CommonName.ANOMALY_RESULT_INDEX_ALIAS,
ThrowingConsumerWrapper.throwingConsumerWrapper(anomalyDetectionIndices::initAnomalyResultIndexDirectly),
anomalyDetectionIndices::doesAnomalyResultIndexExist,
false,
clientUtil,
indexUtil,
clusterService
);

handler.index(TestHelpers.randomAnomalyDetectResult(), detectorId);

backoffLatch.await();
backoffLatch.await(1, TimeUnit.MINUTES);
}

@SuppressWarnings("unchecked")
Expand All @@ -324,5 +328,4 @@ private void setUpSavingAnomalyResultIndex(boolean anomalyResultIndexExists) thr
}).when(anomalyDetectionIndices).initAnomalyResultIndexDirectly(any());
when(anomalyDetectionIndices.doesAnomalyResultIndexExist()).thenReturn(anomalyResultIndexExists);
}

}
Loading