Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Register CcrRepository based on settings update #36086

Merged
merged 26 commits into from
Dec 4, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
c92a1c7
Work RepositoriesService into CCR
Tim-Brooks Nov 26, 2018
a2c7576
WIP
Tim-Brooks Nov 28, 2018
e1b260e
WIP
Tim-Brooks Nov 28, 2018
b637022
Merge remote-tracking branch 'upstream/master' into add_ccr_repo_inte…
Tim-Brooks Nov 28, 2018
e45dfe7
Work on creating actions
Tim-Brooks Nov 28, 2018
68823cc
WIP
Tim-Brooks Nov 29, 2018
719e7e8
WIP
Tim-Brooks Nov 29, 2018
1949578
WIP
Tim-Brooks Nov 29, 2018
0f77fce
WIP
Tim-Brooks Nov 29, 2018
3852087
Merge remote-tracking branch 'upstream/master' into add_ccr_repo_inte…
Tim-Brooks Nov 29, 2018
3dd4a44
WIP
Tim-Brooks Nov 29, 2018
8bf1a1d
WIP
Tim-Brooks Nov 29, 2018
9dbf0c3
comment
Tim-Brooks Nov 29, 2018
9e4736e
Fix checkstyle
Tim-Brooks Nov 29, 2018
8f11a77
Changes from review
Tim-Brooks Nov 30, 2018
4d58c11
Changes from review
Tim-Brooks Nov 30, 2018
a15bffd
Fix licenses
Tim-Brooks Nov 30, 2018
2b79524
Fix license
Tim-Brooks Nov 30, 2018
6cb2750
Changes for review
Tim-Brooks Dec 3, 2018
777ece2
Merge remote-tracking branch 'upstream/master' into add_ccr_repo_inte…
Tim-Brooks Dec 3, 2018
0259d7d
Changes
Tim-Brooks Dec 3, 2018
a7400cc
Add validation
Tim-Brooks Dec 3, 2018
2dd4981
Changes for review
Tim-Brooks Dec 4, 2018
987c5a5
Merge remote-tracking branch 'upstream/master' into add_ccr_repo_inte…
Tim-Brooks Dec 4, 2018
d5b9de8
Merge remote-tracking branch 'upstream/master' into add_ccr_repo_inte…
Tim-Brooks Dec 4, 2018
c73877a
Merge remote-tracking branch 'upstream/master' into add_ccr_repo_inte…
Tim-Brooks Dec 4, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,17 @@ public interface RepositoryPlugin {
default Map<String, Repository.Factory> getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry) {
return Collections.emptyMap();
}

/**
* Returns internal repository types added by this plugin. Internal repositories cannot be registered
* through the external API.
*
* @param env The environment for the local node, which may be used for the local settings and path.repo
*
* The key of the returned {@link Map} is the type name of the repository and
* the value is a factory to construct the {@link Repository} interface.
*/
default Map<String, Repository.Factory> getInternalRepositories(Environment env, NamedXContentRegistry namedXContentRegistry) {
return Collections.emptyMap();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,24 @@ public RepositoriesModule(Environment env, List<RepositoryPlugin> repoPlugins, T
}
}

Map<String, Repository.Factory> internalFactories = new HashMap<>();
for (RepositoryPlugin repoPlugin : repoPlugins) {
Map<String, Repository.Factory> newRepoTypes = repoPlugin.getInternalRepositories(env, namedXContentRegistry);
for (Map.Entry<String, Repository.Factory> entry : newRepoTypes.entrySet()) {
if (internalFactories.put(entry.getKey(), entry.getValue()) != null) {
throw new IllegalArgumentException("Internal repository type [" + entry.getKey() + "] is already registered");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we enforce that these types are distinct to the non-internal ones?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add a test for this?

}
if (factories.put(entry.getKey(), entry.getValue()) != null) {
throw new IllegalArgumentException("Internal repository type [" + entry.getKey() + "] is already registered as a " +
"non-internal repository");
}
}
}

