From 53a56a309df4b6e67e50943d0d226b3a748a2000 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Wed, 10 Dec 2014 18:43:03 +0100 Subject: [PATCH] [ENGINE] Fix updates dynamic settings in InternalEngineHolder After the refactoring in #8784 some settings didn't get passed to the actual engine and there exists a race if the settings are updated while the engine is started such that the actual starting engine doesn't see the latest settings. This commit fixes the concurrency issue as well as adds tests to ensure the settings are reflected. Conflicts: src/main/java/org/elasticsearch/index/engine/internal/InternalEngine.java src/main/java/org/elasticsearch/index/engine/internal/InternalEngineHolder.java src/test/java/org/elasticsearch/index/engine/internal/InternalEngineTests.java --- .../index/codec/CodecService.java | 7 ++ .../index/engine/internal/InternalEngine.java | 29 +++++ .../engine/internal/InternalEngineHolder.java | 112 +++++++++++------- .../settings/IndexDynamicSettingsModule.java | 7 ++ .../engine/internal/InternalEngineTests.java | 69 ++++++++++- 5 files changed, 175 insertions(+), 49 deletions(-) diff --git a/src/main/java/org/elasticsearch/index/codec/CodecService.java b/src/main/java/org/elasticsearch/index/codec/CodecService.java index 907c1d0e2fd2e..48ff8ea6c59a0 100644 --- a/src/main/java/org/elasticsearch/index/codec/CodecService.java +++ b/src/main/java/org/elasticsearch/index/codec/CodecService.java @@ -99,4 +99,11 @@ public Codec codec(String name) throws ElasticsearchIllegalArgumentException { } return codec; } + + /** + * Returns all registered available codec names + */ + public String[] availableCodecs() { + return codecs.keySet().toArray(new String[0]); + } } diff --git a/src/main/java/org/elasticsearch/index/engine/internal/InternalEngine.java b/src/main/java/org/elasticsearch/index/engine/internal/InternalEngine.java index ff4d316d55778..7eb4fd68c894c 100644 --- a/src/main/java/org/elasticsearch/index/engine/internal/InternalEngine.java +++ b/src/main/java/org/elasticsearch/index/engine/internal/InternalEngine.java @@ -1742,4 +1742,33 @@ public Condition newCondition() { throw new UnsupportedOperationException("NoOpLock can't provide a condition"); } } + + long getGcDeletesInMillis() { + return gcDeletesInMillis; + } + + String getCodecName() { + return codecName; + } + + boolean isCompoundOnFlush() { + return compoundOnFlush; + } + + int getIndexConcurrency() { + return indexConcurrency; + } + + boolean isFailEngineOnCorruption() { + return failEngineOnCorruption; + } + + LiveIndexWriterConfig getCurrentIndexWriterConfig() { + IndexWriter writer = currentIndexWriter(); + return writer == null ? null : writer.getConfig(); + } + + public boolean isChecksumOnMerge() { + return checksumOnMerge; + } } diff --git a/src/main/java/org/elasticsearch/index/engine/internal/InternalEngineHolder.java b/src/main/java/org/elasticsearch/index/engine/internal/InternalEngineHolder.java index 811599f98ae1f..0e7d0bb379a51 100644 --- a/src/main/java/org/elasticsearch/index/engine/internal/InternalEngineHolder.java +++ b/src/main/java/org/elasticsearch/index/engine/internal/InternalEngineHolder.java @@ -24,6 +24,7 @@ import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Preconditions; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; @@ -64,7 +65,7 @@ public class InternalEngineHolder extends AbstractIndexShardComponent implements private final FailEngineOnMergeFailure mergeSchedulerFailureListener; private final ApplySettings settingsListener; private final MergeScheduleListener mergeSchedulerListener; - private volatile Boolean failOnMergeFailure; + protected volatile Boolean failOnMergeFailure; protected volatile boolean failEngineOnCorruption; protected volatile ByteSizeValue indexingBufferSize; protected volatile int indexConcurrency; @@ -144,7 +145,7 @@ public InternalEngineHolder(ShardId shardId, @IndexSettings Settings indexSettin this.mergeSchedulerListener = new MergeScheduleListener(); this.mergeScheduler.addListener(mergeSchedulerListener); - this.settingsListener = new ApplySettings(); + this.settingsListener = new ApplySettings(logger, this); this.indexSettingsService.addListener(this.settingsListener); store.incRef(); } @@ -216,20 +217,19 @@ public synchronized void stop() throws EngineException { @Override public synchronized void close() throws ElasticsearchException { - if (closed) { - return; - } - closed = true; - try { - InternalEngine currentEngine = this.currentEngine.getAndSet(null); - if (currentEngine != null) { - currentEngine.close(); + if (closed == false) { + closed = true; + try { + InternalEngine currentEngine = this.currentEngine.getAndSet(null); + if (currentEngine != null) { + currentEngine.close(); + } + mergeScheduler.removeFailureListener(mergeSchedulerFailureListener); + mergeScheduler.removeListener(mergeSchedulerListener); + indexSettingsService.removeListener(settingsListener); + } finally { + store.decRef(); } - mergeScheduler.removeFailureListener(mergeSchedulerFailureListener); - mergeScheduler.removeListener(mergeSchedulerListener); - indexSettingsService.removeListener(settingsListener); - } finally { - store.decRef(); } } @@ -358,63 +358,83 @@ public void onFailedEngine(ShardId shardId, String reason, @Nullable Throwable f } } - class ApplySettings implements IndexSettingsService.Listener { + static class ApplySettings implements IndexSettingsService.Listener { + + private final ESLogger logger; + private final InternalEngineHolder holder; + + ApplySettings(ESLogger logger, InternalEngineHolder holder) { + this.logger = logger; + this.holder = holder; + } @Override public void onRefreshSettings(Settings settings) { - InternalEngine currentEngine = InternalEngineHolder.this.currentEngine.get(); boolean change = false; - long gcDeletesInMillis = settings.getAsTime(INDEX_GC_DELETES, TimeValue.timeValueMillis(InternalEngineHolder.this.gcDeletesInMillis)).millis(); - if (gcDeletesInMillis != InternalEngineHolder.this.gcDeletesInMillis) { - logger.info("updating index.gc_deletes from [{}] to [{}]", TimeValue.timeValueMillis(InternalEngineHolder.this.gcDeletesInMillis), TimeValue.timeValueMillis(gcDeletesInMillis)); - InternalEngineHolder.this.gcDeletesInMillis = gcDeletesInMillis; + long gcDeletesInMillis = settings.getAsTime(INDEX_GC_DELETES, TimeValue.timeValueMillis(holder.gcDeletesInMillis)).millis(); + if (gcDeletesInMillis != holder.gcDeletesInMillis) { + logger.info("updating index.gc_deletes from [{}] to [{}]", TimeValue.timeValueMillis(holder.gcDeletesInMillis), TimeValue.timeValueMillis(gcDeletesInMillis)); + holder.gcDeletesInMillis = gcDeletesInMillis; change = true; } - final boolean compoundOnFlush = settings.getAsBoolean(INDEX_COMPOUND_ON_FLUSH, InternalEngineHolder.this.compoundOnFlush); - if (compoundOnFlush != InternalEngineHolder.this.compoundOnFlush) { - logger.info("updating {} from [{}] to [{}]", INDEX_COMPOUND_ON_FLUSH, InternalEngineHolder.this.compoundOnFlush, compoundOnFlush); - InternalEngineHolder.this.compoundOnFlush = compoundOnFlush; + final boolean compoundOnFlush = settings.getAsBoolean(INDEX_COMPOUND_ON_FLUSH, holder.compoundOnFlush); + if (compoundOnFlush != holder.compoundOnFlush) { + logger.info("updating {} from [{}] to [{}]", INDEX_COMPOUND_ON_FLUSH, holder.compoundOnFlush, compoundOnFlush); + holder.compoundOnFlush = compoundOnFlush; change = true; } - final boolean checksumOnMerge = settings.getAsBoolean(INDEX_CHECKSUM_ON_MERGE, InternalEngineHolder.this.checksumOnMerge); - if (checksumOnMerge != InternalEngineHolder.this.checksumOnMerge) { - logger.info("updating {} from [{}] to [{}]", InternalEngineHolder.INDEX_CHECKSUM_ON_MERGE, InternalEngineHolder.this.checksumOnMerge, checksumOnMerge); - InternalEngineHolder.this.checksumOnMerge = checksumOnMerge; + final boolean checksumOnMerge = settings.getAsBoolean(INDEX_CHECKSUM_ON_MERGE, holder.checksumOnMerge); + if (checksumOnMerge != holder.checksumOnMerge) { + logger.info("updating {} from [{}] to [{}]", InternalEngineHolder.INDEX_CHECKSUM_ON_MERGE, holder.checksumOnMerge, checksumOnMerge); + holder.checksumOnMerge = checksumOnMerge; change = true; } - - final boolean failEngineOnCorruption = settings.getAsBoolean(INDEX_FAIL_ON_CORRUPTION, InternalEngineHolder.this.failEngineOnCorruption); - if (failEngineOnCorruption != InternalEngineHolder.this.failEngineOnCorruption) { - logger.info("updating {} from [{}] to [{}]", INDEX_FAIL_ON_CORRUPTION, InternalEngineHolder.this.failEngineOnCorruption, failEngineOnCorruption); - InternalEngineHolder.this.failEngineOnCorruption = failEngineOnCorruption; + final boolean failEngineOnCorruption = settings.getAsBoolean(INDEX_FAIL_ON_CORRUPTION, holder.failEngineOnCorruption); + if (failEngineOnCorruption != holder.failEngineOnCorruption) { + logger.info("updating {} from [{}] to [{}]", INDEX_FAIL_ON_CORRUPTION, holder.failEngineOnCorruption, failEngineOnCorruption); + holder.failEngineOnCorruption = failEngineOnCorruption; change = true; } - int indexConcurrency = settings.getAsInt(INDEX_INDEX_CONCURRENCY, InternalEngineHolder.this.indexConcurrency); - if (indexConcurrency != InternalEngineHolder.this.indexConcurrency) { - logger.info("updating index.index_concurrency from [{}] to [{}]", InternalEngineHolder.this.indexConcurrency, indexConcurrency); - InternalEngineHolder.this.indexConcurrency = indexConcurrency; + int indexConcurrency = settings.getAsInt(INDEX_INDEX_CONCURRENCY, holder.indexConcurrency); + if (indexConcurrency != holder.indexConcurrency) { + logger.info("updating index.index_concurrency from [{}] to [{}]", holder.indexConcurrency, indexConcurrency); + holder.indexConcurrency = indexConcurrency; // we have to flush in this case, since it only applies on a new index writer change = true; } - if (!codecName.equals(InternalEngineHolder.this.codecName)) { - logger.info("updating index.codec from [{}] to [{}]", InternalEngineHolder.this.codecName, codecName); - InternalEngineHolder.this.codecName = codecName; + final String codecName = settings.get(INDEX_CODEC, holder.codecName); + if (!codecName.equals(holder.codecName)) { + logger.info("updating index.codec from [{}] to [{}]", holder.codecName, codecName); + holder.codecName = codecName; // we want to flush in this case, so the new codec will be reflected right away... change = true; } - if (failOnMergeFailure != InternalEngineHolder.this.failOnMergeFailure) { - logger.info("updating {} from [{}] to [{}]", INDEX_FAIL_ON_MERGE_FAILURE, InternalEngineHolder.this.failOnMergeFailure, failOnMergeFailure); - InternalEngineHolder.this.failOnMergeFailure = failOnMergeFailure; + final boolean failOnMergeFailure = settings.getAsBoolean(INDEX_FAIL_ON_MERGE_FAILURE, holder.failOnMergeFailure); + if (failOnMergeFailure != holder.failOnMergeFailure) { + logger.info("updating {} from [{}] to [{}]", INDEX_FAIL_ON_MERGE_FAILURE, holder.failOnMergeFailure, failOnMergeFailure); + holder.failOnMergeFailure = failOnMergeFailure; } - if (change && currentEngine != null) { - currentEngine.updateSettings(gcDeletesInMillis, compoundOnFlush, checksumOnMerge, failEngineOnCorruption, indexConcurrency, codecName); + + + if (change) { + holder.updateSettings(); } } } + synchronized void updateSettings() { + // we need to make sure that we wait for the engine to be fully initialized + // the start method sets the current engine once it's done but samples the settings + // at construction time. + final InternalEngine engine = currentEngine.get(); + if (engine != null) { + engine.updateSettings(gcDeletesInMillis, compoundOnFlush, checksumOnMerge, failEngineOnCorruption, indexConcurrency, codecName); + } + } + class FailEngineOnMergeFailure implements MergeSchedulerProvider.FailureListener { @Override public void onFailedMerge(MergePolicy.MergeException e) { diff --git a/src/main/java/org/elasticsearch/index/settings/IndexDynamicSettingsModule.java b/src/main/java/org/elasticsearch/index/settings/IndexDynamicSettingsModule.java index 4004ceb4888b0..318d666997d9e 100644 --- a/src/main/java/org/elasticsearch/index/settings/IndexDynamicSettingsModule.java +++ b/src/main/java/org/elasticsearch/index/settings/IndexDynamicSettingsModule.java @@ -134,4 +134,11 @@ public void addDynamicSetting(String setting, Validator validator) { protected void configure() { bind(DynamicSettings.class).annotatedWith(IndexDynamicSettings.class).toInstance(indexDynamicSettings); } + + /** + * Returns true iff the given setting is in the dynamic settings map. Otherwise false. + */ + public boolean containsSetting(String setting) { + return indexDynamicSettings.hasDynamicSetting(setting); + } } diff --git a/src/test/java/org/elasticsearch/index/engine/internal/InternalEngineTests.java b/src/test/java/org/elasticsearch/index/engine/internal/InternalEngineTests.java index 35486b12efc1c..f75310d34a6ef 100644 --- a/src/test/java/org/elasticsearch/index/engine/internal/InternalEngineTests.java +++ b/src/test/java/org/elasticsearch/index/engine/internal/InternalEngineTests.java @@ -31,6 +31,7 @@ import org.apache.lucene.document.TextField; import org.apache.lucene.index.CorruptIndexException; import org.apache.lucene.index.IndexDeletionPolicy; +import org.apache.lucene.index.LiveIndexWriterConfig; import org.apache.lucene.index.Term; import org.apache.lucene.search.TermQuery; import org.apache.lucene.store.IOContext; @@ -65,6 +66,7 @@ import org.elasticsearch.index.merge.scheduler.ConcurrentMergeSchedulerProvider; import org.elasticsearch.index.merge.scheduler.MergeSchedulerProvider; import org.elasticsearch.index.merge.scheduler.SerialMergeSchedulerProvider; +import org.elasticsearch.index.settings.IndexDynamicSettingsModule; import org.elasticsearch.index.settings.IndexSettingsService; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.similarity.SimilarityService; @@ -90,13 +92,12 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; -import static com.carrotsearch.randomizedtesting.RandomizedTest.randomBoolean; -import static com.carrotsearch.randomizedtesting.RandomizedTest.randomDouble; -import static com.carrotsearch.randomizedtesting.RandomizedTest.randomIntBetween; +import static com.carrotsearch.randomizedtesting.RandomizedTest.*; import static org.elasticsearch.common.settings.ImmutableSettings.Builder.EMPTY_SETTINGS; import static org.elasticsearch.index.engine.Engine.Operation.Origin.PRIMARY; import static org.elasticsearch.index.engine.Engine.Operation.Origin.REPLICA; import static org.elasticsearch.test.ElasticsearchTestCase.awaitBusy; +import static org.elasticsearch.test.ElasticsearchTestCase.randomFrom; import static org.elasticsearch.test.ElasticsearchTestCase.terminate; import static org.hamcrest.Matchers.*; @@ -1486,4 +1487,66 @@ public void testFailStart() throws IOException { } } } + + @Test + public void testSettings() { + final InternalEngineHolder holder = (InternalEngineHolder) engine; + IndexDynamicSettingsModule settings = new IndexDynamicSettingsModule(); + assertTrue(settings.containsSetting(InternalEngineHolder.INDEX_FAIL_ON_CORRUPTION)); + assertTrue(settings.containsSetting(InternalEngineHolder.INDEX_COMPOUND_ON_FLUSH)); + assertTrue(settings.containsSetting(InternalEngineHolder.INDEX_GC_DELETES)); + assertTrue(settings.containsSetting(InternalEngineHolder.INDEX_CODEC)); + assertTrue(settings.containsSetting(InternalEngineHolder.INDEX_FAIL_ON_MERGE_FAILURE)); + assertTrue(settings.containsSetting(InternalEngineHolder.INDEX_CHECKSUM_ON_MERGE)); + assertTrue(settings.containsSetting(InternalEngineHolder.INDEX_INDEX_CONCURRENCY)); + + final int iters = between(1, 20); + for (int i = 0; i < iters; i++) { + boolean compoundOnFlush = randomBoolean(); + boolean failOnCorruption = randomBoolean(); + boolean failOnMerge = randomBoolean(); + boolean checksumOnMerge = randomBoolean(); + long gcDeletes = Math.max(0, randomLong()); + int indexConcurrency = randomIntBetween(1, 20); + String codecName = randomFrom(holder.codecService.availableCodecs()); + + Settings build = ImmutableSettings.builder() + .put(InternalEngineHolder.INDEX_FAIL_ON_CORRUPTION, failOnCorruption) + .put(InternalEngineHolder.INDEX_COMPOUND_ON_FLUSH, compoundOnFlush) + .put(InternalEngineHolder.INDEX_GC_DELETES, gcDeletes) + .put(InternalEngineHolder.INDEX_CODEC, codecName) + .put(InternalEngineHolder.INDEX_FAIL_ON_MERGE_FAILURE, failOnMerge) + .put(InternalEngineHolder.INDEX_INDEX_CONCURRENCY, indexConcurrency) + .put(InternalEngineHolder.INDEX_CHECKSUM_ON_MERGE, checksumOnMerge) + .build(); + + engineSettingsService.refreshSettings(build); + LiveIndexWriterConfig currentIndexWriterConfig = holder.engineSafe().getCurrentIndexWriterConfig(); + assertEquals(holder.compoundOnFlush, compoundOnFlush); + assertEquals(holder.engineSafe().isCompoundOnFlush(), compoundOnFlush); + assertEquals(currentIndexWriterConfig.getUseCompoundFile(), compoundOnFlush); + + + assertEquals(holder.gcDeletesInMillis, gcDeletes); + assertEquals(holder.engineSafe().getGcDeletesInMillis(), gcDeletes); + + assertEquals(holder.codecName, codecName); + assertEquals(holder.engineSafe().getCodecName(), codecName); + assertEquals(currentIndexWriterConfig.getCodec(), holder.codecService.codec(codecName)); + + + assertEquals(holder.failEngineOnCorruption, failOnCorruption); + assertEquals(holder.engineSafe().isFailEngineOnCorruption(), failOnCorruption); + + assertEquals(holder.failOnMergeFailure, failOnMerge); // only on the holder + + assertEquals(holder.indexConcurrency, indexConcurrency); + assertEquals(holder.engineSafe().getIndexConcurrency(), indexConcurrency); + assertEquals(currentIndexWriterConfig.getMaxThreadStates(), indexConcurrency); + + assertEquals(holder.checksumOnMerge, checksumOnMerge); + assertEquals(holder.engineSafe().isChecksumOnMerge(), checksumOnMerge); + assertEquals(currentIndexWriterConfig.getCheckIntegrityAtMerge(), checksumOnMerge); + } + } }