Skip to content

Commit

Permalink
#639 UDP socket support:
Browse files Browse the repository at this point in the history
* add bind-udp and udp-auth options
* man page updates
* "udp" debug logger, disable remote logging when enabled
* use a random uuid for each connection
* disable delta encoding when udp is in use (cannot rely on previous packets having been received)
* some SocketConnection attributes are now optional (ie: udp sockets don't have timeouts)
* add accept() method to protocol, used to enable asynchronous mode once the connection has been accepted and all the packet handlers have been registered
* refactor protocol write functions: start / end / fail callbacks are common to all the items from the same packet, keep them together in the write queue
* add UDPProtocol class with all the packet header parsing code, support for asynchronous "udp-control" packets to manage packet loss and resends
* add "udp listeners" to the servers: those are shared by all clients and we dispatch to the correct protocol instance based on the uuid value in the packet header
* set the optional "asynchronous" flag on some packets so the UDP layer knows that those packets can be processed out of order
* set the "fail callback" attribute on some packets so the UDP layer will sometimes use the callback instead of resending the lost chunk / packet
* add this fail callback to pixel packets so we can just send fresh pixels instead

git-svn-id: https://xpra.org/svn/Xpra/trunk@16734 3bb7dfac-3a0b-4e04-842a-767bc560f471
  • Loading branch information
totaam committed Aug 29, 2017
1 parent 2c03925 commit fbb168f
Show file tree
Hide file tree
Showing 18 changed files with 848 additions and 100 deletions.
28 changes: 27 additions & 1 deletion src/man/xpra.1
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ xpra \- viewer for remote, persistent X applications
[\fB\-\-sharing\fP=\fIyes\fP|\fIno\fP]
[\fB\-\-bind\fP=\fIBIND_LOCATION\fP]
[\fB\-\-bind\-tcp\fP=\fI[HOST]:PORT\fP]
[\fB\-\-bind\-udp\fP=\fI[HOST]:PORT\fP]
[\fB\-\-bind\-ws\fP=\fI[HOST]:PORT\fP]
[\fB\-\-bind\-wss\fP=\fI[HOST]:PORT\fP]
[\fB\-\-bind\-ssl\fP=\fI[HOST]:PORT\fP]
Expand Down Expand Up @@ -196,6 +197,7 @@ xpra \- viewer for remote, persistent X applications
[\fB\-\-microphone\-codec\fP=\fICODEC\fP]
[\fB\-\-bind\fP=\fISOCKET|DIRECTORY\fP]
[\fB\-\-bind\-tcp\fP=\fI[HOST]:PORT\fP]
[\fB\-\-bind\-udp\fP=\fI[HOST]:PORT\fP]
[\fB\-\-bind\-ws\fP=\fI[HOST]:PORT\fP]
[\fB\-\-bind\-wss\fP=\fI[HOST]:PORT\fP]
[\fB\-\-bind\-ssl\fP=\fI[HOST]:PORT\fP]
Expand Down Expand Up @@ -437,9 +439,11 @@ the unix domain sockets (see \fIbind\fP, \fIsocket\-dir\fP and
\fIsocket\-dirs\fP).

If the xpra server was given the \fB\-\-bind\-tcp\fP, \fB\-\-bind\-ssl\fP
\fB\-\-bind\-udp\fP=\fI[HOST]:PORT\fP,
\fB\-\-bind\-ws\fP, \fB\-\-bind\-wss\fP or \fB\-\-bind\-vsock\fP option
when started then you can also connect
to it using a display of the form \fBtcp:HOST:PORT\fP, \fBssl:HOST:PORT\fP,
to it using a display of the form \fBtcp:HOST:PORT\fP,
\fBudp:HOST:PORT\fP, \fBssl:HOST:PORT\fP,
\fBws:HOST:PORT\fP, \fBwss:HOST:PORT\fP or \fBvsock:HOST:PORT\fP.
(Notice that \fBssh:\fP takes an optional \fIdisplay\fP number,
while those take a required \fIport\fP number.)
Expand Down Expand Up @@ -743,6 +747,24 @@ Using this switch without using the \fItcp\-auth\fP option is not recommended,
and is a major security risk (especially when passing 0.0.0.0)!
Anyone at all may connect to this port and access your session.

TCP sockets may also process HTTP / Websocket connections
if the \fBhtml\fP switch is enabled.
TCP sockets may also be upgraded to SSL sockets if the
\fBssl\fP switch is enabled.
.TP
\fB\-\-bind\-udp\fP=\fI[HOST]:PORT\fP
Create a UDP socket for each \fB\-\-bind\-udp\fP option specified.
If the host portion is omitted, then 127.0.0.1 (localhost) will be
used. If you wish to accept connections on all interfaces, pass
0.0.0.0 for the host portion.

Using this switch without using the \fIudp\-auth\fP option is not recommended,
and is a major security risk (especially when passing 0.0.0.0)!
Anyone at all may connect to this port and access your session.
UDP sessions are trivial to hijack for anyone able to sniff
even just a single packet, it should only be used in very specific
use-cases, and never over unsecured networks.

TCP sockets may also process HTTP / Websocket connections
if the \fBhtml\fP switch is enabled.
TCP sockets may also be upgraded to SSL sockets if the
Expand Down Expand Up @@ -852,6 +874,10 @@ automatically (either \fBpam\fP or \fBwin32\fP)
Just like the \fBauth\fP switch, except this one only applies
to TCP sockets (sockets defined using the \fBbind\-tcp\fP switch).
.TP
\fB\-\-udp\-auth\fP=\fIMODULE\fP
Just like the \fBtcp\-auth\fP switch, except this one only applies
to UDP sockets (sockets defined using the \fBbind\-udp\fP switch).
.TP
\fB\-\-ws\-auth\fP=\fIMODULE\fP
Just like the \fBauth\fP switch, except this one only applies
to ws sockets: sockets defined using the \fBbind\-ws\fP switch,
Expand Down
22 changes: 20 additions & 2 deletions src/xpra/client/client_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,17 @@ def get_scheduler(self):

def setup_connection(self, conn):
netlog("setup_connection(%s) timeout=%s, socktype=%s", conn, conn.timeout, conn.socktype)
self._protocol = Protocol(self.get_scheduler(), conn, self.process_packet, self.next_packet)
if conn.socktype=="udp":
from xpra.net.udp_protocol import UDPClientProtocol
self._protocol = UDPClientProtocol(self.get_scheduler(), conn, self.process_packet, self.next_packet)
#use a random uuid:
import random
self._protocol.uuid = random.randint(0, 2**64-1)
self.set_packet_handlers(self._packet_handlers, {
"udp-control" : self._process_udp_control,
})
else:
self._protocol = Protocol(self.get_scheduler(), conn, self.process_packet, self.next_packet)
self._protocol.large_packets.append("keymap-changed")
self._protocol.large_packets.append("server-settings")
self._protocol.large_packets.append("logging")
Expand All @@ -254,6 +264,10 @@ def setup_connection(self, conn):
getChildReaper().add_process(proc, name, command, ignore=True, forget=False)
netlog("setup_connection(%s) protocol=%s", conn, self._protocol)

def _process_udp_control(self, packet):
#send it back to the protocol object:
self._protocol.process_control(*packet[1:])


def remove_packet_handlers(self, *keys):
for k in keys:
Expand Down Expand Up @@ -440,19 +454,22 @@ def have_more(self):
p.source_has_more()

def next_packet(self):
netlog("next_packet() packets in queues: priority=%i, ordinary=%i, mouse=%s", len(self._priority_packets), len(self._ordinary_packets), bool(self._mouse_position))
synchronous = True
if self._priority_packets:
packet = self._priority_packets.pop(0)
elif self._ordinary_packets:
packet = self._ordinary_packets.pop(0)
elif self._mouse_position is not None:
packet = self._mouse_position
synchronous = False
self._mouse_position = None
else:
packet = None
has_more = packet is not None and \
(bool(self._priority_packets) or bool(self._ordinary_packets) \
or self._mouse_position is not None)
return packet, None, None, has_more
return packet, None, None, None, synchronous, has_more


def cleanup(self):
Expand Down Expand Up @@ -815,6 +832,7 @@ def parse_network_capabilities(self):
if not p or not p.enable_encoder_from_caps(c):
return False
p.enable_compressor_from_caps(c)
p.accept()
return True

def parse_encryption_capabilities(self):
Expand Down
10 changes: 8 additions & 2 deletions src/xpra/client/gtk_base/gtk_client_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -468,12 +468,18 @@ def make_hello(self):
"configure.pointer" : True,
"frame_sizes" : self.get_window_frame_sizes()
})
from xpra.client.window_backing_base import DELTA_BUCKETS
updict(capabilities, "encoding", {
"icons.greedy" : True, #we don't set a default window icon any more
"icons.size" : (64, 64), #size we want
"icons.max_size" : (128, 128), #limit
"delta_buckets" : DELTA_BUCKETS,
})
from xpra.client import window_backing_base
if self._protocol._conn.socktype=="udp":
#lossy protocol means we can't use delta regions:
log("no delta buckets with udp, since we can drop paint packets")
window_backing_base.DELTA_BUCKETS = 0
updict(capabilities, "encoding", {
"delta_buckets" : window_backing_base.DELTA_BUCKETS,
})
return capabilities

