Skip to content

Commit

Permalink
Core: Don't allow indices containing too-old segments to be opened
Browse files Browse the repository at this point in the history
When index is introduced into the cluster via cluster upgrade, restore or as a dangled index the MetaDataIndexUpgradeService checks if this index can be upgraded to the current version. If upgrade is not possible, the newly upgraded cluster startup and restore process are aborted, the dangled index is imported as a closed index that cannot be open.

Closes elastic#10215
  • Loading branch information
imotov committed May 8, 2015
1 parent 573caca commit 823aaf4
Show file tree
Hide file tree
Showing 13 changed files with 425 additions and 73 deletions.
6 changes: 3 additions & 3 deletions dev-tools/create_bwc_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ def generate_index(client, version, index_name):
}
}
# completion type was added in 0.90.3
if version not in ['0.90.0.Beta1', '0.90.0.RC1', '0.90.0.RC2', '0.90.0', '0.90.1', '0.90.2']:
if not version.startswith('0.20') and version not in ['0.90.0.Beta1', '0.90.0.RC1', '0.90.0.RC2', '0.90.0', '0.90.1', '0.90.2']:
mappings['analyzer_type1']['properties']['completion_with_index_analyzer'] = {
'type': 'completion',
'index_analyzer': 'standard'
Expand Down Expand Up @@ -257,7 +257,7 @@ def generate_index(client, version, index_name):
logging.info('Running basic asserts on the data added')
run_basic_asserts(client, index_name, 'doc', num_docs)

def snapshot_index(client, cfg, version, repo_dir):
def snapshot_index(client, version, repo_dir):
# Add bogus persistent settings to make sure they can be restored
client.cluster.put_settings(body={
'persistent': {
Expand Down Expand Up @@ -361,7 +361,7 @@ def create_bwc_index(cfg, version):
index_name = 'index-%s' % version.lower()
generate_index(client, version, index_name)
if snapshot_supported:
snapshot_index(client, cfg, version, repo_dir)
snapshot_index(client, version, repo_dir)

# 10067: get a delete-by-query into the translog on upgrade. We must do
# this after the snapshot, because it calls flush. Otherwise the index
Expand Down
76 changes: 76 additions & 0 deletions dev-tools/create_bwc_repo_with_ancient_indices.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
import create_bwc_index
import logging
import os
import shutil
import subprocess
import sys
import tempfile

def fetch_version(version):
logging.info('fetching ES version %s' % version)
if subprocess.call([sys.executable, os.path.join(os.path.split(sys.argv[0])[0], 'get-bwc-version.py'), version]) != 0:
raise RuntimeError('failed to download ES version %s' % version)

def main():
'''
Creates a back compat index (.zip) using v0.20 and then creates a snapshot of it using v1.1
'''

logging.basicConfig(format='[%(levelname)s] [%(asctime)s] %(message)s', level=logging.INFO,
datefmt='%Y-%m-%d %I:%M:%S %p')
logging.getLogger('elasticsearch').setLevel(logging.ERROR)
logging.getLogger('urllib3').setLevel(logging.WARN)

tmp_dir = tempfile.mkdtemp()
try:
data_dir = os.path.join(tmp_dir, 'data')
logging.info('Temp data dir: %s' % data_dir)

first_version = '0.20.6'
second_version = '1.1.2'
index_name = 'index-%s-and-%s' % (first_version, second_version)

# Download old ES releases if necessary:
release_dir = os.path.join('backwards', 'elasticsearch-%s' % first_version)
if not os.path.exists(release_dir):
fetch_version(first_version)

node = create_bwc_index.start_node(first_version, release_dir, data_dir, cluster_name=index_name)
client = create_bwc_index.create_client()

# Creates the index & indexes docs w/ first_version:
create_bwc_index.generate_index(client, first_version, index_name)

# Make sure we write segments:
flush_result = client.indices.flush(index=index_name)
if not flush_result['ok']:
raise RuntimeError('flush failed: %s' % str(flush_result))

create_bwc_index.shutdown_node(node)
print('%s server output:\n%s' % (first_version, node.stdout.read().decode('utf-8')))
node = None

release_dir = os.path.join('backwards', 'elasticsearch-%s' % second_version)
if not os.path.exists(release_dir):
fetch_version(second_version)

# Now use second_version to snapshot the index:
node = create_bwc_index.start_node(second_version, release_dir, data_dir, cluster_name=index_name)
client = create_bwc_index.create_client()

repo_dir = os.path.join(tmp_dir, 'repo')
create_bwc_index.snapshot_index(client, second_version, repo_dir)
create_bwc_index.shutdown_node(node)
print('%s server output:\n%s' % (second_version, node.stdout.read().decode('utf-8')))

create_bwc_index.compress(tmp_dir, "src/test/resources/org/elasticsearch/bwcompat", 'unsupportedrepo-%s.zip' % first_version, 'repo')

node = None
finally:
if node is not None:
create_bwc_index.shutdown_node(node)
shutil.rmtree(tmp_dir)

if __name__ == '__main__':
main()

Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexException;
import org.elasticsearch.indices.IndexMissingException;
import org.elasticsearch.indices.IndexPrimaryShardNotAllocatedException;
import org.elasticsearch.rest.RestStatus;
Expand All @@ -58,11 +59,14 @@ public class MetaDataIndexStateService extends AbstractComponent {

private final AllocationService allocationService;

private final MetaDataIndexUpgradeService metaDataIndexUpgradeService;

@Inject
public MetaDataIndexStateService(Settings settings, ClusterService clusterService, AllocationService allocationService) {
public MetaDataIndexStateService(Settings settings, ClusterService clusterService, AllocationService allocationService, MetaDataIndexUpgradeService metaDataIndexUpgradeService) {
super(settings);
this.clusterService = clusterService;
this.allocationService = allocationService;
this.metaDataIndexUpgradeService = metaDataIndexUpgradeService;
}

public void closeIndex(final CloseIndexClusterStateUpdateRequest request, final ActionListener<ClusterStateUpdateResponse> listener) {
Expand Down Expand Up @@ -160,7 +164,15 @@ public ClusterState execute(ClusterState currentState) {
ClusterBlocks.Builder blocksBuilder = ClusterBlocks.builder()
.blocks(currentState.blocks());
for (String index : indicesToOpen) {
mdBuilder.put(IndexMetaData.builder(currentState.metaData().index(index)).state(IndexMetaData.State.OPEN));
IndexMetaData indexMetaData = IndexMetaData.builder(currentState.metaData().index(index)).state(IndexMetaData.State.OPEN).build();
// The index might be closed because we couldn't import it due to old incompatible version
// We need to check that this index can be upgraded to the current version
try {
indexMetaData = metaDataIndexUpgradeService.upgradeIndexMetaData(indexMetaData);
} catch (Exception ex) {
throw new IndexException(new Index(index), "unpgradable index", ex);
}
mdBuilder.put(indexMetaData, true);
blocksBuilder.removeIndexBlock(index, INDEX_CLOSED_BLOCK);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.metadata;

import org.elasticsearch.Version;
import org.elasticsearch.cluster.routing.DjbHashFunction;
import org.elasticsearch.cluster.routing.HashFunction;
import org.elasticsearch.cluster.routing.SimpleHashFunction;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;

/**
* This service is responsible for upgrading legacy index metadata to the current version
*
* Every time an existing index is introduced into cluster this service should be used
* to upgrade the existing index metadata to the latest version of the cluster. It typically
* occurs during cluster upgrade, when dangling indices are imported into the cluster or indices
* are restored from a repository.
*/
public class MetaDataIndexUpgradeService extends AbstractComponent {

private static final String DEPRECATED_SETTING_ROUTING_HASH_FUNCTION = "cluster.routing.operation.hash.type";
private static final String DEPRECATED_SETTING_ROUTING_USE_TYPE = "cluster.routing.operation.use_type";

private final Class<? extends HashFunction> pre20HashFunction;
private final Boolean pre20UseType;

@Inject
public MetaDataIndexUpgradeService(Settings settings) {
super(settings);

final String pre20HashFunctionName = settings.get(DEPRECATED_SETTING_ROUTING_HASH_FUNCTION, null);
final boolean hasCustomPre20HashFunction = pre20HashFunctionName != null;
// the hash function package has changed we replace the two hash functions if their fully qualified name is used.
if (hasCustomPre20HashFunction) {
switch (pre20HashFunctionName) {
case "org.elasticsearch.cluster.routing.operation.hash.simple.SimpleHashFunction":
pre20HashFunction = SimpleHashFunction.class;
break;
case "org.elasticsearch.cluster.routing.operation.hash.djb.DjbHashFunction":
pre20HashFunction = DjbHashFunction.class;
break;
default:
pre20HashFunction = settings.getAsClass(DEPRECATED_SETTING_ROUTING_HASH_FUNCTION, DjbHashFunction.class, "org.elasticsearch.cluster.routing.", "HashFunction");
}
} else {
pre20HashFunction = DjbHashFunction.class;
}
pre20UseType = settings.getAsBoolean(DEPRECATED_SETTING_ROUTING_USE_TYPE, null);
if (hasCustomPre20HashFunction|| pre20UseType != null) {
logger.warn("Settings [{}] and [{}] are deprecated. Index settings from your old indices have been updated to record the fact that they "
+ "used some custom routing logic, you can now remove these settings from your `elasticsearch.yml` file", DEPRECATED_SETTING_ROUTING_HASH_FUNCTION, DEPRECATED_SETTING_ROUTING_USE_TYPE);
}
}

/**
* Checks that the index can be upgraded to the current version of the master node.
*
* If the index does need upgrade it returns the index metadata unchanged, otherwise it returns a modified index metadata. If index cannot be
* updated the method throws an exception.
*/
public IndexMetaData upgradeIndexMetaData(IndexMetaData indexMetaData) throws Exception {
IndexMetaData newMetaData = indexMetaData;
newMetaData = checkSupportedVersion(newMetaData);
newMetaData = upgradeLegacyRoutingSettings(newMetaData);
return newMetaData;
}

/**
* Elasticsearch 2.0 deprecated no longer supports indices with pre Lucene v4.0 segments. All indices
* that were created before Elasticsearch v0.90.0 should be upgraded using upgrade plugin before they can
* be open by this version of elasticsearch.
*/
private IndexMetaData checkSupportedVersion(IndexMetaData indexMetaData) throws Exception {
if (indexMetaData.getState() == IndexMetaData.State.OPEN && isSupportedVersion(indexMetaData) == false) {
throw new IllegalStateException("The index [" + indexMetaData.getIndex() + "] was created before v0.90.0 and wasn't upgraded."
+ " This index should be open using a version before " + Version.CURRENT.minimumCompatibilityVersion()
+ " and upgraded using upgrade API.");
}
return indexMetaData;
}

/*
* Returns true if this index can be supported by the current version of elasticsearch
*/
private static boolean isSupportedVersion(IndexMetaData indexMetaData) {
return indexMetaData.creationVersion() != null && indexMetaData.creationVersion().onOrAfter(Version.V_0_90_0_Beta1);
}

/**
* Elasticsearch 2.0 deprecated custom routing hash functions. So what we do here is that for old indices, we
* move this old and deprecated node setting to an index setting so that we can keep things backward compatible.
*/
private IndexMetaData upgradeLegacyRoutingSettings(IndexMetaData indexMetaData) throws Exception {
if (indexMetaData.settings().get(IndexMetaData.SETTING_LEGACY_ROUTING_HASH_FUNCTION) == null
&& indexMetaData.getCreationVersion().before(Version.V_2_0_0)) {
// these settings need an upgrade
Settings indexSettings = ImmutableSettings.builder().put(indexMetaData.settings())
.put(IndexMetaData.SETTING_LEGACY_ROUTING_HASH_FUNCTION, pre20HashFunction)
.put(IndexMetaData.SETTING_LEGACY_ROUTING_USE_TYPE, pre20UseType == null ? false : pre20UseType)
.build();
return IndexMetaData.builder(indexMetaData)
.version(indexMetaData.version())
.settings(indexSettings)
.build();
} else if (indexMetaData.getCreationVersion().onOrAfter(Version.V_2_0_0)) {
if (indexMetaData.getSettings().get(IndexMetaData.SETTING_LEGACY_ROUTING_HASH_FUNCTION) != null
|| indexMetaData.getSettings().get(IndexMetaData.SETTING_LEGACY_ROUTING_USE_TYPE) != null) {
throw new IllegalStateException("Indices created on or after 2.0 should NOT contain [" + IndexMetaData.SETTING_LEGACY_ROUTING_HASH_FUNCTION
+ "] + or [" + IndexMetaData.SETTING_LEGACY_ROUTING_USE_TYPE + "] in their index settings");
}
}
return indexMetaData;
}

}
Loading

0 comments on commit 823aaf4

Please sign in to comment.