Skip to content

Commit

Permalink
fix comm
Browse files Browse the repository at this point in the history
  • Loading branch information
xxhZs committed Sep 26, 2024
1 parent 6a5989f commit b8af1c8
Showing 1 changed file with 83 additions and 71 deletions.
154 changes: 83 additions & 71 deletions src/connector/src/sink/mongodb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,17 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use core::future::Future;
use std::collections::{BTreeMap, HashMap};
use std::ops::Deref;
use std::sync::LazyLock;

use anyhow::anyhow;
use futures::future::try_join_all;
use futures::prelude::future::FutureExt;
use futures::future::{try_join_all, TryJoinAll};
use futures::prelude::TryFuture;
use futures::TryFutureExt;
use itertools::Itertools;
use mongodb::bson::{bson, doc, Array, Bson, Document};
use mongodb::{Client, Database, Namespace};
use mongodb::{Client, Namespace};
use risingwave_common::array::{Op, RowRef, StreamChunk};
use risingwave_common::catalog::Schema;
use risingwave_common::log::LogSuppresser;
Expand Down Expand Up @@ -300,13 +298,70 @@ impl Sink for MongodbSink {
.into_log_sinker(MONGODB_SEND_FUTURE_BUFFER_MAX_SIZE))
}
}
mod send_bulk_write_command_future {
use core::future::Future;

use anyhow::anyhow;
use mongodb::bson::Document;
use mongodb::Database;

use crate::sink::{Result, SinkError};

pub(super) type SendBulkWriteCommandFuture = impl Future<Output = Result<()>> + 'static;

pub(super) fn send_bulk_write_commands(
db: Database,
upsert: Option<Document>,
delete: Option<Document>,
) -> SendBulkWriteCommandFuture {
async move {
if let Some(upsert) = upsert {
send_bulk_write_command(db.clone(), upsert).await?;
}
if let Some(delete) = delete {
send_bulk_write_command(db, delete).await?;
}
Ok(())
}
}

async fn send_bulk_write_command(db: Database, command: Document) -> Result<()> {
let result = db.run_command(command, None).await.map_err(|err| {
SinkError::Mongodb(anyhow!(err).context(format!(
"sending bulk write command failed, database: {}",
db.name()
)))
})?;

if let Ok(write_errors) = result.get_array("writeErrors") {
return Err(SinkError::Mongodb(anyhow!(
"bulk write respond with write errors: {:?}",
write_errors,
)));
}

let n = result.get_i32("n").map_err(|err| {
SinkError::Mongodb(
anyhow!(err).context("can't extract field n from bulk write response"),
)
})?;
if n < 1 {
return Err(SinkError::Mongodb(anyhow!(
"bulk write respond with an abnormal state, n = {}",
n
)));
}

Ok(())
}
}

use send_bulk_write_command_future::*;

pub struct MongodbSinkWriter {
pub config: MongodbConfig,
payload_writer: MongodbPayloadWriter,
is_append_only: bool,
// // TODO switching to bulk write API when mongodb driver supports it
// command_builder: CommandBuilder,
}

