Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ML][Transforms] fixing rolling upgrade continuous transform test #45823

Merged
merged 5 commits into from
Sep 4, 2019
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@

import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.Version;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.core.IndexerState;
import org.elasticsearch.client.dataframe.GetDataFrameTransformStatsResponse;
import org.elasticsearch.client.dataframe.transforms.DataFrameTransformConfig;
import org.elasticsearch.client.dataframe.transforms.DataFrameTransformStats;
Expand All @@ -28,6 +28,7 @@
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.xpack.test.rest.XPackRestTestConstants;
Expand All @@ -37,7 +38,9 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand All @@ -48,7 +51,6 @@
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.oneOf;

@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/43662")
public class DataFrameSurvivesUpgradeIT extends AbstractUpgradeTestCase {

private static final Version UPGRADE_FROM_VERSION = Version.fromString(System.getProperty("tests.upgrade_from_version"));
Expand Down Expand Up @@ -80,11 +82,18 @@ protected static void waitForPendingDataFrameTasks() throws Exception {
*/
public void testDataFramesRollingUpgrade() throws Exception {
assumeTrue("Continuous data frames not supported until 7.3", UPGRADE_FROM_VERSION.onOrAfter(Version.V_7_3_0));
Request adjustLoggingLevels = new Request("PUT", "/_cluster/settings");
adjustLoggingLevels.setJsonEntity(
"{\"transient\": {" +
"\"logger.org.elasticsearch.xpack.core.indexing.AsyncTwoPhaseIndexer\": \"trace\"," +
"\"logger.org.elasticsearch.xpack.dataframe\": \"trace\"}}");
client().performRequest(adjustLoggingLevels);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

debug leftover?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should keep it. These tests are crazy difficult to troubleshoot without this higher level logging.

Request waitForYellow = new Request("GET", "/_cluster/health");
waitForYellow.addParameter("wait_for_nodes", "3");
waitForYellow.addParameter("wait_for_status", "yellow");
switch (CLUSTER_TYPE) {
case OLD:
client().performRequest(waitForYellow);
createAndStartContinuousDataFrame();
break;
case MIXED:
Expand Down Expand Up @@ -113,35 +122,44 @@ private void cleanUpTransforms() throws Exception {

private void createAndStartContinuousDataFrame() throws Exception {
createIndex(CONTINUOUS_DATA_FRAME_SOURCE);
long totalDocsWritten = 0;
long totalDocsWrittenSum = 0;
for (TimeValue bucket : BUCKETS) {
int docs = randomIntBetween(1, 25);
putData(CONTINUOUS_DATA_FRAME_SOURCE, docs, bucket, ENTITIES);
totalDocsWritten += docs * ENTITIES.size();
totalDocsWrittenSum += docs * ENTITIES.size();
}

long totalDocsWritten = totalDocsWrittenSum;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: if you use totalDocsWrittenSum on line 138, you do not need this extra variable

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am assuming you mean line 151, but I cannot use it there as it is not "effectively final" and assertBusy utilizes a lambda.

DataFrameTransformConfig config = DataFrameTransformConfig.builder()
.setSyncConfig(new TimeSyncConfig("timestamp", TimeValue.timeValueSeconds(30)))
.setSyncConfig(new TimeSyncConfig("timestamp", TimeValue.timeValueSeconds(1)))
.setPivotConfig(PivotConfig.builder()
.setAggregations(new AggregatorFactories.Builder().addAggregator(AggregationBuilders.avg("stars").field("stars")))
.setGroups(GroupConfig.builder().groupBy("user_id", TermsGroupSource.builder().setField("user_id").build()).build())
.build())
.setDest(DestConfig.builder().setIndex(CONTINUOUS_DATA_FRAME_ID + "_idx").build())
.setSource(SourceConfig.builder().setIndex(CONTINUOUS_DATA_FRAME_SOURCE).build())
.setId(CONTINUOUS_DATA_FRAME_ID)
.setFrequency(TimeValue.timeValueSeconds(1))
.build();
putTransform(CONTINUOUS_DATA_FRAME_ID, config);

startTransform(CONTINUOUS_DATA_FRAME_ID);
waitUntilAfterCheckpoint(CONTINUOUS_DATA_FRAME_ID, 0L);

DataFrameTransformStats stateAndStats = getTransformStats(CONTINUOUS_DATA_FRAME_ID);
assertBusy(() -> {
DataFrameTransformStats stateAndStats = getTransformStats(CONTINUOUS_DATA_FRAME_ID);
assertThat(stateAndStats.getIndexerStats().getOutputDocuments(), equalTo((long)ENTITIES.size()));
assertThat(stateAndStats.getIndexerStats().getNumDocuments(), equalTo(totalDocsWritten));
// Even if we get back to started, we may periodically get set back to `indexing` when triggered.
// Though short lived due to no changes on the source indices, it could result in flaky test behavior
assertThat(stateAndStats.getState(), oneOf(DataFrameTransformStats.State.STARTED, DataFrameTransformStats.State.INDEXING));
}, 120, TimeUnit.SECONDS);

assertThat(stateAndStats.getIndexerStats().getOutputDocuments(), equalTo((long)ENTITIES.size()));
assertThat(stateAndStats.getIndexerStats().getNumDocuments(), equalTo(totalDocsWritten));
assertThat(stateAndStats.getState(), oneOf(DataFrameTransformStats.State.STARTED, DataFrameTransformStats.State.INDEXING));

// We want to make sure our latest state is written before we turn the node off, this makes the testing more reliable
awaitWrittenIndexerState(CONTINUOUS_DATA_FRAME_ID, IndexerState.STARTED.value());
}

@SuppressWarnings("unchecked")
private void verifyContinuousDataFrameHandlesData(long expectedLastCheckpoint) throws Exception {

// A continuous data frame should automatically become started when it gets assigned to a node
Expand All @@ -161,9 +179,9 @@ private void verifyContinuousDataFrameHandlesData(long expectedLastCheckpoint) t
List<String> entities = new ArrayList<>(1);
entities.add("user_" + ENTITIES.size() + expectedLastCheckpoint);
int docs = 5;
// Index the data very recently in the past so that the transform sync delay can catch up to reading it in our spin
// wait later.
putData(CONTINUOUS_DATA_FRAME_SOURCE, docs, TimeValue.timeValueSeconds(1), entities);
// Index the data
// The frequency and delay should see the data once its indexed
putData(CONTINUOUS_DATA_FRAME_SOURCE, docs, TimeValue.timeValueSeconds(0), entities);

waitUntilAfterCheckpoint(CONTINUOUS_DATA_FRAME_ID, expectedLastCheckpoint);

Expand All @@ -176,10 +194,55 @@ private void verifyContinuousDataFrameHandlesData(long expectedLastCheckpoint) t

assertThat(stateAndStats.getState(),
oneOf(DataFrameTransformStats.State.STARTED, DataFrameTransformStats.State.INDEXING));
assertThat(stateAndStats.getIndexerStats().getOutputDocuments(),
greaterThan(previousStateAndStats.getIndexerStats().getOutputDocuments()));
assertThat(stateAndStats.getIndexerStats().getNumDocuments(),
greaterThanOrEqualTo(docs + previousStateAndStats.getIndexerStats().getNumDocuments()));
awaitWrittenIndexerState(CONTINUOUS_DATA_FRAME_ID, (responseBody) -> {
Map<String, Object> indexerStats = (Map<String,Object>)((List<?>)XContentMapValues.extractValue("hits.hits._source.stats",
responseBody))
.get(0);
assertThat((Integer)indexerStats.get("documents_indexed"),
greaterThan(Long.valueOf(previousStateAndStats.getIndexerStats().getOutputDocuments()).intValue()));
assertThat((Integer)indexerStats.get("documents_processed"),
greaterThan(Long.valueOf(previousStateAndStats.getIndexerStats().getNumDocuments()).intValue()));
});
}

private void awaitWrittenIndexerState(String id, Consumer<Map<?, ?>> responseAssertion) throws Exception {
Request getStatsDocsRequest = new Request("GET", ".data-frame-internal-*/_search");
getStatsDocsRequest.setJsonEntity("{\n" +
" \"query\": {\n" +
" \"bool\": {\n" +
" \"filter\": \n" +
" {\"term\": {\n" +
" \"_id\": \"data_frame_transform_state_and_stats-" + id + "\"\n" +
" }}\n" +
" }\n" +
" },\n" +
" \"sort\": [\n" +
" {\n" +
" \"_index\": {\n" +
" \"order\": \"desc\"\n" +
" }\n" +
" }\n" +
" ],\n" +
" \"size\": 1\n" +
"}");
assertBusy(() -> {
// Want to make sure we get the latest docs
client().performRequest(new Request("POST", ".data-frame-internal-*/_refresh"));
Response response = client().performRequest(getStatsDocsRequest);
assertEquals(200, response.getStatusLine().getStatusCode());
Map<String, Object> responseBody = entityAsMap(response);
assertEquals(1, XContentMapValues.extractValue("hits.total.value", responseBody));
responseAssertion.accept(responseBody);
}, 60, TimeUnit.SECONDS);
}

private void awaitWrittenIndexerState(String id, String indexerState) throws Exception {
awaitWrittenIndexerState(id, (responseBody) -> {
String storedState = ((List<?>)XContentMapValues.extractValue("hits.hits._source.state.indexer_state", responseBody))
.get(0)
.toString();
assertThat(storedState, equalTo(indexerState));
});
}

private void putTransform(String id, DataFrameTransformConfig config) throws IOException {
Expand Down Expand Up @@ -222,7 +285,7 @@ private DataFrameTransformStats getTransformStats(String id) throws IOException
}

private void waitUntilAfterCheckpoint(String id, long currentCheckpoint) throws Exception {
assertBusy(() -> assertThat(getTransformStats(id).getCheckpointingInfo().getNext().getCheckpoint(), greaterThan(currentCheckpoint)),
assertBusy(() -> assertThat(getTransformStats(id).getCheckpointingInfo().getLast().getCheckpoint(), greaterThan(currentCheckpoint)),
60, TimeUnit.SECONDS);
}

Expand All @@ -249,7 +312,7 @@ private void createIndex(String indexName) throws IOException {
final StringEntity entity = new StringEntity(Strings.toString(builder), ContentType.APPLICATION_JSON);
Request req = new Request("PUT", indexName);
req.setEntity(entity);
client().performRequest(req);
assertThat(client().performRequest(req).getStatusLine().getStatusCode(), equalTo(200));
}
}

Expand Down