diff --git a/src/frontend/mosh-server.cc b/src/frontend/mosh-server.cc index 567252f66..5806ceac2 100644 --- a/src/frontend/mosh-server.cc +++ b/src/frontend/mosh-server.cc @@ -88,6 +88,7 @@ #include "select.h" #include "timestamp.h" #include "fatal_assert.h" +#include "multiplexer.h" #ifndef _PATH_BSHELL #define _PATH_BSHELL "/bin/sh" @@ -95,9 +96,10 @@ #include "networktransport-impl.h" -typedef Network::Transport< Terminal::Complete, Network::UserStream > ServerConnection; +typedef Network::Transport< Network::MultiplexerStream, Network::MultiplexerStream > ServerConnection; static void serve( int host_fd, + Network::MultiplexerStream &local, Terminal::Complete &terminal, ServerConnection &network, long network_timeout, @@ -422,11 +424,14 @@ static int run_server( const char *desired_ip, const char *desired_port, /* open parser and terminal */ Terminal::Complete terminal( window_size.ws_col, window_size.ws_row ); + Network::MultiplexerStream local({&terminal}); /* open network */ Network::UserStream blank; + Network::MultiplexerStream remote({&blank}); + typedef shared::shared_ptr NetworkPointer; - NetworkPointer network( new ServerConnection( terminal, blank, desired_ip, desired_port ) ); + NetworkPointer network( new ServerConnection( local, remote, desired_ip, desired_port ) ); network->set_verbose( verbose ); Select::set_verbose( verbose ); @@ -624,7 +629,7 @@ static int run_server( const char *desired_ip, const char *desired_port, #endif try { - serve( master, terminal, *network, network_timeout, network_signaled_timeout ); + serve( master, local, terminal, *network, network_timeout, network_signaled_timeout ); } catch ( const Network::NetworkException &e ) { fprintf( stderr, "Network exception: %s\n", e.what() ); @@ -648,7 +653,7 @@ static int run_server( const char *desired_ip, const char *desired_port, return 0; } -static void serve( int host_fd, Terminal::Complete &terminal, ServerConnection &network, long network_timeout, long network_signaled_timeout ) +static void serve( int host_fd, Network::MultiplexerStream &local, Terminal::Complete &terminal, ServerConnection &network, long network_timeout, long network_signaled_timeout ) { /* scale timeouts */ const uint64_t network_timeout_ms = static_cast( network_timeout ) * 1000; @@ -731,14 +736,14 @@ static void serve( int host_fd, Terminal::Complete &terminal, ServerConnection & if ( sel.read( network_fd ) ) { /* packet received from the network */ network.recv(); - + /* is new user input available for the terminal? */ if ( network.get_remote_state_num() != last_remote_num ) { last_remote_num = network.get_remote_state_num(); - + Network::UserStream us; - us.apply_string( network.get_remote_diff() ); + us.apply_string( Network::MultiplexerStream::diffForStream(0, network.get_remote_diff()) ); /* apply userstream to terminal */ for ( size_t i = 0; i < us.size(); i++ ) { const Parser::Action &action = us.get_action( i ); @@ -775,7 +780,9 @@ static void serve( int host_fd, Terminal::Complete &terminal, ServerConnection & /* update client with new state of terminal */ if ( !network.shutdown_in_progress() ) { - network.set_current_state( terminal ); + terminal.reset_input(); + local.set(0, &terminal); + network.set_current_state( local ); } #if defined(HAVE_SYSLOG) || defined(HAVE_UTEMPTER) #ifdef HAVE_UTEMPTER @@ -833,12 +840,12 @@ static void serve( int host_fd, Terminal::Complete &terminal, ServerConnection & } } } - + if ( (!network.shutdown_in_progress()) && sel.read( host_fd ) ) { /* input from the host needs to be fed to the terminal */ const int buf_size = 16384; char buf[ buf_size ]; - + /* fill buffer if possible */ ssize_t bytes_read = read( host_fd, buf, buf_size ); @@ -848,9 +855,11 @@ static void serve( int host_fd, Terminal::Complete &terminal, ServerConnection & network.start_shutdown(); } else { terminal_to_host += terminal.act( string( buf, bytes_read ) ); - + /* update client with new state of terminal */ - network.set_current_state( terminal ); + terminal.reset_input(); + local.set(0, &terminal); + network.set_current_state( local ); } } @@ -863,7 +872,7 @@ static void serve( int host_fd, Terminal::Complete &terminal, ServerConnection & if ( network_timeout_ms && network_timeout_ms <= time_since_remote_state ) { idle_shutdown = true; - fprintf( stderr, "Network idle for %llu seconds.\n", + fprintf( stderr, "Network idle for %llu seconds.\n", static_cast( time_since_remote_state / 1000 ) ); } if ( sel.signal( SIGUSR1 ) @@ -881,7 +890,7 @@ static void serve( int host_fd, Terminal::Complete &terminal, ServerConnection & break; } } - + /* quit if our shutdown has been acknowledged */ if ( network.shutdown_in_progress() && network.shutdown_acknowledged() ) { break; @@ -899,7 +908,7 @@ static void serve( int host_fd, Terminal::Complete &terminal, ServerConnection & #ifdef HAVE_UTEMPTER /* update utmp if has been more than 30 seconds since heard from client */ - if ( connected_utmp + if ( connected_utmp && time_since_remote_state > 30000 ) { utempter_remove_record( host_fd ); @@ -913,7 +922,9 @@ static void serve( int host_fd, Terminal::Complete &terminal, ServerConnection & if ( terminal.set_echo_ack( now ) && !network.shutdown_in_progress() ) { /* update client with new echo ack */ - network.set_current_state( terminal ); + terminal.reset_input(); + local.set(0, &terminal); + network.set_current_state( local ); } if ( !network.get_remote_state_num() diff --git a/src/frontend/stmclient.cc b/src/frontend/stmclient.cc index 22f4d75fc..b541107dd 100644 --- a/src/frontend/stmclient.cc +++ b/src/frontend/stmclient.cc @@ -206,7 +206,7 @@ void STMClient::shutdown( void ) /* Restore terminal and terminal-driver state */ swrite( STDOUT_FILENO, display.close().c_str() ); - + if ( tcsetattr( STDIN_FILENO, TCSANOW, &saved_termios ) < 0 ) { perror( "tcsetattr" ); exit( 1 ); @@ -237,7 +237,7 @@ void STMClient::main_init( void ) if ( ioctl( STDIN_FILENO, TIOCGWINSZ, &window_size ) < 0 ) { perror( "ioctl TIOCGWINSZ" ); return; - } + } /* local state */ local_framebuffer = Terminal::Framebuffer( window_size.ws_col, window_size.ws_row ); @@ -248,14 +248,23 @@ void STMClient::main_init( void ) swrite( STDOUT_FILENO, init.data(), init.size() ); /* open network */ - Network::UserStream blank; - Terminal::Complete local_terminal( window_size.ws_col, window_size.ws_row ); - network = NetworkPointer( new NetworkType( blank, local_terminal, key.c_str(), ip.c_str(), port.c_str() ) ); + vector localStreams = { + new Network::UserStream(), + }; + + vector remoteStreams = { + new Terminal::Complete( window_size.ws_col, window_size.ws_row ), + }; + + Network::MultiplexerStream local(localStreams); + Network::MultiplexerStream remote(remoteStreams); + + network = NetworkPointer( new NetworkType( local, remote, key.c_str(), ip.c_str(), port.c_str() ) ); network->set_send_delay( 1 ); /* minimal delay on outgoing keystrokes */ /* tell server the size of the terminal */ - network->get_current_state().push_back( Parser::Resize( window_size.ws_col, window_size.ws_row ) ); + network->get_current_state().stream(0)->push_back( Parser::Resize( window_size.ws_col, window_size.ws_row ) ); /* be noisy as necessary */ network->set_verbose( verbose ); @@ -269,7 +278,7 @@ void STMClient::output_new_frame( void ) } /* fetch target state */ - new_state = network->get_latest_remote_state().state.get_fb(); + new_state = network->get_latest_remote_state().state.stream(0)->get_fb(); /* apply local overlays */ overlays.apply( new_state ); @@ -288,14 +297,14 @@ void STMClient::output_new_frame( void ) void STMClient::process_network_input( void ) { network->recv(); - + /* Now give hints to the overlays */ overlays.get_notification_engine().server_heard( network->get_latest_remote_state().timestamp ); overlays.get_notification_engine().server_acked( network->get_sent_state_acked_timestamp() ); overlays.get_prediction_engine().set_local_frame_acked( network->get_sent_state_acked() ); overlays.get_prediction_engine().set_send_interval( network->send_interval() ); - overlays.get_prediction_engine().set_local_frame_late_acked( network->get_latest_remote_state().state.get_echo_ack() ); + overlays.get_prediction_engine().set_local_frame_late_acked( network->get_latest_remote_state().state.stream(0)->get_echo_ack() ); } bool STMClient::process_user_input( int fd ) @@ -360,11 +369,11 @@ bool STMClient::process_user_input( int fd ) } else if ( (the_byte == escape_pass_key) || (the_byte == escape_pass_key2) ) { /* Emulation sequence to type escape_key is escape_key + escape_pass_key (that is escape key without Ctrl) */ - net.get_current_state().push_back( Parser::UserByte( escape_key ) ); + net.get_current_state().stream(0)->push_back( Parser::UserByte( escape_key ) ); } else { /* Escape key followed by anything other than . and ^ gets sent literally */ - net.get_current_state().push_back( Parser::UserByte( escape_key ) ); - net.get_current_state().push_back( Parser::UserByte( the_byte ) ); + net.get_current_state().stream(0)->push_back( Parser::UserByte( escape_key ) ); + net.get_current_state().stream(0)->push_back( Parser::UserByte( the_byte ) ); } quit_sequence_started = false; @@ -389,7 +398,7 @@ bool STMClient::process_user_input( int fd ) repaint_requested = true; } - net.get_current_state().push_back( Parser::UserByte( the_byte ) ); + net.get_current_state().stream(0)->push_back( Parser::UserByte( the_byte ) ); } return true; @@ -402,16 +411,16 @@ bool STMClient::process_resize( void ) perror( "ioctl TIOCGWINSZ" ); return false; } - + /* tell remote emulator */ Parser::Resize res( window_size.ws_col, window_size.ws_row ); - + if ( !network->shutdown_in_progress() ) { - network->get_current_state().push_back( res ); + network->get_current_state().stream(0)->push_back( res ); } /* note remote emulator will probably reply with its own Resize to adjust our state */ - + /* tell prediction engine */ overlays.get_prediction_engine().reset(); @@ -478,7 +487,7 @@ bool STMClient::main( void ) if ( network_ready_to_read ) { process_network_input(); } - + if ( sel.read( STDIN_FILENO ) && !process_user_input( STDIN_FILENO ) ) { /* input from the user needs to be fed to the network */ if ( !network->has_remote_addr() ) { break; diff --git a/src/frontend/stmclient.h b/src/frontend/stmclient.h index c1440c3ac..eaacca7fc 100644 --- a/src/frontend/stmclient.h +++ b/src/frontend/stmclient.h @@ -42,6 +42,7 @@ #include "user.h" #include "shared.h" #include "terminaloverlay.h" +#include "multiplexer.h" class STMClient { private: @@ -61,7 +62,7 @@ class STMClient { Terminal::Framebuffer local_framebuffer, new_state; Overlay::OverlayManager overlays; - typedef Network::Transport< Network::UserStream, Terminal::Complete > NetworkType; + typedef Network::Transport< Network::MultiplexerStream, Network::MultiplexerStream > NetworkType; typedef shared::shared_ptr< NetworkType > NetworkPointer; NetworkPointer network; Terminal::Display display; @@ -122,7 +123,7 @@ class STMClient { } if ( predict_overwrite && !strcmp( predict_overwrite, "yes" ) ) { overlays.get_prediction_engine().set_predict_overwrite( true ); - } + } } void init( void ); diff --git a/src/network/transportsender.h b/src/network/transportsender.h index 30d4db7ae..41c4ba462 100644 --- a/src/network/transportsender.h +++ b/src/network/transportsender.h @@ -142,7 +142,6 @@ namespace Network { { assert( !shutdown_in_progress ); current_state = x; - current_state.reset_input(); } void set_verbose( unsigned int s_verbose ) { verbose = s_verbose; } diff --git a/src/protobufs/Makefile.am b/src/protobufs/Makefile.am index 13fdce89c..063951a2a 100644 --- a/src/protobufs/Makefile.am +++ b/src/protobufs/Makefile.am @@ -1,4 +1,4 @@ -source = userinput.proto hostinput.proto transportinstruction.proto +source = userinput.proto hostinput.proto transportinstruction.proto stream.proto AM_CPPFLAGS = $(protobuf_CFLAGS) AM_CXXFLAGS = $(WARNING_CXXFLAGS) $(HARDEN_CFLAGS) $(MISC_CXXFLAGS) -Wno-error diff --git a/src/protobufs/stream.proto b/src/protobufs/stream.proto new file mode 100644 index 000000000..70437c95e --- /dev/null +++ b/src/protobufs/stream.proto @@ -0,0 +1,9 @@ +syntax = "proto2"; + +option optimize_for = LITE_RUNTIME; + +package StreamBuffers; + +message StreamMessage { + repeated string diffs = 1; +} diff --git a/src/statesync/Makefile.am b/src/statesync/Makefile.am index 470ece2a8..c58ce0c6f 100644 --- a/src/statesync/Makefile.am +++ b/src/statesync/Makefile.am @@ -3,4 +3,4 @@ AM_CXXFLAGS = $(WARNING_CXXFLAGS) $(PICKY_CXXFLAGS) $(HARDEN_CFLAGS) $(MISC_CXXF noinst_LIBRARIES = libmoshstatesync.a -libmoshstatesync_a_SOURCES = completeterminal.cc completeterminal.h user.cc user.h +libmoshstatesync_a_SOURCES = completeterminal.cc completeterminal.h user.cc user.h multiplexer.cc multiplexer.h stream.h diff --git a/src/statesync/completeterminal.cc b/src/statesync/completeterminal.cc index eddfda58d..ef05ce435 100644 --- a/src/statesync/completeterminal.cc +++ b/src/statesync/completeterminal.cc @@ -47,7 +47,7 @@ string Complete::act( const string &str ) for ( unsigned int i = 0; i < str.size(); i++ ) { /* parse octet into up to three actions */ parser.input( str[ i ], actions ); - + /* apply actions to terminal and delete them */ for ( Actions::iterator it = actions.begin(); it != actions.end(); @@ -69,8 +69,9 @@ string Complete::act( const Action &act ) } /* interface for Network::Transport */ -string Complete::diff_from( const Complete &existing ) const +string Complete::diff_from( const Network::Stream &existingStream ) const { + const Complete &existing = dynamic_cast(existingStream); HostBuffers::HostMessage output; if ( existing.get_echo_ack() != get_echo_ack() ) { @@ -92,7 +93,7 @@ string Complete::diff_from( const Complete &existing ) const new_inst->MutableExtension( hostbytes )->set_hoststring( update ); } } - + return output.SerializeAsString(); } @@ -121,8 +122,9 @@ void Complete::apply_string( const string & diff ) } } -bool Complete::operator==( Complete const &x ) const +bool Complete::operator==( Network::Stream const &xStream ) const { + const Complete &x = dynamic_cast(xStream); // assert( parser == x.parser ); /* parser state is irrelevant for us */ return (terminal == x.terminal) && (echo_ack == x.echo_ack); } diff --git a/src/statesync/completeterminal.h b/src/statesync/completeterminal.h index 94ee7d9cd..0dcc8988f 100644 --- a/src/statesync/completeterminal.h +++ b/src/statesync/completeterminal.h @@ -38,11 +38,12 @@ #include "parser.h" #include "terminal.h" +#include "stream.h" /* This class represents the complete terminal -- a UTF8Parser feeding Actions to an Emulator. */ namespace Terminal { - class Complete { + class Complete : public Network::Stream { private: Parser::UTF8Parser parser; Terminal::Emulator terminal; @@ -62,7 +63,6 @@ namespace Terminal { public: Complete( size_t width, size_t height ) : parser(), terminal( width, height ), display( false ), actions(), input_history(), echo_ack( 0 ) {} - std::string act( const std::string &str ); std::string act( const Parser::Action &act ); @@ -74,11 +74,14 @@ namespace Terminal { int wait_time( uint64_t now ) const; /* interface for Network::Transport */ - void subtract( const Complete * ) const {} - std::string diff_from( const Complete &existing ) const; + void subtract( const Network::Stream * ) {} + std::string diff_from( const Network::Stream &existing ) const; std::string init_diff( void ) const; void apply_string( const std::string & diff ); - bool operator==( const Complete &x ) const; + bool operator==( const Network::Stream &x ) const; + Stream* copy() const { + return new Complete(*this); + } bool compare( const Complete &other ) const; }; diff --git a/src/statesync/multiplexer.cc b/src/statesync/multiplexer.cc new file mode 100644 index 000000000..769af1c9c --- /dev/null +++ b/src/statesync/multiplexer.cc @@ -0,0 +1,52 @@ +#include +#include + +#include "stream.pb.h" +#include "fatal_assert.h" +#include "multiplexer.h" + +using namespace Network; + +void MultiplexerStream::subtract( const Stream *prefixStream ) +{ + const MultiplexerStream *prefix = dynamic_cast(prefixStream); + assert(streams.size() == prefix->streams.size()); + for (std::vector::size_type i = 0; i < prefix->streams.size(); i++) { + streams.at(i)->subtract(prefix->streams.at(i)); + } +} + +std::string MultiplexerStream::diff_from( const Stream &existingStream ) const +{ + const MultiplexerStream &existing = dynamic_cast(existingStream); + assert(streams.size() == existing.streams.size()); + StreamBuffers::StreamMessage output; + for (std::vector::size_type i = 0; i < existing.streams.size(); i++) { + std::string diff = streams.at(i)->diff_from(*existing.streams.at(i)); + output.add_diffs(diff); + } + return output.SerializeAsString(); +} + +void MultiplexerStream::apply_string( const std::string &diff ) +{ + StreamBuffers::StreamMessage input; + fatal_assert( input.ParseFromString( diff ) ); + assert(static_cast(streams.size()) == input.diffs_size()); + for (int i = 0; i < input.diffs_size(); i++) { + streams.at(i)->apply_string(input.diffs(i)); + } +} + +MultiplexerStream* MultiplexerStream::copy(void) const { + return new MultiplexerStream(streams); +} + +std::string MultiplexerStream::init_diff( void ) const { + StreamBuffers::StreamMessage output; + for (std::vector::size_type i = 0; i < streams.size(); i++) { + std::string diff = streams.at(i)->init_diff(); + output.add_diffs(diff); + } + return output.SerializeAsString(); +} diff --git a/src/statesync/multiplexer.h b/src/statesync/multiplexer.h new file mode 100644 index 000000000..1ea9271e2 --- /dev/null +++ b/src/statesync/multiplexer.h @@ -0,0 +1,70 @@ +#ifndef MULTIPLEXER_HPP +#define MULTIPLEXER_HPP + +#include +#include +#include + +#include "stream.h" +#include "stream.pb.h" +#include "fatal_assert.h" + +namespace Network { + class MultiplexerStream : public Stream { + private: + std::vector streams; + + public: + MultiplexerStream(std::vector s) : streams() { + for (Stream *s : s) { + streams.push_back(s->copy()); + } + } + MultiplexerStream(const MultiplexerStream & other) : streams() { + for (Stream *s : other.streams) { + streams.push_back(s->copy()); + } + } + + ~MultiplexerStream() { + for (Stream *s : streams) { + delete s; + } + streams.clear(); + } + + template + T* stream(int i) const { + Stream* stream = streams.at(i); + assert(stream != NULL); + return dynamic_cast(stream); + } + + void set(int i, Stream *s) { + Stream *old = streams.at(i); + delete old; + streams.at(i) = s->copy(); + } + + /* interface for Network::Transport */ + void subtract( const Stream *prefix ); + std::string diff_from( const Stream &existing ) const; + std::string init_diff( void ) const; + void apply_string( const std::string &diff ); + bool operator==( const Stream &xStream ) const { + const MultiplexerStream &x = dynamic_cast(xStream); + return streams == x.streams; + } + + bool compare( const MultiplexerStream & ) { return false; } + MultiplexerStream* copy(void) const; + + static std::string diffForStream(int i, std::string diff) { + StreamBuffers::StreamMessage input; + fatal_assert( input.ParseFromString( diff ) ); + return input.diffs(i); + }; + }; +} + +#endif diff --git a/src/statesync/stream.h b/src/statesync/stream.h new file mode 100644 index 000000000..083b2832e --- /dev/null +++ b/src/statesync/stream.h @@ -0,0 +1,21 @@ +#ifndef STREAM_HPP +#define STREAM_HPP + +#include + +namespace Network { + class Stream { + public: + virtual ~Stream() {}; + + /* interface for Network::Transport */ + virtual void subtract( const Stream *prefix ) = 0; + virtual std::string diff_from( const Stream &existing ) const = 0; + virtual std::string init_diff( void ) const = 0; + virtual void apply_string( const std::string &diff ) = 0; + virtual bool operator==( const Stream &x ) const = 0; + virtual Stream* copy() const = 0; + }; +} + +#endif diff --git a/src/statesync/user.cc b/src/statesync/user.cc index 9ea29c122..0ccf6e1ef 100644 --- a/src/statesync/user.cc +++ b/src/statesync/user.cc @@ -41,8 +41,9 @@ using namespace Parser; using namespace Network; using namespace ClientBuffers; -void UserStream::subtract( const UserStream *prefix ) +void UserStream::subtract( const Stream *prefixStream ) { + const UserStream *prefix = dynamic_cast(prefixStream); // if we are subtracting ourself from ourself, just clear the deque if ( this == prefix ) { actions.clear(); @@ -58,8 +59,9 @@ void UserStream::subtract( const UserStream *prefix ) } } -string UserStream::diff_from( const UserStream &existing ) const +string UserStream::diff_from( const Stream &existingStream ) const { + const UserStream &existing = dynamic_cast(existingStream); deque::const_iterator my_it = actions.begin(); for ( deque::const_iterator i = existing.actions.begin(); diff --git a/src/statesync/user.h b/src/statesync/user.h index fbf27285f..35421a55d 100644 --- a/src/statesync/user.h +++ b/src/statesync/user.h @@ -39,6 +39,7 @@ #include #include "parseraction.h" +#include "stream.h" using std::deque; using std::list; @@ -67,29 +68,40 @@ namespace Network { bool operator==( const UserEvent &x ) const { return ( type == x.type ) && ( userbyte == x.userbyte ) && ( resize == x.resize ); } }; - class UserStream + class UserStream : public Stream { private: deque actions; - + public: UserStream() : actions() {} - + void push_back( const Parser::UserByte & s_userbyte ) { actions.push_back( UserEvent( s_userbyte ) ); } void push_back( const Parser::Resize & s_resize ) { actions.push_back( UserEvent( s_resize ) ); } - + bool empty( void ) const { return actions.empty(); } size_t size( void ) const { return actions.size(); } const Parser::Action &get_action( unsigned int i ) const; - + /* interface for Network::Transport */ - void subtract( const UserStream *prefix ); - string diff_from( const UserStream &existing ) const; + void subtract( const Stream *prefix ); + string diff_from( const Stream &existing ) const; string init_diff( void ) const { return diff_from( UserStream() ); }; void apply_string( const string &diff ); - bool operator==( const UserStream &x ) const { return actions == x.actions; } + bool operator==( const Stream &xStream ) const { + const UserStream &x = dynamic_cast(xStream); + return actions == x.actions; + } + Stream* copy() const { + return new UserStream(*this); + } bool compare( const UserStream & ) { return false; } + + template + UserStream* stream(int) { + return this; + } }; }