Skip to content

Commit

Permalink
Add tests for migration between source modes in logsdb data stream (e…
Browse files Browse the repository at this point in the history
  • Loading branch information
lkts authored and georgewallace committed Oct 25, 2024
1 parent 4d82ba9 commit 60e19aa
Show file tree
Hide file tree
Showing 3 changed files with 301 additions and 11 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,290 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.logsdb;

import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.test.cluster.ElasticsearchCluster;
import org.elasticsearch.test.cluster.local.distribution.DistributionType;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentFactory;
import org.elasticsearch.xcontent.XContentType;
import org.hamcrest.Matchers;
import org.junit.Before;
import org.junit.ClassRule;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;

public class LogsDbSourceModeMigrationIT extends LogsIndexModeRestTestIT {
public static final String INDEX_TEMPLATE = """
{
"index_patterns": ["my-logs-*-*"],
"priority": 100,
"data_stream": {},
"composed_of": [
"my-logs-mapping",
"my-logs-original-source",
"my-logs-migrated-source"
],
"ignore_missing_component_templates": ["my-logs-original-source", "my-logs-migrated-source"]
}
""";

public static final String MAPPING_COMPONENT_TEMPLATE = """
{
"template": {
"settings": {
"index": {
"mode": "logsdb"
}
},
"mappings": {
"properties": {
"@timestamp": {
"type": "date",
"format": "epoch_millis"
},
"message": {
"type": "text"
},
"method": {
"type": "keyword"
},
"hits": {
"type": "long"
}
}
}
}
}""";

public static final String STORED_SOURCE_COMPONENT_TEMPLATE = """
{
"template": {
"settings": {
"index": {
"mapping.source.mode": "stored"
}
}
}
}""";

public static final String SYNTHETIC_SOURCE_COMPONENT_TEMPLATE = """
{
"template": {
"settings": {
"index": {
"mapping.source.mode": "synthetic"
}
}
}
}""";

@ClassRule()
public static ElasticsearchCluster cluster = ElasticsearchCluster.local()
.distribution(DistributionType.DEFAULT)
.module("constant-keyword")
.module("data-streams")
.module("mapper-extras")
.module("x-pack-aggregate-metric")
.module("x-pack-stack")
.setting("xpack.security.enabled", "false")
.setting("xpack.otel_data.registry.enabled", "false")
.setting("xpack.license.self_generated.type", "trial")
.setting("cluster.logsdb.enabled", "true")
.setting("stack.templates.enabled", "false")
.build();

@Override
protected String getTestRestCluster() {
return cluster.getHttpAddresses();
}

@Before
public void setup() {
client = client();
}

private RestClient client;

public void testSwitchFromStoredToSyntheticSource() throws IOException {
assertOK(putComponentTemplate(client, "my-logs-mapping", MAPPING_COMPONENT_TEMPLATE));
assertOK(putComponentTemplate(client, "my-logs-original-source", STORED_SOURCE_COMPONENT_TEMPLATE));

assertOK(putTemplate(client, "my-logs", INDEX_TEMPLATE));
assertOK(createDataStream(client, "my-logs-ds-test"));

var initialSourceMode = (String) getSetting(
client,
getDataStreamBackingIndex(client, "my-logs-ds-test", 0),
"index.mapping.source.mode"
);
assertThat(initialSourceMode, equalTo("stored"));
var initialIndexMode = (String) getSetting(client, getDataStreamBackingIndex(client, "my-logs-ds-test", 0), "index.mode");
assertThat(initialIndexMode, equalTo("logsdb"));

var indexedWithStoredSource = new ArrayList<XContentBuilder>();
var indexedWithSyntheticSource = new ArrayList<XContentBuilder>();
for (int i = 0; i < 10; i++) {
indexedWithStoredSource.add(generateDoc());
indexedWithSyntheticSource.add(generateDoc());
}

Response storedSourceBulkResponse = bulkIndex(client, "my-logs-ds-test", indexedWithStoredSource, 0);
assertOK(storedSourceBulkResponse);
assertThat(entityAsMap(storedSourceBulkResponse).get("errors"), Matchers.equalTo(false));

assertOK(putComponentTemplate(client, "my-logs-migrated-source", SYNTHETIC_SOURCE_COMPONENT_TEMPLATE));
var rolloverResponse = rolloverDataStream(client, "my-logs-ds-test");
assertOK(rolloverResponse);
assertThat(entityAsMap(rolloverResponse).get("rolled_over"), is(true));

var finalSourceMode = (String) getSetting(
client,
getDataStreamBackingIndex(client, "my-logs-ds-test", 1),
"index.mapping.source.mode"
);
assertThat(finalSourceMode, equalTo("synthetic"));

Response syntheticSourceBulkResponse = bulkIndex(client, "my-logs-ds-test", indexedWithSyntheticSource, 10);
assertOK(syntheticSourceBulkResponse);
assertThat(entityAsMap(syntheticSourceBulkResponse).get("errors"), Matchers.equalTo(false));

var allDocs = Stream.concat(indexedWithStoredSource.stream(), indexedWithSyntheticSource.stream()).toList();

var sourceList = search(new SearchSourceBuilder().query(QueryBuilders.matchAllQuery()).size(allDocs.size()), "my-logs-ds-test");
assertThat(sourceList.size(), equalTo(allDocs.size()));

for (int i = 0; i < sourceList.size(); i++) {
var expected = XContentHelper.convertToMap(BytesReference.bytes(allDocs.get(i)), false, XContentType.JSON).v2();
assertThat(sourceList.get(i), equalTo(expected));
}
}

public void testSwitchFromSyntheticToStoredSource() throws IOException {
assertOK(putComponentTemplate(client, "my-logs-mapping", MAPPING_COMPONENT_TEMPLATE));
assertOK(putComponentTemplate(client, "my-logs-original-source", SYNTHETIC_SOURCE_COMPONENT_TEMPLATE));

assertOK(putTemplate(client, "my-logs", INDEX_TEMPLATE));
assertOK(createDataStream(client, "my-logs-ds-test"));

var initialSourceMode = (String) getSetting(
client,
getDataStreamBackingIndex(client, "my-logs-ds-test", 0),
"index.mapping.source.mode"
);
assertThat(initialSourceMode, equalTo("synthetic"));
var initialIndexMode = (String) getSetting(client, getDataStreamBackingIndex(client, "my-logs-ds-test", 0), "index.mode");
assertThat(initialIndexMode, equalTo("logsdb"));

var indexedWithSyntheticSource = new ArrayList<XContentBuilder>();
var indexedWithStoredSource = new ArrayList<XContentBuilder>();
for (int i = 0; i < 10; i++) {
indexedWithSyntheticSource.add(generateDoc());
indexedWithStoredSource.add(generateDoc());
}

Response syntheticSourceBulkResponse = bulkIndex(client, "my-logs-ds-test", indexedWithSyntheticSource, 0);
assertOK(syntheticSourceBulkResponse);
assertThat(entityAsMap(syntheticSourceBulkResponse).get("errors"), Matchers.equalTo(false));

assertOK(putComponentTemplate(client, "my-logs-migrated-source", STORED_SOURCE_COMPONENT_TEMPLATE));
var rolloverResponse = rolloverDataStream(client, "my-logs-ds-test");
assertOK(rolloverResponse);
assertThat(entityAsMap(rolloverResponse).get("rolled_over"), is(true));

var finalSourceMode = (String) getSetting(
client,
getDataStreamBackingIndex(client, "my-logs-ds-test", 1),
"index.mapping.source.mode"
);
assertThat(finalSourceMode, equalTo("stored"));

Response storedSourceBulkResponse = bulkIndex(client, "my-logs-ds-test", indexedWithStoredSource, 10);
assertOK(storedSourceBulkResponse);
assertThat(entityAsMap(storedSourceBulkResponse).get("errors"), Matchers.equalTo(false));

var allDocs = Stream.concat(indexedWithSyntheticSource.stream(), indexedWithStoredSource.stream()).toList();

var sourceList = search(new SearchSourceBuilder().query(QueryBuilders.matchAllQuery()).size(allDocs.size()), "my-logs-ds-test");
assertThat(sourceList.size(), equalTo(allDocs.size()));

for (int i = 0; i < sourceList.size(); i++) {
var expected = XContentHelper.convertToMap(BytesReference.bytes(allDocs.get(i)), false, XContentType.JSON).v2();
assertThat(sourceList.get(i), equalTo(expected));
}
}

private static Response bulkIndex(RestClient client, String dataStreamName, List<XContentBuilder> documents, int startId)
throws IOException {
var sb = new StringBuilder();
int id = startId;
for (var document : documents) {
sb.append(Strings.format("{ \"create\": { \"_id\" : \"%d\" } }", id)).append("\n");
sb.append(Strings.toString(document)).append("\n");
id++;
}

var bulkRequest = new Request("POST", "/" + dataStreamName + "/_bulk");
bulkRequest.setJsonEntity(sb.toString());
bulkRequest.addParameter("refresh", "true");
return client.performRequest(bulkRequest);
}

@SuppressWarnings("unchecked")
private List<Map<String, Object>> search(SearchSourceBuilder search, String dataStreamName) throws IOException {
var request = new Request("GET", "/" + dataStreamName + "/_search");
request.setJsonEntity(Strings.toString(search));
var searchResponse = client.performRequest(request);
assertOK(searchResponse);

Map<String, Object> searchResponseMap = XContentHelper.convertToMap(
XContentType.JSON.xContent(),
searchResponse.getEntity().getContent(),
false
);
var hitsMap = (Map<String, Object>) searchResponseMap.get("hits");

var hitsList = (List<Map<String, Object>>) hitsMap.get("hits");
assertThat(hitsList.size(), greaterThan(0));

return hitsList.stream()
.sorted(Comparator.comparingInt((Map<String, Object> hit) -> Integer.parseInt((String) hit.get("_id"))))
.map(hit -> (Map<String, Object>) hit.get("_source"))
.toList();
}

private static XContentBuilder generateDoc() throws IOException {
var doc = XContentFactory.jsonBuilder();
doc.startObject();
{
doc.field("@timestamp", Long.toString(randomMillisUpToYear9999()));
doc.field("message", randomAlphaOfLengthBetween(20, 50));
doc.field("method", randomAlphaOfLength(3));
doc.field("hits", randomLong());
}
doc.endObject();

return doc;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

package org.elasticsearch.xpack.logsdb;

import org.elasticsearch.client.Request;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.common.settings.Settings;
Expand Down Expand Up @@ -496,16 +495,6 @@ public void testIgnoreAboveSetting() throws IOException {
}
}

private static Map<String, Object> getMapping(final RestClient client, final String indexName) throws IOException {
final Request request = new Request("GET", "/" + indexName + "/_mapping");

Map<String, Object> mappings = ((Map<String, Map<String, Object>>) entityAsMap(client.performRequest(request)).get(indexName)).get(
"mappings"
);

return mappings;
}

private Function<Object, Map<String, Object>> subObject(String key) {
return (mapAsObject) -> (Map<String, Object>) ((Map<String, Object>) mapAsObject).get(key);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,4 +98,15 @@ protected static Response putClusterSetting(final RestClient client, final Strin
request.setJsonEntity("{ \"transient\": { \"" + settingName + "\": " + settingValue + " } }");
return client.performRequest(request);
}

@SuppressWarnings("unchecked")
protected static Map<String, Object> getMapping(final RestClient client, final String indexName) throws IOException {
final Request request = new Request("GET", "/" + indexName + "/_mapping");

Map<String, Object> mappings = ((Map<String, Map<String, Object>>) entityAsMap(client.performRequest(request)).get(indexName)).get(
"mappings"
);

return mappings;
}
}

0 comments on commit 60e19aa

Please sign in to comment.