Skip to content

Commit

Permalink
Merge pull request elastic#13918 from mikemccand/immediate_shard_active
Browse files Browse the repository at this point in the history
When a shard becomes active again, immediately increase its indexing buffer instead of waiting for up to 30 seconds while indexing with a tiny (500 KB) indexing buffer.
  • Loading branch information
Michael McCandless committed Oct 8, 2015
2 parents 7e53123 + 23f97c3 commit 9688e86
Show file tree
Hide file tree
Showing 12 changed files with 242 additions and 211 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.index;

import org.elasticsearch.common.Nullable;
Expand All @@ -34,6 +35,7 @@
import org.elasticsearch.indices.IndicesLifecycle;
import org.elasticsearch.indices.IndicesWarmer;
import org.elasticsearch.indices.cache.query.IndicesQueryCache;
import org.elasticsearch.indices.memory.IndexingMemoryController;
import org.elasticsearch.threadpool.ThreadPool;

/**
Expand All @@ -58,9 +60,10 @@ public final class IndexServicesProvider {
private final EngineFactory factory;
private final BigArrays bigArrays;
private final IndexSearcherWrapper indexSearcherWrapper;
private final IndexingMemoryController indexingMemoryController;

@Inject
public IndexServicesProvider(IndicesLifecycle indicesLifecycle, ThreadPool threadPool, MapperService mapperService, IndexQueryParserService queryParserService, IndexCache indexCache, IndexAliasesService indexAliasesService, IndicesQueryCache indicesQueryCache, CodecService codecService, TermVectorsService termVectorsService, IndexFieldDataService indexFieldDataService, @Nullable IndicesWarmer warmer, SimilarityService similarityService, EngineFactory factory, BigArrays bigArrays, @Nullable IndexSearcherWrapper indexSearcherWrapper) {
public IndexServicesProvider(IndicesLifecycle indicesLifecycle, ThreadPool threadPool, MapperService mapperService, IndexQueryParserService queryParserService, IndexCache indexCache, IndexAliasesService indexAliasesService, IndicesQueryCache indicesQueryCache, CodecService codecService, TermVectorsService termVectorsService, IndexFieldDataService indexFieldDataService, @Nullable IndicesWarmer warmer, SimilarityService similarityService, EngineFactory factory, BigArrays bigArrays, @Nullable IndexSearcherWrapper indexSearcherWrapper, IndexingMemoryController indexingMemoryController) {
this.indicesLifecycle = indicesLifecycle;
this.threadPool = threadPool;
this.mapperService = mapperService;
Expand All @@ -76,6 +79,7 @@ public IndexServicesProvider(IndicesLifecycle indicesLifecycle, ThreadPool threa
this.factory = factory;
this.bigArrays = bigArrays;
this.indexSearcherWrapper = indexSearcherWrapper;
this.indexingMemoryController = indexingMemoryController;
}

public IndicesLifecycle getIndicesLifecycle() {
Expand Down Expand Up @@ -134,5 +138,11 @@ public BigArrays getBigArrays() {
return bigArrays;
}

public IndexSearcherWrapper getIndexSearcherWrapper() { return indexSearcherWrapper; }
/**
 * Returns the {@link IndexSearcherWrapper} that was passed to the constructor.
 * The constructor parameter is declared {@code @Nullable}, so callers must be prepared for {@code null}.
 */
public IndexSearcherWrapper getIndexSearcherWrapper() {
return indexSearcherWrapper;
}

/** Returns the {@link IndexingMemoryController} that was passed to the constructor. */
public IndexingMemoryController getIndexingMemoryController() {
return indexingMemoryController;
}
}
3 changes: 3 additions & 0 deletions core/src/main/java/org/elasticsearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,9 @@ protected void writerSegmentStats(SegmentsStats stats) {
stats.addIndexWriterMaxMemoryInBytes(0);
}

/**
 * How much heap Lucene's IndexWriter is using, in bytes.
 * NOTE(review): engines that have no IndexWriter may throw instead of returning a value -- confirm against each implementation.
 */
abstract public long indexWriterRAMBytesUsed();

