Skip to content

Commit

Permalink
update rollover to use write indices when set
Browse files Browse the repository at this point in the history
Rollover should not swap aliases when `is_write_index` is set to `true`.
Instead, both the new and old indices should have the rollover alias,
with the newly created index as the new write index
  • Loading branch information
talevy committed Jul 27, 2018
1 parent 1628c83 commit fa078f8
Show file tree
Hide file tree
Showing 5 changed files with 263 additions and 37 deletions.
3 changes: 3 additions & 0 deletions docs/reference/indices/aliases.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,9 @@ and there are multiple indices referenced by an alias, then writes will not be a
It is possible to specify an index associated with an alias as a write index using both the aliases API
and index creation API.

Setting an index to be the write index with an alias also affects how the alias is manipulated during
Rollover (see <<indices-rollover-index, Rollover With Write Index>>).

[source,js]
--------------------------------------------------
POST /_aliases
Expand Down
112 changes: 108 additions & 4 deletions docs/reference/indices/rollover-index.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,19 @@
The rollover index API rolls an alias over to a new index when the existing
index is considered to be too large or too old.

The API accepts a single alias name and a list of `conditions`. The alias
must point to a single index only. If the index satisfies the specified
conditions then a new index is created and the alias is switched to point to
the new index.
The API accepts a single alias name and a list of `conditions`. The alias must point to a write index for
a Rollover request to be valid. There are two ways this can be achieved, and depending on the configuration, the
alias metadata will be updated differently. The two scenarios are as follows:

- The alias only points to a single index with `is_write_index` not configured (defaults to `null`).

In this scenario, the original index will have their rollover alias will be added to the newly created index, and removed
from the original (rolled-over) index.

- The alias points to one or more indices with `is_write_index` set to `true` on the index to be rolled over (the write index).

In this scenario, the write index will have its rollover alias' `is_write_index` set to `false`, while the newly created index
will now have the rollover alias pointing to it as the write index with `is_write_index` as `true`.


[source,js]
Expand Down Expand Up @@ -231,3 +240,98 @@ POST /logs_write/_rollover?dry_run
Because the rollover operation creates a new index to rollover to, the
<<create-index-wait-for-active-shards,`wait_for_active_shards`>> setting on
index creation applies to the rollover action as well.

[[indices-rollover-is-write-index]]
[float]
=== Write Index Alias Behavior

The rollover alias when rolling over a write index that has `is_write_index` explicitly set to `true` is not
swapped during rollover actions. Since having an alias point to multiple indices is ambiguous in distinguishing
which is the correct write index to roll over, it is not valid to rollover an alias that points to multiple indices.
For this reason, the default behavior is to swap which index is being pointed to by the write-oriented alias. This
was `logs_write` in some of the above examples. Since setting `is_write_index` enables an alias to point to multiple indices
while also being explicit as to which is the write index that rollover should target, removing the alias from the rolled over
index is not necessary. This simplifies things by allowing for one alias to behave both as the write and read aliases for
indices that are being managed with Rollover.

Look at the behavior of the aliases in the following example where `is_write_index` is set on the rolled over index.

[source,js]
--------------------------------------------------
PUT my_logs_index-000001
{
"aliases": {
"logs": { "is_write_index": true } <1>
}
}
PUT logs/_doc/1
{
"message": "a dummy log"
}
POST logs/_refresh
POST /logs/_rollover
{
"conditions": {
"max_docs": "1"
}
}
PUT logs/_doc/2 <2>
{
"message": "a newer log"
}
--------------------------------------------------
// CONSOLE
<1> configures `my_logs_index` as the write index for the `logs` alias
<2> newly indexed documents against the `logs` alias will write to the new index

[source,js]
--------------------------------------------------
{
"_index" : "my_logs_index-000002",
"_type" : "_doc",
"_id" : "2",
"_version" : 1,
"result" : "created",
"_shards" : {
"total" : 2,
"successful" : 1,
"failed" : 0
},
"_seq_no" : 0,
"_primary_term" : 1
}
--------------------------------------------------
// TESTRESPONSE

//////////////////////////
[source,js]
--------------------------------------------------
GET _alias
--------------------------------------------------
// CONSOLE
// TEST[continued]
//////////////////////////

After the rollover, the alias metadata for the two indices will have the `is_write_index` setting
reflect each index's role, with the newly created index as the write index.

