diff --git a/src/meta/src/controller/catalog.rs b/src/meta/src/controller/catalog.rs index 0f9c4ac54a570..f4c779f851dba 100644 --- a/src/meta/src/controller/catalog.rs +++ b/src/meta/src/controller/catalog.rs @@ -3213,7 +3213,7 @@ impl CatalogControllerInner { /// `list_sources` return all sources and `CREATED` ones if contains any streaming jobs. async fn list_sources(&self) -> MetaResult> { - let source_objs = Source::find() + let mut source_objs = Source::find() .find_also_related(Object) .join(JoinType::LeftJoin, object::Relation::StreamingJob.def()) .filter( @@ -3223,7 +3223,27 @@ impl CatalogControllerInner { ) .all(&self.db) .await?; - // TODO: filter out inner connector source that are still under creating. + + // filter out inner connector sources that are still under creating. + let created_table_ids: HashSet = Table::find() + .select_only() + .column(table::Column::TableId) + .join(JoinType::InnerJoin, table::Relation::Object1.def()) + .join(JoinType::LeftJoin, object::Relation::StreamingJob.def()) + .filter( + table::Column::OptionalAssociatedSourceId + .is_not_null() + .and(streaming_job::Column::JobStatus.eq(JobStatus::Created)), + ) + .into_tuple() + .all(&self.db) + .await? + .into_iter() + .collect(); + source_objs.retain_mut(|(source, _)| { + source.optional_associated_table_id.is_none() + || created_table_ids.contains(&source.optional_associated_table_id.unwrap()) + }); Ok(source_objs .into_iter()