Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix handling an empty list of versions #16198

Merged
merged 3 commits into from
Mar 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,20 @@ public void testRetrieveUnusedSegmentsActionWithVersions()
Assert.assertEquals(expectedUnusedSegments, observedUnusedSegments);
}

@Test
public void testRetrieveUnusedSegmentsActionWithEmptyVersions()
{
  // Build the action with an explicitly empty (non-null) list of versions;
  // the two trailing optional arguments are left unset.
  final RetrieveUnusedSegmentsAction emptyVersionsAction =
      new RetrieveUnusedSegmentsAction(task.getDataSource(), INTERVAL, ImmutableList.of(), null, null);

  // Collect whatever the action returns into a set so the comparison ignores ordering.
  final Set<DataSegment> retrievedSegments = new HashSet<>(
      emptyVersionsAction.perform(task, actionTestKit.getTaskActionToolbox())
  );

  // An empty versions list is expected to yield no segments at all.
  Assert.assertEquals(ImmutableSet.of(), retrievedSegments);
}

@Test
public void testRetrieveUnusedSegmentsActionWithMinUsedLastUpdatedTime()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,54 @@ public void testKillSegmentsWithVersions() throws Exception
Assert.assertEquals(ImmutableSet.of(segment5V3), new HashSet<>(observedUnusedSegments));
}

/**
 * Runs a kill task configured with an empty list of versions against five unused segments
 * (spread over three versions) and verifies that the task succeeds while every segment is
 * still retrievable as unused afterwards.
 */
@Test
public void testKillSegmentsWithEmptyVersions() throws Exception
{
  // Three version strings derived from a single "now" timestamp.
  final DateTime currentTime = DateTimes.nowUtc();
  final String versionNow = currentTime.toString();
  final String versionTwoHoursAgo = currentTime.minusHours(2).toString();
  final String versionThreeHoursAgo = currentTime.minusHours(3).toString();

  // Three monthly segments on the newest version, plus one January segment for each older version.
  final DataSegment janNewest = newSegment(Intervals.of("2019-01-01/2019-02-01"), versionNow);
  final DataSegment febNewest = newSegment(Intervals.of("2019-02-01/2019-03-01"), versionNow);
  final DataSegment marNewest = newSegment(Intervals.of("2019-03-01/2019-04-01"), versionNow);
  final DataSegment janTwoHoursAgo = newSegment(Intervals.of("2019-01-01/2019-02-01"), versionTwoHoursAgo);
  final DataSegment janThreeHoursAgo = newSegment(Intervals.of("2019-01-01/2019-02-01"), versionThreeHoursAgo);

  final Set<DataSegment> allSegments = ImmutableSet.of(
      janNewest,
      febNewest,
      marNewest,
      janTwoHoursAgo,
      janThreeHoursAgo
  );

  // Commit every segment, then mark all of them as unused.
  Assert.assertEquals(allSegments, getMetadataStorageCoordinator().commitSegments(allSegments));
  final int expectedMarkedCount = allSegments.size();
  Assert.assertEquals(
      expectedMarkedCount,
      getSegmentsMetadataManager().markSegmentsAsUnused(
          allSegments.stream().map(DataSegment::getId).collect(Collectors.toSet())
      )
  );

  // Kill task over an interval covering all segments, with an explicitly empty versions list.
  final KillUnusedSegmentsTask killTask = new KillUnusedSegmentsTaskBuilder()
      .dataSource(DATA_SOURCE)
      .interval(Intervals.of("2018/2020"))
      .versions(ImmutableList.of())
      .batchSize(3)
      .build();

  final TaskState finalState = taskRunner.run(killTask).get().getStatusCode();
  Assert.assertEquals(TaskState.SUCCESS, finalState);

  final KillTaskReport.Stats expectedStats = new KillTaskReport.Stats(0, 1, 0);
  Assert.assertEquals(expectedStats, getReportedStats());

  // All segments must still be present as unused after the task has run.
  final List<DataSegment> remainingUnusedSegments =
      getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval(
          DATA_SOURCE,
          Intervals.of("2018/2020"),
          null,
          null
      );

  Assert.assertEquals(allSegments, new HashSet<>(remainingUnusedSegments));
}

