Skip to content

Commit

Permalink
Remove DataStream constructor that sets rolloverOnWrite flag to false
Browse files Browse the repository at this point in the history
  • Loading branch information
nielsbauman committed Apr 4, 2024
1 parent edc9e67 commit 44d034b
Show file tree
Hide file tree
Showing 15 changed files with 53 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1791,7 +1791,8 @@ public ClusterState execute(ClusterState currentState) throws Exception {
original.getLifecycle(),
original.isFailureStore(),
original.getFailureIndices(),
null
original.rolloverOnWrite(),
original.getAutoShardingEvent()
);
brokenDataStreamHolder.set(broken);
return ClusterState.builder(currentState)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,8 @@ public void testGetAdditionalIndexSettingsDataStreamAlreadyCreatedTimeSettingsMi
ds.getLifecycle(),
ds.isFailureStore(),
ds.getFailureIndices(),
null
ds.rolloverOnWrite(),
ds.getAutoShardingEvent()
)
);
Metadata metadata = mb.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,8 @@ public void testUpdateTimeSeriesTemporalRange_NoUpdateBecauseReplicated() {
d.getLifecycle(),
d.isFailureStore(),
d.getFailureIndices(),
null
d.rolloverOnWrite(),
d.getAutoShardingEvent()
)
)
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ public void testResponseIlmAndDataStreamLifecycleRepresentation() throws Excepti
new DataStreamLifecycle(),
true,
failureStores,
false,
null
);

