Skip to content

Commit

Permalink
Flag row errors earlier in the pipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
goakley authored and AndrooTheChen committed Sep 23, 2024
1 parent 918cd25 commit d13e1ba
Showing 1 changed file with 27 additions and 5 deletions.
32 changes: 27 additions & 5 deletions src/sinks/gcp/bigquery/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ pub enum BigqueryServiceError {
Transport { error: tonic::transport::Error },
#[snafu(display("BigQuery request failure: {}", status))]
Request { status: tonic::Status },
#[snafu(display("BigQuery row write failures: {:?}", row_errors))]
RowWrite { row_errors: Vec<proto::RowError> },
}

impl From<tonic::transport::Error> for BigqueryServiceError {
Expand All @@ -116,6 +118,12 @@ impl From<tonic::Status> for BigqueryServiceError {
}
}

impl From<Vec<proto::RowError>> for BigqueryServiceError {
fn from(row_errors: Vec<proto::RowError>) -> Self {
Self::RowWrite { row_errors }
}
}

type BigQueryWriteClient = proto::big_query_write_client::BigQueryWriteClient<
InterceptedService<tonic::transport::Channel, AuthInterceptor>,
>;
Expand Down Expand Up @@ -153,14 +161,28 @@ impl Service<BigqueryRequest> for BigqueryService {
Box::pin(async move {
// Ideally, we would maintain the gRPC stream, detect when auth expired and re-request with new auth.
// But issuing a new request every time leads to more comprehensible code with reasonable performance.
trace!(
message = "Sending request to BigQuery",
request = format!("{:?}", request.request),
);
let stream = tokio_stream::once(request.request);
let response = client.append_rows(stream).await?;
match response.into_inner().message().await? {
Some(body) => Ok(BigqueryResponse {
body,
request_byte_size,
request_uncompressed_size,
}),
Some(body) => {
trace!(
message = "Received response body from BigQuery",
body = format!("{:?}", body),
);
if body.row_errors.is_empty() {
Ok(BigqueryResponse {
body,
request_byte_size,
request_uncompressed_size,
})
} else {
Err(body.row_errors.into())
}
}
None => Err(tonic::Status::unknown("response stream closed").into()),
}
})
Expand Down

0 comments on commit d13e1ba

Please sign in to comment.