From 6ef06f38e1e2e950bd1eae4e3e200a3c2042a2ca Mon Sep 17 00:00:00 2001 From: DaveNeudoerffer Date: Tue, 22 Aug 2023 17:12:40 -0400 Subject: [PATCH] Fixes to mqtt -- logging and message processing --- lib/mqtt.pm | 152 ++++++++++++++++++++---------------------- lib/mqtt_discovery.pm | 1 + lib/mqtt_items.pm | 1 + lib/read_table_A.pl | 1 + 4 files changed, 76 insertions(+), 79 deletions(-) diff --git a/lib/mqtt.pm b/lib/mqtt.pm index 5a4e42b62..03197c1bb 100644 --- a/lib/mqtt.pm +++ b/lib/mqtt.pm @@ -190,6 +190,7 @@ eval "use bytes"; # Not on all installs, so eval to avoid errors # Need to share this with the outside world my $msg_id = 1; +my $blocking_read_timeout = .5; my %MQTT_Data; @@ -201,15 +202,23 @@ sub dump() { } sub log { - my ($self, $str) = @_; - &main::print_log( 'MQTT: '. $str ); + my ($self, $str, $prefix) = @_; + my $maxlength = 300; + + $prefix = $prefix || 'MQTT: '; + while( length( $str ) > $maxlength ) { + &main::print_log( $prefix . substr($str,0,$maxlength) ); + $str = substr( $str, $maxlength ); + $prefix = '.... '; + } + &main::print_log( $prefix . $str ); } sub debug { my( $self, $level, $str ) = @_; if( $main::Debug{mqtt} >= $level ) { $level = 'D' if $level == 0; - &main::print_log( "MQTT D$level: $str" ); + $self->log( $str, "MQTT D$level: " ); } } @@ -226,7 +235,7 @@ sub error { sub mqtt_connect() { my ($self) = @_; - $self->debug( 1, "mqtt_connect Socket ($$self{host}:$$self{port},$$self{keep_alive_timer}) "); + $self->log( "mqtt_connect Socket ($$self{host}:$$self{port},$$self{keep_alive_timer}) Topic ($$self{topic}) "); ### 1) open a socket (host, port and keepalive my $socket = IO::Socket::INET->new( @@ -249,7 +258,8 @@ sub mqtt_connect() { $self->{socket} = $socket; $self->{got_ping_response} = 1; - $self->{next_ping} = $self->{keep_alive_timer}; + $self->{ping_missed_count} = 0; + $self->{next_ping} = Time::HiRes::time + $$self{keep_alive_timer}; $self->{buf} = ''; # -------------------------------------------------------------------------- @@ -267,7 +277,7 @@ sub mqtt_connect() { ### 3) Check for ACK or fail $self->debug( 1, "Socket check ($$self{keep_alive_timer}) [ $! ]: " . ( $self->isConnected() ? "Connected" : "Failed" ) ); - my $msg = $self->read_mqtt_msg_timeout(); + my $msg = $self->read_mqtt_msg( $blocking_read_timeout ); if ( !$msg ) { $self->error("mqtt $$self{instance} No ConnAck "); @@ -287,9 +297,9 @@ sub mqtt_connect() { ); ### 5) Check for ACK or fail - $msg = $self->read_mqtt_msg_timeout(); + $msg = $self->read_mqtt_msg( $blocking_read_timeout ); if( !$msg ) { - $self->log( "$$self{instance} Received: " . "No SubAck" ); + $self->log( "$$self{instance} Received: " . "No subscription Ack" ); } if ( $main::Debug{mqtt} ) { my $s = @@ -299,11 +309,13 @@ sub mqtt_connect() { ### ### IF we're not getting $$msg{string} then what are we getting ? ### - $self->log( "$$self{instance} Sub 1 Received: " . "$s" ); # @FIXME: Use of uninitialized value + $self->log( "$$self{instance} Subscription 1 ($$self{topic}) acknowledged: " . "$s" ); # @FIXME: Use of uninitialized value } ### 6) check for data $self->debug( 1, "$$self{instance} Initializing MQTT connection ..."); + + $self->set( 'on', $self ); } # ------------------------------------------------------------------------------ @@ -378,11 +390,11 @@ sub new { ### 5) Check for ACK or fail $self->{buf} = ''; - my $msg = $self->read_mqtt_msg(); + my $msg = $self->read_mqtt_msg( $blocking_read_timeout ); if( !$msg ) { - $self->log( "$$self{instance} Received: " . "No SubAck" ); + $self->log( "$inst Received: " . "No Subscription Ack" ); } else { - $self->debug( 1, "$inst Sub 2 Received: " . $msg->string ); + $self->log( 1, "$inst Subscription 2 ($topic) acknowledged: " . $msg->string ); } } @@ -420,11 +432,12 @@ sub new { $$self{next_ping} = 0; $$self{got_ping_response} = 1; + $$self{ping_missed_count} = 0; bless $self, $class; # This is the little messages that appear when MH starts - $self->log("Creating $instance on $host:$port $topic"); + $self->log("Creating $instance on $host:$port topic:$topic"); $self->set_states( "off", "on" ); @@ -458,7 +471,7 @@ sub new { #exit 1; } - $self->set( 'on', $self ); + # $self->set( 'on', $self ); return $self; } @@ -492,6 +505,7 @@ sub check_for_data { ### ### @FIXME: failed connection if ( 'off' ne $self->{state} ) { + my $inst = $self->{instance}; if ($$self{recon_timer}->inactive) { $self->log("$inst connection failed ($$self{host}/$$self{port}/$$self{topic}), I will try to reconnect in 20 seconds"); @@ -507,12 +521,12 @@ sub check_for_data { next; } - # This one doesn't block - my $msg = $self->read_mqtt_msg(); - ### -[ Input ]---------------------------------------------------------- - if ($msg) { + # This one doesn't block + my $msg; + + while( $msg = $self->read_mqtt_msg( 0 ) ) { ### ### Okay this is the hard part ### For now I'm only worried about data that fits into 1 read @@ -576,16 +590,21 @@ sub check_for_data { ### # Ping check if ( Time::HiRes::time > $$self{next_ping} ) { - ### - ### We've exceeded the ping time - ### - $self->log("$inst read_mqtt_msg Ping Response timeout.") - unless ( $$self{got_ping_response} ); - ### - ### This has confused me, I'm not certain if I should put it back in or not - ### I'll need to sit down a put together a state table and review this - ### - # return unless ($$self{got_ping_response}); + if( $self->{got_ping_response} ) { + $self->{ping_missed_count} = 0; + } else { + ### + ### We've exceeded the ping time + ### + $self->{ping_missed_count} += 1; + $self->log("$inst check_for_data Ping Response timeout."); + if( $self->{ping_missed_count} >= 4 ) { + $self->log("$inst check_for_data Ping Response threshold exceeded."); + shutdown( $self->{socket}, 2 ); + $self->{socket} = undef; + # check_for_data will reconnect socket + } + } $self->debug( 2, "$inst read_mqtt_msg Ping Request" ); send_mqtt_msg( $self, message_type => MQTT_PINGREQ ); @@ -620,10 +639,12 @@ sub send_mqtt_msg { =cut sub read_mqtt_msg { - my $self = shift; + my ($self, $timeout) = @_; my $select = IO::Select->new( $$self{socket} ); - my $timeout = $$self{next_ping} - Time::HiRes::time; + if( !defined $timeout ) { + $timeout = 0; + } do { ### @@ -638,15 +659,11 @@ sub read_mqtt_msg { return $mqtt; } - ### very short wait - ### Return if there is no data - $select->can_read(0.1) || return; - - # - $timeout = $$self{next_ping} - Time::HiRes::time; + ### Return if there is no data within the alloted time + $select->can_read($timeout) || return; # can return undef (error) or 0 bytes (eof) - my $bytes = sysread $$self{socket}, $self->{buf}, 2048, length $self->{buf}; + my $bytes = sysread $self->{socket}, $self->{buf}, 2048, length $self->{buf}; # We get no bytes if there is an error or the socket has closed unless ($bytes) { @@ -664,50 +681,11 @@ sub read_mqtt_msg { return; } - } while ( $timeout > 0 ); + } while ( 1 ); } # ------------------------------------------------------------------------------ -=item C -=cut - -sub read_mqtt_msg_timeout { - my $self = shift; - - my $select = IO::Select->new( $$self{socket} ); - my $timeout = $$self{next_ping} - Time::HiRes::time; - - do { - my $mqtt = Net::MQTT::Message->new_from_bytes( $self->{buf}, 1 ); - - return $mqtt if ( defined $mqtt ); - - ### - ### This is where it waits (blocking) - ### - $select->can_read($timeout) || return; - - # - $timeout = $$self{next_ping} - Time::HiRes::time; - - # can return undef (error) or 0 bytes (eof) - my $bytes = sysread $$self{socket}, $self->{buf}, 2048, length $self->{buf}; - - # We get no bytes if there is an error or the socket has closed - unless ($bytes) { - $self->log( "$$self{instance}: read_mqtt_msg Socket closed " . ( defined $bytes ? 'gracefully ' : "with error [ $! ]" ) ); - - # Not a permanent solution just a way to keep debugging - $self->debug( 1, "deleting $$self{instance}\n" . Dumper( \$self ) ); - delete( $MQTT_Data{ $$self{instance} } ); - - return; - } - } while ( $timeout > 0 ); -} - -# ------------------------------------------------------------------------------ =item C =cut @@ -772,9 +750,25 @@ sub pub_msg { # First say something $self->error("$$self{instance} is not connected -- publish failed to $p_objects{topic}"); - # Then do something (reconnect) + # Check_for_data should initiate reconnect + +# ### +# ### This needs a lot of work +# ### +# ### @FIXME: failed connection +# if ( 'off' ne $self->{state} ) { +# my $inst = $self->{instance}; +# +# if ($$self{recon_timer}->inactive) { +# $self->log("$inst connection failed ($$self{host}/$$self{port}/$$self{topic}), I will try to reconnect in 20 seconds"); +# $$self{recon_timer}->set(20, sub { $MQTT_Data{$inst}{self}->mqtt_connect() }); +# } +# +# # check the state to see if it's off already +# +# $self->set( 'off', $self ); +# } - # Skip if we're not connected return; } $self->debug( 1, "$$self{instance} Pub: R:$p_objects{retain} T:'$p_objects{topic}' M:'$p_objects{message}'" ); diff --git a/lib/mqtt_discovery.pm b/lib/mqtt_discovery.pm index 81205fe01..916a97929 100644 --- a/lib/mqtt_discovery.pm +++ b/lib/mqtt_discovery.pm @@ -104,6 +104,7 @@ sub new { #### mqtt_DiscoveredItem } elsif( $disc_type eq 'binary_sensor' ) { } elsif( $disc_type eq 'sensor' ) { } elsif( $disc_type eq 'switch' ) { + } elsif( $disc_type eq 'multi_switch' ) { } else { $disc_obj->log( "UNRECOGNIZED DISCOVERY TYPE: $disc_type" ); return; diff --git a/lib/mqtt_items.pm b/lib/mqtt_items.pm index 47fb06075..d8c13dd4c 100644 --- a/lib/mqtt_items.pm +++ b/lib/mqtt_items.pm @@ -1528,6 +1528,7 @@ sub new { ### mqtt_InstMqttItem $self->{disc_info}->{command_topic} = "$topic_prefix/level"; $self->{disc_info}->{schema} = 'json'; $self->{disc_info}->{brightness} = "true"; + $self->{disc_info}->{brightness_scale} = 100; } elsif( $base_type eq 'binary_sensor' ) { $self->{disc_info}->{device_class} = $device_class; } elsif( $base_type eq 'sensor' ) { diff --git a/lib/read_table_A.pl b/lib/read_table_A.pl index d3df0939c..f202141c6 100644 --- a/lib/read_table_A.pl +++ b/lib/read_table_A.pl @@ -1919,6 +1919,7 @@ sub read_table_A { # e.g.MQTT_BROKER, mqtt_1 require 'mqtt.pm'; my ( $name, $topic, $host, $port, $username, $password, $keepalive ) = @item_info; + $topic =~ s/\*/#/g; $code .= sprintf( "\n\$%-35s = new mqtt(\"%s\", '$host', '$port', '$topic', '$username', '$password', $keepalive );\n", $name, $name ); } elsif ( $type eq "MQTT_DEVICE" ) {