diff --git a/core/src/main/java/org/elasticsearch/action/termvectors/TermVectorsResponse.java b/core/src/main/java/org/elasticsearch/action/termvectors/TermVectorsResponse.java index 7532ade3fa327..21a77c2e0f2b3 100644 --- a/core/src/main/java/org/elasticsearch/action/termvectors/TermVectorsResponse.java +++ b/core/src/main/java/org/elasticsearch/action/termvectors/TermVectorsResponse.java @@ -305,9 +305,9 @@ private void buildFieldStatistics(XContentBuilder builder, Terms curTerms) throw long sumDocFreq = curTerms.getSumDocFreq(); int docCount = curTerms.getDocCount(); long sumTotalTermFrequencies = curTerms.getSumTotalTermFreq(); - if (docCount > 0) { - assert ((sumDocFreq > 0)) : "docCount >= 0 but sumDocFreq ain't!"; - assert ((sumTotalTermFrequencies > 0)) : "docCount >= 0 but sumTotalTermFrequencies ain't!"; + if (docCount >= 0) { + assert ((sumDocFreq >= 0)) : "docCount >= 0 but sumDocFreq ain't!"; + assert ((sumTotalTermFrequencies >= 0)) : "docCount >= 0 but sumTotalTermFrequencies ain't!"; builder.startObject(FieldStrings.FIELD_STATISTICS); builder.field(FieldStrings.SUM_DOC_FREQ, sumDocFreq); builder.field(FieldStrings.DOC_COUNT, docCount); diff --git a/core/src/main/java/org/elasticsearch/index/engine/Engine.java b/core/src/main/java/org/elasticsearch/index/engine/Engine.java index a755044c11334..c01dd986e2526 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -90,7 +90,7 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.function.Function; +import java.util.function.BiFunction; public abstract class Engine implements Closeable { @@ -465,8 +465,9 @@ public enum SyncedFlushResult { PENDING_OPERATIONS } - protected final GetResult getFromSearcher(Get get, Function searcherFactory) throws EngineException { - final Searcher searcher = searcherFactory.apply("get"); + protected final GetResult getFromSearcher(Get get, BiFunction searcherFactory, + SearcherScope scope) throws EngineException { + final Searcher searcher = searcherFactory.apply("get", scope); final DocIdAndVersion docIdAndVersion; try { docIdAndVersion = VersionsAndSeqNoResolver.loadDocIdAndVersion(searcher.reader(), get.uid()); @@ -494,23 +495,40 @@ protected final GetResult getFromSearcher(Get get, Function se } } - public abstract GetResult get(Get get, Function searcherFactory) throws EngineException; + public abstract GetResult get(Get get, BiFunction searcherFactory) throws EngineException; + /** * Returns a new searcher instance. The consumer of this * API is responsible for releasing the returned searcher in a * safe manner, preferably in a try/finally block. * + * @param source the source API or routing that triggers this searcher acquire + * * @see Searcher#close() */ public final Searcher acquireSearcher(String source) throws EngineException { + return acquireSearcher(source, SearcherScope.EXTERNAL); + } + + /** + * Returns a new searcher instance. The consumer of this + * API is responsible for releasing the returned searcher in a + * safe manner, preferably in a try/finally block. + * + * @param source the source API or routing that triggers this searcher acquire + * @param scope the scope of this searcher ie. 
if the searcher will be used for get or search purposes + * + * @see Searcher#close() + */ + public final Searcher acquireSearcher(String source, SearcherScope scope) throws EngineException { boolean success = false; /* Acquire order here is store -> manager since we need * to make sure that the store is not closed before * the searcher is acquired. */ store.incRef(); try { - final SearcherManager manager = getSearcherManager(); // can never be null + final SearcherManager manager = getSearcherManager(source, scope); // can never be null /* This might throw NPE but that's fine we will run ensureOpen() * in the catch block and throw the right exception */ final IndexSearcher searcher = manager.acquire(); @@ -536,6 +554,10 @@ public final Searcher acquireSearcher(String source) throws EngineException { } } + public enum SearcherScope { + EXTERNAL, INTERNAL + } + /** returns the translog for this engine */ public abstract Translog getTranslog(); @@ -768,7 +790,7 @@ public final boolean refreshNeeded() { the store is closed so we need to make sure we increment it here */ try { - return getSearcherManager().isSearcherCurrent() == false; + return getSearcherManager("refresh_needed", SearcherScope.EXTERNAL).isSearcherCurrent() == false; } catch (IOException e) { logger.error("failed to access searcher manager", e); failEngine("failed to access searcher manager", e); @@ -1306,7 +1328,7 @@ public void release() { } } - protected abstract SearcherManager getSearcherManager(); + protected abstract SearcherManager getSearcherManager(String source, SearcherScope scope); /** * Method to close the engine while the write lock is held. diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index a9540a38be314..3f322d8b044fd 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -93,7 +93,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; -import java.util.function.Function; +import java.util.function.BiFunction; import java.util.function.LongSupplier; public class InternalEngine extends Engine { @@ -108,20 +108,18 @@ public class InternalEngine extends Engine { private final IndexWriter indexWriter; - private final SearcherFactory searcherFactory; - private final SearcherManager searcherManager; + private final SearcherManager externalSearcherManager; + private final SearcherManager internalSearcherManager; private final Lock flushLock = new ReentrantLock(); private final ReentrantLock optimizeLock = new ReentrantLock(); // A uid (in the form of BytesRef) to the version map // we use the hashed variant since we iterate over it and check removal and additions on existing keys - private final LiveVersionMap versionMap; + private final LiveVersionMap versionMap = new LiveVersionMap(); private final KeyedLock keyedLock = new KeyedLock<>(); - private final AtomicBoolean versionMapRefreshPending = new AtomicBoolean(); - private volatile SegmentInfos lastCommittedSegmentInfos; private final IndexThrottle throttle; @@ -146,14 +144,13 @@ public class InternalEngine extends Engine { @Nullable private final String historyUUID; - public InternalEngine(EngineConfig engineConfig) throws EngineException { + public InternalEngine(EngineConfig engineConfig) { super(engineConfig); openMode = engineConfig.getOpenMode(); if 
(engineConfig.isAutoGeneratedIDsOptimizationEnabled() == false) { maxUnsafeAutoIdTimestamp.set(Long.MAX_VALUE); } this.uidField = engineConfig.getIndexSettings().isSingleType() ? IdFieldMapper.NAME : UidFieldMapper.NAME; - this.versionMap = new LiveVersionMap(); final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy( engineConfig.getIndexSettings().getTranslogRetentionSize().getBytes(), engineConfig.getIndexSettings().getTranslogRetentionAge().getMillis() @@ -163,7 +160,8 @@ public InternalEngine(EngineConfig engineConfig) throws EngineException { store.incRef(); IndexWriter writer = null; Translog translog = null; - SearcherManager manager = null; + SearcherManager externalSearcherManager = null; + SearcherManager internalSearcherManager = null; EngineMergeScheduler scheduler = null; boolean success = false; try { @@ -171,7 +169,6 @@ public InternalEngine(EngineConfig engineConfig) throws EngineException { mergeScheduler = scheduler = new EngineMergeScheduler(engineConfig.getShardId(), engineConfig.getIndexSettings()); throttle = new IndexThrottle(); - this.searcherFactory = new SearchFactory(logger, isClosed, engineConfig); try { final SeqNoStats seqNoStats; switch (openMode) { @@ -218,20 +215,21 @@ public InternalEngine(EngineConfig engineConfig) throws EngineException { } this.translog = translog; - manager = createSearcherManager(); - this.searcherManager = manager; - this.versionMap.setManager(searcherManager); + internalSearcherManager = createSearcherManager(new SearcherFactory(), false); + externalSearcherManager = createSearcherManager(new SearchFactory(logger, isClosed, engineConfig), true); + this.internalSearcherManager = internalSearcherManager; + this.externalSearcherManager = externalSearcherManager; + internalSearcherManager.addListener(versionMap); assert pendingTranslogRecovery.get() == false : "translog recovery can't be pending before we set it"; // don't allow commits until we are done with recovering pendingTranslogRecovery.set(openMode == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG); for (ReferenceManager.RefreshListener listener: engineConfig.getRefreshListeners()) { - searcherManager.addListener(listener); + this.externalSearcherManager.addListener(listener); } success = true; } finally { if (success == false) { - IOUtils.closeWhileHandlingException(writer, translog, manager, scheduler); - versionMap.clear(); + IOUtils.closeWhileHandlingException(writer, translog, externalSearcherManager, internalSearcherManager, scheduler); if (isClosed.get() == false) { // failure we need to dec the store reference store.decRef(); @@ -348,6 +346,7 @@ private void recoverFromTranslogInternal() throws IOException { logger.trace("flushing post recovery from translog. ops recovered [{}]. committed translog id [{}]. current id [{}]", opsRecovered, translogGeneration == null ? 
null : translogGeneration.translogFileGeneration, translog.currentFileGeneration()); flush(true, true); + refresh("translog_recovery"); } else if (translog.isCurrent(translogGeneration) == false) { commitIndexWriter(indexWriter, translog, lastCommittedSegmentInfos.getUserData().get(Engine.SYNC_COMMIT_ID)); refreshLastCommittedSegmentInfos(); @@ -454,14 +453,16 @@ private String loadOrGenerateHistoryUUID(final IndexWriter writer, boolean force return uuid; } - private SearcherManager createSearcherManager() throws EngineException { + private SearcherManager createSearcherManager(SearcherFactory searcherFactory, boolean readSegmentsInfo) throws EngineException { boolean success = false; SearcherManager searcherManager = null; try { try { final DirectoryReader directoryReader = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(indexWriter), shardId); searcherManager = new SearcherManager(directoryReader, searcherFactory); - lastCommittedSegmentInfos = readLastCommittedSegmentInfos(searcherManager, store); + if (readSegmentsInfo) { + lastCommittedSegmentInfos = readLastCommittedSegmentInfos(searcherManager, store); + } success = true; return searcherManager; } catch (IOException e) { @@ -481,10 +482,11 @@ private SearcherManager createSearcherManager() throws EngineException { } @Override - public GetResult get(Get get, Function searcherFactory) throws EngineException { + public GetResult get(Get get, BiFunction searcherFactory) throws EngineException { assert Objects.equals(get.uid().field(), uidField) : get.uid().field(); try (ReleasableLock ignored = readLock.acquire()) { ensureOpen(); + SearcherScope scope; if (get.realtime()) { VersionValue versionValue = versionMap.getUnderLock(get.uid()); if (versionValue != null) { @@ -495,12 +497,16 @@ public GetResult get(Get get, Function searcherFactory) throws throw new VersionConflictEngineException(shardId, get.type(), get.id(), get.versionType().explainConflictForReads(versionValue.version, get.version())); } - refresh("realtime_get"); + refresh("realtime_get", SearcherScope.INTERNAL); } + scope = SearcherScope.INTERNAL; + } else { + // we expose what has been externally exposed in a point in time snapshot via an explicit refresh + scope = SearcherScope.EXTERNAL; } // no version, get the version from the index, we know that we refresh on flush - return getFromSearcher(get, searcherFactory); + return getFromSearcher(get, searcherFactory, scope); } } @@ -532,7 +538,7 @@ private OpVsLuceneDocStatus compareOpToLuceneDocBasedOnSeqNo(final Operation op) } else { // load from index assert incrementIndexVersionLookup(); - try (Searcher searcher = acquireSearcher("load_seq_no")) { + try (Searcher searcher = acquireSearcher("load_seq_no", SearcherScope.INTERNAL)) { DocIdAndSeqNo docAndSeqNo = VersionsAndSeqNoResolver.loadDocIdAndSeqNo(searcher.reader(), op.uid()); if (docAndSeqNo == null) { status = OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND; @@ -959,7 +965,7 @@ private boolean assertDocDoesNotExist(final Index index, final boolean allowDele throw new AssertionError("doc [" + index.type() + "][" + index.id() + "] exists in version map (version " + versionValue + ")"); } } else { - try (Searcher searcher = acquireSearcher("assert doc doesn't exist")) { + try (Searcher searcher = acquireSearcher("assert doc doesn't exist", SearcherScope.INTERNAL)) { final long docsWithId = searcher.searcher().count(new TermQuery(index.uid())); if (docsWithId > 0) { throw new AssertionError("doc [" + index.type() + "][" + index.id() + "] exists [" + docsWithId + "] times
in index"); @@ -1200,17 +1206,34 @@ private NoOpResult innerNoOp(final NoOp noOp) throws IOException { @Override public void refresh(String source) throws EngineException { + refresh(source, SearcherScope.EXTERNAL); + } + + final void refresh(String source, SearcherScope scope) throws EngineException { // we obtain a read lock here, since we don't want a flush to happen while we are refreshing // since it flushes the index as well (though, in terms of concurrency, we are allowed to do it) try (ReleasableLock lock = readLock.acquire()) { ensureOpen(); - searcherManager.maybeRefreshBlocking(); + switch (scope) { + case EXTERNAL: + // even though we maintain 2 managers we really do the heavy-lifting only once. + // the second refresh will only do the extra work we have to do for warming caches etc. + externalSearcherManager.maybeRefreshBlocking(); + // the break here is intentional we never refresh both internal / external together + break; + case INTERNAL: + internalSearcherManager.maybeRefreshBlocking(); + break; + + default: + throw new IllegalArgumentException("unknown scope: " + scope); + } } catch (AlreadyClosedException e) { failOnTragicEvent(e); throw e; } catch (Exception e) { try { - failEngine("refresh failed", e); + failEngine("refresh failed source[" + source + "]", e); } catch (Exception inner) { e.addSuppressed(inner); } @@ -1221,36 +1244,20 @@ public void refresh(String source) throws EngineException { // We check for pruning in each delete request, but we also prune here e.g. in case a delete burst comes in and then no more deletes // for a long time: maybePruneDeletedTombstones(); - versionMapRefreshPending.set(false); mergeScheduler.refreshConfig(); } @Override public void writeIndexingBuffer() throws EngineException { - // we obtain a read lock here, since we don't want a flush to happen while we are writing // since it flushes the index as well (though, in terms of concurrency, we are allowed to do it) try (ReleasableLock lock = readLock.acquire()) { ensureOpen(); - - // TODO: it's not great that we secretly tie searcher visibility to "freeing up heap" here... really we should keep two - // searcher managers, one for searching which is only refreshed by the schedule the user requested (refresh_interval, or invoking - // refresh API), and another for version map interactions. See #15768. 
final long versionMapBytes = versionMap.ramBytesUsedForRefresh(); final long indexingBufferBytes = indexWriter.ramBytesUsed(); - - final boolean useRefresh = versionMapRefreshPending.get() || (indexingBufferBytes / 4 < versionMapBytes); - if (useRefresh) { - // The version map is using > 25% of the indexing buffer, so we do a refresh so the version map also clears - logger.debug("use refresh to write indexing buffer (heap size=[{}]), to also clear version map (heap size=[{}])", - new ByteSizeValue(indexingBufferBytes), new ByteSizeValue(versionMapBytes)); - refresh("write indexing buffer"); - } else { - // Most of our heap is used by the indexing buffer, so we do a cheaper (just writes segments, doesn't open a new searcher) IW.flush: - logger.debug("use IndexWriter.flush to write indexing buffer (heap size=[{}]) since version map is small (heap size=[{}])", - new ByteSizeValue(indexingBufferBytes), new ByteSizeValue(versionMapBytes)); - indexWriter.flush(); - } + logger.debug("use refresh to write indexing buffer (heap size=[{}]), to also clear version map (heap size=[{}])", + new ByteSizeValue(indexingBufferBytes), new ByteSizeValue(versionMapBytes)); + refresh("write indexing buffer", SearcherScope.INTERNAL); } catch (AlreadyClosedException e) { failOnTragicEvent(e); throw e; @@ -1315,10 +1322,11 @@ final boolean tryRenewSyncCommit() { maybeFailEngine("renew sync commit", ex); throw new EngineException(shardId, "failed to renew sync commit", ex); } - if (renewed) { // refresh outside of the write lock - refresh("renew sync commit"); + if (renewed) { + // refresh outside of the write lock + // we have to refresh internal searcher here to ensure we release unreferenced segments. + refresh("renew sync commit", SearcherScope.INTERNAL); } - return renewed; } @@ -1360,7 +1368,7 @@ public CommitId flush(boolean force, boolean waitIfOngoing) throws EngineExcepti commitIndexWriter(indexWriter, translog, null); logger.trace("finished commit for flush"); // we need to refresh in order to clear older version values - refresh("version_table_flush"); + refresh("version_table_flush", SearcherScope.INTERNAL); translog.trimUnreferencedReaders(); } catch (Exception e) { throw new FlushFailedEngineException(shardId, e); @@ -1664,8 +1672,11 @@ protected final void closeNoLock(String reason, CountDownLatch closedLatch) { assert rwl.isWriteLockedByCurrentThread() || failEngineLock.isHeldByCurrentThread() : "Either the write lock must be held or the engine must be currently be failing itself"; try { this.versionMap.clear(); + if (internalSearcherManager != null) { + internalSearcherManager.removeListener(versionMap); + } try { - IOUtils.close(searcherManager); + IOUtils.close(externalSearcherManager, internalSearcherManager); } catch (Exception e) { logger.warn("Failed to close SearcherManager", e); } @@ -1697,8 +1708,15 @@ protected final void closeNoLock(String reason, CountDownLatch closedLatch) { } @Override - protected SearcherManager getSearcherManager() { - return searcherManager; + protected SearcherManager getSearcherManager(String source, SearcherScope scope) { + switch (scope) { + case INTERNAL: + return internalSearcherManager; + case EXTERNAL: + return externalSearcherManager; + default: + throw new IllegalStateException("unknown scope: " + scope); + } } private Releasable acquireLock(BytesRef uid) { @@ -1711,7 +1729,7 @@ private Releasable acquireLock(Term uid) { private long loadCurrentVersionFromIndex(Term uid) throws IOException { assert incrementIndexVersionLookup(); - try (Searcher searcher 
= acquireSearcher("load_version")) { + try (Searcher searcher = acquireSearcher("load_version", SearcherScope.INTERNAL)) { return VersionsAndSeqNoResolver.loadVersion(searcher.reader(), uid); } } diff --git a/core/src/main/java/org/elasticsearch/index/engine/LiveVersionMap.java b/core/src/main/java/org/elasticsearch/index/engine/LiveVersionMap.java index 9ee4bd43c2129..7396c3143c651 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/LiveVersionMap.java +++ b/core/src/main/java/org/elasticsearch/index/engine/LiveVersionMap.java @@ -59,8 +59,6 @@ private static class Maps { private volatile Maps maps = new Maps(); - private ReferenceManager mgr; - /** Bytes consumed for each BytesRef UID: * In this base value, we account for the {@link BytesRef} object itself as * well as the header of the byte[] array it holds, and some lost bytes due @@ -98,21 +96,6 @@ private static class Maps { /** Tracks bytes used by tombstones (deletes) */ final AtomicLong ramBytesUsedTombstones = new AtomicLong(); - /** Sync'd because we replace old mgr. */ - synchronized void setManager(ReferenceManager newMgr) { - if (mgr != null) { - mgr.removeListener(this); - } - mgr = newMgr; - - // In case InternalEngine closes & opens a new IndexWriter/SearcherManager, all deletes are made visible, so we clear old and - // current here. This is safe because caller holds writeLock here (so no concurrent adds/deletes can be happeninge): - maps = new Maps(); - - // So we are notified when reopen starts and finishes - mgr.addListener(this); - } - @Override public void beforeRefresh() throws IOException { // Start sending all updates after this point to the new @@ -249,11 +232,6 @@ synchronized void clear() { // and this will lead to an assert trip. Presumably it's fine if our ramBytesUsedTombstones is non-zero after clear since the index // is being closed: //ramBytesUsedTombstones.set(0); - - if (mgr != null) { - mgr.removeListener(this); - mgr = null; - } } @Override diff --git a/core/src/main/java/org/elasticsearch/index/mapper/Uid.java b/core/src/main/java/org/elasticsearch/index/mapper/Uid.java index 4b5ed5fd3cd92..1d5293259317f 100644 --- a/core/src/main/java/org/elasticsearch/index/mapper/Uid.java +++ b/core/src/main/java/org/elasticsearch/index/mapper/Uid.java @@ -135,36 +135,36 @@ static boolean isURLBase64WithoutPadding(String id) { // 'xxx=' and 'xxx' could be considered the same id final int length = id.length(); switch (length & 0x03) { - case 0: - break; - case 1: - return false; - case 2: - // the last 2 symbols (12 bits) are encoding 1 byte (8 bits) - // so the last symbol only actually uses 8-6=2 bits and can only take 4 values - char last = id.charAt(length - 1); - if (last != 'A' && last != 'Q' && last != 'g' && last != 'w') { + case 0: + break; + case 1: return false; - } - break; - case 3: - // The last 3 symbols (18 bits) are encoding 2 bytes (16 bits) - // so the last symbol only actually uses 16-12=4 bits and can only take 16 values - last = id.charAt(length - 1); - if (last != 'A' && last != 'E' && last != 'I' && last != 'M' && last != 'Q'&& last != 'U'&& last != 'Y' + case 2: + // the last 2 symbols (12 bits) are encoding 1 byte (8 bits) + // so the last symbol only actually uses 8-6=2 bits and can only take 4 values + char last = id.charAt(length - 1); + if (last != 'A' && last != 'Q' && last != 'g' && last != 'w') { + return false; + } + break; + case 3: + // The last 3 symbols (18 bits) are encoding 2 bytes (16 bits) + // so the last symbol only actually uses 16-12=4 bits and can only take 
16 values + last = id.charAt(length - 1); + if (last != 'A' && last != 'E' && last != 'I' && last != 'M' && last != 'Q'&& last != 'U'&& last != 'Y' && last != 'c'&& last != 'g'&& last != 'k' && last != 'o' && last != 's' && last != 'w' && last != '0' && last != '4' && last != '8') { - return false; - } - break; - default: - // number & 0x03 is always in [0,3] - throw new AssertionError("Impossible case"); + return false; + } + break; + default: + // number & 0x03 is always in [0,3] + throw new AssertionError("Impossible case"); } for (int i = 0; i < length; ++i) { final char c = id.charAt(i); final boolean allowed = - (c >= '0' && c <= '9') || + (c >= '0' && c <= '9') || (c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z') || c == '-' || c == '_'; @@ -244,16 +244,16 @@ public static BytesRef encodeId(String id) { } } - private static String decodeNumericId(byte[] idBytes) { - assert Byte.toUnsignedInt(idBytes[0]) == NUMERIC; - int length = (idBytes.length - 1) * 2; + private static String decodeNumericId(byte[] idBytes, int offset, int len) { + assert Byte.toUnsignedInt(idBytes[offset]) == NUMERIC; + int length = (len - 1) * 2; char[] chars = new char[length]; - for (int i = 1; i < idBytes.length; ++i) { - final int b = Byte.toUnsignedInt(idBytes[i]); + for (int i = 1; i < len; ++i) { + final int b = Byte.toUnsignedInt(idBytes[offset + i]); final int b1 = (b >>> 4); final int b2 = b & 0x0f; chars[(i - 1) * 2] = (char) (b1 + '0'); - if (i == idBytes.length - 1 && b2 == 0x0f) { + if (i == len - 1 && b2 == 0x0f) { length--; break; } @@ -262,15 +262,17 @@ private static String decodeNumericId(byte[] idBytes) { return new String(chars, 0, length); } - private static String decodeUtf8Id(byte[] idBytes) { - assert Byte.toUnsignedInt(idBytes[0]) == UTF8; - return new BytesRef(idBytes, 1, idBytes.length - 1).utf8ToString(); + private static String decodeUtf8Id(byte[] idBytes, int offset, int length) { + assert Byte.toUnsignedInt(idBytes[offset]) == UTF8; + return new BytesRef(idBytes, offset + 1, length - 1).utf8ToString(); } - private static String decodeBase64Id(byte[] idBytes) { - assert Byte.toUnsignedInt(idBytes[0]) <= BASE64_ESCAPE; - if (Byte.toUnsignedInt(idBytes[0]) == BASE64_ESCAPE) { - idBytes = Arrays.copyOfRange(idBytes, 1, idBytes.length); + private static String decodeBase64Id(byte[] idBytes, int offset, int length) { + assert Byte.toUnsignedInt(idBytes[offset]) <= BASE64_ESCAPE; + if (Byte.toUnsignedInt(idBytes[offset]) == BASE64_ESCAPE) { + idBytes = Arrays.copyOfRange(idBytes, offset + 1, offset + length); + } else if ((idBytes.length == length && offset == 0) == false) { // no need to copy if it's not a slice + idBytes = Arrays.copyOfRange(idBytes, offset, offset + length); } return Base64.getUrlEncoder().withoutPadding().encodeToString(idBytes); } @@ -278,17 +280,23 @@ private static String decodeBase64Id(byte[] idBytes) { /** Decode an indexed id back to its original form. * @see #encodeId */ public static String decodeId(byte[] idBytes) { - if (idBytes.length == 0) { + return decodeId(idBytes, 0, idBytes.length); + } + + /** Decode an indexed id back to its original form. 
+ * @see #encodeId */ + public static String decodeId(byte[] idBytes, int offset, int length) { + if (length == 0) { throw new IllegalArgumentException("Ids can't be empty"); } - final int magicChar = Byte.toUnsignedInt(idBytes[0]); + final int magicChar = Byte.toUnsignedInt(idBytes[offset]); switch (magicChar) { - case NUMERIC: - return decodeNumericId(idBytes); - case UTF8: - return decodeUtf8Id(idBytes); - default: - return decodeBase64Id(idBytes); + case NUMERIC: + return decodeNumericId(idBytes, offset, length); + case UTF8: + return decodeUtf8Id(idBytes, offset, length); + default: + return decodeBase64Id(idBytes, offset, length); } } } diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 76ebba246d66c..eb4859878d513 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -1004,6 +1004,7 @@ public Engine.CommitId flush(FlushRequest request) { } final long time = System.nanoTime(); final Engine.CommitId commitId = engine.flush(force, waitIfOngoing); + engine.refresh("flush"); // TODO this is technically wrong we should remove this in 7.0 flushMetric.inc(System.nanoTime() - time); return commitId; } @@ -1031,8 +1032,12 @@ public void forceMerge(ForceMergeRequest forceMerge) throws IOException { if (logger.isTraceEnabled()) { logger.trace("force merge with {}", forceMerge); } - getEngine().forceMerge(forceMerge.flush(), forceMerge.maxNumSegments(), + Engine engine = getEngine(); + engine.forceMerge(forceMerge.flush(), forceMerge.maxNumSegments(), forceMerge.onlyExpungeDeletes(), false, false); + if (forceMerge.flush()) { + engine.refresh("force_merge"); // TODO this is technically wrong we should remove this in 7.0 + } } /** @@ -1045,9 +1050,12 @@ public org.apache.lucene.util.Version upgrade(UpgradeRequest upgrade) throws IOE } org.apache.lucene.util.Version previousVersion = minimumCompatibleVersion(); // we just want to upgrade the segments, not actually forge merge to a single segment - getEngine().forceMerge(true, // we need to flush at the end to make sure the upgrade is durable + final Engine engine = getEngine(); + engine.forceMerge(true, // we need to flush at the end to make sure the upgrade is durable Integer.MAX_VALUE, // we just want to upgrade the segments, not actually optimize to a single segment false, true, upgrade.upgradeOnlyAncientSegments()); + engine.refresh("upgrade"); // TODO this is technically wrong we should remove this in 7.0 + org.apache.lucene.util.Version version = minimumCompatibleVersion(); if (logger.isTraceEnabled()) { logger.trace("upgraded segments for {} from version {} to version {}", shardId, previousVersion, version); @@ -1126,11 +1134,14 @@ public void failShard(String reason, @Nullable Exception e) { // fail the engine. This will cause this shard to also be removed from the node's index service. getEngine().failEngine(reason, e); } - public Engine.Searcher acquireSearcher(String source) { + return acquireSearcher(source, Engine.SearcherScope.EXTERNAL); + } + + private Engine.Searcher acquireSearcher(String source, Engine.SearcherScope scope) { readAllowed(); final Engine engine = getEngine(); - final Engine.Searcher searcher = engine.acquireSearcher(source); + final Engine.Searcher searcher = engine.acquireSearcher(source, scope); boolean success = false; try { final Engine.Searcher wrappedSearcher = searcherWrapper == null ? 
searcher : searcherWrapper.wrap(searcher); diff --git a/core/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestPutStoredScriptAction.java b/core/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestPutStoredScriptAction.java index 2ad2db2964d56..27083503195e0 100644 --- a/core/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestPutStoredScriptAction.java +++ b/core/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestPutStoredScriptAction.java @@ -41,6 +41,7 @@ public RestPutStoredScriptAction(Settings settings, RestController controller) { controller.registerHandler(POST, "/_scripts/{id}", this); controller.registerHandler(PUT, "/_scripts/{id}", this); + controller.registerHandler(POST, "/_scripts/{id}/{context}", this); controller.registerHandler(PUT, "/_scripts/{id}/{context}", this); } diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/filter/FilterAggregator.java b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/filter/FilterAggregator.java index c5385c68839ef..fc4ac58fb15ac 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/filter/FilterAggregator.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/filter/FilterAggregator.java @@ -35,16 +35,17 @@ import java.io.IOException; import java.util.List; import java.util.Map; +import java.util.function.Supplier; /** * Aggregate all docs that match a filter. */ public class FilterAggregator extends BucketsAggregator implements SingleBucketAggregator { - private final Weight filter; + private final Supplier filter; public FilterAggregator(String name, - Weight filter, + Supplier filter, AggregatorFactories factories, SearchContext context, Aggregator parent, List pipelineAggregators, @@ -57,7 +58,7 @@ public FilterAggregator(String name, public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, final LeafBucketCollector sub) throws IOException { // no need to provide deleted docs to the filter - final Bits bits = Lucene.asSequentialAccessBits(ctx.reader().maxDoc(), filter.scorerSupplier(ctx)); + final Bits bits = Lucene.asSequentialAccessBits(ctx.reader().maxDoc(), filter.get().scorerSupplier(ctx)); return new LeafBucketCollectorBase(sub, null) { @Override public void collect(int doc, long bucket) throws IOException { diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/filter/FilterAggregatorFactory.java b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/filter/FilterAggregatorFactory.java index 482bcb3d00951..4b54dccbf96c1 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/filter/FilterAggregatorFactory.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/filter/FilterAggregatorFactory.java @@ -23,6 +23,7 @@ import org.apache.lucene.search.Query; import org.apache.lucene.search.Weight; import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.search.aggregations.AggregationInitializationException; import org.elasticsearch.search.aggregations.Aggregator; import org.elasticsearch.search.aggregations.AggregatorFactories; import org.elasticsearch.search.aggregations.AggregatorFactory; @@ -35,20 +36,40 @@ public class FilterAggregatorFactory extends AggregatorFactory { - final Weight weight; + private Weight weight; + private Query filter; public FilterAggregatorFactory(String name, QueryBuilder filterBuilder, SearchContext context, AggregatorFactory parent, AggregatorFactories.Builder 
subFactoriesBuilder, Map metaData) throws IOException { super(name, context, parent, subFactoriesBuilder, metaData); - IndexSearcher contextSearcher = context.searcher(); - Query filter = filterBuilder.toFilter(context.getQueryShardContext()); - weight = contextSearcher.createNormalizedWeight(filter, false); + filter = filterBuilder.toFilter(context.getQueryShardContext()); + } + + /** + * Returns the {@link Weight} for this filter aggregation, creating it if + * necessary. This is done lazily so that the {@link Weight} is only created + * if the aggregation collects documents, reducing the overhead of the + * aggregation in the case where no documents are collected. + * + * Note that as aggregations are initialised and executed in a serial manner, + * no concurrency considerations are necessary here. + */ + public Weight getWeight() { + if (weight == null) { + IndexSearcher contextSearcher = context.searcher(); + try { + weight = contextSearcher.createNormalizedWeight(filter, false); + } catch (IOException e) { + throw new AggregationInitializationException("Failed to initialise filter", e); + } + } + return weight; } @Override public Aggregator createInternal(Aggregator parent, boolean collectsFromSingleBucket, List pipelineAggregators, Map metaData) throws IOException { - return new FilterAggregator(name, weight, factories, context, parent, pipelineAggregators, metaData); + return new FilterAggregator(name, () -> this.getWeight(), factories, context, parent, pipelineAggregators, metaData); } } diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/filter/FiltersAggregator.java b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/filter/FiltersAggregator.java index d488d092360d8..97724aa8b9735 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/filter/FiltersAggregator.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/filter/FiltersAggregator.java @@ -45,6 +45,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.function.Supplier; public class FiltersAggregator extends BucketsAggregator { @@ -115,13 +116,13 @@ public boolean equals(Object obj) { } private final String[] keys; - private Weight[] filters; + private Supplier filters; private final boolean keyed; private final boolean showOtherBucket; private final String otherBucketKey; private final int totalNumKeys; - public FiltersAggregator(String name, AggregatorFactories factories, String[] keys, Weight[] filters, boolean keyed, + public FiltersAggregator(String name, AggregatorFactories factories, String[] keys, Supplier filters, boolean keyed, String otherBucketKey, SearchContext context, Aggregator parent, List pipelineAggregators, Map metaData) throws IOException { super(name, factories, context, parent, pipelineAggregators, metaData); @@ -141,6 +142,7 @@ public FiltersAggregator(String name, AggregatorFactories factories, String[] ke public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, final LeafBucketCollector sub) throws IOException { // no need to provide deleted docs to the filter + Weight[] filters = this.filters.get(); final Bits[] bits = new Bits[filters.length]; for (int i = 0; i < filters.length; ++i) { bits[i] = Lucene.asSequentialAccessBits(ctx.reader().maxDoc(), filters[i].scorerSupplier(ctx)); @@ -164,7 +166,7 @@ public void collect(int doc, long bucket) throws IOException { @Override public InternalAggregation buildAggregation(long owningBucketOrdinal) throws IOException { - List
buckets = new ArrayList<>(filters.length); + List buckets = new ArrayList<>(keys.length); for (int i = 0; i < keys.length; i++) { long bucketOrd = bucketOrd(owningBucketOrdinal, i); InternalFilters.InternalBucket bucket = new InternalFilters.InternalBucket(keys[i], bucketDocCount(bucketOrd), @@ -184,7 +186,7 @@ public InternalAggregation buildAggregation(long owningBucketOrdinal) throws IOE @Override public InternalAggregation buildEmptyAggregation() { InternalAggregations subAggs = buildEmptySubAggregations(); - List buckets = new ArrayList<>(filters.length); + List buckets = new ArrayList<>(keys.length); for (int i = 0; i < keys.length; i++) { InternalFilters.InternalBucket bucket = new InternalFilters.InternalBucket(keys[i], 0, subAggs, keyed); buckets.add(bucket); diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/filter/FiltersAggregatorFactory.java b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/filter/FiltersAggregatorFactory.java index 07c7af1d19d66..048042f05ff65 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/filter/FiltersAggregatorFactory.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/filter/FiltersAggregatorFactory.java @@ -22,6 +22,7 @@ import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.Query; import org.apache.lucene.search.Weight; +import org.elasticsearch.search.aggregations.AggregationInitializationException; import org.elasticsearch.search.aggregations.Aggregator; import org.elasticsearch.search.aggregations.AggregatorFactories; import org.elasticsearch.search.aggregations.AggregatorFactory; @@ -36,7 +37,8 @@ public class FiltersAggregatorFactory extends AggregatorFactory { private final String[] keys; - final Weight[] weights; + private final Query[] filters; + private Weight[] weights; private final boolean keyed; private final boolean otherBucket; private final String otherBucketKey; @@ -48,21 +50,43 @@ public FiltersAggregatorFactory(String name, List filters, boolean this.keyed = keyed; this.otherBucket = otherBucket; this.otherBucketKey = otherBucketKey; - IndexSearcher contextSearcher = context.searcher(); - weights = new Weight[filters.size()]; keys = new String[filters.size()]; + this.filters = new Query[filters.size()]; for (int i = 0; i < filters.size(); ++i) { KeyedFilter keyedFilter = filters.get(i); this.keys[i] = keyedFilter.key(); - Query filter = keyedFilter.filter().toFilter(context.getQueryShardContext()); - this.weights[i] = contextSearcher.createNormalizedWeight(filter, false); + this.filters[i] = keyedFilter.filter().toFilter(context.getQueryShardContext()); } } + /** + * Returns the {@link Weight}s for this filter aggregation, creating them if + * necessary. This is done lazily so that the {@link Weight}s are only + * created if the aggregation collects documents, reducing the overhead of + * the aggregation in the case where no documents are collected. + * + * Note that as aggregations are initialised and executed in a serial manner, + * no concurrency considerations are necessary here.
+ */ + public Weight[] getWeights() { + if (weights == null) { + try { + IndexSearcher contextSearcher = context.searcher(); + weights = new Weight[filters.length]; + for (int i = 0; i < filters.length; ++i) { + this.weights[i] = contextSearcher.createNormalizedWeight(filters[i], false); + } + } catch (IOException e) { + throw new AggregationInitializationException("Failed to initialise filters for aggregation [" + name() + "]", e); + } + } + return weights; + } + @Override public Aggregator createInternal(Aggregator parent, boolean collectsFromSingleBucket, List pipelineAggregators, Map metaData) throws IOException { - return new FiltersAggregator(name, factories, keys, weights, keyed, otherBucket ? otherBucketKey : null, context, parent, + return new FiltersAggregator(name, factories, keys, () -> getWeights(), keyed, otherBucket ? otherBucketKey : null, context, parent, pipelineAggregators, metaData); } diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/nested/NestedAggregator.java b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/nested/NestedAggregator.java index b39bf864ad2b7..c7d721baa6798 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/nested/NestedAggregator.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/nested/NestedAggregator.java @@ -19,6 +19,7 @@ package org.elasticsearch.search.aggregations.bucket.nested; import com.carrotsearch.hppc.LongArrayList; + import org.apache.lucene.index.IndexReaderContext; import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.ReaderUtil; diff --git a/core/src/main/java/org/elasticsearch/transport/TcpTransport.java b/core/src/main/java/org/elasticsearch/transport/TcpTransport.java index b673b02496e35..59961130ef4ee 100644 --- a/core/src/main/java/org/elasticsearch/transport/TcpTransport.java +++ b/core/src/main/java/org/elasticsearch/transport/TcpTransport.java @@ -905,11 +905,9 @@ protected final void doStop() { // first stop to accept any incoming connections so nobody can connect to this transport for (Map.Entry> entry : serverChannels.entrySet()) { try { - closeChannels(entry.getValue(), true, true); + closeChannels(entry.getValue(), true, false); } catch (Exception e) { - logger.debug( - (Supplier) () -> new ParameterizedMessage( - "Error closing serverChannel for profile [{}]", entry.getKey()), e); + logger.warn(new ParameterizedMessage("Error closing serverChannel for profile [{}]", entry.getKey()), e); } } // we are holding a write lock so nobody modifies the connectedNodes / openConnections map - it's safe to first close @@ -1024,9 +1022,9 @@ protected void innerOnFailure(Exception e) { * * @param channels the channels to close * @param blocking whether the channels should be closed synchronously - * @param closingTransport whether we abort the connection on RST instead of FIN + * @param doNotLinger whether we abort the connection on RST instead of FIN */ - protected abstract void closeChannels(List channels, boolean blocking, boolean closingTransport) throws IOException; + protected abstract void closeChannels(List channels, boolean blocking, boolean doNotLinger) throws IOException; /** * Sends message to channel.
The listener's onResponse method will be called when the send is complete unless an exception diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 5971cd3877493..d2b15c0a113d9 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -86,6 +86,7 @@ import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.lucene.Lucene; +import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader; import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver; import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndSeqNo; @@ -942,7 +943,7 @@ public void testConcurrentGetAndFlush() throws Exception { engine.index(indexForDoc(doc)); final AtomicReference latestGetResult = new AtomicReference<>(); - final Function searcherFactory = engine::acquireSearcher; + final BiFunction searcherFactory = engine::acquireSearcher; latestGetResult.set(engine.get(newGet(true, doc), searcherFactory)); final AtomicBoolean flushFinished = new AtomicBoolean(false); final CyclicBarrier barrier = new CyclicBarrier(2); @@ -977,7 +978,7 @@ public void testSimpleOperations() throws Exception { MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(0)); searchResult.close(); - final Function searcherFactory = engine::acquireSearcher; + final BiFunction searcherFactory = engine::acquireSearcher; // create a document Document document = testDocumentWithTextField(); @@ -1002,6 +1003,12 @@ public void testSimpleOperations() throws Exception { assertThat(getResult.docIdAndVersion(), notNullValue()); getResult.release(); + // but not real time is not yet visible + getResult = engine.get(newGet(false, doc), searcherFactory); + assertThat(getResult.exists(), equalTo(false)); + getResult.release(); + + // refresh and it should be there engine.refresh("test"); @@ -1237,6 +1244,7 @@ public void testRenewSyncFlush() throws Exception { assertTrue(engine.tryRenewSyncCommit()); assertEquals(1, engine.segments(false).size()); } else { + engine.refresh("test"); assertBusy(() -> assertEquals(1, engine.segments(false).size())); } assertEquals(store.readLastCommittedSegmentsInfo().getUserData().get(Engine.SYNC_COMMIT_ID), syncId); @@ -1306,11 +1314,87 @@ public void testVersioningNewCreate() throws IOException { Engine.IndexResult indexResult = engine.index(create); assertThat(indexResult.getVersion(), equalTo(1L)); - create = new Engine.Index(newUid(doc), doc, indexResult.getSeqNo(), create.primaryTerm(), indexResult.getVersion(), create.versionType().versionTypeForReplicationAndRecovery(), REPLICA, 0, -1, false); + create = new Engine.Index(newUid(doc), doc, indexResult.getSeqNo(), create.primaryTerm(), indexResult.getVersion(), + create.versionType().versionTypeForReplicationAndRecovery(), REPLICA, 0, -1, false); indexResult = replicaEngine.index(create); assertThat(indexResult.getVersion(), equalTo(1L)); } + public void testReplicatedVersioningWithFlush() throws IOException { + ParsedDocument doc = testParsedDocument("1", null, testDocument(), B_1, null); + Engine.Index create = new Engine.Index(newUid(doc), doc, Versions.MATCH_DELETED); + Engine.IndexResult indexResult = engine.index(create); + 
assertThat(indexResult.getVersion(), equalTo(1L)); + assertTrue(indexResult.isCreated()); + + + create = new Engine.Index(newUid(doc), doc, indexResult.getSeqNo(), create.primaryTerm(), indexResult.getVersion(), + create.versionType().versionTypeForReplicationAndRecovery(), REPLICA, 0, -1, false); + indexResult = replicaEngine.index(create); + assertThat(indexResult.getVersion(), equalTo(1L)); + assertTrue(indexResult.isCreated()); + + if (randomBoolean()) { + engine.flush(); + } + if (randomBoolean()) { + replicaEngine.flush(); + } + + Engine.Index update = new Engine.Index(newUid(doc), doc, 1); + Engine.IndexResult updateResult = engine.index(update); + assertThat(updateResult.getVersion(), equalTo(2L)); + assertFalse(updateResult.isCreated()); + + + update = new Engine.Index(newUid(doc), doc, updateResult.getSeqNo(), update.primaryTerm(), updateResult.getVersion(), + update.versionType().versionTypeForReplicationAndRecovery(), REPLICA, 0, -1, false); + updateResult = replicaEngine.index(update); + assertThat(updateResult.getVersion(), equalTo(2L)); + assertFalse(updateResult.isCreated()); + replicaEngine.refresh("test"); + try (Searcher searcher = replicaEngine.acquireSearcher("test")) { + assertEquals(1, searcher.getDirectoryReader().numDocs()); + } + + engine.refresh("test"); + try (Searcher searcher = engine.acquireSearcher("test")) { + assertEquals(1, searcher.getDirectoryReader().numDocs()); + } + } + + /** + * simulates what an upsert / update API does + */ + public void testVersionedUpdate() throws IOException { + final BiFunction searcherFactory = engine::acquireSearcher; + + ParsedDocument doc = testParsedDocument("1", null, testDocument(), B_1, null); + Engine.Index create = new Engine.Index(newUid(doc), doc, Versions.MATCH_DELETED); + Engine.IndexResult indexResult = engine.index(create); + assertThat(indexResult.getVersion(), equalTo(1L)); + try (Engine.GetResult get = engine.get(new Engine.Get(true, doc.type(), doc.id(), create.uid()), searcherFactory)) { + assertEquals(1, get.version()); + } + + Engine.Index update_1 = new Engine.Index(newUid(doc), doc, 1); + Engine.IndexResult update_1_result = engine.index(update_1); + assertThat(update_1_result.getVersion(), equalTo(2L)); + + try (Engine.GetResult get = engine.get(new Engine.Get(true, doc.type(), doc.id(), create.uid()), searcherFactory)) { + assertEquals(2, get.version()); + } + + Engine.Index update_2 = new Engine.Index(newUid(doc), doc, 2); + Engine.IndexResult update_2_result = engine.index(update_2); + assertThat(update_2_result.getVersion(), equalTo(3L)); + + try (Engine.GetResult get = engine.get(new Engine.Get(true, doc.type(), doc.id(), create.uid()), searcherFactory)) { + assertEquals(3, get.version()); + } + + } + public void testVersioningNewIndex() throws IOException { ParsedDocument doc = testParsedDocument("1", null, testDocument(), B_1, null); Engine.Index index = indexForDoc(doc); @@ -1337,12 +1421,14 @@ public void testForceMerge() throws IOException { assertEquals(numDocs, test.reader().numDocs()); } engine.forceMerge(true, 1, false, false, false); + engine.refresh("test"); assertEquals(engine.segments(true).size(), 1); ParsedDocument doc = testParsedDocument(Integer.toString(0), null, testDocument(), B_1, null); Engine.Index index = indexForDoc(doc); engine.delete(new Engine.Delete(index.type(), index.id(), index.uid())); engine.forceMerge(true, 10, true, false, false); //expunge deletes + engine.refresh("test"); assertEquals(engine.segments(true).size(), 1); try (Engine.Searcher test = 
engine.acquireSearcher("test")) { @@ -1354,7 +1440,7 @@ public void testForceMerge() throws IOException { index = indexForDoc(doc); engine.delete(new Engine.Delete(index.type(), index.id(), index.uid())); engine.forceMerge(true, 10, false, false, false); //expunge deletes - + engine.refresh("test"); assertEquals(engine.segments(true).size(), 1); try (Engine.Searcher test = engine.acquireSearcher("test")) { assertEquals(numDocs - 2, test.reader().numDocs()); @@ -1561,6 +1647,7 @@ private void assertOpsOnReplica(List ops, InternalEngine repli } if (randomBoolean()) { engine.flush(); + engine.refresh("test"); } firstOp = false; } @@ -1716,11 +1803,12 @@ private int assertOpsOnPrimary(List ops, long currentOpVersion } if (randomBoolean()) { engine.flush(); + engine.refresh("test"); } if (rarely()) { // simulate GC deletes - engine.refresh("gc_simulation"); + engine.refresh("gc_simulation", Engine.SearcherScope.INTERNAL); engine.clearDeletedTombstones(); if (docDeleted) { lastOpVersion = Versions.NOT_FOUND; @@ -1805,6 +1893,7 @@ public void testNonInternalVersioningOnPrimary() throws IOException { } if (randomBoolean()) { engine.flush(); + engine.refresh("test"); } } @@ -1884,7 +1973,7 @@ class OpAndVersion { ParsedDocument doc = testParsedDocument("1", null, testDocument(), bytesArray(""), null); final Term uidTerm = newUid(doc); engine.index(indexForDoc(doc)); - final Function searcherFactory = engine::acquireSearcher; + final BiFunction searcherFactory = engine::acquireSearcher; for (int i = 0; i < thread.length; i++) { thread[i] = new Thread(() -> { startGun.countDown(); @@ -2314,7 +2403,7 @@ public void testEnableGcDeletes() throws Exception { Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), newMergePolicy(), null))) { engine.config().setEnableGcDeletes(false); - final Function searcherFactory = engine::acquireSearcher; + final BiFunction searcherFactory = engine::acquireSearcher; // Add document Document document = testDocument(); @@ -2644,6 +2733,7 @@ public void testTranslogReplay() throws IOException { assertThat(indexResult.getVersion(), equalTo(1L)); if (flush) { engine.flush(); + engine.refresh("test"); } doc = testParsedDocument(Integer.toString(randomId), null, testDocument(), new BytesArray("{}"), null); @@ -3847,7 +3937,7 @@ public void testOutOfOrderSequenceNumbersWithVersionConflict() throws IOExceptio document.add(new Field(SourceFieldMapper.NAME, BytesReference.toBytes(B_1), SourceFieldMapper.Defaults.FIELD_TYPE)); final ParsedDocument doc = testParsedDocument("1", null, document, B_1, null); final Term uid = newUid(doc); - final Function searcherFactory = engine::acquireSearcher; + final BiFunction searcherFactory = engine::acquireSearcher; for (int i = 0; i < numberOfOperations; i++) { if (randomBoolean()) { final Engine.Index index = new Engine.Index( @@ -4203,4 +4293,58 @@ public void testFillUpSequenceIdGapsOnRecovery() throws IOException { IOUtils.close(recoveringEngine); } } + + + public void assertSameReader(Searcher left, Searcher right) { + List leftLeaves = ElasticsearchDirectoryReader.unwrap(left.getDirectoryReader()).leaves(); + List rightLeaves = ElasticsearchDirectoryReader.unwrap(right.getDirectoryReader()).leaves(); + assertEquals(rightLeaves.size(), leftLeaves.size()); + for (int i = 0; i < leftLeaves.size(); i++) { + assertSame(leftLeaves.get(i).reader(), rightLeaves.get(0).reader()); + } + } + + public void assertNotSameReader(Searcher left, Searcher right) { + List leftLeaves = 
ElasticsearchDirectoryReader.unwrap(left.getDirectoryReader()).leaves(); + List rightLeaves = ElasticsearchDirectoryReader.unwrap(right.getDirectoryReader()).leaves(); + if (rightLeaves.size() == leftLeaves.size()) { + for (int i = 0; i < leftLeaves.size(); i++) { + if (leftLeaves.get(i).reader() != rightLeaves.get(0).reader()) { + return; // all is well + } + } + fail("readers are same"); + } + } + + public void testRefreshScopedSearcher() throws IOException { + try (Searcher getSearcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL); + Searcher searchSearcher = engine.acquireSearcher("test", Engine.SearcherScope.EXTERNAL)){ + assertSameReader(getSearcher, searchSearcher); + } + for (int i = 0; i < 10; i++) { + final String docId = Integer.toString(i); + final ParsedDocument doc = + testParsedDocument(docId, null, testDocumentWithTextField(), SOURCE, null); + Engine.Index primaryResponse = indexForDoc(doc); + engine.index(primaryResponse); + } + assertTrue(engine.refreshNeeded()); + engine.refresh("test", Engine.SearcherScope.INTERNAL); + try (Searcher getSearcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL); + Searcher searchSearcher = engine.acquireSearcher("test", Engine.SearcherScope.EXTERNAL)){ + assertEquals(10, getSearcher.reader().numDocs()); + assertEquals(0, searchSearcher.reader().numDocs()); + assertNotSameReader(getSearcher, searchSearcher); + } + + engine.refresh("test", Engine.SearcherScope.EXTERNAL); + + try (Searcher getSearcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL); + Searcher searchSearcher = engine.acquireSearcher("test", Engine.SearcherScope.EXTERNAL)){ + assertEquals(10, getSearcher.reader().numDocs()); + assertEquals(10, searchSearcher.reader().numDocs()); + assertSameReader(getSearcher, searchSearcher); + } + } } diff --git a/core/src/test/java/org/elasticsearch/index/mapper/UidTests.java b/core/src/test/java/org/elasticsearch/index/mapper/UidTests.java index 10b475e57ff87..c4fb94abd3846 100644 --- a/core/src/test/java/org/elasticsearch/index/mapper/UidTests.java +++ b/core/src/test/java/org/elasticsearch/index/mapper/UidTests.java @@ -79,7 +79,7 @@ public void testEncodeUTF8Ids() { for (int iter = 0; iter < iters; ++iter) { final String id = TestUtil.randomRealisticUnicodeString(random(), 1, 10); BytesRef encoded = Uid.encodeId(id); - assertEquals(id, Uid.decodeId(Arrays.copyOfRange(encoded.bytes, encoded.offset, encoded.offset + encoded.length))); + assertEquals(id, doDecodeId(encoded)); assertTrue(encoded.length <= 1 + new BytesRef(id).length); } } @@ -93,7 +93,7 @@ public void testEncodeNumericIds() { id = "0" + id; } BytesRef encoded = Uid.encodeId(id); - assertEquals(id, Uid.decodeId(Arrays.copyOfRange(encoded.bytes, encoded.offset, encoded.offset + encoded.length))); + assertEquals(id, doDecodeId(encoded)); assertEquals(1 + (id.length() + 1) / 2, encoded.length); } } @@ -105,9 +105,26 @@ public void testEncodeBase64Ids() { random().nextBytes(binaryId); final String id = Base64.getUrlEncoder().withoutPadding().encodeToString(binaryId); BytesRef encoded = Uid.encodeId(id); - assertEquals(id, Uid.decodeId(Arrays.copyOfRange(encoded.bytes, encoded.offset, encoded.offset + encoded.length))); + assertEquals(id, doDecodeId(encoded)); assertTrue(encoded.length <= 1 + binaryId.length); } } + private static String doDecodeId(BytesRef encoded) { + + if (randomBoolean()) { + return Uid.decodeId(Arrays.copyOfRange(encoded.bytes, encoded.offset, encoded.offset + encoded.length)); + } else { + if 
(randomBoolean()) { + BytesRef slicedCopy = new BytesRef(randomIntBetween(encoded.length + 1, encoded.length + 100)); + slicedCopy.offset = randomIntBetween(1, slicedCopy.bytes.length - encoded.length); + slicedCopy.length = encoded.length; + System.arraycopy(encoded.bytes, encoded.offset, slicedCopy.bytes, slicedCopy.offset, encoded.length); + assertArrayEquals(Arrays.copyOfRange(encoded.bytes, encoded.offset, encoded.offset + encoded.length), + Arrays.copyOfRange(slicedCopy.bytes, slicedCopy.offset, slicedCopy.offset + slicedCopy.length)); + encoded = slicedCopy; + } + return Uid.decodeId(encoded.bytes, encoded.offset, encoded.length); + } + } } diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 58dc974018d5b..5dd7a8f9a4017 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -1162,7 +1162,7 @@ public void testRefreshMetric() throws IOException { indexDoc(shard, "test", "test"); try (Engine.GetResult ignored = shard.get(new Engine.Get(true, "test", "test", new Term(IdFieldMapper.NAME, Uid.encodeId("test"))))) { - assertThat(shard.refreshStats().getTotal(), equalTo(refreshCount + 1)); + assertThat(shard.refreshStats().getTotal(), equalTo(refreshCount)); } closeShards(shard); } diff --git a/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java b/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java index 1f24d0b079dd1..da90f8023d2ef 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java @@ -270,7 +270,6 @@ public void testConcurrentRefresh() throws Exception { * Uses a bunch of threads to index, wait for refresh, and non-realtime get documents to validate that they are visible after waiting * regardless of what crazy sequence of events causes the refresh listener to fire. 
*/ - @TestLogging("_root:debug,org.elasticsearch.index.engine.Engine.DW:trace") public void testLotsOfThreads() throws Exception { int threadCount = between(3, 10); maxListeners = between(1, threadCount * 2); diff --git a/core/src/test/java/org/elasticsearch/search/aggregations/bucket/filter/FilterAggregatorTests.java b/core/src/test/java/org/elasticsearch/search/aggregations/bucket/filter/FilterAggregatorTests.java index fb615e66dfb57..f3d057d8e8cd0 100644 --- a/core/src/test/java/org/elasticsearch/search/aggregations/bucket/filter/FilterAggregatorTests.java +++ b/core/src/test/java/org/elasticsearch/search/aggregations/bucket/filter/FilterAggregatorTests.java @@ -36,9 +36,6 @@ import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.search.aggregations.AggregatorFactory; import org.elasticsearch.search.aggregations.AggregatorTestCase; -import org.elasticsearch.search.aggregations.bucket.filter.FilterAggregationBuilder; -import org.elasticsearch.search.aggregations.bucket.filter.FilterAggregatorFactory; -import org.elasticsearch.search.aggregations.bucket.filter.InternalFilter; import org.hamcrest.Matchers; import org.junit.Before; @@ -121,7 +118,7 @@ public void testParsedAsFilter() throws IOException { AggregatorFactory factory = createAggregatorFactory(builder, indexSearcher, fieldType); assertThat(factory, Matchers.instanceOf(FilterAggregatorFactory.class)); FilterAggregatorFactory filterFactory = (FilterAggregatorFactory) factory; - Query parsedQuery = filterFactory.weight.getQuery(); + Query parsedQuery = filterFactory.getWeight().getQuery(); assertThat(parsedQuery, Matchers.instanceOf(BooleanQuery.class)); assertEquals(2, ((BooleanQuery) parsedQuery).clauses().size()); // means the bool query has been parsed as a filter, if it was a query minShouldMatch would diff --git a/core/src/test/java/org/elasticsearch/search/aggregations/bucket/filter/FiltersAggregatorTests.java b/core/src/test/java/org/elasticsearch/search/aggregations/bucket/filter/FiltersAggregatorTests.java index 0420e9d5b9b76..6fdf207249f43 100644 --- a/core/src/test/java/org/elasticsearch/search/aggregations/bucket/filter/FiltersAggregatorTests.java +++ b/core/src/test/java/org/elasticsearch/search/aggregations/bucket/filter/FiltersAggregatorTests.java @@ -214,7 +214,7 @@ public void testParsedAsFilter() throws IOException { AggregatorFactory factory = createAggregatorFactory(builder, indexSearcher, fieldType); assertThat(factory, Matchers.instanceOf(FiltersAggregatorFactory.class)); FiltersAggregatorFactory filtersFactory = (FiltersAggregatorFactory) factory; - Query parsedQuery = filtersFactory.weights[0].getQuery(); + Query parsedQuery = filtersFactory.getWeights()[0].getQuery(); assertThat(parsedQuery, Matchers.instanceOf(BooleanQuery.class)); assertEquals(2, ((BooleanQuery) parsedQuery).clauses().size()); // means the bool query has been parsed as a filter, if it was a query minShouldMatch would diff --git a/core/src/test/java/org/elasticsearch/transport/TcpTransportTests.java b/core/src/test/java/org/elasticsearch/transport/TcpTransportTests.java index e67324fffe868..54efd231182b6 100644 --- a/core/src/test/java/org/elasticsearch/transport/TcpTransportTests.java +++ b/core/src/test/java/org/elasticsearch/transport/TcpTransportTests.java @@ -45,7 +45,7 @@ import static org.hamcrest.Matchers.equalTo; -/** Unit tests for TCPTransport */ +/** Unit tests for {@link TcpTransport} */ public class TcpTransportTests extends ESTestCase { /** Test ipv4 host with a default port works */ @@ -191,7 +191,7 
@@ protected Object bind(String name, InetSocketAddress address) throws IOException } @Override - protected void closeChannels(List channel, boolean blocking, boolean closingTransport) throws IOException { + protected void closeChannels(List channel, boolean blocking, boolean doNotLinger) throws IOException { } diff --git a/docs/reference/cat/nodes.asciidoc b/docs/reference/cat/nodes.asciidoc index 74b2d0cc2bc62..151ce80196b50 100644 --- a/docs/reference/cat/nodes.asciidoc +++ b/docs/reference/cat/nodes.asciidoc @@ -27,6 +27,11 @@ The last (`node.role`, `master`, and `name`) columns provide ancillary information that can often be useful when looking at the cluster as a whole, particularly large ones. How many master-eligible nodes do I have? +The `nodes` API accepts an additional URL parameter `full_id` accepting `true` +or `false`. The purpose of this parameter is to format the ID field (if +requested with `id` or `nodeId`) in its full length or in abbreviated form (the +default). + [float] === Columns diff --git a/docs/reference/docs/update-by-query.asciidoc b/docs/reference/docs/update-by-query.asciidoc index 2aee8faadcfd5..59e82ff36c49a 100644 --- a/docs/reference/docs/update-by-query.asciidoc +++ b/docs/reference/docs/update-by-query.asciidoc @@ -98,8 +98,8 @@ parameter in the same way as the search api. So far we've only been updating documents without changing their source. That is genuinely useful for things like <> but it's only half the -fun. `_update_by_query` supports a `script` object to update the document. This -will increment the `likes` field on all of kimchy's tweets: +fun. `_update_by_query` <> to update +the document. This will increment the `likes` field on all of kimchy's tweets: [source,js] -------------------------------------------------- diff --git a/docs/reference/index-modules/similarity.asciidoc b/docs/reference/index-modules/similarity.asciidoc index b20b2ef5369b3..20bf9a51357e4 100644 --- a/docs/reference/index-modules/similarity.asciidoc +++ b/docs/reference/index-modules/similarity.asciidoc @@ -5,7 +5,7 @@ A similarity (scoring / ranking model) defines how matching documents are scored. Similarity is per field, meaning that via the mapping one can define a different similarity per field. -Configuring a custom similarity is considered a expert feature and the +Configuring a custom similarity is considered an expert feature and the builtin similarities are most likely sufficient as is described in <>. diff --git a/docs/reference/indices/templates.asciidoc b/docs/reference/indices/templates.asciidoc index 86f02f850066e..6aefe3ccb4744 100644 --- a/docs/reference/indices/templates.asciidoc +++ b/docs/reference/indices/templates.asciidoc @@ -7,8 +7,10 @@ applied when new indices are created. The templates include both and a simple pattern template that controls whether the template should be applied to the new index. -NOTE: Templates are only applied at index creation time. Changing a template -will have no impact on existing indices. +NOTE: Templates are only applied at index creation time. Changing a template +will have no impact on existing indices. When using the create index API, the +settings/mappings defined as part of the create index call will take precedence +over any matching settings/mappings defined in the template. 
For example: diff --git a/docs/reference/migration/migrate_6_0.asciidoc b/docs/reference/migration/migrate_6_0.asciidoc index ffebfd9cfe7c6..66ba91f0e3530 100644 --- a/docs/reference/migration/migrate_6_0.asciidoc +++ b/docs/reference/migration/migrate_6_0.asciidoc @@ -21,6 +21,20 @@ way to reindex old indices is to use the `reindex` API. ========================================= +[IMPORTANT] +.Multiple mapping types are not supported in indices created in 6.0 +========================================= + +The ability to have multiple mapping types per index has been removed in 6.0. +New indices will be restricted to a single type. This is the first step in the +plan to remove mapping types altogether. Indices created in 5.x will continue +to support multiple mapping types. + +See <> for more information. + +========================================= + + [float] === Also see: diff --git a/docs/reference/modules/scripting/using.asciidoc b/docs/reference/modules/scripting/using.asciidoc index 646bd4dd0921c..0393472e684dd 100644 --- a/docs/reference/modules/scripting/using.asciidoc +++ b/docs/reference/modules/scripting/using.asciidoc @@ -49,10 +49,7 @@ GET my_index/_search `lang`:: - Specifies the language the script is written in. Defaults to `painless` but - may be set to any of languages listed in <>. The - default language may be changed in the `elasticsearch.yml` config file by - setting `script.default_lang` to the appropriate language. + Specifies the language the script is written in. Defaults to `painless`. `source`, `id`:: @@ -108,6 +105,30 @@ minute will be compiled. You can change this setting dynamically by setting ======================================== +[float] +[[modules-scripting-short-script-form]] +=== Short Script Form +A short script form can be used for brevity. In the short form, `script` is represented +by a string instead of an object. This string contains the source of the script. + +Short form: + +[source,js] +---------------------- + "script": "ctx._source.likes++" +---------------------- +// NOTCONSOLE + +The same script in the normal form: + +[source,js] +---------------------- + "script": { + "source": "ctx._source.likes++" + } +---------------------- +// NOTCONSOLE + [float] [[modules-scripting-stored-scripts]] === Stored Scripts diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java index 84c86bd2d770a..11e5d2f44a81a 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java @@ -331,8 +331,8 @@ protected void sendMessage(Channel channel, BytesReference reference, ActionList } @Override - protected void closeChannels(final List channels, boolean blocking, boolean closingTransport) throws IOException { - if (closingTransport) { + protected void closeChannels(final List channels, boolean blocking, boolean doNotLinger) throws IOException { + if (doNotLinger) { for (Channel channel : channels) { /* We set SO_LINGER timeout to 0 to ensure that when we shutdown the node we don't have a gazillion connections sitting * in TIME_WAIT to free up resources quickly. 
This is really the only part where we close the connection from the server diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/cat.snapshots.json b/rest-api-spec/src/main/resources/rest-api-spec/api/cat.snapshots.json index 90f4ca32730b6..eec22e2e0412d 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/api/cat.snapshots.json +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/cat.snapshots.json @@ -10,7 +10,6 @@ "parts": { "repository": { "type" : "list", - "required": true, "description": "Name of repository from which to fetch the snapshot information" } }, diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/delete_script.json b/rest-api-spec/src/main/resources/rest-api-spec/api/delete_script.json index c61aecd6bb2a8..83bb690cc0428 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/api/delete_script.json +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/delete_script.json @@ -10,11 +10,6 @@ "type" : "string", "description" : "Script ID", "required" : true - }, - "lang" : { - "type" : "string", - "description" : "Script language", - "required" : true } }, "params" : { diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/get_script.json b/rest-api-spec/src/main/resources/rest-api-spec/api/get_script.json index 1bdc546ad03ac..2240f0e1a0b75 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/api/get_script.json +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/get_script.json @@ -10,11 +10,6 @@ "type" : "string", "description" : "Script ID", "required" : true - }, - "lang" : { - "type" : "string", - "description" : "Script language", - "required" : true } }, "params" : { diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/put_script.json b/rest-api-spec/src/main/resources/rest-api-spec/api/put_script.json index 45b97f9f2857a..34bd4f63c285e 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/api/put_script.json +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/put_script.json @@ -11,10 +11,9 @@ "description" : "Script ID", "required" : true }, - "lang" : { + "context" : { "type" : "string", - "description" : "Script language", - "required" : true + "description" : "Script context" } }, "params" : { diff --git a/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java b/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java index 6229db7a9b62a..6d5b94dd67a05 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java @@ -243,8 +243,8 @@ protected void sendMessage(MockChannel mockChannel, BytesReference reference, Ac } @Override - protected void closeChannels(List channels, boolean blocking, boolean closingTransport) throws IOException { - if (closingTransport) { + protected void closeChannels(List channels, boolean blocking, boolean doNotLinger) throws IOException { + if (doNotLinger) { for (MockChannel channel : channels) { if (channel.activeChannel != null) { /* We set SO_LINGER timeout to 0 to ensure that when we shutdown the node we don't have a gazillion connections sitting diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java index c3e3d023b2af3..0c00f34c69a17 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java +++ 
b/test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java @@ -99,15 +99,15 @@ protected NioServerSocketChannel bind(String name, InetSocketAddress address) th } @Override - protected void closeChannels(List channels, boolean blocking, boolean closingTransport) throws IOException { - if (closingTransport) { + protected void closeChannels(List channels, boolean blocking, boolean doNotLinger) throws IOException { + if (doNotLinger) { for (NioChannel channel : channels) { /* We set SO_LINGER timeout to 0 to ensure that when we shutdown the node we don't have a gazillion connections sitting * in TIME_WAIT to free up resources quickly. This is really the only part where we close the connection from the server * side otherwise the client (node) initiates the TCP closing sequence which doesn't cause these issues. Setting this * by default from the beginning can have unexpected side-effects and should be avoided, our protocol is designed * in a way that clients close connection which is how it should be*/ - if (channel.isOpen()) { + if (channel.isOpen() && channel.getRawChannel().supportedOptions().contains(StandardSocketOptions.SO_LINGER)) { channel.getRawChannel().setOption(StandardSocketOptions.SO_LINGER, 0); } }
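The guard added in the last hunk only asks for `SO_LINGER` when the raw channel actually advertises support for that option. As a rough, standalone illustration of the same pattern using plain JDK NIO types (the class and method names below are invented for the example and are not part of Elasticsearch; only the JDK calls are real), a minimal sketch could look like this:

[source,java]
----------------------
import java.io.IOException;
import java.net.StandardSocketOptions;
import java.nio.channels.SocketChannel;

final class LingerFreeClose {

    /**
     * Closes a server-side channel without lingering, but only requests SO_LINGER=0
     * when the channel implementation actually supports that option.
     */
    static void closeWithoutLinger(SocketChannel channel) throws IOException {
        if (channel.isOpen() && channel.supportedOptions().contains(StandardSocketOptions.SO_LINGER)) {
            // SO_LINGER with a zero timeout makes close() reset the connection instead of
            // leaving the server-side socket parked in TIME_WAIT.
            channel.setOption(StandardSocketOptions.SO_LINGER, 0);
        }
        channel.close();
    }
}
----------------------
// NOTCONSOLE

Checking `supportedOptions()` first avoids the `UnsupportedOperationException` that `setOption` would otherwise throw on a channel type that does not expose `SO_LINGER`, which appears to be the motivation for the extra condition in `NioTransport`.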