Skip to content

Commit

Permalink
[ADDED] Config options to disable MQTT QOS2 support (#4705)
Browse files Browse the repository at this point in the history
Added MQTT config options `reject_qos2_publish` and
`downgrade_qos2_subscriptions`. When used together, fully disable QOS2
support (effectively going back to v2.9 level of QOS support).

---------

Signed-off-by: Lev Brouk <[email protected]>
  • Loading branch information
levb authored Oct 25, 2023
1 parent 9d3208b commit 3f7ad66
Show file tree
Hide file tree
Showing 3 changed files with 169 additions and 2 deletions.
32 changes: 31 additions & 1 deletion server/mqtt.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ const (
mqttConnAckRCServerUnavailable = byte(0x3)
mqttConnAckRCBadUserOrPassword = byte(0x4)
mqttConnAckRCNotAuthorized = byte(0x5)
mqttConnAckRCQoS2WillRejected = byte(0x10)

// Maximum payload size of a control packet
mqttMaxPayloadSize = 0xFFFFFFF
Expand Down Expand Up @@ -345,6 +346,14 @@ type mqtt struct {
asm *mqttAccountSessionManager // quick reference to account session manager, immutable after processConnect()
sess *mqttSession // quick reference to session, immutable after processConnect()
cid string // client ID

// rejectQoS2Pub tells the MQTT client to not accept QoS2 PUBLISH, instead
// error and terminate the connection.
rejectQoS2Pub bool

// downgradeQOS2Sub tells the MQTT client to downgrade QoS2 SUBSCRIBE
// requests to QoS1.
downgradeQoS2Sub bool
}

type mqttPending struct {
Expand Down Expand Up @@ -475,7 +484,11 @@ func (s *Server) createMQTTClient(conn net.Conn, ws *websocket) *client {
}
now := time.Now()

c := &client{srv: s, nc: conn, mpay: maxPay, msubs: maxSubs, start: now, last: now, mqtt: &mqtt{}, ws: ws}
mqtt := &mqtt{
rejectQoS2Pub: opts.MQTT.rejectQoS2Pub,
downgradeQoS2Sub: opts.MQTT.downgradeQoS2Sub,
}
c := &client{srv: s, nc: conn, mpay: maxPay, msubs: maxSubs, start: now, last: now, mqtt: mqtt, ws: ws}
c.headers = true
c.mqtt.pp = &mqttPublish{}
// MQTT clients don't send NATS CONNECT protocols. So make it an "echo"
Expand Down Expand Up @@ -2226,6 +2239,10 @@ func (as *mqttAccountSessionManager) processSubs(sess *mqttSession, c *client,
if f.qos > 2 {
f.qos = 2
}
if c.mqtt.downgradeQoS2Sub && f.qos == 2 {
c.Warnf("Downgrading subscription QoS2 to QoS1 for %q, as configured", f.filter)
f.qos = 1
}
subject := f.filter
sid := subject

Expand Down Expand Up @@ -2329,6 +2346,10 @@ func (as *mqttAccountSessionManager) serializeRetainedMsgsForSub(sess *mqttSessi
if qos > sub.mqtt.qos {
qos = sub.mqtt.qos
}
if c.mqtt.rejectQoS2Pub && qos == 2 {
c.Warnf("Rejecting retained message with QoS2 for subscription %q, as configured", sub.subject)
continue
}
if qos > 0 {
pi = sess.trackPublishRetained()

Expand Down Expand Up @@ -3001,6 +3022,10 @@ func (c *client) mqttParseConnect(r *mqttReader, pl int, hasMappings bool) (byte
hasWill = true
}

if c.mqtt.rejectQoS2Pub && hasWill && wqos == 2 {
return mqttConnAckRCQoS2WillRejected, nil, fmt.Errorf("server does not accept QoS2 for Will messages")
}

// Spec [MQTT-3.1.2-19]
hasUser := cp.flags&mqttConnFlagUsernameFlag != 0
// Spec [MQTT-3.1.2-21]
Expand Down Expand Up @@ -3385,6 +3410,11 @@ func (c *client) mqttParsePub(r *mqttReader, pl int, pp *mqttPublish, hasMapping
if qos > 2 {
return fmt.Errorf("QoS=%v is invalid in MQTT", qos)
}

if c.mqtt.rejectQoS2Pub && qos == 2 {
return fmt.Errorf("QoS=2 is disabled for PUBLISH messages")
}

// Keep track of where we are when starting to read the variable header
start := r.pos

Expand Down
126 changes: 125 additions & 1 deletion server/mqtt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -608,6 +608,28 @@ func TestMQTTParseOptions(t *testing.T) {
}
return nil
}, ""},
{"reject_qos2_publish",
`
mqtt {
reject_qos2_publish: true
}
`, func(o *MQTTOpts) error {
if !o.rejectQoS2Pub {
return fmt.Errorf("Invalid: expected rejectQoS2Pub to be set")
}
return nil
}, ""},
{"downgrade_qos2_subscribe",
`
mqtt {
downgrade_qos2_subscribe: true
}
`, func(o *MQTTOpts) error {
if !o.downgradeQoS2Sub {
return fmt.Errorf("Invalid: expected downgradeQoS2Sub to be set")
}
return nil
}, ""},
} {
t.Run(test.name, func(t *testing.T) {
conf := createConfFile(t, []byte(test.content))
Expand Down Expand Up @@ -1944,6 +1966,27 @@ func TestMQTTSubAck(t *testing.T) {
testMQTTSub(t, 1, mc, r, subs, expected)
}

func TestMQTTQoS2SubDowngrade(t *testing.T) {
o := testMQTTDefaultOptions()
o.MQTT.downgradeQoS2Sub = true
s := testMQTTRunServer(t, o)
defer testMQTTShutdownServer(s)

mc, r := testMQTTConnect(t, &mqttConnInfo{cleanSess: true}, o.MQTT.Host, o.MQTT.Port)
defer mc.Close()
testMQTTCheckConnAck(t, r, mqttConnAckRCConnectionAccepted, false)

subs := []*mqttFilter{
{filter: "bar", qos: 1},
{filter: "baz", qos: 2},
}
expected := []byte{
1,
1,
}
testMQTTSub(t, 1, mc, r, subs, expected)
}

func testMQTTFlush(t testing.TB, c net.Conn, bw *bufio.Writer, r *mqttReader) {
t.Helper()
w := &mqttWriter{}
Expand Down Expand Up @@ -2176,6 +2219,25 @@ func TestMQTTPublish(t *testing.T) {
testMQTTPublish(t, mcp, mpr, 2, false, false, "foo", 2, []byte("msg"))
}

func TestMQTTQoS2PubReject(t *testing.T) {
o := testMQTTDefaultOptions()
o.MQTT.rejectQoS2Pub = true
s := testMQTTRunServer(t, o)
defer testMQTTShutdownServer(s)

nc := natsConnect(t, s.ClientURL())
defer nc.Close()

mcp, mpr := testMQTTConnect(t, &mqttConnInfo{cleanSess: true}, o.MQTT.Host, o.MQTT.Port)
defer mcp.Close()
testMQTTCheckConnAck(t, mpr, mqttConnAckRCConnectionAccepted, false)

testMQTTPublish(t, mcp, mpr, 1, false, false, "foo", 1, []byte("msg"))

testMQTTPublishNoAcks(t, mcp, 2, false, false, "foo", 2, []byte("msg"))
testMQTTExpectDisconnect(t, mcp)
}

func TestMQTTSub(t *testing.T) {
o := testMQTTDefaultOptions()
s := testMQTTRunServer(t, o)
Expand Down Expand Up @@ -3830,14 +3892,15 @@ func TestMQTTWill(t *testing.T) {
}{
{"will qos 0", true, 0},
{"will qos 1", true, 1},
{"will qos 2", true, 2},
{"proper disconnect no will", false, 0},
} {
t.Run(test.name, func(t *testing.T) {
mcs, rs := testMQTTConnect(t, &mqttConnInfo{cleanSess: true}, o.MQTT.Host, o.MQTT.Port)
defer mcs.Close()
testMQTTCheckConnAck(t, rs, mqttConnAckRCConnectionAccepted, false)

testMQTTSub(t, 1, mcs, rs, []*mqttFilter{{filter: "will/#", qos: 1}}, []byte{1})
testMQTTSub(t, 1, mcs, rs, []*mqttFilter{{filter: "will/#", qos: 2}}, []byte{2})
testMQTTFlush(t, mcs, nil, rs)

mc, r := testMQTTConnect(t,
Expand Down Expand Up @@ -3871,6 +3934,32 @@ func TestMQTTWill(t *testing.T) {
}
}

func TestMQTTQoS2WillReject(t *testing.T) {
o := testMQTTDefaultOptions()
o.MQTT.rejectQoS2Pub = true
s := testMQTTRunServer(t, o)
defer testMQTTShutdownServer(s)

mcs, rs := testMQTTConnect(t, &mqttConnInfo{cleanSess: true}, o.MQTT.Host, o.MQTT.Port)
defer mcs.Close()
testMQTTCheckConnAck(t, rs, mqttConnAckRCConnectionAccepted, false)

testMQTTSub(t, 1, mcs, rs, []*mqttFilter{{filter: "will/#", qos: 2}}, []byte{2})
testMQTTFlush(t, mcs, nil, rs)

mc, r := testMQTTConnect(t,
&mqttConnInfo{
cleanSess: true,
will: &mqttWill{
topic: []byte("will/topic"),
message: []byte("bye"),
qos: 2,
},
}, o.MQTT.Host, o.MQTT.Port)
defer mc.Close()
testMQTTCheckConnAck(t, r, mqttConnAckRCQoS2WillRejected, false)
}

func TestMQTTWillRetain(t *testing.T) {
for _, test := range []struct {
name string
Expand Down Expand Up @@ -4103,6 +4192,41 @@ func TestMQTTPublishRetain(t *testing.T) {
}
}

func TestMQTTQoS2RetainedReject(t *testing.T) {
// Start the server with QOS2 enabled, and submit retained messages with QoS
// 1 and 2.
o := testMQTTDefaultOptions()
s := testMQTTRunServer(t, o)
mc1, rs1 := testMQTTConnect(t, &mqttConnInfo{cleanSess: true}, o.MQTT.Host, o.MQTT.Port)
testMQTTCheckConnAck(t, rs1, mqttConnAckRCConnectionAccepted, false)
testMQTTPublish(t, mc1, rs1, 2, false, true, "foo", 1, []byte("qos2 failed"))
testMQTTPublish(t, mc1, rs1, 1, false, true, "bar", 2, []byte("qos1 retained"))
testMQTTFlush(t, mc1, nil, rs1)
testMQTTDisconnect(t, mc1, nil)
mc1.Close()
s.Shutdown()

// Restart the server with QOS2 disabled; we should be using the same
// JetStream store, so the retained message should still be there.
o.MQTT.rejectQoS2Pub = true
s = testMQTTRunServer(t, o)
defer testMQTTShutdownServer(s)

mc2, rs2 := testMQTTConnect(t, &mqttConnInfo{cleanSess: true}, o.MQTT.Host, o.MQTT.Port)
defer mc2.Close()
testMQTTCheckConnAck(t, rs2, mqttConnAckRCConnectionAccepted, false)

testMQTTSub(t, 1, mc2, rs2, []*mqttFilter{{filter: "bar/#", qos: 2}}, []byte{2})
pflags, _ := testMQTTGetPubMsg(t, mc2, rs2, "bar", []byte("qos1 retained"))
if !mqttIsRetained(pflags) {
t.Fatalf("retain flag should have been set, it was not: flags=%v", pflags)
}

testMQTTSub(t, 1, mc2, rs2, []*mqttFilter{{filter: "foo/#", qos: 2}}, []byte{2})
testMQTTExpectNothing(t, rs2)
testMQTTDisconnect(t, mc2, nil)
}

func TestMQTTRetainFlag(t *testing.T) {
o := testMQTTDefaultOptions()
s := testMQTTRunServer(t, o)
Expand Down
13 changes: 13 additions & 0 deletions server/opts.go
Original file line number Diff line number Diff line change
Expand Up @@ -562,6 +562,14 @@ type MQTTOpts struct {

// Snapshot of configured TLS options.
tlsConfigOpts *TLSConfigOpts

// rejectQoS2Pub tells the MQTT client to not accept QoS2 PUBLISH, instead
// error and terminate the connection.
rejectQoS2Pub bool

// downgradeQOS2Sub tells the MQTT client to downgrade QoS2 SUBSCRIBE
// requests to QoS1.
downgradeQoS2Sub bool
}

type netResolver interface {
Expand Down Expand Up @@ -4631,6 +4639,11 @@ func parseMQTT(v interface{}, o *Options, errors *[]error, warnings *[]error) er
case "consumer_inactive_threshold", "consumer_auto_cleanup":
o.MQTT.ConsumerInactiveThreshold = parseDuration("consumer_inactive_threshold", tk, mv, errors, warnings)

case "reject_qos2_publish":
o.MQTT.rejectQoS2Pub = mv.(bool)
case "downgrade_qos2_subscribe":
o.MQTT.downgradeQoS2Sub = mv.(bool)

default:
if !tk.IsUsedVariable() {
err := &unknownConfigFieldErr{
Expand Down

0 comments on commit 3f7ad66

Please sign in to comment.