diff --git a/dev-tools/create-bwc-index.py b/dev-tools/create_bwc_index.py
similarity index 90%
rename from dev-tools/create-bwc-index.py
rename to dev-tools/create_bwc_index.py
index 1dd295fe1dbc4..cb2ceaf2de380 100644
--- a/dev-tools/create-bwc-index.py
+++ b/dev-tools/create_bwc_index.py
@@ -19,11 +19,15 @@
 import logging
 import os
 import random
+import shutil
 import subprocess
 import sys
 import tempfile
 import time
 
+DEFAULT_TRANSPORT_TCP_PORT = 9300
+DEFAULT_HTTP_TCP_PORT = 9200
+
 if sys.version_info[0] < 3:
   print('%s must use python 3.x (for the ES python client)' % sys.argv[0])
@@ -126,14 +130,17 @@ def build_version(version_tuple):
 def build_tuple(version_string):
   return [int(x) for x in version_string.split('.')]
 
-def start_node(version, release_dir, data_dir, tcp_port, http_port):
-  logging.info('Starting node from %s on port %s/%s' % (release_dir, tcp_port, http_port))
+def start_node(version, release_dir, data_dir, tcp_port=DEFAULT_TRANSPORT_TCP_PORT, http_port=DEFAULT_HTTP_TCP_PORT, cluster_name=None):
+  logging.info('Starting node from %s on port %s/%s, data_dir %s' % (release_dir, tcp_port, http_port, data_dir))
+  if cluster_name is None:
+    cluster_name = 'bwc_index_' + version
+
   cmd = [
     os.path.join(release_dir, 'bin/elasticsearch'),
     '-Des.path.data=%s' % data_dir,
     '-Des.path.logs=logs',
-    '-Des.cluster.name=bwc_index_' + version,
-    '-Des.network.host=localhost',
+    '-Des.cluster.name=%s' % cluster_name,
+    '-Des.network.host=localhost',
     '-Des.discovery.zen.ping.multicast.enabled=false',
     '-Des.transport.tcp.port=%s' % tcp_port,
     '-Des.http.port=%s' % http_port
@@ -142,7 +149,7 @@ def start_node(version, release_dir, data_dir, tcp_port, http_port):
     cmd.append('-f') # version before 1.0 start in background automatically
   return subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
 
-def create_client(http_port, timeout=30):
+def create_client(http_port=DEFAULT_HTTP_TCP_PORT, timeout=30):
   logging.info('Waiting for node to startup')
   for _ in range(0, timeout):
     # TODO: ask Honza if there is a better way to do this?
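With this refactoring, create_bwc_index.py doubles as an importable module: start_node() now defaults its ports and derives a cluster name from the version, create_client() waits on the default HTTP port, and shutdown_node() (added further down) terminates the process cleanly. A minimal sketch of driving it from another dev-tools script; the version, paths, and cluster name are illustrative, while index_documents() and shutdown_node() are the module's own helpers:

    import create_bwc_index

    # Start a node from an already-downloaded release on the default ports (9300/9200):
    node = create_bwc_index.start_node('0.90.6', 'backwards/elasticsearch-0.90.6',
                                       '/tmp/bwc/data', cluster_name='my_bwc_cluster')
    client = create_bwc_index.create_client()  # polls DEFAULT_HTTP_TCP_PORT until the node responds
    try:
        create_bwc_index.index_documents(client, 'test', 'doc', 10)
    finally:
        create_bwc_index.shutdown_node(node)  # terminate() then wait(), so the data dir is quiescent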
@@ -158,8 +165,6 @@ def create_client(http_port=9200, timeout=30):
 
 def generate_index(client, version, index_name):
   client.indices.delete(index=index_name, ignore=404)
-  num_shards = random.randint(1, 10)
-  num_replicas = random.randint(0, 1)
   logging.info('Create single shard test index')
 
   mappings = {}
@@ -300,7 +305,7 @@ def compress(tmp_dir, output_dir, zipfile, directory):
   zipfile = os.path.join(abs_output_dir, zipfile)
   if os.path.exists(zipfile):
     os.remove(zipfile)
-  logging.info('Compressing index into %s', zipfile)
+  logging.info('Compressing index into %s, tmpDir %s', zipfile, tmp_dir)
   olddir = os.getcwd()
   os.chdir(tmp_dir)
   subprocess.check_call('zip -r %s %s' % (zipfile, directory), shell=True)
@@ -318,9 +323,9 @@ def parse_config():
                       help='The directory containing elasticsearch releases')
   parser.add_argument('--output-dir', '-o', default='src/test/resources/org/elasticsearch/bwcompat',
                       help='The directory to write the zipped index into')
-  parser.add_argument('--tcp-port', default=9300, type=int,
+  parser.add_argument('--tcp-port', default=DEFAULT_TRANSPORT_TCP_PORT, type=int,
                       help='The port to use as the minimum port for TCP communication')
-  parser.add_argument('--http-port', default=9200, type=int,
+  parser.add_argument('--http-port', default=DEFAULT_HTTP_TCP_PORT, type=int,
                       help='The port to use as the minimum port for HTTP communication')
   cfg = parser.parse_args()
@@ -339,14 +344,17 @@ def create_bwc_index(cfg, version):
   logging.info('--> Creating bwc index for %s' % version)
   release_dir = os.path.join(cfg.releases_dir, 'elasticsearch-%s' % version)
   if not os.path.exists(release_dir):
-    parser.error('ES version %s does not exist in %s' % (version, cfg.releases_dir))
+    raise RuntimeError('ES version %s does not exist in %s' % (version, cfg.releases_dir))
   snapshot_supported = not (version.startswith('0.') or version == '1.0.0.Beta1')
   tmp_dir = tempfile.mkdtemp()
+
   data_dir = os.path.join(tmp_dir, 'data')
   repo_dir = os.path.join(tmp_dir, 'repo')
   logging.info('Temp data dir: %s' % data_dir)
   logging.info('Temp repo dir: %s' % repo_dir)
+
+  node = None
+
   try:
     node = start_node(version, release_dir, data_dir, cfg.tcp_port, cfg.http_port)
     client = create_client(cfg.http_port)
@@ -359,16 +367,26 @@ def create_bwc_index(cfg, version):
     # this after the snapshot, because it calls flush. Otherwise the index
     # will already have the deletions applied on upgrade.
     delete_by_query(client, version, index_name, 'doc')
-
+
+    shutdown_node(node)
+    node = None
+
+    compress_index(version, tmp_dir, cfg.output_dir)
+    if snapshot_supported:
+      compress_repo(version, tmp_dir, cfg.output_dir)
   finally:
-    if 'node' in vars():
-      logging.info('Shutting down node with pid %d', node.pid)
-      node.terminate()
-      time.sleep(1) # some nodes take time to terminate
-  compress_index(version, tmp_dir, cfg.output_dir)
-  if snapshot_supported:
-    compress_repo(version, tmp_dir, cfg.output_dir)
+    if node is not None:
+      # This only happens if we've hit an exception:
+      shutdown_node(node)
+
+    shutil.rmtree(tmp_dir)
+
+def shutdown_node(node):
+  logging.info('Shutting down node with pid %d', node.pid)
+  node.terminate()
+  node.wait()
+
 def main():
   logging.basicConfig(format='[%(levelname)s] [%(asctime)s] %(message)s', level=logging.INFO,
                       datefmt='%Y-%m-%d %I:%M:%S %p')
diff --git a/dev-tools/create_bwc_index_with_some_ancient_segments.py b/dev-tools/create_bwc_index_with_some_ancient_segments.py
new file mode 100644
index 0000000000000..d1162d4690f95
--- /dev/null
+++ b/dev-tools/create_bwc_index_with_some_ancient_segments.py
@@ -0,0 +1,114 @@
+import create_bwc_index
+import logging
+import os
+import random
+import shutil
+import subprocess
+import sys
+import tempfile
+
+def fetch_version(version):
+  logging.info('fetching ES version %s' % version)
+  if subprocess.call([sys.executable, os.path.join(os.path.split(sys.argv[0])[0], 'get-bwc-version.py'), version]) != 0:
+    raise RuntimeError('failed to download ES version %s' % version)
+
+def main():
+  '''
+  Creates a static back compat index (.zip) with mixed 0.20 (Lucene 3.x) and 0.90 (Lucene 4.x) segments.
+  '''
+
+  logging.basicConfig(format='[%(levelname)s] [%(asctime)s] %(message)s', level=logging.INFO,
+                      datefmt='%Y-%m-%d %I:%M:%S %p')
+  logging.getLogger('elasticsearch').setLevel(logging.ERROR)
+  logging.getLogger('urllib3').setLevel(logging.WARN)
+
+  tmp_dir = tempfile.mkdtemp()
+  node = None  # so the finally clause below can always check it
+  try:
+    data_dir = os.path.join(tmp_dir, 'data')
+    logging.info('Temp data dir: %s' % data_dir)
+
+    first_version = '0.20.6'
+    second_version = '0.90.6'
+    index_name = 'index-%s-and-%s' % (first_version, second_version)
+
+    # Download old ES releases if necessary:
+    release_dir = os.path.join('backwards', 'elasticsearch-%s' % first_version)
+    if not os.path.exists(release_dir):
+      fetch_version(first_version)
+
+    node = create_bwc_index.start_node(first_version, release_dir, data_dir, cluster_name=index_name)
+    client = create_bwc_index.create_client()
+
+    # Creates the index & indexes docs w/ first_version:
+    create_bwc_index.generate_index(client, first_version, index_name)
+
+    # Make sure we write segments:
+    flush_result = client.indices.flush(index=index_name)
+    if not flush_result['ok']:
+      raise RuntimeError('flush failed: %s' % str(flush_result))
+
+    segs = client.indices.segments(index=index_name)
+    shards = segs['indices'][index_name]['shards']
+    if len(shards) != 1:
+      raise RuntimeError('index should have 1 shard but got %s' % len(shards))
+
+    first_version_segs = shards['0'][0]['segments'].keys()
+
+    create_bwc_index.shutdown_node(node)
+    print('%s server output:\n%s' % (first_version, node.stdout.read().decode('utf-8')))
+    node = None
+
+    release_dir = os.path.join('backwards', 'elasticsearch-%s' % second_version)
+    if not os.path.exists(release_dir):
+      fetch_version(second_version)
+
+    # Now also index docs with second_version:
+    node = create_bwc_index.start_node(second_version, release_dir, data_dir, cluster_name=index_name)
+    client = create_bwc_index.create_client()
+
+    # If we index too many docs, the random refresh/flush causes the ancient segments to be merged away:
+    num_docs = 10
+    create_bwc_index.index_documents(client, index_name, 'doc', num_docs)
+
+    # Make sure we get a segment:
+    flush_result = client.indices.flush(index=index_name)
+    if not flush_result['ok']:
+      raise RuntimeError('flush failed: %s' % str(flush_result))
+
+    # Make sure we see mixed segments (it's possible Lucene could have "accidentally" merged away the first_version segments):
+    segs = client.indices.segments(index=index_name)
+    shards = segs['indices'][index_name]['shards']
+    if len(shards) != 1:
+      raise RuntimeError('index should have 1 shard but got %s' % len(shards))
+
+    second_version_segs = shards['0'][0]['segments'].keys()
+    #print("first: %s" % first_version_segs)
+    #print("second: %s" % second_version_segs)
+
+    for segment_name in first_version_segs:
+      if segment_name in second_version_segs:
+        # Good: an ancient version seg "survived":
+        break
+    else:
+      raise RuntimeError('index has no first_version segs left')
+
+    for segment_name in second_version_segs:
+      if segment_name not in first_version_segs:
+        # Good: a second_version segment was written
+        break
+    else:
+      raise RuntimeError('index has no second_version segs left')
+
+    create_bwc_index.shutdown_node(node)
+    print('%s server output:\n%s' % (second_version, node.stdout.read().decode('utf-8')))
+    node = None
+    create_bwc_index.compress_index('%s-and-%s' % (first_version, second_version), tmp_dir, 'src/test/resources/org/elasticsearch/rest/action/admin/indices/upgrade')
+  finally:
+    if node is not None:
+      create_bwc_index.shutdown_node(node)
+    shutil.rmtree(tmp_dir)
+
+if __name__ == '__main__':
+  main()
+
diff --git a/docs/reference/indices/upgrade.asciidoc b/docs/reference/indices/upgrade.asciidoc
index 5b4ebb9ec790a..295a407f97921 100644
--- a/docs/reference/indices/upgrade.asciidoc
+++ b/docs/reference/indices/upgrade.asciidoc
@@ -21,12 +21,30 @@ This call will block until the upgrade is complete. If the http connection
 is lost, the request will continue in the background, and
 any new requests will block until the previous upgrade is complete.
 
+[float]
+[[upgrade-parameters]]
+==== Request Parameters
+
+The `upgrade` API accepts the following request parameters:
+
+[horizontal]
+`only_ancient_segments`:: If true, only very old segments (from a
+previous Lucene major release) will be upgraded. While this will do
+the minimal work to ensure the next major release of Elasticsearch can
+read the segments, it's dangerous because it can leave other very old
+segments in sub-optimal formats. Defaults to `false`.
+
 [float]
 === Check upgrade status
 
 Use a `GET` request to monitor how much of an index is upgraded. This
-can also be used prior to starting an upgrade to identify which indices
-you want to upgrade at the same time.
+can also be used prior to starting an upgrade to identify which
+indices you want to upgrade at the same time.
+
+The `ancient` byte values that are returned indicate total bytes of
+segments whose version is extremely old (Lucene major version is
+different from the current version), showing how much upgrading is
+necessary when you run with `only_ancient_segments=true`.
 
 [source,sh]
--------------------------------------------------
@@ -41,6 +59,8 @@ curl 'http://localhost:9200/twitter/_upgrade?pretty&human'
     "size_in_bytes": "21000000000",
     "size_to_upgrade": "10gb",
-    "size_to_upgrade_in_bytes": "10000000000"
+    "size_to_upgrade_in_bytes": "10000000000",
+    "size_to_upgrade_ancient": "1gb",
+    "size_to_upgrade_ancient_in_bytes": "1000000000"
   }
 }
--------------------------------------------------
diff --git a/rest-api-spec/api/indices.upgrade.json b/rest-api-spec/api/indices.upgrade.json
index ce8cfdfbe3c29..0e5e4ffd244c6 100644
--- a/rest-api-spec/api/indices.upgrade.json
+++ b/rest-api-spec/api/indices.upgrade.json
@@ -27,8 +27,12 @@
         "description" : "Whether specified concrete indices should be ignored when unavailable (missing or closed)"
       },
       "wait_for_completion": {
+        "type" : "boolean",
+        "description" : "Specify whether the request should block until all segments are upgraded (default: false)"
+      },
+      "only_ancient_segments": {
         "type" : "boolean",
-        "description" : "Specify whether the request should block until the all segments are upgraded (default: false)"
+        "description" : "If true, only ancient (an older Lucene major release) segments will be upgraded"
       }
     }
   },
diff --git a/src/main/java/org/elasticsearch/action/admin/indices/optimize/OptimizeRequest.java b/src/main/java/org/elasticsearch/action/admin/indices/optimize/OptimizeRequest.java
index 9994627e0fc77..d5b822f58cb1b 100644
--- a/src/main/java/org/elasticsearch/action/admin/indices/optimize/OptimizeRequest.java
+++ b/src/main/java/org/elasticsearch/action/admin/indices/optimize/OptimizeRequest.java
@@ -44,12 +44,14 @@ public static final class Defaults {
         public static final boolean ONLY_EXPUNGE_DELETES = false;
         public static final boolean FLUSH = true;
         public static final boolean UPGRADE = false;
+        public static final boolean UPGRADE_ONLY_ANCIENT_SEGMENTS = false;
     }
 
     private int maxNumSegments = Defaults.MAX_NUM_SEGMENTS;
     private boolean onlyExpungeDeletes = Defaults.ONLY_EXPUNGE_DELETES;
     private boolean flush = Defaults.FLUSH;
     private boolean upgrade = Defaults.UPGRADE;
+    private boolean upgradeOnlyAncientSegments = Defaults.UPGRADE_ONLY_ANCIENT_SEGMENTS;
 
     /**
      * Constructs an optimization request over one or more indices.
@@ -136,6 +138,7 @@ public void readFrom(StreamInput in) throws IOException {
         onlyExpungeDeletes = in.readBoolean();
         flush = in.readBoolean();
         upgrade = in.readBoolean();
+        upgradeOnlyAncientSegments = in.readBoolean();
     }
 
     @Override
@@ -145,6 +148,23 @@ public void writeTo(StreamOutput out) throws IOException {
         out.writeBoolean(onlyExpungeDeletes);
         out.writeBoolean(flush);
         out.writeBoolean(upgrade);
+        out.writeBoolean(upgradeOnlyAncientSegments);
+    }
+
+    /**
+     * Should the merge upgrade only the ancient (older major version of Lucene) segments?
+     * Defaults to false.
+     */
+    public boolean upgradeOnlyAncientSegments() {
+        return upgradeOnlyAncientSegments;
+    }
+
+    /**
+     * See {@link #upgradeOnlyAncientSegments()}
+     */
+    public OptimizeRequest upgradeOnlyAncientSegments(boolean upgradeOnlyAncientSegments) {
+        this.upgradeOnlyAncientSegments = upgradeOnlyAncientSegments;
+        return this;
     }
 
     @Override
@@ -154,6 +174,7 @@ public String toString() {
                 ", onlyExpungeDeletes=" + onlyExpungeDeletes +
                 ", flush=" + flush +
                 ", upgrade=" + upgrade +
+                ", upgradeOnlyAncientSegments=" + upgradeOnlyAncientSegments +
                 '}';
     }
 }
diff --git a/src/main/java/org/elasticsearch/index/engine/Engine.java b/src/main/java/org/elasticsearch/index/engine/Engine.java
index 83ebfb72a7c8e..392a663d29310 100644
--- a/src/main/java/org/elasticsearch/index/engine/Engine.java
+++ b/src/main/java/org/elasticsearch/index/engine/Engine.java
@@ -430,13 +430,13 @@ public final boolean refreshNeeded() {
      * Optimizes to 1 segment
      */
     public void forceMerge(boolean flush) {
-        forceMerge(flush, 1, false, false);
+        forceMerge(flush, 1, false, false, false);
     }
 
     /**
      * Triggers a forced merge on this engine
      */
-    public abstract void forceMerge(boolean flush, int maxNumSegments, boolean onlyExpungeDeletes, boolean upgrade) throws EngineException;
+    public abstract void forceMerge(boolean flush, int maxNumSegments, boolean onlyExpungeDeletes, boolean upgrade, boolean upgradeOnlyAncientSegments) throws EngineException;
 
     /**
      * Snapshots the index and returns a handle to it. Will always try and "commit" the
diff --git a/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
index ab0088aa4f6f7..b4db6b93f179f 100644
--- a/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
+++ b/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
@@ -615,7 +615,7 @@ private void flush(boolean commitTranslog, boolean force, boolean waitIfOngoing)
         }
     }
     /*
-     * Unfortunately the lock order is important here. We have to acquire the readlock fist otherwise
+     * Unfortunately the lock order is important here. We have to acquire the readlock first otherwise
      * if we are flushing at the end of the recovery while holding the write lock we can deadlock if:
      *  Thread 1: flushes via API and gets the flush lock but blocks on the readlock since Thread 2 has the writeLock
      *  Thread 2: flushes at the end of the recovery holding the writeLock and blocks on the flushLock owned by Thread 1
@@ -742,7 +742,8 @@ private void pruneDeletedTombstones() {
     }
 
     @Override
-    public void forceMerge(final boolean flush, int maxNumSegments, boolean onlyExpungeDeletes, final boolean upgrade) throws EngineException {
+    public void forceMerge(final boolean flush, int maxNumSegments, boolean onlyExpungeDeletes,
+                           final boolean upgrade, final boolean upgradeOnlyAncientSegments) throws EngineException {
         /*
          * We do NOT acquire the readlock here since we are waiting on the merges to finish
          * that's fine since the IW.rollback should stop all the threads and trigger an IOException
@@ -760,8 +761,8 @@ public void forceMerge(final boolean flush, int maxNumSegments, boolean onlyExpu
             try {
                 ensureOpen();
                 if (upgrade) {
-                    logger.info("starting segment upgrade");
-                    mp.setUpgradeInProgress(true);
+                    logger.info("starting segment upgrade upgradeOnlyAncientSegments={}", upgradeOnlyAncientSegments);
+                    mp.setUpgradeInProgress(true, upgradeOnlyAncientSegments);
                 }
                 store.incRef(); // increment the ref just to ensure nobody closes the store while we optimize
                 try {
@@ -789,7 +790,7 @@ public void forceMerge(final boolean flush, int maxNumSegments, boolean onlyExpu
             throw ex;
         } finally {
             try {
-                mp.setUpgradeInProgress(false); // reset it just to make sure we reset it in a case of an error
+                mp.setUpgradeInProgress(false, false); // reset it just to make sure we reset it in a case of an error
             } finally {
                 optimizeLock.unlock();
             }
diff --git a/src/main/java/org/elasticsearch/index/engine/ShadowEngine.java b/src/main/java/org/elasticsearch/index/engine/ShadowEngine.java
index c49758398bc0e..31c5a23c578a8 100644
--- a/src/main/java/org/elasticsearch/index/engine/ShadowEngine.java
+++ b/src/main/java/org/elasticsearch/index/engine/ShadowEngine.java
@@ -147,7 +147,7 @@ public void flush(boolean force, boolean waitIfOngoing) throws EngineException {
     }
 
     @Override
-    public void forceMerge(boolean flush, int maxNumSegments, boolean onlyExpungeDeletes, boolean upgrade) throws EngineException {
+    public void forceMerge(boolean flush, int maxNumSegments, boolean onlyExpungeDeletes, boolean upgrade, boolean upgradeOnlyAncientSegments) throws EngineException {
         // no-op
         logger.trace("skipping FORCE-MERGE on shadow engine");
     }
diff --git a/src/main/java/org/elasticsearch/index/merge/policy/ElasticsearchMergePolicy.java b/src/main/java/org/elasticsearch/index/merge/policy/ElasticsearchMergePolicy.java
index d53a809163dda..dcd58e40f5ab6 100644
--- a/src/main/java/org/elasticsearch/index/merge/policy/ElasticsearchMergePolicy.java
+++ b/src/main/java/org/elasticsearch/index/merge/policy/ElasticsearchMergePolicy.java
@@ -48,7 +48,13 @@ public final class ElasticsearchMergePolicy extends MergePolicy {
     private static ESLogger logger = Loggers.getLogger(ElasticsearchMergePolicy.class);
 
     private final MergePolicy delegate;
+
+    // True if the next merge request should do segment upgrades:
     private volatile boolean upgradeInProgress;
+
+    // True if the next merge request should only upgrade ancient (an older Lucene major version than current) segments:
+    private volatile boolean upgradeOnlyAncientSegments;
+
     private static final int MAX_CONCURRENT_UPGRADE_MERGES = 5;
 
     /** @param delegate the merge policy to wrap */
@@ -113,6 +119,26 @@ public MergeSpecification findMerges(MergeTrigger mergeTrigger,
         return upgradedMergeSpecification(delegate.findMerges(mergeTrigger, segmentInfos, writer));
     }
 
+    private boolean shouldUpgrade(SegmentCommitInfo info) {
+        org.apache.lucene.util.Version old = info.info.getVersion();
+        org.apache.lucene.util.Version cur = Version.CURRENT.luceneVersion;
+
+        // Something seriously wrong if this trips:
+        assert old.major <= cur.major;
+
+        if (cur.major > old.major) {
+            // Always upgrade segment if Lucene's major version is too old
+            return true;
+        }
+        if (upgradeOnlyAncientSegments == false && cur.minor > old.minor) {
+            // If it's only a minor version difference, and we are not upgrading only ancient segments,
+            // also upgrade:
+            return true;
+        }
+        // Version matches, or segment is not ancient and we are only upgrading ancient segments:
+        return false;
+    }
+
     @Override
     public MergeSpecification findForcedMerges(SegmentInfos segmentInfos,
         int maxSegmentCount, Map<SegmentCommitInfo,Boolean> segmentsToMerge, IndexWriter writer)
@@ -121,27 +147,35 @@ public MergeSpecification findForcedMerges(SegmentInfos segmentInfos,
         if (upgradeInProgress) {
             MergeSpecification spec = new IndexUpgraderMergeSpecification();
             for (SegmentCommitInfo info : segmentInfos) {
-                org.apache.lucene.util.Version old = info.info.getVersion();
-                org.apache.lucene.util.Version cur = Version.CURRENT.luceneVersion;
-                if (cur.major > old.major ||
-                    cur.major == old.major && cur.minor > old.minor) {
+
+                if (shouldUpgrade(info)) {
+
                     // TODO: Use IndexUpgradeMergePolicy instead. We should be comparing codecs,
                     // for now we just assume every minor upgrade has a new format.
                     logger.debug("Adding segment " + info.info.name + " to be upgraded");
                     spec.add(new OneMerge(Lists.newArrayList(info)));
                 }
+
+                // TODO: we could check IndexWriter.getMergingSegments and avoid adding merges that IW will just reject?
+
                 if (spec.merges.size() == MAX_CONCURRENT_UPGRADE_MERGES) {
                     // hit our max upgrades, so return the spec. we will get a cascaded call to continue.
                     logger.debug("Returning " + spec.merges.size() + " merges for upgrade");
                     return spec;
                 }
             }
+
             // We must have less than our max upgrade merges, so the next return will be our last in upgrading mode.
-            upgradeInProgress = false;
             if (spec.merges.isEmpty() == false) {
-                logger.debug("Return " + spec.merges.size() + " merges for end of upgrade");
+                logger.debug("Returning " + spec.merges.size() + " merges for end of upgrade");
                 return spec;
             }
+
+            // Only set this once there are 0 segments needing upgrading, because when we return a
+            // spec, IndexWriter may (silently!) reject that merge if some of the segments we asked
+            // to be merged were already being (naturally) merged:
+            upgradeInProgress = false;
+
             // fall through, so when we don't have any segments to upgrade, the delegate policy
             // has a chance to decide what to do (e.g. collapse the segments to satisfy maxSegmentCount)
         }
@@ -166,8 +200,9 @@ public boolean useCompoundFile(SegmentInfos segments, SegmentCommitInfo newSegme
      * {@link IndexWriter#forceMerge} that is handled by this {@link MergePolicy}, as well as
      * cascading calls made by {@link IndexWriter}.
      */
-    public void setUpgradeInProgress(boolean upgrade) {
+    public void setUpgradeInProgress(boolean upgrade, boolean onlyAncientSegments) {
         this.upgradeInProgress = upgrade;
+        this.upgradeOnlyAncientSegments = onlyAncientSegments;
     }
 
     @Override
diff --git a/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/src/main/java/org/elasticsearch/index/shard/IndexShard.java
index 9e20a73b26be1..1ce97d60d483e 100644
--- a/src/main/java/org/elasticsearch/index/shard/IndexShard.java
+++ b/src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -702,7 +702,8 @@ public void optimize(OptimizeRequest optimize) throws ElasticsearchException {
         if (logger.isTraceEnabled()) {
             logger.trace("optimize with {}", optimize);
         }
-        engine().forceMerge(optimize.flush(), optimize.maxNumSegments(), optimize.onlyExpungeDeletes(), optimize.upgrade());
+        engine().forceMerge(optimize.flush(), optimize.maxNumSegments(), optimize.onlyExpungeDeletes(),
+                            optimize.upgrade(), optimize.upgradeOnlyAncientSegments());
     }
 
     public SnapshotIndexCommit snapshotIndex() throws EngineException {
diff --git a/src/main/java/org/elasticsearch/rest/action/admin/indices/upgrade/RestUpgradeAction.java b/src/main/java/org/elasticsearch/rest/action/admin/indices/upgrade/RestUpgradeAction.java
index cc9a6281c51b6..23509582a669c 100644
--- a/src/main/java/org/elasticsearch/rest/action/admin/indices/upgrade/RestUpgradeAction.java
+++ b/src/main/java/org/elasticsearch/rest/action/admin/indices/upgrade/RestUpgradeAction.java
@@ -38,6 +38,7 @@ import org.elasticsearch.rest.RestRequest;
 import org.elasticsearch.rest.RestResponse;
 import org.elasticsearch.rest.action.support.RestBuilderListener;
+import java.io.IOException;
 
 import static org.elasticsearch.rest.RestRequest.Method.GET;
 import static org.elasticsearch.rest.RestRequest.Method.POST;
@@ -73,12 +74,10 @@ void handleGet(RestRequest request, RestChannel channel, Client client) {
             public RestResponse buildResponse(IndicesSegmentResponse response, XContentBuilder builder) throws Exception {
                 builder.startObject();
 
-                // TODO: getIndices().values() is what IndecesSegmentsResponse uses, but this will produce different orders with jdk8?
+                // TODO: getIndices().values() is what IndicesSegmentsResponse uses, but this will produce different orders with jdk8?
                 for (IndexSegments indexSegments : response.getIndices().values()) {
-                    Tuple<Long, Long> summary = calculateUpgradeStatus(indexSegments);
                     builder.startObject(indexSegments.getIndex());
-                    builder.byteSizeField(SIZE_IN_BYTES, SIZE, summary.v1());
-                    builder.byteSizeField(SIZE_TO_UPGRADE_IN_BYTES, SIZE_TO_UPGRADE, summary.v2());
+                    buildUpgradeStatus(indexSegments, builder);
                     builder.endObject();
                 }
 
@@ -92,6 +91,7 @@ void handlePost(RestRequest request, RestChannel channel, Client client) {
         OptimizeRequest optimizeReq = new OptimizeRequest(Strings.splitStringByCommaToArray(request.param("index")));
         optimizeReq.flush(true);
         optimizeReq.upgrade(true);
+        optimizeReq.upgradeOnlyAncientSegments(request.paramAsBoolean("only_ancient_segments", false));
         optimizeReq.maxNumSegments(Integer.MAX_VALUE); // we just want to upgrade the segments, not actually optimize to a single segment
         client.admin().indices().optimize(optimizeReq, new RestBuilderListener<OptimizeResponse>(channel) {
             @Override
@@ -104,15 +104,18 @@ public RestResponse buildResponse(OptimizeResponse response, XContentBuilder bui
         });
     }
 
-    Tuple<Long, Long> calculateUpgradeStatus(IndexSegments indexSegments) {
+    void buildUpgradeStatus(IndexSegments indexSegments, XContentBuilder builder) throws IOException {
         long total_bytes = 0;
         long to_upgrade_bytes = 0;
+        long to_upgrade_bytes_ancient = 0;
         for (IndexShardSegments shard : indexSegments) {
             for (ShardSegments segs : shard.getShards()) {
                 for (Segment seg : segs.getSegments()) {
                     total_bytes += seg.sizeInBytes;
-                    if (seg.version.major != Version.CURRENT.luceneVersion.major ||
-                        seg.version.minor != Version.CURRENT.luceneVersion.minor) {
+                    if (seg.version.major != Version.CURRENT.luceneVersion.major) {
+                        to_upgrade_bytes_ancient += seg.sizeInBytes;
+                        to_upgrade_bytes += seg.sizeInBytes;
+                    } else if (seg.version.minor != Version.CURRENT.luceneVersion.minor) {
                         // TODO: this comparison is bogus! it would cause us to upgrade even with the same format
                         // instead, we should check if the codec has changed
                         to_upgrade_bytes += seg.sizeInBytes;
@@ -120,11 +123,16 @@ Tuple<Long, Long> calculateUpgradeStatus(IndexSegments indexSegments) {
                 }
             }
         }
-        return new Tuple<>(total_bytes, to_upgrade_bytes);
+
+        builder.byteSizeField(SIZE_IN_BYTES, SIZE, total_bytes);
+        builder.byteSizeField(SIZE_TO_UPGRADE_IN_BYTES, SIZE_TO_UPGRADE, to_upgrade_bytes);
+        builder.byteSizeField(SIZE_TO_UPGRADE_ANCIENT_IN_BYTES, SIZE_TO_UPGRADE_ANCIENT, to_upgrade_bytes_ancient);
     }
 
     static final XContentBuilderString SIZE = new XContentBuilderString("size");
     static final XContentBuilderString SIZE_IN_BYTES = new XContentBuilderString("size_in_bytes");
     static final XContentBuilderString SIZE_TO_UPGRADE = new XContentBuilderString("size_to_upgrade");
+    static final XContentBuilderString SIZE_TO_UPGRADE_ANCIENT = new XContentBuilderString("size_to_upgrade_ancient");
     static final XContentBuilderString SIZE_TO_UPGRADE_IN_BYTES = new XContentBuilderString("size_to_upgrade_in_bytes");
+    static final XContentBuilderString SIZE_TO_UPGRADE_ANCIENT_IN_BYTES = new XContentBuilderString("size_to_upgrade_ancient_in_bytes");
 }
diff --git a/src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java b/src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java
index 1afd76b46e065..426030aef2ae7 100644
--- a/src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java
+++ b/src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java
@@ -113,9 +113,11 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Ex
                 String action = handleRequest(ctx.getChannel(), wrappedStream, requestId, version);
                 if (buffer.readerIndex() != expectedIndexReader) {
                     if (buffer.readerIndex() < expectedIndexReader) {
-                        logger.warn("Message not fully read (request) for [{}] and action [{}], resetting", requestId, action);
+                        logger.warn("Message not fully read (request) for requestId [{}], action [{}], readerIndex [{}] vs expected [{}]; resetting",
+                                    requestId, action, buffer.readerIndex(), expectedIndexReader);
                     } else {
-                        logger.warn("Message read past expected size (request) for [{}] and action [{}], resetting", requestId, action);
+                        logger.warn("Message read past expected size (request) for requestId [{}], action [{}], readerIndex [{}] vs expected [{}]; resetting",
+                                    requestId, action, buffer.readerIndex(), expectedIndexReader);
                     }
                     buffer.readerIndex(expectedIndexReader);
                 }
diff --git a/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
index 4778a5d9368ce..ad9d9c57cec32 100644
--- a/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
+++ b/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
@@ -1034,13 +1034,13 @@ public void testForceMerge() {
         try (Engine.Searcher test = engine.acquireSearcher("test")) {
             assertEquals(numDocs, test.reader().numDocs());
         }
-        engine.forceMerge(true, 1, false, false);
+        engine.forceMerge(true, 1, false, false, false);
         assertEquals(engine.segments(true).size(), 1);
 
         ParsedDocument doc = testParsedDocument(Integer.toString(0), Integer.toString(0), "test", null, -1, -1, testDocument(), B_1, false);
         Engine.Index index = new Engine.Index(null, newUid(Integer.toString(0)), doc);
         engine.delete(new Engine.Delete(index.type(), index.id(), index.uid()));
-        engine.forceMerge(true, 10, true, false); //expunge deletes
+        engine.forceMerge(true, 10, true, false, false); //expunge deletes
         assertEquals(engine.segments(true).size(), 1);
 
         try (Engine.Searcher test = engine.acquireSearcher("test")) {
@@ -1051,7 +1051,7 @@ public void testForceMerge() {
         doc = testParsedDocument(Integer.toString(1), Integer.toString(1), "test", null, -1, -1, testDocument(), B_1, false);
         index = new Engine.Index(null, newUid(Integer.toString(1)), doc);
         engine.delete(new Engine.Delete(index.type(), index.id(), index.uid()));
-        engine.forceMerge(true, 10, false, false); //expunge deletes
+        engine.forceMerge(true, 10, false, false, false); //expunge deletes
         assertEquals(engine.segments(true).size(), 1);
 
         try (Engine.Searcher test = engine.acquireSearcher("test")) {
@@ -1089,7 +1089,7 @@ public void run() {
                         engine.refresh("test");
                         indexed.countDown();
                         try {
-                            engine.forceMerge(randomBoolean(), 1, false, randomBoolean());
+                            engine.forceMerge(randomBoolean(), 1, false, randomBoolean(), randomBoolean());
                         } catch (ForceMergeFailedEngineException ex) {
                             // ok
                             return;
@@ -1105,7 +1105,7 @@ public void run() {
             startGun.countDown();
             int someIters = randomIntBetween(1, 10);
             for (int i = 0; i < someIters; i++) {
-                engine.forceMerge(randomBoolean(), 1, false, randomBoolean());
+                engine.forceMerge(randomBoolean(), 1, false, randomBoolean(), randomBoolean());
             }
             indexed.await();
             IOUtils.close(engine, translog);
@@ -1711,7 +1711,7 @@ public void testDeletesAloneCanTriggerRefresh() throws Exception {
         }
 
         // Force merge so we know all merges are done before we start deleting:
-        engine.forceMerge(true, 1, false, false);
+        engine.forceMerge(true, 1, false, false, false);
 
         Searcher s = engine.acquireSearcher("test");
         final long version1 = ((DirectoryReader) s.reader()).getVersion();
diff --git a/src/test/java/org/elasticsearch/rest/action/admin/indices/upgrade/UpgradeTest.java b/src/test/java/org/elasticsearch/rest/action/admin/indices/upgrade/UpgradeTest.java
index 3cd5b3981f269..264d1385874fd 100644
--- a/src/test/java/org/elasticsearch/rest/action/admin/indices/upgrade/UpgradeTest.java
+++ b/src/test/java/org/elasticsearch/rest/action/admin/indices/upgrade/UpgradeTest.java
@@ -137,32 +137,34 @@ public boolean apply(Object o) {
         logger.info("--> Nodes upgrade complete");
         logSegmentsState();
 
-        final HttpRequestBuilder httpClient = httpClient();
-
-        assertNotUpgraded(httpClient, null);
+        assertNotUpgraded(httpClient(), null);
         final String indexToUpgrade = "test" + randomInt(numIndexes - 1);
+
+        // This test fires up another node running an older version of ES, but because wire protocol changes across major ES versions, it
+        // means we can never generate ancient segments in this test (unless Lucene major version bumps but ES major version does not):
+        assertFalse(hasAncientSegments(httpClient(), indexToUpgrade));
 
         logger.info("--> Running upgrade on index " + indexToUpgrade);
-        runUpgrade(httpClient, indexToUpgrade);
+        runUpgrade(httpClient(), indexToUpgrade);
         awaitBusy(new Predicate<Object>() {
             @Override
             public boolean apply(Object o) {
                 try {
-                    return isUpgraded(httpClient, indexToUpgrade);
+                    return isUpgraded(httpClient(), indexToUpgrade);
                 } catch (Exception e) {
                     throw ExceptionsHelper.convertToRuntime(e);
                 }
             }
         });
         logger.info("--> Single index upgrade complete");
-
+
         logger.info("--> Running upgrade on the rest of the indexes");
-        runUpgrade(httpClient, null);
+        runUpgrade(httpClient(), null);
         logSegmentsState();
         logger.info("--> Full upgrade complete");
-        assertUpgraded(httpClient, null);
+        assertUpgraded(httpClient(), null);
     }
-
+
     static String upgradePath(String index) {
         String path = "/_upgrade";
         if (index != null) {
@@ -182,6 +184,39 @@ public static void assertNotUpgraded(HttpRequestBuilder httpClient, String index
         }
     }
 
+    public static void assertNoAncientSegments(HttpRequestBuilder httpClient, String index) throws Exception {
+        for (UpgradeStatus status : getUpgradeStatus(httpClient, upgradePath(index))) {
+            assertTrue("index " + status.indexName + " should not be zero sized", status.totalBytes != 0);
+            // TODO: it would be better for this to be strictly greater, but sometimes an extra flush
+            // mysteriously happens after the second round of docs are indexed
+            assertTrue("index " + status.indexName + " should not have any ancient segments",
+                       status.toUpgradeBytesAncient == 0);
+            assertTrue("index " + status.indexName + " should have recovered some segments from transaction log",
+                       status.totalBytes >= status.toUpgradeBytes);
+            assertTrue("index " + status.indexName + " should need upgrading", status.toUpgradeBytes != 0);
+        }
+    }
+
+    /** Returns true if there are any ancient segments. */
+    public static boolean hasAncientSegments(HttpRequestBuilder httpClient, String index) throws Exception {
+        for (UpgradeStatus status : getUpgradeStatus(httpClient, upgradePath(index))) {
+            if (status.toUpgradeBytesAncient != 0) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /** Returns true if there are any old but not ancient segments. */
+    public static boolean hasOldButNotAncientSegments(HttpRequestBuilder httpClient, String index) throws Exception {
+        for (UpgradeStatus status : getUpgradeStatus(httpClient, upgradePath(index))) {
+            if (status.toUpgradeBytes > status.toUpgradeBytesAncient) {
+                return true;
+            }
+        }
+        return false;
+    }
+
     public static void assertUpgraded(HttpRequestBuilder httpClient, String index) throws Exception {
         for (UpgradeStatus status : getUpgradeStatus(httpClient, upgradePath(index))) {
             assertTrue("index " + status.indexName + " should not be zero sized", status.totalBytes != 0);
@@ -209,7 +244,7 @@ public static void assertUpgraded(HttpRequestBuilder httpClient, String index) t
             }
         }
     }
-
+
     static boolean isUpgraded(HttpRequestBuilder httpClient, String index) throws Exception {
         ESLogger logger = Loggers.getLogger(UpgradeTest.class);
         int toUpgrade = 0;
@@ -224,11 +259,14 @@ static class UpgradeStatus {
         public final String indexName;
         public final int totalBytes;
         public final int toUpgradeBytes;
+        public final int toUpgradeBytesAncient;
 
-        public UpgradeStatus(String indexName, int totalBytes, int toUpgradeBytes) {
+        public UpgradeStatus(String indexName, int totalBytes, int toUpgradeBytes, int toUpgradeBytesAncient) {
             this.indexName = indexName;
             this.totalBytes = totalBytes;
             this.toUpgradeBytes = toUpgradeBytes;
+            this.toUpgradeBytesAncient = toUpgradeBytesAncient;
+            assert toUpgradeBytesAncient <= toUpgradeBytes;
         }
     }
 
@@ -256,7 +294,9 @@ static List getUpgradeStatus(HttpRequestBuilder httpClient, Strin
             assertTrue("missing key size_to_upgrade_in_bytes for index " + index, status.containsKey("size_to_upgrade_in_bytes"));
             Object toUpgradeBytes = status.get("size_to_upgrade_in_bytes");
             assertTrue("size_to_upgrade_in_bytes for index " + index + " is not an integer", toUpgradeBytes instanceof Integer);
-            ret.add(new UpgradeStatus(index, (Integer)totalBytes, (Integer)toUpgradeBytes));
+            Object toUpgradeBytesAncient = status.get("size_to_upgrade_ancient_in_bytes");
+            assertTrue("size_to_upgrade_ancient_in_bytes for index " + index + " is not an integer", toUpgradeBytesAncient instanceof Integer);
+            ret.add(new UpgradeStatus(index, (Integer) totalBytes, (Integer) toUpgradeBytes, (Integer) toUpgradeBytesAncient));
         }
         return ret;
     }
diff --git a/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java b/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java
index 37f369fe8b9e2..02ad6c76af0df 100644
--- a/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java
+++ b/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java
@@ -1003,7 +1003,7 @@ public ClusterHealthStatus ensureGreen(TimeValue timeout, String... indices) {
                 .health(Requests.clusterHealthRequest(indices).timeout(timeout).waitForGreenStatus().waitForEvents(Priority.LANGUID).waitForRelocatingShards(0)).actionGet();
         if (actionGet.isTimedOut()) {
             logger.info("ensureGreen timed out, cluster state:\n{}\n{}", client().admin().cluster().prepareState().get().getState().prettyPrint(), client().admin().cluster().preparePendingClusterTasks().get().prettyPrint());
-            assertThat("timed out waiting for green state", actionGet.isTimedOut(), equalTo(false));
+            fail("timed out waiting for green state");
         }
         assertThat(actionGet.getStatus(), equalTo(ClusterHealthStatus.GREEN));
         logger.debug("indices {} are green", indices.length == 0 ? "[_all]" : indices);
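Taken together, the REST changes above mean the new parameter can be exercised end to end with two HTTP calls: a GET to read the new size_to_upgrade_ancient_in_bytes counter, and a POST with only_ancient_segments=true. A minimal sketch, not part of this change, using only the Python standard library; the host and index name are illustrative:

    import json
    import urllib.request

    BASE = 'http://localhost:9200'  # illustrative host

    def upgrade_status(index):
        # GET _upgrade reports size_in_bytes, size_to_upgrade_in_bytes and the new
        # size_to_upgrade_ancient_in_bytes for each index:
        with urllib.request.urlopen('%s/%s/_upgrade' % (BASE, index)) as resp:
            return json.loads(resp.read().decode('utf-8'))

    def upgrade_ancient_segments(index):
        # POST _upgrade with only_ancient_segments=true rewrites just the segments
        # from an older Lucene major release:
        url = '%s/%s/_upgrade?only_ancient_segments=true&wait_for_completion=true' % (BASE, index)
        req = urllib.request.Request(url, method='POST')
        with urllib.request.urlopen(req) as resp:
            return json.loads(resp.read().decode('utf-8'))

    status = upgrade_status('twitter')['twitter']
    if status['size_to_upgrade_ancient_in_bytes'] > 0:
        upgrade_ancient_segments('twitter')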