Skip to content

Commit

Permalink
Close Index API should force a flush if a sync is needed (#37961)
Browse files Browse the repository at this point in the history
This commit changes the TransportVerifyShardBeforeCloseAction so that it issues a 
forced flush, forcing the translog and the Lucene commit to contain the same max seq 
number and global checkpoint in the case the Translog contains operations that were 
not written in the IndexWriter (like a Delete that touches a non existing doc). This way 
the assertion added in #37426 won't trip.

Related to #33888
  • Loading branch information
tlrx authored Jan 29, 2019
1 parent 504a89f commit 460f10c
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
package org.elasticsearch.action.admin.indices.close;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.support.ActionFilters;
Expand Down Expand Up @@ -50,6 +52,7 @@ public class TransportVerifyShardBeforeCloseAction extends TransportReplicationA
TransportVerifyShardBeforeCloseAction.ShardRequest, TransportVerifyShardBeforeCloseAction.ShardRequest, ReplicationResponse> {

public static final String NAME = CloseIndexAction.NAME + "[s]";
protected Logger logger = LogManager.getLogger(getClass());

@Inject
public TransportVerifyShardBeforeCloseAction(final Settings settings, final TransportService transportService,
Expand Down Expand Up @@ -111,8 +114,10 @@ private void executeShardOperation(final ShardRequest request, final IndexShard
throw new IllegalStateException("Global checkpoint [" + indexShard.getGlobalCheckpoint()
+ "] mismatches maximum sequence number [" + maxSeqNo + "] on index shard " + shardId);
}
indexShard.flush(new FlushRequest());
logger.debug("{} shard is ready for closing", shardId);

final boolean forced = indexShard.isSyncNeeded();
indexShard.flush(new FlushRequest().force(forced));
logger.trace("{} shard is ready for closing [forced:{}]", shardId, forced);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.action.support.IndicesOptions;
Expand All @@ -26,6 +27,7 @@
import org.elasticsearch.index.shard.IndexShardTestCase;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.internal.AliasFilter;
Expand All @@ -46,6 +48,8 @@
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.is;

public class FrozenIndexTests extends ESSingleNodeTestCase {

Expand Down Expand Up @@ -340,4 +344,30 @@ public void testFreezeIndexIncreasesIndexSettingsVersion() throws ExecutionExcep
assertThat(client().admin().cluster().prepareState().get().getState().metaData().index(index).getSettingsVersion(),
equalTo(settingsVersion + 1));
}

public void testFreezeEmptyIndexWithTranslogOps() throws Exception {
final String indexName = "empty";
createIndex(indexName, Settings.builder()
.put("index.number_of_shards", 1)
.put("index.number_of_replicas", 0)
.put("index.refresh_interval", TimeValue.MINUS_ONE)
.build());

final long nbNoOps = randomIntBetween(1, 10);
for (long i = 0; i < nbNoOps; i++) {
final DeleteResponse deleteResponse = client().prepareDelete(indexName, "_doc", Long.toString(i)).get();
assertThat(deleteResponse.status(), is(RestStatus.NOT_FOUND));
}

final IndicesService indicesService = getInstanceFromNode(IndicesService.class);
assertBusy(() -> {
final Index index = client().admin().cluster().prepareState().get().getState().metaData().index(indexName).getIndex();
final IndexService indexService = indicesService.indexService(index);
assertThat(indexService.hasShard(0), is(true));
assertThat(indexService.getShard(0).getGlobalCheckpoint(), greaterThanOrEqualTo(nbNoOps - 1L));
});

assertAcked(new XPackClient(client()).freeze(new TransportFreezeIndexAction.FreezeRequest(indexName)));
assertIndexFrozen(indexName);
}
}

0 comments on commit 460f10c

Please sign in to comment.