forked from elastic/elasticsearch
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Split searchable snapshot into multiple repo operations (elastic#116987)
* Split searchable snapshot into multiple repo operations Each operation on a snapshot repository uses the same `Repository`, `BlobStore`, etc. instances throughout, in order to avoid the complexity arising from handling metadata updates that occur while an operation is running. Today we model the entire lifetime of a searchable snapshot shard as a single repository operation since there should be no metadata updates that matter in this context (other than those that are handled dynamically via other mechanisms) and some metadata updates might be positively harmful to a searchable snapshot shard. It turns out that there are some undocumented legacy settings which _do_ matter to searchable snapshots, and which are still in use, so with this commit we move to a finer-grained model of repository operations within a searchable snapshot. Backport of elastic#116918 to 8.16 * Add end-to-end test for reloading S3 credentials We don't seem to have a test that completely verifies that a S3 repository can reload credentials from an updated keystore. This commit adds such a test. Backport of elastic#116762 to 8.16.
- Loading branch information
1 parent
33b8fb1
commit f663886
Showing
9 changed files
with
505 additions
and
79 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
pr: 116918 | ||
summary: Split searchable snapshot into multiple repo operations | ||
area: Snapshot/Restore | ||
type: enhancement | ||
issues: [] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
281 changes: 281 additions & 0 deletions
281
.../elasticsearch/xpack/searchablesnapshots/s3/S3SearchableSnapshotsCredentialsReloadIT.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,281 @@ | ||
/* | ||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
* or more contributor license agreements. Licensed under the Elastic License | ||
* 2.0; you may not use this file except in compliance with the Elastic License | ||
* 2.0. | ||
*/ | ||
|
||
package org.elasticsearch.xpack.searchablesnapshots.s3; | ||
|
||
import fixture.s3.S3HttpFixture; | ||
import io.netty.handler.codec.http.HttpMethod; | ||
|
||
import org.apache.http.client.methods.HttpPut; | ||
import org.apache.http.entity.ByteArrayEntity; | ||
import org.apache.http.entity.ContentType; | ||
import org.elasticsearch.client.Request; | ||
import org.elasticsearch.client.RequestOptions; | ||
import org.elasticsearch.client.ResponseException; | ||
import org.elasticsearch.client.WarningsHandler; | ||
import org.elasticsearch.common.settings.Settings; | ||
import org.elasticsearch.core.Nullable; | ||
import org.elasticsearch.test.ESTestCase; | ||
import org.elasticsearch.test.cluster.ElasticsearchCluster; | ||
import org.elasticsearch.test.cluster.MutableSettingsProvider; | ||
import org.elasticsearch.test.cluster.local.distribution.DistributionType; | ||
import org.elasticsearch.test.rest.ESRestTestCase; | ||
import org.elasticsearch.test.rest.ObjectPath; | ||
import org.elasticsearch.xcontent.XContentBuilder; | ||
import org.elasticsearch.xcontent.XContentType; | ||
import org.junit.Before; | ||
import org.junit.ClassRule; | ||
import org.junit.rules.RuleChain; | ||
import org.junit.rules.TestRule; | ||
|
||
import java.io.ByteArrayOutputStream; | ||
import java.io.IOException; | ||
import java.util.function.UnaryOperator; | ||
|
||
import static org.hamcrest.CoreMatchers.containsString; | ||
import static org.hamcrest.Matchers.allOf; | ||
|
||
public class S3SearchableSnapshotsCredentialsReloadIT extends ESRestTestCase { | ||
|
||
private static final String BUCKET = "S3SearchableSnapshotsCredentialsReloadIT-bucket"; | ||
private static final String BASE_PATH = "S3SearchableSnapshotsCredentialsReloadIT-base-path"; | ||
|
||
public static final S3HttpFixture s3Fixture = new S3HttpFixture(true, BUCKET, BASE_PATH, "ignored"); | ||
|
||
private static final MutableSettingsProvider keystoreSettings = new MutableSettingsProvider(); | ||
|
||
public static ElasticsearchCluster cluster = ElasticsearchCluster.local() | ||
.distribution(DistributionType.DEFAULT) | ||
.setting("xpack.license.self_generated.type", "trial") | ||
.keystore(keystoreSettings) | ||
.setting("xpack.searchable.snapshot.shared_cache.size", "4kB") | ||
.setting("xpack.searchable.snapshot.shared_cache.region_size", "4kB") | ||
.setting("xpack.searchable_snapshots.cache_fetch_async_thread_pool.keep_alive", "0ms") | ||
.setting("xpack.security.enabled", "false") | ||
.systemProperty("es.allow_insecure_settings", "true") | ||
.build(); | ||
|
||
@ClassRule | ||
public static TestRule ruleChain = RuleChain.outerRule(s3Fixture).around(cluster); | ||
|
||
@Override | ||
protected String getTestRestCluster() { | ||
return cluster.getHttpAddresses(); | ||
} | ||
|
||
@Before | ||
public void skipFips() { | ||
assumeFalse("getting these tests to run in a FIPS JVM is kinda fiddly and we don't really need the extra coverage", inFipsJvm()); | ||
} | ||
|
||
public void testReloadCredentialsFromKeystore() throws IOException { | ||
final TestHarness testHarness = new TestHarness(); | ||
testHarness.putRepository(); | ||
|
||
// Set up initial credentials | ||
final String accessKey1 = randomIdentifier(); | ||
s3Fixture.setAccessKey(accessKey1); | ||
keystoreSettings.put("s3.client.default.access_key", accessKey1); | ||
keystoreSettings.put("s3.client.default.secret_key", randomIdentifier()); | ||
cluster.updateStoredSecureSettings(); | ||
assertOK(client().performRequest(new Request("POST", "/_nodes/reload_secure_settings"))); | ||
|
||
testHarness.createFrozenSearchableSnapshotIndex(); | ||
|
||
// Verify searchable snapshot functionality | ||
testHarness.ensureSearchSuccess(); | ||
|
||
// Rotate credentials in blob store | ||
logger.info("--> rotate credentials"); | ||
final String accessKey2 = randomValueOtherThan(accessKey1, ESTestCase::randomIdentifier); | ||
s3Fixture.setAccessKey(accessKey2); | ||
|
||
// Ensure searchable snapshot now does not work due to invalid credentials | ||
logger.info("--> expect failure"); | ||
testHarness.ensureSearchFailure(); | ||
|
||
// Set up refreshed credentials | ||
logger.info("--> update keystore contents"); | ||
keystoreSettings.put("s3.client.default.access_key", accessKey2); | ||
cluster.updateStoredSecureSettings(); | ||
assertOK(client().performRequest(new Request("POST", "/_nodes/reload_secure_settings"))); | ||
|
||
// Check access using refreshed credentials | ||
logger.info("--> expect success"); | ||
testHarness.ensureSearchSuccess(); | ||
} | ||
|
||
public void testReloadCredentialsFromAlternativeClient() throws IOException { | ||
final TestHarness testHarness = new TestHarness(); | ||
testHarness.putRepository(); | ||
|
||
// Set up credentials | ||
final String accessKey1 = randomIdentifier(); | ||
final String accessKey2 = randomValueOtherThan(accessKey1, ESTestCase::randomIdentifier); | ||
final String alternativeClient = randomValueOtherThan("default", ESTestCase::randomIdentifier); | ||
|
||
s3Fixture.setAccessKey(accessKey1); | ||
keystoreSettings.put("s3.client.default.access_key", accessKey1); | ||
keystoreSettings.put("s3.client.default.secret_key", randomIdentifier()); | ||
keystoreSettings.put("s3.client." + alternativeClient + ".access_key", accessKey2); | ||
keystoreSettings.put("s3.client." + alternativeClient + ".secret_key", randomIdentifier()); | ||
cluster.updateStoredSecureSettings(); | ||
assertOK(client().performRequest(new Request("POST", "/_nodes/reload_secure_settings"))); | ||
|
||
testHarness.createFrozenSearchableSnapshotIndex(); | ||
|
||
// Verify searchable snapshot functionality | ||
testHarness.ensureSearchSuccess(); | ||
|
||
// Rotate credentials in blob store | ||
logger.info("--> rotate credentials"); | ||
s3Fixture.setAccessKey(accessKey2); | ||
|
||
// Ensure searchable snapshot now does not work due to invalid credentials | ||
logger.info("--> expect failure"); | ||
testHarness.ensureSearchFailure(); | ||
|
||
// Adjust repository to use new client | ||
logger.info("--> update repository metadata"); | ||
testHarness.putRepository(b -> b.put("client", alternativeClient)); | ||
|
||
// Check access using refreshed credentials | ||
logger.info("--> expect success"); | ||
testHarness.ensureSearchSuccess(); | ||
} | ||
|
||
public void testReloadCredentialsFromMetadata() throws IOException { | ||
final TestHarness testHarness = new TestHarness(); | ||
testHarness.warningsHandler = WarningsHandler.PERMISSIVE; | ||
|
||
// Set up credentials | ||
final String accessKey1 = randomIdentifier(); | ||
final String accessKey2 = randomValueOtherThan(accessKey1, ESTestCase::randomIdentifier); | ||
|
||
testHarness.putRepository(b -> b.put("access_key", accessKey1).put("secret_key", randomIdentifier())); | ||
s3Fixture.setAccessKey(accessKey1); | ||
|
||
testHarness.createFrozenSearchableSnapshotIndex(); | ||
|
||
// Verify searchable snapshot functionality | ||
testHarness.ensureSearchSuccess(); | ||
|
||
// Rotate credentials in blob store | ||
logger.info("--> rotate credentials"); | ||
s3Fixture.setAccessKey(accessKey2); | ||
|
||
// Ensure searchable snapshot now does not work due to invalid credentials | ||
logger.info("--> expect failure"); | ||
testHarness.ensureSearchFailure(); | ||
|
||
// Adjust repository to use new client | ||
logger.info("--> update repository metadata"); | ||
testHarness.putRepository(b -> b.put("access_key", accessKey2).put("secret_key", randomIdentifier())); | ||
|
||
// Check access using refreshed credentials | ||
logger.info("--> expect success"); | ||
testHarness.ensureSearchSuccess(); | ||
} | ||
|
||
private class TestHarness { | ||
private final String mountedIndexName = randomIdentifier(); | ||
private final String repositoryName = randomIdentifier(); | ||
|
||
@Nullable // to use the default | ||
WarningsHandler warningsHandler; | ||
|
||
void putRepository() throws IOException { | ||
putRepository(UnaryOperator.identity()); | ||
} | ||
|
||
void putRepository(UnaryOperator<Settings.Builder> settingsOperator) throws IOException { | ||
// Register repository | ||
final Request request = newXContentRequest( | ||
HttpMethod.PUT, | ||
"/_snapshot/" + repositoryName, | ||
(b, p) -> b.field("type", "s3") | ||
.startObject("settings") | ||
.value( | ||
settingsOperator.apply( | ||
Settings.builder().put("bucket", BUCKET).put("base_path", BASE_PATH).put("endpoint", s3Fixture.getAddress()) | ||
).build() | ||
) | ||
.endObject() | ||
); | ||
request.addParameter("verify", "false"); // because we don't have access to the blob store yet | ||
request.setOptions(RequestOptions.DEFAULT.toBuilder().setWarningsHandler(warningsHandler)); | ||
assertOK(client().performRequest(request)); | ||
} | ||
|
||
void createFrozenSearchableSnapshotIndex() throws IOException { | ||
// Create an index, large enough that its data is not all captured in the file headers | ||
final String indexName = randomValueOtherThan(mountedIndexName, ESTestCase::randomIdentifier); | ||
createIndex(indexName, indexSettings(1, 0).build()); | ||
try (var bodyStream = new ByteArrayOutputStream()) { | ||
for (int i = 0; i < 1024; i++) { | ||
try (XContentBuilder bodyLineBuilder = new XContentBuilder(XContentType.JSON.xContent(), bodyStream)) { | ||
bodyLineBuilder.startObject().startObject("index").endObject().endObject(); | ||
} | ||
bodyStream.write(0x0a); | ||
try (XContentBuilder bodyLineBuilder = new XContentBuilder(XContentType.JSON.xContent(), bodyStream)) { | ||
bodyLineBuilder.startObject().field("foo", "bar").endObject(); | ||
} | ||
bodyStream.write(0x0a); | ||
} | ||
bodyStream.flush(); | ||
final Request request = new Request("PUT", indexName + "/_bulk"); | ||
request.setEntity(new ByteArrayEntity(bodyStream.toByteArray(), ContentType.APPLICATION_JSON)); | ||
client().performRequest(request); | ||
} | ||
|
||
// Take a snapshot and delete the original index | ||
final String snapshotName = randomIdentifier(); | ||
final Request createSnapshotRequest = new Request(HttpPut.METHOD_NAME, "_snapshot/" + repositoryName + '/' + snapshotName); | ||
createSnapshotRequest.addParameter("wait_for_completion", "true"); | ||
createSnapshotRequest.setOptions(RequestOptions.DEFAULT.toBuilder().setWarningsHandler(warningsHandler)); | ||
assertOK(client().performRequest(createSnapshotRequest)); | ||
|
||
deleteIndex(indexName); | ||
|
||
// Mount the snapshotted index as a searchable snapshot | ||
final Request mountRequest = newXContentRequest( | ||
HttpMethod.POST, | ||
"/_snapshot/" + repositoryName + "/" + snapshotName + "/_mount", | ||
(b, p) -> b.field("index", indexName).field("renamed_index", mountedIndexName) | ||
); | ||
mountRequest.addParameter("wait_for_completion", "true"); | ||
mountRequest.addParameter("storage", "shared_cache"); | ||
assertOK(client().performRequest(mountRequest)); | ||
ensureGreen(mountedIndexName); | ||
} | ||
|
||
void ensureSearchSuccess() throws IOException { | ||
final Request searchRequest = new Request("GET", mountedIndexName + "/_search"); | ||
searchRequest.addParameter("size", "10000"); | ||
assertEquals( | ||
"bar", | ||
ObjectPath.createFromResponse(assertOK(client().performRequest(searchRequest))).evaluate("hits.hits.0._source.foo") | ||
); | ||
} | ||
|
||
void ensureSearchFailure() throws IOException { | ||
assertOK(client().performRequest(new Request("POST", "/_searchable_snapshots/cache/clear"))); | ||
final Request searchRequest = new Request("GET", mountedIndexName + "/_search"); | ||
searchRequest.addParameter("size", "10000"); | ||
assertThat( | ||
expectThrows(ResponseException.class, () -> client().performRequest(searchRequest)).getMessage(), | ||
allOf( | ||
containsString("Bad access key"), | ||
containsString("Status Code: 403"), | ||
containsString("Error Code: AccessDenied"), | ||
containsString("failed to read data from cache") | ||
) | ||
); | ||
} | ||
} | ||
|
||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.