Split searchable snapshot into multiple repo operations #116918

@@ -46,7 +46,11 @@ public class RepositoryMetadata implements Writeable {
* @param settings repository settings
*/
public RepositoryMetadata(String name, String type, Settings settings) {
this(name, RepositoryData.MISSING_UUID, type, settings, RepositoryData.UNKNOWN_REPO_GEN, RepositoryData.EMPTY_REPO_GEN);
this(name, RepositoryData.MISSING_UUID, type, settings);
}

public RepositoryMetadata(String name, String uuid, String type, Settings settings) {
this(name, uuid, type, settings, RepositoryData.UNKNOWN_REPO_GEN, RepositoryData.EMPTY_REPO_GEN);
}

public RepositoryMetadata(RepositoryMetadata metadata, long generation, long pendingGeneration) {
@@ -283,12 +283,17 @@ public RegisterRepositoryTask(final RepositoriesService repositoriesService, fin

@Override
public ClusterState execute(ClusterState currentState) {
RepositoryMetadata newRepositoryMetadata = new RepositoryMetadata(request.name(), request.type(), request.settings());
Metadata.Builder mdBuilder = Metadata.builder(currentState.metadata());
RepositoriesMetadata repositories = RepositoriesMetadata.get(currentState);
List<RepositoryMetadata> repositoriesMetadata = new ArrayList<>(repositories.repositories().size() + 1);
for (RepositoryMetadata repositoryMetadata : repositories.repositories()) {
if (repositoryMetadata.name().equals(newRepositoryMetadata.name())) {
if (repositoryMetadata.name().equals(request.name())) {
final RepositoryMetadata newRepositoryMetadata = new RepositoryMetadata(
request.name(),
repositoryMetadata.uuid(),
Contributor Author:

Here we copy the UUID from the previous repository instance rather than using _na_. The next time we load the RepositoryData we update the metadata if needed:

if (loaded.getUuid().equals(metadata.uuid())) {
    listener.onResponse(loaded);
} else {
    // someone switched the repo contents out from under us
    RepositoriesService.updateRepositoryUuidInMetadata(
        clusterService,
        metadata.name(),
        loaded,
        new ThreadedActionListener<>(threadPool.generic(), listener.map(v -> loaded))
    );
}

We could conceivably be stricter here, see #109936, but it doesn't seem necessary today. Instead note that in RepositorySupplier we'll notice the change in UUID and look for a different repository with matching UUID before eventually throwing a RepositoryMissingException.
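
For illustration, here's a minimal sketch of that fallback, with a hypothetical shape and helper signature rather than the actual RepositorySupplier code:

import java.util.Map;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.repositories.RepositoryMissingException;

// Sketch: prefer the repository registered under the expected name, but if its UUID no
// longer matches, fall back to any registered repository with the matching UUID.
static Repository getByNameOrUuid(Map<String, Repository> repositories, String name, String expectedUuid) {
    final Repository byName = repositories.get(name);
    if (byName != null && expectedUuid.equals(byName.getMetadata().uuid())) {
        return byName; // common case: the name still points at the same underlying repository
    }
    for (Repository candidate : repositories.values()) {
        if (expectedUuid.equals(candidate.getMetadata().uuid())) {
            return candidate; // the repository contents moved to a different registration
        }
    }
    throw new RepositoryMissingException(name);
}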

Member:

I understand this is an optimisation for when the UUID does not change, which should be most cases.

When the UUID does change, are you saying that:

  1. Copying the UUID only delays the need to load the repositoryData.
  2. The repositoryData will be loaded before any write, e.g. createSnapshot, can happen, so the UUID will become consistent again.

It is not clear to me why a cached repositoryData would not be used in step 2, further delaying the UUID consistency update?

The change somehow feels like it does not belong here. But I may be too paranoid.

Contributor Author:

If we're updating settings that fundamentally change the underlying repository then org.elasticsearch.repositories.RepositoriesService#applyClusterState will create a brand-new Repository instance to replace the existing one (i.e. org.elasticsearch.repositories.RepositoriesService#canUpdateInPlace will return false), and this new instance will have no cached RepositoryData.

It's kind of an optimization but also kind of vital for the behaviour here. If we don't do this then we can't see that the new Repository instance is the one we should use for searchable snapshot operations in future (at least not without blocking some thread somewhere while waiting for the new UUID to be loaded).
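
Roughly, as a simplified sketch of that decision (createRepository and closeRepository stand in for the real private helpers in RepositoriesService):

import java.util.Set;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.repositories.Repository;

// Sketch: a settings change that can be applied in place keeps the Repository instance
// (and its cached RepositoryData); anything else swaps in a brand-new instance.
Repository applyUpdatedMetadata(Repository existing, RepositoryMetadata newMetadata) {
    if (existing.canUpdateInPlace(newMetadata.settings(), Set.of())) {
        return existing; // same instance: cached RepositoryData and loaded UUID survive
    }
    closeRepository(existing);            // hypothetical helper mirroring the real private method
    return createRepository(newMetadata); // new instance starts with no cached RepositoryData
}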

Member:

> create a brand-new Repository

Thanks! This is important information that I originally missed.

> If we don't do this then we can't see that the new Repository instance is the one we should use for searchable snapshot operations in future (at least not without blocking some thread somewhere while waiting for the new UUID to be loaded).

I see the point now. I guess that means searchable snapshot actions do not always load the repo data as a first step? If the repo UUID did change, does that mean it would take a while before the searchable-snapshot code realises it? Would that lead to any issues?

Contributor Author:

> I guess that means searchable snapshot actions do not always load repo data as the first step?

Searchable snapshot actions essentially never load the RepositoryData. They already know how to find the shard data within the blob store (from the index.store.snapshot.index_uuid and index.store.snapshot.snapshot_uuid settings in the index metadata, and the shard ID). If the repo switches out from underneath them then they'll get exceptions indicating that the blobs they need are no longer found.
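
For example, a sketch of how the shard blob path can be derived purely from the index settings (the layout shown is illustrative):

import org.elasticsearch.common.settings.Settings;

// Sketch: a searchable-snapshot shard locates its blobs directly from settings in the
// mounted index's metadata, without ever loading the repository-level RepositoryData.
static String shardBlobPath(Settings indexSettings, String basePath, int shardId) {
    final String indexUuid = indexSettings.get("index.store.snapshot.index_uuid");
    final String snapshotUuid = indexSettings.get("index.store.snapshot.snapshot_uuid");
    // e.g. shard files under indices/<index-uuid>/<shard-id>/, with shard-level snapshot
    // metadata in a blob named after the snapshot UUID
    return basePath + "/indices/" + indexUuid + "/" + shardId + "/snap-" + snapshotUuid + ".dat";
}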

request.type(),
request.settings()
);
Repository existing = repositoriesService.repositories.get(request.name());
if (existing == null) {
existing = repositoriesService.internalRepositories.get(request.name());
@@ -205,6 +205,8 @@ private class ShutdownLogger {
public static final String STATELESS_SHARD_WRITE_THREAD_NAME = "stateless_shard_write";
public static final String STATELESS_CLUSTER_STATE_READ_WRITE_THREAD_NAME = "stateless_cluster_state";
public static final String STATELESS_SHARD_PREWARMING_THREAD_NAME = "stateless_prewarm";
public static final String SEARCHABLE_SNAPSHOTS_CACHE_FETCH_ASYNC_THREAD_NAME = "searchable_snapshots_cache_fetch_async";
public static final String SEARCHABLE_SNAPSHOTS_CACHE_PREWARMING_THREAD_NAME = "searchable_snapshots_cache_prewarming";

/**
* Prefix for the name of the root {@link RepositoryData} blob.
@@ -2188,7 +2190,9 @@ private void assertSnapshotOrStatelessPermittedThreadPool() {
STATELESS_TRANSLOG_THREAD_NAME,
STATELESS_SHARD_WRITE_THREAD_NAME,
STATELESS_CLUSTER_STATE_READ_WRITE_THREAD_NAME,
STATELESS_SHARD_PREWARMING_THREAD_NAME
STATELESS_SHARD_PREWARMING_THREAD_NAME,
SEARCHABLE_SNAPSHOTS_CACHE_FETCH_ASYNC_THREAD_NAME,
SEARCHABLE_SNAPSHOTS_CACHE_PREWARMING_THREAD_NAME
);
}

@@ -0,0 +1,275 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.searchablesnapshots.s3;

import fixture.s3.S3HttpFixture;
import io.netty.handler.codec.http.HttpMethod;

import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.entity.ContentType;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.client.WarningsHandler;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.cluster.ElasticsearchCluster;
import org.elasticsearch.test.cluster.MutableSettingsProvider;
import org.elasticsearch.test.cluster.local.distribution.DistributionType;
import org.elasticsearch.test.rest.ESRestTestCase;
import org.elasticsearch.test.rest.ObjectPath;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentType;
import org.junit.ClassRule;
import org.junit.rules.RuleChain;
import org.junit.rules.TestRule;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.function.UnaryOperator;

import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.Matchers.allOf;

public class S3SearchableSnapshotsCredentialsReloadIT extends ESRestTestCase {

private static final String BUCKET = "S3SearchableSnapshotsCredentialsReloadIT-bucket";
private static final String BASE_PATH = "S3SearchableSnapshotsCredentialsReloadIT-base-path";

public static final S3HttpFixture s3Fixture = new S3HttpFixture(true, BUCKET, BASE_PATH, "ignored");

private static final MutableSettingsProvider keystoreSettings = new MutableSettingsProvider();

public static ElasticsearchCluster cluster = ElasticsearchCluster.local()
.distribution(DistributionType.DEFAULT)
.setting("xpack.license.self_generated.type", "trial")
.keystore(keystoreSettings)
.setting("xpack.searchable.snapshot.shared_cache.size", "4kB")
.setting("xpack.searchable.snapshot.shared_cache.region_size", "4kB")
.setting("xpack.searchable_snapshots.cache_fetch_async_thread_pool.keep_alive", "0ms")
.setting("xpack.security.enabled", "false")
.systemProperty("es.allow_insecure_settings", "true")
.build();

@ClassRule
public static TestRule ruleChain = RuleChain.outerRule(s3Fixture).around(cluster);

@Override
protected String getTestRestCluster() {
return cluster.getHttpAddresses();
}

public void testReloadCredentialsFromKeystore() throws IOException {
Member:

We may want to skip this test when running in FIPS mode.
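
Something like the usual ESTestCase guard would do (sketch; the assumption message is illustrative):

assumeFalse("this test is incompatible with FIPS mode", inFipsJvm());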

final TestHarness testHarness = new TestHarness();
testHarness.putRepository();

// Set up initial credentials
final String accessKey1 = randomIdentifier();
s3Fixture.setAccessKey(accessKey1);
keystoreSettings.put("s3.client.default.access_key", accessKey1);
keystoreSettings.put("s3.client.default.secret_key", randomIdentifier());
cluster.updateStoredSecureSettings();
assertOK(client().performRequest(new Request("POST", "/_nodes/reload_secure_settings")));

testHarness.createFrozenSearchableSnapshotIndex();

// Verify searchable snapshot functionality
testHarness.ensureSearchSuccess();

// Rotate credentials in blob store
logger.info("--> rotate credentials");
final String accessKey2 = randomValueOtherThan(accessKey1, ESTestCase::randomIdentifier);
s3Fixture.setAccessKey(accessKey2);

// Ensure searchable snapshot now does not work due to invalid credentials
logger.info("--> expect failure");
testHarness.ensureSearchFailure();

// Set up refreshed credentials
logger.info("--> update keystore contents");
keystoreSettings.put("s3.client.default.access_key", accessKey2);
cluster.updateStoredSecureSettings();
assertOK(client().performRequest(new Request("POST", "/_nodes/reload_secure_settings")));

// Check access using refreshed credentials
logger.info("--> expect success");
testHarness.ensureSearchSuccess();
}

public void testReloadCredentialsFromAlternativeClient() throws IOException {
final TestHarness testHarness = new TestHarness();
testHarness.putRepository();

// Set up credentials
final String accessKey1 = randomIdentifier();
final String accessKey2 = randomValueOtherThan(accessKey1, ESTestCase::randomIdentifier);
final String alternativeClient = randomValueOtherThan("default", ESTestCase::randomIdentifier);

s3Fixture.setAccessKey(accessKey1);
keystoreSettings.put("s3.client.default.access_key", accessKey1);
keystoreSettings.put("s3.client.default.secret_key", randomIdentifier());
keystoreSettings.put("s3.client." + alternativeClient + ".access_key", accessKey2);
keystoreSettings.put("s3.client." + alternativeClient + ".secret_key", randomIdentifier());
cluster.updateStoredSecureSettings();
assertOK(client().performRequest(new Request("POST", "/_nodes/reload_secure_settings")));

testHarness.createFrozenSearchableSnapshotIndex();

// Verify searchable snapshot functionality
testHarness.ensureSearchSuccess();

// Rotate credentials in blob store
logger.info("--> rotate credentials");
s3Fixture.setAccessKey(accessKey2);

// Ensure searchable snapshot now does not work due to invalid credentials
logger.info("--> expect failure");
testHarness.ensureSearchFailure();

// Adjust repository to use new client
logger.info("--> update repository metadata");
testHarness.putRepository(b -> b.put("client", alternativeClient));

// Check access using refreshed credentials
logger.info("--> expect success");
testHarness.ensureSearchSuccess();
}

public void testReloadCredentialsFromMetadata() throws IOException {
final TestHarness testHarness = new TestHarness();
testHarness.warningsHandler = WarningsHandler.PERMISSIVE;

// Set up credentials
final String accessKey1 = randomIdentifier();
final String accessKey2 = randomValueOtherThan(accessKey1, ESTestCase::randomIdentifier);

testHarness.putRepository(b -> b.put("access_key", accessKey1).put("secret_key", randomIdentifier()));
s3Fixture.setAccessKey(accessKey1);

testHarness.createFrozenSearchableSnapshotIndex();

// Verify searchable snapshot functionality
testHarness.ensureSearchSuccess();

// Rotate credentials in blob store
logger.info("--> rotate credentials");
s3Fixture.setAccessKey(accessKey2);

// Ensure searchable snapshot now does not work due to invalid credentials
logger.info("--> expect failure");
testHarness.ensureSearchFailure();

// Adjust repository to use new client
logger.info("--> update repository metadata");
testHarness.putRepository(b -> b.put("access_key", accessKey2).put("secret_key", randomIdentifier()));

// Check access using refreshed credentials
logger.info("--> expect success");
testHarness.ensureSearchSuccess();
}

private class TestHarness {
private final String mountedIndexName = randomIdentifier();
private final String repositoryName = randomIdentifier();

@Nullable // to use the default
WarningsHandler warningsHandler;

void putRepository() throws IOException {
putRepository(UnaryOperator.identity());
}

void putRepository(UnaryOperator<Settings.Builder> settingsOperator) throws IOException {
// Register repository
final Request request = newXContentRequest(
HttpMethod.PUT,
"/_snapshot/" + repositoryName,
(b, p) -> b.field("type", "s3")
.startObject("settings")
.value(
settingsOperator.apply(
Settings.builder().put("bucket", BUCKET).put("base_path", BASE_PATH).put("endpoint", s3Fixture.getAddress())
).build()
)
.endObject()
);
request.addParameter("verify", "false"); // because we don't have access to the blob store yet
request.setOptions(RequestOptions.DEFAULT.toBuilder().setWarningsHandler(warningsHandler));
assertOK(client().performRequest(request));
}

void createFrozenSearchableSnapshotIndex() throws IOException {
// Create an index, large enough that its data is not all captured in the file headers
final String indexName = randomValueOtherThan(mountedIndexName, ESTestCase::randomIdentifier);
createIndex(indexName, indexSettings(1, 0).build());
try (var bodyStream = new ByteArrayOutputStream()) {
for (int i = 0; i < 1024; i++) {
try (XContentBuilder bodyLineBuilder = new XContentBuilder(XContentType.JSON.xContent(), bodyStream)) {
bodyLineBuilder.startObject().startObject("index").endObject().endObject();
}
bodyStream.write(0x0a);
try (XContentBuilder bodyLineBuilder = new XContentBuilder(XContentType.JSON.xContent(), bodyStream)) {
bodyLineBuilder.startObject().field("foo", "bar").endObject();
}
bodyStream.write(0x0a);
}
bodyStream.flush();
final Request request = new Request("PUT", indexName + "/_bulk");
request.setEntity(new ByteArrayEntity(bodyStream.toByteArray(), ContentType.APPLICATION_JSON));
client().performRequest(request);
}

// Take a snapshot and delete the original index
final String snapshotName = randomIdentifier();
final Request createSnapshotRequest = new Request(HttpPut.METHOD_NAME, "_snapshot/" + repositoryName + '/' + snapshotName);
createSnapshotRequest.addParameter("wait_for_completion", "true");
createSnapshotRequest.setOptions(RequestOptions.DEFAULT.toBuilder().setWarningsHandler(warningsHandler));
assertOK(client().performRequest(createSnapshotRequest));

deleteIndex(indexName);

// Mount the snapshotted index as a searchable snapshot
final Request mountRequest = newXContentRequest(
HttpMethod.POST,
"/_snapshot/" + repositoryName + "/" + snapshotName + "/_mount",
(b, p) -> b.field("index", indexName).field("renamed_index", mountedIndexName)
);
mountRequest.addParameter("wait_for_completion", "true");
mountRequest.addParameter("storage", "shared_cache");
assertOK(client().performRequest(mountRequest));
ensureGreen(mountedIndexName);
}

void ensureSearchSuccess() throws IOException {
final Request searchRequest = new Request("GET", mountedIndexName + "/_search");
searchRequest.addParameter("size", "10000");
assertEquals(
"bar",
ObjectPath.createFromResponse(assertOK(client().performRequest(searchRequest))).evaluate("hits.hits.0._source.foo")
);
}

void ensureSearchFailure() throws IOException {
assertOK(client().performRequest(new Request("POST", "/_searchable_snapshots/cache/clear")));
final Request searchRequest = new Request("GET", mountedIndexName + "/_search");
searchRequest.addParameter("size", "10000");
assertThat(
expectThrows(ResponseException.class, () -> client().performRequest(searchRequest)).getMessage(),
allOf(
containsString("Bad access key"),
containsString("Status Code: 403"),
containsString("Error Code: AccessDenied"),
containsString("failed to read data from cache")
)
);
}
}

}
@@ -548,6 +548,12 @@ public Map<String, RecoveryStateFactory> getRecoveryStateFactories() {
public static final String CACHE_PREWARMING_THREAD_POOL_NAME = "searchable_snapshots_cache_prewarming";
public static final String CACHE_PREWARMING_THREAD_POOL_SETTING = "xpack.searchable_snapshots.cache_prewarming_thread_pool";

static {
// these thread names must be aligned with those in :server
assert CACHE_FETCH_ASYNC_THREAD_POOL_NAME.equals(BlobStoreRepository.SEARCHABLE_SNAPSHOTS_CACHE_FETCH_ASYNC_THREAD_NAME);
assert CACHE_PREWARMING_THREAD_POOL_NAME.equals(BlobStoreRepository.SEARCHABLE_SNAPSHOTS_CACHE_PREWARMING_THREAD_NAME);
}
Member:

Nit: we could also just assign them to be equal, e.g.:

CACHE_FETCH_ASYNC_THREAD_POOL_NAME = BlobStoreRepository.SEARCHABLE_SNAPSHOTS_CACHE_FETCH_ASYNC_THREAD_NAME;
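
Spelled out for both constants, the suggested alternative would look like this (sketch):

public static final String CACHE_FETCH_ASYNC_THREAD_POOL_NAME =
    BlobStoreRepository.SEARCHABLE_SNAPSHOTS_CACHE_FETCH_ASYNC_THREAD_NAME;
public static final String CACHE_PREWARMING_THREAD_POOL_NAME =
    BlobStoreRepository.SEARCHABLE_SNAPSHOTS_CACHE_PREWARMING_THREAD_NAME;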


public static ScalingExecutorBuilder[] executorBuilders(Settings settings) {
final int processors = EsExecutors.allocatedProcessors(settings);
// searchable snapshots cache thread pools should always reject tasks once they are shutting down, otherwise some threads might be