Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

adding wildcards and LWT to mqtt.pm #809

Merged
merged 1 commit into from
Dec 29, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 69 additions & 34 deletions lib/mqtt.pm
Original file line number Diff line number Diff line change
Expand Up @@ -222,29 +222,29 @@ sub mqtt_connect() {
# Can't use this at this time
# $socket = new main::Socket_Item(undef, undef, "$host:$port", $instance);

if ( !defined($socket) ) {
if ($$self{recon_timer}->inactive) {
if ( !defined($socket) ) {
if ( $$self{recon_timer}->inactive ) {
::print_log("*** mqtt connection for $$self{instance} failed, I will try to reconnect in 20 seconds");
my $inst = $$self{instance};
$$self{recon_timer}->set(20, sub { $MQTT_Data{$inst}{self}->mqtt_connect() });
$$self{recon_timer}->set( 20, sub { $MQTT_Data{$inst}{self}->mqtt_connect() } );
return;
}
}


}

$self->{socket} = $socket;
$self->{got_ping_response} = 1;
$self->{next_ping} = $self->{keep_alive_timer};

# --------------------------------------------------------------------------
### 2) Send MQTT_CONNECT
# 20-12-2020 added registration of mqtt Last Will and Testament if defined in mh.ini
$self->send_mqtt_msg(
message_type => MQTT_CONNECT,
keep_alive_timer => $self->{keep_alive_timer},
,
user_name => $self->{user_name},
password => $self->{password}
user_name => $self->{user_name},
password => $self->{password},
will_topic => $::config_parms{mqtt_LWT_topic},
will_message => $::config_parms{mqtt_LWT_payload}
);

### 3) Check for ACK or fail
Expand Down Expand Up @@ -297,7 +297,7 @@ sub mqtt_connect() {

sub isConnected {
my ($self) = @_;
unless( defined($$self{socket}) ) { return 0 }
unless ( defined( $$self{socket} ) ) { return 0 }
return $$self{socket}->connected;
}

Expand All @@ -308,7 +308,7 @@ sub isConnected {

sub isNotConnected {
my ($self) = @_;
unless( defined($$self{socket}) ) { return 1 }
unless ( defined( $$self{socket} ) ) { return 1 }
return !$$self{socket}->connected;
}

Expand Down Expand Up @@ -379,13 +379,15 @@ sub new {

@{ $$self{command_stack} } = ();

$$self{instance} = $instance;
$$self{instance} = $instance;
$$self{recon_timer} = ::Timer::new();
$$self{host} = "$host" || "127.0.0.1";
$$self{port} = $port || 1883;
$$self{host} = "$host" || "127.0.0.1";
$$self{port} = $port || 1883;

# Use the wildcard here, not in the mqtt_Item
$$self{topic} = "$topic" || "home/ha/#";
#$$self{topic} = "$topic" || "home/ha/#";
# 20-12-2020 edit to enable MH to monitor all mqtt topics especially for wildcards e.g. LWT
$$self{topic} = "$topic" || "#";

# Currently not used
$$self{user_name} = "$user" || "";
Expand Down Expand Up @@ -416,9 +418,9 @@ sub new {
### ------------------------------------------------------------------------
$self->mqtt_connect();

unless ($self) {
&main::print_log("\n***\n*** Hmm, this is not good!, can't find myself\n***\n");
return;
unless ($self) {
&main::print_log("\n***\n*** Hmm, this is not good!, can't find myself\n***\n");
return;
}

# Hey what happens when we fail ?
Expand Down Expand Up @@ -462,10 +464,10 @@ sub check_for_data {
### @FIXME: failed connection
if ( 'off' ne $self->{state} ) {

if ($$self{recon_timer}->inactive) {
::print_log("*** mqtt $inst connection failed ($$self{host}/$$self{port}/$$self{topic}), I will try to reconnect in 20 seconds");
$$self{recon_timer}->set(20, sub { $MQTT_Data{$inst}{self}->mqtt_connect() });
}
if ( $$self{recon_timer}->inactive ) {
::print_log("*** mqtt $inst connection failed ($$self{host}/$$self{port}/$$self{topic}), I will try to reconnect in 20 seconds");
$$self{recon_timer}->set( 20, sub { $MQTT_Data{$inst}{self}->mqtt_connect() } );
}

# check the state to see if it's off already

Expand Down Expand Up @@ -625,12 +627,12 @@ sub read_mqtt_msg {

# We get no bytes if there is an error or the socket has closed
unless ($bytes) {
my $inst = $$self{instance};
if ($$self{recon_timer}->inactive) {
::print_log( "*** mqtt $$self{instance}: read_mqtt_msg Socket closed " . ( defined $bytes ? 'gracefully ' : "with error [ $! ]" ) );
::print_log( "*** mqtt instance $$self{instance} will try to reconnect in 20 seconds");
$$self{recon_timer}->set(20, sub { $MQTT_Data{$inst}{self}->mqtt_connect() });
}
my $inst = $$self{instance};
if ( $$self{recon_timer}->inactive ) {
::print_log( "*** mqtt $$self{instance}: read_mqtt_msg Socket closed " . ( defined $bytes ? 'gracefully ' : "with error [ $! ]" ) );
::print_log("*** mqtt instance $$self{instance} will try to reconnect in 20 seconds");
$$self{recon_timer}->set( 20, sub { $MQTT_Data{$inst}{self}->mqtt_connect() } );
}

# Not a permanent solution just a way to keep debugging
#&main::print_log( "*** mqtt deleting $$self{instance}\n" . Dumper( \$self ) )
Expand Down Expand Up @@ -694,7 +696,7 @@ sub set {
if ( $main::Debug{mqtt} ) {
my $xStr = defined($msg) ? "($msg)" : "undefined message";
$xStr .= defined($set_by) ? ", ($set_by)" : ", undefined set_by, Obj: ";
$xStr .= defined($$self{object_name}) ? ", $$self{object_name}" : ", undefined object_name"; # @FIXME: Use of uninitialized value
$xStr .= defined( $$self{object_name} ) ? ", $$self{object_name}" : ", undefined object_name"; # @FIXME: Use of uninitialized value

&main::print_log("*** mqtt mqtt set $$self{instance}: [$xStr]");
&main::print_log(
Expand Down Expand Up @@ -876,13 +878,46 @@ sub remove_item {
sub parse_data_to_obj {
my ( $self, $msg, $p_setby ) = @_;

# 20-12-2020 added support for wildcard mqtt devices e.g. in items.mht
# MQTT_DEVICE, MQTT_test_wildcard, , mqtt_1, tele/+/LWT
# or for a multilevel wildcard
# MQTT_DEVICE, MQTT_test_multi_wildcard, , mqtt_1, tele/#
# NOTE, use of multi level wildcards can consume a lot of CPU
# it also exits the loop if it finds a match for speed when there is a large number of mqtt devices

my ( @split_incoming, @split_device, $counter, $device_topic, $message_topic );
#
$message_topic = "$$msg{topic}";
for my $obj ( @{ $$self{objects} } ) {
if ( "$$obj{topic}" eq "$$msg{topic}" || "$$obj{topic}" eq "$$msg{topic}/set" ) {
$device_topic = $obj->{topic};

# check if this mqtt device is a wildcard, and if so replace the wildcard characters
# with the incoming message topic pieces
if ( index( $device_topic, "\+" ) > 0
|| index( $device_topic, "\#" ) > 0 )
{
@split_incoming = split( "/", $$msg{topic} );
@split_device = split( "/", $device_topic );
$counter = 0;
foreach (@split_device) {
if ( $split_device[$counter] eq "+" ) {
$device_topic =~ s/\+/$split_incoming[$counter]/;
}
if ( $split_device[$counter] eq "#" ) {
$device_topic = substr( $device_topic, 0, index( $device_topic, "#" ) ) . substr( $$msg{topic}, index( $device_topic, "#" ) );
last;
}
$counter++;
}
}

# the edited device topic is now ready to compare with the incoming message topic
if ( $device_topic eq $message_topic ) {
$obj->set( $$msg{message}, $self, );
}
else {
&main::print_log ("***mqtt mqtt obj ($$obj{topic}) vs ($$msg{topic})") if ( $main::Debug{mqtt} );
$obj->{set_by_topic} = $message_topic;

# one we have a match, no need to examine any more devices
last;
}
}

Expand Down Expand Up @@ -951,7 +986,7 @@ sub new {
$$self{topic} = $topic;
$$self{message} = '';
$$self{retain} = $retain || 0;
$$self{QOS} = $qos || 0;
$$self{QOS} = $qos || 0;

$$self{instance}->add($self);

Expand Down