Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Register CcrRepository based on settings update #35801

Closed
wants to merge 9 commits into from
Closed
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
137 changes: 136 additions & 1 deletion x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,20 @@

package org.elasticsearch.xpack.ccr;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.LocalNodeMasterListener;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.RepositoriesMetaData;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.ParseField;
Expand All @@ -20,6 +30,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsFilter;
import org.elasticsearch.common.settings.SettingsModule;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
Expand All @@ -32,17 +43,21 @@
import org.elasticsearch.plugins.EnginePlugin;
import org.elasticsearch.plugins.PersistentTaskPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.RepositoryPlugin;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestHandler;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ExecutorBuilder;
import org.elasticsearch.threadpool.FixedExecutorBuilder;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.RemoteClusterAware;
import org.elasticsearch.watcher.ResourceWatcherService;
import org.elasticsearch.xpack.ccr.action.AutoFollowCoordinator;
import org.elasticsearch.xpack.ccr.action.TransportGetAutoFollowPatternAction;
import org.elasticsearch.xpack.ccr.action.TransportUnfollowAction;
import org.elasticsearch.xpack.ccr.repository.CcrRepository;
import org.elasticsearch.xpack.ccr.rest.RestGetAutoFollowPatternAction;
import org.elasticsearch.xpack.ccr.action.TransportCcrStatsAction;
import org.elasticsearch.xpack.ccr.rest.RestCcrStatsAction;
Expand Down Expand Up @@ -77,12 +92,16 @@
import org.elasticsearch.xpack.core.ccr.action.PauseFollowAction;
import org.elasticsearch.xpack.core.ccr.action.UnfollowAction;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.Supplier;

