-
Notifications
You must be signed in to change notification settings - Fork 598
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(connector): Add topic to mqtt additional columns #19017
feat(connector): Add topic to mqtt additional columns #19017
Conversation
a85a7b8
to
a0aed41
Compare
a0aed41
to
31ab0a5
Compare
@@ -87,6 +87,10 @@ pub static COMPATIBLE_ADDITIONAL_COLUMNS: LazyLock<HashMap<&'static str, HashSet | |||
"collection_name", | |||
]), | |||
), | |||
( | |||
MQTT_CONNECTOR, | |||
HashSet::from(["topic", "offset", "partition"]), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IIUC, topic and partition are the same thing in the MQTT connector?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I followed the pattern established by the file connectors, which have "file" the same as the "partition" and read the split_id
as the file
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's use partition
instead. For other connectors, split_id is the partition number, no need for another additional column.
src/connector/src/parser/mod.rs
Outdated
return Ok(A::output_for( | ||
self.row_meta | ||
.as_ref() | ||
.map(|ele| ScalarRefImpl::Utf8(ele.split_id)), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What if subscribing to foo.*
? I think the topic info should come from each message, you can take the kafka source impl as example.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The topic is saved in split_id
, just like the "file" additional column is also stored in the split_id
:
https://github.com/risingwavelabs/risingwave/blob/main/src/connector/src/source/mqtt/source/reader.rs#L96
https://github.com/risingwavelabs/risingwave/blob/main/src/connector/src/source/mqtt/source/message.rs#L41-L47
https://github.com/risingwavelabs/risingwave/blob/main/src/connector/src/source/mqtt/source/reader.rs#L97
https://github.com/risingwavelabs/risingwave/blob/main/src/connector/src/source/mqtt/source/message.rs#L29-L37
loop { | ||
if self.connected.load(std::sync::atomic::Ordering::Relaxed) { | ||
break; | ||
} | ||
|
||
if start.elapsed().as_secs() > 10 { | ||
bail!("Failed to connect to mqtt broker"); | ||
} | ||
|
||
tokio::time::sleep(std::time::Duration::from_millis(500)).await; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
regarding the comments #18961 (comment), I still have some concern about the wait time. Would you be able to move the logic in the tokio thread into this func?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't fully understand exactly what you're suggesting, but I'd say it's one of:
- Move the logic inside the tokio thread to the
list_splits
, spawn the thread innew
- During the call to
list_splits
spawn a thread and then stop that thread when the topics have come in - During the call to
list_splits
spawn a new thread to listen to the incoming mqtt topics - During the call to
list_splits
spawn a new thread if one isn't already running to listen to the incoming mqtt topics
Option 3 would spawn a new thread on each enumerator tick, which doesn't seem desirable to me
Option 4 is functionally identical to what we have just with added state to track the existence of the thread; I don't really see what would be gained
Option 2 would risk missing topics that come in after list_splits
returns and come in before the next tick
Option 1 would require some inter thread communication in order to poll
the mqtt eventloop inside the spawned thread whilst having the logic in the list_splits
. That seems to make it more complicated than desired
Please indicate if I've misunderstood
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Option 2 would risk missing topics that come in after list_splits returns and come in before the next tick
Oh, I got it. Mqtt does not have persisted metadata so RisingWave needs a thread running in the background. The fix makes sense.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please reuse partition
for topic in MQTT connector. The concept is hard to align with other connectors.
LGTM, thanks for your contribution.
@tabVersion With Meanwhile foundation-zero/risingwave-docs@8d2b58f is ready for a pull request once this is merged |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
7604448
to
d3a6caf
Compare
Let's merge |
661939a
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
Checklist
./risedev check
(or alias,./risedev c
)Documentation
Release note
INCLUDE topic
in yourSELECT