Expand Down Expand Up @@ -200,6 +201,7 @@ public void testResponseIlmAndDataStreamLifecycleRepresentation() throws Excepti
new DataStreamLifecycle(null, null, false),
true,
failureStores,
false,
null
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,8 @@ public void testRetentionNotExecutedForTSIndicesWithinTimeBounds() {
DataStreamLifecycle.newBuilder().dataRetention(0L).build(),
dataStream.isFailureStore(),
dataStream.getFailureIndices(),
null
dataStream.rolloverOnWrite(),
dataStream.getAutoShardingEvent()
)
);
clusterState = ClusterState.builder(clusterState).metadata(builder).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,40 +119,6 @@ public static boolean isFailureStoreEnabled() {
@Nullable
private final DataStreamAutoShardingEvent autoShardingEvent;

public DataStream(
String name,
List<Index> indices,
long generation,
Map<String, Object> metadata,
boolean hidden,
boolean replicated,
boolean system,
boolean allowCustomRouting,
IndexMode indexMode,
DataStreamLifecycle lifecycle,
boolean failureStore,
List<Index> failureIndices,
@Nullable DataStreamAutoShardingEvent autoShardingEvent
) {
this(
name,
indices,
generation,
metadata,
hidden,
replicated,
system,
System::currentTimeMillis,
allowCustomRouting,
indexMode,
lifecycle,
failureStore,
failureIndices,
false,
autoShardingEvent
);
}

public DataStream(
String name,
List<Index> indices,
Expand Down Expand Up @@ -238,7 +204,22 @@ public DataStream(
boolean allowCustomRouting,
IndexMode indexMode
) {
this(name, indices, generation, metadata, hidden, replicated, system, allowCustomRouting, indexMode, null, false, List.of(), null);
this(
name,
indices,
generation,
metadata,
hidden,
replicated,
system,
allowCustomRouting,
indexMode,
null,
false,
List.of(),
false,
null
);
}

private static boolean assertConsistent(List<Index> indices) {
Expand Down Expand Up @@ -507,6 +488,7 @@ public DataStream unsafeRollover(Index writeIndex, long generation, boolean time
lifecycle,
failureStore,
failureIndices,
false,
autoShardingEvent
);
}
Expand Down Expand Up @@ -544,6 +526,7 @@ public DataStream unsafeRolloverFailureStore(Index writeIndex, long generation)
lifecycle,
failureStore,
failureIndices,
false,
autoShardingEvent
);
}
Expand Down Expand Up @@ -646,6 +629,7 @@ public DataStream removeBackingIndex(Index index) {
lifecycle,
failureStore,
failureIndices,
rolloverOnWrite,
autoShardingEvent
);
}
Expand Down Expand Up @@ -692,6 +676,7 @@ public DataStream replaceBackingIndex(Index existingBackingIndex, Index newBacki
lifecycle,
failureStore,
failureIndices,
rolloverOnWrite,
autoShardingEvent
);
}
Expand Down Expand Up @@ -753,6 +738,7 @@ public DataStream addBackingIndex(Metadata clusterMetadata, Index index) {
lifecycle,
failureStore,
failureIndices,
rolloverOnWrite,
autoShardingEvent
);
}
Expand Down Expand Up @@ -810,6 +796,7 @@ public DataStream snapshot(Collection<String> indicesInSnapshot) {
lifecycle,
failureStore,
failureIndices,
rolloverOnWrite,
autoShardingEvent
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,7 @@ static ClusterState createDataStream(
lifecycle == null && isDslOnlyMode ? DataStreamLifecycle.DEFAULT : lifecycle,
template.getDataStreamTemplate().hasFailureStore(),
failureIndices,
false,
null
);
Metadata.Builder builder = Metadata.builder(currentState.metadata()).put(newDataStream);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ static ClusterState updateDataLifecycle(
lifecycle,
dataStream.isFailureStore(),
dataStream.getFailureIndices(),
dataStream.rolloverOnWrite(),
dataStream.getAutoShardingEvent()
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -717,6 +717,7 @@ static DataStream updateDataStream(DataStream dataStream, Metadata.Builder metad
dataStream.getLifecycle(),
dataStream.isFailureStore(),
dataStream.getFailureIndices(),
dataStream.rolloverOnWrite(),
dataStream.getAutoShardingEvent()
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -794,6 +794,7 @@ private DataStream createDataStream(
null,
false,
List.of(),
false,
autoShardingEvent
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,9 @@ protected DataStream mutateInstance(DataStream instance) {
var lifecycle = instance.getLifecycle();
var failureStore = instance.isFailureStore();
var failureIndices = instance.getFailureIndices();
var rolloverOnWrite = instance.rolloverOnWrite();
var autoShardingEvent = instance.getAutoShardingEvent();
switch (between(0, 11)) {
switch (between(0, 12)) {
case 0 -> name = randomAlphaOfLength(10);
case 1 -> indices = randomNonEmptyIndexInstances();
case 2 -> generation = instance.getGeneration() + randomIntBetween(1, 10);
Expand Down Expand Up @@ -130,7 +131,8 @@ protected DataStream mutateInstance(DataStream instance) {
failureIndices = randomValueOtherThan(failureIndices, DataStreamTestHelper::randomIndexInstances);
failureStore = failureIndices.isEmpty() == false;
}
case 11 -> {
case 11 -> rolloverOnWrite = rolloverOnWrite == false;
case 12 -> {
autoShardingEvent = randomBoolean() && autoShardingEvent != null
? null
: new DataStreamAutoShardingEvent(
Expand All @@ -154,6 +156,7 @@ protected DataStream mutateInstance(DataStream instance) {
lifecycle,
failureStore,
failureIndices,
rolloverOnWrite,
autoShardingEvent
);
}
Expand Down Expand Up @@ -212,6 +215,7 @@ public void testRolloverUpgradeToTsdbDataStream() {
ds.getLifecycle(),
ds.isFailureStore(),
ds.getFailureIndices(),
ds.rolloverOnWrite(),
ds.getAutoShardingEvent()
);
var newCoordinates = ds.nextWriteIndexAndGeneration(Metadata.EMPTY_METADATA);
Expand Down Expand Up @@ -240,6 +244,7 @@ public void testRolloverDowngradeToRegularDataStream() {
ds.getLifecycle(),
ds.isFailureStore(),
ds.getFailureIndices(),
ds.rolloverOnWrite(),
ds.getAutoShardingEvent()
);
var newCoordinates = ds.nextWriteIndexAndGeneration(Metadata.EMPTY_METADATA);
Expand Down Expand Up @@ -629,6 +634,7 @@ public void testSnapshot() {
preSnapshotDataStream.getLifecycle(),
preSnapshotDataStream.isFailureStore(),
preSnapshotDataStream.getFailureIndices(),
preSnapshotDataStream.rolloverOnWrite(),
preSnapshotDataStream.getAutoShardingEvent()
);

Expand Down Expand Up @@ -670,6 +676,7 @@ public void testSnapshotWithAllBackingIndicesRemoved() {
preSnapshotDataStream.getLifecycle(),
preSnapshotDataStream.isFailureStore(),
preSnapshotDataStream.getFailureIndices(),
preSnapshotDataStream.rolloverOnWrite(),
preSnapshotDataStream.getAutoShardingEvent()
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,7 @@ public void testRemoveBrokenBackingIndexReference() {
original.getLifecycle(),
original.isFailureStore(),
original.getFailureIndices(),
original.rolloverOnWrite(),
original.getAutoShardingEvent()
);
var brokenState = ClusterState.builder(state).metadata(Metadata.builder(state.getMetadata()).put(broken).build()).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ public static DataStream newInstance(
lifecycle,
false,
List.of(),
false,
autoShardingEvent
);
}
Expand All @@ -169,6 +170,7 @@ public static DataStream newInstance(
lifecycle,
failureStores.size() > 0,
failureStores,
false,
null
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,9 @@ static DataStream updateLocalDataStream(
remoteDataStream.getLifecycle(),
remoteDataStream.isFailureStore(),
remoteDataStream.getFailureIndices(),
// Replicated data streams can't be rolled over, so having the `rolloverOnWrite` flag set to `true` wouldn't make sense
// (and potentially even break things).
false,
remoteDataStream.getAutoShardingEvent()
);
} else {
Expand Down Expand Up @@ -395,6 +398,7 @@ static DataStream updateLocalDataStream(
localDataStream.getLifecycle(),
localDataStream.isFailureStore(),
localDataStream.getFailureIndices(),
localDataStream.rolloverOnWrite(),
localDataStream.getAutoShardingEvent()
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ public void testAction() throws Exception {
lifecycle,
false,
List.of(),
randomBoolean(),
null
);
dataStreamMap.put(dataStream.getName(), dataStream);
Expand Down

0 comments on commit 44d034b

Please sign in to comment.