diff --git a/src/connector/src/sink/mongodb.rs b/src/connector/src/sink/mongodb.rs
index 13e1a9366498d..791f32a4f31f2 100644
--- a/src/connector/src/sink/mongodb.rs
+++ b/src/connector/src/sink/mongodb.rs
@@ -12,19 +12,17 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use core::future::Future;
 use std::collections::{BTreeMap, HashMap};
 use std::ops::Deref;
 use std::sync::LazyLock;
 
 use anyhow::anyhow;
-use futures::future::try_join_all;
-use futures::prelude::future::FutureExt;
+use futures::future::{try_join_all, TryJoinAll};
 use futures::prelude::TryFuture;
 use futures::TryFutureExt;
 use itertools::Itertools;
 use mongodb::bson::{bson, doc, Array, Bson, Document};
-use mongodb::{Client, Database, Namespace};
+use mongodb::{Client, Namespace};
 use risingwave_common::array::{Op, RowRef, StreamChunk};
 use risingwave_common::catalog::Schema;
 use risingwave_common::log::LogSuppresser;
@@ -300,13 +298,70 @@ impl Sink for MongodbSink {
         .into_log_sinker(MONGODB_SEND_FUTURE_BUFFER_MAX_SIZE))
     }
 }
+mod send_bulk_write_command_future {
+    use core::future::Future;
+
+    use anyhow::anyhow;
+    use mongodb::bson::Document;
+    use mongodb::Database;
+
+    use crate::sink::{Result, SinkError};
+
+    pub(super) type SendBulkWriteCommandFuture = impl Future<Output = Result<()>> + 'static;
+
+    pub(super) fn send_bulk_write_commands(
+        db: Database,
+        upsert: Option<Document>,
+        delete: Option<Document>,
+    ) -> SendBulkWriteCommandFuture {
+        async move {
+            if let Some(upsert) = upsert {
+                send_bulk_write_command(db.clone(), upsert).await?;
+            }
+            if let Some(delete) = delete {
+                send_bulk_write_command(db, delete).await?;
+            }
+            Ok(())
+        }
+    }
+
+    async fn send_bulk_write_command(db: Database, command: Document) -> Result<()> {
+        let result = db.run_command(command, None).await.map_err(|err| {
+            SinkError::Mongodb(anyhow!(err).context(format!(
+                "sending bulk write command failed, database: {}",
+                db.name()
+            )))
+        })?;
+
+        if let Ok(write_errors) = result.get_array("writeErrors") {
+            return Err(SinkError::Mongodb(anyhow!(
+                "bulk write respond with write errors: {:?}",
+                write_errors,
+            )));
+        }
+
+        let n = result.get_i32("n").map_err(|err| {
+            SinkError::Mongodb(
+                anyhow!(err).context("can't extract field n from bulk write response"),
+            )
+        })?;
+        if n < 1 {
+            return Err(SinkError::Mongodb(anyhow!(
+                "bulk write respond with an abnormal state, n = {}",
+                n
+            )));
+        }
+
+        Ok(())
+    }
+}
+
+use send_bulk_write_command_future::*;
 
 pub struct MongodbSinkWriter {
     pub config: MongodbConfig,
     payload_writer: MongodbPayloadWriter,
     is_append_only: bool,
-    // // TODO switching to bulk write API when mongodb driver supports it
-    // command_builder: CommandBuilder,
 }
 
 impl MongodbSinkWriter {
@@ -351,12 +406,6 @@ impl MongodbSinkWriter {
 
         let row_encoder = BsonEncoder::new(schema.clone(), Some(col_indices), pk_indices.clone());
 
-        // let command_builder = if is_append_only {
-        //     CommandBuilder::AppendOnly(HashMap::new())
-        // } else {
-        //     CommandBuilder::Upsert(HashMap::new())
-        // };
-
         let payload_writer = MongodbPayloadWriter::new(
             schema,
             pk_indices,
@@ -373,7 +422,7 @@ impl MongodbSinkWriter {
         })
     }
 
-    fn append(&mut self, chunk: StreamChunk) -> Result<Vec<impl Future<Output = Result<()>>>> {
+    fn append(&mut self, chunk: StreamChunk) -> Result<TryJoinAll<SendBulkWriteCommandFuture>> {
         let mut insert_builder: HashMap<MongodbNamespace, InsertCommandBuilder> = HashMap::new();
         for (op, row) in chunk.rows() {
             if op != Op::Insert {
@@ -389,10 +438,10 @@ impl MongodbSinkWriter {
             }
             self.payload_writer.append(&mut insert_builder, row)?;
         }
-        self.payload_writer.flush_insert(&mut insert_builder)
+        Ok(self.payload_writer.flush_insert(insert_builder))
     }
 
-    fn upsert(&mut self, chunk: StreamChunk) -> Result<Vec<impl Future<Output = Result<()>>>> {
+    fn upsert(&mut self, chunk: StreamChunk) -> Result<TryJoinAll<SendBulkWriteCommandFuture>> {
         let mut upsert_builder: HashMap<MongodbNamespace, UpsertCommandBuilder> = HashMap::new();
         for (op, row) in chunk.rows() {
             if op == Op::UpdateDelete {
@@ -401,7 +450,7 @@ impl MongodbSinkWriter {
             }
             self.payload_writer.upsert(&mut upsert_builder, op, row)?;
         }
-        self.payload_writer.flush_upsert(&mut upsert_builder)
+        Ok(self.payload_writer.flush_upsert(upsert_builder))
     }
 }
 
@@ -415,14 +464,14 @@ impl AsyncTruncateSinkWriter for MongodbSinkWriter {
         chunk: StreamChunk,
         mut add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>,
     ) -> Result<()> {
-        let boxed_futures = if self.is_append_only {
-            let futures = self.append(chunk)?;
-            try_join_all(futures).map_ok(|_: Vec<()>| ()).boxed()
+        let futures = if self.is_append_only {
+            self.append(chunk)?
         } else {
-            let futures = self.upsert(chunk)?;
-            try_join_all(futures).map_ok(|_: Vec<()>| ()).boxed()
+            self.upsert(chunk)?
         };
-        add_future.add_future_may_await(boxed_futures).await?;
+        add_future
+            .add_future_may_await(futures.map_ok(|_: Vec<()>| ()))
+            .await?;
         Ok(())
     }
 }
@@ -648,70 +697,33 @@ impl MongodbPayloadWriter {
 
     fn flush_insert(
         &self,
-        insert_builder: &mut HashMap<MongodbNamespace, InsertCommandBuilder>,
-    ) -> Result<Vec<impl Future<Output = Result<()>>>> {
+        insert_builder: HashMap<MongodbNamespace, InsertCommandBuilder>,
+    ) -> TryJoinAll<SendBulkWriteCommandFuture> {
         // TODO try sending bulk-write of each collection concurrently to improve the performance when
         // `dynamic collection` is enabled. We may need to provide best practice to guide user on setting
         // the MongoDB driver's connection properties.
-        let mut futures = Vec::with_capacity(insert_builder.len());
-        for (ns, builder) in insert_builder.drain() {
+        let futures = insert_builder.into_iter().map(|(ns, builder)| {
             let db = self.client.database(&ns.0);
-            futures.push(Self::send_bulk_write_command(db, builder.build()));
-        }
-        Ok(futures)
+            send_bulk_write_commands(db, Some(builder.build()), None)
+        });
+        try_join_all(futures)
     }
 
     fn flush_upsert(
        &self,
-        upsert_builder: &mut HashMap<MongodbNamespace, UpsertCommandBuilder>,
-    ) -> Result<Vec<impl Future<Output = Result<()>>>> {
+        upsert_builder: HashMap<MongodbNamespace, UpsertCommandBuilder>,
+    ) -> TryJoinAll<SendBulkWriteCommandFuture> {
         // TODO try sending bulk-write of each collection concurrently to improve the performance when
         // `dynamic collection` is enabled. We may need to provide best practice to guide user on setting
         // the MongoDB driver's connection properties.
-        let mut futures = Vec::with_capacity(upsert_builder.len());
-        for (ns, builder) in upsert_builder.drain() {
+        let futures = upsert_builder.into_iter().map(|(ns, builder)| {
             let (upsert, delete) = builder.build();
             // we are sending the bulk upsert first because, under same pk, the `Insert` and `UpdateInsert`
             // should always appear before `Delete`. we have already ignored the `UpdateDelete`
             // which is useless in upsert mode.
             let db = self.client.database(&ns.0);
-            if let Some(upsert) = upsert {
-                futures.push(Self::send_bulk_write_command(db.clone(), upsert));
-            }
-            if let Some(delete) = delete {
-                futures.push(Self::send_bulk_write_command(db, delete));
-            }
-        }
-        Ok(futures)
-    }
-
-    async fn send_bulk_write_command(db: Database, command: Document) -> Result<()> {
-        let result = db.run_command(command, None).await.map_err(|err| {
-            SinkError::Mongodb(anyhow!(err).context(format!(
-                "sending bulk write command failed, database: {}",
-                db.name()
-            )))
-        })?;
-
-        if let Ok(write_errors) = result.get_array("writeErrors") {
-            return Err(SinkError::Mongodb(anyhow!(
-                "bulk write respond with write errors: {:?}",
-                write_errors,
-            )));
-        }
-
-        let n = result.get_i32("n").map_err(|err| {
-            SinkError::Mongodb(
-                anyhow!(err).context("can't extract field n from bulk write response"),
-            )
-        })?;
-        if n < 1 {
-            return Err(SinkError::Mongodb(anyhow!(
-                "bulk write respond with an abnormal state, n = {}",
-                n
-            )));
-        }
-
-        Ok(())
+            send_bulk_write_commands(db, upsert, delete)
+        });
+        try_join_all(futures)
     }
 }
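A note on the pattern introduced above: the whole point of the new `send_bulk_write_command_future` module is to give the async block a nameable type. With the nightly `type_alias_impl_trait` feature, `SendBulkWriteCommandFuture` names the otherwise-anonymous future, so `TryJoinAll<SendBulkWriteCommandFuture>` becomes a concrete type the sink can use as its `DeliveryFuture`, replacing the `FutureExt::boxed` indirection this diff removes. Below is a minimal self-contained sketch of the same technique, assuming a nightly toolchain and only the `futures` crate; `send_one` and `send_all` are hypothetical stand-ins, not RisingWave or MongoDB APIs:

```rust
// Sketch of the named-future pattern, not the actual sink code.
#![feature(type_alias_impl_trait)]

use core::future::Future;

use futures::executor::block_on;
use futures::future::{try_join_all, TryJoinAll};

// Naming the async block's opaque type lets it appear in other signatures
// (here `TryJoinAll<SendFuture>`) without `Box<dyn Future>`.
type SendFuture = impl Future<Output = Result<(), String>> + 'static;

// The sole defining use of `SendFuture`, mirroring `send_bulk_write_commands`.
fn send_one(payload: String) -> SendFuture {
    async move {
        if payload.is_empty() {
            return Err("empty payload".to_owned());
        }
        Ok(())
    }
}

// A concrete, nameable future that drives all sends concurrently and
// fails fast on the first error, like `flush_insert`/`flush_upsert` above.
fn send_all(payloads: Vec<String>) -> TryJoinAll<SendFuture> {
    try_join_all(payloads.into_iter().map(send_one))
}

fn main() {
    let joined: Result<Vec<()>, String> = block_on(send_all(vec!["a".into(), "b".into()]));
    assert!(joined.is_ok());
}
```

Confining the alias and its defining helper to a dedicated module, as the diff does, keeps the opaque type's defining scope small, which is what `type_alias_impl_trait` expects.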
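The `.map_ok(|_: Vec<()>| ())` adapter kept in `write_chunk` is needed because `try_join_all` over futures that each resolve to `Result<(), E>` resolves to `Result<Vec<()>, E>`, while the buffered delivery future should resolve to plain `Result<(), E>`. A runnable sketch of just that adapter, with `String` standing in for `SinkError`:

```rust
use futures::executor::block_on;
use futures::future::try_join_all;
use futures::TryFutureExt;

fn main() {
    // Three futures that each yield Result<(), String>.
    let futs = (0..3).map(|_| async { Ok::<(), String>(()) });

    // `try_join_all(futs)` alone would yield Result<Vec<()>, String>;
    // `map_ok` discards the Vec so the combined future yields Result<(), String>.
    let fut = try_join_all(futs).map_ok(|_: Vec<()>| ());
    assert!(block_on(fut).is_ok());
}
```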
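For context on what `send_bulk_write_command` actually ships: it issues a raw database command through `Database::run_command` and then validates the reply's `n` and `writeErrors` fields. The exact documents come from `InsertCommandBuilder::build()` and `UpsertCommandBuilder::build()`, which this diff does not show, so the sketch below only illustrates the shape of MongoDB's documented `update` command (one of the command kinds that returns those reply fields), not necessarily RisingWave's exact payload:

```rust
use mongodb::bson::{doc, Document};

// Hypothetical bulk-upsert command document in the style of MongoDB's
// `update` database command; illustration only.
fn example_upsert_command(collection: &str) -> Document {
    doc! {
        "update": collection, // target collection
        "ordered": true,      // stop at the first failing write
        "updates": [
            {
                "q": { "_id": 1 },                 // filter, e.g. the primary key
                "u": { "$set": { "v": "hello" } }, // fields to write
                "upsert": true                     // insert when nothing matches
            }
        ]
    }
}
```

A successful reply carries `ok: 1` and `n`, the number of documents matched or inserted, while per-document failures are reported in a `writeErrors` array; that is exactly what the error handling above inspects before trusting `n`.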