Skip to content

Commit

Permalink
[7.x][Transform] improve performance by using point in time API for s…
Browse files Browse the repository at this point in the history
…earch (#75333)

Use the point in time (PIT) API for every checkpoint in transform. Using a point in time reduces pressure
on the source indexes, e.g. fewer refreshes. If PIT isn't supported (e.g. when searching
remote clusters), the transform falls back to ordinary search requests as before.

closes #73481
backport #74984
  • Loading branch information
Hendrik Muhs authored Jul 14, 2021
1 parent ce4e83c commit f314465
Show file tree
Hide file tree
Showing 10 changed files with 552 additions and 84 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@ public final class TransformServices {

public TransformServices(
TransformConfigManager transformConfigManager,
TransformCheckpointService checkpointProvider,
TransformCheckpointService checkpointService,
TransformAuditor transformAuditor,
SchedulerEngine schedulerEngine
) {
this.configManager = Objects.requireNonNull(transformConfigManager);
this.checkpointService = Objects.requireNonNull(checkpointProvider);
this.checkpointService = Objects.requireNonNull(checkpointService);
this.auditor = Objects.requireNonNull(transformAuditor);
this.schedulerEngine = Objects.requireNonNull(schedulerEngine);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,26 @@
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.search.ClosePointInTimeAction;
import org.elasticsearch.action.search.ClosePointInTimeRequest;
import org.elasticsearch.action.search.OpenPointInTimeAction;
import org.elasticsearch.action.search.OpenPointInTimeRequest;
import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.common.logging.LoggerMessageFormat;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.mapper.MapperParsingException;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.DeleteByQueryAction;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.search.SearchContextMissingException;
import org.elasticsearch.search.builder.PointInTimeBuilder;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ActionNotFoundTransportException;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.indexing.IndexerState;
import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint;
Expand All @@ -42,10 +50,9 @@
import org.elasticsearch.xpack.core.transform.transforms.TransformStoredDoc;
import org.elasticsearch.xpack.core.transform.transforms.TransformTaskState;
import org.elasticsearch.xpack.core.transform.utils.ExceptionsHelper;
import org.elasticsearch.xpack.transform.TransformServices;
import org.elasticsearch.xpack.transform.checkpoint.CheckpointProvider;
import org.elasticsearch.xpack.transform.notifications.TransformAuditor;
import org.elasticsearch.xpack.transform.persistence.SeqNoPrimaryTermAndIndex;
import org.elasticsearch.xpack.transform.persistence.TransformConfigManager;
import org.elasticsearch.xpack.transform.utils.ExceptionRootCauseFinder;

import java.util.Collection;
Expand All @@ -57,21 +64,24 @@

class ClientTransformIndexer extends TransformIndexer {

private static final TimeValue PIT_KEEP_ALIVE = TimeValue.timeValueSeconds(30);
private static final Logger logger = LogManager.getLogger(ClientTransformIndexer.class);

private final Client client;
private final AtomicBoolean oldStatsCleanedUp = new AtomicBoolean(false);

private final AtomicReference<SeqNoPrimaryTermAndIndex> seqNoPrimaryTermAndIndex;
private volatile PointInTimeBuilder pit;
private volatile long pitCheckpoint;
private volatile boolean disablePit = false;

ClientTransformIndexer(
ThreadPool threadPool,
TransformConfigManager transformsConfigManager,
TransformServices transformServices,
CheckpointProvider checkpointProvider,
AtomicReference<IndexerState> initialState,
TransformIndexerPosition initialPosition,
Client client,
TransformAuditor auditor,
TransformIndexerStats initialStats,
TransformConfig transformConfig,
Map<String, String> fieldMappings,
Expand All @@ -84,9 +94,8 @@ class ClientTransformIndexer extends TransformIndexer {
) {
super(
ExceptionsHelper.requireNonNull(threadPool, "threadPool"),
transformsConfigManager,
transformServices,
checkpointProvider,
auditor,
transformConfig,
fieldMappings,
ExceptionsHelper.requireNonNull(initialState, "initialState"),
Expand All @@ -111,13 +120,14 @@ protected void doNextSearch(long waitTimeInNanos, ActionListener<SearchResponse>
nextPhase.onFailure(new ElasticsearchException("Attempted to do a search request for failed transform [{}].", getJobId()));
return;
}
ClientHelper.executeWithHeadersAsync(
transformConfig.getHeaders(),
ClientHelper.TRANSFORM_ORIGIN,
client,
SearchAction.INSTANCE,

if (getNextCheckpoint().getCheckpoint() != pitCheckpoint) {
closePointInTime();
}

injectPointInTimeIfNeeded(
buildSearchRequest(),
nextPhase
ActionListener.wrap(pitSearchRequest -> { doSearch(pitSearchRequest, nextPhase); }, nextPhase::onFailure)
);
}

Expand Down Expand Up @@ -440,6 +450,134 @@ SeqNoPrimaryTermAndIndex getSeqNoPrimaryTermAndIndex() {
return seqNoPrimaryTermAndIndex.get();
}

/**
 * Hook override: releases the point in time (if one is held) and then delegates to the
 * superclass implementation.
 */
@Override
protected void afterFinishOrFailure() {
// close the point in time first, then let the superclass run its own handling
closePointInTime();
super.afterFinishOrFailure();
}

/**
 * Hook override: releases the point in time (if one is held) and then delegates to the
 * superclass implementation.
 */
@Override
protected void onStop() {
// close the point in time first, then let the superclass run its own handling
closePointInTime();
super.onStop();
}

/**
 * Closes the point in time (PIT) currently held by this indexer, if any.
 *
 * The {@code pit} field is reset to {@code null} before the close request is dispatched. The
 * request is sent through {@code ClientHelper.executeWithHeadersAsync}; its outcome is only
 * logged (trace on success, error on failure) and is not reported to any caller.
 */
private void closePointInTime() {
    // Read the volatile field exactly once: other code paths reset it to null, so a second read
    // after the null check could observe null and fail on getEncodedId().
    final PointInTimeBuilder currentPit = pit;
    if (currentPit == null) {
        return;
    }

    final String oldPit = currentPit.getEncodedId();
    pit = null;
    ClosePointInTimeRequest closePitRequest = new ClosePointInTimeRequest(oldPit);
    ClientHelper.executeWithHeadersAsync(
        transformConfig.getHeaders(),
        ClientHelper.TRANSFORM_ORIGIN,
        client,
        ClosePointInTimeAction.INSTANCE,
        closePitRequest,
        ActionListener.wrap(response -> { logger.trace("[{}] closed pit search context [{}]", getJobId(), oldPit); }, e -> {
            // note: closing the pit should never throw, even if the pit is invalid
            logger.error(new ParameterizedMessage("[{}] Failed to close point in time reader", getJobId()), e);
        })
    );
}

/**
 * Attaches a point in time (PIT) to the given search request where possible and hands the
 * request to {@code listener}.
 *
 * <ul>
 * <li>If PIT usage has been disabled, the request is passed on unchanged.</li>
 * <li>If a PIT is already held, it is set on the request's source.</li>
 * <li>Otherwise a new PIT is opened on the configured source indices with {@code PIT_KEEP_ALIVE};
 * on success it is stored, set on the request and {@code pitCheckpoint} is updated.</li>
 * </ul>
 *
 * Failing to open a PIT never fails the listener: the request is passed on without a PIT. When the
 * root cause is {@code ActionNotFoundTransportException}, PIT usage is additionally disabled for
 * this indexer instance and a warning is audited.
 *
 * @param searchRequest the request to decorate; its source is modified in place
 * @param listener receives the (possibly decorated) search request
 */
private void injectPointInTimeIfNeeded(SearchRequest searchRequest, ActionListener<SearchRequest> listener) {
    if (disablePit) {
        listener.onResponse(searchRequest);
        return;
    }

    // Read the volatile field once so the null check and the use see the same value.
    final PointInTimeBuilder currentPit = pit;
    if (currentPit != null) {
        searchRequest.source().pointInTimeBuilder(currentPit);
        listener.onResponse(searchRequest);
        return;
    }

    // no pit, create a new one
    OpenPointInTimeRequest pitRequest = new OpenPointInTimeRequest(transformConfig.getSource().getIndex()).keepAlive(PIT_KEEP_ALIVE);

    ClientHelper.executeWithHeadersAsync(
        transformConfig.getHeaders(),
        ClientHelper.TRANSFORM_ORIGIN,
        client,
        OpenPointInTimeAction.INSTANCE,
        pitRequest,
        ActionListener.wrap(response -> {
            // Keep the new builder in a local: the field may be reset concurrently, and re-reading it
            // for the request or the trace message could then yield null.
            final PointInTimeBuilder newPit = new PointInTimeBuilder(response.getPointInTimeId()).setKeepAlive(PIT_KEEP_ALIVE);
            pit = newPit;
            searchRequest.source().pointInTimeBuilder(newPit);
            pitCheckpoint = getNextCheckpoint().getCheckpoint();
            logger.trace("[{}] using pit search context with id [{}]", getJobId(), newPit.getEncodedId());
            listener.onResponse(searchRequest);
        }, e -> {
            Throwable unwrappedException = ExceptionsHelper.findSearchExceptionRootCause(e);
            // if point in time is not supported, disable it but do not remember forever (stopping and starting will give it another
            // try)
            if (unwrappedException instanceof ActionNotFoundTransportException) {
                logger.warn(
                    "[{}] source does not support point in time reader, falling back to normal search (more resource intensive)",
                    getJobId()
                );
                auditor.warning(
                    getJobId(),
                    "Source does not support point in time reader, falling back to normal search (more resource intensive)"
                );
                disablePit = true;
            } else {
                logger.warn(
                    new ParameterizedMessage(
                        "[{}] Failed to create a point in time reader, falling back to normal search.",
                        getJobId()
                    ),
                    e
                );
            }
            // fall back: run the search without a point in time
            listener.onResponse(searchRequest);
        })
    );
}

/**
 * Executes the given search request and forwards the outcome to {@code listener}.
 *
 * On success, if the response carries a point in time (PIT) id that differs from the one currently
 * held, the held PIT is replaced with the id from the response.
 *
 * On failure, if the root cause is a {@code SearchContextMissingException} (which could be a timed
 * out PIT), the held PIT is dropped and the search is retried once without a PIT; the retry reports
 * directly to {@code listener}. Any other failure is passed to {@code listener.onFailure}.
 *
 * @param searchRequest the request to execute; its PIT is removed in place before a retry
 * @param listener notified exactly once with the search response or the failure
 */
private void doSearch(SearchRequest searchRequest, ActionListener<SearchResponse> listener) {
    logger.trace("searchRequest: {}", searchRequest);

    ClientHelper.executeWithHeadersAsync(
        transformConfig.getHeaders(),
        ClientHelper.TRANSFORM_ORIGIN,
        client,
        SearchAction.INSTANCE,
        searchRequest,
        ActionListener.wrap(response -> {
            // did the pit change?
            final String responsePitId = response.pointInTimeId();
            // single read of the volatile field so the null check and the comparison agree
            final PointInTimeBuilder currentPit = pit;
            // compare ids by value: reference comparison of strings would report a change on every response
            if (responsePitId != null && (currentPit == null || responsePitId.equals(currentPit.getEncodedId()) == false)) {
                pit = new PointInTimeBuilder(responsePitId).setKeepAlive(PIT_KEEP_ALIVE);
                logger.trace("point in time handle has changed");
            }

            listener.onResponse(response);
        }, e -> {
            // check if the error has been caused by a missing search context, which could be a timed out pit
            // re-try this search without pit, if it fails again the normal failure handler is called, if it
            // succeeds a new pit gets created at the next run
            Throwable unwrappedException = ExceptionsHelper.findSearchExceptionRootCause(e);
            if (unwrappedException instanceof SearchContextMissingException) {
                logger.warn(new ParameterizedMessage("[{}] Search context missing, falling back to normal search.", getJobId()), e);
                pit = null;
                searchRequest.source().pointInTimeBuilder(null);
                ClientHelper.executeWithHeadersAsync(
                    transformConfig.getHeaders(),
                    ClientHelper.TRANSFORM_ORIGIN,
                    client,
                    SearchAction.INSTANCE,
                    searchRequest,
                    listener
                );
                // the retry owns the listener from here on; do not also report the original failure
                return;
            }
            listener.onFailure(e);
        })
    );
}

private static String getBulkIndexDetailedFailureMessage(String prefix, Map<String, BulkItemResponse> failures) {
if (failures.isEmpty()) {
return "";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,16 @@
import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerPosition;
import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats;
import org.elasticsearch.xpack.core.transform.transforms.TransformProgress;
import org.elasticsearch.xpack.transform.TransformServices;
import org.elasticsearch.xpack.transform.checkpoint.CheckpointProvider;
import org.elasticsearch.xpack.transform.checkpoint.TransformCheckpointService;
import org.elasticsearch.xpack.transform.notifications.TransformAuditor;
import org.elasticsearch.xpack.transform.persistence.SeqNoPrimaryTermAndIndex;
import org.elasticsearch.xpack.transform.persistence.TransformConfigManager;

import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

class ClientTransformIndexerBuilder {
private ParentTaskAssigningClient parentTaskClient;
private TransformConfigManager transformsConfigManager;
private TransformCheckpointService transformsCheckpointService;
private TransformAuditor auditor;
private TransformServices transformServices;
private Map<String, String> fieldMappings;
private TransformConfig transformConfig;
private TransformIndexerStats initialStats;
Expand All @@ -45,16 +41,16 @@ class ClientTransformIndexerBuilder {
}

ClientTransformIndexer build(ThreadPool threadPool, TransformContext context) {
CheckpointProvider checkpointProvider = transformsCheckpointService.getCheckpointProvider(parentTaskClient, transformConfig);
CheckpointProvider checkpointProvider = transformServices.getCheckpointService()
.getCheckpointProvider(parentTaskClient, transformConfig);

return new ClientTransformIndexer(
threadPool,
transformsConfigManager,
transformServices,
checkpointProvider,
new AtomicReference<>(this.indexerState),
initialPosition,
parentTaskClient,
auditor,
initialStats,
transformConfig,
fieldMappings,
Expand All @@ -77,18 +73,8 @@ ClientTransformIndexerBuilder setClient(ParentTaskAssigningClient parentTaskClie
return this;
}

ClientTransformIndexerBuilder setTransformsConfigManager(TransformConfigManager transformsConfigManager) {
this.transformsConfigManager = transformsConfigManager;
return this;
}

ClientTransformIndexerBuilder setTransformsCheckpointService(TransformCheckpointService transformsCheckpointService) {
this.transformsCheckpointService = transformsCheckpointService;
return this;
}

ClientTransformIndexerBuilder setAuditor(TransformAuditor auditor) {
this.auditor = auditor;
ClientTransformIndexerBuilder setTransformServices(TransformServices transformServices) {
this.transformServices = transformServices;
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.common.breaker.CircuitBreakingException;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
Expand All @@ -41,6 +41,7 @@
import org.elasticsearch.xpack.core.transform.transforms.TransformProgress;
import org.elasticsearch.xpack.core.transform.transforms.TransformTaskState;
import org.elasticsearch.xpack.core.transform.utils.ExceptionsHelper;
import org.elasticsearch.xpack.transform.TransformServices;
import org.elasticsearch.xpack.transform.checkpoint.CheckpointProvider;
import org.elasticsearch.xpack.transform.notifications.TransformAuditor;
import org.elasticsearch.xpack.transform.persistence.TransformConfigManager;
Expand Down Expand Up @@ -112,8 +113,8 @@ private enum RunState {

private volatile Integer initialConfiguredPageSize;
private volatile int pageSize = 0;
private long logEvery = 1;
private long logCount = 0;
private volatile long logEvery = 1;
private volatile long logCount = 0;
private volatile TransformCheckpoint lastCheckpoint;
private volatile TransformCheckpoint nextCheckpoint;

Expand All @@ -128,9 +129,8 @@ private enum RunState {

public TransformIndexer(
ThreadPool threadPool,
TransformConfigManager transformsConfigManager,
TransformServices transformServices,
CheckpointProvider checkpointProvider,
TransformAuditor auditor,
TransformConfig transformConfig,
Map<String, String> fieldMappings,
AtomicReference<IndexerState> initialState,
Expand All @@ -142,16 +142,16 @@ public TransformIndexer(
TransformContext context
) {
super(threadPool, initialState, initialPosition, jobStats);
this.transformsConfigManager = ExceptionsHelper.requireNonNull(transformsConfigManager, "transformsConfigManager");
ExceptionsHelper.requireNonNull(transformServices, "transformServices");
this.transformsConfigManager = transformServices.getConfigManager();
this.checkpointProvider = ExceptionsHelper.requireNonNull(checkpointProvider, "checkpointProvider");
this.auditor = ExceptionsHelper.requireNonNull(auditor, "auditor");
this.auditor = transformServices.getAuditor();
this.transformConfig = ExceptionsHelper.requireNonNull(transformConfig, "transformConfig");
this.fieldMappings = ExceptionsHelper.requireNonNull(fieldMappings, "fieldMappings");
this.progress = transformProgress;
this.lastCheckpoint = ExceptionsHelper.requireNonNull(lastCheckpoint, "lastCheckpoint");
this.nextCheckpoint = ExceptionsHelper.requireNonNull(nextCheckpoint, "nextCheckpoint");
this.context = ExceptionsHelper.requireNonNull(context, "context");

// give runState a default
this.runState = RunState.APPLY_RESULTS;

Expand Down
Loading

0 comments on commit f314465

Please sign in to comment.