diff --git a/lib/mqtt.pm b/lib/mqtt.pm index 34999a83e..a76a29838 100644 --- a/lib/mqtt.pm +++ b/lib/mqtt.pm @@ -222,16 +222,14 @@ sub mqtt_connect() { # Can't use this at this time # $socket = new main::Socket_Item(undef, undef, "$host:$port", $instance); - if ( !defined($socket) ) { - if ($$self{recon_timer}->inactive) { + if ( !defined($socket) ) { + if ( $$self{recon_timer}->inactive ) { ::print_log("*** mqtt connection for $$self{instance} failed, I will try to reconnect in 20 seconds"); my $inst = $$self{instance}; - $$self{recon_timer}->set(20, sub { $MQTT_Data{$inst}{self}->mqtt_connect() }); + $$self{recon_timer}->set( 20, sub { $MQTT_Data{$inst}{self}->mqtt_connect() } ); return; } - } - - + } $self->{socket} = $socket; $self->{got_ping_response} = 1; @@ -239,12 +237,14 @@ sub mqtt_connect() { # -------------------------------------------------------------------------- ### 2) Send MQTT_CONNECT + # 20-12-2020 added registration of mqtt Last Will and Testament if defined in mh.ini $self->send_mqtt_msg( message_type => MQTT_CONNECT, keep_alive_timer => $self->{keep_alive_timer}, - , - user_name => $self->{user_name}, - password => $self->{password} + user_name => $self->{user_name}, + password => $self->{password}, + will_topic => $::config_parms{mqtt_LWT_topic}, + will_message => $::config_parms{mqtt_LWT_payload} ); ### 3) Check for ACK or fail @@ -297,7 +297,7 @@ sub mqtt_connect() { sub isConnected { my ($self) = @_; - unless( defined($$self{socket}) ) { return 0 } + unless ( defined( $$self{socket} ) ) { return 0 } return $$self{socket}->connected; } @@ -308,7 +308,7 @@ sub isConnected { sub isNotConnected { my ($self) = @_; - unless( defined($$self{socket}) ) { return 1 } + unless ( defined( $$self{socket} ) ) { return 1 } return !$$self{socket}->connected; } @@ -379,13 +379,15 @@ sub new { @{ $$self{command_stack} } = (); - $$self{instance} = $instance; + $$self{instance} = $instance; $$self{recon_timer} = ::Timer::new(); - $$self{host} = "$host" || "127.0.0.1"; - $$self{port} = $port || 1883; + $$self{host} = "$host" || "127.0.0.1"; + $$self{port} = $port || 1883; # Use the wildcard here, not in the mqtt_Item - $$self{topic} = "$topic" || "home/ha/#"; + #$$self{topic} = "$topic" || "home/ha/#"; + # 20-12-2020 edit to enable MH to monitor all mqtt topics especially for wildcards e.g. LWT + $$self{topic} = "$topic" || "#"; # Currently not used $$self{user_name} = "$user" || ""; @@ -416,9 +418,9 @@ sub new { ### ------------------------------------------------------------------------ $self->mqtt_connect(); - unless ($self) { - &main::print_log("\n***\n*** Hmm, this is not good!, can't find myself\n***\n"); - return; + unless ($self) { + &main::print_log("\n***\n*** Hmm, this is not good!, can't find myself\n***\n"); + return; } # Hey what happens when we fail ? @@ -462,10 +464,10 @@ sub check_for_data { ### @FIXME: failed connection if ( 'off' ne $self->{state} ) { - if ($$self{recon_timer}->inactive) { - ::print_log("*** mqtt $inst connection failed ($$self{host}/$$self{port}/$$self{topic}), I will try to reconnect in 20 seconds"); - $$self{recon_timer}->set(20, sub { $MQTT_Data{$inst}{self}->mqtt_connect() }); - } + if ( $$self{recon_timer}->inactive ) { + ::print_log("*** mqtt $inst connection failed ($$self{host}/$$self{port}/$$self{topic}), I will try to reconnect in 20 seconds"); + $$self{recon_timer}->set( 20, sub { $MQTT_Data{$inst}{self}->mqtt_connect() } ); + } # check the state to see if it's off already @@ -625,12 +627,12 @@ sub read_mqtt_msg { # We get no bytes if there is an error or the socket has closed unless ($bytes) { - my $inst = $$self{instance}; - if ($$self{recon_timer}->inactive) { - ::print_log( "*** mqtt $$self{instance}: read_mqtt_msg Socket closed " . ( defined $bytes ? 'gracefully ' : "with error [ $! ]" ) ); - ::print_log( "*** mqtt instance $$self{instance} will try to reconnect in 20 seconds"); - $$self{recon_timer}->set(20, sub { $MQTT_Data{$inst}{self}->mqtt_connect() }); - } + my $inst = $$self{instance}; + if ( $$self{recon_timer}->inactive ) { + ::print_log( "*** mqtt $$self{instance}: read_mqtt_msg Socket closed " . ( defined $bytes ? 'gracefully ' : "with error [ $! ]" ) ); + ::print_log("*** mqtt instance $$self{instance} will try to reconnect in 20 seconds"); + $$self{recon_timer}->set( 20, sub { $MQTT_Data{$inst}{self}->mqtt_connect() } ); + } # Not a permanent solution just a way to keep debugging #&main::print_log( "*** mqtt deleting $$self{instance}\n" . Dumper( \$self ) ) @@ -694,7 +696,7 @@ sub set { if ( $main::Debug{mqtt} ) { my $xStr = defined($msg) ? "($msg)" : "undefined message"; $xStr .= defined($set_by) ? ", ($set_by)" : ", undefined set_by, Obj: "; - $xStr .= defined($$self{object_name}) ? ", $$self{object_name}" : ", undefined object_name"; # @FIXME: Use of uninitialized value + $xStr .= defined( $$self{object_name} ) ? ", $$self{object_name}" : ", undefined object_name"; # @FIXME: Use of uninitialized value &main::print_log("*** mqtt mqtt set $$self{instance}: [$xStr]"); &main::print_log( @@ -876,13 +878,46 @@ sub remove_item { sub parse_data_to_obj { my ( $self, $msg, $p_setby ) = @_; + # 20-12-2020 added support for wildcard mqtt devices e.g. in items.mht + # MQTT_DEVICE, MQTT_test_wildcard, , mqtt_1, tele/+/LWT + # or for a multilevel wildcard + # MQTT_DEVICE, MQTT_test_multi_wildcard, , mqtt_1, tele/# + # NOTE, use of multi level wildcards can consume a lot of CPU + # it also exits the loop if it finds a match for speed when there is a large number of mqtt devices + + my ( @split_incoming, @split_device, $counter, $device_topic, $message_topic ); # + $message_topic = "$$msg{topic}"; for my $obj ( @{ $$self{objects} } ) { - if ( "$$obj{topic}" eq "$$msg{topic}" || "$$obj{topic}" eq "$$msg{topic}/set" ) { + $device_topic = $obj->{topic}; + + # check if this mqtt device is a wildcard, and if so replace the wildcard characters + # with the incoming message topic pieces + if ( index( $device_topic, "\+" ) > 0 + || index( $device_topic, "\#" ) > 0 ) + { + @split_incoming = split( "/", $$msg{topic} ); + @split_device = split( "/", $device_topic ); + $counter = 0; + foreach (@split_device) { + if ( $split_device[$counter] eq "+" ) { + $device_topic =~ s/\+/$split_incoming[$counter]/; + } + if ( $split_device[$counter] eq "#" ) { + $device_topic = substr( $device_topic, 0, index( $device_topic, "#" ) ) . substr( $$msg{topic}, index( $device_topic, "#" ) ); + last; + } + $counter++; + } + } + + # the edited device topic is now ready to compare with the incoming message topic + if ( $device_topic eq $message_topic ) { $obj->set( $$msg{message}, $self, ); - } - else { - &main::print_log ("***mqtt mqtt obj ($$obj{topic}) vs ($$msg{topic})") if ( $main::Debug{mqtt} ); + $obj->{set_by_topic} = $message_topic; + + # one we have a match, no need to examine any more devices + last; } } @@ -951,7 +986,7 @@ sub new { $$self{topic} = $topic; $$self{message} = ''; $$self{retain} = $retain || 0; - $$self{QOS} = $qos || 0; + $$self{QOS} = $qos || 0; $$self{instance}->add($self);