Skip to content

Commit

Permalink
deprecate upstream_actor_id in StreamActor
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 committed Jan 24, 2025
1 parent 53af608 commit 16121c8
Show file tree
Hide file tree
Showing 7 changed files with 12 additions and 99 deletions.
2 changes: 1 addition & 1 deletion proto/stream_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -972,7 +972,7 @@ message StreamActor {
// Note that upstream actor ids are also stored in the proto of merge nodes.
// It is painstaking to traverse through the node tree and get upstream actor id from the root StreamNode.
// We duplicate the information here to ease the parsing logic in stream manager.
repeated uint32 upstream_actor_id = 6;
repeated uint32 upstream_actor_id = 6 [deprecated = true];
// Vnodes that the executors in this actor own.
// If the fragment is a singleton, this field will not be set and leave a `None`.
common.Buffer vnode_bitmap = 8;
Expand Down
20 changes: 1 addition & 19 deletions src/ctl/src/cmd_impl/meta/cluster_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,32 +115,14 @@ pub async fn source_split_info(context: &CtlContext, ignore_id: bool) -> anyhow:
for actor in &fragment.actors {
if let Some((split_count, splits)) = actor_splits_map.get(&actor.actor_id) {
println!(
"\t\tActor{} ({} splits): [{}]{}",
"\t\tActor{} ({} splits): [{}]",
if ignore_id {
"".to_owned()
} else {
format!(" #{:<3}", actor.actor_id,)
},
split_count,
splits,
if !actor.upstream_actor_id.is_empty() {
let upstream_splits = actor
.upstream_actor_id
.iter()
.find_map(|id| actor_splits_map.get(id))
.expect("should have one upstream source actor");
format!(
" <- Upstream Actor{}: [{}]",
if ignore_id {
"".to_owned()
} else {
format!(" #{}", actor.upstream_actor_id[0])
},
upstream_splits.1
)
} else {
"".to_owned()
}
);
} else {
println!(
Expand Down
39 changes: 5 additions & 34 deletions src/meta/src/controller/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,10 +258,10 @@ impl CatalogController {
fragment_id,
nodes: _,
dispatcher: pb_dispatcher,
upstream_actor_id: pb_upstream_actor_id,
vnode_bitmap: pb_vnode_bitmap,
mview_definition: _,
expr_context: pb_expr_context,
..
} = actor;

let splits = actor_splits.get(actor_id).map(|splits| {
Expand All @@ -279,17 +279,6 @@ impl CatalogController {

let worker_id = status.worker_id() as _;

assert_eq!(
pb_upstream_actor_id
.iter()
.cloned()
.collect::<BTreeSet<_>>(),
upstream_actors
.values()
.flatten()
.cloned()
.collect::<BTreeSet<_>>()
);
let pb_expr_context = pb_expr_context
.as_ref()
.expect("no expression context found");
Expand Down Expand Up @@ -466,12 +455,6 @@ impl CatalogController {
let pb_vnode_bitmap = vnode_bitmap.map(|vnode_bitmap| vnode_bitmap.to_protobuf());
let pb_expr_context = Some(expr_context.to_protobuf());

let pb_upstream_actor_id = upstream_fragment_actors
.values()
.flatten()
.map(|&id| id as _)
.collect();

let pb_dispatcher = actor_dispatcher
.remove(&actor_id)
.unwrap_or_default()
Expand All @@ -491,12 +474,13 @@ impl CatalogController {
pb_actor_splits.insert(actor_id as _, splits.to_protobuf());
}

#[expect(deprecated)]
pb_actors.push(PbStreamActor {
actor_id: actor_id as _,
fragment_id: fragment_id as _,
nodes: pb_nodes,
dispatcher: pb_dispatcher,
upstream_actor_id: pb_upstream_actor_id,
upstream_actor_id: vec![],
vnode_bitmap: pb_vnode_bitmap,
mview_definition: "".to_owned(),
expr_context: pb_expr_context,
Expand Down Expand Up @@ -1747,11 +1731,6 @@ mod tests {
fragment_id: TEST_FRAGMENT_ID as _,
nodes: Some(stream_node),
dispatcher: generate_dispatchers_for_actor(actor_id),
upstream_actor_id: actor_upstream_actor_ids
.values()
.flatten()
.map(|id| *id as _)
.collect(),
vnode_bitmap: actor_bitmaps
.get(&actor_id)
.cloned()
Expand All @@ -1761,6 +1740,7 @@ mod tests {
time_zone: String::from("America/New_York"),
strict_mode: false,
}),
..Default::default()
}
})
.collect_vec();
Expand Down Expand Up @@ -1973,26 +1953,17 @@ mod tests {
fragment_id: pb_fragment_id,
nodes: pb_nodes,
dispatcher: pb_dispatcher,
upstream_actor_id: pb_upstream_actor_id,
vnode_bitmap: pb_vnode_bitmap,
mview_definition,
expr_context: pb_expr_context,
..
},
) in actors.into_iter().zip_eq_debug(pb_actors.into_iter())
{
assert_eq!(actor_id, pb_actor_id as ActorId);
assert_eq!(fragment_id, pb_fragment_id as FragmentId);
let upstream_actor_ids = upstream_actor_ids.into_inner();

assert_eq!(
upstream_actor_ids
.values()
.flatten()
.map(|id| *id as u32)
.collect_vec(),
pb_upstream_actor_id
);

let actor_dispatcher: Vec<PbDispatcher> = actor_dispatchers
.remove(&actor_id)
.unwrap()
Expand Down
15 changes: 0 additions & 15 deletions src/meta/src/controller/streaming_job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1632,7 +1632,6 @@ impl CatalogController {
actor_id,
fragment_id,
dispatcher,
upstream_actor_id,
vnode_bitmap,
expr_context,
..
Expand All @@ -1657,20 +1656,6 @@ impl CatalogController {
.map(|(k, v)| (k, v.into_iter().collect()))
.collect();

debug_assert_eq!(
actor_upstreams
.values()
.flatten()
.cloned()
.sorted()
.collect_vec(),
upstream_actor_id
.iter()
.map(|actor_id| *actor_id as i32)
.sorted()
.collect_vec()
);

let actor_upstreams = ActorUpstreamActors(actor_upstreams);

let splits = actor_splits
Expand Down
5 changes: 0 additions & 5 deletions src/meta/src/rpc/ddl_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -923,11 +923,6 @@ impl DdlController {
}
}

// update downstream actors' upstream_actor_id and upstream_fragment_id
for actor in &mut union_fragment.actors {
actor.upstream_actor_id.extend(sink_actor_ids.clone());
}

union_fragment
.upstream_fragment_ids
.push(upstream_fragment_id);
Expand Down
21 changes: 3 additions & 18 deletions src/meta/src/stream/scale.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ pub struct CustomActorInfo {
pub actor_id: u32,
pub fragment_id: u32,
pub dispatcher: Vec<Dispatcher>,
pub upstream_actor_id: Vec<u32>,
/// `None` if singleton.
pub vnode_bitmap: Option<Bitmap>,
}
Expand All @@ -90,7 +89,6 @@ impl From<&PbStreamActor> for CustomActorInfo {
actor_id,
fragment_id,
dispatcher,
upstream_actor_id,
vnode_bitmap,
..
}: &PbStreamActor,
Expand All @@ -99,7 +97,6 @@ impl From<&PbStreamActor> for CustomActorInfo {
actor_id: *actor_id,
fragment_id: *fragment_id,
dispatcher: dispatcher.clone(),
upstream_actor_id: upstream_actor_id.clone(),
vnode_bitmap: vnode_bitmap.as_ref().map(Bitmap::from),
}
}
Expand Down Expand Up @@ -530,9 +527,9 @@ impl ScaleController {
status: _,
splits: _,
worker_id,
upstream_actor_ids,
vnode_bitmap,
expr_context,
..
},
) in actors
{
Expand All @@ -547,12 +544,6 @@ impl ScaleController {
actor_id: actor_id as _,
fragment_id: fragment_id as _,
dispatcher: dispatchers,
upstream_actor_id: upstream_actor_ids
.into_inner()
.values()
.flatten()
.map(|id| *id as _)
.collect(),
vnode_bitmap: vnode_bitmap.map(|b| Bitmap::from(&b.to_protobuf())),
};

Expand Down Expand Up @@ -590,13 +581,13 @@ impl ScaleController {
actor_id,
fragment_id,
dispatcher,
upstream_actor_id,
vnode_bitmap,
} = actors.first().unwrap().clone();

let (related_job, job_definition) =
related_jobs.get(&job_id).expect("job not found");

#[expect(deprecated)]
let fragment = CustomFragmentInfo {
fragment_id: fragment_id as _,
fragment_type_mask: fragment_type_mask as _,
Expand All @@ -608,7 +599,7 @@ impl ScaleController {
actor_id,
fragment_id: fragment_id as _,
dispatcher,
upstream_actor_id,
upstream_actor_id: vec![],
vnode_bitmap: vnode_bitmap.map(|b| b.to_protobuf()),
mview_definition: job_definition.to_owned(),
expr_context: expr_contexts
Expand Down Expand Up @@ -1619,12 +1610,6 @@ impl ScaleController {
}
}

new_actor.upstream_actor_id = applied_upstream_fragment_actor_ids
.values()
.flatten()
.cloned()
.collect_vec();

if let Some(node) = new_actor.nodes.as_mut() {
visit_stream_node(node, |node_body| {
if let NodeBody::Merge(s) = node_body {
Expand Down
9 changes: 2 additions & 7 deletions src/meta/src/stream/stream_graph/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -320,25 +320,20 @@ impl ActorBuilder {
) -> MetaResult<StreamActorWithUpstreams> {
let (rewritten_nodes, actor_upstreams) = self.rewrite()?;

// TODO: store each upstream separately
let upstream_actor_id = self
.upstreams
.into_values()
.flat_map(|ActorUpstream { actors, .. }| actors.as_global_ids())
.collect();
// Only fill the definition when debug assertions enabled, otherwise use name instead.
#[cfg(not(debug_assertions))]
let mview_definition = job.name();
#[cfg(debug_assertions)]
let mview_definition = job.definition();

Ok((
#[expect(deprecated)]
StreamActor {
actor_id: self.actor_id.as_global_id(),
fragment_id: self.fragment_id.as_global_id(),
nodes: Some(rewritten_nodes),
dispatcher: self.downstreams.into_values().collect(),
upstream_actor_id,
upstream_actor_id: vec![],
vnode_bitmap: self.vnode_bitmap.map(|b| b.to_protobuf()),
mview_definition,
expr_context: Some(expr_context),
Expand Down

0 comments on commit 16121c8

Please sign in to comment.