protected Segment[] getSegmentInfo(SegmentInfos lastCommittedSegmentInfos, boolean verbose) {
ensureOpen();
Map<String, Segment> segments = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.TranslogConfig;
import org.elasticsearch.indices.IndicesWarmer;
import org.elasticsearch.indices.memory.IndexingMemoryController;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -107,8 +108,6 @@ public final class EngineConfig {

public static final TimeValue DEFAULT_REFRESH_INTERVAL = new TimeValue(1, TimeUnit.SECONDS);
public static final TimeValue DEFAULT_GC_DELETES = TimeValue.timeValueSeconds(60);
public static final ByteSizeValue DEFAULT_INDEX_BUFFER_SIZE = new ByteSizeValue(64, ByteSizeUnit.MB);
public static final ByteSizeValue INACTIVE_SHARD_INDEXING_BUFFER = ByteSizeValue.parseBytesSizeValue("500kb", "INACTIVE_SHARD_INDEXING_BUFFER");

public static final String DEFAULT_VERSION_MAP_SIZE = "25%";

Expand Down Expand Up @@ -139,7 +138,8 @@ public EngineConfig(ShardId shardId, ThreadPool threadPool, ShardIndexingService
this.failedEngineListener = failedEngineListener;
this.compoundOnFlush = indexSettings.getAsBoolean(EngineConfig.INDEX_COMPOUND_ON_FLUSH, compoundOnFlush);
codecName = indexSettings.get(EngineConfig.INDEX_CODEC_SETTING, EngineConfig.DEFAULT_CODEC_NAME);
indexingBufferSize = DEFAULT_INDEX_BUFFER_SIZE;
// We start up inactive and rely on IndexingMemoryController to give us our fair share once we start indexing:
indexingBufferSize = IndexingMemoryController.INACTIVE_SHARD_INDEXING_BUFFER;
gcDeletesInMillis = indexSettings.getAsTime(INDEX_GC_DELETES_SETTING, EngineConfig.DEFAULT_GC_DELETES).millis();
versionMapSizeSetting = indexSettings.get(INDEX_VERSION_MAP_SIZE, DEFAULT_VERSION_MAP_SIZE);
updateVersionMapSize();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -825,6 +825,11 @@ protected final void writerSegmentStats(SegmentsStats stats) {
stats.addIndexWriterMaxMemoryInBytes((long) (indexWriter.getConfig().getRAMBufferSizeMB() * 1024 * 1024));
}

/** Reports the heap usage of this engine's {@code indexWriter} by delegating to its {@code ramBytesUsed()}. */
@Override
public long indexWriterRAMBytesUsed() {
return indexWriter.ramBytesUsed();
}

@Override
public List<Segment> segments(boolean verbose) {
try (ReleasableLock lock = readLock.acquire()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,4 +240,9 @@ protected SegmentInfos getLastCommittedSegmentInfos() {
return lastCommittedSegmentInfos;
}

/**
 * Not supported by this engine, which has no IndexWriter to report on.
 *
 * @throws UnsupportedOperationException always
 */
@Override
public long indexWriterRAMBytesUsed() {
// No IndexWriter
throw new UnsupportedOperationException("ShadowEngine has no IndexWriter");
}
}
87 changes: 73 additions & 14 deletions core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.support.LoggerMessageFormat;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.metrics.MeanMetric;
import org.elasticsearch.common.settings.Settings;
Expand Down Expand Up @@ -83,8 +84,8 @@
import org.elasticsearch.index.settings.IndexSettingsService;
import org.elasticsearch.index.similarity.SimilarityService;
import org.elasticsearch.index.snapshots.IndexShardRepository;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.Store.MetadataSnapshot;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.StoreFileMetaData;
import org.elasticsearch.index.store.StoreStats;
import org.elasticsearch.index.suggest.stats.ShardSuggestMetric;
Expand All @@ -99,6 +100,7 @@
import org.elasticsearch.indices.IndicesWarmer;
import org.elasticsearch.indices.InternalIndicesLifecycle;
import org.elasticsearch.indices.cache.query.IndicesQueryCache;
import org.elasticsearch.indices.memory.IndexingMemoryController;
import org.elasticsearch.indices.recovery.RecoveryFailedException;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.percolator.PercolatorService;
Expand All @@ -117,6 +119,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;


public class IndexShard extends AbstractIndexShardComponent implements IndexSettingsService.Listener {

private final ThreadPool threadPool;
Expand Down Expand Up @@ -189,6 +192,13 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett

private final IndexSearcherWrapper searcherWrapper;

/** True if this shard is still indexing (recently) and false if we've been idle for long enough (as periodically checked by {@link
* IndexingMemoryController}). */
private final AtomicBoolean active = new AtomicBoolean();

private volatile long lastWriteNS;
private final IndexingMemoryController indexingMemoryController;

@Inject
public IndexShard(ShardId shardId, @IndexSettings Settings indexSettings, ShardPath path, Store store, IndexServicesProvider provider) {
super(shardId, indexSettings);
Expand Down Expand Up @@ -241,11 +251,16 @@ public IndexShard(ShardId shardId, @IndexSettings Settings indexSettings, ShardP
this.flushThresholdSize = indexSettings.getAsBytesSize(INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, new ByteSizeValue(512, ByteSizeUnit.MB));
this.disableFlush = indexSettings.getAsBoolean(INDEX_TRANSLOG_DISABLE_FLUSH, false);
this.indexShardOperationCounter = new IndexShardOperationCounter(logger, shardId);
this.indexingMemoryController = provider.getIndexingMemoryController();

this.searcherWrapper = provider.getIndexSearcherWrapper();
this.percolatorQueriesRegistry = new PercolatorQueriesRegistry(shardId, indexSettings, queryParserService, indexingService, mapperService, indexFieldDataService);
if (mapperService.hasMapping(PercolatorService.TYPE_NAME)) {
percolatorQueriesRegistry.enableRealTimePercolator();
}

// We start up inactive
active.set(false);
}

public Store store() {
Expand Down Expand Up @@ -447,7 +462,8 @@ static Engine.Index prepareIndex(DocumentMapperForType docMapper, SourceToParse
* updated.
*/
public boolean index(Engine.Index index) {
writeAllowed(index.origin());
ensureWriteAllowed(index);
markLastWrite(index);
index = indexingService.preIndex(index);
final boolean created;
try {
Expand All @@ -471,7 +487,8 @@ public Engine.Delete prepareDelete(String type, String id, long version, Version
}

public void delete(Engine.Delete delete) {
writeAllowed(delete.origin());
ensureWriteAllowed(delete);
markLastWrite(delete);
delete = indexingService.preDelete(delete);
try {
if (logger.isTraceEnabled()) {
Expand Down Expand Up @@ -881,7 +898,24 @@ public void readAllowed() throws IllegalIndexShardStateException {
}
}

private void writeAllowed(Engine.Operation.Origin origin) throws IllegalIndexShardStateException {
/**
 * Returns the timestamp of the last write operation, i.e. the {@code startTime()} of the most recent index or delete
 * operation recorded by {@code markLastWrite}.
 * NOTE(review): presumably on the {@link System#nanoTime()} time base, since {@code checkIdle} subtracts this value from
 * {@code System.nanoTime()} -- confirm against {@code Engine.Operation#startTime()}.
 */
public long getLastWriteNS() {
return lastWriteNS;
}

/**
 * Stamps this shard with the start time of {@code op} and flags the shard as active. If the shard was flagged inactive
 * before this call, {@link IndexingMemoryController} is asked to run a check immediately.
 *
 * @param op the write operation (index or delete) that is about to be applied
 */
private void markLastWrite(Engine.Operation op) {
    lastWriteNS = op.startTime();

    final boolean wasAlreadyActive = active.getAndSet(true);
    if (wasAlreadyActive) {
        // Nothing changed: the shard was already flagged active.
        return;
    }

    // First write after an idle period: notify IMC now so it wakes up and fixes our indexing buffer. This could be
    // done async instead, but the cost should be low and the inactive -> active transition is rare.
    indexingMemoryController.forceCheck();
}

private void ensureWriteAllowed(Engine.Operation op) throws IllegalIndexShardStateException {
Engine.Operation.Origin origin = op.origin();
IndexShardState state = this.state; // one time volatile read

if (origin == Engine.Operation.Origin.PRIMARY) {
Expand Down Expand Up @@ -943,6 +977,8 @@ public void addFailedEngineListener(Engine.FailedEngineListener failedEngineList
this.failedEngineListener.delegates.add(failedEngineListener);
}

/** Change the indexing and translog buffer sizes. If {@code IndexWriter} is currently using more than
 * the new indexing buffer size then we do a refresh to free up the heap. */
public void updateBufferSize(ByteSizeValue shardIndexingBufferSize, ByteSizeValue shardTranslogBufferSize) {

final EngineConfig config = engineConfig;
Expand All @@ -961,27 +997,50 @@ public void updateBufferSize(ByteSizeValue shardIndexingBufferSize, ByteSizeValu
// so we push these changes down to IndexWriter:
engine.onSettingsChanged();

if (shardIndexingBufferSize == EngineConfig.INACTIVE_SHARD_INDEXING_BUFFER) {
// it's inactive: make sure we do a refresh / full IW flush in this case, since the memory
// changes only after a "data" change has happened to the writer
// the index writer lazily allocates memory and a refresh will clean it all up.
logger.debug("updating index_buffer_size from [{}] to (inactive) [{}]", preValue, shardIndexingBufferSize);
long iwBytesUsed = engine.indexWriterRAMBytesUsed();

String message = LoggerMessageFormat.format("updating index_buffer_size from [{}] to [{}]; IndexWriter now using [{}] bytes",
preValue, shardIndexingBufferSize, iwBytesUsed);

if (iwBytesUsed > shardIndexingBufferSize.bytes()) {
// our allowed buffer was changed to less than we are currently using; we ask IW to refresh
// so it clears its buffers (otherwise it won't clear until the next indexing/delete op)
logger.debug(message + "; now refresh to clear IndexWriter memory");

// TODO: should IW have an API to move segments to disk, but not refresh? Its flush method is protected...
try {
refresh("update index buffer");
} catch (Throwable e) {
logger.warn("failed to refresh after setting shard to inactive", e);
logger.warn("failed to refresh after decreasing index buffer", e);
}
} else {
logger.debug("updating index_buffer_size from [{}] to [{}]", preValue, shardIndexingBufferSize);
logger.debug(message);
}
}

engine.getTranslog().updateBuffer(shardTranslogBufferSize);
}

public void markAsInactive() {
updateBufferSize(EngineConfig.INACTIVE_SHARD_INDEXING_BUFFER, TranslogConfig.INACTIVE_SHARD_TRANSLOG_BUFFER);
indicesLifecycle.onShardInactive(this);
/**
 * Invoked by {@link IndexingMemoryController} to decide whether this shard has gone idle. When at least
 * {@code inactiveTimeNS} has elapsed since the last recorded write and the shard was still flagged active, the shard
 * is flipped to inactive, its indexing and translog buffers are reduced to the inactive sizes, and the indices
 * lifecycle is notified.
 *
 * @param inactiveTimeNS how long, in nanoseconds, the shard must have gone without a write to count as idle
 * @return {@code true} if the shard is flagged inactive when this method returns
 */
public boolean checkIdle(long inactiveTimeNS) {
    final long idleNS = System.nanoTime() - lastWriteNS;

    // Short-circuit: the active flag is only cleared once the idle threshold has been reached, and the
    // transition work below runs only for the caller that actually flipped the flag from active to inactive.
    if (idleNS >= inactiveTimeNS && active.getAndSet(false)) {
        updateBufferSize(IndexingMemoryController.INACTIVE_SHARD_INDEXING_BUFFER, IndexingMemoryController.INACTIVE_SHARD_TRANSLOG_BUFFER);
        logger.debug("shard is now inactive");
        indicesLifecycle.onShardInactive(this);
    }

    final boolean stillActive = active.get();
    return stillActive == false;
}

/** Returns {@code true} if this shard is active (has seen indexing ops in the last {@link
 * IndexingMemoryController#SHARD_INACTIVE_TIME_SETTING}, default 5 minutes), else {@code false}. This reads the
 * current value of the {@code active} flag without changing it. */
public boolean getActive() {
return active.get();
}

public final boolean isFlushOnClose() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
package org.elasticsearch.index.shard;

import java.io.IOException;

import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexServicesProvider;
Expand All @@ -28,8 +30,6 @@
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.TranslogStats;

import java.io.IOException;

/**
* ShadowIndexShard extends {@link IndexShard} to add file synchronization
* from the primary when a flush happens. It also ensures that a replica being
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog.TranslogGeneration;
import org.elasticsearch.indices.memory.IndexingMemoryController;
import org.elasticsearch.threadpool.ThreadPool;

import java.nio.file.Path;
Expand All @@ -42,7 +43,6 @@ public final class TranslogConfig {
public static final String INDEX_TRANSLOG_FS_TYPE = "index.translog.fs.type";
public static final String INDEX_TRANSLOG_BUFFER_SIZE = "index.translog.fs.buffer_size";
public static final String INDEX_TRANSLOG_SYNC_INTERVAL = "index.translog.sync_interval";
public static final ByteSizeValue INACTIVE_SHARD_TRANSLOG_BUFFER = ByteSizeValue.parseBytesSizeValue("1kb", "INACTIVE_SHARD_TRANSLOG_BUFFER");

private final TimeValue syncInterval;
private final BigArrays bigArrays;
Expand Down Expand Up @@ -73,7 +73,7 @@ public TranslogConfig(ShardId shardId, Path translogPath, @IndexSettings Setting
this.threadPool = threadPool;
this.bigArrays = bigArrays;
this.type = TranslogWriter.Type.fromString(indexSettings.get(INDEX_TRANSLOG_FS_TYPE, TranslogWriter.Type.BUFFERED.name()));
this.bufferSize = (int) indexSettings.getAsBytesSize(INDEX_TRANSLOG_BUFFER_SIZE, ByteSizeValue.parseBytesSizeValue("64k", INDEX_TRANSLOG_BUFFER_SIZE)).bytes(); // Not really interesting, updated by IndexingMemoryController...
this.bufferSize = (int) indexSettings.getAsBytesSize(INDEX_TRANSLOG_BUFFER_SIZE, IndexingMemoryController.INACTIVE_SHARD_TRANSLOG_BUFFER).bytes(); // Not really interesting, updated by IndexingMemoryController...

syncInterval = indexSettings.getAsTime(INDEX_TRANSLOG_SYNC_INTERVAL, TimeValue.timeValueSeconds(5));
if (syncInterval.millis() > 0 && threadPool != null) {
Expand Down
Loading

0 comments on commit 9688e86

Please sign in to comment.