Skip to content

Commit

Permalink
add support for write index resolution when creating/updating documen…
Browse files Browse the repository at this point in the history
…ts (elastic#31520)

Now write operations like Index, Delete, Update rely on the write-index associated with
an alias to operate against. This means writes will be accepted even when an alias points to multiple indices, so long as one is the write index. Routing values will be used from the AliasMetaData for the alias in the write-index. All read operations are left untouched.
  • Loading branch information
talevy committed Jul 19, 2018
1 parent c14062e commit 4817155
Show file tree
Hide file tree
Showing 12 changed files with 536 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ protected void doRun() throws Exception {
TransportUpdateAction.resolveAndValidateRouting(metaData, concreteIndex.getName(), (UpdateRequest) docWriteRequest);
break;
case DELETE:
docWriteRequest.routing(metaData.resolveIndexRouting(docWriteRequest.parent(), docWriteRequest.routing(), docWriteRequest.index()));
docWriteRequest.routing(metaData.resolveWriteIndexRouting(docWriteRequest.parent(), docWriteRequest.routing(), docWriteRequest.index()));
// check if routing is required, if so, throw error if routing wasn't specified
if (docWriteRequest.routing() == null && metaData.routingRequired(concreteIndex.getName(), docWriteRequest.type())) {
throw new RoutingMissingException(concreteIndex.getName(), docWriteRequest.type(), docWriteRequest.id());
Expand Down Expand Up @@ -478,7 +478,7 @@ Index getConcreteIndex(String indexOrAlias) {
Index resolveIfAbsent(DocWriteRequest request) {
Index concreteIndex = indices.get(request.index());
if (concreteIndex == null) {
concreteIndex = indexNameExpressionResolver.concreteSingleIndex(state, request);
concreteIndex = indexNameExpressionResolver.concreteWriteIndex(state, request);
indices.put(request.index(), concreteIndex);
}
return concreteIndex;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -519,7 +519,7 @@ public void process(Version indexCreatedVersion, @Nullable MappingMetaData mappi

/* resolve the routing if needed */
public void resolveRouting(MetaData metaData) {
routing(metaData.resolveIndexRouting(parent, routing, index));
routing(metaData.resolveWriteIndexRouting(parent, routing, index));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ protected void resolveRequest(ClusterState state, UpdateRequest request) {
}

public static void resolveAndValidateRouting(MetaData metaData, String concreteIndex, UpdateRequest request) {
request.routing((metaData.resolveIndexRouting(request.parent(), request.routing(), request.index())));
request.routing((metaData.resolveWriteIndexRouting(request.parent(), request.routing(), request.index())));
// Fail fast on the node that received the request, rather than failing when translating on the index or delete request.
if (request.routing() == null && metaData.routingRequired(concreteIndex, request.type())) {
throw new RoutingMissingException(concreteIndex, request.type(), request.id());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -103,7 +102,7 @@ public String[] concreteIndexNames(ClusterState state, IndicesOptions options, S
return concreteIndexNames(context, indexExpressions);
}

/**
/**
* Translates the provided index expression into actual concrete indices, properly deduplicated.
*
* @param state the cluster state containing all the data to resolve to expressions to concrete indices
Expand All @@ -117,7 +116,7 @@ public String[] concreteIndexNames(ClusterState state, IndicesOptions options, S
* indices options in the context don't allow such a case.
*/
public Index[] concreteIndices(ClusterState state, IndicesOptions options, String... indexExpressions) {
Context context = new Context(state, options);
Context context = new Context(state, options, false, false);
return concreteIndices(context, indexExpressions);
}

Expand Down Expand Up @@ -193,30 +192,40 @@ Index[] concreteIndices(Context context, String... indexExpressions) {
}
}

Collection<IndexMetaData> resolvedIndices = aliasOrIndex.getIndices();
if (resolvedIndices.size() > 1 && !options.allowAliasesToMultipleIndices()) {
String[] indexNames = new String[resolvedIndices.size()];
int i = 0;
for (IndexMetaData indexMetaData : resolvedIndices) {
indexNames[i++] = indexMetaData.getIndex().getName();
if (aliasOrIndex.isAlias() && context.isResolveToWriteIndex()) {
AliasOrIndex.Alias alias = (AliasOrIndex.Alias) aliasOrIndex;
IndexMetaData writeIndex = alias.getWriteIndex();
if (writeIndex == null) {
throw new IllegalArgumentException("no write index is defined for alias [" + alias.getAliasName() + "]." +
" The write index may be explicitly disabled using is_write_index=false or the alias points to multiple" +
" indices without one being designated as a write index");
}
throw new IllegalArgumentException("Alias [" + expression + "] has more than one indices associated with it [" +
concreteIndices.add(writeIndex.getIndex());
} else {
if (aliasOrIndex.getIndices().size() > 1 && !options.allowAliasesToMultipleIndices()) {
String[] indexNames = new String[aliasOrIndex.getIndices().size()];
int i = 0;
for (IndexMetaData indexMetaData : aliasOrIndex.getIndices()) {
indexNames[i++] = indexMetaData.getIndex().getName();
}
throw new IllegalArgumentException("Alias [" + expression + "] has more than one indices associated with it [" +
Arrays.toString(indexNames) + "], can't execute a single index op");
}
}

for (IndexMetaData index : resolvedIndices) {
if (index.getState() == IndexMetaData.State.CLOSE) {
if (failClosed) {
throw new IndexClosedException(index.getIndex());
} else {
if (options.forbidClosedIndices() == false) {
concreteIndices.add(index.getIndex());
for (IndexMetaData index : aliasOrIndex.getIndices()) {
if (index.getState() == IndexMetaData.State.CLOSE) {
if (failClosed) {
throw new IndexClosedException(index.getIndex());
} else {
if (options.forbidClosedIndices() == false) {
concreteIndices.add(index.getIndex());
}
}
} else if (index.getState() == IndexMetaData.State.OPEN) {
concreteIndices.add(index.getIndex());
} else {
throw new IllegalStateException("index state [" + index.getState() + "] not supported");
}
} else if (index.getState() == IndexMetaData.State.OPEN) {
concreteIndices.add(index.getIndex());
} else {
throw new IllegalStateException("index state [" + index.getState() + "] not supported");
}
}
}
Expand Down Expand Up @@ -255,6 +264,28 @@ public Index concreteSingleIndex(ClusterState state, IndicesRequest request) {
return indices[0];
}

/**
* Utility method that allows to resolve an index expression to its corresponding single write index.
*
* @param state the cluster state containing all the data to resolve to expression to a concrete index
* @param request The request that defines how the an alias or an index need to be resolved to a concrete index
* and the expression that can be resolved to an alias or an index name.
* @throws IllegalArgumentException if the index resolution does not lead to an index, or leads to more than one index
* @return the write index obtained as a result of the index resolution
*/
public Index concreteWriteIndex(ClusterState state, IndicesRequest request) {
if (request.indices() == null || (request.indices() != null && request.indices().length != 1)) {
throw new IllegalArgumentException("indices request must specify a single index expression");
}
Context context = new Context(state, request.indicesOptions(), false, true);
Index[] indices = concreteIndices(context, request.indices()[0]);
if (indices.length != 1) {
throw new IllegalArgumentException("The index expression [" + request.indices()[0] +
"] and options provided did not point to a single write-index");
}
return indices[0];
}

/**
* @return whether the specified alias or index exists. If the alias or index contains datemath then that is resolved too.
*/
Expand Down Expand Up @@ -292,7 +323,7 @@ public String[] indexAliases(ClusterState state, String index, Predicate<AliasMe
String... expressions) {
// expand the aliases wildcard
List<String> resolvedExpressions = expressions != null ? Arrays.asList(expressions) : Collections.emptyList();
Context context = new Context(state, IndicesOptions.lenientExpandOpen(), true);
Context context = new Context(state, IndicesOptions.lenientExpandOpen(), true, false);
for (ExpressionResolver expressionResolver : expressionResolvers) {
resolvedExpressions = expressionResolver.resolve(context, resolvedExpressions);
}
Expand Down Expand Up @@ -512,24 +543,26 @@ static final class Context {
private final IndicesOptions options;
private final long startTime;
private final boolean preserveAliases;
private final boolean resolveToWriteIndex;

Context(ClusterState state, IndicesOptions options) {
this(state, options, System.currentTimeMillis());
}

Context(ClusterState state, IndicesOptions options, boolean preserveAliases) {
this(state, options, System.currentTimeMillis(), preserveAliases);
Context(ClusterState state, IndicesOptions options, boolean preserveAliases, boolean resolveToWriteIndex) {
this(state, options, System.currentTimeMillis(), preserveAliases, resolveToWriteIndex);
}

Context(ClusterState state, IndicesOptions options, long startTime) {
this(state, options, startTime, false);
this(state, options, startTime, false, false);
}

Context(ClusterState state, IndicesOptions options, long startTime, boolean preserveAliases) {
Context(ClusterState state, IndicesOptions options, long startTime, boolean preserveAliases, boolean resolveToWriteIndex) {
this.state = state;
this.options = options;
this.startTime = startTime;
this.preserveAliases = preserveAliases;
this.resolveToWriteIndex = resolveToWriteIndex;
}

public ClusterState getState() {
Expand All @@ -552,6 +585,14 @@ public long getStartTime() {
boolean isPreserveAliases() {
return preserveAliases;
}

/**
* This is used to require that aliases resolve to their write-index. It is currently not used in conjunction
* with <code>preserveAliases</code>.
*/
boolean isResolveToWriteIndex() {
return resolveToWriteIndex;
}
}

private interface ExpressionResolver {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -471,6 +471,42 @@ public String[] getConcreteAllClosedIndices() {
return allClosedIndices;
}

/**
* Returns indexing routing for the given <code>aliasOrIndex</code>. Resolves routing from the alias metadata used
* in the write index.
*/
public String resolveWriteIndexRouting(@Nullable String parent, @Nullable String routing, String aliasOrIndex) {
if (aliasOrIndex == null) {
return routingOrParent(parent, routing);
}

AliasOrIndex result = getAliasAndIndexLookup().get(aliasOrIndex);
if (result == null || result.isAlias() == false) {
return routingOrParent(parent, routing);
}
AliasOrIndex.Alias alias = (AliasOrIndex.Alias) result;
IndexMetaData writeIndex = alias.getWriteIndex();
if (writeIndex == null) {
throw new IllegalArgumentException("alias [" + aliasOrIndex + "] does not have a write index");
}
AliasMetaData aliasMd = writeIndex.getAliases().get(alias.getAliasName());
if (aliasMd.indexRouting() != null) {
if (aliasMd.indexRouting().indexOf(',') != -1) {
throw new IllegalArgumentException("index/alias [" + aliasOrIndex + "] provided with routing value ["
+ aliasMd.getIndexRouting() + "] that resolved to several routing values, rejecting operation");
}
if (routing != null) {
if (!routing.equals(aliasMd.indexRouting())) {
throw new IllegalArgumentException("Alias [" + aliasOrIndex + "] has index routing associated with it ["
+ aliasMd.indexRouting() + "], and was provided with routing value [" + routing + "], rejecting operation");
}
}
// Alias routing overrides the parent routing (if any).
return aliasMd.indexRouting();
}
return routingOrParent(parent, routing);
}

/**
* Returns indexing routing for the given index.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,20 @@

package org.elasticsearch.action.bulk;

import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.ESIntegTestCase;

import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Map;

import static org.elasticsearch.test.StreamsUtils.copyToStringFromClasspath;
import static org.hamcrest.Matchers.equalTo;

public class BulkIntegrationIT extends ESIntegTestCase {
public void testBulkIndexCreatesMapping() throws Exception {
Expand All @@ -40,4 +47,37 @@ public void testBulkIndexCreatesMapping() throws Exception {
assertTrue(mappingsResponse.getMappings().get("logstash-2014.03.30").containsKey("logs"));
});
}

/**
* This tests that the {@link TransportBulkAction} evaluates alias routing values correctly when dealing with
* an alias pointing to multiple indices, while a write index exits.
*/
public void testBulkWithWriteIndexAndRouting() {
Map<String, Integer> twoShardsSettings = Collections.singletonMap(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 2);
client().admin().indices().prepareCreate("index1")
.addAlias(new Alias("alias1").indexRouting("0")).setSettings(twoShardsSettings).get();
client().admin().indices().prepareCreate("index2")
.addAlias(new Alias("alias1").indexRouting("0").writeIndex(randomFrom(false, null)))
.setSettings(twoShardsSettings).get();
client().admin().indices().prepareCreate("index3")
.addAlias(new Alias("alias1").indexRouting("1").writeIndex(true)).setSettings(twoShardsSettings).get();

IndexRequest indexRequestWithAlias = new IndexRequest("alias1", "type", "id");
if (randomBoolean()) {
indexRequestWithAlias.routing("1");
}
indexRequestWithAlias.source(Collections.singletonMap("foo", "baz"));
BulkResponse bulkResponse = client().prepareBulk().add(indexRequestWithAlias).get();
assertThat(bulkResponse.getItems()[0].getResponse().getIndex(), equalTo("index3"));
assertThat(bulkResponse.getItems()[0].getResponse().getVersion(), equalTo(1L));
assertThat(bulkResponse.getItems()[0].getResponse().status(), equalTo(RestStatus.CREATED));
assertThat(client().prepareGet("index3", "type", "id").setRouting("1").get().getSource().get("foo"), equalTo("baz"));

bulkResponse = client().prepareBulk().add(client().prepareUpdate("alias1", "type", "id").setDoc("foo", "updated")).get();
assertFalse(bulkResponse.hasFailures());
assertThat(client().prepareGet("index3", "type", "id").setRouting("1").get().getSource().get("foo"), equalTo("updated"));
bulkResponse = client().prepareBulk().add(client().prepareDelete("alias1", "type", "id")).get();
assertFalse(bulkResponse.hasFailures());
assertFalse(client().prepareGet("index3", "type", "id").setRouting("1").get().isExists());
}
}
Loading

0 comments on commit 4817155

Please sign in to comment.