
Relocating Table Schema Building: Shifting from Brokers to Coordinator for Improved Efficiency #14985

Merged: 103 commits into apache:master on Nov 4, 2023

Conversation

@findingrish (Contributor) commented Sep 14, 2023

Description

Original proposal

Issue: #14989

In the current design, brokers query both data nodes and tasks to fetch the schema of the segments they serve. The table schema is then constructed by combining the schemas of all segments within a datasource. However, this approach leads to a high number of segment metadata queries during broker startup, resulting in slow startup times and various issues outlined in the design proposal.

To address these challenges, we propose centralizing the table schema management process within the coordinator. This change is the first step in that direction. In the new arrangement, the coordinator will take on the responsibility of querying both data nodes and tasks to fetch segment schema and subsequently building the table schema. Brokers will now simply query the Coordinator to fetch table schema. Importantly, brokers will still retain the capability to build table schemas if the need arises, ensuring both flexibility and resilience.

Design

Coordinator changes

These changes enhance the coordinator's capabilities, enabling it to manage the table schema and provide schema-related information through its APIs. For now, the segment metadata cache structure is maintained on both the leader and follower Coordinator processes, which helps in the leader failover scenario.

SegmentMetadataCache refactor: We introduce a new class, CoordinatorSegmentMetadataCache, extending AbstractSegmentMetadataCache, which manages the cache and forms the core of the schema management process.
We also add an implementation of QuerySegmentWalker that is specific to the segment metadata queries executed during the segment refresh mechanism.
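
A rough sketch of the intended class relationship follows; the class names (AbstractSegmentMetadataCache, CoordinatorSegmentMetadataCache) come from this PR, but the fields and method signatures below are simplified assumptions for illustration, not the actual code.

import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Shared machinery: track per-datasource schemas and refresh them by running
// segment metadata queries for segments that are new or have changed.
abstract class AbstractSegmentMetadataCache
{
  // datasource name -> merged schema (simplified here to column name -> column type)
  protected final Map<String, Map<String, String>> tables = new ConcurrentHashMap<>();

  // Refresh the schema for the given segment ids; implementations decide how the
  // underlying segment metadata queries are executed.
  protected abstract void refresh(Set<String> segmentIdsToRefresh);
}

// Coordinator-side implementation: uses the new QuerySegmentWalker that targets
// data nodes and tasks to run segment metadata queries during the refresh.
class CoordinatorSegmentMetadataCache extends AbstractSegmentMetadataCache
{
  @Override
  protected void refresh(Set<String> segmentIdsToRefresh)
  {
    // issue segment metadata queries via the walker and merge the per-segment
    // schemas into the per-datasource signature stored in tables
  }
}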

Changes to Coordinator timeline: CoordinatorServerView has minor changes to register a timeline callback. This is required for CoordinatorSegmentMetadataCache to update its cache using the timeline.
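
As a minimal illustration of the callback idea, the interface below shows roughly what the cache needs from the timeline; the interface and method names here are hypothetical, and the actual CoordinatorServerView callback API may differ.

// Hypothetical shape of a timeline callback: it lets the metadata cache mark
// segments for refresh (or drop them) as the Coordinator's timeline changes.
interface TimelineCallbackSketch
{
  void segmentAdded(String segmentId);    // schedule the segment for a schema refresh

  void segmentRemoved(String segmentId);  // drop the segment's schema from the cache
}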

API Changes:

  • We introduce a new container class, DataSourceInformation, for handling RowSignature information. An API is exposed to retrieve datasource information; see MetadataResource#getDataSourceInformation. (A hedged request example is sketched after this list.)
    -- Request Path: /druid/coordinator/v1/metadata/dataSourceInformation
    -- HTTP Method: POST
    -- Request Body: Set of dataSource names
    -- Query params: No query params
    -- Response: List of DataSourceInformation objects

  • The existing API MetadataResource#getAllUsedSegments, responsible for returning used segments, is modified (in a backward-compatible way) to accept a new query parameter called includeRealtimeSegments. When set, this parameter includes realtime segments in the result. Additionally, we enhance the return object, SegmentStatusInCluster, to provide additional information regarding realtime and numRows.
    -- Request Path: /druid/coordinator/v1/metadata/segments
    -- HTTP Method: GET
    -- Request Body: No changes
    -- Query params: Added new query parameter includeRealtimeSegments; when set, realtime segments are returned as well
    -- Response: The SegmentStatusInCluster object has two new fields, realtime and numRows
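
The sketch below shows how a client (for example, the Broker) might call the two endpoints described above using plain Java; the Coordinator host/port and datasource name are placeholders, and the bodies simply echo the request/response shapes listed above rather than the exact serialized classes.

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class CoordinatorMetadataClientExample
{
  public static void main(String[] args) throws Exception
  {
    HttpClient client = HttpClient.newHttpClient();
    String coordinator = "http://localhost:8081"; // placeholder host:port

    // POST /druid/coordinator/v1/metadata/dataSourceInformation
    // Body: set of datasource names; response: list of DataSourceInformation objects.
    HttpRequest schemaRequest = HttpRequest.newBuilder()
        .uri(URI.create(coordinator + "/druid/coordinator/v1/metadata/dataSourceInformation"))
        .header("Content-Type", "application/json")
        .POST(HttpRequest.BodyPublishers.ofString("[\"wikipedia\"]"))
        .build();
    System.out.println(client.send(schemaRequest, HttpResponse.BodyHandlers.ofString()).body());

    // GET /druid/coordinator/v1/metadata/segments?includeRealtimeSegments
    // Response entries (SegmentStatusInCluster) now also carry realtime and numRows.
    HttpRequest segmentsRequest = HttpRequest.newBuilder()
        .uri(URI.create(coordinator + "/druid/coordinator/v1/metadata/segments?includeRealtimeSegments"))
        .GET()
        .build();
    System.out.println(client.send(segmentsRequest, HttpResponse.BodyHandlers.ofString()).body());
  }
}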

Binding for Query Modules: We add the bindings needed to run segment metadata queries in CliCoordinator.

Broker changes

The Broker now defaults to querying the Coordinator for fetching the table schema, eliminating the need to query data nodes and tasks for segment schema. Additionally, there are changes in the logic for building the sys.segments table.

Broker-side SegmentMetadataCache: We introduce a new implementation of AbstractSegmentMetadataCache called BrokerSegmentMetadataCache. This implementation queries the Coordinator for the table schema and falls back to running a segment metadata query when needed. It also assumes responsibility for building PhysicalDatasourceMetadata, for which a new class is introduced.
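
The sketch below captures the fallback behaviour described here; the class, method, and type names are illustrative assumptions rather than the PR's exact code, and the schema is simplified to a column-name-to-type map.

import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Broker-side refresh: prefer the schema polled from the Coordinator and only fall
// back to issuing segment metadata queries when the Coordinator cannot supply the
// datasource schema (for example, while its own cache is still initialising).
class BrokerSegmentMetadataCacheSketch
{
  // Hypothetical wrapper around the new Coordinator API described above.
  interface CoordinatorClientSketch
  {
    Optional<Map<String, String>> fetchDataSourceInformation(String dataSource);
  }

  Map<String, String> buildDatasourceSchema(
      String dataSource,
      Set<String> segmentsToRefresh,
      CoordinatorClientSketch coordinatorClient
  )
  {
    Optional<Map<String, String>> fromCoordinator =
        coordinatorClient.fetchDataSourceInformation(dataSource);
    if (fromCoordinator.isPresent()) {
      return fromCoordinator.get();
    }
    // Fallback: behave like the pre-existing Broker cache and run segment metadata
    // queries against data nodes and tasks for the given segments.
    return runSegmentMetadataQueries(segmentsToRefresh);
  }

  Map<String, String> runSegmentMetadataQueries(Set<String> segmentsToRefresh)
  {
    return Map.of(); // placeholder for the existing query-based refresh path
  }
}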

System schema changes: In MetadataSegmentView, requests for realtime segments are now included when polling the coordinator. The segment table building logic is updated accordingly.

Testing

  • The changes have been tested locally with the wikipedia dataset.

  • Unit tests have been added.

  • All of the existing integration tests have been run with the feature enabled (39fb248 & eb3e3c1 & 30438f4).

  • Integration test with the group name centralized-table-schema has been added.

  • The changes have also been tested in a Druid cluster with:

    • 1 million segments, 225 datasources
    • 3 master nodes
    • 2 query nodes
    • 150 data nodes

    Notable observations:

    • The memory usage on the Broker was the same as before.
    • The memory usage on the Coordinator increased a bit, with the new cache occupying about 200 MB on the Coordinator.
    • The cache initialisation time on the Coordinator varied between 8 and 12 minutes, the same as for brokers with the feature disabled.
    • The cache initialisation time on the Broker was significantly reduced to 20 seconds.
    • In the scenario of a Coordinator leader failover, brokers began polling datasource schema from the new leader. In this case, no segment metadata queries were executed by the Broker nodes.
    • When Coordinator nodes were restarted, followed by Broker nodes, both services executed segment metadata queries to initialise the cache. However, after the Coordinator nodes completed the cache initialisation, brokers began fetching datasource schema from the Coordinator, rather than executing segment metadata queries.
    • In the scenario of Broker nodes restarting followed by Coordinator nodes restarting, both services executed segment metadata queries. Once the cache on the Coordinator node was initialised, the Broker nodes began polling the datasource schema from the Coordinator instead of executing segment metadata queries.

Potential side effects

  • When the Druid cluster employs service tiering, it's possible for brokers in different tiers to generate different table schemas. This can lead to users observing varying table schemas on the web console, depending on which broker handles the query.
    With this change, brokers will now poll the datasource schema from the coordinator, ensuring that all brokers consistently use the same datasource schema.

Known issues

  • The datasource schema can be partial, especially during service startup. Eventually, as all the segments for a datasource are refreshed, the datasource schema becomes accurate. This behaviour occurs irrespective of whether the feature is enabled.
  • If the feature is enabled and a new datasource is added, both the Coordinator and the Broker might end up executing segment metadata queries to refresh the new datasource's segments.

Upgrade considerations

The general upgrade order should be followed. The new code is behind a feature flag, so it is compatible with existing setups. Brokers with the new changes can communicate with an old Coordinator without issues.

Release note

This feature is experimental and addresses multiple challenges outlined in the description. To enable it, set druid.coordinator.centralizedTableSchema.enabled to true in the Coordinator configuration.

As part of enabling queries on the Coordinator, the following config classes are available under the specified base property names (a binding sketch follows the list below).

druid.coordinator.segmentMetadata -> SegmentMetadataQueryConfig
druid.coordinator.internal.query.config -> InternalQueryConfig
druid.coordinator.query.retryPolicy -> RetryQueryRunnerConfig
druid.coordinator.query.scheduler -> QuerySchedulerProvider
druid.coordinator.query.default -> DefaultQueryConfig
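
As a sketch of how these property bases could be wired up, the Guice module below follows Druid's usual JsonConfigProvider pattern; the binding calls are shown as comments because the exact module code and class packages in this PR may differ.

import com.google.inject.Binder;
import com.google.inject.Module;

// Illustrative module shape only; CliCoordinator's real wiring may differ.
class CoordinatorQueryConfigModuleSketch implements Module
{
  @Override
  public void configure(Binder binder)
  {
    // Typical Druid pattern: JsonConfigProvider.bind(binder, propertyBase, configClass), e.g.
    // JsonConfigProvider.bind(binder, "druid.coordinator.segmentMetadata", SegmentMetadataQueryConfig.class);
    // JsonConfigProvider.bind(binder, "druid.coordinator.internal.query.config", InternalQueryConfig.class);
    // JsonConfigProvider.bind(binder, "druid.coordinator.query.retryPolicy", RetryQueryRunnerConfig.class);
    // JsonConfigProvider.bind(binder, "druid.coordinator.query.scheduler", QuerySchedulerProvider.class);
    // JsonConfigProvider.bind(binder, "druid.coordinator.query.default", DefaultQueryConfig.class);
  }
}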


This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • a release note entry in the PR description.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

public DruidServerMetadata pickOne()
{
  synchronized (this) {
Member

hmm, this still isn't quite right I think: everything needs to be synchronized if we are dropping the concurrent set. It seems simpler to add back the concurrent set; I don't think the synchronized was necessary. Sets.newConcurrentHashSet is backed by the key set of a ConcurrentHashMap. Per the Javadocs,

Most concurrent Collection implementations (including most Queues) also differ from the usual java.util conventions in that their Iterators and Spliterators provide weakly consistent rather than fast-fail traversal:
* they may proceed concurrently with other operations
* they will never throw ConcurrentModificationException
* they are guaranteed to traverse elements as they existed upon construction exactly once, and may (but are not guaranteed to) reflect any modifications subsequent to construction.

So I think we can just iterate to pick one...

I don't think we need to do anything like guard against the DruidServerMetadata being removed from the SegmentLoadInfo, but if we do, then something should probably be pushing a computation into this class so that we can do the whole operation under the lock...
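
A minimal sketch of the pick-by-iteration approach, assuming a Guava Sets.newConcurrentHashSet-backed collection; the class and field names are illustrative, not the actual SegmentLoadInfo code.

import com.google.common.collect.Sets;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;

// Keep a concurrent set (backed by ConcurrentHashMap's key set) and pick an element
// by iterating; weakly consistent iteration never throws ConcurrentModificationException,
// so no synchronized block is needed around the traversal.
class ServerPickerSketch
{
  private final Set<String> servers = Sets.newConcurrentHashSet();

  void add(String server)
  {
    servers.add(server);
  }

  String pickOne()
  {
    // size() is only a hint under concurrent updates, so fall back to the last
    // element seen if the set shrinks while we walk it.
    int target = ThreadLocalRandom.current().nextInt(Math.max(1, servers.size()));
    String last = null;
    int i = 0;
    for (String server : servers) {
      last = server;
      if (i++ == target) {
        return server;
      }
    }
    return last; // null only if the set is empty
  }
}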

{
  final PhysicalDatasourceMetadata oldTable = tables.put(dataSource, physicalDatasourceMetadata);
  if (oldTable == null || !oldTable.getRowSignature().equals(physicalDatasourceMetadata.getRowSignature())) {
    log.info("[%s] has new signature: %s.", dataSource, physicalDatasourceMetadata.getRowSignature());
Member

I haven't quite figured out yet whether it is related to the refactor in this PR or some pre-existing problem, but it might be worth looking into further anyway...

While bouncing a historical repeatedly, so that datasources are going offline and online periodically, I keep finding log messages where it gets into a state where it prints an empty row signature.

2023-10-28T03:33:46,084 INFO [DruidSchema-Cache-0] org.apache.druid.sql.calcite.schema.BrokerSegmentMetadataCache - [wikipedia-schemaless] has new signature: {}.

which I think must mean that it knows the table needs to be refreshed, but has not yet obtained any segment metadata. I'll try to see if I can determine how this happens.

Contributor Author

When the historical went offline, the table would have been removed from the cache; when it came up again, the datasource schema is rebuilt and the message is logged since the condition oldTable == null is met.

An empty RowSignature would imply that the segment metadata queries succeeded but returned an empty rowSignature for the segments? Is that possible?
I'm not sure if this was introduced by the new changes, since we are just fetching the schema from the Coordinator (which has the empty rowSignature), and the flow on the Coordinator is the same as what used to run on the Broker.

@cryptoe cryptoe (Contributor) left a comment

Apart from the minor comments, the changes LGTM.

public DruidServerMetadata pickOne()
{
  synchronized (this) {
Contributor

Yes, I kind of agree that we might hit IndexOutOfBoundsException.
From the ConcurrentHashMap Javadocs:

However, iterators are designed to be used by only one thread at a time. Bear in mind that the results of aggregate status methods including size, isEmpty, and containsValue are typically useful only when a map is not undergoing concurrent updates in other threads. Otherwise the results of these methods reflect transient states that may be adequate for monitoring or estimation purposes, but not for program control.

but since you can iterate the elements safely, what you could do is:

  • get the position of the element that you want to pick using size checks
  • create an iterator
  • and then something like the following:

DruidServerMetadata prevElement = null;
int index = 0;
for (DruidServerMetadata val : servers) {
  if (index == pos) {
    return val;
  }
  prevElement = val;
  index++;
}
return prevElement;

docs/api-reference/legacy-metadata-api.md (review comment resolved)
Integer replicationFactor = isOvershadowed ? (Integer) 0
: coordinator.getReplicationFactor(segment.getId());
// conditionally add realtime segments information
if (null != includeRealtimeSegments && null != coordinatorSegmentMetadataCache) {
Contributor

Since the realtime flag only works if centralizedSchemaManagement is enabled, let's document this in the user-facing docs as well as in the code.

In the code we should mention that we did not want to increase the payload of the Broker-to-Coordinator communication when this feature is disabled, and we did not want to introduce a new feature flag on the Broker, so this was a trade-off. This API will be deprecated once we fully move over to the Coordinator to power sys.segments queries from the Broker.

@clintropolis clintropolis (Member) left a comment

👍

@cryptoe cryptoe merged commit 8c802e4 into apache:master Nov 4, 2023
81 of 83 checks passed
@cryptoe (Contributor) commented Nov 4, 2023

Thank you for the contribution @findingrish.

cryptoe pushed a commit that referenced this pull request Nov 14, 2023
…datasources (#15355)

In pull request #14985, a bug was introduced where periodic refresh would skip rebuilding a datasource's schema after encountering a non-existent datasource. This resulted in remaining datasources having stale schema information.

This change addresses the bug and adds a unit test to validate the refresh mechanism's behaviour when a datasource is removed, and other datasources have schema changes.
CaseyPan pushed a commit to CaseyPan/druid that referenced this pull request Nov 17, 2023
…r for Improved Efficiency (apache#14985)

In the current design, brokers query both data nodes and tasks to fetch the schema of the segments they serve. The table schema is then constructed by combining the schemas of all segments within a datasource. However, this approach leads to a high number of segment metadata queries during broker startup, resulting in slow startup times and various issues outlined in the design proposal.

To address these challenges, we propose centralizing the table schema management process within the coordinator. This change is the first step in that direction. In the new arrangement, the coordinator will take on the responsibility of querying both data nodes and tasks to fetch segment schema and subsequently building the table schema. Brokers will now simply query the Coordinator to fetch table schema. Importantly, brokers will still retain the capability to build table schemas if the need arises, ensuring both flexibility and resilience.
CaseyPan pushed a commit to CaseyPan/druid that referenced this pull request Nov 17, 2023
…datasources (apache#15355)

In pull request apache#14985, a bug was introduced where periodic refresh would skip rebuilding a datasource's schema after encountering a non-existent datasource. This resulted in remaining datasources having stale schema information.

This change addresses the bug and adds a unit test to validate the refresh mechanism's behaviour when a datasource is removed, and other datasources have schema changes.
yashdeep97 pushed a commit to yashdeep97/druid that referenced this pull request Dec 1, 2023
…datasources (apache#15355)

In pull request apache#14985, a bug was introduced where periodic refresh would skip rebuilding a datasource's schema after encountering a non-existent datasource. This resulted in remaining datasources having stale schema information.

This change addresses the bug and adds a unit test to validate the refresh mechanism's behaviour when a datasource is removed, and other datasources have schema changes.
cryptoe pushed a commit that referenced this pull request Jan 10, 2024
…h Schema for Seamless Coordinator Updates (#15475)

The initial step in optimizing segment metadata was to centralize the construction of datasource schema in the Coordinator (#14985). Subsequently, our goal is to eliminate the requirement for regularly executing queries to obtain segment schema information. This task encompasses addressing both realtime and finalized segments.

This modification specifically addresses the issue with realtime segments. Tasks will now routinely communicate the schema for realtime segments during the segment announcement process. The Coordinator will identify the schema alongside the segment announcement and subsequently update the schema for realtime segments in the metadata cache.
@LakshSingla LakshSingla added this to the 29.0.0 milestone Jan 29, 2024
cryptoe pushed a commit that referenced this pull request Apr 24, 2024
…rce Schema Building (#15817)

Issue: #14989

The initial step in optimizing segment metadata was to centralize the construction of datasource schema in the Coordinator (#14985). Thereafter, we addressed the problem of publishing schema for realtime segments (#15475). Subsequently, our goal is to eliminate the requirement for regularly executing queries to obtain segment schema information.

This is the final change which involves publishing segment schema for finalized segments from task and periodically polling them in the Coordinator.