Skip to content

Commit

Permalink
report error with score
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 committed Dec 4, 2024
1 parent ae53d6d commit d28b483
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 8 deletions.
9 changes: 6 additions & 3 deletions src/stream/src/task/barrier_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ use risingwave_pb::stream_service::streaming_control_stream_response::{
};
use risingwave_pb::stream_service::{
streaming_control_stream_response, BarrierCompleteResponse, InjectBarrierRequest,
StreamingControlStreamRequest, StreamingControlStreamResponse,
PbScoredError, StreamingControlStreamRequest, StreamingControlStreamResponse,
};

use crate::executor::exchange::permit::Receiver;
Expand Down Expand Up @@ -160,12 +160,15 @@ impl ControlStreamHandle {
pub(super) fn ack_reset_database(
&mut self,
database_id: DatabaseId,
root_err: Option<StreamError>,
root_err: Option<ScoredStreamError>,
reset_request_id: u32,
) {
self.send_response(Response::ResetDatabase(ResetDatabaseResponse {
database_id: database_id.database_id,
root_err: root_err.map(|err| err.to_report_string()),
root_err: root_err.map(|err| PbScoredError {
err_msg: err.error.to_report_string(),
score: err.score.0,
}),
reset_request_id,
}));
}
Expand Down
8 changes: 3 additions & 5 deletions src/stream/src/task/barrier_manager/managed_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ use risingwave_pb::stream_service::streaming_control_stream_request::{
use risingwave_pb::stream_service::InjectBarrierRequest;

use crate::task::barrier_manager::await_epoch_completed_future::AwaitEpochCompletedFuture;
use crate::task::barrier_manager::LocalBarrierEvent;
use crate::task::barrier_manager::{LocalBarrierEvent, ScoredStreamError};

pub(super) struct ManagedBarrierStateDebugInfo<'a> {
running_actors: BTreeSet<ActorId>,
Expand Down Expand Up @@ -361,9 +361,7 @@ impl SuspendedDatabaseState {
if let Some(hummock) = self.inner.actor_manager.env.state_store().as_hummock() {
hummock.clear_tables(self.inner.table_ids).await;
}
ResetDatabaseOutput {
root_err: root_err.error,
}
ResetDatabaseOutput { root_err }
}
}

Expand All @@ -373,7 +371,7 @@ pub(crate) struct ResettingDatabaseState {
}

pub(crate) struct ResetDatabaseOutput {
pub(crate) root_err: StreamError,
pub(crate) root_err: ScoredStreamError,
}

pub(crate) enum DatabaseStatus {
Expand Down

0 comments on commit d28b483

Please sign in to comment.