Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Transform] Shutdown the task immediately when force == true #100340

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/100340.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 100340
summary: Shutdown the task immediately when force == true
area: Transform
type: bug
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -529,6 +529,8 @@ private void onSearchResponse(SearchResponse searchResponse) {
if (bulkRequest.numberOfActions() > 0) {
stats.markStartIndexing();
doNextBulk(bulkRequest, ActionListener.wrap(bulkResponse -> {
Thread.sleep(360_000);

// TODO we should check items in the response and move after accordingly to
// resume the failing buckets ?
if (bulkResponse.hasFailures()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

package org.elasticsearch.xpack.transform.integration;

import org.apache.http.client.methods.HttpGet;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.elasticsearch.client.Request;
Expand All @@ -29,20 +30,26 @@
import org.elasticsearch.xpack.core.transform.transforms.TransformConfig;
import org.elasticsearch.xpack.core.transform.transforms.pivot.SingleGroupSource;
import org.elasticsearch.xpack.core.transform.transforms.pivot.TermsGroupSource;
import org.hamcrest.Matcher;
import org.junit.After;
import org.junit.Before;

import java.io.IOException;
import java.time.Instant;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.hasKey;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.oneOf;
Expand Down Expand Up @@ -332,7 +339,7 @@ public void testStopWaitForCheckpoint() throws Exception {
});

// waitForCheckpoint: true should make the transform continue until we hit the first checkpoint, then it will stop
stopTransform(transformId, false, null, true);
stopTransform(transformId, false, null, true, false);

// Wait until the first checkpoint
waitUntilCheckpoint(config.getId(), 1L);
Expand All @@ -354,7 +361,7 @@ public void testStopWaitForCheckpoint() throws Exception {
startTransformWithRetryOnConflict(config.getId(), RequestOptions.DEFAULT);

boolean waitForCompletion = randomBoolean();
stopTransform(transformId, waitForCompletion, null, true);
stopTransform(transformId, waitForCompletion, null, true, false);

assertBusy(() -> {
var stateAndStats = getTransformStats(config.getId());
Expand Down Expand Up @@ -494,6 +501,84 @@ public void testStartTransform_GivenTimeout_Returns408() throws Exception {
}
}

public void testForceStop() throws Exception {
String sourceIndex = "continuous-crud-reviews";
createReviewsIndex(sourceIndex, 100, NUM_USERS, TransformIT::getUserIdForRow, TransformIT::getDateStringForRow);

String transformId = "transform-continuous-crud";
String destIndex = transformId + "-dest";
TransformConfig config;
{
Map<String, SingleGroupSource> groups = Map.of(
"by-day",
createDateHistogramGroupSourceWithCalendarInterval("timestamp", DateHistogramInterval.DAY, null),
"by-user",
new TermsGroupSource("user_id", null, false),
"by-business",
new TermsGroupSource("business_id", null, false)
);

AggregatorFactories.Builder aggs = AggregatorFactories.builder()
.addAggregator(AggregationBuilders.avg("review_score").field("stars"))
.addAggregator(AggregationBuilders.max("timestamp").field("timestamp"));

config = createTransformConfigBuilder(transformId, destIndex, QueryConfig.matchAll(), sourceIndex).setPivotConfig(
createPivotConfig(groups, aggs)
).setSyncConfig(new TimeSyncConfig("timestamp", TimeValue.timeValueSeconds(1))).build();
}

for (int i = 0; i < 10; ++i) {
try {
putTransform(transformId, Strings.toString(config), RequestOptions.DEFAULT);
assertThatListOfTransformPersistentTasks(is(empty()));

startTransform(config.getId(), RequestOptions.DEFAULT);
assertThatListOfTransformPersistentTasks(hasSize(1));

Exception e = expectThrows(Exception.class, () -> deleteTransform(transformId));
assertThat(
e.getMessage(),
containsString("Cannot delete transform [" + transformId + "] as the task is running. Stop the task first")
);
assertThatListOfTransformPersistentTasks(hasSize(1));

stopTransform(transformId);

if (randomBoolean()) { // Either force-stop and then delete OR just force-delete
stopTransform(transformId, true, null, false, true);
deleteTransform(transformId);
} else {
deleteTransform(transformId, true);
}
assertThatListOfTransformPersistentTasks(is(empty()));
} catch (AssertionError | Exception e) {
fail("Failure at iteration " + i + ": " + e.getMessage());
}
}
}

@SuppressWarnings("unchecked")
private void assertThatListOfTransformPersistentTasks(Matcher<Collection<?>> matcher) {
Request request = new Request(HttpGet.METHOD_NAME, "/_tasks");
Map<String, String> parameters = Map.of(
"actions",
"data_frame/transforms[c]",
"group_by",
"none",
"timeout",
TimeValue.timeValueSeconds(10).getStringRep()
);
request.addParameters(parameters);
try {
Response response = adminClient().performRequest(request);
Map<String, Object> responseAsMap = entityAsMap(response);
List<Object> tasksAsList = (List<Object>) responseAsMap.get("tasks");
assertThat("Tasks were: " + tasksAsList, tasksAsList, matcher);
} catch (Exception e) {
throw new AssertionError("Failed to get pending tasks", e);
}
}

private void indexMoreDocs(long timestamp, long userId, String index) throws Exception {
StringBuilder bulkBuilder = new StringBuilder();
for (int i = 0; i < 25; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,18 +152,26 @@ protected Map<String, Object> getIndexMapping(String index, RequestOptions optio
}

protected void stopTransform(String id) throws IOException {
stopTransform(id, true, null, false);
stopTransform(id, true, null, false, false);
}

protected void stopTransform(String id, boolean waitForCompletion, @Nullable TimeValue timeout, boolean waitForCheckpoint)
throws IOException {
protected void stopTransform(
String id,
boolean waitForCompletion,
@Nullable TimeValue timeout,
boolean waitForCheckpoint,
boolean force
) throws IOException {

final Request stopTransformRequest = new Request("POST", TRANSFORM_ENDPOINT + id + "/_stop");
stopTransformRequest.addParameter(TransformField.WAIT_FOR_COMPLETION.getPreferredName(), Boolean.toString(waitForCompletion));
stopTransformRequest.addParameter(TransformField.WAIT_FOR_CHECKPOINT.getPreferredName(), Boolean.toString(waitForCheckpoint));
if (timeout != null) {
stopTransformRequest.addParameter(TransformField.TIMEOUT.getPreferredName(), timeout.getStringRep());
}
if (force) {
stopTransformRequest.addParameter(TransformField.FORCE.getPreferredName(), Boolean.toString(force));
}
Map<String, Object> stopTransformResponse = entityAsMap(client().performRequest(stopTransformRequest));
assertThat(stopTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE));
}
Expand Down Expand Up @@ -219,6 +227,7 @@ protected void deleteTransform(String id, boolean force) throws IOException {
request.addParameter("force", "true");
}
assertOK(adminClient().performRequest(request));
createdTransformIds.remove(id);
}

protected void putTransform(String id, String config, RequestOptions options) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ protected void doNextBulk(BulkRequest request, ActionListener<BulkResponse> next
);
}

protected void handleBulkResponse(BulkResponse bulkResponse, ActionListener<BulkResponse> nextPhase) {
protected void handleBulkResponse(BulkResponse bulkResponse, ActionListener<BulkResponse> nextPhase) throws InterruptedException {
if (bulkResponse.hasFailures() == false) {
// We don't know the of failures that have occurred (searching, processing, indexing, etc.),
// but if we search, process and bulk index then we have
Expand Down Expand Up @@ -527,6 +527,8 @@ void doSearch(Tuple<String, SearchRequest> namedSearchRequest, ActionListener<Se
logger.trace("point in time handle has changed; request [{}]", name);
}

Thread.sleep(360_000);

listener.onResponse(response);
}, e -> {
// check if the error has been caused by a missing search context, which could be a timed out pit
Expand Down