Map<String, Repository.Factory> repositoryTypes = Collections.unmodifiableMap(factories);
repositoriesService = new RepositoriesService(env.settings(), clusterService, transportService, repositoryTypes, threadPool);
Map<String, Repository.Factory> internalRepositoryTypes = Collections.unmodifiableMap(internalFactories);
repositoriesService = new RepositoriesService(env.settings(), clusterService, transportService, repositoryTypes,
internalRepositoryTypes, threadPool);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.snapshots.RestoreService;
import org.elasticsearch.snapshots.SnapshotsService;
import org.elasticsearch.threadpool.ThreadPool;
Expand All @@ -57,19 +58,22 @@ public class RepositoriesService implements ClusterStateApplier {
private static final Logger logger = LogManager.getLogger(RepositoriesService.class);

private final Map<String, Repository.Factory> typesRegistry;
private final Map<String, Repository.Factory> internalTypesRegistry;

private final ClusterService clusterService;

private final ThreadPool threadPool;

private final VerifyNodeRepositoryAction verifyAction;

private final Map<String, Repository> internalRepositories = ConcurrentCollections.newConcurrentMap();
private volatile Map<String, Repository> repositories = Collections.emptyMap();

public RepositoriesService(Settings settings, ClusterService clusterService, TransportService transportService,
Map<String, Repository.Factory> typesRegistry,
Map<String, Repository.Factory> typesRegistry, Map<String, Repository.Factory> internalTypesRegistry,
ThreadPool threadPool) {
this.typesRegistry = typesRegistry;
this.internalTypesRegistry = internalTypesRegistry;
this.clusterService = clusterService;
this.threadPool = threadPool;
// Doesn't make sense to maintain repositories on non-master and non-data nodes
Expand Down Expand Up @@ -101,7 +105,7 @@ public void registerRepository(final RegisterRepositoryRequest request, final Ac

// Trying to create the new repository on master to make sure it works
try {
closeRepository(createRepository(newRepositoryMetaData));
closeRepository(createRepository(newRepositoryMetaData, typesRegistry));
} catch (Exception e) {
registrationListener.onFailure(e);
return;
Expand Down Expand Up @@ -315,7 +319,7 @@ public void applyClusterState(ClusterChangedEvent event) {
closeRepository(repository);
repository = null;
try {
repository = createRepository(repositoryMetaData);
repository = createRepository(repositoryMetaData, typesRegistry);
} catch (RepositoryException ex) {
// TODO: this catch is bogus, it means the old repo is already closed,
// but we have nothing to replace it
Expand All @@ -324,7 +328,7 @@ public void applyClusterState(ClusterChangedEvent event) {
}
} else {
try {
repository = createRepository(repositoryMetaData);
repository = createRepository(repositoryMetaData, typesRegistry);
} catch (RepositoryException ex) {
logger.warn(() -> new ParameterizedMessage("failed to create repository [{}]", repositoryMetaData.name()), ex);
}
Expand Down Expand Up @@ -355,31 +359,59 @@ public Repository repository(String repositoryName) {
if (repository != null) {
return repository;
}
repository = internalRepositories.get(repositoryName);
if (repository != null) {
return repository;
}
throw new RepositoryMissingException(repositoryName);
}

public void registerInternalRepository(String name, String type) {
RepositoryMetaData metaData = new RepositoryMetaData(name, type, Settings.EMPTY);
Repository repository = internalRepositories.computeIfAbsent(name, (n) -> {
logger.debug("put internal repository [{}][{}]", name, type);
return createRepository(metaData, internalTypesRegistry);
});
if (type.equals(repository.getMetadata().type()) == false) {
logger.warn(new ParameterizedMessage("internal repository [{}][{}] already registered. this prevented the registration of " +
"internal repository [{}][{}].", name, repository.getMetadata().type(), name, type));
} else if (repositories.containsKey(name)) {
logger.warn(new ParameterizedMessage("non-internal repository [{}] already registered. this repository will block the " +
"usage of internal repository [{}][{}].", name, metaData.type(), name));
}
}

public void unregisterInternalRepository(String name) {
Repository repository = internalRepositories.remove(name);
if (repository != null) {
RepositoryMetaData metadata = repository.getMetadata();
logger.debug(() -> new ParameterizedMessage("delete internal repository [{}][{}].", metadata.type(), name));
closeRepository(repository);
}
}

/** Closes the given repository. */
private void closeRepository(Repository repository) {
logger.debug("closing repository [{}][{}]", repository.getMetadata().type(), repository.getMetadata().name());
repository.close();
}

/**
* Creates repository holder
* Creates repository holder. This method starts the repository
*/
private Repository createRepository(RepositoryMetaData repositoryMetaData) {
private Repository createRepository(RepositoryMetaData repositoryMetaData, Map<String, Repository.Factory> factories) {
logger.debug("creating repository [{}][{}]", repositoryMetaData.type(), repositoryMetaData.name());
Repository.Factory factory = typesRegistry.get(repositoryMetaData.type());
Repository.Factory factory = factories.get(repositoryMetaData.type());
if (factory == null) {
throw new RepositoryException(repositoryMetaData.name(),
"repository type [" + repositoryMetaData.type() + "] does not exist");
}
try {
Repository repository = factory.create(repositoryMetaData, typesRegistry::get);
Repository repository = factory.create(repositoryMetaData, factories::get);
repository.start();
return repository;
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("failed to create repository [{}][{}]", repositoryMetaData.type(), repositoryMetaData.name()), e);
logger.warn(new ParameterizedMessage("failed to create repository [{}][{}]", repositoryMetaData.type(), repositoryMetaData.name()), e);
throw new RepositoryException(repositoryMetaData.name(), "failed to create repository", e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ public String getKey(final String key) {
REMOTE_CLUSTERS_SEEDS);

protected final Settings settings;
protected final ClusterNameExpressionResolver clusterNameResolver;
private final ClusterNameExpressionResolver clusterNameResolver;

/**
* Creates a new {@link RemoteClusterAware} instance
Expand Down Expand Up @@ -242,14 +242,15 @@ static DiscoveryNode buildSeedNode(String clusterName, String address, boolean p
* indices per cluster are collected as a list in the returned map keyed by the cluster alias. Local indices are grouped under
* {@link #LOCAL_CLUSTER_GROUP_KEY}. The returned map is mutable.
*
* @param remoteClusterNames the remote cluster names
* @param requestIndices the indices in the search request to filter
* @param indexExists a predicate that can test if a certain index or alias exists in the local cluster
*
* @return a map of grouped remote and local indices
*/
public Map<String, List<String>> groupClusterIndices(String[] requestIndices, Predicate<String> indexExists) {
protected Map<String, List<String>> groupClusterIndices(Set<String> remoteClusterNames, String[] requestIndices,
Predicate<String> indexExists) {
Map<String, List<String>> perClusterIndices = new HashMap<>();
Set<String> remoteClusterNames = getRemoteClusterNames();
for (String index : requestIndices) {
int i = index.indexOf(RemoteClusterService.REMOTE_CLUSTER_INDEX_SEPARATOR);
if (i >= 0) {
Expand Down Expand Up @@ -281,9 +282,6 @@ public Map<String, List<String>> groupClusterIndices(String[] requestIndices, Pr
return perClusterIndices;
}

protected abstract Set<String> getRemoteClusterNames();


/**
* Subclasses must implement this to receive information about updated cluster aliases. If the given address list is
* empty the cluster alias is unregistered and should be removed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ boolean isRemoteNodeConnected(final String remoteCluster, final DiscoveryNode no
public Map<String, OriginalIndices> groupIndices(IndicesOptions indicesOptions, String[] indices, Predicate<String> indexExists) {
Map<String, OriginalIndices> originalIndicesMap = new HashMap<>();
if (isCrossClusterSearchEnabled()) {
final Map<String, List<String>> groupedIndices = groupClusterIndices(indices, indexExists);
final Map<String, List<String>> groupedIndices = groupClusterIndices(getRemoteClusterNames(), indices, indexExists);
if (groupedIndices.isEmpty()) {
//search on _all in the local cluster if neither local indices nor remote indices were specified
originalIndicesMap.put(LOCAL_CLUSTER_GROUP_KEY, new OriginalIndices(Strings.EMPTY_ARRAY, indicesOptions));
Expand Down Expand Up @@ -380,8 +380,7 @@ RemoteClusterConnection getRemoteClusterConnection(String cluster) {
return connection;
}

@Override
protected Set<String> getRemoteClusterNames() {
Set<String> getRemoteClusterNames() {
return this.remoteClusters.keySet();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -461,7 +461,7 @@ private IndicesClusterStateService createIndicesClusterStateService(DiscoveryNod
Collections.emptySet());
final ClusterService clusterService = mock(ClusterService.class);
final RepositoriesService repositoriesService = new RepositoriesService(settings, clusterService,
transportService, null, threadPool);
transportService, Collections.emptyMap(), Collections.emptyMap(), threadPool);
final PeerRecoveryTargetService recoveryTargetService = new PeerRecoveryTargetService(threadPool,
transportService, null, clusterService);
final ShardStateAction shardStateAction = mock(ShardStateAction.class);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.repositories;

import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.env.Environment;
import org.elasticsearch.plugins.RepositoryPlugin;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class RepositoriesModuleTests extends ESTestCase {

private Environment environment;
private NamedXContentRegistry contentRegistry;
private List<RepositoryPlugin> repoPlugins = new ArrayList<>();
private RepositoryPlugin plugin1;
private RepositoryPlugin plugin2;
private Repository.Factory factory;

@Override
public void setUp() throws Exception {
super.setUp();
environment = mock(Environment.class);
contentRegistry = mock(NamedXContentRegistry.class);
plugin1 = mock(RepositoryPlugin.class);
plugin2 = mock(RepositoryPlugin.class);
factory = mock(Repository.Factory.class);
repoPlugins.add(plugin1);
repoPlugins.add(plugin2);
when(environment.settings()).thenReturn(Settings.EMPTY);
}

public void testCanRegisterTwoRepositoriesWithDifferentTypes() {
when(plugin1.getRepositories(environment, contentRegistry)).thenReturn(Collections.singletonMap("type1", factory));
when(plugin2.getRepositories(environment, contentRegistry)).thenReturn(Collections.singletonMap("type2", factory));

// Would throw
new RepositoriesModule(environment, repoPlugins, mock(TransportService.class), mock(ClusterService.class),
mock(ThreadPool.class), contentRegistry);
}

public void testCannotRegisterTwoRepositoriesWithSameTypes() {
when(plugin1.getRepositories(environment, contentRegistry)).thenReturn(Collections.singletonMap("type1", factory));
when(plugin2.getRepositories(environment, contentRegistry)).thenReturn(Collections.singletonMap("type1", factory));

IllegalArgumentException ex = expectThrows(IllegalArgumentException.class,
() -> new RepositoriesModule(environment, repoPlugins, mock(TransportService.class), mock(ClusterService.class),
mock(ThreadPool.class), contentRegistry));

assertEquals("Repository type [type1] is already registered", ex.getMessage());
}

public void testCannotRegisterTwoInternalRepositoriesWithSameTypes() {
when(plugin1.getInternalRepositories(environment, contentRegistry)).thenReturn(Collections.singletonMap("type1", factory));
when(plugin2.getInternalRepositories(environment, contentRegistry)).thenReturn(Collections.singletonMap("type1", factory));

IllegalArgumentException ex = expectThrows(IllegalArgumentException.class,
() -> new RepositoriesModule(environment, repoPlugins, mock(TransportService.class), mock(ClusterService.class),
mock(ThreadPool.class), contentRegistry));

assertEquals("Internal repository type [type1] is already registered", ex.getMessage());
}

public void testCannotRegisterNormalAndInternalRepositoriesWithSameTypes() {
when(plugin1.getRepositories(environment, contentRegistry)).thenReturn(Collections.singletonMap("type1", factory));
when(plugin2.getInternalRepositories(environment, contentRegistry)).thenReturn(Collections.singletonMap("type1", factory));

IllegalArgumentException ex = expectThrows(IllegalArgumentException.class,
() -> new RepositoriesModule(environment, repoPlugins, mock(TransportService.class), mock(ClusterService.class),
mock(ThreadPool.class), contentRegistry));

assertEquals("Internal repository type [type1] is already registered as a non-internal repository", ex.getMessage());
}
}
Loading