Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Poc add failure store #8

Closed
wants to merge 24 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.datastreams;

import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.hamcrest.MatcherAssert;
import org.junit.Before;

import java.io.IOException;
import java.util.List;
import java.util.Map;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;

/**
* This should be a yaml test, but in order to write one we would need to expose the new parameter in the rest-api-spec.
* We do not want to do that until the feature flag is removed. For this reason, we temporarily, test the affected APIs here.
* Please convert this to a yaml test when the feature flag is removed.
*/
public class FailureStoreQueryParamIT extends DisabledSecurityDataStreamTestCase {

private static final String DATA_STREAM_NAME = "failure-data-stream";
private String backingIndex;
private String failureStoreIndex;

@SuppressWarnings("unchecked")
@Before
public void setup() throws IOException {
Request putComposableIndexTemplateRequest = new Request("POST", "/_index_template/ds-template");
putComposableIndexTemplateRequest.setJsonEntity("""
{
"index_patterns": ["failure-data-stream"],
"template": {
"settings": {
"number_of_replicas": 0
}
},
"data_stream": {
"failure_store": true
}
}
""");
assertOK(client().performRequest(putComposableIndexTemplateRequest));

assertOK(client().performRequest(new Request("PUT", "/_data_stream/" + DATA_STREAM_NAME)));
ensureGreen(DATA_STREAM_NAME);

final Response dataStreamResponse = client().performRequest(new Request("GET", "/_data_stream/" + DATA_STREAM_NAME));
List<Object> dataStreams = (List<Object>) entityAsMap(dataStreamResponse).get("data_streams");
MatcherAssert.assertThat(dataStreams.size(), is(1));
Map<String, Object> dataStream = (Map<String, Object>) dataStreams.get(0);
MatcherAssert.assertThat(dataStream.get("name"), equalTo(DATA_STREAM_NAME));
List<String> backingIndices = getBackingIndices(dataStream);
MatcherAssert.assertThat(backingIndices.size(), is(1));
List<String> failureStore = getFailureStore(dataStream);
MatcherAssert.assertThat(failureStore.size(), is(1));
backingIndex = backingIndices.get(0);
failureStoreIndex = failureStore.get(0);
}

public void testGetIndexApi() throws IOException {
{
final Response indicesResponse = client().performRequest(new Request("GET", "/" + DATA_STREAM_NAME));
Map<String, Object> indices = entityAsMap(indicesResponse);
MatcherAssert.assertThat(indices.size(), is(2));
MatcherAssert.assertThat(indices.containsKey(backingIndex), is(true));
MatcherAssert.assertThat(indices.containsKey(failureStoreIndex), is(true));
}
{
final Response indicesResponse = client().performRequest(new Request("GET", "/" + DATA_STREAM_NAME + "?failure_store=exclude"));
Map<String, Object> indices = entityAsMap(indicesResponse);
MatcherAssert.assertThat(indices.size(), is(1));
MatcherAssert.assertThat(indices.containsKey(backingIndex), is(true));
}
{
final Response indicesResponse = client().performRequest(new Request("GET", "/" + DATA_STREAM_NAME + "?failure_store=only"));
Map<String, Object> indices = entityAsMap(indicesResponse);
MatcherAssert.assertThat(indices.size(), is(1));
MatcherAssert.assertThat(indices.containsKey(failureStoreIndex), is(true));
}
}

@SuppressWarnings("unchecked")
public void testGetIndexStatsApi() throws IOException {
{
final Response indicesResponse = client().performRequest(new Request("GET", "/" + DATA_STREAM_NAME + "/_stats"));
Map<String, Object> indices = (Map<String, Object>) entityAsMap(indicesResponse).get("indices");
MatcherAssert.assertThat(indices.size(), is(1));
MatcherAssert.assertThat(indices.containsKey(backingIndex), is(true));
}
{
final Response indicesResponse = client().performRequest(
new Request("GET", "/" + DATA_STREAM_NAME + "/_stats?failure_store=include")
);
Map<String, Object> indices = (Map<String, Object>) entityAsMap(indicesResponse).get("indices");
MatcherAssert.assertThat(indices.size(), is(2));
MatcherAssert.assertThat(indices.containsKey(backingIndex), is(true));
MatcherAssert.assertThat(indices.containsKey(failureStoreIndex), is(true));
}
{
final Response indicesResponse = client().performRequest(
new Request("GET", "/" + DATA_STREAM_NAME + "/_stats?failure_store=only")
);
Map<String, Object> indices = (Map<String, Object>) entityAsMap(indicesResponse).get("indices");
MatcherAssert.assertThat(indices.size(), is(1));
MatcherAssert.assertThat(indices.containsKey(failureStoreIndex), is(true));
}
}

public void testGetIndexSettingsApi() throws IOException {
{
final Response indicesResponse = client().performRequest(new Request("GET", "/" + DATA_STREAM_NAME + "/_settings"));
Map<String, Object> indices = entityAsMap(indicesResponse);
MatcherAssert.assertThat(indices.size(), is(1));
MatcherAssert.assertThat(indices.containsKey(backingIndex), is(true));
}
{
final Response indicesResponse = client().performRequest(
new Request("GET", "/" + DATA_STREAM_NAME + "/_settings?failure_store=include")
);
Map<String, Object> indices = entityAsMap(indicesResponse);
MatcherAssert.assertThat(indices.size(), is(2));
MatcherAssert.assertThat(indices.containsKey(backingIndex), is(true));
MatcherAssert.assertThat(indices.containsKey(failureStoreIndex), is(true));
}
{
final Response indicesResponse = client().performRequest(
new Request("GET", "/" + DATA_STREAM_NAME + "/_settings?failure_store=only")
);
Map<String, Object> indices = entityAsMap(indicesResponse);
MatcherAssert.assertThat(indices.size(), is(1));
MatcherAssert.assertThat(indices.containsKey(failureStoreIndex), is(true));
}
}

public void testGetIndexMappingApi() throws IOException {
{
final Response indicesResponse = client().performRequest(new Request("GET", "/" + DATA_STREAM_NAME + "/_mapping"));
Map<String, Object> indices = entityAsMap(indicesResponse);
MatcherAssert.assertThat(indices.size(), is(1));
MatcherAssert.assertThat(indices.containsKey(backingIndex), is(true));
}
{
final Response indicesResponse = client().performRequest(
new Request("GET", "/" + DATA_STREAM_NAME + "/_mapping?failure_store=include")
);
Map<String, Object> indices = entityAsMap(indicesResponse);
MatcherAssert.assertThat(indices.size(), is(2));
MatcherAssert.assertThat(indices.containsKey(backingIndex), is(true));
MatcherAssert.assertThat(indices.containsKey(failureStoreIndex), is(true));
}
{
final Response indicesResponse = client().performRequest(
new Request("GET", "/" + DATA_STREAM_NAME + "/_mapping?failure_store=only")
);
Map<String, Object> indices = entityAsMap(indicesResponse);
MatcherAssert.assertThat(indices.size(), is(1));
MatcherAssert.assertThat(indices.containsKey(failureStoreIndex), is(true));
}
}

private List<String> getBackingIndices(Map<String, Object> response) {
return getIndices(response, "indices");
}

private List<String> getFailureStore(Map<String, Object> response) {
return getIndices(response, "failure_indices");

}

@SuppressWarnings("unchecked")
private List<String> getIndices(Map<String, Object> response, String fieldName) {
List<Map<String, String>> indices = (List<Map<String, String>>) response.get(fieldName);
return indices.stream().map(index -> index.get("index_name")).toList();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ static TransportVersion def(int id) {
public static final TransportVersion NESTED_KNN_MORE_INNER_HITS = def(8_577_00_0);
public static final TransportVersion REQUIRE_DATA_STREAM_ADDED = def(8_578_00_0);
public static final TransportVersion ML_INFERENCE_COHERE_EMBEDDINGS_ADDED = def(8_579_00_0);
public static final TransportVersion ADD_FAILURE_STORE_OPTIONS = def(8_579_00_0);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
Loading
Loading