From cbf984912d78921355d01c1088f8c7d1004da7d8 Mon Sep 17 00:00:00 2001 From: Abhishek Balaji Radhakrishnan Date: Mon, 25 Mar 2024 08:15:20 -0700 Subject: [PATCH 1/3] Differentiate null and empty lists of segment IDs and versions. Treat them differently so the. Segment IDs and versions can be An empty list, in which case, the queries should just not return anything. Versions are optional, so they can be null, which just indicates nothing, so the queries should return segments with all possible versions. Segment IDs cannot be null as indicated by the absence of @Nullable annotation. --- .../metadata/SqlSegmentsMetadataQuery.java | 33 +-- .../SqlSegmentsMetadataManagerTest.java | 198 ++++++++++++++++++ 2 files changed, 215 insertions(+), 16 deletions(-) diff --git a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java index 4d9921d2b06e..7e0e531c390e 100644 --- a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java +++ b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java @@ -35,7 +35,6 @@ import org.apache.druid.server.http.DataSegmentPlus; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.SegmentId; -import org.apache.druid.utils.CollectionUtils; import org.joda.time.DateTime; import org.joda.time.Interval; import org.skife.jdbi.v2.Handle; @@ -256,7 +255,7 @@ public List retrieveSegmentsById(String datasource, Set private List retrieveSegmentBatchById(String datasource, List segmentIds) { - if (CollectionUtils.isNullOrEmpty(segmentIds)) { + if (segmentIds.isEmpty()) { return Collections.emptyList(); } @@ -344,6 +343,10 @@ public int markSegmentsUnused(final String dataSource, final Interval interval) */ public int markSegmentsUnused(final String dataSource, final Interval interval, @Nullable final List versions) { + if (versions != null && versions.isEmpty()) { + return 0; + } + if (Intervals.isEternity(interval)) { final StringBuilder sb = new StringBuilder(); sb.append( @@ -354,9 +357,7 @@ public int markSegmentsUnused(final String dataSource, final Interval interval, ) ); - final boolean hasVersions = !CollectionUtils.isNullOrEmpty(versions); - - if (hasVersions) { + if (versions != null) { sb.append(getParameterizedInConditionForColumn("version", versions)); } @@ -366,7 +367,7 @@ public int markSegmentsUnused(final String dataSource, final Interval interval, .bind("used", false) .bind("used_status_last_updated", DateTimes.nowUtc().toString()); - if (hasVersions) { + if (versions != null) { bindColumnValuesToQueryWithInCondition("version", versions, stmt); } @@ -386,9 +387,7 @@ public int markSegmentsUnused(final String dataSource, final Interval interval, ) ); - final boolean hasVersions = !CollectionUtils.isNullOrEmpty(versions); - - if (hasVersions) { + if (versions != null) { sb.append(getParameterizedInConditionForColumn("version", versions)); } @@ -400,7 +399,7 @@ public int markSegmentsUnused(final String dataSource, final Interval interval, .bind("end", interval.getEnd().toString()) .bind("used_status_last_updated", DateTimes.nowUtc().toString()); - if (hasVersions) { + if (versions != null) { bindColumnValuesToQueryWithInCondition("version", versions, stmt); } return stmt.execute(); @@ -558,6 +557,10 @@ private CloseableIterator retrieveSegments( @Nullable final DateTime maxUsedStatusLastUpdatedTime ) { + if (versions != null && versions.isEmpty()) { + return CloseableIterators.withEmptyBaggage(Collections.emptyIterator()); + } + if (intervals.isEmpty() || intervals.size() <= MAX_INTERVALS_PER_BATCH) { return CloseableIterators.withEmptyBaggage( retrieveSegmentsInIntervalsBatch(dataSource, intervals, versions, matchMode, used, limit, lastSegmentId, sortOrder, maxUsedStatusLastUpdatedTime) @@ -733,9 +736,7 @@ private Query> buildSegmentsTableQuery( sb.append(getConditionForIntervalsAndMatchMode(intervals, matchMode, connector.getQuoteString())); } - final boolean hasVersions = !CollectionUtils.isNullOrEmpty(versions); - - if (hasVersions) { + if (versions != null) { sb.append(getParameterizedInConditionForColumn("version", versions)); } @@ -786,7 +787,7 @@ private Query> buildSegmentsTableQuery( bindIntervalsToQuery(sql, intervals); } - if (hasVersions) { + if (versions != null) { bindColumnValuesToQueryWithInCondition("version", versions, sql); } @@ -898,7 +899,7 @@ private static int computeNumChangedSegments(List segmentIds, int[] segm */ private static String getParameterizedInConditionForColumn(final String columnName, final List values) { - if (CollectionUtils.isNullOrEmpty(values)) { + if (values == null) { return ""; } @@ -927,7 +928,7 @@ private static void bindColumnValuesToQueryWithInCondition( final SQLStatement query ) { - if (CollectionUtils.isNullOrEmpty(values)) { + if (values == null) { return; } diff --git a/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTest.java b/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTest.java index a7128ce27143..f3ee63887cbe 100644 --- a/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTest.java +++ b/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTest.java @@ -640,6 +640,106 @@ public void testMarkAsUsedNonOvershadowedSegmentsInEternityIntervalWithVersions( ); } + @Test + public void testMarkAsUsedNonOvershadowedSegmentsInIntervalWithEmptyVersions() throws Exception + { + publishWikiSegments(); + sqlSegmentsMetadataManager.startPollingDatabasePeriodically(); + sqlSegmentsMetadataManager.poll(); + Assert.assertTrue(sqlSegmentsMetadataManager.isPollingDatabasePeriodically()); + + final DataSegment koalaSegment1 = createSegment( + DS.KOALA, + "2017-10-15T00:00:00.000/2017-10-17T00:00:00.000", + "2017-10-15T20:19:12.565Z" + ); + + final DataSegment koalaSegment2 = createSegment( + DS.KOALA, + "2017-10-17T00:00:00.000/2017-10-18T00:00:00.000", + "2017-10-16T20:19:12.565Z" + ); + + // Overshadowed by koalaSegment2 + final DataSegment koalaSegment3 = createSegment( + DS.KOALA, + "2017-10-17T00:00:00.000/2017-10-18T00:00:00.000", + "2017-10-15T20:19:12.565Z" + ); + + publishUnusedSegments(koalaSegment1, koalaSegment2, koalaSegment3); + + sqlSegmentsMetadataManager.poll(); + Assert.assertEquals( + ImmutableSet.of(wikiSegment1, wikiSegment2), + ImmutableSet.copyOf(sqlSegmentsMetadataManager.iterateAllUsedSegments()) + ); + Assert.assertEquals( + 0, + sqlSegmentsMetadataManager.markAsUsedNonOvershadowedSegmentsInInterval( + DS.KOALA, + Intervals.of("2017/2018"), + ImmutableList.of() + ) + ); + sqlSegmentsMetadataManager.poll(); + + Assert.assertEquals( + ImmutableSet.of(wikiSegment1, wikiSegment2), + ImmutableSet.copyOf(sqlSegmentsMetadataManager.iterateAllUsedSegments()) + ); + } + + @Test + public void testMarkAsUsedNonOvershadowedSegmentsInEternityIntervalWithEmptyVersions() throws Exception + { + publishWikiSegments(); + sqlSegmentsMetadataManager.startPollingDatabasePeriodically(); + sqlSegmentsMetadataManager.poll(); + Assert.assertTrue(sqlSegmentsMetadataManager.isPollingDatabasePeriodically()); + + final DataSegment koalaSegment1 = createSegment( + DS.KOALA, + "2017-10-15T00:00:00.000/2017-10-17T00:00:00.000", + "2017-10-15T20:19:12.565Z" + ); + + final DataSegment koalaSegment2 = createSegment( + DS.KOALA, + "2017-10-17T00:00:00.000/2017-10-18T00:00:00.000", + "2017-10-16T20:19:12.565Z" + ); + + // Overshadowed by koalaSegment2 + final DataSegment koalaSegment3 = createSegment( + DS.KOALA, + "2017-10-17T00:00:00.000/2017-10-18T00:00:00.000", + "2017-10-15T20:19:12.565Z" + ); + + publishUnusedSegments(koalaSegment1, koalaSegment2, koalaSegment3); + + sqlSegmentsMetadataManager.poll(); + Assert.assertEquals( + ImmutableSet.of(wikiSegment1, wikiSegment2), + ImmutableSet.copyOf(sqlSegmentsMetadataManager.iterateAllUsedSegments()) + ); + Assert.assertEquals( + 0, + sqlSegmentsMetadataManager.markAsUsedNonOvershadowedSegmentsInInterval( + DS.KOALA, + Intervals.ETERNITY, + ImmutableList.of() + ) + ); + sqlSegmentsMetadataManager.poll(); + + Assert.assertEquals( + ImmutableSet.of(wikiSegment1, wikiSegment2), + ImmutableSet.copyOf(sqlSegmentsMetadataManager.iterateAllUsedSegments()) + ); + } + @Test public void testMarkAsUsedNonOvershadowedSegmentsInFiniteIntervalWithVersions() throws Exception { @@ -1045,6 +1145,104 @@ public void testMarkAsUnusedSegmentsInIntervalAndNonExistentVersions() throws IO ); } + @Test + public void testMarkAsUnusedSegmentsInIntervalWithEmptyVersions() throws IOException + { + publishWikiSegments(); + sqlSegmentsMetadataManager.startPollingDatabasePeriodically(); + sqlSegmentsMetadataManager.poll(); + Assert.assertTrue(sqlSegmentsMetadataManager.isPollingDatabasePeriodically()); + + final DateTime now = DateTimes.nowUtc(); + final String v1 = now.toString(); + final String v2 = now.plus(Duration.standardDays(1)).toString(); + + final DataSegment koalaSegment1 = createSegment( + DS.KOALA, + "2017-10-15T00:00:00.000/2017-10-16T00:00:00.000", + v1 + ); + final DataSegment koalaSegment2 = createSegment( + DS.KOALA, + "2017-10-17T00:00:00.000/2017-10-18T00:00:00.000", + v2 + ); + final DataSegment koalaSegment3 = createSegment( + DS.KOALA, + "2017-10-19T00:00:00.000/2017-10-20T00:00:00.000", + v2 + ); + + publisher.publishSegment(koalaSegment1); + publisher.publishSegment(koalaSegment2); + publisher.publishSegment(koalaSegment3); + final Interval theInterval = Intervals.of("2017-10-15/2017-10-18"); + + Assert.assertEquals( + 0, + sqlSegmentsMetadataManager.markAsUnusedSegmentsInInterval( + DS.KOALA, + theInterval, + ImmutableList.of() + ) + ); + + sqlSegmentsMetadataManager.poll(); + Assert.assertEquals( + ImmutableSet.of(wikiSegment1, wikiSegment2, koalaSegment1, koalaSegment2, koalaSegment3), + ImmutableSet.copyOf(sqlSegmentsMetadataManager.iterateAllUsedSegments()) + ); + } + + @Test + public void testMarkAsUnusedSegmentsInEternityIntervalWithEmptyVersions() throws IOException + { + publishWikiSegments(); + sqlSegmentsMetadataManager.startPollingDatabasePeriodically(); + sqlSegmentsMetadataManager.poll(); + Assert.assertTrue(sqlSegmentsMetadataManager.isPollingDatabasePeriodically()); + + final DateTime now = DateTimes.nowUtc(); + final String v1 = now.toString(); + final String v2 = now.plus(Duration.standardDays(1)).toString(); + + final DataSegment koalaSegment1 = createSegment( + DS.KOALA, + "2017-10-15T00:00:00.000/2017-10-16T00:00:00.000", + v1 + ); + final DataSegment koalaSegment2 = createSegment( + DS.KOALA, + "2017-10-17T00:00:00.000/2017-10-18T00:00:00.000", + v2 + ); + final DataSegment koalaSegment3 = createSegment( + DS.KOALA, + "2017-10-19T00:00:00.000/2017-10-20T00:00:00.000", + v2 + ); + + publisher.publishSegment(koalaSegment1); + publisher.publishSegment(koalaSegment2); + publisher.publishSegment(koalaSegment3); + final Interval theInterval = Intervals.of("2017-10-15/2017-10-18"); + + Assert.assertEquals( + 0, + sqlSegmentsMetadataManager.markAsUnusedSegmentsInInterval( + DS.KOALA, + theInterval, + ImmutableList.of() + ) + ); + + sqlSegmentsMetadataManager.poll(); + Assert.assertEquals( + ImmutableSet.of(wikiSegment1, wikiSegment2, koalaSegment1, koalaSegment2, koalaSegment3), + ImmutableSet.copyOf(sqlSegmentsMetadataManager.iterateAllUsedSegments()) + ); + } + @Test public void testMarkAsUnusedSegmentsInIntervalWithOverlappingInterval() throws IOException { From ff69edeadbc47e986e66a00bc9d733ec644c47a8 Mon Sep 17 00:00:00 2001 From: Abhishek Balaji Radhakrishnan Date: Mon, 25 Mar 2024 09:14:13 -0700 Subject: [PATCH 2/3] Update javadocs and add empty versions test to kill task. --- .../task/KillUnusedSegmentsTaskTest.java | 48 +++++++++++++++++++ .../metadata/SegmentsMetadataManager.java | 5 +- 2 files changed, 51 insertions(+), 2 deletions(-) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java index 382673bed4b8..20de427508f7 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java @@ -214,6 +214,54 @@ public void testKillSegmentsWithVersions() throws Exception Assert.assertEquals(ImmutableSet.of(segment5V3), new HashSet<>(observedUnusedSegments)); } + @Test + public void testKillSegmentsWithEmptyVersions() throws Exception + { + final DateTime now = DateTimes.nowUtc(); + final String v1 = now.toString(); + final String v2 = now.minusHours(2).toString(); + final String v3 = now.minusHours(3).toString(); + + final DataSegment segment1V1 = newSegment(Intervals.of("2019-01-01/2019-02-01"), v1); + final DataSegment segment2V1 = newSegment(Intervals.of("2019-02-01/2019-03-01"), v1); + final DataSegment segment3V1 = newSegment(Intervals.of("2019-03-01/2019-04-01"), v1); + final DataSegment segment4V2 = newSegment(Intervals.of("2019-01-01/2019-02-01"), v2); + final DataSegment segment5V3 = newSegment(Intervals.of("2019-01-01/2019-02-01"), v3); + + final Set segments = ImmutableSet.of(segment1V1, segment2V1, segment3V1, segment4V2, segment5V3); + + Assert.assertEquals(segments, getMetadataStorageCoordinator().commitSegments(segments)); + Assert.assertEquals( + segments.size(), + getSegmentsMetadataManager().markSegmentsAsUnused( + segments.stream().map(DataSegment::getId).collect(Collectors.toSet()) + ) + ); + + final KillUnusedSegmentsTask task = new KillUnusedSegmentsTaskBuilder() + .dataSource(DATA_SOURCE) + .interval(Intervals.of("2018/2020")) + .versions(ImmutableList.of()) + .batchSize(3) + .build(); + + Assert.assertEquals(TaskState.SUCCESS, taskRunner.run(task).get().getStatusCode()); + Assert.assertEquals( + new KillTaskReport.Stats(0, 1, 0), + getReportedStats() + ); + + final List observedUnusedSegments = + getMetadataStorageCoordinator().retrieveUnusedSegmentsForInterval( + DATA_SOURCE, + Intervals.of("2018/2020"), + null, + null + ); + + Assert.assertEquals(segments, new HashSet<>(observedUnusedSegments)); + } + @Test public void testKillSegmentsWithVersionsAndLimit() throws Exception { diff --git a/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManager.java b/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManager.java index f0b9f06425d9..b0aaa54d5bac 100644 --- a/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManager.java +++ b/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManager.java @@ -56,7 +56,7 @@ public interface SegmentsMetadataManager /** * Marks non-overshadowed unused segments for the given interval and optional list of versions * as used. If versions are not specified, all versions of non-overshadowed unused segments in the interval - * will be marked as used. + * will be marked as used. If an empty list of versions is passed, no segments are marked as used. * @return Number of segments updated */ int markAsUsedNonOvershadowedSegmentsInInterval(String dataSource, Interval interval, @Nullable List versions); @@ -89,7 +89,8 @@ public interface SegmentsMetadataManager /** * Marks segments as unused that are fully contained in the given interval for an optional list of versions. - * If versions are not specified, all versions of segments in the interval will be marked as unused. + * If versions are not specified, all versions of segments in the interval will be marked as unused. If an empty list + * of versions is passed, no segments are marked as unused. * Segments that are already marked as unused are not updated. * @return The number of segments updated */ From 424b6b541555f4379b3b3b2bf0c8bd6f0b93bb3a Mon Sep 17 00:00:00 2001 From: Abhishek Balaji Radhakrishnan Date: Mon, 25 Mar 2024 09:37:33 -0700 Subject: [PATCH 3/3] Add test for RetrieveSegmentsActions as well. --- .../actions/RetrieveSegmentsActionsTest.java | 14 ++++++++++++++ .../IndexerMetadataStorageCoordinator.java | 3 ++- .../druid/metadata/SqlSegmentsMetadataQuery.java | 3 ++- 3 files changed, 18 insertions(+), 2 deletions(-) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsActionsTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsActionsTest.java index e61cf4128b4e..f0e16fc7d24e 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsActionsTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsActionsTest.java @@ -120,6 +120,20 @@ public void testRetrieveUnusedSegmentsActionWithVersions() Assert.assertEquals(expectedUnusedSegments, observedUnusedSegments); } + @Test + public void testRetrieveUnusedSegmentsActionWithEmptyVersions() + { + final RetrieveUnusedSegmentsAction action = new RetrieveUnusedSegmentsAction( + task.getDataSource(), + INTERVAL, + ImmutableList.of(), + null, + null + ); + final Set observedUnusedSegments = new HashSet<>(action.perform(task, actionTestKit.getTaskActionToolbox())); + Assert.assertEquals(ImmutableSet.of(), observedUnusedSegments); + } + @Test public void testRetrieveUnusedSegmentsActionWithMinUsedLastUpdatedTime() { diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java index 72abff93df7a..a889605c210f 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java @@ -159,7 +159,8 @@ default List retrieveUnusedSegmentsForInterval( * @param dataSource The data source the segments belong to * @param interval Filter the data segments to ones that include data in this interval exclusively. * @param versions An optional list of segment versions to retrieve in the given {@code interval}. If unspecified, all - * versions of unused segments in the {@code interval} must be retrieved. + * versions of unused segments in the {@code interval} must be retrieved. If an empty list is passed, + * no segments are retrieved. * @param limit The maximum number of unused segments to retreive. If null, no limit is applied. * @param maxUsedStatusLastUpdatedTime The maximum {@code used_status_last_updated} time. Any unused segment in {@code interval} * with {@code used_status_last_updated} no later than this time will be included in the diff --git a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java index 7e0e531c390e..cee378537d6d 100644 --- a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java +++ b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java @@ -158,7 +158,8 @@ public CloseableIterator retrieveUsedSegments( * @param dataSource The name of the datasource * @param intervals The intervals to search over * @param versions An optional list of unused segment versions to retrieve in the given {@code intervals}. - * If unspecified, all versions of unused segments in the {@code intervals} must be retrieved. + * If unspecified, all versions of unused segments in the {@code intervals} must be retrieved. If an + * empty list is passed, no segments are retrieved. * @param limit The limit of segments to return * @param lastSegmentId the last segment id from which to search for results. All segments returned are > * this segment lexigraphically if sortOrder is null or ASC, or < this segment