import static java.util.Collections.emptyList;
Expand All @@ -92,7 +111,7 @@
/**
* Container class for CCR functionality.
*/
public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, EnginePlugin {
public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, EnginePlugin, RepositoryPlugin {

public static final String CCR_THREAD_POOL_NAME = "ccr";
public static final String CCR_CUSTOM_METADATA_KEY = "ccr";
Expand All @@ -104,6 +123,7 @@ public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, E
private final boolean enabled;
private final Settings settings;
private final CcrLicenseChecker ccrLicenseChecker;
private SetOnce<CCRRepositoryManager> repositoryManager = new SetOnce<>();

/**
* Construct an instance of the CCR container with the specified settings.
Expand Down Expand Up @@ -142,6 +162,8 @@ public Collection<Object> createComponents(
return emptyList();
}

repositoryManager.set(new CCRRepositoryManager(settings, clusterService));

return Arrays.asList(
ccrLicenseChecker,
new AutoFollowCoordinator(settings, client, threadPool, clusterService, ccrLicenseChecker)
Expand Down Expand Up @@ -259,6 +281,119 @@ public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
return Collections.singletonList(new FixedExecutorBuilder(settings, CCR_THREAD_POOL_NAME, 32, 100, "xpack.ccr.ccr_thread_pool"));
}

@Override
public Map<String, Repository.Factory> getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry) {
Repository.Factory repositoryFactory = (metadata) -> new CcrRepository(metadata, settings);
return Collections.singletonMap(CcrRepository.TYPE, repositoryFactory);
}

protected XPackLicenseState getLicenseState() { return XPackPlugin.getSharedLicenseState(); }

private static class CCRRepositoryManager extends RemoteClusterAware implements LocalNodeMasterListener {

private static final Logger LOGGER = LogManager.getLogger(CCRRepositoryManager.class);
private static final String SOURCE = "refreshing " + CcrRepository.TYPE + " repositories";

private final ClusterService clusterService;
private final Set<String> clusters = ConcurrentCollections.newConcurrentSet();
private volatile boolean isMasterNode = false;

private CCRRepositoryManager(Settings settings, ClusterService clusterService) {
super(settings);
this.clusterService = clusterService;
clusters.addAll(buildRemoteClustersDynamicConfig(settings).keySet());
clusterService.addLocalNodeMasterListener(this);
listenForUpdates(clusterService.getClusterSettings());
}

@Override
protected Set<String> getRemoteClusterNames() {
return clusters;
}

@Override
protected void updateRemoteCluster(String clusterAlias, List<String> addresses, String proxyAddress) {
if (addresses.isEmpty()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wouldn't this method be called multiple times if multiple clusters changed?

if (clusters.remove(clusterAlias) && isMasterNode) {
refreshCCRRepositories();
}
} else {
if (clusters.add(clusterAlias) && isMasterNode) {
refreshCCRRepositories();
}
}
}

@Override
public void onMaster() {
this.isMasterNode = true;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

instead of caching this - why not ask the cluster service if the current node is master every time we want to know? we already have a reference to it.

refreshCCRRepositories();
}

@Override
public void offMaster() {
this.isMasterNode = false;
}

@Override
public String executorName() {
return ThreadPool.Names.SAME;
}

private void refreshCCRRepositories() {
clusterService.submitStateUpdateTask(SOURCE, new ClusterStateUpdateTask() {

@Override
public ClusterState execute(ClusterState currentState) throws Exception {
MetaData metaData = currentState.metaData();
RepositoriesMetaData repositories = metaData.custom(RepositoriesMetaData.TYPE);

if (repositories == null) {
List<RepositoryMetaData> repositoriesMetaData = new ArrayList<>(clusters.size());
for (String cluster : clusters) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

instead of relying on a cached version, maybe parse the cluster state and get the current list of remote clusters? we can maybe add static methods in the RemoteClusterAware class to parse the settings into a whatever we need

LOGGER.info("put [{}] repository [{}]", CcrRepository.TYPE, cluster);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these should be debug.

repositoriesMetaData.add(new RepositoryMetaData(cluster, CcrRepository.TYPE, Settings.EMPTY));
}
repositories = new RepositoriesMetaData(repositoriesMetaData);
} else {
List<RepositoryMetaData> repositoriesMetaData = new ArrayList<>(repositories.repositories().size());

Set<String> needToAdd = new HashSet<>(clusters);

for (RepositoryMetaData repositoryMetaData : repositories.repositories()) {
String name = repositoryMetaData.name();
if (CcrRepository.TYPE.equals(repositoryMetaData.type())) {
if (needToAdd.remove(name)) {
repositoriesMetaData.add(new RepositoryMetaData(name, CcrRepository.TYPE, Settings.EMPTY));
} else {
LOGGER.info("delete [{}] repository [{}]", CcrRepository.TYPE, name);
}
} else {
if (needToAdd.remove(name)) {
throw new IllegalStateException("Repository name conflict. Cannot put [" +
CcrRepository.TYPE + "] repository [" + name + "]. A [" +
repositoryMetaData.type() + "] repository with the same name is already registered.");
}
repositoriesMetaData.add(repositoryMetaData);
}
}
for (String cluster : needToAdd) {
LOGGER.info("put [{}] repository [{}]", CcrRepository.TYPE, cluster);
repositoriesMetaData.add(new RepositoryMetaData(cluster, CcrRepository.TYPE, Settings.EMPTY));
}
repositories = new RepositoriesMetaData(repositoriesMetaData);
}

MetaData.Builder mdBuilder = MetaData.builder(currentState.metaData());
mdBuilder.putCustom(RepositoriesMetaData.TYPE, repositories);
return ClusterState.builder(currentState).metaData(mdBuilder).build();
}

@Override
public void onFailure(String source, Exception e) {
LOGGER.warn(new ParameterizedMessage("failed to refresh [{}] repositories", CcrRepository.TYPE), e);
}
});
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.ccr.repository;

import org.apache.lucene.index.IndexCommit;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.repositories.RepositoryData;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.snapshots.SnapshotInfo;
import org.elasticsearch.snapshots.SnapshotShardFailure;

import java.io.IOException;
import java.util.List;

/**
* This repository relies on a remote cluster for Ccr restores. It is read-only so it can only be used to
* restore shards/indexes that exist on the remote cluster.
*/
public class CcrRepository extends AbstractLifecycleComponent implements Repository {

public static final String TYPE = "_ccr_";

private final RepositoryMetaData metadata;

public CcrRepository(RepositoryMetaData metadata, Settings settings) {
super(settings);
this.metadata = metadata;
}

@Override
protected void doStart() {

}

@Override
protected void doStop() {

}

@Override
protected void doClose() throws IOException {

}

@Override
public RepositoryMetaData getMetadata() {
return metadata;
}

@Override
public SnapshotInfo getSnapshotInfo(SnapshotId snapshotId) {
throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE);
}

@Override
public MetaData getSnapshotGlobalMetaData(SnapshotId snapshotId) {
throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE);
}

@Override
public IndexMetaData getSnapshotIndexMetaData(SnapshotId snapshotId, IndexId index) throws IOException {
throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE);
}

@Override
public RepositoryData getRepositoryData() {
throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE);
}

@Override
public void initializeSnapshot(SnapshotId snapshotId, List<IndexId> indices, MetaData metaData) {
throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE);
}

@Override
public SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List<IndexId> indices, long startTime, String failure, int totalShards,
List<SnapshotShardFailure> shardFailures, long repositoryStateId, boolean includeGlobalState) {
throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE);
}

@Override
public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId) {
throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE);
}

@Override
public long getSnapshotThrottleTimeInNanos() {
throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE);
}

@Override
public long getRestoreThrottleTimeInNanos() {
return 0;
}

@Override
public String startVerification() {
throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE);
}

@Override
public void endVerification(String verificationToken) {
throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE);
}

@Override
public void verify(String verificationToken, DiscoveryNode localNode) {
}

@Override
public boolean isReadOnly() {
return true;
}

@Override
public void snapshotShard(IndexShard shard, Store store, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit,
IndexShardSnapshotStatus snapshotStatus) {
throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE);
}

@Override
public void restoreShard(IndexShard shard, SnapshotId snapshotId, Version version, IndexId indexId, ShardId snapshotShardId,
RecoveryState recoveryState) {
throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE);
}

@Override
public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, Version version, IndexId indexId, ShardId shardId) {
throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE);
}
}
Loading