[source,js]
--------------------------------------------------
{
"my_logs_index-000002": {
"aliases": {
"logs": { "is_write_index": true }
}
},
"my_logs_index-000001": {
"aliases": {
"logs": { "is_write_index" : false }
}
}
}
--------------------------------------------------
// TESTRESPONSE
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,9 @@ protected void masterOperation(final RolloverRequest rolloverRequest, final Clus
final ActionListener<RolloverResponse> listener) {
final MetaData metaData = state.metaData();
validate(metaData, rolloverRequest);
final AliasOrIndex aliasOrIndex = metaData.getAliasAndIndexLookup().get(rolloverRequest.getAlias());
final IndexMetaData indexMetaData = aliasOrIndex.getIndices().get(0);
final AliasOrIndex.Alias alias = (AliasOrIndex.Alias) metaData.getAliasAndIndexLookup().get(rolloverRequest.getAlias());
final IndexMetaData indexMetaData = alias.getWriteIndex();
final boolean explicitWriteIndex = Boolean.TRUE.equals(indexMetaData.getAliases().get(alias.getAliasName()).writeIndex());
final String sourceProvidedName = indexMetaData.getSettings().get(IndexMetaData.SETTING_INDEX_PROVIDED_NAME,
indexMetaData.getIndex().getName());
final String sourceIndexName = indexMetaData.getIndex().getName();
Expand Down Expand Up @@ -138,10 +139,15 @@ public void onResponse(IndicesStatsResponse statsResponse) {
CreateIndexClusterStateUpdateRequest updateRequest = prepareCreateIndexRequest(unresolvedName, rolloverIndexName,
rolloverRequest);
createIndexService.createIndex(updateRequest, ActionListener.wrap(createIndexClusterStateUpdateResponse -> {
// switch the alias to point to the newly created index
indexAliasesService.indicesAliases(
prepareRolloverAliasesUpdateRequest(sourceIndexName, rolloverIndexName,
rolloverRequest),
final IndicesAliasesClusterStateUpdateRequest aliasesUpdateRequest;
if (explicitWriteIndex) {
aliasesUpdateRequest = prepareRolloverAliasesWriteIndexUpdateRequest(sourceIndexName,
rolloverIndexName, rolloverRequest);
} else {
aliasesUpdateRequest = prepareRolloverAliasesUpdateRequest(sourceIndexName,
rolloverIndexName, rolloverRequest);
}
indexAliasesService.indicesAliases(aliasesUpdateRequest,
ActionListener.wrap(aliasClusterStateUpdateResponse -> {
if (aliasClusterStateUpdateResponse.isAcknowledged()) {
clusterService.submitStateUpdateTask("update_rollover_info", new ClusterStateUpdateTask() {
Expand Down Expand Up @@ -196,8 +202,19 @@ public void onFailure(Exception e) {
static IndicesAliasesClusterStateUpdateRequest prepareRolloverAliasesUpdateRequest(String oldIndex, String newIndex,
RolloverRequest request) {
List<AliasAction> actions = unmodifiableList(Arrays.asList(
new AliasAction.Add(newIndex, request.getAlias(), null, null, null, null),
new AliasAction.Remove(oldIndex, request.getAlias())));
new AliasAction.Add(newIndex, request.getAlias(), null, null, null, null),
new AliasAction.Remove(oldIndex, request.getAlias())));
final IndicesAliasesClusterStateUpdateRequest updateRequest = new IndicesAliasesClusterStateUpdateRequest(actions)
.ackTimeout(request.ackTimeout())
.masterNodeTimeout(request.masterNodeTimeout());
return updateRequest;
}

static IndicesAliasesClusterStateUpdateRequest prepareRolloverAliasesWriteIndexUpdateRequest(String oldIndex, String newIndex,
RolloverRequest request) {
List<AliasAction> actions = unmodifiableList(Arrays.asList(
new AliasAction.Add(newIndex, request.getAlias(), null, null, null, true),
new AliasAction.Add(oldIndex, request.getAlias(), null, null, null, false)));
final IndicesAliasesClusterStateUpdateRequest updateRequest = new IndicesAliasesClusterStateUpdateRequest(actions)
.ackTimeout(request.ackTimeout())
.masterNodeTimeout(request.masterNodeTimeout());
Expand Down Expand Up @@ -244,8 +261,9 @@ static void validate(MetaData metaData, RolloverRequest request) {
if (aliasOrIndex.isAlias() == false) {
throw new IllegalArgumentException("source alias is a concrete index");
}
if (aliasOrIndex.getIndices().size() != 1) {
throw new IllegalArgumentException("source alias maps to multiple indices");
final AliasOrIndex.Alias alias = (AliasOrIndex.Alias) aliasOrIndex;
if (alias.getWriteIndex() == null) {
throw new IllegalArgumentException("source alias [" + alias.getAliasName() + "] does not point to a write index");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,12 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {


public void testRolloverOnEmptyIndex() throws Exception {
assertAcked(prepareCreate("test_index-1").addAlias(new Alias("test_alias")).get());
Alias testAlias = new Alias("test_alias");
boolean explicitWriteIndex = randomBoolean();
if (explicitWriteIndex) {
testAlias.writeIndex(true);
}
assertAcked(prepareCreate("test_index-1").addAlias(testAlias).get());
final RolloverResponse response = client().admin().indices().prepareRolloverIndex("test_alias").get();
assertThat(response.getOldIndex(), equalTo("test_index-1"));
assertThat(response.getNewIndex(), equalTo("test_index-000002"));
Expand All @@ -69,7 +74,12 @@ public void testRolloverOnEmptyIndex() throws Exception {
assertThat(response.getConditionStatus().size(), equalTo(0));
final ClusterState state = client().admin().cluster().prepareState().get().getState();
final IndexMetaData oldIndex = state.metaData().index("test_index-1");
assertFalse(oldIndex.getAliases().containsKey("test_alias"));
if (explicitWriteIndex) {
assertTrue(oldIndex.getAliases().containsKey("test_alias"));
assertFalse(oldIndex.getAliases().get("test_alias").writeIndex());
} else {
assertFalse(oldIndex.getAliases().containsKey("test_alias"));
}
final IndexMetaData newIndex = state.metaData().index("test_index-000002");
assertTrue(newIndex.getAliases().containsKey("test_alias"));
}
Expand Down Expand Up @@ -97,8 +107,38 @@ public void testRollover() throws Exception {
is(both(greaterThanOrEqualTo(beforeTime)).and(lessThanOrEqualTo(client().threadPool().absoluteTimeInMillis() + 1000L))));
}

public void testRolloverWithExplicitWriteIndex() throws Exception {
long beforeTime = client().threadPool().absoluteTimeInMillis() - 1000L;
assertAcked(prepareCreate("test_index-2").addAlias(new Alias("test_alias").writeIndex(true)).get());
index("test_index-2", "type1", "1", "field", "value");
flush("test_index-2");
final RolloverResponse response = client().admin().indices().prepareRolloverIndex("test_alias").get();
assertThat(response.getOldIndex(), equalTo("test_index-2"));
assertThat(response.getNewIndex(), equalTo("test_index-000003"));
assertThat(response.isDryRun(), equalTo(false));
assertThat(response.isRolledOver(), equalTo(true));
assertThat(response.getConditionStatus().size(), equalTo(0));
final ClusterState state = client().admin().cluster().prepareState().get().getState();
final IndexMetaData oldIndex = state.metaData().index("test_index-2");
assertTrue(oldIndex.getAliases().containsKey("test_alias"));
assertFalse(oldIndex.getAliases().get("test_alias").writeIndex());
final IndexMetaData newIndex = state.metaData().index("test_index-000003");
assertTrue(newIndex.getAliases().containsKey("test_alias"));
assertTrue(newIndex.getAliases().get("test_alias").writeIndex());
assertThat(oldIndex.getRolloverInfos().size(), equalTo(1));
assertThat(oldIndex.getRolloverInfos().get("test_alias").getAlias(), equalTo("test_alias"));
assertThat(oldIndex.getRolloverInfos().get("test_alias").getMetConditions(), is(empty()));
assertThat(oldIndex.getRolloverInfos().get("test_alias").getTime(),
is(both(greaterThanOrEqualTo(beforeTime)).and(lessThanOrEqualTo(client().threadPool().absoluteTimeInMillis() + 1000L))));
}

public void testRolloverWithIndexSettings() throws Exception {
assertAcked(prepareCreate("test_index-2").addAlias(new Alias("test_alias")).get());
Alias testAlias = new Alias("test_alias");
boolean explicitWriteIndex = randomBoolean();
if (explicitWriteIndex) {
testAlias.writeIndex(true);
}
assertAcked(prepareCreate("test_index-2").addAlias(testAlias).get());
index("test_index-2", "type1", "1", "field", "value");
flush("test_index-2");
final Settings settings = Settings.builder()
Expand All @@ -114,12 +154,17 @@ public void testRolloverWithIndexSettings() throws Exception {
assertThat(response.getConditionStatus().size(), equalTo(0));
final ClusterState state = client().admin().cluster().prepareState().get().getState();
final IndexMetaData oldIndex = state.metaData().index("test_index-2");
assertFalse(oldIndex.getAliases().containsKey("test_alias"));
final IndexMetaData newIndex = state.metaData().index("test_index-000003");
assertThat(newIndex.getNumberOfShards(), equalTo(1));
assertThat(newIndex.getNumberOfReplicas(), equalTo(0));
assertTrue(newIndex.getAliases().containsKey("test_alias"));
assertTrue(newIndex.getAliases().containsKey("extra_alias"));
if (explicitWriteIndex) {
assertFalse(oldIndex.getAliases().get("test_alias").writeIndex());
assertTrue(newIndex.getAliases().get("test_alias").writeIndex());
} else {
assertFalse(oldIndex.getAliases().containsKey("test_alias"));
}
}

public void testRolloverDryRun() throws Exception {
Expand All @@ -140,7 +185,12 @@ public void testRolloverDryRun() throws Exception {
}

public void testRolloverConditionsNotMet() throws Exception {
assertAcked(prepareCreate("test_index-0").addAlias(new Alias("test_alias")).get());
boolean explicitWriteIndex = randomBoolean();
Alias testAlias = new Alias("test_alias");
if (explicitWriteIndex) {
testAlias.writeIndex(true);
}
assertAcked(prepareCreate("test_index-0").addAlias(testAlias).get());
index("test_index-0", "type1", "1", "field", "value");
flush("test_index-0");
final RolloverResponse response = client().admin().indices().prepareRolloverIndex("test_alias")
Expand All @@ -160,12 +210,22 @@ public void testRolloverConditionsNotMet() throws Exception {
final ClusterState state = client().admin().cluster().prepareState().get().getState();
final IndexMetaData oldIndex = state.metaData().index("test_index-0");
assertTrue(oldIndex.getAliases().containsKey("test_alias"));
if (explicitWriteIndex) {
assertTrue(oldIndex.getAliases().get("test_alias").writeIndex());
} else {
assertNull(oldIndex.getAliases().get("test_alias").writeIndex());
}
final IndexMetaData newIndex = state.metaData().index("test_index-000001");
assertNull(newIndex);
}

public void testRolloverWithNewIndexName() throws Exception {
assertAcked(prepareCreate("test_index").addAlias(new Alias("test_alias")).get());
Alias testAlias = new Alias("test_alias");
boolean explicitWriteIndex = randomBoolean();
if (explicitWriteIndex) {
testAlias.writeIndex(true);
}
assertAcked(prepareCreate("test_index").addAlias(testAlias).get());
index("test_index", "type1", "1", "field", "value");
flush("test_index");
final RolloverResponse response = client().admin().indices().prepareRolloverIndex("test_alias")
Expand All @@ -177,9 +237,14 @@ public void testRolloverWithNewIndexName() throws Exception {
assertThat(response.getConditionStatus().size(), equalTo(0));
final ClusterState state = client().admin().cluster().prepareState().get().getState();
final IndexMetaData oldIndex = state.metaData().index("test_index");
assertFalse(oldIndex.getAliases().containsKey("test_alias"));
final IndexMetaData newIndex = state.metaData().index("test_new_index");
assertTrue(newIndex.getAliases().containsKey("test_alias"));
if (explicitWriteIndex) {
assertFalse(oldIndex.getAliases().get("test_alias").writeIndex());
assertTrue(newIndex.getAliases().get("test_alias").writeIndex());
} else {
assertFalse(oldIndex.getAliases().containsKey("test_alias"));
}
}

public void testRolloverOnExistingIndex() throws Exception {
Expand Down
Loading

0 comments on commit fa078f8

Please sign in to comment.