Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dlm add auto rollover condition max age #94950

Merged
Merged
Show file tree
Hide file tree
Changes from 25 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
18bea61
Extract XContentFragment method that only serializes the conditions
gmarouli Mar 31, 2023
39c25e8
Introduce a model for configuring automatic rollover conditions
gmarouli Mar 31, 2023
79e4f22
Introduce a model for configuring automatic rollover conditions
gmarouli Mar 31, 2023
ea2bdd9
Change the representation of an automatic when the value is known
gmarouli Mar 31, 2023
f16de3c
Add helper constructor
gmarouli Mar 31, 2023
fb381e4
Allow RolloverConditions builder to override values
gmarouli Mar 31, 2023
aeb01d0
Revert WaitForRolloverReadyStep
gmarouli Mar 31, 2023
b4a2508
Merge branch 'main' into dlm-add-auto-rollover-condition-max-age
gmarouli Mar 31, 2023
428561a
Merge branch 'main' into dlm-add-auto-rollover-condition-max-age
elasticmachine Apr 3, 2023
025934f
Polishing
gmarouli Apr 3, 2023
6295092
Test label "automatic"
gmarouli Apr 3, 2023
28c0b20
Polishing comments
gmarouli Apr 3, 2023
93dce69
Update docs/changelog/94950.yaml
gmarouli Apr 3, 2023
2bc60de
Fix DataLifecycleTests to account for an automatic condition
gmarouli Apr 3, 2023
cdb0032
Change the scope of getAutomaticConditions
gmarouli Apr 3, 2023
897b50e
Update max_age heuristic
gmarouli Apr 3, 2023
c75754e
Update comment
gmarouli Apr 3, 2023
518b989
Merge branch 'main' into dlm-add-auto-rollover-condition-max-age
elasticmachine Apr 5, 2023
cd28e05
Update new endpoints with rollover configuration
gmarouli Apr 5, 2023
5574421
Test evaluateMaxAgeCondition separately
gmarouli Apr 5, 2023
acd5175
Change the default configuration
gmarouli Apr 5, 2023
9bac049
Polish DataLifecycleService and add a test
gmarouli Apr 5, 2023
7a554be
Parse all rollover configuration values from String
gmarouli Apr 5, 2023
63c4640
Merge branch 'main' into dlm-add-auto-rollover-condition-max-age
gmarouli Apr 5, 2023
e4d147c
Fix test
gmarouli Apr 5, 2023
595d97b
Revert "Parse all rollover configuration values from String"
gmarouli Apr 6, 2023
1bc624c
Remove the XContent parser
gmarouli Apr 6, 2023
1af590b
Add validation in the RolloverConfiguration
gmarouli Apr 6, 2023
a11bc27
Merge branch 'main' into dlm-add-auto-rollover-condition-max-age
elasticmachine Apr 6, 2023
a4f5efe
Merge branch 'main' into dlm-add-auto-rollover-condition-max-age
elasticmachine Apr 6, 2023
36defcd
Merge branch 'main' into dlm-add-auto-rollover-condition-max-age
elasticmachine Apr 7, 2023
d5aaa4c
Merge branch 'main' into dlm-add-auto-rollover-condition-max-age
elasticmachine Apr 10, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/94950.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 94950
summary: Dlm add auto rollover condition max age
area: DLM
type: feature
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public void testGetLifecycle() throws Exception {
assertThat(response.getDataStreamLifecycles().get(0).lifecycle(), equalTo(lifecycle));
assertThat(response.getDataStreamLifecycles().get(1).dataStreamName(), equalTo("with-lifecycle-2"));
assertThat(response.getDataStreamLifecycles().get(1).lifecycle(), equalTo(lifecycle));
assertThat(response.getRolloverConditions(), nullValue());
assertThat(response.getRolloverConfiguration(), nullValue());
}

// Test retrieving all lifecycles prefixed wildcard
Expand All @@ -79,7 +79,7 @@ public void testGetLifecycle() throws Exception {
assertThat(response.getDataStreamLifecycles().get(0).lifecycle(), equalTo(lifecycle));
assertThat(response.getDataStreamLifecycles().get(1).dataStreamName(), equalTo("with-lifecycle-2"));
assertThat(response.getDataStreamLifecycles().get(1).lifecycle(), equalTo(lifecycle));
assertThat(response.getRolloverConditions(), nullValue());
assertThat(response.getRolloverConfiguration(), nullValue());
}