impl MongodbSinkWriter {
Expand Down Expand Up @@ -351,12 +406,6 @@ impl MongodbSinkWriter {

let row_encoder = BsonEncoder::new(schema.clone(), Some(col_indices), pk_indices.clone());

// let command_builder = if is_append_only {
// CommandBuilder::AppendOnly(HashMap::new())
// } else {
// CommandBuilder::Upsert(HashMap::new())
// };

let payload_writer = MongodbPayloadWriter::new(
schema,
pk_indices,
Expand All @@ -373,7 +422,7 @@ impl MongodbSinkWriter {
})
}

fn append(&mut self, chunk: StreamChunk) -> Result<Vec<impl Future<Output = Result<()>>>> {
fn append(&mut self, chunk: StreamChunk) -> Result<TryJoinAll<SendBulkWriteCommandFuture>> {
let mut insert_builder: HashMap<MongodbNamespace, InsertCommandBuilder> = HashMap::new();
for (op, row) in chunk.rows() {
if op != Op::Insert {
Expand All @@ -389,10 +438,10 @@ impl MongodbSinkWriter {
}
self.payload_writer.append(&mut insert_builder, row)?;
}
self.payload_writer.flush_insert(&mut insert_builder)
Ok(self.payload_writer.flush_insert(insert_builder))
}

fn upsert(&mut self, chunk: StreamChunk) -> Result<Vec<impl Future<Output = Result<()>>>> {
fn upsert(&mut self, chunk: StreamChunk) -> Result<TryJoinAll<SendBulkWriteCommandFuture>> {
let mut upsert_builder: HashMap<MongodbNamespace, UpsertCommandBuilder> = HashMap::new();
for (op, row) in chunk.rows() {
if op == Op::UpdateDelete {
Expand All @@ -401,7 +450,7 @@ impl MongodbSinkWriter {
}
self.payload_writer.upsert(&mut upsert_builder, op, row)?;
}
self.payload_writer.flush_upsert(&mut upsert_builder)
Ok(self.payload_writer.flush_upsert(upsert_builder))
}
}

Expand All @@ -415,14 +464,14 @@ impl AsyncTruncateSinkWriter for MongodbSinkWriter {
chunk: StreamChunk,
mut add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>,
) -> Result<()> {
let boxed_futures = if self.is_append_only {
let futures = self.append(chunk)?;
try_join_all(futures).map_ok(|_: Vec<()>| ()).boxed()
let futures = if self.is_append_only {
self.append(chunk)?
} else {
let futures = self.upsert(chunk)?;
try_join_all(futures).map_ok(|_: Vec<()>| ()).boxed()
self.upsert(chunk)?
};
add_future.add_future_may_await(boxed_futures).await?;
add_future
.add_future_may_await(futures.map_ok(|_: Vec<()>| ()))
.await?;
Ok(())
}
}
Expand Down Expand Up @@ -648,70 +697,33 @@ impl MongodbPayloadWriter {

fn flush_insert(
&self,
insert_builder: &mut HashMap<MongodbNamespace, InsertCommandBuilder>,
) -> Result<Vec<impl futures::Future<Output = std::result::Result<(), SinkError>>>> {
insert_builder: HashMap<MongodbNamespace, InsertCommandBuilder>,
) -> TryJoinAll<SendBulkWriteCommandFuture> {
// TODO try sending bulk-write of each collection concurrently to improve the performance when
// `dynamic collection` is enabled. We may need to provide best practice to guide user on setting
// the MongoDB driver's connection properties.
let mut futures = Vec::with_capacity(insert_builder.len());
for (ns, builder) in insert_builder.drain() {
let futures = insert_builder.into_iter().map(|(ns, builder)| {
let db = self.client.database(&ns.0);
futures.push(Self::send_bulk_write_command(db, builder.build()));
}
Ok(futures)
send_bulk_write_commands(db, Some(builder.build()), None)
});
try_join_all(futures)
}

fn flush_upsert(
&self,
upsert_builder: &mut HashMap<MongodbNamespace, UpsertCommandBuilder>,
) -> Result<Vec<impl futures::Future<Output = std::result::Result<(), SinkError>>>> {
upsert_builder: HashMap<MongodbNamespace, UpsertCommandBuilder>,
) -> TryJoinAll<SendBulkWriteCommandFuture> {
// TODO try sending bulk-write of each collection concurrently to improve the performance when
// `dynamic collection` is enabled. We may need to provide best practice to guide user on setting
// the MongoDB driver's connection properties.
let mut futures = Vec::with_capacity(upsert_builder.len());
for (ns, builder) in upsert_builder.drain() {
let futures = upsert_builder.into_iter().map(|(ns, builder)| {
let (upsert, delete) = builder.build();
// we are sending the bulk upsert first because, under same pk, the `Insert` and `UpdateInsert`
// should always appear before `Delete`. we have already ignored the `UpdateDelete`
// which is useless in upsert mode.
let db = self.client.database(&ns.0);
if let Some(upsert) = upsert {
futures.push(Self::send_bulk_write_command(db.clone(), upsert));
}
if let Some(delete) = delete {
futures.push(Self::send_bulk_write_command(db, delete));
}
}
Ok(futures)
}

async fn send_bulk_write_command(db: Database, command: Document) -> Result<()> {
let result = db.run_command(command, None).await.map_err(|err| {
SinkError::Mongodb(anyhow!(err).context(format!(
"sending bulk write command failed, database: {}",
db.name()
)))
})?;

if let Ok(write_errors) = result.get_array("writeErrors") {
return Err(SinkError::Mongodb(anyhow!(
"bulk write respond with write errors: {:?}",
write_errors,
)));
}

let n = result.get_i32("n").map_err(|err| {
SinkError::Mongodb(
anyhow!(err).context("can't extract field n from bulk write response"),
)
})?;
if n < 1 {
return Err(SinkError::Mongodb(anyhow!(
"bulk write respond with an abnormal state, n = {}",
n
)));
}

Ok(())
send_bulk_write_commands(db, upsert, delete)
});
try_join_all(futures)
}
}

0 comments on commit b8af1c8

Please sign in to comment.