Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[7.x][Transform] improve performance by using point in time API for search #75333

Merged
merged 1 commit into from
Jul 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@ public final class TransformServices {

public TransformServices(
TransformConfigManager transformConfigManager,
TransformCheckpointService checkpointProvider,
TransformCheckpointService checkpointService,
TransformAuditor transformAuditor,
SchedulerEngine schedulerEngine
) {
this.configManager = Objects.requireNonNull(transformConfigManager);
this.checkpointService = Objects.requireNonNull(checkpointProvider);
this.checkpointService = Objects.requireNonNull(checkpointService);
this.auditor = Objects.requireNonNull(transformAuditor);
this.schedulerEngine = Objects.requireNonNull(schedulerEngine);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,26 @@
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.search.ClosePointInTimeAction;
import org.elasticsearch.action.search.ClosePointInTimeRequest;
import org.elasticsearch.action.search.OpenPointInTimeAction;
import org.elasticsearch.action.search.OpenPointInTimeRequest;
import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.common.logging.LoggerMessageFormat;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.mapper.MapperParsingException;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.DeleteByQueryAction;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.search.SearchContextMissingException;
import org.elasticsearch.search.builder.PointInTimeBuilder;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ActionNotFoundTransportException;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.indexing.IndexerState;
import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint;
Expand All @@ -42,10 +50,9 @@
import org.elasticsearch.xpack.core.transform.transforms.TransformStoredDoc;
import org.elasticsearch.xpack.core.transform.transforms.TransformTaskState;
import org.elasticsearch.xpack.core.transform.utils.ExceptionsHelper;
import org.elasticsearch.xpack.transform.TransformServices;
import org.elasticsearch.xpack.transform.checkpoint.CheckpointProvider;
import org.elasticsearch.xpack.transform.notifications.TransformAuditor;
import org.elasticsearch.xpack.transform.persistence.SeqNoPrimaryTermAndIndex;
import org.elasticsearch.xpack.transform.persistence.TransformConfigManager;
import org.elasticsearch.xpack.transform.utils.ExceptionRootCauseFinder;

import java.util.Collection;
Expand All @@ -57,21 +64,24 @@

class ClientTransformIndexer extends TransformIndexer {

private static final TimeValue PIT_KEEP_ALIVE = TimeValue.timeValueSeconds(30);
private static final Logger logger = LogManager.getLogger(ClientTransformIndexer.class);

private final Client client;
private final AtomicBoolean oldStatsCleanedUp = new AtomicBoolean(false);

private final AtomicReference<SeqNoPrimaryTermAndIndex> seqNoPrimaryTermAndIndex;
private volatile PointInTimeBuilder pit;
private volatile long pitCheckpoint;
private volatile boolean disablePit = false;

ClientTransformIndexer(
ThreadPool threadPool,
TransformConfigManager transformsConfigManager,
TransformServices transformServices,
CheckpointProvider checkpointProvider,
AtomicReference<IndexerState> initialState,
TransformIndexerPosition initialPosition,
Client client,
TransformAuditor auditor,
TransformIndexerStats initialStats,
TransformConfig transformConfig,
Map<String, String> fieldMappings,
Expand All @@ -84,9 +94,8 @@ class ClientTransformIndexer extends TransformIndexer {
) {
super(
ExceptionsHelper.requireNonNull(threadPool, "threadPool"),
transformsConfigManager,
transformServices,
checkpointProvider,
auditor,
transformConfig,
fieldMappings,
ExceptionsHelper.requireNonNull(initialState, "initialState"),
Expand All @@ -111,13 +120,14 @@ protected void doNextSearch(long waitTimeInNanos, ActionListener<SearchResponse>
nextPhase.onFailure(new ElasticsearchException("Attempted to do a search request for failed transform [{}].", getJobId()));
return;
}
ClientHelper.executeWithHeadersAsync(
transformConfig.getHeaders(),
ClientHelper.TRANSFORM_ORIGIN,
client,
SearchAction.INSTANCE,

if (getNextCheckpoint().getCheckpoint() != pitCheckpoint) {
closePointInTime();
}

injectPointInTimeIfNeeded(
buildSearchRequest(),
nextPhase
ActionListener.wrap(pitSearchRequest -> { doSearch(pitSearchRequest, nextPhase); }, nextPhase::onFailure)
);
}

Expand Down Expand Up @@ -440,6 +450,134 @@ SeqNoPrimaryTermAndIndex getSeqNoPrimaryTermAndIndex() {
return seqNoPrimaryTermAndIndex.get();
}

@Override
protected void afterFinishOrFailure() {
closePointInTime();
super.afterFinishOrFailure();
}

@Override
protected void onStop() {
closePointInTime();
super.onStop();
}

private void closePointInTime() {
if (pit == null) {
return;
}

String oldPit = pit.getEncodedId();
pit = null;
ClosePointInTimeRequest closePitRequest = new ClosePointInTimeRequest(oldPit);
ClientHelper.executeWithHeadersAsync(
transformConfig.getHeaders(),
ClientHelper.TRANSFORM_ORIGIN,
client,
ClosePointInTimeAction.INSTANCE,
closePitRequest,
ActionListener.wrap(response -> { logger.trace("[{}] closed pit search context [{}]", getJobId(), oldPit); }, e -> {
// note: closing the pit should never throw, even if the pit is invalid
logger.error(new ParameterizedMessage("[{}] Failed to close point in time reader", getJobId()), e);
})
);
}

private void injectPointInTimeIfNeeded(SearchRequest searchRequest, ActionListener<SearchRequest> listener) {
if (disablePit) {
listener.onResponse(searchRequest);
return;
}

if (pit != null) {
searchRequest.source().pointInTimeBuilder(pit);
listener.onResponse(searchRequest);
return;
}

// no pit, create a new one
OpenPointInTimeRequest pitRequest = new OpenPointInTimeRequest(transformConfig.getSource().getIndex()).keepAlive(PIT_KEEP_ALIVE);

ClientHelper.executeWithHeadersAsync(
transformConfig.getHeaders(),
ClientHelper.TRANSFORM_ORIGIN,
client,
OpenPointInTimeAction.INSTANCE,
pitRequest,
ActionListener.wrap(response -> {
pit = new PointInTimeBuilder(response.getPointInTimeId()).setKeepAlive(PIT_KEEP_ALIVE);
searchRequest.source().pointInTimeBuilder(pit);
pitCheckpoint = getNextCheckpoint().getCheckpoint();
logger.trace("[{}] using pit search context with id [{}]", getJobId(), pit.getEncodedId());
listener.onResponse(searchRequest);
}, e -> {
Throwable unwrappedException = ExceptionsHelper.findSearchExceptionRootCause(e);
// if point in time is not supported, disable it but do not remember forever (stopping and starting will give it another
// try)
if (unwrappedException instanceof ActionNotFoundTransportException) {
logger.warn(
"[{}] source does not support point in time reader, falling back to normal search (more resource intensive)",
getJobId()
);
auditor.warning(
getJobId(),
"Source does not support point in time reader, falling back to normal search (more resource intensive)"
);
disablePit = true;
} else {
logger.warn(
new ParameterizedMessage(
"[{}] Failed to create a point in time reader, falling back to normal search.",
getJobId()
),
e
);
}
listener.onResponse(searchRequest);
})
);
}

private void doSearch(SearchRequest searchRequest, ActionListener<SearchResponse> listener) {
logger.trace("searchRequest: {}", searchRequest);

ClientHelper.executeWithHeadersAsync(
transformConfig.getHeaders(),
ClientHelper.TRANSFORM_ORIGIN,
client,
SearchAction.INSTANCE,
searchRequest,
ActionListener.wrap(response -> {
// did the pit change?
if (response.pointInTimeId() != null && (pit == null || response.pointInTimeId() != pit.getEncodedId())) {
pit = new PointInTimeBuilder(response.pointInTimeId()).setKeepAlive(PIT_KEEP_ALIVE);
logger.trace("point in time handle has changed");
}

listener.onResponse(response);
}, e -> {
// check if the error has been caused by a missing search context, which could be a timed out pit
// re-try this search without pit, if it fails again the normal failure handler is called, if it
// succeeds a new pit gets created at the next run
Throwable unwrappedException = ExceptionsHelper.findSearchExceptionRootCause(e);
if (unwrappedException instanceof SearchContextMissingException) {
logger.warn(new ParameterizedMessage("[{}] Search context missing, falling back to normal search.", getJobId()), e);
pit = null;
searchRequest.source().pointInTimeBuilder(null);
ClientHelper.executeWithHeadersAsync(
transformConfig.getHeaders(),
ClientHelper.TRANSFORM_ORIGIN,
client,
SearchAction.INSTANCE,
searchRequest,
listener
);
}
listener.onFailure(e);
})
);
}

private static String getBulkIndexDetailedFailureMessage(String prefix, Map<String, BulkItemResponse> failures) {
if (failures.isEmpty()) {
return "";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,16 @@
import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerPosition;
import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats;
import org.elasticsearch.xpack.core.transform.transforms.TransformProgress;
import org.elasticsearch.xpack.transform.TransformServices;
import org.elasticsearch.xpack.transform.checkpoint.CheckpointProvider;
import org.elasticsearch.xpack.transform.checkpoint.TransformCheckpointService;
import org.elasticsearch.xpack.transform.notifications.TransformAuditor;
import org.elasticsearch.xpack.transform.persistence.SeqNoPrimaryTermAndIndex;
import org.elasticsearch.xpack.transform.persistence.TransformConfigManager;

import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

class ClientTransformIndexerBuilder {
private ParentTaskAssigningClient parentTaskClient;
private TransformConfigManager transformsConfigManager;
private TransformCheckpointService transformsCheckpointService;
private TransformAuditor auditor;
private TransformServices transformServices;
private Map<String, String> fieldMappings;
private TransformConfig transformConfig;
private TransformIndexerStats initialStats;
Expand All @@ -45,16 +41,16 @@ class ClientTransformIndexerBuilder {
}

ClientTransformIndexer build(ThreadPool threadPool, TransformContext context) {
CheckpointProvider checkpointProvider = transformsCheckpointService.getCheckpointProvider(parentTaskClient, transformConfig);
CheckpointProvider checkpointProvider = transformServices.getCheckpointService()
.getCheckpointProvider(parentTaskClient, transformConfig);

return new ClientTransformIndexer(
threadPool,
transformsConfigManager,
transformServices,
checkpointProvider,
new AtomicReference<>(this.indexerState),
initialPosition,
parentTaskClient,
auditor,
initialStats,
transformConfig,
fieldMappings,
Expand All @@ -77,18 +73,8 @@ ClientTransformIndexerBuilder setClient(ParentTaskAssigningClient parentTaskClie
return this;
}

ClientTransformIndexerBuilder setTransformsConfigManager(TransformConfigManager transformsConfigManager) {
this.transformsConfigManager = transformsConfigManager;
return this;
}

ClientTransformIndexerBuilder setTransformsCheckpointService(TransformCheckpointService transformsCheckpointService) {
this.transformsCheckpointService = transformsCheckpointService;
return this;
}

ClientTransformIndexerBuilder setAuditor(TransformAuditor auditor) {
this.auditor = auditor;
ClientTransformIndexerBuilder setTransformServices(TransformServices transformServices) {
this.transformServices = transformServices;
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.common.breaker.CircuitBreakingException;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
Expand All @@ -41,6 +41,7 @@
import org.elasticsearch.xpack.core.transform.transforms.TransformProgress;
import org.elasticsearch.xpack.core.transform.transforms.TransformTaskState;
import org.elasticsearch.xpack.core.transform.utils.ExceptionsHelper;
import org.elasticsearch.xpack.transform.TransformServices;
import org.elasticsearch.xpack.transform.checkpoint.CheckpointProvider;
import org.elasticsearch.xpack.transform.notifications.TransformAuditor;
import org.elasticsearch.xpack.transform.persistence.TransformConfigManager;
Expand Down Expand Up @@ -112,8 +113,8 @@ private enum RunState {

private volatile Integer initialConfiguredPageSize;
private volatile int pageSize = 0;
private long logEvery = 1;
private long logCount = 0;
private volatile long logEvery = 1;
private volatile long logCount = 0;
private volatile TransformCheckpoint lastCheckpoint;
private volatile TransformCheckpoint nextCheckpoint;

Expand All @@ -128,9 +129,8 @@ private enum RunState {

public TransformIndexer(
ThreadPool threadPool,
TransformConfigManager transformsConfigManager,
TransformServices transformServices,
CheckpointProvider checkpointProvider,
TransformAuditor auditor,
TransformConfig transformConfig,
Map<String, String> fieldMappings,
AtomicReference<IndexerState> initialState,
Expand All @@ -142,16 +142,16 @@ public TransformIndexer(
TransformContext context
) {
super(threadPool, initialState, initialPosition, jobStats);
this.transformsConfigManager = ExceptionsHelper.requireNonNull(transformsConfigManager, "transformsConfigManager");
ExceptionsHelper.requireNonNull(transformServices, "transformServices");
this.transformsConfigManager = transformServices.getConfigManager();
this.checkpointProvider = ExceptionsHelper.requireNonNull(checkpointProvider, "checkpointProvider");
this.auditor = ExceptionsHelper.requireNonNull(auditor, "auditor");
this.auditor = transformServices.getAuditor();
this.transformConfig = ExceptionsHelper.requireNonNull(transformConfig, "transformConfig");
this.fieldMappings = ExceptionsHelper.requireNonNull(fieldMappings, "fieldMappings");
this.progress = transformProgress;
this.lastCheckpoint = ExceptionsHelper.requireNonNull(lastCheckpoint, "lastCheckpoint");
this.nextCheckpoint = ExceptionsHelper.requireNonNull(nextCheckpoint, "nextCheckpoint");
this.context = ExceptionsHelper.requireNonNull(context, "context");

// give runState a default
this.runState = RunState.APPLY_RESULTS;

Expand Down
Loading