Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(connector): Add topic to mqtt additional columns #19017

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 43 additions & 12 deletions e2e_test/sink/mqtt_sink.slt
Original file line number Diff line number Diff line change
Expand Up @@ -47,17 +47,46 @@ WITH
force_append_only='true',
);

# First the (retained) topics are primed, so that they will be listened
# to when the mqtt source initializes. Otherwise it would take 30 seconds
# for the next enumerator tick

statement ok
INSERT INTO mqtt (device_id, temperature)
VALUES ( '12', 56.0 );

statement ok
FLUSH;

statement ok
INSERT INTO mqtt (device_id, temperature)
VALUES ( '13', 20.0 );

statement ok
FLUSH;

statement ok
INSERT INTO mqtt_nested (info, temperature)
VALUES( ROW('12', '/nested/12'), 56.0 );

statement ok
FLUSH;

statement ok
INSERT INTO mqtt_nested (info, temperature)
VALUES( ROW('13', null), 22.0 );

statement ok
CREATE TABLE mqtt_source
(
device_id varchar,
temperature double
)
INCLUDE partition AS mqtt_topic
WITH (
connector='mqtt',
url='tcp://mqtt-server',
topic= '/device/+',
connector ='mqtt',
url ='tcp://mqtt-server',
topic = '/device/+',
qos = 'at_least_once',
) FORMAT PLAIN ENCODE JSON;

Expand All @@ -75,29 +104,23 @@ WITH (
) FORMAT PLAIN ENCODE JSON;


statement ok
INSERT INTO mqtt (device_id, temperature)
VALUES ( '12', 56.0 );

statement ok
INSERT INTO mqtt (device_id, temperature)
VALUES ( '12', 59.0 );

statement ok
INSERT INTO mqtt (device_id, temperature)
VALUES ( '13', 20.0 );
FLUSH;

statement ok
INSERT INTO mqtt (device_id, temperature)
VALUES ( '13', 22.0 );

statement ok
INSERT INTO mqtt_nested (info, temperature)
VALUES( ROW('12', '/nested/12'), 56.0 );
FLUSH;

statement ok
INSERT INTO mqtt_nested (info, temperature)
VALUES( ROW('13', null), 22.0 );
VALUES( ROW('12', '/nested/12'), 56.0 );

statement ok
FLUSH;
Expand All @@ -112,6 +135,14 @@ SELECT device_id, temperature FROM mqtt ORDER BY device_id, temperature;
13 20
13 22

query ITT rowsort
SELECT device_id, temperature, mqtt_topic FROM mqtt_source ORDER BY device_id, temperature;
----
12 56 /device/12
12 59 /device/12
13 20 /device/13
13 22 /device/13

query IT rowsort
SELECT (info).device_id device_id, temperature from mqtt_nested_source ORDER BY device_id, temperature ;
----
Expand Down
2 changes: 1 addition & 1 deletion integration_tests/mqtt/create_source.sql
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ CREATE TABLE
CREATE TABLE mqtt_source_table
(
id integer,
name varchar,
name varchar
)
WITH (
connector='mqtt',
Expand Down
5 changes: 3 additions & 2 deletions src/connector/src/parser/additional_columns.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ use risingwave_pb::plan_common::{
use crate::error::ConnectorResult;
use crate::source::cdc::MONGODB_CDC_CONNECTOR;
use crate::source::{
AZBLOB_CONNECTOR, GCS_CONNECTOR, KAFKA_CONNECTOR, KINESIS_CONNECTOR, NATS_CONNECTOR,
OPENDAL_S3_CONNECTOR, POSIX_FS_CONNECTOR, PULSAR_CONNECTOR,
AZBLOB_CONNECTOR, GCS_CONNECTOR, KAFKA_CONNECTOR, KINESIS_CONNECTOR, MQTT_CONNECTOR,
NATS_CONNECTOR, OPENDAL_S3_CONNECTOR, POSIX_FS_CONNECTOR, PULSAR_CONNECTOR,
};

// Hidden additional columns connectors which do not support `include` syntax.
Expand Down Expand Up @@ -87,6 +87,7 @@ pub static COMPATIBLE_ADDITIONAL_COLUMNS: LazyLock<HashMap<&'static str, HashSet
"collection_name",
]),
),
(MQTT_CONNECTOR, HashSet::from(["offset", "partition"])),
])
});

