Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ADDED] Config options to disable MQTT QOS2 support #4705

Merged
merged 3 commits into from
Oct 25, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 30 additions & 1 deletion server/mqtt.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,14 @@ type mqtt struct {
asm *mqttAccountSessionManager // quick reference to account session manager, immutable after processConnect()
sess *mqttSession // quick reference to session, immutable after processConnect()
cid string // client ID

// rejectQoS2Pub tells the MQTT client to not accept QoS2 PUBLISH, instead
// error and terminate the connection.
rejectQoS2Pub bool

// downgradeQOS2Sub tells the MQTT client to downgrade QoS2 SUBSCRIBE
// requests to QoS1.
downgradeQoS2Sub bool
}

type mqttPending struct {
Expand Down Expand Up @@ -475,7 +483,11 @@ func (s *Server) createMQTTClient(conn net.Conn, ws *websocket) *client {
}
now := time.Now()

c := &client{srv: s, nc: conn, mpay: maxPay, msubs: maxSubs, start: now, last: now, mqtt: &mqtt{}, ws: ws}
mqtt := &mqtt{
rejectQoS2Pub: opts.MQTT.rejectQoS2Pub,
downgradeQoS2Sub: opts.MQTT.downgradeQoS2Sub,
}
c := &client{srv: s, nc: conn, mpay: maxPay, msubs: maxSubs, start: now, last: now, mqtt: mqtt, ws: ws}
c.headers = true
c.mqtt.pp = &mqttPublish{}
// MQTT clients don't send NATS CONNECT protocols. So make it an "echo"
Expand Down Expand Up @@ -2226,6 +2238,10 @@ func (as *mqttAccountSessionManager) processSubs(sess *mqttSession, c *client,
if f.qos > 2 {
f.qos = 2
}
if c.mqtt.downgradeQoS2Sub && f.qos == 2 {
c.Warnf("Downgrading subscription QoS2 to QoS1 for %q, as configured", f.filter)
f.qos = 1
}
subject := f.filter
sid := subject

Expand Down Expand Up @@ -2329,6 +2345,10 @@ func (as *mqttAccountSessionManager) serializeRetainedMsgsForSub(sess *mqttSessi
if qos > sub.mqtt.qos {
qos = sub.mqtt.qos
}
if c.mqtt.rejectQoS2Pub && qos == 2 {
c.Warnf("Rejecting retained message with QoS2 for subscription %q, as configured", sub.subject)
continue
}
if qos > 0 {
pi = sess.trackPublishRetained()

Expand Down Expand Up @@ -3001,6 +3021,10 @@ func (c *client) mqttParseConnect(r *mqttReader, pl int, hasMappings bool) (byte
hasWill = true
}

if c.mqtt.rejectQoS2Pub && hasWill && wqos == 2 {
return 0, nil, fmt.Errorf("server does not accept QoS2 for Will messages")
}

// Spec [MQTT-3.1.2-19]
hasUser := cp.flags&mqttConnFlagUsernameFlag != 0
// Spec [MQTT-3.1.2-21]
Expand Down Expand Up @@ -3385,6 +3409,11 @@ func (c *client) mqttParsePub(r *mqttReader, pl int, pp *mqttPublish, hasMapping
if qos > 2 {
return fmt.Errorf("QoS=%v is invalid in MQTT", qos)
}

if c.mqtt.rejectQoS2Pub && qos == 2 {
return fmt.Errorf("QoS=2 is disabled for PUBLISH messages")
}

// Keep track of where we are when starting to read the variable header
start := r.pos

Expand Down
65 changes: 65 additions & 0 deletions server/mqtt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -592,6 +592,28 @@ func TestMQTTParseOptions(t *testing.T) {
}
return nil
}, ""},
{"reject_qos2_publish",
`
mqtt {
reject_qos2_publish: true
}
`, func(o *MQTTOpts) error {
if !o.rejectQoS2Pub {
return fmt.Errorf("Invalid: expected rejectQoS2Pub to be set")
}
return nil
}, ""},
{"downgrade_qos2_subscribe",
`
mqtt {
downgrade_qos2_subscribe: true
}
`, func(o *MQTTOpts) error {
if !o.downgradeQoS2Sub {
return fmt.Errorf("Invalid: expected downgradeQoS2Sub to be set")
}
return nil
}, ""},
} {
t.Run(test.name, func(t *testing.T) {
conf := createConfFile(t, []byte(test.content))
Expand Down Expand Up @@ -1928,6 +1950,27 @@ func TestMQTTSubAck(t *testing.T) {
testMQTTSub(t, 1, mc, r, subs, expected)
}

func TestMQTTQoS2SubDowngrade(t *testing.T) {
o := testMQTTDefaultOptions()
o.MQTT.downgradeQoS2Sub = true
s := testMQTTRunServer(t, o)
defer testMQTTShutdownServer(s)

mc, r := testMQTTConnect(t, &mqttConnInfo{cleanSess: true}, o.MQTT.Host, o.MQTT.Port)
defer mc.Close()
testMQTTCheckConnAck(t, r, mqttConnAckRCConnectionAccepted, false)

subs := []*mqttFilter{
{filter: "bar", qos: 1},
{filter: "baz", qos: 2},
}
expected := []byte{
1,
1,
}
testMQTTSub(t, 1, mc, r, subs, expected)
}

func testMQTTFlush(t testing.TB, c net.Conn, bw *bufio.Writer, r *mqttReader) {
t.Helper()
w := &mqttWriter{}
Expand Down Expand Up @@ -2160,6 +2203,28 @@ func TestMQTTPublish(t *testing.T) {
testMQTTPublish(t, mcp, mpr, 2, false, false, "foo", 2, []byte("msg"))
}

func TestMQTTQoS2PubReject(t *testing.T) {
o := testMQTTDefaultOptions()
o.MQTT.rejectQoS2Pub = true
s := testMQTTRunServer(t, o)
defer testMQTTShutdownServer(s)

nc := natsConnect(t, s.ClientURL())
defer nc.Close()

mcp, mpr := testMQTTConnect(t, &mqttConnInfo{cleanSess: true}, o.MQTT.Host, o.MQTT.Port)
defer mcp.Close()
testMQTTCheckConnAck(t, mpr, mqttConnAckRCConnectionAccepted, false)

testMQTTPublish(t, mcp, mpr, 1, false, false, "foo", 1, []byte("msg"))

testMQTTPublishNoAcks(t, mcp, 2, false, false, "foo", 2, []byte("msg"))
_, err := mpr.readByte("failed attempt")
levb marked this conversation as resolved.
Show resolved Hide resolved
if err == nil || !strings.Contains(err.Error(), "error reading failed attempt: EOF") {
t.Fatalf("Expected error about QoS 2 publish rejected, got %v", err)
}
}

levb marked this conversation as resolved.
Show resolved Hide resolved
func TestMQTTSub(t *testing.T) {
o := testMQTTDefaultOptions()
s := testMQTTRunServer(t, o)
Expand Down
13 changes: 13 additions & 0 deletions server/opts.go
Original file line number Diff line number Diff line change
Expand Up @@ -562,6 +562,14 @@ type MQTTOpts struct {

// Snapshot of configured TLS options.
tlsConfigOpts *TLSConfigOpts

// rejectQoS2Pub tells the MQTT client to not accept QoS2 PUBLISH, instead
// error and terminate the connection.
rejectQoS2Pub bool
kozlovic marked this conversation as resolved.
Show resolved Hide resolved

// downgradeQOS2Sub tells the MQTT client to downgrade QoS2 SUBSCRIBE
// requests to QoS1.
downgradeQoS2Sub bool
}

type netResolver interface {
Expand Down Expand Up @@ -4631,6 +4639,11 @@ func parseMQTT(v interface{}, o *Options, errors *[]error, warnings *[]error) er
case "consumer_inactive_threshold", "consumer_auto_cleanup":
o.MQTT.ConsumerInactiveThreshold = parseDuration("consumer_inactive_threshold", tk, mv, errors, warnings)

case "reject_qos2_publish":
o.MQTT.rejectQoS2Pub, _ = mv.(bool)
levb marked this conversation as resolved.
Show resolved Hide resolved
case "downgrade_qos2_subscribe":
o.MQTT.downgradeQoS2Sub, _ = mv.(bool)

default:
if !tk.IsUsedVariable() {
err := &unknownConfigFieldErr{
Expand Down
Loading