-
Notifications
You must be signed in to change notification settings - Fork 24.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Register CcrRepository based on settings update #36086
Conversation
Pinging @elastic/es-distributed |
@bleskes - In response to your comment here: Since we are using the client for the propagation to |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR @tbrooks8. I've left some initial comments. I think we should also do more unit-level testing. Start e.g. with the newly added methods to RepositoriesService. They can easily be unit-tested. The repo manager might also allow some unit-tests.
import org.elasticsearch.action.support.master.AcknowledgedResponse; | ||
import org.elasticsearch.common.io.stream.Writeable; | ||
|
||
public class DeleteInternalRepositoryAction extends Action<AcknowledgedResponse> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you move these actions to the CCR plugin?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure. Although I will note that the conception on "internal repositories" still exist in open source. even if the actions to manipulate them do not.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you want me to change the names from what they are such as:
"cluster:admin/internal_repository/put"
to something ccr oriented?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, same for the action names and requests. For now, these can be DeleteInternalCCRRepositoryAction
, DeleteInternalCCRRepositoryRequest
, ...
} | ||
|
||
@Override | ||
public AcknowledgedResponse newResponse() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why AcknowledgedResponse? You're not interested in checking the acknowledged
flag, so maybe just an ActionResponse.
|
||
public class DeleteInternalRepositoryRequest extends ActionRequest { | ||
|
||
private String name; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
final
public ActionRequestValidationException validate() { | ||
ActionRequestValidationException validationException = null; | ||
if (name == null) { | ||
validationException = addValidationError("name is missing", validationException); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe easier to check this right away on object creation, i.e., this.name = Objects.requireNonNull(name);
|
||
private String name; | ||
private String type; | ||
private Settings settings; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
final^3
closeRepository(repository); | ||
repository.close(); | ||
} else { | ||
logger.warn(() -> new ParameterizedMessage("Attempted to unregistered internal repository [{}][{}]. " + |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
unregister
Repository existingRepository = internalRepositories.putIfAbsent(name, repository); | ||
|
||
if (existingRepository != null) { | ||
logger.error(new ParameterizedMessage("Error registering internal repository [{}][{}]. " + |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
start logging messages with lower case (in style with the rest of the class / code)?
@@ -104,6 +110,8 @@ | |||
private final boolean enabled; | |||
private final Settings settings; | |||
private final CcrLicenseChecker ccrLicenseChecker; | |||
private SetOnce<ClusterService> clusterService = new SetOnce<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is not used anywhere?
@@ -104,6 +110,8 @@ | |||
private final boolean enabled; | |||
private final Settings settings; | |||
private final CcrLicenseChecker ccrLicenseChecker; | |||
private SetOnce<ClusterService> clusterService = new SetOnce<>(); | |||
private SetOnce<CcrRepositoryManager> repositoryManager = new SetOnce<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
final
class CcrRepositoryManager extends RemoteClusterAware { | ||
|
||
private final NodeClient client; | ||
private final Set<String> clusters = ConcurrentCollections.newConcurrentSet(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
instead of caching this list here, can we directly ask RepositoriesService whether it already has a repo for this thing, and add / remove based on what RepositoriesService has?
Having these extra caches are always tricky, in particular when there are failure scenarios and the list of internal repositories in RepositoriesService might go out of sync with the cached clusters here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since put
and delete
should be idempotent (and they other for other normal repositories requests) I removed the cache and just call the action each time.
The other option we to create a get
action, but I thought that was unnecessary as we need to handle potential concurrency in the put
and delete
methods anyway.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I removed the cache and just call the action each time.
sounds good
import org.apache.logging.log4j.LogManager; | ||
import org.apache.logging.log4j.Logger; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
undo
Map<String, Repository.Factory> newRepoTypes = repoPlugin.getInternalRepositories(env, namedXContentRegistry); | ||
for (Map.Entry<String, Repository.Factory> entry : newRepoTypes.entrySet()) { | ||
if (internalFactories.put(entry.getKey(), entry.getValue()) != null) { | ||
throw new IllegalArgumentException("Internal repository type [" + entry.getKey() + "] is already registered"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we enforce that these types are distinct to the non-internal ones?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add a test for this?
Repository newRepository = createRepository(metaData, internalTypesRegistry); | ||
Repository repositoryToClose = null; | ||
boolean updated = false; | ||
synchronized (internalRepositories) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the concurrency logic looks complicated here. Maybe just add synchronized on the registerInternalRepository and unregisterInternalRepository methods? We will not be calling those concurrently anyway?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
After the changes where we do not support updates, we can rely on the ConcurrentMap
to provide concurrency control.
@@ -278,7 +278,7 @@ boolean isRemoteNodeConnected(final String remoteCluster, final DiscoveryNode no | |||
public Map<String, OriginalIndices> groupIndices(IndicesOptions indicesOptions, String[] indices, Predicate<String> indexExists) { | |||
Map<String, OriginalIndices> originalIndicesMap = new HashMap<>(); | |||
if (isCrossClusterSearchEnabled()) { | |||
final Map<String, List<String>> groupedIndices = groupClusterIndices(indices, indexExists); | |||
final Map<String, List<String>> groupedIndices = groupClusterIndices(remoteClusters.keySet(), indices, indexExists); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
use getRemoteClusterNames()
here?
|
||
@Override | ||
public Writeable.Reader<ActionResponse> getResponseReader() { | ||
return in -> new ActionResponse() {}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
safer to use the ActionResponse(StreamInput in)
constructor here. Maybe use add a dummy response sub-class here.
client.executeLocally(DeleteInternalRepositoryAction.INSTANCE, request, future); | ||
assert future.isDone() : "Should be completed as it is executed synchronously"; | ||
} else { | ||
ActionRequest request = new PutInternalRepositoryRequest(clusterAlias, CcrRepository.TYPE); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
instead of using the clusterAlias name verbatim here, let's prepend something like "ccr" to avoid name conflicts with standard repositories.
import org.elasticsearch.action.support.master.AcknowledgedResponse; | ||
import org.elasticsearch.common.io.stream.Writeable; | ||
|
||
public class DeleteInternalRepositoryAction extends Action<AcknowledgedResponse> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, same for the action names and requests. For now, these can be DeleteInternalCCRRepositoryAction
, DeleteInternalCCRRepositoryRequest
, ...
this(name, type, Settings.EMPTY); | ||
} | ||
|
||
public PutInternalRepositoryRequest(String name, String type, Settings settings) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we rename this to PutInternalCCRRepositoryRequest
, we can get rid of the settings parameter here. We don't use it for CCR.
|
||
// TODO: Normally we would do validation when we update a repository to ensure that it is not in use. | ||
// Are we okay with not including that validation under the assumption that internal operations | ||
// will do the right thing. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it's fine not having this validation here. If we remove the settings parameter from this method, a repo will never be updated with different settings.
class CcrRepositoryManager extends RemoteClusterAware { | ||
|
||
private final NodeClient client; | ||
private final Set<String> clusters = ConcurrentCollections.newConcurrentSet(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I removed the cache and just call the action each time.
sounds good
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've left a few more comments. Looking great already.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
2 smaller asks, looks good o.w.
Map<String, Repository.Factory> newRepoTypes = repoPlugin.getInternalRepositories(env, namedXContentRegistry); | ||
for (Map.Entry<String, Repository.Factory> entry : newRepoTypes.entrySet()) { | ||
if (internalFactories.put(entry.getKey(), entry.getValue()) != null) { | ||
throw new IllegalArgumentException("Internal repository type [" + entry.getKey() + "] is already registered"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add a test for this?
assertAcked(followerClient().admin().cluster().updateSettings(putFollowerRequest).actionGet()); | ||
|
||
String followerCopyRepoName = CcrRepository.NAME_PREFIX + "follower_cluster_copy"; | ||
assertBusy(() -> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the assertBusy should not be needed here. The update settings call will only return if the corresponding cluster state has been updated on all nodes, and the repositories are created as part of that CS update.
run the docbldesx |
run default distro tests |
run the docbldesx |
This is a follow-up to elastic#36086. It renames the internal repository actions to be prefixed by "internal". This allows the system user to execute the actions. Additionally, this PR stops casting Client to NodeClient. The client we have is a NodeClient so executing the actions will be local.
This is a follow-up to #36086. It renames the internal repository actions to be prefixed by "internal". This allows the system user to execute the actions. Additionally, this PR stops casting Client to NodeClient. The client we have is a NodeClient so executing the actions will be local.
This commit adds an empty CcrRepository snapshot/restore repository. When a new cluster is registered in the remote cluster settings, a new CcrRepository is registered for that cluster. This is implemented using a new concept of "internal repositories". RepositoryPlugin now allows implementations to return factories for "internal repositories". The "internal repositories" are different from normal repositories in that they cannot be registered through the external repository api. Additionally, "internal repositories" are local to a node and are not stored in the cluster state. The repository will be unregistered if the remote cluster is removed.
This commit adds an empty CcrRepository snapshot/restore repository. When a new cluster is registered in the remote cluster settings, a new CcrRepository is registered for that cluster. This is implemented using a new concept of "internal repositories". RepositoryPlugin now allows implementations to return factories for "internal repositories". The "internal repositories" are different from normal repositories in that they cannot be registered through the external repository api. Additionally, "internal repositories" are local to a node and are not stored in the cluster state. The repository will be unregistered if the remote cluster is removed.
This is a follow-up to elastic#36086. It renames the internal repository actions to be prefixed by "internal". This allows the system user to execute the actions. Additionally, this PR stops casting Client to NodeClient. The client we have is a NodeClient so executing the actions will be local.
This is a follow-up to #36086. It renames the internal repository actions to be prefixed by "internal". This allows the system user to execute the actions.
This commit adds an empty
CcrRepository
snapshot/restore repository.When a new cluster is registered in the remote cluster settings, a new
CcrRepository
is registered for that cluster.This is implemented using a new concept of "internal repositories".
RepositoryPlugin
now allows implementations to return factories for"internal repositories". The "internal repositories" are different from
normal repositories in that they cannot be registered through the
external repository api. Additionally, "internal repositories" are local
to a node and are not stored in the cluster state.
The repository will be unregistered if the remote cluster is removed.