diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsActionsTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsActionsTest.java index e61cf4128b4e..f0e16fc7d24e 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsActionsTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsActionsTest.java @@ -120,6 +120,20 @@ public void testRetrieveUnusedSegmentsActionWithVersions() Assert.assertEquals(expectedUnusedSegments, observedUnusedSegments); } + @Test + public void testRetrieveUnusedSegmentsActionWithEmptyVersions() + { + final RetrieveUnusedSegmentsAction action = new RetrieveUnusedSegmentsAction( + task.getDataSource(), + INTERVAL, + ImmutableList.of(), + null, + null + ); + final Set observedUnusedSegments = new HashSet<>(action.perform(task, actionTestKit.getTaskActionToolbox())); + Assert.assertEquals(ImmutableSet.of(), observedUnusedSegments); + } + @Test public void testRetrieveUnusedSegmentsActionWithMinUsedLastUpdatedTime() { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java index 382673bed4b8..20de427508f7 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java @@ -214,6 +214,54 @@ public void testKillSegmentsWithVersions() throws Exception Assert.assertEquals(ImmutableSet.of(segment5V3), new HashSet<>(observedUnusedSegments)); } + @Test + public void testKillSegmentsWithEmptyVersions() throws Exception + { + final DateTime now = DateTimes.nowUtc(); + final String v1 = now.toString(); + final String v2 = now.minusHours(2).toString(); + final String v3 = now.minusHours(3).toString(); + + final DataSegment segment1V1 = newSegment(Intervals.of("2019-01-01/2019-02-01"), v1); + final DataSegment segment2V1 = newSegment(Intervals.of("2019-02-01/2019-03-01"), v1); + final DataSegment segment3V1 = newSegment(Intervals.of("2019-03-01/2019-04-01"), v1); + final DataSegment segment4V2 = newSegment(Intervals.of("2019-01-01/2019-02-01"), v2); + final DataSegment segment5V3 = newSegment(Intervals.of("2019-01-01/2019-02-01"), v3); + + final Set segments = ImmutableSet.of(segment1V1, segment2V1, segment3V1, segment4V2, segment5V3); + + Assert.assertEquals(segments, getMetadataStorageCoordinator().commitSegments(segments)); + Assert.assertEquals( + segments.size(), + getSegmentsMetadataManager().markSegmentsAsUnused( + segments.stream().map(DataSegment::getId).collect(Collectors.toSet()) + ) + ); + + final KillUnusedSegmentsTask task = new KillUnusedSegmentsTaskBuilder() + .dataSource(DATA_SOURCE) + .interval(Intervals.of("2018/2020")) + .versions(ImmutableList.of()) + .batchSize(3) + .build(); + + Assert.assertEquals(TaskState.SUCCESS, taskRunner.run(task).get().getStatusCode()); + Assert.assertEquals( + new KillTaskReport.Stats(0, 1, 0), + getReportedStats() + ); + + final List observedUnusedSegments = + getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval( + DATA_SOURCE, + Intervals.of("2018/2020"), + null, + null + ); + + Assert.assertEquals(segments, new HashSet<>(observedUnusedSegments)); + } + @Test public void testKillSegmentsWithVersionsAndLimit() throws Exception { diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java index 72abff93df7a..a889605c210f 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java @@ -159,7 +159,8 @@ default List retrieveUnusedSegmentsForInterval( * @param dataSource The data source the segments belong to * @param interval Filter the data segments to ones that include data in this interval exclusively. * @param versions An optional list of segment versions to retrieve in the given {@code interval}. If unspecified, all - * versions of unused segments in the {@code interval} must be retrieved. + * versions of unused segments in the {@code interval} must be retrieved. If an empty list is passed, + * no segments are retrieved. * @param limit The maximum number of unused segments to retreive. If null, no limit is applied. * @param maxUsedStatusLastUpdatedTime The maximum {@code used_status_last_updated} time. Any unused segment in {@code interval} * with {@code used_status_last_updated} no later than this time will be included in the diff --git a/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManager.java b/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManager.java index f0b9f06425d9..b0aaa54d5bac 100644 --- a/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManager.java +++ b/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManager.java @@ -56,7 +56,7 @@ public interface SegmentsMetadataManager /** * Marks non-overshadowed unused segments for the given interval and optional list of versions * as used. If versions are not specified, all versions of non-overshadowed unused segments in the interval - * will be marked as used. + * will be marked as used. If an empty list of versions is passed, no segments are marked as used. * @return Number of segments updated */ int markAsUsedNonOvershadowedSegmentsInInterval(String dataSource, Interval interval, @Nullable List versions); @@ -89,7 +89,8 @@ public interface SegmentsMetadataManager /** * Marks segments as unused that are fully contained in the given interval for an optional list of versions. - * If versions are not specified, all versions of segments in the interval will be marked as unused. + * If versions are not specified, all versions of segments in the interval will be marked as unused. If an empty list + * of versions is passed, no segments are marked as unused. * Segments that are already marked as unused are not updated. * @return The number of segments updated */ diff --git a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java index 4d9921d2b06e..cee378537d6d 100644 --- a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java +++ b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java @@ -35,7 +35,6 @@ import org.apache.druid.server.http.DataSegmentPlus; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.SegmentId; -import org.apache.druid.utils.CollectionUtils; import org.joda.time.DateTime; import org.joda.time.Interval; import org.skife.jdbi.v2.Handle; @@ -159,7 +158,8 @@ public CloseableIterator retrieveUsedSegments( * @param dataSource The name of the datasource * @param intervals The intervals to search over * @param versions An optional list of unused segment versions to retrieve in the given {@code intervals}. - * If unspecified, all versions of unused segments in the {@code intervals} must be retrieved. + * If unspecified, all versions of unused segments in the {@code intervals} must be retrieved. If an + * empty list is passed, no segments are retrieved. * @param limit The limit of segments to return * @param lastSegmentId the last segment id from which to search for results. All segments returned are > * this segment lexigraphically if sortOrder is null or ASC, or < this segment @@ -256,7 +256,7 @@ public List retrieveSegmentsById(String datasource, Set private List retrieveSegmentBatchById(String datasource, List segmentIds) { - if (CollectionUtils.isNullOrEmpty(segmentIds)) { + if (segmentIds.isEmpty()) { return Collections.emptyList(); } @@ -344,6 +344,10 @@ public int markSegmentsUnused(final String dataSource, final Interval interval) */ public int markSegmentsUnused(final String dataSource, final Interval interval, @Nullable final List versions) { + if (versions != null && versions.isEmpty()) { + return 0; + } + if (Intervals.isEternity(interval)) { final StringBuilder sb = new StringBuilder(); sb.append( @@ -354,9 +358,7 @@ public int markSegmentsUnused(final String dataSource, final Interval interval, ) ); - final boolean hasVersions = !CollectionUtils.isNullOrEmpty(versions); - - if (hasVersions) { + if (versions != null) { sb.append(getParameterizedInConditionForColumn("version", versions)); } @@ -366,7 +368,7 @@ public int markSegmentsUnused(final String dataSource, final Interval interval, .bind("used", false) .bind("used_status_last_updated", DateTimes.nowUtc().toString()); - if (hasVersions) { + if (versions != null) { bindColumnValuesToQueryWithInCondition("version", versions, stmt); } @@ -386,9 +388,7 @@ public int markSegmentsUnused(final String dataSource, final Interval interval, ) ); - final boolean hasVersions = !CollectionUtils.isNullOrEmpty(versions); - - if (hasVersions) { + if (versions != null) { sb.append(getParameterizedInConditionForColumn("version", versions)); } @@ -400,7 +400,7 @@ public int markSegmentsUnused(final String dataSource, final Interval interval, .bind("end", interval.getEnd().toString()) .bind("used_status_last_updated", DateTimes.nowUtc().toString()); - if (hasVersions) { + if (versions != null) { bindColumnValuesToQueryWithInCondition("version", versions, stmt); } return stmt.execute(); @@ -558,6 +558,10 @@ private CloseableIterator retrieveSegments( @Nullable final DateTime maxUsedStatusLastUpdatedTime ) { + if (versions != null && versions.isEmpty()) { + return CloseableIterators.withEmptyBaggage(Collections.emptyIterator()); + } + if (intervals.isEmpty() || intervals.size() <= MAX_INTERVALS_PER_BATCH) { return CloseableIterators.withEmptyBaggage( retrieveSegmentsInIntervalsBatch(dataSource, intervals, versions, matchMode, used, limit, lastSegmentId, sortOrder, maxUsedStatusLastUpdatedTime) @@ -733,9 +737,7 @@ private Query> buildSegmentsTableQuery( sb.append(getConditionForIntervalsAndMatchMode(intervals, matchMode, connector.getQuoteString())); } - final boolean hasVersions = !CollectionUtils.isNullOrEmpty(versions); - - if (hasVersions) { + if (versions != null) { sb.append(getParameterizedInConditionForColumn("version", versions)); } @@ -786,7 +788,7 @@ private Query> buildSegmentsTableQuery( bindIntervalsToQuery(sql, intervals); } - if (hasVersions) { + if (versions != null) { bindColumnValuesToQueryWithInCondition("version", versions, sql); } @@ -898,7 +900,7 @@ private static int computeNumChangedSegments(List segmentIds, int[] segm */ private static String getParameterizedInConditionForColumn(final String columnName, final List values) { - if (CollectionUtils.isNullOrEmpty(values)) { + if (values == null) { return ""; } @@ -927,7 +929,7 @@ private static void bindColumnValuesToQueryWithInCondition( final SQLStatement query ) { - if (CollectionUtils.isNullOrEmpty(values)) { + if (values == null) { return; } diff --git a/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTest.java b/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTest.java index a7128ce27143..f3ee63887cbe 100644 --- a/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTest.java +++ b/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTest.java @@ -640,6 +640,106 @@ public void testMarkAsUsedNonOvershadowedSegmentsInEternityIntervalWithVersions( ); } + @Test + public void testMarkAsUsedNonOvershadowedSegmentsInIntervalWithEmptyVersions() throws Exception + { + publishWikiSegments(); + sqlSegmentsMetadataManager.startPollingDatabasePeriodically(); + sqlSegmentsMetadataManager.poll(); + Assert.assertTrue(sqlSegmentsMetadataManager.isPollingDatabasePeriodically()); + + final DataSegment koalaSegment1 = createSegment( + DS.KOALA, + "2017-10-15T00:00:00.000/2017-10-17T00:00:00.000", + "2017-10-15T20:19:12.565Z" + ); + + final DataSegment koalaSegment2 = createSegment( + DS.KOALA, + "2017-10-17T00:00:00.000/2017-10-18T00:00:00.000", + "2017-10-16T20:19:12.565Z" + ); + + // Overshadowed by koalaSegment2 + final DataSegment koalaSegment3 = createSegment( + DS.KOALA, + "2017-10-17T00:00:00.000/2017-10-18T00:00:00.000", + "2017-10-15T20:19:12.565Z" + ); + + publishUnusedSegments(koalaSegment1, koalaSegment2, koalaSegment3); + + sqlSegmentsMetadataManager.poll(); + Assert.assertEquals( + ImmutableSet.of(wikiSegment1, wikiSegment2), + ImmutableSet.copyOf(sqlSegmentsMetadataManager.iterateAllUsedSegments()) + ); + Assert.assertEquals( + 0, + sqlSegmentsMetadataManager.markAsUsedNonOvershadowedSegmentsInInterval( + DS.KOALA, + Intervals.of("2017/2018"), + ImmutableList.of() + ) + ); + sqlSegmentsMetadataManager.poll(); + + Assert.assertEquals( + ImmutableSet.of(wikiSegment1, wikiSegment2), + ImmutableSet.copyOf(sqlSegmentsMetadataManager.iterateAllUsedSegments()) + ); + } + + @Test + public void testMarkAsUsedNonOvershadowedSegmentsInEternityIntervalWithEmptyVersions() throws Exception + { + publishWikiSegments(); + sqlSegmentsMetadataManager.startPollingDatabasePeriodically(); + sqlSegmentsMetadataManager.poll(); + Assert.assertTrue(sqlSegmentsMetadataManager.isPollingDatabasePeriodically()); + + final DataSegment koalaSegment1 = createSegment( + DS.KOALA, + "2017-10-15T00:00:00.000/2017-10-17T00:00:00.000", + "2017-10-15T20:19:12.565Z" + ); + + final DataSegment koalaSegment2 = createSegment( + DS.KOALA, + "2017-10-17T00:00:00.000/2017-10-18T00:00:00.000", + "2017-10-16T20:19:12.565Z" + ); + + // Overshadowed by koalaSegment2 + final DataSegment koalaSegment3 = createSegment( + DS.KOALA, + "2017-10-17T00:00:00.000/2017-10-18T00:00:00.000", + "2017-10-15T20:19:12.565Z" + ); + + publishUnusedSegments(koalaSegment1, koalaSegment2, koalaSegment3); + + sqlSegmentsMetadataManager.poll(); + Assert.assertEquals( + ImmutableSet.of(wikiSegment1, wikiSegment2), + ImmutableSet.copyOf(sqlSegmentsMetadataManager.iterateAllUsedSegments()) + ); + Assert.assertEquals( + 0, + sqlSegmentsMetadataManager.markAsUsedNonOvershadowedSegmentsInInterval( + DS.KOALA, + Intervals.ETERNITY, + ImmutableList.of() + ) + ); + sqlSegmentsMetadataManager.poll(); + + Assert.assertEquals( + ImmutableSet.of(wikiSegment1, wikiSegment2), + ImmutableSet.copyOf(sqlSegmentsMetadataManager.iterateAllUsedSegments()) + ); + } + @Test public void testMarkAsUsedNonOvershadowedSegmentsInFiniteIntervalWithVersions() throws Exception { @@ -1045,6 +1145,104 @@ public void testMarkAsUnusedSegmentsInIntervalAndNonExistentVersions() throws IO ); } + @Test + public void testMarkAsUnusedSegmentsInIntervalWithEmptyVersions() throws IOException + { + publishWikiSegments(); + sqlSegmentsMetadataManager.startPollingDatabasePeriodically(); + sqlSegmentsMetadataManager.poll(); + Assert.assertTrue(sqlSegmentsMetadataManager.isPollingDatabasePeriodically()); + + final DateTime now = DateTimes.nowUtc(); + final String v1 = now.toString(); + final String v2 = now.plus(Duration.standardDays(1)).toString(); + + final DataSegment koalaSegment1 = createSegment( + DS.KOALA, + "2017-10-15T00:00:00.000/2017-10-16T00:00:00.000", + v1 + ); + final DataSegment koalaSegment2 = createSegment( + DS.KOALA, + "2017-10-17T00:00:00.000/2017-10-18T00:00:00.000", + v2 + ); + final DataSegment koalaSegment3 = createSegment( + DS.KOALA, + "2017-10-19T00:00:00.000/2017-10-20T00:00:00.000", + v2 + ); + + publisher.publishSegment(koalaSegment1); + publisher.publishSegment(koalaSegment2); + publisher.publishSegment(koalaSegment3); + final Interval theInterval = Intervals.of("2017-10-15/2017-10-18"); + + Assert.assertEquals( + 0, + sqlSegmentsMetadataManager.markAsUnusedSegmentsInInterval( + DS.KOALA, + theInterval, + ImmutableList.of() + ) + ); + + sqlSegmentsMetadataManager.poll(); + Assert.assertEquals( + ImmutableSet.of(wikiSegment1, wikiSegment2, koalaSegment1, koalaSegment2, koalaSegment3), + ImmutableSet.copyOf(sqlSegmentsMetadataManager.iterateAllUsedSegments()) + ); + } + + @Test + public void testMarkAsUnusedSegmentsInEternityIntervalWithEmptyVersions() throws IOException + { + publishWikiSegments(); + sqlSegmentsMetadataManager.startPollingDatabasePeriodically(); + sqlSegmentsMetadataManager.poll(); + Assert.assertTrue(sqlSegmentsMetadataManager.isPollingDatabasePeriodically()); + + final DateTime now = DateTimes.nowUtc(); + final String v1 = now.toString(); + final String v2 = now.plus(Duration.standardDays(1)).toString(); + + final DataSegment koalaSegment1 = createSegment( + DS.KOALA, + "2017-10-15T00:00:00.000/2017-10-16T00:00:00.000", + v1 + ); + final DataSegment koalaSegment2 = createSegment( + DS.KOALA, + "2017-10-17T00:00:00.000/2017-10-18T00:00:00.000", + v2 + ); + final DataSegment koalaSegment3 = createSegment( + DS.KOALA, + "2017-10-19T00:00:00.000/2017-10-20T00:00:00.000", + v2 + ); + + publisher.publishSegment(koalaSegment1); + publisher.publishSegment(koalaSegment2); + publisher.publishSegment(koalaSegment3); + final Interval theInterval = Intervals.of("2017-10-15/2017-10-18"); + + Assert.assertEquals( + 0, + sqlSegmentsMetadataManager.markAsUnusedSegmentsInInterval( + DS.KOALA, + theInterval, + ImmutableList.of() + ) + ); + + sqlSegmentsMetadataManager.poll(); + Assert.assertEquals( + ImmutableSet.of(wikiSegment1, wikiSegment2, koalaSegment1, koalaSegment2, koalaSegment3), + ImmutableSet.copyOf(sqlSegmentsMetadataManager.iterateAllUsedSegments()) + ); + } + @Test public void testMarkAsUnusedSegmentsInIntervalWithOverlappingInterval() throws IOException {