Skip to content

Commit

Permalink
Core: Remove ability to run optimize and upgrade async
Browse files Browse the repository at this point in the history
This has been very trappy. Rather than continue to allow buggy behavior
of having upgrade/optimize requests sidestep the single shard per node
limits optimize is supposed to be subject to, this removes
the ability to run the upgrade/optimize async.

closes elastic#9638
  • Loading branch information
rjernst committed Feb 11, 2015
1 parent faae98c commit f735baf
Show file tree
Hide file tree
Showing 17 changed files with 33 additions and 135 deletions.
8 changes: 4 additions & 4 deletions docs/reference/indices/optimize.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ operations (and relates to the number of segments a Lucene index holds
within each shard). The optimize operation allows to reduce the number
of segments by merging them.

This call will block until the optimize is complete. If the http connection
is lost, the request will continue in the background, and
any new requests will block until the previous optimize is complete.

[source,js]
--------------------------------------------------
$ curl -XPOST 'http://localhost:9200/twitter/_optimize'
Expand All @@ -33,10 +37,6 @@ deletes. Defaults to `false`. Note that this won't override the
`flush`:: Should a flush be performed after the optimize. Defaults to
`true`.

`wait_for_merge`:: Should the request wait for the merge to end. Defaults
to `true`. Note, a merge can potentially be a very heavy operation, so
it might make sense to run it set to `false`.

[float]
[[optimize-multi-index]]
=== Multi Index
Expand Down
12 changes: 3 additions & 9 deletions docs/reference/indices/upgrade.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,9 @@ NOTE: Upgrading is an I/O intensive operation, and is limited to processing a
single shard per node at a time. It also is not allowed to run at the same
time as optimize.

[float]
[[upgrade-parameters]]
==== Request Parameters

The `upgrade` API accepts the following request parameters:

[horizontal]
`wait_for_completion`:: Should the request wait for the upgrade to complete. Defaults
to `false`.
This call will block until the upgrade is complete. If the http connection
is lost, the request will continue in the background, and
any new requests will block until the previous upgrade is complete.

