Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

XTerm parsing improvements #570

Merged
merged 12 commits into from
Jun 17, 2022
1 change: 0 additions & 1 deletion src/textual/_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,6 @@ class TestParser(Parser[str]):
def parse(
self, on_token: Callable[[str], None]
) -> Generator[Awaitable, str, None]:
data = yield self.read1()
while True:
data = yield self.read1()
if not data:
Expand Down
33 changes: 30 additions & 3 deletions src/textual/_xterm_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@


import re
from collections import deque
from typing import Any, Callable, Generator, Iterable

from . import messages
Expand All @@ -10,6 +11,11 @@
from ._parser import Awaitable, Parser, TokenCallback
from ._ansi_sequences import ANSI_SEQUENCES_KEYS

# When trying to determine whether the current sequence is a supported/valid
# escape sequence, at which length should we give up and consider our search
# to be unsuccessful?
_MAX_SEQUENCE_SEARCH_THRESHOLD = 20
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why was 20 chosen?

Copy link
Member Author

@darrenburns darrenburns Jun 11, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The largest that I think we could expect to receive (that I'm aware of) is length 14 (mouse events in large terminal windows). The existence of the re_sgr_mouse regex suggested to me that longer sequences could exist, so I added on some leeway for defensiveness.


_re_mouse_event = re.compile("^" + re.escape("\x1b[") + r"(<?[\d;]+[mM]|M...)\Z")
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are two regexes for mouse events that run.

This one suggests a mouse event can have any number of parameters, but when we parse the mouse event it assumes there are a fixed number.

Is this one definitely required? Could we possibly get away with running just the 1 regex for mouse events rather than 2?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Quite possibly. IIRC there are at least two mouse codes that can be generated. At one point I thought I would have to support both, but everything supports the newer method. That might explain the dual regexes.

_re_terminal_mode_response = re.compile(
"^" + re.escape("\x1b[") + r"\?(?P<mode_id>\d+);(?P<setting_parameter>\d)\$y"
Expand All @@ -30,7 +36,7 @@ def __init__(
self.last_x = 0
self.last_y = 0

self._debug_log_file = open("keys.log", "wt") if debug else None
self._debug_log_file = open("keys.log", "wt")
darrenburns marked this conversation as resolved.
Show resolved Hide resolved

super().__init__()

Expand Down Expand Up @@ -105,7 +111,6 @@ def parse(self, on_token: TokenCallback) -> Generator[Awaitable, str, None]:

character = yield read1()

# If we're currently performing a bracketed paste,
if bracketed_paste:
paste_buffer.append(character)
self.debug_log(f"paste_buffer={paste_buffer!r}")
Expand All @@ -130,7 +135,28 @@ def parse(self, on_token: TokenCallback) -> Generator[Awaitable, str, None]:
continue

while True:
sequence += yield read1()
# If we look ahead and see an escape, then we've failed
# to find an escape sequence and should reissue the characters
# up till this point.
buffer = yield self.peek_buffer()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't look right. peek_buffer may return empty string if there is nothing in the buffer, but it could still read an ESC on L155

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll think about how I can get a test to cover this scenario and fix it on Tuesday.


if (
buffer
and buffer[0] == ESC
or len(sequence) > _MAX_SEQUENCE_SEARCH_THRESHOLD
):
for character in sequence:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's some very similar code further down, I wonder if it could be hoisted in to a closure?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I considered this, but will do it now that I know I'm not the only one with that thought :)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, you said closure...

I've factored it out into a separate method

keys = get_key_ansi_sequence(character, None)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suspect that if we have an ESC to be sent from an unrecognised sequence, we should send a ^ since other wise it would be seen as the user hitting the literal escape key.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, you might be right. I was thinking that we might not know if it was literally the user pressing an escape key followed by some data that doesn't match an ANSI sequence.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have made it such that when we perform the backtracking, \x1b gets converted to ^ rather than escape.

if keys is not None:
for key in keys:
on_token(events.Key(self.sender, key=key))
else:
on_token(events.Key(self.sender, key=character))
break

sequence_character = yield read1()
sequence += sequence_character

self.debug_log(f"sequence={sequence!r}")

# Firstly, check if it's a bracketed paste escape code
Expand Down Expand Up @@ -161,6 +187,7 @@ def parse(self, on_token: TokenCallback) -> Generator[Awaitable, str, None]:
mouse_match = _re_mouse_event.match(sequence)
if mouse_match is not None:
mouse_code = mouse_match.group(0)
print(mouse_code)
event = self.parse_mouse_code(mouse_code, self.sender)
if event:
on_token(event)
Expand Down
2 changes: 0 additions & 2 deletions tests/test_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,10 @@ def parse(self, on_token):
on_token(data)

test_parser = TestParser()

test_data = "Where there is a Will there is a way!"

for size in range(1, len(test_data) + 1):
# Feed the parser in pieces, first 1 character at a time, then 2, etc
test_parser = TestParser()
data = []
for offset in range(0, len(test_data), size):
for chunk in test_parser.feed(test_data[offset : offset + size]):
Expand Down
229 changes: 229 additions & 0 deletions tests/test_xterm_parser.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,229 @@
from unittest import mock

import pytest

from textual._xterm_parser import XTermParser
from textual.events import Paste, Key, MouseDown, MouseUp, MouseMove, MouseScrollDown, \
MouseScrollUp
from textual.messages import TerminalSupportsSynchronizedOutput


@pytest.fixture
def parser():
return XTermParser(sender=mock.sentinel, more_data=lambda: False)


def test_bracketed_paste(parser):
""" When bracketed paste mode is enabled in the terminal emulator and
the user pastes in some text, it will surround the pasted input
with the escape codes "\x1b[200~" and "\x1b[201~". The text between
these codes corresponds to a single `Paste` event in Textual.
"""
pasted_text = "PASTED"
events = list(parser.feed(f"\x1b[200~{pasted_text}\x1b[201~"))

assert len(events) == 1
assert isinstance(events[0], Paste)
assert events[0].text == pasted_text
assert events[0].sender == mock.sentinel


def test_bracketed_paste_content_contains_escape_codes(parser):
"""When performing a bracketed paste, if the pasted content contains
supported ANSI escape sequences, it should not interfere with the paste,
and no escape sequences within the bracketed paste should be converted
into Textual events.
"""
pasted_text = "PAS\x0fTED"
events = list(parser.feed(f"\x1b[200~{pasted_text}\x1b[201~"))
assert len(events) == 1
assert events[0].text == pasted_text


def test_bracketed_paste_amongst_other_codes(parser):
pasted_text = "PASTED"
events = list(parser.feed(f"\x1b[8~\x1b[200~{pasted_text}\x1b[201~\x1b[8~"))
assert len(events) == 3 # Key.End -> Paste -> Key.End
assert events[0].key == "end"
assert events[1].text == pasted_text
assert events[2].key == "end"


def test_cant_match_escape_sequence_too_long(parser):
""" The sequence did not match, and we hit the maximum sequence search
length threshold, so each character should be issued as a key-press instead.
"""
sequence = "\x1b[123456789123456789123"
events = list(parser.feed(sequence))

# Every character in the sequence is converted to a key press
assert len(events) == len(sequence)
assert all(isinstance(event, Key) for event in events)

# '\x1b' is translated to 'escape'
assert events[0].key == "escape"

# The rest of the characters correspond to the expected key presses
events = events[1:]
for index, character in enumerate(sequence[1:]):
assert events[index].key == character


def test_unknown_sequence_followed_by_known_sequence(parser):
""" When we feed the parser an unknown sequence followed by a known
sequence. The characters in the unknown sequence are delivered as keys,
and the known escape sequence that follows is delivered as expected.
"""
unknown_sequence = "\x1b[?"
known_sequence = "\x1b[8~" # key = 'end'

sequence = unknown_sequence + known_sequence
events = parser.feed(sequence)

assert next(events).key == "escape"
assert next(events).key == "["
assert next(events).key == "?"
assert next(events).key == "end"

with pytest.raises(StopIteration):
next(events)


def test_simple_key_presses_all_delivered_correct_order(parser):
sequence = "123abc"
events = parser.feed(sequence)
assert "".join(event.key for event in events) == sequence


def test_simple_keypress_non_character_key(parser):
sequence = "\x09"
events = list(parser.feed(sequence))
assert len(events) == 1
assert events[0].key == "tab"


def test_key_presses_and_escape_sequence_mixed(parser):
sequence = "abc\x1b[13~123"
events = list(parser.feed(sequence))

assert len(events) == 7
assert "".join(event.key for event in events) == "abcf3123"


def test_single_escape(parser):
"""A single \x1b should be interpreted as a single press of the Escape key"""
events = parser.feed("\x1b")
assert [event.key for event in events] == ["escape"]


def test_double_escape(parser):
"""Windows Terminal writes double ESC when the user presses the Escape key once."""
events = parser.feed("\x1b\x1b")
assert [event.key for event in events] == ["escape"]


@pytest.mark.parametrize("sequence, event_type, shift, meta", [
# Mouse down, with and without modifiers
("\x1b[<0;50;25M", MouseDown, False, False),
("\x1b[<4;50;25M", MouseDown, True, False),
("\x1b[<8;50;25M", MouseDown, False, True),
# Mouse up, with and without modifiers
("\x1b[<0;50;25m", MouseUp, False, False),
("\x1b[<4;50;25m", MouseUp, True, False),
("\x1b[<8;50;25m", MouseUp, False, True),
])
def test_mouse_click(parser, sequence, event_type, shift, meta):
"""ANSI codes for mouse should be converted to Textual events"""
events = list(parser.feed(sequence))

assert len(events) == 1

event = events[0]

assert isinstance(event, event_type)
assert event.x == 49
assert event.y == 24
assert event.screen_x == 49
assert event.screen_y == 24
assert event.meta is meta
assert event.shift is shift


@pytest.mark.parametrize("sequence, shift, meta, button", [
("\x1b[<32;15;38M", False, False, 1), # Click and drag
("\x1b[<35;15;38M", False, False, 0), # Basic cursor movement
("\x1b[<39;15;38M", True, False, 0), # Shift held down
("\x1b[<43;15;38M", False, True, 0), # Meta held down
])
def test_mouse_move(parser, sequence, shift, meta, button):
events = list(parser.feed(sequence))

assert len(events) == 1

event = events[0]

assert isinstance(event, MouseMove)
assert event.x == 14
assert event.y == 37
assert event.shift is shift
assert event.meta is meta
assert event.button == button


@pytest.mark.parametrize("sequence", [
"\x1b[<64;18;25M",
"\x1b[<68;18;25M",
"\x1b[<72;18;25M",
])
def test_mouse_scroll_down(parser, sequence):
"""Scrolling the mouse with and without modifiers held down.
We don't currently capture modifier keys in scroll events.
"""
events = list(parser.feed(sequence))

assert len(events) == 1

event = events[0]

assert isinstance(event, MouseScrollDown)
assert event.x == 17
assert event.y == 24


@pytest.mark.parametrize("sequence, shift, meta", [
("\x1b[<65;18;25M", False, False),
("\x1b[<69;18;25M", True, False),
("\x1b[<73;18;25M", False, True),
])
def test_mouse_scroll_up(parser, sequence, shift, meta):
events = list(parser.feed(sequence))

assert len(events) == 1

event = events[0]

assert isinstance(event, MouseScrollUp)
assert event.x == 17
assert event.y == 24


def test_escape_sequence_resulting_in_multiple_keypresses(parser):
"""Some sequences are interpreted as more than 1 keypress"""
events = list(parser.feed("\x1b[2;4~"))
assert len(events) == 2
assert events[0].key == "escape"
assert events[1].key == "shift+insert"


def test_terminal_mode_reporting_synchronized_output_supported(parser):
sequence = "\x1b[?2026;1$y"
events = list(parser.feed(sequence))
assert len(events) == 1
assert isinstance(events[0], TerminalSupportsSynchronizedOutput)
assert events[0].sender == mock.sentinel


def test_terminal_mode_reporting_synchronized_output_not_supported(parser):
sequence = "\x1b[?2026;0$y"
events = list(parser.feed(sequence))
assert events == []