From ca33d8bf2e131801318b0c170b7e312669443026 Mon Sep 17 00:00:00 2001 From: rishabh singh Date: Thu, 14 Mar 2024 16:49:41 +0530 Subject: [PATCH 1/9] Fix build --- .../org/apache/druid/sql/calcite/CalciteArraysQueryTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteArraysQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteArraysQueryTest.java index 141baa5e5308..2c165ffe3c3b 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteArraysQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteArraysQueryTest.java @@ -1160,7 +1160,7 @@ public void testArrayContainsArrayStringColumns() "SELECT ARRAY_CONTAINS(arrayStringNulls, ARRAY['a', 'b']), ARRAY_CONTAINS(arrayStringNulls, arrayString) FROM druid.arrays LIMIT 5", ImmutableList.of( newScanQueryBuilder() - .dataSource(DATA_SOURCE_ARRAYS) + .dataSource(CalciteTests.ARRAYS_DATASOURCE) .intervals(querySegmentSpec(Filtration.eternity())) .columns("v0", "v1") .virtualColumns( From f8e038dd71f5b3b165fb5d5da6fb1efd66eab2d6 Mon Sep 17 00:00:00 2001 From: rishabh singh Date: Tue, 30 Apr 2024 10:36:34 +0530 Subject: [PATCH 2/9] Nit changes in KillUnreferencedSegmentSchema --- .../server/coordinator/DruidCoordinator.java | 4 ++-- ...java => KillUnreferencedSegmentSchema.java} | 16 ++++++++-------- .../druid/server/coordinator/stats/Stats.java | 3 +++ ... => KillUnreferencedSegmentSchemaTest.java} | 18 +++++++++--------- 4 files changed, 22 insertions(+), 19 deletions(-) rename server/src/main/java/org/apache/druid/server/coordinator/duty/{KillUnreferencedSegmentSchemaDuty.java => KillUnreferencedSegmentSchema.java} (90%) rename server/src/test/java/org/apache/druid/server/coordinator/duty/{KillUnreferencedSegmentSchemaDutyTest.java => KillUnreferencedSegmentSchemaTest.java} (95%) diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java index 68668e760299..df9179d355ea 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java @@ -68,7 +68,7 @@ import org.apache.druid.server.coordinator.duty.KillRules; import org.apache.druid.server.coordinator.duty.KillStalePendingSegments; import org.apache.druid.server.coordinator.duty.KillSupervisors; -import org.apache.druid.server.coordinator.duty.KillUnreferencedSegmentSchemaDuty; +import org.apache.druid.server.coordinator.duty.KillUnreferencedSegmentSchema; import org.apache.druid.server.coordinator.duty.KillUnusedSegments; import org.apache.druid.server.coordinator.duty.MarkEternityTombstonesAsUnused; import org.apache.druid.server.coordinator.duty.MarkOvershadowedSegmentsAsUnused; @@ -591,7 +591,7 @@ private List makeMetadataStoreManagementDuties() duties.add(new KillCompactionConfig(config, metadataManager.segments(), metadataManager.configs())); if (centralizedDatasourceSchemaConfig.isEnabled()) { - duties.add(new KillUnreferencedSegmentSchemaDuty(config, metadataManager.schemas())); + duties.add(new KillUnreferencedSegmentSchema(config, metadataManager.schemas())); } return duties; } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnreferencedSegmentSchemaDuty.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnreferencedSegmentSchema.java similarity index 90% rename from server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnreferencedSegmentSchemaDuty.java rename to server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnreferencedSegmentSchema.java index 0c5ca6ad973b..d26b91119ae2 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnreferencedSegmentSchemaDuty.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnreferencedSegmentSchema.java @@ -49,23 +49,23 @@ * *