[float]
=== Check upgrade status
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,6 @@
* A request to optimize one or more indices. In order to optimize on all the indices, pass an empty array or
* <tt>null</tt> for the indices.
* <p/>
* <p>{@link #waitForMerge(boolean)} allows to control if the call will block until the optimize completes and
* defaults to <tt>true</tt>.
* <p/>
* <p>{@link #maxNumSegments(int)} allows to control the number of segments to optimize down to. By default, will
* cause the optimize process to optimize down to half the configured number of segments.
*
Expand All @@ -43,14 +40,12 @@
public class OptimizeRequest extends BroadcastOperationRequest<OptimizeRequest> {

public static final class Defaults {
public static final boolean WAIT_FOR_MERGE = true;
public static final int MAX_NUM_SEGMENTS = -1;
public static final boolean ONLY_EXPUNGE_DELETES = false;
public static final boolean FLUSH = true;
public static final boolean UPGRADE = false;
}

private boolean waitForMerge = Defaults.WAIT_FOR_MERGE;

private int maxNumSegments = Defaults.MAX_NUM_SEGMENTS;
private boolean onlyExpungeDeletes = Defaults.ONLY_EXPUNGE_DELETES;
private boolean flush = Defaults.FLUSH;
Expand All @@ -69,21 +64,6 @@ public OptimizeRequest() {

}

/**
* Should the call block until the optimize completes. Defaults to <tt>true</tt>.
*/
public boolean waitForMerge() {
return waitForMerge;
}

/**
* Should the call block until the optimize completes. Defaults to <tt>true</tt>.
*/
public OptimizeRequest waitForMerge(boolean waitForMerge) {
this.waitForMerge = waitForMerge;
return this;
}

/**
* Will optimize the index down to <= maxNumSegments. By default, will cause the optimize
* process to optimize down to half the configured number of segments.
Expand Down Expand Up @@ -151,7 +131,6 @@ public OptimizeRequest upgrade(boolean upgrade) {

public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
waitForMerge = in.readBoolean();
maxNumSegments = in.readInt();
onlyExpungeDeletes = in.readBoolean();
flush = in.readBoolean();
Expand All @@ -160,7 +139,6 @@ public void readFrom(StreamInput in) throws IOException {

public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBoolean(waitForMerge);
out.writeInt(maxNumSegments);
out.writeBoolean(onlyExpungeDeletes);
out.writeBoolean(flush);
Expand All @@ -170,8 +148,7 @@ public void writeTo(StreamOutput out) throws IOException {
@Override
public String toString() {
return "OptimizeRequest{" +
"waitForMerge=" + waitForMerge +
", maxNumSegments=" + maxNumSegments +
"maxNumSegments=" + maxNumSegments +
", onlyExpungeDeletes=" + onlyExpungeDeletes +
", flush=" + flush +
", upgrade=" + upgrade +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,6 @@
* A request to optimize one or more indices. In order to optimize on all the indices, pass an empty array or
* <tt>null</tt> for the indices.
* <p/>
* <p>{@link #setWaitForMerge(boolean)} allows to control if the call will block until the optimize completes and
* defaults to <tt>true</tt>.
* <p/>
* <p>{@link #setMaxNumSegments(int)} allows to control the number of segments to optimize down to. By default, will
* cause the optimize process to optimize down to half the configured number of segments.
*/
Expand All @@ -39,14 +36,6 @@ public OptimizeRequestBuilder(IndicesAdminClient indicesClient) {
super(indicesClient, new OptimizeRequest());
}

/**
* Should the call block until the optimize completes. Defaults to <tt>true</tt>.
*/
public OptimizeRequestBuilder setWaitForMerge(boolean waitForMerge) {
request.waitForMerge(waitForMerge);
return this;
}

/**
* Will optimize the index down to <= maxNumSegments. By default, will cause the optimize
* process to optimize down to half the configured number of segments.
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/org/elasticsearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -232,12 +232,12 @@ public Condition newCondition() {
/**
* Optimizes to 1 segment
*/
abstract void forceMerge(boolean flush, boolean waitForMerge);
abstract void forceMerge(boolean flush);

/**
* Triggers a forced merge on this engine
*/
public abstract void forceMerge(boolean flush, boolean waitForMerge, int maxNumSegments, boolean onlyExpungeDeletes, boolean upgrade) throws EngineException;
public abstract void forceMerge(boolean flush, int maxNumSegments, boolean onlyExpungeDeletes, boolean upgrade) throws EngineException;

/**
* Snapshots the index and returns a handle to it. Will always try and "commit" the
Expand Down
24 changes: 4 additions & 20 deletions src/main/java/org/elasticsearch/index/engine/InternalEngine.java
Original file line number Diff line number Diff line change
Expand Up @@ -817,12 +817,12 @@ private void waitForMerges(boolean flushAfter, boolean upgrade) {
}

@Override
public void forceMerge(boolean flush, boolean waitForMerge) {
forceMerge(flush, waitForMerge, 1, false, false);
public void forceMerge(boolean flush) {
forceMerge(flush, 1, false, false);
}

@Override
public void forceMerge(final boolean flush, boolean waitForMerge, int maxNumSegments, boolean onlyExpungeDeletes, final boolean upgrade) throws EngineException {
public void forceMerge(final boolean flush, int maxNumSegments, boolean onlyExpungeDeletes, final boolean upgrade) throws EngineException {
if (optimizeMutex.compareAndSet(false, true)) {
try (ReleasableLock _ = readLock.acquire()) {
ensureOpen();
Expand Down Expand Up @@ -855,23 +855,7 @@ public void forceMerge(final boolean flush, boolean waitForMerge, int maxNumSegm
}
}

// wait for the merges outside of the read lock
if (waitForMerge) {
waitForMerges(flush, upgrade);
} else if (flush || upgrade) {
// we only need to monitor merges for async calls if we are going to flush
engineConfig.getThreadPool().executor(ThreadPool.Names.OPTIMIZE).execute(new AbstractRunnable() {
@Override
public void onFailure(Throwable t) {
logger.error("Exception while waiting for merges asynchronously after optimize", t);
}

@Override
protected void doRun() throws Exception {
waitForMerges(flush, upgrade);
}
});
}
waitForMerges(flush, upgrade);
}


Expand Down
3 changes: 1 addition & 2 deletions src/main/java/org/elasticsearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -625,8 +625,7 @@ public void optimize(OptimizeRequest optimize) throws ElasticsearchException {
if (logger.isTraceEnabled()) {
logger.trace("optimize with {}", optimize);
}
engine().forceMerge(optimize.flush(), optimize.waitForMerge(), optimize
.maxNumSegments(), optimize.onlyExpungeDeletes(), optimize.upgrade());
engine().forceMerge(optimize.flush(), optimize.maxNumSegments(), optimize.onlyExpungeDeletes(), optimize.upgrade());
}

public SnapshotIndexCommit snapshotIndex() throws EngineException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ public void handleRequest(final RestRequest request, final RestChannel channel,
OptimizeRequest optimizeRequest = new OptimizeRequest(Strings.splitStringByCommaToArray(request.param("index")));
optimizeRequest.listenerThreaded(false);
optimizeRequest.indicesOptions(IndicesOptions.fromRequest(request, optimizeRequest.indicesOptions()));
optimizeRequest.waitForMerge(request.paramAsBoolean("wait_for_merge", optimizeRequest.waitForMerge()));
optimizeRequest.maxNumSegments(request.paramAsInt("max_num_segments", optimizeRequest.maxNumSegments()));
optimizeRequest.onlyExpungeDeletes(request.paramAsBoolean("only_expunge_deletes", optimizeRequest.onlyExpungeDeletes()));
optimizeRequest.flush(request.paramAsBoolean("flush", optimizeRequest.flush()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,6 @@ public RestResponse buildResponse(IndicesSegmentResponse response, XContentBuild

void handlePost(RestRequest request, RestChannel channel, Client client) {
OptimizeRequest optimizeReq = new OptimizeRequest(Strings.splitStringByCommaToArray(request.param("index")));
optimizeReq.waitForMerge(request.paramAsBoolean("wait_for_completion", false));
optimizeReq.flush(true);
optimizeReq.upgrade(true);
optimizeReq.maxNumSegments(Integer.MAX_VALUE); // we just want to upgrade the segments, not actually optimize to a single segment
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ public void testReusePeerRecovery() throws Exception {
}
logger.info("Running Cluster Health");
ensureGreen();
client().admin().indices().prepareOptimize("test").setWaitForMerge(true).setMaxNumSegments(100).get(); // just wait for merges
client().admin().indices().prepareOptimize("test").setMaxNumSegments(100).get(); // just wait for merges
client().admin().indices().prepareFlush().setWaitIfOngoing(true).setForce(true).get();

logger.info("--> disabling allocation while the cluster is shut down");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -411,30 +411,9 @@ public void testVerboseSegments() throws Exception {
public void testSegmentsWithMergeFlag() throws Exception {
final Store store = createStore();
ConcurrentMergeSchedulerProvider mergeSchedulerProvider = new ConcurrentMergeSchedulerProvider(shardId, EMPTY_SETTINGS, threadPool, new IndexSettingsService(shardId.index(), EMPTY_SETTINGS));
final AtomicReference<CountDownLatch> waitTillMerge = new AtomicReference<>();
final AtomicReference<CountDownLatch> waitForMerge = new AtomicReference<>();
mergeSchedulerProvider.addListener(new MergeSchedulerProvider.Listener() {
@Override
public void beforeMerge(OnGoingMerge merge) {
try {
if (waitTillMerge.get() != null) {
waitTillMerge.get().countDown();
}
if (waitForMerge.get() != null) {
waitForMerge.get().await();
}
} catch (InterruptedException e) {
throw ExceptionsHelper.convertToRuntime(e);
}
}

@Override
public void afterMerge(OnGoingMerge merge) {
}
});

IndexSettingsService indexSettingsService = new IndexSettingsService(shardId.index(), ImmutableSettings.builder().put(defaultSettings).put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build());
final Engine engine = createEngine(indexSettingsService, store, createTranslog(), mergeSchedulerProvider);

ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocument(), B_1, false);
Engine.Index index = new Engine.Index(null, newUid("1"), doc);
engine.index(index);
Expand All @@ -456,24 +435,13 @@ public void afterMerge(OnGoingMerge merge) {
for (Segment segment : segments) {
assertThat(segment.getMergeId(), nullValue());
}

waitTillMerge.set(new CountDownLatch(1));
waitForMerge.set(new CountDownLatch(1));
engine.forceMerge(false, false);
waitTillMerge.get().await();

for (Segment segment : engine.segments(false)) {
assertThat(segment.getMergeId(), notNullValue());
}

waitForMerge.get().countDown();


index = new Engine.Index(null, newUid("4"), doc);
engine.index(index);
engine.flush();
final long gen1 = store.readLastCommittedSegmentsInfo().getGeneration();
// now, optimize and wait for merges, see that we have no merge flag
engine.forceMerge(true, true);
engine.forceMerge(true);

for (Segment segment : engine.segments(false)) {
assertThat(segment.getMergeId(), nullValue());
Expand All @@ -483,25 +451,14 @@ public void afterMerge(OnGoingMerge merge) {

final boolean flush = randomBoolean();
final long gen2 = store.readLastCommittedSegmentsInfo().getGeneration();
engine.forceMerge(flush, false);
waitTillMerge.get().await();
engine.forceMerge(flush);
for (Segment segment : engine.segments(false)) {
assertThat(segment.getMergeId(), nullValue());
}
waitForMerge.get().countDown();

if (flush) {
awaitBusy(new Predicate<Object>() {
@Override
public boolean apply(Object o) {
try {
// we should have had just 1 merge, so last generation should be exact
return store.readLastCommittedSegmentsInfo().getLastGeneration() == gen2;
} catch (IOException e) {
throw ExceptionsHelper.convertToRuntime(e);
}
}
});
// we should have had just 1 merge, so last generation should be exact
assertEquals(gen2 + 1, store.readLastCommittedSegmentsInfo().getLastGeneration());
}

engine.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ public void testUpdateThrottleSettings() {

// Optimize does a waitForMerges, which we must do to make sure all in-flight (throttled) merges finish:
logger.info("test: optimize");
client().admin().indices().prepareOptimize("test").setWaitForMerge(true).get();
client().admin().indices().prepareOptimize("test").get();
logger.info("test: optimize done");

// Record current throttling so far
Expand Down Expand Up @@ -253,7 +253,7 @@ public void testUpdateThrottleSettings() {
// when ElasticsearchIntegrationTest.after tries to remove indices created by the test:

// Wait for merges to finish
client().admin().indices().prepareOptimize("test").setWaitForMerge(true).get();
client().admin().indices().prepareOptimize("test").get();
flush();

logger.info("test: test done");
Expand Down Expand Up @@ -369,7 +369,7 @@ public void testUpdateMergeMaxThreadCount() {
.put(ConcurrentMergeSchedulerProvider.MAX_THREAD_COUNT, "1")
)
.get();

// Make sure we log the change:
assertTrue(mockAppender.sawUpdateMaxThreadCount);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ public void throttleStats() throws Exception {
// Optimize & flush and wait; else we sometimes get a "Delete Index failed - not acked"
// when ElasticsearchIntegrationTest.after tries to remove indices created by the test:
logger.info("test: now optimize");
client().admin().indices().prepareOptimize("test").setWaitForMerge(true).get();
client().admin().indices().prepareOptimize("test").get();
flush();
logger.info("test: test done");
}
Expand Down Expand Up @@ -517,7 +517,7 @@ public void testMergeStats() {
client().prepareIndex("test1", "type2", Integer.toString(i)).setSource("field", "value").execute().actionGet();
client().admin().indices().prepareFlush().execute().actionGet();
}
client().admin().indices().prepareOptimize().setWaitForMerge(true).setMaxNumSegments(1).execute().actionGet();
client().admin().indices().prepareOptimize().setMaxNumSegments(1).execute().actionGet();
stats = client().admin().indices().prepareStats()
.setMerge(true)
.execute().actionGet();
Expand All @@ -544,7 +544,7 @@ public void testSegmentsStats() {
assertThat(stats.getTotal().getSegments().getVersionMapMemoryInBytes(), greaterThan(0l));

client().admin().indices().prepareFlush().get();
client().admin().indices().prepareOptimize().setWaitForMerge(true).setMaxNumSegments(1).execute().actionGet();
client().admin().indices().prepareOptimize().setMaxNumSegments(1).execute().actionGet();
stats = client().admin().indices().prepareStats().setSegments(true).get();

assertThat(stats.getTotal().getSegments(), notNullValue());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ public boolean apply(Object o) {
logger.info("--> Single index upgrade complete");

logger.info("--> Running upgrade on the rest of the indexes");
runUpgrade(httpClient, null, "wait_for_completion", "true");
runUpgrade(httpClient, null);
logSegmentsState();
logger.info("--> Full upgrade complete");
assertUpgraded(httpClient, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1929,7 +1929,7 @@ public void testParentChildCaching() throws Exception {
client().prepareIndex("test", "child", "c1").setParent("p1").setSource("c_field", "blue").get();
client().prepareIndex("test", "child", "c2").setParent("p1").setSource("c_field", "red").get();
client().prepareIndex("test", "child", "c3").setParent("p2").setSource("c_field", "red").get();
client().admin().indices().prepareOptimize("test").setFlush(true).setWaitForMerge(true).get();
client().admin().indices().prepareOptimize("test").setFlush(true).get();
client().prepareIndex("test", "parent", "p3").setSource("p_field", "p_value3").get();
client().prepareIndex("test", "parent", "p4").setSource("p_field", "p_value4").get();
client().prepareIndex("test", "child", "c4").setParent("p3").setSource("c_field", "green").get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1414,7 +1414,7 @@ public void testSnapshotMoreThanOnce() throws ExecutionException, InterruptedExc
}
indexRandom(true, builders);
flushAndRefresh();
assertNoFailures(client().admin().indices().prepareOptimize("test").setFlush(true).setWaitForMerge(true).setMaxNumSegments(1).get());
assertNoFailures(client().admin().indices().prepareOptimize("test").setFlush(true).setMaxNumSegments(1).get());

CreateSnapshotResponse createSnapshotResponseFirst = client.admin().cluster().prepareCreateSnapshot("test-repo", "test").setWaitForCompletion(true).setIndices("test").get();
assertThat(createSnapshotResponseFirst.getSnapshotInfo().successfulShards(), greaterThan(0));
Expand Down
Loading

0 comments on commit f735baf

Please sign in to comment.