refactor(barrier): explicitly maintain database barrier state separately in local barrier manager #19556
Conversation
This stack of pull requests is managed by Graphite. Learn more about stacking.
After this PR, I suppose we have N streaming graphs, 1 for each DB.
After backfill completes, the partial stream graph will be merged into one of these DB streaming graphs.
```diff
@@ -40,16 +41,23 @@ pub type UpDownActorIds = (ActorId, ActorId);
 pub type UpDownFragmentIds = (FragmentId, FragmentId);

 #[derive(Hash, Eq, PartialEq, Copy, Clone, Debug)]
-struct PartialGraphId(u64);
+pub(crate) struct PartialGraphId(u32);
```
I recall we used to have the database id as part of the partial graph id #19173 (comment).
I suppose now that we always have a separate graph per DB, this is no longer necessary.
Yes. The partial graph id should be unique only within the same database, and does not need to be unique globally.
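To illustrate (hypothetical names, not from this PR): if a globally-unique handle were ever needed, it would simply be the pair of the database id and the per-database partial graph id.

```rust
// Hypothetical illustration: PartialGraphId alone is only unique within one
// database; pairing it with the database id yields a globally-unique key.
#[derive(Hash, Eq, PartialEq, Copy, Clone, Debug)]
struct DatabaseId(u32);

#[derive(Hash, Eq, PartialEq, Copy, Clone, Debug)]
struct PartialGraphId(u32);

// Unique across the whole cluster only when paired with the database id.
type GlobalPartialGraphKey = (DatabaseId, PartialGraphId);
```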
b5d838a to 9defad3 (Compare)
Generally LGTM
```diff
 repeated uint32 actor_ids_to_collect = 4;
 repeated uint32 table_ids_to_sync = 5;
-uint64 partial_graph_id = 6;
+uint32 partial_graph_id = 6;
```
Is the partial_graph_id globally unique or only unique per database?
nvm. It is answered in your reply to noel's comment: "partial graph id should be unique only within the same database".
Does it mean that we won't support cross-database references in streaming queries (at least for the initial release)?
As discussed offline with @hzxa21, for the initial version of cross-database streaming queries, we will always read the L0 log store of the upstream table if the table is in another database, so we don't need a streaming dispatcher in this case.
LGTM
```rust
for (database_id, futures) in &mut *futures {
    if let Poll::Ready(Some((partial_graph_id, barrier, result))) =
        futures.poll_next_unpin(cx)
    {
        return Poll::Ready((*database_id, partial_graph_id, barrier, result));
    }
}
Poll::Pending
```
If there is a considerable number of databases, we may consider constructing them into an outer FuturesUnordered.
Might be difficult to do so. If we create a temporary FuturesUnordered in every select, it may hurt performance somewhat, since we need to allocate new memory for a new FuturesUnordered each time. If we instead store the FuturesUnordered as a field of the local barrier worker, the future has to be 'static, and otherwise we would need to self-reference the DatabaseManagedBarrierState stored in the local barrier worker. If we make the future 'static, we need to move the ownership of DatabaseManagedBarrierState into the future, and then we cannot access the DatabaseManagedBarrierState before the future returns.
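For illustration, here is a minimal, self-contained sketch of the pattern the code above uses (the function name, the `BarrierResult` placeholder, and the simplified types are assumptions, not the actual RisingWave code): the worker keeps one `FuturesUnordered` per database in a `HashMap` it owns, and a short-lived `poll_fn` borrows the map only while being polled, so no `'static` outer future ever needs to take ownership of the per-database state.

```rust
use std::collections::HashMap;
use std::future::poll_fn;
use std::task::Poll;

use futures::future::BoxFuture;
use futures::stream::{FuturesUnordered, StreamExt};

type DatabaseId = u32;
// Placeholder for the real per-barrier completion result.
type BarrierResult = Result<(), String>;

/// Wait for the next completed future from any database, without moving the
/// per-database state out of the caller.
async fn next_completed(
    per_db: &mut HashMap<DatabaseId, FuturesUnordered<BoxFuture<'static, BarrierResult>>>,
) -> (DatabaseId, BarrierResult) {
    // `poll_fn` borrows `per_db` only for the duration of this call, so the
    // worker keeps ownership of its state between polls; no self-reference
    // and no temporary outer FuturesUnordered allocation is needed.
    poll_fn(|cx| {
        for (database_id, futures) in per_db.iter_mut() {
            if let Poll::Ready(Some(result)) = futures.poll_next_unpin(cx) {
                return Poll::Ready((*database_id, result));
            }
        }
        Poll::Pending
    })
    .await
}
```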
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
To simplify the implementation of database failure isolation, we should isolate the fields of the local barrier worker between different databases. Previously, we isolated the barrier injection and collection of different partial graphs. Ideally, it would be great to isolate the fields of the local barrier worker by partial graph, so that the local barrier worker wouldn't need to be aware of the concept of a database.
However, after some effort, I found it complicated to directly support isolation between partial graphs. In snapshot backfill, during the backfill stage, the streaming job has its own partial graph, but after the backfill finishes, this partial graph is merged into the global partial graph, with its actors migrating from their own partial graph to the global one. In this case, partial graphs are not completely isolated from each other. If we instead never merge partial graphs and assume that actors always belong to a fixed partial graph, we could easily support isolation between partial graphs, but the related code in the global barrier manager would become unnecessarily complicated. Either way, isolating by partial graph is complicated.
Therefore, in this PR, we make the local barrier worker aware of the concept of database. Fields in the local barrier manager are now maintained mostly as `HashMap<DatabaseId, T>`, so the logic of database isolation can be implemented easily. The changes are mostly as follows (a sketch of the resulting layout is given after this list):

- `ManagedBarrierState` is renamed to `DatabaseManagedBarrierState`, and stores the barrier state per database. In the streaming control bidi-stream, each request specifies the `database_id` to operate on.
- `LocalBarrierManager`, which holds the two channel txs for actors to report barrier events or actor errors, becomes per-database, so that barrier events and actor errors can be reported independently. It was previously stored in the `SharedContext`; in this PR, we store it separately outside of the `SharedContext`.
- `SharedContext`, which stores pending exchange channels, also becomes per-database. This is based on the assumption that there won't be exchange channels between two different databases. In the exchange service, to get an exchange channel, we now need to specify the current `database_id` to locate the correct `SharedContext`.
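For illustration, a hedged sketch of the resulting per-database layout (the three type names come from this PR; the worker's field names and the struct bodies are assumptions, not the actual RisingWave definitions):

```rust
use std::collections::HashMap;

type DatabaseId = u32;

struct DatabaseManagedBarrierState { /* barrier state of one database */ }
struct LocalBarrierManager { /* per-database txs for barrier events / actor errors */ }
struct SharedContext { /* per-database pending exchange channels */ }

// The local barrier worker keys everything by DatabaseId, so a failure in one
// database only needs to touch that database's entries.
struct LocalBarrierWorker {
    managed_states: HashMap<DatabaseId, DatabaseManagedBarrierState>,
    barrier_managers: HashMap<DatabaseId, LocalBarrierManager>,
    shared_contexts: HashMap<DatabaseId, SharedContext>,
}
```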
Checklist

- `./risedev check` (or alias, `./risedev c`)

Documentation
Release note
If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.