*/ -public class KillUnreferencedSegmentSchemaDuty extends MetadataCleanupDuty +public class KillUnreferencedSegmentSchema extends MetadataCleanupDuty { - private static final Logger log = new Logger(KillUnreferencedSegmentSchemaDuty.class); + private static final Logger log = new Logger(KillUnreferencedSegmentSchema.class); private final SegmentSchemaManager segmentSchemaManager; - public KillUnreferencedSegmentSchemaDuty( + public KillUnreferencedSegmentSchema( DruidCoordinatorConfig config, SegmentSchemaManager segmentSchemaManager ) { super( - "segmentSchema", + "segment schemas", "druid.coordinator.kill.segmentSchema", config.isSegmentSchemaKillEnabled(), config.getSegmentSchemaKillPeriod(), config.getSegmentSchemaKillDurationToRetain(), - Stats.Kill.RULES, + Stats.Kill.SEGMENT_SCHEMAS, config ); this.segmentSchemaManager = segmentSchemaManager; @@ -78,9 +78,9 @@ protected int cleanupEntriesCreatedBefore(DateTime minCreatedTime) int unused = segmentSchemaManager.markUnreferencedSchemasAsUnused(); log.info("Marked [%s] unreferenced schemas as unused.", unused); - // 2 (repair step): Identify unused schema which are still referenced by segments, make them used. - // This case would arise when segment is associated with a schema which turned unused by the previous statement - // or the previous run of this duty. + // 2 (repair step): Find unused schema which are still referenced by segments, make them used. + // This case would arise when segment is associated with a schema which was marked unused in the previous step + // or in the previous run. List schemaFingerprintsToUpdate = segmentSchemaManager.findReferencedSchemaMarkedAsUnused(); if (schemaFingerprintsToUpdate.size() > 0) { segmentSchemaManager.markSchemaAsUsed(schemaFingerprintsToUpdate); diff --git a/server/src/main/java/org/apache/druid/server/coordinator/stats/Stats.java b/server/src/main/java/org/apache/druid/server/coordinator/stats/Stats.java index bae0e4070371..48f81d9d26b3 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/stats/Stats.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/stats/Stats.java @@ -155,6 +155,9 @@ public static class Kill = CoordinatorStat.toDebugAndEmit("killEligibleUnusedSegs", "kill/eligibleUnusedSegments/count"); public static final CoordinatorStat PENDING_SEGMENTS = CoordinatorStat.toDebugAndEmit("killPendingSegs", "kill/pendingSegments/count"); + + public static final CoordinatorStat SEGMENT_SCHEMAS + = CoordinatorStat.toDebugAndEmit("killedSegmentSchemas", "kill/segmentSchemas/count"); } public static class Balancer diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnreferencedSegmentSchemaDutyTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnreferencedSegmentSchemaTest.java similarity index 95% rename from server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnreferencedSegmentSchemaDutyTest.java rename to server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnreferencedSegmentSchemaTest.java index 25ad519eea62..c8ebb5ce6ccf 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnreferencedSegmentSchemaDutyTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnreferencedSegmentSchemaTest.java @@ -63,7 +63,7 @@ import java.util.Set; @RunWith(MockitoJUnitRunner.class) -public class KillUnreferencedSegmentSchemaDutyTest +public class KillUnreferencedSegmentSchemaTest { @Rule public final TestDerbyConnector.DerbyConnectorRule derbyConnectorRule = @@ -111,8 +111,8 @@ public void testKillUnreferencedSchema() .withSegmentSchemaKillDurationToRetain(Period.parse("PT6H").toStandardDuration()) .build(); - KillUnreferencedSegmentSchemaDuty duty = - new TestKillUnreferencedSegmentSchemasDuty(druidCoordinatorConfig, segmentSchemaManager, dateTimes); + KillUnreferencedSegmentSchema duty = + new TestKillUnreferencedSegmentSchemas(druidCoordinatorConfig, segmentSchemaManager, dateTimes); Set segments = new HashSet<>(); List schemaMetadataPluses = new ArrayList<>(); @@ -217,8 +217,8 @@ public void testKillUnreferencedSchema_repair() .withSegmentSchemaKillDurationToRetain(Period.parse("PT6H").toStandardDuration()) .build(); - KillUnreferencedSegmentSchemaDuty duty = - new TestKillUnreferencedSegmentSchemasDuty(druidCoordinatorConfig, segmentSchemaManager, dateTimes); + KillUnreferencedSegmentSchema duty = + new TestKillUnreferencedSegmentSchemas(druidCoordinatorConfig, segmentSchemaManager, dateTimes); RowSignature rowSignature = RowSignature.builder().add("c1", ColumnType.FLOAT).build(); @@ -289,8 +289,8 @@ public void testKillOlderVersionSchema() .withSegmentSchemaKillDurationToRetain(Period.parse("PT6H").toStandardDuration()) .build(); - KillUnreferencedSegmentSchemaDuty duty = - new TestKillUnreferencedSegmentSchemasDuty(druidCoordinatorConfig, segmentSchemaManager, dateTimes); + KillUnreferencedSegmentSchema duty = + new TestKillUnreferencedSegmentSchemas(druidCoordinatorConfig, segmentSchemaManager, dateTimes); // create 2 versions of same schema // unreferenced one should get deleted @@ -363,12 +363,12 @@ public void testKillOlderVersionSchema() Assert.assertNull(getSchemaUsedStatus(fingerprintOldVersion)); } - private static class TestKillUnreferencedSegmentSchemasDuty extends KillUnreferencedSegmentSchemaDuty + private static class TestKillUnreferencedSegmentSchemas extends KillUnreferencedSegmentSchema { private final List dateTimes; private int index = -1; - public TestKillUnreferencedSegmentSchemasDuty( + public TestKillUnreferencedSegmentSchemas( DruidCoordinatorConfig config, SegmentSchemaManager segmentSchemaManager, List dateTimes From 268e83a68f8cc0a5a511e62ecafce78946a936a6 Mon Sep 17 00:00:00 2001 From: rishabh singh Date: Tue, 30 Apr 2024 11:10:09 +0530 Subject: [PATCH 3/9] Replace reference to the abbreviation SMQ with Metadata Query, rename inTransit maps in schema cache --- docs/configuration/index.md | 3 + docs/operations/metrics.md | 26 ++--- .../apache/druid/segment/SegmentMetadata.java | 4 +- .../metadata/SqlSegmentsMetadataManager.java | 2 +- .../CoordinatorSegmentMetadataCache.java | 4 +- .../metadata/SegmentSchemaBackFillQueue.java | 2 +- .../segment/metadata/SegmentSchemaCache.java | 104 +++++++++++------- .../CoordinatorSegmentMetadataCacheTest.java | 2 +- .../metadata/SegmentSchemaCacheTest.java | 6 +- 9 files changed, 88 insertions(+), 65 deletions(-) diff --git a/docs/configuration/index.md b/docs/configuration/index.md index 5f4c9902360c..21c5481c6db3 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -899,6 +899,9 @@ These Coordinator static configurations can be defined in the `coordinator/runti |`druid.coordinator.kill.datasource.on`| Boolean value for whether to enable automatic deletion of datasource metadata (Note: datasource metadata only exists for datasource created from supervisor). If set to true, Coordinator will periodically remove datasource metadata of terminated supervisor from the datasource table in metadata storage. | No | True| |`druid.coordinator.kill.datasource.period`| How often to do automatic deletion of datasource metadata in [ISO 8601](https://en.wikipedia.org/wiki/ISO_8601) duration format. Value must be equal to or greater than `druid.coordinator.period.metadataStoreManagementPeriod`. Only applies if `druid.coordinator.kill.datasource.on` is set to true.| No| `P1D`| |`druid.coordinator.kill.datasource.durationToRetain`| Duration of datasource metadata to be retained from created time in [ISO 8601](https://en.wikipedia.org/wiki/ISO_8601) duration format. Only applies if `druid.coordinator.kill.datasource.on` is set to true.| Yes if `druid.coordinator.kill.datasource.on` is set to true.| `P90D`| +|`druid.coordinator.kill.segmentSchema.on`| Boolean value for whether to enable automatic deletion of unused segment schemas. If set to true, Coordinator will periodically identify segment schemas which are not referenced by any used segment and mark them as unused. At a later point, these unused schemas are deleted. Only applies if CentralizedDatasourceSchema feature is enabled or `druid.centralizedDatasourceSchema.enabled` is set to true. | No | True| +|`druid.coordinator.kill.segmentSchema.period`| How often to do automatic deletion of segment schemas in [ISO 8601](https://en.wikipedia.org/wiki/ISO_8601) duration format. Value must be equal to or greater than `druid.coordinator.period.metadataStoreManagementPeriod`. Only applies if `druid.coordinator.kill.segmentSchema.on` is set to true.| No|`P1H`| +|`druid.coordinator.kill.segmentSchema.durationToRetain`| Duration of segment schemas to be retained from the time it was marked as unused in [ISO 8601](https://en.wikipedia.org/wiki/ISO_8601) duration format. Only applies if `druid.coordinator.kill.segmentSchema.on` is set to true.| Yes if `druid.coordinator.kill.segmentSchema.on` is set to true.|`P6H`| ##### Segment management diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md index a877d8b8522d..51041952cfae 100644 --- a/docs/operations/metrics.md +++ b/docs/operations/metrics.md @@ -70,17 +70,11 @@ Most metric values reset each emission period, as specified in `druid.monitoring |`sqlQuery/bytes`|Number of bytes returned in the SQL query response.|`id`, `nativeQueryIds`, `dataSource`, `remoteAddress`, `success`, `engine`| | |`serverview/init/time`|Time taken to initialize the broker server view. Useful to detect if brokers are taking too long to start.||Depends on the number of segments.| |`metadatacache/init/time`|Time taken to initialize the broker segment metadata cache. Useful to detect if brokers are taking too long to start||Depends on the number of segments.| -|`metadatacache/refresh/count`|Number of segments to refresh in broker segment metadata cache.|`dataSource`| -|`metadatacache/refresh/time`|Time taken to refresh segments in broker segment metadata cache.|`dataSource`| -|`metadatacache/schemaPoll/count`|Number of coordinator polls to fetch datasource schema.|| -|`metadatacache/schemaPoll/failed`|Number of failed coordinator polls to fetch datasource schema.|| -|`metadatacache/schemaPoll/time`|Time taken for coordinator polls to fetch datasource schema.|| -|`metadatacache/backfill/count`|Number of segments for which schema was back filled in the database.|`dataSource`| -|`schemacache/realtime/count`|Number of realtime segments for which schema is cached.||Depends on the number of realtime segments.| -|`schemacache/finalizedSegmentMetadata/count`|Number of finalized segments for which schema metadata is cached.||Depends on the number of segments in the cluster.| -|`schemacache/finalizedSchemaPayload/count`|Number of finalized segment schema cached.||Depends on the number of distinct schema in the cluster.| -|`schemacache/inTransitSMQResults/count`|Number of segments for which schema was fetched by executing segment metadata query.||Eventually it should be 0.| -|`schemacache/inTransitSMQPublishedResults/count`|Number of segments for which schema is cached after back filling in the database.||Eventually it should be 0.| +|`metadatacache/refresh/count`|Number of segments to refresh in broker segment metadata cache.|`dataSource`|| +|`metadatacache/refresh/time`|Time taken to refresh segments in broker segment metadata cache.|`dataSource`|| +|`metadatacache/schemaPoll/count`|Number of coordinator polls to fetch datasource schema.||| +|`metadatacache/schemaPoll/failed`|Number of failed coordinator polls to fetch datasource schema.||| +|`metadatacache/schemaPoll/time`|Time taken for coordinator polls to fetch datasource schema.||| |`serverview/sync/healthy`|Sync status of the Broker with a segment-loading server such as a Historical or Peon. Emitted only when [HTTP-based server view](../configuration/index.md#segment-management) is enabled. This metric can be used in conjunction with `serverview/sync/unstableTime` to debug slow startup of Brokers.|`server`, `tier`|1 for fully synced servers, 0 otherwise| |`serverview/sync/unstableTime`|Time in milliseconds for which the Broker has been failing to sync with a segment-loading server. Emitted only when [HTTP-based server view](../configuration/index.md#segment-management) is enabled.|`server`, `tier`|Not emitted for synced servers.| |`subquery/rowLimit/count`|Number of subqueries whose results are materialized as rows (Java objects on heap).|This metric is only available if the `SubqueryCountStatsMonitor` module is included.| | @@ -375,8 +369,14 @@ These metrics are for the Druid Coordinator and are reset each time the Coordina |`serverview/sync/healthy`|Sync status of the Coordinator with a segment-loading server such as a Historical or Peon. Emitted only when [HTTP-based server view](../configuration/index.md#segment-management) is enabled. You can use this metric in conjunction with `serverview/sync/unstableTime` to debug slow startup of the Coordinator.|`server`, `tier`|1 for fully synced servers, 0 otherwise| |`serverview/sync/unstableTime`|Time in milliseconds for which the Coordinator has been failing to sync with a segment-loading server. Emitted only when [HTTP-based server view](../configuration/index.md#segment-management) is enabled.|`server`, `tier`|Not emitted for synced servers.| |`metadatacache/init/time`|Time taken to initialize the coordinator segment metadata cache.||Depends on the number of segments.| -|`metadatacache/refresh/count`|Number of segments to refresh in coordinator segment metadata cache.|`dataSource`| -|`metadatacache/refresh/time`|Time taken to refresh segments in coordinator segment metadata cache.|`dataSource`| +|`metadatacache/refresh/count`|Number of segments to refresh in coordinator segment metadata cache.|`dataSource`|| +|`metadatacache/refresh/time`|Time taken to refresh segments in coordinator segment metadata cache.|`dataSource`|| +|`metadatacache/backfill/count`|Number of segments for which schema was back filled in the database.|`dataSource`|| +|`metadatacache/realtimeSegmentSchema/count`|Number of realtime segments for which schema is cached.||Depends on the number of realtime segments in the cluster.| +|`metadatacache/finalizedSegmentMetadata/count`|Number of finalized segments for which schema metadata is cached.||Depends on the number of segments in the cluster.| +|`metadatacache/finalizedSchemaPayload/count`|Number of finalized segment schema cached.||Depends on the number of distinct schema in the cluster.| +|`metadatacache/temporaryMetadataQueryResults/count`|Number of segments for which schema was fetched by executing segment metadata query.||Eventually it should be 0.| +|`metadatacache/temporaryPublishedMetadataQueryResults/count`|Number of segments for which schema is cached after back filling in the database.||This value gets reset after each database poll. Eventually it should be 0.| ## General Health diff --git a/processing/src/main/java/org/apache/druid/segment/SegmentMetadata.java b/processing/src/main/java/org/apache/druid/segment/SegmentMetadata.java index f12a676907ec..d367a926ed43 100644 --- a/processing/src/main/java/org/apache/druid/segment/SegmentMetadata.java +++ b/processing/src/main/java/org/apache/druid/segment/SegmentMetadata.java @@ -79,9 +79,9 @@ public int hashCode() @Override public String toString() { - return "SegmentStats{" + + return "SegmentMetadata{" + "numRows=" + numRows + - ", fingerprint='" + schemaFingerprint + '\'' + + ", schemaFingerprint='" + schemaFingerprint + '\'' + '}'; } } diff --git a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java index 7e45a45464c5..590a61d78d01 100644 --- a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java +++ b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java @@ -1177,7 +1177,7 @@ public Void map(int index, ResultSet r, StatementContext ctx) throws SQLExceptio }) .list(); - segmentSchemaCache.resetInTransitSMQResultPublishedOnDBPoll(); + segmentSchemaCache.resetTemporaryPublishedMetadataQueryResultOnDBPoll(); return null; }); diff --git a/server/src/main/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCache.java b/server/src/main/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCache.java index 204174146777..dad0b78ea778 100644 --- a/server/src/main/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCache.java +++ b/server/src/main/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCache.java @@ -68,7 +68,7 @@ * The schema is merged with any existing schema for the segment and the cache is updated. * Corresponding datasource is marked for refresh. *
  • The refresh mechanism is significantly different from the other implementation, - *
    • SMQ is executed only for those non-realtime segments for which the schema is not cached.
    • + *
      • Metadata query is executed only for those non-realtime segments for which the schema is not cached.
      • *
      • Datasources marked for refresh are then rebuilt.
      * */ @@ -265,7 +265,7 @@ protected boolean segmentMetadataQueryResultHandler( log.debug("Publishing segment schema. SegmentId [%s], RowSignature [%s], numRows [%d]", segmentId, rowSignature, numRows); Map aggregators = analysis.getAggregators(); // cache the signature - segmentSchemaCache.addInTransitSMQResult(segmentId, rowSignature, aggregators, numRows); + segmentSchemaCache.addTemporaryMetadataQueryResult(segmentId, rowSignature, aggregators, numRows); // queue the schema for publishing to the DB segmentSchemaBackfillQueue.add(segmentId, rowSignature, aggregators, numRows); added.set(true); diff --git a/server/src/main/java/org/apache/druid/segment/metadata/SegmentSchemaBackFillQueue.java b/server/src/main/java/org/apache/druid/segment/metadata/SegmentSchemaBackFillQueue.java index bb787c83c8ea..75350cd27275 100644 --- a/server/src/main/java/org/apache/druid/segment/metadata/SegmentSchemaBackFillQueue.java +++ b/server/src/main/java/org/apache/druid/segment/metadata/SegmentSchemaBackFillQueue.java @@ -178,7 +178,7 @@ public void processBatchesDue() segmentSchemaManager.persistSchemaAndUpdateSegmentsTable(entry.getKey(), entry.getValue(), CentralizedDatasourceSchemaConfig.SCHEMA_VERSION); // Mark the segments as published in the cache. for (SegmentSchemaMetadataPlus plus : entry.getValue()) { - segmentSchemaCache.markInTransitSMQResultPublished(plus.getSegmentId()); + segmentSchemaCache.markInMetadataQueryResultPublished(plus.getSegmentId()); } emitter.emit( ServiceMetricEvent.builder() diff --git a/server/src/main/java/org/apache/druid/segment/metadata/SegmentSchemaCache.java b/server/src/main/java/org/apache/druid/segment/metadata/SegmentSchemaCache.java index 2ba8aee29cb4..e2fb16817924 100644 --- a/server/src/main/java/org/apache/druid/segment/metadata/SegmentSchemaCache.java +++ b/server/src/main/java/org/apache/druid/segment/metadata/SegmentSchemaCache.java @@ -49,10 +49,10 @@ * Additionally, this class caches schema for realtime segments in {@link SegmentSchemaCache#realtimeSegmentSchema}. This mapping * is cleared either when the segment is removed or marked as finalized. *

      - * Finalized segments which do not have their schema information present in the DB, fetch their schema via SMQ. - * SMQ results are cached in {@link SegmentSchemaCache#inTransitSMQResults}. Once the schema information is backfilled - * in the DB, it is removed from {@link SegmentSchemaCache#inTransitSMQResults} and added to {@link SegmentSchemaCache#inTransitSMQPublishedResults}. - * {@link SegmentSchemaCache#inTransitSMQPublishedResults} is cleared on each successfull DB poll. + * Finalized segments which do not have their schema information present in the DB, fetch their schema via metadata query. + * Metadata query results are cached in {@link SegmentSchemaCache#temporaryMetadataQueryResults}. Once the schema information is backfilled + * in the DB, it is removed from {@link SegmentSchemaCache#temporaryMetadataQueryResults} and added to {@link SegmentSchemaCache#temporaryPublishedMetadataQueryResults}. + * {@link SegmentSchemaCache#temporaryPublishedMetadataQueryResults} is cleared on each successfull DB poll. *

      * {@link CoordinatorSegmentMetadataCache} uses this cache to fetch schema for a segment. *

      @@ -81,19 +81,19 @@ public class SegmentSchemaCache private final ConcurrentMap realtimeSegmentSchema = new ConcurrentHashMap<>(); /** - * If the segment schema is fetched via SMQ, subsequently it is added here. + * If the segment schema is fetched via segment metadata query, subsequently it is added here. * The mapping is removed when the schema information is backfilled in the DB. */ - private final ConcurrentMap inTransitSMQResults = new ConcurrentHashMap<>(); + private final ConcurrentMap temporaryMetadataQueryResults = new ConcurrentHashMap<>(); /** * Once the schema information is backfilled in the DB, it is added here. * This map is cleared after each DB poll. * After the DB poll and before clearing this map it is possible that some results were added to this map. * These results would get lost after clearing this map. - * But, it should be fine since the schema could be retrieved if needed using SMQ, also the schema would be available in the next poll. + * But, it should be fine since the schema could be retrieved if needed using metadata query, also the schema would be available in the next poll. */ - private final ConcurrentMap inTransitSMQPublishedResults = new ConcurrentHashMap<>(); + private final ConcurrentMap temporaryPublishedMetadataQueryResults = new ConcurrentHashMap<>(); private final ServiceEmitter emitter; @@ -121,8 +121,8 @@ public void onLeaderStop() initialized.set(new CountDownLatch(1)); finalizedSegmentSchemaInfo = new FinalizedSegmentSchemaInfo(ImmutableMap.of(), ImmutableMap.of()); - inTransitSMQResults.clear(); - inTransitSMQPublishedResults.clear(); + temporaryMetadataQueryResults.clear(); + temporaryPublishedMetadataQueryResults.clear(); } public boolean isInitialized() @@ -132,7 +132,7 @@ public boolean isInitialized() /** * {@link CoordinatorSegmentMetadataCache} startup waits on the cache initialization. - * This is being done to ensure that we don't execute SMQ for segment with schema already present in the DB. + * This is being done to ensure that we don't execute metadata query for segment with schema already present in the DB. */ public void awaitInitialization() throws InterruptedException { @@ -157,44 +157,44 @@ public void addRealtimeSegmentSchema(SegmentId segmentId, RowSignature rowSignat } /** - * Cache SMQ result. This entry is cleared when SMQ result is published to the DB. + * Cache metadata query result. This entry is cleared when metadata query result is published to the DB. */ - public void addInTransitSMQResult( + public void addTemporaryMetadataQueryResult( SegmentId segmentId, RowSignature rowSignature, Map aggregatorFactories, long numRows ) { - inTransitSMQResults.put(segmentId, new SchemaPayloadPlus(new SchemaPayload(rowSignature, aggregatorFactories), numRows)); + temporaryMetadataQueryResults.put(segmentId, new SchemaPayloadPlus(new SchemaPayload(rowSignature, aggregatorFactories), numRows)); } /** - * After, SMQ result is published to the DB, it is removed from the {@code inTransitSMQResults} - * and added to {@code inTransitSMQPublishedResults}. + * After, metadata query result is published to the DB, it is removed from temporaryMetadataQueryResults + * and added to temporaryPublishedMetadataQueryResults. */ - public void markInTransitSMQResultPublished(SegmentId segmentId) + public void markInMetadataQueryResultPublished(SegmentId segmentId) { - if (!inTransitSMQResults.containsKey(segmentId)) { - log.error("SegmentId [%s] not found in InTransitSMQResultPublished map.", segmentId); + if (!temporaryMetadataQueryResults.containsKey(segmentId)) { + log.error("SegmentId [%s] not found in temporaryMetadataQueryResults map.", segmentId); } - inTransitSMQPublishedResults.put(segmentId, inTransitSMQResults.get(segmentId)); - inTransitSMQResults.remove(segmentId); + temporaryPublishedMetadataQueryResults.put(segmentId, temporaryMetadataQueryResults.get(segmentId)); + temporaryMetadataQueryResults.remove(segmentId); } /** - * {@code inTransitSMQPublishedResults} is reset on each DB poll. + * temporaryPublishedMetadataQueryResults is reset after each DB poll. */ - public void resetInTransitSMQResultPublishedOnDBPoll() + public void resetTemporaryPublishedMetadataQueryResultOnDBPoll() { - inTransitSMQPublishedResults.clear(); + temporaryPublishedMetadataQueryResults.clear(); } /** - * Fetch schema for a given segment. Note, since schema corresponding to the current schema version in - * {@link CentralizedDatasourceSchemaConfig#SCHEMA_VERSION} is cached, there is no check on version here. - * Any change in version would require a service restart, so we will never end up with multi version schema. + * Fetch schema for a given segment. Note, that there is no check on schema version in this method, + * since schema corresponding to a particular version {@link CentralizedDatasourceSchemaConfig#SCHEMA_VERSION} is cached. + * Any change in version would require a service restart, so this cache will never have schema for multiple versions. */ public Optional getSchemaForSegment(SegmentId segmentId) { @@ -208,18 +208,18 @@ public Optional getSchemaForSegment(SegmentId segmentId) return Optional.of(payloadPlus); } - // it is important to lookup {@code inTransitSMQResults} before {@code inTransitSMQPublishedResults} + // it is important to lookup temporaryMetadataQueryResults before temporaryPublishedMetadataQueryResults // other way round, if a segment schema is just published it is possible that the schema is missing - // in {@code inTransitSMQPublishedResults} and by the time we check {@code inTransitSMQResults} it is removed. + // in temporaryPublishedMetadataQueryResults and by the time we check temporaryMetadataQueryResults it is removed. - // segment schema has been fetched via SMQ - payloadPlus = inTransitSMQResults.get(segmentId); + // segment schema has been fetched via metadata query + payloadPlus = temporaryMetadataQueryResults.get(segmentId); if (payloadPlus != null) { return Optional.of(payloadPlus); } - // segment schema has been fetched via SMQ and the schema has been published to the DB - payloadPlus = inTransitSMQPublishedResults.get(segmentId); + // segment schema has been fetched via metadata query and the schema has been published to the DB + payloadPlus = temporaryPublishedMetadataQueryResults.get(segmentId); if (payloadPlus != null) { return Optional.of(payloadPlus); } @@ -247,8 +247,8 @@ public Optional getSchemaForSegment(SegmentId segmentId) public boolean isSchemaCached(SegmentId segmentId) { return realtimeSegmentSchema.containsKey(segmentId) || - inTransitSMQResults.containsKey(segmentId) || - inTransitSMQPublishedResults.containsKey(segmentId) || + temporaryMetadataQueryResults.containsKey(segmentId) || + temporaryPublishedMetadataQueryResults.containsKey(segmentId) || isFinalizedSegmentSchemaCached(segmentId); } @@ -278,8 +278,8 @@ public boolean segmentRemoved(SegmentId segmentId) { // remove the segment from all the maps realtimeSegmentSchema.remove(segmentId); - inTransitSMQResults.remove(segmentId); - inTransitSMQPublishedResults.remove(segmentId); + temporaryMetadataQueryResults.remove(segmentId); + temporaryPublishedMetadataQueryResults.remove(segmentId); // Since finalizedSegmentMetadata & finalizedSegmentSchema is updated on each DB poll, // there is no need to remove segment from them. @@ -296,11 +296,31 @@ public void realtimeSegmentRemoved(SegmentId segmentId) public void emitStats() { - emitter.emit(ServiceMetricEvent.builder().setMetric("schemacache/realtime/count", realtimeSegmentSchema.size())); - emitter.emit(ServiceMetricEvent.builder().setMetric("schemacache/finalizedSegmentMetadata/count", getSegmentMetadataMap().size())); - emitter.emit(ServiceMetricEvent.builder().setMetric("schemacache/finalizedSchemaPayload/count", getSchemaPayloadMap().size())); - emitter.emit(ServiceMetricEvent.builder().setMetric("schemacache/inTransitSMQResults/count", inTransitSMQResults.size())); - emitter.emit(ServiceMetricEvent.builder().setMetric("schemacache/inTransitSMQPublishedResults/count", inTransitSMQPublishedResults.size())); + emitter.emit(ServiceMetricEvent.builder() + .setMetric( + "metadatacache/realtimeSegmentSchema/count", + realtimeSegmentSchema.size() + )); + emitter.emit(ServiceMetricEvent.builder() + .setMetric( + "metadatacache/finalizedSegmentMetadata/count", + getSegmentMetadataMap().size() + )); + emitter.emit(ServiceMetricEvent.builder() + .setMetric( + "metadatacache/finalizedSchemaPayload/count", + getSchemaPayloadMap().size() + )); + emitter.emit(ServiceMetricEvent.builder().setMetric( + "metadatacache/temporaryMetadataQueryResults/count", + temporaryMetadataQueryResults.size() + ) + ); + emitter.emit(ServiceMetricEvent.builder().setMetric( + "metadatacache/temporaryPublishedMetadataQueryResults/count", + temporaryPublishedMetadataQueryResults.size() + ) + ); } /** diff --git a/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java b/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java index 9e099a587843..e5b6db1d42df 100644 --- a/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java +++ b/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java @@ -1590,7 +1590,7 @@ public void refresh(Set segmentsToRefresh, Set dataSourcesToR schema.onLeaderStart(); schema.awaitInitialization(); - // verify SMQ is not executed, since the schema is already cached + // verify metadata query is not executed, since the schema is already cached Assert.assertEquals(0, refreshCount.get()); // verify that datasource schema is built diff --git a/server/src/test/java/org/apache/druid/segment/metadata/SegmentSchemaCacheTest.java b/server/src/test/java/org/apache/druid/segment/metadata/SegmentSchemaCacheTest.java index 9b7ddde4b7e6..234b16bd9b51 100644 --- a/server/src/test/java/org/apache/druid/segment/metadata/SegmentSchemaCacheTest.java +++ b/server/src/test/java/org/apache/druid/segment/metadata/SegmentSchemaCacheTest.java @@ -63,20 +63,20 @@ public void testCacheInTransitSMQResult() RowSignature rowSignature = RowSignature.builder().add("cx", ColumnType.FLOAT).build(); SchemaPayloadPlus expected = new SchemaPayloadPlus(new SchemaPayload(rowSignature, Collections.emptyMap()), 20L); SegmentId id = SegmentId.dummy("ds"); - cache.addInTransitSMQResult(id, rowSignature, Collections.emptyMap(), 20); + cache.addTemporaryMetadataQueryResult(id, rowSignature, Collections.emptyMap(), 20); Assert.assertTrue(cache.isSchemaCached(id)); Optional schema = cache.getSchemaForSegment(id); Assert.assertTrue(schema.isPresent()); Assert.assertEquals(expected, schema.get()); - cache.markInTransitSMQResultPublished(id); + cache.markInMetadataQueryResultPublished(id); schema = cache.getSchemaForSegment(id); Assert.assertTrue(schema.isPresent()); Assert.assertEquals(expected, schema.get()); - cache.resetInTransitSMQResultPublishedOnDBPoll(); + cache.resetTemporaryPublishedMetadataQueryResultOnDBPoll(); Assert.assertFalse(cache.isSchemaCached(id)); schema = cache.getSchemaForSegment(id); From 781fe00ab6d0ee45c9d03dd1e2da497e30eeda95 Mon Sep 17 00:00:00 2001 From: rishabh singh Date: Thu, 2 May 2024 11:33:38 +0530 Subject: [PATCH 4/9] nitpicks --- .../SegmentTransactionalInsertAction.java | 9 ++++-- .../common/task/AbstractBatchIndexTask.java | 6 ++-- .../druid/indexing/common/TestIndexTask.java | 2 +- .../common/task/IngestionTestBase.java | 16 +++++++--- ...stractParallelIndexSupervisorTaskTest.java | 4 +-- .../overlord/RemoteTaskRunnerTest.java | 6 ++-- ...hema.java => DataSegmentWithMetadata.java} | 4 +-- .../druid/metadata/SQLMetadataConnector.java | 13 +++----- .../metadata/SQLMetadataSegmentPublisher.java | 2 +- .../metadata/SegmentSchemaBackFillQueue.java | 2 +- .../appenderator/AppenderatorImpl.java | 16 +++++----- .../appenderator/BatchAppenderator.java | 16 +++++----- .../appenderator/StreamAppenderator.java | 16 +++++----- .../SqlSegmentsMetadataManagerTestBase.java | 31 ------------------- 14 files changed, 62 insertions(+), 81 deletions(-) rename processing/src/main/java/org/apache/druid/segment/{DataSegmentWithSchema.java => DataSegmentWithMetadata.java} (95%) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java index 4bcc8c5d39f3..c2f542b096e1 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java @@ -78,8 +78,13 @@ public static SegmentTransactionalInsertAction overwriteAction( @Nullable SegmentSchemaMapping segmentSchemaMapping ) { - return new SegmentTransactionalInsertAction(segmentsToBeOverwritten, segmentsToPublish, null, null, null, - segmentSchemaMapping + return new SegmentTransactionalInsertAction( + segmentsToBeOverwritten, + segmentsToPublish, + null, + null, + null, + segmentSchemaMapping ); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java index 9fe3b78ee2d2..5a17c4379f18 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java @@ -443,8 +443,10 @@ protected TaskAction buildPublishAction( case APPEND: return SegmentTransactionalAppendAction.forSegments(segmentsToPublish, segmentSchemaMapping); default: - return SegmentTransactionalInsertAction.overwriteAction(segmentsToBeOverwritten, segmentsToPublish, - segmentSchemaMapping + return SegmentTransactionalInsertAction.overwriteAction( + segmentsToBeOverwritten, + segmentsToPublish, + segmentSchemaMapping ); } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/TestIndexTask.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/TestIndexTask.java index e94ced42193d..b4166b7bc292 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/TestIndexTask.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/TestIndexTask.java @@ -116,7 +116,7 @@ public TaskStatus runTask(TaskToolbox toolbox) return status; } - public TaskAction testBuildPublishAction( + public TaskAction buildPublishActionForTest( Set segmentsToBeOverwritten, Set segmentsToPublish, SegmentSchemaMapping segmentSchemaMapping, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java index 6b093cd745f7..7caad45bc338 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java @@ -548,6 +548,9 @@ public Map getBlacklistedTaskSlotCount() } } + /** + * Verify that schema is present for each segment. + */ public void verifySchema(DataSegmentsWithSchemas dataSegmentsWithSchemas) { int nonTombstoneSegments = 0; @@ -556,11 +559,16 @@ public void verifySchema(DataSegmentsWithSchemas dataSegmentsWithSchemas) continue; } nonTombstoneSegments++; - Assert.assertTrue(dataSegmentsWithSchemas.getSegmentSchemaMapping() - .getSegmentIdToMetadataMap() - .containsKey(segment.getId().toString())); + Assert.assertTrue( + dataSegmentsWithSchemas.getSegmentSchemaMapping() + .getSegmentIdToMetadataMap() + .containsKey(segment.getId().toString()) + ); } - Assert.assertEquals(nonTombstoneSegments, dataSegmentsWithSchemas.getSegmentSchemaMapping().getSegmentIdToMetadataMap().size()); + Assert.assertEquals( + nonTombstoneSegments, + dataSegmentsWithSchemas.getSegmentSchemaMapping().getSegmentIdToMetadataMap().size() + ); } public TaskReport.ReportMap getReports() throws IOException diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java index 29ed44f0ad0b..f888dd76bf0f 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java @@ -534,7 +534,7 @@ public TaskStatus getStatus(String taskId) } } - public DataSegmentsWithSchemas getPublishedSegments(String taskId) + public DataSegmentsWithSchemas getPublishedSegmentsWithSchemas(String taskId) { final TaskContainer taskContainer = tasks.get(taskId); if (taskContainer == null || taskContainer.actionClient == null) { @@ -667,7 +667,7 @@ public ListenableFuture taskStatus(String taskId) public DataSegmentsWithSchemas getSegmentAndSchemas(Task task) { - return taskRunner.getPublishedSegments(task.getId()); + return taskRunner.getPublishedSegmentsWithSchemas(task.getId()); } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java index f4cb82dcd713..dec98e052910 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java @@ -1164,7 +1164,7 @@ public void testBuildPublishAction() Assert.assertEquals( SegmentTransactionalAppendAction.class, - task.testBuildPublishAction( + task.buildPublishActionForTest( Collections.emptySet(), Collections.emptySet(), null, @@ -1174,7 +1174,7 @@ public void testBuildPublishAction() Assert.assertEquals( SegmentTransactionalReplaceAction.class, - task.testBuildPublishAction( + task.buildPublishActionForTest( Collections.emptySet(), Collections.emptySet(), null, @@ -1184,7 +1184,7 @@ public void testBuildPublishAction() Assert.assertEquals( SegmentTransactionalInsertAction.class, - task.testBuildPublishAction( + task.buildPublishActionForTest( Collections.emptySet(), Collections.emptySet(), null, diff --git a/processing/src/main/java/org/apache/druid/segment/DataSegmentWithSchema.java b/processing/src/main/java/org/apache/druid/segment/DataSegmentWithMetadata.java similarity index 95% rename from processing/src/main/java/org/apache/druid/segment/DataSegmentWithSchema.java rename to processing/src/main/java/org/apache/druid/segment/DataSegmentWithMetadata.java index b82b4d266538..e2c1729ed38c 100644 --- a/processing/src/main/java/org/apache/druid/segment/DataSegmentWithSchema.java +++ b/processing/src/main/java/org/apache/druid/segment/DataSegmentWithMetadata.java @@ -26,7 +26,7 @@ /** * Immutable wrapper class for segment and schema. */ -public class DataSegmentWithSchema +public class DataSegmentWithMetadata { @Nullable private final DataSegment dataSegment; @@ -34,7 +34,7 @@ public class DataSegmentWithSchema @Nullable private final SchemaPayloadPlus schemaPayloadPlus; - public DataSegmentWithSchema( + public DataSegmentWithMetadata( @Nullable DataSegment dataSegment, @Nullable SchemaPayloadPlus schemaPayloadPlus ) diff --git a/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java b/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java index 94452de00ee1..01812bf56913 100644 --- a/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java +++ b/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java @@ -339,10 +339,10 @@ public void createSegmentTable(final String tableName) for (String column : columns) { createStatementBuilder.append(column); - createStatementBuilder.append(","); + createStatementBuilder.append(",\n"); } - createStatementBuilder.append("PRIMARY KEY (id))"); + createStatementBuilder.append("PRIMARY KEY (id)\n)"); createTable( tableName, @@ -601,10 +601,7 @@ protected void alterSegmentTable() log.info("Adding columns %s to table[%s].", columnsToAdd, tableName); } - alterTable( - tableName, - alterCommands - ); + alterTable(tableName, alterCommands); } @Override @@ -994,7 +991,7 @@ public void createSegmentSchemaTable(final String tableName) tableName, getSerialType(), getPayloadType() ), StringUtils.format("CREATE INDEX idx_%1$s_fingerprint ON %1$s(fingerprint)", tableName), - StringUtils.format("CREATE INDEX idx_%1$s_used ON %1$s(used)", tableName) + StringUtils.format("CREATE INDEX idx_%1$s_used ON %1$s(used, used_status_last_updated)", tableName) ) ); } @@ -1138,7 +1135,7 @@ private void validateSegmentsTable() { String segmentsTables = tablesConfigSupplier.get().getSegmentsTable(); - boolean schemaPersistenceRequirementMet = + final boolean schemaPersistenceRequirementMet = !centralizedDatasourceSchemaConfig.isEnabled() || (tableHasColumn(segmentsTables, "schema_fingerprint") && tableHasColumn(segmentsTables, "num_rows")); diff --git a/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentPublisher.java b/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentPublisher.java index 9416f8e53fa1..48a92ecba4e8 100644 --- a/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentPublisher.java +++ b/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentPublisher.java @@ -76,7 +76,7 @@ public void publishSegment(final DataSegment segment) throws IOException ); } - void publishSegment( + private void publishSegment( final String segmentId, final String dataSource, final String createdDate, diff --git a/server/src/main/java/org/apache/druid/segment/metadata/SegmentSchemaBackFillQueue.java b/server/src/main/java/org/apache/druid/segment/metadata/SegmentSchemaBackFillQueue.java index 75350cd27275..c2995e3087ec 100644 --- a/server/src/main/java/org/apache/druid/segment/metadata/SegmentSchemaBackFillQueue.java +++ b/server/src/main/java/org/apache/druid/segment/metadata/SegmentSchemaBackFillQueue.java @@ -183,7 +183,7 @@ public void processBatchesDue() emitter.emit( ServiceMetricEvent.builder() .setDimension("dataSource", entry.getKey()) - .setMetric("metadatacache/backfill/count", polled.size()) + .setMetric("metadatacache/backfill/count", entry.getValue().size()) ); } catch (Exception e) { diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java index 6a67d4818ea7..764d7239736a 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java @@ -55,7 +55,7 @@ import org.apache.druid.query.QuerySegmentWalker; import org.apache.druid.query.SegmentDescriptor; import org.apache.druid.segment.BaseProgressIndicator; -import org.apache.druid.segment.DataSegmentWithSchema; +import org.apache.druid.segment.DataSegmentWithMetadata; import org.apache.druid.segment.IndexIO; import org.apache.druid.segment.IndexMerger; import org.apache.druid.segment.QueryableIndex; @@ -796,16 +796,16 @@ public ListenableFuture push( continue; } - final DataSegmentWithSchema dataSegmentWithSchema = mergeAndPush( + final DataSegmentWithMetadata dataSegmentWithMetadata = mergeAndPush( entry.getKey(), entry.getValue(), useUniquePath ); - if (dataSegmentWithSchema != null) { - DataSegment segment = dataSegmentWithSchema.getDataSegment(); + if (dataSegmentWithMetadata != null) { + DataSegment segment = dataSegmentWithMetadata.getDataSegment(); dataSegments.add(segment); - SchemaPayloadPlus schemaPayloadPlus = dataSegmentWithSchema.getSegmentSchemaMetadata(); + SchemaPayloadPlus schemaPayloadPlus = dataSegmentWithMetadata.getSegmentSchemaMetadata(); if (schemaPayloadPlus != null) { SchemaPayload schemaPayload = schemaPayloadPlus.getSchemaPayload(); segmentSchemaMapping.addSchema( @@ -854,7 +854,7 @@ private ListenableFuture pushBarrier() * @return segment descriptor, or null if the sink is no longer valid */ @Nullable - private DataSegmentWithSchema mergeAndPush( + private DataSegmentWithMetadata mergeAndPush( final SegmentIdWithShardSpec identifier, final Sink sink, final boolean useUniquePath @@ -898,7 +898,7 @@ private DataSegmentWithSchema mergeAndPush( ); } else { log.info("Segment[%s] already pushed, skipping.", identifier); - return new DataSegmentWithSchema( + return new DataSegmentWithMetadata( objectMapper.readValue(descriptorFile, DataSegment.class), centralizedDatasourceSchemaConfig.isEnabled() ? TaskSegmentSchemaUtil.getSegmentSchema( mergedTarget, @@ -1017,7 +1017,7 @@ private DataSegmentWithSchema mergeAndPush( objectMapper.writeValueAsString(segment.getLoadSpec()) ); - return new DataSegmentWithSchema( + return new DataSegmentWithMetadata( segment, centralizedDatasourceSchemaConfig.isEnabled() ? TaskSegmentSchemaUtil.getSegmentSchema(mergedTarget, indexIO) diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderator.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderator.java index 2f973f63bf58..128de15196df 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderator.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderator.java @@ -50,7 +50,7 @@ import org.apache.druid.query.QueryRunner; import org.apache.druid.query.SegmentDescriptor; import org.apache.druid.segment.BaseProgressIndicator; -import org.apache.druid.segment.DataSegmentWithSchema; +import org.apache.druid.segment.DataSegmentWithMetadata; import org.apache.druid.segment.IndexIO; import org.apache.druid.segment.IndexMerger; import org.apache.druid.segment.QueryableIndex; @@ -715,16 +715,16 @@ public ListenableFuture push( } // push it: - final DataSegmentWithSchema dataSegmentWithSchema = mergeAndPush( + final DataSegmentWithMetadata dataSegmentWithMetadata = mergeAndPush( identifier, sinkForIdentifier ); // record it: - if (dataSegmentWithSchema.getDataSegment() != null) { - DataSegment segment = dataSegmentWithSchema.getDataSegment(); + if (dataSegmentWithMetadata.getDataSegment() != null) { + DataSegment segment = dataSegmentWithMetadata.getDataSegment(); dataSegments.add(segment); - SchemaPayloadPlus schemaPayloadPlus = dataSegmentWithSchema.getSegmentSchemaMetadata(); + SchemaPayloadPlus schemaPayloadPlus = dataSegmentWithMetadata.getSegmentSchemaMetadata(); if (schemaPayloadPlus != null) { SchemaPayload schemaPayload = schemaPayloadPlus.getSchemaPayload(); segmentSchemaMapping.addSchema( @@ -758,7 +758,7 @@ public ListenableFuture push( * @param sink sink to push * @return segment descriptor along with schema, or null if the sink is no longer valid */ - private DataSegmentWithSchema mergeAndPush( + private DataSegmentWithMetadata mergeAndPush( final SegmentIdWithShardSpec identifier, final Sink sink ) @@ -793,7 +793,7 @@ private DataSegmentWithSchema mergeAndPush( if (descriptorFile.exists()) { // Already pushed. log.info("Segment[%s] already pushed, skipping.", identifier); - return new DataSegmentWithSchema( + return new DataSegmentWithMetadata( objectMapper.readValue(descriptorFile, DataSegment.class), centralizedDatasourceSchemaConfig.isEnabled() ? TaskSegmentSchemaUtil.getSegmentSchema( mergedTarget, @@ -895,7 +895,7 @@ private DataSegmentWithSchema mergeAndPush( objectMapper.writeValueAsString(segment.getLoadSpec()) ); - return new DataSegmentWithSchema(segment, schemaMetadata); + return new DataSegmentWithMetadata(segment, schemaMetadata); } catch (Exception e) { metrics.incrementFailedHandoffs(); diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java index 1c5dd42dd770..7622b6943af0 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java @@ -57,7 +57,7 @@ import org.apache.druid.query.QuerySegmentWalker; import org.apache.druid.query.SegmentDescriptor; import org.apache.druid.segment.BaseProgressIndicator; -import org.apache.druid.segment.DataSegmentWithSchema; +import org.apache.druid.segment.DataSegmentWithMetadata; import org.apache.druid.segment.IndexIO; import org.apache.druid.segment.IndexMerger; import org.apache.druid.segment.QueryableIndex; @@ -809,15 +809,15 @@ public ListenableFuture push( continue; } - final DataSegmentWithSchema dataSegmentWithSchema = mergeAndPush( + final DataSegmentWithMetadata dataSegmentWithMetadata = mergeAndPush( entry.getKey(), entry.getValue(), useUniquePath ); - if (dataSegmentWithSchema != null) { - DataSegment segment = dataSegmentWithSchema.getDataSegment(); + if (dataSegmentWithMetadata != null) { + DataSegment segment = dataSegmentWithMetadata.getDataSegment(); dataSegments.add(segment); - SchemaPayloadPlus schemaPayloadPlus = dataSegmentWithSchema.getSegmentSchemaMetadata(); + SchemaPayloadPlus schemaPayloadPlus = dataSegmentWithMetadata.getSegmentSchemaMetadata(); if (schemaPayloadPlus != null) { SchemaPayload schemaPayload = schemaPayloadPlus.getSchemaPayload(); segmentSchemaMapping.addSchema( @@ -866,7 +866,7 @@ private ListenableFuture pushBarrier() * @return segment descriptor, or null if the sink is no longer valid */ @Nullable - private DataSegmentWithSchema mergeAndPush( + private DataSegmentWithMetadata mergeAndPush( final SegmentIdWithShardSpec identifier, final Sink sink, final boolean useUniquePath @@ -910,7 +910,7 @@ private DataSegmentWithSchema mergeAndPush( ); } else { log.info("Segment[%s] already pushed, skipping.", identifier); - return new DataSegmentWithSchema( + return new DataSegmentWithMetadata( objectMapper.readValue(descriptorFile, DataSegment.class), centralizedDatasourceSchemaConfig.isEnabled() ? TaskSegmentSchemaUtil.getSegmentSchema( mergedTarget, @@ -988,7 +988,7 @@ private DataSegmentWithSchema mergeAndPush( objectMapper.writeValueAsString(segment.getLoadSpec()) ); - return new DataSegmentWithSchema( + return new DataSegmentWithMetadata( segment, centralizedDatasourceSchemaConfig.isEnabled() ? TaskSegmentSchemaUtil.getSegmentSchema(mergedTarget, indexIO) diff --git a/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTestBase.java b/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTestBase.java index 73fb07472f5c..f166befde730 100644 --- a/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTestBase.java +++ b/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTestBase.java @@ -22,16 +22,12 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; -import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.segment.TestHelper; import org.apache.druid.segment.metadata.SegmentSchemaCache; import org.apache.druid.segment.metadata.SegmentSchemaManager; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.partition.NoneShardSpec; -import org.joda.time.DateTime; - -import java.io.IOException; public class SqlSegmentsMetadataManagerTestBase { @@ -83,31 +79,4 @@ protected static DataSegment createSegment( "wikipedia/index/y=2012/m=01/d=05/2012-01-06T22:19:12.565Z/0/index.zip", 0 ); - - protected void publish(DataSegment segment, boolean used) throws IOException - { - publish(segment, used, DateTimes.nowUtc()); - } - - protected void publish(DataSegment segment, boolean used, DateTime usedFlagLastUpdated) throws IOException - { - boolean partitioned = !(segment.getShardSpec() instanceof NoneShardSpec); - - String usedFlagLastUpdatedStr = null; - if (null != usedFlagLastUpdated) { - usedFlagLastUpdatedStr = usedFlagLastUpdated.toString(); - } - publisher.publishSegment( - segment.getId().toString(), - segment.getDataSource(), - DateTimes.nowUtc().toString(), - segment.getInterval().getStart().toString(), - segment.getInterval().getEnd().toString(), - partitioned, - segment.getVersion(), - used, - jsonMapper.writeValueAsBytes(segment), - usedFlagLastUpdatedStr - ); - } } From c2431969a621c8f78714c7b2aaa395fce19f64d2 Mon Sep 17 00:00:00 2001 From: rishabh singh Date: Thu, 2 May 2024 13:07:09 +0530 Subject: [PATCH 5/9] Remove reference to smq abbreviation from integration-tests --- ...ocker-compose.cds-coordinator-metadata-query-disabled.yml} | 0 integration-tests/docker/druid.sh | 2 +- ...> cds-coordinator-metadata-query-disabled-sample-data.sql} | 0 integration-tests/script/docker_compose_args.sh | 4 ++-- .../src/test/java/org/apache/druid/tests/TestNGGroup.java | 2 +- .../apache/druid/tests/indexer/ITAppendBatchIndexTest.java | 2 +- .../indexer/ITAppenderatorDriverRealtimeIndexTaskTest.java | 2 +- .../tests/indexer/ITBestEffortRollupParallelIndexTest.java | 2 +- .../indexer/ITCombiningInputSourceParallelIndexTest.java | 2 +- .../druid/tests/indexer/ITCompactionSparseColumnTest.java | 2 +- .../org/apache/druid/tests/indexer/ITCompactionTaskTest.java | 2 +- .../org/apache/druid/tests/indexer/ITHttpInputSourceTest.java | 2 +- .../java/org/apache/druid/tests/indexer/ITIndexerTest.java | 2 +- .../ITKafkaIndexingServiceNonTransactionalSerializedTest.java | 2 +- .../ITKafkaIndexingServiceTransactionalSerializedTest.java | 2 +- .../tests/indexer/ITLocalInputSourceAllInputFormatTest.java | 2 +- .../apache/druid/tests/indexer/ITOverwriteBatchIndexTest.java | 2 +- .../druid/tests/indexer/ITPerfectRollupParallelIndexTest.java | 2 +- .../apache/druid/tests/indexer/ITRealtimeIndexTaskTest.java | 2 +- .../druid/tests/indexer/ITSystemTableBatchIndexTaskTest.java | 2 +- 20 files changed, 19 insertions(+), 19 deletions(-) rename integration-tests/docker/{docker-compose.cds-coordinator-smq-disabled.yml => docker-compose.cds-coordinator-metadata-query-disabled.yml} (100%) rename integration-tests/docker/test-data/{cds-coordinator-smq-disabled-sample-data.sql => cds-coordinator-metadata-query-disabled-sample-data.sql} (100%) diff --git a/integration-tests/docker/docker-compose.cds-coordinator-smq-disabled.yml b/integration-tests/docker/docker-compose.cds-coordinator-metadata-query-disabled.yml similarity index 100% rename from integration-tests/docker/docker-compose.cds-coordinator-smq-disabled.yml rename to integration-tests/docker/docker-compose.cds-coordinator-metadata-query-disabled.yml diff --git a/integration-tests/docker/druid.sh b/integration-tests/docker/druid.sh index f112f91d1591..d7bd80ca32ed 100755 --- a/integration-tests/docker/druid.sh +++ b/integration-tests/docker/druid.sh @@ -85,7 +85,7 @@ setupData() # The "query" and "security" test groups require data to be setup before running the tests. # In particular, they requires segments to be download from a pre-existing s3 bucket. # This is done by using the loadSpec put into metadatastore and s3 credientials set below. - if [ "$DRUID_INTEGRATION_TEST_GROUP" = "query" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "query-retry" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "query-error" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "high-availability" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "security" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "ldap-security" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "upgrade" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "centralized-datasource-schema" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "cds-task-schema-publish-disabled" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "cds-coordinator-smq-disabled" ]; then + if [ "$DRUID_INTEGRATION_TEST_GROUP" = "query" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "query-retry" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "query-error" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "high-availability" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "security" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "ldap-security" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "upgrade" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "centralized-datasource-schema" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "cds-task-schema-publish-disabled" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "cds-coordinator-metadata-query-disabled" ]; then # touch is needed because OverlayFS's copy-up operation breaks POSIX standards. See https://github.com/docker/for-linux/issues/72. find /var/lib/mysql -type f -exec touch {} \; && service mysql start \ && cat /test-data/${DRUID_INTEGRATION_TEST_GROUP}-sample-data.sql | mysql -u root druid \ diff --git a/integration-tests/docker/test-data/cds-coordinator-smq-disabled-sample-data.sql b/integration-tests/docker/test-data/cds-coordinator-metadata-query-disabled-sample-data.sql similarity index 100% rename from integration-tests/docker/test-data/cds-coordinator-smq-disabled-sample-data.sql rename to integration-tests/docker/test-data/cds-coordinator-metadata-query-disabled-sample-data.sql diff --git a/integration-tests/script/docker_compose_args.sh b/integration-tests/script/docker_compose_args.sh index c37d22ca3144..477627d300be 100644 --- a/integration-tests/script/docker_compose_args.sh +++ b/integration-tests/script/docker_compose_args.sh @@ -79,10 +79,10 @@ getComposeArgs() then # cluster with overriden properties for broker and coordinator echo "-f ${DOCKERDIR}/docker-compose.cds-task-schema-publish-disabled.yml" - elif [ "$DRUID_INTEGRATION_TEST_GROUP" = "cds-coordinator-smq-disabled" ] + elif [ "$DRUID_INTEGRATION_TEST_GROUP" = "cds-coordinator-metadata-query-disabled" ] then # cluster with overriden properties for broker and coordinator - echo "-f ${DOCKERDIR}/docker-compose.cds-coordinator-smq-disabled.yml" + echo "-f ${DOCKERDIR}/docker-compose.cds-coordinator-metadata-query-disabled.yml" else # default echo "-f ${DOCKERDIR}/docker-compose.yml" diff --git a/integration-tests/src/test/java/org/apache/druid/tests/TestNGGroup.java b/integration-tests/src/test/java/org/apache/druid/tests/TestNGGroup.java index 516dcb65434a..b3417902cef8 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/TestNGGroup.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/TestNGGroup.java @@ -166,5 +166,5 @@ public class TestNGGroup public static final String CDS_TASK_SCHEMA_PUBLISH_DISABLED = "cds-task-schema-publish-disabled"; - public static final String CDS_COORDINATOR_SMQ_DISABLED = "cds-coordinator-smq-disabled"; + public static final String CDS_COORDINATOR_METADATA_QUERY_DISABLED = "cds-coordinator-metadata-query-disabled"; } diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITAppendBatchIndexTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITAppendBatchIndexTest.java index 84ddb7612e65..ced35949626c 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITAppendBatchIndexTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITAppendBatchIndexTest.java @@ -40,7 +40,7 @@ import java.util.UUID; import java.util.function.Function; -@Test(groups = {TestNGGroup.APPEND_INGESTION, TestNGGroup.CDS_TASK_SCHEMA_PUBLISH_DISABLED, TestNGGroup.CDS_COORDINATOR_SMQ_DISABLED}) +@Test(groups = {TestNGGroup.APPEND_INGESTION, TestNGGroup.CDS_TASK_SCHEMA_PUBLISH_DISABLED, TestNGGroup.CDS_COORDINATOR_METADATA_QUERY_DISABLED}) @Guice(moduleFactory = DruidTestModuleFactory.class) public class ITAppendBatchIndexTest extends AbstractITBatchIndexTest { diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITAppenderatorDriverRealtimeIndexTaskTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITAppenderatorDriverRealtimeIndexTaskTest.java index a077dbd0a978..b69ac0276757 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITAppenderatorDriverRealtimeIndexTaskTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITAppenderatorDriverRealtimeIndexTaskTest.java @@ -44,7 +44,7 @@ /** * See {@link AbstractITRealtimeIndexTaskTest} for test details. */ -@Test(groups = {TestNGGroup.REALTIME_INDEX, TestNGGroup.CDS_TASK_SCHEMA_PUBLISH_DISABLED, TestNGGroup.CDS_COORDINATOR_SMQ_DISABLED}) +@Test(groups = {TestNGGroup.REALTIME_INDEX, TestNGGroup.CDS_TASK_SCHEMA_PUBLISH_DISABLED, TestNGGroup.CDS_COORDINATOR_METADATA_QUERY_DISABLED}) @Guice(moduleFactory = DruidTestModuleFactory.class) public class ITAppenderatorDriverRealtimeIndexTaskTest extends AbstractITRealtimeIndexTaskTest { diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITBestEffortRollupParallelIndexTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITBestEffortRollupParallelIndexTest.java index 0ecfe7ed5c40..be2ab61a5f59 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITBestEffortRollupParallelIndexTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITBestEffortRollupParallelIndexTest.java @@ -38,7 +38,7 @@ import java.io.Closeable; import java.util.function.Function; -@Test(groups = {TestNGGroup.BATCH_INDEX, TestNGGroup.CDS_TASK_SCHEMA_PUBLISH_DISABLED, TestNGGroup.CDS_COORDINATOR_SMQ_DISABLED}) +@Test(groups = {TestNGGroup.BATCH_INDEX, TestNGGroup.CDS_TASK_SCHEMA_PUBLISH_DISABLED, TestNGGroup.CDS_COORDINATOR_METADATA_QUERY_DISABLED}) @Guice(moduleFactory = DruidTestModuleFactory.class) public class ITBestEffortRollupParallelIndexTest extends AbstractITBatchIndexTest { diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCombiningInputSourceParallelIndexTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCombiningInputSourceParallelIndexTest.java index 40549a2685e3..2af07e017b98 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCombiningInputSourceParallelIndexTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCombiningInputSourceParallelIndexTest.java @@ -32,7 +32,7 @@ import java.util.Map; import java.util.function.Function; -@Test(groups = {TestNGGroup.BATCH_INDEX, TestNGGroup.QUICKSTART_COMPATIBLE, TestNGGroup.CDS_TASK_SCHEMA_PUBLISH_DISABLED, TestNGGroup.CDS_COORDINATOR_SMQ_DISABLED}) +@Test(groups = {TestNGGroup.BATCH_INDEX, TestNGGroup.QUICKSTART_COMPATIBLE, TestNGGroup.CDS_TASK_SCHEMA_PUBLISH_DISABLED, TestNGGroup.CDS_COORDINATOR_METADATA_QUERY_DISABLED}) @Guice(moduleFactory = DruidTestModuleFactory.class) public class ITCombiningInputSourceParallelIndexTest extends AbstractITBatchIndexTest { diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionSparseColumnTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionSparseColumnTest.java index 27b771308b35..58dc7c43ae7b 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionSparseColumnTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionSparseColumnTest.java @@ -41,7 +41,7 @@ import java.util.List; import java.util.Map; -@Test(groups = {TestNGGroup.COMPACTION, TestNGGroup.QUICKSTART_COMPATIBLE, TestNGGroup.CDS_TASK_SCHEMA_PUBLISH_DISABLED, TestNGGroup.CDS_COORDINATOR_SMQ_DISABLED}) +@Test(groups = {TestNGGroup.COMPACTION, TestNGGroup.QUICKSTART_COMPATIBLE, TestNGGroup.CDS_TASK_SCHEMA_PUBLISH_DISABLED, TestNGGroup.CDS_COORDINATOR_METADATA_QUERY_DISABLED}) @Guice(moduleFactory = DruidTestModuleFactory.class) public class ITCompactionSparseColumnTest extends AbstractIndexerTest { diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java index 6dbcb90c3df5..b974c7d20e97 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java @@ -51,7 +51,7 @@ import java.util.Map; import java.util.Set; -@Test(groups = {TestNGGroup.COMPACTION, TestNGGroup.QUICKSTART_COMPATIBLE, TestNGGroup.CDS_TASK_SCHEMA_PUBLISH_DISABLED, TestNGGroup.CDS_COORDINATOR_SMQ_DISABLED}) +@Test(groups = {TestNGGroup.COMPACTION, TestNGGroup.QUICKSTART_COMPATIBLE, TestNGGroup.CDS_TASK_SCHEMA_PUBLISH_DISABLED, TestNGGroup.CDS_COORDINATOR_METADATA_QUERY_DISABLED}) @Guice(moduleFactory = DruidTestModuleFactory.class) public class ITCompactionTaskTest extends AbstractIndexerTest { diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITHttpInputSourceTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITHttpInputSourceTest.java index 11404bdd56e4..25eb83a7c28e 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITHttpInputSourceTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITHttpInputSourceTest.java @@ -29,7 +29,7 @@ import java.io.IOException; import java.util.UUID; -@Test(groups = {TestNGGroup.INPUT_SOURCE, TestNGGroup.CDS_TASK_SCHEMA_PUBLISH_DISABLED, TestNGGroup.CDS_COORDINATOR_SMQ_DISABLED}) +@Test(groups = {TestNGGroup.INPUT_SOURCE, TestNGGroup.CDS_TASK_SCHEMA_PUBLISH_DISABLED, TestNGGroup.CDS_COORDINATOR_METADATA_QUERY_DISABLED}) @Guice(moduleFactory = DruidTestModuleFactory.class) public class ITHttpInputSourceTest extends AbstractITBatchIndexTest { diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITIndexerTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITIndexerTest.java index 77c64733a622..f527135c80d3 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITIndexerTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITIndexerTest.java @@ -41,7 +41,7 @@ import java.util.Map; import java.util.function.Function; -@Test(groups = {TestNGGroup.BATCH_INDEX, TestNGGroup.QUICKSTART_COMPATIBLE, TestNGGroup.CDS_TASK_SCHEMA_PUBLISH_DISABLED, TestNGGroup.CDS_COORDINATOR_SMQ_DISABLED}) +@Test(groups = {TestNGGroup.BATCH_INDEX, TestNGGroup.QUICKSTART_COMPATIBLE, TestNGGroup.CDS_TASK_SCHEMA_PUBLISH_DISABLED, TestNGGroup.CDS_COORDINATOR_METADATA_QUERY_DISABLED}) @Guice(moduleFactory = DruidTestModuleFactory.class) public class ITIndexerTest extends AbstractITBatchIndexTest { diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceNonTransactionalSerializedTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceNonTransactionalSerializedTest.java index 33bf5a5d79b3..5df5a708bc5e 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceNonTransactionalSerializedTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceNonTransactionalSerializedTest.java @@ -25,7 +25,7 @@ import org.testng.annotations.Guice; import org.testng.annotations.Test; -@Test(groups = {TestNGGroup.KAFKA_INDEX_SLOW, TestNGGroup.CDS_TASK_SCHEMA_PUBLISH_DISABLED, TestNGGroup.CDS_COORDINATOR_SMQ_DISABLED}) +@Test(groups = {TestNGGroup.KAFKA_INDEX_SLOW, TestNGGroup.CDS_TASK_SCHEMA_PUBLISH_DISABLED, TestNGGroup.CDS_COORDINATOR_METADATA_QUERY_DISABLED}) @Guice(moduleFactory = DruidTestModuleFactory.class) public class ITKafkaIndexingServiceNonTransactionalSerializedTest extends AbstractKafkaIndexingServiceTest { diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceTransactionalSerializedTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceTransactionalSerializedTest.java index a50aa6ce10ef..424d3c670684 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceTransactionalSerializedTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceTransactionalSerializedTest.java @@ -25,7 +25,7 @@ import org.testng.annotations.Guice; import org.testng.annotations.Test; -@Test(groups = {TestNGGroup.TRANSACTIONAL_KAFKA_INDEX_SLOW, TestNGGroup.CDS_TASK_SCHEMA_PUBLISH_DISABLED, TestNGGroup.CDS_COORDINATOR_SMQ_DISABLED}) +@Test(groups = {TestNGGroup.TRANSACTIONAL_KAFKA_INDEX_SLOW, TestNGGroup.CDS_TASK_SCHEMA_PUBLISH_DISABLED, TestNGGroup.CDS_COORDINATOR_METADATA_QUERY_DISABLED}) @Guice(moduleFactory = DruidTestModuleFactory.class) public class ITKafkaIndexingServiceTransactionalSerializedTest extends AbstractKafkaIndexingServiceTest { diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITLocalInputSourceAllInputFormatTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITLocalInputSourceAllInputFormatTest.java index 0cc47b9bc630..5e7678e8b6b8 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITLocalInputSourceAllInputFormatTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITLocalInputSourceAllInputFormatTest.java @@ -30,7 +30,7 @@ import java.util.List; import java.util.Map; -@Test(groups = {TestNGGroup.INPUT_FORMAT, TestNGGroup.CDS_TASK_SCHEMA_PUBLISH_DISABLED, TestNGGroup.CDS_COORDINATOR_SMQ_DISABLED}) +@Test(groups = {TestNGGroup.INPUT_FORMAT, TestNGGroup.CDS_TASK_SCHEMA_PUBLISH_DISABLED, TestNGGroup.CDS_COORDINATOR_METADATA_QUERY_DISABLED}) @Guice(moduleFactory = DruidTestModuleFactory.class) public class ITLocalInputSourceAllInputFormatTest extends AbstractLocalInputSourceParallelIndexTest { diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITOverwriteBatchIndexTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITOverwriteBatchIndexTest.java index e81cf74b4571..c8d54e6beac3 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITOverwriteBatchIndexTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITOverwriteBatchIndexTest.java @@ -33,7 +33,7 @@ import java.util.UUID; import java.util.function.Function; -@Test(groups = {TestNGGroup.BATCH_INDEX, TestNGGroup.CDS_TASK_SCHEMA_PUBLISH_DISABLED, TestNGGroup.CDS_COORDINATOR_SMQ_DISABLED}) +@Test(groups = {TestNGGroup.BATCH_INDEX, TestNGGroup.CDS_TASK_SCHEMA_PUBLISH_DISABLED, TestNGGroup.CDS_COORDINATOR_METADATA_QUERY_DISABLED}) @Guice(moduleFactory = DruidTestModuleFactory.class) public class ITOverwriteBatchIndexTest extends AbstractITBatchIndexTest { diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITPerfectRollupParallelIndexTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITPerfectRollupParallelIndexTest.java index ddae46b18dd8..0e8fc904949c 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITPerfectRollupParallelIndexTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITPerfectRollupParallelIndexTest.java @@ -36,7 +36,7 @@ import java.io.Closeable; import java.util.function.Function; -@Test(groups = {TestNGGroup.PERFECT_ROLLUP_PARALLEL_BATCH_INDEX, TestNGGroup.SHUFFLE_DEEP_STORE, TestNGGroup.CDS_TASK_SCHEMA_PUBLISH_DISABLED, TestNGGroup.CDS_COORDINATOR_SMQ_DISABLED}) +@Test(groups = {TestNGGroup.PERFECT_ROLLUP_PARALLEL_BATCH_INDEX, TestNGGroup.SHUFFLE_DEEP_STORE, TestNGGroup.CDS_TASK_SCHEMA_PUBLISH_DISABLED, TestNGGroup.CDS_COORDINATOR_METADATA_QUERY_DISABLED}) @Guice(moduleFactory = DruidTestModuleFactory.class) public class ITPerfectRollupParallelIndexTest extends AbstractITBatchIndexTest { diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITRealtimeIndexTaskTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITRealtimeIndexTaskTest.java index 6dc2988c3e01..be3f518b098d 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITRealtimeIndexTaskTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITRealtimeIndexTaskTest.java @@ -44,7 +44,7 @@ /** * See {@link AbstractITRealtimeIndexTaskTest} for test details. */ -@Test(groups = {TestNGGroup.REALTIME_INDEX, TestNGGroup.CDS_TASK_SCHEMA_PUBLISH_DISABLED, TestNGGroup.CDS_COORDINATOR_SMQ_DISABLED}) +@Test(groups = {TestNGGroup.REALTIME_INDEX, TestNGGroup.CDS_TASK_SCHEMA_PUBLISH_DISABLED, TestNGGroup.CDS_COORDINATOR_METADATA_QUERY_DISABLED}) @Guice(moduleFactory = DruidTestModuleFactory.class) public class ITRealtimeIndexTaskTest extends AbstractITRealtimeIndexTaskTest { diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITSystemTableBatchIndexTaskTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITSystemTableBatchIndexTaskTest.java index e5f60d87c482..f15081509a08 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITSystemTableBatchIndexTaskTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITSystemTableBatchIndexTaskTest.java @@ -30,7 +30,7 @@ import java.io.Closeable; import java.util.function.Function; -@Test(groups = {TestNGGroup.BATCH_INDEX, TestNGGroup.CDS_TASK_SCHEMA_PUBLISH_DISABLED, TestNGGroup.CDS_COORDINATOR_SMQ_DISABLED}) +@Test(groups = {TestNGGroup.BATCH_INDEX, TestNGGroup.CDS_TASK_SCHEMA_PUBLISH_DISABLED, TestNGGroup.CDS_COORDINATOR_METADATA_QUERY_DISABLED}) @Guice(moduleFactory = DruidTestModuleFactory.class) public class ITSystemTableBatchIndexTaskTest extends AbstractITBatchIndexTest { From 916178d999f5012f36845b7fd60090f2677d445b Mon Sep 17 00:00:00 2001 From: rishabh singh Date: Thu, 2 May 2024 13:07:09 +0530 Subject: [PATCH 6/9] Remove reference to smq abbreviation from integration-tests --- docs/configuration/index.md | 25 +++++++++++++++---- ...s-coordinator-metadata-query-disabled.yml} | 0 integration-tests/docker/druid.sh | 2 +- ...r-metadata-query-disabled-sample-data.sql} | 0 .../script/docker_compose_args.sh | 4 +-- .../org/apache/druid/tests/TestNGGroup.java | 2 +- .../tests/indexer/ITAppendBatchIndexTest.java | 2 +- ...penderatorDriverRealtimeIndexTaskTest.java | 2 +- .../ITBestEffortRollupParallelIndexTest.java | 2 +- ...CombiningInputSourceParallelIndexTest.java | 2 +- .../indexer/ITCompactionSparseColumnTest.java | 2 +- .../tests/indexer/ITCompactionTaskTest.java | 2 +- .../tests/indexer/ITHttpInputSourceTest.java | 2 +- .../druid/tests/indexer/ITIndexerTest.java | 2 +- ...ServiceNonTransactionalSerializedTest.java | 2 +- ...ingServiceTransactionalSerializedTest.java | 2 +- .../ITLocalInputSourceAllInputFormatTest.java | 2 +- .../indexer/ITOverwriteBatchIndexTest.java | 2 +- .../ITPerfectRollupParallelIndexTest.java | 2 +- .../indexer/ITRealtimeIndexTaskTest.java | 2 +- .../ITSystemTableBatchIndexTaskTest.java | 2 +- 21 files changed, 39 insertions(+), 24 deletions(-) rename integration-tests/docker/{docker-compose.cds-coordinator-smq-disabled.yml => docker-compose.cds-coordinator-metadata-query-disabled.yml} (100%) rename integration-tests/docker/test-data/{cds-coordinator-smq-disabled-sample-data.sql => cds-coordinator-metadata-query-disabled-sample-data.sql} (100%) diff --git a/docs/configuration/index.md b/docs/configuration/index.md index ba8b140a20cc..54ef044893a9 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -276,7 +276,6 @@ To enable sending all the HTTP requests to a log, set `org.apache.druid.jetty.Re The `file` request logger stores daily request logs on disk. |Property|Description|Default| -|--------|-----------|-------| |`druid.request.logging.dir`|Historical, Realtime, and Broker services maintain request logs of all of the requests they get (interaction is via POST, so normal request logs don’t generally capture information about the actual query), this specifies the directory to store the request logs in|none| |`druid.request.logging.filePattern`|[Joda datetime format](http://www.joda.org/joda-time/apidocs/org/joda/time/format/DateTimeFormat.html) for each file|"yyyy-MM-dd'.log'"| | `druid.request.logging.durationToRetain`| Period to retain the request logs on disk. The period should be at least longer than `P1D`.| none @@ -581,6 +580,23 @@ This deep storage is used to interface with Cassandra. You must load the `druid- |`druid.storage.keyspace`|Cassandra key space.|none| +#### Centralized datasource schema + +Centralized datasource schema is an [experimental feature](../development/experimental.md) to centralized datasource schema management within the Coordinator. +Traditionally, the datasource schema is built in the Brokers by combining schema for all the available segments +of a datasource. Brokers issue segment metadata query to data nodes and tasks to fetch segment schema. +In the new arrangement, tasks publish segment schema along with segment metadata to the database and schema for realtime segments is periodically pushed to the Coordinator in the segment announcement flow. +This enables Coordinator to cache segment schemas and build datasource schema by combining segment schema. +Brokers query the datasource schema from the Coordinator, while retaining the ability to build table schema if the +need arises. + +|Property|Description|Default|Required| +|-----|-----------|-------|--------| +|`druid.centralizedDatasourceSchema.enabled`|Boolean flag for enabling datasource schema building in the Coordinator, this should be specified in the common runtime properties.|false|No.| +|`druid.indexer.fork.property.druid.centralizedDatasourceSchema.enabled`| This config should be set when CentralizedDatasourceSchema feature is enabled. This should be specified in the MiddleManager runtime properties.|false|No.| + +Refer to properties with the prefix `druid.coordinator.kill.segmentSchema` in [Metadata Management](#Metadata management) to configure stale schema cleanup opeartions. + ### Ingestion security configuration #### HDFS input source @@ -878,7 +894,6 @@ These Coordinator static configurations can be defined in the `coordinator/runti |`druid.coordinator.loadqueuepeon.http.batchSize`|Number of segment load/drop requests to batch in one HTTP request. Note that it must be smaller than `druid.segmentCache.numLoadingThreads` config on Historical service.|1| |`druid.coordinator.asOverlord.enabled`|Boolean value for whether this Coordinator service should act like an Overlord as well. This configuration allows users to simplify a Druid cluster by not having to deploy any standalone Overlord services. If set to true, then Overlord console is available at `http://coordinator-host:port/console.html` and be sure to set `druid.coordinator.asOverlord.overlordService` also.|false| |`druid.coordinator.asOverlord.overlordService`| Required, if `druid.coordinator.asOverlord.enabled` is `true`. This must be same value as `druid.service` on standalone Overlord services and `druid.selectors.indexing.serviceName` on Middle Managers.|NULL| -|`druid.centralizedDatasourceSchema.enabled`|Boolean flag for enabling datasource schema building on the Coordinator. Note, when using MiddleManager to launch task, set `druid.indexer.fork.property.druid.centralizedDatasourceSchema.enabled` in MiddleManager runtime config. |false| ##### Metadata management @@ -899,9 +914,9 @@ These Coordinator static configurations can be defined in the `coordinator/runti |`druid.coordinator.kill.datasource.on`| Boolean value for whether to enable automatic deletion of datasource metadata (Note: datasource metadata only exists for datasource created from supervisor). If set to true, Coordinator will periodically remove datasource metadata of terminated supervisor from the datasource table in metadata storage. | No | True| |`druid.coordinator.kill.datasource.period`| How often to do automatic deletion of datasource metadata in [ISO 8601](https://en.wikipedia.org/wiki/ISO_8601) duration format. Value must be equal to or greater than `druid.coordinator.period.metadataStoreManagementPeriod`. Only applies if `druid.coordinator.kill.datasource.on` is set to true.| No| `P1D`| |`druid.coordinator.kill.datasource.durationToRetain`| Duration of datasource metadata to be retained from created time in [ISO 8601](https://en.wikipedia.org/wiki/ISO_8601) duration format. Only applies if `druid.coordinator.kill.datasource.on` is set to true.| Yes if `druid.coordinator.kill.datasource.on` is set to true.| `P90D`| -|`druid.coordinator.kill.segmentSchema.on`| Boolean value for whether to enable automatic deletion of unused segment schemas. If set to true, Coordinator will periodically identify segment schemas which are not referenced by any used segment and mark them as unused. At a later point, these unused schemas are deleted. Only applies if CentralizedDatasourceSchema feature is enabled or `druid.centralizedDatasourceSchema.enabled` is set to true. | No | True| -|`druid.coordinator.kill.segmentSchema.period`| How often to do automatic deletion of segment schemas in [ISO 8601](https://en.wikipedia.org/wiki/ISO_8601) duration format. Value must be equal to or greater than `druid.coordinator.period.metadataStoreManagementPeriod`. Only applies if `druid.coordinator.kill.segmentSchema.on` is set to true.| No|`P1H`| -|`druid.coordinator.kill.segmentSchema.durationToRetain`| Duration of segment schemas to be retained from the time it was marked as unused in [ISO 8601](https://en.wikipedia.org/wiki/ISO_8601) duration format. Only applies if `druid.coordinator.kill.segmentSchema.on` is set to true.| Yes if `druid.coordinator.kill.segmentSchema.on` is set to true.|`P6H`| +|`druid.coordinator.kill.segmentSchema.on`| Boolean value for whether to enable automatic deletion of unused segment schemas. If set to true, Coordinator will periodically identify segment schemas which are not referenced by any used segment and mark them as unused. At a later point, these unused schemas are deleted. Only applies if [Centralized Datasource schema](#Centralized Datasource Schema) feature is enabled. | No | True| +|`druid.coordinator.kill.segmentSchema.period`| How often to do automatic deletion of segment schemas in [ISO 8601](https://en.wikipedia.org/wiki/ISO_8601) duration format. Value must be equal to or greater than `druid.coordinator.period.metadataStoreManagementPeriod`. Only applies if `druid.coordinator.kill.segmentSchema.on` is set to true.| No| `P1D`| +|`druid.coordinator.kill.segmentSchema.durationToRetain`| Duration of segment schemas to be retained from the time it was marked as unused in [ISO 8601](https://en.wikipedia.org/wiki/ISO_8601) duration format. Only applies if `druid.coordinator.kill.segmentSchema.on` is set to true.| Yes, if `druid.coordinator.kill.segmentSchema.on` is set to true.| `P90D`| ##### Segment management diff --git a/integration-tests/docker/docker-compose.cds-coordinator-smq-disabled.yml b/integration-tests/docker/docker-compose.cds-coordinator-metadata-query-disabled.yml similarity index 100% rename from integration-tests/docker/docker-compose.cds-coordinator-smq-disabled.yml rename to integration-tests/docker/docker-compose.cds-coordinator-metadata-query-disabled.yml diff --git a/integration-tests/docker/druid.sh b/integration-tests/docker/druid.sh index f112f91d1591..d7bd80ca32ed 100755 --- a/integration-tests/docker/druid.sh +++ b/integration-tests/docker/druid.sh @@ -85,7 +85,7 @@ setupData() # The "query" and "security" test groups require data to be setup before running the tests. # In particular, they requires segments to be download from a pre-existing s3 bucket. # This is done by using the loadSpec put into metadatastore and s3 credientials set below. - if [ "$DRUID_INTEGRATION_TEST_GROUP" = "query" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "query-retry" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "query-error" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "high-availability" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "security" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "ldap-security" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "upgrade" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "centralized-datasource-schema" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "cds-task-schema-publish-disabled" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "cds-coordinator-smq-disabled" ]; then + if [ "$DRUID_INTEGRATION_TEST_GROUP" = "query" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "query-retry" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "query-error" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "high-availability" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "security" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "ldap-security" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "upgrade" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "centralized-datasource-schema" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "cds-task-schema-publish-disabled" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "cds-coordinator-metadata-query-disabled" ]; then # touch is needed because OverlayFS's copy-up operation breaks POSIX standards. See https://github.com/docker/for-linux/issues/72. find /var/lib/mysql -type f -exec touch {} \; && service mysql start \ && cat /test-data/${DRUID_INTEGRATION_TEST_GROUP}-sample-data.sql | mysql -u root druid \ diff --git a/integration-tests/docker/test-data/cds-coordinator-smq-disabled-sample-data.sql b/integration-tests/docker/test-data/cds-coordinator-metadata-query-disabled-sample-data.sql similarity index 100% rename from integration-tests/docker/test-data/cds-coordinator-smq-disabled-sample-data.sql rename to integration-tests/docker/test-data/cds-coordinator-metadata-query-disabled-sample-data.sql diff --git a/integration-tests/script/docker_compose_args.sh b/integration-tests/script/docker_compose_args.sh index c37d22ca3144..477627d300be 100644 --- a/integration-tests/script/docker_compose_args.sh +++ b/integration-tests/script/docker_compose_args.sh @@ -79,10 +79,10 @@ getComposeArgs() then # cluster with overriden properties for broker and coordinator echo "-f ${DOCKERDIR}/docker-compose.cds-task-schema-publish-disabled.yml" - elif [ "$DRUID_INTEGRATION_TEST_GROUP" = "cds-coordinator-smq-disabled" ] + elif [ "$DRUID_INTEGRATION_TEST_GROUP" = "cds-coordinator-metadata-query-disabled" ] then # cluster with overriden properties for broker and coordinator - echo "-f ${DOCKERDIR}/docker-compose.cds-coordinator-smq-disabled.yml" + echo "-f ${DOCKERDIR}/docker-compose.cds-coordinator-metadata-query-disabled.yml" else # default echo "-f ${DOCKERDIR}/docker-compose.yml" diff --git a/integration-tests/src/test/java/org/apache/druid/tests/TestNGGroup.java b/integration-tests/src/test/java/org/apache/druid/tests/TestNGGroup.java index 516dcb65434a..b3417902cef8 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/TestNGGroup.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/TestNGGroup.java @@ -166,5 +166,5 @@ public class TestNGGroup public static final String CDS_TASK_SCHEMA_PUBLISH_DISABLED = "cds-task-schema-publish-disabled"; - public static final String CDS_COORDINATOR_SMQ_DISABLED = "cds-coordinator-smq-disabled"; + public static final String CDS_COORDINATOR_METADATA_QUERY_DISABLED = "cds-coordinator-metadata-query-disabled"; } diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITAppendBatchIndexTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITAppendBatchIndexTest.java index 84ddb7612e65..ced35949626c 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITAppendBatchIndexTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITAppendBatchIndexTest.java @@ -40,7 +40,7 @@ import java.util.UUID; import java.util.function.Function; -@Test(groups = {TestNGGroup.APPEND_INGESTION, TestNGGroup.CDS_TASK_SCHEMA_PUBLISH_DISABLED, TestNGGroup.CDS_COORDINATOR_SMQ_DISABLED}) +@Test(groups = {TestNGGroup.APPEND_INGESTION, TestNGGroup.CDS_TASK_SCHEMA_PUBLISH_DISABLED, TestNGGroup.CDS_COORDINATOR_METADATA_QUERY_DISABLED}) @Guice(moduleFactory = DruidTestModuleFactory.class) public class ITAppendBatchIndexTest extends AbstractITBatchIndexTest { diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITAppenderatorDriverRealtimeIndexTaskTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITAppenderatorDriverRealtimeIndexTaskTest.java index a077dbd0a978..b69ac0276757 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITAppenderatorDriverRealtimeIndexTaskTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITAppenderatorDriverRealtimeIndexTaskTest.java @@ -44,7 +44,7 @@ /** * See {@link AbstractITRealtimeIndexTaskTest} for test details. */ -@Test(groups = {TestNGGroup.REALTIME_INDEX, TestNGGroup.CDS_TASK_SCHEMA_PUBLISH_DISABLED, TestNGGroup.CDS_COORDINATOR_SMQ_DISABLED}) +@Test(groups = {TestNGGroup.REALTIME_INDEX, TestNGGroup.CDS_TASK_SCHEMA_PUBLISH_DISABLED, TestNGGroup.CDS_COORDINATOR_METADATA_QUERY_DISABLED}) @Guice(moduleFactory = DruidTestModuleFactory.class) public class ITAppenderatorDriverRealtimeIndexTaskTest extends AbstractITRealtimeIndexTaskTest { diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITBestEffortRollupParallelIndexTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITBestEffortRollupParallelIndexTest.java index 0ecfe7ed5c40..be2ab61a5f59 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITBestEffortRollupParallelIndexTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITBestEffortRollupParallelIndexTest.java @@ -38,7 +38,7 @@ import java.io.Closeable; import java.util.function.Function; -@Test(groups = {TestNGGroup.BATCH_INDEX, TestNGGroup.CDS_TASK_SCHEMA_PUBLISH_DISABLED, TestNGGroup.CDS_COORDINATOR_SMQ_DISABLED}) +@Test(groups = {TestNGGroup.BATCH_INDEX, TestNGGroup.CDS_TASK_SCHEMA_PUBLISH_DISABLED, TestNGGroup.CDS_COORDINATOR_METADATA_QUERY_DISABLED}) @Guice(moduleFactory = DruidTestModuleFactory.class) public class ITBestEffortRollupParallelIndexTest extends AbstractITBatchIndexTest { diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCombiningInputSourceParallelIndexTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCombiningInputSourceParallelIndexTest.java index 40549a2685e3..2af07e017b98 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCombiningInputSourceParallelIndexTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCombiningInputSourceParallelIndexTest.java @@ -32,7 +32,7 @@ import java.util.Map; import java.util.function.Function; -@Test(groups = {TestNGGroup.BATCH_INDEX, TestNGGroup.QUICKSTART_COMPATIBLE, TestNGGroup.CDS_TASK_SCHEMA_PUBLISH_DISABLED, TestNGGroup.CDS_COORDINATOR_SMQ_DISABLED}) +@Test(groups = {TestNGGroup.BATCH_INDEX, TestNGGroup.QUICKSTART_COMPATIBLE, TestNGGroup.CDS_TASK_SCHEMA_PUBLISH_DISABLED, TestNGGroup.CDS_COORDINATOR_METADATA_QUERY_DISABLED}) @Guice(moduleFactory = DruidTestModuleFactory.class) public class ITCombiningInputSourceParallelIndexTest extends AbstractITBatchIndexTest { diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionSparseColumnTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionSparseColumnTest.java index 27b771308b35..58dc7c43ae7b 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionSparseColumnTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionSparseColumnTest.java @@ -41,7 +41,7 @@ import java.util.List; import java.util.Map; -@Test(groups = {TestNGGroup.COMPACTION, TestNGGroup.QUICKSTART_COMPATIBLE, TestNGGroup.CDS_TASK_SCHEMA_PUBLISH_DISABLED, TestNGGroup.CDS_COORDINATOR_SMQ_DISABLED}) +@Test(groups = {TestNGGroup.COMPACTION, TestNGGroup.QUICKSTART_COMPATIBLE, TestNGGroup.CDS_TASK_SCHEMA_PUBLISH_DISABLED, TestNGGroup.CDS_COORDINATOR_METADATA_QUERY_DISABLED}) @Guice(moduleFactory = DruidTestModuleFactory.class) public class ITCompactionSparseColumnTest extends AbstractIndexerTest { diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java index 6dbcb90c3df5..b974c7d20e97 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java @@ -51,7 +51,7 @@ import java.util.Map; import java.util.Set; -@Test(groups = {TestNGGroup.COMPACTION, TestNGGroup.QUICKSTART_COMPATIBLE, TestNGGroup.CDS_TASK_SCHEMA_PUBLISH_DISABLED, TestNGGroup.CDS_COORDINATOR_SMQ_DISABLED}) +@Test(groups = {TestNGGroup.COMPACTION, TestNGGroup.QUICKSTART_COMPATIBLE, TestNGGroup.CDS_TASK_SCHEMA_PUBLISH_DISABLED, TestNGGroup.CDS_COORDINATOR_METADATA_QUERY_DISABLED}) @Guice(moduleFactory = DruidTestModuleFactory.class) public class ITCompactionTaskTest extends AbstractIndexerTest { diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITHttpInputSourceTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITHttpInputSourceTest.java index 11404bdd56e4..25eb83a7c28e 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITHttpInputSourceTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITHttpInputSourceTest.java @@ -29,7 +29,7 @@ import java.io.IOException; import java.util.UUID; -@Test(groups = {TestNGGroup.INPUT_SOURCE, TestNGGroup.CDS_TASK_SCHEMA_PUBLISH_DISABLED, TestNGGroup.CDS_COORDINATOR_SMQ_DISABLED}) +@Test(groups = {TestNGGroup.INPUT_SOURCE, TestNGGroup.CDS_TASK_SCHEMA_PUBLISH_DISABLED, TestNGGroup.CDS_COORDINATOR_METADATA_QUERY_DISABLED}) @Guice(moduleFactory = DruidTestModuleFactory.class) public class ITHttpInputSourceTest extends AbstractITBatchIndexTest { diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITIndexerTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITIndexerTest.java index 77c64733a622..f527135c80d3 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITIndexerTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITIndexerTest.java @@ -41,7 +41,7 @@ import java.util.Map; import java.util.function.Function; -@Test(groups = {TestNGGroup.BATCH_INDEX, TestNGGroup.QUICKSTART_COMPATIBLE, TestNGGroup.CDS_TASK_SCHEMA_PUBLISH_DISABLED, TestNGGroup.CDS_COORDINATOR_SMQ_DISABLED}) +@Test(groups = {TestNGGroup.BATCH_INDEX, TestNGGroup.QUICKSTART_COMPATIBLE, TestNGGroup.CDS_TASK_SCHEMA_PUBLISH_DISABLED, TestNGGroup.CDS_COORDINATOR_METADATA_QUERY_DISABLED}) @Guice(moduleFactory = DruidTestModuleFactory.class) public class ITIndexerTest extends AbstractITBatchIndexTest { diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceNonTransactionalSerializedTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceNonTransactionalSerializedTest.java index 33bf5a5d79b3..5df5a708bc5e 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceNonTransactionalSerializedTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceNonTransactionalSerializedTest.java @@ -25,7 +25,7 @@ import org.testng.annotations.Guice; import org.testng.annotations.Test; -@Test(groups = {TestNGGroup.KAFKA_INDEX_SLOW, TestNGGroup.CDS_TASK_SCHEMA_PUBLISH_DISABLED, TestNGGroup.CDS_COORDINATOR_SMQ_DISABLED}) +@Test(groups = {TestNGGroup.KAFKA_INDEX_SLOW, TestNGGroup.CDS_TASK_SCHEMA_PUBLISH_DISABLED, TestNGGroup.CDS_COORDINATOR_METADATA_QUERY_DISABLED}) @Guice(moduleFactory = DruidTestModuleFactory.class) public class ITKafkaIndexingServiceNonTransactionalSerializedTest extends AbstractKafkaIndexingServiceTest { diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceTransactionalSerializedTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceTransactionalSerializedTest.java index a50aa6ce10ef..424d3c670684 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceTransactionalSerializedTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceTransactionalSerializedTest.java @@ -25,7 +25,7 @@ import org.testng.annotations.Guice; import org.testng.annotations.Test; -@Test(groups = {TestNGGroup.TRANSACTIONAL_KAFKA_INDEX_SLOW, TestNGGroup.CDS_TASK_SCHEMA_PUBLISH_DISABLED, TestNGGroup.CDS_COORDINATOR_SMQ_DISABLED}) +@Test(groups = {TestNGGroup.TRANSACTIONAL_KAFKA_INDEX_SLOW, TestNGGroup.CDS_TASK_SCHEMA_PUBLISH_DISABLED, TestNGGroup.CDS_COORDINATOR_METADATA_QUERY_DISABLED}) @Guice(moduleFactory = DruidTestModuleFactory.class) public class ITKafkaIndexingServiceTransactionalSerializedTest extends AbstractKafkaIndexingServiceTest { diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITLocalInputSourceAllInputFormatTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITLocalInputSourceAllInputFormatTest.java index 0cc47b9bc630..5e7678e8b6b8 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITLocalInputSourceAllInputFormatTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITLocalInputSourceAllInputFormatTest.java @@ -30,7 +30,7 @@ import java.util.List; import java.util.Map; -@Test(groups = {TestNGGroup.INPUT_FORMAT, TestNGGroup.CDS_TASK_SCHEMA_PUBLISH_DISABLED, TestNGGroup.CDS_COORDINATOR_SMQ_DISABLED}) +@Test(groups = {TestNGGroup.INPUT_FORMAT, TestNGGroup.CDS_TASK_SCHEMA_PUBLISH_DISABLED, TestNGGroup.CDS_COORDINATOR_METADATA_QUERY_DISABLED}) @Guice(moduleFactory = DruidTestModuleFactory.class) public class ITLocalInputSourceAllInputFormatTest extends AbstractLocalInputSourceParallelIndexTest { diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITOverwriteBatchIndexTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITOverwriteBatchIndexTest.java index e81cf74b4571..c8d54e6beac3 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITOverwriteBatchIndexTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITOverwriteBatchIndexTest.java @@ -33,7 +33,7 @@ import java.util.UUID; import java.util.function.Function; -@Test(groups = {TestNGGroup.BATCH_INDEX, TestNGGroup.CDS_TASK_SCHEMA_PUBLISH_DISABLED, TestNGGroup.CDS_COORDINATOR_SMQ_DISABLED}) +@Test(groups = {TestNGGroup.BATCH_INDEX, TestNGGroup.CDS_TASK_SCHEMA_PUBLISH_DISABLED, TestNGGroup.CDS_COORDINATOR_METADATA_QUERY_DISABLED}) @Guice(moduleFactory = DruidTestModuleFactory.class) public class ITOverwriteBatchIndexTest extends AbstractITBatchIndexTest { diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITPerfectRollupParallelIndexTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITPerfectRollupParallelIndexTest.java index ddae46b18dd8..0e8fc904949c 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITPerfectRollupParallelIndexTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITPerfectRollupParallelIndexTest.java @@ -36,7 +36,7 @@ import java.io.Closeable; import java.util.function.Function; -@Test(groups = {TestNGGroup.PERFECT_ROLLUP_PARALLEL_BATCH_INDEX, TestNGGroup.SHUFFLE_DEEP_STORE, TestNGGroup.CDS_TASK_SCHEMA_PUBLISH_DISABLED, TestNGGroup.CDS_COORDINATOR_SMQ_DISABLED}) +@Test(groups = {TestNGGroup.PERFECT_ROLLUP_PARALLEL_BATCH_INDEX, TestNGGroup.SHUFFLE_DEEP_STORE, TestNGGroup.CDS_TASK_SCHEMA_PUBLISH_DISABLED, TestNGGroup.CDS_COORDINATOR_METADATA_QUERY_DISABLED}) @Guice(moduleFactory = DruidTestModuleFactory.class) public class ITPerfectRollupParallelIndexTest extends AbstractITBatchIndexTest { diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITRealtimeIndexTaskTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITRealtimeIndexTaskTest.java index 6dc2988c3e01..be3f518b098d 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITRealtimeIndexTaskTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITRealtimeIndexTaskTest.java @@ -44,7 +44,7 @@ /** * See {@link AbstractITRealtimeIndexTaskTest} for test details. */ -@Test(groups = {TestNGGroup.REALTIME_INDEX, TestNGGroup.CDS_TASK_SCHEMA_PUBLISH_DISABLED, TestNGGroup.CDS_COORDINATOR_SMQ_DISABLED}) +@Test(groups = {TestNGGroup.REALTIME_INDEX, TestNGGroup.CDS_TASK_SCHEMA_PUBLISH_DISABLED, TestNGGroup.CDS_COORDINATOR_METADATA_QUERY_DISABLED}) @Guice(moduleFactory = DruidTestModuleFactory.class) public class ITRealtimeIndexTaskTest extends AbstractITRealtimeIndexTaskTest { diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITSystemTableBatchIndexTaskTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITSystemTableBatchIndexTaskTest.java index e5f60d87c482..f15081509a08 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITSystemTableBatchIndexTaskTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITSystemTableBatchIndexTaskTest.java @@ -30,7 +30,7 @@ import java.io.Closeable; import java.util.function.Function; -@Test(groups = {TestNGGroup.BATCH_INDEX, TestNGGroup.CDS_TASK_SCHEMA_PUBLISH_DISABLED, TestNGGroup.CDS_COORDINATOR_SMQ_DISABLED}) +@Test(groups = {TestNGGroup.BATCH_INDEX, TestNGGroup.CDS_TASK_SCHEMA_PUBLISH_DISABLED, TestNGGroup.CDS_COORDINATOR_METADATA_QUERY_DISABLED}) @Guice(moduleFactory = DruidTestModuleFactory.class) public class ITSystemTableBatchIndexTaskTest extends AbstractITBatchIndexTest { From dc3f4b4ebedc6ff4b7e47f7aa825c5a73ae508a9 Mon Sep 17 00:00:00 2001 From: rishabh singh Date: Thu, 2 May 2024 18:16:23 +0530 Subject: [PATCH 7/9] minor change --- docs/configuration/index.md | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/docs/configuration/index.md b/docs/configuration/index.md index 54ef044893a9..9ad7eca88156 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -582,9 +582,9 @@ This deep storage is used to interface with Cassandra. You must load the `druid- #### Centralized datasource schema -Centralized datasource schema is an [experimental feature](../development/experimental.md) to centralized datasource schema management within the Coordinator. -Traditionally, the datasource schema is built in the Brokers by combining schema for all the available segments -of a datasource. Brokers issue segment metadata query to data nodes and tasks to fetch segment schema. +Centralized datasource schema is an [experimental feature](../development/experimental.md) to centralized datasource schema building within the Coordinator. +Traditionally, the datasource schema is built in the Brokers by combining schema for all the available segments of a datasource. +Brokers issue segment metadata query to data nodes and tasks to fetch segment schema. In the new arrangement, tasks publish segment schema along with segment metadata to the database and schema for realtime segments is periodically pushed to the Coordinator in the segment announcement flow. This enables Coordinator to cache segment schemas and build datasource schema by combining segment schema. Brokers query the datasource schema from the Coordinator, while retaining the ability to build table schema if the @@ -595,7 +595,7 @@ need arises. |`druid.centralizedDatasourceSchema.enabled`|Boolean flag for enabling datasource schema building in the Coordinator, this should be specified in the common runtime properties.|false|No.| |`druid.indexer.fork.property.druid.centralizedDatasourceSchema.enabled`| This config should be set when CentralizedDatasourceSchema feature is enabled. This should be specified in the MiddleManager runtime properties.|false|No.| -Refer to properties with the prefix `druid.coordinator.kill.segmentSchema` in [Metadata Management](#Metadata management) to configure stale schema cleanup opeartions. +For, stale schema cleanup configs, refer to properties with the prefix `druid.coordinator.kill.segmentSchema` in [Metadata Management](#Metadata management). ### Ingestion security configuration @@ -1446,7 +1446,7 @@ MiddleManagers pass their configurations down to their child peons. The MiddleMa |`druid.worker.baseTaskDirs`|List of base temporary working directories, one of which is assigned per task in a round-robin fashion. This property can be used to allow usage of multiple disks for indexing. This property is recommended in place of and takes precedence over `${druid.indexer.task.baseTaskDir}`. If this configuration is not set, `${druid.indexer.task.baseTaskDir}` is used. For example, `druid.worker.baseTaskDirs=[\"PATH1\",\"PATH2\",...]`.|null| |`druid.worker.baseTaskDirSize`|The total amount of bytes that can be used by tasks on any single task dir. This value is treated symmetrically across all directories, that is, if this is 500 GB and there are 3 `baseTaskDirs`, then each of those task directories is assumed to allow for 500 GB to be used and a total of 1.5 TB will potentially be available across all tasks. The actual amount of memory assigned to each task is discussed in [Configuring task storage sizes](../ingestion/tasks.md#configuring-task-storage-sizes)|`Long.MAX_VALUE`| |`druid.worker.category`|A string to name the category that the MiddleManager node belongs to.|`_default_worker_category`| -|`druid.indexer.fork.property.druid.centralizedDatasourceSchema.enabled`| This config should be set when CentralizedDatasourceSchema feature is enabled. |false| +|`druid.indexer.fork.property.druid.centralizedDatasourceSchema.enabled`| This config should be set when [Centralized Datasource Schema](#Centralized Datasource Schema) feature is enabled. |false| #### Peon processing From e0eb0ad6ca99f3687a70544509f1754bb4f03425 Mon Sep 17 00:00:00 2001 From: rishabh singh Date: Thu, 2 May 2024 18:26:27 +0530 Subject: [PATCH 8/9] Update index.md --- docs/configuration/index.md | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/docs/configuration/index.md b/docs/configuration/index.md index 9ad7eca88156..91f4db60e690 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -276,9 +276,10 @@ To enable sending all the HTTP requests to a log, set `org.apache.druid.jetty.Re The `file` request logger stores daily request logs on disk. |Property|Description|Default| +|--------|-----------|-------| |`druid.request.logging.dir`|Historical, Realtime, and Broker services maintain request logs of all of the requests they get (interaction is via POST, so normal request logs don’t generally capture information about the actual query), this specifies the directory to store the request logs in|none| |`druid.request.logging.filePattern`|[Joda datetime format](http://www.joda.org/joda-time/apidocs/org/joda/time/format/DateTimeFormat.html) for each file|"yyyy-MM-dd'.log'"| -| `druid.request.logging.durationToRetain`| Period to retain the request logs on disk. The period should be at least longer than `P1D`.| none +| `druid.request.logging.durationToRetain`| Period to retain the request logs on disk. The period should be at least longer than `P1D`.| none| The format of request logs is TSV, one line per requests, with five fields: timestamp, remote\_addr, native\_query, query\_context, sql\_query. @@ -595,7 +596,7 @@ need arises. |`druid.centralizedDatasourceSchema.enabled`|Boolean flag for enabling datasource schema building in the Coordinator, this should be specified in the common runtime properties.|false|No.| |`druid.indexer.fork.property.druid.centralizedDatasourceSchema.enabled`| This config should be set when CentralizedDatasourceSchema feature is enabled. This should be specified in the MiddleManager runtime properties.|false|No.| -For, stale schema cleanup configs, refer to properties with the prefix `druid.coordinator.kill.segmentSchema` in [Metadata Management](#Metadata management). +For, stale schema cleanup configs, refer to properties with the prefix `druid.coordinator.kill.segmentSchema` in [Metadata Management](#metadata-management). ### Ingestion security configuration @@ -914,7 +915,7 @@ These Coordinator static configurations can be defined in the `coordinator/runti |`druid.coordinator.kill.datasource.on`| Boolean value for whether to enable automatic deletion of datasource metadata (Note: datasource metadata only exists for datasource created from supervisor). If set to true, Coordinator will periodically remove datasource metadata of terminated supervisor from the datasource table in metadata storage. | No | True| |`druid.coordinator.kill.datasource.period`| How often to do automatic deletion of datasource metadata in [ISO 8601](https://en.wikipedia.org/wiki/ISO_8601) duration format. Value must be equal to or greater than `druid.coordinator.period.metadataStoreManagementPeriod`. Only applies if `druid.coordinator.kill.datasource.on` is set to true.| No| `P1D`| |`druid.coordinator.kill.datasource.durationToRetain`| Duration of datasource metadata to be retained from created time in [ISO 8601](https://en.wikipedia.org/wiki/ISO_8601) duration format. Only applies if `druid.coordinator.kill.datasource.on` is set to true.| Yes if `druid.coordinator.kill.datasource.on` is set to true.| `P90D`| -|`druid.coordinator.kill.segmentSchema.on`| Boolean value for whether to enable automatic deletion of unused segment schemas. If set to true, Coordinator will periodically identify segment schemas which are not referenced by any used segment and mark them as unused. At a later point, these unused schemas are deleted. Only applies if [Centralized Datasource schema](#Centralized Datasource Schema) feature is enabled. | No | True| +|`druid.coordinator.kill.segmentSchema.on`| Boolean value for whether to enable automatic deletion of unused segment schemas. If set to true, Coordinator will periodically identify segment schemas which are not referenced by any used segment and mark them as unused. At a later point, these unused schemas are deleted. Only applies if [Centralized Datasource schema](#centralized-datasource-schema) feature is enabled. | No | True| |`druid.coordinator.kill.segmentSchema.period`| How often to do automatic deletion of segment schemas in [ISO 8601](https://en.wikipedia.org/wiki/ISO_8601) duration format. Value must be equal to or greater than `druid.coordinator.period.metadataStoreManagementPeriod`. Only applies if `druid.coordinator.kill.segmentSchema.on` is set to true.| No| `P1D`| |`druid.coordinator.kill.segmentSchema.durationToRetain`| Duration of segment schemas to be retained from the time it was marked as unused in [ISO 8601](https://en.wikipedia.org/wiki/ISO_8601) duration format. Only applies if `druid.coordinator.kill.segmentSchema.on` is set to true.| Yes, if `druid.coordinator.kill.segmentSchema.on` is set to true.| `P90D`| @@ -1446,7 +1447,7 @@ MiddleManagers pass their configurations down to their child peons. The MiddleMa |`druid.worker.baseTaskDirs`|List of base temporary working directories, one of which is assigned per task in a round-robin fashion. This property can be used to allow usage of multiple disks for indexing. This property is recommended in place of and takes precedence over `${druid.indexer.task.baseTaskDir}`. If this configuration is not set, `${druid.indexer.task.baseTaskDir}` is used. For example, `druid.worker.baseTaskDirs=[\"PATH1\",\"PATH2\",...]`.|null| |`druid.worker.baseTaskDirSize`|The total amount of bytes that can be used by tasks on any single task dir. This value is treated symmetrically across all directories, that is, if this is 500 GB and there are 3 `baseTaskDirs`, then each of those task directories is assumed to allow for 500 GB to be used and a total of 1.5 TB will potentially be available across all tasks. The actual amount of memory assigned to each task is discussed in [Configuring task storage sizes](../ingestion/tasks.md#configuring-task-storage-sizes)|`Long.MAX_VALUE`| |`druid.worker.category`|A string to name the category that the MiddleManager node belongs to.|`_default_worker_category`| -|`druid.indexer.fork.property.druid.centralizedDatasourceSchema.enabled`| This config should be set when [Centralized Datasource Schema](#Centralized Datasource Schema) feature is enabled. |false| +|`druid.indexer.fork.property.druid.centralizedDatasourceSchema.enabled`| This config should be set when [Centralized Datasource Schema](#centralized-datasource-schema) feature is enabled. |false| #### Peon processing From 8d1a7f6fa7d3c84c36cdfa98eb786a1e24790850 Mon Sep 17 00:00:00 2001 From: rishabh singh Date: Thu, 2 May 2024 18:34:28 +0530 Subject: [PATCH 9/9] Add delimiter while computing schema fingerprint hash --- .../druid/segment/metadata/FingerprintGenerator.java | 7 +++++++ .../druid/segment/metadata/FingerprintGeneratorTest.java | 2 +- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/apache/druid/segment/metadata/FingerprintGenerator.java b/server/src/main/java/org/apache/druid/segment/metadata/FingerprintGenerator.java index c13a305da18c..ffaccd09bdf3 100644 --- a/server/src/main/java/org/apache/druid/segment/metadata/FingerprintGenerator.java +++ b/server/src/main/java/org/apache/druid/segment/metadata/FingerprintGenerator.java @@ -58,8 +58,15 @@ public String generateFingerprint(SchemaPayload schemaPayload, String dataSource final Hasher hasher = Hashing.sha256().newHasher(); hasher.putBytes(objectMapper.writeValueAsBytes(schemaPayload)); + // add delimiter, inspired from org.apache.druid.metadata.PendingSegmentRecord.computeSequenceNamePrevIdSha1 + hasher.putByte((byte) 0xff); + hasher.putBytes(StringUtils.toUtf8(dataSource)); + hasher.putByte((byte) 0xff); + hasher.putBytes(Ints.toByteArray(version)); + hasher.putByte((byte) 0xff); + return BaseEncoding.base16().encode(hasher.hash().asBytes()); } catch (IOException e) { diff --git a/server/src/test/java/org/apache/druid/segment/metadata/FingerprintGeneratorTest.java b/server/src/test/java/org/apache/druid/segment/metadata/FingerprintGeneratorTest.java index a1a49d91bbb6..093585508029 100644 --- a/server/src/test/java/org/apache/druid/segment/metadata/FingerprintGeneratorTest.java +++ b/server/src/test/java/org/apache/druid/segment/metadata/FingerprintGeneratorTest.java @@ -51,7 +51,7 @@ public void testGenerateFingerprint_precalculatedHash() SchemaPayload schemaPayload = new SchemaPayload(rowSignature, aggregatorFactoryMap); - String expected = "FB7E8AD8F2B96E58ACB99F43E380106D134774B1F5C56641268539FBADB897B3"; + String expected = "DEE5E8F59833102F0FA5B10F8B8884EA15220D1D2A5F6097A93D8309132E1039"; Assert.assertEquals(expected, fingerprintGenerator.generateFingerprint(schemaPayload, "ds", 0)); }