Core: add only_ancient_segments to upgrade API, so only segments with an old Lucene version are upgraded

This option defaults to false, because it is also important to upgrade
the "merely old" segments since many Lucene improvements happen within
minor releases.

But you can pass true to do the minimal work necessary to upgrade to
the next major Elasticsearch release.

The HTTP GET upgrade request now also breaks out how many bytes of
ancient segments need upgrading.
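
For illustration only (not part of the commit), the two modes can be sketched with Python's standard library against a local node; the `twitter` index and port are placeholders:

import urllib.request

# Default mode: upgrade all old segments, including "merely old" ones
# written by prior minor releases:
urllib.request.urlopen(urllib.request.Request(
    'http://localhost:9200/twitter/_upgrade', method='POST'))

# Minimal mode: upgrade only segments written by an older Lucene major release:
urllib.request.urlopen(urllib.request.Request(
    'http://localhost:9200/twitter/_upgrade?only_ancient_segments=true', method='POST'))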

Closes #10213

Closes #10540

Conflicts:
	dev-tools/create_bwc_index.py
	rest-api-spec/api/indices.upgrade.json
	src/main/java/org/elasticsearch/action/admin/indices/optimize/OptimizeRequest.java
	src/main/java/org/elasticsearch/action/admin/indices/optimize/ShardOptimizeRequest.java
	src/main/java/org/elasticsearch/action/admin/indices/optimize/TransportOptimizeAction.java
	src/main/java/org/elasticsearch/index/engine/InternalEngine.java
	src/test/java/org/elasticsearch/bwcompat/StaticIndexBackwardCompatibilityTest.java
	src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
	src/test/java/org/elasticsearch/rest/action/admin/indices/upgrade/UpgradeReallyOldIndexTest.java
mikemccand committed Apr 16, 2015
1 parent 5806e85 commit 399f0cc
Showing 15 changed files with 330 additions and 67 deletions.
56 changes: 37 additions & 19 deletions dev-tools/create-bwc-index.py → dev-tools/create_bwc_index.py
@@ -19,11 +19,15 @@
import logging
import os
import random
import shutil
import subprocess
import sys
import tempfile
import time

DEFAULT_TRANSPORT_TCP_PORT = 9300
DEFAULT_HTTP_TCP_PORT = 9200

if sys.version_info[0] < 3:
  print('%s must use python 3.x (for the ES python client)' % sys.argv[0])

@@ -126,14 +130,17 @@ def build_version(version_tuple):
def build_tuple(version_string):
  return [int(x) for x in version_string.split('.')]

def start_node(version, release_dir, data_dir, tcp_port, http_port):
  logging.info('Starting node from %s on port %s/%s' % (release_dir, tcp_port, http_port))
def start_node(version, release_dir, data_dir, tcp_port=DEFAULT_TRANSPORT_TCP_PORT, http_port=DEFAULT_HTTP_TCP_PORT, cluster_name=None):
  logging.info('Starting node from %s on port %s/%s, data_dir %s' % (release_dir, tcp_port, http_port, data_dir))
  if cluster_name is None:
    cluster_name = 'bwc_index_' + version

  cmd = [
    os.path.join(release_dir, 'bin/elasticsearch'),
    '-Des.path.data=%s' % data_dir,
    '-Des.path.logs=logs',
    '-Des.cluster.name=bwc_index_' + version,
    '-Des.network.host=localhost',
    '-Des.cluster.name=%s' % cluster_name,
    '-Des.network.host=localhost',
    '-Des.discovery.zen.ping.multicast.enabled=false',
    '-Des.transport.tcp.port=%s' % tcp_port,
    '-Des.http.port=%s' % http_port
@@ -142,7 +149,7 @@ def start_node(version, release_dir, data_dir, tcp_port, http_port):
    cmd.append('-f') # versions before 1.0 start in the background automatically
  return subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

def create_client(http_port, timeout=30):
def create_client(http_port=DEFAULT_HTTP_TCP_PORT, timeout=30):
  logging.info('Waiting for node to startup')
  for _ in range(0, timeout):
    # TODO: ask Honza if there is a better way to do this?
@@ -158,8 +165,6 @@ def create_client(http_port, timeout=30):

def generate_index(client, version, index_name):
  client.indices.delete(index=index_name, ignore=404)
  num_shards = random.randint(1, 10)
  num_replicas = random.randint(0, 1)
  logging.info('Create single shard test index')

  mappings = {}
@@ -300,7 +305,7 @@ def compress(tmp_dir, output_dir, zipfile, directory):
  zipfile = os.path.join(abs_output_dir, zipfile)
  if os.path.exists(zipfile):
    os.remove(zipfile)
  logging.info('Compressing index into %s', zipfile)
  logging.info('Compressing index into %s, tmpDir %s', zipfile, tmp_dir)
  olddir = os.getcwd()
  os.chdir(tmp_dir)
  subprocess.check_call('zip -r %s %s' % (zipfile, directory), shell=True)
@@ -318,9 +323,9 @@ def parse_config():
                      help='The directory containing elasticsearch releases')
  parser.add_argument('--output-dir', '-o', default='src/test/resources/org/elasticsearch/bwcompat',
                      help='The directory to write the zipped index into')
  parser.add_argument('--tcp-port', default=9300, type=int,
  parser.add_argument('--tcp-port', default=DEFAULT_TRANSPORT_TCP_PORT, type=int,
                      help='The port to use as the minimum port for TCP communication')
  parser.add_argument('--http-port', default=9200, type=int,
  parser.add_argument('--http-port', default=DEFAULT_HTTP_TCP_PORT, type=int,
                      help='The port to use as the minimum port for HTTP communication')
  cfg = parser.parse_args()
@@ -339,14 +344,17 @@ def create_bwc_index(cfg, version):
  logging.info('--> Creating bwc index for %s' % version)
  release_dir = os.path.join(cfg.releases_dir, 'elasticsearch-%s' % version)
  if not os.path.exists(release_dir):
    parser.error('ES version %s does not exist in %s' % (version, cfg.releases_dir))
    raise RuntimeError('ES version %s does not exist in %s' % (version, cfg.releases_dir))
  snapshot_supported = not (version.startswith('0.') or version == '1.0.0.Beta1')
  tmp_dir = tempfile.mkdtemp()

  data_dir = os.path.join(tmp_dir, 'data')
  repo_dir = os.path.join(tmp_dir, 'repo')
  logging.info('Temp data dir: %s' % data_dir)
  logging.info('Temp repo dir: %s' % repo_dir)

  node = None

  try:
    node = start_node(version, release_dir, data_dir, cfg.tcp_port, cfg.http_port)
    client = create_client(cfg.http_port)
@@ -359,16 +367,26 @@ def create_bwc_index(cfg, version):
    # this after the snapshot, because it calls flush. Otherwise the index
    # will already have the deletions applied on upgrade.
    delete_by_query(client, version, index_name, 'doc')

    shutdown_node(node)
    node = None

    compress_index(version, tmp_dir, cfg.output_dir)
    if snapshot_supported:
      compress_repo(version, tmp_dir, cfg.output_dir)
  finally:
    if 'node' in vars():
      logging.info('Shutting down node with pid %d', node.pid)
      node.terminate()
      time.sleep(1) # some nodes take time to terminate
  compress_index(version, tmp_dir, cfg.output_dir)
  if snapshot_supported:
    compress_repo(version, tmp_dir, cfg.output_dir)

    if node is not None:
      # This only happens if we've hit an exception:
      shutdown_node(node)

    shutil.rmtree(tmp_dir)

def shutdown_node(node):
  logging.info('Shutting down node with pid %d', node.pid)
  node.terminate()
  node.wait()

def main():
  logging.basicConfig(format='[%(levelname)s] [%(asctime)s] %(message)s', level=logging.INFO,
                      datefmt='%Y-%m-%d %I:%M:%S %p')
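The restructuring of create_bwc_index.py above centers on one cleanup pattern: clear `node` after a normal shutdown so the `finally` block only shuts the node down on the exception path, and always remove the temp dir. A distilled sketch of that pattern (not code from the commit):

import shutil
import tempfile

def run_with_node(start, shutdown, work):
  tmp_dir = tempfile.mkdtemp()
  node = None
  try:
    node = start()
    work(node)
    shutdown(node)
    node = None  # a normal shutdown happened; nothing left for finally to do
  finally:
    if node is not None:
      # Only reached when an exception fired before the normal shutdown:
      shutdown(node)
    shutil.rmtree(tmp_dir)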
113 changes: 113 additions & 0 deletions dev-tools/create_bwc_index_with_some_ancient_segments.py
@@ -0,0 +1,113 @@
import create_bwc_index
import logging
import os
import random
import shutil
import subprocess
import sys
import tempfile

def fetch_version(version):
  logging.info('fetching ES version %s' % version)
  if subprocess.call([sys.executable, os.path.join(os.path.split(sys.argv[0])[0], 'get-bwc-version.py'), version]) != 0:
    raise RuntimeError('failed to download ES version %s' % version)

def main():
  '''
  Creates a static back compat index (.zip) with mixed 0.20 (Lucene 3.x) and 0.90 (Lucene 4.x) segments.
  '''

  logging.basicConfig(format='[%(levelname)s] [%(asctime)s] %(message)s', level=logging.INFO,
                      datefmt='%Y-%m-%d %I:%M:%S %p')
  logging.getLogger('elasticsearch').setLevel(logging.ERROR)
  logging.getLogger('urllib3').setLevel(logging.WARN)

  tmp_dir = tempfile.mkdtemp()
  node = None  # so the finally block can test it even if startup fails
  try:
    data_dir = os.path.join(tmp_dir, 'data')
    logging.info('Temp data dir: %s' % data_dir)

    first_version = '0.20.6'
    second_version = '0.90.6'
    index_name = 'index-%s-and-%s' % (first_version, second_version)

    # Download old ES releases if necessary:
    release_dir = os.path.join('backwards', 'elasticsearch-%s' % first_version)
    if not os.path.exists(release_dir):
      fetch_version(first_version)

    node = create_bwc_index.start_node(first_version, release_dir, data_dir, cluster_name=index_name)
    client = create_bwc_index.create_client()

    # Creates the index & indexes docs w/ first_version:
    create_bwc_index.generate_index(client, first_version, index_name)

    # Make sure we write segments:
    flush_result = client.indices.flush(index=index_name)
    if not flush_result['ok']:
      raise RuntimeError('flush failed: %s' % str(flush_result))

    segs = client.indices.segments(index=index_name)
    shards = segs['indices'][index_name]['shards']
    if len(shards) != 1:
      raise RuntimeError('index should have 1 shard but got %s' % len(shards))

    first_version_segs = shards['0'][0]['segments'].keys()

    create_bwc_index.shutdown_node(node)
    print('%s server output:\n%s' % (first_version, node.stdout.read().decode('utf-8')))
    node = None

    release_dir = os.path.join('backwards', 'elasticsearch-%s' % second_version)
    if not os.path.exists(release_dir):
      fetch_version(second_version)

    # Now also index docs with second_version:
    node = create_bwc_index.start_node(second_version, release_dir, data_dir, cluster_name=index_name)
    client = create_bwc_index.create_client()

    # If we index too many docs, the random refresh/flush causes the ancient segments to be merged away:
    num_docs = 10
    create_bwc_index.index_documents(client, index_name, 'doc', num_docs)

    # Make sure we get a segment:
    flush_result = client.indices.flush(index=index_name)
    if not flush_result['ok']:
      raise RuntimeError('flush failed: %s' % str(flush_result))

    # Make sure we see mixed segments (it's possible Lucene could have "accidentally" merged away the first_version segments):
    segs = client.indices.segments(index=index_name)
    shards = segs['indices'][index_name]['shards']
    if len(shards) != 1:
      raise RuntimeError('index should have 1 shard but got %s' % len(shards))

    second_version_segs = shards['0'][0]['segments'].keys()
    #print("first: %s" % first_version_segs)
    #print("second: %s" % second_version_segs)

    for segment_name in first_version_segs:
      if segment_name in second_version_segs:
        # Good: an ancient version seg "survived":
        break
    else:
      raise RuntimeError('index has no first_version segs left')

    for segment_name in second_version_segs:
      if segment_name not in first_version_segs:
        # Good: a second_version segment was written
        break
    else:
      raise RuntimeError('index has no second_version segs left')

    create_bwc_index.shutdown_node(node)
    print('%s server output:\n%s' % (second_version, node.stdout.read().decode('utf-8')))
    node = None
    create_bwc_index.compress_index('%s-and-%s' % (first_version, second_version), tmp_dir, 'src/test/resources/org/elasticsearch/rest/action/admin/indices/upgrade')
  finally:
    if node is not None:
      create_bwc_index.shutdown_node(node)
    shutil.rmtree(tmp_dir)

if __name__ == '__main__':
  main()
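
The two segment checks above use Python's `for`/`else`: the `else` branch runs only when the loop finishes without hitting `break`. A self-contained sketch with hypothetical segment names:

old_segs = {'_0', '_1'}  # segments written by the first node (hypothetical)
new_segs = {'_1', '_2'}  # segments present after the second node (hypothetical)

for name in old_segs:
  if name in new_segs:
    break  # good: an old segment survived
else:
  # Runs only if no old segment was found:
  raise RuntimeError('no old segments survived')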

24 changes: 22 additions & 2 deletions docs/reference/indices/upgrade.asciidoc
@@ -21,12 +21,30 @@ This call will block until the upgrade is complete. If the http connection
is lost, the request will continue in the background, and
any new requests will block until the previous upgrade is complete.

[float]
[[upgrade-parameters]]
==== Request Parameters

The `upgrade` API accepts the following request parameters:

[horizontal]
`only_ancient_segments`:: If true, only very old segments (from a
previous Lucene major release) will be upgraded. While this will do
the minimal work to ensure the next major release of Elasticsearch can
read the segments, it's dangerous because it can leave other very old
segments in sub-optimal formats. Defaults to `false`.

[float]
=== Check upgrade status

Use a `GET` request to monitor how much of an index is upgraded. This
can also be used prior to starting an upgrade to identify which
indices you want to upgrade at the same time.

The `ancient` byte values that are returned indicate total bytes of
segments whose version is extremely old (Lucene major version is
different from the current version), showing how much upgrading is
necessary when you run with `only_ancient_segments=true`.

[source,sh]
--------------------------------------------------
@@ -41,6 +59,8 @@ curl 'http://localhost:9200/twitter/_upgrade?pretty&human'
      "size_in_bytes": "21000000000",
      "size_to_upgrade": "10gb",
      "size_to_upgrade_in_bytes": "10000000000",
      "size_to_upgrade_ancient": "1gb",
      "size_to_upgrade_ancient_in_bytes": "1000000000"
    }
  }
--------------------------------------------------
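
A sketch (not from the commit) of using the new counter to decide whether an ancient-only pass is worth running; the field name comes from the sample response above, and the per-index `status` dict is an assumption about where it lives in the full response:

def needs_ancient_upgrade(status):
  # 'status' is the per-index object shown above:
  return int(status['size_to_upgrade_ancient_in_bytes']) > 0

print(needs_ancient_upgrade({'size_to_upgrade_ancient_in_bytes': '1000000000'}))  # True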
6 changes: 5 additions & 1 deletion rest-api-spec/api/indices.upgrade.json
@@ -27,8 +27,12 @@
      "description" : "Whether specified concrete indices should be ignored when unavailable (missing or closed)"
    },
    "wait_for_completion": {
      "type" : "boolean",
      "description" : "Specify whether the request should block until all segments are upgraded (default: false)"
    },
    "only_ancient_segments": {
      "type" : "boolean",
      "description" : "If true, only ancient (an older Lucene major release) segments will be upgraded"
    }
  }
},
src/main/java/org/elasticsearch/action/admin/indices/optimize/OptimizeRequest.java
@@ -44,12 +44,14 @@ public static final class Defaults {
        public static final boolean ONLY_EXPUNGE_DELETES = false;
        public static final boolean FLUSH = true;
        public static final boolean UPGRADE = false;
        public static final boolean UPGRADE_ONLY_ANCIENT_SEGMENTS = false;
    }

    private int maxNumSegments = Defaults.MAX_NUM_SEGMENTS;
    private boolean onlyExpungeDeletes = Defaults.ONLY_EXPUNGE_DELETES;
    private boolean flush = Defaults.FLUSH;
    private boolean upgrade = Defaults.UPGRADE;
    private boolean upgradeOnlyAncientSegments = Defaults.UPGRADE_ONLY_ANCIENT_SEGMENTS;

    /**
     * Constructs an optimization request over one or more indices.
@@ -136,6 +138,7 @@ public void readFrom(StreamInput in) throws IOException {
        onlyExpungeDeletes = in.readBoolean();
        flush = in.readBoolean();
        upgrade = in.readBoolean();
        upgradeOnlyAncientSegments = in.readBoolean();
    }

    @Override
@@ -145,6 +148,23 @@ public void writeTo(StreamOutput out) throws IOException {
        out.writeBoolean(onlyExpungeDeletes);
        out.writeBoolean(flush);
        out.writeBoolean(upgrade);
        out.writeBoolean(upgradeOnlyAncientSegments);
    }

    /**
     * Should the merge upgrade only the ancient (older major version of Lucene) segments?
     * Defaults to <tt>false</tt>.
     */
    public boolean upgradeOnlyAncientSegments() {
        return upgradeOnlyAncientSegments;
    }

    /**
     * See {@link #upgradeOnlyAncientSegments()}
     */
    public OptimizeRequest upgradeOnlyAncientSegments(boolean upgradeOnlyAncientSegments) {
        this.upgradeOnlyAncientSegments = upgradeOnlyAncientSegments;
        return this;
    }

    @Override
@@ -154,6 +174,7 @@ public String toString() {
                ", onlyExpungeDeletes=" + onlyExpungeDeletes +
                ", flush=" + flush +
                ", upgrade=" + upgrade +
                ", upgradeOnlyAncientSegments=" + upgradeOnlyAncientSegments +
                '}';
    }
}
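
The new boolean is appended at the same position in both readFrom and writeTo: that stream-format symmetry is what keeps the request wire-compatible between nodes. A loose Python analogy of the invariant (not the actual Elasticsearch wire format):

import struct

FORMAT = '>i????'  # maxNumSegments plus four boolean flags, in one fixed order

def write_request(max_segments, only_expunge_deletes, flush, upgrade, only_ancient):
  return struct.pack(FORMAT, max_segments, only_expunge_deletes, flush, upgrade, only_ancient)

def read_request(buf):
  # Fields must be unpacked in exactly the order they were packed:
  return struct.unpack(FORMAT, buf)

assert read_request(write_request(1, False, True, True, True)) == (1, False, True, True, True)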
4 changes: 2 additions & 2 deletions src/main/java/org/elasticsearch/index/engine/Engine.java
@@ -430,13 +430,13 @@ public final boolean refreshNeeded() {
     * Optimizes to 1 segment
     */
    public void forceMerge(boolean flush) {
        forceMerge(flush, 1, false, false);
        forceMerge(flush, 1, false, false, false);
    }

    /**
     * Triggers a forced merge on this engine
     */
    public abstract void forceMerge(boolean flush, int maxNumSegments, boolean onlyExpungeDeletes, boolean upgrade) throws EngineException;
    public abstract void forceMerge(boolean flush, int maxNumSegments, boolean onlyExpungeDeletes, boolean upgrade, boolean upgradeOnlyAncientSegments) throws EngineException;

    /**
     * Snapshots the index and returns a handle to it. Will always try and "commit" the
Diffs for the remaining 9 changed files are not shown.
