Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce index store plugins #32375

Merged
merged 8 commits into from
Jul 26, 2018
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,26 @@

package org.elasticsearch.plugin.store.smb;

import org.elasticsearch.index.IndexModule;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.store.IndexStore;
import org.elasticsearch.index.store.smbmmapfs.SmbMmapFsIndexStore;
import org.elasticsearch.index.store.smbsimplefs.SmbSimpleFsIndexStore;
import org.elasticsearch.plugins.IndexStorePlugin;
import org.elasticsearch.plugins.Plugin;

public class SMBStorePlugin extends Plugin {
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class SMBStorePlugin extends Plugin implements IndexStorePlugin {

@Override
public void onIndexModule(IndexModule indexModule) {
indexModule.addIndexStore("smb_mmap_fs", SmbMmapFsIndexStore::new);
indexModule.addIndexStore("smb_simple_fs", SmbSimpleFsIndexStore::new);
public Map<String, Function<IndexSettings, IndexStore>> getIndexStoreFactories() {
final Map<String, Function<IndexSettings, IndexStore>> indexStoreFactories = new HashMap<>(2);
indexStoreFactories.put("smb_mmap_fs", SmbMmapFsIndexStore::new);
indexStoreFactories.put("smb_simple_fs", SmbSimpleFsIndexStore::new);
return Collections.unmodifiableMap(indexStoreFactories);
}

}
40 changes: 14 additions & 26 deletions server/src/main/java/org/elasticsearch/index/IndexModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
import org.elasticsearch.indices.mapper.MapperRegistry;
import org.elasticsearch.plugins.IndexStorePlugin;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.threadpool.ThreadPool;

Expand All @@ -74,7 +75,7 @@
* {@link #addSimilarity(String, TriFunction)} while existing Providers can be referenced through Settings under the
* {@link IndexModule#SIMILARITY_SETTINGS_PREFIX} prefix along with the "type" value. For example, to reference the
* {@link BM25Similarity}, the configuration {@code "index.similarity.my_similarity.type : "BM25"} can be used.</li>
* <li>{@link IndexStore} - Custom {@link IndexStore} instances can be registered via {@link #addIndexStore(String, Function)}</li>
* <li>{@link IndexStore} - Custom {@link IndexStore} instances can be registered via {@link IndexStorePlugin}</li>
* <li>{@link IndexEventListener} - Custom {@link IndexEventListener} instances can be registered via
* {@link #addIndexEventListener(IndexEventListener)}</li>
* <li>Settings update listener - Custom settings update listener can be registered via
Expand Down Expand Up @@ -109,7 +110,7 @@ public final class IndexModule {
private SetOnce<IndexSearcherWrapperFactory> indexSearcherWrapper = new SetOnce<>();
private final Set<IndexEventListener> indexEventListeners = new HashSet<>();
private final Map<String, TriFunction<Settings, Version, ScriptService, Similarity>> similarities = new HashMap<>();
private final Map<String, Function<IndexSettings, IndexStore>> storeTypes = new HashMap<>();
private final Map<String, Function<IndexSettings, IndexStore>> indexStoreFactories;
private final SetOnce<BiFunction<IndexSettings, IndicesQueryCache, QueryCache>> forceQueryCacheProvider = new SetOnce<>();
private final List<SearchOperationListener> searchOperationListeners = new ArrayList<>();
private final List<IndexingOperationListener> indexOperationListeners = new ArrayList<>();
Expand All @@ -119,16 +120,22 @@ public final class IndexModule {
* Construct the index module for the index with the specified index settings. The index module contains extension points for plugins
* via {@link org.elasticsearch.plugins.PluginsService#onIndexModule(IndexModule)}.
*
* @param indexSettings the index settings
* @param analysisRegistry the analysis registry
* @param engineFactory the engine factory
* @param indexSettings the index settings
* @param analysisRegistry the analysis registry
* @param engineFactory the engine factory
* @param indexStoreFactories the available store types
*/
public IndexModule(final IndexSettings indexSettings, final AnalysisRegistry analysisRegistry, final EngineFactory engineFactory) {
public IndexModule(
final IndexSettings indexSettings,
final AnalysisRegistry analysisRegistry,
final EngineFactory engineFactory,
final Map<String, Function<IndexSettings, IndexStore>> indexStoreFactories) {
this.indexSettings = indexSettings;
this.analysisRegistry = analysisRegistry;
this.engineFactory = Objects.requireNonNull(engineFactory);
this.searchOperationListeners.add(new SearchSlowLog(indexSettings));
this.indexOperationListeners.add(new IndexingSlowLog(indexSettings));
this.indexStoreFactories = Collections.unmodifiableMap(indexStoreFactories);
}

/**
Expand Down Expand Up @@ -245,25 +252,6 @@ public void addIndexOperationListener(IndexingOperationListener listener) {
this.indexOperationListeners.add(listener);
}

/**
* Adds an {@link IndexStore} type to this index module. Typically stores are registered with a reference to
* it's constructor:
* <pre>
* indexModule.addIndexStore("my_store_type", MyStore::new);
* </pre>
*
* @param type the type to register
* @param provider the instance provider / factory method
*/
public void addIndexStore(String type, Function<IndexSettings, IndexStore> provider) {
ensureNotFrozen();
if (storeTypes.containsKey(type)) {
throw new IllegalArgumentException("key [" + type +"] already registered");
}
storeTypes.put(type, provider);
}


/**
* Registers the given {@link Similarity} with the given name.
* The function takes as parameters:<ul>
Expand Down Expand Up @@ -360,7 +348,7 @@ public IndexService newIndexService(
if (Strings.isEmpty(storeType) || isBuiltinType(storeType)) {
store = new IndexStore(indexSettings);
} else {
Function<IndexSettings, IndexStore> factory = storeTypes.get(storeType);
Function<IndexSettings, IndexStore> factory = indexStoreFactories.get(storeType);
if (factory == null) {
throw new IllegalArgumentException("Unknown store type [" + storeType + "]");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@
import org.elasticsearch.index.shard.IndexingOperationListener;
import org.elasticsearch.index.shard.IndexingStats;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.IndexStore;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.indices.cluster.IndicesClusterStateService;
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
Expand Down Expand Up @@ -181,6 +182,7 @@ public class IndicesService extends AbstractLifecycleComponent
private final IndicesQueryCache indicesQueryCache;
private final MetaStateService metaStateService;
private final Collection<Function<IndexSettings, Optional<EngineFactory>>> engineFactoryProviders;
private final Map<String, Function<IndexSettings, IndexStore>> indexStoreFactories;

@Override
protected void doStart() {
Expand All @@ -193,7 +195,8 @@ public IndicesService(Settings settings, PluginsService pluginsService, NodeEnvi
MapperRegistry mapperRegistry, NamedWriteableRegistry namedWriteableRegistry, ThreadPool threadPool,
IndexScopedSettings indexScopedSettings, CircuitBreakerService circuitBreakerService, BigArrays bigArrays,
ScriptService scriptService, Client client, MetaStateService metaStateService,
Collection<Function<IndexSettings, Optional<EngineFactory>>> engineFactoryProviders) {
Collection<Function<IndexSettings, Optional<EngineFactory>>> engineFactoryProviders,
Map<String, Function<IndexSettings, IndexStore>> indexStoreFactories) {
super(settings);
this.threadPool = threadPool;
this.pluginsService = pluginsService;
Expand Down Expand Up @@ -225,6 +228,7 @@ public void onRemoval(ShardId shardId, String fieldName, boolean wasEvicted, lon
this.cacheCleaner = new CacheCleaner(indicesFieldDataCache, indicesRequestCache, logger, threadPool, this.cleanInterval);
this.metaStateService = metaStateService;
this.engineFactoryProviders = engineFactoryProviders;
this.indexStoreFactories = indexStoreFactories;
}

@Override
Expand Down Expand Up @@ -464,7 +468,7 @@ private synchronized IndexService createIndexService(final String reason,
idxSettings.getNumberOfReplicas(),
reason);

final IndexModule indexModule = new IndexModule(idxSettings, analysisRegistry, getEngineFactory(idxSettings));
final IndexModule indexModule = new IndexModule(idxSettings, analysisRegistry, getEngineFactory(idxSettings), indexStoreFactories);
for (IndexingOperationListener operationListener : indexingOperationListeners) {
indexModule.addIndexOperationListener(operationListener);
}
Expand Down Expand Up @@ -524,7 +528,7 @@ private EngineFactory getEngineFactory(final IndexSettings idxSettings) {
*/
public synchronized MapperService createIndexMapperService(IndexMetaData indexMetaData) throws IOException {
final IndexSettings idxSettings = new IndexSettings(indexMetaData, this.settings, indexScopedSettings);
final IndexModule indexModule = new IndexModule(idxSettings, analysisRegistry, getEngineFactory(idxSettings));
final IndexModule indexModule = new IndexModule(idxSettings, analysisRegistry, getEngineFactory(idxSettings), indexStoreFactories);
pluginsService.onIndexModule(indexModule);
return indexModule.newIndexMapperService(xContentRegistry, mapperRegistry, scriptService);
}
Expand Down
14 changes: 12 additions & 2 deletions server/src/main/java/org/elasticsearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionModule;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionModule;
import org.elasticsearch.action.search.SearchExecutionStatsCollector;
import org.elasticsearch.action.search.SearchPhaseController;
import org.elasticsearch.action.search.SearchTransportService;
Expand Down Expand Up @@ -94,6 +94,7 @@
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.analysis.AnalysisRegistry;
import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.index.store.IndexStore;
import org.elasticsearch.indices.IndicesModule;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.analysis.AnalysisModule;
Expand All @@ -117,6 +118,7 @@
import org.elasticsearch.plugins.ClusterPlugin;
import org.elasticsearch.plugins.DiscoveryPlugin;
import org.elasticsearch.plugins.EnginePlugin;
import org.elasticsearch.plugins.IndexStorePlugin;
import org.elasticsearch.plugins.IngestPlugin;
import org.elasticsearch.plugins.MapperPlugin;
import org.elasticsearch.plugins.MetaDataUpgrader;
Expand Down Expand Up @@ -407,11 +409,19 @@ protected Node(final Environment environment, Collection<Class<? extends Plugin>
enginePlugins.stream().map(plugin -> plugin::getEngineFactory))
.collect(Collectors.toList());


final Map<String, Function<IndexSettings, IndexStore>> indexStoreFactories =
pluginsService.filterPlugins(IndexStorePlugin.class)
.stream()
.map(IndexStorePlugin::getIndexStoreFactories)
.flatMap(m -> m.entrySet().stream())
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

final IndicesService indicesService =
new IndicesService(settings, pluginsService, nodeEnvironment, xContentRegistry, analysisModule.getAnalysisRegistry(),
clusterModule.getIndexNameExpressionResolver(), indicesModule.getMapperRegistry(), namedWriteableRegistry,
threadPool, settingsModule.getIndexScopedSettings(), circuitBreakerService, bigArrays,
scriptModule.getScriptService(), client, metaStateService, engineFactoryProviders);
scriptModule.getScriptService(), client, metaStateService, engineFactoryProviders, indexStoreFactories);


Collection<Object> pluginComponents = pluginsService.filterPlugins(Plugin.class).stream()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.plugins;

import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.store.IndexStore;

import java.util.Map;
import java.util.function.Function;

/**
* A plugin that provides alternative index store implementations.
*/
public interface IndexStorePlugin {

/**
* The index store factories for this plugin. When an index is created the store type setting
* {@link org.elasticsearch.index.IndexModule#INDEX_STORE_TYPE_SETTING} on the index will be examined and either use the default or a
* built-in type, or looked up among all the index store factories from {@link IndexStore} plugins.
*
* @return a map from store type to an index store factory
*/
Map<String, Function<IndexSettings, IndexStore>> getIndexStoreFactories();

}
Loading