Skip to content

Commit

Permalink
[7.5] Extract indexData method out of RegressionIT tests (elastic#49306
Browse files Browse the repository at this point in the history
  • Loading branch information
przemekwitek authored Nov 20, 2019
1 parent d90f523 commit 76c8098
Showing 1 changed file with 29 additions and 77 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,34 +48,7 @@ public void cleanup() {

public void testSingleNumericFeatureAndMixedTrainingAndNonTrainingRows() throws Exception {
initialize("regression_single_numeric_feature_and_mixed_data_set");

{ // Index 350 rows, 300 of them being training rows.
client().admin().indices().prepareCreate(sourceIndex)
.addMapping("_doc", NUMERICAL_FEATURE_FIELD, "type=double", DEPENDENT_VARIABLE_FIELD, "type=double")
.get();

BulkRequestBuilder bulkRequestBuilder = client().prepareBulk()
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
for (int i = 0; i < 300; i++) {
Double field = NUMERICAL_FEATURE_VALUES.get(i % 3);
Double value = DEPENDENT_VARIABLE_VALUES.get(i % 3);

IndexRequest indexRequest = new IndexRequest(sourceIndex)
.source(NUMERICAL_FEATURE_FIELD, field, DEPENDENT_VARIABLE_FIELD, value);
bulkRequestBuilder.add(indexRequest);
}
for (int i = 300; i < 350; i++) {
Double field = NUMERICAL_FEATURE_VALUES.get(i % 3);

IndexRequest indexRequest = new IndexRequest(sourceIndex)
.source(NUMERICAL_FEATURE_FIELD, field);
bulkRequestBuilder.add(indexRequest);
}
BulkResponse bulkResponse = bulkRequestBuilder.get();
if (bulkResponse.hasFailures()) {
fail("Failed to index data: " + bulkResponse.buildFailureMessage());
}
}
indexData(sourceIndex, 300, 50);

DataFrameAnalyticsConfig config = buildAnalytics(jobId, sourceIndex, destIndex, null, new Regression(DEPENDENT_VARIABLE_FIELD));
registerAnalytics(config);
Expand Down Expand Up @@ -119,23 +92,7 @@ public void testSingleNumericFeatureAndMixedTrainingAndNonTrainingRows() throws

public void testWithOnlyTrainingRowsAndTrainingPercentIsHundred() throws Exception {
initialize("regression_only_training_data_and_training_percent_is_100");

{ // Index 350 rows, all of them being training rows.
BulkRequestBuilder bulkRequestBuilder = client().prepareBulk()
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
for (int i = 0; i < 350; i++) {
Double field = NUMERICAL_FEATURE_VALUES.get(i % 3);
Double value = DEPENDENT_VARIABLE_VALUES.get(i % 3);

IndexRequest indexRequest = new IndexRequest(sourceIndex)
.source(NUMERICAL_FEATURE_FIELD, field, DEPENDENT_VARIABLE_FIELD, value);
bulkRequestBuilder.add(indexRequest);
}
BulkResponse bulkResponse = bulkRequestBuilder.get();
if (bulkResponse.hasFailures()) {
fail("Failed to index data: " + bulkResponse.buildFailureMessage());
}
}
indexData(sourceIndex, 350, 0);

DataFrameAnalyticsConfig config = buildAnalytics(jobId, sourceIndex, destIndex, null, new Regression(DEPENDENT_VARIABLE_FIELD));
registerAnalytics(config);
Expand Down Expand Up @@ -171,23 +128,7 @@ public void testWithOnlyTrainingRowsAndTrainingPercentIsHundred() throws Excepti

public void testWithOnlyTrainingRowsAndTrainingPercentIsFifty() throws Exception {
initialize("regression_only_training_data_and_training_percent_is_50");

{ // Index 350 rows, all of them being training rows.
BulkRequestBuilder bulkRequestBuilder = client().prepareBulk()
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
for (int i = 0; i < 350; i++) {
Double field = NUMERICAL_FEATURE_VALUES.get(i % 3);
Double value = DEPENDENT_VARIABLE_VALUES.get(i % 3);

IndexRequest indexRequest = new IndexRequest(sourceIndex)
.source(NUMERICAL_FEATURE_FIELD, field, DEPENDENT_VARIABLE_FIELD, value);
bulkRequestBuilder.add(indexRequest);
}
BulkResponse bulkResponse = bulkRequestBuilder.get();
if (bulkResponse.hasFailures()) {
fail("Failed to index data: " + bulkResponse.buildFailureMessage());
}
}
indexData(sourceIndex, 350, 0);

DataFrameAnalyticsConfig config =
buildAnalytics(
Expand Down Expand Up @@ -239,21 +180,7 @@ public void testWithOnlyTrainingRowsAndTrainingPercentIsFifty() throws Exception
@AwaitsFix(bugUrl="https://github.com/elastic/elasticsearch/issues/47612")
public void testStopAndRestart() throws Exception {
initialize("regression_stop_and_restart");

BulkRequestBuilder bulkRequestBuilder = client().prepareBulk()
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
for (int i = 0; i < 350; i++) {
Double field = NUMERICAL_FEATURE_VALUES.get(i % 3);
Double value = DEPENDENT_VARIABLE_VALUES.get(i % 3);

IndexRequest indexRequest = new IndexRequest(sourceIndex)
.source("feature", field, "variable", value);
bulkRequestBuilder.add(indexRequest);
}
BulkResponse bulkResponse = bulkRequestBuilder.get();
if (bulkResponse.hasFailures()) {
fail("Failed to index data: " + bulkResponse.buildFailureMessage());
}
indexData(sourceIndex, 350, 0);

DataFrameAnalyticsConfig config = buildAnalytics(jobId, sourceIndex, destIndex, null, new Regression(DEPENDENT_VARIABLE_FIELD));
registerAnalytics(config);
Expand Down Expand Up @@ -306,6 +233,31 @@ private void initialize(String jobId) {
this.destIndex = sourceIndex + "_results";
}

private static void indexData(String sourceIndex, int numTrainingRows, int numNonTrainingRows) {
client().admin().indices().prepareCreate(sourceIndex)
.addMapping("_doc", NUMERICAL_FEATURE_FIELD, "type=double", DEPENDENT_VARIABLE_FIELD, "type=double")
.get();

BulkRequestBuilder bulkRequestBuilder = client().prepareBulk()
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
for (int i = 0; i < numTrainingRows; i++) {
List<Object> source = Arrays.asList(
NUMERICAL_FEATURE_FIELD, NUMERICAL_FEATURE_VALUES.get(i % NUMERICAL_FEATURE_VALUES.size()),
DEPENDENT_VARIABLE_FIELD, DEPENDENT_VARIABLE_VALUES.get(i % DEPENDENT_VARIABLE_VALUES.size()));
IndexRequest indexRequest = new IndexRequest(sourceIndex).source(source.toArray());
bulkRequestBuilder.add(indexRequest);
}
for (int i = numTrainingRows; i < numTrainingRows + numNonTrainingRows; i++) {
List<Object> source = Arrays.asList(NUMERICAL_FEATURE_FIELD, NUMERICAL_FEATURE_VALUES.get(i % NUMERICAL_FEATURE_VALUES.size()));
IndexRequest indexRequest = new IndexRequest(sourceIndex).source(source.toArray());
bulkRequestBuilder.add(indexRequest);
}
BulkResponse bulkResponse = bulkRequestBuilder.get();
if (bulkResponse.hasFailures()) {
fail("Failed to index data: " + bulkResponse.buildFailureMessage());
}
}

private static Map<String, Object> getDestDoc(DataFrameAnalyticsConfig config, SearchHit hit) {
GetResponse destDocGetResponse = client().prepareGet().setIndex(config.getDest().getIndex()).setId(hit.getId()).get();
assertThat(destDocGetResponse.isExists(), is(true));
Expand Down

0 comments on commit 76c8098

Please sign in to comment.