-
Notifications
You must be signed in to change notification settings - Fork 597
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(sink): support async for mongodb dynamodb #17645
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Rest LGTM
src/connector/src/sink/mongodb.rs
Outdated
} else { | ||
CommandBuilder::Upsert(HashMap::new()) | ||
}; | ||
// let command_builder = if is_append_only { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove the dead code.
src/connector/src/sink/mongodb.rs
Outdated
} | ||
Ok(()) | ||
self.payload_writer.flush_insert(&mut insert_builder) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The flush_insert
can change to consume the ownership of insert_builder
. So can flush_upsert
.
src/connector/src/sink/mongodb.rs
Outdated
upsert_builder: &mut HashMap<MongodbNamespace, UpsertCommandBuilder>, | ||
) -> Result<()> { | ||
) -> Result<Vec<impl futures::Future<Output = std::result::Result<(), SinkError>>>> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can change to return try_join_all(...).boxed() here so that we can avoid allocating an unnecessary intermediate vec.
We can collect the future by converting the builder into an iterator of future by code like the following
try_join_all(upsert_builder.into_iter().flat_map(|(ns, builder)| {
let (upsert, delete) = builder.build();
let db = self.client.database(&ns.0);
upsert
.map(|upsert| Self::send_bulk_write_command(db.clone(), upsert))
.into_iter()
.chain(delete.map(|delete| Self::send_bulk_write_command(db, delete)))
}))
So as flush_insert
.
src/connector/src/sink/mongodb.rs
Outdated
async fn send_bulk_write_command(&self, database: &str, command: Document) -> Result<()> { | ||
let db = self.client.database(database); | ||
|
||
async fn send_bulk_write_command(db: Database, command: Document) -> Result<()> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I just realize that both flush_upsert
and flush_insert
are returning a vector of the same future generated by this method.
If so, we can make the return type of flush_upsert
and flush_insert
to be the same, and then we don't need to add a boxed to the try_join_all future.
This method can be defined in this way
type SendBulkWriteCommandFuture = impl Future<Output = Result<()>> + 'static;
fn send_bulk_write_command(db: Database, command: Document) -> SendBulkWriteCommandFuture {
async move {
...
}
}
And then both flush_upsert
and flush_insert
can change to return TryJoinAll<SendBulkWriteCommandFuture>
, and in write_chunk
we don't need an extra boxed
to unify the type.
b8af1c8
to
e8333dd
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Rest LGTM. Thanks for the PR.
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
as title and fix #17572
bench mongo db
before async
after async
Checklist
./risedev check
(or alias,./risedev c
)Documentation
Release note
We remove mogodb's option
bulk_write_max_entries
, and dynamodb's optiondefault_max_batch_rows
We Add dynamodb's option
max_batch_item_nums
max_future_send_nums
The
max_batch_item_nums
is the max num of items in abatch_write_item
, which should be >1 and <=25, and we set the default value to 25.https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_BatchWriteItem.html
The
max_future_send_nums
is the num of write futures that exist at the same time, he is related to themax parallelism units
set by the user in dynamodb, this value is theoretically equal tomax_parallelism_units /(stream_chunk_size /max_batch_item_nums)
, the default value ofmax parallelism units
is 40000, so the default value of this value should be <360, here we default value is 256We also need to prompt the user to select the appropriate
max parallelism units
for dynamodb, when the throughout of RisingWave writes > themax parallelism units
set by dynamodb, an error will be reportedAfter this pr sink_douple is default for mongodb , dynamodb , redis