Skip to content

Commit

Permalink
feat: report sink failure event to event log (#18958)
Browse files Browse the repository at this point in the history
  • Loading branch information
hzxa21 committed Oct 29, 2024
1 parent 1fe14fd commit 914234d
Show file tree
Hide file tree
Showing 9 changed files with 123 additions and 0 deletions.
8 changes: 8 additions & 0 deletions proto/meta.proto
Original file line number Diff line number Diff line change
Expand Up @@ -769,6 +769,12 @@ message EventLog {
string cdc_table_id = 3;
string upstream_ddl = 4;
}
message EventSinkFail {
uint32 sink_id = 1;
string sink_name = 2;
string connector = 3;
string error = 4;
}
// Event logs identifier, which should be populated by event log service.
optional string unique_id = 1;
// Processing time, which should be populated by event log service.
Expand All @@ -782,6 +788,7 @@ message EventLog {
EventCollectBarrierFail collect_barrier_fail = 8;
EventLog.EventWorkerNodePanic worker_node_panic = 9;
EventLog.EventAutoSchemaChangeFail auto_schema_change_fail = 10;
EventLog.EventSinkFail sink_fail = 11;
}
}

Expand All @@ -795,6 +802,7 @@ message AddEventLogRequest {
// A subset of EventLog.event that can be added by non meta node.
oneof event {
EventLog.EventWorkerNodePanic worker_node_panic = 1;
EventLog.EventSinkFail sink_fail = 2;
}
}

Expand Down
23 changes: 23 additions & 0 deletions src/connector/src/sink/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -534,6 +534,29 @@ impl SinkMetaClient {
}
}
}

pub async fn add_sink_fail_evet_log(
&self,
sink_id: u32,
sink_name: String,
connector: String,
error: String,
) {
match self {
SinkMetaClient::MetaClient(meta_client) => {
match meta_client
.add_sink_fail_evet(sink_id, sink_name, connector, error)
.await
{
Ok(_) => {}
Err(e) => {
tracing::warn!(error = %e.as_report(), sink_id = sink_id, "Fialed to add sink fail event to event log.");
}
}
}
SinkMetaClient::MockMetaClient(_) => {}
}
}
}

impl SinkWriterParam {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ fn event_type(e: &Event) -> String {
Event::CollectBarrierFail(_) => "COLLECT_BARRIER_FAIL",
Event::WorkerNodePanic(_) => "WORKER_NODE_PANIC",
Event::AutoSchemaChangeFail(_) => "AUTO_SCHEMA_CHANGE_FAIL",
Event::SinkFail(_) => "SINK_FAIL",
}
.into()
}
3 changes: 3 additions & 0 deletions src/meta/service/src/event_log_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ impl EventLogService for EventLogServiceImpl {
risingwave_pb::meta::add_event_log_request::Event::WorkerNodePanic(e) => {
risingwave_pb::meta::event_log::Event::WorkerNodePanic(e)
}
risingwave_pb::meta::add_event_log_request::Event::SinkFail(e) => {
risingwave_pb::meta::event_log::Event::SinkFail(e)
}
};
self.event_log_manager.add_event_logs(vec![e]);
Ok(Response::new(AddEventLogResponse {}))
Expand Down
1 change: 1 addition & 0 deletions src/meta/src/manager/event_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ impl From<&EventLog> for ChannelId {
Event::CollectBarrierFail(_) => 6,
Event::WorkerNodePanic(_) => 7,
Event::AutoSchemaChangeFail(_) => 8,
Event::SinkFail(_) => 9,
}
}
}
20 changes: 20 additions & 0 deletions src/rpc_client/src/meta_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1423,6 +1423,26 @@ impl MetaClient {
.join();
}

pub async fn add_sink_fail_evet(
&self,
sink_id: u32,
sink_name: String,
connector: String,
error: String,
) -> Result<()> {
let event = event_log::EventSinkFail {
sink_id,
sink_name,
connector,
error,
};
let req = AddEventLogRequest {
event: Some(add_event_log_request::Event::SinkFail(event)),
};
self.inner.add_event_log(req).await?;
Ok(())
}

