Refactor SQLMetadataSegmentManager; Change contract of REST methods in DataSourcesResource #7653
Conversation
@gianm @surekhasaharan @dampcake @egor-ryashin could you please review this PR (at least the respective parts for which you are mentioned in the first message)?
I'll take a look at the SQLMetadataSegmentManager changes.
This review is only for SQLMetadataSegmentManager.java.
private @Nullable Future<?> periodicPollTaskFuture = null;

/** The number of times {@link #startPollingDatabasePeriodically} was called. */
private long startPollingCount = 0;
Should this be marked @GuardedBy("startStopPollLock")?
Thanks, marked
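As a side note for readers, here is a minimal sketch of what the suggested @GuardedBy annotation expresses (the annotation package, lock type, and method body are assumptions for illustration; the real class uses different locking details):

```java
import java.util.concurrent.Future;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;

class PollingStateSketch
{
  private final Object startStopPollLock = new Object();

  // @GuardedBy is documentation checked by static analysis (e.g. error-prone, IntelliJ):
  // reads and writes of these fields must happen while holding startStopPollLock.
  @GuardedBy("startStopPollLock")
  @Nullable
  private Future<?> periodicPollTaskFuture = null;

  /** The number of times startPollingDatabasePeriodically() was called. */
  @GuardedBy("startStopPollLock")
  private long startPollingCount = 0;

  void startPollingDatabasePeriodically()
  {
    synchronized (startStopPollLock) {
      startPollingCount++;
      // ... schedule the periodic poll task and remember its Future here ...
    }
  }
}
```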
*/
private long currentStartOrder = -1;
private long currentStartPollingOrder = -1;
Should this be marked @GuardedBy("startStopPollLock")?
Added
this.dbTables = dbTables;
this.connector = connector;
}

@Override
/**
 * Don't confuse this method with {@link #startPollingDatabasePeriodically}. This is a lifecycle starting method to
Do you think it's worth including a sanity check that this method is only called one time?
I'm not sure it's called just once by our lifecycling logic. Most @LifecycleStart methods in the project are idempotent. I think that's better than if they were throwing exceptions on non-first calls.
However, this method was lacking an idempotency check; I've added it.
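To illustrate the idempotency pattern being referred to, a hedged sketch (field names and the executor creation are assumptions; Druid's @LifecycleStart annotation is omitted so the snippet stays self-contained):

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

class LifecycleStartSketch
{
  private final Object startStopPollLock = new Object();
  private boolean started = false;
  private ScheduledExecutorService exec = null;

  // In Druid this method would be annotated with @LifecycleStart.
  public void start()
  {
    synchronized (startStopPollLock) {
      if (started) {
        // Idempotency check: repeated lifecycle starts are no-ops rather than errors.
        return;
      }
      started = true;
      exec = Executors.newSingleThreadScheduledExecutor();
    }
  }
}
```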
catch (Exception e) {
  log.makeAlert(e, "uncaught exception in segment manager polling thread").emit();
catch (Throwable t) {
  log.makeAlert(t, "Uncaught exception in " + getClass().getName() + "'s polling thread").emit();
makeAlert supports embedded format strings, if you'd like to use that.
Changed
* the Coordinator is not the leader and therefore SqlSegmentsMetadata isn't polling the database periodically.
*
* The notion and the complexity of "on demand" database polls was introduced to simplify the interface of {@link
* MetadataSegmentManager} and guarantee that it always returns consistent and relatively up-to-date data from methods like
With this interface change, can callers still tell if there has been a poll since the start of the leadership epoch?
(I think it's nice if they can, for the reason mentioned in #7595 (comment) -- preventing leadership epoch N from using an older snapshot than leadership epoch N-1, due to its polling being out of date.)
As you noted below, this is guaranteed, indeed. I've extended the comment.
However, note that there is actually no semantic difference between periodic and on-demand database polls, because both maintain the same invariant: results no older than periodicPollDelay are used. The main difference is performance: since on-demand polls are irregular and happen in the context of the thread that wants to access the dataSources, they may cause delays in that logic. On the other hand, periodic polls are decoupled into exec, so dataSources-accessing methods should generally be "wait free" with respect to database polls.
I've added this into the comment too.
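To make the periodic vs. on-demand distinction concrete, a hedged sketch of the freshness invariant described above (names such as periodicPollDelay follow this thread; the bodies are simplified and are not the actual implementation):

```java
import java.time.Duration;
import java.time.Instant;

class DatabasePollSketch
{
  private final Duration periodicPollDelay = Duration.ofMinutes(1);
  // Updated by both periodic and on-demand polls.
  private volatile Instant lastPollCompleted = null;

  /**
   * Called from dataSources-accessing methods when the Coordinator is not polling periodically.
   * A fresh on-demand poll runs only if the last poll (of either kind) is older than
   * periodicPollDelay, so both kinds of polls maintain the same freshness invariant.
   */
  void pollOnDemandIfNeeded()
  {
    Instant last = lastPollCompleted;
    if (last == null || last.plus(periodicPollDelay).isBefore(Instant.now())) {
      pollDatabase(); // runs in the calling thread, hence the possible latency noted above
    }
  }

  private synchronized void pollDatabase()
  {
    // ... query the metadata store and rebuild the snapshot ...
    lastPollCompleted = Instant.now();
  }
}
```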
final Collection<String> segmentIds,
final Handle handle
)
private boolean awaitPeriodicOrFreshOnDemandDatabasePoll()
How about calling this awaitLatestDatabasePoll()? It seems more intuitive.
Renamed
.orElseThrow(() -> new UnknownSegmentIdException(StringUtils.format("Cannot find segment id [%s]", segmentId)))
)
.collect(Collectors.toList());
DatabasePoll latestDatabasePoll = this.latestDatabasePoll;
It looks like the idea is that if we are in the midst of a leadership epoch, then:
- isPollingDatabasePeriodically would return true
- latestDatabasePoll will be a PeriodicDatabasePoll
- we are guaranteed to await for that PeriodicDatabasePoll to complete its first poll, if awaitPeriodicOrFreshOnDemandDatabasePoll is called after the leadership epoch starts

Is that right? If so it should provide the property I mentioned in another comment (preventing leadership epoch N from using an older snapshot than leadership epoch N-1). If so IMO it'd be nice to have some comments or assertions about these class invariants.
Added, see #7653 (comment)
}
}

private List<DataSegment> retreiveUnusedSegments(
retrieve (spelling)
Thanks, fixed
return segments;
}

private Iterator<DataSegment> retreiveUsedSegmentsOverlappingIntervals(
retrieve (spelling)
Fixed
@@ -268,7 +276,9 @@ Runs a [Kill task](../ingestion/tasks.html) for a given interval and datasource.

* `/druid/coordinator/v1/datasources/{dataSourceName}/segments/{segmentId}`

Disables a segment.
Marks as unused a segment of a data source. Returns a JSON object of the form `{"segmentStateChanged": <boolean>"}` with
the boolean indicating if the state of the segment has been changed (that is, the segment was marked as used) as the
"segment was marked as used" -> "segment was marked as unused"
Thanks, fixed
dataSource.addSegment(segment);
}
String s = segment.getId().toString();
segmentIdsToMarkAsUsed.add(s);
nit: rename s to segmentId, or get rid of s and do it inline: segmentIdsToMarkAsUsed.add(segment.getId().toString())
Removed
@@ -713,6 +1021,8 @@ public DataSegment map(int index, ResultSet r, StatementContext ctx) throws SQLE
}
catch (IOException e) {
  log.makeAlert(e, "Failed to read segment from db.").emit();
  // If one entry is database is corrupted, doPoll() should continue to work overall. See
nit: If one entry "is" -> "in" database ...
Thanks, fixed
Reviewed the REST API changes in DataSourcesResource and MetadataResource, those LGTM.
@@ -43,17 +43,17 @@
private final Long pushTimeout;

public static ClientCompactQueryTuningConfig from(
    @Nullable UserCompactTuningConfig userCompactTuningConfig,
IMHO, this is too local a variable for this renaming effort (just a note).
@@ -186,6 +186,6 @@ public boolean equals(Object o)
@Override
public int hashCode()
{
  return Objects.hash(name, properties, idToSegments);
  return Objects.hash(name, properties);
Could you comment on this? ImmutableDruidDataSource is used in too many places, so I couldn't check the purpose and correctness of that change in a reasonable time.
See #6901 (comment). This is actually noted in the PR description, "Optimizations" section.
*
* Note: iteration over the returned collection may not be as trivially cheap as, for example, iteration over an
* ArrayList. Try (to some reasonable extent) to organize the code so that it iterates the returned collection only
* once rather than several times.
*/
public Collection<DataSegment> getLazyAllSegments()
public Collection<DataSegment> iterateAllSegments()
Somewhat misleading as it doesn't iterate but creates a collection.
This is intentional: collections returned from all methods whose names start with iterate- are not expected to be stored anywhere. They are only expected to be consumed immediately in a for-each statement, a forEach() call, or a Stream pipeline.
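A hedged illustration of this naming convention (method and element types are simplified placeholders, not the real Druid signatures):

```java
import java.util.Collection;
import java.util.List;

class IterateNamingConventionSketch
{
  Collection<String> iterateAllSegments()
  {
    // May be backed by a lazily-built or concatenated view rather than a plain ArrayList.
    return List.of("segment-1", "segment-2");
  }

  void intendedUsage()
  {
    // Intended use: consume immediately, in a single pass.
    for (String segment : iterateAllSegments()) {
      System.out.println(segment);
    }
  }

  void discouragedUsage()
  {
    // Discouraged: storing the result and iterating it several times, since each pass may be costly.
    Collection<String> stored = iterateAllSegments();
    System.out.println(stored.size());
    stored.forEach(System.out::println);
  }
}
```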
*/
private final ReentrantReadWriteLock startStopLock = new ReentrantReadWriteLock();
private interface DatabasePoll
I wonder why the marker interface was introduced?
Mainly for the ability to make Javadoc references.
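A tiny illustration of that point, as a hedged sketch (the real DatabasePoll hierarchy in the PR carries more documentation and state):

```java
/** Marker interface, mainly so that Javadoc elsewhere can reference {@link DatabasePoll}. */
interface DatabasePoll
{
}

/** Poll driven by the periodic schedule; see {@link DatabasePoll}. */
class PeriodicDatabasePoll implements DatabasePoll
{
}

/** Poll triggered by a caller when periodic polling is off; see {@link DatabasePoll}. */
class OnDemandDatabasePoll implements DatabasePoll
{
}
```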
@egor-ryashin could you please check this PR again?
{
holderSet.add(chunk);
return holderMap.putIfAbsent(chunk, chunk) == null;
I believe this is a bug. PartitionChunk is one of the most wacky things in Druid when it comes to equals() and hashCode() implementations: it considers only the partitionId and ignores the object in the PartitionChunk. The intention here is to always replace the existing one with the new one. I think this behavior is to swap realtime segments with published and available ones in historicals. I know this is confusing and must be addressed in the future.
Ah sorry, I was confused. A new value shouldn't overwrite the existing value, so this looks correct.
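For readers following this exchange, a hedged, self-contained sketch of the behavior in question: when equals()/hashCode() consider only the partition id, Map.putIfAbsent keeps the first chunk and ignores a later chunk with the same id (the Chunk type below is a simplified stand-in, not Druid's PartitionChunk):

```java
import java.util.HashMap;
import java.util.Map;

class PartitionChunkSketch
{
  // Simplified stand-in: equality is based on the partition id only; the payload is ignored.
  static final class Chunk
  {
    final int partitionId;
    final String payload;

    Chunk(int partitionId, String payload)
    {
      this.partitionId = partitionId;
      this.payload = payload;
    }

    @Override
    public boolean equals(Object o)
    {
      return o instanceof Chunk && ((Chunk) o).partitionId == partitionId;
    }

    @Override
    public int hashCode()
    {
      return Integer.hashCode(partitionId);
    }
  }

  public static void main(String[] args)
  {
    Map<Chunk, Chunk> holderMap = new HashMap<>();
    Chunk first = new Chunk(0, "first chunk");
    Chunk second = new Chunk(0, "second chunk with the same partition id");

    // putIfAbsent does NOT overwrite: the first chunk with a given partition id wins.
    System.out.println(holderMap.putIfAbsent(first, first) == null);   // true: added
    System.out.println(holderMap.putIfAbsent(second, second) == null); // false: "first" kept
    System.out.println(holderMap.get(second).payload);                 // prints "first chunk"
  }
}
```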
@@ -75,12 +79,11 @@
Comparators.intervalsByStartThenEnd()
);
private final Map<Interval, TreeMap<VersionType, TimelineEntry>> allTimelineEntries = new HashMap<>();
private final AtomicInteger numObjects = new AtomicInteger();
VersionedIntervalTimeline is not thread-safe. Should this be just an integer? I'm not sure how you're planning to use VersionedIntervalTimeline.
Or, this variable is used only in iterateAllObjects. Should it return a collection? Or can we change the return type to Stream or Iterator? With this change, we don't have to track the number of objects in the timeline.
VersionedIntervalTimeline actually has to be thread-safe: see #7285.
It's more convenient for users of the class if the method returns a Collection rather than an Iterable or Stream: it makes both for-each iteration and a stream pipeline possible. Although currently this method is used in just one place with a forEach() call, I don't see much value in over-specializing for this case now.
As aforementioned, we don't have to have numObjects if iterateAllObjects() returns an Iterable or Stream. As you noted, the only user is calling forEach() on the result, so it doesn't have to be a collection. As far as I remember, we agreed on simplifying unnecessarily generalized code.
I actually discovered a use of numObjects in #7306, from which this PR is partially extracted: https://github.com/apache/incubator-druid/pull/7306/files#diff-b3571a010cbbb3cb648676a89936a51fR277
final Set<SegmentId> overshadowedSegments = Optional
    .ofNullable(metadataSegmentManager.getOvershadowedSegments())
    .orElse(Collections.emptySet());
DataSourcesSnapshot dataSourcesSnapshot = segmentsMetadata.getSnapshotOfDataSourcesWithAllUsedSegments();
With an empty database that has no segments, I'm getting a null pointer exception because the dataSourceSnapshot set here is null.
Allowing an empty database to still set a snapshot, by not returning here and instead making an 'empty' snapshot, remedies this issue, but I'm not sure if it is the right fix because it's different from the previous behavior, where a null would be retained until segments existed.
Raised #8106 to see if the approach to fixing it is legitimate.
Thanks for finding this and proposing a fix
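As an illustration of the guard being discussed, a hedged sketch of the "empty snapshot" idea from the comment above (the snapshot type is heavily simplified, and this is not necessarily how #8106 implements it):

```java
import java.util.Collections;
import java.util.List;

class SnapshotGuardSketch
{
  // Simplified stand-in for DataSourcesSnapshot.
  static final class Snapshot
  {
    final List<String> dataSourceNames;

    Snapshot(List<String> dataSourceNames)
    {
      this.dataSourceNames = dataSourceNames;
    }
  }

  private volatile Snapshot dataSourcesSnapshot = null;

  Snapshot getSnapshotOfDataSourcesWithAllUsedSegments()
  {
    Snapshot snapshot = dataSourcesSnapshot;
    // With an empty metadata store no poll may ever have set a snapshot; fall back to an
    // explicit empty snapshot instead of handing a null to callers.
    return snapshot != null ? snapshot : new Snapshot(Collections.emptyList());
  }
}
```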
* periodically (see {@link PeriodicDatabasePoll}, {@link #startPollingDatabasePeriodically}, {@link
* #stopPollingDatabasePeriodically}) or "on demand" (see {@link OnDemandDatabasePoll}), when one of the methods that
* accesses {@link #dataSourcesSnapshot}'s state (such as {@link #getImmutableDataSourceWithUsedSegments}) is
* called when the Coordinator is not the leader and therefore SqlSegmentsMetadata isn't polling the database
I don't understand this explanation. All API calls to non-leaders should be redirected to the leader. How could database polling happen for non-leaders?
"All API calls to non-leaders should be redirected to the leader" - do you mean internally, within a Druid cluster, by a non-leader Coordinator, or externally, that API callers should subscribe to leader updates in ZK and always call the leading Coordinator?
All methods in SQLMetadataSegmentManager are called by DruidCoordinator or MetadataResource. DruidCoordinator has its lifecycle and doesn't do any coordinator work if it's not leading.
All HTTP calls to the MetadataResource of a non-leader from outside Druid should be automatically redirected to the leader.
Interesting, thanks. I didn't know about this redirect.
It still seems to me that SQLMetadataSegmentManager can be called from MetadataResource outside of leadership, between the moment when this Coordinator becomes the leader (DruidLeaderSelector.getCurrentLeader() returns this Coordinator's id) and when SQLMetadataSegmentManager gets notified about that in DruidCoordinator.becomeLeader(). (See also the Javadoc for LeaderLatchListener, which describes another interesting race condition.)
If this is true, I think it's worth reflecting in some comments in the code. I will think about where to put this.
Good point, and the Javadoc of LeaderLatchListener does sound interesting.

> It still seems to me that SQLMetadataSegmentManager can be called from MetadataResource outside of leadership, between the moment when this Coordinator becomes the leader (DruidLeaderSelector.getCurrentLeader() returns this Coordinator's id) and when SQLMetadataSegmentManager gets notified about that in DruidCoordinator.becomeLeader().

My feeling about this is that DruidCoordinator.isLeader() and DruidCoordinator.getCurrentLeader() could be wrong. Even when DruidLeaderSelector says this coordinator is leading now, DruidCoordinator.isLeader() shouldn't return true until DruidCoordinator.becomeLeader() is finished. A similar issue existed in the overlord and is now fixed there (TaskMaster.getCurrentLeader() still looks wrong though).
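A hedged sketch of the ordering being suggested here (simplified types, not the actual DruidCoordinator code): the leadership flag only flips to true once becomeLeader() has finished its setup, so isLeader() cannot report leadership mid-transition.

```java
class LeadershipOrderingSketch
{
  private volatile boolean leader = false;

  /** Invoked by the leader selector when this node wins the leader election. */
  void becomeLeader()
  {
    // ... tell the segments-metadata manager to start polling, start coordinator duties, etc. ...
    // Only after the setup completes do we report leadership to callers, so isLeader()
    // can't return true while becomeLeader() is still running.
    leader = true;
  }

  void stopBeingLeader()
  {
    leader = false;
    // ... stop polling and coordinator duties ...
  }

  boolean isLeader()
  {
    return leader;
  }
}
```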
This PR is a large portion extracted from PR #7306, but it avoids renaming classes so that Git doesn't lose track of the files' history.
Description
Decouple MetadataSegmentManager.start/stop and startPollingDatabasePeriodically/stopPollingDatabasePeriodically

Lifecycle's start() now doesn't start polling the database periodically. stopPollingDatabasePeriodically() doesn't shut down the executor, only the polling task. The next startPollingDatabasePeriodically() reuses the executor.

SQLMetadataSegmentManager: periodic and on-demand database polls

Made the MetadataSegmentManager's interface simpler and more robust by not exposing the "periodic polls" implementation detail. Instead, when the Coordinator is not leading (hence MetadataSegmentManager is not polling the database periodically) and somebody (e.g. DataSourcesResource) queries the data from MetadataSegmentManager, an on-demand database poll is initiated under the hood (if the previous on-demand poll was a long time ago).

This change reworks the change #7447 by @gianm.
DataSourcesResource return code changes: returning numChangedSegments: N and segmentStateChanged: boolean

- POST /druid/coordinator/v1/datasources/{dataSourceName}
- DELETE /druid/coordinator/v1/datasources/{dataSourceName}
- POST /druid/coordinator/v1/datasources/{dataSourceName}/markUnused
- POST /druid/coordinator/v1/datasources/{dataSourceName}/markUsed

These now return a JSON object of the form {"numChangedSegments": N} instead of 204 (empty response) when no segments were changed. On the other hand, 500 (server error) is not returned instead of 204 (empty response).

There is a separate question of whether all endpoints in DataSourcesResource should return 404 on a non-existent data source: see #7652. That aspect is not changed in this PR.
- POST /druid/coordinator/v1/datasources/{dataSourceName}/segments/{segmentId}
- DELETE /druid/coordinator/v1/datasources/{dataSourceName}/segments/{segmentId}

These now return a JSON object of the form {"segmentStateChanged": true/false}.

The change originates from this discussion: #7306 (comment)
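For illustration, a minimal client sketch against the new response shapes (host, port, datasource, and segment id are placeholders; this is a hedged example, not part of the PR):

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class DataSourcesResourceClientSketch
{
  public static void main(String[] args) throws Exception
  {
    // Placeholder Coordinator address and datasource name.
    String base = "http://coordinator:8081/druid/coordinator/v1/datasources/wikipedia";
    HttpClient client = HttpClient.newHttpClient();

    // POST on the datasource marks all its non-overshadowed segments as used;
    // the response body is now e.g. {"numChangedSegments": 42}.
    HttpRequest markDataSourceUsed = HttpRequest.newBuilder(URI.create(base))
        .POST(HttpRequest.BodyPublishers.noBody())
        .build();
    System.out.println(client.send(markDataSourceUsed, HttpResponse.BodyHandlers.ofString()).body());

    // DELETE on a single segment marks it as unused;
    // the response body is now e.g. {"segmentStateChanged": true}.
    HttpRequest markSegmentUnused = HttpRequest
        .newBuilder(URI.create(base + "/segments/some_segment_id"))
        .DELETE()
        .build();
    System.out.println(client.send(markSegmentUnused, HttpResponse.BodyHandlers.ofString()).body());
  }
}
```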
REST API change: GET /druid/coordinator/v1/metadata/datasources?includeDisabled -> includeUnused

includeDisabled is still accepted, but a warning is emitted in a log. See the changes in the DataSourcesResource class.

FYI: added a method JettyUtils.getQueryParam() to facilitate such parameter renames. (That's why this PR has the Compatibility tag.)
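To illustrate the idea behind such a rename helper, a hedged sketch (this is a hypothetical stand-in working on a plain map, not the actual JettyUtils.getQueryParam() signature, which operates on the HTTP request):

```java
import java.util.Map;

class QueryParamRenameSketch
{
  /**
   * Prefer the new parameter name, but still accept the old one and warn, so that callers
   * have time to migrate (e.g. includeDisabled -> includeUnused).
   */
  static String getQueryParam(Map<String, String> queryParams, String newName, String deprecatedName)
  {
    String value = queryParams.get(newName);
    if (value != null) {
      return value;
    }
    String legacyValue = queryParams.get(deprecatedName);
    if (legacyValue != null) {
      System.err.printf("Parameter [%s] is deprecated; use [%s] instead%n", deprecatedName, newName);
    }
    return legacyValue;
  }

  public static void main(String[] args)
  {
    // e.g. GET /druid/coordinator/v1/metadata/datasources?includeDisabled=true
    Map<String, String> params = Map.of("includeDisabled", "true");
    System.out.println(getQueryParam(params, "includeUnused", "includeDisabled")); // "true", plus a warning
  }
}
```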
MetadataSegmentManager: API refactor

- enableDataSource -> markAsUsedAllNonOvershadowedSegmentsInDataSource
- enableSegments(dataSource, interval) -> markAsUsedNonOvershadowedSegmentsInInterval
- enableSegments(dataSource, Collection segmentIds) -> markAsUsedNonOvershadowedSegments(dataSource, Set segmentIds) throws UnknownSegmentIdException
- enableSegment -> markSegmentAsUsed
- removeDataSource -> markAsUnusedAllSegmentsInDataSource
- disableSegments(dataSource, interval) -> markAsUnusedSegmentsInInterval
- disableSegments(dataSource, Collection segmentIds) -> markSegmentsAsUnused(dataSource, Set segmentIds) throws UnknownSegmentIdException
- removeSegment -> markSegmentAsUnused
- getDataSource -> prepareImmutableDataSourceWithUsedSegments
- getDataSources -> prepareImmutableDataSourcesWithAllUsedSegments
- iterateAllSegments -> iterateAllUsedSegments
- getAllDataSourceNames -> retrieveAllDataSourceNames
SQLMetadataSegmentManager: remove data from dataSources in markAsUnusedSegmentsInInterval and markSegmentsAsUnused

Fixed the logic added in #7490 to match the behavior of markAsUnusedAllSegmentsInDataSource. FYI @dampcake.

Optimizations
- ImmutableDruidDataSource.hashCode(), see Introduce published segment cache in broker #6901 (comment).
- toImmutableDataSource() (hence garbage) in DataSourcesResource.getDataSource().
This PR is Incompatible because it changes return codes of REST methods.
This PR is Development Blocker because it blocks #7306.
For reviewers: key changed classes are SQLMetadataSegmentManager and DataSourcesResource.