
Commit b86632f: rename

martijnvg committed Nov 11, 2024
1 parent 19291cf
Showing 16 changed files with 149 additions and 148 deletions.
@@ -43,7 +43,7 @@
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.Nullable;
-import org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleService;
+import org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleService2;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.index.shard.DocsStats;
@@ -101,13 +101,13 @@ public void configureClusterSettings() {
Settings.builder()
// we want to trigger rollovers manually in this test suite so we can incrementally assert the changes in shard
// configurations
-.put(DataStreamLifecycleService.DATA_STREAM_LIFECYCLE_POLL_INTERVAL, "30d")
+.put(DataStreamLifecycleService2.DATA_STREAM_LIFECYCLE_POLL_INTERVAL, "30d")
);
}

@After
public void resetClusterSetting() {
-updateClusterSettings(Settings.builder().putNull(DataStreamLifecycleService.DATA_STREAM_LIFECYCLE_POLL_INTERVAL));
+updateClusterSettings(Settings.builder().putNull(DataStreamLifecycleService2.DATA_STREAM_LIFECYCLE_POLL_INTERVAL));
}

public void testRolloverOnAutoShardCondition() throws Exception {
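The setup and teardown hunks above encode a pattern this suite relies on: pin the lifecycle poll interval far in the future so data stream lifecycle never fires on its own, trigger rollovers manually, then clear the override. A minimal sketch of that flow, using only the helpers and constants visible in this diff:

// Pin the poll interval so DSL does not run on its own ("30d" is effectively
// "not during this test run").
updateClusterSettings(
    Settings.builder().put(DataStreamLifecycleService2.DATA_STREAM_LIFECYCLE_POLL_INTERVAL, "30d")
);

// ... trigger rollovers manually and assert the shard configuration changes ...

// Teardown: putNull drops the override and restores the cluster default.
updateClusterSettings(
    Settings.builder().putNull(DataStreamLifecycleService2.DATA_STREAM_LIFECYCLE_POLL_INTERVAL)
);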
@@ -92,10 +92,10 @@
import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.backingIndexEqualTo;
import static org.elasticsearch.cluster.metadata.IndexMetadata.APIBlock.READ_ONLY;
import static org.elasticsearch.cluster.metadata.MetadataIndexTemplateService.DEFAULT_TIMESTAMP_FIELD;
-import static org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleService.DATA_STREAM_MERGE_POLICY_TARGET_FACTOR_SETTING;
-import static org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleService.DATA_STREAM_MERGE_POLICY_TARGET_FLOOR_SEGMENT_SETTING;
-import static org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleService.ONE_HUNDRED_MB;
-import static org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleService.TARGET_MERGE_FACTOR_VALUE;
+import static org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleService2.DATA_STREAM_MERGE_POLICY_TARGET_FACTOR_SETTING;
+import static org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleService2.DATA_STREAM_MERGE_POLICY_TARGET_FLOOR_SEGMENT_SETTING;
+import static org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleService2.ONE_HUNDRED_MB;
+import static org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleService2.TARGET_MERGE_FACTOR_VALUE;
import static org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleServiceIT.TestSystemDataStreamPlugin.SYSTEM_DATA_STREAM_NAME;
import static org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleServiceIT.TestSystemDataStreamPlugin.SYSTEM_DATA_STREAM_RETENTION_DAYS;
import static org.elasticsearch.datastreams.lifecycle.health.DataStreamLifecycleHealthIndicatorService.STAGNATING_BACKING_INDICES_DIAGNOSIS_DEF;
@@ -129,10 +129,10 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
@Override
protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
Settings.Builder settings = Settings.builder().put(super.nodeSettings(nodeOrdinal, otherSettings));
-settings.put(DataStreamLifecycleService.DATA_STREAM_LIFECYCLE_POLL_INTERVAL, "1s");
+settings.put(DataStreamLifecycleService2.DATA_STREAM_LIFECYCLE_POLL_INTERVAL, "1s");
settings.put(DataStreamLifecycle.CLUSTER_LIFECYCLE_DEFAULT_ROLLOVER_SETTING.getKey(), "min_docs=1,max_docs=1");
// we'll test that DSL errors reach the health node, so we're lowering the threshold over which we report errors
-settings.put(DataStreamLifecycleService.DATA_STREAM_SIGNALLING_ERROR_RETRY_INTERVAL_SETTING.getKey(), "3");
+settings.put(DataStreamLifecycleService2.DATA_STREAM_SIGNALLING_ERROR_RETRY_INTERVAL_SETTING.getKey(), "3");
return settings.build();
}

@@ -214,7 +214,7 @@ public void testSystemDataStreamRetention() throws Exception {
* This test makes sure that global data stream retention is ignored by system data streams, and that the configured retention
* for a system data stream is respected instead.
*/
-Iterable<DataStreamLifecycleService> dataStreamLifecycleServices = internalCluster().getInstances(DataStreamLifecycleService.class);
+Iterable<DataStreamLifecycleService2> dataStreamLifecycleServices = internalCluster().getInstances(DataStreamLifecycleService2.class);
Clock clock = Clock.systemUTC();
AtomicLong now = new AtomicLong(clock.millis());
dataStreamLifecycleServices.forEach(dataStreamLifecycleService -> dataStreamLifecycleService.setNowSupplier(now::get));
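Worth noting about the three lines above: every service instance now reads "now" from the shared AtomicLong, so the test can move time forward without waiting for wall-clock retention to elapse. A hedged sketch of how that is typically used later in the test (the extra day is an assumption to push the indices clearly past retention):

// advance the clock the lifecycle services see beyond the configured retention
now.addAndGet(TimeValue.timeValueDays(SYSTEM_DATA_STREAM_RETENTION_DAYS + 1).millis());
// a subsequent lifecycle run should then treat older generations of the
// system data stream as expired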
@@ -488,13 +488,13 @@ public void testAutomaticForceMerge() throws Exception {
toBeForceMergedIndex = getBackingIndices(dataStreamName).get(currentGeneration - 2);
}
int currentBackingIndexCount = currentGeneration;
-DataStreamLifecycleService dataStreamLifecycleService = internalCluster().getInstance(
-    DataStreamLifecycleService.class,
+DataStreamLifecycleService2 dataStreamLifecycleService2 = internalCluster().getInstance(
+    DataStreamLifecycleService2.class,
internalCluster().getMasterName()
);
ClusterService clusterService = internalCluster().getInstance(ClusterService.class, internalCluster().getMasterName());
// run data stream lifecycle once
-dataStreamLifecycleService.run(clusterService.state());
+dataStreamLifecycleService2.run(clusterService.state());
assertBusy(() -> {
GetDataStreamAction.Request getDataStreamRequest = new GetDataStreamAction.Request(
TEST_REQUEST_TIMEOUT,
@@ -532,7 +532,7 @@
}

private static void disableDataStreamLifecycle() {
-updateClusterSettings(Settings.builder().put(DataStreamLifecycleService.DATA_STREAM_LIFECYCLE_POLL_INTERVAL, TimeValue.MAX_VALUE));
+updateClusterSettings(Settings.builder().put(DataStreamLifecycleService2.DATA_STREAM_LIFECYCLE_POLL_INTERVAL, TimeValue.MAX_VALUE));
}

public void testErrorRecordingOnRollover() throws Exception {
@@ -589,9 +589,9 @@ public void testErrorRecordingOnRollover() throws Exception {
String writeIndexName = getBackingIndices(dataStreamName).get(1);
assertBusy(() -> {
ErrorEntry writeIndexRolloverError = null;
-Iterable<DataStreamLifecycleService> lifecycleServices = internalCluster().getInstances(DataStreamLifecycleService.class);
+Iterable<DataStreamLifecycleService2> lifecycleServices = internalCluster().getInstances(DataStreamLifecycleService2.class);

-for (DataStreamLifecycleService lifecycleService : lifecycleServices) {
+for (DataStreamLifecycleService2 lifecycleService : lifecycleServices) {
writeIndexRolloverError = lifecycleService.getErrorStore().getError(writeIndexName);
if (writeIndexRolloverError != null) {
break;
@@ -666,9 +666,9 @@ public void testErrorRecordingOnRollover() throws Exception {
// we recorded the error against the previous write index (generation 2)
// let's check there's no error recorded against it anymore
String previousWriteIndex = backingIndices.get(1);
-Iterable<DataStreamLifecycleService> lifecycleServices = internalCluster().getInstances(DataStreamLifecycleService.class);
+Iterable<DataStreamLifecycleService2> lifecycleServices = internalCluster().getInstances(DataStreamLifecycleService2.class);

-for (DataStreamLifecycleService lifecycleService : lifecycleServices) {
+for (DataStreamLifecycleService2 lifecycleService : lifecycleServices) {
    assertThat(lifecycleService.getErrorStore().getError(previousWriteIndex), nullValue());
}
});
@@ -763,9 +763,9 @@ public void testErrorRecordingOnRetention() throws Exception {
assertThat(writeIndex, backingIndexEqualTo(dataStreamName, 2));

ErrorEntry recordedRetentionExecutionError = null;
-Iterable<DataStreamLifecycleService> lifecycleServices = internalCluster().getInstances(DataStreamLifecycleService.class);
+Iterable<DataStreamLifecycleService2> lifecycleServices = internalCluster().getInstances(DataStreamLifecycleService2.class);

-for (DataStreamLifecycleService lifecycleService : lifecycleServices) {
+for (DataStreamLifecycleService2 lifecycleService : lifecycleServices) {
recordedRetentionExecutionError = lifecycleService.getErrorStore().getError(firstGenerationIndex);
if (recordedRetentionExecutionError != null && recordedRetentionExecutionError.retryCount() > 3) {
break;
@@ -835,8 +835,8 @@ public void testErrorRecordingOnRetention() throws Exception {
assertThat(backingIndices.size(), equalTo(1));

// error stores don't contain anything for the first generation index anymore
-Iterable<DataStreamLifecycleService> lifecycleServices = internalCluster().getInstances(DataStreamLifecycleService.class);
-for (DataStreamLifecycleService lifecycleService : lifecycleServices) {
+Iterable<DataStreamLifecycleService2> lifecycleServices = internalCluster().getInstances(DataStreamLifecycleService2.class);
+for (DataStreamLifecycleService2 lifecycleService : lifecycleServices) {
assertThat(lifecycleService.getErrorStore().getError(firstGenerationIndex), nullValue());
}
});
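One pattern recurs throughout the error-recording tests above: poll the DataStreamLifecycleService2 instance on every node, because only the node that attempted the operation records an entry in its error store. A condensed sketch of that loop, built from names that appear in this diff (writeIndexName stands in for whichever index is under test):

ErrorEntry error = null;
for (DataStreamLifecycleService2 service : internalCluster().getInstances(DataStreamLifecycleService2.class)) {
    // each node keeps its own error store, so check them all
    error = service.getErrorStore().getError(writeIndexName);
    if (error != null) {
        break;
    }
}
// the real tests wrap this in assertBusy(...) so the check retries until the
// lifecycle run has actually recorded (or cleared) the error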
@@ -73,7 +73,7 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
@Override
protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
Settings.Builder settings = Settings.builder().put(super.nodeSettings(nodeOrdinal, otherSettings));
-settings.put(DataStreamLifecycleService.DATA_STREAM_LIFECYCLE_POLL_INTERVAL, "1s");
+settings.put(DataStreamLifecycleService2.DATA_STREAM_LIFECYCLE_POLL_INTERVAL, "1s");
settings.put(DataStreamLifecycle.CLUSTER_LIFECYCLE_DEFAULT_ROLLOVER_SETTING.getKey(), "min_docs=1,max_docs=1");
return settings.build();
}
@@ -42,7 +42,7 @@
import org.elasticsearch.datastreams.action.PromoteDataStreamTransportAction;
import org.elasticsearch.datastreams.action.TransportGetDataStreamsAction;
import org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleErrorStore;
-import org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleService;
+import org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleService2;
import org.elasticsearch.datastreams.lifecycle.action.DeleteDataStreamLifecycleAction;
import org.elasticsearch.datastreams.lifecycle.action.GetDataStreamLifecycleStatsAction;
import org.elasticsearch.datastreams.lifecycle.action.TransportDeleteDataStreamLifecycleAction;
@@ -140,7 +140,7 @@ public static TimeValue getLookAheadTime(Settings settings) {
private final SetOnce<UpdateTimeSeriesRangeService> updateTimeSeriesRangeService = new SetOnce<>();
private final SetOnce<DataStreamLifecycleErrorStore> errorStoreInitialisationService = new SetOnce<>();

-private final SetOnce<DataStreamLifecycleService> dataLifecycleInitialisationService = new SetOnce<>();
+private final SetOnce<DataStreamLifecycleService2> dataLifecycleInitialisationService = new SetOnce<>();
private final SetOnce<DataStreamLifecycleHealthInfoPublisher> dataStreamLifecycleErrorsPublisher = new SetOnce<>();
private final SetOnce<DataStreamLifecycleHealthIndicatorService> dataStreamLifecycleHealthIndicatorService = new SetOnce<>();
private final Settings settings;
@@ -173,10 +173,10 @@ public List<Setting<?>> getSettings() {
pluginSettings.add(TIME_SERIES_POLL_INTERVAL);
pluginSettings.add(LOOK_AHEAD_TIME);
pluginSettings.add(LOOK_BACK_TIME);
-pluginSettings.add(DataStreamLifecycleService.DATA_STREAM_LIFECYCLE_POLL_INTERVAL_SETTING);
-pluginSettings.add(DataStreamLifecycleService.DATA_STREAM_MERGE_POLICY_TARGET_FLOOR_SEGMENT_SETTING);
-pluginSettings.add(DataStreamLifecycleService.DATA_STREAM_MERGE_POLICY_TARGET_FACTOR_SETTING);
-pluginSettings.add(DataStreamLifecycleService.DATA_STREAM_SIGNALLING_ERROR_RETRY_INTERVAL_SETTING);
+pluginSettings.add(DataStreamLifecycleService2.DATA_STREAM_LIFECYCLE_POLL_INTERVAL_SETTING);
+pluginSettings.add(DataStreamLifecycleService2.DATA_STREAM_MERGE_POLICY_TARGET_FLOOR_SEGMENT_SETTING);
+pluginSettings.add(DataStreamLifecycleService2.DATA_STREAM_MERGE_POLICY_TARGET_FACTOR_SETTING);
+pluginSettings.add(DataStreamLifecycleService2.DATA_STREAM_SIGNALLING_ERROR_RETRY_INTERVAL_SETTING);
return pluginSettings;
}

@@ -202,7 +202,7 @@ public Collection<?> createComponents(PluginServices services) {
)
);
dataLifecycleInitialisationService.set(
-new DataStreamLifecycleService(
+new DataStreamLifecycleService2(
settings,
new OriginSettingClient(services.client(), DATA_STREAM_LIFECYCLE_ORIGIN),
services.clusterService(),
@@ -102,9 +102,9 @@
/**
* This service will implement the needed actions (e.g. rollover, retention) to manage the data streams with a data stream lifecycle
* configured. It runs on the master node and schedules a job according to the configured
- * {@link DataStreamLifecycleService#DATA_STREAM_LIFECYCLE_POLL_INTERVAL_SETTING}.
+ * {@link DataStreamLifecycleService2#DATA_STREAM_LIFECYCLE_POLL_INTERVAL_SETTING}.
*/
-public class DataStreamLifecycleService implements ClusterStateListener, Closeable, SchedulerEngine.Listener {
+public class DataStreamLifecycleService2 implements ClusterStateListener, Closeable, SchedulerEngine.Listener {

public static final String DATA_STREAM_LIFECYCLE_POLL_INTERVAL = "data_streams.lifecycle.poll_interval";
public static final Setting<TimeValue> DATA_STREAM_LIFECYCLE_POLL_INTERVAL_SETTING = Setting.timeSetting(
@@ -147,7 +147,7 @@ public class DataStreamLifecycleService implements ClusterStateListener, Closeable

public static final String DOWNSAMPLED_INDEX_PREFIX = "downsample-";

-private static final Logger logger = LogManager.getLogger(DataStreamLifecycleService.class);
+private static final Logger logger = LogManager.getLogger(DataStreamLifecycleService2.class);
/**
* Name constant for the job that schedules the data stream lifecycle
*/
@@ -202,7 +202,7 @@ public void taskSucceeded(UpdateForceMergeCompleteTask task, Void unused) {
}
};

-public DataStreamLifecycleService(
+public DataStreamLifecycleService2(
Settings settings,
Client client,
ClusterService clusterService,
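For context on the setting key and Setting<TimeValue> pair shown above: Elasticsearch settings are declared through the Setting API, with the raw key string wrapped in a typed, registered setting. A hedged sketch of what the declaration of DATA_STREAM_LIFECYCLE_POLL_INTERVAL_SETTING plausibly looks like; the default, minimum, and property flags below are assumptions, not taken from this diff:

public static final Setting<TimeValue> DATA_STREAM_LIFECYCLE_POLL_INTERVAL_SETTING = Setting.timeSetting(
    DATA_STREAM_LIFECYCLE_POLL_INTERVAL,  // "data_streams.lifecycle.poll_interval"
    TimeValue.timeValueMinutes(5),        // assumed default
    TimeValue.timeValueSeconds(1),        // assumed minimum
    Setting.Property.Dynamic,             // tests in this commit update it at runtime via updateClusterSettings
    Setting.Property.NodeScope
);

Dynamic and NodeScope are consistent with how the tests override the interval per cluster, but they are inferred, not shown in this commit.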
@@ -19,7 +19,7 @@
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.util.concurrent.EsExecutors;
-import org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleService;
+import org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleService2;
import org.elasticsearch.index.Index;
import org.elasticsearch.injection.guice.Inject;
import org.elasticsearch.tasks.Task;
@@ -38,7 +38,7 @@ public class TransportGetDataStreamLifecycleStatsAction extends TransportMasterNodeAction<
GetDataStreamLifecycleStatsAction.Request,
GetDataStreamLifecycleStatsAction.Response> {

-private final DataStreamLifecycleService lifecycleService;
+private final DataStreamLifecycleService2 lifecycleService;

@Inject
public TransportGetDataStreamLifecycleStatsAction(
@@ -47,7 +47,7 @@ public TransportGetDataStreamLifecycleStatsAction(
ThreadPool threadPool,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
-DataStreamLifecycleService lifecycleService
+DataStreamLifecycleService2 lifecycleService
) {
super(
GetDataStreamLifecycleStatsAction.NAME,
@@ -19,6 +19,7 @@
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleErrorStore;
+import org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleService2;
import org.elasticsearch.features.FeatureService;
import org.elasticsearch.features.NodeFeature;
import org.elasticsearch.health.node.DataStreamLifecycleHealthInfo;
@@ -28,7 +29,7 @@

import java.util.List;

-import static org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleService.DATA_STREAM_SIGNALLING_ERROR_RETRY_INTERVAL_SETTING;
+import static org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleService2.DATA_STREAM_SIGNALLING_ERROR_RETRY_INTERVAL_SETTING;

/**
* Provides the infrastructure to send errors encountered by indices managed by data stream lifecycle service to the health node.
@@ -86,7 +87,7 @@ private void updateNumberOfErrorsToPublish(int newValue) {

/**
* Publishes the DSL errors that have passed the signaling threshold (as defined by
- * {@link org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleService#DATA_STREAM_SIGNALLING_ERROR_RETRY_INTERVAL_SETTING}
+ * {@link DataStreamLifecycleService2#DATA_STREAM_SIGNALLING_ERROR_RETRY_INTERVAL_SETTING}
*/
public void publishDslErrorEntries(ActionListener<AcknowledgedResponse> actionListener) {
if (featureService.clusterHasFeature(clusterService.state(), DSL_HEALTH_INFO_FEATURE) == false) {
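A hedged sketch of the threshold behaviour that javadoc describes: an error entry is published to the health node only once its retry count passes the signalling setting. Only ErrorEntry#retryCount() and the setting name appear in this diff; indicesWithErrors, signallingThreshold, and publishToHealthNode are illustrative stand-ins, not the publisher's real API:

for (String index : indicesWithErrors) {
    ErrorEntry error = errorStore.getError(index);
    // skip errors that have not yet been retried past the signalling threshold
    if (error != null && error.retryCount() >= signallingThreshold) {
        publishToHealthNode(index, error);
    }
}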
(The remaining 9 of the 16 changed files are not shown.)
