Skip to content

Commit

Permalink
Changes
Browse files Browse the repository at this point in the history
  • Loading branch information
Tim-Brooks committed Oct 10, 2024
1 parent ab6f607 commit cd48cdf
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
* An extension to the {@link ConcurrentMergeScheduler} that provides tracking on merge times, total
* and current merges.
*/
public class ElasticsearchConcurrentMergeScheduler extends ConcurrentMergeScheduler {
public class ElasticsearchConcurrentMergeScheduler extends ConcurrentMergeScheduler implements ElasticsearchMergeScheduler {

protected final Logger logger;
private final Settings indexSettings;
Expand All @@ -47,10 +47,13 @@ public class ElasticsearchConcurrentMergeScheduler extends ConcurrentMergeSchedu
this.shardId = shardId;
this.indexSettings = indexSettings.getSettings();
this.logger = Loggers.getLogger(getClass(), shardId);
this.mergeTracking = new MergeTracking(logger, () -> config.isAutoThrottle() ? getIORateLimitMBPerSec() : Double.POSITIVE_INFINITY);
this.mergeTracking = new MergeTracking(
logger,
() -> indexSettings.getMergeSchedulerConfig().isAutoThrottle() ? getIORateLimitMBPerSec() : Double.POSITIVE_INFINITY);
refreshConfig();
}

@Override
public Set<OnGoingMerge> onGoingMerges() {
    // All merge bookkeeping is delegated to the shared MergeTracking instance,
    // which records merges as they start and finish.
    return mergeTracking.onGoingMerges();
}
Expand Down Expand Up @@ -137,11 +140,13 @@ protected MergeThread getMergeThread(MergeSource mergeSource, MergePolicy.OneMer
return thread;
}

MergeStats stats() {
@Override
public MergeStats stats() {
    // Statistics are accumulated by MergeTracking; this method only exposes them.
    return mergeTracking.stats();
}

void refreshConfig() {
@Override
public void refreshConfig() {
if (this.getMaxMergeCount() != config.getMaxMergeCount() || this.getMaxThreadCount() != config.getMaxThreadCount()) {
this.setMaxMergesAndThreads(config.getMaxMergeCount(), config.getMaxThreadCount());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.index.engine;

import org.elasticsearch.index.merge.MergeStats;
import org.elasticsearch.index.merge.OnGoingMerge;

import java.util.Set;

/**
 * Common abstraction over merge schedulers used by the engine (implemented by
 * {@link ElasticsearchConcurrentMergeScheduler}), so that callers can depend on
 * this interface rather than a concrete Lucene scheduler subclass.
 */
public interface ElasticsearchMergeScheduler {

    /**
     * Returns the merges currently in progress.
     */
    Set<OnGoingMerge> onGoingMerges();

    /**
     * Returns the accumulated merge statistics for this scheduler.
     */
    MergeStats stats();

    /**
     * Re-reads the scheduler's configuration and applies any changes.
     */
    void refreshConfig();
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.LiveIndexWriterConfig;
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.index.MergeScheduler;
import org.apache.lucene.index.SegmentCommitInfo;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.SoftDeletesRetentionMergePolicy;
Expand Down Expand Up @@ -139,7 +140,7 @@ public class InternalEngine extends Engine {
private volatile long lastDeleteVersionPruneTimeMSec;

private final Translog translog;
private final ElasticsearchConcurrentMergeScheduler mergeScheduler;
private final ElasticsearchMergeScheduler mergeScheduler;

private final IndexWriter indexWriter;

Expand Down Expand Up @@ -248,11 +249,12 @@ public InternalEngine(EngineConfig engineConfig) {
Translog translog = null;
ExternalReaderManager externalReaderManager = null;
ElasticsearchReaderManager internalReaderManager = null;
ElasticsearchConcurrentMergeScheduler scheduler = null;
MergeScheduler scheduler = null;
boolean success = false;
try {
this.lastDeleteVersionPruneTimeMSec = engineConfig.getThreadPool().relativeTimeInMillis();
mergeScheduler = scheduler = createMergeScheduler(engineConfig.getShardId(), engineConfig.getIndexSettings());
mergeScheduler = createMergeScheduler(engineConfig.getShardId(), engineConfig.getIndexSettings());
scheduler = (MergeScheduler) mergeScheduler;
throttle = new IndexThrottle();
try {
store.trimUnsafeCommits(config().getTranslogConfig().getTranslogPath());
Expand Down Expand Up @@ -383,7 +385,7 @@ private SoftDeletesPolicy newSoftDeletesPolicy() throws IOException {

@Nullable
private CombinedDeletionPolicy.CommitsListener newCommitsListener() {
Engine.IndexCommitListener listener = engineConfig.getIndexCommitListener();
IndexCommitListener listener = engineConfig.getIndexCommitListener();
if (listener != null) {
final IndexCommitListener wrappedListener = Assertions.ENABLED ? assertingCommitsOrderListener(listener) : listener;
return new CombinedDeletionPolicy.CommitsListener() {
Expand Down Expand Up @@ -824,7 +826,7 @@ private GetResult getFromTranslog(
config(),
translogInMemorySegmentsCount::incrementAndGet
);
final Engine.Searcher searcher = new Engine.Searcher(
final Searcher searcher = new Searcher(
"realtime_get",
ElasticsearchDirectoryReader.wrap(inMemoryReader, shardId),
config().getSimilarity(),
Expand All @@ -841,7 +843,7 @@ public GetResult get(
Get get,
MappingLookup mappingLookup,
DocumentParser documentParser,
Function<Engine.Searcher, Engine.Searcher> searcherWrapper
Function<Searcher, Searcher> searcherWrapper
) {
try (var ignored = acquireEnsureOpenRef()) {
if (get.realtime()) {
Expand Down Expand Up @@ -875,7 +877,7 @@ protected GetResult realtimeGetUnderLock(
Get get,
MappingLookup mappingLookup,
DocumentParser documentParser,
Function<Engine.Searcher, Engine.Searcher> searcherWrapper,
Function<Searcher, Searcher> searcherWrapper,
boolean getFromSearcher
) {
assert isDrainedForClose() == false;
Expand Down Expand Up @@ -1098,7 +1100,7 @@ protected boolean assertPrimaryCanOptimizeAddDocument(final Index index) {
return true;
}

private boolean assertIncomingSequenceNumber(final Engine.Operation.Origin origin, final long seqNo) {
private boolean assertIncomingSequenceNumber(final Operation.Origin origin, final long seqNo) {
if (origin == Operation.Origin.PRIMARY) {
assert assertPrimaryIncomingSequenceNumber(origin, seqNo);
} else {
Expand All @@ -1108,7 +1110,7 @@ private boolean assertIncomingSequenceNumber(final Engine.Operation.Origin origi
return true;
}

protected boolean assertPrimaryIncomingSequenceNumber(final Engine.Operation.Origin origin, final long seqNo) {
protected boolean assertPrimaryIncomingSequenceNumber(final Operation.Origin origin, final long seqNo) {
// sequence number should not be set when operation origin is primary
assert seqNo == SequenceNumbers.UNASSIGNED_SEQ_NO
: "primary operations must never have an assigned sequence number but was [" + seqNo + "]";
Expand Down Expand Up @@ -2700,7 +2702,7 @@ private IndexWriterConfig getIndexWriterConfig() {
iwc.setOpenMode(IndexWriterConfig.OpenMode.APPEND);
iwc.setIndexDeletionPolicy(combinedDeletionPolicy);
iwc.setInfoStream(TESTS_VERBOSE ? InfoStream.getDefault() : new LoggerInfoStream(logger));
iwc.setMergeScheduler(mergeScheduler);
iwc.setMergeScheduler((MergeScheduler) mergeScheduler);
// Give us the opportunity to upgrade old segments while performing
// background merges
MergePolicy mergePolicy = config().getMergePolicy();
Expand Down Expand Up @@ -2753,7 +2755,7 @@ private IndexWriterConfig getIndexWriterConfig() {

/** A listener that warms the segments if needed when acquiring a new reader */
static final class RefreshWarmerListener implements BiConsumer<ElasticsearchDirectoryReader, ElasticsearchDirectoryReader> {
private final Engine.Warmer warmer;
private final Warmer warmer;
private final Logger logger;
private final AtomicBoolean isEngineClosed;

Expand Down Expand Up @@ -2817,7 +2819,7 @@ LiveIndexWriterConfig getCurrentIndexWriterConfig() {
return indexWriter.getConfig();
}

protected ElasticsearchConcurrentMergeScheduler createMergeScheduler(ShardId shardId, IndexSettings indexSettings) {
protected ElasticsearchMergeScheduler createMergeScheduler(ShardId shardId, IndexSettings indexSettings) {
return new EngineMergeScheduler(shardId, indexSettings);
}

Expand All @@ -2831,7 +2833,7 @@ private final class EngineMergeScheduler extends ElasticsearchConcurrentMergeSch

@Override
public synchronized void beforeMerge(OnGoingMerge merge) {
int maxNumMerges = mergeScheduler.getMaxMergeCount();
int maxNumMerges = getMaxMergeCount();
if (numMergesInFlight.incrementAndGet() > maxNumMerges) {
if (isThrottling.getAndSet(true) == false) {
logger.info("now throttling indexing: numMergesInFlight={}, maxNumMerges={}", numMergesInFlight, maxNumMerges);
Expand All @@ -2842,7 +2844,7 @@ public synchronized void beforeMerge(OnGoingMerge merge) {

@Override
public synchronized void afterMerge(OnGoingMerge merge) {
int maxNumMerges = mergeScheduler.getMaxMergeCount();
int maxNumMerges = getMaxMergeCount();
if (numMergesInFlight.decrementAndGet() < maxNumMerges) {
if (isThrottling.getAndSet(false)) {
logger.info("stop throttling indexing: numMergesInFlight={}, maxNumMerges={}", numMergesInFlight, maxNumMerges);
Expand Down Expand Up @@ -2880,25 +2882,29 @@ protected void doRun() {

@Override
protected void handleMergeException(final Throwable exc) {
engineConfig.getThreadPool().generic().execute(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
logger.debug("merge failure action rejected", e);
}

@Override
protected void doRun() throws Exception {
/*
* We do this on another thread rather than the merge thread that we are initially called on so that we have complete
* confidence that the call stack does not contain catch statements that would cause the error that might be thrown
* here from being caught and never reaching the uncaught exception handler.
*/
failEngine("merge failed", new MergePolicy.MergeException(exc));
}
});
mergeException(exc);
}
}

/**
 * Fails the engine in response to a merge failure.
 *
 * The failure is handed off to the generic thread pool rather than handled on the
 * calling (merge) thread: this guarantees the call stack contains no catch blocks
 * that could swallow an error thrown while failing the engine, so it can always
 * reach the uncaught exception handler.
 *
 * @param exc the throwable raised by the failed merge
 */
protected void mergeException(final Throwable exc) {
    final AbstractRunnable failTask = new AbstractRunnable() {
        @Override
        protected void doRun() throws Exception {
            // Wrap the original throwable so the engine failure carries the merge context.
            failEngine("merge failed", new MergePolicy.MergeException(exc));
        }

        @Override
        public void onFailure(Exception e) {
            // Best effort: if the pool rejects the task, record it at debug level.
            logger.debug("merge failure action rejected", e);
        }
    };
    engineConfig.getThreadPool().generic().execute(failTask);
}

/**
* Commits the specified index writer.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ public Set<OnGoingMerge> onGoingMerges() {
public OnGoingMerge mergeStarted(MergePolicy.OneMerge merge) {
int totalNumDocs = merge.totalNumDocs();
long totalSizeInBytes = merge.totalBytesSize();
long timeNS = System.nanoTime();
currentMerges.inc();
currentMergesNumDocs.inc(totalNumDocs);
currentMergesSizeInBytes.inc(totalSizeInBytes);
Expand Down Expand Up @@ -115,7 +114,7 @@ public void mergeFinished(final MergePolicy.OneMerge merge, final OnGoingMerge o
}
}

MergeStats stats() {
public MergeStats stats() {
final MergeStats mergeStats = new MergeStats();
mergeStats.add(
totalMerges.count(),
Expand Down

0 comments on commit cd48cdf

Please sign in to comment.