Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Transform] improve performance by using point in time API for search #74984

Merged
merged 9 commits into from
Jul 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@ public final class TransformServices {

public TransformServices(
TransformConfigManager transformConfigManager,
TransformCheckpointService checkpointProvider,
TransformCheckpointService checkpointService,
TransformAuditor transformAuditor,
SchedulerEngine schedulerEngine
) {
this.configManager = Objects.requireNonNull(transformConfigManager);
this.checkpointService = Objects.requireNonNull(checkpointProvider);
this.checkpointService = Objects.requireNonNull(checkpointService);
this.auditor = Objects.requireNonNull(transformAuditor);
this.schedulerEngine = Objects.requireNonNull(schedulerEngine);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,26 @@
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.search.ClosePointInTimeAction;
import org.elasticsearch.action.search.ClosePointInTimeRequest;
import org.elasticsearch.action.search.OpenPointInTimeAction;
import org.elasticsearch.action.search.OpenPointInTimeRequest;
import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.common.logging.LoggerMessageFormat;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.mapper.MapperParsingException;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.DeleteByQueryAction;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.search.SearchContextMissingException;
import org.elasticsearch.search.builder.PointInTimeBuilder;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ActionNotFoundTransportException;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.indexing.IndexerState;
import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint;
Expand All @@ -42,10 +50,9 @@
import org.elasticsearch.xpack.core.transform.transforms.TransformStoredDoc;
import org.elasticsearch.xpack.core.transform.transforms.TransformTaskState;
import org.elasticsearch.xpack.core.transform.utils.ExceptionsHelper;
import org.elasticsearch.xpack.transform.TransformServices;
import org.elasticsearch.xpack.transform.checkpoint.CheckpointProvider;
import org.elasticsearch.xpack.transform.notifications.TransformAuditor;
import org.elasticsearch.xpack.transform.persistence.SeqNoPrimaryTermAndIndex;
import org.elasticsearch.xpack.transform.persistence.TransformConfigManager;
import org.elasticsearch.xpack.transform.utils.ExceptionRootCauseFinder;

import java.util.Collection;
Expand All @@ -57,21 +64,24 @@

class ClientTransformIndexer extends TransformIndexer {

private static final TimeValue PIT_KEEP_ALIVE = TimeValue.timeValueSeconds(30);
private static final Logger logger = LogManager.getLogger(ClientTransformIndexer.class);

private final Client client;
private final AtomicBoolean oldStatsCleanedUp = new AtomicBoolean(false);

private final AtomicReference<SeqNoPrimaryTermAndIndex> seqNoPrimaryTermAndIndex;
private volatile PointInTimeBuilder pit;
private volatile long pitCheckpoint;
private volatile boolean disablePit = false;

ClientTransformIndexer(
ThreadPool threadPool,
TransformConfigManager transformsConfigManager,
TransformServices transformServices,
CheckpointProvider checkpointProvider,
AtomicReference<IndexerState> initialState,
TransformIndexerPosition initialPosition,
Client client,
TransformAuditor auditor,
TransformIndexerStats initialStats,
TransformConfig transformConfig,
Map<String, String> fieldMappings,
Expand All @@ -84,9 +94,8 @@ class ClientTransformIndexer extends TransformIndexer {
) {
super(
ExceptionsHelper.requireNonNull(threadPool, "threadPool"),
transformsConfigManager,
transformServices,
checkpointProvider,
auditor,
transformConfig,
fieldMappings,
ExceptionsHelper.requireNonNull(initialState, "initialState"),
Expand All @@ -111,13 +120,14 @@ protected void doNextSearch(long waitTimeInNanos, ActionListener<SearchResponse>
nextPhase.onFailure(new ElasticsearchException("Attempted to do a search request for failed transform [{}].", getJobId()));
return;
}
ClientHelper.executeWithHeadersAsync(
transformConfig.getHeaders(),
ClientHelper.TRANSFORM_ORIGIN,
client,
SearchAction.INSTANCE,

if (getNextCheckpoint().getCheckpoint() != pitCheckpoint) {
closePointInTime();
}
Comment on lines +124 to +126
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems to me that we should not move this execution thread forward until the pit is closed.

It is conceivable right (though unlikely) that this closePointInTime() is executing, but doSearch is being handled and consequently, we close the wrong PIT and leave one left over.

Copy link
Author

@hendrikmuhs hendrikmuhs Jul 13, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pit is "copied" (not literally, but the reference) and set to null in the sync part of closePointInTime(), see line 470++. So you are right that we might open a new pit while still closing the other, however that's allowed and I don't see a race condition that could lead to mixing up the two.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hendrikmuhs 100%, I misread the method. Setting a local variable synchronously should avoid that problem :).


injectPointInTimeIfNeeded(
buildSearchRequest(),
nextPhase
ActionListener.wrap(pitSearchRequest -> { doSearch(pitSearchRequest, nextPhase); }, nextPhase::onFailure)
);
}

Expand Down Expand Up @@ -440,6 +450,134 @@ SeqNoPrimaryTermAndIndex getSeqNoPrimaryTermAndIndex() {
return seqNoPrimaryTermAndIndex.get();
}

@Override
protected void afterFinishOrFailure() {
closePointInTime();
super.afterFinishOrFailure();
}

@Override
protected void onStop() {
closePointInTime();
super.onStop();
}

private void closePointInTime() {
if (pit == null) {
return;
}

String oldPit = pit.getEncodedId();
pit = null;
ClosePointInTimeRequest closePitRequest = new ClosePointInTimeRequest(oldPit);
ClientHelper.executeWithHeadersAsync(
transformConfig.getHeaders(),
ClientHelper.TRANSFORM_ORIGIN,
client,
ClosePointInTimeAction.INSTANCE,
closePitRequest,
ActionListener.wrap(response -> { logger.trace("[{}] closed pit search context [{}]", getJobId(), oldPit); }, e -> {
// note: closing the pit should never throw, even if the pit is invalid
logger.error(new ParameterizedMessage("[{}] Failed to close point in time reader", getJobId()), e);
})
Comment on lines +479 to +482
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the logger.trace should have a message supplier like () -> new ParameterizedMessage to prevent strings from being created when trace is disabled.

Not a huge deal as this is not a "hot path"

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this applies. This is only a problem if one or more arguments needs to be constructed, e.g. if getJobId() would build the id and therefore execute something. This is not the case.

The message string itself gets only constructed after the check whether trace is enabled or not.

I wish we have static code analysis for this this, it's such a common problem.

);
}

private void injectPointInTimeIfNeeded(SearchRequest searchRequest, ActionListener<SearchRequest> listener) {
if (disablePit) {
listener.onResponse(searchRequest);
return;
}

if (pit != null) {
searchRequest.source().pointInTimeBuilder(pit);
listener.onResponse(searchRequest);
return;
}

// no pit, create a new one
OpenPointInTimeRequest pitRequest = new OpenPointInTimeRequest(transformConfig.getSource().getIndex()).keepAlive(PIT_KEEP_ALIVE);

ClientHelper.executeWithHeadersAsync(
transformConfig.getHeaders(),
ClientHelper.TRANSFORM_ORIGIN,
client,
OpenPointInTimeAction.INSTANCE,
pitRequest,
ActionListener.wrap(response -> {
pit = new PointInTimeBuilder(response.getPointInTimeId()).setKeepAlive(PIT_KEEP_ALIVE);
searchRequest.source().pointInTimeBuilder(pit);
pitCheckpoint = getNextCheckpoint().getCheckpoint();
logger.trace("[{}] using pit search context with id [{}]", getJobId(), pit.getEncodedId());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar comment () -> new ParameterizedMessage seems better to me for trace

listener.onResponse(searchRequest);
}, e -> {
Throwable unwrappedException = ExceptionsHelper.findSearchExceptionRootCause(e);
// if point in time is not supported, disable it but do not remember forever (stopping and starting will give it another
// try)
if (unwrappedException instanceof ActionNotFoundTransportException) {
logger.warn(
"[{}] source does not support point in time reader, falling back to normal search (more resource intensive)",
getJobId()
);
auditor.warning(
getJobId(),
"Source does not support point in time reader, falling back to normal search (more resource intensive)"
);
disablePit = true;
} else {
logger.warn(
new ParameterizedMessage(
"[{}] Failed to create a point in time reader, falling back to normal search.",
getJobId()
),
e
);
}
listener.onResponse(searchRequest);
})
);
}

private void doSearch(SearchRequest searchRequest, ActionListener<SearchResponse> listener) {
logger.trace("searchRequest: {}", searchRequest);

ClientHelper.executeWithHeadersAsync(
transformConfig.getHeaders(),
ClientHelper.TRANSFORM_ORIGIN,
client,
SearchAction.INSTANCE,
searchRequest,
ActionListener.wrap(response -> {
// did the pit change?
if (response.pointInTimeId() != null && (pit == null || response.pointInTimeId() != pit.getEncodedId())) {
pit = new PointInTimeBuilder(response.pointInTimeId()).setKeepAlive(PIT_KEEP_ALIVE);
logger.trace("point in time handle has changed");
}

listener.onResponse(response);
}, e -> {
// check if the error has been caused by a missing search context, which could be a timed out pit
// re-try this search without pit, if it fails again the normal failure handler is called, if it
// succeeds a new pit gets created at the next run
Throwable unwrappedException = ExceptionsHelper.findSearchExceptionRootCause(e);
if (unwrappedException instanceof SearchContextMissingException) {
logger.warn(new ParameterizedMessage("[{}] Search context missing, falling back to normal search.", getJobId()), e);
pit = null;
searchRequest.source().pointInTimeBuilder(null);
ClientHelper.executeWithHeadersAsync(
transformConfig.getHeaders(),
ClientHelper.TRANSFORM_ORIGIN,
client,
SearchAction.INSTANCE,
searchRequest,
listener
);
}
listener.onFailure(e);
})
);
}

private static String getBulkIndexDetailedFailureMessage(String prefix, Map<String, BulkItemResponse> failures) {
if (failures.isEmpty()) {
return "";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,16 @@
import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerPosition;
import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats;
import org.elasticsearch.xpack.core.transform.transforms.TransformProgress;
import org.elasticsearch.xpack.transform.TransformServices;
import org.elasticsearch.xpack.transform.checkpoint.CheckpointProvider;
import org.elasticsearch.xpack.transform.checkpoint.TransformCheckpointService;
import org.elasticsearch.xpack.transform.notifications.TransformAuditor;
import org.elasticsearch.xpack.transform.persistence.SeqNoPrimaryTermAndIndex;
import org.elasticsearch.xpack.transform.persistence.TransformConfigManager;

import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

class ClientTransformIndexerBuilder {
private ParentTaskAssigningClient parentTaskClient;
private TransformConfigManager transformsConfigManager;
private TransformCheckpointService transformsCheckpointService;
private TransformAuditor auditor;
private TransformServices transformServices;
private Map<String, String> fieldMappings;
private TransformConfig transformConfig;
private TransformIndexerStats initialStats;
Expand All @@ -45,16 +41,16 @@ class ClientTransformIndexerBuilder {
}

ClientTransformIndexer build(ThreadPool threadPool, TransformContext context) {
CheckpointProvider checkpointProvider = transformsCheckpointService.getCheckpointProvider(parentTaskClient, transformConfig);
CheckpointProvider checkpointProvider = transformServices.getCheckpointService()
.getCheckpointProvider(parentTaskClient, transformConfig);

return new ClientTransformIndexer(
threadPool,
transformsConfigManager,
transformServices,
checkpointProvider,
new AtomicReference<>(this.indexerState),
initialPosition,
parentTaskClient,
auditor,
initialStats,
transformConfig,
fieldMappings,
Expand All @@ -77,18 +73,8 @@ ClientTransformIndexerBuilder setClient(ParentTaskAssigningClient parentTaskClie
return this;
}

ClientTransformIndexerBuilder setTransformsConfigManager(TransformConfigManager transformsConfigManager) {
this.transformsConfigManager = transformsConfigManager;
return this;
}

ClientTransformIndexerBuilder setTransformsCheckpointService(TransformCheckpointService transformsCheckpointService) {
this.transformsCheckpointService = transformsCheckpointService;
return this;
}

ClientTransformIndexerBuilder setAuditor(TransformAuditor auditor) {
this.auditor = auditor;
ClientTransformIndexerBuilder setTransformServices(TransformServices transformServices) {
this.transformServices = transformServices;
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.common.breaker.CircuitBreakingException;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
Expand All @@ -41,6 +41,7 @@
import org.elasticsearch.xpack.core.transform.transforms.TransformProgress;
import org.elasticsearch.xpack.core.transform.transforms.TransformTaskState;
import org.elasticsearch.xpack.core.transform.utils.ExceptionsHelper;
import org.elasticsearch.xpack.transform.TransformServices;
import org.elasticsearch.xpack.transform.checkpoint.CheckpointProvider;
import org.elasticsearch.xpack.transform.notifications.TransformAuditor;
import org.elasticsearch.xpack.transform.persistence.TransformConfigManager;
Expand Down Expand Up @@ -112,8 +113,8 @@ private enum RunState {

private volatile Integer initialConfiguredPageSize;
private volatile int pageSize = 0;
private long logEvery = 1;
private long logCount = 0;
private volatile long logEvery = 1;
private volatile long logCount = 0;
private volatile TransformCheckpoint lastCheckpoint;
private volatile TransformCheckpoint nextCheckpoint;

Expand All @@ -128,9 +129,8 @@ private enum RunState {

public TransformIndexer(
ThreadPool threadPool,
TransformConfigManager transformsConfigManager,
TransformServices transformServices,
CheckpointProvider checkpointProvider,
TransformAuditor auditor,
TransformConfig transformConfig,
Map<String, String> fieldMappings,
AtomicReference<IndexerState> initialState,
Expand All @@ -142,16 +142,16 @@ public TransformIndexer(
TransformContext context
) {
super(threadPool, initialState, initialPosition, jobStats);
this.transformsConfigManager = ExceptionsHelper.requireNonNull(transformsConfigManager, "transformsConfigManager");
ExceptionsHelper.requireNonNull(transformServices, "transformServices");
this.transformsConfigManager = transformServices.getConfigManager();
this.checkpointProvider = ExceptionsHelper.requireNonNull(checkpointProvider, "checkpointProvider");
this.auditor = ExceptionsHelper.requireNonNull(auditor, "auditor");
this.auditor = transformServices.getAuditor();
this.transformConfig = ExceptionsHelper.requireNonNull(transformConfig, "transformConfig");
this.fieldMappings = ExceptionsHelper.requireNonNull(fieldMappings, "fieldMappings");
this.progress = transformProgress;
this.lastCheckpoint = ExceptionsHelper.requireNonNull(lastCheckpoint, "lastCheckpoint");
this.nextCheckpoint = ExceptionsHelper.requireNonNull(nextCheckpoint, "nextCheckpoint");
this.context = ExceptionsHelper.requireNonNull(context, "context");

// give runState a default
this.runState = RunState.APPLY_RESULTS;

Expand Down
Loading