pub async fn cancel_compact_task(&self, task_id: u64, task_status: TaskStatus) -> Result<bool> {
let req = CancelCompactTaskRequest {
task_id,
Expand Down
11 changes: 11 additions & 0 deletions src/stream/src/executor/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,17 @@ impl<F: LogStoreFactory> SinkExecutor<F> {
actor_context.fragment_id.to_string(),
]);

if let Some(meta_client) = sink_writer_param.meta_client.as_ref() {
meta_client
.add_sink_fail_evet_log(
sink_writer_param.sink_id.sink_id,
sink_writer_param.sink_name.clone(),
sink_writer_param.connector.clone(),
e.to_report_string(),
)
.await;
}

match log_reader.rewind().await {
Ok((true, curr_vnode_bitmap)) => {
warn!(
Expand Down
39 changes: 39 additions & 0 deletions src/tests/simulation/tests/integration_tests/sink/err_isolation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,3 +64,42 @@ async fn test_sink_decouple_err_isolation() -> Result<()> {

Ok(())
}

#[tokio::test]
async fn test_sink_error_event_logs() -> Result<()> {
let mut cluster = start_sink_test_cluster().await?;

let source_parallelism = 6;

let test_sink = SimulationTestSink::register_new();
test_sink.set_err_rate(1.0);
let test_source = SimulationTestSource::register_new(source_parallelism, 0..100000, 0.2, 20);

let mut session = cluster.start_session();

session.run("set streaming_parallelism = 6").await?;
session.run("set sink_decouple = true").await?;
session.run(CREATE_SOURCE).await?;
session.run(CREATE_SINK).await?;
assert_eq!(6, test_sink.parallelism_counter.load(Relaxed));

test_sink.store.wait_for_err(1).await?;

session.run(DROP_SINK).await?;
session.run(DROP_SOURCE).await?;

// Due to sink failure isolation, source stream should not be recreated
assert_eq!(
source_parallelism,
test_source.create_stream_count.load(Relaxed)
);

// Sink error should be recorded in rw_event_logs
let result = session
.run("select * from rw_event_logs where event_type = 'SINK_FAIL'")
.await?;
assert!(!result.is_empty());
println!("Sink fail event logs: {:?}", result);

Ok(())
}
17 changes: 17 additions & 0 deletions src/tests/simulation/tests/integration_tests/sink/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ pub struct TestSinkStoreInner {
pub id_name: HashMap<i32, Vec<String>>,
pub epochs: Vec<u64>,
pub checkpoint_count: usize,
pub err_count: usize,
}

#[derive(Clone)]
Expand All @@ -66,6 +67,7 @@ impl TestSinkStore {
id_name: HashMap::new(),
epochs: Vec::new(),
checkpoint_count: 0,
err_count: 0,
})),
}
}
Expand Down Expand Up @@ -99,6 +101,10 @@ impl TestSinkStore {
self.inner().id_name.len()
}

pub fn err_count(&self) -> usize {
self.inner().err_count
}

pub async fn wait_for_count(&self, count: usize) -> anyhow::Result<()> {
let mut prev_count = 0;
let mut has_printed = false;
Expand All @@ -122,6 +128,16 @@ impl TestSinkStore {
}
Ok(())
}

pub async fn wait_for_err(&self, count: usize) -> anyhow::Result<()> {
loop {
sleep(Duration::from_secs(1)).await;
if self.err_count() >= count {
break;
}
}
Ok(())
}
}

pub struct TestWriter {
Expand Down Expand Up @@ -154,6 +170,7 @@ impl SinkWriter for TestWriter {
async fn write_batch(&mut self, chunk: StreamChunk) -> risingwave_connector::sink::Result<()> {
if thread_rng().gen_ratio(self.err_rate.load(Relaxed), u32::MAX) {
println!("write with err");
self.store.inner().err_count += 1;
return Err(SinkError::Internal(anyhow::anyhow!("fail to write")));
}
for (op, row) in chunk.rows() {
Expand Down

0 comments on commit 914234d

Please sign in to comment.