@Test
public void testKillSegmentsWithVersionsAndLimit() throws Exception
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,8 @@ default List<DataSegment> retrieveUnusedSegmentsForInterval(
* @param dataSource The data source the segments belong to
* @param interval Filter the data segments to ones that include data in this interval exclusively.
* @param versions An optional list of segment versions to retrieve in the given {@code interval}. If unspecified, all
* versions of unused segments in the {@code interval} must be retrieved.
* versions of unused segments in the {@code interval} must be retrieved. If an empty list is passed,
* no segments are retrieved.
* @param limit The maximum number of unused segments to retrieve. If null, no limit is applied.
* @param maxUsedStatusLastUpdatedTime The maximum {@code used_status_last_updated} time. Any unused segment in {@code interval}
* with {@code used_status_last_updated} no later than this time will be included in the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public interface SegmentsMetadataManager
/**
* Marks non-overshadowed unused segments for the given interval and optional list of versions
* as used. If versions are not specified, all versions of non-overshadowed unused segments in the interval
* will be marked as used.
* will be marked as used. If an empty list of versions is passed, no segments are marked as used.
* @return Number of segments updated
*/
int markAsUsedNonOvershadowedSegmentsInInterval(String dataSource, Interval interval, @Nullable List<String> versions);
Expand Down Expand Up @@ -89,7 +89,8 @@ public interface SegmentsMetadataManager

/**
* Marks segments as unused that are <b>fully contained</b> in the given interval for an optional list of versions.
* If versions are not specified, all versions of segments in the interval will be marked as unused.
* If versions are not specified, all versions of segments in the interval will be marked as unused. If an empty list
* of versions is passed, no segments are marked as unused.
* Segments that are already marked as unused are not updated.
* @return The number of segments updated
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import org.apache.druid.server.http.DataSegmentPlus;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.utils.CollectionUtils;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.skife.jdbi.v2.Handle;
Expand Down Expand Up @@ -159,7 +158,8 @@ public CloseableIterator<DataSegment> retrieveUsedSegments(
* @param dataSource The name of the datasource
* @param intervals The intervals to search over
* @param versions An optional list of unused segment versions to retrieve in the given {@code intervals}.
* If unspecified, all versions of unused segments in the {@code intervals} must be retrieved.
* If unspecified, all versions of unused segments in the {@code intervals} must be retrieved. If an
* empty list is passed, no segments are retrieved.
* @param limit The limit of segments to return
* @param lastSegmentId the last segment id from which to search for results. All segments returned are >
* this segment lexicographically if sortOrder is null or ASC, or < this segment
Expand Down Expand Up @@ -256,7 +256,7 @@ public List<DataSegmentPlus> retrieveSegmentsById(String datasource, Set<String>

private List<DataSegmentPlus> retrieveSegmentBatchById(String datasource, List<String> segmentIds)
{
if (CollectionUtils.isNullOrEmpty(segmentIds)) {
if (segmentIds.isEmpty()) {
return Collections.emptyList();
}

Expand Down Expand Up @@ -344,6 +344,10 @@ public int markSegmentsUnused(final String dataSource, final Interval interval)
*/
public int markSegmentsUnused(final String dataSource, final Interval interval, @Nullable final List<String> versions)
{
if (versions != null && versions.isEmpty()) {
return 0;
}

if (Intervals.isEternity(interval)) {
final StringBuilder sb = new StringBuilder();
sb.append(
Expand All @@ -354,9 +358,7 @@ public int markSegmentsUnused(final String dataSource, final Interval interval,
)
);

final boolean hasVersions = !CollectionUtils.isNullOrEmpty(versions);

if (hasVersions) {
if (versions != null) {
sb.append(getParameterizedInConditionForColumn("version", versions));
}

Expand All @@ -366,7 +368,7 @@ public int markSegmentsUnused(final String dataSource, final Interval interval,
.bind("used", false)
.bind("used_status_last_updated", DateTimes.nowUtc().toString());

if (hasVersions) {
if (versions != null) {
bindColumnValuesToQueryWithInCondition("version", versions, stmt);
}

Expand All @@ -386,9 +388,7 @@ public int markSegmentsUnused(final String dataSource, final Interval interval,
)
);

final boolean hasVersions = !CollectionUtils.isNullOrEmpty(versions);

if (hasVersions) {
if (versions != null) {
sb.append(getParameterizedInConditionForColumn("version", versions));
}

Expand All @@ -400,7 +400,7 @@ public int markSegmentsUnused(final String dataSource, final Interval interval,
.bind("end", interval.getEnd().toString())
.bind("used_status_last_updated", DateTimes.nowUtc().toString());

if (hasVersions) {
if (versions != null) {
bindColumnValuesToQueryWithInCondition("version", versions, stmt);
}
return stmt.execute();
Expand Down Expand Up @@ -558,6 +558,10 @@ private CloseableIterator<DataSegment> retrieveSegments(
@Nullable final DateTime maxUsedStatusLastUpdatedTime
)
{
if (versions != null && versions.isEmpty()) {
return CloseableIterators.withEmptyBaggage(Collections.emptyIterator());
}

if (intervals.isEmpty() || intervals.size() <= MAX_INTERVALS_PER_BATCH) {
return CloseableIterators.withEmptyBaggage(
retrieveSegmentsInIntervalsBatch(dataSource, intervals, versions, matchMode, used, limit, lastSegmentId, sortOrder, maxUsedStatusLastUpdatedTime)
Expand Down Expand Up @@ -733,9 +737,7 @@ private Query<Map<String, Object>> buildSegmentsTableQuery(
sb.append(getConditionForIntervalsAndMatchMode(intervals, matchMode, connector.getQuoteString()));
}

final boolean hasVersions = !CollectionUtils.isNullOrEmpty(versions);

if (hasVersions) {
if (versions != null) {
sb.append(getParameterizedInConditionForColumn("version", versions));
}

Expand Down Expand Up @@ -786,7 +788,7 @@ private Query<Map<String, Object>> buildSegmentsTableQuery(
bindIntervalsToQuery(sql, intervals);
}

if (hasVersions) {
if (versions != null) {
bindColumnValuesToQueryWithInCondition("version", versions, sql);
}

Expand Down Expand Up @@ -898,7 +900,7 @@ private static int computeNumChangedSegments(List<String> segmentIds, int[] segm
*/
private static String getParameterizedInConditionForColumn(final String columnName, final List<String> values)
{
if (CollectionUtils.isNullOrEmpty(values)) {
if (values == null) {
return "";
}

Expand Down Expand Up @@ -927,7 +929,7 @@ private static void bindColumnValuesToQueryWithInCondition(
final SQLStatement<?> query
)
{
if (CollectionUtils.isNullOrEmpty(values)) {
abhishekrb19 marked this conversation as resolved.
Show resolved Hide resolved
if (values == null) {
return;
}

Expand Down
Loading
Loading