Expand Down
5 changes: 3 additions & 2 deletions src/xpra/client/ui_client_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -2048,9 +2048,10 @@ def parse_logging_capabilities(self):
if self.client_supports_remote_logging and c.boolget("remote-logging"):
#check for debug:
from xpra.log import is_debug_enabled
for x in ("network", "crypto"):
for x in ("network", "crypto", "udp"):
if is_debug_enabled(x):
log.warn("Warning: cannot enable remote logging as '%s' debug logging is enabled", x)
log.warn("Warning: cannot enable remote logging")
log.warn(" because '%s' debug logging is enabled", x)
return
log.info("enabled remote logging")
if not self.log_both:
Expand Down
2 changes: 1 addition & 1 deletion src/xpra/clipboard/clipboard_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ def _process_clipboard_token(self, packet):
synchronous_client = len(packet)>=11 and bool(packet[10])
proxy.got_token(targets, target_data, claim, synchronous_client)

def _get_clipboard_from_remote_handler(self, proxy, selection, target):
def _get_clipboard_from_remote_handler(self, _proxy, selection, target):
if must_discard(target):
log("invalid target '%s'", target)
return None
Expand Down
1 change: 1 addition & 0 deletions src/xpra/log.py
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@ def enable_format(format_string):
("protocol" , "Packet input and output (formatting, parsing, sending and receiving)"),
("websocket" , "WebSocket layer"),
("named-pipe" , "Named pipe"),
("udp" , "UDP"),
("crypto" , "Encryption"),
("auth" , "Authentication"),
])),
Expand Down
19 changes: 15 additions & 4 deletions src/xpra/net/bytestreams.py
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,11 @@ def close(self):
i = s
log("%s.close() for socket=%s", self, i)
Connection.close(self)
#meaningless for udp:
try:
s.settimeout(0)
except:
pass
#this is more proper but would break the proxy server:
#s.shutdown(socket.SHUT_RDWR)
s.close()
Expand Down Expand Up @@ -305,14 +310,20 @@ def do_get_socket_info(self):
s = self._socket
if not s:
return None
return {
#"class" : str(type(s)),
"fileno" : s.fileno(),
"timeout" : int(1000*(s.gettimeout() or 0)),
info = {
"family" : FAMILY_STR.get(s.family, s.family),
"proto" : s.proto,
"type" : PROTOCOL_STR.get(s.type, s.type),
}
try:
info["timeout"] = int(1000*(s.gettimeout() or 0))
except:
pass
try:
info["fileno"] = s.fileno()
except:
pass
return info

try:
#this wrapper class allows us to override the normal ssl.Socket
Expand Down
41 changes: 18 additions & 23 deletions src/xpra/net/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ def restore_state(self, state):
self.enable_compressor(self.compressor)
self.enable_encoder(self.encoder)


def wait_for_io_threads_exit(self, timeout=None):
io_threads = [x for x in (self._read_thread, self._write_thread) if x is not None]
for t in io_threads:
Expand Down Expand Up @@ -205,6 +206,9 @@ def __repr__(self):
def get_threads(self):
return [x for x in [self._write_thread, self._read_thread, self._read_parser_thread, self._write_format_thread] if x is not None]

def accept(self):
pass


def get_info(self, alias_info=True):
info = {
Expand Down Expand Up @@ -311,7 +315,7 @@ def _write_format_thread_loop(self):
return
self._internal_error("error in network packet write/format", e, exc_info=True)

def _add_packet_to_queue(self, packet, start_send_cb=None, end_send_cb=None, has_more=False):
def _add_packet_to_queue(self, packet, start_send_cb=None, end_send_cb=None, fail_cb=None, synchronous=True, has_more=False):
if has_more:
self._source_has_more.set()
if packet is None:
Expand All @@ -322,24 +326,17 @@ def _add_packet_to_queue(self, packet, start_send_cb=None, end_send_cb=None, has
if self._closed:
return
try:
self._add_chunks_to_queue(chunks, start_send_cb, end_send_cb)
self._add_chunks_to_queue(chunks, start_send_cb, end_send_cb, fail_cb, synchronous)
except:
log.error("Error: failed to queue '%s' packet", packet[0])
log("add_chunks_to_queue%s", (chunks, start_send_cb, end_send_cb), exc_info=True)
log("add_chunks_to_queue%s", (chunks, start_send_cb, end_send_cb, fail_cb), exc_info=True)
raise

def _add_chunks_to_queue(self, chunks, start_send_cb=None, end_send_cb=None):
def _add_chunks_to_queue(self, chunks, start_send_cb=None, end_send_cb=None, fail_cb=None, synchronous=True):
""" the write_lock must be held when calling this function """
counter = 0
items = []
for proto_flags,index,level,data in chunks:
scb, ecb = None, None
#fire the start_send_callback just before the first packet is processed:
if counter==0:
scb = start_send_cb
#fire the end_send callback when the last packet (index==0) makes it out:
if index==0:
ecb = end_send_cb
payload_size = len(data)
actual_size = payload_size
if self.cipher_out:
Expand All @@ -360,9 +357,9 @@ def _add_chunks_to_queue(self, chunks, start_send_cb=None, end_send_cb=None):
assert not self.cipher_out
#for plain/text packets (ie: gibberish response)
log("sending %s bytes without header", payload_size)
items.append((data, scb, ecb))
items.append(data)
elif actual_size<PACKET_JOIN_SIZE:
if type(data) not in JOIN_TYPES:
if not isinstance(data, JOIN_TYPES):
data = memoryview_to_bytes(data)
header_and_data = pack_header(proto_flags, level, index, payload_size) + data
items.append(header_and_data)
Expand All @@ -371,19 +368,17 @@ def _add_chunks_to_queue(self, chunks, start_send_cb=None, end_send_cb=None):
items.append(header)
items.append(data)
counter += 1
if self._write_thread is None:
self.start_write_thread()
self._write_queue.put((items, scb, ecb))
self.output_packetcount += 1
self.raw_write(items, start_send_cb, end_send_cb, fail_cb, synchronous)

def start_write_thread(self):
self._write_thread = start_thread(self._write_thread_loop, "write", daemon=True)

def raw_write(self, contents, start_cb=None, end_cb=None):
def raw_write(self, items, start_cb=None, end_cb=None, fail_cb=None, synchronous=True):
""" Warning: this bypasses the compression and packet encoder! """
if self._write_thread is None:
self.start_write_thread()
self._write_queue.put(((contents, start_cb, end_cb), ))
self._write_queue.put((items, start_cb, end_cb, fail_cb, synchronous))


def verify_packet(self, packet):
""" look for None values which may have caused the packet to fail encoding """
Expand Down Expand Up @@ -604,7 +599,7 @@ def _write(self):
return False
return self.write_items(*items)

def write_items(self, buf_data, start_cb=None, end_cb=None):
def write_items(self, buf_data, start_cb=None, end_cb=None, fail_cb=None, synchronous=True):
con = self._conn
if not con:
return False
Expand All @@ -614,7 +609,7 @@ def write_items(self, buf_data, start_cb=None, end_cb=None):
except:
if not self._closed:
log.error("Error on write start callback %s", start_cb, exc_info=True)
self.write_buffers(buf_data)
self.write_buffers(buf_data, fail_cb, synchronous)
if end_cb:
try:
end_cb(self._conn.output_bytecount)
Expand All @@ -623,7 +618,7 @@ def write_items(self, buf_data, start_cb=None, end_cb=None):
log.error("Error on write end callback %s", end_cb, exc_info=True)
return True

def write_buffers(self, buf_data):
def write_buffers(self, buf_data, _fail_cb, _synchronous):
con = self._conn
if not con:
return 0
Expand Down Expand Up @@ -967,7 +962,7 @@ def packet_queued(*_args):
if wait_for_packet_sent():
#check again every 100ms
self.timeout_add(100, wait_for_packet_sent)
self._add_chunks_to_queue(chunks, start_send_cb=None, end_send_cb=packet_queued)
self._add_chunks_to_queue(chunks, start_send_cb=None, end_send_cb=packet_queued, synchronous=False)
#just in case wait_for_packet_sent never fires:
self.timeout_add(5*1000, close_and_release)

Expand Down
2 changes: 1 addition & 1 deletion src/xpra/net/subprocess_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -428,7 +428,7 @@ def get_packet(self):
item = self.send_queue.get(False)
except:
item = None
return (item, None, None, self.send_queue.qsize()>0)
return (item, None, None, None, False, self.send_queue.qsize()>0)

def send(self, *packet_data):
self.send_queue.put(packet_data)
Expand Down
Loading

0 comments on commit fbb168f

Please sign in to comment.