Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

XTerm parsing improvements #570

Merged
12 commits merged on Jun 17, 2022
1 change: 0 additions & 1 deletion src/textual/_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,6 @@ class TestParser(Parser[str]):
def parse(
self, on_token: Callable[[str], None]
) -> Generator[Awaitable, str, None]:
data = yield self.read1()
while True:
data = yield self.read1()
if not data:
Expand Down
103 changes: 75 additions & 28 deletions src/textual/_xterm_parser.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,20 @@
from __future__ import annotations


import re
from typing import Any, Callable, Generator, Iterable

from . import messages
from . import events
from ._types import MessageTarget
from ._parser import Awaitable, Parser, TokenCallback
from . import messages
from ._ansi_sequences import ANSI_SEQUENCES_KEYS
from ._parser import Awaitable, Parser, TokenCallback
from ._types import MessageTarget

# When trying to determine whether the current sequence is a supported/valid
# escape sequence, at which length should we give up and consider our search
# to be unsuccessful?
from .keys import Keys

_MAX_SEQUENCE_SEARCH_THRESHOLD = 20
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why was 20 chosen?

Copy link
Member Author

@darrenburns darrenburns Jun 11, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The largest that I think we could expect to receive (that I'm aware of) is length 14 (mouse events in large terminal windows). The existence of the re_sgr_mouse regex suggested to me that longer sequences could exist, so I added on some leeway for defensiveness.


_re_mouse_event = re.compile("^" + re.escape("\x1b[") + r"(<?[\d;]+[mM]|M...)\Z")
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are two regexes for mouse events that run.

This one suggests a mouse event can have any number of parameters, but when we parse the mouse event it assumes there is a fixed number.

Is this one definitely required? Could we possibly get away with running just the 1 regex for mouse events rather than 2?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Quite possibly. IIRC there are at least two mouse codes that can be generated. At one point I thought I would have to support both, but everything supports the newer method. That might explain the dual regexes.

_re_terminal_mode_response = re.compile(
Expand All @@ -19,7 +25,6 @@


class XTermParser(Parser[events.Event]):

_re_sgr_mouse = re.compile(r"\x1b\[<(\d+);(\d+);(\d+)([Mm])")

def __init__(
Expand All @@ -34,7 +39,7 @@ def __init__(

super().__init__()

def debug_log(self, *args: Any) -> None:
def debug_log(self, *args: Any) -> None: # pragma: no cover
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What was the typing error?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There was no typing error. I was trying to ensure we had full test coverage of this module, and thought this code was temporary "for us" code so didn't think coverage.py should measure it. I can remove this and add a test case around it if you'd prefer.

if self._debug_log_file is not None:
self._debug_log_file.write(" ".join(args) + "\n")
self._debug_log_file.flush()
Expand Down Expand Up @@ -85,10 +90,19 @@ def parse(self, on_token: TokenCallback) -> Generator[Awaitable, str, None]:

ESC = "\x1b"
read1 = self.read1
get_key_ansi_sequence = ANSI_SEQUENCES_KEYS.get
sequence_to_key_events = self._sequence_to_key_events
more_data = self.more_data
paste_buffer: list[str] = []
bracketed_paste = False
use_prior_escape = False

def reissue_sequence_as_keys(reissue_sequence: str) -> None:
for character in reissue_sequence:
key_events = sequence_to_key_events(character)
for event in key_events:
if event.key == "escape":
event = events.Key(event.sender, key="^")
on_token(event)

while not self.is_eof:
if not bracketed_paste and paste_buffer:
Expand All @@ -99,22 +113,30 @@ def parse(self, on_token: TokenCallback) -> Generator[Awaitable, str, None]:
# ESC from the closing bracket, since at that point we didn't know what
# the full escape code was.
pasted_text = "".join(paste_buffer[:-1])
self.debug_log(f"pasted_text={pasted_text!r}")
on_token(events.Paste(self.sender, text=pasted_text))
paste_buffer.clear()

character = yield read1()
character = ESC if use_prior_escape else (yield read1())
use_prior_escape = False

# If we're currently performing a bracketed paste,
if bracketed_paste:
paste_buffer.append(character)
self.debug_log(f"paste_buffer={paste_buffer!r}")

self.debug_log(f"character={character!r}")
if character == ESC:
# Could be the escape key was pressed OR the start of an escape sequence
sequence: str = character
if not bracketed_paste:
# TODO: There's nothing left in the buffer at the moment,
# but since we're on an escape, how can we be sure that the
# data that next gets fed to the parser isn't an escape sequence?

# This problem arises when an ESC falls at the end of a chunk.
# We'll be at an escape, but peek_buffer will return an empty
# string because there's nothing in the buffer yet.

# This code makes an assumption that an escape sequence will never be
# "chopped up", so buffers would never contain partial escape sequences.
peek_buffer = yield self.peek_buffer()
if not peek_buffer:
# An escape arrived without any following characters
Expand All @@ -129,33 +151,54 @@ def parse(self, on_token: TokenCallback) -> Generator[Awaitable, str, None]:
if len(peek_buffer) == 1 and not more_data():
continue

# Look ahead through the suspected escape sequence for a match
while True:
sequence += yield read1()

# If we run into another ESC at this point, then we've failed
# to find a match, and should issue everything we've seen within
# the suspected sequence as Key events instead.
sequence_character = yield read1()
new_sequence = sequence + sequence_character

threshold_exceeded = len(sequence) > _MAX_SEQUENCE_SEARCH_THRESHOLD
found_escape = sequence_character and sequence_character == ESC

if threshold_exceeded:
# We exceeded the sequence length threshold, so reissue all the
# characters in that sequence as key-presses.
reissue_sequence_as_keys(new_sequence)
break

if found_escape:
# We've hit an escape, so we need to reissue all the keys
# up to but not including it, since this escape could be
# part of an upcoming control sequence.
use_prior_escape = True
reissue_sequence_as_keys(sequence)
break

sequence = new_sequence

self.debug_log(f"sequence={sequence!r}")

# Firstly, check if it's a bracketed paste escape code
bracketed_paste_start_match = _re_bracketed_paste_start.match(
sequence
)
self.debug_log(f"sequence = {repr(sequence)}")
self.debug_log(f"match = {repr(bracketed_paste_start_match)}")
if bracketed_paste_start_match is not None:
bracketed_paste = True
self.debug_log("BRACKETED PASTE START DETECTED")
break

bracketed_paste_end_match = _re_bracketed_paste_end.match(sequence)
if bracketed_paste_end_match is not None:
bracketed_paste = False
self.debug_log("BRACKETED PASTE ENDED")
break

if not bracketed_paste:
# Was it a pressed key event that we received?
keys = get_key_ansi_sequence(sequence, None)
if keys is not None:
for key in keys:
on_token(events.Key(self.sender, key=key))
key_events = list(sequence_to_key_events(sequence))
for event in key_events:
on_token(event)
if key_events:
break
# Or a mouse event?
mouse_match = _re_mouse_event.match(sequence)
Expand All @@ -165,7 +208,9 @@ def parse(self, on_token: TokenCallback) -> Generator[Awaitable, str, None]:
if event:
on_token(event)
break
# Or a mode report? (i.e. the terminal telling us if it supports a mode we requested)

# Or a mode report?
# (i.e. the terminal saying it supports a mode we requested)
mode_report_match = _re_terminal_mode_response.match(sequence)
if mode_report_match is not None:
if (
Expand All @@ -180,9 +225,11 @@ def parse(self, on_token: TokenCallback) -> Generator[Awaitable, str, None]:
break
else:
if not bracketed_paste:
keys = get_key_ansi_sequence(character, None)
if keys is not None:
for key in keys:
on_token(events.Key(self.sender, key=key))
else:
on_token(events.Key(self.sender, key=character))
for event in sequence_to_key_events(character):
on_token(event)

def _sequence_to_key_events(self, sequence: str) -> Iterable[events.Key]:
    """Yield the Key events corresponding to an input sequence.

    Looks the sequence up in the ANSI key table. A single character that
    has no table entry falls back to a key press of that character; an
    unrecognised multi-character sequence yields nothing.

    Args:
        sequence (str): The raw sequence (single character or escape sequence).

    Returns:
        Iterable[events.Key]: Zero or more Key events for the sequence.
    """
    if len(sequence) == 1:
        # Single characters are themselves valid keys when unmapped.
        fallback = (sequence,)
    else:
        fallback = ()
    for key in ANSI_SEQUENCES_KEYS.get(sequence, fallback):
        yield events.Key(self.sender, key)
2 changes: 0 additions & 2 deletions tests/test_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,10 @@ def parse(self, on_token):
on_token(data)

test_parser = TestParser()

test_data = "Where there is a Will there is a way!"

for size in range(1, len(test_data) + 1):
# Feed the parser in pieces, first 1 character at a time, then 2, etc
test_parser = TestParser()
data = []
for offset in range(0, len(test_data), size):
for chunk in test_parser.feed(test_data[offset : offset + size]):
Expand Down
Loading