Skip to content

Commit

Permalink
Migrate async search to use an auto-created system index (#66154)
Browse files Browse the repository at this point in the history
Part of #61656.

Migrate async search to use an auto-created system index. This does
change the behaviour of `AsyncTaskIndexService` - previously, it
would ensure the index existed before carrying out any operation,
whereas now the index is only created when a document is created.
For any other operation, the wrapped `IndexNotFoundException` will
be allowed to bubble up.
  • Loading branch information
pugnascotia authored Dec 15, 2020
1 parent bcefc7d commit 06938dd
Show file tree
Hide file tree
Showing 3 changed files with 102 additions and 93 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;

Expand All @@ -45,7 +44,7 @@ public AsyncResultsIndexPlugin(Settings settings) {

@Override
public Collection<SystemIndexDescriptor> getSystemIndexDescriptors(Settings settings) {
return Collections.singletonList(new SystemIndexDescriptor(XPackPlugin.ASYNC_RESULTS_INDEX, this.getClass().getSimpleName()));
return List.of(AsyncTaskIndexService.getSystemIndexDescriptor());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,6 @@
*/
package org.elasticsearch.xpack.core.async;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
Expand All @@ -35,15 +31,18 @@
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.indices.SystemIndexDescriptor;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.xpack.core.XPackPlugin;
import org.elasticsearch.xpack.core.search.action.AsyncSearchResponse;
import org.elasticsearch.xpack.core.search.action.AsyncStatusResponse;
import org.elasticsearch.xpack.core.security.SecurityContext;
import org.elasticsearch.xpack.core.security.authc.Authentication;
import org.elasticsearch.xpack.core.security.authc.support.AuthenticationContextSerializer;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.util.Base64;
import java.util.Collections;
Expand All @@ -54,19 +53,25 @@

import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.index.mapper.MapperService.SINGLE_MAPPING_NAME;
import static org.elasticsearch.xpack.core.ClientHelper.ASYNC_SEARCH_ORIGIN;
import static org.elasticsearch.xpack.core.security.authc.AuthenticationField.AUTHENTICATION_KEY;

/**
* A service that exposes the CRUD operations for the async task-specific index.
*/
public final class AsyncTaskIndexService<R extends AsyncResponse<R>> {
private static final Logger logger = LogManager.getLogger(AsyncTaskIndexService.class);

public static final String HEADERS_FIELD = "headers";
public static final String RESPONSE_HEADERS_FIELD = "response_headers";
public static final String EXPIRATION_TIME_FIELD = "expiration_time";
public static final String RESULT_FIELD = "result";

// Usually the settings, mappings and system index descriptor below
// would be co-located with the SystemIndexPlugin implementation,
// however in this case this service is in a different project to
// AsyncResultsIndexPlugin, as are tests that need access to
// #settings().

static Settings settings() {
return Settings.builder()
.put("index.codec", "best_compression")
Expand All @@ -76,44 +81,58 @@ static Settings settings() {
.build();
}

static XContentBuilder mappings() throws IOException {
XContentBuilder builder = jsonBuilder()
.startObject()
.startObject(SINGLE_MAPPING_NAME)
.startObject("_meta")
.field("version", Version.CURRENT)
.endObject()
.field("dynamic", "strict")
.startObject("properties")
.startObject(HEADERS_FIELD)
.field("type", "object")
.field("enabled", "false")
.endObject()
.startObject(RESPONSE_HEADERS_FIELD)
.field("type", "object")
.field("enabled", "false")
.endObject()
.startObject(RESULT_FIELD)
.field("type", "object")
.field("enabled", "false")
private static XContentBuilder mappings() {
try {
XContentBuilder builder = jsonBuilder()
.startObject()
.startObject(SINGLE_MAPPING_NAME)
.startObject("_meta")
.field("version", Version.CURRENT)
.endObject()
.startObject(EXPIRATION_TIME_FIELD)
.field("type", "long")
.field("dynamic", "strict")
.startObject("properties")
.startObject(HEADERS_FIELD)
.field("type", "object")
.field("enabled", "false")
.endObject()
.startObject(RESPONSE_HEADERS_FIELD)
.field("type", "object")
.field("enabled", "false")
.endObject()
.startObject(RESULT_FIELD)
.field("type", "object")
.field("enabled", "false")
.endObject()
.startObject(EXPIRATION_TIME_FIELD)
.field("type", "long")
.endObject()
.endObject()
.endObject()
.endObject()
.endObject();
return builder;
.endObject();
return builder;
} catch (IOException e) {
throw new UncheckedIOException("Failed to build mappings for " + XPackPlugin.ASYNC_RESULTS_INDEX, e);
}
}

public static SystemIndexDescriptor getSystemIndexDescriptor() {
return SystemIndexDescriptor.builder()
.setIndexPattern(XPackPlugin.ASYNC_RESULTS_INDEX)
.setDescription("Async search results")
.setPrimaryIndex(XPackPlugin.ASYNC_RESULTS_INDEX)
.setMappings(mappings())
.setSettings(settings())
.setVersionMetaKey("version")
.setOrigin(ASYNC_SEARCH_ORIGIN)
.build();
}

private final String index;
private final ClusterService clusterService;
private final Client client;
private final SecurityContext securityContext;
private final NamedWriteableRegistry registry;
private final Writeable.Reader<R> reader;


public AsyncTaskIndexService(String index,
ClusterService clusterService,
ThreadContext threadContext,
Expand All @@ -122,7 +141,6 @@ public AsyncTaskIndexService(String index,
Writeable.Reader<R> reader,
NamedWriteableRegistry registry) {
this.index = index;
this.clusterService = clusterService;
this.securityContext = new SecurityContext(clusterService.getSettings(), threadContext);
this.client = new OriginSettingClient(client, origin);
this.registry = registry;
Expand All @@ -136,34 +154,6 @@ public Client getClient() {
return client;
}

/**
* Creates the index with the expected settings and mappings if it doesn't exist.
*/
void createIndexIfNecessary(ActionListener<Void> listener) {
if (clusterService.state().routingTable().hasIndex(index) == false) {
try {
client.admin().indices().prepareCreate(index)
.setSettings(settings())
.setMapping(mappings())
.execute(ActionListener.wrap(
resp -> listener.onResponse(null),
exc -> {
if (ExceptionsHelper.unwrapCause(exc) instanceof ResourceAlreadyExistsException) {
listener.onResponse(null);
} else {
logger.error("failed to create " + index + " index", exc);
listener.onFailure(exc);
}
}));
} catch (Exception exc) {
logger.error("failed to create " + index + " index", exc);
listener.onFailure(exc);
}
} else {
listener.onResponse(null);
}
}

/**
* Stores the initial response with the original headers of the authenticated user
* and the expected expiration time.
Expand All @@ -180,7 +170,7 @@ public void createResponse(String docId,
.create(true)
.id(docId)
.source(source, XContentType.JSON);
createIndexIfNecessary(ActionListener.wrap(v -> client.index(indexRequest, listener), listener::onFailure));
client.index(indexRequest, listener);
}

/**
Expand All @@ -199,9 +189,7 @@ public void updateResponse(String docId,
.id(docId)
.doc(source, XContentType.JSON)
.retryOnConflict(5);
// updates create the index automatically if it doesn't exist so we force the creation
// preemptively.
createIndexIfNecessary(ActionListener.wrap(v -> client.update(request, listener), listener::onFailure));
client.update(request, listener);
} catch(Exception e) {
listener.onFailure(e);
}
Expand All @@ -219,9 +207,7 @@ public void updateExpirationTime(String docId,
.id(docId)
.doc(source, XContentType.JSON)
.retryOnConflict(5);
// updates create the index automatically if it doesn't exist so we force the creation
// preemptively.
createIndexIfNecessary(ActionListener.wrap(v -> client.update(request, listener), listener::onFailure));
client.update(request, listener);
}

/**
Expand All @@ -231,9 +217,7 @@ public void deleteResponse(AsyncExecutionId asyncExecutionId,
ActionListener<DeleteResponse> listener) {
try {
DeleteRequest request = new DeleteRequest(index).id(asyncExecutionId.getDocId());
// deletes create the index automatically if it doesn't exist so we force the creation
// preemptively.
createIndexIfNecessary(ActionListener.wrap(v -> client.delete(request, listener), listener::onFailure));
client.delete(request, listener);
} catch(Exception e) {
listener.onFailure(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.indices.SystemIndexDescriptor;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.SystemIndexPlugin;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.transport.TransportService;
Expand All @@ -24,7 +27,10 @@
import org.junit.Before;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

// TODO: test CRUD operations
public class AsyncTaskServiceTests extends ESSingleNodeTestCase {
Expand All @@ -41,6 +47,23 @@ public void setup() {
client(), "test_origin", AsyncSearchResponse::new, writableRegistry());
}

@Override
protected Collection<Class<? extends Plugin>> getPlugins() {
List<Class<? extends Plugin>> plugins = new ArrayList<>(super.getPlugins());
plugins.add(TestPlugin.class);
return plugins;
}

/**
* This class exists because AsyncResultsIndexPlugin exists in a different x-pack module.
*/
public static class TestPlugin extends Plugin implements SystemIndexPlugin {
@Override
public Collection<SystemIndexDescriptor> getSystemIndexDescriptors(Settings settings) {
return List.of(AsyncTaskIndexService.getSystemIndexDescriptor());
}
}

public void testEnsuredAuthenticatedUserIsSame() throws IOException {
Authentication original =
new Authentication(new User("test", "role"), new Authentication.RealmRef("realm", "file", "node"), null);
Expand Down Expand Up @@ -99,15 +122,7 @@ public void testEnsuredAuthenticatedUserIsSame() throws IOException {
}

public void testAutoCreateIndex() throws Exception {
{
PlainActionFuture<Void> future = PlainActionFuture.newFuture();
indexService.createIndexIfNecessary(future);
future.get();
assertSettings();
}
AcknowledgedResponse ack = client().admin().indices().prepareDelete(index).get();
assertTrue(ack.isAcknowledged());

// To begin with, the results index should be auto-created.
AsyncExecutionId id = new AsyncExecutionId("0", new TaskId("N/A", 0));
AsyncSearchResponse resp = new AsyncSearchResponse(id.getEncoded(), true, true, 0L, 0L);
{
Expand All @@ -116,37 +131,48 @@ public void testAutoCreateIndex() throws Exception {
future.get();
assertSettings();
}
ack = client().admin().indices().prepareDelete(index).get();

// Delete the index, so we can test subsequent auto-create behaviour
AcknowledgedResponse ack = client().admin().indices().prepareDelete(index).get();
assertTrue(ack.isAcknowledged());

// Subsequent response deletes throw a (wrapped) index not found exception
{
PlainActionFuture<DeleteResponse> future = PlainActionFuture.newFuture();
indexService.deleteResponse(id, future);
future.get();
assertSettings();
expectThrows(Exception.class, future::get);
}
ack = client().admin().indices().prepareDelete(index).get();
assertTrue(ack.isAcknowledged());

// So do updates
{
PlainActionFuture<UpdateResponse> future = PlainActionFuture.newFuture();
indexService.updateResponse(id.getDocId(), Collections.emptyMap(), resp, future);
expectThrows(Exception.class, () -> future.get());
expectThrows(Exception.class, future::get);
assertSettings();
}
ack = client().admin().indices().prepareDelete(index).get();
assertTrue(ack.isAcknowledged());

// And so does updating the expiration time
{
PlainActionFuture<UpdateResponse> future = PlainActionFuture.newFuture();
indexService.updateExpirationTime("0", 10L, future);
expectThrows(Exception.class, () -> future.get());
expectThrows(Exception.class, future::get);
assertSettings();
}

// But the index is still auto-created
{
PlainActionFuture<IndexResponse> future = PlainActionFuture.newFuture();
indexService.createResponse(id.getDocId(), Collections.emptyMap(), resp, future);
future.get();
assertSettings();
}
}

private void assertSettings() throws IOException {
private void assertSettings() {
GetIndexResponse getIndexResponse = client().admin().indices().getIndex(
new GetIndexRequest().indices(index)).actionGet();
Settings settings = getIndexResponse.getSettings().get(index);
Settings expected = AsyncTaskIndexService.settings();
assertEquals(expected, settings.filter(key -> expected.hasValue(key)));
assertEquals(expected, settings.filter(expected::hasValue));
}
}

0 comments on commit 06938dd

Please sign in to comment.