Skip to content

Commit

Permalink
Merge branch 'refs/heads/main' into carlosdelest/semantic-text-exists…
Browse files Browse the repository at this point in the history
…-query
  • Loading branch information
carlosdelest committed Jun 21, 2024
2 parents b0a5b53 + 29172b1 commit 12b8533
Show file tree
Hide file tree
Showing 321 changed files with 4,741 additions and 2,207 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1139,7 +1139,7 @@ private void logFileContents(String description, Path from, boolean tailLogs) {
}
}
if (foundLeaks) {
throw new TestClustersException("Found resource leaks in node logs.");
throw new TestClustersException("Found resource leaks in node log: " + from);
}
}

Expand Down
5 changes: 5 additions & 0 deletions docs/changelog/109636.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 109636
summary: "Ensure a lazy rollover request will rollover the target data stream once."
area: Data streams
type: bug
issues: []
5 changes: 5 additions & 0 deletions docs/changelog/109848.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 109848
summary: Denser in-memory representation of `ShardBlobsToDelete`
area: Snapshot/Restore
type: enhancement
issues: []
5 changes: 5 additions & 0 deletions docs/changelog/109931.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 109931
summary: Apply FLS to the contents of `IgnoredSourceFieldMapper`
area: Mapping
type: enhancement
issues: []
5 changes: 5 additions & 0 deletions docs/reference/features/apis/reset-features-api.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ To list the features that will be affected, use the <<get-features-api,get featu

IMPORTANT: The features installed on the node you submit this request to are the features that will be reset. Run on the master node if you have any doubts about which plugins are installed on individual nodes.

[[reset-features-api-query-params]]
==== {api-query-parms-title}

include::{es-ref-dir}/rest-api/common-parms.asciidoc[tag=master-timeout]

==== {api-examples-title}
Example response:
[source,console-result]
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.datastreams;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.admin.indices.rollover.RolloverRequestBuilder;
import org.elasticsearch.action.admin.indices.template.put.TransportPutComposableIndexTemplateAction;
import org.elasticsearch.action.datastreams.CreateDataStreamAction;
import org.elasticsearch.action.datastreams.GetDataStreamAction;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.disruption.IntermittentLongGCDisruption;
import org.elasticsearch.test.disruption.SingleNodeDisruption;
import org.elasticsearch.xcontent.XContentType;

import java.util.Collection;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;

@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, numClientNodes = 0)
public class LazyRolloverDuringDisruptionIT extends ESIntegTestCase {

    @Override
    protected Collection<Class<? extends Plugin>> nodePlugins() {
        return List.of(DataStreamsPlugin.class);
    }

    /**
     * Verifies that a data stream marked for lazy rollover is rolled over exactly once, even when
     * many concurrent indexing requests arrive while the master node is disrupted (simulated
     * intermittent long GC pauses), so several writes race to trigger the pending rollover.
     */
    public void testRolloverIsExecutedOnce() throws ExecutionException, InterruptedException {
        String masterNode = internalCluster().startMasterOnlyNode();
        internalCluster().startDataOnlyNodes(3);
        ensureStableCluster(4);

        String dataStreamName = "my-data-stream";
        createDataStream(dataStreamName);

        // Mark the data stream for lazy rollover: the actual rollover should happen on the next write
        new RolloverRequestBuilder(client()).setRolloverTarget(dataStreamName).lazy(true).execute().get();

        // Verify that the data stream is marked for rollover and that it currently has one backing index
        DataStream dataStream = getDataStream(dataStreamName);
        assertThat(dataStream.rolloverOnWrite(), equalTo(true));
        assertThat(dataStream.getBackingIndices().getIndices().size(), equalTo(1));

        // Introduce a disruption on the master node that should delay the rollover execution,
        // widening the window in which concurrent writes could trigger it more than once
        SingleNodeDisruption masterNodeDisruption = new IntermittentLongGCDisruption(random(), masterNode, 100, 200, 30000, 60000);
        internalCluster().setDisruptionScheme(masterNodeDisruption);
        masterNodeDisruption.startDisrupting();

        // Start concurrent indexing operations; each write may attempt to trigger the lazy rollover
        int docs = randomIntBetween(5, 10);
        CountDownLatch countDownLatch = new CountDownLatch(docs);
        for (int i = 0; i < docs; i++) {
            var indexRequest = new IndexRequest(dataStreamName).opType(DocWriteRequest.OpType.CREATE);
            final String doc = "{ \"@timestamp\": \"2099-05-06T16:21:15.000Z\", \"message\": \"something cool happened\" }";
            indexRequest.source(doc, XContentType.JSON);
            client().index(indexRequest, new ActionListener<>() {
                @Override
                public void onResponse(DocWriteResponse docWriteResponse) {
                    countDownLatch.countDown();
                }

                @Override
                public void onFailure(Exception e) {
                    fail("Indexing request should have succeeded eventually, failed with " + e.getMessage());
                }
            });
        }

        // End the disruption so that all pending tasks will complete
        masterNodeDisruption.stopDisrupting();

        // Wait for all the indexing requests to be processed successfully. The wait is bounded:
        // an unbounded await() would hang the whole suite forever if a request is dropped during
        // the disruption or its listener fails without counting down the latch.
        assertTrue("indexing requests did not complete in time", countDownLatch.await(60, TimeUnit.SECONDS));

        // Verify that the rollover has happened exactly once: the lazy-rollover flag is cleared
        // and exactly one new backing index was added
        dataStream = getDataStream(dataStreamName);
        assertThat(dataStream.rolloverOnWrite(), equalTo(false));
        assertThat(dataStream.getBackingIndices().getIndices().size(), equalTo(2));
    }