// Test retrieving concrete data streams
Expand All @@ -91,7 +91,7 @@ public void testGetLifecycle() throws Exception {
assertThat(response.getDataStreamLifecycles().size(), equalTo(2));
assertThat(response.getDataStreamLifecycles().get(0).dataStreamName(), equalTo("with-lifecycle-1"));
assertThat(response.getDataStreamLifecycles().get(0).lifecycle(), equalTo(lifecycle));
assertThat(response.getRolloverConditions(), nullValue());
assertThat(response.getRolloverConfiguration(), nullValue());
}

// Test include defaults
Expand All @@ -106,7 +106,7 @@ public void testGetLifecycle() throws Exception {
assertThat(responseWithRollover.getDataStreamLifecycles().get(0).lifecycle(), equalTo(lifecycle));
assertThat(responseWithRollover.getDataStreamLifecycles().get(1).dataStreamName(), equalTo("with-lifecycle-2"));
assertThat(responseWithRollover.getDataStreamLifecycles().get(1).lifecycle(), equalTo(lifecycle));
assertThat(responseWithRollover.getRolloverConditions(), notNullValue());
assertThat(responseWithRollover.getRolloverConfiguration(), notNullValue());
}

public void testPutLifecycle() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.admin.indices.rollover.Condition;
import org.elasticsearch.action.admin.indices.rollover.RolloverConditions;
import org.elasticsearch.action.admin.indices.rollover.RolloverConfiguration;
import org.elasticsearch.action.admin.indices.template.put.PutComposableIndexTemplateAction;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
Expand Down Expand Up @@ -113,7 +114,7 @@ public void testExplainLifecycle() throws Exception {
.actionGet();
assertThat(response.getIndices().size(), is(2));
// we requested the explain for indices with the default include_details=false
assertThat(response.getRolloverConditions(), nullValue());
assertThat(response.getRolloverConfiguration(), nullValue());
for (ExplainIndexDataLifecycle explainIndex : response.getIndices()) {
assertThat(explainIndex.isManagedByDLM(), is(true));
assertThat(explainIndex.getIndexCreationDate(), notNullValue());
Expand Down Expand Up @@ -148,9 +149,9 @@ public void testExplainLifecycle() throws Exception {
ExplainDataLifecycleAction.Response response = client().execute(ExplainDataLifecycleAction.INSTANCE, explainIndicesRequest)
.actionGet();
assertThat(response.getIndices().size(), is(2));
RolloverConditions rolloverConditions = response.getRolloverConditions();
assertThat(rolloverConditions, notNullValue());
Map<String, Condition<?>> conditions = rolloverConditions.getConditions();
RolloverConfiguration rolloverConfiguration = response.getRolloverConfiguration();
assertThat(rolloverConfiguration, notNullValue());
Map<String, Condition<?>> conditions = rolloverConfiguration.resolveRolloverConditions(null).getConditions();
assertThat(conditions.size(), is(2));
assertThat(conditions.get(RolloverConditions.MAX_DOCS_FIELD.getPreferredName()).value(), is(1L));
assertThat(conditions.get(RolloverConditions.MIN_DOCS_FIELD.getPreferredName()).value(), is(1L));
Expand Down Expand Up @@ -198,7 +199,7 @@ public void testExplainLifecycleForIndicesWithErrors() throws Exception {
.actionGet();
assertThat(response.getIndices().size(), is(1));
// we requested the explain for indices with the default include_details=false
assertThat(response.getRolloverConditions(), nullValue());
assertThat(response.getRolloverConfiguration(), nullValue());
for (ExplainIndexDataLifecycle explainIndex : response.getIndices()) {
assertThat(explainIndex.getIndex(), is(writeIndexName));
assertThat(explainIndex.isManagedByDLM(), is(true));
Expand Down Expand Up @@ -244,7 +245,7 @@ public void testExplainDLMForUnmanagedIndices() throws Exception {
ExplainDataLifecycleAction.Response response = client().execute(ExplainDataLifecycleAction.INSTANCE, explainIndicesRequest)
.actionGet();
assertThat(response.getIndices().size(), is(1));
assertThat(response.getRolloverConditions(), nullValue());
assertThat(response.getRolloverConfiguration(), nullValue());
for (ExplainIndexDataLifecycle explainIndex : response.getIndices()) {
assertThat(explainIndex.isManagedByDLM(), is(false));
assertThat(explainIndex.getIndex(), is(writeIndexName));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ResultDeduplicator;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.rollover.RolloverConditions;
import org.elasticsearch.action.admin.indices.rollover.RolloverConfiguration;
import org.elasticsearch.action.admin.indices.rollover.RolloverRequest;
import org.elasticsearch.action.admin.indices.rollover.RolloverResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
Expand Down Expand Up @@ -78,7 +78,7 @@ public class DataLifecycleService implements ClusterStateListener, Closeable, Sc
private final DataLifecycleErrorStore errorStore;
private volatile boolean isMaster = false;
private volatile TimeValue pollInterval;
private volatile RolloverConditions rolloverConditions;
private volatile RolloverConfiguration rolloverConfiguration;
private SchedulerEngine.Job scheduledJob;
private final SetOnce<SchedulerEngine> scheduler = new SetOnce<>();

Expand All @@ -100,7 +100,7 @@ public DataLifecycleService(
this.errorStore = errorStore;
this.scheduledJob = null;
this.pollInterval = DLM_POLL_INTERVAL_SETTING.get(settings);
this.rolloverConditions = clusterService.getClusterSettings().get(DataLifecycle.CLUSTER_DLM_DEFAULT_ROLLOVER_SETTING);
this.rolloverConfiguration = clusterService.getClusterSettings().get(DataLifecycle.CLUSTER_DLM_DEFAULT_ROLLOVER_SETTING);
}

/**
Expand All @@ -110,7 +110,7 @@ public void init() {
clusterService.addListener(this);
clusterService.getClusterSettings().addSettingsUpdateConsumer(DLM_POLL_INTERVAL_SETTING, this::updatePollInterval);
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(DataLifecycle.CLUSTER_DLM_DEFAULT_ROLLOVER_SETTING, this::updateRolloverConditions);
.addSettingsUpdateConsumer(DataLifecycle.CLUSTER_DLM_DEFAULT_ROLLOVER_SETTING, this::updateRolloverConfiguration);
}

@Override
Expand Down Expand Up @@ -214,7 +214,11 @@ private void clearErrorStoreForUnmanagedIndices(DataStream dataStream) {
private void maybeExecuteRollover(ClusterState state, DataStream dataStream) {
Index writeIndex = dataStream.getWriteIndex();
if (dataStream.isIndexManagedByDLM(writeIndex, state.metadata()::index)) {
RolloverRequest rolloverRequest = getDefaultRolloverRequest(dataStream.getName());
RolloverRequest rolloverRequest = getDefaultRolloverRequest(
rolloverConfiguration,
dataStream.getName(),
dataStream.getLifecycle().getDataRetention()
);
transportActionsDeduplicator.executeOnce(
rolloverRequest,
new ErrorRecordingActionListener(writeIndex.getName(), errorStore),
Expand Down Expand Up @@ -360,9 +364,13 @@ public void onFailure(Exception e) {
}
}

private RolloverRequest getDefaultRolloverRequest(String dataStream) {
static RolloverRequest getDefaultRolloverRequest(
RolloverConfiguration rolloverConfiguration,
String dataStream,
TimeValue dataRetention
) {
RolloverRequest rolloverRequest = new RolloverRequest(dataStream, null).masterNodeTimeout(TimeValue.MAX_VALUE);
rolloverRequest.setConditions(rolloverConditions);
rolloverRequest.setConditions(rolloverConfiguration.resolveRolloverConditions(dataRetention));
return rolloverRequest;
}

Expand All @@ -371,8 +379,8 @@ private void updatePollInterval(TimeValue newInterval) {
maybeScheduleJob();
}

private void updateRolloverConditions(RolloverConditions newRolloverConditions) {
this.rolloverConditions = newRolloverConditions;
private void updateRolloverConfiguration(RolloverConfiguration newRolloverConfiguration) {
this.rolloverConfiguration = newRolloverConfiguration;
}

private void cancelJob() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.admin.indices.rollover.RolloverConditions;
import org.elasticsearch.action.admin.indices.rollover.RolloverConfiguration;
import org.elasticsearch.action.dlm.ExplainIndexDataLifecycle;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.MasterNodeReadRequest;
Expand Down Expand Up @@ -137,31 +137,31 @@ public static class Response extends ActionResponse implements ChunkedToXContent
public static final ParseField INDICES_FIELD = new ParseField("indices");
private List<ExplainIndexDataLifecycle> indices;
@Nullable
private final RolloverConditions rolloverConditions;
private final RolloverConfiguration rolloverConfiguration;

public Response(List<ExplainIndexDataLifecycle> indices, @Nullable RolloverConditions rolloverConditions) {
public Response(List<ExplainIndexDataLifecycle> indices, @Nullable RolloverConfiguration rolloverConfiguration) {
this.indices = indices;
this.rolloverConditions = rolloverConditions;
this.rolloverConfiguration = rolloverConfiguration;
}

public Response(StreamInput in) throws IOException {
super(in);
this.indices = in.readList(ExplainIndexDataLifecycle::new);
this.rolloverConditions = in.readOptionalWriteable(RolloverConditions::new);
this.rolloverConfiguration = in.readOptionalWriteable(RolloverConfiguration::new);
}

public List<ExplainIndexDataLifecycle> getIndices() {
return indices;
}

public RolloverConditions getRolloverConditions() {
return rolloverConditions;
public RolloverConfiguration getRolloverConfiguration() {
return rolloverConfiguration;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeList(indices);
out.writeOptionalWriteable(rolloverConditions);
out.writeOptionalWriteable(rolloverConfiguration);
}

@Override
Expand All @@ -173,20 +173,20 @@ public boolean equals(Object o) {
return false;
}
Response response = (Response) o;
return Objects.equals(indices, response.indices) && Objects.equals(rolloverConditions, response.rolloverConditions);
return Objects.equals(indices, response.indices) && Objects.equals(rolloverConfiguration, response.rolloverConfiguration);
}

@Override
public int hashCode() {
return Objects.hash(indices, rolloverConditions);
return Objects.hash(indices, rolloverConfiguration);
}

@Override
public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params outerParams) {
final Iterator<? extends ToXContent> indicesIterator = indices.stream()
.map(explainIndexDataLifecycle -> (ToXContent) (builder, params) -> {
builder.field(explainIndexDataLifecycle.getIndex());
explainIndexDataLifecycle.toXContent(builder, params, rolloverConditions);
explainIndexDataLifecycle.toXContent(builder, params, rolloverConfiguration);
return builder;
})
.iterator();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.admin.indices.rollover.RolloverConditions;
import org.elasticsearch.action.admin.indices.rollover.RolloverConfiguration;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.MasterNodeReadRequest;
import org.elasticsearch.cluster.metadata.DataLifecycle;
Expand Down Expand Up @@ -162,47 +162,47 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
/**
* Converts the response to XContent and passes the RolloverConditions, when provided, to the data lifecycle.
*/
public XContentBuilder toXContent(XContentBuilder builder, Params params, @Nullable RolloverConditions rolloverConditions)
public XContentBuilder toXContent(XContentBuilder builder, Params params, @Nullable RolloverConfiguration rolloverConfiguration)
throws IOException {
builder.startObject();
builder.field(NAME_FIELD.getPreferredName(), dataStreamName);
builder.field(LIFECYCLE_FIELD.getPreferredName());
lifecycle.toXContent(builder, params, rolloverConditions);
lifecycle.toXContent(builder, params, rolloverConfiguration);
builder.endObject();
return builder;
}
}

private final List<DataStreamLifecycle> dataStreamLifecycles;
@Nullable
private final RolloverConditions rolloverConditions;
private final RolloverConfiguration rolloverConfiguration;

public Response(List<DataStreamLifecycle> dataStreamLifecycles) {
this(dataStreamLifecycles, null);
}

public Response(List<DataStreamLifecycle> dataStreamLifecycles, @Nullable RolloverConditions rolloverConditions) {
public Response(List<DataStreamLifecycle> dataStreamLifecycles, @Nullable RolloverConfiguration rolloverConfiguration) {
this.dataStreamLifecycles = dataStreamLifecycles;
this.rolloverConditions = rolloverConditions;
this.rolloverConfiguration = rolloverConfiguration;
}

public Response(StreamInput in) throws IOException {
this(in.readList(DataStreamLifecycle::new), in.readOptionalWriteable(RolloverConditions::new));
this(in.readList(DataStreamLifecycle::new), in.readOptionalWriteable(RolloverConfiguration::new));
}

public List<DataStreamLifecycle> getDataStreamLifecycles() {
return dataStreamLifecycles;
}

@Nullable
public RolloverConditions getRolloverConditions() {
return rolloverConditions;
public RolloverConfiguration getRolloverConfiguration() {
return rolloverConfiguration;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeList(dataStreamLifecycles);
out.writeOptionalWriteable(rolloverConditions);
out.writeOptionalWriteable(rolloverConfiguration);
}

@Override
Expand All @@ -212,7 +212,7 @@ public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params outerP
dataStreamLifecycle -> (ToXContent) (builder, params) -> dataStreamLifecycle.toXContent(
builder,
params,
rolloverConditions
rolloverConfiguration
)
)
.iterator();
Expand All @@ -234,12 +234,12 @@ public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) return false;
Response response = (Response) o;
return dataStreamLifecycles.equals(response.dataStreamLifecycles)
&& Objects.equals(rolloverConditions, response.rolloverConditions);
&& Objects.equals(rolloverConfiguration, response.rolloverConfiguration);
}

@Override
public int hashCode() {
return Objects.hash(dataStreamLifecycles, rolloverConditions);
return Objects.hash(dataStreamLifecycles, rolloverConfiguration);
}
}
}
Loading