Expand Down
22 changes: 16 additions & 6 deletions src/connector/src/source/mqtt/enumerator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,19 @@ use std::sync::atomic::AtomicBool;
use std::sync::Arc;

use async_trait::async_trait;
use risingwave_common::bail;
use rumqttc::v5::{ConnectionError, Event, Incoming};
use rumqttc::Outgoing;
use thiserror_ext::AsReport;
use tokio::sync::RwLock;

use super::source::MqttSplit;
use super::{MqttError, MqttProperties};
use crate::error::{ConnectorError, ConnectorResult};
use super::MqttProperties;
use crate::error::ConnectorResult;
use crate::source::{SourceEnumeratorContextRef, SplitEnumerator};

pub struct MqttSplitEnumerator {
#[expect(dead_code)]
topic: String,
#[expect(dead_code)]
client: rumqttc::v5::AsyncClient,
Expand Down Expand Up @@ -117,10 +119,18 @@ impl SplitEnumerator for MqttSplitEnumerator {

async fn list_splits(&mut self) -> ConnectorResult<Vec<MqttSplit>> {
if !self.connected.load(std::sync::atomic::Ordering::Relaxed) {
return Err(ConnectorError::from(MqttError(format!(
"Failed to connect to MQTT broker for topic {}",
self.topic
))));
let start = std::time::Instant::now();
loop {
if self.connected.load(std::sync::atomic::Ordering::Relaxed) {
break;
}

if start.elapsed().as_secs() > 10 {
bail!("Failed to connect to mqtt broker");
}

tokio::time::sleep(std::time::Duration::from_millis(500)).await;
}
Comment on lines +123 to +133
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

regarding the comments #18961 (comment), I still have some concern about the wait time. Would you be able to move the logic in the tokio thread into this func?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't fully understand exactly what you're suggesting, but I'd say it's one of:

  1. Move the logic inside the tokio thread to the list_splits, spawn the thread in new
  2. During the call to list_splits spawn a thread and then stop that thread when the topics have come in
  3. During the call to list_splits spawn a new thread to listen to the incoming mqtt topics
  4. During the call to list_splits spawn a new thread if one isn't already running to listen to the incoming mqtt topics

Option 3 would spawn a new thread on each enumerator tick, which doesn't seem desirable to me
Option 4 is functionally identical to what we have just with added state to track the existence of the thread; I don't really see what would be gained
Option 2 would risk missing topics that come in after list_splits returns and come in before the next tick
Option 1 would require some inter thread communication in order to poll the mqtt eventloop inside the spawned thread whilst having the logic in the list_splits. That seems to make it more complicated than desired

Please indicate if I've misunderstood

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Option 2 would risk missing topics that come in after list_splits returns and come in before the next tick

Oh, I got it. Mqtt does not have persisted metadata so RisingWave needs a thread running in the background. The fix makes sense.

}

let topics = self.topics.read().await;
Expand Down
3 changes: 2 additions & 1 deletion src/connector/src/source/mqtt/source/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ impl SplitReader for MqttSplitReader {
) -> Result<Self> {
let (client, eventloop) = properties
.common
.build_client(source_ctx.actor_id, source_ctx.fragment_id as u64)?;
.build_client(source_ctx.actor_id, source_ctx.fragment_id as u64)
.inspect_err(|e| tracing::error!("Failed to build mqtt client: {}", e.as_report()))?;

let qos = properties.common.qos();

Expand Down
Loading