    /**
     * Fetches the current {@link DataStream} metadata for the given name via the get-data-stream API.
     */
    private DataStream getDataStream(String dataStreamName) {
        return client().execute(GetDataStreamAction.INSTANCE, new GetDataStreamAction.Request(new String[] { dataStreamName }))
            .actionGet()
            .getDataStreams()
            .get(0)
            .getDataStream();
    }

    /**
     * Installs a composable index template matching the given name and then creates the data stream,
     * asserting both steps are acknowledged by the cluster.
     */
    private void createDataStream(String dataStreamName) throws InterruptedException, ExecutionException {
        final TransportPutComposableIndexTemplateAction.Request putComposableTemplateRequest =
            new TransportPutComposableIndexTemplateAction.Request("my-template");
        putComposableTemplateRequest.indexTemplate(
            ComposableIndexTemplate.builder()
                .indexPatterns(List.of(dataStreamName))
                .dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate(false, false))
                .build()
        );
        final AcknowledgedResponse putComposableTemplateResponse = client().execute(
            TransportPutComposableIndexTemplateAction.TYPE,
            putComposableTemplateRequest
        ).actionGet();
        assertThat(putComposableTemplateResponse.isAcknowledged(), is(true));

        final CreateDataStreamAction.Request createDataStreamRequest = new CreateDataStreamAction.Request(dataStreamName);
        final AcknowledgedResponse createDataStreamResponse = client().execute(CreateDataStreamAction.INSTANCE, createDataStreamRequest)
            .get();
        assertThat(createDataStreamResponse.isAcknowledged(), is(true));
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,10 @@ public void testSystemDataStreamInGlobalState() throws Exception {
}

assertSuccessful(
clusterAdmin().prepareCreateSnapshot(REPO, SNAPSHOT).setWaitForCompletion(true).setIncludeGlobalState(true).execute()
clusterAdmin().prepareCreateSnapshot(TEST_REQUEST_TIMEOUT, REPO, SNAPSHOT)
.setWaitForCompletion(true)
.setIncludeGlobalState(true)
.execute()
);

// We have to delete the data stream directly, as the feature reset API doesn't clean up system data streams yet
Expand All @@ -98,7 +101,7 @@ public void testSystemDataStreamInGlobalState() throws Exception {
// Make sure requesting the data stream by name throws.
// For some reason, expectThrows() isn't working for me here, hence the try/catch.
try {
clusterAdmin().prepareRestoreSnapshot(REPO, SNAPSHOT)
clusterAdmin().prepareRestoreSnapshot(TEST_REQUEST_TIMEOUT, REPO, SNAPSHOT)
.setIndices(".test-data-stream")
.setWaitForCompletion(true)
.setRestoreGlobalState(randomBoolean()) // this shouldn't matter
Expand All @@ -117,7 +120,7 @@ public void testSystemDataStreamInGlobalState() throws Exception {
assertSystemDataStreamDoesNotExist();

// Now actually restore the data stream
RestoreSnapshotResponse restoreSnapshotResponse = clusterAdmin().prepareRestoreSnapshot(REPO, SNAPSHOT)
RestoreSnapshotResponse restoreSnapshotResponse = clusterAdmin().prepareRestoreSnapshot(TEST_REQUEST_TIMEOUT, REPO, SNAPSHOT)
.setWaitForCompletion(true)
.setRestoreGlobalState(true)
.get();
Expand All @@ -132,7 +135,10 @@ public void testSystemDataStreamInGlobalState() throws Exception {

// Attempting to restore again without specifying indices or global/feature states should work, as the wildcard should not be
// resolved to system indices/data streams.
clusterAdmin().prepareRestoreSnapshot(REPO, SNAPSHOT).setWaitForCompletion(true).setRestoreGlobalState(false).get();
clusterAdmin().prepareRestoreSnapshot(TEST_REQUEST_TIMEOUT, REPO, SNAPSHOT)
.setWaitForCompletion(true)
.setRestoreGlobalState(false)
.get();
assertEquals(restoreSnapshotResponse.getRestoreInfo().totalShards(), restoreSnapshotResponse.getRestoreInfo().successfulShards());
}

Expand Down Expand Up @@ -182,7 +188,7 @@ public void testSystemDataStreamInFeatureState() throws Exception {
}

SnapshotInfo snapshotInfo = assertSuccessful(
clusterAdmin().prepareCreateSnapshot(REPO, SNAPSHOT)
clusterAdmin().prepareCreateSnapshot(TEST_REQUEST_TIMEOUT, REPO, SNAPSHOT)
.setIndices("my-index")
.setFeatureStates(SystemDataStreamTestPlugin.class.getSimpleName())
.setWaitForCompletion(true)
Expand All @@ -207,7 +213,7 @@ public void testSystemDataStreamInFeatureState() throws Exception {
assertThat(indicesRemaining.indices(), arrayWithSize(0));
}

RestoreSnapshotResponse restoreSnapshotResponse = clusterAdmin().prepareRestoreSnapshot(REPO, SNAPSHOT)
RestoreSnapshotResponse restoreSnapshotResponse = clusterAdmin().prepareRestoreSnapshot(TEST_REQUEST_TIMEOUT, REPO, SNAPSHOT)
.setWaitForCompletion(true)
.setIndices("my-index")
.setFeatureStates(SystemDataStreamTestPlugin.class.getSimpleName())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,11 @@ protected SecureSettings credentials() {

@Override
protected void createRepository(String repoName) {
AcknowledgedResponse putRepositoryResponse = clusterAdmin().preparePutRepository(repoName)
AcknowledgedResponse putRepositoryResponse = clusterAdmin().preparePutRepository(
TEST_REQUEST_TIMEOUT,
TEST_REQUEST_TIMEOUT,
repoName
)
.setType("azure")
.setSettings(
Settings.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,11 @@ protected SecureSettings credentials() {

@Override
protected void createRepository(final String repoName) {
AcknowledgedResponse putRepositoryResponse = clusterAdmin().preparePutRepository(repoName)
AcknowledgedResponse putRepositoryResponse = clusterAdmin().preparePutRepository(
TEST_REQUEST_TIMEOUT,
TEST_REQUEST_TIMEOUT,
repoName
)
.setType("gcs")
.setSettings(
Settings.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,8 +200,8 @@ public void testAbortRequestStats() throws Exception {
// Intentionally fail snapshot to trigger abortMultipartUpload requests
shouldFailCompleteMultipartUploadRequest.set(true);
final String snapshot = "snapshot";
clusterAdmin().prepareCreateSnapshot(repository, snapshot).setWaitForCompletion(true).setIndices(index).get();
clusterAdmin().prepareDeleteSnapshot(repository, snapshot).get();
clusterAdmin().prepareCreateSnapshot(TEST_REQUEST_TIMEOUT, repository, snapshot).setWaitForCompletion(true).setIndices(index).get();
clusterAdmin().prepareDeleteSnapshot(TEST_REQUEST_TIMEOUT, repository, snapshot).get();

final RepositoryStats repositoryStats = StreamSupport.stream(
internalCluster().getInstances(RepositoriesService.class).spliterator(),
Expand Down Expand Up @@ -242,12 +242,16 @@ public void testMetrics() throws Exception {
assertHitCount(prepareSearch(index).setSize(0).setTrackTotalHits(true), nbDocs);

final String snapshot = "snapshot";
assertSuccessfulSnapshot(clusterAdmin().prepareCreateSnapshot(repository, snapshot).setWaitForCompletion(true).setIndices(index));
assertSuccessfulSnapshot(
clusterAdmin().prepareCreateSnapshot(TEST_REQUEST_TIMEOUT, repository, snapshot).setWaitForCompletion(true).setIndices(index)
);
assertAcked(client().admin().indices().prepareDelete(index));
assertSuccessfulRestore(clusterAdmin().prepareRestoreSnapshot(repository, snapshot).setWaitForCompletion(true));
assertSuccessfulRestore(
clusterAdmin().prepareRestoreSnapshot(TEST_REQUEST_TIMEOUT, repository, snapshot).setWaitForCompletion(true)
);
ensureGreen(index);
assertHitCount(prepareSearch(index).setSize(0).setTrackTotalHits(true), nbDocs);
assertAcked(clusterAdmin().prepareDeleteSnapshot(repository, snapshot).get());
assertAcked(clusterAdmin().prepareDeleteSnapshot(TEST_REQUEST_TIMEOUT, repository, snapshot).get());

final Map<S3BlobStore.StatsKey, Long> aggregatedMetrics = new HashMap<>();
// Compare collected stats and metrics for each node and they should be the same
Expand Down Expand Up @@ -389,7 +393,7 @@ public void testEnforcedCooldownPeriod() throws IOException {
true
);

final SnapshotId fakeOldSnapshot = clusterAdmin().prepareCreateSnapshot(repoName, "snapshot-old")
final SnapshotId fakeOldSnapshot = clusterAdmin().prepareCreateSnapshot(TEST_REQUEST_TIMEOUT, repoName, "snapshot-old")
.setWaitForCompletion(true)
.setIndices()
.get()
Expand Down Expand Up @@ -434,15 +438,15 @@ public void testEnforcedCooldownPeriod() throws IOException {

final String newSnapshotName = "snapshot-new";
final long beforeThrottledSnapshot = repository.threadPool().relativeTimeInNanos();
clusterAdmin().prepareCreateSnapshot(repoName, newSnapshotName).setWaitForCompletion(true).setIndices().get();
clusterAdmin().prepareCreateSnapshot(TEST_REQUEST_TIMEOUT, repoName, newSnapshotName).setWaitForCompletion(true).setIndices().get();
assertThat(repository.threadPool().relativeTimeInNanos() - beforeThrottledSnapshot, greaterThan(TEST_COOLDOWN_PERIOD.getNanos()));

final long beforeThrottledDelete = repository.threadPool().relativeTimeInNanos();
clusterAdmin().prepareDeleteSnapshot(repoName, newSnapshotName).get();
clusterAdmin().prepareDeleteSnapshot(TEST_REQUEST_TIMEOUT, repoName, newSnapshotName).get();
assertThat(repository.threadPool().relativeTimeInNanos() - beforeThrottledDelete, greaterThan(TEST_COOLDOWN_PERIOD.getNanos()));

final long beforeFastDelete = repository.threadPool().relativeTimeInNanos();
clusterAdmin().prepareDeleteSnapshot(repoName, fakeOldSnapshot.getName()).get();
clusterAdmin().prepareDeleteSnapshot(TEST_REQUEST_TIMEOUT, repoName, fakeOldSnapshot.getName()).get();
assertThat(repository.threadPool().relativeTimeInNanos() - beforeFastDelete, lessThan(TEST_COOLDOWN_PERIOD.getNanos()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,10 +127,11 @@ protected void createRepository(String repoName) {
settings.put("storage_class", storageClass);
}
}
AcknowledgedResponse putRepositoryResponse = clusterAdmin().preparePutRepository(repoName)
.setType("s3")
.setSettings(settings)
.get();
AcknowledgedResponse putRepositoryResponse = clusterAdmin().preparePutRepository(
TEST_REQUEST_TIMEOUT,
TEST_REQUEST_TIMEOUT,
repoName
).setType("s3").setSettings(settings).get();
assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,12 @@ public void sendResponse(RestResponse response) {
}

private void createRepository(final String name, final Settings repositorySettings) {
assertAcked(clusterAdmin().preparePutRepository(name).setType(S3Repository.TYPE).setVerify(false).setSettings(repositorySettings));
assertAcked(
clusterAdmin().preparePutRepository(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT, name)
.setType(S3Repository.TYPE)
.setVerify(false)
.setSettings(repositorySettings)
);
}

/**
Expand Down
Loading

0 comments on commit 12b8533

Please sign in to comment.